Source code for canopy.util.overlap

import pandas as pd
from typing import Literal, Optional
from canopy.core.field import Field
from canopy.core.grid import get_grid
from canopy.core.grid import Grid
from canopy.core.grid import GridEmpty


def _match_entries_exactly(df_left, df_right):

    index_overlap = df_left.index.intersection(df_right.index)
    df_left_overlap = df_left.loc[index_overlap,:]
    df_right_overlap = df_right.loc[index_overlap,:]

    return df_left_overlap, df_right_overlap


def _match_labels(field_left, field_right):

    if 'label' not in field_left.data.index.names or 'label' not in field_right.data.index.names:
        raise ValueError("One or both of the passed fields does not have a 'label' level.")

    # As per specification, a field's data cannot have a 'label' index and not have coordinate
    # levels, so we know that it must be a MultiIndex with levels ['label', 'xaxis', 'yaxis', 'time'].
    # Additionally, since grid compatibility has been checked, we know the spatial axes are the same.
    levels_to_reset = list(field_left.grid.axis_names)
    df_left, df_right = _match_entries_exactly(field_left.data.reset_index(levels_to_reset), field_right.data.reset_index(levels_to_reset))
    df_left = df_left.reset_index().set_index(['label', *levels_to_reset, 'time'])
    df_right = df_right.reset_index().set_index(['label', *levels_to_reset, 'time'])
    grid = field_left.grid.crop(df_left)
    
    field_left_overlap = Field(df_left, grid, modified=True)
    field_right_overlap = Field(df_right, grid, modified=True)

    return field_left_overlap, field_right_overlap


def _match_coordinates(field_left, field_right):

    levels_to_reset1 = ['label'] if 'label' in field_left.data.index.names else []
    levels_to_reset2 = ['label'] if 'label' in field_right.data.index.names else []
    df_left_levels = list(field_left.data.index.names)
    df_right_levels = list(field_right.data.index.names)

    df_left, df_right = _match_entries_exactly(field_left.data.reset_index(levels_to_reset1), field_right.data.reset_index(levels_to_reset2))
    df_left = df_left.reset_index().set_index(df_left_levels)
    df_right = df_right.reset_index().set_index(df_right_levels)
    grid = field_left.grid.crop(df_left)

    field_left_overlap = Field(df_left, grid, modified=True)
    field_right_overlap = Field(df_right, grid, modified=True)
    
    return field_left_overlap, field_right_overlap


def _cjoin_one_spatial_level(coords_left, coords_right, ax, atol):

    cond1 = [(ax, ax, '>='), ('timestamp', 'timestamp', '==')]
    cond2 = [(ax, ax, '<'), ('timestamp', 'timestamp', '==')]
    # Have the next two lines ignore typing; the conditional_join method is not a Pandas standard method.
    # Instead, it is available through the pyjanitor library (https://pyjanitor-devs.github.io/pyjanitor/).
    # This seems to currently mess with mypy.
    dj1 = coords_left.conditional_join(coords_right, *cond1, use_numba=False) # type: ignore
    dj2 = coords_left.conditional_join(coords_right, *cond2, use_numba=False) # type: ignore

    mask = (dj1['left']
            .loc[:, [ax]]
            .sub(dj1['right'].loc[:,[ax]])
            .abs()
            .le(atol)
            .all(axis=1)
            )
    dj1 = dj1[mask]

    mask = (dj2['left']
            .loc[:, [ax]]
            .sub(dj2['right'].loc[:,[ax]])
            .abs()
            .le(atol)
            .all(axis=1)
            )
    dj2 = dj2[mask]

    dj = pd.concat([dj1, dj2], ignore_index=True, sort=False, copy=False)

    return dj


def _cjoin_two_spatial_levels(coords_left, coords_right, ax0, ax1, atol):

    cond1 = [(ax0, ax0, '>='), (ax1, ax1, '>='), ('timestamp', 'timestamp', '==')]
    cond2 = [(ax0, ax0, '>'), (ax1, ax1, '<'), ('timestamp', 'timestamp', '==')]
    cond3 = [(ax0, ax0, '<'), (ax1, ax1, '>'), ('timestamp', 'timestamp', '==')]
    cond4 = [(ax0, ax0, '<'), (ax1, ax1, '<'), ('timestamp', 'timestamp', '==')]

    # See comment about this type: ignore in _cjoin_one_spatial_level
    dj1 = coords_left.conditional_join(coords_right, *cond1, use_numba=False) # type: ignore
    dj2 = coords_left.conditional_join(coords_right, *cond2, use_numba=False) # type: ignore
    dj3 = coords_left.conditional_join(coords_right, *cond3, use_numba=False) # type: ignore
    dj4 = coords_left.conditional_join(coords_right, *cond4, use_numba=False) # type: ignore

    mask = (dj1['left']
            .loc[:, [ax0, ax1]]
            .sub(dj1['right'].loc[:,[ax0, ax1]])
            .abs()
            .le(atol)
            .all(axis=1)
            )
    dj1 = dj1[mask]

    mask = (dj2['left']
            .loc[:, [ax0, ax1]]
            .sub(dj2['right'].loc[:,[ax0, ax1]])
            .abs()
            .le(atol)
            .all(axis=1)
            )
    dj2 = dj2[mask]

    mask = (dj3['left']
            .loc[:, [ax0, ax1]]
            .sub(dj3['right'].loc[:,[ax0, ax1]])
            .abs()
            .le(atol)
            .all(axis=1)
            )
    dj3 = dj3[mask]

    mask = (dj4['left']
            .loc[:, [ax0, ax1]]
            .sub(dj4['right'].loc[:,[ax0, ax1]])
            .abs()
            .le(atol)
            .all(axis=1)
            )
    dj4 = dj4[mask]

    dj = pd.concat([dj1, dj2, dj3, dj4], ignore_index=True, sort=False, copy=False)

    return dj


def _match_coordinates_with_tolerance(field_left, field_right, atol, use_index):

    # Check use_index before starting the calculations
    if use_index not in [None, 'left', 'right']:
        raise ValueError("'use_index' must either be 'left', 'right', or None (the default).")

    df_left = field_left.data
    df_right = field_right.data
    df_left_levels = list(df_left.index.names)
    df_right_levels = list(df_right.index.names)

    spatial_levels = [ax for ax in df_left_levels if ax in field_left.grid.axis_names]
    coords_left = df_left.reset_index().loc[:, df_left_levels]
    coords_left['timestamp'] = coords_left.time.dt.to_timestamp().astype('datetime64[s]')
    coords_right = df_right.reset_index().loc[:, df_right_levels]
    coords_right['timestamp'] = coords_right.time.dt.to_timestamp().astype('datetime64[s]')

    if len(spatial_levels) == 1:
        ax, = spatial_levels
        dj = _cjoin_one_spatial_level(coords_left, coords_right, ax, atol)
    elif len(spatial_levels) == 2:
        ax0, ax1 = spatial_levels
        dj = _cjoin_two_spatial_levels(coords_left, coords_right, ax0, ax1, atol)
    else:
        # Should never happen
        raise ValueError("Wrong number of spatial levels.")

    index1_overlap = dj['left'].set_index(df_left_levels).index
    df_left_overlap = df_left.loc[index1_overlap,:]
    index2_overlap = dj['right'].set_index(df_right_levels).index
    df_right_overlap = df_right.loc[index2_overlap,:]
    grid = field_left.grid.crop(df_left_overlap)

    if use_index == 'right':
        djj = dj.set_index([('left', ax) for ax in df_left_levels])['right'].loc[:, df_right_levels]
        djj.index.names = df_left_levels
        df_left_overlap = df_left_overlap.join(djj).reset_index(drop=True).set_index(df_right_levels)
    elif use_index == 'left':
        djj = dj.set_index([('right', ax) for ax in df_right_levels])['left'].loc[:, df_left_levels]
        djj.index.names = df_right_levels
        df_right_overlap = df_right_overlap.join(djj).reset_index(drop=True).set_index(df_left_levels)

    field_left_overlap = Field(df_left_overlap, grid, modified=True)
    field_right_overlap = Field(df_right_overlap, grid, modified=True)

    return field_left_overlap, field_right_overlap


def _make_log_messages(field_left, field_right, match, atol, use_index):

    how = "The intersection was calculated"
    if match == 'coordinates':
        how += ' on the coordinates'
        if atol is None:
            how += ' (exact matching).'
        else:
            how += f' with a tolerance of {atol}.'
    else:
        match += " by matching site labels."

    log_message_left = f"Selected overlapping rows with field '{field_right.name}'. {how}"
    log_message_right = f"Selected overlapping rows with field '{field_left.name}'. {how}"

    if use_index == 'left':
        log_message_right += " The index was replaced with the other field's."
    elif use_index == 'right':
        log_message_left += " The index was replaced with the other field's."
 
    return log_message_left, log_message_right


[docs] def overlap(field_left: Field, field_right: Field, match: Literal['coordinates', 'labels'] = 'coordinates', \ atol: Optional[float] = None, use_index: Optional[Literal['left', 'right']] = None) \ -> tuple[Field, Field]: """ Calculate entries with overlapping indices between two fields. This function takes two fields, and returns another two fields with the data corresponding to where the spatial and time coordinates of the original fields overlap. The overlap allows for a certain tolerance in the spatial coordinates. Parameters ---------- field_left : Field The first field object field_right : Field The second field object match : Literal['coordinates', 'labels'] How to match the spatial part of the index, whether throug coordinates (the default) or through labels. If 'coordinates' is selected, the 'label' index level, if present, is ignored. If 'label' is selected, the spatial coordinates are ignored. atol : Optional[float] = None If None (the default value), spatial coordinates matching is exact. Otherwise the coordinates are matched within atol (absolute tolerance). use_index : Optional[Literal['left', 'right']] = None If None, each returned field's index corresponds with the original fields. Otherwise, the returned fields have an index based on the first or second field. This only makes a difference when spatial coordinates are matched up to a tolerance. Returns ------- Two fields holding the data from the original fields, but only where the original indices overlap. Notes ----- Matching labels or coordinates exactly is much faster than allowing for tolerance, so use atol=None, rather than atol=0, if you want an exact match. Example ------- # This will match field entries with slightly offset spatial coordinates import canopy as cp # Load (dummy) data sources: my_source = cp.get_source("/path/to/some/data", "lpj_guess") my_other_source = cp.get_source("/path/to/some/other/data", "lpj_guess") # Load fields field_left = my_source.load_field('anpp') field_right = my_other_source.load_field('anpp') print(field_left.data) # TrBE C4G # lon lat time # 1.0 1.0 1989 0 0 # 1990 1 2 # 2.0 1989 2 4 # 1990 3 6 # 2.0 3.0 1989 4 8 # 1990 5 10 # 1991 6 12 print(field_right.data) # TrBE C4G # lon lat time # 1.1 2.1 1990 1 2 # 2.1 3.1 1990 3 4 # 1991 5 6 # 1992 7 8 # 3.1 -3.1 1984 9 10 # Exact coordinate matching leads to empty fields: field_left_overlap, field_right_overlap = cp.overlap(field_left, field_right) print(field_left) # # Field is empty! # # History # ------- # [1] 2025-11-10 19:05:56: overlap: sliced with overlapping rows from another field (description: Pretend Annual NPP just for showcasing cp.overlap). The index intersection was calculated exactly. Field was sliced to empty. # Allowing some tolerance does the trick. Let's allow a tolerance of 0.2 degrees in the spatial coordinates field_left_overlap, field_right_overlap = cp.overlap(field_left, field_right, atol=0.2) print(field_left_overlap.data) # TrBE C4G # lon lat time # 1.0 2.0 1990 3 6 # 2.0 3.0 1990 5 10 # 1991 6 12 print(field_right_overlap.data) # TrBE C4G # lon lat time # 1.1 2.1 1990 1 2 # 2.1 3.1 1990 3 4 # 1991 5 6 # We can specify that the second field's coordinates should be used for both returned fields. # Having the same coordinates in both fields can help comparing them. field_left_overlap, field_right_overlap = cp.overlap(field_left, field_right, atol=0.2, use_index="second") print(field_left_overlap.data) # TrBE C4G # lon lat time # 1.1 2.1 1990 3 6 # 2.1 3.1 1990 5 10 # 1991 6 12 print(field_right_overlap.data) # TrBE C4G # lon lat time # 1.1 2.1 1990 1 2 # 2.1 3.1 1990 3 4 # 1991 5 6 """ if not field_left.grid.is_compatible(field_right.grid): raise ValueError("Grids are not compatible.") if match == "coordinates": if atol is None: field_left_overlap, field_right_overlap = _match_coordinates(field_left, field_right) else: field_left_overlap, field_right_overlap = _match_coordinates_with_tolerance(field_left, field_right, atol, use_index) elif match == "labels": field_left_overlap, field_right_overlap = _match_labels(field_left, field_right) else: raise ValueError("'match' can only be 'coordinates' or 'labels'") log_message_left, log_message_right = _make_log_messages(field_left, field_right, match, atol, use_index) field_left_overlap.copy_md(field_left) field_left_overlap.copy_history(field_left) field_left_overlap.log(log_message_left) field_right_overlap.copy_md(field_right) field_right_overlap.copy_history(field_right) field_right_overlap.log(log_message_right) return field_left_overlap, field_right_overlap