Source code for ocean_data_gateway.readers.erddap

"""
Reader for ERDDAP servers.
"""

import logging
import multiprocessing
import urllib.parse

import cf_xarray  # noqa: F401
import numpy as np
import pandas as pd
import xarray as xr

from erddapy import ERDDAP
from joblib import Parallel, delayed

import ocean_data_gateway as odg

from ocean_data_gateway import Reader


logger = logging.getLogger(__name__)

# this can be queried with
# search.ErddapReader.reader
reader = "erddap"


class ErddapReader(Reader):
    """
    This class searches ERDDAP servers. There are 2 known_servers but
    others can be input too.

    Attributes
    ----------
    parallel: boolean
        If True, run with simple parallelization using `multiprocessing`.
        If False, run serially.
    known_server: string
        Two ERDDAP servers are built in to be known to this reader: "ioos" and
        "coastwatch".
    e: ERDDAP server instance
    e.protocol: string
        * "tabledap" (pandas, appropriate for reading as csv)
        * "griddap" (xarray, appropriate for reading as netcdf)
    e.server: string
        Return the server name
    columns: list
        Metadata columns
    name: string
        "erddap_ioos", "erddap_coastwatch", or a constructed string if the user
        inputs a new protocol and server.
    reader: string
        reader is defined as "ErddapReader".
    """

    def __init__(self, known_server="ioos", protocol=None, server=None, parallel=False):
        """
        Parameters
        ----------
        known_server: string, optional
            Two ERDDAP servers are built in to be known to this reader:
            "ioos" and "coastwatch".
        protocol, server: string, optional
            For a user-defined ERDDAP server, input the protocol as one of the
            following:
            * "tabledap" (pandas, appropriate for reading as csv)
            * "griddap" (xarray, appropriate for reading as netcdf)
            and the server address (such as
            "http://erddap.sensors.ioos.us/erddap" or
            "http://coastwatch.pfeg.noaa.gov/erddap").
        parallel: boolean
            If True, run with simple parallelization using `multiprocessing`.
            If False, run serially.
        """
        self.parallel = parallel

        # hard wire this for now
        filetype = "netcdf"

        # either select a known server or input protocol and server string
        if known_server == "ioos":
            protocol = "tabledap"
            server = "http://erddap.sensors.ioos.us/erddap"
            filetype = "netcdf"  # other option: "csv"
        elif known_server == "coastwatch":
            protocol = "griddap"
            server = "http://coastwatch.pfeg.noaa.gov/erddap"
            filetype = "netcdf"  # other option: "csv"
        elif known_server is not None:
            statement = (
                "either select a known server or input protocol and server string"
            )
            assert (protocol is not None) & (server is not None), statement
        else:
            known_server = urllib.parse.urlparse(server).netloc
            # known_server = server.strip("/erddap").strip("http://").replace(".", "_")
            statement = (
                "either select a known server or input protocol and server string"
            )
            assert (protocol is not None) & (server is not None), statement

        self.known_server = known_server
        self.e = ERDDAP(server=server)
        self.e.protocol = protocol
        self.e.server = server
        self.filetype = filetype

        # columns for metadata
        self.columns = [
            "geospatial_lat_min",
            "geospatial_lat_max",
            "geospatial_lon_min",
            "geospatial_lon_max",
            "time_coverage_start",
            "time_coverage_end",
            "defaultDataQuery",
            "subsetVariables",  # first works for timeseries sensors, 2nd for gliders
            "keywords",  # for hf radar
            "id",
            "infoUrl",
            "institution",
            "featureType",
            "source",
            "sourceUrl",
        ]

        # name
        self.name = f"erddap_{known_server}"

        self.reader = "ErddapReader"
        self.store = dict()

    def __getitem__(self, key):
        """Redefinition of dict-like behavior.

        This enables user to use syntax `reader[dataset_id]` to read in and
        save dataset into the object.

        Parameters
        ----------
        key: str
            dataset_id for a dataset that is available in the search/reader
            object.

        Returns
        -------
        xarray Dataset of the data associated with key
        """

        returned_data = self.data_by_dataset(key)
        # returned_data = self._return_data(key)
        self.__setitem__(key, returned_data)
        return returned_data

    def find_dataset_id_from_station(self, station):
        """Find dataset_id from station name.

        Parameters
        ----------
        station: string
            Station name for which to search for dataset_id
        """

        if station is None:
            return None
        # for station in self._stations:
        # if station has more than one word, AND will be put between
        # to search for multiple terms together.
        url = self.e.get_search_url(
            response="csv", items_per_page=5, search_for=station
        )

        try:
            df = pd.read_csv(url)
        except Exception as e:
            logger.exception(e)
            logger.warning(f"search url {url} did not work for station {station}.")
            return

        # first try for exact station match
        try:
            # Special case for TABS when don't split the id name
            if "tabs" in station:  # don't split
                dataset_id = [
                    dataset_id
                    for dataset_id in df["Dataset ID"]
                    if station.lower() == dataset_id.lower()
                ][0]
            else:
                # first try as dataset_id then do as station name
                dataset_id = [
                    dataset_id
                    for dataset_id in df["Dataset ID"]
                    if station.lower()
                    in [dataset_id.lower()] + dataset_id.lower().split("_")
                ][0]

        except Exception as e:
            logger.exception(e)
            logger.warning(
                "When searching for a dataset id to match station name %s, the first attempt to match the id did not work."
                % (station)
            )
            # If that doesn't work, return None for dataset_id
            dataset_id = None
            # # if that doesn't work, trying for more general match and just take first returned option
            # dataset_id = df.iloc[0]["Dataset ID"]

        return dataset_id

    @property
    def dataset_ids(self):
        """Find dataset_ids for server.

        Notes
        -----
        The dataset_ids are found by querying the metadata through the ERDDAP server.

        The number of dataset_ids can change if a variable is removed from the
        list of variables and this is rerun.
        """

        if not hasattr(self, "_dataset_ids") or (
            self.variables and (len(self.variables) != self.num_variables)
        ):

            # This should be a region search
            if self.approach == "region":

                # find all the dataset ids which we will use to get the data
                # This limits the search to our keyword arguments in kw which should
                # have min/max lon/lat/time values
                dataset_ids = []
                if self.variables is not None:
                    for variable in self.variables:

                        # find and save all dataset_ids associated with variable
                        search_url = self.e.get_search_url(
                            response="csv",
                            **self.kw,
                            variableName=variable,
                            items_per_page=10000,
                        )

                        try:
                            search = pd.read_csv(search_url)
                            dataset_ids.extend(search["Dataset ID"])
                        except Exception as e:
                            logger.exception(e)
                            logger.warning(
                                f"variable {variable} was not found in the search"
                            )
                            logger.warning(f"search_url: {search_url}")

                else:

                    # find and save all dataset_ids associated with variable
                    search_url = self.e.get_search_url(
                        response="csv", **self.kw, items_per_page=10000
                    )

                    try:
                        search = pd.read_csv(search_url)
                        dataset_ids.extend(search["Dataset ID"])
                    except Exception as e:
                        logger.exception(e)
                        logger.warning("nothing found in the search")
                        logger.warning(f"search_url: {search_url}")

                # only need a dataset id once since we will check them each for all standard_names
                self._dataset_ids = list(set(dataset_ids))

            # This should be a search for the station names
            elif self.approach == "stations":

                # search by station name for each of stations
                if self.parallel:
                    # get metadata for datasets
                    # run in parallel to save time
                    num_cores = multiprocessing.cpu_count()
                    dataset_ids = Parallel(n_jobs=num_cores)(
                        delayed(self.find_dataset_id_from_station)(station)
                        for station in self._stations
                    )

                else:
                    dataset_ids = []
                    for station in self._stations:
                        dataset_ids.append(self.find_dataset_id_from_station(station))

                # remove None from list
                dataset_ids = [i for i in dataset_ids if i]

                # In this case return all dataset_ids so they match 1-1 with
                # the input station list.
                self._dataset_ids = dataset_ids

            else:
                logger.warning(
                    "Neither stations nor region approach were used in function dataset_ids."
                )

            # update number of variables
            if self.variables:
                self.num_variables = len(self.variables)

        return self._dataset_ids

    def meta_by_dataset(self, dataset_id):
        """Return the catalog metadata for a single dataset_id."""

        info_url = self.e.get_info_url(response="csv", dataset_id=dataset_id)
        try:
            info = pd.read_csv(info_url)
        except Exception as e:
            logger.exception(e)
            logger.warning(f"Could not read info from {info_url}")
            return {dataset_id: []}

        items = []

        for col in self.columns:

            try:
                item = info[info["Attribute Name"] == col]["Value"].values[0]
                dtype = info[info["Attribute Name"] == col]["Data Type"].values[0]
            except:
                if col == "featureType":
                    # this column is not present in HF Radar metadata but want it to
                    # map to data_type, so input 'grid' in that case.
                    item = "grid"
                else:
                    item = "NA"

            if dtype == "String":
                pass
            elif dtype == "double":
                item = float(item)
            elif dtype == "int":
                item = int(item)
            items.append(item)

        # include download link ##
        self.e.dataset_id = dataset_id
        if self.e.protocol == "tabledap":
            # set the same time restraints as before
            self.e.constraints = {
                "time<=": self.kw["max_time"],
                "time>=": self.kw["min_time"],
            }
            if self.filetype == "csv":
                download_url = self.e.get_download_url(response="csvp")
            elif self.filetype == "netcdf":
                download_url = self.e.get_download_url(response="ncCf")

        elif self.e.protocol == "griddap":
            # the search terms that can be input for tabledap do not work for griddap
            # in erddapy currently. Instead, put together an opendap link and then
            # narrow the dataset with xarray.
            # get opendap link
            download_url = self.e.get_download_url(response="opendap")

        # check if "prediction" is present in metadata, esp in case of NOAA
        # model predictions
        is_prediction = "Prediction" in info["Value"].str.cat()

        # add erddap server name
        return {
            dataset_id: [self.e.server, download_url, info_url, is_prediction]
            + items
            + [self.variables]
        }

    @property
    def meta(self):
        """Rearrange the individual metadata into a dataframe.

        Notes
        -----
        This should exclude duplicate entries.
        """

        if not hasattr(self, "_meta"):

            if self.parallel:

                # get metadata for datasets
                # run in parallel to save time
                num_cores = multiprocessing.cpu_count()
                downloads = Parallel(n_jobs=num_cores)(
                    delayed(self.meta_by_dataset)(dataset_id)
                    for dataset_id in self.dataset_ids
                )

            else:

                downloads = []
                for dataset_id in self.dataset_ids:
                    downloads.append(self.meta_by_dataset(dataset_id))

            # make dict from individual dicts
            from collections import ChainMap

            meta = dict(ChainMap(*downloads))

            # Make dataframe of metadata
            # variable names are the column names for the dataframe
            self._meta = pd.DataFrame.from_dict(
                meta,
                orient="index",
                columns=["database", "download_url", "info_url", "is_prediction"]
                + self.columns
                + ["variable names"],
            )

        return self._meta

    def data_by_dataset(self, dataset_id):
        """Return the data for a single dataset_id.

        Returns
        -------
        A tuple of (dataset_id, data), where data type is a pandas DataFrame

        Notes
        -----
        Data is read into memory.
        """

        if self.filetype == "csv":
            # if self.e.protocol == "tabledap":
            try:
                # fetch metadata if not already present
                # found download_url from metadata and use
                self.e.dataset_id = dataset_id
                # dataset_vars gives a list of the variables in the dataset
                dataset_vars = (
                    self.meta.loc[dataset_id]["defaultDataQuery"]
                    .split("&")[0]
                    .split(",")
                )
                # vars_present gives the variables in self.variables
                # that are actually in the dataset
                vars_present = []
                for selfvariable in self.variables:
                    vp = [var for var in dataset_vars if var == selfvariable]
                    if len(vp) > 0:
                        vars_present.append(vp[0])
                # If any variables are not present, this doesn't work.
                if self.variables is not None:
                    self.e.variables = [
                        "time",
                        "longitude",
                        "latitude",
                        "station",
                    ] + vars_present
                dd = self.e.to_pandas(response="csvp", index_col=0, parse_dates=True)
                # dd = self.e.to_pandas(response='csv', header=[0, 1],
                #                       index_col=0, parse_dates=True)
                # dd = pd.read_csv(
                #     download_url, header=[0, 1], index_col=0, parse_dates=True
                # )

                # Drop cols and rows that are only NaNs.
                dd = dd.dropna(axis="index", how="all").dropna(
                    axis="columns", how="all"
                )

                if self.variables is not None:
                    # check to see if there is any actual data
                    # this is a bit convoluted because the column names are the variable names
                    # plus units so can't match 1 to 1.
                    datacols = (
                        0  # number of columns that represent data instead of metadata
                    )
                    for col in dd.columns:
                        datacols += [
                            varname in col for varname in self.variables
                        ].count(True)
                    # if no datacols, we can skip this one.
                    if datacols == 0:
                        dd = None

            except Exception as e:
                logger.exception(e)
                logger.warning("no data to be read in for %s" % dataset_id)
                dd = None

        elif self.filetype == "netcdf":
            # elif self.e.protocol == "griddap":

            if self.e.protocol == "tabledap":

                try:
                    # assume I don't need to narrow in space since time series (tabledap)
                    self.e.dataset_id = dataset_id
                    dd = self.e.to_xarray()
                    # dd = xr.open_dataset(download_url, chunks="auto")
                    dd = dd.swap_dims({"obs": dd.cf["time"].name})
                    dd = dd.sortby(dd.cf["time"], ascending=True)
                    dd = dd.cf.sel(T=slice(self.kw["min_time"], self.kw["max_time"]))
                    # dd = dd.set_coords(
                    #     [dd.cf["longitude"].name, dd.cf["latitude"].name]
                    # )

                    # use variable names to drop other variables (should. Ido this?)
                    if self.variables is not None:
                        # I don't think this is true with new approach
                        # # ERDDAP prepends variables with 's.' in netcdf files,
                        # # so include those with variables
                        # erd_vars = [f's.{var}' for var in self.variables]
                        # var_list = set(dd.data_vars) - (set(self.variables) | set(erd_vars))
                        var_list = set(dd.data_vars) - set(self.variables)
                        dd = dd.drop_vars(var_list)

                    # the lon/lat are on the 'timeseries' singleton dimension
                    # but the data_var variable was not, which messed up
                    # cf-xarray. When longitude and latitude are not on a
                    # dimension shared with a variable, the variable can't be
                    # called with cf-xarray. e.g. dd.cf['ssh'] won't work.
                    if "timeseries" in dd.dims:
                        for data_var in dd.data_vars:
                            if "timeseries" not in dd[data_var].dims:
                                dd[data_var] = dd[data_var].expand_dims(
                                    dim="timeseries", axis=1
                                )

                except Exception as e:
                    logger.exception(e)
                    logger.warning("no data to be read in for %s" % dataset_id)
                    dd = None

            elif self.e.protocol == "griddap":

                try:
                    # this makes it read in the whole file which might be large
                    self.e.dataset_id = dataset_id
                    # dd = self.e.to_xarray(chunks="auto").sel(
                    #     time=slice(self.kw["min_time"], self.kw["max_time"])
                    # )
                    download_url = self.e.get_download_url(response="opendap")
                    dd = xr.open_dataset(download_url, chunks="auto").sel(
                        time=slice(self.kw["min_time"], self.kw["max_time"])
                    )

                    if ("min_lat" in self.kw) and ("max_lat" in self.kw):
                        dd = dd.sel(
                            latitude=slice(self.kw["min_lat"], self.kw["max_lat"])
                        )

                    if ("min_lon" in self.kw) and ("max_lon" in self.kw):
                        dd = dd.sel(
                            longitude=slice(self.kw["min_lon"], self.kw["max_lon"])
                        )

                    # use variable names to drop other variables (should. Ido this?)
                    if self.variables is not None:
                        vars_list = set(dd.data_vars) - set(self.variables)
                        dd = dd.drop_vars(vars_list)

                except Exception as e:
                    logger.exception(e)
                    logger.warning("no data to be read in for %s" % dataset_id)
                    dd = None

        # return (dataset_id, dd)
        return dd

    # @property
    def data(self, dataset_ids=None):
        """Read in data for some or all dataset_ids.

        NOT USED CURRENTLY

        Once data is read in for a dataset_ids, it is remembered.

        See full documentation in `utils.load_data()`.
        """

        output = odg.utils.load_data(self, dataset_ids)
        return output


# Search for stations by region
[docs]class region(ErddapReader): """Inherits from ErddapReader to search over a region of space and time. Attributes ---------- kw: dict Contains space and time search constraints: `min_lon`, `max_lon`, `min_lat`, `max_lat`, `min_time`, `max_time`. variables: string or list Variable names if you want to limit the search to those. The variable name or names must be from the list available in `odg.all_variables(server)` for the specific ERDDAP server and pass the check in `odg.check_variables(server, variables)`. criteria: dict, str, optional A dictionary describing how to recognize variables by their name and attributes with regular expressions to be used with `cf-xarray`. It can be local or a URL point to a nonlocal gist. This is required for running QC in Gateway. For example: >>> my_custom_criteria = {"salt": { ... "standard_name": "sea_water_salinity$|sea_water_practical_salinity$", ... "name": (?i)sal$|(?i)s.sea_water_practical_salinity$"}} var_def: dict, optional A dictionary with the same keys as criteria (criteria can have more) that describes QC definitions and units. It should include the variable units, fail_span, and suspect_span. For example: >>> var_def = {"salt": {"units": "psu", ... "fail_span": [-10, 60], "suspect_span": [-1, 45]}} approach: string approach is defined as 'region' for this class. num_variables: int Number of variables stored in self.variables. This is set initially and if self.variables is modified, this is updated accordingly. If `variables is None`, `num_variables==0`. """
[docs] def __init__(self, kwargs): """ Parameters ---------- kwargs: dict Can contain arguments to pass onto the base ErddapReader class (known_server, protocol, server, parallel). The dict entries to initialize this class are: * kw: dict Contains space and time search constraints: `min_lon`, `max_lon`, `min_lat`, `max_lat`, `min_time`, `max_time`. * variables: string or list, optional Variable names if you want to limit the search to those. The variable name or names must be from the list available in `odg.all_variables(server)` for the specific ERDDAP server and pass the check in `odg.check_variables(server, variables)`. Alternatively, if the user inputs criteria, variables can be a list of the keys from criteria. * criteria: dict, optional A dictionary describing how to recognize variables by their name and attributes with regular expressions to be used with `cf-xarray`. It can be local or a URL point to a nonlocal gist. This is required for running QC in Gateway. For example: >>> my_custom_criteria = {"salt": { ... "standard_name": "sea_water_salinity$|sea_water_practical_salinity$", ... "name": (?i)sal$|(?i)s.sea_water_practical_salinity$"}} * var_def: dict, optional A dictionary with the same keys as criteria (criteria can have more) that describes QC definitions and units. It should include the variable units, fail_span, and suspect_span. For example: >>> var_def = {"salt": {"units": "psu", ... "fail_span": [-10, 60], "suspect_span": [-1, 45]}} """ assert isinstance(kwargs, dict), "input arguments as dictionary" er_kwargs = { "known_server": kwargs.get("known_server", "ioos"), "protocol": kwargs.get("protocol", None), "server": kwargs.get("server", None), "parallel": kwargs.get("parallel", True), } ErddapReader.__init__(self, **er_kwargs) kw = kwargs["kw"] variables = kwargs.get("variables", None) self.approach = "region" self._stations = None # run checks for KW # check for lon/lat values and time self.kw = kw # check for custom criteria to set up cf-xarray if "criteria" in kwargs: criteria = kwargs["criteria"] # link to nonlocal dictionary definition if isinstance(criteria, str) and criteria[:4] == "http": criteria = odg.return_response(criteria) cf_xarray.set_options(custom_criteria=criteria) self.criteria = criteria else: self.criteria = None if (variables is not None) and (not isinstance(variables, list)): variables = [variables] # make sure variables are on parameter list if variables is not None: # User is using criteria and variable nickname approach if self.criteria and all(var in self.criteria for var in variables): # first translate the input variable nicknames to variable names # that are specific to the reader. variables = odg.select_variables( self.e.server, self.criteria, variables ) # user is inputting specific reader variable names else: odg.check_variables(self.e.server, variables) # record the number of variables so that a user can change it and # the change can be compared. self.num_variables = len(variables) else: self.num_variables = 0 self.variables = variables
[docs]class stations(ErddapReader): """Inherits from ErddapReader to search for 1+ stations or dataset_ids. Attributes ---------- kw: dict, optional Contains space and time search constraints: `min_time`, `max_time`. variables: None variables is None for this class since we read search by dataset_id or station name. approach: string approach is defined as 'stations' for this class. """
[docs] def __init__(self, kwargs): """ Parameters ---------- kwargs: dict Can contain arguments to pass onto the base ErddapReader class (known_server, protocol, server, parallel). The dict entries to initialize this class are: * kw: dict, optional Contains time search constraints: `min_time`, `max_time`. If not input, all time will be used. * stations: string, list, optional Input station names as they might be commonly known and therefore can be searched for as a query term. The station names can be input as something like "TABS B" or "8771972" and has pretty good success. Or, input the exact dataset_ids for the data you want, corresponding to the databases that are being searched, so in this case they need to be the ERDDAP server's dataset_ids. Notes ----- The known_server needs to match the station name or dataset_id you are searching for. """ assert isinstance(kwargs, dict), "input arguments as dictionary" er_kwargs = { "known_server": kwargs.get("known_server", "ioos"), "protocol": kwargs.get("protocol", None), "server": kwargs.get("server", None), "parallel": kwargs.get("parallel", True), } ErddapReader.__init__(self, **er_kwargs) kw = kwargs.get("kw", None) stations = kwargs.get("stations", []) self.approach = "stations" # we want all the data associated with stations self.variables = None if not stations == []: if not isinstance(stations, list): stations = [stations] self._stations = stations # CHECK FOR KW VALUES AS TIMES if kw is None: kw = {"min_time": "1900-01-01", "max_time": "2100-12-31"} self.kw = kw