threadingtools

This module implements HydPy’s fundamental features for performing multi-threaded simulation runs.

Module threadingtools implements the following members:

  • Parallelisability An analyser for the “parallelisability” of a network.

  • Queue A “Last In - First Out” queue for executing the parallelisable parts of a network more efficiently via multi-threading.

  • Worker A worker that interacts with the current Queue instance and is responsible for processing nodes and elements in an individual thread.

  • check_threading() Execute one “sequential”, then two “parallel”, and finally another “sequential” simulation run, and check if the time series calculated for all nodes are identical in all cases.


class hydpy.core.threadingtools.Parallelisability(nodes: Nodes, elements: Elements)[source]

Bases: object

An analyser for the “parallelisability” of a network.

We explain the meaning of the members of class Parallelisability based on the small example project created by function prepare_receiver_example():

>>> from hydpy.core.testtools import prepare_receiver_example
>>> hp, pub = prepare_receiver_example()

You can initialise Parallelisability objects manually, but it is usually sufficient to access the object made available by property parallelisability of class HydPy.

>>> p = hp.parallelisability

The members parallel_elements and sequential_elements separate the elements that can be simulated in parallel from those that must be simulated sequentially:

>>> p.parallel_elements
Elements("l1", "l2", "l3")
>>> p.sequential_elements
Elements("d", "s12", "s23", "s34")

The members parallel_nodes and sequential_nodes list all nodes connected to at least one element that is an item of parallel_elements or sequential_elements, respectively:

>>> p.parallel_nodes
Nodes("n1a", "n2", "n3")
>>> p.sequential_nodes
Nodes("n1a", "n1b", "n2", "n3", "n4")

Nodes that mark a transition between the “parallelisable” and “non-parallelisable” parts of a network, and are thus listed both in parallel_nodes and sequential_nodes, need special treatment during simulation runs and are hence separately available via the member transition_nodes:

>>> p.transition_nodes
Nodes("n1a", "n2", "n3")

The reason the analysed network is not completely parallelisable is the use of the receiver node mechanism. The dam_v001 model handled by element d tries to prevent low-flow situations at a downstream location and therefore receives information from this place via the receiver node n2. Hence, calculating the latest water release of d requires up-to-date discharge information from n2, which prevents applying the “temporal chunking” strategy on which HydPy’s multi-threading approach is grounded:

>>> hp.elements.d
Element("d",
        inlets="n1a",
        outlets="n1b",
        receivers="n2")

However, if we change the deploymode of node n2 so that it passes already available instead of freshly simulated discharge values to d, d no longer needs to wait for other models to contribute to the total discharge at n2, and the complete network becomes parallelisable:

>>> hp.nodes.n2.deploymode
'newsim'
>>> hp.nodes.n2.deploymode = "oldsim"
>>> p = hp.parallelisability
>>> p.parallel_elements
Elements("d", "l1", "l2", "l3", "s12", "s23", "s34")
>>> p.sequential_elements
Elements()
>>> p.parallel_nodes
Nodes("n1a", "n1b", "n2", "n3", "n4")
>>> p.sequential_nodes
Nodes()
>>> p.transition_nodes
Nodes()

sequential_elements: Elements

All non-parallelisable elements.

parallel_elements: Elements

All parallelisable elements.

sequential_nodes: Nodes

All nodes with a connection to at least one non-parallelisable element.

parallel_nodes: Nodes

All nodes with a connection to at least one parallelisable element.

transition_nodes: Nodes

All nodes with a connection to at least one parallelisable and one non-parallelisable element.
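
Note that transition_nodes always equals the intersection of parallel_nodes and sequential_nodes. A minimal sketch of this invariant (relying on devices being iterable and hashable, as the examples and the signatures of class Queue below do):

    # the transition nodes are exactly those devices listed in both groups:
    p = hp.parallelisability
    assert set(p.transition_nodes) == set(p.parallel_nodes) & set(p.sequential_nodes)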

class hydpy.core.threadingtools.Queue(*, starters: Sequence[devicetools.NodeOrElement], dependencies: Mapping[devicetools.NodeOrElement, int], upstream2downstream: Mapping[devicetools.NodeOrElement, Sequence[devicetools.NodeOrElement]])[source]

Bases: LifoQueue[Node | Element]

A “Last In - First Out” queue for executing the parallelisable parts of a network more efficiently via multi-threading.

For most users, Queue is more of an implementation detail that is never used directly. However, if you intend to squeeze the last bit of performance out of HydPy, you can create custom Queue instances that better fit your network and model configuration at hand and pass them to method simulate_multithreaded().
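
A minimal sketch of this advanced use case, assuming the analysed network is completely parallelisable (the exact signature of method simulate_multithreaded() is not shown in this section and may differ):

    # build a custom queue from the parallelisable part of the network:
    from hydpy.core.threadingtools import Queue
    p = hp.parallelisability
    custom_queue = Queue.from_devices(
        nodes=p.parallel_nodes, elements=p.parallel_elements
    )
    # then pass `custom_queue` to method `simulate_multithreaded()`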

upstream2downstream: Final[Mapping[devicetools.NodeOrElement, Sequence[devicetools.NodeOrElement]]]

A mapping that lists, for each node and element, the neighbouring elements or nodes that must wait for its processing to complete. In other words, each key (a node or element) is one of the dependencies of the items of the corresponding sequence (elements or nodes).

starters: Final[Sequence[devicetools.NodeOrElement]]

All nodes and elements that do not have any dependencies. These can be processed right at the start of a multi-threaded simulation.

dependencies: Final[Mapping[devicetools.NodeOrElement, int]]

The number of dependencies of all nodes and elements. For example, an element with three dependencies is connected to three nodes that must be processed before the element itself has all the information required for its simulation.
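
Together, starters, dependencies, and upstream2downstream support a standard topological dispatch of the parallelisable devices. The following sketch illustrates the underlying idea only (it is not HydPy code, and process() is a hypothetical placeholder):

    # generic topological dispatch over the members of a Queue instance:
    remaining = dict(queue_.dependencies)  # mutable copy of the counts
    ready = list(queue_.starters)          # devices without dependencies
    while ready:
        device = ready.pop()
        process(device)  # hypothetical: update series or simulate
        for downstream in queue_.upstream2downstream.get(device, ()):
            remaining[downstream] -= 1     # one dependency fulfilled
            if remaining[downstream] == 0:
                ready.append(downstream)   # ready for processing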

classmethod from_devices(*, nodes: Nodes, elements: Elements) Self[source]

Create a new Queue instance and determine its members from the given (parallelisable) devices.

classmethod from_queue(*, queue_: Self) Self[source]

Create a new Queue instance and take its members from the given Queue instance.

This copy-like mechanism makes the information contained in Queue instances that have already been used in a multi-threaded simulation run reusable for subsequent runs.
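
A minimal sketch, assuming used_queue already drove a previous simulation run:

    # prepare a fresh queue for the next run without re-analysing the network:
    fresh_queue = Queue.from_queue(queue_=used_queue)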

register() None[source]

Put all starters into the queue.

task_done(upstream: devicetools.NodeOrElement | BaseException) None[source]

Take the given, completely processed node or element, determine which elements or nodes thereby become ready for processing, and put them into the queue. If passed an exception instead of a device, register it so that method join() can report it.
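
Hence, the intended calling pattern resembles the standard queue protocol; roughly (process() is a hypothetical placeholder):

    device = queue_.get()         # blocks until a device is available
    try:
        process(device)           # hypothetical processing step
        queue_.task_done(device)  # possibly releases dependent devices
    except BaseException as error:
        queue_.task_done(error)   # lets method join() report the error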

join() None[source]

Block until all nodes and elements have been processed.

If something is wrong, method join() tries (with the help of method run() of class Worker) to report the first error that occurred in one of the threads:

>>> from hydpy.core.testtools import prepare_full_example_2
>>> hp, pub, TestIO = prepare_full_example_2()
>>> for element in hp.elements:
...     element.model.simulate_period = None
>>> with pub.options.threads(4):
...     hp.simulate()
Traceback (most recent call last):
...
TypeError: 'NoneType' object is not callable

class hydpy.core.threadingtools.Worker(queue_: Queue, elements: Elements)[source]

Bases: Thread

A worker that interacts with the current Queue instance and is responsible for processing nodes and elements in an individual thread.

run() None[source]

Query the next device from the current Queue instance, update the relevant time series data, and perform a simulation if the device is an element.
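
HydPy performs this orchestration internally (compare the pub.options.threads example above); a manual sketch, assuming a prepared Queue instance queue_, the Elements object elements it covers, and four threads:

    queue_.register()  # put all starters into the queue
    workers = [Worker(queue_, elements) for _ in range(4)]
    for worker in workers:
        worker.start()  # each thread executes method run()
    queue_.join()      # block until everything is processed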

hydpy.core.threadingtools.check_threading(hp: HydPy, sequence: IOSequence, pause: Date | None = None) None[source]

Execute one “sequential”, then two “parallel”, and finally another “sequential” simulation run, and check if the time series calculated for all nodes are identical in all cases.

>>> from hydpy.core.threadingtools import check_threading

Tests based on the HydPy-L-Land example project (with the primary goal of checking if all deploy modes are properly supported):

>>> from hydpy.core.testtools import prepare_full_example_2
>>> hp, pub, TestIO = prepare_full_example_2()
>>> leun = hp.nodes.lahn_leun
>>> kalk = hp.nodes.lahn_kalk
>>> check_threading(hp, kalk.sequences.sim)
54.019337, 37.257561, 31.865308, 28.359542
>>> check_threading(hp, kalk.sequences.sim, "1996-01-03")
54.019337, 37.257561, 31.865308, 28.359542
>>> leun.deploymode = "oldsim"
>>> leun.sequences.sim.series -= 10.0
>>> check_threading(hp, kalk.sequences.sim)
44.019337, 27.257561, 21.865308, 18.359542
>>> leun.deploymode = "obs"
>>> leun.sequences.obs.series = 0.0
>>> check_threading(hp, kalk.sequences.sim)
11.672862, 10.100089, 8.984317, 8.202706
>>> leun.deploymode = "obs"
>>> leun.sequences.obs.series = 0.0
>>> check_threading(hp, kalk.sequences.sim)
11.672862, 10.100089, 8.984317, 8.202706
>>> from numpy import nan
>>> with pub.options.checkseries(False):
...     leun.sequences.obs.series = 0.0, nan, 0.0, nan
>>> check_threading(hp, kalk.sequences.sim)
11.672862, nan, 8.984317, nan
>>> leun.deploymode = "obs_newsim"
>>> check_threading(hp, kalk.sequences.sim)
11.672862, 37.257561, 8.984317, 28.359542
>>> leun.deploymode = "obs_oldsim"
>>> leun.sequences.sim.series = 32.3697, 17.210443, 12.930066, 10.20133
>>> check_threading(hp, kalk.sequences.sim)
11.672862, 27.310532, 8.984317, 18.404036
>>> leun.deploymode = "newsim"
>>> leun.sequences.sim.series = 32.3697, 17.210443, 12.930066, 10.20133
>>> check_threading(hp, kalk.sequences.sim)
54.019337, 37.257561, 31.865308, 28.359542

Tests based on the interpolation example project (with the primary goal of checking if the input node mechanism is properly supported within the parallelisable part of a network):

>>> from hydpy.core.testtools import prepare_interpolation_example
>>> hp, pub = prepare_interpolation_example()
>>> check_threading(hp, hp.nodes.q12.sequences.sim)
0.853716, 0.864633, 1.037645
>>> hp.nodes.in1.deploymode = "oldsim"
>>> hp.nodes.in1.sequences.sim.series = hp.nodes.in1.sequences.obs.series
>>> hp.nodes.in1.sequences.obs.series = 0.0
>>> check_threading(hp, hp.nodes.q12.sequences.sim)
0.853716, 0.864633, 1.037645

Tests based on the receiver example project (with the primary goal of checking if all deploy modes are properly supported around the transitions between the parallelisable and the non-parallelisable parts of a network):

>>> from hydpy.core.testtools import prepare_receiver_example
>>> hp, pub = prepare_receiver_example()
>>> check_threading(hp, hp.nodes.n4.sequences.sim)
4.649878, 4.1042, 3.669253, 3.480431, 3.363932, 3.263707
>>> from hydpy import print_vector
>>> print_vector(hp.elements.d.model.sequences.receivers.q.series)
2.324939, 2.0521, 1.834626, 1.822902, 1.853202, 1.876488
>>> hp.nodes.n2.deploymode = "oldsim"
>>> check_threading(hp, hp.nodes.n4.sequences.sim)
4.649878, 4.1042, 3.669253, 3.480431, 3.363932, 3.263707
>>> print_vector(hp.elements.d.model.sequences.receivers.q.series)
2.324939, 2.0521, 1.834626, 1.822902, 1.853202, 1.876488
>>> hp.nodes.n2.deploymode = "obs"
>>> hp.nodes.n2.sequences.obs.series = hp.nodes.n2.sequences.sim.series
>>> hp.nodes.n2.sequences.sim.series = 0.0
>>> check_threading(hp, hp.nodes.n4.sequences.sim)
4.649878, 4.1042, 3.669253, 3.480431, 3.363932, 3.263707
>>> hp.nodes.n2.deploymode = "obs_oldsim"
>>> hp.nodes.n2.sequences.obs.series[2] = nan
>>> check_threading(hp, hp.nodes.n4.sequences.sim)
4.649878, 4.1042, 3.669253, 3.480431, 3.363932, 3.263707
>>> hp.nodes.n2.deploymode = "obs_newsim"
>>> hp.nodes.n2.sequences.obs.series[2] = nan
>>> check_threading(hp, hp.nodes.n4.sequences.sim)
4.649878, 4.1042, 3.669253, 3.480431, 3.363932, 3.263707

Tests based on the collective example project (with the primary goals of checking if collective elements and the output node mechanism are properly supported):

>>> from hydpy.core.testtools import prepare_collective_example
>>> hp, pub = prepare_collective_example()
>>> check_threading(hp, hp.nodes.c3_out.sequences.sim)
0.409196, 0.386017, 0.337494, 0.279784, 0.203433, 0.071322