Source code for speasy.core.codecs.bundled_codecs.hapi.codec

from datetime import timedelta

import numpy as np

from typing import Any, AnyStr, Dict, List, Mapping

from speasy.core.cache._function_cache import CacheCall
from speasy.core.codecs.codec_interface import CodecInterface
from speasy.core.data_containers import DataContainer, VariableAxis, VariableTimeAxis
from speasy.products.variable import SpeasyVariable, same_time_axis

from .hapi_file import HapiFile, HapiParameter

import logging
log = logging.getLogger(__name__)

def _time_dependent_axis_name(ax: VariableAxis) -> str:
    """Return the HAPI parameter name used to store ``ax``'s time-varying centers.

    The result is the axis name followed by the ``_centers_time_varying`` suffix.
    """
    axis_name = ax.name
    return f"{axis_name}_centers_time_varying"

def _get_variable_axes(variable: SpeasyVariable, is_time_dependent: bool) -> List[VariableAxis]:
    """Select the non-time axes of ``variable`` by their time dependence.

    The first axis (``variable.axes[0]``) is always skipped; every remaining axis
    whose ``is_time_dependent`` flag equals the requested value is returned, in
    the order it appears on the variable.
    """
    selected = []
    for axis in variable.axes[1:]:
        if axis.is_time_dependent == is_time_dependent:
            selected.append(axis)
    return selected

def _decode_meta(meta: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of HAPI metadata with the ``units`` key renamed to ``UNITS``.

    Parameters
    ----------
    meta : Dict[str, Any]
        Metadata dictionary as stored on a HAPI parameter. It is left untouched.

    Returns
    -------
    Dict[str, Any]
        A new dictionary holding the same entries, where a ``units`` entry (if
        any) is stored under ``UNITS`` instead.
    """
    # Work on a shallow copy: the caller's dictionary usually still belongs to
    # the HAPI parameter it came from, and renaming the key in place would
    # silently alter that parameter. Nested values are shared, not copied.
    decoded = dict(meta)
    if "units" in decoded:
        decoded["UNITS"] = decoded.pop("units")
    return decoded

def _make_hapi_time_axis(time_axis: VariableTimeAxis) -> HapiParameter:
    """Wrap a time axis into the HAPI ``Time`` parameter.

    The parameter carries ``time_axis.values`` together with a fixed metadata
    record (name ``Time``, type ``isotime``, units ``UTC``, length 24, no fill).
    """
    time_meta = {
        "name": "Time",
        "type": "isotime",
        "units": "UTC",
        "length": 24,
        "fill": None,
    }
    return HapiParameter(values=time_axis.values, meta=time_meta)

def _make_hapi_parameter(variable: SpeasyVariable) -> HapiParameter:
    """Convert a variable into a HAPI parameter.

    The parameter holds ``variable.values`` and the metadata produced by
    ``_create_meta`` for that variable.
    """
    data = variable.values
    header = _create_meta(variable)
    return HapiParameter(values=data, meta=header)

def _numpy_dtype_to_hapi_type(dtype: np.dtype) -> str:
    """Map a NumPy dtype to the HAPI type name used in parameter metadata.

    Parameters
    ----------
    dtype : np.dtype
        Data type of the values to describe.

    Returns
    -------
    str
        ``"int"`` for any integer dtype, ``"double"`` for any floating dtype.

    Raises
    ------
    ValueError
        If the dtype is neither an integer nor a floating type.
    """
    # NOTE(review): the HAPI specification spells the integer type "integer";
    # confirm that HapiFile really expects the "int" spelling used here.
    for numpy_kind, hapi_type in ((np.integer, "int"), (np.floating, "double")):
        if np.issubdtype(dtype, numpy_kind):
            return hapi_type
    raise ValueError(f"Unsupported data type {dtype}")


def _create_meta(variable: SpeasyVariable) -> Dict[str, Any]:
    """Build the HAPI metadata dictionary describing ``variable``.

    Always present: ``name``, ``units``, ``fill``, ``description`` (empty string
    when the variable has none) and ``type`` (derived from the values dtype).

    Added only when available:

    - ``label``: the variable columns, when there is at least one;
    - ``coordinateSystemName`` / ``vectorComponents``: copied from the variable
      metadata when present;
    - ``size``: the values shape without its first dimension, for values with
      more than one dimension;
    - ``bins``: one entry per non-time axis, time-independent axes first. A
      time-independent axis stores its centers inline as a list, a
      time-dependent one stores the name of the parameter holding them.

    Raises
    ------
    ValueError
        If the values dtype has no HAPI equivalent.
    """
    source_meta = variable.meta
    meta = {
        "name": variable.name,
        "units": variable.unit,
        "fill": variable.fill_value,
        "description": source_meta.get("description", ""),
        "type": _numpy_dtype_to_hapi_type(variable.values.dtype),
    }

    column_labels = variable.columns
    if column_labels is not None and len(column_labels) > 0:
        meta["label"] = column_labels

    # Optional descriptive keys are forwarded untouched when the variable has them.
    for optional_key in ("coordinateSystemName", "vectorComponents"):
        if optional_key in source_meta:
            meta[optional_key] = source_meta[optional_key]

    shape = variable.values.shape
    if len(shape) > 1:
        meta["size"] = shape[1:]

    fixed_bins = [
        {"name": axis.name, "units": axis.unit, "centers": axis.values.tolist()}
        for axis in _get_variable_axes(variable, is_time_dependent=False)
    ]
    varying_bins = [
        {"name": axis.name, "units": axis.unit, "centers": _time_dependent_axis_name(axis)}
        for axis in _get_variable_axes(variable, is_time_dependent=True)
    ]
    all_bins = fixed_bins + varying_bins
    if all_bins:
        meta["bins"] = all_bins

    return meta

def _get_hapi_varying_axes(variable: SpeasyVariable) -> List[HapiParameter]:
    """Turn every time-dependent axis of ``variable`` into a HAPI parameter.

    Each parameter holds the axis values and is named with
    ``_time_dependent_axis_name``, which is the name the variable's ``bins``
    metadata uses to reference it.

    Raises
    ------
    ValueError
        If an axis dtype has no HAPI equivalent.
    """
    varying_parameters = []
    for axis in _get_variable_axes(variable, is_time_dependent=True):
        centers = axis.values
        # The axis object exposes 'unit', whereas the HAPI metadata key is 'units'.
        axis_meta = {
            "name": _time_dependent_axis_name(axis),
            "units": axis.unit,
            "size": [centers.shape[1]],
            "type": _numpy_dtype_to_hapi_type(centers.dtype),
        }
        varying_parameters.append(HapiParameter(values=centers, meta=axis_meta))
    return varying_parameters


def _speasy_variables_to_hapi(variables: List[SpeasyVariable]) -> HapiFile:
    """Assemble a HAPI file from variables sharing one time axis.

    The file receives, in order: the ``Time`` parameter built from the first
    variable's time axis, then for each variable its data parameter followed by
    one parameter per time-dependent axis of that variable.

    Parameters
    ----------
    variables : List[SpeasyVariable]
        Variables to export; must be non-empty and share the same time axis.

    Returns
    -------
    HapiFile
        The populated HAPI file.

    Raises
    ------
    ValueError
        If ``variables`` is empty or the variables do not share a time axis.
    """
    # Reject empty input first so that it always yields the dedicated message
    # and never reaches the time-axis comparison with nothing to compare.
    if not variables:
        raise ValueError("No variables to save")
    if not same_time_axis(variables):
        raise ValueError("All variables must have the same time axis")
    hapi_file = HapiFile()
    hapi_file.add_parameter(_make_hapi_time_axis(variables[0].axes[0]))
    for spz_var in variables:
        hapi_file.add_parameter(_make_hapi_parameter(spz_var))
        # Time-varying axes travel as extra parameters next to their variable.
        for hapi_axis_parameter in _get_hapi_varying_axes(spz_var):
            hapi_file.add_parameter(hapi_axis_parameter)
    return hapi_file

def _bin_to_axis(json_bin: Dict[str, Any], hap_file: HapiFile) -> VariableAxis:
    """Build a variable axis from one entry of a HAPI ``bins`` list.

    Parameters
    ----------
    json_bin : Dict[str, Any]
        The bin description. ``centers`` is required and is either a list of
        numeric values (time-independent axis) or the name of another parameter
        of ``hap_file`` holding the centers (time-dependent axis). ``name``
        defaults to ``"bin_axis"``.
    hap_file : HapiFile
        File used to resolve a ``centers`` parameter reference.

    Returns
    -------
    VariableAxis
        The axis described by the bin.

    Raises
    ------
    ValueError
        If ``centers`` is missing, has an unsupported type, names an unknown
        parameter, or is a list that cannot be converted to floats.
    """
    centers = json_bin.get("centers")
    name = json_bin.get("name", "bin_axis")
    if centers is None:
        raise ValueError("Invalid bin specification: missing 'centers' field")

    if isinstance(centers, str):
        # A string refers to another parameter that stores the centers per record.
        hapi_parameter = hap_file.get_parameter(centers)
        if hapi_parameter is None:
            raise ValueError(f"Unknown parameter referenced in bins: {centers}")
        return VariableAxis(values=hapi_parameter.values,
                            meta=_decode_meta(hapi_parameter.meta),
                            is_time_dependent=True,
                            name=name)

    if isinstance(centers, list):
        try:
            axis_values = np.array(centers, dtype=float)
        except (ValueError, TypeError) as err:
            # NumPy raises TypeError (not ValueError) for non-numeric objects
            # such as dicts; report both the same way so callers that skip
            # invalid bins on ValueError keep working.
            raise ValueError(
                "Invalid bin specification: 'centers' list must contain numeric values") from err
        return VariableAxis(values=axis_values,
                            meta={"name": "centers", "UNITS": json_bin.get("units", None)},
                            is_time_dependent=False,
                            name=name)

    raise ValueError("Invalid bin specification: 'centers' must be either a string or a list")


def _bins_to_axes(json_bins: List[Dict[str, Any]], hap_file: HapiFile) -> List[VariableAxis]:
    """Convert a HAPI ``bins`` list into variable axes, skipping invalid entries.

    Each entry is converted with ``_bin_to_axis``. An entry rejected with a
    ``ValueError`` is logged as a warning and left out of the result.
    """
    # NOTE(review): a skipped bin is simply absent from the returned list, so
    # the axes that follow it move one position down - confirm this is intended.
    converted = []
    for bin_spec in json_bins:
        try:
            axis = _bin_to_axis(bin_spec, hap_file)
        except ValueError as e:
            log.warning(f"Skipping invalid bin specification: {e}")
        else:
            converted.append(axis)
    return converted

def _hapifile_to_speasy_variables(hapi_file: HapiFile, variables: List[str]) -> Mapping[str, SpeasyVariable]:
    """Extract the requested parameters of a HAPI file as variables.

    Parameters
    ----------
    hapi_file : HapiFile
        The parsed HAPI file.
    variables : List[str]
        Names of the parameters to extract.

    Returns
    -------
    Mapping[str, SpeasyVariable]
        One variable per requested name found in the file. Names with no
        matching parameter are silently left out.
    """
    # A single time axis object is shared by every variable built from the file.
    shared_time_axis = VariableTimeAxis(values=hapi_file.time_axis, meta=hapi_file.time_axis_meta)
    result = {}
    for requested_name in variables:
        parameter = hapi_file.get_parameter(requested_name)
        if parameter is not None:
            axes: List[VariableAxis] = [shared_time_axis]
            if "bins" in parameter.meta:
                axes.extend(_bins_to_axes(parameter.meta["bins"], hapi_file))
            payload = DataContainer(parameter.values,
                                    name=parameter.name,
                                    meta=_decode_meta(parameter.meta))
            result[requested_name] = SpeasyVariable(axes=axes, values=payload)
    return result


class HapiBaseCodec(CodecInterface):
    """Codec reading and writing variables through HAPI files.

    The file format handling is delegated to two callables given at
    construction: ``loader(file)`` returns a ``HapiFile`` (or ``None``), and
    ``saver(hapi_file, file, **kwargs)`` writes one out.
    """

    def __init__(self, loader, saver):
        self._loader = loader
        self._saver = saver

    def load_variables(self, variables, file, cache_remote_files=True, **kwargs):
        """Load several variables from ``file``.

        ``cache_remote_files`` and ``kwargs`` are accepted but not used by this
        implementation. Returns a mapping from name to variable containing only
        the requested names found in the file, or ``None`` when the loader
        returns ``None``.
        """
        hapi_file = self._loader(file)
        if hapi_file is not None:
            return _hapifile_to_speasy_variables(hapi_file, variables)
        return None

    @CacheCall(cache_retention=timedelta(seconds=120), is_pure=True)
    def load_variable(self, variable, file, cache_remote_files=True, **kwargs):
        """Load a single variable from ``file``.

        Returns the variable, or ``None`` when the file could not be loaded or
        does not contain ``variable``.
        """
        loaded = self.load_variables([variable], file, cache_remote_files)
        # load_variables yields None on loader failure and omits names it
        # cannot find, so indexing its result directly would raise.
        if loaded is None:
            return None
        return loaded.get(variable)

    def save_variables(self, variables, file=None, **kwargs):
        """Convert ``variables`` to a HAPI file and hand it to the saver.

        Returns whatever the saver returns. Raises ``ValueError`` when the
        variables cannot be converted (see ``_speasy_variables_to_hapi``).
        """
        hapi_file = _speasy_variables_to_hapi(variables)
        return self._saver(hapi_file, file, **kwargs)

    @property
    def supported_extensions(self):
        """File extensions handled by this codec; none for the base codec."""
        return []

    @property
    def supported_mimetypes(self):
        """MIME types handled by this codec; none for the base codec."""
        return []