import datetime
import json
import pandas as pd
from ._client import ClientRequest, WPSInputs
from ._data import CONFIG_AEOLUS
from ._data_handling import ReturnedDataFile
# from pandas import DataFrame, json_normalize
# Names of the XML request templates, keyed by request mode
# ("sync" or "async"). Assigned to AeolusRequest._templatefiles.
TEMPLATE_FILES = {
    "sync": "vires_aeolus_fetch_filtered_data.xml",
    "async": "vires_aeolus_fetch_filtered_data_async.xml",
}
class AeolusWPSInputs(WPSInputs):
    """Holds the input values of an Aeolus WPS request.

    ``processId``, ``collection_ids``, ``begin_time``, ``end_time``,
    ``response_type``, ``filters`` and ``aux_type`` are type-checked
    properties backed by a ``_<name>`` attribute. The remaining inputs are
    stored as plain attributes without validation.

    Every argument may be left as ``None`` (``dsd_info`` defaults to
    ``False``) and assigned later.
    """

    # All input names, in the order used by ``as_dict``.
    NAMES = [
        "processId",
        "collection_ids",
        "begin_time",
        "end_time",
        "response_type",
        "fields",
        "filters",
        "aux_type",
        "observation_fields",
        "measurement_fields",
        "mie_profile_fields",
        "rayleigh_profile_fields",
        "mie_wind_fields",
        "rayleigh_wind_fields",
        "mie_grouping_fields",
        "rayleigh_grouping_fields",
        "ica_fields",
        "sca_fields",
        "mca_fields",
        "group_fields",
        "bbox",
        "dsd_info",
    ]

    # Inputs kept as plain attributes (no validating property yet).
    # ``as_dict`` mirrors each of them into ``_<name>`` before reading.
    _PLAIN_NAMES = (
        "fields",
        "observation_fields",
        "measurement_fields",
        "mie_profile_fields",
        "rayleigh_profile_fields",
        "mie_wind_fields",
        "rayleigh_wind_fields",
        "mie_grouping_fields",
        "rayleigh_grouping_fields",
        "ica_fields",
        "sca_fields",
        "mca_fields",
        "group_fields",
        "bbox",
        "dsd_info",
    )

    def __init__(
        self,
        processId=None,
        collection_ids=None,
        begin_time=None,
        end_time=None,
        response_type=None,
        fields=None,
        filters=None,
        aux_type=None,
        observation_fields=None,
        measurement_fields=None,
        sca_fields=None,
        ica_fields=None,
        mca_fields=None,
        group_fields=None,
        mie_profile_fields=None,
        rayleigh_profile_fields=None,
        mie_wind_fields=None,
        rayleigh_wind_fields=None,
        mie_grouping_fields=None,
        rayleigh_grouping_fields=None,
        bbox=None,
        dsd_info=False,
    ):
        # Obligatory (each assignment goes through a validating setter)
        self.processId = processId
        self.collection_ids = collection_ids
        self.begin_time = begin_time
        self.end_time = end_time
        self.response_type = response_type
        # Optional
        self.fields = fields
        self.filters = filters
        self.aux_type = aux_type
        self.observation_fields = observation_fields
        self.measurement_fields = measurement_fields
        self.ica_fields = ica_fields
        self.mca_fields = mca_fields
        self.group_fields = group_fields
        self.sca_fields = sca_fields
        self.mie_profile_fields = mie_profile_fields
        self.rayleigh_profile_fields = rayleigh_profile_fields
        self.mie_wind_fields = mie_wind_fields
        self.rayleigh_wind_fields = rayleigh_wind_fields
        self.rayleigh_grouping_fields = rayleigh_grouping_fields
        self.mie_grouping_fields = mie_grouping_fields
        self.bbox = bbox
        self.dsd_info = dsd_info

    @staticmethod
    def _validate_optional(value, expected_type, message):
        """Return ``value`` if it is ``None`` or an ``expected_type`` instance.

        Raises:
            TypeError: with ``message`` for any other value.
        """
        if value is None or isinstance(value, expected_type):
            return value
        raise TypeError(message)

    @property
    def as_dict(self):
        """dict: mapping of every name in ``NAMES`` to its current value.

        Side effect: each plain attribute listed in ``_PLAIN_NAMES`` is
        copied into its ``_<name>`` counterpart first, so that all inputs
        can be read uniformly from the underscore-prefixed attributes.
        """
        # Add these as properties later:
        for name in self._PLAIN_NAMES:
            setattr(self, f"_{name}", getattr(self, name))
        return {key: self.__dict__[f"_{key}"] for key in self.NAMES}

    @property
    def processId(self):
        """str or None: the WPS process identifier."""
        # Read the backing attribute; returning ``self.processId`` here
        # would re-enter this getter and recurse without end.
        return self._processId

    @processId.setter
    def processId(self, processId):
        self._processId = self._validate_optional(
            processId, str, "processId must be a string"
        )

    @property
    def collection_ids(self):
        """list: single-element list wrapping the assigned collection."""
        return self._collection_ids

    @collection_ids.setter
    def collection_ids(self, collection):
        # The value is always wrapped in a list; assigning None therefore
        # stores [None].
        # NOTE: a dict-valued form ({tag: [collections]}) with a separate
        # set_collection() helper was sketched earlier but is not implemented.
        self._collection_ids = [
            self._validate_optional(
                collection, str, "collection_ids must be a string"
            )
        ]

    @property
    def begin_time(self):
        """datetime.datetime or None: start of the requested interval."""
        return self._begin_time

    @begin_time.setter
    def begin_time(self, begin_time):
        self._begin_time = self._validate_optional(
            begin_time, datetime.datetime, "begin_time must be a datetime"
        )

    @property
    def end_time(self):
        """datetime.datetime or None: end of the requested interval."""
        return self._end_time

    @end_time.setter
    def end_time(self, end_time):
        self._end_time = self._validate_optional(
            end_time, datetime.datetime, "end_time must be a datetime"
        )

    @property
    def filters(self):
        """str or None: the filters, as a string."""
        return self._filters

    @filters.setter
    def filters(self, filters):
        self._filters = self._validate_optional(
            filters, str, "filters must be a string"
        )

    @property
    def response_type(self):
        """str or None: the requested response type."""
        return self._response_type

    @response_type.setter
    def response_type(self, response_type):
        self._response_type = self._validate_optional(
            response_type, str, "response_type must be a string"
        )

    @property
    def aux_type(self):
        """str or None: the auxiliary type."""
        return self._aux_type

    @aux_type.setter
    def aux_type(self, aux_type):
        self._aux_type = self._validate_optional(
            aux_type, str, "aux_type must be a string"
        )
class AeolusRequest(ClientRequest):
    """Handles the requests to and downloads from the server.

    Args:
        url (str): forwarded to ``ClientRequest``
        token (str): forwarded to ``ClientRequest``
        config (str or ClientConfig): forwarded to ``ClientRequest``
        logging_level (str): forwarded to ``ClientRequest``;
            defaults to "NO_LOGGING"
    """

    # WPS process identifier to use for each supported collection.
    _COLLECTION_PROCESS_IDS = {
        "ALD_U_N_1A": "aeolus:level1A",
        "ALD_U_N_1B": "aeolus:level1B",
        "ALD_U_N_2A": "aeolus:level2A",
        "ALD_U_N_2B": "aeolus:level2B",
        "ALD_U_N_2C": "aeolus:level2C",
        "AUX_MRC_1B": "aeolus:level1B:AUX:MRC",
        "AUX_RRC_1B": "aeolus:level1B:AUX:RRC",
        "AUX_ISR_1B": "aeolus:level1B:AUX:ISR",
        "AUX_ZWC_1B": "aeolus:level1B:AUX:ZWC",
        "AUX_MET_12": "aeolus:AUX:MET",
    }

    def __init__(self, url=None, token=None, config=None, logging_level="NO_LOGGING"):
        super().__init__(url, token, config, logging_level, server_type="Aeolus")
        # self._available = self._set_available_data()
        self._request_inputs = AeolusWPSInputs()
        # Default process until set_collection() selects another one
        self._request_inputs.processId = "aeolus:level1B"
        self._templatefiles = TEMPLATE_FILES
        # Range filters keyed by parameter name (see set_range_filter)
        self._filterlist = {}
        self._supported_filetypes = ("nc",)

    def set_collection(self, collection):
        """Select the collection and the process id that belongs to it.

        Args:
            collection (str): one of the keys of ``_COLLECTION_PROCESS_IDS``

        Raises:
            ValueError: if ``collection`` is not a known product
        """
        try:
            process_id = self._COLLECTION_PROCESS_IDS[collection]
        except KeyError:
            raise ValueError("Product not found") from None
        # We set the process id corresponding to the selected collection
        self._request_inputs.processId = process_id
        self._request_inputs.collection_ids = collection

    def available_collections(
        self, collection=None, field_type=None, like=None, details=True
    ):
        """Return the ``CONFIG_AEOLUS`` configuration object.

        Note:
            All arguments are currently ignored.
        """
        return CONFIG_AEOLUS

    def print_available_collections(
        self, collection=None, field_type=None, regex=None, details=True, path=False
    ):
        """Build a DataFrame describing the fields of the known collections.

        The table is assembled from ``CONFIG_AEOLUS["collections"]`` and
        indexed by collection, field type and identifier. Despite the name,
        the table is returned rather than printed; only the "not found"
        notices are printed.

        Note:
            Sets the pandas display options ``display.max_rows`` and
            ``display.max_colwidth`` to ``None`` for the whole session.

        Args:
            collection (str): only keep this collection
            field_type (str): only keep this field type
            regex (str): only keep identifiers matching this pattern
            details (bool): if False, drop all columns (index only)
            path (bool): if False, drop the "path" column

        Returns:
            pandas.DataFrame, or None if the requested ``field_type`` or
            ``collection`` was not found
        """
        pd.set_option("display.max_rows", None)
        pd.set_option("display.max_colwidth", None)

        collection_dfs = []
        collection_names = []
        for c_name, collection_obj in CONFIG_AEOLUS["collections"].items():
            field_dfs = []
            for ft in collection_obj.values():
                fdf = pd.DataFrame(ft).transpose()
                fdf.index.name = "identifier"
                if regex is not None:
                    fdf = fdf[fdf.index.str.contains(regex, regex=True)]
                if path is False:
                    fdf = fdf.drop(columns="path")
                field_dfs.append(fdf)
            ft_df = pd.concat(
                field_dfs, names=["field type"], keys=collection_obj.keys()
            )
            if field_type is not None:
                try:
                    ft_df = ft_df.loc[field_type]
                except KeyError:
                    # This collection has no such field type; leave it out
                    continue
            collection_dfs.append(ft_df)
            collection_names.append(c_name)

        if not collection_dfs:
            print("Passed field_type not found")
            return None

        df = pd.concat(collection_dfs, names=["collection"], keys=collection_names)
        df = df.fillna("-")
        if collection is not None:
            try:
                df = df.loc[collection]
            except KeyError:
                print("Passed collection not found")
                return None
        if not details:
            # Keep the index only. An explicit empty ``items`` list is used
            # because a bare string is not a valid ``items`` collection.
            df = df.filter(items=[])
        return df

    def set_bbox(self, bbox=None):
        """Set a bounding box to apply as filter.

        Note:
            Dictionary argument has to contain n, e, s, w keys for
            north, east, south and west values as EPSG 4326 coordinates

            A falsy ``bbox`` (None or empty) leaves the current value
            untouched.

        Args:
            bbox (dict)
        """
        if bbox:
            self._request_inputs.bbox = bbox

    @staticmethod
    def _join_fields(names):
        """Return ``names`` as one comma-separated string.

        A plain string is returned unchanged; joining it would split it
        into single characters.
        """
        if isinstance(names, str):
            return names
        return ",".join(names)

    def set_fields(
        self,
        observation_fields=None,
        measurement_fields=None,
        ica_fields=None,
        sca_fields=None,
        mca_fields=None,
        mie_profile_fields=None,
        rayleigh_profile_fields=None,
        rayleigh_wind_fields=None,
        mie_wind_fields=None,
        rayleigh_grouping_fields=None,
        mie_grouping_fields=None,
        group_fields=None,
        fields=None,
    ):
        """Set the fields to request, one argument per field group.

        Each argument is an iterable of field names (or a single string)
        and is stored on the request inputs as a comma-separated string.
        Arguments that are None or empty leave the stored value unchanged.
        """
        requested = {
            "observation_fields": observation_fields,
            "measurement_fields": measurement_fields,
            "ica_fields": ica_fields,
            "mca_fields": mca_fields,
            "sca_fields": sca_fields,
            "mie_profile_fields": mie_profile_fields,
            "rayleigh_profile_fields": rayleigh_profile_fields,
            "rayleigh_wind_fields": rayleigh_wind_fields,
            "rayleigh_grouping_fields": rayleigh_grouping_fields,
            "mie_grouping_fields": mie_grouping_fields,
            "mie_wind_fields": mie_wind_fields,
            "group_fields": group_fields,
            "fields": fields,
        }
        for input_name, names in requested.items():
            if names:
                setattr(self._request_inputs, input_name, self._join_fields(names))

    def set_variables(self, aux_type=None, fields=None, dsd_info=False):
        """Set ``aux_type``, ``fields`` and ``dsd_info`` on the request inputs.

        Note:
            All three values are always overwritten, including with the
            defaults when an argument is omitted.

        Args:
            aux_type (str)
            fields: stored as given, without joining
            dsd_info (bool)
        """
        self._request_inputs.aux_type = aux_type
        self._request_inputs.fields = fields
        self._request_inputs.dsd_info = dsd_info

    def set_range_filter(self, parameter=None, minimum=None, maximum=None):
        """Set a filter to apply.

        Filters data for minimum ≤ parameter ≤ maximum

        Note:
            Apply multiple filters with successive calls to set_range_filter()

        Args:
            parameter (str)
            minimum (float)
            maximum (float)

        Returns:
            AeolusRequest: this object, so calls can be chained

        Raises:
            TypeError: if ``parameter`` is not a str
        """
        if not isinstance(parameter, str):
            raise TypeError("parameter must be a str")
        # Update filter dictionary
        self._filterlist[parameter] = {"min": minimum, "max": maximum}
        # Update the inputs object with dictionary converted
        # to JSON used by XML template
        self._request_inputs.filters = json.dumps(self._filterlist)
        return self

    def clear_range_filter(self):
        """Remove all applied filters.

        Returns:
            AeolusRequest: this object, so calls can be chained
        """
        # Must stay a dict: set_range_filter() assigns into it by key
        self._filterlist = {}
        self._request_inputs.filters = None
        return self

    def get_from_file(self, path=None, filetype="nc"):
        """Get VirES ReturnedData object from file path

        Allows loading of locally saved netCDF file (e.g. using to_file method)
        providing access to data manipulation methods such as as_xarray

        Args:
            path (str)
            filetype (str)

        Returns:
            ReturnedDataFile: object whose underlying file name is ``path``

        Raises:
            NotImplementedError: if ``filetype`` is not "nc"
        """
        if filetype != "nc":
            raise NotImplementedError(
                "Currently only loading of netCDF files is supported"
            )
        df = ReturnedDataFile(filetype=filetype)
        # Point the returned object at the existing local file
        df._file.name = path
        return df