threadingtools
This module implements HydPy’s fundamental features for performing multi-threaded simulation runs.
Module threadingtools implements the following members:
- Parallelisability: An analyser for the “parallelisability” of a network.
- Queue: A “Last In - First Out” queue for executing the parallelisable parts of a network more efficiently via multi-threading.
- Worker: A worker that interacts with the current Queue instance and is responsible for processing nodes and elements in an individual thread.
- check_threading(): Execute one “sequential”, then two “parallel”, and finally another “sequential” simulation run, and check if the time series calculated for all nodes are identical in all cases.
- class hydpy.core.threadingtools.Parallelisability(nodes: Nodes, elements: Elements)
Bases: object
An analyser for the “parallelisability” of a network.
We explain the meaning of the members of class Parallelisability based on the small example project created by function prepare_receiver_example():

>>> from hydpy.core.testtools import prepare_receiver_example
>>> hp, pub = prepare_receiver_example()

You can initialise Parallelisability objects manually, but it is usually sufficient to access the object made available by property parallelisability of class HydPy:

>>> p = hp.parallelisability

The members parallel_elements and sequential_elements separate the elements that can be simulated in parallel from those that must be simulated sequentially:

>>> p.parallel_elements
Elements("l1", "l2", "l3")
>>> p.sequential_elements
Elements("d", "s12", "s23", "s34")

The members parallel_nodes and sequential_nodes list all nodes connected to at least one element that is an item of parallel_elements or sequential_elements, respectively:

>>> p.parallel_nodes
Nodes("n1a", "n2", "n3")
>>> p.sequential_nodes
Nodes("n1a", "n1b", "n2", "n3", "n4")

Nodes that mark a transition between the “parallelisable” and “non-parallelisable” parts of a network, and are thus listed both in parallel_nodes and sequential_nodes, need special treatment during simulation runs and are hence separately available via the member transition_nodes:

>>> p.transition_nodes
Nodes("n1a", "n2", "n3")

The reason why the analysed network is not completely parallelisable is the usage of the receiver node mechanism. The dam_v001 model handled by element d tries to prevent low flow situations at a downstream location and therefore gets information from this place via the receiver node n2. Hence, calculating the latest water release of d requires up-to-date discharge information from n2, which prevents applying the “temporal chunking” strategy, on which HydPy’s multi-threading approach is grounded:

>>> hp.elements.d
Element("d", inlets="n1a", outlets="n1b", receivers="n2")

However, if we tell node n2 not to pass freshly simulated but already available discharge values to d by changing its deploymode, the need for d to wait for other models to contribute to the total discharge at n2 disappears, and the complete network becomes parallelisable:

>>> hp.nodes.n2.deploymode
'newsim'
>>> hp.nodes.n2.deploymode = "oldsim"
>>> p = hp.parallelisability
>>> p.parallel_elements
Elements("d", "l1", "l2", "l3", "s12", "s23", "s34")
>>> p.sequential_elements
Elements()
>>> p.parallel_nodes
Nodes("n1a", "n1b", "n2", "n3", "n4")
>>> p.sequential_nodes
Nodes()
>>> p.transition_nodes
Nodes()

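The relationship between the three node groups can be illustrated with plain Python sets. This is only a sketch based on the node names printed for the receiver example with its default deploy modes, not HydPy's implementation. It assumes that the transition nodes are exactly the nodes contained in both groups, as described in the text.

```python
# Node names copied from the doctest output of the receiver example
# (default deploy modes, i.e. before switching n2 to "oldsim").
parallel_nodes = {"n1a", "n2", "n3"}
sequential_nodes = {"n1a", "n1b", "n2", "n3", "n4"}

# Assumption: a transition node is any node listed in both groups.
transition_nodes = parallel_nodes & sequential_nodes

print(sorted(transition_nodes))  # ['n1a', 'n2', 'n3']
```
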
- class hydpy.core.threadingtools.Queue(*, starters: Sequence[devicetools.NodeOrElement], dependencies: Mapping[devicetools.NodeOrElement, int], upstream2downstream: Mapping[devicetools.NodeOrElement, Sequence[devicetools.NodeOrElement]])
Bases: LifoQueue[Node | Element]
A “Last In - First Out” queue for executing the parallelisable parts of a network more efficiently via multi-threading.
For most users, Queue is an implementation detail that is never used directly. However, if you intend to squeeze the last bit of performance out of HydPy, you can create custom Queue instances that better fit your network and model configurations, and pass them to method simulate_multithreaded().
- upstream2downstream: Final[Mapping[devicetools.NodeOrElement, Sequence[devicetools.NodeOrElement]]]
A mapping that lists, for each node and element, the neighbouring elements or nodes that must wait until its processing is complete. In other words, each key (node or element) represents one of the dependencies of the corresponding list’s items (elements or nodes).
- starters: Final[Sequence[devicetools.NodeOrElement]]
All nodes and elements that do not have any dependencies. These can be processed right at the start of a multi-threaded simulation.
- dependencies: Final[Mapping[devicetools.NodeOrElement, int]]
The number of dependencies of all nodes and elements. An element with three dependencies is connected to three nodes that must be processed before the element has all the information required for simulation.
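The following sketch shows one way the three members could relate to each other. It uses plain strings instead of HydPy devices, and the tiny network (nodes "a", "b", and "c", element "e1") is hypothetical. It assumes that dependencies simply counts how often a device appears as a downstream neighbour and that starters are the devices with a count of zero.

```python
# Hypothetical network: nodes "a" and "b" feed element "e1",
# which in turn feeds node "c".
upstream2downstream = {
    "a": ["e1"],
    "b": ["e1"],
    "e1": ["c"],
    "c": [],
}

# Count, for each device, how many upstream neighbours it must wait for.
dependencies = {device: 0 for device in upstream2downstream}
for downstreams in upstream2downstream.values():
    for downstream in downstreams:
        dependencies[downstream] += 1

# Devices without any dependency can be processed right away.
starters = [device for device, number in dependencies.items() if number == 0]

print(dependencies)  # {'a': 0, 'b': 0, 'e1': 2, 'c': 1}
print(starters)  # ['a', 'b']
```
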
- classmethod from_devices(*, nodes: Nodes, elements: Elements) -> Self
Create a new Queue instance and determine its members from the given (parallelisable) devices.
- classmethod from_queue(*, queue_: Self) -> Self
Create a new Queue instance and take its members from the given Queue instance.
This copy-like mechanism makes the information contained by old Queue instances already used in a multi-threaded simulation run reusable.
- task_done(upstream: devicetools.NodeOrElement | BaseException) -> None
Take the given, completely processed node or element, determine which elements or nodes become ready for processing, and put them into the queue.
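The dependency-counting idea behind task_done() can be sketched with the standard library's queue.LifoQueue. The class DependencyQueue and the string-based network below are hypothetical illustrations, not HydPy's actual implementation. The sketch runs in a single thread to keep the processing order deterministic.

```python
import queue


class DependencyQueue(queue.LifoQueue):
    """Toy LIFO queue that releases items once all their dependencies are done."""

    def __init__(self, starters, dependencies, upstream2downstream):
        super().__init__()
        self.pending = dict(dependencies)  # remaining dependency counts
        self.upstream2downstream = upstream2downstream
        for starter in starters:
            self.put(starter)

    def task_done(self, upstream):
        # Release each downstream neighbour whose last open dependency
        # was the device that has just been processed.
        for downstream in self.upstream2downstream.get(upstream, ()):
            self.pending[downstream] -= 1
            if self.pending[downstream] == 0:
                self.put(downstream)
        # Mark the task as done only after queuing its successors, so the
        # counter of unfinished tasks cannot drop to zero too early.
        super().task_done()


q = DependencyQueue(
    starters=["a", "b"],
    dependencies={"a": 0, "b": 0, "e1": 2, "c": 1},
    upstream2downstream={"a": ["e1"], "b": ["e1"], "e1": ["c"], "c": []},
)

order = []
while not q.empty():
    device = q.get()
    order.append(device)  # "process" the device
    q.task_done(device)

print(order)  # ['b', 'a', 'e1', 'c']
```

Because of the “Last In - First Out” behaviour, "b" is taken before "a", and "e1" becomes available only after both of its upstream nodes have been reported as done.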
- join() -> None
Block the queue until all nodes and elements have been processed.
If something is wrong, method join() tries (with the help of method run() of class Worker) to report the first error that occurred in one of the threads:

>>> from hydpy.core.testtools import prepare_full_example_2
>>> hp, pub, TestIO = prepare_full_example_2()
>>> for element in hp.elements:
...     element.model.simulate_period = None
>>> with pub.options.threads(4):
...     hp.simulate()
Traceback (most recent call last):
...
TypeError: 'NoneType' object is not callable

- class hydpy.core.threadingtools.Worker(queue_: Queue, elements: Elements)
Bases: Thread
A worker that interacts with the current Queue instance and is responsible for processing nodes and elements in an individual thread.
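How worker threads and a queue can cooperate to surface the first error may be sketched as follows. The classes ErrorAwareQueue and ToyWorker and the function process are hypothetical. The sketch only assumes, in line with the signature of task_done(), that a worker hands a caught exception to the queue instead of the processed item, and that join() re-raises it afterwards. HydPy's actual mechanism may differ.

```python
import queue
import threading


class ErrorAwareQueue(queue.LifoQueue):
    """Toy queue that remembers the first exception reported by a worker."""

    def __init__(self):
        super().__init__()
        self.first_error = None
        self._error_lock = threading.Lock()

    def task_done(self, result=None):
        # Workers pass either the processed item or the exception they caught.
        if isinstance(result, BaseException):
            with self._error_lock:
                if self.first_error is None:
                    self.first_error = result
        super().task_done()

    def join(self):
        # Wait for all tasks, then report the first error, if any.
        super().join()
        if self.first_error is not None:
            raise self.first_error


class ToyWorker(threading.Thread):
    """Thread that processes queue items and reports failures to the queue."""

    def __init__(self, queue_, process):
        super().__init__(daemon=True)  # daemon: do not block interpreter exit
        self.queue_ = queue_
        self.process = process

    def run(self):
        while True:
            item = self.queue_.get()
            try:
                self.process(item)
            except Exception as exc:
                self.queue_.task_done(exc)
            else:
                self.queue_.task_done(item)


def process(item):
    if item == "bad":
        raise TypeError(f"cannot process {item!r}")


q = ErrorAwareQueue()
for item in ("ok1", "bad", "ok2"):
    q.put(item)
for _ in range(2):
    ToyWorker(q, process).start()

try:
    q.join()
except TypeError as exc:
    message = str(exc)
else:
    message = None

print(message)  # cannot process 'bad'
```
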
- hydpy.core.threadingtools.check_threading(hp: HydPy, sequence: IOSequence, pause: Date | None = None) -> None
Execute one “sequential”, then two “parallel”, and finally another “sequential” simulation run, and check if the time series calculated for all nodes are identical in all cases.
>>> from hydpy.core.threadingtools import check_threading
Tests based on the HydPy-L-Land example project (with the primary goal of checking if all deploy modes are properly supported):
>>> from hydpy.core.testtools import prepare_full_example_2
>>> hp, pub, TestIO = prepare_full_example_2()
>>> leun = hp.nodes.lahn_leun
>>> kalk = hp.nodes.lahn_kalk

>>> check_threading(hp, kalk.sequences.sim)
54.019337, 37.257561, 31.865308, 28.359542

>>> check_threading(hp, kalk.sequences.sim, "1996-01-03")
54.019337, 37.257561, 31.865308, 28.359542

>>> leun.deploymode = "oldsim"
>>> leun.sequences.sim.series -= 10.0
>>> check_threading(hp, kalk.sequences.sim)
44.019337, 27.257561, 21.865308, 18.359542

>>> leun.deploymode = "obs"
>>> leun.sequences.obs.series = 0.0
>>> check_threading(hp, kalk.sequences.sim)
11.672862, 10.100089, 8.984317, 8.202706

>>> from numpy import nan
>>> with pub.options.checkseries(False):
...     leun.sequences.obs.series = 0.0, nan, 0.0, nan
>>> check_threading(hp, kalk.sequences.sim)
11.672862, nan, 8.984317, nan

>>> leun.deploymode = "obs_newsim"
>>> check_threading(hp, kalk.sequences.sim)
11.672862, 37.257561, 8.984317, 28.359542

>>> leun.deploymode = "obs_oldsim"
>>> leun.sequences.sim.series = 32.3697, 17.210443, 12.930066, 10.20133
>>> check_threading(hp, kalk.sequences.sim)
11.672862, 27.310532, 8.984317, 18.404036

>>> leun.deploymode = "newsim"
>>> leun.sequences.sim.series = 32.3697, 17.210443, 12.930066, 10.20133
>>> check_threading(hp, kalk.sequences.sim)
54.019337, 37.257561, 31.865308, 28.359542

Tests based on the interpolation example project (with the primary goal of checking if the input node mechanism is properly supported within the parallelisable part of a network):
>>> from hydpy.core.testtools import prepare_interpolation_example
>>> hp, pub = prepare_interpolation_example()

>>> check_threading(hp, hp.nodes.q12.sequences.sim)
0.853716, 0.864633, 1.037645

>>> hp.nodes.in1.deploymode = "oldsim"
>>> hp.nodes.in1.sequences.sim.series = hp.nodes.in1.sequences.obs.series
>>> hp.nodes.in1.sequences.obs.series = 0.0
>>> check_threading(hp, hp.nodes.q12.sequences.sim)
0.853716, 0.864633, 1.037645

Tests based on the receiver example project (with the primary goal of checking if all deploy modes are properly supported around the transitions between the parallelisable and the non-parallelisable parts of a network):
>>> from hydpy.core.testtools import prepare_receiver_example
>>> hp, pub = prepare_receiver_example()

>>> check_threading(hp, hp.nodes.n4.sequences.sim)
4.649878, 4.1042, 3.669253, 3.480431, 3.363932, 3.263707

>>> from hydpy import print_vector
>>> print_vector(hp.elements.d.model.sequences.receivers.q.series)
2.324939, 2.0521, 1.834626, 1.822902, 1.853202, 1.876488

>>> hp.nodes.n2.deploymode = "oldsim"
>>> check_threading(hp, hp.nodes.n4.sequences.sim)
4.649878, 4.1042, 3.669253, 3.480431, 3.363932, 3.263707

>>> print_vector(hp.elements.d.model.sequences.receivers.q.series)
2.324939, 2.0521, 1.834626, 1.822902, 1.853202, 1.876488

>>> hp.nodes.n2.deploymode = "obs"
>>> hp.nodes.n2.sequences.obs.series = hp.nodes.n2.sequences.sim.series
>>> hp.nodes.n2.sequences.sim.series = 0.0
>>> check_threading(hp, hp.nodes.n4.sequences.sim)
4.649878, 4.1042, 3.669253, 3.480431, 3.363932, 3.263707

>>> hp.nodes.n2.deploymode = "obs_oldsim"
>>> hp.nodes.n2.sequences.obs.series[2] = nan
>>> check_threading(hp, hp.nodes.n4.sequences.sim)
4.649878, 4.1042, 3.669253, 3.480431, 3.363932, 3.263707

>>> hp.nodes.n2.deploymode = "obs_newsim"
>>> hp.nodes.n2.sequences.obs.series[2] = nan
>>> check_threading(hp, hp.nodes.n4.sequences.sim)
4.649878, 4.1042, 3.669253, 3.480431, 3.363932, 3.263707

Tests based on the collective example project (with the primary goals of checking if collective elements and the output node mechanism are properly supported):
>>> from hydpy.core.testtools import prepare_collective_example
>>> hp, pub = prepare_collective_example()

>>> check_threading(hp, hp.nodes.c3_out.sequences.sim)
0.409196, 0.386017, 0.337494, 0.279784, 0.203433, 0.071322