Source code for speasy.products.variable

from copy import deepcopy
from typing import Dict, List, Optional, Any, Tuple, Union

import astropy.table
import astropy.units
import numpy as np
import pandas as pds

from speasy.core.data_containers import (
    DataContainer,
    VariableAxis,
    VariableTimeAxis,
    _to_index
)
from speasy.plotting import Plot
from .base_product import SpeasyProduct


def _values(input: Any) -> Any:
    if isinstance(input, SpeasyVariable):
        return input.values
    return input


def _name(input: Any) -> str:
    if isinstance(input, SpeasyVariable):
        return input.name
    return str(input)


def _np_build_result_name(func, *args, **kwargs):
    return f"{func.__name__}({', '.join(map(_name, args))}{', ' * bool(kwargs)}{', '.join([f'{k}={_name(v)}' for k, v in kwargs.items()])})"


def _check_time_axis(axis: VariableTimeAxis, values: DataContainer):
    if not isinstance(axis, VariableTimeAxis):
        raise TypeError(
            f"axes[0] must be a VariableTimeAxis instance, got {type(axis)}"
        )
    if axis.shape[0] != values.shape[0]:
        raise ValueError(
            f"Time and data must have the same length, got time:{len(axis)} and data:{len(values)}"
        )


def _check_time_dependent_axis(axis: VariableAxis, axis_index, time_axis: VariableTimeAxis, values: DataContainer):
    if axis.shape[0] != len(time_axis):
        raise ValueError(
            f"Time dependent axis must have the same length than time axis, got {len(axis)} and {len(time_axis)}"
        )
    if axis.shape[1] != values.shape[axis_index]:
        raise ValueError(
            f"Axis {axis_index} must match data shape, got {axis.shape[1]} and {values.shape[axis_index]}"
        )


def _check_time_independent_axis(axis: VariableAxis, axis_index, values: DataContainer):
    if axis.shape[0] != values.shape[axis_index]:
        raise ValueError(
            f"Axis {axis_index} must match data shape, got {axis.shape[0]} and {values.shape[axis_index]}"
        )


def _check_extra_axes(time_axis: VariableTimeAxis, axes: List[VariableAxis], values: DataContainer):
    for index, axis in enumerate(axes):
        if not isinstance(axis, VariableAxis):
            raise TypeError(
                f"axes[1:] must be a VariableAxis instance, got {type(axis)}"
            )
        if axis.is_time_dependent:
            _check_time_dependent_axis(axis, index + 1, time_axis, values)
        else:
            _check_time_independent_axis(axis, index + 1, values)


def _check_axes(axes: List[VariableAxis or VariableTimeAxis], values: DataContainer):
    _check_time_axis(axes[0], values)
    _check_extra_axes(axes[0], axes[1:], values)


[docs] class SpeasyVariable(SpeasyProduct): """SpeasyVariable object. Base class for storing variable data. Attributes ---------- time: numpy.ndarray time vector (x-axis data) values: numpy.ndarray data meta: Optional[dict] metadata columns: Optional[List[str]] column names, might be empty for spectrograms or 3D+ data axes: List[np.ndarray] Collection composed of time axis plus eventual additional axes according to values' shape axes_labels: List[str] Axes names unit: str Values physical unit name: str SpeasyVariable name nbytes: int memory usage in bytes fill_value: Any fill value if found in meta-data valid_range: Tuple[Any, Any] valid range if found in meta-data Methods ------- view: Returns a view of the current variable within the desired :data:`index_range` to_dataframe: Converts the variable to a pandas.DataFrame object from_dataframe: Builds a SpeasyVariable from a pandas.DataFrame object to_astropy_table: Converts the variable to an astropy.table.Table unit_applied: Returns a copy where values are astropy.units.Quantity filter_columns: Returns a copy only containing selected columns replace_fillval_by_nan: Returns a SpeasyVaraible with NaN instead of fill value if fill value is set in meta data plot: Plot the data with matplotlib by default to_dictionary: Converts a SpeasyVariable to a Python dictionary, mostly used for serialization purposes copy: Returns a copy """ __slots__ = ["__values_container", "__columns", "__axes"] __LIKE_NP_FUNCTIONS__ = {'zeros_like', 'empty_like', 'ones_like'} def __init__( self, axes: List[VariableAxis or VariableTimeAxis], values: DataContainer, columns: Optional[List[str]] = None, ): super().__init__() _check_axes(axes, values) if not isinstance(values, DataContainer): raise TypeError( f"values must be a DataContainer instance, got {type(values)}" ) self.__columns = list(map(str.strip, columns or [])) if values.ndim == 1: # to be consistent with pandas values = values.reshape((-1, 1)) self.__values_container = values self.__axes = axes
[docs] def view(self, index_range: Union[slice, np.ndarray]) -> "SpeasyVariable": """Return view of the current variable within the desired :data:`index_range`. Parameters ---------- index_range: slice index range Returns ------- speasy.common.variable.SpeasyVariable view of the variable on the given range """ if (type(index_range) is np.ndarray) and (index_range.dtype == bool): index_range = np.all(index_range, axis=tuple(range(1, index_range.ndim)), keepdims=False) return SpeasyVariable( axes=[ axis[index_range] if axis.is_time_dependent else axis for axis in self.__axes ], values=self.__values_container[index_range], columns=self.columns, )
[docs] def copy(self, name=None) -> "SpeasyVariable": """Makes a deep copy the variable Parameters ---------- name: str, optional new variable name, by default None, keeps the same name Returns ------- SpeasyVariable deep copy the variable """ return SpeasyVariable( axes=deepcopy(self.__axes), values=self.__values_container.copy(name=name), columns=deepcopy(self.columns), )
[docs] def filter_columns(self, columns: List[str]) -> "SpeasyVariable": """Builds a SpeasyVariable with only selected columns Parameters ---------- columns : List[str] list of column names to keep Returns ------- SpeasyVariable a SpeasyVariable with only selected columns """ indexes = list(map(lambda v: self.__columns.index(v), columns)) return SpeasyVariable( axes=deepcopy(self.__axes), values=DataContainer( is_time_dependent=self.__values_container.is_time_dependent, name=self.__values_container.name, meta=deepcopy(self.__values_container.meta), values=self.__values_container.values[:, indexes], ), columns=columns, )
def __eq__(self, other: Union["SpeasyVariable", float, int]) -> bool: """Check if this variable equals another. Or apply the numpy array comparison if other is a scalar. Parameters ---------- other: speasy.common.variable.SpeasyVariable, float, int SpeasyVariable or scalar to compare with Returns ------- bool, np.ndarray True if both variables are equal or an array with the element wise comparison between values and the given scalar """ if type(other) is SpeasyVariable: return self.__axes == other.__axes and self.__values_container == other.__values_container else: return self.values.__eq__(other) def __ne__(self, other: Union["SpeasyVariable", float, int]) -> bool: if type(other) is SpeasyVariable: return not (other == self) else: return self.values.__ne__(other) def __len__(self): return len(self.__axes[0]) def __getitem__(self, key): if isinstance(key, slice): return self.view( slice(_to_index(key.start, self.time), _to_index(key.stop, self.time)) ) if type(key) in (list, tuple): if type(key[0]) is np.ndarray: return self.view(key[0]) elif all(map(lambda v: type(v) is str, key)): return self.filter_columns(key) if type(key) is str and key in self.__columns: return self.filter_columns([key]) if type(key) is np.ndarray: return self.view(key) raise ValueError( f"No idea how to slice SpeasyVariable with given value: {key}") def __setitem__(self, k, v: Union["SpeasyVariable", float, int]): if type(v) is SpeasyVariable: self.__values_container[k] = v.__values_container for axis, src_axis in zip(self.__axes, v.__axes): if axis.is_time_dependent: axis[k] = src_axis else: self.__values_container[k] = v def __ge__(self, other): return np.greater_equal(self, other) def __gt__(self, other): return np.greater(self, other) def __le__(self, other): return np.less_equal(self, other) def __lt__(self, other): return np.less(self, other) def __mul__(self, other): return np.multiply(self, other) def __rmul__(self, other): return self.__mul__(other) def __pow__(self, power, modulo=None): return np.power(self, power) def __add__(self, other): if type(other) is np.timedelta64: result = self.copy(name=f"shifted({self.name}, {other})") result.time[:] += other return result return np.add(self, other) def __radd__(self, other): if type(other) is np.timedelta64: raise ValueError( "timedelta64 + SpeasyVariable is not supported, only SpeasyVariable + timedelta64 is supported") return np.add(other, self) def __sub__(self, other): if type(other) is np.timedelta64: result = self.copy(name=f"shifted({self.name}, -{other})") result.time[:] -= other return result return np.subtract(self, other) def __rsub__(self, other): if type(other) is np.timedelta64: raise ValueError( "timedelta64 - SpeasyVariable is not supported, only SpeasyVariable - timedelta64 is supported") return np.subtract(other, self) def __truediv__(self, other): return np.divide(self, other) def __rtruediv__(self, other): return np.divide(other, self) def __np_build_axes__(self, other, axis=None): if axis is None or self.ndim == other.ndim: return deepcopy(self.__axes) else: axes = [] for i, ax in enumerate(self.__axes): if i != axis: axes.append(deepcopy(ax)) return axes def __array_function__(self, func, types, args, kwargs): if func.__name__ in SpeasyVariable.__LIKE_NP_FUNCTIONS__: return SpeasyVariable.__dict__[func.__name__].__func__(self) if 'out' in kwargs: raise ValueError("out parameter is not supported") f_args = [_values(arg) for arg in args] f_kwargs = {name: _values(value) for name, value in kwargs.items()} res = func(*f_args, **f_kwargs) if np.isscalar(res): return res if isinstance(res, np.ndarray): if len(res.shape) != self.shape and (res.shape[0] != len(self.time) or kwargs.get('axis', None) == 0): return res n_cols = res.shape[1] if len(res.shape) > 1 else 1 return SpeasyVariable( axes=self.__np_build_axes__(res, axis=kwargs.get('axis', None)), values=DataContainer(values=res, name=_np_build_result_name(func, *args, **kwargs), meta=deepcopy(self.__values_container.meta)), columns=[f"column_{i}" for i in range(n_cols)], ) def __array_ufunc__(self, ufunc, method, *inputs, out: 'SpeasyVariable' or None = None, **kwargs): if out is not None: _out = _values(out[0]) else: _out = None values = ufunc(*list(map(_values, inputs)), **{name: _values(value) for name, value in kwargs}, out=_out) axes = self.__np_build_axes__(values, axis=kwargs.get('axis', None)) if out is not None: if isinstance(out, SpeasyVariable): out.__axes = axes return out if type(values) is np.ndarray and values.dtype == bool: return np.all(values, axis=tuple(range(1, values.ndim)), keepdims=True) else: return SpeasyVariable( axes=axes, values=DataContainer(values=values, name=_np_build_result_name(ufunc, *inputs, **kwargs), meta=deepcopy(self.__values_container.meta)), columns=[f"column_{i}" for i in range(values.shape[1])], ) @property def ndim(self): return self.__values_container.ndim @property def shape(self): return self.__values_container.shape @property def dtype(self): return self.__values_container.dtype @property def name(self) -> str: """SpeasyVariable name Returns ------- str SpeasyVariable name """ return self.__values_container.name @property def values(self) -> np.array: """SpeasyVariable values Returns ------- np.array SpeasyVariable values """ return self.__values_container.values @property def time(self) -> np.array: """Time axis values, equivalent to var.axes[0].values Returns ------- np.array time axis values as numpy array of datetime64[ns] """ return self.__axes[0].values @property def meta(self) -> Dict: """SpeasyVariable meta-data Returns ------- Dict SpeasyVariable meta-data """ return self.__values_container.meta @property def axes(self) -> List[VariableTimeAxis or VariableAxis]: """SpeasyVariable axes, axis 0 is always a VariableTimeAxis, there should be the same number of axes than values dimensions Returns ------- List[VariableTimeAxis or VariableAxis] list of variable axes """ return self.__axes @property def axes_labels(self) -> List[str]: """Axes names respecting axes order Returns ------- List[str] list of axes names """ return [axis.name for axis in self.__axes] @property def columns(self) -> List[str]: """SpeasyVariable columns names when it makes sense Returns ------- List[str] list of columns names """ return self.__columns @property def unit(self) -> str: """SpeasyVariable unit if found in meta-data Returns ------- str unit if found in meta-data """ return self.__values_container.unit @property def nbytes(self) -> int: """SpeasyVariable's values and axes memory usage Returns ------- int number of bytes used to store values and axes """ return self.__values_container.nbytes + np.sum( list(map(lambda ax: ax.nbytes, self.__axes)) ) @property def fill_value(self) -> Optional[Any]: """SpeasyVariable fill value if found in meta-data Returns ------- Any fill value if found in meta-data """ return self.meta.get("FILLVAL", None) @property def valid_range(self) -> Optional[Tuple[Any, Any]]: """SpeasyVariable valid range if found in meta-data Returns ------- Tuple[Any, Any] valid range if found in meta-data """ return self.meta.get("VALIDMIN", None), self.meta.get("VALIDMAX", None)
[docs] def unit_applied(self, unit: str or None = None, copy=True) -> "SpeasyVariable": """Returns a SpeasyVariable with given or automatically found unit applied to values Parameters ---------- unit : str or None, optional Use given unit or gets one from variable metadata, by default None copy : bool, optional Preserves source variable and returns a modified copy if true, by default True Returns ------- SpeasyVariable SpeasyVariable identic to source one with values converted to astropy.units.Quantity according to given or found unit Notes ----- This interface assume that there is only one unit for the whole variable since all stored in the same array See Also -------- unit: returns variable unit if found in meta-data """ if copy: axes = deepcopy(self.__axes) values = deepcopy(self.__values_container) columns = deepcopy(self.__columns) else: axes = self.__axes values = self.__values_container columns = self.__columns return SpeasyVariable( axes=axes, values=values.unit_applied(unit), columns=columns )
[docs] def to_astropy_table(self) -> astropy.table.Table: """Convert the variable to an astropy.Table object. Parameters ---------- datetime_index: bool boolean indicating that the index is datetime Returns ------- astropy.Table: Variable converted to astropy.Table See Also -------- from_dataframe: builds a SpeasyVariable from a pandas DataFrame to_dataframe: exports a SpeasyVariable to a pandas DataFrame """ try: units = astropy.units.Unit(self.meta["UNITS"]) except (ValueError, KeyError): units = None df = self.to_dataframe() umap = {c: units for c in df.columns} return astropy.table.Table.from_pandas(df, units=umap, index=True)
[docs] def to_dataframe(self) -> pds.DataFrame: """Convert the variable to a pandas.DataFrame object. Returns ------- pandas.DataFrame: Variable converted to Pandas DataFrame See Also -------- from_dataframe: builds a SpeasyVariable from a pandas DataFrame to_astropy_table: exports a SpeasyVariable to an astropy.Table object """ if len(self.__values_container.shape) != 2: raise ValueError( f"Cant' convert a SpeasyVariable with shape {self.__values_container.shape} to DataFrame, only 1D/2D variables are accepted" ) return pds.DataFrame( index=self.time, data=self.values, columns=self.__columns, copy=True )
[docs] @staticmethod def from_dataframe(df: pds.DataFrame) -> "SpeasyVariable": """Load from pandas.DataFrame object. Parameters ---------- df: pandas.DataFrame Input DataFrame to convert Returns ------- SpeasyVariable: Variable created from DataFrame See Also -------- to_dataframe: exports a SpeasyVariable to a pandas DataFrame to_astropy_table: exports a SpeasyVariable to an astropy.Table object """ if df.index.dtype == np.dtype("datetime64[ns]"): time = np.array(df.index) elif hasattr(df.index[0], "timestamp"): time = np.array( [np.datetime64(d.timestamp() * 1e9, "ns") for d in df.index] ) else: raise ValueError( "Can't convert DataFrame index to datetime64[ns] array") return SpeasyVariable( axes=[VariableTimeAxis(values=time, meta={})], values=DataContainer(values=df.values, meta={}, name="Unknown"), columns=list(df.columns), )
[docs] def to_dictionary(self, array_to_list=False) -> Dict[str, object]: """Converts SpeasyVariable to dictionary Parameters ---------- array_to_list : bool, optional Converts numpy arrays to Python Lists when true, by default False Returns ------- Dict[str, object] See Also -------- from_dictionary: builds variable from dictionary """ return { "axes": [ axis.to_dictionary(array_to_list=array_to_list) for axis in self.__axes ], "values": self.__values_container.to_dictionary( array_to_list=array_to_list ), "columns": deepcopy(self.__columns), }
[docs] @staticmethod def from_dictionary(dictionary: Dict[str, object] or None) -> "SpeasyVariable" or None: """Builds a SpeasyVariable from a well formed dictionary Returns ------- SpeasyVariable or None See Also -------- to_dictionary: exports SpeasyVariable to dictionary """ if dictionary is not None: axes = dictionary["axes"] axes = [VariableTimeAxis.from_dictionary(axes[0])] + [ VariableAxis.from_dictionary(axis) for axis in axes[1:] ] return SpeasyVariable( values=DataContainer.from_dictionary(dictionary["values"]), axes=axes, columns=dictionary.get("columns", None), ) else: return None
@property def plot(self, *args, **kwargs): """Plot the variable, tries to do its best to detect variable type and to populate plot labels """ return Plot( values=self.__values_container, columns_names=self.columns, axes=self.axes )
[docs] def replace_fillval_by_nan(self, inplace=False) -> "SpeasyVariable": """Replaces fill values by NaN, non float values are automatically converted to float. Fill value is taken from metadata field "FILLVAL" Parameters ---------- inplace : bool, optional Modifies source variable when true else modifies and returns a copy, by default False Returns ------- SpeasyVariable source variable or copy with fill values replaced by NaN See Also -------- clamp_with_nan: replaces values outside valid range by NaN sanitized: removes fill and invalid values """ if inplace: res = self else: res = deepcopy(self) if (fill_value := self.fill_value) is not None: res[res == fill_value] = np.nan return res
[docs] def clamp_with_nan(self, inplace=False, valid_min=None, valid_max=None) -> "SpeasyVariable": """Replaces values outside valid range by NaN, valid range is taken from metadata fields "VALIDMIN" and "VALIDMAX" Parameters ---------- inplace : bool, optional Modifies source variable when true else modifies and returns a copy, by default False valid_min : Float, optional Optional minimum valid value, takes metadata field "VALIDMIN" if not provided, by default None valid_max : Float, optional Optional maximum valid value, takes metadata field "VALIDMAX" if not provided, by default None Returns ------- SpeasyVariable source variable or copy with values clamped by NaN See Also -------- replace_fillval_by_nan: replaces fill values by NaN sanitized: removes fill and invalid values """ if inplace: res = self else: res = deepcopy(self) valid_min = valid_min or self.valid_range[0] valid_max = valid_max or self.valid_range[1] res[np.logical_or(res > valid_max, res < valid_min)] = np.nan return res
[docs] def sanitized(self, drop_fill_values=True, drop_out_of_range_values=True, drop_nan_and_inf=True, inplace=False, valid_min=None, valid_max=None) -> "SpeasyVariable": """Returns a copy of the variable with fill values and invalid values removed Parameters ---------- drop_fill_values : bool, optional Remove fill values, by default True drop_out_of_range_values : bool, optional Remove values outside valid range, by default True drop_nan_and_inf : bool, optional Remove NaN and Infinite values, by default True inplace : bool, optional Modifies source variable when true else modifies and returns a copy, by default False valid_min : Float, optional Minimum valid value, takes metadata field "VALIDMIN" if not provided, by default None valid_max : Float, optional Maximum valid value, takes metadata field "VALIDMAX" if not provided, by default None Returns ------- SpeasyVariable source variable or copy with fill and invalid values removed See Also -------- replace_fillval_by_nan: replaces fill values by NaN clamp_with_nan: replaces values outside valid range by NaN """ if inplace: res = self else: res = deepcopy(self) indexes_without_nan = None indexes_without_fill = None indexes_without_invalid = None if drop_nan_and_inf: indexes_without_nan = np.isfinite(res).reshape(-1) if drop_fill_values and res.fill_value is not None: indexes_without_fill = np.all(res != res.fill_value, axis=tuple(range(1, res.ndim))) if drop_out_of_range_values: valid_min = valid_min or res.valid_range[0] valid_max = valid_max or res.valid_range[1] if valid_min is not None and valid_max is not None: indexes_without_invalid = np.logical_and( res >= valid_min, res <= valid_max ).reshape(-1) return res[ np.logical_and( indexes_without_nan, indexes_without_fill, indexes_without_invalid ) ]
[docs] @staticmethod def reserve_like(other: "SpeasyVariable", length: int = 0) -> "SpeasyVariable": """Create a SpeasyVariable of given length and with the same properties than given variable but unset values Parameters ---------- other : SpeasyVariable variable used as reference for shape and meta-data length : int, optional output variable length, by default 0 Returns ------- SpeasyVariable a SpeasyVariable similar to given one of given length """ axes = [] for axis in other.__axes: if axis.is_time_dependent: new_axis = type(axis).reserve_like(axis, length) axes.append(new_axis) else: axes.append(deepcopy(axis)) return SpeasyVariable( values=DataContainer.reserve_like( other.__values_container, length), axes=axes, columns=other.columns, )
[docs] @staticmethod def empty_like(other: "SpeasyVariable") -> "SpeasyVariable": """Create a SpeasyVariable with the same properties than given variable but unset values Parameters ---------- other : SpeasyVariable variable used as reference for shape and meta-data Returns ------- SpeasyVariable a SpeasyVariable similar to given one """ return SpeasyVariable( values=DataContainer.empty_like(other.__values_container), axes=deepcopy(other.__axes), columns=deepcopy(other.columns), )
[docs] @staticmethod def zeros_like(other: "SpeasyVariable") -> "SpeasyVariable": """Create a SpeasyVariable with the same properties than given variable but filled with zeros Parameters ---------- other : SpeasyVariable variable used as reference for shape and meta-data Returns ------- SpeasyVariable a SpeasyVariable similar to given one filled with zeros """ return SpeasyVariable( values=DataContainer.zeros_like(other.__values_container), axes=deepcopy(other.__axes), columns=deepcopy(other.columns), )
[docs] @staticmethod def ones_like(other: "SpeasyVariable") -> "SpeasyVariable": """Create a SpeasyVariable with the same properties than given variable but filled with ones Parameters ---------- other : SpeasyVariable variable used as reference for shape and meta-data Returns ------- SpeasyVariable a SpeasyVariable similar to given one filled with ones """ return SpeasyVariable( values=DataContainer.ones_like(other.__values_container), axes=deepcopy(other.__axes), columns=deepcopy(other.columns), )
def _repr_pretty_(self, p, cycle): import humanize if cycle: p.text("SpeasyVariable(...)") else: def _print_member(name, value, last=False): p.text(f"{name}: ") p.pretty(value) if not last: p.text(", ") p.breakable() def _print_dict(name, d, last=False): p.text(f"{name}: ") with p.group(4, '{', '}'): p.breakable() for k, v in d.items(): p.text(f"{k}: ") p.pretty(v) p.text(", ") p.breakable() if not last: p.text(", ") p.breakable() def _print_time_range(): p.text(f"Time Range: {self.time[0]} - {self.time[-1]}") p.breakable() with p.group(4, f'{self.__class__.__name__}(', ')'): p.breakable() _print_member("Name", self.name) if len(self): _print_time_range() _print_member("Shape", self.shape) _print_member("Unit", self.unit) _print_member("Columns", self.columns) _print_dict("Meta", self.meta) _print_member("Size", humanize.naturalsize(self.nbytes))
[docs] def to_dictionary(var: SpeasyVariable, array_to_list=False) -> Dict[str, object]: return var.to_dictionary(array_to_list=array_to_list)
[docs] def from_dictionary(dictionary: Dict[str, object] or None) -> SpeasyVariable or None: return SpeasyVariable.from_dictionary(dictionary)
[docs] def from_dataframe(df: pds.DataFrame) -> SpeasyVariable: """Convert a dataframe to SpeasyVariable. See Also -------- SpeasyVariable.from_dataframe """ return SpeasyVariable.from_dataframe(df)
[docs] def to_dataframe(var: SpeasyVariable) -> pds.DataFrame: """Convert a :class:`~speasy.common.variable.SpeasyVariable` to pandas.DataFrame. See Also -------- SpeasyVariable.to_dataframe """ return SpeasyVariable.to_dataframe(var)
[docs] def merge(variables: List[SpeasyVariable]) -> Optional[SpeasyVariable]: """Merge a list of :class:`~speasy.common.variable.SpeasyVariable` objects. Parameters ---------- variables: List[SpeasyVariable] Variables to merge together Returns ------- SpeasyVariable: Resulting variable from merge operation """ if len(variables) == 0: return None sorted_var_list = [v for v in variables if ( v is not None) and (len(v.time) > 0)] sorted_var_list.sort(key=lambda v: v.time[0]) # drop variables covered by previous ones for prev, current in zip(sorted_var_list[:-1], sorted_var_list[1:]): if prev.time[-1] >= current.time[-1]: sorted_var_list.remove(current) # drop variables covered by next ones for current, nxt in zip(sorted_var_list[:-1], sorted_var_list[1:]): if nxt.time[0] == current.time[0] and nxt.time[-1] >= current.time[-1]: sorted_var_list.remove(current) if len(sorted_var_list) == 0: for v in variables: if v is not None: return SpeasyVariable.reserve_like(v, length=0) return None overlaps = [ np.where(current.time >= nxt.time[0])[0][0] if current.time[-1] >= nxt.time[0] else -1 for current, nxt in zip(sorted_var_list[:-1], sorted_var_list[1:]) ] dest_len = int( np.sum( [ overlap if overlap != -1 else len(r.time) for overlap, r in zip(overlaps, sorted_var_list[:-1]) ] ) ) dest_len += len(sorted_var_list[-1].time) result = SpeasyVariable.reserve_like(sorted_var_list[0], dest_len) pos = 0 for r, overlap in zip(sorted_var_list, overlaps + [-1]): frag_len = len(r.time) if overlap == -1 else overlap result[pos: (pos + frag_len)] = r[0:frag_len] pos += frag_len return result
[docs] def same_time_axis(variables: List[SpeasyVariable]) -> bool: """Check if all variables have the same time axis values and length If only one variable is provided, it returns True. Parameters ---------- variables : List[SpeasyVariable] list of variables to check Returns ------- bool True if all variables have the same time axis values and length """ if len(variables) < 2: return True ref_time_axis = variables[0].time return all([np.all(var.time == ref_time_axis) for var in variables[1:]])