import logging
from flyingpigeon.log import init_process_logger
from flyingpigeon.ocgis_module import call
from flyingpigeon.utils import archive, archiveextract
from flyingpigeon.utils import rename_complexinputs
from flyingpigeon.utils import sort_by_filename, get_values, get_time
from numpy import savetxt, column_stack
from pywps import ComplexInput, ComplexOutput
from pywps import Format
from pywps import LiteralInput
from pywps import Process
from pywps.app.Common import Metadata
from shapely.geometry import Point
LOGGER = logging.getLogger("PYWPS")
[docs]class PointinspectionProcess(Process):
"""
TODO: optionally provide point list as file (csv, geojson) and WFS service
"""
def __init__(self):
inputs = [
ComplexInput('resource', 'Resource',
abstract='NetCDF Files or archive (tar/zip) containing NetCDF files.',
metadata=[Metadata('Info')],
min_occurs=1,
max_occurs=1000,
supported_formats=[
Format('application/x-netcdf'),
Format('application/x-tar'),
Format('application/zip'),
]),
LiteralInput("coords", "Coordinates",
abstract="A comma-seperated tuple of WGS85 lon,lat decimal coordinates (e.g. 2.356138, 48.846450)",
# noqa
default="2.356138, 48.846450",
data_type='string',
min_occurs=1,
max_occurs=100,
),
]
outputs = [
ComplexOutput('output_log', 'Logging information',
abstract="Collected logs during process run.",
as_reference=True,
supported_formats=[Format('text/plain')]
),
ComplexOutput('tarout', 'Subsets',
abstract="Tar archive containing the netCDF files",
as_reference=True,
supported_formats=[Format('application/x-tar')]
),
]
super(PointinspectionProcess, self).__init__(
self._handler,
identifier="pointinspection",
title="Point Inspection",
abstract='Extract the timeseries at the given coordinates.',
version="0.10",
metadata=[
Metadata('LSCE', 'http://www.lsce.ipsl.fr/en/index.php'),
Metadata('Doc', 'http://flyingpigeon.readthedocs.io/en/latest/'),
],
inputs=inputs,
outputs=outputs,
status_supported=True,
store_supported=True,
)
def _handler(self, request, response):
init_process_logger('log.txt')
response.outputs['output_log'].file = 'log.txt'
ncs = archiveextract(
resource=rename_complexinputs(request.inputs['resource']))
LOGGER.info("ncs: %s " % ncs)
coords = []
for coord in request.inputs['coords']:
coords.append(coord.data)
LOGGER.info("coords %s", coords)
filenames = []
nc_exp = sort_by_filename(ncs, historical_concatination=True)
for key in nc_exp.keys():
try:
LOGGER.info('start calculation for %s ' % key)
ncs = nc_exp[key]
times = get_time(ncs) # , format='%Y-%m-%d_%H:%M:%S')
concat_vals = times # ['%s-%02d-%02d_%02d:%02d:%02d' %
# (t.year, t.month, t.day, t.hour, t.minute, t.second) for t in times]
header = 'date_time'
filename = '%s.csv' % key
filenames.append(filename)
for p in coords:
try:
response.update_status('processing point : {0}'.format(p), 20)
# define the point:
p = p.split(',')
point = Point(float(p[0]), float(p[1]))
# get the values
timeseries = call(resource=ncs, geom=point, select_nearest=True)
vals = get_values(timeseries)
# concatenation of values
header = header + ',%s-%s' % (p[0], p[1])
concat_vals = column_stack([concat_vals, vals])
except Exception as e:
LOGGER.debug('failed for point %s %s' % (p, e))
response.update_status('*** all points processed for {0} ****'.format(key), 50)
savetxt(filename, concat_vals, fmt='%s', delimiter=',', header=header)
except Exception as e:
LOGGER.debug('failed for %s %s' % (key, e))
# set the outputs
response.update_status('*** creating output tar archive ****', 90)
tarout_file = archive(filenames)
response.outputs['tarout'].file = tarout_file
return response