Source code for wrainfo.attenuation_corr

"""Attenuation correction module."""

# WRaINfo, Is a software to process FURUNO weather radar data.
#
# Copyright (c) 2022, FernLab (GFZ Potsdam, fernlab@gfz-potsdam.de)
#
# This software was developed within the context of the RaINfo ("Potential use of
# high resolution weather data in agriculture") project of FernLab funded by
# the Impulse and Networking Fund of the Helmholtz Association.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# imports
# -------

import xarray as xr
import numpy as np
import wradlib as wrl
from scipy import integrate


# phase processing
# ----------------

[docs]def phase_zphi(phi, rng=1000., start_range=0.): """Preprocess of PHIDP. Parameters ---------- phi : xarray.DataArray rng : int range to calculate the window to search precipitation bins start_range : int distance from radar where the processing is starting Returns ------- : xarray.Dataset xarray.Dataset of preprocessed PHIDP. """ # select measurements after startrange phi = phi.where(phi.range >= start_range) # get first rangestep range_step = np.diff(phi.range)[0] # calculate window (need uneven number) nprec = int(rng / range_step) if (nprec % 2) == 0: nprec += 1 # create binary array phib = xr.where(np.isnan(phi), 0, 1) # take nprec range bins and calculate sum of valid values phib_sum = phib.rolling(range=nprec, center=True).sum(skipna=True) # offset in m offset = nprec // 2 * range_step # offset in idx offset_idx = nprec // 2 # start_range in m (centered) start_range = phib_sum.idxmax(dim="range") - offset # start_range in idx (centered) start_range_idx = phib_sum.argmax(dim="range") - offset_idx # stop_range in m (centered) stop_range = phib_sum[..., ::-1].idxmax(dim="range") + offset # stop_range in idx (centered) stop_range_idx = len(phib_sum.range) - (phib_sum[..., ::-1].argmax(dim="range") - offset_idx) - 2 first0 = phi.where((phi.range >= start_range) & (phi.range <= start_range + rng), drop=True) first_min = first0.min(dim='range', skipna=True) first_max = first0.max(dim='range', skipna=True) first_mean = first0.mean(dim='range', skipna=True) first_median = first0.median(dim='range', skipna=True) last0 = phi.where((phi.range >= stop_range - rng) & (phi.range <= stop_range), drop=True) last_min = last0.min(dim='range', skipna=True) last_max = last0.max(dim='range', skipna=True) last_mean = last0.mean(dim='range', skipna=True) last_median = last0.median(dim='range', skipna=True) # # get min phase values in specified range # first = phi.where((phi.range >= start_range) & (phi.range <= start_range + rng), # drop=True).min(dim='range', skipna=True) # # get max phase values in specified range # last = phi.where((phi.range >= stop_range - rng) & (phi.range <= stop_range), # drop=True).max(dim='range', skipna=True) return xr.Dataset(dict(phib=phib_sum, offset=offset, offset_idx=offset_idx, start_range=start_range, stop_range=stop_range, first_min=first_min, first_max=first_max, first_mean=first_mean, first_median=first_median, first_idx=start_range_idx, last_min=last_min, last_max=last_max, last_mean=last_mean, last_median=last_median, last_idx=stop_range_idx, ))
[docs]def cumulative_trapezoid(da, coord, **kwargs): """Cumulative trapezoid integration. Parameters --------- da : xarray.DataArray array with data to integrate coord : str name of coordinate to integrate over Returns ------- : xarray.DataArray DataArray with integrated values. """ x = da[coord] dim = x.dims[0] input_core_dim = [dim] output_core_dim = [dim] # calculate dx dx = da[coord].diff(dim).median() if coord == "range": dx /= 1000. kwargs['dx'] = kwargs.get("dx", dx.values) kwargs['axis'] = kwargs.get("axis", -1) kwargs["initial"] = kwargs.get("initial", 0) return xr.apply_ufunc(integrate.cumulative_trapezoid, da, input_core_dims=[input_core_dim], output_core_dims=[output_core_dim], output_dtypes=['float'], dask='parallelized', kwargs=kwargs, dask_gufunc_kwargs=dict(allow_rechunk=True),)
# zphi-method (Testud et al. 2001) # -------------------------------
[docs]def zphi(refl, cphase, alphax=0.28, bx=0.78): """Process of PHIDP, KDP and specific attenuation AH_ZPHI. Parameter --------- refl : xarray.DataArray array with reflectivity cphase : xarray.DataArray array of function phase_zphi alphax : float threshold, default for x-band: 0.28 bx : float threshold Returns ------- : xarray.Dataset xarray.Dataset with processed variables. """ # calculate delta-phi dphi = cphase.last_median - cphase.first_median # if negative set dphi zero dphi = dphi.where(dphi >= 0).fillna(0) # calculate function of dphi fdphi = 10 ** (0.1 * bx * dphi * alphax) - 1 # extract reflectivity zhraw = refl.where((refl.range < cphase.stop_range)) # transform to linear space and fill NaN with zero zax = zhraw.pipe(wrl.trafo.idecibel).fillna(0) # transform to decibel again # zh_cal = zax.pipe(wrl.trafo.decibel) # caclulate power # equ. 9 za = zax ** bx # fill NaN with zero za_zero = za.fillna(0) # calculate cumulative integral over range iza_x = 0.46 * bx * cumulative_trapezoid(za_zero, coord='range') # get maximum over last axis (range) and subtract cumulative integral # equ. 10 (r->r2) iza = np.max(iza_x, axis=-1) - iza_x # divide by function of delta-phi iza_fdphi = iza / fdphi # TODO: check if this calculates the correct/wanted iza_first # equ. 10 (r1->r2) # iza_first = iza_fdphi.isel(range=0) iza_first = iza_fdphi.isel(range=cphase.first_idx.load()) # calculate specific attenuation # equ. 8 att = za / (iza_first + iza) phi = 2 * cumulative_trapezoid(att / alphax, coord='range') # KDP kdp = att / alphax # encoding properties for output enc = dict(zlib=True, complevel=4, chunksizes=(1,) + phi.shape[1:]) phi.encoding = enc phi.name = "PHIDP_RECALC" phi_attrs = wrl.io.xarray.moments_mapping["PHIDP"].copy() phi_attrs.pop("gamic") phi.attrs = phi_attrs att.encoding = enc pol = refl.long_name[-1:] att_name = f"A{pol.upper()}" att.name = att_name + "_ZPHI" spec_att = dict(units='dB/km', standard_name=f'specific_attenuation_{pol.lower()}', long_name=f'Specific attenuation {pol.upper()}', short_name=att_name ) att.attrs = spec_att kdp.encoding = enc kdp.name = "KDP_RECALC" kdp_attrs = wrl.io.xarray.moments_mapping["KDP"].copy() kdp_attrs.pop("gamic") kdp.attrs = kdp_attrs return xr.merge([phi.to_dataset(), att.to_dataset(), kdp.to_dataset()])
# process chain for attenuation correction # ----------------------------------------
[docs]def attenuation_correction(ds, moment, dims=["azimuth", "range"]): """Attenuation correction with zphi method. Parameters ---------- ds : xarray.DataArray moment : xarray.DataArray DBZH (clutter corrected) dims : list dimension of the dataset Returns ------- : xarray.Dataset xarray.Dataset with attenuation corrected refelctivity and recalculated variables. """ # phase processing # ---------------- cphase = phase_zphi(ds.PHIDP.where((ds.RHOHV > 0.8) & (ds[moment] > 10)), rng=1000., start_range=0) ds_out = zphi(ds[moment], cphase) # attenuation correction # ---------------------- zhcorr = ds[moment] + 0.28 * ds_out.PHIDP_RECALC # concat datasets # --------------- ds["DBZH_CORR"] = xr.DataArray(zhcorr, dims=dims, attrs=ds.DBZH.attrs) phidp_recalc = ds_out.PHIDP_RECALC ds["PHIDP_RECALC"] = xr.DataArray(phidp_recalc, dims=dims, attrs=ds_out.PHIDP_RECALC.attrs) ah_zphi = ds_out.AH_ZPHI ds["AH_ZPHI"] = xr.DataArray(ah_zphi, dims=dims, attrs=ds_out.AH_ZPHI.attrs) kdp = ds_out.KDP_RECALC ds["KDP_RECALC"] = xr.DataArray(kdp, dims=dims, attrs=ds_out.KDP_RECALC.attrs) return ds