"""
This controls and connects to the individual readers.
"""
import ocean_data_gateway as odg
# MAYBE SHOULD BE ABLE TO INITIALIZE THE CLASS WITH ONLY METADATA OR DATASET NAMES?
# to skip looking for the datasets
[docs]class Gateway(object):
"""
Wraps together the individual readers in order to have a single way to
search.
Attributes
----------
kwargs_all: dict
Input keyword arguments that are not specific to one of the readers.
These may include "approach", "parallel", "kw" containing the time and
space region to search for, etc.
kwargs: dict
Keyword arguments that contain specific arguments for the readers.
"""
[docs] def __init__(self, **kwargs):
"""
Parameters
----------
kw: dict
Contains space and time search constraints: `min_lon`, `max_lon`,
`min_lat`, `max_lat`, `min_time`, `max_time`.
approach: string
approach is defined as 'stations' or 'region' depending on user
choice.
parallel: boolean, optional
If True, run with simple parallelization using `multiprocessing`.
If False, run serially. True by default. If input in this manner,
the same value is used for all readers. If input by individual
reader dictionary, the value can vary by reader.
readers: ocean_data_gateway Reader, list of readers, optional
Use this to use fewer than the full set of readers. For example,
`readers=odg.erddap` or to specifically include all by name
`readers = [odg.ErddapReader, odg.axdsReader, odg.localReader]`.
erddap: dict, optional
Dictionary of reader specifications. For example,
`erddap={'known_server': 'ioos'}`. See odg.erddap.ErddapReader for
more input options.
axds: dict, optional
Dictionary of reader specifications. For example,
`axds={'axds_type': 'platform2'}`. See odg.axds.AxdsReader for
more input options.
local: dict, optional
Dictionary of reader specifications. For example,
`local={'filenames': filenames}` for a list of filenames.
See odg.local.LocalReader for more input options.
Notes
-----
To select search variables, input the variable names to each reader
individually in the format `erddap={'variables': [list of variables]}`.
Make sure that the variable names are correct for each individual
reader. Check individual reader docs for more information.
Input keyword arguments that are not specific to one of the readers will be collected in local dictionary kwargs_all. These may include "approach", "parallel", "kw" containing the time and space region to search for, etc.
Input keyword arguments that are specific to readers will be collected
in local dictionary kwargs.
"""
# make sure only known keys are input in kwargs
unknown_keys = set(list(kwargs.keys())) - set(odg.keys_kwargs)
assertion = f"keys into Gateway {unknown_keys} are unknown."
assert len(unknown_keys) == 0, assertion
# set up a dictionary for general input kwargs
exclude_keys = ["erddap", "axds", "local"]
kwargs_all = {
k: kwargs[k] for k in set(list(kwargs.keys())) - set(exclude_keys)
}
self.kwargs_all = kwargs_all
# default approach is region
if "approach" not in self.kwargs_all:
self.kwargs_all["approach"] = "region"
assertion = '`approach` has to be "region" or "stations"'
assert self.kwargs_all["approach"] in ["region", "stations"], assertion
# if "kw" not in self.kwargs_all:
# kw = {
# "min_lon": -124.0,
# "max_lon": -122.0,
# "min_lat": 38.0,
# "max_lat": 40.0,
# "min_time": "2021-4-1",
# "max_time": "2021-4-2",
# }
# self.kwargs_all["kw"] = kw
self.kwargs = kwargs
self.sources
@property
def sources(self):
"""Set up data sources (readers).
Notes
-----
All readers are included by default (readers as listed in odg._SOURCES). See
__init__ for options.
"""
if not hasattr(self, "_sources"):
# allow user to override what readers to use
if "readers" in self.kwargs_all.keys():
SOURCES = self.kwargs_all["readers"]
if not isinstance(SOURCES, list):
SOURCES = [SOURCES]
else:
SOURCES = odg._SOURCES
# loop over data sources to set them up
sources = []
for source in SOURCES:
# print(source.reader)
# in case of important options for readers
# but the built in options are ignored for a reader
# if one is input in kwargs[source.reader]
if source.reader in odg.OPTIONS.keys():
reader_options = odg.OPTIONS[source.reader]
reader_key = list(reader_options.keys())[0]
# if the user input their own option for this, use it instead
# this makes it loop once
if (source.reader in self.kwargs.keys()) and (
reader_key in self.kwargs[source.reader]
):
# reader_values = [None]
reader_values = self.kwargs[source.reader][reader_key]
else:
reader_values = list(reader_options.values())[0]
else:
reader_key = None
# this is to make it loop once for cases without
# extra options like localReader
reader_values = [None]
if not isinstance(reader_values, list):
reader_values = [reader_values]
# catch if the user is putting in a set of variables to match
# the set of reader options
if (source.reader in self.kwargs) and (
"variables" in self.kwargs[source.reader]
):
variables_values = self.kwargs[source.reader]["variables"]
if not isinstance(variables_values, list):
variables_values = [variables_values]
# if len(reader_values) == variables_values:
# variables_values
else:
variables_values = [None] * len(reader_values)
# catch if the user is putting in a set of dataset_ids to match
# the set of reader options
if (source.reader in self.kwargs) and (
"dataset_ids" in self.kwargs[source.reader]
):
dataset_ids_values = self.kwargs[source.reader]["dataset_ids"]
if not isinstance(dataset_ids_values, list):
dataset_ids_values = [dataset_ids_values]
# if len(reader_values) == variables_values:
# variables_values
else:
dataset_ids_values = [None] * len(reader_values)
for option, variables, dataset_ids in zip(
reader_values, variables_values, dataset_ids_values
):
# setup reader with kwargs for that reader
# prioritize input kwargs over default args
# NEED TO INCLUDE kwargs that go to all the readers
args = {}
args_in = {
**args,
**self.kwargs_all,
# reader_key: option,
# **self.kwargs[source.reader],
}
if source.reader in self.kwargs.keys():
args_in = {
**args_in,
**self.kwargs[source.reader],
}
args_in = {**args_in, reader_key: option}
# deal with variables separately
args_in = {
**args_in,
"variables": variables,
}
# deal with dataset_ids separately
args_in = {
**args_in,
"dataset_ids": dataset_ids,
}
if self.kwargs_all["approach"] == "region":
reader = source.region(args_in)
elif self.kwargs_all["approach"] == "stations":
reader = source.stations(args_in)
sources.append(reader)
self._sources = sources
return self._sources
@property
def dataset_ids(self):
"""Find dataset_ids for each source/reader.
Returns
-------
A list of dataset_ids where each entry in the list corresponds to one source/reader, which in turn contains a list of dataset_ids.
"""
if not hasattr(self, "_dataset_ids"):
dataset_ids = []
for source in self.sources:
dataset_ids.append(source.dataset_ids)
self._dataset_ids = dataset_ids
return self._dataset_ids
@property
def meta(self):
"""Find and return metadata for datasets.
Returns
-------
A list with an entry for each reader. Each entry in the list contains a pandas DataFrames of metadata for that reader.
Notes
-----
This is done by querying each data source function for metadata and
then using the metadata for quick returns.
This will not rerun if the metadata has already been found.
Different sources have different metadata, though certain attributes
are always available.
TO DO: SEPARATE DATASOURCE FUNCTIONS INTO A PART THAT RETRIEVES THE
DATASET_IDS AND METADATA AND A PART THAT READS IN THE DATA.
"""
if not hasattr(self, "_meta"):
# loop over data sources to read in metadata
meta = []
for source in self.sources:
meta.append(source.meta)
self._meta = meta
return self._meta
@property
def data(self):
"""Return the data, given metadata.
Notes
-----
This is either done in parallel with the `multiprocessing` library or
in serial.
"""
if not hasattr(self, "_data"):
# loop over data sources to read in data
data = []
for source in self.sources:
data.append(source.data)
self._data = data
return self._data