Source code for pyClimat.analysis

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jul 29 18:49:19 2021

@author: dboateng
Contains analysis routine required for calculation, extracting variables and domain, masking out areas 
and certian statistics. 

"""

# importing packages 
import xarray as xr
import os
import pandas as pd
import numpy as np
from scipy import stats
from eofs.xarray import Eof

#importing other routines

try:
    from .data import *
    from .utils import vert_coord_convertion
except:
    from data import *
    from utils import vert_coord_convertion
    



[docs]def extract_var(Dataset, varname, units=None, Dataset_wiso=None, other_data=None, lev_units=None, lev=None): """ This function extracts some defined variables from a netCDF file. Moreover, if the variable require calculation or unit conversion, the user speficication can be pass to such task. For example, echam out put the differrent component of precipitation (convective, large scale, etc). Therefore extraction of total precipitation would require the calculation of the total precipitation. Example of the defined variables: "temp2" -- near surface temperature "prec" -- total precipitation "d18op" -- O^18 isotopic composition in precipitation "d18ov" -- O^18 isotopic composition in vapour "relhum" -- relative humidity "elev" -- topography or elevation "slm" -- mean sea level pressure "evap" -- evaporation "u, v, omega" -- zonal, meridoinal, and vertical velocity and others example -------- data = xr.open_dataset(path_to_data) temp = extract_var(data, varname= "temp2", unit= "°C",) ---> extract the t2m variable and convert it from K to °C Parameters ---------- Dataset : TYPE: Dataset DESCRIPTION. Processed output containing all variables or certain ones varname : TYPE: STR DESCRIPTION: Name of variable (eg. temp2:Tempeature, prec:Total precipitation, d18op: d18o in precipitation) units : TYPE, optional DESCRIPTION. The default is None. Units of the variable for eg. °C for temperature or mm/month for precipitation Dataset_wiso : TYPE, optional DESCRIPTION. The default is None. Wiso Dataset is required for calcullating d18op and d18ov other_data: TYPE: datarray, optional DESCRIPTION: Additional data required for calculating units eg. Pa/s--> m/s require temperature data on pressure levels (in K) lev_units: TYPE: str, optional DESCRIPTION: Units of vertical levels eg. hPa lev: TYPE: int, optional DESCRIPTION: The vertical height required for the variable, eg. 500 hPa for winds Returns ------- var_data : TYPE: datarray DESCRIPTION: returns the variable for etracted or calculated from the Dataset (notmally monthly long-term climatologies) """ #Temperature if hasattr(Dataset, "mlev"): Dataset = Dataset.rename({"mlev":"lev"}) if Dataset_wiso is not None: Dataset_wiso = Dataset_wiso.rename({"mlev":"lev"}) if varname == "temp2": var_data = Dataset[varname] if units is not None: if units == "°C": var_data = var_data - 273.15 # convert temperature to dec C var_data.attrs["units"] = units else: print("Define unit well or the default is kelvin") #Total precipitation amount elif varname == "prec": var_data = Dataset["aprl"] + Dataset["aprc"] if units is not None: if units == "mm/month": var_data = var_data *60*60*24*30 #mm/month var_data.attrs["units"] = units elif units == "mm/a": var_data = var_data *60*60*24*365 #mm/a var_data.attrs["units"] = units else: print("Define unit well or the default is kg/m²s") # d18o in Precipitation elif varname == "d18op": var_data_echam = Dataset["aprl"] + Dataset["aprc"] if Dataset_wiso is not None: var_data_wiso = Dataset_wiso["wisoaprl"][:,1,:,:] + Dataset_wiso["wisoaprc"][:,1,:,:] SMOW = 0.2228 wiso_wtgs = var_data_wiso / var_data_echam var_data = ((wiso_wtgs/SMOW) -1) if units is not None: if units == "per mil": var_data = var_data *1000 # convert to permil var_data.attrs["units"] = units else: print("Define unit well or the default is isotopic ratio") # d18o in water vapour elif varname == "d18ov": if lev is not None: var_data_echam = Dataset["q"].sel(lev=lev) if Dataset_wiso is not None: var_data_wiso = Dataset_wiso["q18o"].sel(lev=lev) else: var_data_echam = Dataset["q"][:,30,:,:] if Dataset_wiso is not None: var_data_wiso = Dataset_wiso["q18o"][:,30,:,:] SMOW = 0.2228 wiso_wtgs = var_data_wiso / var_data_echam var_data = ((wiso_wtgs/SMOW) -1) if units is not None: if units == "per mil": var_data = var_data *1000 # convert to permil var_data.attrs["units"] = units else: print("Define unit well or the default is isotopic ratio") # Evaporation elif varname == "evap": var_data = Dataset["evap"] if units is not None: if units == "mm/month": var_data = var_data *60*60*24*30 *-1 #mm/month (positive values) var_data.attrs["units"] = units else: print("Define unit well or the default is kg/m²s") # relative humidity near surface elif varname == "relhum": if lev is not None: var_data = Dataset["relhum"].sel(lev=lev) else: var_data = Dataset["relhum"][:,30,:,:] if units is not None: if units == "%": var_data = var_data *100 #% var_data.attrs["units"] = units else: print("Define unit well or the default is ratio") # elevation from GEOSP elif varname == "elev": var_data = Dataset["geosp"] / 9.8 # convert to m**2/s**2 --> m if units is not None: if units == "km": var_data = var_data /1000 #km var_data.attrs["units"] = units else: var_data.attrs["units"] = units print("The default units of elevation is used [m]") # sea land mask elif varname == "slm": var_data = Dataset["slm"] # Surface Wind velocity zonal elif varname == "u10": var_data = Dataset["u10"] # Surface wind velocity meridoinal elif varname == "v10": var_data = Dataset["v10"] elif varname == "wind10": var_data = Dataset["wind10"] # geopotential height at pressure levels elif varname == "geopoth": var_data = Dataset["geopoth"] if lev_units is not None: if lev_units == "hPa": var_data["lev"] = var_data.lev / 100 #Pa --> hPa var_data["lev"].attrs["units"] = "hPa" else: raise ValueError("You have defined incorrect units or its not implemented") # mean sea level pressure or surface pressure elif varname == "slp": if hasattr(Dataset, "aps"): var_data = Dataset["aps"] elif hasattr(Dataset, "psl"): var_data = Dataset["psl"] else: var_data = Dataset["slp"] if units is not None: if units == "hPa": var_data /= 100 var_data.attrs["units"] = units else: raise ValueError("You have defined incorrect units or its not implemented") # Vertical verlocity elif varname =="omega": # Pa/s var_data = Dataset[varname] if lev_units is not None: var_data = vert_coord_convertion(data=var_data, units=lev_units) if units is not None: #Other data must be temperature on pressure levels with the same shape as omega if units == "m/s": rgas=287.058 # m²/s²K g=9.80665 pa = Dataset["lev"] # must be the same size and shape as omega and t rho = pa/(rgas*other_data) var_data /= (rho*g) #Pa/s -->m/s elif units == "Pa/s": var_data = var_data else: raise ValueError("You have defined incorrect units or its not implemented") if lev is not None: var_data = var_data.sel(lev=lev) # meridional wind at vertical levels elif varname == "v": var_data = Dataset[varname] if lev_units is not None: var_data = vert_coord_convertion(data=var_data, units=lev_units) if lev is not None: var_data = var_data.sel(lev=lev) # zonal wind at vertical levels elif varname == "u": var_data = Dataset[varname] if lev_units is not None: var_data = vert_coord_convertion(data=var_data, units=lev_units) if lev is not None: var_data = var_data.sel(lev=lev) elif varname == "e/p": var_prec = Dataset["aprl"] + Dataset["aprc"] var_evap = Dataset["evap"] var_data = var_evap/var_prec elif varname == "E-P": var_prec = Dataset["aprl"] + Dataset["aprc"] var_evap = Dataset["evap"] if units is not None: if units == "mm/month": var_prec = var_prec *60*60*24*30 #mm/month (positive values) var_evap = var_evap *60*60*24*30*-1 #mm/month (positive values) else: print("Define unit well or the default is kg/m²s") var_data = var_evap - var_prec var_data.attrs["units"] = units else: var_data = var_evap - var_prec # temperature at vertical levels elif varname == "st": var_data = Dataset[varname] if lev_units is not None: var_data = vert_coord_convertion(data=var_data, units=lev_units) if lev is not None: var_data = var_data.sel(lev=lev) # total cloud cover at vertical levels elif varname == "aclcac": var_data = Dataset[varname] if lev_units is not None: var_data = vert_coord_convertion(data=var_data, units=lev_units) if lev is not None: var_data = var_data.sel(lev=lev) # specific humidity at vertical levels elif varname == "q": var_data = Dataset[varname] if units is not None: if units =="g/kg": var_data = var_data * 1000 #--->g/kg if lev_units is not None: var_data = vert_coord_convertion(data=var_data, units=lev_units) if lev is not None: var_data = var_data.sel(lev=lev) # relativehumidity at vertical levels (pressure) elif varname == "rh": var_data = Dataset["relhum"] if units is not None: if units =="%": var_data = var_data * 100 #---% if lev_units is not None: var_data = vert_coord_convertion(data=var_data, units=lev_units) if lev is not None: var_data = var_data.sel(lev=lev) elif varname == "latent heat": var_data = Dataset["ahfl"] * -1 #positive values else: print("Check the Varname or it has not been implemeted") return var_data
[docs]def compute_lterm_mean(data, time="annual", month=None, season=None, season_calendar = None): """ Parameters ---------- data : TYPE: datarray DESCRIPTION. The var_data extracted from dataset time : TYPE: str, optional DESCRIPTION. The default is "annual". or season, month can be used for long-term estimates month : TYPE, optional DESCRIPTION. The default is None. season : TYPE, optional DESCRIPTION. The default is None. season_calendar : TYPE, optional DESCRIPTION. The default is None. Use standard if you want to consider the days of the month into consideration Returns ------- data_ltmean : TYPE: datarray DESCRIPTION. Long-term means """ data_ltmean = data.mean(dim="time") if time =="season": if season_calendar is not None: if season_calendar == "standard": # Make a DataArray with the number of days in each month, size = len(time) month_length = data.time.dt.days_in_month # Calculate the weights by grouping by 'time.season' weights = month_length.groupby('time.season') / month_length.groupby('time.season').sum() # Test that the sum of the weights for each season is 1.0 np.testing.assert_allclose(weights.groupby('time.season').sum().values, np.ones(4)) # Calculate the weighted average data_ltmean = (data * weights).groupby('time.season').sum(dim='time') else: print("Define the season calendar well eg. standard") else: print("Calculating the seasonal mean without considering the number of days in a month") data_ltmean = data.groupby("time.season").mean("time") if season is not None: print("Calculating the seasonal long-term mean for:", season) data_ltmean = data_ltmean.sel(season = season) elif time == "month": data_ltmean = data.groupby("time.month").mean("time") if month is not None: print("Calculating the monthly long-term mean for the month number:", month) data_ltmean = data_ltmean.sel(month=month) else: print("Define the time period for long-term mean or the annual mean is computed") return data_ltmean
[docs]def compute_lterm_diff(data_control, data_main, time="annual", month=None, season=None, season_calendar = None): """ Parameters ---------- data_control : TYPE: datarray DESCRIPTION. Reference or control data data_main : TYPE: dataarray DESCRIPTION. : Main module run data time : TYPE: STR, optional DESCRIPTION. The default is "annual". But can be changed to season or month month : TYPE:INT, optional DESCRIPTION. The default is None. Define the specific month number to be computed or all will be eatimated season : TYPE: STR, optional DESCRIPTION. The default is None. Define the specific season (eg. DJF, JJA, MAM, SON) number to be computed or all will be eatimated season_calendar : TYPE:STR, optional DESCRIPTION. The default is None. Or Standard to consider the number of days in a month Returns ------- data_ltmean_diff : TYPE: datarray DESCRIPTION. Long-tem difference """ # Use compute_lterm_diff to calculate the long-term mean before the the difference data_ltmean_control = compute_lterm_mean(data=data_control, time=time, month=month, season=season, season_calendar = season_calendar) data_ltmean_main = compute_lterm_mean(data=data_main, time=time, month=month, season=season, season_calendar = season_calendar) data_ltmean_diff = data_ltmean_main - data_ltmean_control return data_ltmean_diff
[docs]def extract_transect(data, maxlon, minlon, maxlat, minlat, sea_land_mask=False, minelev=None, maxelev=None, Dataset=None): """ This function extract grid points base on coordinate extents or land sea masks or max, min elevations: it can be used to estimate the statistics of a selected domain like the Alps or Andes! Parameters ---------- data : TYPE: dataarray DESCRIPTION: Data to extract transect from base on coordinates, elevation or land sea masks maxlon : TYPE: float DESCRIPTION.: Maximum longitude minlon : TYPE: float DESCRIPTION: Minimum longitude maxlat : TYPE: float DESCRIPTION:Maximum latitude minlat : TYPE: float DESCRIPTION: Minimum latitude sea_land_mask : TYPE: str, optional DESCRIPTION. The default is None. Yes, means that the land mask will be selected and No means the sea nask points will be selected minelev : TYPE, optional DESCRIPTION. The default is None. To select data points base on the minimum elevation value maxelev : TYPE. float, optional DESCRIPTION. The default is None. To select data points base on the maximum elevation value Dataset : TYPE: float, optional DESCRIPTION. The default is None. Dataset containing geosp and slm for masking out elevation condition and continental values Returns ------- data_extract : TYPE DESCRIPTION. """ #convert lon to -180 to 180 if hasattr(data, "longitude"): data = data.rename({"longitude": "lon", "latitude":"lat"}) if Dataset is not None: if hasattr(Dataset, "longitude"): Dataset = Dataset.rename({"longitude": "lon", "latitude":"lat"}) # add mask arrary (sea mask and elevation is the same for the time, therefore, extrating one index is fine) if sea_land_mask == True: slm = Dataset["slm"] data.coords["slm"] = (("lat","lon"), slm[0].data) if minelev or maxelev is not None: geosp = Dataset["geosp"] / 9.8 data.coords["geosp"] = (("lat","lon"), geosp[0].data) #same for all time point data = data.assign_coords({"lon": (((data.lon + 180) % 360) - 180)}) lat_range = (data.lat >= minlat) & (data.lat <= maxlat) lon_range = (data.lon >= minlon) & (data.lon <= maxlon) data_extract = data.where((lat_range & lon_range), drop=True) # Caution! Since the mask can only be removed when both the lat and lon cordinates are satisfied, some points might # still be there UNLESS the mean is calculated with dim("lat", "lon")!!! # Alternative solution implemented! # I have replace all the false conditions with np.nan values and must be removed when converted to DataFrame if sea_land_mask == True: if hasattr(data_extract, "slm"): data_extract = xr.where(data_extract["slm"]==1, data_extract, data_extract*np.nan) if minelev and maxelev is not None: if hasattr(data_extract, "geosp"): data_extract = xr.where((data_extract.geosp >= minelev) & (data_extract.geosp <= maxelev), data_extract, data_extract*np.nan) elif minelev is not None: if hasattr(data_extract, "geosp"): data_extract = xr.where(data_extract.geosp >= minelev, data_extract, data_extract*np.nan) elif maxelev is not None: if hasattr(data_extract, "geosp"): data_extract = xr.where(data_extract.geosp <= maxelev, data_extract, data_extract*np.nan) return data_extract
[docs]def extract_profile(data, maxlon, minlon, maxlat, minlat, dim, to_pandas=True, sea_land_mask=False, minelev=None, maxelev=None, Dataset=None): """ Parameters ---------- data : TYPE: dataarray DESCRIPTION: Data to extract transect from base on coordinates, elevation or land sea masks maxlon : TYPE: float DESCRIPTION.: Maximum longitude minlon : TYPE: float DESCRIPTION: Minimum longitude maxlat : TYPE: float DESCRIPTION:Maximum latitude minlat : TYPE: float DESCRIPTION: Minimum latitude sea_land_mask : TYPE: str, optional DESCRIPTION. The default is None. Yes, means that the land mask will be selected and No means the sea nask points will be selected minelev : TYPE, optional DESCRIPTION. The default is None. To select data points base on the minimum elevation value maxelev : TYPE. float, optional DESCRIPTION. The default is None. To select data points base on the maximum elevation value Dataset : TYPE: float, optional DESCRIPTION. The default is None. Dataset containing geosp and slm for masking out elevation condition and continental values dim : TYPE: str DESCRIPTION. lat ot lon depending on the axis of the profile to_pandas : TYPE:str, optional (recommended for plotting) DESCRIPTION. The default is None. yes if you want the data to be stored in DataFrame or Pandas Series Returns ------- data_prof : TYPE: datarray or DataFrame or Pandas Series DESCRIPTION. Data extracted along a profile (lat or lon) """ # use extract transect to get the domain or points data_extract = extract_transect(data=data, maxlon=maxlon, minlon=minlon, maxlat=maxlat, minlat=minlat, sea_land_mask=sea_land_mask, minelev=minelev, maxelev=maxelev, Dataset=Dataset) if dim in ["lat", "latitude"]: print("Computing the mean across longitude") data_prof = data_extract.mean(dim="lon") elif dim in ["lon", "longitude"]: print("Computing the mean across latitude") data_prof = data_extract.mean(dim="lat") else: print("Define the dimension to extract the profile") if to_pandas ==True: data_prof = data_prof.to_pandas() data_prof = data_prof.T # sort values base on the dim if isinstance(data_prof, pd.Series): data_prof = data_prof.sort_index(axis = 0, ascending = True) elif isinstance(data_prof, pd.DataFrame): data_prof = data_prof.sort_values(by=[dim]) else: print("Check the instance of the data to be sorted") else: print("Profile data not stored in DataFrame") return data_prof
[docs]def extract_vertical_section(data, maxlon, minlon, maxlat, minlat, dim, sea_land_mask=False, minelev=None, maxelev=None, Dataset=None, season = None, month=None): data_extract = extract_transect(data=data, maxlon=maxlon, minlon=minlon, maxlat=maxlat, minlat=minlat, sea_land_mask=sea_land_mask, minelev=minelev, maxelev=maxelev, Dataset=Dataset) if dim in ["lat", "latitude"]: print("Computing the mean across longitude") data_sect = data_extract.mean(dim="lon") elif dim in ["lon", "longitude"]: print("Computing the mean across latitude") data_sect = data_extract.mean(dim="lat") else: print("Define the dimension to extract the profile") # select season if required if season is not None: data_sect = data_sect.sel(season=season) if month is not None: data_sect = data_sect.sel(month=month) # convert to pandas df = data_sect.to_pandas() df = df.T # transpose axis # sortby using the columns if isinstance(df, pd.Series): df = df.sort_values(axis=1, ascending=True) df = df.sort_values(axis=0, ascending=True) elif isinstance(df, pd.DataFrame): df = df.sort_values(by=[dim], axis=0) #df = df.sort_values(by=["lev"], axis=1) return df
[docs]def linregression(data_x, data_y, season=None, month=None, return_yhat=True): """ Parameters ---------- data_x : TYPE: datarray DESCRIPTION. The x-axis data for fitting data_y : TYPE: datarray DESCRIPTION. The y-axis data for fitting season : TYPE, optional (of If specific season the data is required for fitting) DESCRIPTION. The default is None. Date must be in seasonal coordinates for time month : TYPE, optional (of If specific month the data is required for fitting) DESCRIPTION. The default is None. Date must be in monthly coordinates for time return_yhat : TYPE, optional or if DataFrame containing all the fitting data and predictions are required DESCRIPTION. The default is None. Returns ------- TYPE: Scipy.stats output or plus DataFrame DESCRIPTION. """ if season is not None: data_x = data_x.sel(season = season).to_pandas().values.ravel() data_y = data_y.sel(season = season).to_pandas().values.ravel() elif month is not None: data_x = data_x.sel(month = month).to_pandas().values.ravel() data_y = data_y.sel(month = month).to_pandas().values.ravel() else: data_x = data_x.to_pandas().values.ravel() data_y = data_y.to_pandas().values.ravel() # remove nan values before fitting data_x = data_x[~np.isnan(data_x)] data_y = data_y[~np.isnan(data_y)] regression_stats = stats.linregress(data_x, data_y) print("y = {:.5f}x [‰/100m]+ {:.2f}, r²={:.2f}".format(regression_stats.slope*100, regression_stats.intercept, regression_stats.rvalue*-1)) if return_yhat == True: yhat = regression_stats.slope * data_x + regression_stats.intercept df_x_y_yhat = pd.DataFrame(columns=["X", "Y", "yhat"]) df_x_y_yhat["yhat"] = yhat df_x_y_yhat["X"] = data_x df_x_y_yhat["Y"] = data_y return regression_stats, df_x_y_yhat else: print("Only regression stats are return") return regression_stats
[docs]def EOF_analysis(data, maxlon, minlon, maxlat, minlat, return_variance=False, return_pcs=False, season=None, standardized=None, apply_coslat_weights=None, neofs=None, pcscaling=None, neigs=None, npcs=None, lev=None): """ # the projectField function can be used to generate corresponding set of pseudo-PCs using different data field Parameters ---------- data : TYPE: datarray DESCRIPTION. Dataset required for EOF analysis (eg. slp or geopoth500) maxlon : TYPE: float DESCRIPTION.: Maximum longitude minlon : TYPE: float DESCRIPTION: Minimum longitude maxlat : TYPE: float DESCRIPTION:Maximum latitude minlat : TYPE: float DESCRIPTION: Minimum latitude return_variance : TYPE: Boolean, optional DESCRIPTION. The default is False. If estimated varainces of the eofs are required as ouput return_pcs : TYPE: Boolean, optional DESCRIPTION. The default is False. If the extracted pca series are required as ouput season : TYPE:STR, optional DESCRIPTION. The default is None. Name of the season eg. DFJ, JJA standardized : TYPE: Boolean, optional DESCRIPTION. The default is None. True to standardized anomalies before EOF analysis apply_coslat_weights : TYPE: Boolean, optional DESCRIPTION. The default is None. True to apply coslat area weights before the EOF analysis neofs : TYPE: Float, optional DESCRIPTION. The default is None. The no. of PCA to perform on the dataset pcscaling : TYPE: Int, optional DESCRIPTION. The default is None. 0 : Unsclaed PCS, 1: Scaled to Unit variance, 2: PCs are multiplied by the square-root of their eigen values neigs : TYPE: Int, optional DESCRIPTION. The default is None. the no. of eigenvalues to return fraction variance npcs : TYPE:Int, optional DESCRIPTION. The default is None. The no. of pcs retrieve lev : TYPE: float, optional DESCRIPTION. The default is None. Vertical level if the dataset is on hybrid levels (eg. 500 for geopoth) Returns ------- TYPE DESCRIPTION. eofs: covariance matrix between the npcs time series and eofs input time series pcs: Principal Component time series var_frac: variance fraction of the estimated eigen values """ if lev is not None: data = data.sel(lev=lev) if season is not None: data = data.groupby("time.season") data = data[season] # extracting transects for the analysis # converting lon to -180 t0 180 data = data.assign_coords({"lon": (((data.lon + 180) % 360) - 180)}) data = data.where((data.lat >= minlat) & (data.lat <= maxlat), drop=True) data = data.where((data.lon >= minlon) & (data.lon <= maxlon), drop=True) # calculation of anomalies data_mean = data.mean(dim="time") data_anomalies = data - data_mean # standardized data if standardized == True: data_anomalies_std = data_anomalies.std(dim="time") data_anomalies /= data_anomalies_std # apply weights if equal area using cosalt if apply_coslat_weights == True: wtgs = np.sqrt(np.abs(np.cos(data_anomalies.lat * np.pi / 180))) data_anomalies = data_anomalies * wtgs # applying the EOF function (ref: http://doi.org/10.5334/jors.122) if True in data_anomalies.isnull(): data_anomalies = data_anomalies.dropna(dim="time") Solver = Eof(data_anomalies) if all(parameter is not None for parameter in [neofs, pcscaling]): eofs_cov = Solver.eofsAsCovariance(neofs=neofs, pcscaling=pcscaling) else: eofs_cov = Solver.eofsAsCovariance() #sort with the right lon positoin --> becuase of changing the lon to -+180 eofs_cov = eofs_cov.sortby(eofs_cov.lon) if return_variance == True: if neigs is not None: var_frac = Solver.varianceFraction(neigs = neigs) else: var_frac = Solver.varianceFraction() var_frac = var_frac.to_pandas() if return_pcs == True: if all(parameter is not None for parameter in [npcs, pcscaling]): pcs = Solver.pcs(pcscaling = pcscaling, npcs = npcs) else: pcs = Solver.pcs() pcs = pcs.to_pandas() if return_pcs == True and return_variance == True: return eofs_cov, pcs, var_frac elif return_pcs == True and return_variance == False: return eofs_cov, pcs elif return_pcs == False and return_variance == True: return eofs_cov, var_frac else: return eofs_cov
[docs]def correlation(dataA, dataB, max_pvalue=0.1, use_spearmanr=False, use_pearsonr=False, return_pvalue=False, maxlon=None, minlon=None, maxlat=None, minlat=None): """ Parameters ---------- dataA : TYPE: Dataarray (3D) DESCRIPTION. Comparison data 1 dataB : TYPE: Dataaray (3D) DESCRIPTION. Comparison data 2 max_pvalue : TYPE: Float, optional DESCRIPTION. The default is 0.1. The confidence interval for correlation estimation eg. 0.05 for 95% use_spearmanr : TYPE: Boolean, optional DESCRIPTION. The default is False. True to use spearman correlation use_pearsonr : TYPE: Boolean, optional DESCRIPTION. The default is False. True to use pearson correlation return_pvalue : TYPE: Boolean, optional DESCRIPTION. The default is False. True to retrieve pvalue as an ouput variable maxlon : TYPE: float DESCRIPTION.: Maximum longitude minlon : TYPE: float DESCRIPTION: Minimum longitude maxlat : TYPE: float DESCRIPTION:Maximum latitude minlat : TYPE: float DESCRIPTION: Minimum latitude Raises ------ ValueError DESCRIPTION. If the required stats module for correlation analysis is not defined Returns ------- stats_result : TYPE: datarray DESCRIPTION. Contians correlation map distribution and corresponding pvalues """ #extract specific domain for correlation if all(par is not None for par in [maxlon, minlon, maxlat, minlat]): dataA = dataA.where((dataA.lat >= minlat) & (dataA.lat <= maxlat), drop=True) dataA= dataA.where((dataA.lon >= minlon) & (dataA.lon <= maxlon), drop=True) dataB = dataB.where((dataB.lat >= minlat) & (dataB.lat <= maxlat), drop=True) dataB = dataB.where((dataB.lon >= minlon) & (dataB.lon <= maxlon), drop=True) # define empty matrix to store r corr_data = np.zeros((dataA.shape[1], dataA.shape[2]), dtype=float) pvalue_data = np.zeros((dataA.shape[1], dataA.shape[2]), dtype=float) #looping over logitude and latitude for i in range(0, dataA.shape[1]): # lat for j in range(0, dataB.shape[2]): #lon if use_spearmanr==True: r,p = stats.spearmanr(dataA.isel(lat=[i], lon=[j]).values.ravel(), dataB.isel(lat=[i], lon=[j]).values.ravel()) elif use_pearsonr == True: r,p = stats.pearsonr(dataA.isel(lat=[i], lon=[j]).values.ravel(), dataB.isel(lat=[i], lon=[j]).values.ravel()) else: raise ValueError("Stats module not available, use_spearman or pearsonr") if p >= max_pvalue: corr_data[i,j] = np.nan else: corr_data[i,j] = r pvalue_data[i,j] = p #creating dataset to store values stats_result = xr.Dataset(coords={"lon": (["lon"], dataA.lon), "lat": (["lat"], dataA.lat)}) stats_result["correlation"] = (["lat", "lon"], corr_data) if return_pvalue == True: stats_result["pvalue"] = (["lat", "lon"], pvalue_data) return stats_result else: return stats_result
[docs]def student_t_test_btn_datasets(dataA, dataB, max_pvalue=0.1, return_pvalue=False, maxlon=None, minlon=None, maxlat=None, minlat=None): """ Parameters ---------- dataA : TYPE: Dataarray (3D) DESCRIPTION. Comparison data 1 dataB : TYPE: Dataaray (3D) DESCRIPTION. Comparison data 2 max_pvalue : TYPE: Float, optional DESCRIPTION. The default is 0.1. The confidence interval for correlation estimation eg. 0.05 for 95% return_pvalue : TYPE: Boolean, optional DESCRIPTION. The default is False. True to retrieve pvalue as an ouput variable maxlon : TYPE: float DESCRIPTION.: Maximum longitude minlon : TYPE: float DESCRIPTION: Minimum longitude maxlat : TYPE: float DESCRIPTION:Maximum latitude minlat : TYPE: float DESCRIPTION: Minimum latitude Returns ------- stats_result : TYPE DESCRIPTION. """ if all(par is not None for par in [maxlon, minlon, maxlat, minlat]): dataA = dataA.assign_coords({"lon": (((dataA.lon + 180) % 360) - 180)}) dataB = dataB.assign_coords({"lon": (((dataB.lon + 180) % 360) - 180)}) dataA = dataA.where((dataA.lat >= minlat) & (dataA.lat <= maxlat), drop=True) dataA= dataA.where((dataA.lon >= minlon) & (dataA.lon <= maxlon), drop=True) dataB = dataB.where((dataB.lat >= minlat) & (dataB.lat <= maxlat), drop=True) dataB = dataB.where((dataB.lon >= minlon) & (dataB.lon <= maxlon), drop=True) # define empty matrix to store r stats_data = np.zeros((dataA.shape[1], dataA.shape[2]), dtype=float) pvalue_data = np.zeros((dataA.shape[1], dataA.shape[2]), dtype=float) #looping over logitude and latitude for i in range(0, dataA.shape[1]): # lat for j in range(0, dataB.shape[2]): #lon s,p = stats.ttest_ind(dataA.isel(lat=[i], lon=[j]).values.ravel(), dataB.isel(lat=[i], lon=[j]).values.ravel()) if p >= max_pvalue: stats_data[i,j] = np.nan else: stats_data[i,j] = s pvalue_data[i,j] = p #creating dataset to store values stats_result = xr.Dataset(coords={"lon": (["lon"], dataA.lon.data), "lat": (["lat"], dataA.lat.data)}) stats_result["t_statistic"] = (["lat", "lon"], stats_data) if return_pvalue == True: stats_result["pvalue"] = (["lat", "lon"], pvalue_data) return stats_result else: return stats_result