import datetime
import json
import os
import sys
from collections import OrderedDict
from io import StringIO
from textwrap import dedent
from warnings import warn

from pandas import read_csv
from tqdm import tqdm

from ._client import TEMPLATE_FILES, ClientRequest, WPSInputs
from ._data import CONFIG_SWARM
from ._data_handling import ReturnedDataFile
from ._wps.environment import JINJA2_ENVIRONMENT
from ._wps.time_util import parse_datetime

    "sync": "vires_fetch_filtered_data.xml",
    "async": "vires_fetch_filtered_data_async.xml",
    "model_info": "vires_get_model_info.xml",
    "times_from_orbits": "vires_times_from_orbits.xml",
    "get_observatories": "vires_get_observatories.xml",
    "get_conjunctions": "vires_get_conjunctions.xml",

    "General Swarm": (
        " Swarm Data Handbook, ",
        " The Swarm Satellite Constellation Application and Research Facility (SCARF) and Swarm data products, ",
        " Swarm Science Data Processing and Products (2013), ",
        " Special issue “Swarm science results after 2 years in space (2016), ",
        " Earth's Magnetic Field: Understanding Geomagnetic Sources from the Earth's Interior and its Environment (2017), ",

    "IGRF": (
        " International Geomagnetic Reference Field: the thirteenth generation, ( ",
        " ",
    "IGRF12": (
        " International Geomagnetic Reference Field: the 12th generation, ",
        " "
        " deprecated model identifier, use IGRF instead",
    "CHAOS-Core": (
        "CHAOS-7 Core field (SH degrees 1-20)",
        " ",
    "CHAOS-Static": (
        "CHAOS-7 crust field (SH degrees 21-185)",
        " ",
    "CHAOS-MMA-Primary": (
        "CHAOS-7 Primary (external) magnetospheric field",
        " h ",
    "CHAOS-MMA-Secondary": (
        "CHAOS-7 Secondary (internal) magnetospheric field",
        " ",
    "CHAOS-6-Core": (
        "CHAOS Core field",
        " deprecated model identifier, use CHAOS-Core instead",
    "CHAOS-6-Static": (
        "CHAOS crust field",
        " deprecated model identifier, use CHAOS-Static instead",
    "CHAOS-6-MMA-Primary": (
        "CHAOS Primary (external) magnetospheric field",
        " deprecated model identifier, use CHAOS-MMA-Primary instead",
    "CHAOS-6-MMA-Secondary": (
        "CHAOS-Secondary (internal) magnetospheric field",
        " deprecated model identifier, use CHAOS-MMA-Secondary instead",
    "MF7": (
        "MF7 crustal field model, derived from CHAMP satellite observations",
    "LCS-1": (
        "The LCS-1 high-resolution lithospheric field model, derived from CHAMP and Swarm satellite observations",
    "MCO_SHA_2C": (
        "[Comprehensive Inversion]: Core field of CIY4",
        " A comprehensive model of Earth’s magnetic field determined from 4 years of Swarm satellite observations, ",
        "Validation: ",
    "MCO_SHA_2D": (
        "[Dedicated Chain]: Core field",
        "An algorithm for deriving core magnetic field models from the Swarm data set, ",
        "Validation: ",
    "MLI_SHA_2C": (
        "[Comprehensive Inversion]: Lithospheric field of CIY4",
    "MLI_SHA_2D": (
        "[Dedicated Chain]: Lithospheric field",
        " Swarm SCARF Dedicated Lithospheric Field Inversion chain, ",
        " Validation: ",
    "MLI_SHA_2E": (
        "[Extended dedicated chain]: Lithospheric field",
        " Joint inversion of Swarm, CHAMP, and WDMAM data ",
        " ",
    "MMA_SHA_2C-Primary": (
        "[Comprehensive Inversion]: Primary (external) magnetospheric field of CIY4",
    "MMA_SHA_2C-Secondary": (
        "[Comprehensive Inversion]: Secondary (internal/induced) magnetospheric field of CIY4",
    "MMA_SHA_2F-Primary": (
        "[Fast-Track Product]: Primary (external) magnetospheric field",
        " Rapid modelling of the large-scale magnetospheric field from Swarm satellite data, ",
    "MMA_SHA_2F-Secondary": (
        "[Fast-Track Product]: Secondary (internal/induced) magnetospheric field",
    "MIO_SHA_2C-Primary": (
        "[Comprehensive Inversion]: Primary (external) ionospheric field of CIY4",
        "Validation: ",
    "MIO_SHA_2C-Secondary": (
        "[Comprehensive Inversion]: Secondary (external/induced) ionospheric field of CIY4",
    "MIO_SHA_2D-Primary": (
        "[Dedicated Chain]: Primary (external) ionospheric field, DIFI",
        " Swarm SCARF dedicated ionospheric field inversion chain, ",
        " First results from the Swarm Dedicated Ionospheric Field Inversion chain, ",
        " ",
        "Validation: ",
    "MIO_SHA_2D-Secondary": (
        "[Dedicated Chain]: Secondary (external/induced) ionospheric field, DIFI",
    "AMPS": ("AMPS - associated magnetic field,",),
    "MCO_SHA_2X": ("Alias for 'CHAOS-Core'",),
    "CHAOS": (
        "Alias for 'CHAOS-Core' + 'CHAOS-Static' + 'CHAOS-MMA-Primary' + 'CHAOS-MMA-Secondary'",
    "CHAOS-MMA": ("Alias for 'CHAOS-MMA-Primary' + 'CHAOS-MMA-Secondary'",),
    "MMA_SHA_2C": ("Alias for 'MMA_SHA_2C-Primary' + 'MMA_SHA_2C-Secondary'",),
    "MMA_SHA_2F": ("Alias for 'MMA_SHA_2F-Primary' + 'MMA_SHA_2F-Secondary'",),
    "MIO_SHA_2C": ("Alias for 'MIO_SHA_2C-Primary' + 'MIO_SHA_2C-Secondary'",),
    "MIO_SHA_2D": ("Alias for 'MIO_SHA_2D-Primary' + 'MIO_SHA_2D-Secondary'",),
    "SwarmCI": (
        "Alias for 'MCO_SHA_2C' + 'MLI_SHA_2C' + 'MIO_SHA_2C-Primary' + 'MIO_SHA_2C-Secondary' + 'MMA_SHA_2C-Primary' + 'MMA_SHA_2C-Secondary'",

    "IGRF12": "Use IGRF instead.",
    "CHAOS-6-Core": "Use CHAOS-Core instead.",
    "CHAOS-6-Static": "Use CHAOS-Static instead.",
    "CHAOS-6-MMA-Primary": "Use CHAOS-MMA-Primary instead.",
    "CHAOS-6-MMA-Secondary": "Use CHAOS-MMA-Secondary instead.",

    "MAG": (
        " ",
    "MAG_HR": (
        " ",
    "EFI": (
        " ",
    "IBI": (
        " ",
        " ",
    "TEC": (
        " ",
        " ",
    "FAC": (
        " ",
        " ",
        " ",
        " ",
    "EEF": (
        " ",
        " ",
    "IPD": (
        " ",
    "AUX_OBSH": ("",),
    "AUX_OBSM": ("",),
    "AUX_OBSS": ("",),
    "VOBS_SW_1M": ("",),
    "AEJ_LPL": ("",),
    "AEJ_LPS": ("",),
    "AEJ_PBL": ("",),
    "AEJ_PBS": ("",),
    "AOB_FAC": ("",),
    "MIT_LP": (
    "MIT_TEC": (
    "PPI_FAC": (
    "MAG_CS": ("",),
    "MAG_GRACE": ("",),
    "MAG_GFO": ("",),
    "EFI_IDM": (
    "MAG_GOCE": ("",),
    "MAG_GOCE_ML": ("",),
    "EFI_TIE": (
    "EFI_TCT02": (
    "EFI_TCT16": (
for mission in ("SW", "OR", "CH", "CR", "CO"):
    for cadence in ("1M", "4M"):
        COLLECTION_REFERENCES[f"VOBS_{mission}_{cadence}"] = (

    "AUX_OBSH": "",
    "AUX_OBSM": "",
    "AUX_OBSS": "",



class SwarmWPSInputs(WPSInputs):
    """Holds the set of inputs to be passed to the request template for Swarm"""

    NAMES = [

    def __init__(
        # Set up default values
        # Obligatory - these must be replaced before the request is made
        self.collection_ids = None if collection_ids is None else collection_ids
        self.begin_time = None if begin_time is None else begin_time
        self.end_time = None if end_time is None else end_time
        self.response_type = None if response_type is None else response_type
        # Optional - these defaults will be used if not replaced before the
        #            request is made
        self.model_expression = "" if model_expression is None else model_expression
        self.variables = [] if variables is None else variables
        self.filters = None if filters is None else filters
        self.sampling_step = None if sampling_step is None else sampling_step
        self.custom_shc = None if custom_shc is None else custom_shc
        self.ignore_cached_models = ignore_cached_models

    def collection_ids(self):
        return self._collection_ids

    def collection_ids(self, collection_ids):
        if isinstance(collection_ids, dict) or collection_ids is None:
            self._collection_ids = collection_ids
            raise TypeError("collection_ids must be a dict")

    def _spacecraft_from_collection(collection):
        """Identify spacecraft (or ground observatory name) from collection name."""
        if "AUX_OBS" in collection:
            name = "AUX_OBS"
            if ":" in collection:
                name = f"{name}:{collection[19:22]}"
        elif collection[:3] == "SW_":
            # 12th character in name, e.g. SW_OPER_MAGx_LR_1B
            sc = collection[11]
            sc_to_name = {"A": "Alpha", "B": "Bravo", "C": "Charlie"}
            name = sc_to_name.get(sc, "NSC")
            name = collection
        return name

    def set_collections(self, collections):
        """Restructure given list of collections as dict required by VirES."""
        # Build the output dictionary in the form:
        #  {"Alpha": ["SW..A..", "SW..A.."], "Bravo": ["SW..B.."], "NSC": [..]}
        if isinstance(collections, list):
            collection_dict = {}
            for collection in collections:
                tag = self._spacecraft_from_collection(collection)
                if tag in collection_dict.keys():
                    collection_dict[tag] = [collection]
            self.collection_ids = collection_dict
            raise TypeError("collections must be a list")

    def model_expression(self):
        return self._model_expression

    def model_expression(self, model_expression):
        if isinstance(model_expression, str):
            self._model_expression = model_expression
            raise TypeError("model_expression must be a string")

    def ignore_cached_models(self):
        return self._ignore_cached_models

    def ignore_cached_models(self, value):
        if isinstance(value, bool):
            self._ignore_cached_models = value
            raise TypeError

    def begin_time(self):
        return self._begin_time

    def begin_time(self, begin_time):
        if isinstance(begin_time, datetime.datetime) or begin_time is None:
            self._begin_time = begin_time
            raise TypeError

    def end_time(self):
        return self._end_time

    def end_time(self, end_time):
        if isinstance(end_time, datetime.datetime) or end_time is None:
            self._end_time = end_time
            raise TypeError

    def variables(self):
        return self._variables

    def variables(self, variables):
        if isinstance(variables, list):
            self._variables = variables
            raise TypeError

    def filters(self):
        return self._filters

    def filters(self, filters):
        if isinstance(filters, str) or filters is None:
            self._filters = filters
            raise TypeError

    def sampling_step(self):
        return self._sampling_step

    def sampling_step(self, sampling_step):
        if isinstance(sampling_step, str) or sampling_step is None:
            self._sampling_step = sampling_step
            raise TypeError

    def response_type(self):
        return self._response_type

    def response_type(self, response_type):
        if isinstance(response_type, str) or response_type is None:
            self._response_type = response_type
            raise TypeError

    def custom_shc(self):
        return self._custom_shc

    def custom_shc(self, custom_shc):
        if isinstance(custom_shc, str) or custom_shc is None:
            self._custom_shc = custom_shc
            raise TypeError

[docs]class SwarmRequest(ClientRequest): """Handles the requests to and downloads from the server. Examples: Retrieve data:: from viresclient import SwarmRequest # Set up connection with server request = SwarmRequest("") # Set collection to use request.set_collection("SW_OPER_MAGA_LR_1B") # Set mix of products to fetch: # measurements (variables from the given collection) # models (magnetic model predictions at spacecraft sampling points) # auxiliaries (variables available with any collection) request.set_products( measurements=["F", "B_NEC"], models=["CHAOS-Core"], auxiliaries=["QDLat", "QDLon"], sampling_step="PT10S" ) # Fetch data from a given time interval data = request.get_between( start_time="2014-01-01T00:00", end_time="2014-01-01T01:00" ) # Load the data as an xarray.Dataset ds = data.as_xarray() Check what data are available:: request.available_collections(details=False) request.available_measurements("MAG") request.available_auxiliaries() request.available_models(details=False) Args: url (str): token (str): config (str or ClientConfig): logging_level (str): """ MISSION_SPACECRAFTS = { "Swarm": ["A", "B", "C"], "GRACE": ["1", "2"], "GRACE-FO": ["1", "2"], "CryoSat-2": None, "GOCE": None, } CONJUNCTION_MISSION_SPACECRAFT_PAIRS = { (("Swarm", "A"), ("Swarm", "B")), } COLLECTIONS = { "MAG": [f"SW_OPER_MAG{x}_LR_1B" for x in "ABC"], "MAG_HR": [f"SW_OPER_MAG{x}_HR_1B" for x in "ABC"], "EFI": [f"SW_OPER_EFI{x}_LP_1B" for x in "ABC"], "EFI_IDM": [f"SW_PREL_EFI{x}IDM_2_" for x in "ABC"], "EFI_TIE": [f"SW_OPER_EFI{x}TIE_2_" for x in "ABC"], "EFI_TCT02": [f"SW_EXPT_EFI{x}_TCT02" for x in "ABC"], "EFI_TCT16": [f"SW_EXPT_EFI{x}_TCT16" for x in "ABC"], "IBI": [f"SW_OPER_IBI{x}TMS_2F" for x in "ABC"], "TEC": [f"SW_OPER_TEC{x}TMS_2F" for x in "ABC"], "FAC": [f"SW_OPER_FAC{x}TMS_2F" for x in "ABC_"], "EEF": [f"SW_OPER_EEF{x}TMS_2F" for x in "ABC"], "IPD": [f"SW_OPER_IPD{x}IRR_2F" for x in "ABC"], "AEJ_LPL": [f"SW_OPER_AEJ{x}LPL_2F" for x in "ABC"], "AEJ_LPL:Quality": [f"SW_OPER_AEJ{x}LPL_2F:Quality" for x in "ABC"], "AEJ_LPS": [f"SW_OPER_AEJ{x}LPS_2F" for x in "ABC"], "AEJ_LPS:Quality": [f"SW_OPER_AEJ{x}LPS_2F:Quality" for x in "ABC"], "AEJ_PBL": [f"SW_OPER_AEJ{x}PBL_2F" for x in "ABC"], "AEJ_PBS": [f"SW_OPER_AEJ{x}PBS_2F" for x in "ABC"], "AEJ_PBS:GroundMagneticDisturbance": [ f"SW_OPER_AEJ{x}PBS_2F:GroundMagneticDisturbance" for x in "ABC" ], "AOB_FAC": [f"SW_OPER_AOB{x}FAC_2F" for x in "ABC"], "AUX_OBSH": [ "SW_OPER_AUX_OBSH2_", *[f"SW_OPER_AUX_OBSH2_:{code}" for code in IAGA_CODES], ], "AUX_OBSM": [ "SW_OPER_AUX_OBSM2_", *[f"SW_OPER_AUX_OBSM2_:{code}" for code in IAGA_CODES], ], "AUX_OBSS": [ "SW_OPER_AUX_OBSS2_", *[f"SW_OPER_AUX_OBSS2_:{code}" for code in IAGA_CODES], ], "VOBS_SW_1M": [ "SW_OPER_VOBS_1M_2_", *[f"SW_OPER_VOBS_1M_2_:{site}" for site in VOBS_SITES], ], "VOBS_SW_4M": [ "SW_OPER_VOBS_4M_2_", *[f"SW_OPER_VOBS_4M_2_:{site}" for site in VOBS_SITES], ], "VOBS_CH_1M": [ "CH_OPER_VOBS_1M_2_", *[f"CH_OPER_VOBS_1M_2_:{site}" for site in VOBS_SITES], ], "VOBS_CR_1M": [ "CR_OPER_VOBS_1M_2_", *[f"CR_OPER_VOBS_1M_2_:{site}" for site in VOBS_SITES], ], "VOBS_OR_1M": [ "OR_OPER_VOBS_1M_2_", *[f"OR_OPER_VOBS_1M_2_:{site}" for site in VOBS_SITES], ], "VOBS_CO_1M": [ "CO_OPER_VOBS_1M_2_", *[f"CO_OPER_VOBS_1M_2_:{site}" for site in VOBS_SITES], ], "VOBS_OR_4M": [ "OR_OPER_VOBS_4M_2_", *[f"OR_OPER_VOBS_4M_2_:{site}" for site in VOBS_SITES], ], "VOBS_CH_4M": [ "CH_OPER_VOBS_4M_2_", *[f"CH_OPER_VOBS_4M_2_:{site}" for site in VOBS_SITES], ], "VOBS_CR_4M": [ "CR_OPER_VOBS_4M_2_", *[f"CR_OPER_VOBS_4M_2_:{site}" for site in VOBS_SITES], ], "VOBS_CO_4M": [ "CO_OPER_VOBS_4M_2_", *[f"CO_OPER_VOBS_4M_2_:{site}" for site in VOBS_SITES], ], "VOBS_SW_1M:SecularVariation": [ "SW_OPER_VOBS_1M_2_:SecularVariation", *[f"SW_OPER_VOBS_1M_2_:SecularVariation:{site}" for site in VOBS_SITES], ], "VOBS_SW_4M:SecularVariation": [ "SW_OPER_VOBS_4M_2_:SecularVariation", *[f"SW_OPER_VOBS_4M_2_:SecularVariation:{site}" for site in VOBS_SITES], ], "VOBS_CH_1M:SecularVariation": [ "CH_OPER_VOBS_1M_2_:SecularVariation", *[f"CH_OPER_VOBS_1M_2_:SecularVariation:{site}" for site in VOBS_SITES], ], "VOBS_CR_1M:SecularVariation": [ "CR_OPER_VOBS_1M_2_:SecularVariation", *[f"CR_OPER_VOBS_1M_2_:SecularVariation:{site}" for site in VOBS_SITES], ], "VOBS_OR_1M:SecularVariation": [ "OR_OPER_VOBS_1M_2_:SecularVariation", *[f"OR_OPER_VOBS_1M_2_:SecularVariation:{site}" for site in VOBS_SITES], ], "VOBS_CO_1M:SecularVariation": [ "CO_OPER_VOBS_1M_2_:SecularVariation", *[f"CO_OPER_VOBS_1M_2_:SecularVariation:{site}" for site in VOBS_SITES], ], "VOBS_OR_4M:SecularVariation": [ "OR_OPER_VOBS_4M_2_:SecularVariation", *[f"OR_OPER_VOBS_4M_2_:SecularVariation:{site}" for site in VOBS_SITES], ], "VOBS_CH_4M:SecularVariation": [ "CH_OPER_VOBS_4M_2_:SecularVariation", *[f"CH_OPER_VOBS_4M_2_:SecularVariation:{site}" for site in VOBS_SITES], ], "VOBS_CR_4M:SecularVariation": [ "CR_OPER_VOBS_4M_2_:SecularVariation", *[f"CR_OPER_VOBS_4M_2_:SecularVariation:{site}" for site in VOBS_SITES], ], "VOBS_CO_4M:SecularVariation": [ "CO_OPER_VOBS_4M_2_:SecularVariation", *[f"CO_OPER_VOBS_4M_2_:SecularVariation:{site}" for site in VOBS_SITES], ], "MIT_LP": [f"SW_OPER_MIT{x}_LP_2F" for x in "ABC"], "MIT_LP:ID": [f"SW_OPER_MIT{x}_LP_2F:ID" for x in "ABC"], "MIT_TEC": [f"SW_OPER_MIT{x}TEC_2F" for x in "ABC"], "MIT_TEC:ID": [f"SW_OPER_MIT{x}TEC_2F:ID" for x in "ABC"], "PPI_FAC": [f"SW_OPER_PPI{x}FAC_2F" for x in "ABC"], "PPI_FAC:ID": [f"SW_OPER_PPI{x}FAC_2F:ID" for x in "ABC"], # Multi-mission magnetic products "MAG_CS": [ "CS_OPER_MAG", ], "MAG_GRACE": ["GRACE_A_MAG", "GRACE_B_MAG"], "MAG_GFO": ["GF1_OPER_FGM_ACAL_CORR", "GF2_OPER_FGM_ACAL_CORR"], "MAG_GOCE": ["GO_MAG_ACAL_CORR"], "MAG_GOCE_ML": ["GO_MAG_ACAL_CORR_ML"], # Swarm spacecraft positions "MOD_SC": [f"SW_OPER_MOD{x}_SC_1B" for x in "ABC"], } OBS_COLLECTIONS = [ "SW_OPER_AUX_OBSH2_", "SW_OPER_AUX_OBSM2_", "SW_OPER_AUX_OBSS2_", "SW_OPER_VOBS_1M_2_", "SW_OPER_VOBS_4M_2_", "CH_OPER_VOBS_1M_2_", "CR_OPER_VOBS_1M_2_", "OR_OPER_VOBS_1M_2_" "CO_OPER_VOBS_1M_2_", "OR_OPER_VOBS_4M_2_", "CH_OPER_VOBS_4M_2_", "CR_OPER_VOBS_4M_2_", "CO_OPER_VOBS_4M_2_", "SW_OPER_VOBS_1M_2_:SecularVariation", "SW_OPER_VOBS_4M_2_:SecularVariation", "CH_OPER_VOBS_1M_2_:SecularVariation", "CR_OPER_VOBS_1M_2_:SecularVariation", "OR_OPER_VOBS_1M_2_:SecularVariation", "CO_OPER_VOBS_1M_2_:SecularVariation", "OR_OPER_VOBS_4M_2_:SecularVariation", "CH_OPER_VOBS_4M_2_:SecularVariation", "CR_OPER_VOBS_4M_2_:SecularVariation", "CO_OPER_VOBS_4M_2_:SecularVariation", ] # These are not necessarily real sampling steps, but are good enough to use # for splitting long requests into chunks COLLECTION_SAMPLING_STEPS = { "MAG": "PT1S", "MAG_HR": "PT0.019S", # approx 50Hz (the sampling is not exactly 50Hz) "EFI": "PT0.5S", "EFI_IDM": "PT0.5S", "EFI_TIE": "PT0.5S", "EFI_TCT02": "PT0.5S", "EFI_TCT16": "PT0.0625S", "IBI": "PT1S", "TEC": "PT1S", # Actually more complicated "FAC": "PT1S", "EEF": "PT90M", "IPD": "PT1S", "AEJ_LPL": "PT15.6S", "AEJ_LPS": "PT1S", "AUX_OBSH": "PT60M", "AUX_OBSM": "PT60S", "AUX_OBSS": "PT1S", "VOBS_SW_1M": "P31D", "VOBS_CH_1M": "P31D", "VOBS_CR_1M": "P31D", "VOBS_OR_1M": "P31D", "VOBS_CO_1M": "P31D", "VOBS_OR_4M": "P122D", "VOBS_SW_4M": "P122D", "VOBS_CH_4M": "P122D", "VOBS_CR_4M": "P122D", "VOBS_CO_4M": "P122D", "VOBS_SW_1M:SecularVariation": "P31D", "VOBS_CH_1M:SecularVariation": "P31D", "VOBS_CR_1M:SecularVariation": "P31D", "VOBS_OR_1M:SecularVariation": "P31D", "VOBS_CO_1M:SecularVariation": "P31D", "VOBS_OR_4M:SecularVariation": "P122D", "VOBS_SW_4M:SecularVariation": "P122D", "VOBS_CH_4M:SecularVariation": "P122D", "VOBS_CR_4M:SecularVariation": "P122D", "VOBS_CO_4M:SecularVariation": "P122D", "MIT_LP": "PT20M", "MIT_LP:ID": "PT20M", "MIT_TEC": "PT20M", "MIT_TEC:ID": "PT20M", "PPI_FAC": "PT20M", "PPI_FAC:ID": "PT20M", } PRODUCT_VARIABLES = { "MAG": [ "F", "dF_Sun", "dF_AOCS", "dF_other", "F_error", "B_VFM", "B_NEC", "dB_Sun", "dB_AOCS", "dB_other", "B_error", "q_NEC_CRF", "Att_error", "Flags_F", "Flags_B", "Flags_q", "Flags_Platform", "ASM_Freq_Dev", ], "MAG_HR": [ # NOTE: F is calculated on the fly from B_NEC (F = |B_NEC|) "F", "B_VFM", "B_NEC", "dB_Sun", "dB_AOCS", "dB_other", "B_error", "q_NEC_CRF", "Att_error", "Flags_B", "Flags_q", "Flags_Platform", ], "EFI": [ "U_orbit", "Ne", "Ne_error", "Te", "Te_error", "Vs", "Vs_error", "Flags_LP", "Flags_Ne", "Flags_Te", "Flags_Vs", ], "EFI_IDM": [ "Latitude_GD", "Longitude_GD", "Height_GD", "Radius_GC", "Latitude_QD", "MLT_QD", "V_sat_nec", "M_i_eff", "M_i_eff_err", "M_i_eff_Flags", "M_i_eff_tbt_model", "V_i", "V_i_err", "V_i_Flags", "V_i_raw", "N_i", "N_i_err", "N_i_Flags", "A_fp", "R_p", "T_e", "Phi_sc", ], "EFI_TIE": [ "Latitude_GD", "Longitude_GD", "Height_GD", "Radius_GC", "Latitude_QD", "MLT_QD", "Tn_msis", "Te_adj_LP", "Ti_meas_drift", "Ti_model_drift", "Flag_ti_meas", "Flag_ti_model", ], "EFI_TCT02": [ # identical to EFI_TCT16 "VsatC", "VsatE", "VsatN", "Bx", "By", "Bz", "Ehx", "Ehy", "Ehz", "Evx", "Evy", "Evz", "Vicrx", "Vicry", "Vicrz", "Vixv", "Vixh", "Viy", "Viz", "Vixv_error", "Vixh_error", "Viy_error", "Viz_error", "Latitude_QD", "MLT_QD", "Calibration_flags", "Quality_flags", ], "EFI_TCT16": [ # identical to EFI_TCT02 "VsatC", "VsatE", "VsatN", "Bx", "By", "Bz", "Ehx", "Ehy", "Ehz", "Evx", "Evy", "Evz", "Vicrx", "Vicry", "Vicrz", "Vixv", "Vixh", "Viy", "Viz", "Vixv_error", "Vixh_error", "Viy_error", "Viz_error", "Latitude_QD", "MLT_QD", "Calibration_flags", "Quality_flags", ], "IBI": [ "Bubble_Index", "Bubble_Probability", "Flags_Bubble", "Flags_F", "Flags_B", "Flags_q", ], "TEC": [ "GPS_Position", "LEO_Position", "PRN", "L1", "L2", "P1", "P2", "S1", "S2", "Elevation_Angle", "Absolute_VTEC", "Absolute_STEC", "Relative_STEC", "Relative_STEC_RMS", "DCB", "DCB_Error", ], "FAC": [ "IRC", "IRC_Error", "FAC", "FAC_Error", "Flags", "Flags_F", "Flags_B", "Flags_q", ], "EEF": ["EEF", "EEJ", "RelErr", "Flags"], "IPD": [ "Ne", "Te", "Background_Ne", "Foreground_Ne", "PCP_flag", "Grad_Ne_at_100km", "Grad_Ne_at_50km", "Grad_Ne_at_20km", "Grad_Ne_at_PCP_edge", "ROD", "RODI10s", "RODI20s", "delta_Ne10s", "delta_Ne20s", "delta_Ne40s", "Num_GPS_satellites", "mVTEC", "mROT", "mROTI10s", "mROTI20s", "IBI_flag", "Ionosphere_region_flag", "IPIR_index", "Ne_quality_flag", "TEC_STD", ], "AEJ_LPL": ["Latitude_QD", "Longitude_QD", "MLT_QD", "J_NE", "J_QD"], "AEJ_LPL:Quality": ["RMS_misfit", "Confidence"], "AEJ_LPS": [ "Latitude_QD", "Longitude_QD", "MLT_QD", "J_CF_NE", "J_DF_NE", "J_CF_SemiQD", "J_DF_SemiQD", "J_R", ], "AEJ_LPS:Quality": ["RMS_misfit", "Confidence"], "AEJ_PBL": [ "Latitude_QD", "Longitude_QD", "MLT_QD", "J_QD", "Flags", "PointType", ], "AEJ_PBS": [ "Latitude_QD", "Longitude_QD", "MLT_QD", "J_DF_SemiQD", "Flags", "PointType", ], "AEJ_PBS:GroundMagneticDisturbance": ["B_NE"], "AOB_FAC": [ "Latitude_QD", "Longitude_QD", "MLT_QD", "Boundary_Flag", "Quality", "Pair_Indicator", ], "AUX_OBSH": ["B_NEC", "F", "IAGA_code", "Quality", "ObsIndex"], "AUX_OBSM": ["B_NEC", "F", "IAGA_code", "Quality"], "AUX_OBSS": ["B_NEC", "F", "IAGA_code", "Quality"], "VOBS_SW_1M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"], "VOBS_CH_1M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"], "VOBS_CR_1M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"], "VOBS_OR_1M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"], "VOBS_CO_1M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"], "VOBS_OR_4M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"], "VOBS_SW_4M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"], "VOBS_CH_4M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"], "VOBS_CR_4M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"], "VOBS_CO_4M": ["SiteCode", "B_CF", "B_OB", "sigma_CF", "sigma_OB"], "VOBS_SW_1M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"], "VOBS_CH_1M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"], "VOBS_CR_1M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"], "VOBS_OR_1M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"], "VOBS_CO_1M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"], "VOBS_OR_4M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"], "VOBS_SW_4M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"], "VOBS_CH_4M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"], "VOBS_CR_4M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"], "VOBS_CO_4M:SecularVariation": ["SiteCode", "B_SV", "sigma_SV"], "MIT_LP": [ "Counter", "Latitude_QD", "Longitude_QD", "MLT_QD", "L_value", "SZA", "Ne", "Te", "Depth", "DR", "Width", "dL", "PW_Gradient", "EW_Gradient", "Quality", ], "MIT_LP:ID": [ "Counter", "Latitude_QD", "Longitude_QD", "MLT_QD", "L_value", "SZA", "Ne", "Te", "Position_Quality", "PointType", ], "MIT_TEC": [ "Counter", "Latitude_QD", "Longitude_QD", "MLT_QD", "L_value", "SZA", "TEC", "Depth", "DR", "Width", "dL", "PW_Gradient", "EW_Gradient", "Quality", ], "MIT_TEC:ID": [ "Counter", "Latitude_QD", "Longitude_QD", "MLT_QD", "L_value", "SZA", "TEC", "Position_Quality", "PointType", ], "PPI_FAC": [ "Counter", "Latitude_QD", "Longitude_QD", "MLT_QD", "L_value", "SZA", "Sigma", "PPI", "dL", "Quality", ], "PPI_FAC:ID": [ "Counter", "Latitude_QD", "Longitude_QD", "MLT_QD", "L_value", "SZA", "Position_Quality", "PointType", ], "MAG_CS": [ "F", "B_NEC", "B_mod_NEC", "B_NEC1", "B_NEC2", "B_NEC3", "B_FGM1", "B_FGM2", "B_FGM3", "q_NEC_CRF", "q_error", ], "MAG_GRACE": [ "F", "B_NEC", "B_NEC_raw", "B_FGM", "q_NEC_CRF", "q_error", ], "MAG_GFO": [ "F", "B_NEC", "B_FGM", "dB_MTQ_FGM", "dB_XI_FGM", "dB_NY_FGM", "dB_BT_FGM", "dB_ST_FGM", "dB_SA_FGM", "dB_BAT_FGM", "q_NEC_FGM", "B_FLAG", ], "MAG_GOCE": [ "F", "B_MAG", "B_NEC", "dB_MTQ_SC", "dB_XI_SC", "dB_NY_SC", "dB_BT_SC", "dB_ST_SC", "dB_SA_SC", "dB_BAT_SC", "dB_HK_SC", "dB_BLOCK_CORR", "q_SC_NEC", "q_MAG_SC", "B_FLAG", ], "MAG_GOCE_ML": [ "F", "B_MAG", "B_NEC", "q_FGM_NEC", "B_FLAG", "KP_DST_FLAG", "NaN_FLAG", "Latitude_QD", "Longitude_QD", ], "MOD_SC": [], } AUXILIARY_VARIABLES = [ "Timestamp", "Latitude", "Longitude", "Radius", "Spacecraft", "OrbitDirection", "QDOrbitDirection", "SyncStatus", "Kp10", "Kp", "Dst", "F107", "IMF_BY_GSM", "IMF_BZ_GSM", "IMF_V", "F10_INDEX", "OrbitSource", "OrbitNumber", "AscendingNodeTime", "AscendingNodeLongitude", "QDLat", "QDLon", "QDBasis", "MLT", "SunDeclination", "SunHourAngle", "SunRightAscension", "SunAzimuthAngle", "SunZenithAngle", "SunLongitude", "SunVector", "DipoleAxisVector", "NGPLatitude", "NGPLongitude", "DipoleTiltAngle", "dDst", ] MAGNETIC_MODEL_VARIABLES = { "F": "F", "B_NEC": "B_NEC", "B_NEC1": "B_NEC", "B_NEC2": "B_NEC", "B_NEC3": "B_NEC", } MAGNETIC_MODELS = [ "IGRF", "IGRF12", "LCS-1", "MF7", "CHAOS-Core", "CHAOS-Static", "CHAOS-MMA-Primary", "CHAOS-MMA-Secondary", "CHAOS-6-Core", "CHAOS-6-Static", "CHAOS-6-MMA-Primary", "CHAOS-6-MMA-Secondary", "MCO_SHA_2C", "MCO_SHA_2D", "MLI_SHA_2C", "MLI_SHA_2D", "MLI_SHA_2E", "MMA_SHA_2C-Primary", "MMA_SHA_2C-Secondary", "MMA_SHA_2F-Primary", "MMA_SHA_2F-Secondary", "MIO_SHA_2C-Primary", "MIO_SHA_2C-Secondary", "MIO_SHA_2D-Primary", "MIO_SHA_2D-Secondary", "AMPS", "MCO_SHA_2X", "CHAOS", "CHAOS-MMA", "MMA_SHA_2C", "MMA_SHA_2F", "MIO_SHA_2C", "MIO_SHA_2D", "SwarmCI", ] def __init__(self, url=None, token=None, config=None, logging_level="NO_LOGGING"): super().__init__(url, token, config, logging_level, server_type="Swarm") self._available = self._get_available_data() self._request_inputs = SwarmWPSInputs() self._templatefiles = TEMPLATE_FILES self._filterlist = [] self._supported_filetypes = ("csv", "cdf") self._collection_list = None @classmethod def _get_available_data(cls): # Build the reverse mapping: "SW_OPER_MAGA_LR_1B": "MAG" etc collections_to_keys = {} for key, collections in cls.COLLECTIONS.items(): collections_to_keys.update({collection: key for collection in collections}) return { "collections": cls.COLLECTIONS, "collections_to_keys": collections_to_keys, "collection_sampling_steps": cls.COLLECTION_SAMPLING_STEPS, "measurements": cls.PRODUCT_VARIABLES, "models": cls.MAGNETIC_MODELS, "model_variables": cls.MAGNETIC_MODEL_VARIABLES, "auxiliaries": cls.AUXILIARY_VARIABLES, } @staticmethod def _parse_models_input(models=None): """Verify and parse models input. Args: models (list/dict): User-provided values Returns: list: model_ids, list of model_id strings str: model_expression_string to be passed to the server """ models = [] if models is None else models # Convert input to OrderedDict # e.g. {"model_name": "model_expression", ..} # Check if models input is basic list of strings, # If not, then handle inputs given as dicts or list of tuples if isinstance(models, list) and all(isinstance(item, str) for item in models): # Convert the models list to an OrderedDict model_expressions = OrderedDict() for model in models: model_id, _, model_expression = ( s.strip() for s in model.partition("=") ) model_expressions[model_id] = model_expression else: try: model_expressions = OrderedDict(models) # Check that everything is a string if not all( isinstance(item, str) for item in [*model_expressions.values()] + [*model_expressions.keys()] ): raise ValueError except ValueError: raise ValueError("Invalid models input!") # TODO: Verify input model names # (use self._available["models"]) # Create the combined model expression string passed to the request model_expression_string = "" for model_id, model_expression in model_expressions.items(): if model_expression == "": s = model_id else: s = "=".join([model_id, model_expression]) model_expression_string = ",".join([model_expression_string, s]) model_ids = list(s.strip("'\"") for s in model_expressions.keys()) return model_ids, model_expression_string[1:]
[docs] def available_collections(self, groupname=None, details=True): """Show details of available collections. Args: groupname (str): one of: ("MAG", "EFI", etc.) details (bool): If True then print a nice output. If False then return a dict of available collections. """ # Shorter form of the available collections, # without all the individual SiteCodes collections_short = self._available["collections"].copy() collections_short["AUX_OBSS"] = ["SW_OPER_AUX_OBSS2_"] collections_short["AUX_OBSM"] = ["SW_OPER_AUX_OBSM2_"] collections_short["AUX_OBSH"] = ["SW_OPER_AUX_OBSH2_"] for mission in ("SW", "OR", "CH", "CR", "CO"): for cadence in ("1M", "4M"): collections_short[f"VOBS_{mission}_{cadence}"] = [ f"{mission}_OPER_VOBS_{cadence}_2_" ] collections_short[f"VOBS_{mission}_{cadence}:SecularVariation"] = [ f"{mission}_OPER_VOBS_{cadence}_2_:SecularVariation" ] def _filter_collections(groupname): """Reduce the full list to just one group, e.g. "MAG""" if groupname: groups = list(collections_short.keys()) if groupname in groups: return {groupname: collections_short[groupname]} else: raise ValueError("Invalid collection group name") else: return collections_short collections_filtered = _filter_collections(groupname) if details: print("General References:") for i in REFERENCES["General Swarm"]: print(i) print() for key, val in collections_filtered.items(): print(key) for i in val: print(" ", i) refs = COLLECTION_REFERENCES.get(key, ("No reference...",)) for ref in refs: print(ref) print() else: return collections_filtered
[docs] def available_measurements(self, collection=None): """Returns a list of the available measurements for the chosen collection. Args: collection (str): one of: ("MAG", "EFI", "IBI", "TEC", "FAC", "EEF") """ keys = list(self._available["measurements"].keys()) if collection in keys: collection_key = collection return self._available["measurements"][collection_key] elif collection in self._available["collections_to_keys"]: collection_key = self._available["collections_to_keys"][collection] return self._available["measurements"][collection_key] elif collection is None: return self._available["measurements"] else: raise Exception( "collection must be one of {}\nor\n{}".format( ", ".join(keys), "\n".join(self._available["collections_to_keys"]) ) )
[docs] def available_models(self, param=None, details=True, nice_output=True): """Show details of avalable models. If details is True, return a dictionary of model names and details. If nice_output is True, the dictionary is printed nicely. If details is False, return a list of model names. If param is set, filter to only return entries including this Note: | F = Fast-Track Products | C = Comprehensive Inversion | D = Dedicated Chain | MCO = Core / main | MLI = Lithosphere | MMA = Magnetosphere | MIO = Ionosphere Args: param (str): one of "F C D MCO MLI MMA MIO" details (bool): True for a dict of details, False for a brief list nice_output (bool): If True, just print the dict nicely """ def filter_by_param(d, param): if param in ("F", "C", "D"): param = "2" + param return [i for i in d if param in i] # get all models provided by the server models_info = self.get_model_info() # keep only models really provided by the server d = [ model_name for model_name in self._available["models"] if model_name in models_info ] # Filter the dict/list to only include those that contain param if param is not None: d = filter_by_param(d, param) if details: d = { model_name: { "description": MODEL_REFERENCES[model_name], "details": models_info[model_name], } for model_name in d } if nice_output and details: d = OrderedDict(sorted(d.items())) for model_name, desc_details in d.items(): print(model_name, "=", desc_details["details"]["expression"]) print(" START:", desc_details["details"]["validity"]["start"]) print(" END: ", desc_details["details"]["validity"]["end"]) print("DESCRIPTION:") for line in desc_details["description"]: print(line) print("SOURCES:") for line in desc_details["details"]["sources"]: print(" ", line) print() else: return d
[docs] def available_auxiliaries(self): """Returns a list of the available auxiliary parameters.""" return self._available["auxiliaries"]
[docs] def available_observatories( self, collection, start_time=None, end_time=None, details=False, verbose=True ): """Get list of available observatories from server. Search availability by collection, one of:: "SW_OPER_AUX_OBSH2_" "SW_OPER_AUX_OBSM2_" "SW_OPER_AUX_OBSS2_" Examples: :: from viresclient import SwarmRequest request = SwarmRequest() # For a list of observatories available: request.available_observatories("SW_OPER_AUX_OBSM2_") # For a DataFrame also containing availability start and end times: request.available_observatories("SW_OPER_AUX_OBSM2_", details=True) # For available observatories during a given time period: request.available_observatories( "SW_OPER_AUX_OBSM2_", "2013-01-01", "2013-02-01" ) Args: collection (str): OBS collection name, e.g. "SW_OPER_AUX_OBSM2\\_" start_time (datetime / ISO_8601 string) end_time (datetime / ISO_8601 string) details (bool): returns DataFrame if True verbose (bool): Notify with special data terms Returns: list or DataFrame: IAGA codes (and start/end times) """ def _request_get_observatories(collection=None, start_time=None, end_time=None): """Make the get_observatories request to the server""" templatefile = TEMPLATE_FILES["get_observatories"] template = JINJA2_ENVIRONMENT.get_template(templatefile) request = template.render( collection_id=collection, begin_time=start_time, end_time=end_time, response_type="text/csv", ).encode("UTF-8") response = self._get(request, asynchronous=False, show_progress=False) return response def _csv_to_df(csv_data): """Convert bytes data to pandas dataframe""" return read_csv(StringIO(str(csv_data, "utf-8"))) if collection not in self.OBS_COLLECTIONS: raise ValueError( f"Invalid collection: {collection}. Must be one of: {self.OBS_COLLECTIONS}." ) if start_time and end_time: start_time = parse_datetime(start_time) end_time = parse_datetime(end_time) else: start_time, end_time = None, None if verbose: self._detect_AUX_OBS([collection]) response = _request_get_observatories(collection, start_time, end_time) df = _csv_to_df(response) if details: return df else: # note: "IAGACode" has been renamed to "site" in VirES 3.5 key = "IAGACode" if "IAGACode" in df.keys() else "site" return list(df[key])
def _detect_AUX_OBS(self, collections): # Identify collection types present collection_types_requested = { self._available["collections_to_keys"].get(collection) for collection in collections } # Output notification for each of aux_type for aux_type in ["AUX_OBSH", "AUX_OBSM", "AUX_OBSS"]: if aux_type in collection_types_requested: output_text = dedent( f""" Accessing INTERMAGNET and/or WDC data Check usage terms at {DATA_CITATIONS.get(aux_type)} """ ) tqdm.write(output_text)
[docs] def set_collection(self, *args, verbose=True): """Set the collection(s) to use. Args: (str): one or several from .available_collections() verbose (bool): Notify if special data terms """ collections = [*args] for collection in collections: if not isinstance(collection, str): raise TypeError(f"{collection} invalid. Must be string.") if collection not in self._available["collections_to_keys"]: raise ValueError( "Invalid collection: {}. " "Check available with SwarmRequest().available_collections()".format( collection ) ) if verbose: self._detect_AUX_OBS(collections) self._collection_list = collections self._request_inputs.set_collections(collections) return self
[docs] def set_products( self, measurements=None, models=None, custom_model=None, auxiliaries=None, residuals=False, sampling_step=None, ignore_cached_models=False, ): """Set the combination of products to retrieve. If residuals=True then just get the measurement-model residuals, otherwise get both measurement and model values. Args: measurements (list(str)): from .available_measurements(collection_key) models (list(str)/dict): from .available_models() or defineable with custom expressions custom_model (str): path to a custom model in .shc format auxiliaries (list(str)): from .available_auxiliaries() residuals (bool): True if only returning measurement-model residual sampling_step (str): ISO_8601 duration, e.g. 10 seconds: PT10S, 1 minute: PT1M ignore_cached_models (bool): True if cached models should be ignored and calculated on-the-fly """ if self._collection_list is None: raise Exception("Must run .set_collection() first.") measurements = [] if measurements is None else measurements models = [] if models is None else models model_variables = self._available["model_variables"] auxiliaries = [] if auxiliaries is None else auxiliaries # If inputs are strings (when providing only one parameter) # put them in lists if isinstance(measurements, str): measurements = [measurements] if isinstance(models, str): models = [models] if isinstance(auxiliaries, str): auxiliaries = [auxiliaries] # print warning for deprecated models self._check_deprecated_models(models) # Check the chosen measurements are available for the set collections available_measurements = [] for collection in self._collection_list: collection_key = self._available["collections_to_keys"][collection] available_measurements.extend( self._available["measurements"][collection_key] ) for variable in measurements: if variable not in available_measurements: raise Exception( "Measurement '{}' not available for collection '{}'. " "Check available with " "SwarmRequest.available_measurements({})".format( variable, collection_key, collection_key ) ) # Check if at least one model defined when requesting residuals if residuals and not models: raise Exception("Residuals requested but no model defined!") # Check models format, extract model_ids and string to pass to server model_ids, model_expression_string = self._parse_models_input(models) # Check chosen aux is available for variable in auxiliaries: if variable not in self._available["auxiliaries"]: raise Exception( "'{}' not available. Check available with " "SwarmRequest.available_auxiliaries()".format(variable) ) # Load the custom .shc file if custom_model: if os.path.exists(custom_model): with open(custom_model) as custom_shc_file: custom_shc = model_ids.append("Custom_Model") else: raise OSError("Custom model .shc file not found") else: custom_shc = None # Set up the variables that actually get passed to the WPS request # Requested variables, start with the measurements ... variables = set(measurements) # model-related measurements _requested_model_variables = [ variable for variable in measurements if variable in model_variables ] if residuals: # Remove the measurements ... variables.difference_update(_requested_model_variables) # ... add their residuals instead. variables.update( f"{variable}_res_{model_id}" for variable in _requested_model_variables for model_id in model_ids ) else: # If no variable is requested fall back to B_NEC. if not _requested_model_variables: _requested_model_variables = ["B_NEC"] # Add calculated model variables. variables.update( f"{variable}_{model_id}" for variable in ( model_variables[variable] for variable in _requested_model_variables ) for model_id in model_ids ) # Finally, add the auxiliary variables. variables.update(auxiliaries) self._request_inputs.model_expression = model_expression_string self._request_inputs.variables = list(variables) self._request_inputs.sampling_step = sampling_step self._request_inputs.custom_shc = custom_shc self._request_inputs.ignore_cached_models = ignore_cached_models return self
[docs] def set_range_filter(self, parameter, minimum=None, maximum=None, negate=False): """Set a range filter to apply. Filters data for minimum ≤ parameter ≤ maximum, or parameter < minimum OR parameter > maximum if negated. Note: - Apply multiple filters with successive calls to ``.set_range_filter()`` - See :py:meth:`SwarmRequest.add_filter` for arbitrary filters. Args: parameter (str) minimum (float or integer) maximum (float or integer) Examples: ``request.set_range_filter("Latitude", 0, 90)`` to set "Latitude >= 0 AND Latitude <= 90" ``request.set_range_filter("Latitude", 0, 90, negate=True)`` to set "(Latitude < 0 OR Latitude > 90)" """ if not isinstance(parameter, str): raise TypeError("parameter must be a str") def _generate_filters(minop, maxop): if minimum is not None: yield f"{parameter} {minop} {minimum}" if maximum is not None: yield f"{parameter} {maxop} {maximum}" nargs = 2 - (minimum is None) - (maximum is None) if nargs == 0: return filter_ = ( " AND ".join(_generate_filters(">=", "<=")) if not negate else " OR ".join(_generate_filters("<", ">")) ) if nargs > 1: filter_ = f"({filter_})" self.add_filter(filter_) return self
[docs] def set_choice_filter(self, parameter, *values, negate=False): """Set a choice filter to apply. Filters data for *parameter in values*, or *parameter not in values* if negated. Note: See :py:meth:`SwarmRequest.add_filter` for arbitrary filters. Args: parameter (str) values (float or integer or string) Examples: ``request.set_choice_filter("Flags_F", 0, 1)`` to set "(Flags_F == 0 OR Flags_F == 1)" ``request.set_choice_filter("Flags_F", 0, 1, negate=True)`` to set "(Flags_F != 0 AND Flags_F != 1)" """ if not isinstance(parameter, str): raise TypeError("parameter must be a str") def _generate_filters(compop): for value in values: yield f"{parameter} {compop} {value!r}" nargs = len(values) if nargs == 0: return filter_ = ( " OR ".join(_generate_filters("==")) if not negate else " AND ".join(_generate_filters("!=")) ) if nargs > 1: filter_ = f"({filter_})" self.add_filter(filter_) return self
[docs] def set_bitmask_filter(self, parameter, selection=0, mask=-1, negate=False): """Set a bitmask filter to apply. Filters data for *parameter & mask == selection & mask*, or *parameter & mask != selection & mask* if negated. Note: See :py:meth:`SwarmRequest.add_filter` for arbitrary filters. Args: parameter (str) mask (integer) selection (integer) Examples: ``request.set_bitmask_filter("Flags_F", 0, 1)`` to set "Flags_F & 1 == 0" (i.e. bit 1 is set to 0) """ if not isinstance(parameter, str): raise TypeError("parameter must be a str") def _get_filter(compop): return ( f"{parameter} & {mask} {compop} {selection & mask}" if mask != -1 else f"{parameter} {compop} {selection}" ) if not negate: if mask != 0: # avoid pointless (0 == 0) filter self.add_filter(_get_filter("==")) else: # mask == 0 leads to (0 != 0) filter and nothing is selected. self.add_filter(_get_filter("!=")) return self
[docs] def add_filter(self, filter_): """Add an arbitrary data filter. Args: filter_ (str): string defining the filter, as shown below Filter grammar: .. code-block:: text filter: predicate predicate: variable == literal | variable != literal | variable < number | variable > number | variable <= number | variable >= number | variable & unsigned-integer == unsigned-integer | variable & unsigned-integer != unsigned-integer | (predicate AND predicate [AND predicate ...]) | (predicate OR predicate [OR predicate ...]) | NOT predicate literal: boolean | integer | float | string number: integer | float variable: identifier | identifier[index] index: integer[, integer ...] Both single- and double quoted strings are allowed. NaN values are matched by the ==/!= operators, i.e., the predicates are internally converted to a proper "IS NaN" or "IS NOT NaN" comparison. Examples: "Flags & 128 == 0" Match records with Flag bit 7 set to 0. "Elevation >= 15" Match values with values greater than or equal to 15. "(Label == "D" OR Label == "N" OR Label = "X")" Match records with Label set to D, N or X. "(Type != 1 AND Type != 34) NOT (Type == 1 OR Type == 34)" Exclude records with Type set to 1 or 34. "(Vector[2] <= -0.1 OR Vector[2] >= 0.5)" Match records with Vector[2] values outside of the (-0.1, 0.5) range. """ if not isinstance(filter_, str): raise TypeError("parameter must be a str") self._filterlist.append(filter_) # Update the SwarmWPSInputs object self._request_inputs.filters = " AND ".join(self._filterlist)
[docs] def clear_filters(self): """Remove all applied filters.""" self._filterlist = [] self._request_inputs.filters = None return self
clear_range_filter = clear_filters # alias for backward compatibility
[docs] def applied_filters(self): """Print currently applied filters.""" for filter_ in self._filterlist: print(filter_)
[docs] def get_times_for_orbits( self, start_orbit, end_orbit, mission="Swarm", spacecraft=None ): """Translate a pair of orbit numbers to a time interval. Args: start_orbit (int): a starting orbit number end_orbit (int): a later orbit number spacecraft (str): Swarm: one of ('A','B','C') or ("Alpha", "Bravo", "Charlie") GRACE: one of ('1','2') GRACE-FO: one of ('1','2') CryoSat-2: None mission (str): one of ('Swarm', 'GRACE', 'GRACE-FO', 'CryoSat-2') Returns: tuple (datetime): (start_time, end_time) The start time of the start_orbit and the ending time of the end_orbit. (Based on ascending nodes of the orbits) """ # check old function signature and print warning if ( isinstance(start_orbit, str) and isinstance(mission, int) and spacecraft is None ): spacecraft, start_orbit, end_orbit = start_orbit, end_orbit, mission mission = "Swarm" warn( "The order of SwarmRequest.get_times_for_orbits() method's " "parameters has changed! " "The backward compatibility will be removed in the future. " "Please change your code to: " "request.get_times_for_orbits(start_orbit, end_orbit, " "'Swarm', spacecraft)", FutureWarning, ) start_orbit = int(start_orbit) end_orbit = int(end_orbit) # Change to spacecraft = "A" etc. for this request spacecraft = self._fix_spacecraft(mission, spacecraft) self._check_mission_spacecraft(mission, spacecraft) templatefile = TEMPLATE_FILES["times_from_orbits"] template = JINJA2_ENVIRONMENT.get_template(templatefile) request = template.render( mission=mission, spacecraft=spacecraft, start_orbit=start_orbit, end_orbit=end_orbit, ).encode("UTF-8") response = self._get(request, asynchronous=False, show_progress=False) responsedict = json.loads(response.decode("UTF-8")) start_time = parse_datetime(responsedict["start_time"]) end_time = parse_datetime(responsedict["end_time"]) return start_time, end_time
def _fix_spacecraft(self, mission, spacecraft): # Change to spacecraft = "A" etc. for this request spacecraft = str(spacecraft) if spacecraft is not None else None if mission == "Swarm" and spacecraft in ("Alpha", "Bravo", "Charlie"): spacecraft = spacecraft[0] return spacecraft def _check_mission_spacecraft(self, mission, spacecraft): if mission not in self.MISSION_SPACECRAFTS: raise ValueError( f"Invalid mission {mission}!" f"Allowed options are: {','.join(self.MISSION_SPACECRAFTS)}" ) if self.MISSION_SPACECRAFTS[mission]: # missions with required spacecraft id if not spacecraft: raise ValueError( f"The {mission} spacecraft is required!" f"Allowed options are: {','.join(self.MISSION_SPACECRAFTS[mission])}" ) if spacecraft not in self.MISSION_SPACECRAFTS[mission]: raise ValueError( f"Invalid {mission} spacecraft! " f"Allowed options are: {','.join(self.MISSION_SPACECRAFTS[mission])}" ) elif spacecraft: # mission without spacecraft id raise ValueError( f"No {mission} spacecraft shall be specified! " "Set spacecraft to None." )
[docs] def get_orbit_number(self, spacecraft, input_time, mission="Swarm"): """Translate a time to an orbit number. Args: spacecraft (str): Swarm: one of ('A','B','C') or ("Alpha", "Bravo", "Charlie") GRACE: one of ('1','2') GRACE-FO: one of ('1','2') CryoSat-2: None input_time (datetime): a point in time mission (str): one of ('Swarm', 'GRACE', 'GRACE-FO', 'CryoSat-2') Returns: int: The current orbit number at the input_time """ try: input_time = parse_datetime(input_time) except TypeError: raise TypeError( "input_time must be datetime object or ISO-8601 " "date/time string" ) # Change to spacecraft = "A" etc. for this request if spacecraft in ("Alpha", "Bravo", "Charlie"): spacecraft = spacecraft[0] if mission not in self.MISSION_SPACECRAFTS: raise ValueError( f"Invalid mission {mission}!" f"Allowed options are: {','.join(self.MISSION_SPACECRAFTS)}" ) spacecraft = str(spacecraft) if mission == "Swarm": collection = f"SW_OPER_MOD{spacecraft}_SC_1B" elif mission == "GRACE": if spacecraft in "12": spacecraft = "AB"[int(spacecraft) - 1] elif spacecraft not in "AB": raise ValueError(f"Invalid spacecraft: {spacecraft}") collection = f"GRACE_{spacecraft}_MAG" elif mission == "GRACE-FO": collection = f"GF{spacecraft}_OPER_FGM_ACAL_CORR" elif mission == "CryoSat-2": collection = "CS_OPER_MAG" request_inputs = SwarmWPSInputs( collection_ids={collection: [collection]}, begin_time=input_time, end_time=input_time + datetime.timedelta(seconds=1), variables=["OrbitNumber"], response_type="text/csv", ) request = request_inputs.as_xml(self._templatefiles["sync"]) retdata = ReturnedDataFile(filetype="csv") response_handler = self._response_handler(retdata, show_progress=False) self._get( request, asynchronous=False, response_handler=response_handler, show_progress=False, ) df = retdata.as_dataframe() if len(df) == 0: raise ValueError( "Orbit number not identified. Probably outside of mission duration or orbit counter file." ) elif len(df) > 1: raise RuntimeError("Unexpected server response. More than one OrbitNumber.") else: return df["OrbitNumber"][0]
[docs] def get_model_info(self, models=None, custom_model=None, original_response=False): """Get model info from server. Handles the same models input as .set_products(), and returns a dict like: {'IGRF12': { 'expression': 'IGRF12(max_degree=13,min_degree=0)', 'validity': {'start': '1900-01-01T00:00:00Z', 'end': '2020-01-01T00:00:00Z' }, ...} If original_response=True, return the list of dicts like: {'expression': 'MCO_SHA_2C(max_degree=16,min_degree=0)', 'name': 'MCO_SHA_2C', 'validity': {'start': '2013-11-30T14:38:24Z', 'end': '2018-01-01T00:00:00Z'}}, ... Args: models (list/dict): as with set_products custom_model (str): as with set_products original_response (bool) Returns: dict or list """ def _request_get_model_info(model_expression=None, custom_shc=None): """Make the get_model_info request.""" templatefile = TEMPLATE_FILES["model_info"] template = JINJA2_ENVIRONMENT.get_template(templatefile) request = template.render( model_expression=model_expression, custom_shc=custom_shc, response_type="application/json", ).encode("UTF-8") response = self._get(request, asynchronous=False, show_progress=False) response_list = json.loads(response.decode("UTF-8")) return response_list def _build_dict(response_list): """Build dictionary output organised by model name.""" return {model_dict.pop("name"): model_dict for model_dict in response_list} if custom_model: with open(custom_model) as custom_shc_file: custom_shc = if not models: models = ["Custom_Model"] else: custom_shc = None if models is not None: _, model_expression = self._parse_models_input(models) else: model_expression = None response = _request_get_model_info(model_expression, custom_shc) if not original_response: response = _build_dict(response) return response
@staticmethod def _check_deprecated_models(models): """Print deprecation warning for deprecated models.""" deprecated_models = [] for deprecated_model in DEPRECATED_MODELS: for model in models: if deprecated_model in model: deprecated_models.append(deprecated_model) break for deprecated_model in deprecated_models: print( "WARNING: Model {} is deprecated. {}".format( deprecated_model, DEPRECATED_MODELS[deprecated_model] ), file=sys.stdout, )
[docs] def get_conjunctions( self, start_time=None, end_time=None, threshold=1.0, spacecraft1="A", spacecraft2="B", mission1="Swarm", mission2="Swarm", ): """Get times of the spacecraft conjunctions. Currently available for the following spacecraft pairs: - Swarm-A/Swarm-B Args: start_time (datetime / ISO_8601 string): optional start time end_time (datetime / ISO_8601 string): optional end time threshold (float): optional maximum allowed angular separation in degrees; by default set to 1; allowed values are [0, 180] spacecraft1: identifier of the first spacecraft, default to 'A' spacecraft2: identifier of the second spacecraft, default to 'B' mission1 (str): mission of the first spacecraft, defaults to 'Swarm' mission2 (str): mission of the first spacecraft, defaults to 'Swarm' Returns: ReturnedData: """ try: start_time = parse_datetime(start_time) if start_time else None end_time = parse_datetime(end_time) if end_time else None except TypeError: raise TypeError( "start_time and end_time must be datetime objects or ISO-8601 " "date/time strings" ) from None if not (0 <= threshold <= 180): raise ValueError("Invalid threshold value!") spacecraft1 = self._fix_spacecraft(mission1, spacecraft1) spacecraft2 = self._fix_spacecraft(mission2, spacecraft2) self._check_mission_spacecraft(mission1, spacecraft1) self._check_mission_spacecraft(mission2, spacecraft2) if (mission1, spacecraft1) == (mission2, spacecraft2): raise ValueError("The first and second spacecraft must not be the same!") spacecraft_pair = tuple( sorted([(mission1, spacecraft1), (mission2, spacecraft2)]) ) if spacecraft_pair not in self.CONJUNCTION_MISSION_SPACECRAFT_PAIRS: raise ValueError( "Conjunctions not available for the requested " "spacecraft pair {spacecraft_pair}!" ) templatefile = TEMPLATE_FILES["get_conjunctions"] template = JINJA2_ENVIRONMENT.get_template(templatefile) request = template.render( begin_time=start_time, end_time=end_time, spacecraft1=spacecraft1, spacecraft2=spacecraft2, mission1=mission1, mission2=mission2, threshold=threshold, ).encode("UTF-8") show_progress = False leave_progress_bar = False response = ReturnedDataFile(filetype="cdf") response_handler = self._response_handler( retdatafile=response, show_progress=show_progress, leave_progress_bar=leave_progress_bar, ) self._get( request=request, asynchronous=False, show_progress=show_progress, leave_progress_bar=leave_progress_bar, response_handler=response_handler, ) return response