Source code for ocean_data_gateway.readers.erddap

"""
Reader for ERDDAP servers.
"""

import logging
import multiprocessing
import re

import numpy as np
import pandas as pd
import xarray as xr

from erddapy import ERDDAP
from joblib import Parallel, delayed

import ocean_data_gateway as odg


logger = logging.getLogger(__name__)

# this can be queried with
# search.ErddapReader.reader
reader = "erddap"


class ErddapReader:
    """
    This class searches ERDDAP servers. There are 2 known_servers but
    others can be input too.

    Attributes
    ----------
    parallel: boolean
        If True, run with simple parallelization using `multiprocessing`.
        If False, run serially.
    known_server: string
        Two ERDDAP servers are built in to be known to this reader: "ioos" and
        "coastwatch".
    e: ERDDAP server instance
    e.protocol: string
        * "tabledap" (pandas, appropriate for reading as csv)
        * "griddap" (xarray, appropriate for reading as netcdf)
    e.server: string
        Return the server name
    columns: list
        Metadata columns
    name: string
        "erddap_ioos", "erddap_coastwatch", or a constructed string if the user
        inputs a new protocol and server.
    reader: string
        reader is defined as "ErddapReader".
    """

    def __init__(self, known_server="ioos", protocol=None, server=None, parallel=True):
        """
        Parameters
        ----------
        known_server: string, optional
            Two ERDDAP servers are built in to be known to this reader:
            "ioos" and "coastwatch".
        protocol, server: string, optional
            For a user-defined ERDDAP server, input the protocol as one of the
            following:
            * "tabledap" (pandas, appropriate for reading as csv)
            * "griddap" (xarray, appropriate for reading as netcdf)
            and the server address (such as
            "http://erddap.sensors.ioos.us/erddap" or
            "http://coastwatch.pfeg.noaa.gov/erddap").
        parallel: boolean
            If True, run with simple parallelization using `multiprocessing`.
            If False, run serially.
        """
        self.parallel = parallel

        # either select a known server or input protocol and server string
        if known_server == "ioos":
            protocol = "tabledap"
            server = "http://erddap.sensors.ioos.us/erddap"
        elif known_server == "coastwatch":
            protocol = "griddap"
            server = "http://coastwatch.pfeg.noaa.gov/erddap"
        elif known_server is not None:
            statement = (
                "either select a known server or input protocol and server string"
            )
            assert (protocol is not None) & (server is not None), statement
        else:
            known_server = server.strip("/erddap").strip("http://").replace(".", "_")
            statement = (
                "either select a known server or input protocol and server string"
            )
            assert (protocol is not None) & (server is not None), statement

        self.known_server = known_server
        self.e = ERDDAP(server=server)
        self.e.protocol = protocol
        self.e.server = server

        # columns for metadata
        self.columns = [
            "geospatial_lat_min",
            "geospatial_lat_max",
            "geospatial_lon_min",
            "geospatial_lon_max",
            "time_coverage_start",
            "time_coverage_end",
            "defaultDataQuery",
            "subsetVariables",  # first works for timeseries sensors, 2nd for gliders
            "keywords",  # for hf radar
            "id",
            "infoUrl",
            "institution",
            "featureType",
            "source",
            "sourceUrl",
        ]

        # name
        self.name = f"erddap_{known_server}"

        self.reader = "ErddapReader"

    def find_dataset_id_from_station(self, station):
        """Find dataset_id from station name.

        Parameters
        ----------
        station: string
            Station name for which to search for dataset_id
        """

        if station is None:
            return None
        # for station in self._stations:
        # if station has more than one word, AND will be put between
        # to search for multiple terms together.
        url = self.e.get_search_url(
            response="csv", items_per_page=5, search_for=station
        )

        try:
            df = pd.read_csv(url)
        except Exception as e:
            logger.exception(e)
            logger.warning(f"search url {url} did not work for station {station}.")
            return

        # first try for exact station match
        try:
            # Special case for TABS when don't split the id name
            if "tabs" in station:  # don't split
                dataset_id = [
                    dataset_id
                    for dataset_id in df["Dataset ID"]
                    if station.lower() == dataset_id.lower()
                ][0]
            else:
                dataset_id = [
                    dataset_id
                    for dataset_id in df["Dataset ID"]
                    if station.lower() in dataset_id.lower().split("_")
                ][0]

        except Exception as e:
            logger.exception(e)
            logger.warning(
                "When searching for a dataset id to match station name %s, the first attempt to match the id did not work."
                % (station)
            )
            # If that doesn't work, return None for dataset_id
            dataset_id = None
            # # if that doesn't work, trying for more general match and just take first returned option
            # dataset_id = df.iloc[0]["Dataset ID"]

        return dataset_id

    @property
    def dataset_ids(self):
        """Find dataset_ids for server.

        Notes
        -----
        The dataset_ids are found by querying the metadata through the ERDDAP server. Or, if running with `stations()` and input dataset_ids, they are simply set initially with those values.
        """

        if not hasattr(self, "_dataset_ids"):

            # This should be a region search
            if self.approach == "region":

                # find all the dataset ids which we will use to get the data
                # This limits the search to our keyword arguments in kw which should
                # have min/max lon/lat/time values
                dataset_ids = []
                if self.variables is not None:
                    for variable in self.variables:

                        # find and save all dataset_ids associated with variable
                        search_url = self.e.get_search_url(
                            response="csv",
                            **self.kw,
                            variableName=variable,
                            items_per_page=10000,
                        )

                        try:
                            search = pd.read_csv(search_url)
                            dataset_ids.extend(search["Dataset ID"])
                        except Exception as e:
                            logger.exception(e)
                            logger.warning(
                                f"variable {variable} was not found in the search"
                            )
                            logger.warning(f"search_url: {search_url}")

                else:

                    # find and save all dataset_ids associated with variable
                    search_url = self.e.get_search_url(
                        response="csv", **self.kw, items_per_page=10000
                    )

                    try:
                        search = pd.read_csv(search_url)
                        dataset_ids.extend(search["Dataset ID"])
                    except Exception as e:
                        logger.exception(e)
                        logger.warning("nothing found in the search")
                        logger.warning(f"search_url: {search_url}")

                # only need a dataset id once since we will check them each for all standard_names
                self._dataset_ids = list(set(dataset_ids))

            # This should be a search for the station names
            elif self.approach == "stations":

                # search by station name for each of stations
                if self.parallel:
                    # get metadata for datasets
                    # run in parallel to save time
                    num_cores = multiprocessing.cpu_count()
                    dataset_ids = Parallel(n_jobs=num_cores)(
                        delayed(self.find_dataset_id_from_station)(station)
                        for station in self._stations
                    )

                else:
                    dataset_ids = []
                    for station in self._stations:
                        dataset_ids.append(self.find_dataset_id_from_station(station))

                # In this case return all dataset_ids so they match 1-1 with
                # the input station list.
                self._dataset_ids = dataset_ids

            else:
                logger.warning(
                    "Neither stations nor region approach were used in function dataset_ids."
                )

        return self._dataset_ids

    def meta_by_dataset(self, dataset_id):
        """Return the catalog metadata for a single dataset_id."""

        info_url = self.e.get_info_url(response="csv", dataset_id=dataset_id)
        try:
            info = pd.read_csv(info_url)
        except Exception as e:
            logger.exception(e)
            logger.warning(f"Could not read info from {info_url}")
            return {dataset_id: []}

        items = []

        for col in self.columns:

            try:
                item = info[info["Attribute Name"] == col]["Value"].values[0]
                dtype = info[info["Attribute Name"] == col]["Data Type"].values[0]
            except:
                if col == "featureType":
                    # this column is not present in HF Radar metadata but want it to
                    # map to data_type, so input 'grid' in that case.
                    item = "grid"
                else:
                    item = "NA"

            if dtype == "String":
                pass
            elif dtype == "double":
                item = float(item)
            elif dtype == "int":
                item = int(item)
            items.append(item)

        ## include download link ##
        self.e.dataset_id = dataset_id
        if self.e.protocol == "tabledap":
            if self.variables is not None:
                self.e.variables = [
                    "time",
                    "longitude",
                    "latitude",
                    "station",
                ] + self.variables
            # set the same time restraints as before
            self.e.constraints = {
                "time<=": self.kw["max_time"],
                "time>=": self.kw["min_time"],
            }
            download_url = self.e.get_download_url(response="csvp")

        elif self.e.protocol == "griddap":
            # the search terms that can be input for tabledap do not work for griddap
            # in erddapy currently. Instead, put together an opendap link and then
            # narrow the dataset with xarray.
            # get opendap link
            download_url = self.e.get_download_url(response="opendap")

        # add erddap server name
        return {dataset_id: [self.e.server, download_url] + items + [self.variables]}

    @property
    def meta(self):
        """Rearrange the individual metadata into a dataframe.

        Notes
        -----
        This should exclude duplicate entries.
        """

        if not hasattr(self, "_meta"):

            if self.parallel:

                # get metadata for datasets
                # run in parallel to save time
                num_cores = multiprocessing.cpu_count()
                downloads = Parallel(n_jobs=num_cores)(
                    delayed(self.meta_by_dataset)(dataset_id)
                    for dataset_id in self.dataset_ids
                )

            else:

                downloads = []
                for dataset_id in self.dataset_ids:
                    downloads.append(self.meta_by_dataset(dataset_id))

            # make dict from individual dicts
            from collections import ChainMap

            meta = dict(ChainMap(*downloads))

            # Make dataframe of metadata
            # variable names are the column names for the dataframe
            self._meta = pd.DataFrame.from_dict(
                meta,
                orient="index",
                columns=["database", "download_url"]
                + self.columns
                + ["variable names"],
            )

        return self._meta

    def data_by_dataset(self, dataset_id):
        """Return the data for a single dataset_id.

        Returns
        -------
        A tuple of (dataset_id, data), where data type is a pandas DataFrame

        Notes
        -----
        Data is read into memory.
        """

        download_url = self.meta.loc[dataset_id, "download_url"]
        # data variables in ds that are not the variables we searched for
        #         varnames = self.meta.loc[dataset_id, 'variable names']

        if self.e.protocol == "tabledap":

            try:

                # fetch metadata if not already present
                # found download_url from metadata and use
                dd = pd.read_csv(download_url, index_col=0, parse_dates=True)

                # Drop cols and rows that are only NaNs.
                dd = dd.dropna(axis="index", how="all").dropna(
                    axis="columns", how="all"
                )

                if self.variables is not None:
                    # check to see if there is any actual data
                    # this is a bit convoluted because the column names are the variable names
                    # plus units so can't match 1 to 1.
                    datacols = (
                        0  # number of columns that represent data instead of metadata
                    )
                    for col in dd.columns:
                        datacols += [
                            varname in col for varname in self.variables
                        ].count(True)
                    # if no datacols, we can skip this one.
                    if datacols == 0:
                        dd = None

            except Exception as e:
                logger.exception(e)
                logger.warning("no data to be read in for %s" % dataset_id)
                dd = None

        elif self.e.protocol == "griddap":

            try:
                dd = xr.open_dataset(download_url, chunks="auto").sel(
                    time=slice(self.kw["min_time"], self.kw["max_time"])
                )

                if ("min_lat" in self.kw) and ("max_lat" in self.kw):
                    dd = dd.sel(latitude=slice(self.kw["min_lat"], self.kw["max_lat"]))

                if ("min_lon" in self.kw) and ("max_lon" in self.kw):
                    dd = dd.sel(longitude=slice(self.kw["min_lon"], self.kw["max_lon"]))

                # use variable names to drop other variables (should. Ido this?)
                if self.variables is not None:
                    l = set(dd.data_vars) - set(self.variables)
                    dd = dd.drop_vars(l)

            except Exception as e:
                logger.exception(e)
                logger.warning("no data to be read in for %s" % dataset_id)
                dd = None

        return (dataset_id, dd)

    # @property
    def data(self):
        """Read in data for all dataset_ids.

        Returns
        -------
        A dictionary with keys of the dataset_ids and values the data of type pandas DataFrame.

        Notes
        -----
        This is either done in parallel with the `multiprocessing` library or
        in serial.
        """

        # if not hasattr(self, '_data'):

        if self.parallel:
            num_cores = multiprocessing.cpu_count()
            downloads = Parallel(n_jobs=num_cores)(
                delayed(self.data_by_dataset)(dataset_id)
                for dataset_id in self.dataset_ids
            )
        else:
            downloads = []
            for dataset_id in self.dataset_ids:
                downloads.append(self.data_by_dataset(dataset_id))

        #             if downloads is not None:
        dds = {dataset_id: dd for (dataset_id, dd) in downloads}
        #             else:
        #                 dds = None

        return dds
        # self._data = dds

        # return self._data

    def count(self, url):
        """Small helper function to count len(results) at url."""
        try:
            return len(pd.read_csv(url))
        except:
            return np.nan

    def all_variables(self):
        """Return a DataFrame of allowed variable names.

        Returns
        -------
        DataFrame of variable names and count of how many times they are present in the database.

        Notes
        -----
        This list is specific to the given ERDDAP server. If you are using
        an user-input server, it will have its own `known_server` name and upon
        running this function the first time, you should get a variable list for
        that server.

        Examples
        --------
        >>> import ocean_data_gateway as odg
        >>> odg.erddap.ErddapReader(known_server='ioos').all_variables()
                                           count
        variable
        air_pressure                        4028
        air_pressure_10011met_a                2
        air_pressure_10311ahlm_a               2
        air_pressure_10311ahlm_a_qc_agg        1
        air_pressure_10311ahlm_a_qc_tests      1
        ...                                  ...
        wind_speed_windbird_qc_agg             1
        wind_speed_windbird_qc_tests           1
        wind_to_direction                     55
        wmo_id                               954
        z                                  37377

        Or for a different `known_server`:

        >>> odg.erddap.ErddapReader(known_server='coastwatch').all_variables()
                      count
        variable
        abund_m3          2
        ac_line           1
        ac_sta            1
        adg_412           8
        adg_412_bias      8
        ...             ...
        yeardeployed      1
        yield             1
        z                 3
        z_mean            2
        zlev              6
        """

        path_name_counts = odg.variables_path.joinpath(
            f"erddap_variable_list_{self.known_server}.csv"
        )

        if path_name_counts.is_file():
            return pd.read_csv(path_name_counts, index_col="variable")
        else:
            print(
                "Please wait while the list of available variables is made. This only happens once but could take 10 minutes."
            )
            # This took 10 min running in parallel for ioos
            # 2 min for coastwatch
            url = f"{self.e.server}/categorize/variableName/index.csv?page=1&itemsPerPage=100000"
            df = pd.read_csv(url)
            if not self.parallel:
                counts = []
                for url in df.URL:
                    counts.append(self.count(url))
            else:
                num_cores = multiprocessing.cpu_count()
                counts = Parallel(n_jobs=num_cores)(
                    delayed(self.count)(url) for url in df.URL
                )
            dfnew = pd.DataFrame()
            dfnew["variable"] = df["Category"]
            dfnew["count"] = counts
            dfnew = dfnew.set_index("variable")
            # remove nans
            if (dfnew.isnull().sum() > 0).values:
                dfnew = dfnew[~dfnew.isnull().values].astype(int)
            dfnew.to_csv(path_name_counts)

        return dfnew

    def search_variables(self, variables):
        """Find valid variables names to use.

        Parameters
        ----------
        variables: string, list
            String or list of strings to use in regex search to find valid
            variable names.

        Returns
        -------
        DataFrame of variable names and count of how many times they are present in the database, sorted by count.

        Notes
        -----
        This list is only specific to the ERDDAP server.

        Examples
        --------

        Search for variables that contain the substring 'sal':

        >>> odg.erddap.ErddapReader(known_server='ioos').search_variables('sal')
                                                        count
        variable
        salinity                                          954
        salinity_qc                                       954
        sea_water_practical_salinity                      778
        soil_salinity_qc_agg                              622
        soil_salinity                                     622
        ...                                               ...
        sea_water_practical_salinity_4161sc_a_qc_tests      1
        sea_water_practical_salinity_6754mc_a_qc_tests      1
        sea_water_practical_salinity_6754mc_a_qc_agg        1
        sea_water_practical_salinity_4161sc_a_qc_agg        1
        sea_water_practical_salinity_10091sc_a              1

        Find available variables sorted by count:

        >>>  odg.erddap.ErddapReader(known_server='ioos').search_variables('')
                                                            count
        variable
        time                                                38331
        longitude                                           38331
        latitude                                            38331
        z                                                   37377
        station                                             37377
        ...                                                   ...
        sea_surface_wave_from_direction_elw11a3t01wv_qc...      1
        sea_surface_wave_from_direction_elw11b2t01wv            1
        sea_surface_wave_from_direction_elw11b2t01wv_qc...      1
        sea_surface_wave_from_direction_elw11b2t01wv_qc...      1
        sea_water_pressure_7263arc_a                            1
        """

        if not isinstance(variables, list):
            variables = [variables]

        # set up search for input variables
        search = f"(?i)"
        for variable in variables:
            search += f".*{variable}|"
        search = search.strip("|")

        r = re.compile(search)

        # just get the variable names
        df = self.all_variables()
        parameters = df.index

        matches = list(filter(r.match, parameters))

        # return parameters that match input variable strings
        return df.loc[matches].sort_values("count", ascending=False)

    def check_variables(self, variables, verbose=False):
        """Checks variables for presence in database list.

        Parameters
        ----------
        variables: string, list
            String or list of strings to compare against list of valid
            variable names.
        verbose: boolean, optional
            Print message if variables are matches instead of passing silently.

        Returns
        -------
        Nothing is returned. However, there are two types of behavior:

        if variables is not a valid variable name(s), an AssertionError is
          raised and `search_variables(variables)` is run on your behalf to
          suggest valid variable names to use.
        if variables is a valid variable name(s), nothing happens.

        Notes
        -----
        This list is specific to the ERDDAP server being used.

        Examples
        --------
        Check if the variable name 'sal' is valid:

        >>> odg.erddap.ErddapReader(known_server='ioos').check_variables('sal')
        AssertionError                            Traceback (most recent call last)
        <ipython-input-13-f8082c9bfafa> in <module>
        ----> 1 odg.erddap.ErddapReader(known_server='ioos').check_variables('sal')
        ~/projects/ocean_data_gateway/ocean_data_gateway/readers/erddap.py in check_variables(self, variables, verbose)
            572         salinity_qc                                       954
            573         sea_water_practical_salinity                      778
        --> 574         soil_salinity_qc_agg                              622
            575         soil_salinity                                     622
            576         ...                                               ...
        AssertionError: The input variables are not exact matches to ok variables for known_server ioos.
        Check all parameter group values with `ErddapReader().all_variables()`
        or search parameter group values with `ErddapReader().search_variables(['sal'])`.
         Try some of the following variables:
                                                        count
        variable
        salinity                                          954
        salinity_qc                                       954
        sea_water_practical_salinity                      778
        soil_salinity_qc_agg                              622
        soil_salinity                                     622
        ...                                               ...
        sea_water_practical_salinity_4161sc_a_qc_tests      1
        sea_water_practical_salinity_6754mc_a_qc_tests      1
        sea_water_practical_salinity_6754mc_a_qc_agg        1
        sea_water_practical_salinity_4161sc_a_qc_agg        1
        sea_water_practical_salinity_10091sc_a              1

        Check if the variable name 'salinity' is valid:

        >>> odg.erddap.ErddapReader(known_server='ioos').check_variables('salinity')

        """

        if not isinstance(variables, list):
            variables = [variables]

        parameters = list(self.all_variables().index)

        # for a variable to exactly match a parameter
        # this should equal 1
        count = []
        for variable in variables:
            count += [parameters.count(variable)]

        condition = np.allclose(count, 1)

        assertion = f"The input variables are not exact matches to ok variables for known_server {self.known_server}. \
                     \nCheck all parameter group values with `ErddapReader().all_variables()` \
                     \nor search parameter group values with `ErddapReader().search_variables({variables})`.\
                     \n\n Try some of the following variables:\n{str(self.search_variables(variables))}"  # \
        assert condition, assertion

        if condition and verbose:
            print("all variables are matches!")


# Search for stations by region
[docs]class region(ErddapReader): """Inherits from ErddapReader to search over a region of space and time. Attributes ---------- kw: dict Contains space and time search constraints: `min_lon`, `max_lon`, `min_lat`, `max_lat`, `min_time`, `max_time`. variables: string or list Variable names if you want to limit the search to those. The variable name or names must be from the list available in `all_variables()` for the specific ERDDAP server and pass the check in `check_variables()`. approach: string approach is defined as 'region' for this class. """
[docs] def __init__(self, kwargs): """ Parameters ---------- kwargs: dict Can contain arguments to pass onto the base ErddapReader class (known_server, protocol, server, parallel). The dict entries to initialize this class are: * kw: dict Contains space and time search constraints: `min_lon`, `max_lon`, `min_lat`, `max_lat`, `min_time`, `max_time`. * variables: string or list, optional Variable names if you want to limit the search to those. The variable name or names must be from the list available in `all_variables()` for the specific ERDDAP server and pass the check in `check_variables()`. """ assert isinstance(kwargs, dict), "input arguments as dictionary" er_kwargs = { "known_server": kwargs.get("known_server", "ioos"), "protocol": kwargs.get("protocol", None), "server": kwargs.get("server", None), "parallel": kwargs.get("parallel", True), } ErddapReader.__init__(self, **er_kwargs) kw = kwargs["kw"] variables = kwargs.get("variables", None) self.approach = "region" self._stations = None # run checks for KW # check for lon/lat values and time self.kw = kw if (variables is not None) and (not isinstance(variables, list)): variables = [variables] # make sure variables are on parameter list if variables is not None: self.check_variables(variables) self.variables = variables
[docs]class stations(ErddapReader): """Inherits from ErddapReader to search for 1+ stations or dataset_ids. Attributes ---------- kw: dict, optional Contains space and time search constraints: `min_time`, `max_time`. variables: None variables is None for this class since we read search by dataset_id or station name. approach: string approach is defined as 'stations' for this class. """
[docs] def __init__(self, kwargs): """ Parameters ---------- kwargs: dict Can contain arguments to pass onto the base ErddapReader class (known_server, protocol, server, parallel). The dict entries to initialize this class are: * kw: dict, optional Contains time search constraints: `min_time`, `max_time`. If not input, all time will be used. * dataset_ids: string, list, optional Use this option if you know the exact dataset_ids for the data you want. These need to be the dataset_ids corresponding to the databases that are being searched, so in this case they need to be the ERDDAP server's dataset_ids. If you know station names but not the specific database uuids, input the names as "stations" instead. * stations: string, list, optional Input station names as they might be commonly known and therefore can be searched for as a query term. The station names can be input as something like "TABS B" or "8771972" and has pretty good success. Notes ----- The known_server needs to match the station name or dataset_id you are searching for. """ assert isinstance(kwargs, dict), "input arguments as dictionary" er_kwargs = { "known_server": kwargs.get("known_server", "ioos"), "protocol": kwargs.get("protocol", None), "server": kwargs.get("server", None), "parallel": kwargs.get("parallel", True), } ErddapReader.__init__(self, **er_kwargs) kw = kwargs.get("kw", None) dataset_ids = kwargs.get("dataset_ids", None) stations = kwargs.get("stations", []) self.approach = "stations" # we want all the data associated with stations self.variables = None if dataset_ids is not None: if not isinstance(dataset_ids, list): dataset_ids = [dataset_ids] self._dataset_ids = dataset_ids if not stations == []: if not isinstance(stations, list): stations = [stations] self._stations = stations self.dataset_ids else: self._stations = stations # CHECK FOR KW VALUES AS TIMES if kw is None: kw = {"min_time": "1900-01-01", "max_time": "2100-12-31"} self.kw = kw