# Source code for speasy.core.codecs.bundled_codecs.istp.cdf

from typing import List, AnyStr, Optional, Mapping, Union
import io
import logging
from datetime import timedelta

import numpy as np
import pycdfpp
import pyistp

from speasy.core.codecs import CodecInterface, register_codec, Buffer
from speasy.core.cache import CacheCall
from speasy.products import SpeasyVariable, VariableAxis

import re

from . import _load_variable, _resolve_url_type, _simplify_shape

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Pattern for attribute names that contain "_PTR", optionally followed by
# "_<digits>" (e.g. "LABL_PTR_1"). It is applied with `.match()`, which only
# anchors at the start, and the pattern has no end anchor, so any name that
# contains "_PTR" somewhere matches.
# NOTE(review): presumably targets ISTP pointer attributes — confirm.
_PTR_rx = re.compile(r".*_PTR(_\d+)?")


def _load_variables(variables, file=None, buffer=None, master_file=None, master_buffer=None):
    """Load several variables from one ISTP source opened with ``pyistp.load``.

    :param variables: iterable of variable names to load.
    :param file: forwarded to ``pyistp.load`` as ``file``.
    :param buffer: forwarded to ``pyistp.load`` as ``buffer``.
    :param master_file: forwarded to ``pyistp.load`` as ``master_file``.
    :param master_buffer: forwarded to ``pyistp.load`` as ``master_buffer``.
    :return: a dict mapping each requested name to the result of
        ``_load_variable`` for that name, or ``None`` when ``pyistp.load``
        returned ``None``.
    """
    loader = pyistp.load(file=file, buffer=buffer, master_file=master_file, master_buffer=master_buffer)
    if loader is None:
        return None
    loaded = {}
    for requested_name in variables:
        loaded[requested_name] = _load_variable(loader, requested_name)
    return loaded


def _convert_attributes_to_variables(variable_name: str, attrs: Mapping, cdf: pycdfpp.CDF):
    """Store "_PTR" attributes as separate CDF variables and reference them by name.

    Every attribute whose name matches ``_PTR_rx`` has its value written to
    ``cdf`` as a new variable named
    ``"{variable_name}_{attribute name}_{variable_name}"``; in the returned
    mapping that attribute holds the new variable's name instead of the
    value. All other attributes are passed through unchanged.

    :param variable_name: name of the variable owning the attributes.
    :param attrs: attribute name -> attribute value mapping (not modified).
    :param cdf: CDF object that receives the extra variables.
    :return: a new dict of attributes as described above.
    """
    converted = {}
    for attr_name, attr_value in attrs.items():
        pointed_variable = f"{variable_name}_{attr_name}_{variable_name}"
        if not _PTR_rx.match(attr_name):
            # Regular attribute: keep the value as it is.
            converted[attr_name] = attr_value
            continue
        cdf.add_variable(name=pointed_variable, values=attr_value)
        converted[attr_name] = pointed_variable
    return converted


def _write_axis(ax: VariableAxis, cdf: pycdfpp.CDF, compress_variables=False) -> bool:
    """Add one axis to ``cdf`` as a variable named after the axis.

    :param ax: axis to write; its values go through ``_simplify_shape`` first.
    :param cdf: CDF object that receives the new variable.
    :param compress_variables: when true the variable is added with gzip
        compression, otherwise with no compression.
    :return: always ``True``.
    """
    # Only nanosecond datetimes get an explicit CDF type; for anything else
    # ``data_type=None`` is passed and the choice is left to pycdfpp.
    is_ns_time = ax.values.dtype == np.dtype("datetime64[ns]")
    cdf_type = pycdfpp.DataType.CDF_TIME_TT2000 if is_ns_time else None
    if compress_variables:
        compression = pycdfpp.CompressionType.gzip_compression
    else:
        compression = pycdfpp.CompressionType.no_compression
    cdf.add_variable(
        name=ax.name,
        values=_simplify_shape(ax.values),
        data_type=cdf_type,
        compression=compression,
    )
    return True


def _write_variable(v: SpeasyVariable, cdf: pycdfpp.CDF, already_saved_axes: List[VariableAxis],
                    compress_variables=False) -> None:
    """Write a variable and any of its axes not yet stored into ``cdf``.

    Each axis of ``v`` is compared (``==``) with the axes in
    ``already_saved_axes``. An axis with no equal there is written with
    ``_write_axis`` and appended to the list; otherwise the name of the equal,
    already-saved axis is reused. The variable is then added with its
    metadata plus one ``DEPEND_<index>`` attribute per axis.

    :param v: variable to write. Its ``meta`` mapping is read but not modified.
    :param cdf: CDF object that receives the variable and its axes.
    :param already_saved_axes: axes already present in ``cdf``; extended in
        place so later calls can share axes.
    :param compress_variables: when true, variables are added with gzip
        compression, otherwise with no compression.
    """
    def _already_in_cdf(ax: VariableAxis):
        # Name of the first saved axis equal to ``ax``, or None when there is none.
        for _ax in already_saved_axes:
            if _ax == ax:
                return _ax.name
        return None

    depends = {}
    for index, ax in enumerate(v.axes):
        saved_name = _already_in_cdf(ax)
        if saved_name is None:
            _write_axis(ax, cdf, compress_variables)
            already_saved_axes.append(ax)
            saved_name = ax.name
        depends[f"DEPEND_{index}"] = saved_name
    # Merge into a fresh dict: updating ``v.meta`` directly would leak the
    # DEPEND_* entries into the caller's variable as a side effect of saving.
    attributes = {**v.meta, **depends}
    cdf.add_variable(
        name=v.name,
        values=_simplify_shape(v.values),
        attributes=_convert_attributes_to_variables(variable_name=v.name, attrs=attributes, cdf=cdf),
        compression=pycdfpp.CompressionType.gzip_compression if compress_variables else pycdfpp.CompressionType.no_compression
    )


@register_codec
class IstpCdf(CodecInterface):
    """Codec for ISTP CDF files.

    This codec is a wrapper around PyISTP library. It supports some variations around the ISTP standard.
    """

    def load_variables(self,
                       variables: List[AnyStr],
                       file: Union[Buffer, str, io.IOBase],
                       cache_remote_files=True,
                       master_cdf_url: Optional[Union[Buffer, str, io.IOBase]] = None,
                       **kwargs
                       ) -> Optional[Mapping[AnyStr, SpeasyVariable]]:
        """Load several variables from a CDF source.

        :param variables: names of the variables to load.
        :param file: data source, passed to ``_resolve_url_type`` with an empty prefix.
        :param cache_remote_files: forwarded to ``_resolve_url_type``.
        :param master_cdf_url: optional master CDF source, passed to
            ``_resolve_url_type`` with the ``"master_"`` prefix.
        :param kwargs: extra keyword arguments forwarded to ``_load_variables``.
        :return: whatever ``_load_variables`` returns (a name -> variable
            mapping, or ``None``).
        """
        kwargs["variables"] = variables
        # dict.update() receives the two results as key/value pairs.
        # NOTE(review): presumably each pair selects the file/buffer (or
        # master_file/master_buffer) keyword of _load_variables — confirm.
        kwargs.update((
            _resolve_url_type(file, prefix="", cache_remote_files=cache_remote_files),
            _resolve_url_type(master_cdf_url, prefix="master_", cache_remote_files=cache_remote_files),
        ))
        return _load_variables(**kwargs)

    @CacheCall(cache_retention=timedelta(seconds=120), is_pure=True)
    def load_variable(self,
                      variable: AnyStr,
                      file: Union[Buffer, str, io.IOBase],
                      cache_remote_files=True,
                      master_cdf_url: Optional[Union[Buffer, str, io.IOBase]] = None,
                      **kwargs
                      ) -> Optional[SpeasyVariable]:
        """Load a single variable by delegating to :meth:`load_variables`.

        :param variable: name of the variable to load.
        :param file: data source, forwarded to :meth:`load_variables`.
        :param cache_remote_files: forwarded to :meth:`load_variables`.
        :param master_cdf_url: optional master CDF source, forwarded as well.
        :param kwargs: extra keyword arguments forwarded to :meth:`load_variables`.
        :return: the loaded variable, or ``None`` when nothing could be loaded
            or the variable is absent from the result.
        """
        r = self.load_variables(variables=[variable], file=file, master_cdf_url=master_cdf_url,
                                cache_remote_files=cache_remote_files, **kwargs)
        if r is not None:
            return r.get(variable)
        return None

    def save_variables(self,
                       variables: List[SpeasyVariable],
                       file: Optional[Union[str, io.IOBase]] = None,
                       compress_variables=False,
                       **kwargs
                       ) -> Union[bool, Buffer]:
        """Serialize variables into a single CDF.

        :param variables: variables to write; equal axes are written only once.
        :param file: destination. A ``str`` is used as a path, an
            ``io.IOBase`` gets the serialized CDF through ``write()``, and
            ``None`` makes the method return the serialized CDF.
        :param compress_variables: when true, variables are gzip-compressed.
        :param kwargs: accepted but unused by this method.
        :return: ``True`` after writing to a path or stream, a ``memoryview``
            of the serialized CDF when ``file`` is ``None``, ``False`` for any
            other kind of ``file``.
        :raises ValueError: if an element of ``variables`` is not a ``SpeasyVariable``.
        """
        cdf = pycdfpp.CDF()
        axes = []
        for variable in variables:
            if not isinstance(variable, SpeasyVariable):
                raise ValueError(f"Expected SpeasyVariable, got {type(variable)}")
            _write_variable(variable, cdf, axes, compress_variables)
        # isinstance (rather than an exact type check) so that str subclasses
        # are treated as paths instead of falling through to ``return False``.
        if isinstance(file, str):
            pycdfpp.save(cdf, file)
            return True
        if isinstance(file, io.IOBase):
            file.write(pycdfpp.save(cdf))
            return True
        if file is None:
            return memoryview(pycdfpp.save(cdf))
        return False

    @property
    def supported_extensions(self) -> List[str]:
        """File extensions handled by this codec."""
        return ["cdf"]

    @property
    def supported_mimetypes(self) -> List[str]:
        """MIME types handled by this codec."""
        return ["application/x-cdf"]

    @property
    def name(self) -> str:
        """Codec name, taken from the class name."""
        return self.__class__.__name__