Source code for speasy.webservices.generic_archive

# -*- coding: utf-8 -*-

"""Generic archive package for Space Physics WebServices Client."""

__author__ = """Alexis Jeandet"""
__email__ = 'alexis.jeandet@member.fsf.org'
__version__ = '0.1.0'

import logging
from datetime import timedelta
from typing import Optional, Union

from speasy.config import SPEASY_CONFIG_DIR
from speasy.config import archive as cfg
from speasy.core import AnyDateTimeType, AllowedKwargs
from speasy.core.cdf.inventory_extractor import make_dataset_index
from speasy.core.dataprovider import DataProvider, GET_DATA_ALLOWED_KWARGS
from speasy.core.direct_archive_downloader import get_product
from speasy.core.inventory.indexes import SpeasyIndex, ParameterIndex
from speasy.core.http import is_server_up
from speasy.core.url_utils import host_and_port, is_local_file
from speasy.products.variable import SpeasyVariable
from speasy.core.cache import CacheCall, CACHE_ALLOWED_KWARGS

log = logging.getLogger(__name__)


def _global_inventory_dir():
    """Return the path of the archive inventory directory shipped with the package.

    The path is built relative to this module's directory and is returned
    as-is, i.e. it still contains the ``../..`` components.
    """
    import os
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, "../../data/archive")


def user_inventory_dir():
    """Return the path of the ``archive`` directory inside ``SPEASY_CONFIG_DIR``."""
    import os
    archive_dir = os.path.join(SPEASY_CONFIG_DIR, "archive")
    return archive_dir
def get_or_make_node(path: str, root: SpeasyIndex) -> SpeasyIndex:
    """Return the node found at ``path`` below ``root``, creating missing nodes.

    Parameters
    ----------
    path : str
        ``/``-separated sequence of node names.
    root : SpeasyIndex
        Node the walk starts from; children are looked up in, and added to,
        each node's ``__dict__``.

    Returns
    -------
    SpeasyIndex
        The node matching the last segment of ``path``.
    """
    node = root
    # Walk the path one segment at a time instead of recursing on the tail.
    for segment in path.split('/'):
        children = node.__dict__
        if segment not in children:
            children[segment] = SpeasyIndex(name=segment, provider='archive', uid='')
        node = children[segment]
    return node
@CacheCall(cache_retention=timedelta(seconds=120), is_pure=True)
def _is_up(host, port) -> bool:
    """Return what ``is_server_up`` reports for ``host``/``port``.

    The call is wrapped by ``CacheCall`` with a 120 second cache retention.
    """
    return is_server_up(host=host, port=port)


def _is_reachable(url: str) -> bool:
    """Return ``_is_up`` for the host and port extracted from ``url``."""
    server_host, server_port = host_and_port(url)
    return _is_up(server_host, server_port)
def load_inventory_file(file: str, root: SpeasyIndex):
    """Load one YAML inventory file and attach its datasets below ``root``.

    Each top-level key of the file is a dataset name whose value is a mapping
    that must provide ``inventory_path`` and ``master_cdf``; ``use_file_list``
    is set to ``False`` on the entry when absent. The entry itself is handed
    to ``make_dataset_index`` as ``spz_ga_cfg`` metadata for the dataset and
    for its parameters.

    Parameters
    ----------
    file : str
        Path of the YAML inventory file.
    root : SpeasyIndex
        Inventory node under which the entry's ``inventory_path`` is resolved
        (missing intermediate nodes are created).

    Notes
    -----
    A dataset whose master CDF is neither a local file nor reachable is
    skipped with a warning. An empty inventory file is treated as containing
    no entries.
    """
    import yaml
    with open(file, 'r') as f:
        # safe_load returns None for an empty document: fall back to an empty
        # mapping so an empty inventory file is ignored instead of crashing.
        entries = yaml.safe_load(f) or {}

    # The file is closed at this point: the per-entry work below may involve
    # reachability checks and does not need the handle.
    for name, entry in entries.items():
        inventory_path = entry['inventory_path']
        path = f"{inventory_path}/{name}"
        # The parent node is created even when the dataset ends up skipped.
        parent = get_or_make_node(inventory_path, root)
        entry.setdefault('use_file_list', False)
        entry_meta = {"spz_ga_cfg": entry}
        master_cdf = entry['master_cdf']

        if not (is_local_file(master_cdf) or _is_reachable(master_cdf)):
            log.warning(f"Master CDF {entry['master_cdf']} is not available, skipping dataset {name}")
            continue

        dataset = make_dataset_index(master_cdf, name=name, uid=path, provider='archive', meta=entry_meta,
                                     params_uid_format=f"{path}/{{var_name}}", params_meta=entry_meta)
        if dataset:
            parent.__dict__[dataset.spz_name()] = dataset
class GenericArchive(DataProvider):
    """Data provider serving products described by YAML inventory files.

    It registers itself under the provider name ``archive`` with the
    alternative names ``generic_archive`` and ``file``.
    """

    def __init__(self):
        DataProvider.__init__(self, provider_name='archive', provider_alt_names=['generic_archive', 'file'],
                              inventory_disable_proxy=True)

    def build_inventory(self, root: SpeasyIndex):
        """Fill ``root`` from every inventory file found in the lookup directories.

        The lookup directories are the configured extra directories plus the
        package inventory directory and the user inventory directory. Every
        file matching ``*.y*ml`` in those directories is loaded.

        Parameters
        ----------
        root : SpeasyIndex
            Inventory node to populate.

        Returns
        -------
        SpeasyIndex
            The same ``root`` object.
        """
        from glob import glob
        lookup_dirs = cfg.extra_inventory_lookup_dirs.get()
        lookup_dirs.add(_global_inventory_dir())
        lookup_dirs.add(user_inventory_dir())
        for lookup_dir in lookup_dirs:
            for file in glob(f"{lookup_dir}/*.y*ml"):
                load_inventory_file(file, root)
        return root

    def _parameter_index(self, product: Union[str, ParameterIndex]) -> ParameterIndex:
        """Resolve ``product`` to a ``ParameterIndex``.

        Parameters
        ----------
        product : str or ParameterIndex
            Either a key of ``self.flat_inventory.parameters`` or an index
            object, which is returned unchanged.

        Raises
        ------
        ValueError
            If ``product`` is an unknown key or is neither a ``str`` nor a
            ``ParameterIndex``.
        """
        if isinstance(product, str):
            parameters = self.flat_inventory.parameters
            if product in parameters:
                return parameters[product]
            raise ValueError(f"Unknown product {product}")
        if isinstance(product, ParameterIndex):
            return product
        raise ValueError(f"Got unexpected type {type(product)}, expecting str or ParameterIndex")

    @AllowedKwargs(GET_DATA_ALLOWED_KWARGS + CACHE_ALLOWED_KWARGS + ['force_refresh'])
    def get_data(self, product: Union[str, ParameterIndex], start_time: AnyDateTimeType,
                 stop_time: AnyDateTimeType, **kwargs) -> Optional[SpeasyVariable]:
        """Get ``product`` between ``start_time`` and ``stop_time``.

        Parameters
        ----------
        product : str or ParameterIndex
            Product key or index, resolved with ``_parameter_index``.
        start_time : AnyDateTimeType
            Start of the requested interval.
        stop_time : AnyDateTimeType
            End of the requested interval.
        **kwargs
            Extra keyword arguments, restricted by the ``AllowedKwargs``
            decorator and forwarded to ``_get_data``.

        Returns
        -------
        Optional[SpeasyVariable]
            Whatever ``_get_data`` returns.

        Raises
        ------
        ValueError
            If ``product`` cannot be resolved.
        """
        var = self._get_data(product=self._parameter_index(product), start_time=start_time,
                             stop_time=stop_time, **kwargs)
        return var

    def _get_data(self, product: ParameterIndex, start_time: AnyDateTimeType, stop_time: AnyDateTimeType,
                  **kwargs) -> Optional[SpeasyVariable]:
        """Call ``get_product`` with the ``spz_ga_cfg`` settings stored on ``product``.

        The ``spz_ga_cfg`` mapping is expanded as keyword arguments, together
        with the product's ``spz_name()`` as ``variable``, the time range and
        ``kwargs``.
        """
        ga_cfg: dict = getattr(product, 'spz_ga_cfg')
        return get_product(**ga_cfg, variable=product.spz_name(), start_time=start_time, stop_time=stop_time,
                           **kwargs)