Source code for canopy.readers.lpjguess

"""Reader functions for standard LPJ-GUESS output

This module provides reader functions for the standard, text-based output of LPJ-GUESS.

Registered file formats:
    - lpjg_annual: e.g. anpp.out[.gz], aaet.out[.gz] ...
    - lpjg_monthly: e.g. mnpp.out[.gz], maet.out[.gz] ...
"""

import pandas as pd
import numpy as np
from typing import Any, cast
from canopy.core.redspec import RedSpec
from canopy.core import frameops
from canopy.core.grid import get_grid, get_gridop
from canopy.readers.registry import register_reader_desc

# Readers for LPJ-GUESS standard (common) output
# ----------------------------------------------

# Number of rows to load for working out the length of the time series when reading file by chunks
NROWS_PROBE = 2000
# Number of gridcells per chunk when reading file by chunks
NGRIDCELLS_PER_CHUNK = 1000
# Columns to build index
INDEX_COLS = ['Lon', 'Lat', 'Year', ]


def _get_column_dtypes(path: str, data_dtype):

    dtypes = {'Lon': np.float64, 'Lat': np.float64, 'Year': np.int32}

    header = pd.read_csv(path, sep=r'\s+', nrows=0).columns.tolist()
    column_dtypes = {x: dtypes.get(x, data_dtype) for x in header}

    return column_dtypes


def _insert_index_yearly(df: pd.DataFrame) -> pd.DataFrame:
    """Insert a MultiIndex for an annual output file

    Parameters
    ----------
    df : pd.DataFrame
        A pandas DataFrame
    """
    # Ignore the type of index because the type hints for PeriodIndex are outdated and don't include from_fields
    index = pd.PeriodIndex.from_fields(year=df.Year, month=[1]*len(df), freq='Y') # type: ignore
    df.Year = index
    df.index = pd.MultiIndex.from_frame(df[INDEX_COLS])
    df = df.drop(INDEX_COLS, axis=1)
    df.index.names = ['lon', 'lat', 'time', ]

    return df


def _insert_index_monthly(df: pd.DataFrame) -> pd.DataFrame:
    """Insert a MultiIndex for a monthly output file

    Parameters
    ----------
    df : pd.DataFrame
        A pandas DataFrame
    """
    month_numbers = list(range(1,13))
    df.columns = cast(pd.Index, INDEX_COLS + month_numbers)
    df = df.melt(var_name='_month', value_name='Data', id_vars=INDEX_COLS)
    # Disabled type checking on next line because the 'from_field' method is a 'future' feature that will only replace
    #   the fields being passed to the constructor in Pandas 3.0 (I am using from_fields here for future compatibility
    #   and to avoid a deprecation warning).
    df['_time'] = pd.PeriodIndex.from_fields(year=df['Year'], month=df['_month'], freq='M') # type: ignore
    df = df.set_index(['Lon', 'Lat', '_time'])
    df.index.names = ['lon', 'lat', 'time']
    df = df.drop(['Year', '_month'], axis=1)

    return df


def _chunk_spatial_aggregation(ichunk, chunk, grid, preprocess):

    weights = pd.DataFrame([1]*len(chunk.index), index=chunk.index, columns=['w'])
    sumop = get_gridop(grid, 'sum', preprocess.axis)
    chunk = sumop(chunk, grid)
    if preprocess.gridop == 'av':
        weights = sumop(weights, grid)
    else:
        weights = None

    return chunk, weights


def _read_by_chunks(path: str, data_dtype: type, preprocess: RedSpec,
                   grid_type: str | None, grid_params: dict, insert_index, use_layers):

    #TODO: include reduction info in history?

    column_dtypes = _get_column_dtypes(path, data_dtype)

    if grid_type is None:
        raise ValueError(f"Parameter 'grid_type' must be specified.")

    probe = pd.read_csv(path, nrows=NROWS_PROBE, sep=r'\s+', dtype=column_dtypes, usecols=use_layers)

    chunksize = len(probe['Year'].unique()) * NGRIDCELLS_PER_CHUNK

    probe = insert_index(probe)
    grid = get_grid(grid_type).from_frame(probe, **grid_params)

    if preprocess.gridop is not None and preprocess.axis is None:
        preprocess.axis = 'both'

    chunk_list = []
    weight_list = []
    for ichunk, chunk in enumerate(pd.read_csv(path, chunksize=chunksize, sep=r'\s+', usecols=use_layers, dtype=column_dtypes)):
        chunk = insert_index(chunk)
        # Slice
        if preprocess.slices is not None:
            chunk, _, _ = frameops.select_slice(chunk, grid, preprocess.slices)
            if chunk.empty:
                continue

        # Time reduction
        if preprocess.timeop is not None:
            chunk, _ = frameops.reduce_time(chunk, timeop=preprocess.timeop, freq=preprocess.freq)

        # Spatial reduction
        if preprocess.gridop is not None:
            chunk, weights = _chunk_spatial_aggregation(ichunk, chunk, grid, preprocess)
            weight_list.append(weights)

        chunk_list.append(chunk)

    if len(chunk_list) == 0:
        raise ValueError("Requested slicing yields empty data.")

    df_red = pd.concat(chunk_list)

    if preprocess.gridop is not None:
        group_levels = ['time']
        if preprocess.axis != 'both':
            group_levels = list(grid.axis_names) + group_levels
            group_levels.remove(preprocess.axis)
    
        df_red = df_red.groupby(group_levels).sum()
        if preprocess.gridop == 'av':
            weights = pd.concat(weight_list).groupby(group_levels).sum()
            values = df_red.values/weights.values
            df_red = pd.DataFrame(values, index=df_red.index, columns=df_red.columns)

    return df_red


[docs] @register_reader_desc('LPJ-GUESS: common output (annual)') def lpjg_annual(path: str, use_layers: str | list[str] | None = None, preprocess: RedSpec | None = None, grid_type: str = 'lonlat', grid_params: dict[str,Any] | None = None, data_dtype: type = np.float32 ): """Read an annual output file from the standard LPJ-GUESS output.""" if use_layers: use_layers = [use_layers] if isinstance(use_layers, str) else use_layers use_layers = INDEX_COLS + use_layers if preprocess is None: column_dtypes = _get_column_dtypes(path, data_dtype) df = pd.read_csv(path, sep=r'\s+', dtype=column_dtypes, usecols=use_layers) df = _insert_index_yearly(df) else: if grid_params is None: grid_params = {} df = _read_by_chunks(path, data_dtype, preprocess, grid_type, grid_params, _insert_index_yearly, use_layers) return df
[docs] @register_reader_desc('LPJ-GUESS: common output (monthly)') def lpjg_monthly(path: str, preprocess: RedSpec | None = None, grid_type: str = 'lonlat', grid_params: dict[str,Any] | None = None, layer_name: str = 'Data', data_dtype: type = np.float32 ): """Read a monthly output file from the standard LPJ-GUESS output.""" if preprocess is None: column_dtypes = _get_column_dtypes(path, data_dtype) df = pd.read_csv(path, sep=r'\s+', dtype=column_dtypes) df = _insert_index_monthly(df) # Ignoring types in the below assignment (pandas stubs demands that expression is of type Index[str]) df.columns = [layer_name] # type: ignore else: if grid_params is None: grid_params = {} # The monthly reader always reads all the columns (Jan-Dec) -> Must past None as usecols for read_csv df = _read_by_chunks(path, data_dtype, preprocess, grid_type, grid_params, _insert_index_monthly, None) # Ignoring types in the below assignment (pandas stubs demands that expression is of type Index[str]) df.columns = [layer_name] # type: ignore return df