Source code for ocean_data_gateway.readers.local

"""
Reader for local files.
"""

import hashlib
import logging
import os

import cf_xarray
import intake
import pandas as pd

import ocean_data_gateway as odg

from ocean_data_gateway import Reader


logger = logging.getLogger(__name__)

# this can be queried with
# search.LocalReader.reader
reader = "local"


class LocalReader(Reader):
    """
    This class searches local files.

    Attributes
    ----------
    parallel: boolean
        If True, run with simple parallelization using `multiprocessing`.
        If False, run serially.
    catalog_name: string
        Input catalog path if you want to use an existing catalog.
    filenames: string, list
        Specific file locations from which to read data.
    kw: dict, optional
      Contains space and time search constraints: `min_lon`, `max_lon`,
      `min_lat`, `max_lat`, `min_time`, `max_time`.
    name: string
        f'axds_{axds_type}' so 'axds_platform2' or 'axds_layer_group'
    reader: string
        Reader name: AxdsReader

    TO DO: Can this reader be used for remote datasets but for
    which we know the specific file location?
    """

    def __init__(self, parallel=True, catalog_name=None, filenames=None, kw=None):
        """
        Parameters
        ----------
        parallel: boolean, optional
            If True, run with simple parallelization using `multiprocessing`.
            If False, run serially.
        catalog_name: string, optional
            Input catalog path if you want to use an existing catalog.
        filenames: string, list
            Specific file locations from which to read data.
        kw: dict
            Contains space and time search constraints: `min_lon`, `max_lon`,
            `min_lat`, `max_lat`, `min_time`, `max_time`.

        Notes
        -----
        All input data is currently used, regardless of whether `kw` is input
        with constraints on lon, lat, or time.

        There is no real difference between searching with `region` or
        `stations` for this reader.
        """

        self.parallel = parallel

        if catalog_name is None:
            name = f"{pd.Timestamp.now().isoformat()}"
            hash_name = hashlib.sha256(name.encode()).hexdigest()[:7]
            catalog_path = odg.catalogs_path.joinpath(f"catalog_{hash_name}.yml")
            self.catalog_name = catalog_path
        else:
            self.catalog_name = catalog_name
            # if catalog_name already exists, read it in to save time
            self.catalog

        if (filenames is not None) and (not isinstance(filenames, list)):
            filenames = [filenames]
        self.filenames = filenames

        if kw is None:
            kw = {"min_time": "1900-01-01", "max_time": "2100-12-31"}

        self.kw = kw

        if (filenames is None) and (catalog_name is None):
            self._dataset_ids = []
            logger.warning(
                f"no datasets for LocalReader with catalog_name {catalog_name} and filenames {filenames}."
            )

        # name
        self.name = "local"

        self.reader = "LocalReader"
        self.store = dict()

    def __getitem__(self, key):
        """Redefinition of dict-like behavior.

        This enables user to use syntax `reader[dataset_id]` to read in and
        save dataset into the object.

        Parameters
        ----------
        key: str
            dataset_id for a dataset that is available in the search/reader
            object.

        Returns
        -------
        xarray Dataset of the data associated with key
        """

        returned_data = self.data_by_dataset(key)
        self.__setitem__(key, returned_data)
        return returned_data

    def write_catalog(self):
        """Write catalog file."""

        # if the catalog already exists, don't do this
        if os.path.exists(self.catalog_name):
            return

        else:
            lines = "sources:\n"

            for filename in self.filenames:

                if "csv" in filename:
                    file_intake = intake.open_csv(filename)
                    data = file_intake.read()
                    #                     # Remove skiprows entry and input header entry that we want
                    #                     file_intake._csv_kwargs.pop("skiprows")
                    #                     file_intake._csv_kwargs.update({"header": [0, 1]})
                    metadata = {
                        "variables": list(data.columns.values),
                        "geospatial_lon_min": float(data["longitude"].min()),
                        "geospatial_lat_min": float(data["latitude"].min()),
                        "geospatial_lon_max": float(data["longitude"].max()),
                        "geospatial_lat_max": float(data["latitude"].max()),
                        "time_coverage_start": data["time"].min(),
                        "time_coverage_end": data["time"].max(),
                    }
                    file_intake.metadata = metadata

                elif "nc" in filename:
                    file_intake = intake.open_netcdf(filename)
                    data = file_intake.read()
                    coords = list(data.coords.keys())
                    if "T" in data.cf.get_valid_keys():
                        time_coverage_start = str(data.cf["T"].min().values)
                        time_coverage_end = str(data.cf["T"].max().values)
                    else:
                        time_coverage_start = ""
                        time_coverage_end = ""
                    if "longitude" in data.cf.get_valid_keys():
                        geospatial_lon_min = float(data.cf["longitude"].min())
                        geospatial_lon_max = float(data.cf["longitude"].max())
                    else:
                        geospatial_lon_min = ""
                        geospatial_lon_max = ""
                    if "latitude" in data.cf.get_valid_keys():
                        geospatial_lat_min = float(data.cf["latitude"].min())
                        geospatial_lat_max = float(data.cf["latitude"].max())
                    else:
                        geospatial_lat_min = ""
                        geospatial_lat_max = ""
                    metadata = {
                        "coords": coords,
                        "variables": list(data.data_vars.keys()),
                        "time_variable": data.cf["T"].name,
                        "lon_variable": data.cf["longitude"].name,
                        "lat_variable": data.cf["latitude"].name,
                        "geospatial_lon_min": geospatial_lon_min,
                        "geospatial_lon_max": geospatial_lon_max,
                        "geospatial_lat_min": geospatial_lat_min,
                        "geospatial_lat_max": geospatial_lat_max,
                        "time_coverage_start": time_coverage_start,
                        "time_coverage_end": time_coverage_end,
                    }
                    file_intake.metadata = metadata

                file_intake.name = filename.split("/")[-1]
                lines += file_intake.yaml().strip("sources:")

            f = open(self.catalog_name, "w")
            f.write(lines)
            f.close()

    @property
    def catalog(self):
        """Write then open catalog."""

        if not hasattr(self, "_catalog"):

            self.write_catalog()
            catalog = intake.open_catalog(self.catalog_name)
            self._catalog = catalog

        return self._catalog

    @property
    def dataset_ids(self):
        """Find dataset_ids for catalog.

        Notes
        -----
        The dataset_ids are read from the catalog, so the catalog is created
        before this can happen.
        """

        if not hasattr(self, "_dataset_ids"):
            self._dataset_ids = list(self.catalog)

        return self._dataset_ids

    def meta_by_dataset(self, dataset_id):
        """Return the catalog metadata for a single dataset_id.

        TODO: Should this return intake-style or a row of the metadata dataframe?
        """

        return self.catalog[dataset_id]

    @property
    def meta(self):
        """Rearrange the individual metadata into a dataframe."""

        if not hasattr(self, "_meta"):

            if self.dataset_ids == []:
                self._meta = None
            else:
                # set up columns which might be different for datasets
                columns = ["download_url"]
                for dataset_id in self.dataset_ids:
                    meta = self.meta_by_dataset(dataset_id)
                    columns += list(meta.metadata.keys())
                columns = set(columns)  # take unique column names

                self._meta = pd.DataFrame(index=self.dataset_ids, columns=columns)
                for dataset_id in self.dataset_ids:
                    meta = self.meta_by_dataset(dataset_id)
                    self._meta.loc[dataset_id]["download_url"] = meta.urlpath
                    self._meta.loc[dataset_id, list(meta.metadata.keys())] = list(
                        meta.metadata.values()
                    )
                    # self._meta.loc[dataset_id][meta.metadata.keys()] = meta.metadata.values()
                    # data.append([meta.urlpath] + list(meta.metadata.values()))
                # self._meta = pd.DataFrame(index=self.dataset_ids, columns=columns, data=data)

        return self._meta

    def data_by_dataset(self, dataset_id):
        """Return the data for a single dataset_id.

        Returns
        -------
        A tuple of (dataset_id, data), where data type is a pandas DataFrame.

        Notes
        -----
        Data is read into memory.

        TODO: SHOULD I INCLUDE TIME RANGE?
        """

        data = self.catalog[dataset_id].read()
        #         data = data.set_index('time')
        #         data = data[self.kw['min_time']:self.kw['max_time']]
        return data
        # return (dataset_id, data)

    #         return (dataset_id, self.catalog[dataset_id].read())

    # @property
    def data(self, dataset_ids=None):
        """Read in data for some or all dataset_ids.

        NOT USED CURRENTLY

        Once data is read in for a dataset_ids, it is remembered.

        See full documentation in `utils.load_data()`.
        """

        output = odg.utils.load_data(self, dataset_ids)
        return output


[docs]class region(LocalReader): """Inherits from LocalReader to search over a region of space and time. Attributes ---------- kw: dict Contains space and time search constraints: `min_lon`, `max_lon`, `min_lat`, `max_lat`, `min_time`, `max_time`. variables: string or list Variable names if you want to limit the search to those. This is currently not used. approach: string approach is defined as 'region' for this class. """
[docs] def __init__(self, kwargs): """ Parameters ---------- kwargs: dict Can contain arguments to pass onto the base AxdsReader class (catalog_name, parallel, filenames). The dict entries to initialize this class are: * kw: dict Contains space and time search constraints: `min_lon`, `max_lon`, `min_lat`, `max_lat`, `min_time`, `max_time`. Not used to filter data currently. * variables: string or list, optional Variable names if you want to limit the search to those. This is not used. """ assert isinstance(kwargs, dict), "input arguments as dictionary" lo_kwargs = { "catalog_name": kwargs.get("catalog_name", None), "filenames": kwargs.get("filenames", None), "parallel": kwargs.get("parallel", True), } LocalReader.__init__(self, **lo_kwargs) kw = kwargs.get("kw", None) variables = kwargs.get("variables", None) self.approach = "region" self._stations = None # run checks for KW # check for lon/lat values and time self.kw = kw if (variables is not None) and (not isinstance(variables, list)): variables = [variables] self.variables = variables
[docs]class stations(LocalReader): """Inherits from LocalReader to search for 1+ stations or dataset_ids. Attributes ---------- kw: dict Contains space and time search constraints: `min_lon`, `max_lon`, `min_lat`, `max_lat`, `min_time`, `max_time`. approach: string approach is defined as 'stations' for this class. """
[docs] def __init__(self, kwargs): """ Parameters ---------- kwargs: dict Can contain arguments to pass onto the base LocalReader class (catalog_name, parallel, filenames). The dict entries to initialize this class are: * kw: dict, optional Contains space and time search constraints: `min_lon`, `max_lon`, `min_lat`, `max_lat`, `min_time`, `max_time`. """ assert isinstance(kwargs, dict), "input arguments as dictionary" loc_kwargs = { "catalog_name": kwargs.get("catalog_name", None), "filenames": kwargs.get("filenames", None), "parallel": kwargs.get("parallel", True), } LocalReader.__init__(self, **loc_kwargs) kw = kwargs.get("kw", None) self.approach = "stations" # CHECK FOR KW VALUES AS TIMES if kw is None: kw = {"min_time": "1900-01-01", "max_time": "2100-12-31"} self.kw = kw