# pylint: disable=missing-docstring, invalid-name,line-too-long
import datetime
import json
import os
import shutil
import sys
import uuid
from collections import OrderedDict
from io import StringIO
from textwrap import dedent
from warnings import warn
import h5py
from numpy import asarray
from pandas import read_csv
from tqdm import tqdm
from ._client import DEFAULT_LOGGING_LEVEL, TEMPLATE_FILES, ClientRequest, WPSInputs
from ._data import CONFIG_SWARM
from ._data_handling import ReturnedDataFile
from ._wps.environment import JINJA2_ENVIRONMENT
from ._wps.multipart import generate_multipart_request
from ._wps.time_util import parse_datetime
TEMPLATE_FILES = {
**TEMPLATE_FILES,
"sync": "vires_fetch_filtered_data.xml",
"async": "vires_fetch_filtered_data_async.xml",
"model_info": "vires_get_model_info.xml",
"times_from_orbits": "vires_times_from_orbits.xml",
"get_observatories": "vires_get_observatories.xml",
"get_conjunctions": "vires_get_conjunctions.xml",
"get_collection_info": "vires_get_collection_info.xml",
"eval_model_mp": "model_eval_multipart_payload.xml",
}
REFERENCES = {
"General Swarm": (
" Swarm Data Handbook, https://swarmhandbook.earth.esa.int/ ",
" The Swarm Satellite Constellation Application and Research Facility (SCARF) and Swarm data products, https://doi.org/10.5047/eps.2013.07.001 ",
" Swarm Science Data Processing and Products (2013), https://link.springer.com/journal/40623/65/11/page/1 ",
" Special issue “Swarm science results after 2 years in space (2016), https://www.springeropen.com/collections/swsr ",
" Earth's Magnetic Field: Understanding Geomagnetic Sources from the Earth's Interior and its Environment (2017), https://link.springer.com/journal/11214/206/1/page/1 ",
)
}
MODEL_REFERENCES = {
"IGRF": (
" International Geomagnetic Reference Field 14 (https://doi.org/10.5281/zenodo.14012302) ",
" https://www.ncei.noaa.gov/products/international-geomagnetic-reference-field ",
),
"CHAOS-Core": (
"CHAOS Core field (SH degrees 1-20)",
" http://www.spacecenter.dk/files/magnetic-models/CHAOS-8/ ",
),
"CHAOS-Static": (
"CHAOS crust field (SH degrees 21-185)",
" http://www.spacecenter.dk/files/magnetic-models/CHAOS-8/ ",
),
"CHAOS-MMA-Primary": (
"CHAOS Primary (external) magnetospheric field",
" http://www.spacecenter.dk/files/magnetic-models/CHAOS-8/ ",
),
"CHAOS-MMA-Secondary": (
"CHAOS Secondary (internal) magnetospheric field",
" http://www.spacecenter.dk/files/magnetic-models/CHAOS-8/ ",
),
"CHAOS-MIO": (
"CHAOS Ionospheric field",
" http://www.spacecenter.dk/files/magnetic-models/CHAOS-8/ ",
),
"MF7": (
"MF7 crustal field model, derived from CHAMP satellite observations",
" http://geomag.org/models/MF7.html",
),
"LCS-1": (
"The LCS-1 high-resolution lithospheric field model, derived from CHAMP and Swarm satellite observations",
" http://www.spacecenter.dk/files/magnetic-models/LCS-1/",
),
"MCO_SHA_2C": (
"[Comprehensive Inversion]: Core field",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mco_sha_2c",
),
"MCO_SHA_2D": (
"[Dedicated Chain]: Core field",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mco_sha_2d",
),
"MLI_SHA_2C": (
"[Comprehensive Inversion]: Lithospheric field",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mli_sha_2c",
),
"MLI_SHA_2D": (
"[Dedicated Chain]: Lithospheric field",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mli_sha_2d",
),
"MLI_SHA_2E": (
"[Extended dedicated chain]: Lithospheric field",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mli_sha_2e",
),
"MMA_SHA_2C-Primary": (
"[Comprehensive Inversion]: Primary (external) magnetospheric field",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mma_sha_2c",
),
"MMA_SHA_2C-Secondary": (
"https://swarmhandbook.earth.esa.int/catalogue/sw_mma_sha_2c",
"[Comprehensive Inversion]: Secondary (internal/induced) magnetospheric field",
),
"MMA_SHA_2F-Primary": (
"[Fast-Track Product]: Primary (external) magnetospheric field",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mma_sha_2f",
),
"MMA_SHA_2F-Secondary": (
"[Fast-Track Product]: Secondary (internal/induced) magnetospheric field",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mma_sha_2f",
),
"MIO_SHA_2C-Primary": (
"[Comprehensive Inversion]: Primary (external) ionospheric field",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mio_sha_2c",
),
"MIO_SHA_2C-Secondary": (
"[Comprehensive Inversion]: Secondary (external/induced) ionospheric field",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mio_sha_2c",
),
"MIO_SHA_2D-Primary": (
"[Dedicated Chain]: Primary (external) ionospheric field, DIFI",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mio_sha_2d",
),
"MIO_SHA_2D-Secondary": (
"[Dedicated Chain]: Secondary (external/induced) ionospheric field, DIFI",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mio_sha_2d",
),
"AMPS": (
"AMPS - Polar currents magnetic field, https://github.com/klaundal/pyAMPS",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mio_sha_2e",
),
"MCO_SHA_2X": (
"Alias for 'CHAOS-Core'",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mco_sha_2x",
),
"CHAOS": (
"Alias for 'CHAOS-Core' + 'CHAOS-Static' + 'CHAOS-MMA-Primary' + 'CHAOS-MMA-Secondary'",
"https://www.spacecenter.dk/files/magnetic-models/CHAOS-8/",
),
"CHAOS-MMA": (
"Alias for 'CHAOS-MMA-Primary' + 'CHAOS-MMA-Secondary'",
"https://www.spacecenter.dk/files/magnetic-models/CHAOS-8/",
),
"MMA_SHA_2C": (
"Alias for 'MMA_SHA_2C-Primary' + 'MMA_SHA_2C-Secondary'",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mma_sha_2c",
),
"MMA_SHA_2F": (
"Alias for 'MMA_SHA_2F-Primary' + 'MMA_SHA_2F-Secondary'",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mma_sha_2f",
),
"MIO_SHA_2C": (
"Alias for 'MIO_SHA_2C-Primary' + 'MIO_SHA_2C-Secondary'",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mio_sha_2c",
),
"MIO_SHA_2D": (
"Alias for 'MIO_SHA_2D-Primary' + 'MIO_SHA_2D-Secondary'",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mio_sha_2d",
),
"SwarmCI": (
"Alias for 'MCO_SHA_2C' + 'MLI_SHA_2C' + 'MIO_SHA_2C-Primary' + 'MIO_SHA_2C-Secondary' + 'MMA_SHA_2C-Primary' + 'MMA_SHA_2C-Secondary'",
"https://swarmhandbook.earth.esa.int/catalogue/sw_mco_sha_2c",
),
}
DEPRECATED_MODELS = {}
COLLECTION_REFERENCES = {
"MAG": (" https://swarmhandbook.earth.esa.int/catalogue/sw_magx_lr_1b ",),
"MAG_HR": (" https://swarmhandbook.earth.esa.int/catalogue/sw_magx_hr_1b ",),
"EFI": (" https://swarmhandbook.earth.esa.int/catalogue/sw_efix_lp_1b ",),
"EFI:B06": (" https://swarmhandbook.earth.esa.int/catalogue/sw_efix_lp_1b ",),
"IBI": (" https://swarmhandbook.earth.esa.int/catalogue/sw_ibixtms_2f ",),
"TEC": (" https://swarmhandbook.earth.esa.int/catalogue/sw_tecxtms_2f ",),
"FAC": (" https://swarmhandbook.earth.esa.int/catalogue/sw_facxtms_2f ",),
"EEF": (" https://swarmhandbook.earth.esa.int/catalogue/sw_eefxtms_2f ",),
"IPD": (" https://swarmhandbook.earth.esa.int/catalogue/sw_ipdxirr_2f ",),
"AUX_OBSH": ("https://doi.org/10.5047/eps.2013.07.011",),
"AUX_OBSM": ("https://doi.org/10.5047/eps.2013.07.011",),
"AUX_OBSS": ("https://doi.org/10.5047/eps.2013.07.011",),
"VOBS_SW_1M": ("https://swarmhandbook.earth.esa.int/catalogue/sw_vobs_1m_2_",),
"AEJ_LPL": ("https://swarmhandbook.earth.esa.int/catalogue/sw_aejxlpl_2f",),
"AEJ_LPS": ("https://swarmhandbook.earth.esa.int/catalogue/sw_aejxlps_2f",),
"AEJ_PBL": ("https://swarmhandbook.earth.esa.int/catalogue/sw_aejxpbl_2f",),
"AEJ_PBS": ("https://swarmhandbook.earth.esa.int/catalogue/sw_aejxpbs_2f",),
"AOB_FAC": ("https://swarmhandbook.earth.esa.int/catalogue/sw_aobxfac_2f",),
"MIT_LP": ("https://swarmhandbook.earth.esa.int/catalogue/sw_mitx_lp_2f",),
"MIT_TEC": ("https://swarmhandbook.earth.esa.int/catalogue/sw_mitxtec_2f",),
"PPI_FAC": ("https://swarmhandbook.earth.esa.int/catalogue/sw_ppixfac_2f",),
"MAG_CHAMP": ("https://doi.org/10.5880/GFZ.2.3.2019.004",),
"MAG_CS": (
"https://swarmhandbook.earth.esa.int/catalogue/cs_mag",
"https://doi.org/10.1186/s40623-020-01171-9",
),
"MAG_GRACE": (
"https://swarmhandbook.earth.esa.int/catalogue/grace_x_mag",
"https://doi.org/10.1186/s40623-021-01373-9",
),
"MAG_GFO": (
"https://swarmhandbook.earth.esa.int/catalogue/gfx_fgm_acal_corr",
"https://doi.org/10.1186/s40623-021-01364-w",
),
"MAG_GFO_ML": ("https://doi.org/10.5880/GFZ.2.3.2023.001",),
"EFI_IDM": ("https://swarmhandbook.earth.esa.int/catalogue/sw_efixidm_2_",),
"MAG_GOCE": (
"https://swarmhandbook.earth.esa.int/catalogue/go_mag_acal_corr",
"https://doi.org/10.5880/GFZ.2.3.2022.001",
),
"MAG_GOCE_ML": ("https://doi.org/10.5880/GFZ.2.3.2022.002",),
"EFI_TIE": ("https://swarmhandbook.earth.esa.int/catalogue/sw_efixtie_2_",),
"EFI_TCT02": ("https://swarmhandbook.earth.esa.int/catalogue/sw_efix_tct02",),
"EFI_TCT16": ("https://swarmhandbook.earth.esa.int/catalogue/sw_efix_tct16",),
"DNS_POD": ("https://swarmhandbook.earth.esa.int/catalogue/SW_DNSxPOD_2_",),
"DNS_ACC": ("https://swarmhandbook.earth.esa.int/catalogue/SW_DNSxACC_2_",),
"DNS_ACC_CHAMP": ("https://swarmhandbook.earth.esa.int/catalogue/CH_DNS_ACC_2_",),
"DNS_ACC_GRACE": ("https://swarmhandbook.earth.esa.int/catalogue/GR_DNSxACC_2_",),
"DNS_ACC_GFO": ("https://swarmhandbook.earth.esa.int/catalogue/GF_DNSxACC_2_",),
"WND_ACC_CHAMP": ("https://swarmhandbook.earth.esa.int/catalogue/CH_WND_ACC_2_",),
"WND_ACC_GRACE": ("https://swarmhandbook.earth.esa.int/catalogue/GR_WNDxACC_2_",),
"WND_ACC_GFO": ("https://swarmhandbook.earth.esa.int/catalogue/GF_WNDxACC_2_",),
"MM_CON_EPH_2_": ("https://swarmhandbook.earth.esa.int/catalogue/MM_CON_EPH_2_",),
"NIX_TMS": ("https://swarmhandbook.earth.esa.int/catalogue/SW_NIX_TMS_2F",),
"TIX_TMS": ("https://swarmhandbook.earth.esa.int/catalogue/SW_TIX_TMS_2F",),
"TEC_TIRO": (
"https://swarmhandbook.earth.esa.int/catalogue/CH_TEC_TMS_2F",
"https://swarmhandbook.earth.esa.int/catalogue/GR_TECxTMS_2F",
"https://swarmhandbook.earth.esa.int/catalogue/GR_TECxTMS_2F",
),
"NE_TIRO": (
"https://swarmhandbook.earth.esa.int/catalogue/GR_NE__KBR_2F",
"https://swarmhandbook.earth.esa.int/catalogue/GF_NE__KBR_2F",
),
"ULF_MAG": ("https://swarmhandbook.earth.esa.int/catalogue/SW_ULFxMAG_2F",),
"PC1_MAG": ("https://swarmhandbook.earth.esa.int/catalogue/SW_PC1xMAG_2F",),
}
for mission in ("SW", "OR", "CH", "CR", "CO"):
for cadence in ("1M", "4M"):
COLLECTION_REFERENCES[f"VOBS_{mission}_{cadence}"] = (
"https://earth.esa.int/eogateway/activities/gvo",
)
DATA_CITATIONS = {
"AUX_OBSH": "https://auxobs-api.bgs.ac.uk/docs",
"AUX_OBSM": "https://auxobs-api.bgs.ac.uk/docs",
"AUX_OBSS": "https://auxobs-api.bgs.ac.uk/docs",
}
IAGA_CODES = CONFIG_SWARM.get("IAGA_CODES")
VOBS_SITES = CONFIG_SWARM.get("VOBS_SITES")
class SwarmWPSInputs(WPSInputs):
"""Holds the set of inputs to be passed to the request template for Swarm"""
NAMES = [
"collection_ids",
"model_expression",
"begin_time",
"end_time",
"variables",
"filters",
"sampling_step",
"response_type",
"custom_shc",
"ignore_cached_models",
"do_not_interpolate_models",
]
def __init__(
self,
collection_ids=None,
model_expression=None,
begin_time=None,
end_time=None,
variables=None,
filters=None,
sampling_step=None,
response_type=None,
custom_shc=None,
ignore_cached_models=False,
do_not_interpolate_models=False,
):
# Set up default values
# Obligatory - these must be replaced before the request is made
self.collection_ids = None if collection_ids is None else collection_ids
self.begin_time = None if begin_time is None else begin_time
self.end_time = None if end_time is None else end_time
self.response_type = None if response_type is None else response_type
# Optional - these defaults will be used if not replaced before the
# request is made
self.model_expression = "" if model_expression is None else model_expression
self.variables = [] if variables is None else variables
self.filters = None if filters is None else filters
self.sampling_step = None if sampling_step is None else sampling_step
self.custom_shc = None if custom_shc is None else custom_shc
self.ignore_cached_models = ignore_cached_models
self.do_not_interpolate_models = do_not_interpolate_models
@property
def collection_ids(self):
return self._collection_ids
@collection_ids.setter
def collection_ids(self, collection_ids):
if isinstance(collection_ids, dict) or collection_ids is None:
self._collection_ids = collection_ids
else:
raise TypeError("collection_ids must be a dict")
@staticmethod
def _spacecraft_from_collection(collection):
"""Identify spacecraft (or ground observatory name) from collection name."""
if "AUX_OBS" in collection or "VOBS" in collection:
name = collection
elif collection[:3] == "SW_":
# 12th character in name, e.g. SW_OPER_MAGx_LR_1B
sc = collection[11]
sc_to_name = {"A": "Alpha", "B": "Bravo", "C": "Charlie"}
name = sc_to_name.get(sc, "NSC")
else:
name = collection
return name
def set_collections(self, collections):
"""Restructure given list of collections as dict required by VirES."""
# Build the output dictionary in the form:
# {"Alpha": ["SW..A..", "SW..A.."], "Bravo": ["SW..B.."], "NSC": [..]}
if isinstance(collections, list):
collection_dict = {}
for collection in collections:
tag = self._spacecraft_from_collection(collection)
if tag in collection_dict.keys():
collection_dict[tag].append(collection)
else:
collection_dict[tag] = [collection]
self.collection_ids = collection_dict
else:
raise TypeError("collections must be a list")
@property
def model_expression(self):
return self._model_expression
@model_expression.setter
def model_expression(self, model_expression):
if isinstance(model_expression, str):
self._model_expression = model_expression
else:
raise TypeError("model_expression must be a string")
@property
def ignore_cached_models(self):
return self._ignore_cached_models
@ignore_cached_models.setter
def ignore_cached_models(self, value):
if isinstance(value, bool):
self._ignore_cached_models = value
else:
raise TypeError
@property
def do_not_interpolate_models(self):
return self._do_not_interpolate_models
@do_not_interpolate_models.setter
def do_not_interpolate_models(self, value):
if isinstance(value, bool):
self._do_not_interpolate_models = value
else:
raise TypeError
@property
def begin_time(self):
return self._begin_time
@begin_time.setter
def begin_time(self, begin_time):
if isinstance(begin_time, datetime.datetime) or begin_time is None:
self._begin_time = begin_time
else:
raise TypeError
@property
def end_time(self):
return self._end_time
@end_time.setter
def end_time(self, end_time):
if isinstance(end_time, datetime.datetime) or end_time is None:
self._end_time = end_time
else:
raise TypeError
@property
def variables(self):
return self._variables
@variables.setter
def variables(self, variables):
if isinstance(variables, list):
self._variables = variables
else:
raise TypeError
@property
def filters(self):
return self._filters
@filters.setter
def filters(self, filters):
if isinstance(filters, str) or filters is None:
self._filters = filters
else:
raise TypeError
@property
def sampling_step(self):
return self._sampling_step
@sampling_step.setter
def sampling_step(self, sampling_step):
if isinstance(sampling_step, str) or sampling_step is None:
self._sampling_step = sampling_step
else:
raise TypeError
@property
def response_type(self):
return self._response_type
@response_type.setter
def response_type(self, response_type):
if isinstance(response_type, str) or response_type is None:
self._response_type = response_type
else:
raise TypeError
@property
def custom_shc(self):
return self._custom_shc
@custom_shc.setter
def custom_shc(self, custom_shc):
if isinstance(custom_shc, str) or custom_shc is None:
self._custom_shc = custom_shc
else:
raise TypeError
[docs]
class SwarmRequest(ClientRequest):
"""Handles the requests to and downloads from the server.
Examples:
Retrieve data::
from viresclient import SwarmRequest
# Set up connection with server
request = SwarmRequest("https://vires.services/ows")
# Set collection to use
request.set_collection("SW_OPER_MAGA_LR_1B")
# Set mix of products to fetch:
# measurements (variables from the given collection)
# models (magnetic model predictions at spacecraft sampling points)
# auxiliaries (variables available with any collection)
request.set_products(
measurements=["F", "B_NEC"],
models=["CHAOS-Core"],
auxiliaries=["QDLat", "QDLon"],
sampling_step="PT10S"
)
# Fetch data from a given time interval
data = request.get_between(
start_time="2014-01-01T00:00",
end_time="2014-01-01T01:00"
)
# Load the data as an xarray.Dataset
ds = data.as_xarray()
Check what data are available::
request.available_collections(details=False)
request.available_measurements("MAG")
request.available_auxiliaries()
request.available_models(details=False)
Args:
url (str):
token (str):
config (str or ClientConfig):
logging_level (str):
"""
MISSION_SPACECRAFTS = {
"Swarm": ["A", "B", "C"],
"GRACE": ["1", "2"],
"GRACE-FO": ["1", "2"],
"CryoSat-2": None,
"GOCE": None,
}
CONJUNCTION_MISSION_SPACECRAFT_PAIRS = {
(("Swarm", "A"), ("Swarm", "B")),
}
FILE_OPTIONS = {
"MM_CON_EPH_2_:crossover": {
"time_variable": "time_1",
"secondary_time_variables": ["time_2"],
},
"MM_CON_EPH_2_:plane_alignment": {"time_variable": "time"},
}
COLLECTIONS = {
"MAG": [
*(f"SW_OPER_MAG{x}_LR_1B" for x in "ABC"),
*(f"SW_FAST_MAG{x}_LR_1B" for x in "ABC"),
],
"MAG_HR": [
*(f"SW_OPER_MAG{x}_HR_1B" for x in "ABC"),
*(f"SW_FAST_MAG{x}_HR_1B" for x in "ABC"),
],
"EFI": [
*(f"SW_OPER_EFI{x}_LP_1B" for x in "ABC"),
*(f"SW_FAST_EFI{x}_LP_1B" for x in "ABC"),
],
"EFI_IDM": [f"SW_PREL_EFI{x}IDM_2_" for x in "ABC"],
"EFI_TIE": [f"SW_OPER_EFI{x}TIE_2_" for x in "ABC"],
"EFI_TCT02": [f"SW_EXPT_EFI{x}_TCT02" for x in "ABC"],
"EFI_TCT16": [f"SW_EXPT_EFI{x}_TCT16" for x in "ABC"],
"IBI": [f"SW_OPER_IBI{x}TMS_2F" for x in "ABC"],
"TEC": [
*(f"SW_OPER_TEC{x}TMS_2F" for x in "ABC"),
*(f"SW_FAST_TEC{x}TMS_2F" for x in "ABC"),
],
"FAC": [
*(f"SW_OPER_FAC{x}TMS_2F" for x in "ABC_"),
*(f"SW_FAST_FAC{x}TMS_2F" for x in "ABC"),
],
"EEF": [f"SW_OPER_EEF{x}TMS_2F" for x in "ABC"],
"IPD": [f"SW_OPER_IPD{x}IRR_2F" for x in "ABC"],
"AEJ_LPL": [
*(f"SW_OPER_AEJ{x}LPL_2F" for x in "ABC"),
*(f"SW_FAST_AEJ{x}LPL_2F" for x in "ABC"),
],
"AEJ_LPL:Quality": [
*(f"SW_OPER_AEJ{x}LPL_2F:Quality" for x in "ABC"),
*(f"SW_FAST_AEJ{x}LPL_2F:Quality" for x in "ABC"),
],
"AEJ_LPS": [f"SW_OPER_AEJ{x}LPS_2F" for x in "ABC"],
"AEJ_LPS:Quality": [f"SW_OPER_AEJ{x}LPS_2F:Quality" for x in "ABC"],
"AEJ_PBL": [
*(f"SW_OPER_AEJ{x}PBL_2F" for x in "ABC"),
*(f"SW_FAST_AEJ{x}PBL_2F" for x in "ABC"),
],
"AEJ_PBS": [f"SW_OPER_AEJ{x}PBS_2F" for x in "ABC"],
"AEJ_PBS:GroundMagneticDisturbance": [
f"SW_OPER_AEJ{x}PBS_2F:GroundMagneticDisturbance" for x in "ABC"
],
"AOB_FAC": [
*(f"SW_OPER_AOB{x}FAC_2F" for x in "ABC"),
*(f"SW_FAST_AOB{x}FAC_2F" for x in "ABC"),
],
"AUX_OBSH": [
"SW_OPER_AUX_OBSH2_",
*[f"SW_OPER_AUX_OBSH2_:{code}" for code in IAGA_CODES],
],
"AUX_OBSM": [
"SW_OPER_AUX_OBSM2_",
*[f"SW_OPER_AUX_OBSM2_:{code}" for code in IAGA_CODES],
],
"AUX_OBSS": [
"SW_OPER_AUX_OBSS2_",
*[f"SW_OPER_AUX_OBSS2_:{code}" for code in IAGA_CODES],
],
"VOBS_SW_1M": [
"SW_OPER_VOBS_1M_2_",
*[f"SW_OPER_VOBS_1M_2_:{site}" for site in VOBS_SITES],
],
"VOBS_SW_4M": [
"SW_OPER_VOBS_4M_2_",
*[f"SW_OPER_VOBS_4M_2_:{site}" for site in VOBS_SITES],
],
"VOBS_CH_1M": [
"CH_OPER_VOBS_1M_2_",
*[f"CH_OPER_VOBS_1M_2_:{site}" for site in VOBS_SITES],
],
"VOBS_CR_1M": [
"CR_OPER_VOBS_1M_2_",
*[f"CR_OPER_VOBS_1M_2_:{site}" for site in VOBS_SITES],
],
"VOBS_OR_1M": [
"OR_OPER_VOBS_1M_2_",
*[f"OR_OPER_VOBS_1M_2_:{site}" for site in VOBS_SITES],
],
"VOBS_CO_1M": [
"CO_OPER_VOBS_1M_2_",
*[f"CO_OPER_VOBS_1M_2_:{site}" for site in VOBS_SITES],
],
"VOBS_OR_4M": [
"OR_OPER_VOBS_4M_2_",
*[f"OR_OPER_VOBS_4M_2_:{site}" for site in VOBS_SITES],
],
"VOBS_CH_4M": [
"CH_OPER_VOBS_4M_2_",
*[f"CH_OPER_VOBS_4M_2_:{site}" for site in VOBS_SITES],
],
"VOBS_CR_4M": [
"CR_OPER_VOBS_4M_2_",
*[f"CR_OPER_VOBS_4M_2_:{site}" for site in VOBS_SITES],
],
"VOBS_CO_4M": [
"CO_OPER_VOBS_4M_2_",
*[f"CO_OPER_VOBS_4M_2_:{site}" for site in VOBS_SITES],
],
"VOBS_SW_1M:SecularVariation": [
"SW_OPER_VOBS_1M_2_:SecularVariation",
*[f"SW_OPER_VOBS_1M_2_:SecularVariation:{site}" for site in VOBS_SITES],
],
"VOBS_SW_4M:SecularVariation": [
"SW_OPER_VOBS_4M_2_:SecularVariation",
*[f"SW_OPER_VOBS_4M_2_:SecularVariation:{site}" for site in VOBS_SITES],
],
"VOBS_CH_1M:SecularVariation": [
"CH_OPER_VOBS_1M_2_:SecularVariation",
*[f"CH_OPER_VOBS_1M_2_:SecularVariation:{site}" for site in VOBS_SITES],
],
"VOBS_CR_1M:SecularVariation": [
"CR_OPER_VOBS_1M_2_:SecularVariation",
*[f"CR_OPER_VOBS_1M_2_:SecularVariation:{site}" for site in VOBS_SITES],
],
"VOBS_OR_1M:SecularVariation": [
"OR_OPER_VOBS_1M_2_:SecularVariation",
*[f"OR_OPER_VOBS_1M_2_:SecularVariation:{site}" for site in VOBS_SITES],
],
"VOBS_CO_1M:SecularVariation": [
"CO_OPER_VOBS_1M_2_:SecularVariation",
*[f"CO_OPER_VOBS_1M_2_:SecularVariation:{site}" for site in VOBS_SITES],
],
"VOBS_OR_4M:SecularVariation": [
"OR_OPER_VOBS_4M_2_:SecularVariation",
*[f"OR_OPER_VOBS_4M_2_:SecularVariation:{site}" for site in VOBS_SITES],
],
"VOBS_CH_4M:SecularVariation": [
"CH_OPER_VOBS_4M_2_:SecularVariation",
*[f"CH_OPER_VOBS_4M_2_:SecularVariation:{site}" for site in VOBS_SITES],
],
"VOBS_CR_4M:SecularVariation": [
"CR_OPER_VOBS_4M_2_:SecularVariation",
*[f"CR_OPER_VOBS_4M_2_:SecularVariation:{site}" for site in VOBS_SITES],
],
"VOBS_CO_4M:SecularVariation": [
"CO_OPER_VOBS_4M_2_:SecularVariation",
*[f"CO_OPER_VOBS_4M_2_:SecularVariation:{site}" for site in VOBS_SITES],
],
"MIT_LP": [f"SW_OPER_MIT{x}_LP_2F" for x in "ABC"],
"MIT_LP:ID": [f"SW_OPER_MIT{x}_LP_2F:ID" for x in "ABC"],
"MIT_TEC": [f"SW_OPER_MIT{x}TEC_2F" for x in "ABC"],
"MIT_TEC:ID": [f"SW_OPER_MIT{x}TEC_2F:ID" for x in "ABC"],
"PPI_FAC": [f"SW_OPER_PPI{x}FAC_2F" for x in "ABC"],
"PPI_FAC:ID": [f"SW_OPER_PPI{x}FAC_2F:ID" for x in "ABC"],
# Multi-mission magnetic products
"MAG_CHAMP": ["CH_ME_MAG_LR_3"],
"MAG_CS": ["CS_OPER_MAG"],
"MAG_GRACE": ["GRACE_A_MAG", "GRACE_B_MAG"],
"MAG_GFO": ["GF1_OPER_FGM_ACAL_CORR", "GF2_OPER_FGM_ACAL_CORR"],
"MAG_GFO_ML": ["GF1_MAG_ACAL_CORR_ML", "GF2_MAG_ACAL_CORR_ML"],
"MAG_GOCE": ["GO_MAG_ACAL_CORR"],
"MAG_GOCE_ML": ["GO_MAG_ACAL_CORR_ML"],
# Multi-mission TEC and NE products
"TEC_TIRO": [
"CH_OPER_TEC_TMS_2F",
"GR_OPER_TEC1TMS_2F",
"GR_OPER_TEC2TMS_2F",
"GF_OPER_TEC1TMS_2F",
"GF_OPER_TEC2TMS_2F",
],
"NE_TIRO": [
"GR_OPER_NE__KBR_2F",
"GF_OPER_NE__KBR_2F",
],
# Swarm spacecraft positions
"MOD_SC": [
*(f"SW_OPER_MOD{x}_SC_1B" for x in "ABC"),
*(f"SW_FAST_MOD{x}_SC_1B" for x in "ABC"),
],
# Swarm thermospheric density products:
"DNS_POD": [f"SW_OPER_DNS{spacecraft}POD_2_" for spacecraft in "ABC"],
"DNS_ACC": [f"SW_OPER_DNS{spacecraft}ACC_2_" for spacecraft in "ABC"],
# TOLEOS thermospheric density and crosswind products:
"DNS_ACC_CHAMP": ["CH_OPER_DNS_ACC_2_"],
"DNS_ACC_GRACE": ["GR_OPER_DNS1ACC_2_", "GR_OPER_DNS2ACC_2_"],
"DNS_ACC_GFO": ["GF_OPER_DNS1ACC_2_"], # empty GF_OPER_DNS2ACC_2_ exists
"WND_ACC_CHAMP": ["CH_OPER_WND_ACC_2_"],
"WND_ACC_GRACE": ["GR_OPER_WND1ACC_2_", "GR_OPER_WND2ACC_2_"],
"WND_ACC_GFO": ["GF_OPER_WND1ACC_2_"], # empty GF_OPER_WND2ACC_2_ exists
# TOLEOS conjunctions
"MM_CON_EPH_2_:crossover": ["MM_OPER_CON_EPH_2_:crossover"],
"MM_CON_EPH_2_:plane_alignment": ["MM_OPER_CON_EPH_2_:plane_alignment"],
"NIX_TMS": ["SW_OPER_NIX_TMS_2F"],
"TIX_TMS": ["SW_OPER_TIX_TMS_2F"],
# ULF and PC1 products
"ULF_MAG": [f"SW_OPER_ULF{spacecraft}MAG_2F" for spacecraft in "ABC"],
"ULF_MAG:event": [
f"SW_OPER_ULF{spacecraft}MAG_2F:event" for spacecraft in "ABC"
],
"ULF_MAG:event_mean": [
f"SW_OPER_ULF{spacecraft}MAG_2F:event_mean" for spacecraft in "ABC"
],
"PC1_MAG:event": [
*(f"SW_OPER_PC1{spacecraft}MAG_2F:Bp_event" for spacecraft in "ABC"),
*(f"SW_OPER_PC1{spacecraft}MAG_2F:Br_event" for spacecraft in "ABC"),
*(f"SW_OPER_PC1{spacecraft}MAG_2F:Ba_event" for spacecraft in "ABC"),
],
"PC1_MAG:event_mean": [
*(f"SW_OPER_PC1{spacecraft}MAG_2F:Bp_event_mean" for spacecraft in "ABC"),
*(f"SW_OPER_PC1{spacecraft}MAG_2F:Br_event_mean" for spacecraft in "ABC"),
*(f"SW_OPER_PC1{spacecraft}MAG_2F:Ba_event_mean" for spacecraft in "ABC"),
],
}
OBS_COLLECTIONS = [
"SW_OPER_AUX_OBSH2_",
"SW_OPER_AUX_OBSM2_",
"SW_OPER_AUX_OBSS2_",
"SW_OPER_VOBS_1M_2_",
"SW_OPER_VOBS_4M_2_",
"CH_OPER_VOBS_1M_2_",
"CR_OPER_VOBS_1M_2_",
"OR_OPER_VOBS_1M_2_",
"CO_OPER_VOBS_1M_2_",
"OR_OPER_VOBS_4M_2_",
"CH_OPER_VOBS_4M_2_",
"CR_OPER_VOBS_4M_2_",
"CO_OPER_VOBS_4M_2_",
"SW_OPER_VOBS_1M_2_:SecularVariation",
"SW_OPER_VOBS_4M_2_:SecularVariation",
"CH_OPER_VOBS_1M_2_:SecularVariation",
"CR_OPER_VOBS_1M_2_:SecularVariation",
"OR_OPER_VOBS_1M_2_:SecularVariation",
"CO_OPER_VOBS_1M_2_:SecularVariation",
"OR_OPER_VOBS_4M_2_:SecularVariation",
"CH_OPER_VOBS_4M_2_:SecularVariation",
"CR_OPER_VOBS_4M_2_:SecularVariation",
"CO_OPER_VOBS_4M_2_:SecularVariation",
]
# These are not necessarily real sampling steps, but are good enough to use
# for splitting long requests into chunks.
# The time step set here should be equal to or shorter than the real sampling
# (defaults to PT1S if not set)
COLLECTION_SAMPLING_STEPS = {
"MAG": "PT1S",
"MAG_HR": "PT0.019S", # approx 50Hz (the sampling is not exactly 50Hz)
"EFI": "PT0.5S",
"EFI_IDM": "PT0.5S",
"EFI_TIE": "PT0.5S",
"EFI_TCT02": "PT0.5S",
"EFI_TCT16": "PT0.0625S",
"IBI": "PT1S",
"TEC": "PT1S", # Actually more complicated - non-unique samples
"FAC": "PT1S",
"EEF": "PT90M",
"IPD": "PT1S",
"AEJ_LPL": "PT15.6S",
"AEJ_LPS": "PT1S",
"AUX_OBSH": "PT60M",
"AUX_OBSM": "PT60S",
"AUX_OBSS": "PT1S",
"VOBS_SW_1M": "P31D",
"VOBS_CH_1M": "P31D",
"VOBS_CR_1M": "P31D",
"VOBS_OR_1M": "P31D",
"VOBS_CO_1M": "P31D",
"VOBS_OR_4M": "P122D",
"VOBS_SW_4M": "P122D",
"VOBS_CH_4M": "P122D",
"VOBS_CR_4M": "P122D",
"VOBS_CO_4M": "P122D",
"VOBS_SW_1M:SecularVariation": "P31D",
"VOBS_CH_1M:SecularVariation": "P31D",
"VOBS_CR_1M:SecularVariation": "P31D",
"VOBS_OR_1M:SecularVariation": "P31D",
"VOBS_CO_1M:SecularVariation": "P31D",
"VOBS_OR_4M:SecularVariation": "P122D",
"VOBS_SW_4M:SecularVariation": "P122D",
"VOBS_CH_4M:SecularVariation": "P122D",
"VOBS_CR_4M:SecularVariation": "P122D",
"VOBS_CO_4M:SecularVariation": "P122D",
"MIT_LP": "PT20M",
"MIT_LP:ID": "PT20M",
"MIT_TEC": "PT20M",
"MIT_TEC:ID": "PT20M",
"PPI_FAC": "PT20M",
"PPI_FAC:ID": "PT20M",
"DNS_POD": "PT30S",
"DNS_ACC": "PT10S",
"DNS_ACC_CHAMP": "PT10S",
"DNS_ACC_GRACE": "PT10S",
"DNS_ACC_GFO": "PT10S",
"WND_ACC_CHAMP": "PT10S",
"WND_ACC_GRACE": "PT10S",
"WND_ACC_GFO": "PT10S",
"MM_CON_EPH_2_:crossover": "PT20M",
"MM_CON_EPH_2_:plane_alignment": "P1D",
"NIX_TMS": "PT8S",
"TIX_TMS": "PT8S",
"TEC_TIRO": "PT1S", # Actually more complicated - non-unique samples
"NE_TIRO": "PT5S",
"ULF_MAG": "PT1M",
"ULF_MAG:event": "PT1S", # irregular sampling
"ULF_MAG:event_mean": "PT1M", # irregular sampling
"PC1_MAG:event": "PT1S", # irregular sampling
"PC1_MAG:event_mean": "PT1M", # irregular sampling
}
PRODUCT_VARIABLES = {
"MAG": [
"F",
"dF_Sun",
"dF_AOCS",
"dF_other",
"F_error",
"B_VFM",
"B_NEC",
"dB_Sun",
"dB_AOCS",
"dB_other",
"B_error",
"q_NEC_CRF",
"Att_error",
"Flags_F",
"Flags_B",
"Flags_q",
"Flags_Platform",
"ASM_Freq_Dev",
],
"MAG_HR": [ # NOTE: F is calculated on the fly from B_NEC (F = |B_NEC|)
"F",
"B_VFM",
"B_NEC",
"dB_Sun",
"dB_AOCS",
"dB_other",
"B_error",
"q_NEC_CRF",
"Att_error",
"Flags_B",
"Flags_q",
"Flags_Platform",
],
"EFI": [
"U_orbit",
"N_ion",
"dN_ion",
"N_ion_error",
"N_elec",
"N_elec_error",
"T_elec",
"dT_elec",
"T_elec_error",
"Vs",
"Vs_error",
"Flags_N_elec",
"Flags_N_ion",
"Flags_T_elec",
"Flags_Vs",
"Flagbits1",
"Flagbits2",
"Gamma1",
"Gamma2",
],
"EFI_IDM": [
"Latitude_GD",
"Longitude_GD",
"Height_GD",
"Radius_GC",
"Latitude_QD",
"MLT_QD",
"V_sat_nec",
"M_i_eff",
"M_i_eff_err",
"M_i_eff_Flags",
"M_i_eff_tbt_model",
"V_i",
"V_i_err",
"V_i_Flags",
"V_i_raw",
"N_i",
"N_i_err",
"N_i_Flags",
"A_fp",
"R_p",
"T_e",
"Phi_sc",
],
"EFI_TIE": [
"Latitude_GD",
"Longitude_GD",
"Height_GD",
"Radius_GC",
"Latitude_QD",
"MLT_QD",
"Tn_msis",
"Te_adj_LP",
"Ti_meas_drift",
"Ti_model_drift",
"Flag_ti_meas",
"Flag_ti_model",
],
"EFI_TCT02": [ # identical to EFI_TCT16
"VsatC",
"VsatE",
"VsatN",
"Bx",
"By",
"Bz",
"Ehx",
"Ehy",
"Ehz",
"Evx",
"Evy",
"Evz",
"Vicrx",
"Vicry",
"Vicrz",
"Vixv",
"Vixh",
"Viy",
"Viz",
"Vixv_error",
"Vixh_error",
"Viy_error",
"Viz_error",
"Latitude_QD",
"MLT_QD",
"Calibration_flags",
"Quality_flags",
],
"EFI_TCT16": [ # identical to EFI_TCT02
"VsatC",
"VsatE",
"VsatN",
"Bx",
"By",
"Bz",
"Ehx",
"Ehy",
"Ehz",
"Evx",
"Evy",
"Evz",
"Vicrx",
"Vicry",
"Vicrz",
"Vixv",
"Vixh",
"Viy",
"Viz",
"Vixv_error",
"Vixh_error",
"Viy_error",
"Viz_error",
"Latitude_QD",
"MLT_QD",
"Calibration_flags",
"Quality_flags",
],
"IBI": [
"Bubble_Index",
"Bubble_Probability",
"Flags_Bubble",
"Flags_F",
"Flags_B",
"Flags_q",
],
"TEC": [
"GPS_Position",
"LEO_Position",
"PRN",
"L1",
"L2",
"P1",
"P2",
"S1",
"S2",
"Elevation_Angle",
"Absolute_VTEC",
"Absolute_STEC",
"Relative_STEC",
"Relative_STEC_RMS",
"DCB",
"DCB_Error",
],
"TEC_TIRO": [
"GPS_Position",
"LEO_Position",
"PRN",
"L1",
"L2",
"P1",
"P2",
"S1_C_N0",
"S2_C_N0",
"Elevation_Angle",
"Absolute_VTEC",
"Absolute_STEC",
"Relative_STEC",
"Relative_STEC_RMS",
"DCB",
"DCB_Error",
],
"NE_TIRO": [
"LEO_Position",
"Distance",
"Relative_Hor_TEC",
"Relative_Ne",
"Absolute_Ne",
],
"FAC": [
"IRC",
"IRC_Error",
"FAC",
"FAC_Error",
"Flags",
"Flags_F",
"Flags_B",
"Flags_q",
],
"EEF": ["EEF", "EEJ_meast", "EEJ_mnorth", "RelErr", "Flags"],
"IPD": [
"Ne",
"Te",
"Background_Ne",
"Foreground_Ne",
"PCP_flag",
"Grad_Ne_at_100km",
"Grad_Ne_at_50km",
"Grad_Ne_at_20km",
"Grad_Ne_at_PCP_edge",
"ROD",
"RODI10s",
"RODI20s",
"delta_Ne10s",
"delta_Ne20s",
"delta_Ne40s",
"Num_GPS_satellites",
"mVTEC",
"mROT",
"mROTI10s",
"mROTI20s",
"IBI_flag",
"Ionosphere_region_flag",
"IPIR_index",
"Ne_quality_flag",
"TEC_STD",
],
"AEJ_LPL": ["Latitude_QD", "Longitude_QD", "MLT_QD", "J_NE", "J_QD"],
"AEJ_LPL:Quality": ["RMS_misfit", "Confidence"],
"AEJ_LPS": [
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"J_CF_NE",
"J_DF_NE",
"J_CF_SemiQD",
"J_DF_SemiQD",
"J_R",
],
"AEJ_LPS:Quality": ["RMS_misfit", "Confidence"],
"AEJ_PBL": [
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"J_QD",
"Flags",
"PointType",
],
"AEJ_PBS": [
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"J_DF_SemiQD",
"Flags",
"PointType",
],
"AEJ_PBS:GroundMagneticDisturbance": ["B_NE"],
"AOB_FAC": [
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"Boundary_Flag",
"Quality",
"Pair_Indicator",
],
"AUX_OBSH": ["B_NEC", "F", "IAGA_code", "Quality", "ObsIndex"],
"AUX_OBSM": ["B_NEC", "F", "IAGA_code", "Quality"],
"AUX_OBSS": ["B_NEC", "F", "IAGA_code", "Quality"],
"VOBS_SW_1M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"],
"VOBS_CH_1M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"],
"VOBS_CR_1M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"],
"VOBS_OR_1M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"],
"VOBS_CO_1M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"],
"VOBS_OR_4M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"],
"VOBS_SW_4M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"],
"VOBS_CH_4M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"],
"VOBS_CR_4M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"],
"VOBS_CO_4M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"],
"VOBS_SW_1M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"],
"VOBS_CH_1M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"],
"VOBS_CR_1M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"],
"VOBS_OR_1M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"],
"VOBS_CO_1M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"],
"VOBS_OR_4M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"],
"VOBS_SW_4M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"],
"VOBS_CH_4M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"],
"VOBS_CR_4M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"],
"VOBS_CO_4M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"],
"MIT_LP": [
"Counter",
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"L_value",
"SZA",
"Ne",
"Te",
"Depth",
"DR",
"Width",
"dL",
"PW_Gradient",
"EW_Gradient",
"Quality",
],
"MIT_LP:ID": [
"Counter",
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"L_value",
"SZA",
"Ne",
"Te",
"Position_Quality",
"PointType",
],
"MIT_TEC": [
"Counter",
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"L_value",
"SZA",
"TEC",
"Depth",
"DR",
"Width",
"dL",
"PW_Gradient",
"EW_Gradient",
"Quality",
],
"MIT_TEC:ID": [
"Counter",
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"L_value",
"SZA",
"TEC",
"Position_Quality",
"PointType",
],
"PPI_FAC": [
"Counter",
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"L_value",
"SZA",
"Sigma",
"PPI",
"dL",
"Quality",
],
"PPI_FAC:ID": [
"Counter",
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"L_value",
"SZA",
"Position_Quality",
"PointType",
],
"MAG_CHAMP": [
"F",
"B_VFM",
"B_NEC",
"Flags_Position",
"Flags_B",
"Flags_q",
"Mode_q",
"q_ICRF_CRF",
],
"MAG_CS": [
"F",
"B_NEC",
"B_mod_NEC",
"B_NEC1",
"B_NEC2",
"B_NEC3",
"B_FGM1",
"B_FGM2",
"B_FGM3",
"q_NEC_CRF",
"q_error",
],
"MAG_GRACE": [
"F",
"B_NEC",
"B_NEC_raw",
"B_FGM",
"q_NEC_CRF",
"q_error",
],
"MAG_GFO": [
"F",
"B_NEC",
"B_FGM",
"dB_MTQ_FGM",
"dB_XI_FGM",
"dB_NY_FGM",
"dB_BT_FGM",
"dB_ST_FGM",
"dB_SA_FGM",
"dB_BAT_FGM",
"q_NEC_FGM",
"B_FLAG",
],
"MAG_GFO_ML": [
"F",
"B_MAG",
"B_NEC",
"q_NEC_FGM",
"B_FLAG",
"KP_DST_FLAG",
"Latitude_QD",
"Longitude_QD",
],
"MAG_GOCE": [
"F",
"B_MAG",
"B_NEC",
"dB_MTQ_SC",
"dB_XI_SC",
"dB_NY_SC",
"dB_BT_SC",
"dB_ST_SC",
"dB_SA_SC",
"dB_BAT_SC",
"dB_HK_SC",
"dB_BLOCK_CORR",
"q_SC_NEC",
"q_MAG_SC",
"B_FLAG",
],
"MAG_GOCE_ML": [
"F",
"B_MAG",
"B_NEC",
"q_FGM_NEC",
"B_FLAG",
"MAGNETIC_ACTIVITY_FLAG",
"NaN_FLAG",
"Latitude_QD",
"Longitude_QD",
],
"MOD_SC": [],
"DNS_POD": [
"Height_GD",
"Latitude_GD",
"Longitude_GD",
"Height_GD",
"local_solar_time",
"density",
"density_orbitmean",
"validity_flag",
],
"DNS_ACC": [
"Height_GD",
"Latitude_GD",
"Longitude_GD",
"Height_GD",
"density",
"local_solar_time",
],
"DNS_ACC_CHAMP": [
"Height_GD",
"Latitude_GD",
"Longitude_GD",
"density",
"density_orbitmean",
"local_solar_time",
"validity_flag",
"validity_flag_orbitmean",
],
"DNS_ACC_GRACE": [
"Height_GD",
"Latitude_GD",
"Longitude_GD",
"density",
"density_orbitmean",
"local_solar_time",
"validity_flag",
"validity_flag_orbitmean",
],
"DNS_ACC_GFO": [
"Height_GD",
"Latitude_GD",
"Longitude_GD",
"density",
"density_orbitmean",
"local_solar_time",
"validity_flag",
"validity_flag_orbitmean",
],
"WND_ACC_CHAMP": [
"Height_GD",
"Latitude_GD",
"Longitude_GD",
"crosswind",
"crosswind_direction",
"local_solar_time",
"validity_flag",
],
"WND_ACC_GRACE": [
"Height_GD",
"Latitude_GD",
"Longitude_GD",
"crosswind",
"crosswind_direction",
"local_solar_time",
"validity_flag",
],
"WND_ACC_GFO": [
"Height_GD",
"Latitude_GD",
"Longitude_GD",
"crosswind",
"crosswind_direction",
"local_solar_time",
"validity_flag",
],
"MM_CON_EPH_2_:crossover": [
"time_1",
"time_2",
"time_difference",
"satellite_1",
"satellite_2",
"latitude",
"longitude",
"altitude_1",
"altitude_2",
"magnetic_latitude",
"magnetic_longitude",
"local_solar_time_1",
"local_solar_time_2",
],
"MM_CON_EPH_2_:plane_alignment": [
"time",
"altitude_1",
"altitude_2",
"ltan_1",
"ltan_2",
"ltan_rate_1",
"ltan_rate_2",
"satellite_1",
"satellite_2",
],
"NIX_TMS": [
"Distance",
"Azimuth",
"Negix_X",
"Negix_X_Sigma",
"Negix_X_P95",
"Negix_Y",
"Negix_Y_Sigma",
"Negix_Y_P95",
"Negix_Total",
"Negix_Sigma",
"Negix_P95",
"N_Measurements",
"Flag_Negix",
"Orbit_Label",
],
"TIX_TMS": [
"Longitude_Swarm",
"Latitude_Swarm",
"Distance",
"Azimuth",
"Tegix_X",
"Tegix_X_Sigma",
"Tegix_X_P95",
"Tegix_Y",
"Tegix_Y_Sigma",
"Tegix_Y_P95",
"Tegix_Total",
"Tegix_Sigma",
"Tegix_P95",
"N_Measurements",
"Flag_Tegix",
"Orbit_Label",
],
"ULF_MAG": [
"Timestamp",
"Latitude",
"Longitude",
"Radius",
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"UT",
"SZA",
"Frequency_dominant",
"Halfwidth",
"Power",
"Prominence",
"Pc2_act",
"Pc3_act",
"Pc4_act",
"Pi2_act",
"Flag_Pc2",
"Flag_Pc3",
"Flag_Pc4",
"Flag_Pi2",
"Flag_EPB",
"Flag_FAC",
],
"ULF_MAG:event": [
"Timestamp",
"Latitude",
"Longitude",
"Radius",
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"SZA",
"ID",
"ORB",
"DIR",
"Frequency",
"Halfwidth",
"Power",
"Prominence",
"EPB",
"FAC",
"Flag_B",
"Quality",
],
"ULF_MAG:event_mean": [
"Timestamp",
"Latitude",
"Longitude",
"Radius",
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"SZA",
"ID",
"ORB",
"DIR",
"Duration",
"Frequency",
"Freq_std",
"Halfwidth",
"Power",
"Prominence",
"EPB",
"FAC",
"Flag_B",
"Quality",
],
"PC1_MAG:event": [
"ID",
"Timestamp",
"Latitude",
"Longitude",
"Radius",
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"SZA",
"ORB",
"DIR",
"Frequency",
"Halfwidth",
"Power",
"Prominence",
"Quality_B",
"Quality_p",
"Quality_n",
],
"PC1_MAG:event_mean": [
"Timestamp",
"Latitude",
"Longitude",
"Radius",
"Latitude_QD",
"Longitude_QD",
"MLT_QD",
"SZA",
"ID",
"ORB",
"DIR",
"Duration",
"Frequency",
"Freq_std",
"Halfwidth",
"Power",
"Prominence",
"ROFC",
"Quality",
],
}
AUXILIARY_VARIABLES = [
"Timestamp",
"Latitude",
"Longitude",
"Radius",
"Spacecraft",
"OrbitDirection",
"QDOrbitDirection",
"SyncStatus",
"Kp10",
"Kp",
"Dst",
"F107",
"F107_avg81d",
"F107_avg81d_count",
"IMF_BY_GSM",
"IMF_BZ_GSM",
"IMF_V",
"F10_INDEX",
"OrbitSource",
"OrbitNumber",
"AscendingNodeTime",
"AscendingNodeLongitude",
"QDLat",
"QDLon",
"QDBasis",
"MLT",
"SunDeclination",
"SunHourAngle",
"SunRightAscension",
"SunAzimuthAngle",
"SunZenithAngle",
"SunLongitude",
"SunVector",
"DipoleAxisVector",
"NGPLatitude",
"NGPLongitude",
"DipoleTiltAngle",
"dDst",
]
MAGNETIC_MODEL_VARIABLES = {
"F": "F",
"B_NEC": "B_NEC",
"B_NEC1": "B_NEC",
"B_NEC2": "B_NEC",
"B_NEC3": "B_NEC",
}
MAGNETIC_MODELS = [
"IGRF",
"LCS-1",
"MF7",
"CHAOS-Core",
"CHAOS-Static",
"CHAOS-MMA-Primary",
"CHAOS-MMA-Secondary",
"CHAOS-MIO",
"MCO_SHA_2C",
"MCO_SHA_2D",
"MLI_SHA_2C",
"MLI_SHA_2D",
"MLI_SHA_2E",
"MMA_SHA_2C-Primary",
"MMA_SHA_2C-Secondary",
"MMA_SHA_2F-Primary",
"MMA_SHA_2F-Secondary",
"MIO_SHA_2C-Primary",
"MIO_SHA_2C-Secondary",
"MIO_SHA_2D-Primary",
"MIO_SHA_2D-Secondary",
"AMPS",
"MCO_SHA_2X",
"CHAOS",
"CHAOS-MMA",
"MMA_SHA_2C",
"MMA_SHA_2F",
"MIO_SHA_2C",
"MIO_SHA_2D",
"SwarmCI",
]
def __init__(
self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL
):
super().__init__(url, token, config, logging_level, server_type="Swarm")
self._available = self._get_available_data()
self._request_inputs = SwarmWPSInputs()
self._templatefiles = TEMPLATE_FILES
self._filterlist = []
self._supported_filetypes = ("csv", "cdf")
self._collection_list = None
@classmethod
def _get_available_data(cls):
# Build the reverse mapping: "SW_OPER_MAGA_LR_1B": "MAG" etc
collections_to_keys = {}
for key, collections in cls.COLLECTIONS.items():
collections_to_keys.update({collection: key for collection in collections})
return {
"collections": cls.COLLECTIONS,
"collections_to_keys": collections_to_keys,
"collection_sampling_steps": cls.COLLECTION_SAMPLING_STEPS,
"measurements": cls.PRODUCT_VARIABLES,
"models": cls.MAGNETIC_MODELS,
"model_variables": cls.MAGNETIC_MODEL_VARIABLES,
"auxiliaries": cls.AUXILIARY_VARIABLES,
}
@staticmethod
def _parse_models_input(models=None):
"""Verify and parse models input.
Args:
models (list/dict): User-provided values
Returns:
list: model_ids, list of model_id strings
str: model_expression_string to be passed to the server
"""
models = [] if models is None else models
# Convert input to OrderedDict
# e.g. {"model_name": "model_expression", ..}
# Check if models input is basic list of strings,
# If not, then handle inputs given as dicts or list of tuples
if isinstance(models, list) and all(isinstance(item, str) for item in models):
# Convert the models list to an OrderedDict
model_expressions = OrderedDict()
for model in models:
model_id, _, model_expression = (
s.strip() for s in model.partition("=")
)
model_expressions[model_id] = model_expression
else:
try:
model_expressions = OrderedDict(models)
# Check that everything is a string
if not all(
isinstance(item, str)
for item in [*model_expressions.values()]
+ [*model_expressions.keys()]
):
raise ValueError
except ValueError:
raise ValueError("Invalid models input!")
# TODO: Verify input model names
# (use self._available["models"])
# Create the combined model expression string passed to the request
model_expression_string = ""
for model_id, model_expression in model_expressions.items():
if model_expression == "":
s = model_id
else:
s = "=".join([model_id, model_expression])
model_expression_string = ",".join([model_expression_string, s])
model_ids = list(s.strip("'\"") for s in model_expressions.keys())
return model_ids, model_expression_string[1:]
[docs]
def available_collections(self, groupname=None, details=True):
"""Show details of available collections.
Args:
groupname (str): one of: ("MAG", "EFI", etc.)
details (bool): If True then print a nice output.
If False then return a dict of available collections.
"""
# Shorter form of the available collections,
# without all the individual SiteCodes
collections_short = self._available["collections"].copy()
collections_short["AUX_OBSS"] = ["SW_OPER_AUX_OBSS2_"]
collections_short["AUX_OBSM"] = ["SW_OPER_AUX_OBSM2_"]
collections_short["AUX_OBSH"] = ["SW_OPER_AUX_OBSH2_"]
for mission in ("SW", "OR", "CH", "CR", "CO"):
for cadence in ("1M", "4M"):
collections_short[f"VOBS_{mission}_{cadence}"] = [
f"{mission}_OPER_VOBS_{cadence}_2_"
]
collections_short[f"VOBS_{mission}_{cadence}:SecularVariation"] = [
f"{mission}_OPER_VOBS_{cadence}_2_:SecularVariation"
]
def _filter_collections(groupname):
"""Reduce the full list to just one group, e.g. "MAG"""
if groupname:
groups = list(collections_short.keys())
if groupname in groups:
return {groupname: collections_short[groupname]}
else:
raise ValueError("Invalid collection group name")
else:
return collections_short
collections_filtered = _filter_collections(groupname)
if details:
print("General References:")
for i in REFERENCES["General Swarm"]:
print(i)
print()
for key, val in collections_filtered.items():
print(key)
for i in val:
print(" ", i)
refs = COLLECTION_REFERENCES.get(key, ("No reference...",))
for ref in refs:
print(ref)
print()
else:
return collections_filtered
[docs]
def available_measurements(self, collection=None):
"""Returns a list of the available measurements for the chosen collection.
Args:
collection (str): one of: ("MAG", "EFI", "IBI", "TEC", "FAC", "EEF")
"""
keys = list(self._available["measurements"].keys())
if collection in keys:
collection_key = collection
return self._available["measurements"][collection_key]
elif collection in self._available["collections_to_keys"]:
collection_key = self._available["collections_to_keys"][collection]
return self._available["measurements"][collection_key]
elif collection is None:
return self._available["measurements"]
else:
raise Exception(
"collection must be one of {}\nor\n{}".format(
", ".join(keys), "\n".join(self._available["collections_to_keys"])
)
)
[docs]
def available_models(self, param=None, details=True, nice_output=True):
"""Show details of avalable models.
If details is True, return a dictionary of model names and details.
If nice_output is True, the dictionary is printed nicely.
If details is False, return a list of model names.
If param is set, filter to only return entries including this
Note:
| F = Fast-Track Products
| C = Comprehensive Inversion
| D = Dedicated Chain
| MCO = Core / main
| MLI = Lithosphere
| MMA = Magnetosphere
| MIO = Ionosphere
Args:
param (str): one of "F C D MCO MLI MMA MIO"
details (bool): True for a dict of details, False for a brief list
nice_output (bool): If True, just print the dict nicely
"""
def filter_by_param(d, param):
if param in ("F", "C", "D"):
param = "2" + param
return [i for i in d if param in i]
# get all models provided by the server
models_info = self.get_model_info()
# keep only models really provided by the server
d = [
model_name
for model_name in self._available["models"]
if model_name in models_info
]
# Filter the dict/list to only include those that contain param
if param is not None:
d = filter_by_param(d, param)
if details:
d = {
model_name: {
"description": MODEL_REFERENCES[model_name],
"details": models_info[model_name],
}
for model_name in d
}
if nice_output and details:
d = OrderedDict(sorted(d.items()))
for model_name, desc_details in d.items():
print(model_name, "=", desc_details["details"]["expression"])
print(" START:", desc_details["details"]["validity"]["start"])
print(" END: ", desc_details["details"]["validity"]["end"])
print("DESCRIPTION:")
for line in desc_details["description"]:
print(line)
print("SOURCES:")
for line in desc_details["details"]["sources"]:
print(" ", line)
print()
else:
return d
[docs]
def available_auxiliaries(self):
"""Returns a list of the available auxiliary parameters."""
return self._available["auxiliaries"]
[docs]
def available_observatories(
self, collection, start_time=None, end_time=None, details=False, verbose=True
):
"""Get list of available observatories from server.
Search availability by collection, one of::
"SW_OPER_AUX_OBSH2_"
"SW_OPER_AUX_OBSM2_"
"SW_OPER_AUX_OBSS2_"
Examples:
::
from viresclient import SwarmRequest
request = SwarmRequest()
# For a list of observatories available:
request.available_observatories("SW_OPER_AUX_OBSM2_")
# For a DataFrame also containing availability start and end times:
request.available_observatories("SW_OPER_AUX_OBSM2_", details=True)
# For available observatories during a given time period:
request.available_observatories(
"SW_OPER_AUX_OBSM2_", "2013-01-01", "2013-02-01"
)
Args:
collection (str): OBS collection name, e.g. "SW_OPER_AUX_OBSM2\\_"
start_time (datetime / ISO_8601 string)
end_time (datetime / ISO_8601 string)
details (bool): returns DataFrame if True
verbose (bool): Notify with special data terms
Returns:
list or DataFrame: IAGA codes (and start/end times)
"""
def _request_get_observatories(collection=None, start_time=None, end_time=None):
"""Make the get_observatories request to the server"""
templatefile = TEMPLATE_FILES["get_observatories"]
template = JINJA2_ENVIRONMENT.get_template(templatefile)
request = template.render(
collection_id=collection,
begin_time=start_time,
end_time=end_time,
response_type="text/csv",
).encode("UTF-8")
response = self._get(request, asynchronous=False, show_progress=False)
return response
def _csv_to_df(csv_data):
"""Convert bytes data to pandas dataframe"""
return read_csv(StringIO(str(csv_data, "utf-8")))
if collection not in self.OBS_COLLECTIONS:
raise ValueError(
f"Invalid collection: {collection}. Must be one of: {self.OBS_COLLECTIONS}."
)
if start_time and end_time:
start_time = parse_datetime(start_time)
end_time = parse_datetime(end_time)
else:
start_time, end_time = None, None
if verbose:
self._detect_AUX_OBS([collection])
response = _request_get_observatories(collection, start_time, end_time)
df = _csv_to_df(response)
if details:
return df
else:
# note: "IAGACode" has been renamed to "site" in VirES 3.5
key = "IAGACode" if "IAGACode" in df.keys() else "site"
return list(df[key])
def _detect_AUX_OBS(self, collections):
# Identify collection types present
collection_types_requested = {
self._available["collections_to_keys"].get(collection)
for collection in collections
}
# Output notification for each of aux_type
for aux_type in ["AUX_OBSH", "AUX_OBSM", "AUX_OBSS"]:
if aux_type in collection_types_requested:
output_text = dedent(
f"""
Accessing INTERMAGNET and/or WDC data
Check usage terms at {DATA_CITATIONS.get(aux_type)}
"""
)
tqdm.write(output_text)
[docs]
def set_collection(self, *args, verbose=True):
"""Set the collection(s) to use.
Args:
(str): one or several from .available_collections()
verbose (bool): Notify if special data terms
"""
collections = [*args]
for collection in collections:
if not isinstance(collection, str):
raise TypeError(f"{collection} invalid. Must be string.")
if collection not in self._available["collections_to_keys"]:
raise ValueError(
"Invalid collection: {}. "
"Check available with SwarmRequest().available_collections()".format(
collection
)
)
if verbose:
self._detect_AUX_OBS(collections)
self._collection_list = collections
self._request_inputs.set_collections(collections)
# type specific file options
self._file_options = (
self.FILE_OPTIONS.get(self._available["collections_to_keys"][collection])
or {}
)
return self
[docs]
def set_products(
self,
measurements=None,
models=None,
custom_model=None,
auxiliaries=None,
residuals=False,
sampling_step=None,
ignore_cached_models=False,
do_not_interpolate_models=False,
):
"""Set the combination of products to retrieve.
If residuals=True then just get the measurement-model residuals,
otherwise get both measurement and model values.
Args:
measurements (list(str)): from .available_measurements(collection_key)
models (list(str)/dict): from .available_models() or defineable with custom expressions
custom_model (str): path to a custom model in .shc format
auxiliaries (list(str)): from .available_auxiliaries()
residuals (bool): True if only returning measurement-model residual
sampling_step (str): ISO_8601 duration, e.g. 10 seconds: PT10S, 1 minute: PT1M
ignore_cached_models (bool): True if cached models should be ignored and calculated on-the-fly
do_not_interpolate_models (bool): True if the models for HR collection should not be interpolated from the LR collection
"""
if self._collection_list is None:
raise Exception("Must run .set_collection() first.")
measurements = [] if measurements is None else measurements
models = [] if models is None else models
model_variables = self._available["model_variables"]
auxiliaries = [] if auxiliaries is None else auxiliaries
# If inputs are strings (when providing only one parameter)
# put them in lists
if isinstance(measurements, str):
measurements = [measurements]
if isinstance(models, str):
models = [models]
if isinstance(auxiliaries, str):
auxiliaries = [auxiliaries]
# print warning for deprecated models
self._check_deprecated_models(models)
# Check the chosen measurements are available for the set collections
available_measurements = []
for collection in self._collection_list:
collection_key = self._available["collections_to_keys"][collection]
available_measurements.extend(
self._available["measurements"][collection_key]
)
for variable in measurements:
if variable not in available_measurements:
raise Exception(
"Measurement '{}' not available for collection '{}'. "
"Check available with "
"SwarmRequest.available_measurements({})".format(
variable, collection_key, collection_key
)
)
# Check if at least one model defined when requesting residuals
if residuals and not models:
raise Exception("Residuals requested but no model defined!")
# Check models format, extract model_ids and string to pass to server
model_ids, model_expression_string = self._parse_models_input(models)
# Check chosen aux is available
for variable in auxiliaries:
if variable not in self._available["auxiliaries"]:
raise Exception(
"'{}' not available. Check available with "
"SwarmRequest.available_auxiliaries()".format(variable)
)
# Load the custom .shc file
if custom_model:
if os.path.exists(custom_model):
with open(custom_model) as custom_shc_file:
custom_shc = custom_shc_file.read()
model_ids.append("Custom_Model")
else:
raise OSError("Custom model .shc file not found")
else:
custom_shc = None
# Set up the variables that actually get passed to the WPS request
# Requested variables, start with the measurements ...
variables = set(measurements)
# model-related measurements
_requested_model_variables = [
variable for variable in measurements if variable in model_variables
]
if residuals:
# Remove the measurements ...
variables.difference_update(_requested_model_variables)
# ... add their residuals instead.
variables.update(
f"{variable}_res_{model_id}"
for variable in _requested_model_variables
for model_id in model_ids
)
else:
# If no variable is requested fall back to B_NEC.
if not _requested_model_variables:
_requested_model_variables = ["B_NEC"]
# Add calculated model variables.
variables.update(
f"{variable}_{model_id}"
for variable in (
model_variables[variable] for variable in _requested_model_variables
)
for model_id in model_ids
)
# Finally, add the auxiliary variables.
variables.update(auxiliaries)
self._request_inputs.model_expression = model_expression_string
self._request_inputs.variables = list(variables)
self._request_inputs.sampling_step = sampling_step
self._request_inputs.custom_shc = custom_shc
self._request_inputs.ignore_cached_models = ignore_cached_models
self._request_inputs.do_not_interpolate_models = do_not_interpolate_models
return self
[docs]
def set_range_filter(self, parameter, minimum=None, maximum=None, negate=False):
"""Set a range filter to apply.
Filters data for minimum ≤ parameter ≤ maximum,
or parameter < minimum OR parameter > maximum if negated.
Note:
- Apply multiple filters with successive calls to ``.set_range_filter()``
- See :py:meth:`SwarmRequest.add_filter` for arbitrary filters.
Args:
parameter (str)
minimum (float or integer)
maximum (float or integer)
Examples:
``request.set_range_filter("Latitude", 0, 90)``
to set "Latitude >= 0 AND Latitude <= 90"
``request.set_range_filter("Latitude", 0, 90, negate=True)``
to set "(Latitude < 0 OR Latitude > 90)"
"""
if not isinstance(parameter, str):
raise TypeError("parameter must be a str")
def _generate_filters(minop, maxop):
if minimum is not None:
yield f"{parameter} {minop} {minimum}"
if maximum is not None:
yield f"{parameter} {maxop} {maximum}"
nargs = 2 - (minimum is None) - (maximum is None)
if nargs == 0:
return
filter_ = (
" AND ".join(_generate_filters(">=", "<="))
if not negate
else " OR ".join(_generate_filters("<", ">"))
)
if nargs > 1:
filter_ = f"({filter_})"
self.add_filter(filter_)
return self
[docs]
def set_choice_filter(self, parameter, *values, negate=False):
"""Set a choice filter to apply.
Filters data for *parameter in values*,
or *parameter not in values* if negated.
Note:
See :py:meth:`SwarmRequest.add_filter` for arbitrary filters.
Args:
parameter (str)
values (float or integer or string)
Examples:
``request.set_choice_filter("Flags_F", 0, 1)``
to set "(Flags_F == 0 OR Flags_F == 1)"
``request.set_choice_filter("Flags_F", 0, 1, negate=True)``
to set "(Flags_F != 0 AND Flags_F != 1)"
"""
if not isinstance(parameter, str):
raise TypeError("parameter must be a str")
def _generate_filters(compop):
for value in values:
yield f"{parameter} {compop} {value!r}"
nargs = len(values)
if nargs == 0:
return
filter_ = (
" OR ".join(_generate_filters("=="))
if not negate
else " AND ".join(_generate_filters("!="))
)
if nargs > 1:
filter_ = f"({filter_})"
self.add_filter(filter_)
return self
[docs]
def set_bitmask_filter(self, parameter, selection=0, mask=-1, negate=False):
"""Set a bitmask filter to apply.
Filters data for *parameter & mask == selection & mask*,
or *parameter & mask != selection & mask* if negated.
Note:
See :py:meth:`SwarmRequest.add_filter` for arbitrary filters.
Args:
parameter (str)
mask (integer)
selection (integer)
Examples:
``request.set_bitmask_filter("Flags_F", 0, 1)``
to set "Flags_F & 1 == 0" (i.e. bit 1 is set to 0)
"""
if not isinstance(parameter, str):
raise TypeError("parameter must be a str")
def _get_filter(compop):
return (
f"{parameter} & {mask} {compop} {selection & mask}"
if mask != -1
else f"{parameter} {compop} {selection}"
)
if not negate:
if mask != 0: # avoid pointless (0 == 0) filter
self.add_filter(_get_filter("=="))
else:
# mask == 0 leads to (0 != 0) filter and nothing is selected.
self.add_filter(_get_filter("!="))
return self
[docs]
def add_filter(self, filter_):
"""Add an arbitrary data filter.
Args:
filter_ (str): string defining the filter, as shown below
Filter grammar:
.. code-block:: text
filter: predicate
predicate:
variable == literal |
variable != literal |
variable < number |
variable > number |
variable <= number |
variable >= number |
variable & unsigned-integer == unsigned-integer |
variable & unsigned-integer != unsigned-integer |
(predicate AND predicate [AND predicate ...]) |
(predicate OR predicate [OR predicate ...]) |
NOT predicate
literal: boolean | integer | float | string
number: integer | float
variable: identifier | identifier[index]
index: integer[, integer ...]
Both single- and double quoted strings are allowed.
NaN values are matched by the ==/!= operators, i.e., the predicates
are internally converted to a proper "IS NaN" or "IS NOT NaN"
comparison.
Examples:
"Flags & 128 == 0"
Match records with Flag bit 7 set to 0.
"Elevation >= 15"
Match values with values greater than or equal to 15.
"(Label == "D" OR Label == "N" OR Label = "X")"
Match records with Label set to D, N or X.
"(Type != 1 AND Type != 34) NOT (Type == 1 OR Type == 34)"
Exclude records with Type set to 1 or 34.
"(Vector[2] <= -0.1 OR Vector[2] >= 0.5)"
Match records with Vector[2] values outside of the (-0.1, 0.5)
range.
"""
if not isinstance(filter_, str):
raise TypeError("parameter must be a str")
self._filterlist.append(filter_)
# Update the SwarmWPSInputs object
self._request_inputs.filters = " AND ".join(self._filterlist)
[docs]
def clear_filters(self):
"""Remove all applied filters."""
self._filterlist = []
self._request_inputs.filters = None
return self
clear_range_filter = clear_filters # alias for backward compatibility
[docs]
def applied_filters(self):
"""Print currently applied filters."""
for filter_ in self._filterlist:
print(filter_)
[docs]
def get_collection_info(self, collections):
"""Get information about a list of collections
Args:
collections (str | list[str]): Collection or list of collections
Returns:
list[dict]: A list of dictionaries containing information about each collection
Examples:
.. code-block:: python
from viresclient import SwarmRequest
request = SwarmRequest("https://vires.services/ows")
info = request.get_collection_info("SW_OPER_MAGA_LR_1B")
gives::
[{'name': 'SW_OPER_MAGA_LR_1B',
'productType': 'SW_MAGx_LR_1B',
'productCount': 3579,
'timeExtent': {'start': '2013-11-25T11:02:52Z',
'end': '2023-09-28T23:59:59Z'}}]
"""
if isinstance(collections, str):
collections = [collections]
if not isinstance(collections, list):
raise TypeError("collections must be a string or list")
templatefile = TEMPLATE_FILES["get_collection_info"]
template = JINJA2_ENVIRONMENT.get_template(templatefile)
request = template.render(
collections=",".join(collections),
response_type="application/json",
).encode("UTF-8")
response = self._get(request, asynchronous=False, show_progress=False)
response = json.loads(response.decode("UTF-8"))
return response
[docs]
def get_times_for_orbits(
self, start_orbit, end_orbit, mission="Swarm", spacecraft=None
):
"""Translate a pair of orbit numbers to a time interval.
Args:
start_orbit (int): a starting orbit number
end_orbit (int): a later orbit number
spacecraft (str):
Swarm: one of ('A','B','C') or ("Alpha", "Bravo", "Charlie")
GRACE: one of ('1','2')
GRACE-FO: one of ('1','2')
CryoSat-2: None
mission (str): one of ('Swarm', 'GRACE', 'GRACE-FO', 'CryoSat-2')
Returns:
tuple (datetime): (start_time, end_time) The start time of the
start_orbit and the ending time of the end_orbit.
(Based on ascending nodes of the orbits)
"""
# check old function signature and print warning
if (
isinstance(start_orbit, str)
and isinstance(mission, int)
and spacecraft is None
):
spacecraft, start_orbit, end_orbit = start_orbit, end_orbit, mission
mission = "Swarm"
warn(
"The order of SwarmRequest.get_times_for_orbits() method's "
"parameters has changed! "
"The backward compatibility will be removed in the future. "
"Please change your code to: "
"request.get_times_for_orbits(start_orbit, end_orbit, "
"'Swarm', spacecraft)",
FutureWarning,
)
start_orbit = int(start_orbit)
end_orbit = int(end_orbit)
# Change to spacecraft = "A" etc. for this request
spacecraft = self._fix_spacecraft(mission, spacecraft)
self._check_mission_spacecraft(mission, spacecraft)
templatefile = TEMPLATE_FILES["times_from_orbits"]
template = JINJA2_ENVIRONMENT.get_template(templatefile)
request = template.render(
mission=mission,
spacecraft=spacecraft,
start_orbit=start_orbit,
end_orbit=end_orbit,
).encode("UTF-8")
response = self._get(request, asynchronous=False, show_progress=False)
responsedict = json.loads(response.decode("UTF-8"))
start_time = parse_datetime(responsedict["start_time"])
end_time = parse_datetime(responsedict["end_time"])
return start_time, end_time
def _fix_spacecraft(self, mission, spacecraft):
# Change to spacecraft = "A" etc. for this request
spacecraft = str(spacecraft) if spacecraft is not None else None
if mission == "Swarm" and spacecraft in ("Alpha", "Bravo", "Charlie"):
spacecraft = spacecraft[0]
return spacecraft
def _check_mission_spacecraft(self, mission, spacecraft):
if mission not in self.MISSION_SPACECRAFTS:
raise ValueError(
f"Invalid mission {mission}!"
f"Allowed options are: {','.join(self.MISSION_SPACECRAFTS)}"
)
if self.MISSION_SPACECRAFTS[mission]:
# missions with required spacecraft id
if not spacecraft:
raise ValueError(
f"The {mission} spacecraft is required!"
f"Allowed options are: {','.join(self.MISSION_SPACECRAFTS[mission])}"
)
if spacecraft not in self.MISSION_SPACECRAFTS[mission]:
raise ValueError(
f"Invalid {mission} spacecraft! "
f"Allowed options are: {','.join(self.MISSION_SPACECRAFTS[mission])}"
)
elif spacecraft: # mission without spacecraft id
raise ValueError(
f"No {mission} spacecraft shall be specified! "
"Set spacecraft to None."
)
[docs]
def get_orbit_number(self, spacecraft, input_time, mission="Swarm"):
"""Translate a time to an orbit number.
Args:
spacecraft (str):
Swarm: one of ('A','B','C') or ("Alpha", "Bravo", "Charlie")
GRACE: one of ('1','2')
GRACE-FO: one of ('1','2')
CryoSat-2: None
input_time (datetime): a point in time
mission (str): one of ('Swarm', 'GRACE', 'GRACE-FO', 'CryoSat-2')
Returns:
int: The current orbit number at the input_time
"""
try:
input_time = parse_datetime(input_time)
except TypeError:
raise TypeError(
"input_time must be datetime object or ISO-8601 " "date/time string"
)
# Change to spacecraft = "A" etc. for this request
if spacecraft in ("Alpha", "Bravo", "Charlie"):
spacecraft = spacecraft[0]
if mission not in self.MISSION_SPACECRAFTS:
raise ValueError(
f"Invalid mission {mission}!"
f"Allowed options are: {','.join(self.MISSION_SPACECRAFTS)}"
)
spacecraft = str(spacecraft)
if mission == "Swarm":
collection = f"SW_OPER_MOD{spacecraft}_SC_1B"
elif mission == "GRACE":
if spacecraft in "12":
spacecraft = "AB"[int(spacecraft) - 1]
elif spacecraft not in "AB":
raise ValueError(f"Invalid spacecraft: {spacecraft}")
collection = f"GRACE_{spacecraft}_MAG"
elif mission == "GRACE-FO":
collection = f"GF{spacecraft}_OPER_FGM_ACAL_CORR"
elif mission == "CryoSat-2":
collection = "CS_OPER_MAG"
request_inputs = SwarmWPSInputs(
collection_ids={collection: [collection]},
begin_time=input_time,
end_time=input_time + datetime.timedelta(seconds=1),
variables=["OrbitNumber"],
response_type="text/csv",
)
request = request_inputs.as_xml(self._templatefiles["sync"])
retdata = ReturnedDataFile(filetype="csv")
response_handler = self._response_handler(retdata, show_progress=False)
self._get(
request,
asynchronous=False,
response_handler=response_handler,
show_progress=False,
)
df = retdata.as_dataframe()
if len(df) == 0:
raise ValueError(
"Orbit number not identified. Probably outside of mission duration or orbit counter file."
)
elif len(df) > 1:
raise RuntimeError("Unexpected server response. More than one OrbitNumber.")
else:
return df["OrbitNumber"][0]
[docs]
def get_model_info(self, models=None, custom_model=None, original_response=False):
"""Get model info from server.
Handles the same models input as .set_products(), and returns a dict
like:
{'IGRF12': {
'expression': 'IGRF12(max_degree=13,min_degree=0)',
'validity': {'start': '1900-01-01T00:00:00Z', 'end': '2020-01-01T00:00:00Z'
}, ...}
If original_response=True, return the list of dicts like:
{'expression': 'MCO_SHA_2C(max_degree=16,min_degree=0)',
'name': 'MCO_SHA_2C',
'validity': {'start': '2013-11-30T14:38:24Z',
'end': '2018-01-01T00:00:00Z'}}, ...
Args:
models (list/dict): as with set_products
custom_model (str): as with set_products
original_response (bool)
Returns:
dict or list
"""
def _request_get_model_info(model_expression=None, custom_shc=None):
"""Make the get_model_info request."""
templatefile = TEMPLATE_FILES["model_info"]
template = JINJA2_ENVIRONMENT.get_template(templatefile)
request = template.render(
model_expression=model_expression,
custom_shc=custom_shc,
response_type="application/json",
).encode("UTF-8")
response = self._get(request, asynchronous=False, show_progress=False)
response_list = json.loads(response.decode("UTF-8"))
return response_list
def _build_dict(response_list):
"""Build dictionary output organised by model name."""
return {model_dict.pop("name"): model_dict for model_dict in response_list}
if custom_model:
with open(custom_model) as custom_shc_file:
custom_shc = custom_shc_file.read()
if not models:
models = ["Custom_Model"]
else:
custom_shc = None
if models is not None:
_, model_expression = self._parse_models_input(models)
else:
model_expression = None
response = _request_get_model_info(model_expression, custom_shc)
if not original_response:
response = _build_dict(response)
return response
@staticmethod
def _check_deprecated_models(models):
"""Print deprecation warning for deprecated models."""
deprecated_models = []
for deprecated_model in DEPRECATED_MODELS:
for model in models:
if deprecated_model in model:
deprecated_models.append(deprecated_model)
break
for deprecated_model in deprecated_models:
print(
"WARNING: Model {} is deprecated. {}".format(
deprecated_model, DEPRECATED_MODELS[deprecated_model]
),
file=sys.stdout,
)
[docs]
def get_conjunctions(
self,
start_time=None,
end_time=None,
threshold=1.0,
spacecraft1="A",
spacecraft2="B",
mission1="Swarm",
mission2="Swarm",
grade="OPER",
):
"""Get times of the spacecraft conjunctions.
Currently available for the following spacecraft pairs:
- Swarm-A/Swarm-B
Args:
start_time (datetime / ISO_8601 string): optional start time
end_time (datetime / ISO_8601 string): optional end time
threshold (float): optional maximum allowed angular separation
in degrees; by default set to 1; allowed values are [0, 180]
spacecraft1: identifier of the first spacecraft, default to 'A'
spacecraft2: identifier of the second spacecraft, default to 'B'
mission1 (str): mission of the first spacecraft, defaults to 'Swarm'
mission2 (str): mission of the first spacecraft, defaults to 'Swarm'
grade (str): products grade, possible values "OPER" or "FAST"
Returns:
ReturnedData:
"""
try:
start_time = parse_datetime(start_time) if start_time else None
end_time = parse_datetime(end_time) if end_time else None
except TypeError:
raise TypeError(
"start_time and end_time must be datetime objects or ISO-8601 "
"date/time strings"
) from None
if not (0 <= threshold <= 180):
raise ValueError("Invalid threshold value!")
spacecraft1 = self._fix_spacecraft(mission1, spacecraft1)
spacecraft2 = self._fix_spacecraft(mission2, spacecraft2)
self._check_mission_spacecraft(mission1, spacecraft1)
self._check_mission_spacecraft(mission2, spacecraft2)
if (mission1, spacecraft1) == (mission2, spacecraft2):
raise ValueError("The first and second spacecraft must not be the same!")
spacecraft_pair = tuple(
sorted([(mission1, spacecraft1), (mission2, spacecraft2)])
)
if spacecraft_pair not in self.CONJUNCTION_MISSION_SPACECRAFT_PAIRS:
raise ValueError(
"Conjunctions not available for the requested "
"spacecraft pair {spacecraft_pair}!"
)
templatefile = TEMPLATE_FILES["get_conjunctions"]
template = JINJA2_ENVIRONMENT.get_template(templatefile)
request = template.render(
begin_time=start_time,
end_time=end_time,
spacecraft1=spacecraft1,
spacecraft2=spacecraft2,
mission1=mission1,
mission2=mission2,
grade=(grade if grade and grade != "OPER" else None),
threshold=threshold,
).encode("UTF-8")
show_progress = False
leave_progress_bar = False
response = ReturnedDataFile(filetype="cdf")
response_handler = self._response_handler(
retdatafile=response,
show_progress=show_progress,
leave_progress_bar=leave_progress_bar,
)
self._get(
request=request,
asynchronous=False,
show_progress=show_progress,
leave_progress_bar=leave_progress_bar,
response_handler=response_handler,
)
return response
[docs]
def eval_model(
self,
models,
time,
latitude,
longitude,
radius,
time_precision="ns",
show_progress=True,
temp_dir=".",
input_prefix="_model_eval_input_",
output_prefix="_model_eval_output_",
):
"""Evaluate models for the given times and locations.
Args:
models (list(str)/dict): from .available_models() or defineable with custom expressions
time (datetime64) array of times
latitude (float64) array of geocentric latitudes (deg)
longitude (float64) array of geocentric longitudes (deg)
radius (float64) array of radii (m)
time_precision (str) optional time precision: ns* | us | ms | s
show_progress (bool) show download progress True
Returns:
dictionary of arrays with the model values
"""
# FIXME show download progress
def _write_hdf5_file(filename, data):
with h5py.File(filename, "w") as hdf:
for key, array in data.items():
options = (
{}
if array.ndim == 0
else {
"compression": "gzip",
"compression_opts": 9,
}
)
hdf.create_dataset(key, data=array, **options)
def _read_hdf5_file(filename):
with h5py.File(filename, "r") as hdf:
data = {key: hdf[key][...] for key in hdf}
sources = hdf.attrs["sources"].tolist()
if "Timestamp" in data:
data["Timestamp"] = data["Timestamp"].astype(time_type)
return data, sources
def _response_handler(filename, chunksize=1024 * 1024):
def _handler(file_obj):
# save received received HDF5 file
with open(filename, "wb") as file:
shutil.copyfileobj(file_obj, file, chunksize)
# read results from the HDF5 file
return _read_hdf5_file(filename)
return _handler
# FIXME: temp. file handling
request_id = uuid.uuid4()
input_filename = os.path.join(temp_dir, f"{input_prefix}{request_id}.hdf5")
output_filename = os.path.join(temp_dir, f"{output_prefix}{request_id}.hdf5")
_, model_expression_string = self._parse_models_input(models)
time_type = f"datetime64[{time_precision}]"
time = asarray(time, time_type)
latitude = asarray(latitude, "float64")
longitude = asarray(longitude, "float64")
radius = asarray(radius, "float64")
# the XML request and binary data are sent as multipart/related request
# see https://en.wikipedia.org/wiki/MIME#Multipart_messages
multipart_boundary = "part-delimiter"
# build XML request
templatefile = TEMPLATE_FILES["eval_model_mp"]
template = JINJA2_ENVIRONMENT.get_template(templatefile)
request = template.render(
model_expression=model_expression_string,
input_content_id=request_id,
input_time_format=time_type,
input_mime_type="application/x-hdf5",
output_time_format=time_type,
output_mime_type="application/x-hdf5",
).encode("UTF-8")
try:
# write input HDF5 file
_write_hdf5_file(
input_filename,
{
"Timestamp": time.astype("int64"),
"Latitude": latitude,
"Longitude": longitude,
"Radius": radius,
},
)
# streaming request from the input HDF5 file
with open(input_filename, "rb") as input_file:
parts = [
(
request,
{
"Content-Type": "application/xml; charset=utf-8",
},
),
(
input_file,
{
"Content-Id": request_id,
"Content-Type": "application/x-hdf5",
},
),
]
# Due to the Django limitations we must aggregate the request
# chunks in one block.
# payload_size = get_multipart_request_size(parts, multipart_boundary)
# payload = generate_multipart_request(parts, multipart_boundary)
payload = b"".join(
generate_multipart_request(parts, multipart_boundary)
)
result, sources = self._get(
payload,
response_handler=_response_handler(output_filename),
asynchronous=False,
show_progress=show_progress,
content_type=(f"multipart/related; boundary={multipart_boundary}"),
headers={
"MIME-Version": "1.0",
# "Content-Length": payload_size,
},
)
finally:
for filename in [input_filename, output_filename]:
if os.path.exists(filename):
os.remove(filename)
return result, sources
[docs]
def eval_model_for_cdf_file(
self,
models,
input_cdf_filename,
output_cdf_filename,
show_progress=True,
):
"""Evaluate models for the coordinates given in a Swarm-like CDF file.
Args:
models (list(str)/dict): from .available_models() or defineable with custom expressions
input_cdf_filename (str): input CDF file.
output_cdf_filename (str): output CDF file.
show_progress (bool): show download progress True
Returns:
copy of output_cdf_filename
"""
# FIXME show download progress
def _response_handler(filename, chunksize=1024 * 1024):
def _handler(file_obj):
# save received received file
with open(filename, "wb") as file:
shutil.copyfileobj(file_obj, file, chunksize)
return filename
return _handler
request_id = uuid.uuid4()
_, model_expression_string = self._parse_models_input(models)
# the XML request and binary data are sent as multipart/related request
# see https://en.wikipedia.org/wiki/MIME#Multipart_messages
multipart_boundary = "part-delimiter"
# build XML request
templatefile = TEMPLATE_FILES["eval_model_mp"]
template = JINJA2_ENVIRONMENT.get_template(templatefile)
request = template.render(
model_expression=model_expression_string,
input_content_id=request_id,
input_time_format="format specific default",
input_mime_type="application/x-cdf",
output_time_format="input time format",
output_mime_type="application/x-cdf",
).encode("UTF-8")
temp_cdf_filename = os.path.join(
os.path.dirname(output_cdf_filename),
f".{os.path.basename(output_cdf_filename)}.tmp.cdf",
)
if os.path.exists(temp_cdf_filename):
os.remove(temp_cdf_filename)
try:
# streaming request from the input HDF5 file
with open(input_cdf_filename, "rb") as input_file:
parts = [
(
request,
{
"Content-Type": "application/xml; charset=utf-8",
},
),
(
input_file,
{
"Content-Id": request_id,
"Content-Type": "application/x-cdf",
},
),
]
# Due to the Django limitations we must aggregate the request
# chunks in one block.
# payload_size = get_multipart_request_size(parts, multipart_boundary)
# payload = generate_multipart_request(parts, multipart_boundary)
payload = b"".join(
generate_multipart_request(parts, multipart_boundary)
)
self._get(
payload,
response_handler=_response_handler(temp_cdf_filename),
asynchronous=False,
show_progress=show_progress,
content_type=(f"multipart/related; boundary={multipart_boundary}"),
headers={
"MIME-Version": "1.0",
# "Content-Length": payload_size,
},
)
os.rename(temp_cdf_filename, output_cdf_filename)
finally:
if os.path.exists(temp_cdf_filename):
os.remove(temp_cdf_filename)
return output_cdf_filename