Source code for hydpy.cythons.modelutils

# -*- coding: utf-8 -*-
""" This module provides utilities to build Cython models based on Python models
automatically.

.. _`issue`: https://github.com/hydpy-dev/hydpy/issues

Most model developers do not need to be aware of the features implemented in module
|modelutils|, except that they need to initialise class |Cythonizer| within the main
modules of their base and application models (see, for example, the source code of base
model |hland| and application model |hland_96|).

However, when implementing models with functionalities not envisaged so far, problems
might arise.  Please contact the *HydPy* developer team then, preferably by opening an
`issue`_ on GitHub.  Potentially, problems could occur when defining parameters or
sequences with larger dimensionality than anticipated.  The following example shows the
Cython code lines for the |ELSModel.get_point_states| method of class |ELSModel|, used
for deriving the |test| model.  By now, we did only implement 0-dimensional and
1-dimensional sequences requiring this method.  After hackishly changing the
dimensionality of sequences |test_states.S|, we still seem to get plausible results,
but these are untested in model applications:

>>> from hydpy.models.test import cythonizer
>>> pyxwriter = cythonizer.pyxwriter
>>> from hydpy.cythons.modelutils import PyxPxdLines
>>> lines = PyxPxdLines()
>>> pyxwriter.get_point_states(lines)
            . get_point_states
>>> lines.pyx  # doctest: +ELLIPSIS
    cpdef inline void get_point_states(self) noexcept nogil:
        cdef ...int... idx0
        self.sequences.states.s = \
self.sequences.states._s_points[self.numvars.idx_stage]
        for idx0 in range(self.sequences.states._sv_length):
            self.sequences.states.sv[idx0] = \
self.sequences.states._sv_points[self.numvars.idx_stage][idx0]
<BLANKLINE>

>>> pyxwriter.model.sequences.states.s.NDIM = 2
>>> lines.pyx.clear()
>>> pyxwriter.get_point_states(lines)
            . get_point_states
>>> lines.pyx  # doctest: +ELLIPSIS
    cpdef inline void get_point_states(self) noexcept nogil:
        cdef ...int... idx0, idx1
        for idx0 in range(self.sequences.states._s_length0):
            for idx1 in range(self.sequences.states._s_length1):
                self.sequences.states.s[idx0, idx1] = \
self.sequences.states._s_points[self.numvars.idx_stage][idx0, idx1]
        for idx0 in range(self.sequences.states._sv_length):
            self.sequences.states.sv[idx0] = \
self.sequences.states._sv_points[self.numvars.idx_stage][idx0]
<BLANKLINE>

>>> pyxwriter.model.sequences.states.s.NDIM = 3
>>> pyxwriter.get_point_states(lines)
Traceback (most recent call last):
...
NotImplementedError: NDIM of sequence `s` is higher than expected.

The following examples show the results for some methods which are also related to
numerical integration but deal with |FluxSequence| objects.  We start with the method
|ELSModel.integrate_fluxes|:

>>> lines.pyx.clear()
>>> pyxwriter.integrate_fluxes(lines)
            . integrate_fluxes
>>> lines.pyx  # doctest: +ELLIPSIS
    cpdef inline void integrate_fluxes(self) noexcept nogil:
        cdef ...int... jdx, idx0
        self.sequences.fluxes.q = 0.
        for jdx in range(self.numvars.idx_method):
            self.sequences.fluxes.q = \
self.sequences.fluxes.q +self.numvars.dt * \
self.numconsts.a_coefs[self.numvars.idx_method-1, \
self.numvars.idx_stage, jdx]*self.sequences.fluxes._q_points[jdx]
        for idx0 in range(self.sequences.fluxes._qv_length):
            self.sequences.fluxes.qv[idx0] = 0.
            for jdx in range(self.numvars.idx_method):
                self.sequences.fluxes.qv[idx0] = \
self.sequences.fluxes.qv[idx0] + self.numvars.dt * \
self.numconsts.a_coefs[self.numvars.idx_method-1, self.numvars.idx_stage, jdx]*\
self.sequences.fluxes._qv_points[jdx, idx0]
<BLANKLINE>


>>> pyxwriter.model.sequences.fluxes.q.NDIM = 2
>>> lines.pyx.clear()
>>> pyxwriter.integrate_fluxes(lines)
            . integrate_fluxes
>>> lines.pyx  # doctest: +ELLIPSIS
    cpdef inline void integrate_fluxes(self) noexcept nogil:
        cdef ...int... jdx, idx0, idx1
        for idx0 in range(self.sequences.fluxes._q_length0):
            for idx1 in range(self.sequences.fluxes._q_length1):
                self.sequences.fluxes.q[idx0, idx1] = 0.
                for jdx in range(self.numvars.idx_method):
                    self.sequences.fluxes.q[idx0, idx1] = \
self.sequences.fluxes.q[idx0, idx1] + self.numvars.dt * \
self.numconsts.a_coefs[self.numvars.idx_method-1, self.numvars.idx_stage, jdx]*\
self.sequences.fluxes._q_points[jdx, idx0, idx1]
        for idx0 in range(self.sequences.fluxes._qv_length):
            self.sequences.fluxes.qv[idx0] = 0.
            for jdx in range(self.numvars.idx_method):
                self.sequences.fluxes.qv[idx0] = \
self.sequences.fluxes.qv[idx0] + self.numvars.dt * \
self.numconsts.a_coefs[self.numvars.idx_method-1, self.numvars.idx_stage, jdx]\
*self.sequences.fluxes._qv_points[jdx, idx0]
<BLANKLINE>

>>> pyxwriter.model.sequences.fluxes.q.NDIM = 3
>>> pyxwriter.integrate_fluxes(lines)
Traceback (most recent call last):
...
NotImplementedError: NDIM of sequence `q` is higher than expected.

Method |ELSModel.reset_sum_fluxes|:

>>> pyxwriter.model.sequences.fluxes.q.NDIM = 0
>>> lines.pyx.clear()
>>> pyxwriter.reset_sum_fluxes(lines)
            . reset_sum_fluxes
>>> lines.pyx  # doctest: +ELLIPSIS
    cpdef inline void reset_sum_fluxes(self) noexcept nogil:
        cdef ...int... idx0
        self.sequences.fluxes._q_sum = 0.
        for idx0 in range(self.sequences.fluxes._qv_length):
            self.sequences.fluxes._qv_sum[idx0] = 0.
<BLANKLINE>

>>> pyxwriter.model.sequences.fluxes.q.NDIM = 2
>>> lines.pyx.clear()
>>> pyxwriter.reset_sum_fluxes(lines)
            . reset_sum_fluxes
>>> lines.pyx  # doctest: +ELLIPSIS
    cpdef inline void reset_sum_fluxes(self) noexcept nogil:
        cdef ...int... idx0, idx1
        for idx0 in range(self.sequences.fluxes._q_length0):
            for idx1 in range(self.sequences.fluxes._q_length1):
                self.sequences.fluxes._q_sum[idx0, idx1] = 0.
        for idx0 in range(self.sequences.fluxes._qv_length):
            self.sequences.fluxes._qv_sum[idx0] = 0.
<BLANKLINE>

>>> pyxwriter.model.sequences.fluxes.q.NDIM = 3
>>> pyxwriter.reset_sum_fluxes(lines)
Traceback (most recent call last):
...
NotImplementedError: NDIM of sequence `q` is higher than expected.

Method |ELSModel.addup_fluxes|:

>>> pyxwriter.model.sequences.fluxes.q.NDIM = 0
>>> lines.pyx.clear()
>>> pyxwriter.addup_fluxes(lines)
            . addup_fluxes
>>> lines.pyx  # doctest: +ELLIPSIS
    cpdef inline void addup_fluxes(self) noexcept nogil:
        cdef ...int... idx0
        self.sequences.fluxes._q_sum = \
self.sequences.fluxes._q_sum + self.sequences.fluxes.q
        for idx0 in range(self.sequences.fluxes._qv_length):
            self.sequences.fluxes._qv_sum[idx0] = \
self.sequences.fluxes._qv_sum[idx0] + self.sequences.fluxes.qv[idx0]
<BLANKLINE>

>>> pyxwriter.model.sequences.fluxes.q.NDIM = 2
>>> lines.pyx.clear()
>>> pyxwriter.addup_fluxes(lines)
            . addup_fluxes
>>> lines.pyx  # doctest: +ELLIPSIS
    cpdef inline void addup_fluxes(self) noexcept nogil:
        cdef ...int... idx0, idx1
        for idx0 in range(self.sequences.fluxes._q_length0):
            for idx1 in range(self.sequences.fluxes._q_length1):
                self.sequences.fluxes._q_sum[idx0, idx1] = \
self.sequences.fluxes._q_sum[idx0, idx1] + self.sequences.fluxes.q[idx0, idx1]
        for idx0 in range(self.sequences.fluxes._qv_length):
            self.sequences.fluxes._qv_sum[idx0] = \
self.sequences.fluxes._qv_sum[idx0] + self.sequences.fluxes.qv[idx0]
<BLANKLINE>

>>> pyxwriter.model.sequences.fluxes.q.NDIM = 3
>>> pyxwriter.addup_fluxes(lines)
Traceback (most recent call last):
...
NotImplementedError: NDIM of sequence `q` is higher than expected.

Method |ELSModel.calculate_error|:

>>> pyxwriter.model.sequences.fluxes.q.NDIM = 0
>>> lines.pyx.clear()
>>> pyxwriter.calculate_error(lines)
            . calculate_error
>>> lines.pyx  # doctest: +ELLIPSIS
    cpdef inline void calculate_error(self) noexcept nogil:
        cdef ...int... idx0
        cdef double abserror
        self.numvars.abserror = 0.
        if self.numvars.use_relerror:
            self.numvars.relerror = 0.
        else:
            self.numvars.relerror = inf
        abserror = fabs(\
self.sequences.fluxes._q_results[self.numvars.idx_method]-\
self.sequences.fluxes._q_results[self.numvars.idx_method-1])
        self.numvars.abserror = max(self.numvars.abserror, abserror)
        if self.numvars.use_relerror:
            if self.sequences.fluxes._q_results[self.numvars.idx_method] == 0.:
                self.numvars.relerror = inf
            else:
                self.numvars.relerror = max(self.numvars.relerror, \
fabs(abserror/self.sequences.fluxes._q_results[self.numvars.idx_method]))
        for idx0 in range(self.sequences.fluxes._qv_length):
            abserror = fabs(\
self.sequences.fluxes._qv_results[self.numvars.idx_method, idx0]-\
self.sequences.fluxes._qv_results[self.numvars.idx_method-1, idx0])
            self.numvars.abserror = max(self.numvars.abserror, abserror)
            if self.numvars.use_relerror:
                if self.sequences.fluxes._qv_results\
[self.numvars.idx_method, idx0] == 0.:
                    self.numvars.relerror = inf
                else:
                    self.numvars.relerror = max(self.numvars.relerror, \
fabs(abserror/self.sequences.fluxes._qv_results[self.numvars.idx_method, idx0]))
<BLANKLINE>

>>> pyxwriter.model.sequences.fluxes.q.NDIM = 2
>>> lines.pyx.clear()
>>> pyxwriter.calculate_error(lines)
            . calculate_error
>>> lines.pyx  # doctest: +ELLIPSIS
    cpdef inline void calculate_error(self) noexcept nogil:
        cdef ...int... idx0, idx1
        cdef double abserror
        self.numvars.abserror = 0.
        if self.numvars.use_relerror:
            self.numvars.relerror = 0.
        else:
            self.numvars.relerror = inf
        for idx0 in range(self.sequences.fluxes._q_length0):
            for idx1 in range(self.sequences.fluxes._q_length1):
                abserror = fabs(\
self.sequences.fluxes._q_results[self.numvars.idx_method, idx0, idx1]-\
self.sequences.fluxes._q_results[self.numvars.idx_method-1, idx0, idx1])
                self.numvars.abserror = max(self.numvars.abserror, abserror)
                if self.numvars.use_relerror:
                    if self.sequences.fluxes._q_results\
[self.numvars.idx_method, idx0, idx1] == 0.:
                        self.numvars.relerror = inf
                    else:
                        self.numvars.relerror = max(self.numvars.relerror, fabs(\
abserror/self.sequences.fluxes._q_results[self.numvars.idx_method, idx0, idx1]))
        for idx0 in range(self.sequences.fluxes._qv_length):
            abserror = fabs(\
self.sequences.fluxes._qv_results[self.numvars.idx_method, idx0]-\
self.sequences.fluxes._qv_results[self.numvars.idx_method-1, idx0])
            self.numvars.abserror = max(self.numvars.abserror, abserror)
            if self.numvars.use_relerror:
                if self.sequences.fluxes._qv_results\
[self.numvars.idx_method, idx0] == 0.:
                    self.numvars.relerror = inf
                else:
                    self.numvars.relerror = max(\
self.numvars.relerror, \
fabs(abserror/self.sequences.fluxes._qv_results[self.numvars.idx_method, idx0]))
<BLANKLINE>

>>> pyxwriter.model.sequences.fluxes.q.NDIM = 3
>>> pyxwriter.calculate_error(lines)
Traceback (most recent call last):
...
NotImplementedError: NDIM of sequence `q` is higher than expected.
"""
# import...
# ...from standard library
from __future__ import annotations
import copy

# pylint: enable=no-name-in-module
# pylint: enable=import-error
import functools
import importlib
import inspect
import math
import os
import platform
import shutil
import sys
import types

# ...third party modules
import numpy
from numpy import inf  # pylint: disable=unused-import
from numpy import nan  # pylint: disable=unused-import
import setuptools

# ...from HydPy
import hydpy
from hydpy import config
from hydpy import cythons
from hydpy.core import exceptiontools
from hydpy.core import importtools
from hydpy.core import modeltools
from hydpy.core import objecttools
from hydpy.core import parametertools
from hydpy.core import sequencetools
from hydpy.core import testtools
from hydpy.core.typingtools import *
from hydpy.cythons import autogenpath


if TYPE_CHECKING:
    import Cython.Build as build
else:
    build = exceptiontools.OptionalImport("build", ["Cython.Build"], locals())


[docs] def get_dllextension() -> str: """Return the DLL file extension for the current operating system. The returned value depends on the response of function |platform.system| of module |platform|. |get_dllextension| returns `.pyd` if |platform.system| returns the string "windows" and `.so` for all other strings: >>> from hydpy.cythons.modelutils import get_dllextension >>> import platform >>> from unittest import mock >>> with mock.patch.object( ... platform, "system", side_effect=lambda: "Windows") as mocked: ... get_dllextension() '.pyd' >>> with mock.patch.object( ... platform, "system", side_effect=lambda: "Linux") as mocked: ... get_dllextension() '.so' """ if platform.system().lower() == "windows": return ".pyd" return ".so"
_dllextension = get_dllextension() INT = "numpy.int64_t" TYPE2STR: dict[Union[type[Any], str, None], str] = { # pylint: disable=duplicate-key bool: "numpy.npy_bool", "bool": "numpy.npy_bool", int: INT, "int": INT, parametertools.IntConstant: INT, "parametertools.IntConstant": INT, "IntConstant": INT, float: "double", "float": "double", str: "str", "str": "str", None: "void", "None": "void", type(None): "void", Vector: "double[:]", # to be removed as soon as possible "Vector": "double[:]", "Vector": "double[:]", VectorFloat: "double[:]", # This works because the `__getitem__` # of `_ProtocolMeta` is decorated by `_tp_cache`. I don't know if this caching # is documented behaviour, so this might cause (little) trouble in the future. "VectorFloat": "double[:]", "VectorFloat": "double[:]", } """Maps Python types to Cython compatible type declarations. The Cython type belonging to Python's |int| is selected to agree with numpy's default integer type on the current platform/system. """ _checkable_types: list[type[Any]] = [] for maybe_a_type in TYPE2STR: try: isinstance(1, maybe_a_type) # type: ignore[arg-type] except TypeError: continue assert isinstance(maybe_a_type, type) _checkable_types.append(maybe_a_type) CHECKABLE_TYPES: tuple[type[Any], ...] = tuple(_checkable_types) """"Real types" of |TYPE2STR| allowed as second arguments of function |isinstance|.""" del _checkable_types NDIM2STR = {0: "", 1: "[:]", 2: "[:,:]", 3: "[:,:,:]"} _nogil = " noexcept nogil" if config.FASTCYTHON else ""
[docs] class Lines(list[str]): """Handles the code lines for a `.pyx` or a `pxd` file.""" def __init__(self, *args: str) -> None: super().__init__(args)
[docs] def add(self, indent: int, line: Mayberable1[str]) -> None: """Append the given text line with prefixed spaces following the given number of indentation levels.""" if isinstance(line, str): self.append(indent * 4 * " " + line) else: for subline in line: self.append(indent * 4 * " " + subline)
def __repr__(self) -> str: return "\n".join(self) + "\n"
[docs] class PyxPxdLines: """Handles the code lines for a `.pyx` and a `pxd` file.""" pyx: Lines pxd: Lines def __init__(self) -> None: self.pyx = Lines() self.pxd = Lines()
[docs] def add(self, indent: int, line: str) -> None: """Pass the given data to method |Lines.add| of the `pyx` and `pxd` |Lines| instances.""" self.pyx.add(indent, line) if line.endswith(":") and (" class " not in line): line = line[:-1] self.pxd.add(indent, line)
[docs] def get_methodheader( methodname: str, nogil: bool = False, idxarg: bool = False, inline: bool = True ) -> str: """Returns the Cython method header for methods without arguments except`self`. Note the influence of the configuration flag `FASTCYTHON`: >>> from hydpy.cythons.modelutils import get_methodheader >>> from hydpy import config >>> config.FASTCYTHON = False >>> print(get_methodheader("test", nogil=True, idxarg=False, inline=True)) cpdef inline void test(self): >>> config.FASTCYTHON = True >>> methodheader = get_methodheader("test", nogil=True, idxarg=True, inline=False) >>> print(methodheader) # doctest: +ELLIPSIS cpdef void test(self, ...int... idx) noexcept nogil: """ if not config.FASTCYTHON: nogil = False nogil_ = " noexcept nogil" if nogil else "" idxarg_ = f", {INT} idx" if idxarg else "" inline_ = " inline" if inline else "" return f"cpdef{inline_} void {methodname}(self{idxarg_}){nogil_}:"
[docs] def decorate_method( wrapped: Callable[[PyxWriter], Iterator[str]] ) -> Callable[[PyxWriter, PyxPxdLines], None]: """The decorated method returns a |Lines| object including a method header. However, the |Lines| object is empty if the respective model does not implement a method with the same name as the wrapped method. """ def wrapper(self: PyxWriter, lines: PyxPxdLines) -> None: if hasattr(self.model, wrapped.__name__): print(f" . {wrapped.__name__}") pyx, both = lines.pyx.add, lines.add both(1, get_methodheader(wrapped.__name__, nogil=True)) for line in wrapped(self): pyx(2, line) functools.update_wrapper(wrapper, wrapped) return wrapper
[docs] def compile_(cyname: str, pyxfilepath: str, buildpath: str) -> None: """Translate Cython code to C code and compile it.""" argv = copy.deepcopy(sys.argv) try: sys.argv = [ sys.argv[0], "build_ext", f"--build-lib={buildpath}", f"--build-temp={buildpath}", ] print(sys.argv) exc_modules = [ setuptools.Extension( name=f"hydpy.cythons.autogen.{cyname}", sources=[pyxfilepath], extra_compile_args=["-O2"], ) ] setuptools.setup( ext_modules=build.cythonize(exc_modules), include_dirs=[numpy.get_include()] ) finally: sys.argv = argv
[docs] def move_dll(pyname: str, cyname: str, cydirpath: str, buildpath: str) -> None: """Try to find the DLL file created by function |compile_| and try to move it to the `autogen` folder of the `cythons` subpackage. Usually, one does not need to apply |move_dll| directly. However, if you are a model developer, you might see one of the following error messages from time to time: >>> from hydpy.cythons.modelutils import move_dll >>> from hydpy.models.hland_96 import cythonizer as c >>> move_dll(pyname=c.pyname, cyname=c.cyname, ... cydirpath=c.cydirpath, buildpath=c.buildpath) # doctest: +ELLIPSIS Traceback (most recent call last): ... OSError: After trying to cythonize `hland_96`, the resulting file `c_hland_96...` \ could not be found in directory `.../hydpy/cythons/autogen/_build` nor any of its \ subdirectories. The distutil report should tell whether the file has been stored \ somewhere else, is named somehow else, or could not be build at all. >>> import os >>> from unittest import mock >>> from hydpy import TestIO >>> with TestIO(): # doctest: +ELLIPSIS ... with mock.patch.object( ... type(c), "buildpath", new_callable=mock.PropertyMock ... ) as mocked_buildpath: ... mocked_buildpath.return_value = "_build" ... os.makedirs("_build/subdir", exist_ok=True) ... filepath = f"_build/subdir/c_hland_96{get_dllextension()}" ... with open(filepath, "w"): ... pass ... with mock.patch( ... "shutil.move", ... side_effect=PermissionError("Denied!")): ... move_dll(pyname=c.pyname, cyname=c.cyname, ... cydirpath=c.cydirpath, buildpath=c.buildpath) Traceback (most recent call last): ... PermissionError: After trying to cythonize `hland_96`, when trying to move the \ final cython module `c_hland_96...` from directory `_build` to directory \ `.../hydpy/cythons/autogen`, the following error occurred: Denied! A likely error \ cause is that the cython module `c_hland_96...` does already exist in this directory \ and is currently blocked by another Python process. Maybe it helps to close all \ Python processes and restart the cythonization afterwards. """ dirinfos = os.walk(buildpath) system_dependent_filename = None for dirinfo in dirinfos: for filename in dirinfo[2]: if filename.startswith(cyname) and filename.endswith(_dllextension): system_dependent_filename = filename break if system_dependent_filename: try: shutil.move( os.path.join(dirinfo[0], system_dependent_filename), os.path.join(cydirpath, cyname + _dllextension), ) break except BaseException: objecttools.augment_excmessage( f"After trying to cythonize `{pyname}`, when trying to move the " f"final cython module `{system_dependent_filename}` from " f"directory `{buildpath}` to directory " f"`{objecttools.repr_(cydirpath)}`", f"A likely error cause is that the cython module " f"`{cyname}{_dllextension}` does already exist in this directory " f"and is currently blocked by another Python process. Maybe it " f"helps to close all Python processes and restart the " f"cythonization afterwards.", ) else: raise IOError( f"After trying to cythonize `{pyname}`, the resulting file " f"`{cyname}{_dllextension}` could not be found in directory " f"`{objecttools.repr_(buildpath)}` nor any of its subdirectories. The " f"distutil report should tell whether the file has been stored somewhere " f"else, is named somehow else, or could not be build at all." )
[docs] class Cythonizer: """Handles the writing, compiling and initialisation of Cython models.""" Model: type[modeltools.Model] Parameters: type[parametertools.Parameters] Sequences: type[sequencetools.Sequences] tester: testtools.Tester pymodule: str _cymodule: Optional[types.ModuleType] def __init__(self) -> None: self._cymodule = None frame = inspect.currentframe() assert frame is not None frame = frame.f_back assert frame is not None self.pymodule = frame.f_globals["__name__"] for key, value in frame.f_locals.items(): setattr(self, key, value)
[docs] def cythonize(self) -> None: """Translate Python source code of the relevant model first into Cython and then into C, compile it, and move the resulting dll file to the `autogen` subfolder of subpackage `cythons`.""" print(f"Translate module/package {self.pyname}.") self.pyxwriter.write() print(f"Compile module {self.cyname}.") compile_( cyname=self.cyname, pyxfilepath=self.pyxfilepath, buildpath=self.buildpath ) move_dll( pyname=self.pyname, cyname=self.cyname, cydirpath=self.cydirpath, buildpath=self.buildpath, )
@property def pyname(self) -> str: """Name of the original Python module or package. >>> from hydpy.models.hland import cythonizer >>> cythonizer.pyname 'hland' >>> from hydpy.models.hland_96 import cythonizer >>> cythonizer.pyname 'hland_96' """ return self.pymodule.split(".")[-1] @property def cyname(self) -> str: """Name of the compiled module. >>> from hydpy.models.hland import cythonizer >>> cythonizer.cyname 'c_hland' >>> from hydpy.models.hland_96 import cythonizer >>> cythonizer.cyname 'c_hland_96' """ return "c_" + self.pyname @property def cydirpath(self) -> str: """The absolute path of the directory containing the compiled modules. >>> from hydpy.models.hland import cythonizer >>> from hydpy import repr_ >>> repr_(cythonizer.cydirpath) # doctest: +ELLIPSIS '.../hydpy/cythons/autogen' >>> import os >>> os.path.exists(cythonizer.cydirpath) True """ return cythons.autogen.__path__[0] @property def cymodule(self) -> types.ModuleType: """The compiled module. Property |Cythonizer.cymodule| returns the relevant DLL module: >>> from hydpy.models.hland_96 import cythonizer >>> from hydpy.cythons.autogen import c_hland_96 >>> c_hland_96 is cythonizer.cymodule True However, if this module is missing for some reasons, it tries to create the module first and returns it afterwards. For demonstration purposes, we define a wrong |Cythonizer.cyname|: >>> from hydpy.cythons.modelutils import Cythonizer >>> cyname = Cythonizer.cyname >>> Cythonizer.cyname = "wrong" >>> cythonizer._cymodule = None >>> from unittest import mock >>> with mock.patch.object(Cythonizer, "cythonize") as mock: ... cythonizer.cymodule Traceback (most recent call last): ... ModuleNotFoundError: No module named 'hydpy.cythons.autogen.wrong' >>> mock.call_args_list [call()] >>> Cythonizer.cyname = cyname """ cymodule = self._cymodule if cymodule: return cymodule modulepath = f"hydpy.cythons.autogen.{self.cyname}" try: self._cymodule = importlib.import_module(modulepath) except ModuleNotFoundError: self.cythonize() self._cymodule = importlib.import_module(modulepath) return self._cymodule @property def pyxfilepath(self) -> str: """The absolute path of the compiled module. >>> from hydpy.models.hland_96 import cythonizer >>> from hydpy import repr_ >>> repr_(cythonizer.pyxfilepath) # doctest: +ELLIPSIS '.../hydpy/cythons/autogen/c_hland_96.pyx' >>> import os >>> os.path.exists(cythonizer.pyxfilepath) True """ return os.path.join(self.cydirpath, f"{self.cyname}.pyx") @property def dllfilepath(self) -> str: """The absolute path of the compiled module. >>> from hydpy.models.hland_96 import cythonizer >>> from hydpy import repr_ >>> repr_(cythonizer.dllfilepath) # doctest: +ELLIPSIS '.../hydpy/cythons/autogen/c_hland_96...' >>> import os >>> os.path.exists(os.path.split(cythonizer.dllfilepath)[0]) True """ return os.path.join(self.cydirpath, f"{self.cyname}{_dllextension}") @property def buildpath(self) -> str: """The absolute path for temporarily build files. >>> from hydpy.models.hland_96 import cythonizer >>> from hydpy import repr_ >>> repr_(cythonizer.buildpath) # doctest: +ELLIPSIS '.../hydpy/cythons/autogen/_build' """ return os.path.join(self.cydirpath, "_build") @property def pyxwriter(self) -> PyxWriter: """A new |PyxWriter| instance. >>> from hydpy.models.hland_96 import cythonizer >>> pyxwriter = cythonizer.pyxwriter >>> from hydpy import classname >>> classname(pyxwriter) 'PyxWriter' >>> cythonizer.pyxwriter is pyxwriter False """ model = self.Model() dict_ = vars(self) dict_["model"] = model model.parameters = importtools.prepare_parameters(dict_) model.sequences = importtools.prepare_sequences(dict_) return PyxWriter(self, model, self.pyxfilepath)
[docs] class PyxWriter: """Translates the source code of Python models into Cython source code. Method |PyxWriter| serves as a master method, which triggers the complete writing process. The other properties and methods supply the required code lines. Their names are selected to match the names of the original Python models as closely as possible. """ cythonizer: Cythonizer model: modeltools.Model pyxpath: str pxdpath: str def __init__( self, cythonizer: Cythonizer, model: modeltools.Model, pyxpath: str ) -> None: self.cythonizer = cythonizer self.model = model self.pyxpath = pyxpath self.pxdpath = pyxpath.replace(".pyx", ".pxd")
[docs] def write(self) -> None: """Collect the source code and write it into a Cython extension file ("pyx") and its definition file ("pxd").""" lines = PyxPxdLines() print(" * cython options") self.cythondistutilsoptions(lines) print(" * C imports") self.cimports(lines) print(" - callback features") self.callbackfeatures(lines) print(" * constants (if defined)") self.constants(lines) print(" * parameter classes") self.parameters(lines) print(" * sequence classes") self.sequences(lines) print(" * numerical parameters") self.numericalparameters(lines) print(" * submodel classes") self.submodels(lines) print(" * model class") print(" - model attributes") self.modeldeclarations(lines) print(" - standard functions") self.modelstandardfunctions(lines) print(" - numeric functions") self.modelnumericfunctions(lines) print(" - additional functions") self.modeluserfunctions(lines) with open(self.pyxpath, "w", encoding=config.ENCODING) as pyxfile: pyxfile.write(repr(lines.pyx)) with open(self.pxdpath, "w", encoding=config.ENCODING) as pxdfile: pxdfile.write(repr(lines.pxd))
[docs] def cythondistutilsoptions(self, lines: PyxPxdLines) -> None: """Cython and Distutils option lines. Use the configuration options "FASTCYTHON" and "PROFILECYTHON" to configure the cythonization processes as follows: >>> from hydpy.cythons.modelutils import PyxWriter >>> pyxwriter = PyxWriter(None, None, "file.pyx") >>> from hydpy.cythons.modelutils import PyxPxdLines >>> lines = PyxPxdLines() >>> pyxwriter.cythondistutilsoptions(lines) >>> lines.pyx # doctest: +ELLIPSIS #!python # distutils: define_macros=NPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION # cython: language_level=3 # cython: cpow=True # cython: boundscheck=False # cython: wraparound=False # cython: initializedcheck=False # cython: cdivision=True <BLANKLINE> >>> from hydpy import config >>> config.FASTCYTHON = False >>> config.PROFILECYTHON = True >>> lines.pyx.clear() >>> pyxwriter.cythondistutilsoptions(lines) >>> lines.pyx # doctest: +ELLIPSIS #!python # distutils: define_macros=NPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION # cython: language_level=3 # cython: cpow=True # cython: boundscheck=True # cython: wraparound=True # cython: initializedcheck=True # cython: cdivision=False # cython: linetrace=True # distutils: define_macros=CYTHON_TRACE=1 # distutils: define_macros=CYTHON_TRACE_NOGIL=1 <BLANKLINE> >>> config.FASTCYTHON = True >>> config.PROFILECYTHON = False """ # ToDo: do not share code with prepare.__prepare_cythonoptions both = lines.add both(0, "#!python") both(0, "# distutils: define_macros=NPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION") both(0, "# cython: language_level=3") both(0, "# cython: cpow=True") if config.FASTCYTHON: both(0, "# cython: boundscheck=False") both(0, "# cython: wraparound=False") both(0, "# cython: initializedcheck=False") both(0, "# cython: cdivision=True") else: both(0, "# cython: boundscheck=True") both(0, "# cython: wraparound=True") both(0, "# cython: initializedcheck=True") both(0, "# cython: cdivision=False") if config.PROFILECYTHON: both(0, "# cython: linetrace=True") both(0, "# distutils: define_macros=CYTHON_TRACE=1") both(0, "# distutils: define_macros=CYTHON_TRACE_NOGIL=1")
[docs] def cimports(self, lines: PyxPxdLines) -> None: """Import command lines.""" add = lines.add add(0, "from typing import Optional") add(0, "import numpy") add(0, "cimport numpy") add( 0, "from libc.math cimport exp, fabs, log, sin, cos, tan, asin, acos, atan, " "isnan, isinf", ) add(0, "from libc.math cimport NAN as nan") add(0, "from libc.math cimport INFINITY as inf") add(0, "import cython") add(0, "from cpython.mem cimport PyMem_Malloc") add(0, "from cpython.mem cimport PyMem_Realloc") add(0, "from cpython.mem cimport PyMem_Free") add(0, "from hydpy.cythons.autogen cimport configutils") add(0, "from hydpy.cythons.autogen cimport interfaceutils") add(0, "from hydpy.cythons.autogen cimport interputils") add(0, "from hydpy.cythons.autogen import pointerutils") add(0, "from hydpy.cythons.autogen cimport pointerutils") add(0, "from hydpy.cythons.autogen cimport quadutils") add(0, "from hydpy.cythons.autogen cimport rootutils") add(0, "from hydpy.cythons.autogen cimport smoothutils") add(0, "from hydpy.cythons.autogen cimport masterinterface")
[docs] def constants(self, lines: PyxPxdLines) -> None: """Constants declaration lines.""" both = lines.add for name, member in vars(self.cythonizer).items(): if ( name.isupper() and not inspect.isclass(member) and isinstance(member, CHECKABLE_TYPES) ): ndim = numpy.array(member).ndim ctype = TYPE2STR[type(member)] + NDIM2STR[ndim] both(0, f"cdef public {ctype} {name} = {member}")
[docs] def parameters(self, lines: PyxPxdLines) -> None: """Parameter declaration lines.""" if pars := self.model.parameters: pt = parametertools pyx, pxd, both = lines.pyx.add, lines.pxd.add, lines.add both(0, "@cython.final") both(0, "cdef class Parameters:") pyx(1, "pass") if not self.model.parameters: pxd(1, "pass") for subpars in pars: pxd(1, f"cdef public {type(subpars).__name__} {subpars.name}") for subpars in pars: print(f" - {subpars.name}") both(0, "@cython.final") both(0, f"cdef class {type(subpars).__name__}:") for par in subpars: try: ctype = TYPE2STR[par.TYPE] + NDIM2STR[par.NDIM] except KeyError: ctype = par.TYPE + NDIM2STR[par.NDIM] # type: ignore[operator] pxd(1, f"cdef public {ctype} {par.name}") if isinstance(par, pt.KeywordParameter1D): pxd(1, f"cdef public {TYPE2STR[int]} _{par.name}_entrymin") elif isinstance(par, pt.KeywordParameter2D): prefix = f"cdef public {TYPE2STR[int]} _{par.name}" for suffix in ("rowmin", "columnmin"): pxd(1, f"{prefix}_{suffix}") elif isinstance(par, pt.CallbackParameter): pxd(1, f"cdef CallbackType {par.name}_callback") pxd(1, f"cdef CallbackWrapper _{par.name}_wrapper") if callbackpars := tuple( p for p in subpars if isinstance(p, pt.CallbackParameter) ): pyx(0, "") for par in callbackpars: cn = f"{par.name}_callback" both(1, f"cpdef void init_{cn}(self):") pyx(2, f"self.{cn} = do_nothing") pyx(2, "cdef CallbackWrapper wrapper = CallbackWrapper()") pyx(2, "wrapper.callback = do_nothing") pyx(2, f"self._{par.name}_wrapper = wrapper") pyx(0, "") both(1, f"cpdef CallbackWrapper get_{cn}(self):") pyx(2, f"return self._{par.name}_wrapper") pyx(0, "") both(1, f"cpdef void set_{cn}(self, CallbackWrapper wrapper):") pyx(2, f"self.{cn} = wrapper.callback") pyx(2, f"self._{par.name}_wrapper = wrapper") pyx(0, "") else: pyx(1, "pass")
[docs] def sequences(self, lines: PyxPxdLines) -> None: """Sequence declaration lines.""" sqt = sequencetools pyx, pxd, both = lines.pyx.add, lines.pxd.add, lines.add both(0, "@cython.final") both(0, "cdef class Sequences:") pyx(1, "pass") if not self.model.sequences: pxd(1, "pass") for subseqs in self.model.sequences: pxd(1, f"cdef public {type(subseqs).__name__} {subseqs.name}") if self.model.sequences.states: pxd(1, "cdef public StateSequences old_states") pxd(1, "cdef public StateSequences new_states") for subseqs in self.model.sequences: print(f" - {subseqs.name}") both(0, "@cython.final") both(0, f"cdef class {type(subseqs).__name__}:") if isinstance(subseqs, (sqt.LogSequences, sqt.AideSequences)): pyx(1, "pass") for seq in subseqs: ctype = f"double{NDIM2STR[seq.NDIM]}" if isinstance(subseqs, sqt.LinkSequences): if seq.NDIM == 0: pxd(1, f"cdef double *{seq.name}") elif seq.NDIM == 1: pxd(1, f"cdef double **{seq.name}") pxd(1, f"cdef public {INT} len_{seq.name}") pxd(1, f"cdef public {TYPE2STR[int]}[:] _{seq.name}_ready") else: pxd(1, f"cdef public {ctype} {seq.name}") pxd(1, f"cdef public {INT} _{seq.name}_ndim") pxd(1, f"cdef public {INT} _{seq.name}_length") for idx in range(seq.NDIM): pxd(1, f"cdef public {INT} _{seq.name}_length_{idx}") if seq.NUMERIC: ctype_numeric = "double" + NDIM2STR[seq.NDIM + 1] pxd(1, f"cdef public {ctype_numeric} _{seq.name}_points") pxd(1, f"cdef public {ctype_numeric} _{seq.name}_results") if isinstance(subseqs, sqt.FluxSequences): pxd(1, f"cdef public {ctype_numeric} " f"_{seq.name}_integrals") pxd(1, f"cdef public {ctype} _{seq.name}_sum") if isinstance(seq, sqt.IOSequence): self.iosequence(lines, seq) if isinstance(subseqs, sqt.IOSequences): self.load_data(lines, subseqs) self.save_data(lines, subseqs) if isinstance(subseqs, sqt.LinkSequences): self.set_pointer(lines, subseqs) self.get_value(lines, subseqs) self.set_value(lines, subseqs) if isinstance(subseqs, (sqt.InputSequences, sqt.OutputSequences)): self.set_pointer(lines, subseqs) if isinstance(subseqs, sqt.OutputSequences): self.update_outputs(lines, subseqs)
[docs] @staticmethod def iosequence(lines: PyxPxdLines, seq: sequencetools.IOSequence) -> None: """Declaration lines for the given |IOSequence| object.""" ctype = f"double{NDIM2STR[seq.NDIM+1]}" add = lines.pxd.add add(1, f"cdef public bint _{seq.name}_ramflag") add(1, f"cdef public {ctype} _{seq.name}_array") add(1, f"cdef public bint _{seq.name}_diskflag_reading") add(1, f"cdef public bint _{seq.name}_diskflag_writing") add(1, f"cdef public double[:] _{seq.name}_ncarray") if isinstance(seq, sequencetools.InputSequence) and (seq.NDIM == 0): add(1, f"cdef public bint _{seq.name}_inputflag") add(1, f"cdef double *_{seq.name}_inputpointer") elif isinstance(seq, sequencetools.OutputSequence) and (seq.NDIM == 0): add(1, f"cdef public bint _{seq.name}_outputflag") add(1, f"cdef double *_{seq.name}_outputpointer")
@staticmethod def _get_index(ndim: int) -> str: return ", ".join(f"jdx{idx}" for idx in range(ndim)) @staticmethod def _add_cdef_jdxs( lines: PyxPxdLines, subseqs: sequencetools.IOSequences[Any, Any, Any] ) -> None: maxndim = max(seq.NDIM for seq in subseqs) if maxndim: jdxs = ", ".join(f"jdx{ndim}" for ndim in range(maxndim)) lines.pyx.add(2, f"cdef {INT} {jdxs}")
[docs] def reset_reuseflags(self, lines: PyxPxdLines) -> None: """Reset reuse flag statements.""" print(" . reset_reuseflags") pyx, both = lines.pyx.add, lines.add both(1, f"cpdef void reset_reuseflags(self){_nogil}:") if (methods := self.model.REUSABLE_METHODS) or self.model.find_submodels( include_subsubmodels=False, include_optional=True, repeat_sharedmodels=True ): for method in methods: pyx(2, f"self.{method.REUSEMARKER} = False") self._call_submodel_method(lines, "reset_reuseflags()") else: pyx(2, "pass")
[docs] @classmethod def load_data( cls, lines: PyxPxdLines, subseqs: sequencetools.IOSequences[Any, Any, Any] ) -> None: """Load data statements.""" print(" . load_data") pyx, both = lines.pyx.add, lines.add both(1, f"cpdef inline void load_data(self, {INT} idx) {_nogil}:") cls._add_cdef_jdxs(lines, subseqs) pyx(2, f"cdef {INT} k") for seq in subseqs: if isinstance(seq, sequencetools.InputSequence) and (seq.NDIM == 0): pyx(2, f"if self._{seq.name}_inputflag:") pyx(3, f"self.{seq.name} = self._{seq.name}_inputpointer[0]") if_or_elif = "elif" else: if_or_elif = "if" pyx(2, f"{if_or_elif} self._{seq.name}_diskflag_reading:") if seq.NDIM == 0: pyx(3, f"self.{seq.name} = self._{seq.name}_ncarray[0]") else: pyx(3, "k = 0") for idx in range(seq.NDIM): pyx( 3 + idx, f"for jdx{idx} in range(self._{seq.name}_length_{idx}):", ) pyx( 3 + seq.NDIM, f"self.{seq.name}[{cls._get_index(seq.NDIM)}] " f"= self._{seq.name}_ncarray[k]", ) pyx(3 + seq.NDIM, "k += 1") pyx(2, f"elif self._{seq.name}_ramflag:") if seq.NDIM == 0: pyx(3, f"self.{seq.name} = self._{seq.name}_array[idx]") else: for idx in range(seq.NDIM): pyx( 3 + idx, f"for jdx{idx} in " f"range(self._{seq.name}_length_{idx}):", ) index = cls._get_index(seq.NDIM) pyx( 3 + seq.NDIM, f"self.{seq.name}[{index}] = self._{seq.name}_array[idx, {index}]", )
[docs] @classmethod def save_data( cls, lines: PyxPxdLines, subseqs: sequencetools.IOSequences[Any, Any, Any] ) -> None: """Save data statements.""" print(" . save_data") pyx, both = lines.pyx.add, lines.add both(1, f"cpdef inline void save_data(self, {INT} idx) {_nogil}:") cls._add_cdef_jdxs(lines, subseqs) pyx(2, f"cdef {INT} k") for seq in subseqs: pyx(2, f"if self._{seq.name}_diskflag_writing:") if seq.NDIM == 0: pyx(3, f"self._{seq.name}_ncarray[0] = self.{seq.name}") else: pyx(3, "k = 0") for idx in range(seq.NDIM): pyx( 3 + idx, f"for jdx{idx} in " f"range(self._{seq.name}_length_{idx}):", ) index = cls._get_index(seq.NDIM) pyx( 3 + seq.NDIM, f"self._{seq.name}_ncarray[k] = self.{seq.name}[{index}]", ) pyx(3 + seq.NDIM, "k += 1") pyx(2, f"if self._{seq.name}_ramflag:") if seq.NDIM == 0: pyx(3, f"self._{seq.name}_array[idx] = self.{seq.name}") else: for idx in range(seq.NDIM): pyx( 3 + idx, f"for jdx{idx} in " f"range(self._{seq.name}_length_{idx}):", ) index = cls._get_index(seq.NDIM) pyx( 3 + seq.NDIM, f"self._{seq.name}_array[idx, {index}] = self.{seq.name}[{index}]", )
[docs] def set_pointer( self, lines: PyxPxdLines, subseqs: Union[ sequencetools.InputSequences, sequencetools.OutputSequences[Any], sequencetools.LinkSequences[Any], ], ) -> None: """Set pointer statements for all input, output, and link sequences.""" if isinstance(subseqs, sequencetools.InputSequences): self.set_pointerinput(lines, subseqs) elif isinstance(subseqs, sequencetools.OutputSequences): self.set_pointeroutput(lines, subseqs) else: if any(seq.NDIM == 0 for seq in subseqs): self.set_pointer0d(lines, subseqs) if any(seq.NDIM == 1 for seq in subseqs): self.alloc(lines, subseqs) self.dealloc(lines, subseqs) self.set_pointer1d(lines, subseqs)
[docs] @staticmethod def set_pointer0d( lines: PyxPxdLines, subseqs: sequencetools.LinkSequences[Any] ) -> None: """Set pointer statements for 0-dimensional link sequences.""" print(" . set_pointer0d") pyx, both = lines.pyx.add, lines.add both( 1, "cpdef inline set_pointer0d(self, str name, pointerutils.Double value):" ) pyx(2, "cdef pointerutils.PDouble pointer = pointerutils.PDouble(value)") for seq in (seq for seq in subseqs if seq.NDIM == 0): pyx(2, f'if name == "{seq.name}":') pyx(3, f"self.{seq.name} = pointer.p_value")
[docs] @staticmethod def get_value( lines: PyxPxdLines, subseqs: sequencetools.LinkSequences[Any] ) -> None: """Get value statements for link sequences.""" print(" . get_value") pyx, both = lines.pyx.add, lines.add both(1, "cpdef get_value(self, str name):") pyx(2, f"cdef {INT} idx") for seq in subseqs: pyx(2, f'if name == "{seq.name}":') if seq.NDIM == 0: pyx(3, f"return self.{seq.name}[0]") elif seq.NDIM == 1: pyx(3, f"values = numpy.empty(self.len_{seq.name})") pyx(3, f"for idx in range(self.len_{seq.name}):") PyxWriter._check_pointer(lines, seq) pyx(4, f"values[idx] = self.{seq.name}[idx][0]") pyx(3, "return values")
[docs] @staticmethod def set_value( lines: PyxPxdLines, subseqs: sequencetools.LinkSequences[Any] ) -> None: """Set value statements for link sequences.""" print(" . set_value") pyx, both = lines.pyx.add, lines.add both(1, "cpdef set_value(self, str name, value):") for seq in subseqs: pyx(2, f'if name == "{seq.name}":') if seq.NDIM == 0: pyx(3, f"self.{seq.name}[0] = value") elif seq.NDIM == 1: pyx(3, f"for idx in range(self.len_{seq.name}):") PyxWriter._check_pointer(lines, seq) pyx(4, f"self.{seq.name}[idx][0] = value[idx]")
@staticmethod def _check_pointer(lines: PyxPxdLines, seq: sequencetools.LinkSequence) -> None: pyx = lines.pyx.add pyx(4, f"pointerutils.check0(self._{seq.name}_length_0)") pyx(4, f"if self._{seq.name}_ready[idx] == 0:") pyx(5, f"pointerutils.check1(self._{seq.name}_length_0, idx)") pyx(5, f"pointerutils.check2(self._{seq.name}_ready, idx)")
[docs] @staticmethod def alloc(lines: PyxPxdLines, subseqs: sequencetools.LinkSequences[Any]) -> None: """Allocate memory statements for 1-dimensional link sequences.""" print(" . setlength") pyx, both = lines.pyx.add, lines.add both(1, f"cpdef inline alloc(self, name, {TYPE2STR[int]} length):") for seq in (seq for seq in subseqs if seq.NDIM == 1): pyx(2, f'if name == "{seq.name}":') pyx(3, f"self._{seq.name}_length_0 = length") pyx( 3, f"self._{seq.name}_ready = " f"numpy.full(length, 0, dtype={ TYPE2STR[int].split('_')[0]})", ) pyx( 3, f"self.{seq.name} = " f"<double**> PyMem_Malloc(length * sizeof(double*))", )
[docs] @staticmethod def dealloc(lines: PyxPxdLines, subseqs: sequencetools.LinkSequences[Any]) -> None: """Deallocate memory statements for 1-dimensional link sequences.""" print(" . dealloc") pyx, both = lines.pyx.add, lines.add both(1, "cpdef inline dealloc(self, name):") for seq in (seq for seq in subseqs if seq.NDIM == 1): pyx(2, f'if name == "{seq.name}":') pyx(3, f"PyMem_Free(self.{seq.name})")
[docs] @staticmethod def set_pointer1d( lines: PyxPxdLines, subseqs: sequencetools.LinkSequences[Any] ) -> None: """Set_pointer statements for 1-dimensional link sequences.""" print(" . set_pointer1d") pyx, both = lines.pyx.add, lines.add both( 1, "cpdef inline set_pointer1d" f"(self, str name, pointerutils.Double value, {INT} idx):", ) pyx(2, "cdef pointerutils.PDouble pointer = pointerutils.PDouble(value)") for seq in (seq for seq in subseqs if seq.NDIM == 1): pyx(2, f'if name == "{seq.name}":') pyx(3, f"self.{seq.name}[idx] = pointer.p_value") pyx(3, f"self._{seq.name}_ready[idx] = 1")
[docs] @classmethod def set_pointerinput( cls, lines: PyxPxdLines, subseqs: sequencetools.InputSequences ) -> None: """Set pointer statements for input sequences.""" print(" . set_pointerinput") pyx, both = lines.pyx.add, lines.add both( 1, "cpdef inline set_pointerinput" "(self, str name, pointerutils.PDouble value):", ) subseqs_ = cls._filter_inputsequences(subseqs) if subseqs_: for seq in subseqs_: pyx(2, f'if name == "{seq.name}":') pyx(3, f"self._{seq.name}_inputpointer = value.p_value") else: pyx(2, "pass")
[docs] @classmethod def set_pointeroutput( cls, lines: PyxPxdLines, subseqs: sequencetools.OutputSequences[Any] ) -> None: """Set pointer statements for output sequences.""" print(" . set_pointeroutput") pyx, both = lines.pyx.add, lines.add both( 1, "cpdef inline set_pointeroutput" "(self, str name, pointerutils.PDouble value):", ) subseqs_ = cls._filter_outputsequences(subseqs) if subseqs_: for seq in subseqs_: pyx(2, f'if name == "{seq.name}":') pyx(3, f"self._{seq.name}_outputpointer = value.p_value") else: pyx(2, "pass")
@staticmethod def _filter_inputsequences( subseqs: sequencetools.InputSequences, ) -> list[sequencetools.InputSequence]: return [subseq for subseq in subseqs if not subseq.NDIM] @staticmethod def _filter_outputsequences( subseqs: sequencetools.OutputSequences[Any], ) -> list[sequencetools.OutputSequence]: return [subseq for subseq in subseqs if not subseq.NDIM]
[docs] def numericalparameters(self, lines: PyxPxdLines) -> None: """Numeric parameter declaration lines.""" if isinstance(self.model, modeltools.SolverModel): pyx, pxd, both = lines.pyx.add, lines.pxd.add, lines.add both(0, "@cython.final") both(0, "cdef class NumConsts:") pyx(1, "pass") for name in ("nmb_methods", "nmb_stages"): pxd(1, f"cdef public {TYPE2STR[int]} {name}") for name in ("dt_increase", "dt_decrease"): pxd(1, f"cdef public {TYPE2STR[float]} {name}") pxd(1, "cdef public configutils.Config pub") pxd(1, "cdef public double[:, :, :] a_coefs") both(0, "@cython.final") both(0, "cdef class NumVars:") pyx(1, "pass") pxd(1, "cdef public bint use_relerror") for name in ("nmb_calls", "idx_method", "idx_stage"): pxd(1, f"cdef public {TYPE2STR[int]} {name}") for name in ( "t0", "t1", "dt", "dt_est", "abserror", "relerror", "last_abserror", "last_relerror", "extrapolated_abserror", "extrapolated_relerror", ): pxd(1, f"cdef public {TYPE2STR[float]} {name}") pxd(1, f"cdef public {TYPE2STR[bool]} f0_ready")
[docs] def submodels(self, lines: PyxPxdLines) -> None: """Submodel declaration lines.""" for submodel in self.model.SUBMODELS: pyx, pxd, both = lines.pyx.add, lines.pxd.add, lines.add both(0, "@cython.final") cls = submodel.CYTHONBASECLASS both( 0, f"cdef class {submodel.__name__}(" f"{cls.__module__.split('.')[-1]}.{cls.__name__}):", ) pxd(1, "cdef public Model model") pyx(1, "def __init__(self, Model model):") pyx(2, "self.model = model") for idx, method in enumerate(submodel.METHODS): both(1, f"cpdef double apply_method{idx}(self, double x) {_nogil}:") pyx(2, f"return self.model.{method.__name__.lower()}(x)")
[docs] def modeldeclarations(self, lines: PyxPxdLines) -> None: """The attribute declarations of the model class.""" # pylint: disable=too-many-branches submodeltypes_old = getattr(self.model, "SUBMODELS", ()) submodelnames_new = [ n.split(".")[-1] for n in self.model.find_submodels( include_subsubmodels=False, include_sidemodels=True, include_optional=True, aggregate_vectors=True, repeat_sharedmodels=True, ) ] pyx, pxd, both = lines.pyx.add, lines.pxd.add, lines.add both(0, "@cython.final") follows_interface = any( base for base in inspect.getmro(type(self.model)) if issubclass(base, modeltools.SubmodelInterface) and base.__module__.startswith("hydpy.interfaces.") ) if follows_interface: both(0, "cdef class Model(masterinterface.MasterInterface):") else: both(0, "cdef class Model:") for cls in inspect.getmro(type(self.model)): for name, member in vars(cls).items(): if isinstance(member, modeltools.IndexProperty): if (name != "idx_sim") or not follows_interface: pxd(1, f"cdef public {INT} {name}") if isinstance(self.model, modeltools.SubstepModel): pxd(1, f"cdef public {TYPE2STR[float]} timeleft") if self.model.parameters: pxd(1, "cdef public Parameters parameters") pxd(1, "cdef public Sequences sequences") for name in submodelnames_new: if name.endswith("_*"): name = name[:-2] pxd(1, f"cdef public interfaceutils.SubmodelsProperty {name}") else: pxd(1, f"cdef public masterinterface.MasterInterface {name}") pxd(1, f"cdef public {TYPE2STR[bool]} {name}_is_mainmodel") pxd(1, f"cdef public {TYPE2STR[int]} {name}_typeid") for submodel in submodeltypes_old: pxd(1, f"cdef public {submodel.__name__} {submodel.name}") if hasattr(self.model, "numconsts"): pxd(1, "cdef public NumConsts numconsts") if hasattr(self.model, "numvars"): pxd(1, "cdef public NumVars numvars") if submodeltypes_old or submodelnames_new: pyx(1, "def __init__(self):") pyx(2, "super().__init__()") for name in submodelnames_new: if name.endswith("_*"): name = name[:-2] pyx(2, f"self.{name} = interfaceutils.SubmodelsProperty()") else: pyx(2, f"self.{name} = None") pyx(2, f"self.{name}_is_mainmodel = False") for submodel in submodeltypes_old: pyx(2, f"self.{submodel.name} = {submodel.__name__}(self)") baseinterface = "Optional[masterinterface.MasterInterface]" for name in submodelnames_new: if not name.endswith("_*"): pyx(1, f"def get_{name}(self) -> {baseinterface}:") pyx(2, f"return self.{name}") pyx(1, f"def set_{name}(self, {name}: {baseinterface}) -> None:") pyx(2, f"self.{name} = {name}") for method in self.model.REUSABLE_METHODS: pxd(1, f"cdef bint {method.REUSEMARKER}")
[docs] def modelstandardfunctions(self, lines: PyxPxdLines) -> None: """The standard functions of the model class.""" self.simulate(lines) self.reset_reuseflags(lines) self.iofunctions(lines) self.new2old(lines) if isinstance(self.model, modeltools.RunModel): self.run(lines, self.model) self.update_inlets(lines) self.update_outlets(lines) self.update_receivers(lines) self.update_senders(lines) self.update_outputs_model(lines)
[docs] def modelnumericfunctions(self, lines: PyxPxdLines) -> None: """Numerical integration functions of the model class.""" if isinstance(self.model, modeltools.SolverModel): self.solve(lines) self.calculate_single_terms(lines, self.model) self.calculate_full_terms(lines, self.model) self.get_point_states(lines) self.set_point_states(lines) self.set_result_states(lines) self.get_sum_fluxes(lines) self.set_point_fluxes(lines) self.set_result_fluxes(lines) self.integrate_fluxes(lines) self.reset_sum_fluxes(lines) self.addup_fluxes(lines) self.calculate_error(lines) self.extrapolate_error(lines)
[docs] def simulate(self, lines: PyxPxdLines) -> None: """Simulation statements.""" print(" . simulate") pyx, both = lines.pyx.add, lines.add both(1, f"cpdef inline void simulate(self, {INT} idx) {_nogil}:") pyx(2, "self.idx_sim = idx") if self.model.REUSABLE_METHODS or self.model.find_submodels( include_optional=True, include_subsubmodels=False, repeat_sharedmodels=True ): pyx(2, "self.reset_reuseflags()") seqs = self.model.sequences if seqs.inputs or self.model.SUBMODELINTERFACES: pyx(2, "self.load_data(idx)") if self.model.INLET_METHODS: pyx(2, "self.update_inlets()") if isinstance(self.model, modeltools.SolverModel): pyx(2, "self.solve()") else: pyx(2, "self.run()") if seqs.states: pyx(2, "self.new2old()") if self.model.OUTLET_METHODS: pyx(2, "self.update_outlets()") if seqs.factors or seqs.fluxes or seqs.states: pyx(2, "self.update_outputs()")
def _call_submodel_method(self, lines: PyxPxdLines, methodcall: str) -> None: name2submodel = self.model.find_submodels( include_subsubmodels=False, include_optional=True, aggregate_vectors=True, repeat_sharedmodels=True, ) pyx = lines.pyx.add if any(name.endswith("_*") for name in name2submodel): pyx(2, f"cdef {INT} i_submodel") for fullname in name2submodel: name = fullname.rpartition(".")[2] if name.endswith("_*"): name = name[:-2] pyx(2, f"for i_submodel in range(self.{name}.number):") pyx(3, f"if self.{name}.typeids[i_submodel] > 0:") pyx( 4, f"(<masterinterface.MasterInterface>" f"self.{name}.submodels[i_submodel]).{methodcall}", ) else: pyx( 2, f"if (self.{name} is not None) and not self.{name}_is_mainmodel:" ) pyx(3, f"self.{name}.{methodcall}")
[docs] def iofunctions(self, lines: PyxPxdLines) -> None: """Input/output functions of the model class. The result of property |PyxWriter.iofunctions| depends on the availability of different types of sequences. So far, the models implemented in *HydPy* do not reflect all possible combinations, which is why we modify the |hland_96| application model in the following examples: >>> from hydpy.models.hland_96 import cythonizer >>> pyxwriter = cythonizer.pyxwriter >>> from hydpy.cythons.modelutils import PyxPxdLines >>> lines = PyxPxdLines() >>> pyxwriter.iofunctions(lines) . load_data . save_data >>> lines.pyx # doctest: +ELLIPSIS cpdef void load_data(self, ...int... idx) noexcept nogil: self.idx_sim = idx self.sequences.inputs.load_data(idx) if (self.aetmodel is not None) and not self.aetmodel_is_mainmodel: self.aetmodel.load_data(idx) if (self.rconcmodel is not None) and not self.rconcmodel_is_mainmodel: self.rconcmodel.load_data(idx) cpdef void save_data(self, ...int... idx) noexcept nogil: self.idx_sim = idx self.sequences.inputs.save_data(idx) self.sequences.factors.save_data(idx) self.sequences.fluxes.save_data(idx) self.sequences.states.save_data(idx) if (self.aetmodel is not None) and not self.aetmodel_is_mainmodel: self.aetmodel.save_data(idx) if (self.rconcmodel is not None) and not self.rconcmodel_is_mainmodel: self.rconcmodel.save_data(idx) <BLANKLINE> >>> pyxwriter.model.sequences.factors = None >>> pyxwriter.model.sequences.fluxes = None >>> pyxwriter.model.sequences.states = None >>> lines.pyx.clear() >>> pyxwriter.iofunctions(lines) . load_data . save_data >>> lines.pyx # doctest: +ELLIPSIS cpdef void load_data(self, ...int... idx) noexcept nogil: self.idx_sim = idx self.sequences.inputs.load_data(idx) if (self.aetmodel is not None) and not self.aetmodel_is_mainmodel: self.aetmodel.load_data(idx) if (self.rconcmodel is not None) and not self.rconcmodel_is_mainmodel: self.rconcmodel.load_data(idx) cpdef void save_data(self, ...int... idx) noexcept nogil: self.idx_sim = idx self.sequences.inputs.save_data(idx) if (self.aetmodel is not None) and not self.aetmodel_is_mainmodel: self.aetmodel.save_data(idx) if (self.rconcmodel is not None) and not self.rconcmodel_is_mainmodel: self.rconcmodel.save_data(idx) <BLANKLINE> >>> pyxwriter.model.sequences.inputs = None >>> lines.pyx.clear() >>> pyxwriter.iofunctions(lines) >>> lines.pyx # doctest: +ELLIPSIS <BLANKLINE> <BLANKLINE> """ seqs = self.model.sequences if not (seqs.inputs or seqs.factors or seqs.fluxes or seqs.states): return pyx, both = lines.pyx.add, lines.add for func in ("load_data", "save_data"): if (func == "load_data") and not ( seqs.inputs or self.model.SUBMODELINTERFACES ): continue print(f" . {func}") nogil = func in ("load_data", "save_data") both(1, get_methodheader(func, nogil=nogil, idxarg=True, inline=False)) pyx(2, "self.idx_sim = idx") for subseqs in seqs: if func == "load_data": applyfuncs: tuple[str, ...] = ("inputs",) else: applyfuncs = ("inputs", "factors", "fluxes", "states") if subseqs.name in applyfuncs: pyx(2, f"self.sequences.{subseqs.name}." f"{func}(idx)") self._call_submodel_method(lines, f"{func}(idx)")
[docs] def new2old(self, lines: PyxPxdLines) -> None: """Old states to new states statements.""" name2submodel = self.model.find_submodels( include_subsubmodels=False, include_optional=True, aggregate_vectors=True, repeat_sharedmodels=True, ) pyx, both = lines.pyx.add, lines.add if self.model.sequences.states or name2submodel: print(" . new2old") both(1, get_methodheader("new2old", nogil=True, inline=False)) if self.model.sequences.states: self._add_cdef_jdxs(lines, self.model.sequences.states) for seq in self.model.sequences.states: if seq.NDIM == 0: pyx( 2, f"self.sequences.old_states.{seq.name} = " f"self.sequences.new_states.{seq.name}", ) else: indexing = "" for idx in range(seq.NDIM): pyx( 2 + idx, f"for jdx{idx} in range(self.sequences.states." f"_{seq.name}_length_{idx}):", ) indexing += f"jdx{idx}," indexing = indexing[:-1] pyx( 2 + seq.NDIM, f"self.sequences.old_states.{seq.name}[{indexing}] = " f"self.sequences.new_states.{seq.name}[{indexing}]", ) self._call_submodel_method(lines, "new2old()")
def _call_methods( self, lines: PyxPxdLines, name: str, methods: tuple[type[modeltools.Method], ...], idx_as_arg: bool = False, ) -> None: if hasattr(self.model, name): pyx, both = lines.pyx.add, lines.add both(1, get_methodheader(name, nogil=True, idxarg=idx_as_arg)) if idx_as_arg: pyx(2, "self.idx_sim = idx") anything = False for method in methods: pyx(2, f"self.{method.__name__.lower()}()") anything = True if not anything: pyx(2, "pass") def _call_runmethods_segmentwise( self, lines: PyxPxdLines, methods: tuple[type[modeltools.Method], ...] ) -> None: if hasattr(self.model, "run"): pyx, both = lines.pyx.add, lines.add both(1, get_methodheader("run", nogil=True, idxarg=False)) pyx(2, f"cdef {TYPE2STR[int]} idx_segment, idx_run") pyx(2, "for idx_segment in range(self.parameters.control.nmbsegments):") pyx(3, "self.idx_segment = idx_segment") pyx(3, "for idx_run in range(self.parameters.solver.nmbruns):") pyx(4, "self.idx_run = idx_run") for method in methods: pyx(4, f"self.{method.__name__.lower()}()")
[docs] def update_receivers(self, lines: PyxPxdLines) -> None: """Lines of the model method with the same name.""" self._call_methods(lines, "update_receivers", self.model.RECEIVER_METHODS, True)
[docs] def update_inlets(self, lines: PyxPxdLines) -> None: """Lines of the model method with the same name.""" self._call_methods(lines, "update_inlets", self.model.INLET_METHODS)
[docs] def run(self, lines: PyxPxdLines, model: modeltools.RunModel) -> None: """Return the lines of the model method with the same name.""" if isinstance(model, modeltools.SegmentModel): self._call_runmethods_segmentwise(lines, model.RUN_METHODS) else: nmb = len(lines.pyx) self._call_methods(lines, "run", model.RUN_METHODS) if isinstance(model, modeltools.SubstepModel): pyx = Lines() pyx.extend(lines.pyx[: nmb + 1]) add = pyx.add add(2, "self.timeleft = self.parameters.derived.seconds") add(2, "while True:") for line in lines.pyx[nmb + 1 :]: add(1, line) add(3, "if self.timeleft <= 0.0:") add(4, "break") add(3, "self.new2old()") lines.pyx = pyx
[docs] def update_outlets(self, lines: PyxPxdLines) -> None: """Lines of the model method with the same name.""" self._call_methods(lines, "update_outlets", self.model.OUTLET_METHODS)
[docs] def update_senders(self, lines: PyxPxdLines) -> None: """Lines of the model method with the same name.""" self._call_methods(lines, "update_senders", self.model.SENDER_METHODS, True)
[docs] def update_outputs_model(self, lines: PyxPxdLines) -> None: """Lines of the model method with the same name (except the `_model` suffix).""" pyx, both = lines.pyx.add, lines.add both(1, get_methodheader("update_outputs", nogil=True, idxarg=False)) factors = self._filter_outputsequences(self.model.sequences.factors) fluxes = self._filter_outputsequences(self.model.sequences.fluxes) states = self._filter_outputsequences(self.model.sequences.states) if factors: pyx(2, "self.sequences.factors.update_outputs()") if fluxes: pyx(2, "self.sequences.fluxes.update_outputs()") if states: pyx(2, "self.sequences.states.update_outputs()") if not (factors or fluxes or states): pyx(2, "pass")
[docs] def update_outputs( self, lines: PyxPxdLines, subseqs: sequencetools.OutputSequences[Any] ) -> None: """Lines of the subsequences method with the same name.""" pyx, both = lines.pyx.add, lines.add both(1, get_methodheader("update_outputs", nogil=True, idxarg=False)) subseqs_ = self._filter_outputsequences(subseqs) if subseqs_: for seq in subseqs_: name = seq.name pyx(2, f"if self._{name}_outputflag:") pyx(3, f"self._{name}_outputpointer[0] = self.{name}") else: pyx(2, "pass")
[docs] def calculate_single_terms( self, lines: PyxPxdLines, model: modeltools.SolverModel ) -> None: """Return the lines of the model method with the same name.""" nmb = len(lines.pyx) self._call_methods(lines, "calculate_single_terms", model.PART_ODE_METHODS) if len(lines.pyx) > nmb: lines.pyx.insert( nmb + 1, (" self.numvars.nmb_calls = self.numvars.nmb_calls + 1") )
[docs] def calculate_full_terms( self, lines: PyxPxdLines, model: modeltools.SolverModel ) -> None: """Return the lines of the model method with the same name.""" self._call_methods(lines, "calculate_full_terms", model.FULL_ODE_METHODS)
@property def name2function_method(self) -> dict[str, types.MethodType]: """Functions defined by |Method| subclasses.""" name2function = {} for name, member in vars(self.model).items(): if (getattr(member, "__name__", None) == "call_reusablemethod") or getattr( getattr(member, "__func__", None), "__HYDPY_METHOD__", False ): name2function[name] = member return name2function @property def automethod2name(self) -> dict[str, tuple[type[modeltools.Method], ...]]: """Submethods selected by |AutoMethod| and |SetAutoMethod| subclasses.""" automethod2name: dict[str, tuple[type[modeltools.Method], ...]] = {} for name, member in vars(self.model).items(): if ( isinstance(member, types.MethodType) and isinstance(call := member.__func__, types.MethodType) and inspect.isclass(method := call.__self__) and issubclass( automethod := method, (modeltools.AutoMethod, modeltools.SetAutoMethod), ) ): automethod2name[name] = automethod.SUBMETHODS return automethod2name @property def interfacemethods(self) -> set[str]: """The full and abbreviated names of the selected model's interface methods.""" if hasattr(self.model, "INTERFACE_METHODS"): interfaces = set(m.__name__.lower() for m in self.model.INTERFACE_METHODS) interfaces.update(set(i.rpartition("_")[0] for i in interfaces)) return interfaces return set()
[docs] def modeluserfunctions(self, lines: PyxPxdLines) -> None: """Model-specific functions.""" for name, func in self.name2function_method.items(): print(f" . {name}") inline = name not in self.interfacemethods funcconverter = FuncConverter( model=self.model, funcname=name, func=func, inline=inline ) pyxlines = tuple(f" {line}" for line in funcconverter.pyxlines) lines.pyx.extend(pyxlines) lines.pxd.append(pyxlines[0][:-1]) for name, submethods in self.automethod2name.items(): print(f" . {name}") self.automethod(lines, name=name, submethods=submethods)
[docs] def callbackfeatures(self, lines: PyxPxdLines) -> None: """Features to let users define callback functions.""" pyx, pxd = lines.pyx.add, lines.pxd.add pxd(0, f"ctypedef void (*CallbackType) (Model) {_nogil}") pyx(0, "") pxd(0, "cdef class CallbackWrapper:") pxd(1, "cdef CallbackType callback") pyx(0, "") pyx(0, f"cdef void do_nothing(Model model) {_nogil}:") pyx(1, "pass") pyx(0, "") pyx(0, "cpdef get_wrapper():") pyx(1, "cdef CallbackWrapper wrapper = CallbackWrapper()") pyx(1, "wrapper.callback = do_nothing") pyx(1, "return wrapper") pyx(0, "")
[docs] def automethod( self, lines: PyxPxdLines, name: str, submethods: tuple[type[modeltools.Method], ...], ) -> None: """Lines of a method defined by a |AutoMethod| or |SetAutoMethod| subclass.""" pyx, both = lines.pyx.add, lines.add inline = name not in self.interfacemethods submethod2arg, subsignatures = {}, [] for submethod in submethods: if len(args := inspect.getargs(submethod.__call__.__code__).args) == 2: submethod2arg[submethod] = args[1] type_ = get_type_hints(submethod.__call__)[args[1]] subsignatures.append(f"{TYPE2STR[type_]} {args[1]}") else: assert len(args) == 1 header = get_methodheader(methodname=name, nogil=True, inline=inline) if subsignatures: subheaders = list(header.partition(")")) subheaders.insert(1, ", ".join([""] + subsignatures)) header = "".join(subheaders) both(1, header) for submethod in submethods: arg = submethod2arg.get(submethod, "") pyx(2, f"self.{submethod.__name__.lower()}({arg})")
[docs] def solve(self, lines: PyxPxdLines) -> None: """Lines of the model method with the same name.""" if solve := getattr(self.model, "solve", None): print(" . solve") funcconverter = FuncConverter(self.model, "solve", solve) pyxlines = tuple(f" {line}" for line in funcconverter.pyxlines) lines.pyx.extend(pyxlines) lines.pxd.append(pyxlines[0][:-1])
@classmethod def _assign_seqvalues( cls, subseqs: Iterable[sequencetools.IOSequence], subseqs_name: str, target: str, index: Optional[str], load: bool, ) -> Iterator[str]: subseqs = list(subseqs) from1 = f"self.sequences.{subseqs_name}.%s" to1 = f"self.sequences.{subseqs_name}._%s_{target}" if index is not None: to1 += f"[self.numvars.{index}]" if load: from1, to1 = to1, from1 yield from cls._declare_idxs(subseqs) for seq in subseqs: from2 = from1 % seq.name to2 = to1 % seq.name if seq.NDIM == 0: yield f"{to2} = {from2}" elif seq.NDIM == 1: yield ( f"for idx0 in range(self.sequences." f"{subseqs_name}._{seq.name}_length):" ) yield f" {to2}[idx0] = {from2}[idx0]" elif seq.NDIM == 2: yield ( f"for idx0 in range(self.sequences." f"{subseqs_name}._{seq.name}_length0):" ) yield ( f" for idx1 in range(self.sequences." f"{subseqs_name}._{seq.name}_length1):" ) yield f" {to2}[idx0, idx1] = {from2}[idx0, idx1]" else: raise NotImplementedError( f"NDIM of sequence `{seq.name}` is higher than expected." ) @staticmethod def _declare_idxs(subseqs: Iterable[sequencetools.IOSequence]) -> Iterator[str]: maxdim = 0 for seq in subseqs: maxdim = max(maxdim, seq.NDIM) if maxdim == 1: yield f"cdef {INT} idx0" elif maxdim == 2: yield f"cdef {INT} idx0, idx1"
[docs] @decorate_method def get_point_states(self) -> Iterator[str]: """Get point statements for state sequences.""" return self._assign_seqvalues( subseqs=self.model.sequences.states, subseqs_name="states", target="points", index="idx_stage", load=True, )
[docs] @decorate_method def set_point_states(self) -> Iterator[str]: """Set point statements for state sequences.""" return self._assign_seqvalues( subseqs=self.model.sequences.states, subseqs_name="states", target="points", index="idx_stage", load=False, )
[docs] @decorate_method def set_result_states(self) -> Iterator[str]: """Get results statements for state sequences.""" return self._assign_seqvalues( subseqs=self.model.sequences.states, subseqs_name="states", target="results", index="idx_method", load=False, )
[docs] @decorate_method def get_sum_fluxes(self) -> Iterator[str]: """Get sum statements for flux sequences.""" return self._assign_seqvalues( subseqs=self.model.sequences.fluxes.numericsequences, subseqs_name="fluxes", target="sum", index=None, load=True, )
[docs] @decorate_method def set_point_fluxes(self) -> Iterator[str]: """Set point statements for flux sequences.""" return self._assign_seqvalues( subseqs=self.model.sequences.fluxes.numericsequences, subseqs_name="fluxes", target="points", index="idx_stage", load=False, )
[docs] @decorate_method def set_result_fluxes(self) -> Iterator[str]: """Set result statements for flux sequences.""" return self._assign_seqvalues( subseqs=self.model.sequences.fluxes.numericsequences, subseqs_name="fluxes", target="results", index="idx_method", load=False, )
[docs] @decorate_method def integrate_fluxes(self) -> Iterator[str]: """Integrate statements for flux sequences.""" max_ndim = -1 for seq in self.model.sequences.fluxes.numericsequences: max_ndim = max(max_ndim, seq.NDIM) if max_ndim == 0: yield f"cdef {INT} jdx" elif max_ndim == 1: yield f"cdef {INT} jdx, idx0" elif max_ndim == 2: yield f"cdef {INT} jdx, idx0, idx1" for seq in self.model.sequences.fluxes.numericsequences: to_ = f"self.sequences.fluxes.{seq.name}" from_ = f"self.sequences.fluxes._{seq.name}_points" coefs = ( "self.numvars.dt * self.numconsts.a_coefs" "[self.numvars.idx_method-1, self.numvars.idx_stage, jdx]" ) if seq.NDIM == 0: yield f"{to_} = 0." yield "for jdx in range(self.numvars.idx_method):" yield f" {to_} = {to_} +{coefs}*{from_}[jdx]" elif seq.NDIM == 1: yield ( f"for idx0 in " f"range(self.sequences.fluxes._{seq.name}_length):" ) yield f" {to_}[idx0] = 0." yield " for jdx in range(self.numvars.idx_method):" yield ( f" {to_}[idx0] = " f"{to_}[idx0] + {coefs}*{from_}[jdx, idx0]" ) elif seq.NDIM == 2: yield ( f"for idx0 in " f"range(self.sequences.fluxes._{seq.name}_length0):" ) yield ( f" for idx1 in range(" f"self.sequences.fluxes._{seq.name}_length1):" ) yield f" {to_}[idx0, idx1] = 0." yield " for jdx in range(self.numvars.idx_method):" yield ( f" {to_}[idx0, idx1] = " f"{to_}[idx0, idx1] + {coefs}*{from_}[jdx, idx0, idx1]" ) else: raise NotImplementedError( f"NDIM of sequence `{seq.name}` is higher than expected." )
[docs] @decorate_method def reset_sum_fluxes(self) -> Iterator[str]: """Reset sum statements for flux sequences.""" subseqs = list(self.model.sequences.fluxes.numericsequences) yield from PyxWriter._declare_idxs(subseqs) for seq in subseqs: to_ = f"self.sequences.fluxes._{seq.name}_sum" if seq.NDIM == 0: yield f"{to_} = 0." elif seq.NDIM == 1: yield ( f"for idx0 in " f"range(self.sequences.fluxes._{seq.name}_length):" ) yield f" {to_}[idx0] = 0." elif seq.NDIM == 2: yield ( f"for idx0 in " f"range(self.sequences.fluxes._{seq.name}_length0):" ) yield ( f" for idx1 in " f"range(self.sequences.fluxes._{seq.name}_length1):" ) yield f" {to_}[idx0, idx1] = 0." else: raise NotImplementedError( f"NDIM of sequence `{seq.name}` is higher than expected." )
[docs] @decorate_method def addup_fluxes(self) -> Iterator[str]: """Add up statements for flux sequences.""" subseqs = list(self.model.sequences.fluxes.numericsequences) yield from PyxWriter._declare_idxs(subseqs) for seq in subseqs: to_ = f"self.sequences.fluxes._{seq.name}_sum" from_ = f"self.sequences.fluxes.{seq.name}" if seq.NDIM == 0: yield f"{to_} = {to_} + {from_}" elif seq.NDIM == 1: yield ( f"for idx0 in " f"range(self.sequences.fluxes._{seq.name}_length):" ) yield f" {to_}[idx0] = {to_}[idx0] + {from_}[idx0]" elif seq.NDIM == 2: yield ( f"for idx0 in " f"range(self.sequences.fluxes._{seq.name}_length0):" ) yield ( f" for idx1 in " f"range(self.sequences.fluxes._{seq.name}_length1):" ) yield ( f" {to_}[idx0, idx1] = " f"{to_}[idx0, idx1] + {from_}[idx0, idx1]" ) else: raise NotImplementedError( f"NDIM of sequence `{seq.name}` is higher than expected." )
[docs] @decorate_method def calculate_error(self) -> Iterator[str]: """Calculate error statements.""" subseqs = list(self.model.sequences.fluxes.numericsequences) assert isinstance(self.model, modeltools.ELSModel) if self.model.SOLVERSEQUENCES: subseqs = [ seq for seq in subseqs if isinstance(seq, self.model.SOLVERSEQUENCES) ] yield from self._declare_idxs(subseqs) userel = "self.numvars.use_relerror:" abserror = "self.numvars.abserror" relerror = "self.numvars.relerror" index = "self.numvars.idx_method" yield "cdef double abserror" yield f"{abserror} = 0." yield f"if {userel}" yield f" {relerror} = 0." yield "else:" yield f" {relerror} = inf" for seq in subseqs: results = f"self.sequences.fluxes._{seq.name}_results" if seq.NDIM == 0: yield f"abserror = fabs(" f"{results}[{index}]-{results}[{index}-1])" yield f"{abserror} = max({abserror}, abserror)" yield f"if {userel}" yield f" if {results}[{index}] == 0.:" yield f" {relerror} = inf" yield " else:" yield ( f" {relerror} = max(" f"{relerror}, fabs(abserror/{results}[{index}]))" ) elif seq.NDIM == 1: yield ( f"for idx0 in range(" f"self.sequences.fluxes._{seq.name}_length):" ) yield ( f" abserror = fabs(" f"{results}[{index}, idx0]-{results}[{index}-1, idx0])" ) yield f" {abserror} = max({abserror}, abserror)" yield f" if {userel}" yield f" if {results}[{index}, idx0] == 0.:" yield f" {relerror} = inf" yield " else:" yield ( f" {relerror} = max(" f"{relerror}, fabs(abserror/{results}[{index}, idx0]))" ) elif seq.NDIM == 2: yield ( f"for idx0 in range(" f"self.sequences.fluxes._{seq.name}_length0):" ) yield ( f" for idx1 in range(" f"self.sequences.fluxes._{seq.name}_length1):" ) yield ( f" abserror = fabs({results}[{index}, " f"idx0, idx1]-{results}[{index}-1, idx0, idx1])" ) yield f" {abserror} = max({abserror}, abserror)" yield f" if {userel}" yield f" if {results}[{index}, idx0, idx1] == 0.:" yield f" {relerror} = inf" yield " else:" yield ( f" {relerror} = max(" f"{relerror}, " f"fabs(abserror/{results}[{index}, idx0, idx1]))" ) else: raise NotImplementedError( f"NDIM of sequence `{seq.name}` is higher than expected." )
[docs] def extrapolate_error(self, lines: PyxPxdLines) -> None: """Extrapolate error statements.""" extrapolate_error = getattr(self.model, "extrapolate_error", None) if extrapolate_error: print(" . extrapolate_error") funcconverter = FuncConverter( self.model, "extrapolate_error", extrapolate_error ) pyxlines = tuple(f" {line}" for line in funcconverter.pyxlines) lines.pyx.extend(pyxlines) lines.pxd.append(pyxlines[0][:-1])
[docs] def write_stubfile(self) -> None: """Write a stub file for the actual base or application model. At the moment, *HydPy* creates model objects quite dynamically. In many regards, this comes with lots of conveniences. However, there two critical drawbacks compared to more static approaches: some amount of additional initialisation time and, more important, much opaqueness for code inspection tools. In this context, we experiment with "stub files" at the moment. These could either contain typing information only or define statically predefined model classes. The following example uses method |PyxWriter.write_stubfile| to write a (far from perfect) prototype stub file for base model |hland|: >>> from hydpy.models.hland import * >>> cythonizer.pyxwriter.write_stubfile() This is the path to the written file: >>> import os >>> import hydpy >>> filepath = os.path.join(hydpy.__path__[0], "hland.py") >>> os.path.exists(filepath) True However, it's just an experimental prototype, so we better remove it: >>> os.remove(filepath) >>> os.path.exists(filepath) False """ hydpypath: str = hydpy.__path__[0] filepath = os.path.join(hydpypath, f"{self.model.name}.py") base = ".".join(self.model.__module__.split(".")[:3]) with open(filepath, "w", encoding=config.ENCODING) as stubfile: stubfile.write( f"# -*- coding: utf-8 -*-\n\n" f"import hydpy\n" f"from {base} import *\n" f"from hydpy.core.parametertools import (\n" f" FastAccess,)\n" f"from hydpy.core.parametertools import (\n" f" Parameters, FastAccessParameter)\n" f"from hydpy.core.sequencetools import (\n" f" Sequences,)\n\n" ) for subpars in self.model.parameters: classname = f"FastAccess{subpars.name.capitalize()}Parameters" stubfile.write(f"\n\nclass {classname}(FastAccessParameter):\n") for partype in subpars.CLASSES: stubfile.write( f" {partype.__name__.lower()}: " f"{partype.__module__}.{partype.__name__}\n" ) for subpars in self.model.parameters: classname = f"{subpars.name.capitalize()}Parameters" stubfile.write(f"\n\nclass {classname}({classname}):\n") stubfile.write(f" fastaccess: FastAccess{classname}\n") for partype in subpars.CLASSES: stubfile.write( f" {partype.__name__.lower()}: " f"{partype.__module__}.{partype.__name__}\n" ) stubfile.write("\n\nclass Parameters(Parameters):\n") for subpars in self.model.parameters: classname = f"{subpars.name.capitalize()}Parameters" stubfile.write(f" {subpars.name}: {classname}\n") for subseqs in self.model.sequences: classname = f"FastAccess{type(subseqs).__name__}" stubfile.write(f"\n\nclass {classname}(FastAccess):\n") for seqtype in subseqs.CLASSES: stubfile.write( f" {seqtype.__name__.lower()}: " f"{seqtype.__module__}.{seqtype.__name__}\n" ) for subseqs in self.model.sequences: classname = type(subseqs).__name__ stubfile.write(f"\n\nclass {classname}({classname}):\n") stubfile.write(f" fastaccess: FastAccess{classname}\n") if classname == "StateSequences": stubfile.write(f" fastaccess_old: FastAccess{classname}\n") stubfile.write(f" fastaccess_new: FastAccess{classname}\n") for seqtype in subseqs.CLASSES: stubfile.write( f" {seqtype.__name__.lower()}: " f"{seqtype.__module__}.{seqtype.__name__}\n" ) stubfile.write("\n\nclass Sequences(Sequences):\n") for group in self.model.sequences: classname = type(group).__name__ stubfile.write(f" {group.name}: {classname}\n") stubfile.write( "\n\nclass Model(Model):\n" " parameters: Parameters\n" " sequences: Sequences\n" ) for method in self.model.get_methods(): stubfile.write( f" {method.__name__.lower()}: hydpy.core.modeltools.Method\n" ) stubfile.write("\n\nmodel: Model\n") stubfile.write("parameters: Parameters\n") stubfile.write("sequences: Sequences\n") for subpars in self.model.parameters: classname = f"{subpars.name.capitalize()}Parameters" stubfile.write(f"{subpars.name}: {classname}\n") for subseqs in self.model.sequences: classname = type(subseqs).__name__ stubfile.write(f"{subseqs.name}: {classname}\n") if self.model.parameters.control: for partype in self.model.parameters.control.CLASSES: stubfile.write( f"{partype.__name__.lower()}: " f"{partype.__module__}.{partype.__name__}\n" )
[docs] class FuncConverter: """Helper class for class |PyxWriter| that analyses Python functions and provides the required Cython code via property |FuncConverter.pyxlines|.""" model: modeltools.Model funcname: str func: Union[types.MethodType, Callable[[modeltools.Model], None]] inline: bool def __init__( self, model: modeltools.Model, funcname: str, func: Union[types.MethodType, Callable[[modeltools.Model], None]], inline: bool = True, ) -> None: self.model = model self.funcname = funcname self.func = func self.inline = inline @property def realfunc(self) -> Callable: """The "real" function, as as defined by the model developer or user.""" if (reusablemethod := self.reusablemethod) is not None: return reusablemethod.__call__ return self.func @property def argnames(self) -> list[str]: """The argument names of the current function. >>> from hydpy.cythons.modelutils import FuncConverter >>> from hydpy import prepare_model, pub >>> with pub.options.usecython(False): ... model = prepare_model("hland_96") >>> FuncConverter(model, None, model.calc_tc_v1).argnames ['model'] """ return inspect.getargs(self.realfunc.__code__)[0] @property def varnames(self) -> tuple[str, ...]: """The variable names of the current function. >>> from hydpy.cythons.modelutils import FuncConverter >>> from hydpy import prepare_model, pub >>> with pub.options.usecython(False): ... model = prepare_model("hland_96") >>> FuncConverter(model, None, model.calc_tc_v1).varnames ('self', 'con', 'der', 'inp', 'fac', 'k') """ return tuple( vn if vn != "model" else "self" for vn in self.realfunc.__code__.co_varnames ) @property def locnames(self) -> list[str]: """The variable names of the handled function except for the argument names. >>> from hydpy.cythons.modelutils import FuncConverter >>> from hydpy import prepare_model, pub >>> with pub.options.usecython(False): ... model = prepare_model("hland_96") >>> FuncConverter(model, None, model.calc_tc_v1).locnames ['self', 'con', 'der', 'inp', 'fac', 'k'] """ return [vn for vn in self.varnames if vn not in self.argnames] @property def subgroupnames(self) -> list[str]: """The complete names of the subgroups relevant for the current function. >>> from hydpy.cythons.modelutils import FuncConverter >>> from hydpy import prepare_model, pub >>> with pub.options.usecython(False): ... model = prepare_model("hland_96") >>> FuncConverter(model, None, model.calc_tc_v1).subgroupnames ['parameters.control', 'parameters.derived', 'sequences.inputs', \ 'sequences.factors'] """ names = [] for groupname in ("parameters", "sequences"): for subgroup in getattr(self.model, groupname): if subgroup.name[:3] in self.varnames: names.append(groupname + "." + subgroup.name) if "old" in self.varnames: names.append("sequences.old_states") if "new" in self.varnames: names.append("sequences.new_states") return names @property def subgroupshortcuts(self) -> list[str]: """The abbreviated names of the subgroups relevant for the current function. >>> from hydpy.cythons.modelutils import FuncConverter >>> from hydpy import prepare_model, pub >>> with pub.options.usecython(False): ... model = prepare_model("hland_96") >>> FuncConverter(model, None, model.calc_tc_v1).subgroupshortcuts ['con', 'der', 'inp', 'fac'] """ return [name.split(".")[-1][:3] for name in self.subgroupnames] @property def untypedvarnames(self) -> list[str]: """The names of the untyped variables used in the current function. >>> from hydpy.cythons.modelutils import FuncConverter >>> from hydpy import prepare_model, pub >>> with pub.options.usecython(False): ... model = prepare_model("hland_96") >>> FuncConverter(model, None, model.calc_tc_v1).untypedvarnames ['k'] """ return [ name for name in self.varnames if name not in self.subgroupshortcuts + ["self"] ] @property def untypedarguments(self) -> list[str]: """The names of the untyped arguments used by the current function. >>> from hydpy.cythons.modelutils import FuncConverter >>> from hydpy import prepare_model, pub >>> with pub.options.usecython(False): ... model = prepare_model("hland_96") >>> FuncConverter(model, None, model.calc_tc_v1).untypedarguments [] """ defline = self.cleanlines[0] return [ name for name in self.untypedvarnames if ((f", {name}," in defline) or (f", {name})" in defline)) ] @property def untypedinternalvarnames(self) -> list[str]: """The names of the untyped variables used in the current function except for those of the arguments. >>> from hydpy.cythons.modelutils import FuncConverter >>> from hydpy import prepare_model, pub >>> with pub.options.usecython(False): ... model = prepare_model("hland_96") >>> FuncConverter(model, None, model.calc_tc_v1).untypedinternalvarnames ['k'] """ return [ name for name in self.untypedvarnames if name not in self.untypedarguments ] @property def reusablemethod(self) -> Optional[type[modeltools.ReusableMethod]]: """If the currently handled function object is a reusable method, return the corresponding subclass of |ReusableMethod|.""" if isinstance(method_of_model := self.func, types.MethodType): maybe_method_of_reusablemethod = method_of_model.__func__ if isinstance(maybe_method_of_reusablemethod, types.MethodType): maybe_reusablemethod = maybe_method_of_reusablemethod.__self__ if isinstance(maybe_reusablemethod, type) and issubclass( maybe_reusablemethod, modeltools.ReusableMethod ): return maybe_reusablemethod return None @property def cleanlines(self) -> list[str]: """The leaned code lines of the current function. The implemented cleanups: * eventually, remove method version * remove all docstrings * remove all comments * remove all empty lines * remove line bracks within brackets * remove the phrase `modelutils` * remove all lines containing the phrase `fastaccess` * replace all shortcuts with complete reference names * replace " model." with " self." * remove ".values" and "value" * remove the ": float" annotation """ code = inspect.getsource(self.realfunc) code = "\n".join(code.split('"""')[::2]) code = code.replace("modelutils.", "") code = code.replace(" model.", " self.") code = code.replace("[model.", "[self.") code = code.replace("(model.", "(self.") code = code.replace("(model)", "(self)") code = code.replace(".values", "") code = code.replace(".value", "") code = code.replace(": float", "") for name, shortcut in zip(self.subgroupnames, self.subgroupshortcuts): code = code.replace(f"{shortcut}.", f"self.{name}.") code = self.remove_linebreaks_within_equations(code) lines = code.splitlines() self.remove_imath_operators(lines) if lines[0].lstrip().startswith("@"): del lines[0] # remove @staticmethod indent = len(lines[0]) - len(lines[0].lstrip()) lines = [line[indent:] for line in lines] # normalise indentation argnames = self.argnames argnames[0] = "self" lines[0] = f"def {self.funcname}({', '.join(argnames)}):" lines = [line.split("#")[0] for line in lines] lines = [line for line in lines if "fastaccess" not in line] lines = [line.rstrip() for line in lines if line.rstrip()] return Lines(*lines)
[docs] @staticmethod def remove_linebreaks_within_equations(code: str) -> str: r"""Remove line breaks within equations. The following example is not an exhaustive test but shows how the method works in principle: >>> code = "asdf = \\\n(a\n+b)" >>> from hydpy.cythons.modelutils import FuncConverter >>> FuncConverter.remove_linebreaks_within_equations(code) 'asdf = (a+b)' """ code = code.replace("\\\n", "") chars = [] counter = 0 for char in code: if char in ("(", "[", "{"): counter += 1 elif char in (")", "]", "}"): counter -= 1 if not (counter and (char == "\n")): chars.append(char) return "".join(chars)
[docs] @staticmethod def remove_imath_operators(lines: list[str]) -> None: """Remove mathematical expressions that require Pythons global interpreter locking mechanism. The following example is not an exhaustive test but shows how the method works in principle: >>> lines = [" x += 1*1"] >>> from hydpy.cythons.modelutils import FuncConverter >>> FuncConverter.remove_imath_operators(lines) >>> lines [' x = x + (1*1)'] """ for idx, line in enumerate(lines): for operator_ in ("+=", "-=", "**=", "*=", "//=", "/=", "%="): sublines = line.split(operator_) if len(sublines) > 1: indent = line.count(" ") - line.lstrip().count(" ") sublines = [sl.strip() for sl in sublines] line = ( f"{indent*' '}{sublines[0]} = " f"{sublines[0]} {operator_[:-1]} ({sublines[1]})" ) lines[idx] = line
@property def pyxlines(self) -> Lines: """Cython code lines of the current function. Assumptions: * The function shall be a method. * Annotations specify all argument and return types. * Non-default argument and return types are translate to "modulename.classname" strings. * Local variables are generally of type `int` but of type `double` when their name starts with `d_`. * Identical type names in Python and Cython when casting. We import some classes and prepare a pure-Python instance of application model |hland_96|: >>> from types import MethodType >>> from hydpy.core.modeltools import Method, Model >>> from hydpy.core.typingtools import Vector >>> from hydpy.cythons.modelutils import FuncConverter >>> from hydpy import prepare_model, pub >>> with pub.options.usecython(False): ... model = prepare_model("hland_96") First, we show an example of a standard method without additional arguments and returning nothing but requiring two local variables: >>> class Calc_Test_V1(Method): ... @staticmethod ... def __call__(model: Model) -> None: ... con = model.parameters.control.fastaccess ... flu = model.sequences.fluxes.fastaccess ... inp = model.sequences.inputs.fastaccess ... for k in range(con.nmbzones): ... d_pc = con.kg[k]*inp.p[k] ... flu.pc[k] = d_pc >>> model.calc_test_v1 = MethodType(Calc_Test_V1.__call__, model) >>> FuncConverter( ... model, "calc_test_v1", model.calc_test_v1 ... ).pyxlines # doctest: +ELLIPSIS cpdef inline void calc_test_v1(self) noexcept nogil: cdef double d_pc cdef ...int... k for k in range(self.parameters.control.nmbzones): d_pc = self.parameters.control.kg[k]*self.sequences.inputs.p[k] self.sequences.fluxes.pc[k] = d_pc <BLANKLINE> The second example shows that `float` and `Vector` annotations translate into `double` and `double[:]` types, respectively: >>> class Calc_Test_V2(Method): ... @staticmethod ... def __call__(model: Model, value: float, values: Vector) -> float: ... con = model.parameters.control.fastaccess ... return con.kg[0]*value*values[1] >>> model.calc_test_discontinous = MethodType(Calc_Test_V2.__call__, model) >>> FuncConverter( ... model, "calc_test_discontinous", model.calc_test_discontinous ... ).pyxlines cpdef inline double calc_test_discontinous(self, double value, double[:] \ values) noexcept nogil: return self.parameters.control.kg[0]*value*values[1] <BLANKLINE> Third, Python's standard cast function translates into Cython's cast syntax: >>> from hydpy.interfaces import routinginterfaces >>> class Calc_Test_V3(Method): ... @staticmethod ... def __call__(model: Model) -> routinginterfaces.StorageModel_V1: ... return cast(routinginterfaces.StorageModel_V1, model.soilmodel) >>> model.calc_test_stiff1d = MethodType(Calc_Test_V3.__call__, model) >>> FuncConverter(model, "calc_test_stiff1d", model.calc_test_stiff1d).pyxlines cpdef inline masterinterface.MasterInterface calc_test_stiff1d(self) noexcept \ nogil: return (<masterinterface.MasterInterface>self.soilmodel) <BLANKLINE> >>> class Calc_Test_V4(Method): ... @staticmethod ... def __call__(model: Model) -> None: ... cast( ... Union[ ... routinginterfaces.RoutingModel_V1, ... routinginterfaces.RoutingModel_V2, ... ], ... model.routingmodels[0], ... ).get_partialdischargedownstream() >>> model.calc_test_v4 = MethodType(Calc_Test_V4.__call__, model) >>> FuncConverter(model, "calc_test_v4", model.calc_test_v4).pyxlines cpdef inline void calc_test_v4(self) noexcept nogil: (<masterinterface.MasterInterface>self.routingmodels[0]).\ get_partialdischargedownstream() <BLANKLINE> """ def _get_cytype(name_: str) -> str: pytype = annotations_[name_] if pytype in TYPE2STR: return TYPE2STR[pytype] if pytype.__module__.startswith("hydpy.interfaces."): return "masterinterface.MasterInterface" return f"{pytype.__module__.split('.')[-1]}.{pytype.__name__}" annotations_ = get_type_hints(self.realfunc) lines = self.cleanlines lines[0] = lines[0].lower() inline = " inline" if self.inline else "" lines[0] = lines[0].replace("def ", f"cpdef{inline} {_get_cytype('return')} ") lines[0] = lines[0].replace("):", f"){_nogil}:") if (reusablemethod := self.reusablemethod) is not None: lines[0] = lines[0].replace("(self, model", "(self") for i in range(1, len(lines)): lines[i] = f" {lines[i]}" lines.insert(1, f" if not self.{reusablemethod.REUSEMARKER}:") lines.append(f" self.{reusablemethod.REUSEMARKER} = True") for name in self.untypedarguments: cytype = _get_cytype(name) lines[0] = lines[0].replace(f", {name},", f", {cytype} {name},") lines[0] = lines[0].replace(f", {name})", f", {cytype} {name})") code = inspect.getsource(self.realfunc) for name in self.untypedinternalvarnames: if (f" {name}: float" in code) or name.startswith("d_"): cytype = TYPE2STR[float] else: cytype = TYPE2STR[int] lines.insert(1, f" cdef {cytype} {name}") for idx, line in enumerate(lines): if "cast(" in line: part1, _, part23 = line.partition("cast(") part2, _, part3 = part23.partition(")") part2 = part2.strip().strip(",").rpartition(",")[2].strip() lines[idx] = f"{part1}(<masterinterface.MasterInterface>{part2}){part3}" return Lines(*lines)
[docs] def get_callbackcymodule( model: modeltools.Model, parameter: parametertools.CallbackParameter, callback: Callable[[modeltools.Model], None], ) -> types.ModuleType: """Return the cython module containing the required callback module after, if necessary, creating or updating.""" basename = f"callback_{model}_{parameter.name}_{callback.__name__}" pyfilepath = os.path.join(autogenpath, f"{basename}.pysource") pycode = inspect.getsource(callback) lines = pycode.split("\n") indent = len(lines[0]) - len(lines[0].lstrip()) pycode = "\n".join(line[indent:] for line in lines) refresh = True if os.path.exists(pyfilepath): with open(pyfilepath, "r", encoding=config.ENCODING) as sf: refresh = pycode != sf.read() if refresh: if (module := model.__module__).count(".") == 3: module = module.rpartition(".")[0] cythonizer = importlib.import_module(module).cythonizer assert isinstance(cythonizer, Cythonizer) preamble = PyxPxdLines() cythonizer.pyxwriter.cythondistutilsoptions(preamble) cythonizer.pyxwriter.cimports(preamble) pyx = FuncConverter( model=model, funcname=callback.__name__, func=callback, inline=False ).pyxlines pyx.insert( 0, f"from hydpy.cythons.autogen.c_{model} cimport Model, CallbackWrapper\n" ) pyx[1] = pyx[1].replace("cpdef ", "cdef ").replace("(self)", "(Model self)") pyx.add(0, "") pyx.add(0, "cdef class MyCallbackWrapper(CallbackWrapper):") pyx.add(1, "def __call__(self, model):") pyx.add(2, "self.callback(model.cymodel)") pyx.add(1, "def get_name(self):") pyx.add(2, f'return "{callback.__name__}"') pyx.add(1, "def get_sourcecode(self):") pyx.add(2, f"return {repr(pycode.strip())}") pyx.add(0, "") pyx.add(0, "cpdef get_wrapper():") pyx.add(1, "cdef MyCallbackWrapper wrapper = MyCallbackWrapper()") pyx.add(1, f"wrapper.callback = {callback.__name__}") pyx.add(1, "return wrapper") pyxfilepath = os.path.join(autogenpath, f"{basename}.pyx") with open(pyxfilepath, "w", encoding=config.ENCODING) as file_: file_.write("\n".join(preamble.pyx + pyx)) compile_( cyname=basename, pyxfilepath=pyxfilepath, buildpath=os.path.join(autogenpath, "_build"), ) move_dll( pyname=model.name, cyname=basename, cydirpath=autogenpath, buildpath=os.path.join(autogenpath, "_build"), ) with open(pyfilepath, "w", encoding=config.ENCODING) as sf: sf.write(pycode) return importlib.import_module(f"hydpy.cythons.autogen.{basename}")
[docs] def exp(double: float) -> float: """Cython wrapper for the |numpy.exp| function of module |numpy| applied on a single |float| object. >>> from hydpy.cythons.modelutils import exp >>> from unittest import mock >>> with mock.patch("numpy.exp") as func: ... _ = exp(123.4) >>> func.call_args call(123.4) """ return numpy.exp(double)
[docs] def log(double: float) -> float: """Cython wrapper for the |numpy.log| function of module |numpy| applied on a single |float| object. >>> from hydpy.cythons.modelutils import log >>> from unittest import mock >>> with mock.patch("numpy.log") as func: ... _ = log(123.4) >>> func.call_args call(123.4) """ return numpy.log(double)
[docs] def fabs(double: float) -> float: """Cython wrapper for the |math.exp| function of module |math| applied on a single |float| object. >>> from hydpy.cythons.modelutils import fabs >>> from unittest import mock >>> with mock.patch("math.fabs") as func: ... _ = fabs(123.4) >>> func.call_args call(123.4) """ return math.fabs(double)
[docs] def sin(double: float) -> float: """Cython wrapper for the |numpy.sin| function of module |numpy| applied on a single |float| object. >>> from hydpy.cythons.modelutils import sin >>> from unittest import mock >>> with mock.patch("numpy.sin") as func: ... _ = sin(123.4) >>> func.call_args call(123.4) """ return numpy.sin(double)
[docs] def cos(double: float) -> float: """Cython wrapper for the |numpy.cos| function of module |numpy| applied on a single |float| object. >>> from hydpy.cythons.modelutils import cos >>> from unittest import mock >>> with mock.patch("numpy.cos") as func: ... _ = cos(123.4) >>> func.call_args call(123.4) """ return numpy.cos(double)
[docs] def tan(double: float) -> float: """Cython wrapper for the |numpy.tan| function of module |numpy| applied on a single |float| object. >>> from hydpy.cythons.modelutils import tan >>> from unittest import mock >>> with mock.patch("numpy.tan") as func: ... _ = tan(123.4) >>> func.call_args call(123.4) """ return numpy.tan(double)
[docs] def asin(double: float) -> float: """Cython wrapper for the |numpy.arcsin| function of module |numpy| applied on a single |float| object. >>> from hydpy.cythons.modelutils import asin >>> from unittest import mock >>> with mock.patch("numpy.arcsin") as func: ... _ = asin(123.4) >>> func.call_args call(123.4) """ return numpy.arcsin(double)
[docs] def acos(double: float) -> float: """Cython wrapper for the |numpy.arccos| function of module |numpy| applied on a single |float| object. >>> from hydpy.cythons.modelutils import acos >>> from unittest import mock >>> with mock.patch("numpy.arccos") as func: ... _ = acos(123.4) >>> func.call_args call(123.4) """ return numpy.arccos(double)
[docs] def atan(double: float) -> float: """Cython wrapper for the |numpy.arctan| function of module |numpy| applied on a single |float| object. >>> from hydpy.cythons.modelutils import atan >>> from unittest import mock >>> with mock.patch("numpy.arctan") as func: ... _ = atan(123.4) >>> func.call_args call(123.4) """ return numpy.arctan(double)
[docs] def isnan(double: float) -> float: """Cython wrapper for the |numpy.isnan| function of module |numpy| applied on a single |float| object. >>> from hydpy.cythons.modelutils import isnan >>> from unittest import mock >>> with mock.patch("numpy.isnan") as func: ... _ = isnan(123.4) >>> func.call_args call(123.4) """ return numpy.isnan(double)
[docs] def isinf(double: float) -> float: """Cython wrapper for the |numpy.isinf| function of module |numpy| applied on a single |float| object. >>> from hydpy.cythons.modelutils import isnan >>> from unittest import mock >>> with mock.patch("numpy.isinf") as func: ... _ = isinf(123.4) >>> func.call_args call(123.4) """ return numpy.isinf(double)