import pandas as pd
from typing import Literal, Optional
from canopy.core.field import Field
from canopy.core.grid import get_grid
from canopy.core.grid import Grid
from canopy.core.grid import GridEmpty
def _match_entries_exactly(df_left, df_right):
index_overlap = df_left.index.intersection(df_right.index)
df_left_overlap = df_left.loc[index_overlap,:]
df_right_overlap = df_right.loc[index_overlap,:]
return df_left_overlap, df_right_overlap
def _match_labels(field_left, field_right):
if 'label' not in field_left.data.index.names or 'label' not in field_right.data.index.names:
raise ValueError("One or both of the passed fields does not have a 'label' level.")
# As per specification, a field's data cannot have a 'label' index and not have coordinate
# levels, so we know that it must be a MultiIndex with levels ['label', 'xaxis', 'yaxis', 'time'].
# Additionally, since grid compatibility has been checked, we know the spatial axes are the same.
levels_to_reset = list(field_left.grid.axis_names)
df_left, df_right = _match_entries_exactly(field_left.data.reset_index(levels_to_reset), field_right.data.reset_index(levels_to_reset))
df_left = df_left.reset_index().set_index(['label', *levels_to_reset, 'time'])
df_right = df_right.reset_index().set_index(['label', *levels_to_reset, 'time'])
grid = field_left.grid.crop(df_left)
field_left_overlap = Field(df_left, grid, modified=True)
field_right_overlap = Field(df_right, grid, modified=True)
return field_left_overlap, field_right_overlap
def _match_coordinates(field_left, field_right):
levels_to_reset1 = ['label'] if 'label' in field_left.data.index.names else []
levels_to_reset2 = ['label'] if 'label' in field_right.data.index.names else []
df_left_levels = list(field_left.data.index.names)
df_right_levels = list(field_right.data.index.names)
df_left, df_right = _match_entries_exactly(field_left.data.reset_index(levels_to_reset1), field_right.data.reset_index(levels_to_reset2))
df_left = df_left.reset_index().set_index(df_left_levels)
df_right = df_right.reset_index().set_index(df_right_levels)
grid = field_left.grid.crop(df_left)
field_left_overlap = Field(df_left, grid, modified=True)
field_right_overlap = Field(df_right, grid, modified=True)
return field_left_overlap, field_right_overlap
def _cjoin_one_spatial_level(coords_left, coords_right, ax, atol):
cond1 = [(ax, ax, '>='), ('timestamp', 'timestamp', '==')]
cond2 = [(ax, ax, '<'), ('timestamp', 'timestamp', '==')]
# Have the next two lines ignore typing; the conditional_join method is not a Pandas standard method.
# Instead, it is available through the pyjanitor library (https://pyjanitor-devs.github.io/pyjanitor/).
# This seems to currently mess with mypy.
dj1 = coords_left.conditional_join(coords_right, *cond1, use_numba=False) # type: ignore
dj2 = coords_left.conditional_join(coords_right, *cond2, use_numba=False) # type: ignore
mask = (dj1['left']
.loc[:, [ax]]
.sub(dj1['right'].loc[:,[ax]])
.abs()
.le(atol)
.all(axis=1)
)
dj1 = dj1[mask]
mask = (dj2['left']
.loc[:, [ax]]
.sub(dj2['right'].loc[:,[ax]])
.abs()
.le(atol)
.all(axis=1)
)
dj2 = dj2[mask]
dj = pd.concat([dj1, dj2], ignore_index=True, sort=False, copy=False)
return dj
def _cjoin_two_spatial_levels(coords_left, coords_right, ax0, ax1, atol):
cond1 = [(ax0, ax0, '>='), (ax1, ax1, '>='), ('timestamp', 'timestamp', '==')]
cond2 = [(ax0, ax0, '>'), (ax1, ax1, '<'), ('timestamp', 'timestamp', '==')]
cond3 = [(ax0, ax0, '<'), (ax1, ax1, '>'), ('timestamp', 'timestamp', '==')]
cond4 = [(ax0, ax0, '<'), (ax1, ax1, '<'), ('timestamp', 'timestamp', '==')]
# See comment about this type: ignore in _cjoin_one_spatial_level
dj1 = coords_left.conditional_join(coords_right, *cond1, use_numba=False) # type: ignore
dj2 = coords_left.conditional_join(coords_right, *cond2, use_numba=False) # type: ignore
dj3 = coords_left.conditional_join(coords_right, *cond3, use_numba=False) # type: ignore
dj4 = coords_left.conditional_join(coords_right, *cond4, use_numba=False) # type: ignore
mask = (dj1['left']
.loc[:, [ax0, ax1]]
.sub(dj1['right'].loc[:,[ax0, ax1]])
.abs()
.le(atol)
.all(axis=1)
)
dj1 = dj1[mask]
mask = (dj2['left']
.loc[:, [ax0, ax1]]
.sub(dj2['right'].loc[:,[ax0, ax1]])
.abs()
.le(atol)
.all(axis=1)
)
dj2 = dj2[mask]
mask = (dj3['left']
.loc[:, [ax0, ax1]]
.sub(dj3['right'].loc[:,[ax0, ax1]])
.abs()
.le(atol)
.all(axis=1)
)
dj3 = dj3[mask]
mask = (dj4['left']
.loc[:, [ax0, ax1]]
.sub(dj4['right'].loc[:,[ax0, ax1]])
.abs()
.le(atol)
.all(axis=1)
)
dj4 = dj4[mask]
dj = pd.concat([dj1, dj2, dj3, dj4], ignore_index=True, sort=False, copy=False)
return dj
def _match_coordinates_with_tolerance(field_left, field_right, atol, use_index):
# Check use_index before starting the calculations
if use_index not in [None, 'left', 'right']:
raise ValueError("'use_index' must either be 'left', 'right', or None (the default).")
df_left = field_left.data
df_right = field_right.data
df_left_levels = list(df_left.index.names)
df_right_levels = list(df_right.index.names)
spatial_levels = [ax for ax in df_left_levels if ax in field_left.grid.axis_names]
coords_left = df_left.reset_index().loc[:, df_left_levels]
coords_left['timestamp'] = coords_left.time.dt.to_timestamp().astype('datetime64[s]')
coords_right = df_right.reset_index().loc[:, df_right_levels]
coords_right['timestamp'] = coords_right.time.dt.to_timestamp().astype('datetime64[s]')
if len(spatial_levels) == 1:
ax, = spatial_levels
dj = _cjoin_one_spatial_level(coords_left, coords_right, ax, atol)
elif len(spatial_levels) == 2:
ax0, ax1 = spatial_levels
dj = _cjoin_two_spatial_levels(coords_left, coords_right, ax0, ax1, atol)
else:
# Should never happen
raise ValueError("Wrong number of spatial levels.")
index1_overlap = dj['left'].set_index(df_left_levels).index
df_left_overlap = df_left.loc[index1_overlap,:]
index2_overlap = dj['right'].set_index(df_right_levels).index
df_right_overlap = df_right.loc[index2_overlap,:]
grid = field_left.grid.crop(df_left_overlap)
if use_index == 'right':
djj = dj.set_index([('left', ax) for ax in df_left_levels])['right'].loc[:, df_right_levels]
djj.index.names = df_left_levels
df_left_overlap = df_left_overlap.join(djj).reset_index(drop=True).set_index(df_right_levels)
elif use_index == 'left':
djj = dj.set_index([('right', ax) for ax in df_right_levels])['left'].loc[:, df_left_levels]
djj.index.names = df_right_levels
df_right_overlap = df_right_overlap.join(djj).reset_index(drop=True).set_index(df_left_levels)
field_left_overlap = Field(df_left_overlap, grid, modified=True)
field_right_overlap = Field(df_right_overlap, grid, modified=True)
return field_left_overlap, field_right_overlap
def _make_log_messages(field_left, field_right, match, atol, use_index):
how = "The intersection was calculated"
if match == 'coordinates':
how += ' on the coordinates'
if atol is None:
how += ' (exact matching).'
else:
how += f' with a tolerance of {atol}.'
else:
match += " by matching site labels."
log_message_left = f"Selected overlapping rows with field '{field_right.name}'. {how}"
log_message_right = f"Selected overlapping rows with field '{field_left.name}'. {how}"
if use_index == 'left':
log_message_right += " The index was replaced with the other field's."
elif use_index == 'right':
log_message_left += " The index was replaced with the other field's."
return log_message_left, log_message_right
[docs]
def overlap(field_left: Field, field_right: Field, match: Literal['coordinates', 'labels'] = 'coordinates', \
atol: Optional[float] = None, use_index: Optional[Literal['left', 'right']] = None) \
-> tuple[Field, Field]:
"""
Calculate entries with overlapping indices between two fields.
This function takes two fields, and returns another two fields with the data corresponding to where the spatial and
time coordinates of the original fields overlap. The overlap allows for a certain tolerance in the spatial coordinates.
Parameters
----------
field_left : Field
The first field object
field_right : Field
The second field object
match : Literal['coordinates', 'labels']
How to match the spatial part of the index, whether throug coordinates (the default) or through
labels. If 'coordinates' is selected, the 'label' index level, if present, is ignored. If 'label'
is selected, the spatial coordinates are ignored.
atol : Optional[float] = None
If None (the default value), spatial coordinates matching is exact. Otherwise the coordinates are
matched within atol (absolute tolerance).
use_index : Optional[Literal['left', 'right']] = None
If None, each returned field's index corresponds with the original fields. Otherwise, the returned
fields have an index based on the first or second field. This only makes a difference when spatial
coordinates are matched up to a tolerance.
Returns
-------
Two fields holding the data from the original fields, but only where the original indices overlap.
Notes
-----
Matching labels or coordinates exactly is much faster than allowing for tolerance, so use atol=None,
rather than atol=0, if you want an exact match.
Example
-------
# This will match field entries with slightly offset spatial coordinates
import canopy as cp
# Load (dummy) data sources:
my_source = cp.get_source("/path/to/some/data", "lpj_guess")
my_other_source = cp.get_source("/path/to/some/other/data", "lpj_guess")
# Load fields
field_left = my_source.load_field('anpp')
field_right = my_other_source.load_field('anpp')
print(field_left.data)
# TrBE C4G
# lon lat time
# 1.0 1.0 1989 0 0
# 1990 1 2
# 2.0 1989 2 4
# 1990 3 6
# 2.0 3.0 1989 4 8
# 1990 5 10
# 1991 6 12
print(field_right.data)
# TrBE C4G
# lon lat time
# 1.1 2.1 1990 1 2
# 2.1 3.1 1990 3 4
# 1991 5 6
# 1992 7 8
# 3.1 -3.1 1984 9 10
# Exact coordinate matching leads to empty fields:
field_left_overlap, field_right_overlap = cp.overlap(field_left, field_right)
print(field_left)
#
# Field is empty!
#
# History
# -------
# [1] 2025-11-10 19:05:56: overlap: sliced with overlapping rows from another field (description: Pretend Annual NPP just for showcasing cp.overlap). The index intersection was calculated exactly. Field was sliced to empty.
# Allowing some tolerance does the trick. Let's allow a tolerance of 0.2 degrees in the spatial coordinates
field_left_overlap, field_right_overlap = cp.overlap(field_left, field_right, atol=0.2)
print(field_left_overlap.data)
# TrBE C4G
# lon lat time
# 1.0 2.0 1990 3 6
# 2.0 3.0 1990 5 10
# 1991 6 12
print(field_right_overlap.data)
# TrBE C4G
# lon lat time
# 1.1 2.1 1990 1 2
# 2.1 3.1 1990 3 4
# 1991 5 6
# We can specify that the second field's coordinates should be used for both returned fields.
# Having the same coordinates in both fields can help comparing them.
field_left_overlap, field_right_overlap = cp.overlap(field_left, field_right, atol=0.2, use_index="second")
print(field_left_overlap.data)
# TrBE C4G
# lon lat time
# 1.1 2.1 1990 3 6
# 2.1 3.1 1990 5 10
# 1991 6 12
print(field_right_overlap.data)
# TrBE C4G
# lon lat time
# 1.1 2.1 1990 1 2
# 2.1 3.1 1990 3 4
# 1991 5 6
"""
if not field_left.grid.is_compatible(field_right.grid):
raise ValueError("Grids are not compatible.")
if match == "coordinates":
if atol is None:
field_left_overlap, field_right_overlap = _match_coordinates(field_left, field_right)
else:
field_left_overlap, field_right_overlap = _match_coordinates_with_tolerance(field_left, field_right, atol, use_index)
elif match == "labels":
field_left_overlap, field_right_overlap = _match_labels(field_left, field_right)
else:
raise ValueError("'match' can only be 'coordinates' or 'labels'")
log_message_left, log_message_right = _make_log_messages(field_left, field_right, match, atol, use_index)
field_left_overlap.copy_md(field_left)
field_left_overlap.copy_history(field_left)
field_left_overlap.log(log_message_left)
field_right_overlap.copy_md(field_right)
field_right_overlap.copy_history(field_right)
field_right_overlap.log(log_message_right)
return field_left_overlap, field_right_overlap