from copy import deepcopy
from typing import Dict, List, Optional
import astropy.table
import astropy.units
import numpy as np
import pandas as pds
from speasy.core.data_containers import (
DataContainer,
VariableAxis,
VariableTimeAxis,
_to_index,
)
from speasy.plotting import Plot
from .base_product import SpeasyProduct
[docs]
class SpeasyVariable(SpeasyProduct):
"""SpeasyVariable object. Base class for storing variable data.
Attributes
----------
time: numpy.ndarray
time vector (x-axis data)
values: numpy.ndarray
data
meta: Optional[dict]
metadata
columns: Optional[List[str]]
column names, might be empty for spectrograms or 3D+ data
axes: List[np.ndarray]
Collection composed of time axis plus eventual additional axes according to values' shape
axes_labels: List[str]
Axes names
unit: str
Values physical unit
name: str
SpeasyVariable name
nbytes: int
memory usage in bytes
Methods
-------
view:
Returns a view of the current variable within the desired :data:`index_range`
to_dataframe:
Converts the variable to a pandas.DataFrame object
from_dataframe:
Builds a SpeasyVariable from a pandas.DataFrame object
to_astropy_table:
Converts the variable to an astropy.table.Table
unit_applied:
Returns a copy where values are astropy.units.Quantity
filter_columns:
Returns a copy only containing selected columns
replace_fillval_by_nan:
Returns a SpeasyVaraible with NaN instead of fill value if fill value is set in meta data
plot:
Plot the data with matplotlib by default
to_dictionary:
Converts a SpeasyVariable to a Python dictionary, mostly used for serialization purposes
copy:
Returns a copy
"""
__slots__ = ["__values_container", "__columns", "__axes"]
def __init__(
self,
axes: List[VariableAxis or VariableTimeAxis],
values: DataContainer,
columns: Optional[List[str]] = None,
):
super().__init__()
if not isinstance(axes[0], VariableTimeAxis):
raise TypeError(
f"axes[0] must be a VariableTimeAxis instance, got {type(axes[0])}"
)
if axes[0].shape[0] != values.shape[0]:
raise ValueError(
f"Time and data must have the same length, got time:{len(axes[0])} and data:{len(values)}"
)
self.__columns = list(map(str.strip, columns or []))
if len(values.values.shape) == 1:
# to be consistent with pandas
values.reshape((values.shape[0], 1))
self.__values_container = values
self.__axes = axes
[docs]
def view(self, index_range: slice) -> "SpeasyVariable":
"""Return view of the current variable within the desired :data:`index_range`.
Parameters
----------
index_range: slice
index range
Returns
-------
speasy.common.variable.SpeasyVariable
view of the variable on the given range
"""
return SpeasyVariable(
axes=[
axis[index_range] if axis.is_time_dependent else axis
for axis in self.__axes
],
values=self.__values_container[index_range],
columns=self.columns,
)
[docs]
def copy(self) -> "SpeasyVariable":
"""Makes a deep copy the variable
Returns
-------
SpeasyVariable
deep copy the variable
"""
return SpeasyVariable(
axes=deepcopy(self.__axes),
values=deepcopy(self.__values_container),
columns=deepcopy(self.columns),
)
[docs]
def filter_columns(self, columns: List[str]) -> "SpeasyVariable":
"""Builds a SpeasyVariable with only selected columns
Parameters
----------
columns : List[str]
list of column names to keep
Returns
-------
SpeasyVariable
a SpeasyVariable with only selected columns
"""
indexes = list(map(lambda v: self.__columns.index(v), columns))
return SpeasyVariable(
axes=deepcopy(self.__axes),
values=DataContainer(
is_time_dependent=self.__values_container.is_time_dependent,
name=self.__values_container.name,
meta=deepcopy(self.__values_container.meta),
values=self.__values_container.values[:, indexes],
),
columns=columns,
)
def __eq__(self, other: "SpeasyVariable") -> bool:
"""Check if this variable equals another.
Parameters
----------
other: speasy.common.variable.SpeasyVariable
another SpeasyVariable object to compare with
Returns
-------
bool:
True if all attributes are equal
"""
return (
type(other) is SpeasyVariable
and self.__axes == other.__axes
and self.__values_container == other.__values_container
)
def __len__(self):
return len(self.__axes[0])
def __getitem__(self, key):
if isinstance(key, slice):
return self.view(
slice(_to_index(key.start, self.time),
_to_index(key.stop, self.time))
)
if type(key) in (list, tuple) and all(map(lambda v: type(v) is str, key)):
return self.filter_columns(key)
if type(key) is str and key in self.__columns:
return self.filter_columns([key])
raise ValueError(
f"No idea how to slice SpeasyVariable with given value: {key}")
def __setitem__(self, k, v: "SpeasyVariable"):
assert type(v) is SpeasyVariable
self.__values_container[k] = v.__values_container
for axis, src_axis in zip(self.__axes, v.__axes):
if axis.is_time_dependent:
axis[k] = src_axis
@property
def name(self) -> str:
"""SpeasyVariable name
Returns
-------
str
SpeasyVariable name
"""
return self.__values_container.name
@property
def values(self) -> np.array:
"""SpeasyVariable values
Returns
-------
np.array
SpeasyVariable values
"""
return self.__values_container.values
@property
def time(self) -> np.array:
"""Time axis values, equivalent to var.axes[0].values
Returns
-------
np.array
time axis values as numpy array of datetime64[ns]
"""
return self.__axes[0].values
@property
def meta(self) -> Dict:
"""SpeasyVariable meta-data
Returns
-------
Dict
SpeasyVariable meta-data
"""
return self.__values_container.meta
@property
def axes(self) -> List[VariableTimeAxis or VariableAxis]:
"""SpeasyVariable axes, axis 0 is always a VariableTimeAxis, there should be the same number of axes than values dimensions
Returns
-------
List[VariableTimeAxis or VariableAxis]
list of variable axes
"""
return self.__axes
@property
def axes_labels(self) -> List[str]:
"""Axes names respecting axes order
Returns
-------
List[str]
list of axes names
"""
return [axis.name for axis in self.__axes]
@property
def columns(self) -> List[str]:
"""SpeasyVariable columns names when it makes sense
Returns
-------
List[str]
list of columns names
"""
return self.__columns
@property
def unit(self) -> str:
"""SpeasyVariable unit if found in meta-data
Returns
-------
str
unit if found in meta-data
"""
return self.__values_container.unit
@property
def nbytes(self) -> int:
"""SpeasyVariable's values and axes memory usage
Returns
-------
int
number of bytes used to store values and axes
"""
return self.__values_container.nbytes + np.sum(
list(map(lambda ax: ax.nbytes, self.__axes))
)
[docs]
def unit_applied(self, unit: str or None = None, copy=True) -> "SpeasyVariable":
"""Returns a SpeasyVariable with given or automatically found unit applied to values
Parameters
----------
unit : str or None, optional
Use given unit or gets one from variable metadata, by default None
copy : bool, optional
Preserves source variable and returns a modified copy if true, by default True
Returns
-------
SpeasyVariable
SpeasyVariable identic to source one with values converted to astropy.units.Quantity according to given or found unit
See Also
--------
unit: returns variable unit if found in meta-data
"""
if copy:
axes = deepcopy(self.__axes)
values = deepcopy(self.__values_container)
columns = deepcopy(self.__columns)
else:
axes = self.__axes
values = self.__values_container
columns = self.__columns
return SpeasyVariable(
axes=axes, values=values.unit_applied(unit), columns=columns
)
[docs]
def to_astropy_table(self) -> astropy.table.Table:
"""Convert the variable to an astropy.Table object.
Parameters
----------
datetime_index: bool
boolean indicating that the index is datetime
Returns
-------
astropy.Table:
Variable converted to astropy.Table
See Also
--------
from_dataframe: builds a SpeasyVariable from a pandas DataFrame
to_dataframe: exports a SpeasyVariable to a pandas DataFrame
"""
try:
units = astropy.units.Unit(self.meta["UNITS"])
except (ValueError, KeyError):
units = None
df = self.to_dataframe()
umap = {c: units for c in df.columns}
return astropy.table.Table.from_pandas(df, units=umap, index=True)
[docs]
def to_dataframe(self) -> pds.DataFrame:
"""Convert the variable to a pandas.DataFrame object.
Returns
-------
pandas.DataFrame:
Variable converted to Pandas DataFrame
See Also
--------
from_dataframe: builds a SpeasyVariable from a pandas DataFrame
to_astropy_table: exports a SpeasyVariable to an astropy.Table object
"""
if len(self.__values_container.shape) != 2:
raise ValueError(
f"Cant' convert a SpeasyVariable with shape {self.__values_container.shape} to DataFrame, only 1D/2D variables are accepted"
)
return pds.DataFrame(
index=self.time, data=self.values, columns=self.__columns, copy=True
)
[docs]
@staticmethod
def from_dataframe(df: pds.DataFrame) -> "SpeasyVariable":
"""Load from pandas.DataFrame object.
Parameters
----------
df: pandas.DataFrame
Input DataFrame to convert
Returns
-------
SpeasyVariable:
Variable created from DataFrame
See Also
--------
to_dataframe: exports a SpeasyVariable to a pandas DataFrame
to_astropy_table: exports a SpeasyVariable to an astropy.Table object
"""
if df.index.dtype == np.dtype("datetime64[ns]"):
time = np.array(df.index)
elif hasattr(df.index[0], "timestamp"):
time = np.array(
[np.datetime64(d.timestamp() * 1e9, "ns") for d in df.index]
)
else:
raise ValueError(
"Can't convert DataFrame index to datetime64[ns] array")
return SpeasyVariable(
axes=[VariableTimeAxis(values=time, meta={})],
values=DataContainer(values=df.values, meta={}, name="Unknown"),
columns=list(df.columns),
)
[docs]
def to_dictionary(self, array_to_list=False) -> Dict[str, object]:
"""Converts SpeasyVariable to dictionary
Parameters
----------
array_to_list : bool, optional
Converts numpy arrays to Python Lists when true, by default False
Returns
-------
Dict[str, object]
See Also
--------
from_dictionary: builds variable from dictionary
"""
return {
"axes": [
axis.to_dictionary(array_to_list=array_to_list) for axis in self.__axes
],
"values": self.__values_container.to_dictionary(
array_to_list=array_to_list
),
"columns": deepcopy(self.__columns),
}
[docs]
@staticmethod
def from_dictionary(dictionary: Dict[str, object] or None) -> "SpeasyVariable" or None:
"""Builds a SpeasyVariable from a well formed dictionary
Returns
-------
SpeasyVariable or None
See Also
--------
to_dictionary: exports SpeasyVariable to dictionary
"""
if dictionary is not None:
axes = dictionary["axes"]
axes = [VariableTimeAxis.from_dictionary(axes[0])] + [
VariableAxis.from_dictionary(axis) for axis in axes[1:]
]
return SpeasyVariable(
values=DataContainer.from_dictionary(dictionary["values"]),
axes=axes,
columns=dictionary.get("columns", None),
)
else:
return None
@property
def plot(self, *args, **kwargs):
"""Plot the variable, tries to do its best to detect variable type and to populate plot labels
"""
return Plot(
values=self.__values_container, columns_names=self.columns, axes=self.axes
)
[docs]
def replace_fillval_by_nan(self, inplace=False) -> "SpeasyVariable":
"""Replaces fill values by NaN, non float values are automatically converted to float.
Fill value is taken from metadata field "FILLVAL"
Parameters
----------
inplace : bool, optional
Modifies source variable when true else modifies and returns a copy, by default False
Returns
-------
SpeasyVariable
source variable or copy with fill values replaced by NaN
"""
if inplace:
res = self
else:
res = deepcopy(self)
if "FILLVAL" in res.meta:
res.__values_container.replace_val_by_nan(res.meta["FILLVAL"])
return res
[docs]
@staticmethod
def reserve_like(other: "SpeasyVariable", length: int = 0) -> "SpeasyVariable":
"""Create a SpeasyVariable of given length and with the same properties than given variable but unset values
Parameters
----------
other : SpeasyVariable
variable used as reference for shape and meta-data
length : int, optional
output variable length, by default 0
Returns
-------
SpeasyVariable
a SpeasyVariable similar to given one of given length
"""
axes = []
for axis in other.__axes:
if axis.is_time_dependent:
new_axis = type(axis).reserve_like(axis, length)
axes.append(new_axis)
else:
axes.append(deepcopy(axis))
return SpeasyVariable(
values=DataContainer.reserve_like(
other.__values_container, length),
axes=axes,
columns=other.columns,
)
[docs]
def to_dictionary(var: SpeasyVariable, array_to_list=False) -> Dict[str, object]:
return var.to_dictionary(array_to_list=array_to_list)
[docs]
def from_dictionary(dictionary: Dict[str, object] or None) -> SpeasyVariable or None:
return SpeasyVariable.from_dictionary(dictionary)
[docs]
def from_dataframe(df: pds.DataFrame) -> SpeasyVariable:
"""Convert a dataframe to SpeasyVariable.
See Also
--------
SpeasyVariable.from_dataframe
"""
return SpeasyVariable.from_dataframe(df)
[docs]
def to_dataframe(var: SpeasyVariable) -> pds.DataFrame:
"""Convert a :class:`~speasy.common.variable.SpeasyVariable` to pandas.DataFrame.
See Also
--------
SpeasyVariable.to_dataframe
"""
return SpeasyVariable.to_dataframe(var)
[docs]
def merge(variables: List[SpeasyVariable]) -> Optional[SpeasyVariable]:
"""Merge a list of :class:`~speasy.common.variable.SpeasyVariable` objects.
Parameters
----------
variables: List[SpeasyVariable]
Variables to merge together
Returns
-------
SpeasyVariable:
Resulting variable from merge operation
"""
if len(variables) == 0:
return None
sorted_var_list = [v for v in variables if (
v is not None) and (len(v.time) > 0)]
sorted_var_list.sort(key=lambda v: v.time[0])
# drop variables covered by previous ones
for prev, current in zip(sorted_var_list[:-1], sorted_var_list[1:]):
if prev.time[-1] >= current.time[-1]:
sorted_var_list.remove(current)
# drop variables covered by next ones
for current, nxt in zip(sorted_var_list[:-1], sorted_var_list[1:]):
if nxt.time[0] == current.time[0] and nxt.time[-1] >= current.time[-1]:
sorted_var_list.remove(current)
if len(sorted_var_list) == 0:
for v in variables:
if v is not None:
return SpeasyVariable.reserve_like(v, length=0)
return None
overlaps = [
np.where(current.time >= nxt.time[0])[0][0]
if current.time[-1] >= nxt.time[0]
else -1
for current, nxt in zip(sorted_var_list[:-1], sorted_var_list[1:])
]
dest_len = int(
np.sum(
[
overlap if overlap != -1 else len(r.time)
for overlap, r in zip(overlaps, sorted_var_list[:-1])
]
)
)
dest_len += len(sorted_var_list[-1].time)
result = SpeasyVariable.reserve_like(sorted_var_list[0], dest_len)
pos = 0
for r, overlap in zip(sorted_var_list, overlaps + [-1]):
frag_len = len(r.time) if overlap == -1 else overlap
result[pos: (pos + frag_len)] = r[0:frag_len]
pos += frag_len
return result