Source code for ocean_data_gateway.readers.local

"""
Reader for local files.
"""

import hashlib
import logging
import multiprocessing
import os

import intake
import pandas as pd

from joblib import Parallel, delayed

import ocean_data_gateway as odg


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Short module-level identifier for this reader.
# The original note says this can be queried with
# search.LocalReader.reader
# NOTE(review): `LocalReader.__init__` also sets an *instance* attribute
# `reader` (to "LocalReader"), which is distinct from this module-level
# value — confirm which of the two callers actually read.
reader = "local"


class LocalReader:
    """
    This class searches local files.

    Attributes
    ----------
    parallel: boolean
        If True, run with simple parallelization using `multiprocessing`.
        If False, run serially.
    catalog_name: string
        Input catalog path if you want to use an existing catalog.
    filenames: string, list
        Specific file locations from which to read data.
    kw: dict, optional
      Contains space and time search constraints: `min_lon`, `max_lon`,
      `min_lat`, `max_lat`, `min_time`, `max_time`.
    name: string
        Name of this reader: 'local'.
    reader: string
        Reader name: LocalReader

    TO DO: Can this reader be used for remote datasets but for
    which we know the specific file location?
    """

    def __init__(self, parallel=True, catalog_name=None, filenames=None, kw=None):
        """
        Parameters
        ----------
        parallel: boolean, optional
            If True, run with simple parallelization using `multiprocessing`.
            If False, run serially.
        catalog_name: string, optional
            Input catalog path if you want to use an existing catalog.
        filenames: string, list
            Specific file locations from which to read data.
        kw: dict
            Contains space and time search constraints: `min_lon`, `max_lon`,
            `min_lat`, `max_lat`, `min_time`, `max_time`.

        Notes
        -----
        All input data is currently used, regardless of whether `kw` is input
        with constraints on lon, lat, or time.

        There is no real difference between searching with `region` or
        `stations` for this reader.
        """

        self.parallel = parallel

        if catalog_name is None:
            name = f"{pd.Timestamp.now().isoformat()}"
            hash_name = hashlib.sha256(name.encode()).hexdigest()[:7]
            catalog_path = odg.catalogs_path.joinpath(f"catalog_{hash_name}.yml")
            self.catalog_name = catalog_path
        else:
            self.catalog_name = catalog_name
            # if catalog_name already exists, read it in to save time
            self.catalog

        if (filenames is not None) and (not isinstance(filenames, list)):
            filenames = [filenames]
        self.filenames = filenames

        if kw is None:
            kw = {"min_time": "1900-01-01", "max_time": "2100-12-31"}

        self.kw = kw

        if (filenames is None) and (catalog_name is None):
            self._dataset_ids = []
            logger.warning(
                f"no datasets for LocalReader with catalog_name {catalog_name} and filenames {filenames}."
            )

        # name
        self.name = "local"

        self.reader = "LocalReader"

    def write_catalog(self):
        """Write catalog file."""

        # if the catalog already exists, don't do this
        if os.path.exists(self.catalog_name):
            return

        else:
            lines = "sources:\n"

            for filename in self.filenames:

                if "csv" in filename:
                    file_intake = intake.open_csv(filename)
                    data = file_intake.read()
                    metadata = {
                        "variables": list(data.columns.values),
                        "geospatial_lon_min": float(data["longitude"].min()),
                        "geospatial_lat_min": float(data["latitude"].min()),
                        "geospatial_lon_max": float(data["longitude"].max()),
                        "geospatial_lat_max": float(data["latitude"].max()),
                        "time_coverage_start": data["time"].min(),
                        "time_coverage_end": data["time"].max(),
                    }
                    file_intake.metadata = metadata

                elif "nc" in filename:
                    file_intake = intake.open_netcdf(filename)
                    data = file_intake.read()
                    coords = list(data.coords.keys())
                    timekey = [
                        coord
                        for coord in coords
                        if ("time" in data[coord].attrs.values())
                        or ("T" in data[coord].attrs.values())
                    ]
                    if len(timekey) > 0:
                        timekey = timekey[0]
                        time_coverage_start = str(data[timekey].min().values)
                        time_coverage_end = str(data[timekey].max().values)
                    else:
                        time_coverage_start = ""
                        time_coverage_end = ""
                    lonkey = [
                        coord
                        for coord in coords
                        if ("lon" in data[coord].attrs.values())
                        or ("X" in data[coord].attrs.values())
                    ]
                    if len(lonkey) > 0:
                        lonkey = lonkey[0]
                        geospatial_lon_min = float(data[lonkey].min())
                        geospatial_lon_max = float(data[lonkey].max())
                    else:
                        geospatial_lon_min = ""
                        geospatial_lon_max = ""
                    latkey = [
                        coord
                        for coord in coords
                        if ("lat" in data[coord].attrs.values())
                        or ("Y" in data[coord].attrs.values())
                    ]
                    if len(latkey) > 0:
                        latkey = latkey[0]
                        geospatial_lat_min = float(data[latkey].min())
                        geospatial_lat_max = float(data[latkey].max())
                    else:
                        geospatial_lat_min = ""
                        geospatial_lat_max = ""
                    metadata = {
                        "coords": coords,
                        "variables": list(data.data_vars.keys()),
                        "time_variable": timekey,
                        "lon_variable": lonkey,
                        "lat_variable": latkey,
                        "geospatial_lon_min": geospatial_lon_min,
                        "geospatial_lon_max": geospatial_lon_max,
                        "geospatial_lat_min": geospatial_lat_min,
                        "geospatial_lat_max": geospatial_lat_max,
                        "time_coverage_start": time_coverage_start,
                        "time_coverage_end": time_coverage_end,
                    }
                    file_intake.metadata = metadata

                file_intake.name = filename.split("/")[-1]
                lines += file_intake.yaml().strip("sources:")

            f = open(self.catalog_name, "w")
            f.write(lines)
            f.close()

    @property
    def catalog(self):
        """Write then open catalog."""

        if not hasattr(self, "_catalog"):

            self.write_catalog()
            catalog = intake.open_catalog(self.catalog_name)
            self._catalog = catalog

        return self._catalog

    @property
    def dataset_ids(self):
        """Find dataset_ids for catalog.

        Notes
        -----
        The dataset_ids are read from the catalog, so the catalog is created
        before this can happen.
        """

        if not hasattr(self, "_dataset_ids"):
            self._dataset_ids = list(self.catalog)

        return self._dataset_ids

    def meta_by_dataset(self, dataset_id):
        """Return the catalog metadata for a single dataset_id.

        TODO: Should this return intake-style or a row of the metadata dataframe?
        """

        return self.catalog[dataset_id]

    @property
    def meta(self):
        """Rearrange the individual metadata into a dataframe."""

        if not hasattr(self, "_meta"):

            if self.dataset_ids == []:
                self._meta = None
            else:
                # set up columns which might be different for datasets
                columns = ["download_url"]
                for dataset_id in self.dataset_ids:
                    meta = self.meta_by_dataset(dataset_id)
                    columns += list(meta.metadata.keys())
                columns = set(columns)  # take unique column names

                self._meta = pd.DataFrame(index=self.dataset_ids, columns=columns)
                for dataset_id in self.dataset_ids:
                    meta = self.meta_by_dataset(dataset_id)
                    self._meta.loc[dataset_id]["download_url"] = meta.urlpath
                    self._meta.loc[dataset_id, list(meta.metadata.keys())] = list(
                        meta.metadata.values()
                    )
                    # self._meta.loc[dataset_id][meta.metadata.keys()] = meta.metadata.values()
                    # data.append([meta.urlpath] + list(meta.metadata.values()))
                # self._meta = pd.DataFrame(index=self.dataset_ids, columns=columns, data=data)

        return self._meta

    def data_by_dataset(self, dataset_id):
        """Return the data for a single dataset_id.

        Returns
        -------
        A tuple of (dataset_id, data), where data type is a pandas DataFrame.

        Notes
        -----
        Data is read into memory.

        TODO: SHOULD I INCLUDE TIME RANGE?
        """

        data = self.catalog[dataset_id].read()
        #         data = data.set_index('time')
        #         data = data[self.kw['min_time']:self.kw['max_time']]

        return (dataset_id, data)

    #         return (dataset_id, self.catalog[dataset_id].read())

    # @property
    def data(self):
        """Read in data for all dataset_ids.

        Returns
        -------
        A dictionary with keys of the dataset_ids and values the data of type:
        If `filename` is a csv file: a pandas DataFrame
        If `filename` is a netcdf file: an xarray Dataset

        Notes
        -----
        This is either done in parallel with the `multiprocessing` library or
        in serial.
        """

        if not hasattr(self, "_data"):

            if self.parallel:
                num_cores = multiprocessing.cpu_count()
                downloads = Parallel(n_jobs=num_cores)(
                    delayed(self.data_by_dataset)(dataset_id)
                    for dataset_id in self.dataset_ids
                )
            else:
                downloads = []
                for dataset_id in self.dataset_ids:
                    downloads.append(self.data_by_dataset(dataset_id))

            #             if downloads is not None:
            dds = {dataset_id: dd for (dataset_id, dd) in downloads}
            #             else:
            #                 dds = None

            self._data = dds

        return self._data


class region(LocalReader):
    """Inherits from LocalReader to search over a region of space and time.

    Attributes
    ----------
    kw: dict
        Contains space and time search constraints: `min_lon`, `max_lon`,
        `min_lat`, `max_lat`, `min_time`, `max_time`.
    variables: string or list
        Variable names if you want to limit the search to those. This is
        currently ignored.
    approach: string
        approach is defined as 'region' for this class.
    """

    def __init__(self, kwargs):
        """
        Parameters
        ----------
        kwargs: dict
            Can contain arguments to pass onto the base LocalReader class
            (catalog_name, parallel, filenames). The dict entries to initialize
            this class are:

            * kw: dict
              Contains space and time search constraints: `min_lon`, `max_lon`,
              `min_lat`, `max_lat`, `min_time`, `max_time`. Not used to filter
              data currently. When omitted, the LocalReader default is used.
            * variables: string or list, optional
              Variable names if you want to limit the search to those. This is
              not used to filter data currently.
        """
        assert isinstance(kwargs, dict), "input arguments as dictionary"

        # `kw` is handed to the base class so that a missing `kw` gets the
        # base-class default instead of overwriting it with None afterwards.
        lo_kwargs = {
            "catalog_name": kwargs.get("catalog_name", None),
            "filenames": kwargs.get("filenames", None),
            "parallel": kwargs.get("parallel", True),
            "kw": kwargs.get("kw", None),
        }
        LocalReader.__init__(self, **lo_kwargs)

        variables = kwargs.get("variables", None)

        self.approach = "region"

        self._stations = None

        # run checks for KW
        # check for lon/lat values and time

        # a single variable name is wrapped in a list
        if (variables is not None) and (not isinstance(variables, list)):
            variables = [variables]
        self.variables = variables


class stations(LocalReader):
    """Inherits from LocalReader to search for 1+ stations or dataset_ids.

    Attributes
    ----------
    kw: dict
        Contains space and time search constraints: `min_lon`, `max_lon`,
        `min_lat`, `max_lat`, `min_time`, `max_time`.
    approach: string
        approach is defined as 'stations' for this class.
    """

    def __init__(self, kwargs):
        """
        Parameters
        ----------
        kwargs: dict
            Can contain arguments to pass onto the base LocalReader class
            (catalog_name, parallel, filenames). The dict entries to initialize
            this class are:

            * kw: dict, optional
              Contains space and time search constraints: `min_lon`, `max_lon`,
              `min_lat`, `max_lat`, `min_time`, `max_time`.
        """
        assert isinstance(kwargs, dict), "input arguments as dictionary"

        # CHECK FOR KW VALUES AS TIMES
        # A missing `kw` is replaced by the base class with the default
        # {"min_time": "1900-01-01", "max_time": "2100-12-31"}.
        LocalReader.__init__(
            self,
            catalog_name=kwargs.get("catalog_name", None),
            filenames=kwargs.get("filenames", None),
            parallel=kwargs.get("parallel", True),
            kw=kwargs.get("kw", None),
        )

        self.approach = "stations"