Source code for speasy.core.hapi.client

from enum import Enum
import io
from json import JSONDecodeError
from typing import Dict, List, Mapping, Optional
from urllib.parse import urlencode

from speasy.core import http
from speasy.core.hapi.parser import _parse_hapi_csv
from speasy.products.variable import SpeasyVariable

from .exceptions import (
    HapiRequestError, HapiServerError, HapiNoData
)


[docs] class HapiEndpoint(Enum): CAPABILITIES = "capabilities" CATALOG = "catalog" ABOUT = "about" INFO = "info" DATA = "data"
def _fetch_response(url: str): response = http.get(url) _check_response(response) return response def _check_hapi_status(data: Dict) -> None: code = data["status"]["code"] message = data["status"]["message"] if code == 1201: raise HapiNoData() elif 1400 <= code < 1500: raise HapiRequestError(code, message) elif code >= 1500: raise HapiServerError(code, message) def _check_http_status(status_code: int, text: str) -> None: if 400 <= status_code < 500: raise HapiRequestError(status_code, text) elif status_code >= 500: raise HapiServerError(status_code, text) def _check_response(response) -> None: try: _check_hapi_status(response.json()) except (JSONDecodeError, KeyError): _check_http_status(response.status_code, response.text)
[docs] class HapiClient: def __init__(self, server_url: str): self.server_url = server_url.rstrip('/') self._dataset_param_name = self._init_dataset_param_name() def _init_dataset_param_name(self) -> str: """ Starting from HAPI-3.0, 'id' parameter becomes 'dataset' set by major version number """ version = self.get_capabilities().get("HAPI") if not version: raise RuntimeError("HAPI version not provided by server") major = int(version.split('.')[0]) if major == 2: return "id" elif major == 3: return "dataset" raise RuntimeError(f"Unsupported HAPI version: {version}") def _fetch_variables(self, query_parameters: Dict) -> Mapping[str, SpeasyVariable]: parameters = query_parameters.get("parameters", []) url = self._build_url(HapiEndpoint.DATA, query_parameters) f = io.BytesIO(_fetch_response(url).text.encode("utf-8")) return _parse_hapi_csv(f, parameters) def _build_url( self, endpoint: Optional[HapiEndpoint] = None, query_parameters: Optional[Dict] = None ) -> str: base = f"{self.server_url}/hapi" url = f"{base}/{endpoint.value}" if endpoint else base # Flatten "parameters" into a comma-separated query string if query_parameters: query_params_copy = query_parameters.copy() parameters = query_params_copy.get("parameters") if parameters: query_params_copy["parameters"] = ",".join(parameters) else: query_params_copy.pop("parameters", None) url = f"{url}?{urlencode(query_params_copy)}" return url def _endpoint_to_json( self, endpoint: HapiEndpoint, query_parameters: Optional[Dict] = None ) -> Dict: url = self._build_url(endpoint, query_parameters) return _fetch_response(url).json()
[docs] def get_hapi(self) -> str: url = self._build_url() with http.urlopen(url) as response: html_page = response.text return html_page
[docs] def get_capabilities(self) -> Dict: return self._endpoint_to_json(HapiEndpoint.CAPABILITIES)
[docs] def get_catalog(self) -> Dict: return self._endpoint_to_json(HapiEndpoint.CATALOG)
[docs] def get_about(self) -> Dict: return self._endpoint_to_json(HapiEndpoint.ABOUT)
[docs] def get_info(self, dataset: str, parameters: Optional[List[str]] = None) -> Dict: query_params = { self._dataset_param_name: dataset, } if parameters is not None: query_params["parameters"] = parameters # List[str], will be flatten in _build_url return self._endpoint_to_json(HapiEndpoint.INFO, query_params)
[docs] def get_data( self, dataset: str, start: str, stop: str, parameters: List[str] ) -> Mapping[str, SpeasyVariable]: query_params = { self._dataset_param_name: dataset, "parameters": parameters, "start": start, "stop": stop, "format": "csv", "include": "header", } return self._fetch_variables(query_params)