Coverage for wrainfo/attenuation_corr.py: 99%

86 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2022-10-18 12:40 +0000

1"""Attenuation correction module.""" 

2 

3# WRaINfo, Is a software to process FURUNO weather radar data. 

4# 

5# Copyright (c) 2022, FernLab (GFZ Potsdam, fernlab@gfz-potsdam.de) 

6# 

7# This software was developed within the context of the RaINfo ("Potential use of 

8# high resolution weather data in agriculture") project of FernLab funded by 

9# the Impulse and Networking Fund of the Helmholtz Association. 

10# 

11# Licensed under the Apache License, Version 2.0 (the "License"); 

12# you may not use this file except in compliance with the License. 

13# 

14# You may obtain a copy of the License at 

15# http://www.apache.org/licenses/LICENSE-2.0 

16# 

17# Unless required by applicable law or agreed to in writing, software 

18# distributed under the License is distributed on an "AS IS" BASIS, 

19# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

20# See the License for the specific language governing permissions and 

21# limitations under the License. 

22 

23# imports 

24# ------- 

25 

26import xarray as xr 

27import numpy as np 

28import wradlib as wrl 

29from scipy import integrate 

30 

31 

32# phase processing 

33# ---------------- 

34 

35def phase_zphi(phi, rng=1000., start_range=0.): 

36 """Preprocess of PHIDP. 

37 

38 Parameters 

39 ---------- 

40 phi : xarray.DataArray 

41 rng : int 

42 range to calculate the window to search precipitation bins 

43 start_range : int 

44 distance from radar where the processing is starting 

45 

46 Returns 

47 ------- 

48 : xarray.Dataset 

49 xarray.Dataset of preprocessed PHIDP. 

50 """ 

51 # select measurements after startrange 

52 phi = phi.where(phi.range >= start_range) 

53 

54 # get first rangestep 

55 range_step = np.diff(phi.range)[0] 

56 

57 # calculate window (need uneven number) 

58 nprec = int(rng / range_step) 

59 if (nprec % 2) == 0: 

60 nprec += 1 

61 

62 # create binary array 

63 phib = xr.where(np.isnan(phi), 0, 1) 

64 

65 # take nprec range bins and calculate sum of valid values 

66 phib_sum = phib.rolling(range=nprec, center=True).sum(skipna=True) 

67 

68 # offset in m 

69 offset = nprec // 2 * range_step 

70 # offset in idx 

71 offset_idx = nprec // 2 

72 

73 # start_range in m (centered) 

74 start_range = phib_sum.idxmax(dim="range") - offset 

75 # start_range in idx (centered) 

76 start_range_idx = phib_sum.argmax(dim="range") - offset_idx 

77 

78 # stop_range in m (centered) 

79 stop_range = phib_sum[..., ::-1].idxmax(dim="range") + offset 

80 # stop_range in idx (centered) 

81 stop_range_idx = len(phib_sum.range) - (phib_sum[..., ::-1].argmax(dim="range") - offset_idx) - 2 

82 

83 first0 = phi.where((phi.range >= start_range) & (phi.range <= start_range + rng), 

84 drop=True) 

85 

86 first_min = first0.min(dim='range', skipna=True) 

87 first_max = first0.max(dim='range', skipna=True) 

88 first_mean = first0.mean(dim='range', skipna=True) 

89 first_median = first0.median(dim='range', skipna=True) 

90 

91 last0 = phi.where((phi.range >= stop_range - rng) & (phi.range <= stop_range), 

92 drop=True) 

93 

94 last_min = last0.min(dim='range', skipna=True) 

95 last_max = last0.max(dim='range', skipna=True) 

96 last_mean = last0.mean(dim='range', skipna=True) 

97 last_median = last0.median(dim='range', skipna=True) 

98 

99 # # get min phase values in specified range 

100 # first = phi.where((phi.range >= start_range) & (phi.range <= start_range + rng), 

101 # drop=True).min(dim='range', skipna=True) 

102 # # get max phase values in specified range 

103 # last = phi.where((phi.range >= stop_range - rng) & (phi.range <= stop_range), 

104 # drop=True).max(dim='range', skipna=True) 

105 

106 return xr.Dataset(dict(phib=phib_sum, 

107 offset=offset, 

108 offset_idx=offset_idx, 

109 start_range=start_range, 

110 stop_range=stop_range, 

111 first_min=first_min, 

112 first_max=first_max, 

113 first_mean=first_mean, 

114 first_median=first_median, 

115 first_idx=start_range_idx, 

116 last_min=last_min, 

117 last_max=last_max, 

118 last_mean=last_mean, 

119 last_median=last_median, 

120 last_idx=stop_range_idx, 

121 )) 

122 

123 

124def cumulative_trapezoid(da, coord, **kwargs): 

125 """Cumulative trapezoid integration. 

126 

127 Parameters 

128 --------- 

129 da : xarray.DataArray 

130 array with data to integrate 

131 coord : str 

132 name of coordinate to integrate over 

133 

134 Returns 

135 ------- 

136 : xarray.DataArray 

137 DataArray with integrated values. 

138 """ 

139 x = da[coord] 

140 dim = x.dims[0] 

141 input_core_dim = [dim] 

142 output_core_dim = [dim] 

143 

144 # calculate dx 

145 dx = da[coord].diff(dim).median() 

146 if coord == "range": 

147 dx /= 1000. 

148 

149 kwargs['dx'] = kwargs.get("dx", dx.values) 

150 kwargs['axis'] = kwargs.get("axis", -1) 

151 kwargs["initial"] = kwargs.get("initial", 0) 

152 

153 return xr.apply_ufunc(integrate.cumulative_trapezoid, 

154 da, 

155 input_core_dims=[input_core_dim], 

156 output_core_dims=[output_core_dim], 

157 output_dtypes=['float'], 

158 dask='parallelized', 

159 kwargs=kwargs, 

160 dask_gufunc_kwargs=dict(allow_rechunk=True),) 

161 

162 

163# zphi-method (Testud et al. 2001) 

164# ------------------------------- 

165 

166def zphi(refl, cphase, alphax=0.28, bx=0.78): 

167 """Process of PHIDP, KDP and specific attenuation AH_ZPHI. 

168 

169 Parameter 

170 --------- 

171 refl : xarray.DataArray 

172 array with reflectivity 

173 cphase : xarray.DataArray 

174 array of function phase_zphi 

175 alphax : float 

176 threshold, default for x-band: 0.28 

177 bx : float 

178 threshold 

179 

180 Returns 

181 ------- 

182 : xarray.Dataset 

183 xarray.Dataset with processed variables. 

184 """ 

185 # calculate delta-phi 

186 dphi = cphase.last_median - cphase.first_median 

187 

188 # if negative set dphi zero 

189 dphi = dphi.where(dphi >= 0).fillna(0) 

190 

191 # calculate function of dphi 

192 fdphi = 10 ** (0.1 * bx * dphi * alphax) - 1 

193 

194 # extract reflectivity 

195 zhraw = refl.where((refl.range < cphase.stop_range)) 

196 

197 # transform to linear space and fill NaN with zero 

198 zax = zhraw.pipe(wrl.trafo.idecibel).fillna(0) 

199 

200 # transform to decibel again 

201 # zh_cal = zax.pipe(wrl.trafo.decibel) 

202 

203 # caclulate power 

204 # equ. 9 

205 za = zax ** bx 

206 

207 # fill NaN with zero 

208 za_zero = za.fillna(0) 

209 

210 # calculate cumulative integral over range 

211 iza_x = 0.46 * bx * cumulative_trapezoid(za_zero, coord='range') 

212 

213 # get maximum over last axis (range) and subtract cumulative integral 

214 # equ. 10 (r->r2) 

215 iza = np.max(iza_x, axis=-1) - iza_x 

216 

217 # divide by function of delta-phi 

218 iza_fdphi = iza / fdphi 

219 

220 # TODO: check if this calculates the correct/wanted iza_first 

221 # equ. 10 (r1->r2) 

222 # iza_first = iza_fdphi.isel(range=0) 

223 iza_first = iza_fdphi.isel(range=cphase.first_idx.load()) 

224 

225 # calculate specific attenuation 

226 # equ. 8 

227 att = za / (iza_first + iza) 

228 phi = 2 * cumulative_trapezoid(att / alphax, coord='range') 

229 

230 # KDP 

231 kdp = att / alphax 

232 

233 # encoding properties for output 

234 enc = dict(zlib=True, complevel=4, chunksizes=(1,) + phi.shape[1:]) 

235 phi.encoding = enc 

236 phi.name = "PHIDP_RECALC" 

237 phi_attrs = wrl.io.xarray.moments_mapping["PHIDP"].copy() 

238 phi_attrs.pop("gamic") 

239 phi.attrs = phi_attrs 

240 

241 att.encoding = enc 

242 pol = refl.long_name[-1:] 

243 att_name = f"A{pol.upper()}" 

244 att.name = att_name + "_ZPHI" 

245 spec_att = dict(units='dB/km', 

246 standard_name=f'specific_attenuation_{pol.lower()}', 

247 long_name=f'Specific attenuation {pol.upper()}', 

248 short_name=att_name 

249 ) 

250 att.attrs = spec_att 

251 

252 kdp.encoding = enc 

253 kdp.name = "KDP_RECALC" 

254 kdp_attrs = wrl.io.xarray.moments_mapping["KDP"].copy() 

255 kdp_attrs.pop("gamic") 

256 kdp.attrs = kdp_attrs 

257 

258 return xr.merge([phi.to_dataset(), att.to_dataset(), kdp.to_dataset()]) 

259 

260 

261# process chain for attenuation correction 

262# ---------------------------------------- 

263 

264def attenuation_correction(ds, moment, dims=["azimuth", "range"]): 

265 """Attenuation correction with zphi method. 

266 

267 Parameters 

268 ---------- 

269 ds : xarray.DataArray 

270 moment : xarray.DataArray 

271 DBZH (clutter corrected) 

272 dims : list 

273 dimension of the dataset 

274 

275 Returns 

276 ------- 

277 : xarray.Dataset 

278 xarray.Dataset with attenuation corrected refelctivity and recalculated variables. 

279 """ 

280 # phase processing 

281 # ---------------- 

282 

283 cphase = phase_zphi(ds.PHIDP.where((ds.RHOHV > 0.8) & (ds[moment] > 10)), rng=1000., start_range=0) 

284 

285 ds_out = zphi(ds[moment], cphase) 

286 

287 # attenuation correction 

288 # ---------------------- 

289 

290 zhcorr = ds[moment] + 0.28 * ds_out.PHIDP_RECALC 

291 

292 # concat datasets 

293 # --------------- 

294 

295 ds["DBZH_CORR"] = xr.DataArray(zhcorr, 

296 dims=dims, 

297 attrs=ds.DBZH.attrs) 

298 

299 phidp_recalc = ds_out.PHIDP_RECALC 

300 ds["PHIDP_RECALC"] = xr.DataArray(phidp_recalc, 

301 dims=dims, 

302 attrs=ds_out.PHIDP_RECALC.attrs) 

303 

304 ah_zphi = ds_out.AH_ZPHI 

305 ds["AH_ZPHI"] = xr.DataArray(ah_zphi, 

306 dims=dims, 

307 attrs=ds_out.AH_ZPHI.attrs) 

308 

309 kdp = ds_out.KDP_RECALC 

310 ds["KDP_RECALC"] = xr.DataArray(kdp, 

311 dims=dims, 

312 attrs=ds_out.KDP_RECALC.attrs) 

313 

314 return ds