Source code for canopy.core.field

import copy
import pandas as pd
from pandas.api.types import is_numeric_dtype, is_string_dtype
import xarray as xr
import numpy as np
import random
from datetime import datetime
from types import MappingProxyType
from typing import List, Optional, Union, Sequence, Callable, Literal, Any, SupportsFloat, cast
from typing_extensions import Self
from canopy.readers.registry import get_reader, get_format_description
from canopy.core.grid import get_grid, get_grid_type
from canopy.core.grid.grid_abc import Grid
from canopy.core.grid.grid_empty import GridEmpty
from canopy.core.grid.grid_lonlat import GridLonLat
from canopy.core.grid.grid_sites import GridSites
from canopy.core.redspec import RedSpec
from canopy.source_data import get_source_data
import regionmask # type: ignore
import canopy.core.frameops as frameops
import canopy.core.arithm as arithm
import janitor # type: ignore


def _validate(data: pd.DataFrame, grid: Grid):

    if not isinstance(data, pd.DataFrame):
        raise ValueError("'data' must be a pandas.DataFrame.")

    if 'time' not in data.index.names:
        raise ValueError("The DataFrame index must have at least a 'time' level.")

    if len(data.index.names) > 1 and data.index.names[-1] != 'time':
        raise ValueError("'time' must be the last level of the DataFrame's index.")

    if len(data.index.names) == 1 and not isinstance(data.index, pd.PeriodIndex) \
    or isinstance(data.index, pd.MultiIndex) and not isinstance(data.index.dtypes['time'], pd.PeriodDtype):
        raise ValueError("'time' level must be of pandas PeriodIndex type")

    if not is_string_dtype(data.columns):
        raise ValueError("The DataFrame column names (i.e. the Field's layer names) must be of string type.")

    # The spatial coordinates must be double precision floats
    spatial_levels = [x for x in data.index.names if x not in ['label', 'time']]
    for level in spatial_levels:
        index: pd.MultiIndex = cast(pd.MultiIndex, data.index)
        if index.dtypes[level] != 'float64':
            raise ValueError(f"Spatial level '{level}' must be a double precision floating point number.")

    if not isinstance(grid, Grid):
        raise ValueError("'grid' must be an instance of a Grid subclass.")

    # This is a special case that concerns all grid types, so it is checked here
    if 'label' in data.index.names:
        if not isinstance(grid, GridSites) and not isinstance(grid, GridEmpty):
            raise ValueError("A 'label' index is allowed only in combination with a grid of type 'sites' (or an empty grid).")
        if data.index.names[0] != 'label':
            raise ValueError("'label' must be the first level of the DataFrame's index.")

    # This is the same for all grids (although in some of the grids' validate_frame method it might be checked too)
    if not isinstance(grid, GridEmpty):
        for axis_key in grid.axis_names:
            if axis_key not in data.index.names and not grid.is_reduced(axis_key):
                raise ValueError(f"{axis_key} is not a reduced dimension, but it is also not present in the DataFrame's index.")

    # Coordinates must be float
    for axis_key in grid.axis_names:
        index = cast(pd.MultiIndex, data.index)
        if axis_key in data.index.names and not is_numeric_dtype(index.dtypes[axis_key]):
            raise ValueError(f"Spatial coordinates in axis '{axis_key}' must be of numeric type")

    # Check if dataframe and grid are compatible
    if not grid.validate_frame(data):
        raise ValueError("Supplied dataframe and grid are not compatible. Activate all warnings (`import warnings; warnings.filterwarnings('always', category=Warning)`) for details.")


[docs] class Field: """Container for data derived from model output or observations This object contains model output or observation data and associated grid information. It allows for basic data manipulation, such as time- and spatial reductions and slicing. The Field object is canopy's elemental interface between data and user. """ def __init__(self, data: pd.DataFrame, grid: Grid, modified: bool = False, source: str | None = None) -> None: """ Parameters ---------- data : pd.DataFrame Pandas DataFrame containing the Field's data (see specification below). grid : Grid A canopy Grid object (see Grid object documentation). modified : bool Whether the field being created has been modified (e.g. reduced). source : str The source to retrieve the file's metadata. The format for this argument is 'source:field'. For example, to read an LPJ-GUESS file and add metadata corresponding to Annual GPP: agpp = Field.from_file('/path/to/file/file_name.out', file_format='lpjg_annual', source='lpjguess:agpp') """ _validate(data, grid) self._data = data self._grid: Grid = grid self._modified: bool = modified if source is None: self._metadata: dict[str, str] = { 'name': '[no name]', 'description': '[no description]', 'units': '[no units]', } else: src, fld = source.split(':') source_data = get_source_data(src) self._metadata = { 'name': source_data['fields'][fld]['name'], 'description': source_data['fields'][fld]['description'], 'units': source_data['fields'][fld]['units'], } self._history: list[str] = [] self._timeop: str | None = None time_index = cast(pd.PeriodIndex, data.index.get_level_values('time')) self._time_freq: str = str(time_index.freqstr) # Order MultiIndex, necessary for slicing # Attribute _lexsort_depth is not to be used directly; but Pandas will raise an unsorted index error # after slicing, even if is_monotonic_increasing returns True, because _lexsort_depth is incorrectly set (I think) # Mypy doesn't register this attribute of pd.MultiIndex -> ignore if isinstance(self._data.index, pd.MultiIndex): if self._data.index._lexsort_depth < len(self._data.index.names): # type: ignore self._data = self._data.sort_index(level=0) def __getitem__(self, layers): """ Invoke `self.select_layers` with indexing notation """ return self.select_layers(layers) def __setitem__(self, layers: str | Sequence[str], value: Union[SupportsFloat, 'Field']) -> None: """ Set data values with indexing notation Parameters ----------- layers: str | Sequence[str] Layer or list of layers where the data is set value: int | float | Field If value is a number, all data values in the selected layers are set to that number. If a Field is passed, it is checked that the indices match exactly before copying the supplied Field's data onto the selected layers. """ if isinstance(layers, str): layers = [layers] if isinstance(value, SupportsFloat): self.data[layers] = float(value) what = f'the scalar {value}' elif isinstance(value, Field): frameops.check_indices_match(self.data, value.data) self.data[layers] = value.data.copy() what = f'layers {value.layers} from field: {value.name}.' else: raise ValueError("Can only assign another Field or a scalar.") self._modified = True log_message = f'Layers {list(layers)} were assigned {what}.' self.log(log_message) @property def data(self) -> pd.DataFrame: """pd.DataFrame: The Field's data""" return self._data @property def grid(self) -> Grid: """Grid: The grid associated with the data""" return self._grid @property def modified(self) -> bool: """bool: True if the Field has been modified after loading.""" return self._modified @property def metadata(self) -> MappingProxyType[str, str]: """dict: Field's metadata (units, etc...)""" return MappingProxyType(self._metadata) @property def name(self) -> str: return self.metadata['name'] @property def units(self) -> str: return self.metadata['units'] @property def description(self) -> str: return self.metadata['description'] @property def history(self) -> list[str]: """list: Keeps the history of modifications of the Field""" return self._history @property def timeop(self) -> str | None: """str: returns the reduction operation applied to the time dimension (if any).""" return self._timeop @property def time_freq(self) -> str: """str: returns the sampling frequency of the time axis.""" return self._time_freq @property def layers(self) -> list[str]: """str: returns a list with the Field's layer names.""" return list(self.data.columns) @property def coordinates(self) -> list: """Produce a list of spatial coordinates""" if len(self.data.index.names) == 1: raise ValueError("Field has only a 'time' level") levels_to_drop = [x for x in self.data.index.names if x in ['label', 'time']] return self.data.index.droplevel('time').unique().tolist() @property def sites(self) -> MappingProxyType[str,tuple[float,float]]: """ Get a dictionary of sites for unstructured grids If the data has a 'label' level, the keys will be the labels. If not, the keys will be the coordinate pair in string format: '(x_axis_name, y_axis_name)'. In both cases the values of the dictionary are the sites' coordinates as a tuple: (x, y) """ if not isinstance(self.grid, GridSites): raise ValueError("Site selection by label is only allowed for unstructured ('sites') grids.") xax, yax = self.grid.axis_names if self.grid.is_reduced(xax) or self.grid.is_reduced(yax): raise ValueError("The grid is spatially reduced.") # Non-reduced grid of type Sites can, per spec, have levels: # [label, x, y, time] # [x, y, time] site_tuples = self.data.index.droplevel('time').unique() if self.data.index.names[0] != 'label': return MappingProxyType({f'({x}, {y})': (x, y) for x, y in site_tuples}) else: return MappingProxyType({l: (x, y) for l, x, y in site_tuples})
[docs] @classmethod def from_file(cls, path: str, file_format: str = 'lpjg_annual', grid_type: str = 'lonlat', source: str | None = None, reader_params: dict[str, Any] | None = None, grid_params: dict[str, Any] | None = None) -> Self: """Construct a Field object from an LPJ-GUESS output file. Parameters ---------- path : str Path to file(s) file_format : str One of the registered file formats. See file readers documentation for details. For LPJ-GUESS standard output: - 'lpjg_annual' (annual output) - 'lpjg_monthly' (monthly output) The default format is 'lpjg_annual'. grid_type : str The type of grid associated to the data. source : str The source to retrieve the file's metadata. The format for this argument is 'source:field'. For example, to read an LPJ-GUESS file and add metadata corresponding to Annual GPP: agpp = Field.from_file('/path/to/file/file_name.out', file_format='lpjg_annual', source='lpjguess:agpp') reader_params : dict[str,Any] A dictionary of parameters passed to the file reader as keyword arguments. See the documentation of the different file readers for details. grid_params : dict[str,Any] A dictionary of parameters passed to the Grid.from_frame class method as keyword arguments. See the Grid object documentation. Returns ------- A Field object. """ # Read file or source and get pandas DataFrame if reader_params is None: reader_params = {} df = get_reader(file_format)(path, **reader_params) format_desc = get_format_description(file_format) # Construct grid from pandas DataFrame if grid_params is None: grid_params = {} grid = get_grid(grid_type).from_frame(df, **grid_params) field = cls(df, grid, source=source) field.log(f"Data read from {path}") field.add_md('file format', format_desc) field.add_md('original file', path) return field
[docs] def convert_units(self, factor: SupportsFloat, units: str, inplace: bool = False) -> Union[None, 'Field']: """Convert the field's units by a multiplicative factor Parameters ---------- factor : SupportsFloat Scalar by which all values in the field's data are multiplied units : str The new units to be set in the metadata 'units' entry. inplace : bool = False If True, the reduction is performed on the current Field. Notes ----- This function does not perform any checks on whether the passed factor or units string make sense! It just trusts that the user knows what they are doing. """ try: df = self._data*float(factor) except ValueError: raise TypeError("'factor' must be of numeric type, or convertible to 'float'.") log_message = f'Units changed to {units} (factor: {factor}).' if inplace: self._data = df self.set_md('units', units) self._modified = True self.log(log_message) return None else: grid = copy.deepcopy(self.grid) field = Field(df, grid, modified=True) field.copy_history(self) field.set_md('units', units) field.log(log_message) return field
[docs] def rename_layers(self, new_names: dict[str, str]) -> None: """Rename the field's layers Parameters ---------- new_names : dict A dictionary mapping existing layer names to their new names. """ self.data.rename(columns=new_names, inplace=True) self.log(f"Layers renamed: {new_names}")
# Metadata management methods # ---------------------------
[docs] def add_md(self, key: str, value: str) -> None: """Add an entry to the metadata dictionary. Parameters ---------- key : str The key under which the value is stored. value : str | float | int The value of the metadata entry. Example ------- # Load field anpp = Field.from_file("/path/to/file/anpp.out") # Add the GCM used to force the simulation to the metadata field.add_md('gcm', 'MPIESM2.1') Notes ----- If the key already exists in the metadata dictionary, a KeyError will be raised. In order to overwrite an entry, use the set_md method. """ if key in self._metadata: raise KeyError(f"Key {key} already exists (use set_md method to replace it)") else: self._metadata[key] = str(value)
[docs] def set_md(self, key: str, value: str) -> None: """Add or replace an entry in the metadata dictionary. Parameters ---------- key : str The key under which the value is stored. value : str | float | int The value of the metadata entry. Example ------- # Load field anpp = Field.from_file("/path/to/file/anpp.out") # Fails because 'name' is set in the constructor field.add_md('name', 'NPP') # KeyError field.set_md('name', 'NPP') # Okay """ self._metadata[key] = str(value)
[docs] def copy_md(self, field: 'Field') -> None: """Replace the metadata with a copy of the passed Field's. Parameters ---------- field : Field The field whose metadata is to be copied. """ self._metadata = copy.deepcopy(field._metadata)
[docs] def log(self, entries: str | list[str]) -> None: """File one or more entries in the Field's history log. The function files the passed entry in the history log and adds a timestamp. Parameters ---------- entries : str | list[str] The entry or list of entries to log. If a list is provided, the same timestamp will be attached to all entries. """ if isinstance(entries, str): entries = [entries] dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S") for entry in entries: self._history.append(f"[{len(self.history) + 1}] {dt}: {entry}")
[docs] def copy_history(self, field: 'Field') -> None: """Replace history with a copy of the history of the passed Field. Parameters ---------- field : Field The field whose history is to be copied. """ self._history = copy.deepcopy(field.history)
# Data selection methods # ----------------------
[docs] def select_slice(self, slices: dict[str, tuple], inplace: bool = False) -> Union[None,'Field']: """Select a time or spatial slice from the Field. Parameters ---------- slices : dict[str, tuple] A dictionary of axis slices where the keys are the axis name and the values tuples representing the slice. inplace : bool = False If True, the slicing is performed in place. Returns ------- If inplace is True, the slicing is performed in place and the method returns None. Otherwise the method returns a Field object containing the sliced data. """ df, grid, log_message = frameops.select_slice(self.data, self.grid, slices) if inplace: index: pd.MultiIndex = cast(pd.MultiIndex, df.index) # Attribute _lexsort_depth is not to be used directly; but Pandas will raise an unsorted index error # after slicing, even if is_monotonic_increasing returns True, because _lexsort_depth is incorrectly set (I think) # Mypy doesn't register this attribute of pd.MultiIndex -> ignore if index._lexsort_depth < len(index.names): # type: ignore df = df.sort_index(level=0) self._data = df self._grid = grid self._modified = True self.log(log_message) return None else: field = Field(df, grid, modified = True) field.copy_history(self) field.copy_md(self) field.log(log_message) return field
[docs] def select_sites(self, site_labels: str | Sequence[str], inplace: bool = False) -> Union[None,'Field']: if not isinstance(self.grid, GridSites): raise ValueError("Site selection by label is only allowed for unstructured grids.") if self.data.index.names[0] != 'label': raise ValueError("DataFrame's index has no 'label' level.") if isinstance(site_labels, str): site_labels = [site_labels] if not all(isinstance(label, str) for label in site_labels): raise ValueError("Site labels must be of string type.") index_labels: pd.Index = cast(pd.Index, site_labels) df = self.data.loc[index_labels] log_message = f"Selected sites: {site_labels}." if inplace: self._data = df self.log(log_message) self._modified = True return None else: grid = GridSites() field = Field(df, grid, modified = True) field.copy_history(self) field.copy_md(self) field.log(log_message) return field
[docs] def drop_sites(self, site_labels: str | Sequence[str], inplace: bool = False) -> Union[None,'Field']: if not isinstance(self.grid, GridSites): raise ValueError("Site selection by label is only allowed for unstructured grids.") if self.data.index.names[0] != 'label': raise ValueError("DataFrame's index has no 'label' level.") if isinstance(site_labels, str): site_labels = [site_labels] if not all(isinstance(label, str) for label in site_labels): raise ValueError("Site labels must be of string type.") df = self.data.drop(site_labels) log_message = f"Dropped sites: {site_labels}." if inplace: self._data = df self.log(log_message) self._modified = True return None else: grid = GridSites() field = Field(df, grid, modified = True) field.copy_history(self) field.copy_md(self) field.log(log_message) return field
[docs] def sample_gridcells(self, size: int, inplace: bool = False): """Select random sample of gridcells Parameters ---------- size : int Size of the sample inplace : bool = False If True, the selection is performed in place Returns ------- A new field with a subset of 'size' randomly chosen gridcells or None if the operation is inplace. """ if len(self.data.index) == 1: raise ValueError("Field has no spatial levels.") coords = self.coordinates if len(coords) <= size: raise ValueError("The number of gridcells is <= than the requested sample size.") coords = random.sample(coords, size) # Using janitor's select function. Mypy thinks it's a pandas Series and will complain that it's not callable. # Possibly a bug in janitor's type hints? df = self.data.select(index=coords) # type: ignore grid = self.grid.crop(df) log_message = f"Selected random sample of {size} gridcells." if inplace: index: pd.MultiIndex = cast(pd.MultiIndex, df.index) # Attribute _lexsort_depth is not to be used directly; but Pandas will raise an unsorted index error # after slicing, even if is_monotonic_increasing returns True, because _lexsort_depth is incorrectly set (I think) # Mypy doesn't register this attribute of pd.MultiIndex -> ignore if index._lexsort_depth < len(index.names): # type: ignore df = df.sort_index(level=0) self._data = df self._grid = grid self.log(log_message) self._modified = True return None else: field = Field(df, grid, modified = True) field.copy_history(self) field.copy_md(self) field.log(log_message) return field
[docs] def select_region( self, region: Union[str, Sequence[str]], region_type: str = "country", inplace: bool = False, ) -> Union[None, 'Field']: """Select grid cells whose lon/lat fall inside a named geographical region. Parameters ---------- region : str | Sequence[str] One or more region identifiers understood by the chosen region set. For example: names for countries, or region names for Giorgi/SREX/AR6. Pass a list (or other sequence) to keep grid cells that fall inside any of the listed regions (logical OR). region_type : str = "country" Which predefined region set to use: - "country": natural_earth_v5_0_0.countries_10 (https://www.naturalearthdata.com/downloads/10m-cultural-vectors/10m-admin-0-countries/) - "giorgi": Giorgi regions (https://link.springer.com/article/10.1007/PL00013733) - "SREX"/"srex": IPCC SREX regions (https://www.ipcc-data.org/guidelines/pages/ar5_regions.html) - "AR6"/"ar6": IPCC AR6 regions (https://github.com/IPCC-WG1/Atlas/tree/devel/reference-regions) inplace : bool = False If True, the selection is performed in place. Returns ------- If inplace is True, the selection is performed in place and the method returns None. Otherwise the method returns a Field object containing only rows whose coordinates are inside the region. """ # Checks first if self.grid.grid_type != 'lonlat': raise ValueError("Region selection is only supported for lonlat grids.") # Select which region set to use based on region_type match region_type.lower(): case "country": regionmask_type = regionmask.defined_regions.natural_earth_v5_0_0.countries_10 case "giorgi": regionmask_type = regionmask.defined_regions.giorgi case "srex": regionmask_type = regionmask.defined_regions.srex case "ar6": regionmask_type = regionmask.defined_regions.ar6.all case _: raise ValueError("Unsupported region_type; expected one of {'country','giorgi','SREX','AR6'}.") if not region: raise ValueError("region must be a non-empty string or a non-empty sequence of strings.") region_names: List[str] = [region] if isinstance(region, str) else list(region) region_keys: List[Any] = [] for name in region_names: try: region_keys.append(regionmask_type.map_keys(name)) except KeyError: raise ValueError(f"Unknown region '{name}' for region_type '{region_type}'.") grid_lonlat = cast(GridLonLat, self.grid) # region_idx represents the union of the regions with names region_keys region_idx = regionmask_type[region_keys] region_mask = region_idx.mask(grid_lonlat.lons, grid_lonlat.lats) mask_values = region_mask.values lons = self.data.index.get_level_values("lon").to_numpy() lats = self.data.index.get_level_values("lat").to_numpy() lat_idx = np.clip( np.searchsorted(region_mask.lat.values, lats), 0, mask_values.shape[0] - 1 ) lon_idx = np.clip( np.searchsorted(region_mask.lon.values, lons), 0, mask_values.shape[1] - 1 ) # Inside at least one region → finite index; outside all listed regions → NaN. is_in_region = np.isfinite(mask_values[lat_idx, lon_idx]) # Apply the boolean mask to keep only rows inside the region(s) df = self.data.loc[is_in_region].copy() # Crop grid to match filtered data grid = self.grid.crop(df) region_md: str = ( region_names[0] if len(region_names) == 1 else str(region_names) ) if len(region_names) == 1: log_message = f"Selected region: '{region_names[0]}' (region_type='{region_type}')." else: log_message = ( f"Selected regions: {region_names} (combined, region_type='{region_type}')." ) if inplace: index: pd.MultiIndex = cast(pd.MultiIndex, df.index) if index._lexsort_depth < len(index.names): # type: ignore df = df.sort_index(level=0) self._data = df self._grid = grid self._modified = True self.set_md("region", region_md) self.set_md("region_type", region_type) self.log(log_message) return None else: field = Field(df, grid, modified=True) field.copy_history(self) field.copy_md(self) field.set_md("region", region_md) field.set_md("region_type", region_type) field.log(log_message) return field
[docs] def select_layers(self, layers: str | Sequence[str] | pd.Index, inplace: bool = False) -> Union[None,'Field']: """Select one or more layers from the Field. Parameters ---------- layers: str | Sequence[str] Layer name or list of layer names. inplace : bool = False If True, the selection is performed in place. Returns ------- If inplace is True, the selection is performed in place and the method returns None. Otherwise the method returns a Field object containing the selected data. """ if isinstance(layers, str): layers = [layers] log_message=f"Selected layers {list(layers)}." if inplace: self._data = self._data[layers] self._modified = True self.log(log_message) return None else: df = self._data[layers].copy() grid = copy.deepcopy(self._grid) field = Field(df, grid, modified=True) field.copy_history(self) field.copy_md(self) field.log(log_message) return field
[docs] def drop_layers(self, layers: str | Sequence[str] | pd.Index, inplace: bool = False) -> Union[None,'Field']: """Drop one or more layers from the Field. Parameters ---------- layers : str | list Layer name or list of layer names. inplace : bool = False If True, the layers are dropped from the current object. Returns ------- If inplace is True, the layers are dropped from the current Field and the method returns None. Otherwise the method returns a Field object with the selected data. """ if isinstance(layers, str): layers = [layers] log_message=f"Dropped layers {list(layers)}." if inplace: self._data.drop(layers, axis=1, inplace=True) self._modified = True self.log(log_message) return None else: df = self._data.drop(layers, axis=1) grid = copy.deepcopy(self._grid) field = Field(df, grid, modified=True) field.copy_history(self) field.log(log_message) return field
[docs] def filter(self, query: str, fill_nan: bool = False, inplace: bool = False) -> Union[None, 'Field']: '''Filter rows based on boolean query Parameters ---------- query: str The string describing the boolean query in terms of the index or layers fill_nan: bool If False, rows where the query is False are removed (default behaviour). If True, the resulting field's data has NaNs where the query is false. inplace: bool Whether to perform the operation in place Returns ------- If inplace is True, the filtering is performed on the current Field and the method returns None. Otherwise the method returns a Field object with the reduced data. Example -------- # Load field aaet = Field.from_file("/path/to/file/aaet.out") print(aaet.layers) # [..., 'Total'] # Filter rows for which layer 'Total' is greater than 100 aaet1 = aaet.filter('Total > 100') # Filter rows for which layer 'Total' is greater than 100 and layer 'C3G' is lower than 10 (inplace) aaet.filter('Total > 100 and C3G < 10', inplace=True) Notes ----- See pandas documentation for DataFrame.query() for more details on the query string. ''' if fill_nan: index = self.data.query(f'not ({query})').index if inplace: self.data.loc[index,:] = np.nan df = self.data self._modified = True else: df = self.data.copy() df.loc[index,:] = np.nan else: if inplace: self.data.query(query, inplace=True) df = self.data self._modified = True else: df = self.data.query(query) log_message = f"Filter data: query = '{query}'{', fill with NaNs.' if fill_nan else '.'}" if df.empty and not fill_nan: grid: Grid = GridEmpty() log_message += ' Filter operation yielded empty field.' else: grid = self.grid.crop(df) if inplace: self._data = df self._grid = grid self._modified = True self.log(log_message) return None else: field = Field(df, grid, modified=True) field.copy_history(self) field.copy_md(self) field.log(log_message) return field
# Data reduction methods # ----------------------
[docs] def reduce_layers(self, redop: str, layers: Optional[Sequence[str] | pd.Index] = None, name: Optional[str] = None, drop: bool = False, inplace: bool = False) -> Union[None,'Field']: """Perform a reduction operation on the Field's layers. Parameters ---------- redop : str The reduction operation. One of 'sum', 'av', 'maxLay', '/'. layers : None or a list of strings. List of names of layers to be reduced. If None, all layers are reduced. name : None or str. Name of the new layer to store the reduction. If None, the redop argument is used. drop : bool If True, the reduced layers are dropped from the data. inplace : bool If True, the layers are reduced in the current object. Returns ------- If inplace is True, the layers are reduced in the current Field and the method returns None. Otherwise the method returns a Field object with the reduced data. """ if layers is None: layers = self._data.columns # Copy avoids warning (error) df_red = self._data.copy() if name is None: name = f"{redop}" if redop == 'sum': df_red[name] = df_red[layers].sum(axis=1) if redop == 'av': df_red[name] = df_red[layers].mean(axis=1) if redop == 'maxLay': df_red[name] = df_red[layers].idxmax(axis=1) if redop == '/': if not len(layers) == 2: raise ValueError(f"Operation '{redop}' requires exactly two operands") df_red[name] = df_red[layers[0]] / df_red[layers[1]] if drop: layers_to_drop = [l for l in layers if l != name] df_red.drop(layers_to_drop, axis=1, inplace=True) log_message=f"Layer reduction operation '{redop}' applied to layers {list(layers)}, stored as layer '{name}'.{' Original layers were dropped.' if drop else ''}" if inplace: self._data = df_red self._modified = True self.log(log_message) return None else: grid = copy.deepcopy(self._grid) field = Field(df_red, grid, modified=True) field.copy_history(self) field.log(log_message) return field
[docs] def reduce_time(self, timeop: str, freq: str | None = None, inplace: bool = False) -> Union[None,'Field']: """Perform a reduction operation on the time axis. Parameters ---------- timeop : str The time reduction operation: one of 'av', 'sum'. freq : str | None = None A string specifying the frequency of the reduction. This is formed by an integer number and one of 'M', 'Y'. For example, to perform an average every five years, specify timeop='av' and freq='5Y'. If freq=None the whole time series is reduced. inplace : bool = False If True, the reduction is performed on the current Field. Returns ------- If inplace is True, the reduction is performed on the current Field and the method returns None. Otherwise the method returns a Field object with the reduced data. """ df_red, log_message = frameops.reduce_time(self._data, timeop, freq) if inplace: self._data = df_red self._timeop = timeop self._modified = True self.log(log_message) return None else: field_red = Field(df_red, copy.deepcopy(self._grid), modified=True) field_red._timeop = timeop field_red.copy_history(self) field_red.log(log_message) return field_red
[docs] def reduce_grid(self, gridop: str, axis: str = 'both', inplace: bool = False) -> Union[None,'Field']: """Perform a reduction operation on the spatial axes. Parameters ---------- gridop : str The spatial reduction operation: one of 'av', 'sum'. axis : str = 'both' A grid axis or 'both' (the default value). inplace : bool = False If True, the reduction is performed on the current Field. Returns ------- If inplace is True, the reduction is performed on the current Field and the method returns None. Otherwise the method returns a Field object with the reduced data. """ df_red, grid_red, log_message = frameops.reduce_grid(self.data, self.grid, gridop, axis) if inplace: self._data = df_red self._grid = grid_red self._modified = True self.log(log_message) return None else: field_red = Field(df_red, grid_red, modified=True) field_red.copy_history(self) field_red.log(log_message) return field_red
[docs] def reduce(self, redspec: RedSpec, inplace: bool = False) -> Union[None,'Field']: """Perform the selection/slicing/reduction operations specified in te passed RedSpec object. Parameters ---------- redspec : RedSpec A RedSpec object specifying how to slice and/or reduce the data. inplace : bool = False If True, the reduction is performed on the current Field. Returns ------- If inplace is True, the reduction is performed on the current Field and the method returns None. Otherwise the method returns a Field object with the reduced data. """ df_red, grid_red, log_message = frameops.apply_reduction(self._data, self._grid, redspec) time_index = cast(pd.PeriodIndex, df_red.index.get_level_values('time')) if inplace: self._data = df_red self._grid = grid_red self._modified = True self.log(log_message) if redspec.timeop is not None: self._timeop = redspec.timeop setattr(self, '_time_freq', str(time_index.freqstr)) return None else: field_red = Field(df_red, grid_red, modified=True) field_red.copy_history(self) field_red.log(log_message) return field_red
# Arithmethic operations # ----------------------
[docs] def apply(self, op: str | Callable, operand: SupportsFloat | list[str] | None = None, layers: str | list[str] | None = None, how: Literal['left', 'right'] = 'left', inplace: bool = False) -> Union[None,'Field']: """Apply an operation/function to selected layers Parameters ---------- op : str | Callable Operation to apply to layers. If a callable is passed, it must be numpy-vectorizable. operand : str | list[str] | None Operand to combine with layers through 'op'. Operand can be a constant number or the name of a layer. In the latter case, the operation will be performed element-wise. If 'op' is a Callable, this parameter is ignored. layers : str | list[str] | None A list of the names of the layers to apply the operation to. If None, the operation is applied to all layers. how : str = 'left' Position of the layers in the operation. This argument is relevant for non-commutative operations. For example, if how == 'left', a '-' operation will be layers - operand. If how = 'right', the operation will be operand - layers. inplace : bool Whether to perform the operation in place. Returns ------- A field with the modified layers, or None if the operation is performed in place. """ commutative_ops = { '+': frameops.apply_sum, 'sum': frameops.apply_sum, '*': frameops.apply_mul, 'mul': frameops.apply_mul, } non_commutative_ops = { '-': frameops.apply_sub, 'sub': frameops.apply_sub, '/': frameops.apply_div, 'div': frameops.apply_div, } if layers is None: layers = self.layers elif isinstance(layers, str): layers = [layers] if isinstance(operand, str): operand = [operand] if (op in commutative_ops or op in non_commutative_ops) and operand is None: raise ValueError(f"Operation {op} requires an operand") if inplace: df = self.data else: df = self.data.copy() if callable(op): frameops.apply_function(df, layers, op) elif op in commutative_ops: # Ignoring typing; at this point it's alredy known that operand is not None, but mypy's narrowing doesn't catch it commutative_ops[op](df, operand, layers) # type: ignore elif op in non_commutative_ops: # Ignoring typing; at this point it's alredy known that operand is not None, but mypy's narrowing doesn't catch it non_commutative_ops[op](df, operand, layers, how) # type: ignore else: raise ValueError(f"Operation must be a callable or one of {list(commutative_ops.keys()) + list(non_commutative_ops.keys())}.") log_message = f"Applied operation '{op}' to layers {layers}." if not inplace: grid = copy.deepcopy(self.grid) field = Field(df, grid, modified=True) field.copy_history(self) field.log(log_message) return field else: self._modified = True self.log(log_message) return None
def _check_arithm_operands(self, other: Union['Field', SupportsFloat]) -> None: if isinstance(other, Field): if self.grid.grid_type != other.grid.grid_type: raise ValueError("Field grids are of different type.") frameops.check_indices_match(self.data, other.data) else: try: other = float(other) except: raise ValueError("Operand must be a scalar or another field.") def _binary_operation_log_message( self, other: Union['Field', SupportsFloat], operation: str, how: Literal['left', 'right'], ) -> str: actions = { '+': 'added to', '-': 'subtracted from', '*': 'multiplied by', '/': 'divided by', } if operation in ['+', '-']: if isinstance(other, SupportsFloat): if how == 'left': log_message = f"Scalar {float(other)} was {actions[operation]} field layers." else: log_message = f"Field layers were {actions[operation]} scalar {float(other)}." else: if len(other.layers) == 1: log_message = f"Layer '{other.layers[0]}' from field '{other.name}' was {actions[operation]} all layers." elif len(self.layers) == 1: log_message = f"Layers {other.layers} from field '{other.name}' were {actions[operation]} layer '{self.layers[0]}', one layer at a time." else: log_message = f"Layers {other.layers} from field '{other.name}' were {actions[operation]} field layer-wise." elif operation in ['*', '/']: if isinstance(other, SupportsFloat): if how == 'left': log_message = f"Field layers were {actions[operation]} scalar {float(other)}." else: log_message = f"Scalar {float(other)} was {actions[operation]} field layers." else: if len(other.layers) == 1: log_message = f"Field layers were {actions[operation]} layer '{other.layers[0]}' from field '{other.name}'." elif len(self.layers) == 1: log_message = f"Layer '{self.layers[0]}' was {actions[operation]} layers {other.layers} from field '{other.name}', one layer at a time." else: log_message = f"Field was {actions[operation]} layers {other.layers} from field '{other.name}' layer-wise." return log_message def _binary_operation_new_name(self, other: Union['Field', SupportsFloat], operation: str, how: Literal['left', 'right']) -> str: if isinstance(other, Field): if len(other.layers) == 1: self_name = self.name other_name = f"{other.name}['{other.layers[0]}']" elif len(self.layers) == 1: self_name = f"{self.name}['{self.layers[0]}']" other_name = other.name else: self_name = self.name other_name = other.name else: self_name = self.name other_name = str(float(other)) if how == 'left': new_name = f"{self_name} {operation} {other_name}" else: new_name = f"{other_name} {operation} {self_name}" return new_name def _binary_operation_layers(self, other: Union['Field', SupportsFloat], operation: str, how: Literal['left', 'right']) -> List[str]: if isinstance(other, Field): if len(other.layers) == 1: layers = [ f"({l} {operation} {other.layers[0]})" for l in self.layers ] elif len(self.layers) == 1: layers = [ f"({self.layers[0]} {operation} {l})" for l in other.layers ] else: layers = [ f"({ls} {operation} {lo})" for ls, lo in zip(self.layers, other.layers) ] else: layers = [ f"({l} {operation} {float(other)})" for l in self.layers ] return layers def _binary_operation(self, other: Union['Field', SupportsFloat], operation: str, inplace: bool = False, how: Literal['left', 'right'] = 'left' ) -> 'Field': self._check_arithm_operands(other) if how == 'left': if isinstance(other, Field): df = arithm.binary_operation(self.data, other.data, operation) else: df = arithm.binary_operation(self.data, float(other), operation) elif how == 'right': # This cannot happen if other is a Field (it's the __radd__, __rsub__, ... case) # But mypy doesn't know that so we need to ignore type checking df = arithm.binary_operation(float(other), self.data, operation) # type: ignore else: raise ValueError("Argument 'how' can only take the values 'left' or 'right'.") df.columns = cast(pd.Index, self._binary_operation_layers(other, operation, how)) log_message = self._binary_operation_log_message(other, operation, how) new_name = self._binary_operation_new_name(other, operation, how) if inplace: self._data = df self.set_md('name', new_name) self.set_md('units', '[no units]') self.set_md('description', '[no description]') self.log(log_message) return self else: grid = copy.deepcopy(self.grid) field = Field(df, grid) field.copy_history(self) field.set_md('name', new_name) field.log(log_message) return field def __add__(self, other: Union['Field', SupportsFloat]) -> 'Field': return self._binary_operation(other, '+') def __sub__(self, other: Union['Field', SupportsFloat]) -> 'Field': return self._binary_operation(other, '-') def __mul__(self, other: Union['Field', SupportsFloat]) -> 'Field': return self._binary_operation(other, '*') def __truediv__(self, other: Union['Field', SupportsFloat]) -> 'Field': return self._binary_operation(other, '/') def __radd__(self, other: SupportsFloat) -> 'Field': return self._binary_operation(other, '+', how='right') def __rsub__(self, other: SupportsFloat) -> 'Field': return self._binary_operation(other, '-', how='right') def __rmul__(self, other: SupportsFloat) -> 'Field': return self._binary_operation(other, '*', how='right') def __rtruediv__(self, other: SupportsFloat) -> 'Field': return self._binary_operation(other, '/', how='right') def __iadd__(self, other: Union['Field', SupportsFloat]) -> 'Field': return self._binary_operation(other, '+', inplace=True) def __isub__(self, other: Union['Field', SupportsFloat]) -> 'Field': return self._binary_operation(other, '-', inplace=True) def __imul__(self, other: Union['Field', SupportsFloat]) -> 'Field': return self._binary_operation(other, '*', inplace=True) def __itruediv__(self, other: Union['Field', SupportsFloat]) -> 'Field': return self._binary_operation(other, '/', inplace=True) # Representation methods # ---------------------- def __str__(self) -> str: history_str = '\n'.join(self.history) if self.data.empty: repr_str = [ f"Field is empty!", "", f"History", f"-------", history_str, ] return '\n'.join(repr_str) metadata_str = '\n'.join([f"{k}: {v}" for k, v in self.metadata.items()]) grid_type = get_grid_type(self._grid) time_index = self.data.index.get_level_values('time') time_start = time_index.min().start_time time_end = time_index.max().end_time time_str = [f"Span: {time_start} - {time_end}", f"Frequency: {self.time_freq}", ] if self._timeop is not None: time_str.append(f"Reduction: '{self._timeop}'") repr_str = [ f"Data", f"----", metadata_str, "", f"Grid", f"----", f"{self._grid}", "\n" f"Time series", f"-----------", '\n'.join(time_str), "\n" f"History", f"-------", history_str, ] return "\n".join(repr_str) def __repr__(self) -> str: return self.__str__() # Output # ------
[docs] def to_xarray(self): raise NotImplementedError
#return self._data.to_xarray().sortby('lon').sortby('lat').sortby('time')
[docs] def to_netcdf(self, fname: Optional[str] = None): raise NotImplementedError
# TODO: This doesn't work w/ PeriodIndex! #data_xr = to_xarray(self._data) #data_xr.to_netcdf(fname)
[docs] def to_csv(self, fname: str): raise NotImplementedError
#self._data.to_csv(fname)