# Source code for ocean_data_gateway.readers.axds

"""
Reader for Axiom databases.
"""

import hashlib
import logging
import multiprocessing
import os
import re

import intake
import numpy as np
import pandas as pd
import requests
import shapely.wkt

from joblib import Parallel, delayed

import ocean_data_gateway as odg


# module-level logger named after this module; used for warnings and
# exceptions raised while searching and reading datasets
logger = logging.getLogger(__name__)

# short module-level name for this reader
# this can be queried with
# search.AxdsReader.reader
reader = "axds"


class AxdsReader:
    """
    This class searches Axiom databases for types `platform2`, which
    are like gliders, and `layer_group`, which are like grids and models.

    Attributes
    ----------
    parallel: boolean
        If True, `data()` reads datasets in parallel with `joblib.Parallel`,
        using `multiprocessing.cpu_count()` jobs.
        If False, run serially.
    catalog_name: string
        Input catalog path if you want to use an existing catalog.
    axds_type: string
        Which Axiom database type to search for.
        * "platform2" (default): gliders, drifters; result in pandas DataFrames
        * "layer_group": grids, model output; result in xarray Datasets
    url_search_base: string
        Base string of search url
    url_docs_base: string
        Base string of url for a known dataset_id
    search_headers: dict
        Required for reading in the request
    url_axds_type: string
        Url for the given `axds_type`.
    name: string
        f'axds_{axds_type}' so 'axds_platform2' or 'axds_layer_group'
    reader: string
        Reader name: AxdsReader
    approach: string or None
        None after construction. The `urls` property expects it to be
        "region" or "stations" before it is used.
    data_type: string
        "csv" if `axds_type` is "platform2", "nc" if it is "layer_group".
    """

    def __init__(self, parallel=True, catalog_name=None, axds_type="platform2"):
        """
        Parameters
        ----------
        parallel: boolean, optional
            If True, run with simple parallelization using `multiprocessing`.
            If False, run serially.
        catalog_name: string, optional
            Input catalog path if you want to use an existing catalog.
        axds_type: string, optional
            Which Axiom database type to search for.
            * "platform2" (default): gliders, drifters; result in pandas DataFrames
            * "layer_group": grids, model output; result in xarray Datasets
        """

        self.parallel = parallel

        # search Axiom database, version 2
        self.url_search_base = "https://search.axds.co/v2/search?portalId=-1&page=1&pageSize=10000&verbose=true"
        self.url_docs_base = "https://search.axds.co/v2/docs?verbose=true"

        # this is the json being returned from the request
        self.search_headers = {"Accept": "application/json"}

        self.approach = None

        if catalog_name is None:
            name = f"{pd.Timestamp.now().isoformat()}"
            hash_name = hashlib.sha256(name.encode()).hexdigest()[:7]
            self.catalog_name = odg.catalogs_path.joinpath(f"catalog_{hash_name}.yml")
        else:
            self.catalog_name = catalog_name
            # if catalog_name already exists, read it in to save time
            self.catalog

        # can be 'platform2' or 'layer_group'
        assert axds_type in [
            "platform2",
            "layer_group",
        ], 'variable `axds_type` must be "platform2" or "layer_group"'
        self.axds_type = axds_type

        self.url_axds_type = f"{self.url_search_base}&type={self.axds_type}"
        self.name = f"axds_{axds_type}"
        self.reader = "AxdsReader"

        if self.axds_type == "platform2":
            self.data_type = "csv"
        elif self.axds_type == "layer_group":
            self.data_type = "nc"

        # name
        self.name = f"axds_{axds_type}"

        self.reader = "AxdsReader"

    def url_query(self, query):
        """Format the free-text query fragment of a search url.

        Parameters
        ----------
        query: string
            Text to search for. May contain several words.

        Returns
        -------
        The string to append to a url so that it filters by `query`.
        """
        fragment = "&query={}".format(query)
        return fragment

    def url_variable(self, variable):
        """Format the parameter-group fragment of a search url.

        Parameters
        ----------
        variable: string
            Parameter group name to filter by.

        Returns
        -------
        The string to append to a url so that it filters by `variable`.

        Notes
        -----
        Filtering by parameter group only works for
        `axds_type='platform2'`. For `axds_type='layer_group'`, pass the
        variable name to `url_query` instead.
        """
        fragment = "&tag=Parameter+Group:{}".format(variable)
        return fragment

    def url_region(self):
        """url modification to add spatial search box.

        Returns
        -------
        Modification for url to add lon/lat filtering.

        Notes
        -----
        Uses the `kw` dictionary already stored in the class object
        to access the spatial limits of the box.
        """
        url_add_box = (
            f'&geom={{"type":"Polygon","coordinates":[[[{self.kw["min_lon"]},{self.kw["min_lat"]}],'
            + f'[{self.kw["max_lon"]},{self.kw["min_lat"]}],'
            + f'[{self.kw["max_lon"]},{self.kw["max_lat"]}],'
            + f'[{self.kw["min_lon"]},{self.kw["max_lat"]}],'
            + f'[{self.kw["min_lon"]},{self.kw["min_lat"]}]]]}}'
        )
        return f"{url_add_box}"

    def url_time(self):
        """url modification to add time filtering.

        Returns
        -------
        Modification for url to add time filtering.

        Notes
        -----
        Uses the `kw` dictionary already stored in the class object
        to access the time limits of the search.
        """
        # convert input datetime to seconds since 1970
        startDateTime = (
            pd.Timestamp(self.kw["min_time"]).tz_localize("UTC")
            - pd.Timestamp("1970-01-01 00:00").tz_localize("UTC")
        ) // pd.Timedelta("1s")
        endDateTime = (
            pd.Timestamp(self.kw["max_time"]).tz_localize("UTC")
            - pd.Timestamp("1970-01-01 00:00").tz_localize("UTC")
        ) // pd.Timedelta("1s")

        # search by time
        url_add_time = f"&startDateTime={startDateTime}&endDateTime={endDateTime}"

        return f"{url_add_time}"

    def url_dataset_id(self, dataset_id):
        """Format the dataset_id fragment of a url.

        Parameters
        ----------
        dataset_id: string
            The dataset_id to match exactly.

        Returns
        -------
        The string to append to a url so that it selects `dataset_id`.
        """
        fragment = "&id={}".format(dataset_id)
        return fragment

    def url_builder(
        self,
        url_base,
        dataset_id=None,
        add_region=False,
        add_time=False,
        variable=None,
        query=None,
    ):
        """Build an individual search url.

        Parameters
        ----------
        url_base: string
            There are 2 possible bases for the url:
            * self.url_axds_type, for searching
            * self.url_docs_base, for selecting known dataset by dataset_id
        dataset_id: string, optional
            dataset_id of station, if known.
        add_region: boolean, optional
            True to filter the search by lon/lat box. Requires self.kw
            that contains keys `min_lon`, `max_lon`, `min_lat`, `max_lat`.
        add_time: boolean, optional
            True to filter the search by time range. Requires self.kw
            that contains keys `min_time` and `max_time`.
        variable: string, optional
            String of variable description to filter by, if desired.
            If `axds_type=='platform2'`, find the variable name options with
            class function `all_variables()`, search for variable names by
            string with `search_variables()`, and check your variable list with
            `check_variables()`.
            If `axds_type=='layer_group'`, there is no official variable list
            and you can instead just put in a basic variable name and hope the
            search works.
        query: string, optional
            This could be any search query you want, but it is used in the code
            to search for station names (not dataset_ids).

        Returns
        -------
        Url for search.
        """
        url = url_base
        if dataset_id is not None:
            url += self.url_dataset_id(dataset_id)
        if add_time:
            url += self.url_time()
        if variable is not None:
            if self.axds_type == "platform2":
                url += self.url_variable(variable)
            elif self.axds_type == "layer_group":
                url += self.url_query(variable)
        if add_region:
            url += self.url_region()
        if query is not None:
            url += self.url_query(query)

        return url

    @property
    def urls(self):
        """Return a list of search urls.

        Notes
        -----
        Use this through the class methods `region` or `stations` to put
        together the search urls to represent the basic reader setup.
        """

        assert (
            self.approach is not None
        ), "Use this property through class method `region` or `stations`"

        if not hasattr(self, "_urls"):

            if self.approach == "region":
                urls = []
                if self.variables is not None:
                    for variable in self.variables:
                        urls.append(
                            self.url_builder(
                                self.url_axds_type,
                                variable=variable,
                                add_time=True,
                                add_region=True,
                            )
                        )
                else:
                    urls.append(
                        self.url_builder(
                            self.url_axds_type, add_time=True, add_region=True
                        )
                    )

            elif self.approach == "stations":
                urls = []
                # if input stations instead of dataset_ids, using different urls here
                # if self._stations is not None:
                if len(self._stations) > 0:
                    for station in self._stations:
                        urls.append(self.url_builder(self.url_axds_type, query=station))
                else:
                    for dataset_id in self._dataset_ids:
                        urls.append(
                            self.url_builder(self.url_docs_base, dataset_id=dataset_id)
                        )

            self._urls = urls

        return self._urls

    @property
    def search_results(self):
        """Loop over self.urls to read in search results.

        Notes
        -----
        The logic removes duplicate searches.
        This returns a dict of the datasets from the search results with the
        key of each entry being the dataset_id. For

        * `self.axds_type == "platform2"`: dataset_id is the uuid
        * `self.axds_type == "layer_group"`: dataset_id is the module_uuid since multiple layer_groups can be linked under one module_uuid
        """

        if not hasattr(self, "_search_results"):

            # loop over urls
            search_results = []
            for url in self.urls:
                res = requests.get(url, headers=self.search_headers).json()
                # get different returns for an id docs grab vs. generic search
                #                 if isinstance(res, list):
                #                     res = res[0]
                if isinstance(res, dict):
                    res = res["results"]
                search_results.extend(res)
            # change search_results to a dictionary to remove
            # duplicate dataset_ids
            search_results_dict = {}
            for search_result in search_results:
                if self.axds_type == "platform2":
                    search_results_dict[search_result["uuid"]] = search_result
                #                     search_results_dict[search_result['data']['uuid']] = search_result
                if self.axds_type == "layer_group":
                    # this is in the case that our search results are for a layer_group
                    if ("module_uuid" in search_result["data"]) and (
                        search_result["type"] == "layer_group"
                    ):
                        # switch to module search results instead of layer_group results
                        module_uuid = search_result["data"]["module_uuid"]
                    # this is the case that our searcb results are for a module
                    elif search_result["type"] == "module":
                        module_uuid = search_result["data"]["uuid"]

                    # don't repeat unnecessarily, if module_uuid has already
                    # been included.
                    if module_uuid in search_results_dict.keys():
                        continue
                    else:
                        url_module = self.url_builder(
                            self.url_docs_base, dataset_id=module_uuid
                        )
                        search_results_dict[module_uuid] = requests.get(
                            url_module, headers=self.search_headers
                        ).json()[0]

            condition = search_results_dict == {}
            assertion = f"No datasets fit the input criteria of kw={self.kw} and variables={self.variables}"
            #             assert condition, assertion
            if condition:
                logger.warning(assertion)
                # self._dataset_ids = []

            # DON'T SAVE THIS LATER, JUST FOR DEBUGGING
            self._search_results = search_results_dict

        #             self._dataset_ids = list(search_results_dict.keys())
        return self._search_results

    def write_catalog_layer_group_entry(
        self, dataset, dataset_id, urlpath, layer_groups
    ):
        """Write part of catalog in case of layer_group.

        Notes
        -----
        This is used to manage the logic for `axds_type='layer_group'` in which
        the module is being linked to the set of layer_groups.
        """

        try:
            model_slug = dataset["data"]["model"]["slug"]
        except:
            model_slug = ""

        # these are from the module
        try:
            label = dataset["label"].replace(":", "-")
        except:
            label = dataset["data"]["short_description"]

        geospatial_lat_min, geospatial_lat_max = (
            dataset["data"]["min_lat"],
            dataset["data"]["max_lat"],
        )
        geospatial_lon_min, geospatial_lon_max = (
            dataset["data"]["min_lng"],
            dataset["data"]["max_lng"],
        )

        lines = f"""
  {dataset_id}:
    description: {label}
    driver: opendap
    args:
      urlpath: {urlpath}
      engine: 'netcdf4'
      xarray_kwargs:
    metadata:
      variables: {list(layer_groups.values())}
      layer_group_uuids: {list(layer_groups.keys())}
      model_slug: {model_slug}
      geospatial_lon_min: {geospatial_lon_min}
      geospatial_lat_min: {geospatial_lat_min}
      geospatial_lon_max: {geospatial_lon_max}
      geospatial_lat_max: {geospatial_lat_max}
      time_coverage_start: {dataset['start_date_time']}
      time_coverage_end: {dataset['end_date_time']}

"""
        return lines

    def write_catalog(self):
        """Write catalog file."""

        # if the catalog already exists, don't do this
        if os.path.exists(self.catalog_name):
            return

        else:

            f = open(self.catalog_name, "w")

            if self.axds_type == "platform2":
                lines = "sources:\n"
                for dataset_id, dataset in self.search_results.items():
                    label = dataset["label"].replace(":", "-")
                    urlpath = dataset["source"]["files"]["data.csv.gz"]["url"]
                    metavars = dataset["source"]["meta"]["variables"]
                    Vars, standard_names = zip(
                        *[
                            (key, metavars[key]["attributes"]["standard_name"])
                            for key in metavars.keys()
                            if ("attributes" in metavars[key].keys())
                            and ("standard_name" in metavars[key]["attributes"])
                        ]
                    )
                    P = shapely.wkt.loads(dataset["data"]["geospatial_bounds"])
                    (
                        geospatial_lon_min,
                        geospatial_lat_min,
                        geospatial_lon_max,
                        geospatial_lat_max,
                    ) = P.bounds

                    lines += f"""
  {dataset["uuid"]}:
    description: {label}
    driver: csv
    args:
      urlpath: {urlpath}
      csv_kwargs:
        parse_dates: ['time']
    metadata:
      variables: {Vars}
      standard_names: {standard_names}
      platform_category: {dataset['data']['platform_category']}
      geospatial_lon_min: {geospatial_lon_min}
      geospatial_lat_min: {geospatial_lat_min}
      geospatial_lon_max: {geospatial_lon_max}
      geospatial_lat_max: {geospatial_lat_max}
      id: {dataset["data"]["packrat_source_id"]}
      time_coverage_start: {dataset['start_date_time']}
      time_coverage_end: {dataset['end_date_time']}

"""

            elif self.axds_type == "layer_group":
                lines = """
plugins:
  source:
    - module: intake_xarray
sources:
"""
                # catalog entries are by module uuid and unique to opendap urls
                # dataset_ids are module uuids
                for dataset_id, dataset in self.search_results.items():

                    # layer_groups associated with module
                    layer_groups = dataset["data"]["layer_group_info"]

                    # get search results for layer_groups
                    urlpaths = []
                    for layer_group_uuid in layer_groups.keys():
                        url_layer_group = self.url_builder(
                            self.url_docs_base, dataset_id=layer_group_uuid
                        )
                        search_results_lg = requests.get(
                            url_layer_group, headers=self.search_headers
                        ).json()[0]

                        if "OPENDAP" in search_results_lg["data"]["access_methods"]:
                            urlpaths.append(
                                search_results_lg["source"]["layers"][0][
                                    "thredds_opendap_url"
                                ][:-5]
                            )
                        else:
                            urlpaths.append("")
                            logger.warning(
                                f"no opendap url for module: module uuid {dataset_id}, layer_group uuid {layer_group_uuid}"
                            )
                            # DO NOT STORE ITEM IN CATALOG IF NOT OPENDAP ACCESSIBLE
                            continue

                    # there may be different urls for different layer_groups
                    # in which case associate the layer_group uuid with the dataset
                    # since the module uuid wouldn't be unique
                    if len(set(urlpaths)) > 1:
                        logger.warning(
                            f"there are multiple urls for module: module uuid {dataset_id}. urls: {set(urlpaths)}"
                        )
                        for urlpath, layer_group_uuid in zip(
                            urlpaths, layer_groups.keys()
                        ):
                            lines += self.write_catalog_layer_group_entry(
                                dataset, layer_group_uuid, urlpath, layer_groups
                            )

                    else:
                        urlpath = list(set(urlpaths))[0]
                        # use module uuid
                        lines += self.write_catalog_layer_group_entry(
                            dataset, dataset_id, urlpath, layer_groups
                        )

            f.write(lines)
            f.close()

    @property
    def catalog(self):
        """Write then open the catalog."""

        if not hasattr(self, "_catalog"):

            self.write_catalog()
            # if we already know there aren't any dataset_ids
            # don't try to read catalog
            if not self.search_results == {}:
                #             if (not self.dataset_ids == []) or (not self.search_results == {}):
                catalog = intake.open_catalog(self.catalog_name)
            else:
                catalog = None
            self._catalog = catalog

        return self._catalog

    @property
    def dataset_ids(self):
        """Find dataset_ids for server.

        Notes
        -----
        The dataset_ids are read from the catalog, so the catalog is created
        before this can happen, unless the dataset_ids were input from the
        beginning of the call via `stations` in which case they are simply
        saved to self._dataset_ids.
        """

        if not hasattr(self, "_dataset_ids"):
            if self.catalog is not None:
                self._dataset_ids = list(self.catalog)
            else:
                self._dataset_ids = []

        return self._dataset_ids

    def meta_by_dataset(self, dataset_id):
        """Return the catalog metadata for a single dataset_id.

        TO DO: Should this return intake-style or a row of the metadata dataframe?
        """

        return self.catalog[dataset_id]

    @property
    def meta(self):
        """Rearrange the individual metadata into a dataframe."""

        if not hasattr(self, "_meta"):

            data = []
            for dataset_id in self.dataset_ids:
                meta = self.meta_by_dataset(dataset_id)
                columns = ["download_url"] + list(
                    meta.metadata.keys()
                )  # this only needs to be set once
                data.append([meta.urlpath] + list(meta.metadata.values()))
            if len(self.dataset_ids) > 0:
                self._meta = pd.DataFrame(
                    index=self.dataset_ids, columns=columns, data=data
                )
            else:
                self._meta = None

        return self._meta

    def data_by_dataset(self, dataset_id):
        """Return the data for a single dataset_id.

        Returns
        -------
        A tuple of (dataset_id, data), where data type depends on `self.axds_type`:
        If `self.axds_type=='platform2'`: a pandas DataFrame
        If `self.axds_type=='layer_group'`: an xarray Dataset

        Notes
        -----
        Read behavior depends on `axds_type`:

        * If `self.axds_type=='platform2'`: data is read into memory with dask.
        * If `self.axds_type=='layer_group'`: data is pointed to with dask but
          nothing is read in except metadata associated with the xarray Dataset.
        """

        if self.axds_type == "platform2":

            # .to_dask().compute() seems faster than read but
            # should do more comparisons
            data = self.catalog[dataset_id].to_dask().compute()
            data = data.set_index("time")
            data = data[self.kw["min_time"] : self.kw["max_time"]]

        elif self.axds_type == "layer_group":

            if self.catalog[dataset_id].urlpath is not None:
                try:
                    data = self.catalog[dataset_id].to_dask()
                    try:
                        timekey = [
                            coord
                            for coord in data.coords
                            if ("standard_name" in data[coord].attrs)
                            and (data[coord].attrs["standard_name"] == "time")
                        ]
                        assert len(timekey) > 0
                    except:
                        timekey = [
                            coord
                            for coord in data.coords
                            if ("time" in coord) or (coord == "t")
                        ]
                        assert len(timekey) > 0
                    timekey = timekey[0]
                    slicedict = {
                        timekey: slice(self.kw["min_time"], self.kw["max_time"])
                    }
                    data = data.sel(slicedict)
                except KeyError as e:
                    #                     logger.exception(e)
                    #                     logger.warning(f'data was not read in for dataset_id {dataset_id} with url path {self.catalog[dataset_id].urlpath} and description {self.catalog[dataset_id].description}.')

                    # try to fix key error assuming it is the following problem:
                    # KeyError: "cannot represent labeled-based slice indexer for dimension 'time' with a slice over integer positions; the index is unsorted or non-unique"
                    try:
                        timekey = [
                            coord
                            for coord in data.coords
                            if ("standard_name" in data[coord].attrs)
                            and (data[coord].attrs["standard_name"] == "time")
                        ]
                        assert len(timekey) > 0
                    except:
                        timekey = [
                            coord
                            for coord in data.coords
                            if ("time" in coord) or (coord == "t")
                        ]
                        assert len(timekey) > 0
                    timekey = timekey[0]

                    slicedict = {
                        timekey: slice(self.kw["min_time"], self.kw["max_time"])
                    }
                    _, index = np.unique(data[timekey], return_index=True)
                    data = data.isel({timekey: index}).sel(slicedict)
                except Exception as e:
                    logger.exception(e)
                    logger.warning(
                        f"data was not read in for dataset_id {dataset_id} with url path {self.catalog[dataset_id].urlpath} and description {self.catalog[dataset_id].description}."
                    )
                    data = None
            else:
                data = None

        return (dataset_id, data)

    #         return (dataset_id, self.catalog[dataset_id].read())

    # @property
    def data(self):
        """Read in data for all dataset_ids.

        Returns
        -------
        A dictionary with keys of the dataset_ids and values the data of type:
        If `self.axds_type=='platform2'`: a pandas DataFrame
        If `self.axds_type=='layer_group'`: an xarray Dataset

        Notes
        -----
        This is either done in parallel with the `multiprocessing` library or
        in serial.
        """

        if not hasattr(self, "_data"):

            if self.parallel:
                num_cores = multiprocessing.cpu_count()
                downloads = Parallel(n_jobs=num_cores)(
                    delayed(self.data_by_dataset)(dataset_id)
                    for dataset_id in self.dataset_ids
                )
            else:
                downloads = []
                for dataset_id in self.dataset_ids:
                    downloads.append(self.data_by_dataset(dataset_id))

            #             if downloads is not None:
            dds = {dataset_id: dd for (dataset_id, dd) in downloads}
            #             else:
            #                 dds = None

            self._data = dds

        return self._data

    def save(self):
        """Save datasets locally."""

        for dataset_id, data in self.data().items():
            # dataframe
            if self.data_type == "csv":
                filename = (
                    f'{dataset_id}_{self.kw["min_time"]}_{self.kw["max_time"]}.csv.gz'
                )
                path_file = odg.path_files.joinpath(filename)
                data.to_csv(path_file)

            # dataset
            elif self.data_type == "nc":
                filename = (
                    f'{dataset_id}_{self.kw["min_time"]}_{self.kw["max_time"]}.nc'
                )
                path_file = odg.path_files.joinpath(filename)
                data.to_netcdf(path_file)

    def all_variables(self):
        """Return a DataFrame of allowed variable names.

        Returns
        -------
        DataFrame of variable names and count of how many times they are present in the database.

        Raises
        ------
        requests.HTTPError
            If the list has to be downloaded and the request fails.

        Notes
        -----
        This list is only relevant for `self.axds_type=='platform2'`. It is not
        relevant for `self.axds_type=='layer_group'.

        The list is the "Parameter Group" tags of the Axiom search
        response. It is downloaded once and saved as
        `axds_platform2_variable_list.csv` in `odg.variables_path`; later
        calls read that file.

        Example
        -------
        >>> import ocean_data_gateway as odg
        >>> odg.axds.AxdsReader().all_variables()
                                                         count
        variable
        Ammonium                                            23
        Atmospheric Pressure: Air Pressure at Sea Level    362
        Atmospheric Pressure: Barometric Pressure         4152
        Backscatter Intensity                              286
        Battery                                           2705
        ...                                                ...
        Winds: Samples                                       1
        Winds: Speed and Direction                        7091
        Winds: Vertical Wind                                 4
        Winds: at 10 m                                      18
        pH                                                 965
        """

        path_csv_fname = odg.variables_path.joinpath("axds_platform2_variable_list.csv")

        # use the saved list if it has been made before
        if path_csv_fname.is_file():
            return pd.read_csv(path_csv_fname, index_col="variable")

        print(
            "Please wait while the list of available variables is made. This only happens once."
        )

        # read in Axiom Search parameter group names
        response = requests.get(
            "https://search.axds.co/v2/search",
            headers={"Accept": "application/json"},
        )
        # don't save a list made from an error response
        response.raise_for_status()
        groups = response.json()["tags"]["Parameter Group"]

        # counts are stored as integers so that the first call returns the
        # same types as later calls that read the saved csv; runs of
        # whitespace in a label are collapsed to single spaces
        parameters = {
            " ".join(str(group["label"]).split()): int(group["count"])
            for group in groups
        }

        df = pd.DataFrame(
            {"count": list(parameters.values())},
            index=pd.Index(list(parameters.keys()), name="variable"),
        )
        # save to file
        df.to_csv(path_csv_fname)

        return df

    def search_variables(self, variables):
        """Look up valid variable names by case-insensitive regex search.

        Parameters
        ----------
        variables: string, list
            A regex fragment, or a list of fragments. A variable name is
            kept when any one of the fragments is found anywhere in it.

        Returns
        -------
        DataFrame indexed by variable name with a `count` column (how many
        times the variable is present in the database), sorted by
        descending count.

        Notes
        -----
        This list is only relevant for `self.axds_type=='platform2'`. It is
        not relevant for `self.axds_type=='layer_group'`.

        Examples
        --------

        Search for variables that contain the substring 'sal':

        >>> odg.axds.AxdsReader().search_variables('sal')
                       count
        variable
        Salinity        3204
        Soil Salinity    622

        Return all available variables, sorted by count (or could use
        `all_variables()` directly):

        >>>  odg.axds.AxdsReader().search_variables('')
                                                            count
        variable
        Stream Height                                       19758
        Water Surface above Datum                           19489
        Stream Flow                                         15203
        Temperature: Air Temperature                         8369
        Precipitation                                        7364
        ...                                                   ...
        Vent Fluid Temperature                                  1
        Vent Fluid Thermocouple Temperature - Low               1
        CO2: PPM of Carbon Dioxide in Sea Water in Wet Gas      1
        CO2: PPM of Carbon Dioxide in Air in Dry Gas            1
        Evaporation Rate                                        1
        """

        fragments = variables if isinstance(variables, list) else [variables]

        # One alternation of ".*<fragment>" terms: the leading ".*" lets
        # `match` (anchored at the start) find the fragment anywhere.
        alternatives = "".join(f".*{fragment}|" for fragment in fragments)
        pattern = re.compile(("(?i)" + alternatives).strip("|"))

        table = self.all_variables()
        hits = [label for label in table.index if pattern.match(label)]

        # keep only the matching rows, most frequent first
        return table.loc[hits].sort_values("count", ascending=False)

    def check_variables(self, variables, verbose=False):
        """Check that variables are exact matches to known variable names.

        Parameters
        ----------
        variables: string, list
            String or list of strings to compare against the list of valid
            variable names (the index of `all_variables()`).
        verbose: boolean, optional
            Print a message if all variables match instead of passing
            silently.

        Returns
        -------
        Nothing is returned.

        Raises
        ------
        AssertionError
            If `self.axds_type` is not "platform2", or if any input variable
            is not an exact match to a valid variable name. In the latter
            case `search_variables(variables)` is run on your behalf and its
            result is included in the message to suggest valid names.

        Notes
        -----
        This list is only relevant for `self.axds_type=='platform2'`. It is
        not relevant for `self.axds_type=='layer_group'`.

        The checks raise explicitly rather than using `assert` statements so
        that they still run when Python is started with `-O`.

        Examples
        --------

        Check if the variable name 'sal' is valid:

        >>> odg.axds.AxdsReader().check_variables('sal')
        AssertionError: The input variables are not exact matches to parameter groups.
        Check all parameter group values with `AxdsReader().all_variables()`
        or search parameter group values with `AxdsReader().search_variables(['sal'])`.
         Try some of the following variables:
                       count
        variable
        Salinity        3204
        Soil Salinity    622

        Check if the variable name 'Salinity' is valid:

        >>>  odg.axds.AxdsReader().check_variables('Salinity')

        """

        if self.axds_type != "platform2":
            # The continuation-line padding is part of the message text.
            message = f'Variables are only used to filter the search for \
                    \n`axds_type="platform2". Currently, \
                    \naxds_type={self.axds_type}.'
            raise AssertionError(message)

        if not isinstance(variables, list):
            variables = [variables]

        parameters = list(self.all_variables().index)

        # a variable is valid only if it appears exactly once in the list
        condition = all(parameters.count(variable) == 1 for variable in variables)

        if not condition:
            # Build the message only on failure: it runs a full
            # `search_variables` lookup to suggest alternatives.
            message = f"The input variables are not exact matches to parameter groups. \
                     \nCheck all parameter group values with `AxdsReader().all_variables()` \
                     \nor search parameter group values with `AxdsReader().search_variables({variables})`.\
                     \n\n Try some of the following variables:\n{str(self.search_variables(variables))}"
            raise AssertionError(message)

        if verbose:
            print("all variables are matches!")


class region(AxdsReader):
    """Inherits from AxdsReader to search over a region of space and time.

    Attributes
    ----------
    kw: dict
        Contains space and time search constraints: `min_lon`, `max_lon`,
        `min_lat`, `max_lat`, `min_time`, `max_time`.
    variables: string or list
        Variable names if you want to limit the search to those. There is
        different behavior depending on `axds_type`:

        * 'platform2': the variable name or names must be from the list
          available in `all_variables()` and pass the check in
          `check_variables()`.
        * 'layer_group': the variable name or names will be searched for as
          a query so just do your best with the names and experiment.
    approach: string
        approach is defined as 'region' for this class.
    """

    def __init__(self, kwargs):
        """
        Parameters
        ----------
        kwargs: dict
            Can contain arguments to pass onto the base AxdsReader class
            (catalog_name, parallel, axds_type). The dict entries to
            initialize this class are:

            * kw: dict
              Contains space and time search constraints: `min_lon`,
              `max_lon`, `min_lat`, `max_lat`, `min_time`, `max_time`.
            * variables: string or list, optional
              Variable names if you want to limit the search to those.
              There is different behavior depending on `axds_type`:

              * 'platform2': the variable name or names must be from the
                list available in `all_variables()` and pass the check in
                `check_variables()`.
              * 'layer_group': the variable name or names will be searched
                for as a query so just do your best with the names and
                experiment.
        """
        assert isinstance(kwargs, dict), "input arguments as dictionary"

        # forward the base-class options, using the base-class defaults
        ax_kwargs = {
            "catalog_name": kwargs.get("catalog_name", None),
            "parallel": kwargs.get("parallel", True),
            "axds_type": kwargs.get("axds_type", "platform2"),
        }
        AxdsReader.__init__(self, **ax_kwargs)

        # "kw" is required for a region search; "variables" is optional
        kw = kwargs["kw"]
        variables = kwargs.get("variables", None)

        self.approach = "region"

        self._stations = None

        # run checks for KW
        # check for lon/lat values and time
        self.kw = kw

        # accept a single variable name as well as a list of names
        if (variables is not None) and (not isinstance(variables, list)):
            variables = [variables]

        # make sure variables are on parameter list if platform2
        if (variables is not None) and (self.axds_type == "platform2"):
            self.check_variables(variables)

        self.variables = variables
class stations(AxdsReader):
    """Inherits from AxdsReader to search for 1+ stations or dataset_ids.

    Attributes
    ----------
    kw: dict
        Contains time search constraints: `min_time`, `max_time`.
        If not input, all time will be used.
    variables: None
        variables is None for this class since we search by dataset_id or
        station name.
    approach: string
        approach is defined as 'stations' for this class.
    """

    def __init__(self, kwargs):
        """
        Parameters
        ----------
        kwargs: dict
            Can contain arguments to pass onto the base AxdsReader class
            (catalog_name, parallel, axds_type). The dict entries to
            initialize this class are:

            * kw: dict, optional
              Contains time search constraints: `min_time`, `max_time`.
              If not input, all time will be used.
            * dataset_ids: string, list, optional
              Use this option if you know the exact dataset_ids for the data
              you want and `axds_type=='platform2'`. These need to be the
              dataset_ids corresponding to the databases that are being
              searched, so in this case they need to be the Axiom packrat
              uuid's. If you know station names but not the specific
              database uuids, input the names as "stations" instead. If
              `axds_type=='layer_group'` do not use this approach. Instead,
              use the keyword "stations" and input the layer_group uuids you
              want to search for.
            * stations: string, list, optional
              Input station names as they might be commonly known and
              therefore can be searched for as a query term. The station
              names can be input as something like "TABS B" or "8771972"
              and has pretty good success.

        Notes
        -----
        The axds_type needs to match the station name or dataset_id you are
        searching for.
        """
        assert isinstance(kwargs, dict), "input arguments as dictionary"

        ax_kwargs = {
            "catalog_name": kwargs.get("catalog_name", None),
            "parallel": kwargs.get("parallel", True),
            "axds_type": kwargs.get("axds_type", "platform2"),
        }
        # this inherits AxdsReader's attributes and functions into self
        AxdsReader.__init__(self, **ax_kwargs)

        kw = kwargs.get("kw", None)
        dataset_ids = kwargs.get("dataset_ids", None)
        stations = kwargs.get("stations", [])

        self.approach = "stations"

        # I think this isn't true anymore.
        # if self.axds_type == "layer_group":
        #     assertion = 'Input "layer_group" (not module) uuids as station names, not dataset_ids.'
        #     assert dataset_ids is None, assertion

        # NOTE(review): `_dataset_ids` / `_stations` are only assigned when
        # the corresponding input was given; presumably the base class
        # treats a missing attribute as "not yet known" -- confirm.
        if dataset_ids is not None:
            if not isinstance(dataset_ids, list):
                dataset_ids = [dataset_ids]
            # self._stations = dataset_ids
            self._dataset_ids = dataset_ids

        if stations != []:
            if not isinstance(stations, list):
                stations = [stations]
            self._stations = stations

        self.variables = None

        # CHECK FOR KW VALUES AS TIMES
        # with no time constraints given, fall back to a very wide range
        if kw is None:
            kw = {"min_time": "1900-01-01", "max_time": "2100-12-31"}

        self.kw = kw