import copy
import pandas as pd
from pandas.api.types import is_numeric_dtype, is_string_dtype
import xarray as xr
import numpy as np
import random
from datetime import datetime
from types import MappingProxyType
from typing import List, Optional, Union, Sequence, Callable, Literal, Any, SupportsFloat, cast
from typing_extensions import Self
from canopy.readers.registry import get_reader, get_format_description
from canopy.core.grid import get_grid, get_grid_type
from canopy.core.grid.grid_abc import Grid
from canopy.core.grid.grid_empty import GridEmpty
from canopy.core.grid.grid_lonlat import GridLonLat
from canopy.core.grid.grid_sites import GridSites
from canopy.core.redspec import RedSpec
from canopy.source_data import get_source_data
import regionmask # type: ignore
import canopy.core.frameops as frameops
import canopy.core.arithm as arithm
import janitor # type: ignore
def _validate(data: pd.DataFrame, grid: Grid):
if not isinstance(data, pd.DataFrame):
raise ValueError("'data' must be a pandas.DataFrame.")
if 'time' not in data.index.names:
raise ValueError("The DataFrame index must have at least a 'time' level.")
if len(data.index.names) > 1 and data.index.names[-1] != 'time':
raise ValueError("'time' must be the last level of the DataFrame's index.")
if len(data.index.names) == 1 and not isinstance(data.index, pd.PeriodIndex) \
or isinstance(data.index, pd.MultiIndex) and not isinstance(data.index.dtypes['time'], pd.PeriodDtype):
raise ValueError("'time' level must be of pandas PeriodIndex type")
if not is_string_dtype(data.columns):
raise ValueError("The DataFrame column names (i.e. the Field's layer names) must be of string type.")
# The spatial coordinates must be double precision floats
spatial_levels = [x for x in data.index.names if x not in ['label', 'time']]
for level in spatial_levels:
index: pd.MultiIndex = cast(pd.MultiIndex, data.index)
if index.dtypes[level] != 'float64':
raise ValueError(f"Spatial level '{level}' must be a double precision floating point number.")
if not isinstance(grid, Grid):
raise ValueError("'grid' must be an instance of a Grid subclass.")
# This is a special case that concerns all grid types, so it is checked here
if 'label' in data.index.names:
if not isinstance(grid, GridSites) and not isinstance(grid, GridEmpty):
raise ValueError("A 'label' index is allowed only in combination with a grid of type 'sites' (or an empty grid).")
if data.index.names[0] != 'label':
raise ValueError("'label' must be the first level of the DataFrame's index.")
# This is the same for all grids (although in some of the grids' validate_frame method it might be checked too)
if not isinstance(grid, GridEmpty):
for axis_key in grid.axis_names:
if axis_key not in data.index.names and not grid.is_reduced(axis_key):
raise ValueError(f"{axis_key} is not a reduced dimension, but it is also not present in the DataFrame's index.")
# Coordinates must be float
for axis_key in grid.axis_names:
index = cast(pd.MultiIndex, data.index)
if axis_key in data.index.names and not is_numeric_dtype(index.dtypes[axis_key]):
raise ValueError(f"Spatial coordinates in axis '{axis_key}' must be of numeric type")
# Check if dataframe and grid are compatible
if not grid.validate_frame(data):
raise ValueError("Supplied dataframe and grid are not compatible. Activate all warnings (`import warnings; warnings.filterwarnings('always', category=Warning)`) for details.")
[docs]
class Field:
"""Container for data derived from model output or observations
This object contains model output or observation data and associated
grid information. It allows for basic data manipulation, such as
time- and spatial reductions and slicing. The Field object is canopy's
elemental interface between data and user.
"""
def __init__(self, data: pd.DataFrame, grid: Grid, modified: bool = False, source: str | None = None) -> None:
"""
Parameters
----------
data : pd.DataFrame
Pandas DataFrame containing the Field's data (see specification below).
grid : Grid
A canopy Grid object (see Grid object documentation).
modified : bool
Whether the field being created has been modified (e.g. reduced).
source : str
The source to retrieve the file's metadata. The format for this argument is 'source:field'. For example, to read
an LPJ-GUESS file and add metadata corresponding to Annual GPP:
agpp = Field.from_file('/path/to/file/file_name.out', file_format='lpjg_annual', source='lpjguess:agpp')
"""
_validate(data, grid)
self._data = data
self._grid: Grid = grid
self._modified: bool = modified
if source is None:
self._metadata: dict[str, str] = {
'name': '[no name]',
'description': '[no description]',
'units': '[no units]',
}
else:
src, fld = source.split(':')
source_data = get_source_data(src)
self._metadata = {
'name': source_data['fields'][fld]['name'],
'description': source_data['fields'][fld]['description'],
'units': source_data['fields'][fld]['units'],
}
self._history: list[str] = []
self._timeop: str | None = None
time_index = cast(pd.PeriodIndex, data.index.get_level_values('time'))
self._time_freq: str = str(time_index.freqstr)
# Order MultiIndex, necessary for slicing
# Attribute _lexsort_depth is not to be used directly; but Pandas will raise an unsorted index error
# after slicing, even if is_monotonic_increasing returns True, because _lexsort_depth is incorrectly set (I think)
# Mypy doesn't register this attribute of pd.MultiIndex -> ignore
if isinstance(self._data.index, pd.MultiIndex):
if self._data.index._lexsort_depth < len(self._data.index.names): # type: ignore
self._data = self._data.sort_index(level=0)
def __getitem__(self, layers):
"""
Invoke `self.select_layers` with indexing notation
"""
return self.select_layers(layers)
def __setitem__(self, layers: str | Sequence[str], value: Union[SupportsFloat, 'Field']) -> None:
"""
Set data values with indexing notation
Parameters
-----------
layers: str | Sequence[str]
Layer or list of layers where the data is set
value: int | float | Field
If value is a number, all data values in the selected layers are set to that number. If a
Field is passed, it is checked that the indices match exactly before copying the supplied
Field's data onto the selected layers.
"""
if isinstance(layers, str):
layers = [layers]
if isinstance(value, SupportsFloat):
self.data[layers] = float(value)
what = f'the scalar {value}'
elif isinstance(value, Field):
frameops.check_indices_match(self.data, value.data)
self.data[layers] = value.data.copy()
what = f'layers {value.layers} from field: {value.name}.'
else:
raise ValueError("Can only assign another Field or a scalar.")
self._modified = True
log_message = f'Layers {list(layers)} were assigned {what}.'
self.log(log_message)
@property
def data(self) -> pd.DataFrame:
"""pd.DataFrame: The Field's data"""
return self._data
@property
def grid(self) -> Grid:
"""Grid: The grid associated with the data"""
return self._grid
@property
def modified(self) -> bool:
"""bool: True if the Field has been modified after loading."""
return self._modified
@property
def metadata(self) -> MappingProxyType[str, str]:
"""dict: Field's metadata (units, etc...)"""
return MappingProxyType(self._metadata)
@property
def name(self) -> str:
return self.metadata['name']
@property
def units(self) -> str:
return self.metadata['units']
@property
def description(self) -> str:
return self.metadata['description']
@property
def history(self) -> list[str]:
"""list: Keeps the history of modifications of the Field"""
return self._history
@property
def timeop(self) -> str | None:
"""str: returns the reduction operation applied to the time dimension (if any)."""
return self._timeop
@property
def time_freq(self) -> str:
"""str: returns the sampling frequency of the time axis."""
return self._time_freq
@property
def layers(self) -> list[str]:
"""str: returns a list with the Field's layer names."""
return list(self.data.columns)
@property
def coordinates(self) -> list:
"""Produce a list of spatial coordinates"""
if len(self.data.index.names) == 1:
raise ValueError("Field has only a 'time' level")
levels_to_drop = [x for x in self.data.index.names if x in ['label', 'time']]
return self.data.index.droplevel('time').unique().tolist()
@property
def sites(self) -> MappingProxyType[str,tuple[float,float]]:
"""
Get a dictionary of sites for unstructured grids
If the data has a 'label' level, the keys will be the labels. If not, the keys will be the coordinate pair
in string format: '(x_axis_name, y_axis_name)'. In both cases the values of the dictionary are the sites'
coordinates as a tuple: (x, y)
"""
if not isinstance(self.grid, GridSites):
raise ValueError("Site selection by label is only allowed for unstructured ('sites') grids.")
xax, yax = self.grid.axis_names
if self.grid.is_reduced(xax) or self.grid.is_reduced(yax):
raise ValueError("The grid is spatially reduced.")
# Non-reduced grid of type Sites can, per spec, have levels:
# [label, x, y, time]
# [x, y, time]
site_tuples = self.data.index.droplevel('time').unique()
if self.data.index.names[0] != 'label':
return MappingProxyType({f'({x}, {y})': (x, y) for x, y in site_tuples})
else:
return MappingProxyType({l: (x, y) for l, x, y in site_tuples})
[docs]
@classmethod
def from_file(cls,
path: str,
file_format: str = 'lpjg_annual',
grid_type: str = 'lonlat',
source: str | None = None,
reader_params: dict[str, Any] | None = None,
grid_params: dict[str, Any] | None = None) -> Self:
"""Construct a Field object from an LPJ-GUESS output file.
Parameters
----------
path : str
Path to file(s)
file_format : str
One of the registered file formats. See file readers documentation for details. For LPJ-GUESS standard output:
- 'lpjg_annual' (annual output)
- 'lpjg_monthly' (monthly output)
The default format is 'lpjg_annual'.
grid_type : str
The type of grid associated to the data.
source : str
The source to retrieve the file's metadata. The format for this argument is 'source:field'. For example,
to read an LPJ-GUESS file and add metadata corresponding to Annual GPP:
agpp = Field.from_file('/path/to/file/file_name.out', file_format='lpjg_annual', source='lpjguess:agpp')
reader_params : dict[str,Any]
A dictionary of parameters passed to the file reader as keyword arguments.
See the documentation of the different file readers for details.
grid_params : dict[str,Any]
A dictionary of parameters passed to the Grid.from_frame class method as keyword arguments.
See the Grid object documentation.
Returns
-------
A Field object.
"""
# Read file or source and get pandas DataFrame
if reader_params is None:
reader_params = {}
df = get_reader(file_format)(path, **reader_params)
format_desc = get_format_description(file_format)
# Construct grid from pandas DataFrame
if grid_params is None:
grid_params = {}
grid = get_grid(grid_type).from_frame(df, **grid_params)
field = cls(df, grid, source=source)
field.log(f"Data read from {path}")
field.add_md('file format', format_desc)
field.add_md('original file', path)
return field
[docs]
def convert_units(self, factor: SupportsFloat, units: str, inplace: bool = False) -> Union[None, 'Field']:
"""Convert the field's units by a multiplicative factor
Parameters
----------
factor : SupportsFloat
Scalar by which all values in the field's data are multiplied
units : str
The new units to be set in the metadata 'units' entry.
inplace : bool = False
If True, the reduction is performed on the current Field.
Notes
-----
This function does not perform any checks on whether the passed factor or
units string make sense! It just trusts that the user knows what they are doing.
"""
try:
df = self._data*float(factor)
except ValueError:
raise TypeError("'factor' must be of numeric type, or convertible to 'float'.")
log_message = f'Units changed to {units} (factor: {factor}).'
if inplace:
self._data = df
self.set_md('units', units)
self._modified = True
self.log(log_message)
return None
else:
grid = copy.deepcopy(self.grid)
field = Field(df, grid, modified=True)
field.copy_history(self)
field.set_md('units', units)
field.log(log_message)
return field
[docs]
def rename_layers(self, new_names: dict[str, str]) -> None:
"""Rename the field's layers
Parameters
----------
new_names : dict
A dictionary mapping existing layer names to their new names.
"""
self.data.rename(columns=new_names, inplace=True)
self.log(f"Layers renamed: {new_names}")
# Metadata management methods
# ---------------------------
[docs]
def add_md(self, key: str, value: str) -> None:
"""Add an entry to the metadata dictionary.
Parameters
----------
key : str
The key under which the value is stored.
value : str | float | int
The value of the metadata entry.
Example
-------
# Load field
anpp = Field.from_file("/path/to/file/anpp.out")
# Add the GCM used to force the simulation to the metadata
field.add_md('gcm', 'MPIESM2.1')
Notes
-----
If the key already exists in the metadata dictionary, a KeyError will be raised. In
order to overwrite an entry, use the set_md method.
"""
if key in self._metadata:
raise KeyError(f"Key {key} already exists (use set_md method to replace it)")
else:
self._metadata[key] = str(value)
[docs]
def set_md(self, key: str, value: str) -> None:
"""Add or replace an entry in the metadata dictionary.
Parameters
----------
key : str
The key under which the value is stored.
value : str | float | int
The value of the metadata entry.
Example
-------
# Load field
anpp = Field.from_file("/path/to/file/anpp.out")
# Fails because 'name' is set in the constructor
field.add_md('name', 'NPP') # KeyError
field.set_md('name', 'NPP') # Okay
"""
self._metadata[key] = str(value)
[docs]
def copy_md(self, field: 'Field') -> None:
"""Replace the metadata with a copy of the passed Field's.
Parameters
----------
field : Field
The field whose metadata is to be copied.
"""
self._metadata = copy.deepcopy(field._metadata)
[docs]
def log(self, entries: str | list[str]) -> None:
"""File one or more entries in the Field's history log.
The function files the passed entry in the history log and
adds a timestamp.
Parameters
----------
entries : str | list[str]
The entry or list of entries to log. If a list is provided,
the same timestamp will be attached to all entries.
"""
if isinstance(entries, str):
entries = [entries]
dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
for entry in entries:
self._history.append(f"[{len(self.history) + 1}] {dt}: {entry}")
[docs]
def copy_history(self, field: 'Field') -> None:
"""Replace history with a copy of the history of the passed Field.
Parameters
----------
field : Field
The field whose history is to be copied.
"""
self._history = copy.deepcopy(field.history)
# Data selection methods
# ----------------------
[docs]
def select_slice(self, slices: dict[str, tuple],
inplace: bool = False) -> Union[None,'Field']:
"""Select a time or spatial slice from the Field.
Parameters
----------
slices : dict[str, tuple]
A dictionary of axis slices where the keys are the axis name and the values tuples representing
the slice.
inplace : bool = False
If True, the slicing is performed in place.
Returns
-------
If inplace is True, the slicing is performed in place and the method returns None.
Otherwise the method returns a Field object containing the sliced data.
"""
df, grid, log_message = frameops.select_slice(self.data, self.grid, slices)
if inplace:
index: pd.MultiIndex = cast(pd.MultiIndex, df.index)
# Attribute _lexsort_depth is not to be used directly; but Pandas will raise an unsorted index error
# after slicing, even if is_monotonic_increasing returns True, because _lexsort_depth is incorrectly set (I think)
# Mypy doesn't register this attribute of pd.MultiIndex -> ignore
if index._lexsort_depth < len(index.names): # type: ignore
df = df.sort_index(level=0)
self._data = df
self._grid = grid
self._modified = True
self.log(log_message)
return None
else:
field = Field(df, grid, modified = True)
field.copy_history(self)
field.copy_md(self)
field.log(log_message)
return field
[docs]
def select_sites(self, site_labels: str | Sequence[str], inplace: bool = False) -> Union[None,'Field']:
if not isinstance(self.grid, GridSites):
raise ValueError("Site selection by label is only allowed for unstructured grids.")
if self.data.index.names[0] != 'label':
raise ValueError("DataFrame's index has no 'label' level.")
if isinstance(site_labels, str):
site_labels = [site_labels]
if not all(isinstance(label, str) for label in site_labels):
raise ValueError("Site labels must be of string type.")
index_labels: pd.Index = cast(pd.Index, site_labels)
df = self.data.loc[index_labels]
log_message = f"Selected sites: {site_labels}."
if inplace:
self._data = df
self.log(log_message)
self._modified = True
return None
else:
grid = GridSites()
field = Field(df, grid, modified = True)
field.copy_history(self)
field.copy_md(self)
field.log(log_message)
return field
[docs]
def drop_sites(self, site_labels: str | Sequence[str], inplace: bool = False) -> Union[None,'Field']:
if not isinstance(self.grid, GridSites):
raise ValueError("Site selection by label is only allowed for unstructured grids.")
if self.data.index.names[0] != 'label':
raise ValueError("DataFrame's index has no 'label' level.")
if isinstance(site_labels, str):
site_labels = [site_labels]
if not all(isinstance(label, str) for label in site_labels):
raise ValueError("Site labels must be of string type.")
df = self.data.drop(site_labels)
log_message = f"Dropped sites: {site_labels}."
if inplace:
self._data = df
self.log(log_message)
self._modified = True
return None
else:
grid = GridSites()
field = Field(df, grid, modified = True)
field.copy_history(self)
field.copy_md(self)
field.log(log_message)
return field
[docs]
def sample_gridcells(self, size: int, inplace: bool = False):
"""Select random sample of gridcells
Parameters
----------
size : int
Size of the sample
inplace : bool = False
If True, the selection is performed in place
Returns
-------
A new field with a subset of 'size' randomly chosen gridcells or None
if the operation is inplace.
"""
if len(self.data.index) == 1:
raise ValueError("Field has no spatial levels.")
coords = self.coordinates
if len(coords) <= size:
raise ValueError("The number of gridcells is <= than the requested sample size.")
coords = random.sample(coords, size)
# Using janitor's select function. Mypy thinks it's a pandas Series and will complain that it's not callable.
# Possibly a bug in janitor's type hints?
df = self.data.select(index=coords) # type: ignore
grid = self.grid.crop(df)
log_message = f"Selected random sample of {size} gridcells."
if inplace:
index: pd.MultiIndex = cast(pd.MultiIndex, df.index)
# Attribute _lexsort_depth is not to be used directly; but Pandas will raise an unsorted index error
# after slicing, even if is_monotonic_increasing returns True, because _lexsort_depth is incorrectly set (I think)
# Mypy doesn't register this attribute of pd.MultiIndex -> ignore
if index._lexsort_depth < len(index.names): # type: ignore
df = df.sort_index(level=0)
self._data = df
self._grid = grid
self.log(log_message)
self._modified = True
return None
else:
field = Field(df, grid, modified = True)
field.copy_history(self)
field.copy_md(self)
field.log(log_message)
return field
[docs]
def select_region(
self,
region: Union[str, Sequence[str]],
region_type: str = "country",
inplace: bool = False,
) -> Union[None, 'Field']:
"""Select grid cells whose lon/lat fall inside a named geographical region.
Parameters
----------
region : str | Sequence[str]
One or more region identifiers understood by the chosen region set.
For example: names for countries, or region names for Giorgi/SREX/AR6.
Pass a list (or other sequence) to keep grid cells that fall inside any
of the listed regions (logical OR).
region_type : str = "country"
Which predefined region set to use:
- "country": natural_earth_v5_0_0.countries_10
(https://www.naturalearthdata.com/downloads/10m-cultural-vectors/10m-admin-0-countries/)
- "giorgi": Giorgi regions
(https://link.springer.com/article/10.1007/PL00013733)
- "SREX"/"srex": IPCC SREX regions
(https://www.ipcc-data.org/guidelines/pages/ar5_regions.html)
- "AR6"/"ar6": IPCC AR6 regions
(https://github.com/IPCC-WG1/Atlas/tree/devel/reference-regions)
inplace : bool = False
If True, the selection is performed in place.
Returns
-------
If inplace is True, the selection is performed in place and the method returns None.
Otherwise the method returns a Field object containing only rows whose coordinates
are inside the region.
"""
# Checks first
if self.grid.grid_type != 'lonlat':
raise ValueError("Region selection is only supported for lonlat grids.")
# Select which region set to use based on region_type
match region_type.lower():
case "country":
regionmask_type = regionmask.defined_regions.natural_earth_v5_0_0.countries_10
case "giorgi":
regionmask_type = regionmask.defined_regions.giorgi
case "srex":
regionmask_type = regionmask.defined_regions.srex
case "ar6":
regionmask_type = regionmask.defined_regions.ar6.all
case _:
raise ValueError("Unsupported region_type; expected one of {'country','giorgi','SREX','AR6'}.")
if not region:
raise ValueError("region must be a non-empty string or a non-empty sequence of strings.")
region_names: List[str] = [region] if isinstance(region, str) else list(region)
region_keys: List[Any] = []
for name in region_names:
try:
region_keys.append(regionmask_type.map_keys(name))
except KeyError:
raise ValueError(f"Unknown region '{name}' for region_type '{region_type}'.")
grid_lonlat = cast(GridLonLat, self.grid)
# region_idx represents the union of the regions with names region_keys
region_idx = regionmask_type[region_keys]
region_mask = region_idx.mask(grid_lonlat.lons, grid_lonlat.lats)
mask_values = region_mask.values
lons = self.data.index.get_level_values("lon").to_numpy()
lats = self.data.index.get_level_values("lat").to_numpy()
lat_idx = np.clip(
np.searchsorted(region_mask.lat.values, lats), 0, mask_values.shape[0] - 1
)
lon_idx = np.clip(
np.searchsorted(region_mask.lon.values, lons), 0, mask_values.shape[1] - 1
)
# Inside at least one region → finite index; outside all listed regions → NaN.
is_in_region = np.isfinite(mask_values[lat_idx, lon_idx])
# Apply the boolean mask to keep only rows inside the region(s)
df = self.data.loc[is_in_region].copy()
# Crop grid to match filtered data
grid = self.grid.crop(df)
region_md: str = (
region_names[0] if len(region_names) == 1 else str(region_names)
)
if len(region_names) == 1:
log_message = f"Selected region: '{region_names[0]}' (region_type='{region_type}')."
else:
log_message = (
f"Selected regions: {region_names} (combined, region_type='{region_type}')."
)
if inplace:
index: pd.MultiIndex = cast(pd.MultiIndex, df.index)
if index._lexsort_depth < len(index.names): # type: ignore
df = df.sort_index(level=0)
self._data = df
self._grid = grid
self._modified = True
self.set_md("region", region_md)
self.set_md("region_type", region_type)
self.log(log_message)
return None
else:
field = Field(df, grid, modified=True)
field.copy_history(self)
field.copy_md(self)
field.set_md("region", region_md)
field.set_md("region_type", region_type)
field.log(log_message)
return field
[docs]
def select_layers(self, layers: str | Sequence[str] | pd.Index, inplace: bool = False) -> Union[None,'Field']:
"""Select one or more layers from the Field.
Parameters
----------
layers: str | Sequence[str]
Layer name or list of layer names.
inplace : bool = False
If True, the selection is performed in place.
Returns
-------
If inplace is True, the selection is performed in place and the method returns None.
Otherwise the method returns a Field object containing the selected data.
"""
if isinstance(layers, str):
layers = [layers]
log_message=f"Selected layers {list(layers)}."
if inplace:
self._data = self._data[layers]
self._modified = True
self.log(log_message)
return None
else:
df = self._data[layers].copy()
grid = copy.deepcopy(self._grid)
field = Field(df, grid, modified=True)
field.copy_history(self)
field.copy_md(self)
field.log(log_message)
return field
[docs]
def drop_layers(self, layers: str | Sequence[str] | pd.Index, inplace: bool = False) -> Union[None,'Field']:
"""Drop one or more layers from the Field.
Parameters
----------
layers : str | list
Layer name or list of layer names.
inplace : bool = False
If True, the layers are dropped from the current object.
Returns
-------
If inplace is True, the layers are dropped from the current Field and the method returns None.
Otherwise the method returns a Field object with the selected data.
"""
if isinstance(layers, str):
layers = [layers]
log_message=f"Dropped layers {list(layers)}."
if inplace:
self._data.drop(layers, axis=1, inplace=True)
self._modified = True
self.log(log_message)
return None
else:
df = self._data.drop(layers, axis=1)
grid = copy.deepcopy(self._grid)
field = Field(df, grid, modified=True)
field.copy_history(self)
field.log(log_message)
return field
[docs]
def filter(self, query: str, fill_nan: bool = False, inplace: bool = False) -> Union[None, 'Field']:
'''Filter rows based on boolean query
Parameters
----------
query: str
The string describing the boolean query in terms of the index or layers
fill_nan: bool
If False, rows where the query is False are removed (default behaviour).
If True, the resulting field's data has NaNs where the query is false.
inplace: bool
Whether to perform the operation in place
Returns
-------
If inplace is True, the filtering is performed on the current Field and the method returns None.
Otherwise the method returns a Field object with the reduced data.
Example
--------
# Load field
aaet = Field.from_file("/path/to/file/aaet.out")
print(aaet.layers) # [..., 'Total']
# Filter rows for which layer 'Total' is greater than 100
aaet1 = aaet.filter('Total > 100')
# Filter rows for which layer 'Total' is greater than 100 and layer 'C3G' is lower than 10 (inplace)
aaet.filter('Total > 100 and C3G < 10', inplace=True)
Notes
-----
See pandas documentation for DataFrame.query() for more details on the query string.
'''
if fill_nan:
index = self.data.query(f'not ({query})').index
if inplace:
self.data.loc[index,:] = np.nan
df = self.data
self._modified = True
else:
df = self.data.copy()
df.loc[index,:] = np.nan
else:
if inplace:
self.data.query(query, inplace=True)
df = self.data
self._modified = True
else:
df = self.data.query(query)
log_message = f"Filter data: query = '{query}'{', fill with NaNs.' if fill_nan else '.'}"
if df.empty and not fill_nan:
grid: Grid = GridEmpty()
log_message += ' Filter operation yielded empty field.'
else:
grid = self.grid.crop(df)
if inplace:
self._data = df
self._grid = grid
self._modified = True
self.log(log_message)
return None
else:
field = Field(df, grid, modified=True)
field.copy_history(self)
field.copy_md(self)
field.log(log_message)
return field
# Data reduction methods
# ----------------------
[docs]
def reduce_layers(self, redop: str, layers: Optional[Sequence[str] | pd.Index] = None,
name: Optional[str] = None, drop: bool = False, inplace: bool = False) -> Union[None,'Field']:
"""Perform a reduction operation on the Field's layers.
Parameters
----------
redop : str
The reduction operation. One of 'sum', 'av', 'maxLay', '/'.
layers : None or a list of strings.
List of names of layers to be reduced. If None, all layers are reduced.
name : None or str.
Name of the new layer to store the reduction. If None, the redop argument is used.
drop : bool
If True, the reduced layers are dropped from the data.
inplace : bool
If True, the layers are reduced in the current object.
Returns
-------
If inplace is True, the layers are reduced in the current Field and the method returns None.
Otherwise the method returns a Field object with the reduced data.
"""
if layers is None:
layers = self._data.columns
# Copy avoids warning (error)
df_red = self._data.copy()
if name is None:
name = f"{redop}"
if redop == 'sum':
df_red[name] = df_red[layers].sum(axis=1)
if redop == 'av':
df_red[name] = df_red[layers].mean(axis=1)
if redop == 'maxLay':
df_red[name] = df_red[layers].idxmax(axis=1)
if redop == '/':
if not len(layers) == 2:
raise ValueError(f"Operation '{redop}' requires exactly two operands")
df_red[name] = df_red[layers[0]] / df_red[layers[1]]
if drop:
layers_to_drop = [l for l in layers if l != name]
df_red.drop(layers_to_drop, axis=1, inplace=True)
log_message=f"Layer reduction operation '{redop}' applied to layers {list(layers)}, stored as layer '{name}'.{' Original layers were dropped.' if drop else ''}"
if inplace:
self._data = df_red
self._modified = True
self.log(log_message)
return None
else:
grid = copy.deepcopy(self._grid)
field = Field(df_red, grid, modified=True)
field.copy_history(self)
field.log(log_message)
return field
[docs]
def reduce_time(self, timeop: str, freq: str | None = None, inplace: bool = False) -> Union[None,'Field']:
"""Perform a reduction operation on the time axis.
Parameters
----------
timeop : str
The time reduction operation: one of 'av', 'sum'.
freq : str | None = None
A string specifying the frequency of the reduction. This is formed by an integer number and
one of 'M', 'Y'. For example, to perform an average every five years, specify timeop='av' and
freq='5Y'. If freq=None the whole time series is reduced.
inplace : bool = False
If True, the reduction is performed on the current Field.
Returns
-------
If inplace is True, the reduction is performed on the current Field and the method returns None.
Otherwise the method returns a Field object with the reduced data.
"""
df_red, log_message = frameops.reduce_time(self._data, timeop, freq)
if inplace:
self._data = df_red
self._timeop = timeop
self._modified = True
self.log(log_message)
return None
else:
field_red = Field(df_red, copy.deepcopy(self._grid), modified=True)
field_red._timeop = timeop
field_red.copy_history(self)
field_red.log(log_message)
return field_red
[docs]
def reduce_grid(self, gridop: str, axis: str = 'both', inplace: bool = False) -> Union[None,'Field']:
"""Perform a reduction operation on the spatial axes.
Parameters
----------
gridop : str
The spatial reduction operation: one of 'av', 'sum'.
axis : str = 'both'
A grid axis or 'both' (the default value).
inplace : bool = False
If True, the reduction is performed on the current Field.
Returns
-------
If inplace is True, the reduction is performed on the current Field and the method returns None.
Otherwise the method returns a Field object with the reduced data.
"""
df_red, grid_red, log_message = frameops.reduce_grid(self.data, self.grid, gridop, axis)
if inplace:
self._data = df_red
self._grid = grid_red
self._modified = True
self.log(log_message)
return None
else:
field_red = Field(df_red, grid_red, modified=True)
field_red.copy_history(self)
field_red.log(log_message)
return field_red
[docs]
def reduce(self, redspec: RedSpec, inplace: bool = False) -> Union[None,'Field']:
"""Perform the selection/slicing/reduction operations specified in te passed RedSpec object.
Parameters
----------
redspec : RedSpec
A RedSpec object specifying how to slice and/or reduce the data.
inplace : bool = False
If True, the reduction is performed on the current Field.
Returns
-------
If inplace is True, the reduction is performed on the current Field and the method returns None.
Otherwise the method returns a Field object with the reduced data.
"""
df_red, grid_red, log_message = frameops.apply_reduction(self._data, self._grid, redspec)
time_index = cast(pd.PeriodIndex, df_red.index.get_level_values('time'))
if inplace:
self._data = df_red
self._grid = grid_red
self._modified = True
self.log(log_message)
if redspec.timeop is not None:
self._timeop = redspec.timeop
setattr(self, '_time_freq', str(time_index.freqstr))
return None
else:
field_red = Field(df_red, grid_red, modified=True)
field_red.copy_history(self)
field_red.log(log_message)
return field_red
# Arithmethic operations
# ----------------------
[docs]
def apply(self, op: str | Callable, operand: SupportsFloat | list[str] | None = None, layers: str | list[str] | None = None, how: Literal['left', 'right'] = 'left', inplace: bool = False) -> Union[None,'Field']:
"""Apply an operation/function to selected layers
Parameters
----------
op : str | Callable
Operation to apply to layers. If a callable is passed, it must be numpy-vectorizable.
operand : str | list[str] | None
Operand to combine with layers through 'op'. Operand can be a constant number or the name of a layer. In the latter
case, the operation will be performed element-wise. If 'op' is a Callable, this parameter is ignored.
layers : str | list[str] | None
A list of the names of the layers to apply the operation to. If None, the operation is applied to all layers.
how : str = 'left'
Position of the layers in the operation. This argument is relevant for non-commutative operations. For example, if
how == 'left', a '-' operation will be layers - operand. If how = 'right', the operation will be operand - layers.
inplace : bool
Whether to perform the operation in place.
Returns
-------
A field with the modified layers, or None if the operation is performed in place.
"""
commutative_ops = {
'+': frameops.apply_sum,
'sum': frameops.apply_sum,
'*': frameops.apply_mul,
'mul': frameops.apply_mul,
}
non_commutative_ops = {
'-': frameops.apply_sub,
'sub': frameops.apply_sub,
'/': frameops.apply_div,
'div': frameops.apply_div,
}
if layers is None:
layers = self.layers
elif isinstance(layers, str):
layers = [layers]
if isinstance(operand, str):
operand = [operand]
if (op in commutative_ops or op in non_commutative_ops) and operand is None:
raise ValueError(f"Operation {op} requires an operand")
if inplace:
df = self.data
else:
df = self.data.copy()
if callable(op):
frameops.apply_function(df, layers, op)
elif op in commutative_ops:
# Ignoring typing; at this point it's alredy known that operand is not None, but mypy's narrowing doesn't catch it
commutative_ops[op](df, operand, layers) # type: ignore
elif op in non_commutative_ops:
# Ignoring typing; at this point it's alredy known that operand is not None, but mypy's narrowing doesn't catch it
non_commutative_ops[op](df, operand, layers, how) # type: ignore
else:
raise ValueError(f"Operation must be a callable or one of {list(commutative_ops.keys()) + list(non_commutative_ops.keys())}.")
log_message = f"Applied operation '{op}' to layers {layers}."
if not inplace:
grid = copy.deepcopy(self.grid)
field = Field(df, grid, modified=True)
field.copy_history(self)
field.log(log_message)
return field
else:
self._modified = True
self.log(log_message)
return None
def _check_arithm_operands(self, other: Union['Field', SupportsFloat]) -> None:
if isinstance(other, Field):
if self.grid.grid_type != other.grid.grid_type:
raise ValueError("Field grids are of different type.")
frameops.check_indices_match(self.data, other.data)
else:
try:
other = float(other)
except:
raise ValueError("Operand must be a scalar or another field.")
def _binary_operation_log_message(
self,
other: Union['Field', SupportsFloat],
operation: str,
how: Literal['left', 'right'],
) -> str:
actions = {
'+': 'added to',
'-': 'subtracted from',
'*': 'multiplied by',
'/': 'divided by',
}
if operation in ['+', '-']:
if isinstance(other, SupportsFloat):
if how == 'left':
log_message = f"Scalar {float(other)} was {actions[operation]} field layers."
else:
log_message = f"Field layers were {actions[operation]} scalar {float(other)}."
else:
if len(other.layers) == 1:
log_message = f"Layer '{other.layers[0]}' from field '{other.name}' was {actions[operation]} all layers."
elif len(self.layers) == 1:
log_message = f"Layers {other.layers} from field '{other.name}' were {actions[operation]} layer '{self.layers[0]}', one layer at a time."
else:
log_message = f"Layers {other.layers} from field '{other.name}' were {actions[operation]} field layer-wise."
elif operation in ['*', '/']:
if isinstance(other, SupportsFloat):
if how == 'left':
log_message = f"Field layers were {actions[operation]} scalar {float(other)}."
else:
log_message = f"Scalar {float(other)} was {actions[operation]} field layers."
else:
if len(other.layers) == 1:
log_message = f"Field layers were {actions[operation]} layer '{other.layers[0]}' from field '{other.name}'."
elif len(self.layers) == 1:
log_message = f"Layer '{self.layers[0]}' was {actions[operation]} layers {other.layers} from field '{other.name}', one layer at a time."
else:
log_message = f"Field was {actions[operation]} layers {other.layers} from field '{other.name}' layer-wise."
return log_message
def _binary_operation_new_name(self, other: Union['Field', SupportsFloat], operation: str, how: Literal['left', 'right']) -> str:
if isinstance(other, Field):
if len(other.layers) == 1:
self_name = self.name
other_name = f"{other.name}['{other.layers[0]}']"
elif len(self.layers) == 1:
self_name = f"{self.name}['{self.layers[0]}']"
other_name = other.name
else:
self_name = self.name
other_name = other.name
else:
self_name = self.name
other_name = str(float(other))
if how == 'left':
new_name = f"{self_name} {operation} {other_name}"
else:
new_name = f"{other_name} {operation} {self_name}"
return new_name
def _binary_operation_layers(self, other: Union['Field', SupportsFloat], operation: str, how: Literal['left', 'right']) -> List[str]:
if isinstance(other, Field):
if len(other.layers) == 1:
layers = [ f"({l} {operation} {other.layers[0]})" for l in self.layers ]
elif len(self.layers) == 1:
layers = [ f"({self.layers[0]} {operation} {l})" for l in other.layers ]
else:
layers = [ f"({ls} {operation} {lo})" for ls, lo in zip(self.layers, other.layers) ]
else:
layers = [ f"({l} {operation} {float(other)})" for l in self.layers ]
return layers
def _binary_operation(self,
other: Union['Field', SupportsFloat],
operation: str,
inplace: bool = False,
how: Literal['left', 'right'] = 'left'
) -> 'Field':
self._check_arithm_operands(other)
if how == 'left':
if isinstance(other, Field):
df = arithm.binary_operation(self.data, other.data, operation)
else:
df = arithm.binary_operation(self.data, float(other), operation)
elif how == 'right':
# This cannot happen if other is a Field (it's the __radd__, __rsub__, ... case)
# But mypy doesn't know that so we need to ignore type checking
df = arithm.binary_operation(float(other), self.data, operation) # type: ignore
else:
raise ValueError("Argument 'how' can only take the values 'left' or 'right'.")
df.columns = cast(pd.Index, self._binary_operation_layers(other, operation, how))
log_message = self._binary_operation_log_message(other, operation, how)
new_name = self._binary_operation_new_name(other, operation, how)
if inplace:
self._data = df
self.set_md('name', new_name)
self.set_md('units', '[no units]')
self.set_md('description', '[no description]')
self.log(log_message)
return self
else:
grid = copy.deepcopy(self.grid)
field = Field(df, grid)
field.copy_history(self)
field.set_md('name', new_name)
field.log(log_message)
return field
def __add__(self, other: Union['Field', SupportsFloat]) -> 'Field':
return self._binary_operation(other, '+')
def __sub__(self, other: Union['Field', SupportsFloat]) -> 'Field':
return self._binary_operation(other, '-')
def __mul__(self, other: Union['Field', SupportsFloat]) -> 'Field':
return self._binary_operation(other, '*')
def __truediv__(self, other: Union['Field', SupportsFloat]) -> 'Field':
return self._binary_operation(other, '/')
def __radd__(self, other: SupportsFloat) -> 'Field':
return self._binary_operation(other, '+', how='right')
def __rsub__(self, other: SupportsFloat) -> 'Field':
return self._binary_operation(other, '-', how='right')
def __rmul__(self, other: SupportsFloat) -> 'Field':
return self._binary_operation(other, '*', how='right')
def __rtruediv__(self, other: SupportsFloat) -> 'Field':
return self._binary_operation(other, '/', how='right')
def __iadd__(self, other: Union['Field', SupportsFloat]) -> 'Field':
return self._binary_operation(other, '+', inplace=True)
def __isub__(self, other: Union['Field', SupportsFloat]) -> 'Field':
return self._binary_operation(other, '-', inplace=True)
def __imul__(self, other: Union['Field', SupportsFloat]) -> 'Field':
return self._binary_operation(other, '*', inplace=True)
def __itruediv__(self, other: Union['Field', SupportsFloat]) -> 'Field':
return self._binary_operation(other, '/', inplace=True)
# Representation methods
# ----------------------
def __str__(self) -> str:
history_str = '\n'.join(self.history)
if self.data.empty:
repr_str = [
f"Field is empty!",
"",
f"History",
f"-------",
history_str,
]
return '\n'.join(repr_str)
metadata_str = '\n'.join([f"{k}: {v}" for k, v in self.metadata.items()])
grid_type = get_grid_type(self._grid)
time_index = self.data.index.get_level_values('time')
time_start = time_index.min().start_time
time_end = time_index.max().end_time
time_str = [f"Span: {time_start} - {time_end}",
f"Frequency: {self.time_freq}",
]
if self._timeop is not None:
time_str.append(f"Reduction: '{self._timeop}'")
repr_str = [
f"Data",
f"----",
metadata_str,
"",
f"Grid",
f"----",
f"{self._grid}",
"\n"
f"Time series",
f"-----------",
'\n'.join(time_str),
"\n"
f"History",
f"-------",
history_str,
]
return "\n".join(repr_str)
def __repr__(self) -> str:
return self.__str__()
# Output
# ------
[docs]
def to_xarray(self):
raise NotImplementedError
#return self._data.to_xarray().sortby('lon').sortby('lat').sortby('time')
[docs]
def to_netcdf(self, fname: Optional[str] = None):
raise NotImplementedError
# TODO: This doesn't work w/ PeriodIndex!
#data_xr = to_xarray(self._data)
#data_xr.to_netcdf(fname)
[docs]
def to_csv(self, fname: str):
raise NotImplementedError
#self._data.to_csv(fname)