Source code for speasy.core.cdf

import io

import pyistp
import re
from ..any_files import any_loc_open
from ..url_utils import urlparse, is_local_file
from ...products import SpeasyVariable, VariableAxis, VariableTimeAxis, DataContainer


def _fix_value_type(value):
    """Reduce an attribute value to plain ``str``/``int``/``float`` data.

    A value whose exact type is ``str``, ``int`` or ``float`` is returned
    unchanged. A ``list`` is rebuilt with every element converted recursively.
    Anything else is replaced by its ``str()`` representation.
    """
    kind = type(value)
    if kind is list:
        return list(map(_fix_value_type, value))
    # Exact type comparison: instances of subclasses (``bool`` for example)
    # are not passed through and end up in the ``str()`` fallback below.
    if kind in (str, int, float):
        return value
    return str(value)


def _fix_attributes_types(attributes: dict):
    """Return a new dict with the keys of ``attributes`` and converted values.

    Every value is passed through ``_fix_value_type``; the input mapping is
    not modified.
    """
    return {name: _fix_value_type(raw) for name, raw in attributes.items()}


def _is_time_dependent(axis, time_axis_name):
    """Tell whether ``axis`` names ``time_axis_name`` as its time dependency.

    Returns ``True`` when the ``DEPEND_TIME`` entry or the ``DEPEND_0`` entry
    of ``axis.attributes`` equals ``time_axis_name`` (a missing entry counts
    as the empty string), ``False`` otherwise.
    """
    return any(
        axis.attributes.get(key, '') == time_axis_name
        for key in ('DEPEND_TIME', 'DEPEND_0')
    )


def _make_axis(axis, time_axis_name):
    """Build a ``VariableAxis`` from ``axis``.

    The axis values are copied, its attributes are cleaned with
    ``_fix_attributes_types`` and the ``is_time_dependent`` flag is computed
    by ``_is_time_dependent`` against ``time_axis_name``.
    """
    axis_values = axis.values.copy()
    axis_meta = _fix_attributes_types(axis.attributes)
    axis_name = axis.name
    follows_time = _is_time_dependent(axis, time_axis_name)
    return VariableAxis(
        values=axis_values,
        meta=axis_meta,
        name=axis_name,
        is_time_dependent=follows_time,
    )


def _build_labels(variable: pyistp.loader.DataVariable):
    """Return the column labels to use for ``variable``.

    * When ``variable.values`` is not two-dimensional, ``variable.labels`` is
      returned untouched.
    * When the labels are a list with one entry per column, they are returned
      as they are.
    * When the labels are a list with a single entry, that entry is suffixed
      with ``[i]`` for each column index ``i``.
    * Otherwise generic ``component_<i>`` names are generated.
    """
    shape = variable.values.shape
    if len(shape) != 2:
        return variable.labels
    n_columns = shape[1]
    labels = variable.labels
    if type(labels) is list:
        if len(labels) == n_columns:
            return labels
        if len(labels) == 1:
            base = labels[0]
            return [f"{base}[{index}]" for index in range(n_columns)]
    return [f"component_{index}" for index in range(n_columns)]


def _load_variable(variable="", file=None, buffer=None) -> SpeasyVariable or None:
    """Load one data variable from a CDF file or buffer as a ``SpeasyVariable``.

    ``file`` and ``buffer`` are forwarded to ``pyistp.load``. The variable is
    looked up under ``variable`` itself, then with ``-`` replaced by ``_``,
    then with a set of special characters replaced by ``$``; the first
    spelling listed by the loader is used.

    Returns ``None`` when ``pyistp.load`` returns ``None``, when none of the
    spellings is found, or when the loader hands back ``None`` for the
    variable. Otherwise the first axis becomes the ``VariableTimeAxis``, the
    remaining axes go through ``_make_axis`` and the column names come from
    ``_build_labels``.
    """
    cdf = pyistp.load(file=file, buffer=buffer)
    if cdf is None:
        return None

    def spellings():
        # Lazy on purpose of ordering: a later spelling is only computed once
        # the earlier ones were not found.
        yield variable
        yield variable.replace('-', '_')  # THX CSA/ISTP
        # CDA https://cdaweb.gsfc.nasa.gov/WebServices/REST/#Get_Data_GET
        yield re.sub(r"[\\/.%!@#^&*()\-+=`~|?<> ]", "$", variable)

    for spelling in spellings():
        if spelling in cdf.data_variables():
            loaded = cdf.data_variable(spelling)
            break
    else:
        return None
    if loaded is None:
        return None

    time_axis = loaded.axes[0]
    time_axis_name = time_axis.name
    axes = [VariableTimeAxis(values=time_axis.values.copy(),
                             meta=_fix_attributes_types(time_axis.attributes))]
    axes.extend(_make_axis(other, time_axis_name) for other in loaded.axes[1:])
    payload = DataContainer(values=loaded.values.copy(),
                            meta=_fix_attributes_types(loaded.attributes),
                            name=loaded.name,
                            is_time_dependent=True)
    return SpeasyVariable(axes=axes, values=payload, columns=_build_labels(loaded))


def load_variable(variable, file: bytes or str or io.IOBase, cache_remote_files=True) -> SpeasyVariable or None:
    """Load a variable from a CDF given as a path/URL, raw bytes or a readable object.

    Parameters
    ----------
    variable
        Name of the variable to load; forwarded to ``_load_variable``.
    file
        Where the CDF content comes from:

        * ``str`` -- when ``is_local_file`` accepts it, the path component of
          the parsed URL is handed over as a file name; otherwise the content
          is read through ``any_loc_open`` in binary mode and handed over as
          a buffer;
        * ``bytes`` -- used directly as the in-memory buffer;
        * any object exposing ``read`` -- ``file.read()`` provides the buffer.
    cache_remote_files
        Forwarded to ``any_loc_open`` for strings that are not local files.

    Returns
    -------
    SpeasyVariable or None
        Whatever ``_load_variable`` returns, or ``None`` when ``file`` is of
        none of the supported kinds.
    """
    if isinstance(file, str):
        if is_local_file(file):
            return _load_variable(variable=variable, file=urlparse(url=file).path)
        content = any_loc_open(file, mode='rb', cache_remote_files=cache_remote_files).read()
        return _load_variable(variable=variable, buffer=content)
    if isinstance(file, bytes):
        # Hand over the caller's data; the builtin ``bytes`` type object that
        # used to be passed here is not a CDF buffer.
        return _load_variable(variable=variable, buffer=file)
    if hasattr(file, 'read'):
        return _load_variable(variable=variable, buffer=file.read())
    return None