Source code for wrainfo.process_chains

"""Process chains module."""

# WRaINfo, Is a software to process FURUNO weather radar data.
#
# Copyright (c) 2022, FernLab (GFZ Potsdam, fernlab@gfz-potsdam.de)
#
# This software was developed within the context of the RaINfo ("Potential use of
# high resolution weather data in agriculture") project of FernLab funded by
# the Impulse and Networking Fund of the Helmholtz Association.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import glob
import xarray as xr
import pathlib
import datetime as dt
import numpy as np
import wrainfo


# clutter map will be calculated monthly every first day of the month
# -------------------------------------------------------------------

[docs]def clutter_chain(start_time, path, elevation_angle="dataset1", days=90, threshold=0, status=True, pattern="_000.scnx.gz", data_type="cmap", logger=logging.getLogger()): """Create a ground clutter map and save NetCDF-file. Parameters ---------- start_time : datetime.datetime start time which files will be read path : str path to configuration file elevation_angle : str choose dataset of one elevation angle (for example: dataset 1 is elevation angle 0.5°) days : integer Number of days for which a cmap is created threshold : float this is a threshold for the DBZH it should not be negative status : bool percentage of the finished clutter map pattern : str extension of the scnx files (scnx file: elevation angle 0.5° = "_000.scnx") data_type : str the clutter map (cmap) were created logger : file writing status messages to a file Returns ------- : files NetCDF - files in output directory. """ # read output directory for clutter map from configuration file clutter_directory = wrainfo.reader.read_config_file(path=path, selection="monthly_clutter_directory") radar_location_identifier = wrainfo.reader.read_config_file(path=path, selection="radar_location_identifier") # create the stop time # ----------------------- stop_time = start_time + dt.timedelta(days=days) # create a list of all input files for the defined time period # --------------------------------------------------------------- flist = wrainfo.reader.create_filelist(path=path, starttime=start_time, endtime=stop_time, pattern=pattern) # create clutter map # --------------------- logger.info("Creating cluttermap.") cmap = wrainfo.clutter.create_clutter_map_sequential(files=flist, threshold=threshold, grp=elevation_angle, status=status, logger=logger.getChild("wr_furuno.clutter" ".create_clutter_map_sequential")) # elevation angle # ------------------ el = cmap.elevation.values[0] # create outfilename # --------------------- outfilename = f"{radar_location_identifier}_{start_time:%Y%m%d}_{stop_time:%Y%m%d}_elev_{el:0.1f}_{data_type}.nc" year = str(stop_time.year) path1 = clutter_directory + "/" + year if not os.path.exists(path1): os.makedirs(path1, exist_ok=True) # overwrite/remove if exist f = pathlib.Path(outfilename) f.unlink(missing_ok=True) outfilename = os.path.join(path1, outfilename) # output file as netcdf # ------------------------ logger.info("Creating cluttermap netcdf output.") print(f"-- output to {outfilename}") cmap.to_netcdf(outfilename) logger.info("Finished cluttermap chain.") return True
# static clutter map will be processed each month from 3 last cluttermaps # -----------------------------------------------------------------------
[docs]def static_cmap(path, pattern="_elev_0.5_cmap.nc"): """Create a static ground clutter map and save NetCDF-file. Parameters ---------- path : str path to configuration file pattern : str extension of the netcdf file (cmaps for different elevation angles) Returns ------- : files NetCDF - files in output directory. """ # read path of clutter directory from configuration file path_cmap = wrainfo.reader.read_config_file(path=path, selection="monthly_clutter_directory") path_cmap = path_cmap + "*" # read radar location identifier from configuration file radar_location_identifier = wrainfo.reader.read_config_file(path=path, selection="radar_location_identifier") # read output directory from configuration file outdir = wrainfo.reader.read_config_file(path=path, selection="static_clutter_directory") # load the last 3 cmaps cmaps = sorted(glob.glob(os.path.join(path_cmap, "*"))) # filter for elevation angle cmaps_filter = [] for cmap in cmaps: if cmap.endswith(pattern): cmaps_filter.append(cmap) cmaps_filter = cmaps_filter[-3:] # read the last 3 cmaps cmap1 = xr.open_dataset(cmaps_filter[0]) cmap2 = xr.open_dataset(cmaps_filter[1]) cmap3 = xr.open_dataset(cmaps_filter[2]) # add cmaps cmap_final = cmap1 + cmap2 + cmap3 # outfilename base_name = os.path.basename(cmaps_filter[1]) el = base_name[28:36] outfilename = f"{radar_location_identifier}_static_cluttermap_{el}.nc" # output directory if not os.path.exists(outdir): os.makedirs(outdir, exist_ok=True) # overwrite/remove if exist f = pathlib.Path(outfilename) f.unlink(missing_ok=True) outfilename = os.path.join(outdir, outfilename) print(f"-- output to {outfilename}") cmap_final.to_netcdf(outfilename) return True
# weather radar routine # ---------------------
[docs]def wr_routine_furuno(starttime, endtime, delta, path, data_type="Level2a", elevation_angle="dataset1", pattern="_000.scnx.gz", logger=logging.getLogger()): """Process of Furuno raw data. Parameters ---------- starttime : datetime.datetime endtime : datetime.datetime delta : datetime.delta path : str path to configuration file data_type : str Level of processing pattern : extension of the scnx file (scnx file: elevation angle 0.5° = "_000.scnx") elevation_angle : str group of hdf5 dataset logger : file writing status messages to a file Returns ------- : files NetCDF - files in output directory """ date = starttime stop_time = endtime # read settings from configuration file re_idx = wrainfo.reader.read_config_file(path=path, selection="re_index_parameters") re_index_parameters = np.arange(re_idx[0], re_idx[1], re_idx[2]) dims = wrainfo.reader.read_config_file(path=path, selection="dimensions") while date < stop_time: # read static cmap # ---------------- logger.info("Read static clutter map") clutter_dircetory = wrainfo.reader.read_config_file(path=path, selection="static_clutter_directory") cmap = xr.open_dataset(clutter_dircetory) cmap = cmap.DBZH_sum_bool # load files # ---------- logger.info("Create list of files to be processed") flist_raw = wrainfo.reader.create_filelist(path=path, starttime=date, endtime=date + delta, pattern=pattern) # start processing # ---------------- for file in flist_raw: # extension of the file # --------------------- extension = os.path.splitext(file)[1] if extension == ".gz": # read Furuno scnx data # --------------------- logger.info("Read Furuno scnx file") try: ds = xr.open_dataset(file, engine="furuno", group=1, backend_kwargs=dict(reindex_angle=1.0)) except Exception: # except errors if a file could not read logger.warning("Could not load scnx file:' " + file + "'.") continue # rename VRADH to VRAD ds['VRAD'] = ds['VRADH'] ds = ds.drop(['VRADH']) if extension == ".h5": # read Furuno hdf5 data # --------------------- logger.info("Read Furuno hdf5 file") try: ds = xr.open_dataset(file, engine="odim", group=elevation_angle, backend_kwargs=dict(reindex_angle=1.0)) except Exception: # except errors if a file could not read logger.warning("Could not load hdf5 file:' " + file + "'.") continue # reindexing azimuth resolution # ----------------------------- ds = ds.reindex(azimuth=re_index_parameters, method="nearest") # fix errors by elevation angle # ----------------------------- elevation_angle_ds = ds.elevation.median(dim="azimuth") ds["elevation"][:] = elevation_angle_ds # SNR filter by RHOHV # -------------------- ds = ds.where(ds.RHOHV > 0.9) # remove static clutter with cmap # -------------------------------- dbzh = ds.DBZH.where((cmap == False)) # noqa E712 dbzh_attrs = ds.DBZH.attrs ds["DBZH"] = xr.DataArray(dbzh, dims=dims, attrs=dbzh_attrs) # clutter detection # -------------------- logger.info("Clutter classification.") ds = wrainfo.clutter.fuzzy_echo_classification(ds, cmap=cmap, dims=dims) # set clutter pixel to NA ds = wrainfo.clutter.dbzh_no_clutter(ds) # phase processing & attenuation correction (ZPHI method) # ------------------------------------------------------- logger.info("Phase processing and attenuation correction.") ds = wrainfo.attenuation_corr.attenuation_correction(ds, moment="DBZH_no_clutter", dims=dims) # rain rate retreival # -------------------- logger.info("Create precipitation products.") # QPE Z(R) logger.info("Create precipitation product with Z(R) relationship.") ds = wrainfo.precipitation.qpe_zr(ds, "DBZH_CORR", path=path, a=200, b=1.6) # smoothing ds = ds.pad(azimuth=(3, 3), mode="wrap").rolling(azimuth=7, min_periods=3, center=True).mean().isel(azimuth=slice(3, -3)) # georeferencing and gridding # --------------------------- logger.info("Georeferencing and gridding") swp_georef = wrainfo.geometry.furuno_georeferencing(ds, path=path) wrainfo.geometry.furuno_sweep_to_netcdf(swp_georef, path=path, data_type=data_type) del ds del swp_georef date += delta return True