# Source code for flyingpigeon.processes.wps_ocgis_func

"""
The idea with this set of processes is to create one individual process for
each function. The advantage of this approach over parameterizing a single
generic process is that it allows platforms to parse the metadata of each
function and find those matching user search criteria.

These generic processes apply to a full dataset; that is, we assume the data
has been spatially and temporally cropped beforehand.


TODO: Add keywords to each function description once pyWPS implements support
for it.

Author: David Huard, Ouranos, 2017
"""

import logging
from collections import OrderedDict

from flyingpigeon.log import init_process_logger
from flyingpigeon.utils import GROUPING
from flyingpigeon.utils import archiveextract
from flyingpigeon.utils import rename_complexinputs
from ocgis.calc.library import register
from ocgis.contrib import library_icclim as libclim
from pywps import ComplexInput, ComplexOutput
from pywps import Format
from pywps import LiteralInput
from pywps import Process
from pywps.app.Common import Metadata

LOGGER = logging.getLogger("PYWPS")

# Register ocgis functions, including icclim
fr = register.FunctionRegistry()
register.register_icclim(fr)
icclim_classes = [k for k in fr.keys() if isinstance(k, str) and k.startswith('icclim')]
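# Illustrative note (an assumption about the registry contents, not verified
# here): the string keys contributed by ICCLIM typically look like
# 'icclim_TG', 'icclim_SU' or 'icclim_FD', so ``icclim_classes`` ends up as a
# list of such keys.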


class IndicatorProcess(Process, object):
    """A Process class wrapping OCGIS functions."""
    key = 'to_be_subclassed'
    version = '1.0'

    #################
    # Common inputs #
    #################
    resource_inputs = [
        ComplexInput('resource', 'Resource',
                     abstract='NetCDF Files or archive (tar/zip) containing netCDF files.',
                     metadata=[Metadata('Info')],
                     min_occurs=1,
                     max_occurs=1000,
                     supported_formats=[
                         Format('application/x-netcdf'),
                         Format('application/x-tar'),
                         Format('application/zip'),
                     ]), ]

    option_inputs = [
        LiteralInput("grouping", "Grouping",
                     abstract="Temporal group over which the index is computed.",
                     default='yr',
                     data_type='string',
                     min_occurs=0,
                     max_occurs=1,  # len(GROUPING),
                     allowed_values=GROUPING
                     ), ]

    ############################
    # Function-specific inputs #
    ############################
    extra_inputs = []

    ##################
    # Common outputs #
    ##################
    outputs = [
        ComplexOutput('output_netcdf', 'Function output in netCDF',
                      abstract="The indicator values computed on the original input grid.",
                      as_reference=True,
                      supported_formats=[Format('application/x-netcdf')]
                      ),

        ComplexOutput('output_log', 'Logging information',
                      abstract="Collected logs during process run.",
                      as_reference=True,
                      supported_formats=[Format('text/plain')]),
    ]

    def __init__(self):
        self.load_meta()
        super(IndicatorProcess, self).__init__(
            self._handler,
            identifier=self.identifier,
            title=self.title,
            abstract=self.abstract,
            inputs=self.resource_inputs + self.option_inputs + self.extra_inputs,
            outputs=self.outputs,
            status_supported=True,
            store_supported=True,
        )

    def load_meta(self):
        """Extract process meta data from underlying object."""
        self.ocgis_cls = fr[self.key]
        self.identifier = self.ocgis_cls.key
        self.title = self.ocgis_cls.long_name
        self.abstract = self.ocgis_cls.description

    def _resource_input_handler(self, request):
        out = OrderedDict()

        for obj in self.resource_inputs:
            key = obj.identifier
            out[key] = archiveextract(resource=rename_complexinputs(
                request.inputs[key]))
        return out

    def _option_input_handler(self, request):
        from flyingpigeon.utils import calc_grouping
        out = {'calc_grouping': None}

        for obj in self.option_inputs:
            key = obj.identifier
            val = request.inputs[key][0].data

            if key == 'grouping':
                out['calc_grouping'] = calc_grouping(val)
            else:
                out[key] = val

        return out
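
    # Illustrative expectation (an assumption about flyingpigeon.utils.calc_grouping):
    # a 'yr' grouping translates to ['year'] and 'mon' to ['month'], matching
    # the ``calc_grouping`` argument of OcgOperations.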

    def _extra_input_handler(self, request):
        out = {}

        for obj in self.extra_inputs:
            key = obj.identifier
            out[key] = request.inputs[key][0].data
        return out

    def _handler(self, request, response):

        init_process_logger('log.txt')
        response.outputs['output_log'].file = 'log.txt'

        ######################################
        # Process inputs
        ######################################

        try:
            resources = self._resource_input_handler(request)
            options = self._option_input_handler(request)
            extras = self._extra_input_handler(request)

        except Exception as e:
            msg = 'Failed to read input parameters: {}'.format(e)
            LOGGER.error(msg)
            raise Exception(msg)

        response.update_status('Input parameters ingested', 2)

        ######################################
        # Call ocgis function
        ######################################
        # Multivariate functions expect a mapping from each required variable
        # name to the corresponding variable name in the dataset; here both
        # are assumed to be identical.
        if getattr(self, 'has_required_variables', None):
            extras.update({k: k for k in resources.keys()})

        output = run_op(resource=resources,
                        calc=[{'func': self.identifier,
                               'name': self.identifier,
                               'kwds': extras}],
                        options=options)

        response.outputs['output_netcdf'].file = output

        response.update_status('Execution completed', 100)

        return response


def run_op(resource, calc, options):
    """Create an OCGIS operation, launch it and return the results."""
    from os.path import abspath, curdir
    from ocgis import OcgOperations, RequestDataset, env
    import uuid

    LOGGER.info('Starting OCGIS operation')

    # Prepare the environment
    env.OVERWRITE = True
    dir_output = abspath(curdir)

    prefix = str(uuid.uuid1())
    env.PREFIX = prefix

    # For the generic 'resource' input, let OCGIS infer the variable name;
    # for multivariate inputs, the input key is the required variable name.
    rd = [RequestDataset(val, variable=key if key != 'resource' else None)
          for key, val in resource.items()]

    ops = OcgOperations(dataset=rd,
                        calc=calc,
                        calc_grouping=options['calc_grouping'],
                        dir_output=dir_output,
                        prefix=prefix,
                        add_auxiliary_files=False,
                        output_format='nc')

    return ops.execute()
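

# A minimal usage sketch (illustrative only; the file name, function key and
# grouping below are assumptions, not values used by this module):
#
#     out = run_op(resource={'resource': 'tas_day_example.nc'},
#                  calc=[{'func': 'freezethaw', 'name': 'freezethaw',
#                         'kwds': {'threshold': 15.0}}],
#                  options={'calc_grouping': ['year']})
#
# With output_format='nc', ``ops.execute()`` returns the path to the netCDF
# file written under ``dir_output``.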


#############################################
#          Custom class definitions         #
#############################################


class FreezeThawProcess(IndicatorProcess):
    key = 'freezethaw'
    extra_inputs = [
        LiteralInput("threshold", "Threshold",
                     abstract="The number of degree-days above or below the freezing point after which "
                              "the ground is considered frozen or thawed.",
                     data_type='float',
                     default=15.0,
                     min_occurs=0,
                     max_occurs=1), ]


class Duration(IndicatorProcess):
    key = 'duration'
    extra_inputs = [
        LiteralInput("threshold", "Threshold",
                     abstract="The threshold value to use for the logical operation.",
                     data_type='float',
                     min_occurs=1,
                     max_occurs=1),

        LiteralInput("operation", "Operation",
                     abstract="The logical operation. One of 'gt', 'gte', 'lt', or 'lte'.",
                     data_type='string',
                     min_occurs=1,
                     max_occurs=1),

        LiteralInput("summary", "Summary",
                     abstract="The summary operation to apply to the durations. One of 'mean', "
                              "'median', 'std', 'max', or 'min'.",
                     data_type='string',
                     default='mean',
                     min_occurs=0,
                     max_occurs=1), ]


#############################################
#     Automated ICCLIM class definitions    #
#############################################

# TODO: Implement check to make sure that the data is daily ?


class ICCLIMProcess(IndicatorProcess):
    """Process class instantiated using definitions from the ICCLIM library."""

    def load_meta(self):
        """Extract process meta data from underlying object."""
        self.icclim_func = libclim._icclim_function_map[self.key]['func']
        doc = self.icclim_func.func_doc

        self.ocgis_cls = fr[self.key]
        self.identifier = self.ocgis_cls.key
        self.title = self.ocgis_cls.key.split('_')[1]
        self.abstract = doc.split('\n')[1].strip()

        self.has_required_variables = hasattr(self.ocgis_cls, 'required_variables')
        if self.has_required_variables:
            # Replace the generic resource input by one input per required variable.
            self.resource_inputs = []
            for key in self.ocgis_cls.required_variables:
                self.resource_inputs.append(
                    ComplexInput(key, key,
                                 abstract='NetCDF Files or archive (tar/zip) containing netCDF files.',
                                 metadata=[Metadata('Info')],
                                 min_occurs=1,
                                 max_occurs=1000,
                                 supported_formats=[
                                     Format('application/x-netcdf'),
                                     Format('application/x-tar'),
                                     Format('application/zip'),
                                 ]))


def create_icclim_process_class(key):
    """Create a subclass of ICCLIMProcess for a given indicator."""
    name = key.upper() + 'Process'
    clazz = type(name, (ICCLIMProcess,), {'key': key, '__name__': name})
    return clazz


ICCLIM_PROCESSES = [create_icclim_process_class(key) for key in icclim_classes]

OCGIS_INDEX_PROCESSES = [FreezeThawProcess, Duration] + ICCLIM_PROCESSES

__all__ = [c.__name__ for c in OCGIS_INDEX_PROCESSES] + ['OCGIS_INDEX_PROCESSES']

# Add the generated classes to the module namespace so they can be imported by name.
for c in ICCLIM_PROCESSES:
    globals()[c.__name__] = c
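

# Usage sketch (hypothetical wiring, not part of this module): a PyWPS service
# would typically instantiate these classes, e.g.
#
#     from pywps import Service
#     service = Service(processes=[cls() for cls in OCGIS_INDEX_PROCESSES])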