import json

from pywps import Process
from pywps import LiteralInput
from pywps import ComplexOutput
from pywps import Format
from pywps.app.Common import Metadata

from malleefowl.esgf.search import ESGSearch

import logging
LOGGER = logging.getLogger(__name__)


class ESGSearchProcess(Process):
    """
    WPS wrapper for ESGF search.

    TODO: bbox constraint for datasets
    """

    def __init__(self):
        inputs = [
            LiteralInput('url', 'URL',
                         data_type='string',
                         abstract="URL of the ESGF search index. Example: http://esgf-data.dkrz.de/esg-search",
                         min_occurs=1,
                         max_occurs=1,
                         default="http://esgf-data.dkrz.de/esg-search",
                         ),
            LiteralInput('distrib', 'Distributed',
                         data_type='boolean',
                         abstract="If flag is set, a distributed search will be run.",
                         min_occurs=0,
                         max_occurs=1,
                         default='false',
                         ),
            LiteralInput('replica', 'Replica',
                         data_type='boolean',
                         abstract="If flag is set, the search will include replicated datasets.",
                         min_occurs=0,
                         max_occurs=1,
                         default='false',
                         ),
            LiteralInput('latest', 'Latest',
                         data_type='boolean',
                         abstract="If flag is set, the search will include only the latest datasets.",
                         min_occurs=0,
                         max_occurs=1,
                         default='true',
                         ),
            LiteralInput('temporal', 'Temporal',
                         data_type='boolean',
                         abstract="If flag is set, the search will use the temporal filter.",
                         min_occurs=0,
                         max_occurs=1,
                         default='true',
                         ),
            LiteralInput('search_type', 'Search Type',
                         data_type='string',
                         abstract="Search on Datasets, Files or Aggregations.",
                         min_occurs=0,
                         max_occurs=1,
                         default='Dataset',
                         allowed_values=['Dataset', 'File', 'Aggregation']
                         ),
            LiteralInput('constraints', 'Constraints',
                         data_type='string',
                         abstract="Constraints as a list of key/value pairs. "
                                  "Example: project:CORDEX, time_frequency:mon, variable:tas",
                         min_occurs=1,
                         max_occurs=1,
                         default="project:CORDEX, time_frequency:mon, variable:tas",
                         ),
            LiteralInput('query', 'Query',
                         data_type='string',
                         abstract="Freetext query. Example: temperature",
                         min_occurs=0,
                         max_occurs=1,
                         default='*',
                         ),
            LiteralInput('start', 'Start',
                         data_type='dateTime',
                         abstract="Start time. Example: 2000-01-11T12:00:00Z",
                         min_occurs=0,
                         max_occurs=1,
                         ),
            LiteralInput('end', 'End',
                         data_type='dateTime',
                         abstract="End time. Example: 2005-12-31T12:00:00Z",
                         min_occurs=0,
                         max_occurs=1,
                         ),
            LiteralInput('limit', 'Limit',
                         data_type='integer',
                         abstract="Maximum number of datasets in the search result.",
                         min_occurs=0,
                         max_occurs=1,
                         default='10',
                         allowed_values=[0, 1, 2, 5, 10, 20, 50, 100, 200]
                         ),
            LiteralInput('offset', 'Offset',
                         data_type='integer',
                         abstract="Start the search at this offset.",
                         min_occurs=0,
                         max_occurs=1,
                         default='0',
                         ),
        ]
        outputs = [
            ComplexOutput('output', 'Search Result',
                          abstract="JSON document with search result",
                          as_reference=True,
                          supported_formats=[Format('application/json')]),
            ComplexOutput('summary', 'Search Result Summary',
                          abstract="JSON document with search result summary",
                          as_reference=True,
                          supported_formats=[Format('application/json')]),
            ComplexOutput('facet_counts', 'Facet Counts',
                          abstract="JSON document with facet counts for constraints.",
                          as_reference=True,
                          supported_formats=[Format('application/json')]),
        ]
        super(ESGSearchProcess, self).__init__(
            self._handler,
            identifier="esgsearch",
            title="ESGF Search",
            version="0.6",
            abstract="Search ESGF datasets, files and aggregations.",
            metadata=[
                Metadata('Birdhouse', 'http://bird-house.github.io/'),
                Metadata('User Guide', 'http://malleefowl.readthedocs.io/en/latest/'),
            ],
            inputs=inputs,
            outputs=outputs,
            status_supported=True,
            store_supported=True,
        )
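
    # A usage sketch: an HTTP Execute request in KVP encoding, assuming this
    # process is deployed behind a PyWPS endpoint (the host, port and URL
    # encoding below are illustrative, not part of this module):
    #
    #   http://localhost:8094/wps?service=WPS&version=1.0.0&request=Execute
    #       &identifier=esgsearch
    #       &DataInputs=constraints=project:CORDEX,time_frequency:mon,variable:tas;limit=5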

    def _handler(self, request, response):
        distrib = False
        if 'distrib' in request.inputs:
            distrib = request.inputs['distrib'][0].data
        replica = False
        if 'replica' in request.inputs:
            replica = request.inputs['replica'][0].data
        latest = True
        if 'latest' in request.inputs:
            latest = request.inputs['latest'][0].data
        esgsearch = ESGSearch(
            url=request.inputs['url'][0].data,
            distrib=distrib,
            replica=replica,
            latest=latest,
        )
        constraints_str = request.inputs['constraints'][0].data.strip()
        constraints = []
        for constraint in constraints_str.split(','):
            # split on the first ':' only, so facet values may contain colons
            key, value = constraint.split(':', 1)
            constraints.append((key.strip(), value.strip()))
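        # e.g. the default "project:CORDEX, time_frequency:mon, variable:tas"
        # parses to [('project', 'CORDEX'), ('time_frequency', 'mon'),
        # ('variable', 'tas')]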
        if 'start' in request.inputs:
            start = request.inputs['start'][0].data
        else:
            start = None
        if 'end' in request.inputs:
            end = request.inputs['end'][0].data
        else:
            end = None
        if 'offset' in request.inputs:
            offset = request.inputs['offset'][0].data
        else:
            offset = 0
        if 'limit' in request.inputs:
            limit = request.inputs['limit'][0].data
        else:
            limit = 10
        if 'query' in request.inputs:
            query = request.inputs['query'][0].data
        else:
            query = '*'
        if 'search_type' in request.inputs:
            search_type = request.inputs['search_type'][0].data
        else:
            search_type = 'Dataset'
        temporal = True
        if 'temporal' in request.inputs:
            temporal = request.inputs['temporal'][0].data
        (result, summary, facet_counts) = esgsearch.search(
            constraints=constraints,
            query=query,
            start=start, end=end,
            search_type=search_type,
            limit=limit,
            offset=offset,
            temporal=temporal)
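        # Assumption (see malleefowl.esgf.search for the exact structure):
        # `result` lists the matched dataset/file entries, `summary` holds
        # aggregate numbers for the search, and `facet_counts` maps each
        # facet to its value counts.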
        # Relative paths resolve against the current working directory (the
        # process workdir in a typical PyWPS deployment). Assign the output
        # file after the `with` block so the content is flushed and closed.
        with open('out.json', 'w') as fp:
            json.dump(obj=result, fp=fp, indent=4, sort_keys=True)
        response.outputs['output'].file = fp.name
        with open('summary.json', 'w') as fp:
            json.dump(obj=summary, fp=fp, indent=4, sort_keys=True)
        response.outputs['summary'].file = fp.name
        with open('counts.json', 'w') as fp:
            json.dump(obj=facet_counts, fp=fp, indent=4, sort_keys=True)
        response.outputs['facet_counts'].file = fp.name
        return response
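
# A minimal client-side sketch (assumptions: a WPS serving this process at the
# URL below and the `birdy` client installed; neither ships with this module):
#
#     from birdy import WPSClient
#     wps = WPSClient('http://localhost:8094/wps')
#     resp = wps.esgsearch(constraints='project:CORDEX, variable:tas', limit=5)
#     output_url, summary_url, facet_counts_url = resp.get()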