colmena.queue
Implementations of the task and result queues from Colmena
colmena.queue.python
Queues built on Python’s native libraries
- class colmena.queue.python.PipeQueues(topics: Collection[str] = (), serialization_method: str | SerializationMethod = SerializationMethod.PICKLE, keep_inputs: bool = True, proxystore_name: str | Dict[str, str] | None = None, proxystore_threshold: int | Dict[str, int] | None = None)
Bases:
ColmenaQueues
Queues using Python’s implementation of multiprocessing Pipes
- Parameters:
topics – Names of topics that are known for this queue
serialization_method – Method used to serialize task inputs and results
keep_inputs – Whether to return task inputs with the result object
proxystore_name (str, dict) – Name of a registered ProxyStore Store instance. This can be a single name such that the corresponding Store is used for all topics or a mapping of topics to registered Store names. If a mapping is provided but a topic is not in the mapping, ProxyStore will not be used.
proxystore_threshold (int, dict) – Threshold in bytes for using ProxyStore to transfer objects. Optionally, pass a dict mapping topics to thresholds to use different values for different topics. None values in the mapping will exclude ProxyStore use with that topic.
- flush()
Remove all existing results from the queues
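To illustrate the idea behind pipe-backed queues, the following minimal sketch sends a topic-tagged, pickled message through a multiprocessing Pipe. It uses only the standard library and is not Colmena's implementation: the send_message and receive_message helpers and the (topic, bytes) message layout are assumptions made for illustration.

```python
import pickle
from multiprocessing import Pipe
from typing import Any, Optional, Tuple


def send_message(conn, topic: str, payload: Any) -> None:
    """Serialize the payload with pickle and send it together with its topic."""
    conn.send((topic, pickle.dumps(payload)))


def receive_message(conn, timeout: Optional[float] = None) -> Optional[Tuple[str, Any]]:
    """Return (topic, payload), or None if nothing arrives within the timeout."""
    # poll(None) blocks until data is available; poll(t) waits at most t seconds
    if not conn.poll(timeout):
        return None
    topic, data = conn.recv()
    return topic, pickle.loads(data)


# One end plays the role of the client, the other the task server
client_end, server_end = Pipe()
send_message(client_end, "simulation", {"x": 1.5})

received = receive_message(server_end, timeout=1.0)   # message is waiting
nothing = receive_message(server_end, timeout=0.01)   # pipe is now empty
```

Here both ends live in one process for brevity. In practice each end of the pipe would be held by a different process.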
colmena.queue.redis
Queues that use Redis
- class colmena.queue.redis.RedisQueues(topics: Collection[str], hostname: str = '127.0.0.1', port: int = 6379, prefix: str = UUID('4df48004-b351-4a79-accc-0bcb7fee5549'), serialization_method: str | SerializationMethod = SerializationMethod.PICKLE, keep_inputs: bool = True, proxystore_name: str | Dict[str, str] | None = None, proxystore_threshold: int | Dict[str, int] | None = None)
Bases:
ColmenaQueues
A basic Redis queue for communications used by the task server
A queue is defined by its prefix and a “topic” designation. The full list of available topics is defined when creating the queue, which simplifies writing software that waits for only certain types of messages without needing to manage several “queue” objects. By default, the get() methods for the queue listen on all topics and the put() method pushes to the default topic. You can put messages into a certain “topical” queue and wait for responses that are from a single topic.
The queue only connects when the connect method is called to avoid issues with passing an object across processes.
- Parameters:
hostname (str) – Hostname of the Redis server
port (int) – Port on which to access Redis
prefix (str) – Name of the Redis queue
topics – Names of topics that are known for this queue
serialization_method – Method used to serialize task inputs and results
keep_inputs – Whether to return task inputs with the result object
proxystore_name (str, dict) – Name of ProxyStore backend to use for all topics or a mapping of topic to ProxyStore backend for specifying backends for certain tasks. If a mapping is provided but a topic is not in the mapping, ProxyStore will not be used.
proxystore_threshold (int, dict) – Threshold in bytes for using ProxyStore to transfer objects. Optionally, pass a dict mapping topics to thresholds to use different values for different topics. None values in the mapping will exclude ProxyStore use with that topic.
- connect()
Connect to the Redis server
- disconnect()
Disconnect from the server
Useful if sending the connection object to another process
- property is_connected
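The reason for connecting only on demand can be sketched with the standard library alone. In this hypothetical LazyConnection class (not part of Colmena), a threading.Lock stands in for a live client because, like an open socket, it cannot be pickled. The object can be serialized for another process only while it is disconnected.

```python
import pickle
import threading


class LazyConnection:
    """Hypothetical stand-in for a queue object that connects on demand."""

    def __init__(self, hostname: str = "127.0.0.1", port: int = 6379):
        self.hostname = hostname
        self.port = port
        self._client = None  # no live connection until connect() is called

    def connect(self) -> None:
        if self._client is None:
            # Assumption: a lock plays the role of an unpicklable client handle
            self._client = threading.Lock()

    def disconnect(self) -> None:
        self._client = None

    @property
    def is_connected(self) -> bool:
        return self._client is not None


def can_pickle_state(obj) -> bool:
    """Check whether the object's attributes could be sent to another process.

    pickle serializes a plain instance through its attribute dict by default,
    so testing the dict is equivalent for this class.
    """
    try:
        pickle.dumps(vars(obj))
        return True
    except TypeError:
        return False


conn = LazyConnection()
picklable_before = can_pickle_state(conn)           # nothing live to serialize

conn.connect()
picklable_while_connected = can_pickle_state(conn)  # the live handle blocks pickling

conn.disconnect()
picklable_after = can_pickle_state(conn)            # safe to hand off again
```

The receiving process would then call connect() itself to open its own connection.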
colmena.queue.base
Base classes for queues and related functions
- class colmena.queue.base.ColmenaQueues(topics: Collection[str], serialization_method: str | SerializationMethod = SerializationMethod.JSON, keep_inputs: bool = True, proxystore_name: str | Dict[str, str] | None = None, proxystore_threshold: int | Dict[str, int] | None = None)
Bases:
object
Base class for a queue used in Colmena.
Follows the basic get and put semantics of most queues, with the addition of a “topic” used by Colmena to separate task requests or objects used for different purposes.
- Parameters:
topics – Names of topics that are known for this queue
serialization_method – Method used to serialize task inputs and results
keep_inputs – Whether to return task inputs with the result object
proxystore_name (str, dict) – Name of a registered ProxyStore Store instance. This can be a single name such that the corresponding Store is used for all topics or a mapping of topics to registered Store names. If a mapping is provided but a topic is not in the mapping, ProxyStore will not be used.
proxystore_threshold (int, dict) – Threshold in bytes for using ProxyStore to transfer objects. Optionally, pass a dict mapping topics to thresholds to use different values for different topics. None values in the mapping will exclude ProxyStore use with that topic.
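The per-topic rules for proxystore_name and proxystore_threshold described above can be sketched as follows. This is an illustration of the documented behavior, not Colmena's code: expand_per_topic and should_proxy are hypothetical helpers, "example-store" is a made-up store name, and treating an object exactly at the threshold as large enough is an assumption.

```python
from typing import Collection, Dict, Optional, TypeVar, Union

T = TypeVar("T")


def expand_per_topic(
    value: Union[T, Dict[str, T], None], topics: Collection[str]
) -> Dict[str, Optional[T]]:
    """Expand a single value or a partial mapping into one entry per topic."""
    if isinstance(value, dict):
        # Topics missing from the mapping get None, meaning "no ProxyStore"
        return {topic: value.get(topic) for topic in topics}
    # A single value (or None) applies to every topic
    return {topic: value for topic in topics}


def should_proxy(size_bytes: int, store_name: Optional[str], threshold: Optional[int]) -> bool:
    """Proxy only if a store and a threshold are both set and the object is large enough."""
    # Assumption: an object exactly at the threshold is proxied
    return store_name is not None and threshold is not None and size_bytes >= threshold


topics = ["simulate", "train"]
names = expand_per_topic({"simulate": "example-store"}, topics)
thresholds = expand_per_topic(10_000, topics)

proxy_simulate = should_proxy(50_000, names["simulate"], thresholds["simulate"])
proxy_train = should_proxy(50_000, names["train"], thresholds["train"])
proxy_small = should_proxy(100, names["simulate"], thresholds["simulate"])
```

Under these assumptions, only large objects on the "simulate" topic would be proxied, because "train" has no store assigned.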
- abstract flush()
Remove all existing results from the queues
- get_result(topic: str = 'default', timeout: float | None = None) Result | None
Get a completed result
- Parameters:
topic – Which topic of task to wait for
timeout – Timeout for waiting for a value
- Returns:
(Result) Result from a computation
- Raises:
TimeoutException – If the timeout is met
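The topic and timeout semantics of get_result can be illustrated with an in-memory sketch built on the standard queue module. The TopicResults class and its put_result method are hypothetical, and the TimeoutException defined here is a local stand-in rather than Colmena's own exception class.

```python
import queue
from typing import Any, Dict, Iterable, Optional


class TimeoutException(Exception):
    """Local stand-in for the exception named in the documentation."""


class TopicResults:
    """Hypothetical in-memory result store with one queue per topic."""

    def __init__(self, topics: Iterable[str]):
        self._queues: Dict[str, queue.Queue] = {t: queue.Queue() for t in topics}

    def put_result(self, result: Any, topic: str = "default") -> None:
        self._queues[topic].put(result)

    def get_result(self, topic: str = "default", timeout: Optional[float] = None) -> Any:
        try:
            # timeout=None waits indefinitely; otherwise wait at most `timeout` seconds
            return self._queues[topic].get(timeout=timeout)
        except queue.Empty:
            raise TimeoutException(
                f"No result on topic {topic!r} within {timeout} s"
            ) from None


results = TopicResults(["default", "simulate"])
results.put_result({"energy": -1.0}, topic="simulate")

# A result is waiting on "simulate", so this returns immediately
first = results.get_result(topic="simulate", timeout=1.0)

# Nothing was sent to "default", so the wait times out
try:
    results.get_result(topic="default", timeout=0.05)
    timed_out = False
except TimeoutException:
    timed_out = True
```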
- get_task(timeout: float | None = None) Tuple[str, Result]
Get a task object
- Parameters:
timeout (float) – Timeout for waiting for a task
- Returns:
(str) Topic of the calculation. Used in defining which queue to use to send the results
(Result) Task description
- Raises:
TimeoutException – If the timeout on the queue is reached
KillSignalException – If the queue receives a kill signal
- send_inputs(*input_args: Any, method: str | None = None, input_kwargs: Dict[str, Any] | None = None, keep_inputs: bool | None = None, resources: ResourceRequirements | dict | None = None, topic: str = 'default', task_info: Dict[str, Any] | None = None) str
Send a task request
- Parameters:
*input_args (Any) – Positional arguments to a function
method (str) – Name of the method to run. Optional
input_kwargs (dict) – Any keyword arguments for the function being run
keep_inputs (bool) – Whether to override the queue's default setting for returning task inputs with the result
topic (str) – Topic for the queue, which sets the topic for the result
resources – Suggestions for how many resources to use for the task
task_info (dict) – Any information used for task tracking
- Returns:
Task ID
- send_kill_signal()
Send the kill signal to the task server
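How the methods above fit together can be shown with a toy, single-process round trip. Everything in this sketch is hypothetical and standard-library only: ToyTask and ToyQueues are simplified stand-ins, send_result is an invented helper for the server side, KillSignalException is a local stand-in for the exception named above, and the use of a sentinel object as the kill signal is an assumption about one possible design.

```python
import queue
import uuid
from typing import Any, Dict, Optional, Tuple


class KillSignalException(Exception):
    """Local stand-in for the exception named in the documentation."""


class ToyTask:
    """Hypothetical, much-reduced record of a task and its result."""

    def __init__(self, args: Tuple[Any, ...], kwargs: Dict[str, Any], method: Optional[str]):
        self.task_id = str(uuid.uuid4())
        self.args, self.kwargs, self.method = args, kwargs, method
        self.value: Any = None


_KILL = object()  # sentinel placed on the request queue to signal shutdown


class ToyQueues:
    def __init__(self):
        self._requests = queue.Queue()
        self._results: Dict[str, queue.Queue] = {}

    def send_inputs(self, *input_args, method=None, input_kwargs=None, topic="default") -> str:
        task = ToyTask(input_args, input_kwargs or {}, method)
        self._requests.put((topic, task))
        return task.task_id

    def send_kill_signal(self) -> None:
        self._requests.put(_KILL)

    def get_task(self, timeout: Optional[float] = None) -> Tuple[str, ToyTask]:
        item = self._requests.get(timeout=timeout)
        if item is _KILL:
            raise KillSignalException()
        return item

    def send_result(self, task: ToyTask, topic: str) -> None:
        self._results.setdefault(topic, queue.Queue()).put(task)

    def get_result(self, topic: str = "default", timeout: Optional[float] = None) -> ToyTask:
        return self._results.setdefault(topic, queue.Queue()).get(timeout=timeout)


queues = ToyQueues()
task_id = queues.send_inputs(2, 3, method="add", topic="math")
queues.send_kill_signal()

# Server side: process tasks until the kill signal arrives
while True:
    try:
        topic, task = queues.get_task(timeout=1.0)
    except KillSignalException:
        break
    task.value = sum(task.args)
    queues.send_result(task, topic)  # reply on the topic the request came in on

result = queues.get_result(topic="math", timeout=1.0)
```

The topic returned by get_task is reused when sending the result, which mirrors the documented role of the topic in deciding which queue receives the result.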