Source code for speasy.webservices.csa

import io
import logging
import tarfile
from datetime import datetime, timedelta
from tempfile import TemporaryDirectory
from typing import Optional, Tuple, Dict

from astroquery.utils.tap.core import TapPlus

from speasy.core import any_files, AllowedKwargs, fix_name
from speasy.core import cdf
from speasy.core.cache import Cacheable, CACHE_ALLOWED_KWARGS  # _cache is used for tests (hack...)
from speasy.core.dataprovider import DataProvider, ParameterRangeCheck, GET_DATA_ALLOWED_KWARGS
from speasy.core.datetime_range import DateTimeRange
from speasy.core.inventory.indexes import ParameterIndex, DatasetIndex, SpeasyIndex, make_inventory_node
from speasy.core.proxy import Proxyfiable, GetProduct, PROXY_ALLOWED_KWARGS
from speasy.core.requests_scheduling import SplitLargeRequests
from speasy.core.url_utils import build_url
from speasy.products.variable import SpeasyVariable

log = logging.getLogger(__name__)


[docs] def to_dataset_and_variable(index_or_str: ParameterIndex or str) -> Tuple[str, str]: if type(index_or_str) is str: parts = index_or_str.split('/') elif isinstance(index_or_str, ParameterIndex): parts = index_or_str.product.split('/') else: raise TypeError(f"given parameter {index_or_str} of type {type(index_or_str)} is not a compatible index") assert len(parts) == 2 return parts[0], parts[1]
[docs] def register_dataset(instruments, datasets, dataset): meta = {cname: dataset[cname] for cname in dataset.colnames} meta['stop_date'] = meta.pop('end_date') name = fix_name(meta['dataset_id']) node = make_inventory_node(instruments[dataset['instruments']], DatasetIndex, name=name, provider="csa", uid=meta['dataset_id'], **meta) datasets[meta['dataset_id']] = node
[docs] def register_observatory(missions, observatories, observatory): meta = {cname: observatory[cname] for cname in observatory.colnames} name = meta.pop('name') node = make_inventory_node(missions[observatory['mission_name']], SpeasyIndex, name=fix_name(name), provider="csa", uid=name, **meta) observatories[name] = node
[docs] def register_mission(inventory_tree, missions, mission): meta = {cname: mission[cname] for cname in mission.colnames} name = meta.pop('name') node = make_inventory_node(inventory_tree, SpeasyIndex, name=fix_name(name), provider="csa", uid=name, **meta) missions[name] = node
[docs] def register_instrument(observatories, instruments, instrument): meta = {cname: instrument[cname] for cname in instrument.colnames} name = meta.pop('name') node = make_inventory_node(observatories.get(instrument['observatories'], observatories['MULTIPLE']), SpeasyIndex, name=fix_name(name), provider="csa", uid=name, **meta) instruments[name] = node
[docs] def register_param(datasets, parameter): parent_dataset = datasets.get(parameter["dataset_id"], None) if parent_dataset is not None: meta = {cname: parameter[cname] for cname in parameter.colnames} meta['dataset'] = parameter["dataset_id"] meta['start_date'] = parent_dataset.start_date meta['stop_date'] = parent_dataset.stop_date name = fix_name(meta['parameter_id']) make_inventory_node(parent_dataset, ParameterIndex, name=name, provider="csa", uid=f"{parameter['dataset_id']}/{parameter['parameter_id']}", **meta)
[docs] def build_inventory(root: SpeasyIndex, tapurl="https://csa.esac.esa.int/csa-sl-tap/tap/"): CSA = TapPlus(url=tapurl) missions_req = CSA.launch_job_async("SELECT * FROM csa.v_mission") observatories_req = CSA.launch_job_async("SELECT * FROM csa.v_observatory") instruments_req = CSA.launch_job_async("SELECT * FROM csa.v_instrument") datasets_req = CSA.launch_job_async("SELECT * FROM csa.v_dataset WHERE is_cef='true' AND is_istp='true'") parameters_req = CSA.launch_job_async("SELECT * FROM csa.v_parameter WHERE data_type='Data'") missions = {} observatories = {} instruments = {} datasets = {} list(map(lambda m: register_mission(root, missions, m), missions_req.get_results())) list(map(lambda o: register_observatory(missions, observatories, o), observatories_req.get_results())) list(map(lambda i: register_instrument(observatories, instruments, i), instruments_req.get_results())) list(map(lambda d: register_dataset(instruments, datasets, d), datasets_req.get_results())) list(map(lambda p: register_param(datasets, p), parameters_req.get_results())) return root
def _load_variable(archive: io.BytesIO, variable: str) -> SpeasyVariable: with tarfile.open(fileobj=archive) as tar: tarname = tar.getnames() if len(tarname): with TemporaryDirectory() as tmp_dir: tar.extractall(tmp_dir) return cdf.load_variable(file=f"{tmp_dir}/{tarname[0]}", variable=variable)
[docs] def get_parameter_args(start_time: datetime, stop_time: datetime, product: str, **kwargs): return {'path': f"csa/{product}", 'start_time': f'{start_time.isoformat()}', 'stop_time': f'{stop_time.isoformat()}'}
[docs] class CSA_Webservice(DataProvider): BASE_URL = "https://csa.esac.esa.int" def __init__(self): DataProvider.__init__(self, provider_name='csa') self.__url = f"{self.BASE_URL}/csa-sl-tap/data" def _dataset_range(self, dataset: str or DatasetIndex) -> DateTimeRange: if type(dataset) is str: dataset = self.flat_inventory.datasets[dataset] return DateTimeRange(dataset.start_date, dataset.stop_date) def _dl_variable(self, dataset: str, variable: str, start_time: datetime, stop_time: datetime, extra_http_headers: Dict[str, str] or None = None) -> \ Optional[SpeasyVariable]: # https://csa.esac.esa.int/csa-sl-tap/data?RETRIEVAL_TYPE=product&&DATASET_ID=C3_CP_PEA_LERL_DEFlux&START_DATE=2001-06-10T22:12:14Z&END_DATE=2001-06-11T06:12:14Z&DELIVERY_FORMAT=CDF_ISTP&DELIVERY_INTERVAL=all ds_range = self._dataset_range(dataset) if not ds_range.intersect(DateTimeRange(start_time, stop_time)): log.warning(f"You are requesting {dataset}/{variable} outside of its definition range {ds_range}") return None headers = {} if extra_http_headers is not None: headers.update(extra_http_headers) return _load_variable( any_files.any_loc_open( build_url(base=self.__url, parameters={ "RETRIEVAL_TYPE": "product", "DATASET_ID": dataset, "START_DATE": start_time.strftime('%Y-%m-%dT%H:%M:%SZ'), "END_DATE": stop_time.strftime('%Y-%m-%dT%H:%M:%SZ'), "DELIVERY_FORMAT": "CDF_ISTP", "DELIVERY_INTERVAL": "all" }), headers=headers), variable)
[docs] @staticmethod def build_inventory(root: SpeasyIndex): return build_inventory(root)
[docs] def parameter_range(self, parameter_id: str or ParameterIndex) -> Optional[DateTimeRange]: """Get product time range. Parameters ---------- parameter_id: str or ParameterIndex parameter id Returns ------- Optional[DateTimeRange] Data time range Examples -------- >>> import speasy as spz >>> spz.csa.parameter_range("C3_CP_WBD_WAVEFORM_BM2/B__C3_CP_WBD_WAVEFORM_BM2") <DateTimeRange: 2001-03-07T17:45:22+00:00 -> ...> """ return self._parameter_range(parameter_id)
[docs] def dataset_range(self, dataset_id: str or DatasetIndex) -> Optional[DateTimeRange]: """Get product time range. Parameters ---------- dataset_id: str or DatasetIndex parameter id Returns ------- Optional[DateTimeRange] Data time range Examples -------- >>> import speasy as spz >>> spz.csa.dataset_range("D2_CP_FGM_SPIN") <DateTimeRange: 2004-07-27T00:00:00+00:00 -> ...> """ return self._dataset_range(dataset_id)
[docs] def product_last_update(self, product: str or ParameterIndex): """Get date of last modification of dataset or parameter. Parameters ---------- product: str or ParameterIndex product Returns ------- str product last update date """ dataset, variable = to_dataset_and_variable(product) return self.flat_inventory.datasets[dataset].date_last_update
[docs] @AllowedKwargs(PROXY_ALLOWED_KWARGS + CACHE_ALLOWED_KWARGS + GET_DATA_ALLOWED_KWARGS) @ParameterRangeCheck() @Cacheable(prefix="csa", fragment_hours=lambda x: 12, version=product_last_update) @SplitLargeRequests(threshold=lambda x: timedelta(days=7)) @Proxyfiable(GetProduct, get_parameter_args) def get_data(self, product, start_time: datetime, stop_time: datetime, extra_http_headers: Dict[str, str] or None = None): dataset, variable = to_dataset_and_variable(product) return self._dl_variable(start_time=start_time, stop_time=stop_time, dataset=dataset, variable=variable, extra_http_headers=extra_http_headers)
[docs] def get_variable(self, dataset: str, variable: str, start_time: datetime or str, stop_time: datetime or str, **kwargs) -> \ Optional[SpeasyVariable]: return self.get_data(f"{dataset}/{variable}", start_time, stop_time, **kwargs)