"""Access data using `xcube <https://xcube.readthedocs.io>`_.
Run the command ``esmvaltool config copy data-xcube-esacci.yml`` to update
your :ref:`configuration <config-data-sources>` to use this module. This will
create a file with the following content in your configuration directory:

.. literalinclude:: ../configurations/data-xcube-esacci.yml
    :language: yaml
    :caption: Contents of ``data-xcube-esacci.yml``
"""
from __future__ import annotations
import copy
import fnmatch
import logging
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import xcube.core.store
from fixer import fix
import esmvalcore.io.protocol
from esmvalcore.iris_helpers import dataset_to_iris
if TYPE_CHECKING:
import iris.cube
from esmvalcore.typing import Facets, FacetValue
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Maps the value of a dataset description's ``time_coverage_resolution``
# attribute to the value stored in the ``frequency`` facet (see
# ``XCubeDataSource.find_data``). Resolutions that are not listed here do not
# get a ``frequency`` facet.
# NOTE(review): the keys look like ISO 8601 durations -- confirm.
FREQUENCIES = {
    "P1D": "day",
    "P1M": "mon",
    "P1Y": "yr",
}
@dataclass
class XCubeDataset(esmvalcore.io.protocol.DataElement):
    """A data element whose data is read from a data store using xcube_."""

    name: str
    """Unique name of the data; used as the identifier passed to the store."""

    facets: Facets = field(repr=False)
    """Key-value pairs that were used to find this data."""

    store: xcube.core.store.store.DataStore = field(repr=False)
    """The data store that holds the data."""

    open_params: dict[str, Any] = field(default_factory=dict, repr=False)
    """Keyword arguments passed to the store when the data is opened."""

    _attributes: dict[str, Any] | None = field(
        init=False,
        repr=False,
        default=None,
    )

    def __hash__(self) -> int:
        """Return a hash built from the name and the ``version`` facet."""
        version = self.facets.get("version")
        return hash((self.name, version))

    def prepare(self) -> None:
        """Ask the store to preload the data so it is ready for access."""
        self.store.preload_data(self.name)

    @property
    def attributes(self) -> dict[str, Any]:
        """Key-value pairs describing the data.

        Raises
        ------
        ValueError
            If no attributes have been assigned yet.
        """
        cached = self._attributes
        if cached is not None:
            return cached
        raise ValueError(
            "Attributes have not been read yet. Call the `to_iris` method "
            "first to read the attributes from the file."
        )

    @attributes.setter
    def attributes(self, value: dict[str, Any]) -> None:
        self._attributes = value

    def to_iris(self) -> iris.cube.CubeList:
        """Open the data and convert it to Iris cubes.

        The opened dataset is passed through ``fix`` and tagged with a
        ``source_file`` attribute before conversion. As a side effect, a deep
        copy of the dataset attributes is stored in :attr:`attributes`.

        Returns
        -------
        :
            The loaded data.
        """
        opened = self.store.open_data(self.name, **self.open_params)
        fixed = fix(opened, self.name)
        fixed.attrs["source_file"] = repr(self)
        # Keep an independent copy so later changes to the dataset attributes
        # do not leak into the cached attributes.
        self.attributes = copy.deepcopy(fixed.attrs)
        return dataset_to_iris(fixed)
# Module-level set of strings, initially empty. It is not referenced by the
# code visible in this part of the file.
# NOTE(review): presumably records names of datasets that were already logged
# to avoid repeated messages -- confirm against its users.
_DATASETS_LOGGED: set[str] = set()
@dataclass
class XCubeDataSource(esmvalcore.io.protocol.DataSource):
    """Data source for finding data in an xcube data store.

    A new data store is created from :attr:`data_store_id` and
    :attr:`data_store_params` on every call to :meth:`find_data`.
    """

    name: str
    """A name identifying the data source."""

    project: str
    """The project that the data source provides data for."""

    priority: int
    """The priority of the data source. Lower values have priority."""

    debug_info: str = field(init=False, repr=False, default="")
    """A string containing debug information when no data is found."""

    data_store_id: str
    """Name of the data store.

    A list of available data stores can be found in the `xcube documentation
    <https://xcube.readthedocs.io/en/latest/dataaccess.html#available-data-stores>`__.
    """

    values: dict[str, dict[str, str]] = field(default_factory=dict)
    """Mapping between the ESMValCore and xcube facet values."""

    data_store_params: dict[str, Any] = field(default_factory=dict, repr=False)
    """Parameters to use when creating the data store."""

    open_params: dict[str, Any] = field(default_factory=dict, repr=False)
    """Parameters to use when opening the data."""

    @staticmethod
    def _as_patterns(value: FacetValue) -> list[str]:
        """Return a facet value as a list of string patterns."""
        if isinstance(value, str | int | float):
            return [str(value)]
        # Stringify list entries too, so scalars and lists behave alike.
        return [str(item) for item in value]

    def find_data(self, **facets: FacetValue) -> list[XCubeDataset]:
        """Find data.

        The ``dataset`` and ``short_name`` facets are interpreted as
        case-sensitive shell-style patterns (or lists of patterns); both
        default to ``"*"``. Requested short names are translated to xcube
        variable names using ``values["short_name"]`` before matching.

        As a side effect, :attr:`debug_info` is updated with a message
        describing what was (not) found.

        Parameters
        ----------
        **facets :
            Find data matching these facets.

        Returns
        -------
        :
            A list of data elements that have been found. Each data id in
            the store appears at most once.
        """
        store = xcube.core.store.new_data_store(
            self.data_store_id,
            **self.data_store_params,
        )

        # ESMValCore short name -> xcube variable name.
        short_name_map = self.values.get("short_name", {})
        requested_xcube_short_names = [
            short_name_map.get(short_name, short_name)
            for short_name in self._as_patterns(facets.get("short_name", "*"))
        ]
        requested_datasets = self._as_patterns(facets.get("dataset", "*"))

        available_datasets = store.list_data_ids()
        self.debug_info = (
            "No dataset matching "
            + ", ".join(f"'{d}'" for d in requested_datasets)
            + f" was found in {self.data_store_id}. Available datasets are:\n"
            + "\n".join(sorted(available_datasets))
        )

        result: list[XCubeDataset] = []
        for data_id in available_datasets:
            # ``any`` ensures a data id that matches several patterns is
            # described and returned only once.
            if not any(
                fnmatch.fnmatchcase(data_id, dataset_pattern)
                for dataset_pattern in requested_datasets
            ):
                continue

            description = store.describe_data(data_id)
            # Descriptors may leave ``data_vars`` and ``attrs`` unset.
            available_xcube_short_names = list(description.data_vars or {})
            attrs = description.attrs or {}

            xcube_short_names = [
                short_name
                for short_name in available_xcube_short_names
                if any(
                    fnmatch.fnmatchcase(short_name, short_name_pattern)
                    for short_name_pattern in requested_xcube_short_names
                )
            ]
            if not xcube_short_names:
                self.debug_info = (
                    "No variable matching "
                    + ", ".join(f"'{s}'" for s in requested_xcube_short_names)
                    + f" was found in dataset '{data_id}'. Available variables are:\n"
                    + "\n".join(sorted(available_xcube_short_names))
                )
                continue

            # Translate the matched xcube variable names back to ESMValCore
            # short names. Only variables that occur in the
            # ``values["short_name"]`` mapping are reported.
            short_names = [
                short_name
                for short_name, xcube_short_name in short_name_map.items()
                if xcube_short_name in xcube_short_names
            ]

            found_facets: dict[str, Any] = {
                "dataset": data_id,
                "short_name": (
                    short_names[0] if len(short_names) == 1 else short_names
                ),
            }
            time_range = description.time_range
            if time_range is not None:
                # Drop the dashes from the dates: "2000-01-01" -> "20000101".
                found_facets["timerange"] = (
                    f"{time_range[0]}/{time_range[1]}".replace("-", "")
                )
            frequency = FREQUENCIES.get(
                attrs.get("time_coverage_resolution", ""),
            )
            if frequency:
                # Assign the frequency facet if it is a known frequency.
                found_facets["frequency"] = frequency

            dataset = XCubeDataset(
                name=data_id,
                facets=found_facets,
                store=store,
                open_params=copy.deepcopy(self.open_params),
            )
            # Provide the described attributes up front, so they are
            # available before the data itself has been loaded.
            dataset.attributes = dict(attrs)
            result.append(dataset)

        if result:
            self.debug_info = (
                f"Found dataset{'' if len(result) == 1 else 's'} "
                f"{', '.join(d.name for d in result)} in data store "
                f"{self.data_store_id}."
            )
        return result