Source code for speasy.webservices.cda

# -*- coding: utf-8 -*-

"""CDA_Webservice package for Space Physics WebServices Client."""

__author__ = """Alexis Jeandet"""
__email__ = ''
__version__ = '0.1.0'

import logging
import re
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple

from speasy.core import AllowedKwargs
from speasy.core import any_files, http, url_utils
from speasy.core import cdf
from speasy.core.cache import CACHE_ALLOWED_KWARGS, UnversionedProviderCache
from speasy.core.dataprovider import (GET_DATA_ALLOWED_KWARGS, DataProvider,
from speasy.core.datetime_range import DateTimeRange
from speasy.core.inventory.indexes import (DatasetIndex, ParameterIndex,
from speasy.core.proxy import PROXY_ALLOWED_KWARGS, GetProduct, Proxyfiable
from speasy.core.requests_scheduling import SplitLargeRequests
from speasy.products.variable import SpeasyVariable

log = logging.getLogger(__name__)

_burst_regex = re.compile("(.*MMS.*FPI.*BRST.*|.*MMS.*SCM.*BRST.*)")

def _is_burst_product(product: ParameterIndex or str) -> bool:
    if isinstance(product, ParameterIndex):
        product = product.spz_uid()
    return bool(_burst_regex.match(str(product)))

def _large_request_max_duration(product):
    if _is_burst_product(product):
        return timedelta(hours=2)
        return timedelta(days=7)

def _cache_fragment_size(product):
    if _is_burst_product(product):
        return 2
        return 12

[docs] class CdaWebException(BaseException): def __init__(self, text): super(CdaWebException, self).__init__(text)
[docs] def get_parameter_args(start_time: datetime, stop_time: datetime, product: str, **kwargs): return {'path': f"cdaweb/{product}", 'start_time': f'{start_time.isoformat()}', 'stop_time': f'{stop_time.isoformat()}'}
[docs] class CDA_Webservice(DataProvider): BASE_URL = "" def __init__(self): self.__url = f"{self.BASE_URL}/WS/cdasr/1" DataProvider.__init__(self, provider_name='cda', provider_alt_names=['cdaweb'])
[docs] def build_inventory(self, root: SpeasyIndex): from ._inventory_builder import build_inventory root = build_inventory(root=root) return root
[docs] def parameter_range(self, parameter_id: str or ParameterIndex) -> Optional[DateTimeRange]: """Get product time range. Parameters ---------- parameter_id: str or ParameterIndex parameter id Returns ------- Optional[DateTimeRange] Data time range Examples -------- >>> import speasy as spz >>> spz.cda.parameter_range("AC_H0_MFI/BGSEc") <DateTimeRange: 1997-09-02T00:00:12+00:00 -> ...> """ return self._parameter_range(parameter_id)
[docs] def dataset_range(self, dataset_id: str or DatasetIndex) -> Optional[DateTimeRange]: """Get product time range. Parameters ---------- dataset_id: str or DatasetIndex parameter id Returns ------- Optional[DateTimeRange] Data time range Examples -------- >>> import speasy as spz >>> spz.cda.dataset_range("AC_H0_MFI") <DateTimeRange: 1997-09-02T00:00:12+00:00 -> ...> """ return self._dataset_range(dataset_id)
def _to_dataset_and_variable(self, index_or_str: ParameterIndex or str) -> Tuple[str, str]: if isinstance(index_or_str, ParameterIndex): index_or_str = index_or_str.spz_uid() if type(index_or_str) is str: if '/' in index_or_str: parts = index_or_str.split('/') if len(parts) == 2: return parts[0], parts[1] for pos in range(1, len(parts)): ds = '/'.join(parts[:pos]) var = '/'.join(parts[pos:]) if (ds in self.flat_inventory.datasets) and (index_or_str in self.flat_inventory.parameters): return ds, var raise ValueError( f"Given string is ambiguous, it contains several '/', tried all combinations but failed to find a matching dataset/variable pair in inventory: {index_or_str}") raise ValueError(f"Given string does not look like a CDA dataset/variable pair: {index_or_str}") raise TypeError(f"Wrong type for {index_or_str}, expecting a string or a SpeasyIndex, got {type(index_or_str)}") def _dl_variable(self, dataset: str, variable: str, start_time: datetime, stop_time: datetime, if_newer_than: datetime or None = None, extra_http_headers: Dict or None = None) -> Optional[ SpeasyVariable]: start_time, stop_time = start_time.strftime('%Y%m%dT%H%M%SZ'), stop_time.strftime('%Y%m%dT%H%M%SZ') fmt = "cdf" url = f"{self.__url}/dataviews/sp_phys/datasets/{url_utils.quote(dataset, safe='')}/data/{start_time},{stop_time}/{url_utils.quote(variable, safe='')}?format={fmt}" headers = {"Accept": "application/json"} if if_newer_than is not None: headers["If-Modified-Since"] = if_newer_than.ctime() if extra_http_headers is not None: headers.update(extra_http_headers) resp = http.get(url, headers=headers) log.debug(resp.url) if resp.status_code == 200 and 'FileDescription' in resp.json(): return cdf.load_variable(file=resp.json()['FileDescription'][0]['Name'], variable=variable) elif not resp.ok: if resp.status_code == 404 and "No data available" in resp.json().get('Message', [""])[0]: log.warning(f"Got 404 'No data available' from CDAWeb with {url}") return None raise CdaWebException(f'Failed to get data with request: {url}, got {resp.status_code} HTTP response') else: return None
[docs] @AllowedKwargs( PROXY_ALLOWED_KWARGS + CACHE_ALLOWED_KWARGS + GET_DATA_ALLOWED_KWARGS + ['if_newer_than']) @ParameterRangeCheck() @UnversionedProviderCache(prefix="cda", fragment_hours=_cache_fragment_size, cache_retention=timedelta(days=7)) @SplitLargeRequests(threshold=_large_request_max_duration) @Proxyfiable(GetProduct, get_parameter_args) def get_data(self, product, start_time: datetime, stop_time: datetime, if_newer_than: datetime or None = None, extra_http_headers: Dict or None = None): dataset, variable = self._to_dataset_and_variable(product) return self._dl_variable(start_time=start_time, stop_time=stop_time, dataset=dataset, variable=variable, if_newer_than=if_newer_than, extra_http_headers=extra_http_headers)
[docs] def get_variable(self, dataset: str, variable: str, start_time: datetime or str, stop_time: datetime or str, **kwargs) -> \ Optional[SpeasyVariable]: return self.get_data(f"{dataset}/{variable}", start_time, stop_time, **kwargs)