Coverage for wrainfo/attenuation_corr.py: 99%
86 statements
« prev ^ index » next coverage.py v6.5.0, created at 2022-10-18 12:40 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2022-10-18 12:40 +0000
1"""Attenuation correction module."""
3# WRaINfo, Is a software to process FURUNO weather radar data.
4#
5# Copyright (c) 2022, FernLab (GFZ Potsdam, fernlab@gfz-potsdam.de)
6#
7# This software was developed within the context of the RaINfo ("Potential use of
8# high resolution weather data in agriculture") project of FernLab funded by
9# the Impulse and Networking Fund of the Helmholtz Association.
10#
11# Licensed under the Apache License, Version 2.0 (the "License");
12# you may not use this file except in compliance with the License.
13#
14# You may obtain a copy of the License at
15# http://www.apache.org/licenses/LICENSE-2.0
16#
17# Unless required by applicable law or agreed to in writing, software
18# distributed under the License is distributed on an "AS IS" BASIS,
19# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20# See the License for the specific language governing permissions and
21# limitations under the License.
23# imports
24# -------
26import xarray as xr
27import numpy as np
28import wradlib as wrl
29from scipy import integrate
32# phase processing
33# ----------------
35def phase_zphi(phi, rng=1000., start_range=0.):
36 """Preprocess of PHIDP.
38 Parameters
39 ----------
40 phi : xarray.DataArray
41 rng : int
42 range to calculate the window to search precipitation bins
43 start_range : int
44 distance from radar where the processing is starting
46 Returns
47 -------
48 : xarray.Dataset
49 xarray.Dataset of preprocessed PHIDP.
50 """
51 # select measurements after startrange
52 phi = phi.where(phi.range >= start_range)
54 # get first rangestep
55 range_step = np.diff(phi.range)[0]
57 # calculate window (need uneven number)
58 nprec = int(rng / range_step)
59 if (nprec % 2) == 0:
60 nprec += 1
62 # create binary array
63 phib = xr.where(np.isnan(phi), 0, 1)
65 # take nprec range bins and calculate sum of valid values
66 phib_sum = phib.rolling(range=nprec, center=True).sum(skipna=True)
68 # offset in m
69 offset = nprec // 2 * range_step
70 # offset in idx
71 offset_idx = nprec // 2
73 # start_range in m (centered)
74 start_range = phib_sum.idxmax(dim="range") - offset
75 # start_range in idx (centered)
76 start_range_idx = phib_sum.argmax(dim="range") - offset_idx
78 # stop_range in m (centered)
79 stop_range = phib_sum[..., ::-1].idxmax(dim="range") + offset
80 # stop_range in idx (centered)
81 stop_range_idx = len(phib_sum.range) - (phib_sum[..., ::-1].argmax(dim="range") - offset_idx) - 2
83 first0 = phi.where((phi.range >= start_range) & (phi.range <= start_range + rng),
84 drop=True)
86 first_min = first0.min(dim='range', skipna=True)
87 first_max = first0.max(dim='range', skipna=True)
88 first_mean = first0.mean(dim='range', skipna=True)
89 first_median = first0.median(dim='range', skipna=True)
91 last0 = phi.where((phi.range >= stop_range - rng) & (phi.range <= stop_range),
92 drop=True)
94 last_min = last0.min(dim='range', skipna=True)
95 last_max = last0.max(dim='range', skipna=True)
96 last_mean = last0.mean(dim='range', skipna=True)
97 last_median = last0.median(dim='range', skipna=True)
99 # # get min phase values in specified range
100 # first = phi.where((phi.range >= start_range) & (phi.range <= start_range + rng),
101 # drop=True).min(dim='range', skipna=True)
102 # # get max phase values in specified range
103 # last = phi.where((phi.range >= stop_range - rng) & (phi.range <= stop_range),
104 # drop=True).max(dim='range', skipna=True)
106 return xr.Dataset(dict(phib=phib_sum,
107 offset=offset,
108 offset_idx=offset_idx,
109 start_range=start_range,
110 stop_range=stop_range,
111 first_min=first_min,
112 first_max=first_max,
113 first_mean=first_mean,
114 first_median=first_median,
115 first_idx=start_range_idx,
116 last_min=last_min,
117 last_max=last_max,
118 last_mean=last_mean,
119 last_median=last_median,
120 last_idx=stop_range_idx,
121 ))
124def cumulative_trapezoid(da, coord, **kwargs):
125 """Cumulative trapezoid integration.
127 Parameters
128 ---------
129 da : xarray.DataArray
130 array with data to integrate
131 coord : str
132 name of coordinate to integrate over
134 Returns
135 -------
136 : xarray.DataArray
137 DataArray with integrated values.
138 """
139 x = da[coord]
140 dim = x.dims[0]
141 input_core_dim = [dim]
142 output_core_dim = [dim]
144 # calculate dx
145 dx = da[coord].diff(dim).median()
146 if coord == "range":
147 dx /= 1000.
149 kwargs['dx'] = kwargs.get("dx", dx.values)
150 kwargs['axis'] = kwargs.get("axis", -1)
151 kwargs["initial"] = kwargs.get("initial", 0)
153 return xr.apply_ufunc(integrate.cumulative_trapezoid,
154 da,
155 input_core_dims=[input_core_dim],
156 output_core_dims=[output_core_dim],
157 output_dtypes=['float'],
158 dask='parallelized',
159 kwargs=kwargs,
160 dask_gufunc_kwargs=dict(allow_rechunk=True),)
163# zphi-method (Testud et al. 2001)
164# -------------------------------
166def zphi(refl, cphase, alphax=0.28, bx=0.78):
167 """Process of PHIDP, KDP and specific attenuation AH_ZPHI.
169 Parameter
170 ---------
171 refl : xarray.DataArray
172 array with reflectivity
173 cphase : xarray.DataArray
174 array of function phase_zphi
175 alphax : float
176 threshold, default for x-band: 0.28
177 bx : float
178 threshold
180 Returns
181 -------
182 : xarray.Dataset
183 xarray.Dataset with processed variables.
184 """
185 # calculate delta-phi
186 dphi = cphase.last_median - cphase.first_median
188 # if negative set dphi zero
189 dphi = dphi.where(dphi >= 0).fillna(0)
191 # calculate function of dphi
192 fdphi = 10 ** (0.1 * bx * dphi * alphax) - 1
194 # extract reflectivity
195 zhraw = refl.where((refl.range < cphase.stop_range))
197 # transform to linear space and fill NaN with zero
198 zax = zhraw.pipe(wrl.trafo.idecibel).fillna(0)
200 # transform to decibel again
201 # zh_cal = zax.pipe(wrl.trafo.decibel)
203 # caclulate power
204 # equ. 9
205 za = zax ** bx
207 # fill NaN with zero
208 za_zero = za.fillna(0)
210 # calculate cumulative integral over range
211 iza_x = 0.46 * bx * cumulative_trapezoid(za_zero, coord='range')
213 # get maximum over last axis (range) and subtract cumulative integral
214 # equ. 10 (r->r2)
215 iza = np.max(iza_x, axis=-1) - iza_x
217 # divide by function of delta-phi
218 iza_fdphi = iza / fdphi
220 # TODO: check if this calculates the correct/wanted iza_first
221 # equ. 10 (r1->r2)
222 # iza_first = iza_fdphi.isel(range=0)
223 iza_first = iza_fdphi.isel(range=cphase.first_idx.load())
225 # calculate specific attenuation
226 # equ. 8
227 att = za / (iza_first + iza)
228 phi = 2 * cumulative_trapezoid(att / alphax, coord='range')
230 # KDP
231 kdp = att / alphax
233 # encoding properties for output
234 enc = dict(zlib=True, complevel=4, chunksizes=(1,) + phi.shape[1:])
235 phi.encoding = enc
236 phi.name = "PHIDP_RECALC"
237 phi_attrs = wrl.io.xarray.moments_mapping["PHIDP"].copy()
238 phi_attrs.pop("gamic")
239 phi.attrs = phi_attrs
241 att.encoding = enc
242 pol = refl.long_name[-1:]
243 att_name = f"A{pol.upper()}"
244 att.name = att_name + "_ZPHI"
245 spec_att = dict(units='dB/km',
246 standard_name=f'specific_attenuation_{pol.lower()}',
247 long_name=f'Specific attenuation {pol.upper()}',
248 short_name=att_name
249 )
250 att.attrs = spec_att
252 kdp.encoding = enc
253 kdp.name = "KDP_RECALC"
254 kdp_attrs = wrl.io.xarray.moments_mapping["KDP"].copy()
255 kdp_attrs.pop("gamic")
256 kdp.attrs = kdp_attrs
258 return xr.merge([phi.to_dataset(), att.to_dataset(), kdp.to_dataset()])
261# process chain for attenuation correction
262# ----------------------------------------
264def attenuation_correction(ds, moment, dims=["azimuth", "range"]):
265 """Attenuation correction with zphi method.
267 Parameters
268 ----------
269 ds : xarray.DataArray
270 moment : xarray.DataArray
271 DBZH (clutter corrected)
272 dims : list
273 dimension of the dataset
275 Returns
276 -------
277 : xarray.Dataset
278 xarray.Dataset with attenuation corrected refelctivity and recalculated variables.
279 """
280 # phase processing
281 # ----------------
283 cphase = phase_zphi(ds.PHIDP.where((ds.RHOHV > 0.8) & (ds[moment] > 10)), rng=1000., start_range=0)
285 ds_out = zphi(ds[moment], cphase)
287 # attenuation correction
288 # ----------------------
290 zhcorr = ds[moment] + 0.28 * ds_out.PHIDP_RECALC
292 # concat datasets
293 # ---------------
295 ds["DBZH_CORR"] = xr.DataArray(zhcorr,
296 dims=dims,
297 attrs=ds.DBZH.attrs)
299 phidp_recalc = ds_out.PHIDP_RECALC
300 ds["PHIDP_RECALC"] = xr.DataArray(phidp_recalc,
301 dims=dims,
302 attrs=ds_out.PHIDP_RECALC.attrs)
304 ah_zphi = ds_out.AH_ZPHI
305 ds["AH_ZPHI"] = xr.DataArray(ah_zphi,
306 dims=dims,
307 attrs=ds_out.AH_ZPHI.attrs)
309 kdp = ds_out.KDP_RECALC
310 ds["KDP_RECALC"] = xr.DataArray(kdp,
311 dims=dims,
312 attrs=ds_out.KDP_RECALC.attrs)
314 return ds