# Source code for esmvalcore.io.xcube
"""Access data using `xcube <https://xcube.readthedocs.io>`_.
Run the command ``esmvaltool config copy data-xcube-ccizarr.yml`` to update
your :ref:`configuration <config-data-sources>` to use this module. This will
create a file with the following content in your configuration directory:
.. literalinclude:: ../configurations/data-xcube-ccizarr.yml
:language: yaml
:caption: Contents of ``data-xcube-ccizarr.yml``
"""
from __future__ import annotations
import copy
import fnmatch
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import iris.cube
import iris.std_names
import xcube.core.store
import esmvalcore.io.protocol
from esmvalcore.iris_helpers import dataset_to_iris
if TYPE_CHECKING:
from esmvalcore.typing import Facets, FacetValue
@dataclass
class XCubeDataset(esmvalcore.io.protocol.DataElement):
    """A dataset that can be used to load data found using xcube_."""

    name: str
    """A unique name identifying the data."""

    facets: Facets = field(repr=False)
    """Facets are key-value pairs that were used to find this data."""

    store: xcube.core.store.store.DataStore = field(repr=False)
    """The store containing the data."""

    open_params: dict[str, Any] = field(default_factory=dict, repr=False)
    """Parameters to use when opening the data."""

    # Lazily populated cache of the dataset attributes; filled in by
    # `to_iris` (or by `find_data` via the `attributes` setter).
    _attributes: dict[str, Any] | None = field(
        init=False,
        repr=False,
        default=None,
    )

    def __hash__(self) -> int:
        """Return a number uniquely representing the data element."""
        version = self.facets.get("version")
        return hash((self.name, version))

    def prepare(self) -> None:
        """Prepare the data for access."""
        self.store.preload_data(self.name)

    @property
    def attributes(self) -> dict[str, Any]:
        """Attributes are key-value pairs describing the data."""
        if self._attributes is not None:
            return self._attributes
        msg = (
            "Attributes have not been read yet. Call the `to_iris` method "
            "first to read the attributes from the file."
        )
        raise ValueError(msg)

    @attributes.setter
    def attributes(self, value: dict[str, Any]) -> None:
        self._attributes = value

    def to_iris(self) -> iris.cube.CubeList:
        """Load the data as Iris cubes.

        Returns
        -------
        :
            The loaded data.
        """
        ds = self.store.open_data(self.name, **self.open_params)

        # Keep only variables matching the "short_name" facet.
        requested = self.facets.get("short_name", [])
        if isinstance(requested, str | int):
            requested = [str(requested)]
        if requested:
            ds = ds[requested]

        # Drop invalid standard_names so Iris does not reject them.
        # TODO: move this to a standalone fixes package.
        for var in ds.data_vars.values():
            if "standard_name" not in var.attrs:
                continue
            if var.attrs["standard_name"] not in iris.std_names.STD_NAMES:
                del var.attrs["standard_name"]

        # Cache the attributes before conversion.
        self.attributes = copy.deepcopy(ds.attrs)
        return dataset_to_iris(ds)
@dataclass
class XCubeDataSource(esmvalcore.io.protocol.DataSource):
    """Data source for finding data in an xcube_ data store."""

    name: str
    """A name identifying the data source."""

    project: str
    """The project that the data source provides data for."""

    priority: int
    """The priority of the data source. Lower values have priority."""

    debug_info: str = field(init=False, repr=False, default="")
    """A string containing debug information when no data is found."""

    data_store_id: str
    """Name of the data store.

    A list of available data stores can be found in the `xcube documentation
    <https://xcube.readthedocs.io/en/latest/dataaccess.html#available-data-stores>`__.
    """

    data_store_params: dict[str, Any] = field(default_factory=dict, repr=False)
    """Parameters to use when creating the data store."""

    open_params: dict[str, Any] = field(default_factory=dict, repr=False)
    """Parameters to use when opening the data."""

    @staticmethod
    def _as_patterns(value: FacetValue) -> list[str]:
        """Normalize a facet value to a list of glob patterns."""
        if isinstance(value, str | int):
            return [str(value)]
        return value

    @staticmethod
    def _select_short_names(
        available: list[str],
        requested: list[str],
    ) -> list[str]:
        """Return available variable names matching any requested pattern."""
        # `any` ensures each name appears at most once, even if it matches
        # several patterns.
        return [
            short_name
            for short_name in available
            if any(
                fnmatch.fnmatchcase(short_name, pattern)
                for pattern in requested
            )
        ]

    def _build_open_params(
        self,
        schema: Any,
        description: Any,
        short_names: list[str],
    ) -> dict[str, Any]:
        """Build open parameters that restrict loading to ``short_names``."""
        open_params = copy.deepcopy(self.open_params)
        if "variable_names" in schema.properties:
            open_params["variable_names"] = short_names
        elif "drop_variables" in schema.properties:
            drop_variables = {
                short_name
                for short_name in description.data_vars
                if short_name not in short_names
            }
            # Keep bounds variables referenced by coordinates. Use `discard`
            # because a bounds variable may already be selected and hence
            # absent from the drop set.
            for coord in description.coords.values():
                if bound_var := coord.attrs.get("bounds"):
                    drop_variables.discard(bound_var)
            # Keep ancillary variables referenced by data variables.
            # TODO: keep cell measures
            for data_var in description.data_vars.values():
                ancillary = data_var.attrs.get("ancillary_variables", "")
                for ancillary_var in ancillary.split():
                    drop_variables.discard(ancillary_var)
            open_params["drop_variables"] = sorted(drop_variables)
        return open_params

    @staticmethod
    def _build_facets(
        data_id: str,
        description: Any,
        short_names: list[str],
    ) -> Facets:
        """Build the facets describing the dataset ``data_id``."""
        timerange = f"{description.time_range[0]}/{description.time_range[1]}".replace(
            "-",
            "",
        )
        # Map ISO 8601 durations to ESMValCore frequency names.
        # TODO: support more resolutions than monthly data; an unknown
        # resolution currently raises a KeyError.
        frequencies = {
            "P1M": "mon",
        }
        frequency = frequencies[
            description.attrs["time_coverage_resolution"]
        ]
        return {
            "dataset": data_id,
            "short_name": short_names
            if len(short_names) > 1
            else short_names[0],
            "frequency": frequency,
            "timerange": timerange,
        }

    def find_data(self, **facets: FacetValue) -> list[XCubeDataset]:
        """Find data.

        Parameters
        ----------
        **facets :
            Find data matching these facets.

        Returns
        -------
        :
            A list of data elements that have been found.
        """
        store = xcube.core.store.new_data_store(
            self.data_store_id,
            **self.data_store_params,
        )
        requested_short_names = self._as_patterns(
            facets.get("short_name", "*"),
        )
        requested_datasets = self._as_patterns(facets.get("dataset", "*"))
        # The open parameter schema does not depend on the dataset, so
        # retrieve it once instead of per dataset.
        schema = store.get_open_data_params_schema()

        result: list[XCubeDataset] = []
        for data_id in store.list_data_ids():
            # `any` ensures each data_id is added at most once, even if it
            # matches several patterns.
            if not any(
                fnmatch.fnmatchcase(data_id, pattern)
                for pattern in requested_datasets
            ):
                continue
            description = store.describe_data(data_id)
            short_names = self._select_short_names(
                list(description.data_vars),
                requested_short_names,
            )
            if not short_names:
                # None of the requested variables are available.
                continue
            # TODO: Maybe this is too complicated and we should only
            # decide which variables to keep/drop after load and conversion
            # to iris cube.
            dataset = XCubeDataset(
                name=data_id,
                facets=self._build_facets(data_id, description, short_names),
                store=store,
                open_params=self._build_open_params(
                    schema,
                    description,
                    short_names,
                ),
            )
            dataset.attributes = description.attrs
            result.append(dataset)
        if not result:
            self.debug_info = (
                f"No data found in xcube data store '{self.data_store_id}' "
                f"matching facets {facets}."
            )
        return result