# Source code for ocean_data_gateway.gateway

"""
This controls and connects to the individual readers.
"""

import cf_xarray  # isort:skip
from cf_xarray.units import units  # isort:skip
import pint_xarray  # isort:skip

pint_xarray.unit_registry = units  # isort:skip

import pandas as pd  # noqa: E402
import pint_xarray  # noqa: E402
import xarray as xr  # noqa: E402

from ioos_qc.config import QcConfig  # noqa: E402

import ocean_data_gateway as odg  # noqa: E402

from ocean_data_gateway import Reader  # noqa: E402


class Gateway(Reader):
    """
    Wraps together the individual readers in order to have a single way to
    search.

    Attributes
    ----------
    kwargs_all: dict
        Input keyword arguments that are not specific to one of the readers.
        These may include "approach", "parallel", "kw" containing the time and
        space region to search for, etc.
    kwargs: dict
        All keyword arguments as passed in by the user. Reader-specific
        dictionaries are looked up in here under the reader's name
        ("erddap", "axds", "local").
    """

    def __init__(self, *args, **kwargs):
        """
        Parameters
        ----------
        kw: dict
            Contains space and time search constraints: `min_lon`, `max_lon`,
            `min_lat`, `max_lat`, `min_time`, `max_time`.
        approach: string
            approach is defined as 'stations' or 'region' depending on user
            choice. Defaults to 'region' if not given.
        parallel: boolean, optional
            If True, run with simple parallelization using `multiprocessing`.
            If False, run serially. True by default. If input in this manner,
            the same value is used for all readers. If input by individual
            reader dictionary, the value can vary by reader.
        readers: ocean_data_gateway Reader, list of readers, optional
            Use this to use fewer than the full set of readers. For example,
            `readers=odg.erddap` or to specifically include all by name
            `readers = [odg.ErddapReader, odg.axdsReader, odg.localReader]`.
        erddap: dict, optional
            Dictionary of reader specifications. For example,
            `erddap={'known_server': 'ioos'}`. See odg.erddap.ErddapReader for
            more input options.
        axds: dict, optional
            Dictionary of reader specifications. For example,
            `axds={'axds_type': 'platform2'}`. See odg.axds.AxdsReader for
            more input options.
        local: dict, optional
            Dictionary of reader specifications. For example,
            `local={'filenames': filenames}` for a list of filenames.
            See odg.local.LocalReader for more input options.
        criteria: dict, str, optional
            A dictionary describing how to recognize variables by their name
            and attributes with regular expressions to be used with
            `cf-xarray`. It can be local or a URL point to a nonlocal gist.
            This is required for running QC in Gateway. For example:

            >>> my_custom_criteria = {"salt": {
            ...     "standard_name": "sea_water_salinity$|sea_water_practical_salinity$",
            ...     "name": "(?i)sal$|(?i)s.sea_water_practical_salinity$"}}

        var_def: dict, optional
            A dictionary with the same keys as criteria (criteria can have
            more) that describes QC definitions and units. It should include
            the variable units, fail_span, and suspect_span. For example:

            >>> var_def = {"salt": {"units": "psu",
            ...     "fail_span": [-10, 60], "suspect_span": [-1, 45]}}

        Notes
        -----
        To select search variables, input the variable names to each reader
        individually in the format `erddap={'variables': [list of variables]}`.
        Make sure that the variable names are correct for each individual
        reader. Check individual reader docs for more information.

        Alternatively, the user can input `criteria` and then input as
        variables the nicknames provided in `criteria` for variable names.
        These should then be input generally, not to an individual reader.

        Input keyword arguments that are not specific to one of the readers
        will be collected in local dictionary kwargs_all. These may include
        "approach", "parallel", "kw" containing the time and space region to
        search for, etc.

        Input keyword arguments that are specific to readers are looked up by
        reader name in local dictionary kwargs.

        Positional arguments are accepted but not used.
        """

        # make sure only known keys are input in kwargs
        unknown_keys = set(kwargs) - set(odg.keys_kwargs)
        assertion = f"keys into Gateway {unknown_keys} are unknown."
        assert len(unknown_keys) == 0, assertion

        # set up a dictionary for general input kwargs, i.e. everything that
        # is not a per-reader dictionary
        exclude_keys = ["erddap", "axds", "local"]
        self.kwargs_all = {k: kwargs[k] for k in set(kwargs) - set(exclude_keys)}

        # default approach is region
        if "approach" not in self.kwargs_all:
            self.kwargs_all["approach"] = "region"

        assertion = '`approach` has to be "region" or "stations"'
        assert self.kwargs_all["approach"] in ["region", "stations"], assertion

        # check for custom criteria to set up cf-xarray
        if "criteria" in self.kwargs_all:
            criteria = self.kwargs_all["criteria"]
            # link to nonlocal dictionary definition
            if isinstance(criteria, str) and criteria.startswith("http"):
                criteria = odg.return_response(criteria)
            cf_xarray.set_options(custom_criteria=criteria)
            self.criteria = criteria
        else:
            self.criteria = None

        # user-input variable definitions for QC
        if "var_def" in self.kwargs_all:
            var_def = self.kwargs_all["var_def"]
            # link to nonlocal dictionary definition
            if isinstance(var_def, str) and var_def.startswith("http"):
                var_def = odg.return_response(var_def)
            self.var_def = var_def
        else:
            self.var_def = None

        # if both criteria and var_def are input by user, make sure the keys
        # in var_def are all available in criteria.
        if self.criteria and self.var_def:
            assertion = (
                "All variable keys in `var_def` must be available in `criteria`."
            )
            assert all(elem in self.criteria for elem in self.var_def), assertion

        self.kwargs = kwargs

        # Accessing the property builds and caches the readers right away.
        self.sources

        self.store = dict()

    def __getitem__(self, key):
        """Redefinition of dict-like behavior.

        This enables user to use syntax `reader[dataset_id]` to read in and
        save dataset into the object.

        Parameters
        ----------
        key: str
            dataset_id for a dataset that is available in the search/reader
            object.

        Returns
        -------
        xarray Dataset of the data associated with key, as returned by
        `data_by_dataset` (None if no reader knows `key`).
        """

        returned_data = self.data_by_dataset(key)
        self.__setitem__(key, returned_data)
        return returned_data

    @property
    def sources(self):
        """Set up data sources (readers).

        Returns
        -------
        List of set-up reader objects. The list is built on first access and
        cached in `self._sources`.

        Notes
        -----
        All readers are included by default (readers as listed in
        odg._SOURCES). See __init__ for options.

        For each reader, one reader object is created per entry produced by
        zipping together the reader option values, the variables values and
        the dataset_ids values. `zip` stops at the shortest of the three, so
        user-supplied lists for "variables" or "dataset_ids" that are shorter
        than the list of reader options reduce the number of readers set up.
        """

        if not hasattr(self, "_sources"):

            # allow user to override what readers to use
            if "readers" in self.kwargs_all:
                SOURCES = self.kwargs_all["readers"]
                if not isinstance(SOURCES, list):
                    SOURCES = [SOURCES]
            else:
                SOURCES = odg._SOURCES

            # loop over data sources to set them up
            sources = []
            for source in SOURCES:
                reader_kwargs = self.kwargs.get(source.reader)

                # in case of important options for readers
                # but the built in options are ignored for a reader
                # if one is input in kwargs[source.reader]
                if source.reader in odg.OPTIONS:
                    reader_options = odg.OPTIONS[source.reader]
                    # only the first option key of a reader is considered
                    reader_key = list(reader_options.keys())[0]
                    # if the user input their own option for this, use it
                    # instead of the built in values
                    if source.reader in self.kwargs and reader_key in reader_kwargs:
                        reader_values = reader_kwargs[reader_key]
                    else:
                        reader_values = list(reader_options.values())[0]
                else:
                    reader_key = None
                    # this is to make it loop once for cases without
                    # extra options like localReader
                    reader_values = [None]

                if not isinstance(reader_values, list):
                    reader_values = [reader_values]

                # catch if the user is putting in a set of variables to match
                # the set of reader options
                if source.reader in self.kwargs and "variables" in reader_kwargs:
                    variables_values = reader_kwargs["variables"]
                    if not isinstance(variables_values, list):
                        variables_values = [variables_values]
                # catch scenario where variables input to all readers at once
                elif "variables" in self.kwargs_all:
                    variables_values = [self.kwargs_all["variables"]] * len(
                        reader_values
                    )
                else:
                    variables_values = [None] * len(reader_values)

                # catch if the user is putting in a set of dataset_ids to match
                # the set of reader options
                if source.reader in self.kwargs and "dataset_ids" in reader_kwargs:
                    dataset_ids_values = reader_kwargs["dataset_ids"]
                    if not isinstance(dataset_ids_values, list):
                        dataset_ids_values = [dataset_ids_values]
                else:
                    dataset_ids_values = [None] * len(reader_values)

                # dataset_ids_values only takes part in the zip (and so in
                # how many readers get built); the per-iteration value itself
                # is not passed on separately.
                for option, variables, _dataset_ids in zip(
                    reader_values, variables_values, dataset_ids_values
                ):

                    # setup reader with kwargs for that reader; later entries
                    # take priority: general kwargs < reader-specific kwargs
                    # < the reader option for this iteration < variables
                    args_in = {**self.kwargs_all}
                    if source.reader in self.kwargs:
                        args_in = {**args_in, **reader_kwargs}
                    # NOTE: reader_key is None for readers without built in
                    # options, in which case a `None: None` entry is passed on.
                    args_in = {**args_in, reader_key: option}
                    # deal with variables separately
                    args_in = {**args_in, "variables": variables}

                    if self.kwargs_all["approach"] == "region":
                        reader = source.region(args_in)
                    elif self.kwargs_all["approach"] == "stations":
                        reader = source.stations(args_in)

                    sources.append(reader)

            self._sources = sources

        return self._sources

    @property
    def dataset_ids(self):
        """Find dataset_ids for each source/reader.

        Returns
        -------
        A single flat list containing the dataset_ids of all sources/readers,
        in the order of `self.sources`.
        """

        dataset_ids = []
        for source in self.sources:
            dataset_ids.extend(source.dataset_ids)

        return dataset_ids

    @property
    def meta(self):
        """Find and return metadata for datasets.

        Returns
        -------
        A pandas DataFrame made by concatenating (row-wise, outer join on
        columns) the metadata of every reader.

        Notes
        -----
        This is done by querying each data source function for metadata and
        then using the metadata for quick returns.

        This will not rerun if the metadata has already been found.

        Different sources have different metadata, though certain attributes
        are always available.
        """

        if not hasattr(self, "_meta"):

            # loop over data sources to read in metadata
            meta = [source.meta for source in self.sources]

            # merge metadata into one DataFrame
            self._meta = pd.concat(meta, axis=0, join="outer")

        return self._meta

    def data_by_dataset(self, dataset_id):
        """Return the data for a single dataset_id.

        All available sources are checked (in order) for the dataset. Once a
        dataset matching dataset_id is found, it is returned.

        Parameters
        ----------
        dataset_id: str
            dataset_id to look for in the readers.

        Returns
        -------
        An xarray Dataset, or None if no source lists `dataset_id`.

        Notes
        -----
        Data is read into memory.
        """

        for source in self.sources:
            if dataset_id in source.dataset_ids:
                return source[dataset_id]

        # no reader knows this dataset_id
        return None

    # NOTE: the `dataset_ids` default below binds the class-level
    # `dataset_ids` property object at class-creation time. It is unused in
    # the body and cannot be supplied through property access; the signature
    # is kept as is for compatibility.
    @property
    def data(self, dataset_ids=dataset_ids):
        """Return the data, given metadata.

        THIS IS NOW OUTDATED.

        Returns
        -------
        A list with one entry per reader, each being that reader's `data`.
        The list is built on first access and cached in `self._data`.

        Notes
        -----
        This is either done in parallel with the `multiprocessing` library or
        in serial.
        """

        if not hasattr(self, "_data"):

            # loop over data sources to read in data
            self._data = [source.data for source in self.sources]

        return self._data

    def qc(self, dataset_ids=None, verbose=False, skip_units=False):
        """Light quality check on data.

        This runs one IOOS QARTOD on data as a first order quality check.
        Only returns data that is quality checked.

        Requires pint for unit handling. Requires user-input `criteria` and
        `var_def` to run.

        This is slow if your data is both chunks of time and space, so this
        should first narrow by both as much as possible.

        Parameters
        ----------
        dataset_ids: str, list, optional
            Read in data for dataset_ids specifically. If none are
            provided, data will be read in for all `self.keys()`.
        verbose: boolean, optional
            If True, report summary statistics on QC flag distribution in
            datasets.
        skip_units: boolean, optional
            If True, do not interpret or alter units and assume the data is in
            the units described in var_def already.

        Returns
        -------
        Dictionary with dataset_ids as keys. Each value is the dataset subset
        to the checked variables, with an added variable for each variable
        that was checked, with name of [variable]+'_qc'.

        Raises
        ------
        AssertionError
            If `criteria`/`var_def` are not defined, if no variable in a
            dataset matches the custom names, or if a custom name matches
            more than one variable.
        TypeError
            If the data for a dataset_id is neither a pandas DataFrame nor an
            xarray Dataset (for example None for an unknown dataset_id).

        Notes
        -----
        Code has been saved for data in DataFrames, but is changing so that
        data will be in Datasets. This way, can use cf-xarray functionality
        for custom variable names and easier to have recognizable units for
        variables with netcdf than csv.
        """

        assertion = (
            "Need to have custom criteria and variable information defined to run QC."
        )
        assert self.criteria and self.var_def, assertion

        if dataset_ids is None:
            # Only return already read-in dataset_ids. Take a snapshot since
            # `self[data_id]` below writes back into the container.
            data_ids = list(self.keys())
        else:
            data_ids = dataset_ids
            if not isinstance(data_ids, list):
                data_ids = [data_ids]

        data_out = {}
        for data_id in data_ids:
            # access the Dataset
            dd = self[data_id]

            # which custom variable names are in dataset
            # dd_varnames are the variable names in the Dataset dd
            # cf_varnames are the custom names we can use to refer to the
            # variables through cf-xarray
            if isinstance(dd, pd.DataFrame):
                varnames, cf_varnames = [], []
                for var in self.var_def:
                    # best effort: custom names that cannot be resolved in
                    # this DataFrame are skipped
                    try:
                        varname = dd.cf[var].name
                    except Exception:
                        continue
                    varnames.append(varname)
                    cf_varnames.append(var)

                assert len(varnames) > 0, "no custom names matched in Dataset."
                dd_varnames = varnames.copy()

            elif isinstance(dd, xr.Dataset):
                # pairs of ([matching variable names], custom name)
                varnames = []
                for var in self.var_def:
                    matched = cf_xarray.accessor._get_custom_criteria(dd, var)
                    if len(matched) > 0:
                        varnames.append((matched, var))

                assert len(varnames) > 0, "no custom names matched in Dataset."
                dd_varnames, cf_varnames = zip(*varnames)
                # flatten the lists of matches into one list of names
                dd_varnames = sum(dd_varnames, [])

            else:
                raise TypeError(
                    f"Data for dataset_id {data_id!r} must be a pandas DataFrame "
                    f"or an xarray Dataset to run QC, but is {type(dd).__name__}."
                )

            assert len(dd_varnames) == len(
                cf_varnames
            ), "looks like multiple variables might have been identified for a custom variable name"

            # subset to just the boem or requested variables for each df or ds
            if isinstance(dd, pd.DataFrame):
                dd2 = dd[list(varnames)]
            else:
                dd2 = dd.cf[cf_varnames]

            if not skip_units:

                # Preprocess to change salinity units away from 1e-3
                if isinstance(dd, pd.DataFrame):
                    # this replaces units in the 2nd column level of 1e-3 with psu
                    new_levs = [
                        "psu" if col == "1e-3" else col
                        for col in dd2.columns.levels[1]
                    ]
                    # `set_levels(..., inplace=True)` was removed in pandas 2.0,
                    # so assign the new index back instead
                    dd2.columns = dd2.columns.set_levels(new_levs, level=1)
                else:
                    for Var in dd2.data_vars:
                        if (
                            "units" in dd2[Var].attrs
                            and dd2[Var].attrs["units"] == "1e-3"
                        ):
                            dd2[Var].attrs["units"] = "psu"

                # run pint quantify on each data structure
                dd2 = dd2.pint.quantify()

                # go through each variable by name to make sure in correct units
                # have to do this in separate loop so that can dequantify afterward
                if isinstance(dd, pd.DataFrame):
                    print("NOT IMPLEMENTED FOR DATAFRAME YET")
                else:
                    # form of "temp": "degree_Celsius"
                    units_dict = {
                        dd_varname: self.var_def[cf_varname]["units"]
                        for (dd_varname, cf_varname) in zip(dd_varnames, cf_varnames)
                    }
                    # convert to conventional units
                    dd2 = dd2.pint.to(units_dict)

                dd2 = dd2.pint.dequantify()

            # now loop for QARTOD on each variable
            for dd_varname, cf_varname in zip(dd_varnames, cf_varnames):
                # run QARTOD
                qc_config = {
                    "qartod": {
                        "gross_range_test": {
                            "fail_span": self.var_def[cf_varname]["fail_span"],
                            "suspect_span": self.var_def[cf_varname]["suspect_span"],
                        },
                    }
                }
                qc = QcConfig(qc_config)
                qc_results = qc.run(inp=dd2[dd_varname])

                # put flags into dataset
                new_qc_var = f"{dd_varname}_qc"
                flags = qc_results["qartod"]["gross_range_test"]
                if isinstance(dd, pd.DataFrame):
                    dd2[new_qc_var] = flags
                else:
                    dims = dd2[dd_varname].dims
                    dd2[new_qc_var] = (dims, flags)

            data_out[data_id] = dd2

        if verbose:
            for dataset_id, dd in data_out.items():
                print(dataset_id)
                # report on the flags of *this* dataset
                qckeys = [var for var in dd.data_vars if "_qc" in var]
                for qckey in qckeys:
                    print(qckey)
                    for flag, desc in odg.qcdefs.items():
                        print(
                            f"Flag == {flag} ({desc}): {int((dd[qckey] == int(flag)).sum())}"
                        )

        return data_out