import warnings
from typing import Optional, List, Tuple, Any
import matplotlib.pyplot as plt
from matplotlib.axes import Axes
from matplotlib.figure import Figure
import pandas as pd
import seaborn as sns
import canopy as cp
from canopy.visualization.multiple_figs import setup_figure_and_axes, create_wrapper_from_locals
from canopy.visualization.visualization_helpers import (
handle_figure_output, get_color_palette, make_dark_mode,
format_value_label, set_axis_style, get_field_metadata,
)
from canopy.visualization.line_plot.line_plot_helpers import (
apply_legend_style, var_case_matrix, get_n_hue_classes,
)
MAX_GRIDCELLS = 20
# Define a custom warning format to show only the message
def _custom_warning_format(message, category, filename, lineno, line=None):
return f"{category.__name__}: {message}\n"
warnings.formatwarning = _custom_warning_format
[docs]
def make_time_series(
fields: cp.Field | List[cp.Field],
output_file: Optional[str] = None,
layers: Optional[List[str] | str] = None,
gridop: Optional[str] = None,
make_diff: bool = False,
yaxis_label: Optional[str] = None,
yaxis_lim: Optional[List[float]] = None,
field_labels: Optional[List[str]] = None,
unit: Optional[str] = None,
title: Optional[str] = None,
palette: Optional[str] = None,
custom_palette: Optional[str] = None,
move_legend: bool = False,
legend_style: str = 'default',
reverse_hue_style: bool = False,
max_labels_per_col: int = 15,
baseline: bool = False,
rolling_size: Optional[int] = None,
stacked: bool = False,
relative: bool = False,
dark_mode: bool = False,
transparent: bool = False,
x_label_rotation: float = 0,
x_fig: float = 10,
y_fig: float = 10,
subfig=None,
return_fig: bool = False,
**kwargs,
) -> Optional[plt.Figure]:
"""
Create a time-series plot from the given fields.
Parameters
----------
fields : cp.Field or List[cp.Field]
Input data Field or list of Fields to display.
output_file : str, optional
File path for saving the plot.
layers : List[str] or str, optional
List of layer names to display.
gridop : str, optional
The reduction operation. Either 'sum', 'av' or None. Default is None.
make_diff : bool, optional
Option to make the difference between two time-series. Default is False.
yaxis_label : str, optional
Y-axis label, if not provided canopy will try to retrieve the name of the variable in the metadata.
yaxis_lim : List[float], optional
List of y-axis limits.
field_labels : List[str], optional
List of labels for the time series.
unit : str, optional
Unit of the y-axis variable, if not provided canopy will try to retrieve
the unit of the variable in the metadata.
title : str, optional
Title of the plot.
palette : str, optional
Seaborn color palette to use for the line colors (https://seaborn.pydata.org/tutorial/color_palettes.html,
recommended palette are in https://colorbrewer2.org).
custom_palette : str, optional
Path of custom color palette .txt file to use. Names should match label names.
move_legend : bool, optional
Move the the legend outside of plot. Default is False.
legend_style : str or None, optional
Style of the legend ('default', 'highlighted', 'end-of-line', 'hidden').
If 'hidden', the legend will not be shown.
reverse_hue_style : bool, optional
Reverse how seaborn uses hue for different time series and style for different layers. Default is False.
max_labels_per_col : int, optional
Maximum number of labels per layer in the legend. Default is 15.
baseline : bool, optional
Option to add a y=0 dotted line. Default is False.
rolling_size : int, optional
Window of rolling mean. Only available when using a single dimension
(multiple layers, multiple fields, or multiple gridcells).
stacked : bool, optional
Option to create a stacked plot. Default is False.
relative : bool, optional
Option to plot relative values. Default is False.
dark_mode : bool, optional
Option to use dark mode. Default is False.
transparent: bool, optional
Option to use a transparent background. Default is False.
x_label_rotation : float, optional
Rotation angle in degrees for the x-axis tick labels. Default is 0.
x_fig : float, optional
Width of the figure in inches. Default is 10.
y_fig : float, optional
Height of the figure in inches. Default is 10.
subfig : matplotlib.figure.SubFigure, optional
If provided, the plot will be created in this subfigure instead of creating a new figure.
This is used by multiple_figs() to combine multiple plots.
User can also provide a plt.figure.subfigure object
(https://matplotlib.org/stable/gallery/subplots_axes_and_figures/subfigures.html)
return_fig : bool, optional
If True, return a callable wrapper function instead of creating the plot immediately.
This wrapper can be used with multiple_figs(). Default is False.
**kwargs
Additional keyword arguments are passed directly to `seaborn.lineplot`. This allows customization of
line aesthetics such as `linewidth`, `linestyle`, `alpha`, etc.
"""
# If return_fig is True, create a wrapper function and return it
if return_fig:
return create_wrapper_from_locals(make_time_series, locals())
# Force fields and layers to be a list
if isinstance(fields, cp.Field):
fields = [fields]
if isinstance(layers, str):
layers = [layers]
# Create boolean if grid is not reduced
grid_not_reduced = not (gridop or fields[0].grid.is_reduced("lat") or fields[0].grid.is_reduced("lon"))
# Retrieve metadata
yaxis_label, unit, layers = get_field_metadata(fields, yaxis_label, unit, layers)
# Pre-checks
n_fields = len(fields)
n_layers = len(layers)
legend_style = _pre_checks(
fields, n_fields, n_layers, grid_not_reduced, stacked, legend_style, move_legend,
rolling_size, kwargs
)
# Make space reduction and limit gridcells of the different fields
fields_red = []
for field in fields:
# Reduce grid if gridop is provided and not already reduced
if gridop and not field.grid.is_reduced("lat") and not field.grid.is_reduced("lon"):
fields_red.append(field.reduce_grid(gridop))
# If grid is already reduced, add the field as is
elif field.grid.is_reduced("lat") and field.grid.is_reduced("lon"):
fields_red.append(field)
# If grid is not reduced, sample gridcells
else:
n_gridcells = len(field.coordinates)
if n_gridcells > MAX_GRIDCELLS:
fields_red.append(field.sample_gridcells(MAX_GRIDCELLS))
else:
fields_red.append(field)
# Make line objects and flatten columns
time_series = []
for field in fields_red:
if relative:
field = _field_layers_to_relative(field)
df = cp.make_lines(field, flatten_columns=True, layers=layers)
time_series.append(df)
# Make the difference between the lines (second minus first)
if make_diff:
if n_fields == 2:
time_series = [time_series[1] - time_series[0]]
n_fields = 1
else:
raise ValueError("make_diff is True, but the number of time_series is not equal to 2.")
# Set up the figure and axes
fig, ax = setup_figure_and_axes(subfig=subfig, x_fig=x_fig, y_fig=y_fig)
if relative is True:
if unit != "%":
warnings.warn("Unit will be changed to '%' for relative values.", UserWarning)
unit = "%"
# Choose which variables map to line colour (hue) vs line style based on case matrix
hue_var, style_var = var_case_matrix(n_fields, n_layers, grid_not_reduced, reverse_hue_style)
# Post-checks
legend_style, rolling_size = _post_checks(
time_series, n_fields, field_labels, style_var, legend_style, rolling_size
)
n_classes = get_n_hue_classes(hue_var, time_series, n_fields, n_layers)
# Get the colors for the line colour (hue) based on the number of classes
colors, palette_dict = get_color_palette(n_classes, palette=palette, custom_palette=custom_palette)
if custom_palette: # reorder the palette to match the order of the layers
hue_order = (layers if hue_var == "layer" else field_labels) if hue_var and field_labels else None
if hue_order:
colors = [palette_dict[c] for c in hue_order if c in palette_dict]
# Convert PeriodIndex to DatetimeIndex if necessary
for ts in time_series:
if isinstance(ts.index, pd.PeriodIndex):
ts.index = ts.index.to_timestamp()
# Plot
if stacked is False:
x_col = "time"
plot_df = _to_long_format(
time_series, n_fields, n_layers, field_labels, grid_not_reduced, x_col
)
fig, ax = _plot_time_series(
fig, ax, plot_df, hue_var, style_var, colors, move_legend, legend_style,
max_labels_per_col, rolling_size, x_col=x_col, **kwargs
)
else:
stacked_layers = [c for c in layers if c in time_series[0].columns] or list(time_series[0].columns)
ax = _plot_stacked_time_series(
ax, time_series, stacked_layers, colors, max_labels_per_col,
legend_style
)
# Set axis style
set_axis_style(ax, title=title, x_label="Year", y_label=format_value_label(yaxis_label, unit))
if x_label_rotation:
ax.tick_params(axis='x', labelrotation=x_label_rotation)
ax.set_xlim(min(ts.index.min() for ts in time_series), max(ts.index.max() for ts in time_series))
if relative is True:
ax.set_ylim([0, 100])
elif yaxis_lim is not None:
ax.set_ylim(yaxis_lim)
# Dark mode
if dark_mode is True:
fig, ax = make_dark_mode(fig, ax, legend_style)
# Add y=0 line
if baseline is True:
ax.axhline(0, color="black", linestyle="--", linewidth=1)
return handle_figure_output(fig, output_file=output_file, transparent=transparent, subfig=subfig)
# ---------------------------------------------------------------------------
# Data preparation helpers
# ---------------------------------------------------------------------------
def _pre_checks(
fields: List[cp.Field],
n_fields: int,
n_layers: int,
grid_not_reduced: bool,
stacked: bool,
legend_style: str,
move_legend: bool,
rolling_size: Optional[int],
kwargs: dict[str],
) -> str:
"""
Validate and normalize plot options before processing.
Returns the (possibly adjusted) legend_style.
"""
for i, field in enumerate(fields):
if getattr(field, "timeop", None) is not None:
raise ValueError(
f"Field {i} has already been time-reduced (timeop='{field.timeop}'). "
"Time series plot requires unreduced time dimension."
)
if n_fields > 1 and n_layers > 1 and grid_not_reduced:
raise ValueError(
"Cannot differentiate 3 categories (fields, layers, gridcells). "
"Use at most 2: e.g. reduce grid or use one layer."
)
if n_fields > 1 and stacked is True:
raise ValueError("Stacked plot is only available for one time series.")
if legend_style not in ["default", "hidden"]:
if move_legend is True:
warnings.warn("Legend_style with move_legend can only be 'default.'", UserWarning)
legend_style = "default"
if rolling_size:
warnings.warn("Legend style can only be 'default' or 'hidden' when using rolling mean.", UserWarning)
legend_style = "default"
if stacked is True:
warnings.warn("Legend style in stacked plot can only be 'default.'", UserWarning)
legend_style = "default"
if rolling_size:
if stacked is True:
warnings.warn("Rolling mean is not available for stacked plot.", UserWarning)
if kwargs:
warnings.warn(
f"Rolling mean is not available with custom lineplot kwargs: {kwargs}",
UserWarning,
)
return legend_style
def _field_layers_to_relative(field: cp.Field) -> cp.Field:
"""Express selected layers as percentages of their per-row sum (Field arithmetic)."""
# Add Total layers
total_in_layer = "Total" in field.layers
if not total_in_layer:
field_with_total = field.reduce_layers("sum", name="Total")
else:
field_with_total = field
# Calculate percentage
field_perc = 100.0 * field_with_total / field_with_total["Total"]
# Remove Total layer
if not total_in_layer:
field_perc = field_perc.drop_layers(field_perc.layers[-1])
# Rename layers
rename_map = dict(zip(field_perc.layers, field_with_total.layers))
field_perc.rename_layers(rename_map)
return field_perc
def _post_checks(
time_series: List[pd.DataFrame],
n_fields: int,
field_labels: Optional[List[str]],
style_var: Optional[str],
legend_style: str,
rolling_size: Optional[int],
) -> Tuple[str, Optional[int]]:
"""
Validate inputs and normalize options after data preparation.
Returns (legend_style, rolling_size), both possibly adjusted.
"""
if any(ts.empty for ts in time_series):
raise ValueError("One or more time_series are empty")
if n_fields > 1:
if field_labels is None:
raise ValueError("field_labels must be defined for multiple time-series.")
if len(field_labels) != n_fields:
raise ValueError("field_labels should be of the same size as the number of time series.")
if style_var is not None and legend_style not in ["default", "hidden"]:
warnings.warn(
"Legend style can only be 'default' when style are used for differentiation.",
UserWarning,
)
legend_style = "default"
if style_var is not None and rolling_size:
warnings.warn(
"Rolling mean is only available when using a single dimension (layers, fields, or gridcells)",
UserWarning,
)
rolling_size = None
return legend_style, rolling_size
def _to_long_format(
time_series: List[pd.DataFrame],
n_fields: int,
n_layers: List[str],
field_labels: Optional[List[str]],
grid_not_reduced: bool,
x_col: str = "time",
) -> pd.DataFrame:
"""Convert list of DataFrames to single long-format DataFrame for plotting."""
dfs = []
# Convert to long format
for i, df in enumerate(time_series):
label = (field_labels[i] if field_labels else f"Field {i}")
flat_df = df.reset_index()
id_vars = [x_col] if x_col in flat_df.columns else []
value_vars = [c for c in flat_df.columns if c not in id_vars]
# Melt the DataFrame
melted = flat_df.melt(id_vars=id_vars, value_vars=value_vars, var_name="series", value_name="value")
# Add column labels to the melted DataFrame
if n_fields > 1:
melted["field_label"] = label
if n_layers > 1 and grid_not_reduced:
melted["layer"] = melted["series"].str.split(" - ", n=1).str[0]
melted["gridcell"] = melted["series"].str.split(" - ", n=1).str[1]
elif n_layers > 1:
melted["layer"] = melted["series"]
elif grid_not_reduced:
melted["gridcell"] = melted["series"]
dfs.append(melted)
return pd.concat(dfs, ignore_index=True)
# ---------------------------------------------------------------------------
# Plotting helpers
# ---------------------------------------------------------------------------
def _plot_time_series(
fig: Figure,
ax: Axes,
plot_df: pd.DataFrame,
hue_var: Optional[str],
style_var: Optional[str],
colors: Any,
move_legend: bool,
legend_style: str,
max_labels_per_col: int,
rolling_size: Optional[int],
x_col: str = "time",
**kwargs,
) -> Tuple[Figure, Axes]:
"""Plot lines from long-format DataFrame."""
# Extra kwargs
extra = {k: v for k, v in kwargs.items() if k not in ("subfig", "x_col")}
if rolling_size:
# Plot original data (dashed) and rolling mean (solid) separately
plot_df_rolling = plot_df.copy()
# Calculate rolling mean
plot_df_rolling["value"] = plot_df_rolling.groupby("series")["value"].transform(
lambda x: x.rolling(window=rolling_size).mean()
)
hue_col = hue_var if hue_var else "series" # if only one colour
sns.lineplot(data=plot_df, x=x_col, y="value", hue=hue_col, palette=colors, ax=ax,
legend=False, linestyle="--", **extra)
sns.lineplot(data=plot_df_rolling, x=x_col, y="value", hue=hue_col, palette=colors, ax=ax,
legend=bool(legend_style), linestyle="-", **extra)
else:
# Standard line plot with hue/style for differentiation
plot_kwargs = {"data": plot_df, "x": x_col, "y": "value", "ax": ax, "legend": bool(legend_style)}
if hue_var:
plot_kwargs["hue"] = hue_var
plot_kwargs["palette"] = colors
if style_var:
plot_kwargs["style"] = style_var
else:
plot_kwargs["dashes"] = False # Solid lines when no style differentiation
plot_kwargs.update(extra)
sns.lineplot(**plot_kwargs)
# Apply the legend style
apply_legend_style(
ax, legend_style, max_labels_per_col, move_legend, rolling_size,
hue_var=hue_var, style_var=style_var
)
return fig, ax
def _plot_stacked_time_series(
ax: Axes,
time_series: List[pd.DataFrame],
layers: List[str],
colors: Any,
max_labels_per_col: int,
legend_style: str,
) -> Axes:
"""Same as _plot_time_series but for stacked plot."""
# Plot each line in time_series
ax.stackplot(time_series[0].index, time_series[0][layers].T.values, colors=colors, linewidth=0.75)
if legend_style != 'hidden':
# Add legend with reversed order
ncols = (len(layers) + max_labels_per_col - 1) // max_labels_per_col # Ceiling division
ax.legend(layers, loc='center left', bbox_to_anchor=(1, 0.5), frameon=False,
fontsize=14, reverse=True, ncols=ncols)
return ax