Source code for flyingpigeon.processes.wps_pointinspection

import logging

from flyingpigeon.log import init_process_logger
from flyingpigeon.ocgis_module import call
from flyingpigeon.utils import archive, archiveextract
from flyingpigeon.utils import rename_complexinputs
from flyingpigeon.utils import sort_by_filename, get_values, get_time
from numpy import savetxt, column_stack
from pywps import ComplexInput, ComplexOutput
from pywps import Format
from pywps import LiteralInput
from pywps import Process
from pywps.app.Common import Metadata
from shapely.geometry import Point

LOGGER = logging.getLogger("PYWPS")


[docs]class PointinspectionProcess(Process): """ TODO: optionally provide point list as file (csv, geojson) and WFS service """ def __init__(self): inputs = [ ComplexInput('resource', 'Resource', abstract='NetCDF Files or archive (tar/zip) containing NetCDF files.', metadata=[Metadata('Info')], min_occurs=1, max_occurs=1000, supported_formats=[ Format('application/x-netcdf'), Format('application/x-tar'), Format('application/zip'), ]), LiteralInput("coords", "Coordinates", abstract="A comma-seperated tuple of WGS85 lon,lat decimal coordinates (e.g. 2.356138, 48.846450)", # noqa default="2.356138, 48.846450", data_type='string', min_occurs=1, max_occurs=100, ), ] outputs = [ ComplexOutput('output_log', 'Logging information', abstract="Collected logs during process run.", as_reference=True, supported_formats=[Format('text/plain')] ), ComplexOutput('tarout', 'Subsets', abstract="Tar archive containing the netCDF files", as_reference=True, supported_formats=[Format('application/x-tar')] ), ] super(PointinspectionProcess, self).__init__( self._handler, identifier="pointinspection", title="Point Inspection", abstract='Extract the timeseries at the given coordinates.', version="0.10", metadata=[ Metadata('LSCE', 'http://www.lsce.ipsl.fr/en/index.php'), Metadata('Doc', 'http://flyingpigeon.readthedocs.io/en/latest/'), ], inputs=inputs, outputs=outputs, status_supported=True, store_supported=True, ) def _handler(self, request, response): init_process_logger('log.txt') response.outputs['output_log'].file = 'log.txt' ncs = archiveextract( resource=rename_complexinputs(request.inputs['resource'])) LOGGER.info("ncs: %s " % ncs) coords = [] for coord in request.inputs['coords']: coords.append(coord.data) LOGGER.info("coords %s", coords) filenames = [] nc_exp = sort_by_filename(ncs, historical_concatination=True) for key in nc_exp.keys(): try: LOGGER.info('start calculation for %s ' % key) ncs = nc_exp[key] times = get_time(ncs) # , format='%Y-%m-%d_%H:%M:%S') concat_vals = times # ['%s-%02d-%02d_%02d:%02d:%02d' % # (t.year, t.month, t.day, t.hour, t.minute, t.second) for t in times] header = 'date_time' filename = '%s.csv' % key filenames.append(filename) for p in coords: try: response.update_status('processing point : {0}'.format(p), 20) # define the point: p = p.split(',') point = Point(float(p[0]), float(p[1])) # get the values timeseries = call(resource=ncs, geom=point, select_nearest=True) vals = get_values(timeseries) # concatenation of values header = header + ',%s-%s' % (p[0], p[1]) concat_vals = column_stack([concat_vals, vals]) except Exception as e: LOGGER.debug('failed for point %s %s' % (p, e)) response.update_status('*** all points processed for {0} ****'.format(key), 50) savetxt(filename, concat_vals, fmt='%s', delimiter=',', header=header) except Exception as e: LOGGER.debug('failed for %s %s' % (key, e)) # set the outputs response.update_status('*** creating output tar archive ****', 90) tarout_file = archive(filenames) response.outputs['tarout'].file = tarout_file return response