"""
Reader for Axiom databases.
"""
import hashlib
import logging
import os
import cf_xarray
import fsspec
import intake
import numpy as np
import pandas as pd
import requests
import xarray as xr
import ocean_data_gateway as odg
from ocean_data_gateway import Reader
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Short identifier string for this reader.
# this can be queried with
# search.AxdsReader.reader
reader = "axds"
class AxdsReader(Reader):
"""
This class searches Axiom databases for types `platform2`, which
are like gliders, and `layer_group`, which are like grids and models.
Attributes
----------
parallel: boolean
If True, run with simple parallelization using `multiprocessing`.
If False, run serially.
catalog_name: string
Input catalog path if you want to use an existing catalog.
axds_type: string
Which Axiom database type to search for.
* "platform2" (default): gliders, drifters; result in pandas DataFrames
* "layer_group": grids, model output; result in xarray Datasets
url_search_base: string
Base string of search url
url_docs_base: string
Base string of url for a known dataset_id
search_headers: dict
Required for reading in the request
url_axds_type: string
Url for the given `axds_type`.
name: string
f'axds_{axds_type}' so 'axds_platform2' or 'axds_layer_group'
reader: string
Reader name: AxdsReader
"""
def __init__(
self, parallel=True, catalog_name=None, axds_type="platform2", filetype="netcdf"
):
"""
Parameters
----------
parallel: boolean, optional
If True, run with simple parallelization using `multiprocessing`.
If False, run serially.
catalog_name: string, optional
Input catalog path if you want to use an existing catalog.
axds_type: string, optional
Which Axiom database type to search for.
* "platform2" (default): gliders, drifters; result in pandas DataFrames
* "layer_group": grids, model output; result in xarray Datasets
"""
self.parallel = parallel
# search Axiom database, version 2
self.url_search_base = "https://search.axds.co/v2/search?portalId=-1&page=1&pageSize=10000&verbose=true"
self.url_docs_base = "https://search.axds.co/v2/docs?verbose=true"
# this is the json being returned from the request
self.search_headers = {"Accept": "application/json"}
self.approach = None
if catalog_name is None:
name = f"{pd.Timestamp.now().isoformat()}"
hash_name = hashlib.sha256(name.encode()).hexdigest()[:7]
self.catalog_name = odg.catalogs_path.joinpath(f"catalog_{hash_name}.yml")
else:
self.catalog_name = catalog_name
# if catalog_name already exists, read it in to save time
self.catalog
# can be 'platform2' or 'layer_group'
assert axds_type in [
"platform2",
"layer_group",
], 'variable `axds_type` must be "platform2" or "layer_group"'
self.axds_type = axds_type
self.url_axds_type = f"{self.url_search_base}&type={self.axds_type}"
self.name = f"axds_{axds_type}"
self.reader = "AxdsReader"
if self.axds_type == "platform2":
self.data_type = "csv"
elif self.axds_type == "layer_group":
self.data_type = "nc"
# name
self.name = f"axds_{axds_type}"
self.reader = "AxdsReader"
self.filetype = filetype
self.store = dict()
def __getitem__(self, key):
"""Redefinition of dict-like behavior.
This enables user to use syntax `reader[dataset_id]` to read in and
save dataset into the object.
Parameters
----------
key: str
dataset_id for a dataset that is available in the search/reader
object.
Returns
-------
xarray Dataset of the data associated with key
"""
returned_data = self.data_by_dataset(key)
# returned_data = self._return_data(key)
self.__setitem__(key, returned_data)
return returned_data
def url_query(self, query):
    """Build the url fragment that adds a free-text query.

    Parameters
    ----------
    query: string
        String to query for. Can be multiple words.

    Returns
    -------
    String of the form `&query=<query>` to append to a search url.
    """
    return "&query={}".format(query)
def url_variable(self, variable):
    """Build the url fragment that filters by parameter group.

    Parameters
    ----------
    variable: string
        String to search for.

    Returns
    -------
    String of the form `&tag=Parameter+Group:<variable>` to append to a
    search url.

    Notes
    -----
    This variable search is specifically by parameter group and
    only works for `axds_type='platform2'`.
    For `axds_type='layer_group'`, use `url_query` with the variable name.
    """
    return "&tag=Parameter+Group:{}".format(variable)
def url_region(self):
"""url modification to add spatial search box.
Returns
-------
Modification for url to add lon/lat filtering.
Notes
-----
Uses the `kw` dictionary already stored in the class object
to access the spatial limits of the box.
"""
url_add_box = (
f'&geom={{"type":"Polygon","coordinates":[[[{self.kw["min_lon"]},{self.kw["min_lat"]}],'
+ f'[{self.kw["max_lon"]},{self.kw["min_lat"]}],'
+ f'[{self.kw["max_lon"]},{self.kw["max_lat"]}],'
+ f'[{self.kw["min_lon"]},{self.kw["max_lat"]}],'
+ f'[{self.kw["min_lon"]},{self.kw["min_lat"]}]]]}}'
)
return f"{url_add_box}"
def url_time(self):
"""url modification to add time filtering.
Returns
-------
Modification for url to add time filtering.
Notes
-----
Uses the `kw` dictionary already stored in the class object
to access the time limits of the search.
"""
# convert input datetime to seconds since 1970
startDateTime = (
pd.Timestamp(self.kw["min_time"]).tz_localize("UTC")
- pd.Timestamp("1970-01-01 00:00").tz_localize("UTC")
) // pd.Timedelta("1s")
endDateTime = (
pd.Timestamp(self.kw["max_time"]).tz_localize("UTC")
- pd.Timestamp("1970-01-01 00:00").tz_localize("UTC")
) // pd.Timedelta("1s")
# search by time
url_add_time = f"&startDateTime={startDateTime}&endDateTime={endDateTime}"
return f"{url_add_time}"
def url_dataset_id(self, dataset_id):
    """Build the url fragment that selects a known dataset_id.

    Parameters
    ----------
    dataset_id: string
        String of dataset_id to exactly match.

    Returns
    -------
    String of the form `&id=<dataset_id>` to append to a url.
    """
    return "&id={}".format(dataset_id)
def url_builder(
self,
url_base,
dataset_id=None,
add_region=False,
add_time=False,
variable=None,
query=None,
):
"""Build an individual search url.
Parameters
----------
url_base: string
There are 2 possible bases for the url:
* self.url_axds_type, for searching
* self.url_docs_base, for selecting known dataset by dataset_id
dataset_id: string, optional
dataset_id of station, if known.
add_region: boolean, optional
True to filter the search by lon/lat box. Requires self.kw
that contains keys `min_lon`, `max_lon`, `min_lat`, `max_lat`.
add_time: boolean, optional
True to filter the search by time range. Requires self.kw
that contains keys `min_time` and `max_time`.
variable: string, optional
String of variable description to filter by, if desired.
If `axds_type=='platform2'`, find the variable name options with
class function `odg.all_variables('axds')`, search for variable names by
string with `odg.search_variables('axds', variables)`, and check your variable list with
`check_variables('axds', variables)`.
If `axds_type=='layer_group'`, there is no official variable list
and you can instead just put in a basic variable name and hope the
search works.
query: string, optional
This could be any search query you want, but it is used in the code
to search for station names (not dataset_ids).
Returns
-------
Url for search.
"""
url = url_base
if dataset_id is not None:
url += self.url_dataset_id(dataset_id)
if add_time:
url += self.url_time()
if variable is not None:
if self.axds_type == "platform2":
url += self.url_variable(variable)
elif self.axds_type == "layer_group":
url += self.url_query(variable)
if add_region:
url += self.url_region()
if query is not None:
url += self.url_query(query)
return url
@property
def urls(self):
"""Return a list of search urls.
Notes
-----
Use this through the class methods `region` or `stations` to put
together the search urls to represent the basic reader setup.
"""
assert (
self.approach is not None
), "Use this property through class method `region` or `stations`"
if not hasattr(self, "_urls"):
if self.approach == "region":
urls = []
if self.variables is not None:
for variable in self.variables:
urls.append(
self.url_builder(
self.url_axds_type,
variable=variable,
add_time=True,
add_region=True,
)
)
else:
urls.append(
self.url_builder(
self.url_axds_type, add_time=True, add_region=True
)
)
elif self.approach == "stations":
urls = []
# check station names as both queries and as exact names
if len(self._stations) > 0:
for station in self._stations:
urls.append(self.url_builder(self.url_axds_type, query=station))
urls.append(
self.url_builder(self.url_docs_base, dataset_id=station)
)
self._urls = urls
return self._urls
@property
def search_results(self):
    """Run the searches in `self.urls` and index the hits by dataset_id.

    Returns
    -------
    dict mapping dataset_id to its search result. The dict is empty (and a
    warning is logged) when nothing matched.

    Notes
    -----
    The logic removes duplicate searches.

    The key of each entry is the dataset_id. For
    * `self.axds_type == "platform2"`: dataset_id is the uuid
    * `self.axds_type == "layer_group"`: dataset_id is the module_uuid since
      multiple layer_groups can be linked under one module_uuid. The stored
      value is the module's own document, fetched by id.

    Each url is requested once; responses whose status is not 200 are
    skipped. The result is cached on the object because `write_catalog`
    and `catalog` both read it.
    """
    if hasattr(self, "_search_results"):
        return self._search_results

    # loop over urls
    raw_results = []
    for url in self.urls:
        response = requests.get(url, headers=self.search_headers)
        # skip anything that is not a legitimate, successful response
        if response.status_code != 200:
            continue
        res = response.json()
        # a generic search returns a dict with the hits under "results";
        # an id docs grab returns the list of hits directly
        if isinstance(res, dict):
            res = res["results"]
        raw_results.extend(res)

    # change search results to a dictionary to remove duplicate dataset_ids
    search_results_dict = {}
    for search_result in raw_results:
        if self.axds_type == "platform2":
            search_results_dict[search_result["uuid"]] = search_result

        elif self.axds_type == "layer_group":
            if search_result["type"] == "layer_group" and (
                "module_uuid" in search_result["data"]
            ):
                # switch to module search results instead of layer_group results
                module_uuid = search_result["data"]["module_uuid"]
            elif search_result["type"] == "module":
                module_uuid = search_result["data"]["uuid"]
            else:
                # neither a layer_group linked to a module nor a module:
                # there is no module uuid to file this result under
                continue

            # don't repeat unnecessarily, if module_uuid has already
            # been included.
            if module_uuid in search_results_dict:
                continue

            url_module = self.url_builder(
                self.url_docs_base, dataset_id=module_uuid
            )
            search_results_dict[module_uuid] = requests.get(
                url_module, headers=self.search_headers
            ).json()[0]

    if search_results_dict == {}:
        logger.warning(
            f"No datasets fit the input criteria of kw={self.kw} and variables={self.variables}"
        )

    self._search_results = search_results_dict
    return self._search_results
def write_catalog_layer_group_entry(
    self, dataset, dataset_id, urlpath, layer_groups
):
    """Write part of catalog in case of layer_group.

    Parameters
    ----------
    dataset: dict
        Search result for the module; read for its label, model slug,
        lon/lat bounds and start/end times.
    dataset_id: string
        Name given to the catalog entry.
    urlpath: string
        OPeNDAP url the entry points to.
    layer_groups: dict
        Its keys are stored as `layer_group_uuids` and its values as
        `variables` in the entry metadata.

    Returns
    -------
    The YAML text of the entry with the leading `sources:` header removed,
    so it can be appended under an existing `sources:` section.

    Notes
    -----
    This is used to manage the logic for `axds_type='layer_group'` in which
    the module is being linked to the set of layer_groups.
    """
    # the model slug is optional
    try:
        model_slug = dataset["data"]["model"]["slug"]
    except (LookupError, TypeError):
        model_slug = ""

    # these are from the module; fall back to the short description when
    # there is no usable label
    try:
        label = dataset["label"].replace(":", "-")
    except (LookupError, TypeError, AttributeError):
        label = dataset["data"]["short_description"]

    bounds = dataset["data"]
    geospatial_lat_min, geospatial_lat_max = bounds["min_lat"], bounds["max_lat"]
    geospatial_lon_min, geospatial_lon_max = bounds["min_lng"], bounds["max_lng"]

    # set up lines
    file_intake = intake.open_opendap(
        urlpath, engine="netcdf4", xarray_kwargs=dict()
    )
    file_intake.description = label
    file_intake.engine = "netcdf4"
    file_intake.metadata = {
        "urlpath": urlpath,
        "variables": list(layer_groups.values()),
        "layer_group_uuids": list(layer_groups.keys()),
        "model_slug": model_slug,
        "geospatial_lon_min": geospatial_lon_min,
        "geospatial_lat_min": geospatial_lat_min,
        "geospatial_lon_max": geospatial_lon_max,
        "geospatial_lat_max": geospatial_lat_max,
        "time_coverage_start": dataset["start_date_time"],
        "time_coverage_end": dataset["end_date_time"],
    }
    file_intake.name = dataset_id

    # drop only the literal "sources:" header; str.strip() would treat the
    # argument as a set of characters and could eat into the entry itself
    lines = file_intake.yaml()
    header = "sources:"
    if lines.startswith(header):
        lines = lines[len(header):]
    return lines
def write_catalog(self):
    """Write catalog file.

    Nothing is done when the catalog file already exists. Otherwise the
    whole catalog text is assembled in memory first and only then written,
    so a failure while searching or downloading metadata does not leave an
    empty or partial file behind that later calls would treat as a
    finished catalog.

    Raises
    ------
    ValueError
        If `axds_type` or `filetype` is not one of the supported values.
    """
    # if the catalog already exists, don't do this
    if os.path.exists(self.catalog_name):
        return

    if self.axds_type == "platform2":
        lines = self._platform2_catalog_lines()
    elif self.axds_type == "layer_group":
        lines = self._layer_group_catalog_lines()
    else:
        raise ValueError(
            'variable `axds_type` must be "platform2" or "layer_group"'
        )

    with open(self.catalog_name, "w") as f:
        f.write(lines)

@staticmethod
def _catalog_entry_yaml(file_intake):
    """Return an intake source's YAML without its leading `sources:` header."""
    text = file_intake.yaml()
    header = "sources:"
    # remove the literal prefix only; str.strip() works on a character set
    if text.startswith(header):
        text = text[len(header):]
    return text

def _platform2_catalog_lines(self):
    """Build the catalog text for `axds_type='platform2'` search results."""
    parts = ["sources:\n"]
    for dataset_id, dataset in self.search_results.items():
        files = dataset["source"]["files"]

        if self.filetype == "csv":
            urlpath = files["data.csv.gz"]["url"]
            file_intake = intake.open_csv(
                urlpath, csv_kwargs=dict(parse_dates=["time"])
            )
        elif self.filetype == "netcdf":
            nc_names = [name for name in files if ".nc" in name]
            if not nc_names:
                logger.warning(
                    f"no netcdf file for dataset_id {dataset_id}. Do not include entry in catalog."
                )
                continue
            urlpath = files[nc_names[0]]["url"]
            file_intake = intake.open_netcdf(urlpath)
        else:
            raise ValueError(
                f'variable `filetype` must be "csv" or "netcdf", not {self.filetype!r}'
            )

        # the dataset attributes are served from a separate json file
        meta_url = files["meta.json"]["url"].replace(" ", "%20")
        attributes = pd.read_json(meta_url)["attributes"]

        file_intake.description = attributes["summary"]
        file_intake.metadata = {
            "urlpath": urlpath,
            "meta_url": meta_url,
            "platform_category": attributes["platform_category"],
            "geospatial_lon_min": attributes["geospatial_lon_min"],
            "geospatial_lat_min": attributes["geospatial_lat_min"],
            "geospatial_lon_max": attributes["geospatial_lon_max"],
            "geospatial_lat_max": attributes["geospatial_lat_max"],
            "source_id": attributes["packrat_source_id"],
            "packrat_uuid": attributes["packrat_uuid"],
            "time_coverage_start": attributes["time_coverage_start"],
            "time_coverage_end": attributes["time_coverage_end"],
        }
        file_intake.name = attributes["packrat_uuid"]
        parts.append(self._catalog_entry_yaml(file_intake))
    return "".join(parts)

def _layer_group_catalog_lines(self):
    """Build the catalog text for `axds_type='layer_group'` search results."""
    # NOTE(review): the nesting under `plugins` follows intake's catalog
    # layout; confirm against the catalogs this reader used to produce.
    parts = [
        "\n"
        "plugins:\n"
        "  source:\n"
        "    - module: intake_xarray\n"
        "sources:\n"
    ]

    # catalog entries are by module uuid and unique to opendap urls
    # dataset_ids are module uuids
    for dataset_id, dataset in self.search_results.items():
        # layer_groups associated with module
        layer_groups = dataset["data"]["layer_group_info"]

        # get search results for layer_groups; "" marks "no opendap url"
        urlpaths = []
        for layer_group_uuid in layer_groups.keys():
            url_layer_group = self.url_builder(
                self.url_docs_base, dataset_id=layer_group_uuid
            )
            search_results_lg = requests.get(
                url_layer_group, headers=self.search_headers
            ).json()[0]

            if "OPENDAP" in search_results_lg["data"]["access_methods"]:
                url = search_results_lg["source"]["layers"][0][
                    "thredds_opendap_url"
                ]
                urlpaths.append(url.replace(".html", ""))
            else:
                urlpaths.append("")
                logger.warning(
                    f"no opendap url for module: module uuid {dataset_id}, layer_group uuid {layer_group_uuid}"
                )

        unique_urlpaths = set(urlpaths)

        # there may be different urls for different layer_groups
        # in which case associate the layer_group uuid with the dataset
        # since the module uuid wouldn't be unique
        if len(unique_urlpaths) > 1:
            logger.warning(
                f"there are multiple urls for module: module uuid {dataset_id}. urls: {set(urlpaths)}"
            )
            for urlpath, layer_group_uuid in zip(urlpaths, layer_groups.keys()):
                # a layer_group without an opendap url has nothing to point to
                if urlpath == "":
                    continue
                parts.append(
                    self.write_catalog_layer_group_entry(
                        dataset, layer_group_uuid, urlpath, layer_groups
                    )
                )

        # check for when no urlpaths (or no layer_groups at all),
        # don't save entry if not opendap accessible
        elif unique_urlpaths <= {""}:
            logger.warning(
                f"no opendap url for module: module uuid {dataset_id} for any of its layer_groups. Do not include entry in catalog."
            )

        else:
            # a single shared url: use module uuid
            (urlpath,) = unique_urlpaths
            parts.append(
                self.write_catalog_layer_group_entry(
                    dataset, dataset_id, urlpath, layer_groups
                )
            )
    return "".join(parts)
@property
def catalog(self):
"""Write then open the catalog."""
if not hasattr(self, "_catalog"):
self.write_catalog()
# if we already know there aren't any dataset_ids
# don't try to read catalog
if not self.search_results == {}:
catalog = intake.open_catalog(self.catalog_name)
else:
catalog = None
self._catalog = catalog
return self._catalog
@property
def dataset_ids(self):
"""Find dataset_ids for server.
Notes
-----
The dataset_ids are read from the catalog, so the catalog is created
before this can happen.
The number of dataset_ids can change if a variable is removed from the
list of variables and this is rerun.
"""
if not hasattr(self, "_dataset_ids") or (
self.variables and (len(self.variables) != self.num_variables)
):
if self.catalog is not None:
self._dataset_ids = list(self.catalog)
else:
self._dataset_ids = []
# update number of variables
if self.variables:
self.num_variables = len(self.variables)
return self._dataset_ids
def meta_by_dataset(self, dataset_id):
"""Return the catalog metadata for a single dataset_id.
TO DO: Should this return intake-style or a row of the metadata dataframe?
"""
return self.catalog[dataset_id]
@property
def meta(self):
"""Rearrange the individual metadata into a dataframe."""
if not hasattr(self, "_meta"):
data = []
for dataset_id in self.dataset_ids:
meta = self.meta_by_dataset(dataset_id)
columns = ["download_url"] + list(
meta.metadata.keys()
) # this only needs to be set once
data.append([meta.urlpath] + list(meta.metadata.values()))
if len(self.dataset_ids) > 0:
self._meta = pd.DataFrame(
index=self.dataset_ids, columns=columns, data=data
)
else:
self._meta = None
return self._meta
def data_by_dataset(self, dataset_id):
"""Return the data for a single dataset_id.
Returns
-------
A tuple of (dataset_id, data), where data type depends on `self.axds_type`:
If `self.axds_type=='platform2'`: a pandas DataFrame
If `self.axds_type=='layer_group'`: an xarray Dataset
Notes
-----
Read behavior depends on `axds_type`:
* If `self.axds_type=='platform2'`: data is read into memory with dask.
* If `self.axds_type=='layer_group'`: data is pointed to with dask but
nothing is read in except metadata associated with the xarray Dataset.
"""
if self.axds_type == "platform2":
if self.filetype == "csv":
# read units from metadata variable meta_url for columns
variables = pd.read_json(self.meta.loc[dataset_id]["meta_url"])[
"variables"
]
# .to_dask().compute() seems faster than read but
# should do more comparisons
data = self.catalog[dataset_id].to_dask().compute()
data = data.set_index("time")
data = data[self.kw["min_time"] : self.kw["max_time"]]
units = []
for col in data.columns:
try:
units.append(variables.loc[col]["attributes"]["units"])
except:
units.append("")
# add units to 2nd header row
data.columns = pd.MultiIndex.from_tuples(zip(data.columns, units))
elif self.filetype == "netcdf":
# this downloads the http-served file to cache I think
download_url = self.catalog[dataset_id].urlpath
infile = fsspec.open(f"simplecache::{download_url}")
data = xr.open_dataset(infile.open()) # , engine='h5netcdf')
# we need 'time' as a dimension for the subsequent line to work
dim = [
dim for dim, size in data.dims.items() if size == data.cf["T"].size
]
if len(dim) > 0:
data = data.swap_dims({dim[0]: data.cf["T"].name})
# .swap_dims({"profile": "time"})
# filter by time
data = data.cf.sel(T=slice(self.kw["min_time"], self.kw["max_time"]))
elif self.axds_type == "layer_group":
if self.catalog[dataset_id].urlpath is not None:
try:
data = self.catalog[dataset_id].to_dask()
# preprocess to avoid a sometimes-problem:
# try to fix key error assuming it is the following problem:
# KeyError: "cannot represent labeled-based slice indexer for dimension 'time' with a slice over integer positions; the index is unsorted or non-unique"
try:
_, index = np.unique(data.cf["T"], return_index=True)
data = data.cf.isel(T=index)
# filter by time
data = data.cf.sel(
T=slice(self.kw["min_time"], self.kw["max_time"])
)
except KeyError as e:
logger.exception(e)
logger.warning("Could not subset in time.")
pass
except Exception as e:
logger.exception(e)
logger.warning(
f"data was not read in for dataset_id {dataset_id} with url path {self.catalog[dataset_id].urlpath} and description {self.catalog[dataset_id].description}."
)
data = None
# return (dataset_id, data)
return data
# @property
def data(self, dataset_ids=None):
    """Read in data for some or all dataset_ids.

    NOT USED CURRENTLY

    Once data is read in for a dataset_ids, it is remembered.

    This simply forwards to `odg.utils.load_data(self, dataset_ids)`;
    see full documentation in `utils.load_data()`.
    """
    return odg.utils.load_data(self, dataset_ids)
def save(self):
    """Save datasets locally.

    Every dataset returned by `self.data()` is written under
    `odg.path_files` as `<dataset_id>_<min_time>_<max_time>` plus an
    extension: pandas DataFrames go to `.csv.gz`, anything else is written
    with `to_netcdf` to `.nc`.

    Notes
    -----
    The writer is chosen from the object itself rather than from
    `self.data_type`, because `data_type` is "csv" for every "platform2"
    reader while `filetype="netcdf"` makes `data_by_dataset` return xarray
    Datasets, which have no `to_csv`. Entries whose data is None are
    skipped with a warning.
    """
    for dataset_id, data in self.data().items():
        # presumably None marks a dataset that could not be read in
        if data is None:
            logger.warning(f"no data to save for dataset_id {dataset_id}.")
            continue

        stem = f'{dataset_id}_{self.kw["min_time"]}_{self.kw["max_time"]}'

        # dataframe
        if isinstance(data, pd.DataFrame):
            data.to_csv(odg.path_files.joinpath(f"{stem}.csv.gz"))
        # dataset
        else:
            data.to_netcdf(odg.path_files.joinpath(f"{stem}.nc"))
class region(AxdsReader):
    """Inherits from AxdsReader to search over a region of space and time.

    Attributes
    ----------
    kw: dict
        Contains space and time search constraints: `min_lon`, `max_lon`,
        `min_lat`, `max_lat`, `min_time`, `max_time`.
    variables: string or list
        Variable names if you want to limit the search to those. There is
        different behavior depending on `axds_type`:
        * 'platform2': the variable name or names must be from the list
          available in `odg.all_variables('axds')` and pass the check in
          `odg.check_variables('axds', variables)`.
        * 'layer_group': the variable name or names will be searched for as
          a query so just do your best with the names and experiment.
        Alternatively, if the user inputs criteria, variables can be a
        list of the keys from criteria.
    criteria: dict, str, optional
        A dictionary describing how to recognize variables by their name
        and attributes with regular expressions to be used with
        `cf-xarray`. It can be local or a URL point to a nonlocal gist.
        This is required for running QC in Gateway. For example:
        >>> my_custom_criteria = {"salt": {
        ...     "standard_name": "sea_water_salinity$|sea_water_practical_salinity$",
        ...     "name": "(?i)sal$|(?i)s.sea_water_practical_salinity$"}}
    var_def: dict, optional
        A dictionary with the same keys as criteria (criteria can have
        more) that describes QC definitions and units. It should include
        the variable units, fail_span, and suspect_span. For example:
        >>> var_def = {"salt": {"units": "psu",
        ...     "fail_span": [-10, 60], "suspect_span": [-1, 45]}}
    approach: string
        approach is defined as 'region' for this class.
    num_variables: int
        Number of variables stored in self.variables. This is set initially and
        if self.variables is modified, this is updated accordingly. If
        `variables is None`, `num_variables==0`.
    """

    def __init__(self, kwargs):
        """
        Parameters
        ----------
        kwargs: dict
            Can contain arguments to pass onto the base AxdsReader class
            (catalog_name, parallel, axds_type, filetype). The dict entries
            to initialize this class are:
            * kw: dict
              Contains space and time search constraints: `min_lon`,
              `max_lon`, `min_lat`, `max_lat`, `min_time`, `max_time`.
            * variables: string or list, optional
              Variable names if you want to limit the search to those. There is
              different behavior depending on `axds_type`:
              * 'platform2': the variable name or names must be from the
                list available in `odg.all_variables('axds')` and pass the
                check in `odg.check_variables('axds', variables)`.
              * 'layer_group': the variable name or names will be searched for
                as a query so just do your best with the names and experiment.
              Alternatively, if the user inputs criteria, variables can be a
              list of the keys from criteria.
            * criteria: dict, optional
              A dictionary describing how to recognize variables by their name
              and attributes with regular expressions to be used with
              `cf-xarray`. It can be local or a URL point to a nonlocal gist.
              This is required for running QC in Gateway. For example:
              >>> my_custom_criteria = {"salt": {
              ...     "standard_name": "sea_water_salinity$|sea_water_practical_salinity$",
              ...     "name": "(?i)sal$|(?i)s.sea_water_practical_salinity$"}}
            * var_def: dict, optional
              A dictionary with the same keys as criteria (criteria can have
              more) that describes QC definitions and units. It should include
              the variable units, fail_span, and suspect_span. For example:
              >>> var_def = {"salt": {"units": "psu",
              ...     "fail_span": [-10, 60], "suspect_span": [-1, 45]}}
        """
        assert isinstance(kwargs, dict), "input arguments as dictionary"

        ax_kwargs = {
            "catalog_name": kwargs.get("catalog_name", None),
            "parallel": kwargs.get("parallel", True),
            "axds_type": kwargs.get("axds_type", "platform2"),
            # forwarded so that the csv reading path can be selected
            "filetype": kwargs.get("filetype", "netcdf"),
        }
        AxdsReader.__init__(self, **ax_kwargs)

        kw = kwargs["kw"]
        variables = kwargs.get("variables", None)

        self.approach = "region"
        self._stations = None

        # space and time limits of the search
        self.kw = kw

        # check for custom criteria to set up cf-xarray
        if "criteria" in kwargs:
            criteria = kwargs["criteria"]
            # link to nonlocal dictionary definition
            if isinstance(criteria, str) and criteria[:4] == "http":
                criteria = odg.return_response(criteria)
            cf_xarray.set_options(custom_criteria=criteria)
            self.criteria = criteria
        else:
            self.criteria = None

        if (variables is not None) and (not isinstance(variables, list)):
            variables = [variables]

        # make sure variables are on parameter list if platform2
        if (variables is not None) and (self.axds_type == "platform2"):
            # User is using criteria and variable nickname approach
            if self.criteria and all(var in self.criteria for var in variables):
                variables = odg.select_variables("axds", self.criteria, variables)
            # user is inputting specific reader variable names
            else:
                odg.check_variables("axds", variables)

        # record the number of variables so that a user can change it and
        # the change can be compared. This also applies to layer_group
        # searches, whose variables are not checked against a list.
        self.num_variables = len(variables) if variables is not None else 0

        self.variables = variables
class stations(AxdsReader):
    """Inherits from AxdsReader to search for 1+ stations or dataset_ids.

    Attributes
    ----------
    kw: dict
        Contains time search constraints: `min_time`, `max_time`.
        If not input, all time will be used.
    variables: None
        variables is None for this class since we read search by dataset_id or
        station name.
    approach: string
        approach is defined as 'stations' for this class.
    """

    def __init__(self, kwargs):
        """
        Parameters
        ----------
        kwargs: dict
            Can contain arguments to pass onto the base AxdsReader class
            (catalog_name, parallel, axds_type, filetype). The dict entries
            to initialize this class are:
            * kw: dict, optional
              Contains time search constraints: `min_time`, `max_time`.
              If not input, all time will be used.
            * stations: string, list, optional
              Input station names as they might be commonly known and therefore
              can be searched for as a query term. The station names can be
              input as something like "TABS B" or "8771972" and has pretty good
              success.
              Or, use this option if you know the exact dataset_ids for
              the data you want and `axds_type=='platform2'`. These need to be
              the dataset_ids corresponding to the databases that are being
              searched, so in this case they need to be the Axiom packrat
              uuid's.
              If `axds_type=='layer_group'`, input the layer_group uuids you
              want to search for.
              If not input, the reader searches for nothing and has no
              search urls.

        Notes
        -----
        The axds_type needs to match the station name or dataset_id you are
        searching for.
        """
        assert isinstance(kwargs, dict), "input arguments as dictionary"

        ax_kwargs = {
            "catalog_name": kwargs.get("catalog_name", None),
            "parallel": kwargs.get("parallel", True),
            "axds_type": kwargs.get("axds_type", "platform2"),
            # forwarded so that the csv reading path can be selected
            "filetype": kwargs.get("filetype", "netcdf"),
        }
        # this inherits AxdsReader's attributes and functions into self
        AxdsReader.__init__(self, **ax_kwargs)

        kw = kwargs.get("kw", None)
        station_names = kwargs.get("stations", [])

        self.approach = "stations"

        # always store a list, also when no stations are given, so that the
        # `urls` property can iterate over it
        if station_names is None:
            station_names = []
        elif not isinstance(station_names, list):
            station_names = [station_names]
        self._stations = station_names

        self.variables = None

        # without explicit limits, use a range meant to cover all time
        if kw is None:
            kw = {"min_time": "1900-01-01", "max_time": "2100-12-31"}
        self.kw = kw