Source code for wrainfo.reader

"""Reader module."""

# WRaINfo, Is a software to process FURUNO weather radar data.
#
# Copyright (c) 2022, FernLab (GFZ Potsdam, fernlab@gfz-potsdam.de)
#
# This software was developed within the context of the RaINfo ("Potential use of
# high resolution weather data in agriculture") project of FernLab funded by
# the Impulse and Networking Fund of the Helmholtz Association.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# imports
# -------

import datetime as dt
import os
import glob
import re
import xarray as xr
import numpy as np
import pickle
import json
from pathlib import Path


# read configuration file
# -----------------------

[docs]def read_config_file(path=".../example_settings_path_dependencies_wr_furuno.json", selection=None): """Read wr_furuno config file in json format. Parameter --------- path : str Path to config file. selection : str select setting parameter Return ------ : dict configured directories in the configuration file. """ path = path.replace("~", str(Path.home())) if not Path(path).is_file(): raise Exception("wrainfo.reader.read_config_file(): Config file'", path, "' not found.") file = open(path) conf = json.load(file) file.close() if selection is not None: if selection in conf.keys(): return conf[selection] else: raise Exception("wrainfo.reader.read_config_file(): Setting parameter'", selection, "' not in settings file '", path, "'.") else: return conf
# Functions to read the Furuno data # ----------------------------------
[docs]def get_furuno_path(path, start_time=dt.datetime.today()): """Create path of Furuno radar data files. Parameters ---------- path : str Path to configuration file start_time : datetime.datetime datetime object to select correct folder Returns ------- radar_path : str Path to radar data """ raw_path = read_config_file(path=path, selection="raw_data_directory") subfolder_struct_raw_path = read_config_file(path=path, selection="subfolder_structure_raw_data") radar_path = os.path.join(raw_path, subfolder_struct_raw_path) return radar_path.format(start_time.year, start_time.month, start_time.day)
[docs]def get_file_date_regex(filename): """Get regex from filename.""" # regex for ""%Y-%m-%d--%H:%M:%S" reg0 = r"\d{4}.\d{2}.\d{2}..\d{2}.\d{2}.\d{2}" # regex for "%Y%m%d%H%M%S" reg1 = r"\d{14}" # regex for 20220216_085000 reg2 = r"\d{4}\d{2}\d{2}.\d{2}\d{2}\d{2}" for reg in [reg0, reg1, reg2]: match = re.search(reg, os.path.basename(filename)) if match is not None: return reg return None
[docs]def get_datetime_from_filename(filename, regex): """Get datetime from filename.""" fmt = "%Y%m%d%H%M%S" match = re.search(regex, os.path.basename(filename)) match = "".join(re.findall(r"[0-9]+", match.group())) return dt.datetime.strptime(match, fmt)
[docs]def load_error_flist(path): """Load error file list. Parameter --------- path : str Path to configuration file Returns ------ : list list of error files. """ path_error_flist = read_config_file(path=path, selection="error_flist_directory") with open(path_error_flist, 'rb') as fp: error_files = pickle.load(fp) return error_files
[docs]def create_filelist(starttime, endtime, path, pattern="_000.scnx.gz"): """Create filelist from path_glob and filename dates. Parameters ---------- starttime : dt.datetime start time endtime : dt.datetime end time path : str Path to configuration file pattern : str extension of the scnx/netcdf file (scnx file: elevation angle 0.5° = "_000.scnx.gz") (netcdf file: = ".nc" , elevation angle is included in the filename) Returns ------- : list list of files """ flist = [] flist_scnx = [] flist_h5 = [] flist_nc = [] datetime_list = [] date = starttime error_flist = load_error_flist(path=path) while date <= endtime: raw_path = get_furuno_path(path=path, start_time=date) file_names = sorted(glob.glob(os.path.join(raw_path, "*"))) # fixed empty list of file_names if len(file_names) > 0: regex = get_file_date_regex(file_names[0]) for fname in file_names: time = get_datetime_from_filename(fname, regex) if time >= date: if time < endtime: if fname.endswith(pattern): if fname not in error_flist: flist_scnx.append(fname) if fname.endswith(".h5"): if fname not in error_flist: flist_h5.append(fname) if fname.endswith(".nc"): if fname not in error_flist: flist_nc.append(fname) if len(flist_nc) > 0: for file in flist_nc: flist.append(file) if len(flist_scnx) == 0: if len(flist_h5) > 0: for file in flist_h5: flist.append(file) if len(flist_scnx) > 0: for file in flist_scnx: filetime_scnx = get_datetime_from_filename(file, regex) datetime_list.append(filetime_scnx) flist.append(file) if len(flist_h5) > 0: for file in flist_h5: filetime_h5 = get_datetime_from_filename(file, regex) if filetime_h5 not in datetime_list: flist.append(file) date = date + dt.timedelta(1) flist = sorted(set(flist)) return flist
# read single file of Furuno data # (important for create the clutter map) # ---------------------------------------
[docs]def read_single_file(file, grp="dataset1"): """Read a single file, reindexes and returns a dataset object. Parameters ---------- file : str path to file as string grp : str hdf5 group (elevation angle) Returns ------- : xarray Dataset xarray Dataset with variables. """ extension = os.path.splitext(file)[1] if extension == ".gz": # the elevation angle the elevation angle is already selected in the flist! tmp = xr.open_mfdataset((file,), engine="furuno", group=1, concat_dim="time", combine="nested", backend_kwargs=dict(reindex_angle=1.0)) else: tmp = xr.open_mfdataset((file,), engine="odim", group=grp, concat_dim="time", combine="nested", backend_kwargs=dict(reindex_angle=1.0)) # TODO add case sensitive decision on these parameters re_index_parameters = np.arange(0.25, 360, 0.5) ri = tmp.reindex(azimuth=re_index_parameters, method="nearest") return ri
# read processed cluttermap from Furuno # -------------------------------------
[docs]def get_cmap(starttime, endtime, path, elev="0.5", timestr="%Y%m%d"): """Read processed cluttermap from Furuno. Parameters --------- starttime : datetime.datetime datetime - object to select correct files for list endtime : datetime.datetime datetime - object to select correct files for list path : str Path to configuration file elev : str selected elevation angle of cmap Returns ------- : xarray.Dataset clutter map as xarray.Dataset. """ # read settings from configuration file clutter_dir = read_config_file(path=path, selection="monthly_clutter_directory") subfolderstruct = read_config_file(path=path, selection="subfolder_structure_clutter_directory") # create file structure file_struct = os.path.join(f"{clutter_dir}", f"{subfolderstruct}", "*") path = file_struct.format(year=starttime.year) # list all cmaps in the folder cmaps = sorted(glob.glob(path)) selected_map = None last_date = dt.datetime(1900, 1, 1) for cmap in cmaps: # choose the stop time from filename filetime_end = dt.datetime.strptime((os.path.basename(cmap)[19:27]), timestr) elev_cmap = os.path.basename(cmap)[33:36] if filetime_end > (endtime - dt.timedelta(45)): if filetime_end > last_date: if elev_cmap == elev: # noqa E712 selected_map = cmap last_date = filetime_end if selected_map is None: print("wrainfo.reader.get_cmap(): No cmap found for these dates.") # read cmap cmap = xr.open_dataset(selected_map, engine="h5netcdf") return cmap