colmena.queue

Implementations of the task and result queues from Colmena

colmena.queue.python

Queues built on Python’s native libraries

class colmena.queue.python.PipeQueues(topics: Collection[str] = (), serialization_method: str | SerializationMethod = SerializationMethod.PICKLE, keep_inputs: bool = True, proxystore_name: str | Dict[str, str] | None = None, proxystore_threshold: int | Dict[str, int] | None = None)

Bases: ColmenaQueues

Queues using Python’s implementation of multiprocessing Pipes

Parameters:
  • topics – Names of topics that are known for this queue

  • serialization_method – Method used to serialize task inputs and results

  • keep_inputs – Whether to return task inputs with the result object

  • proxystore_name (str, dict) – Name of a registered ProxyStore Store instance. This can be a single name such that the corresponding Store is used for all topics or a mapping of topics to registered Store names. If a mapping is provided but a topic is not in the mapping, ProxyStore will not be used.

  • proxystore_threshold (int, dict) – Size threshold in bytes for using ProxyStore to transfer objects. Pass a dict mapping topics to thresholds to use different values for different topics. A None value in the mapping disables ProxyStore for that topic.
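As a rough illustration of the idea behind pipe-based topic queues, the sketch below keeps one unidirectional multiprocessing Pipe per topic. It uses only the standard library and is not Colmena's implementation; the topic names and the put/get helpers are hypothetical.

```python
from multiprocessing import Pipe

# Hypothetical topic names, chosen only for this illustration
topics = ["default", "simulation"]

# One unidirectional pipe per topic: (receiving end, sending end)
pipes = {topic: Pipe(duplex=False) for topic in topics}


def put(topic, message):
    """Send a picklable message through the pipe for a topic."""
    _, sender = pipes[topic]
    sender.send(message)


def get(topic, timeout=None):
    """Return the next message for a topic, or None if the wait times out."""
    receiver, _ = pipes[topic]
    if receiver.poll(timeout):
        return receiver.recv()
    return None


put("simulation", {"task": 1})
message = get("simulation", timeout=1.0)  # the message sent above
empty = get("default", timeout=0.01)      # nothing was sent on this topic
```

Pipes carry pickled objects between processes, which may be why PICKLE is the default serialization method in the signature above.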

flush()

Remove all existing results from the queues

colmena.queue.redis

Queues that use Redis

class colmena.queue.redis.RedisQueues(topics: Collection[str], hostname: str = '127.0.0.1', port: int = 6379, prefix: str = UUID('4df48004-b351-4a79-accc-0bcb7fee5549'), serialization_method: str | SerializationMethod = SerializationMethod.PICKLE, keep_inputs: bool = True, proxystore_name: str | Dict[str, str] | None = None, proxystore_threshold: int | Dict[str, int] | None = None)

Bases: ColmenaQueues

A basic redis queue for communications used by the task server

A queue is defined by its prefix and a “topic” designation. The full list of available topics is defined when creating the queue, which simplifies writing software that waits for only certain types of messages without needing to manage several “queue” objects. By default, the get() methods for the queue listen on all topics and the put() method pushes to the default topic. You can also put messages into a specific “topical” queue and wait for responses from that single topic.

The queue only connects when the connect method is called to avoid issues with passing an object across processes.

Parameters:
  • hostname (str) – Hostname of the Redis server

  • port (int) – Port on which to access Redis

  • prefix (str) – Name of the Redis queue

  • topics – Names of topics that are known for this queue

  • serialization_method – Method used to serialize task inputs and results

  • keep_inputs – Whether to return task inputs with the result object

  • proxystore_name (str, dict) – Name of ProxyStore backend to use for all topics or a mapping of topic to ProxyStore backend for specifying backends for certain tasks. If a mapping is provided but a topic is not in the mapping, ProxyStore will not be used.

  • proxystore_threshold (int, dict) – Size threshold in bytes for using ProxyStore to transfer objects. Pass a dict mapping topics to thresholds to use different values for different topics. A None value in the mapping disables ProxyStore for that topic.
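The prefix-plus-topic behaviour described for this class can be pictured with a toy in-memory model. This is a sketch only: the `TopicQueue` class, the `prefix:topic` key format, and the always-present "default" topic are assumptions made for illustration, not the keys or logic RedisQueues actually uses.

```python
from collections import deque


class TopicQueue:
    """Toy in-memory stand-in for a set of topic queues sharing one prefix."""

    def __init__(self, prefix, topics):
        self.prefix = prefix
        # Assumption: a 'default' topic exists alongside the named ones
        names = sorted({"default", *topics})
        self._queues = {f"{prefix}:{name}": deque() for name in names}

    def put(self, item, topic="default"):
        """Append an item to one topic; unknown topics raise KeyError."""
        self._queues[f"{self.prefix}:{topic}"].append(item)

    def get(self, topic=None):
        """Pop from one topic, or from the first non-empty topic if None."""
        keys = list(self._queues) if topic is None else [f"{self.prefix}:{topic}"]
        for key in keys:
            if self._queues[key]:
                return key, self._queues[key].popleft()
        return None


queue = TopicQueue("demo", ["simulate", "train"])
queue.put("a")                         # lands in the default topic
queue.put("b", topic="train")
from_train = queue.get(topic="train")  # listen on a single topic
from_any = queue.get()                 # listen on all topics
```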

connect()

Connect to the Redis server

disconnect()

Disconnect from the server

Useful if sending the connection object to another process
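One common way to make an object with a live connection safe to send to another process is to drop the connection when pickling and reconnect on the other side. The sketch below shows that pattern with the standard library only; `LazyClient` is hypothetical, and a `threading.Lock` stands in for the unpicklable client. It is not taken from the RedisQueues source.

```python
import pickle
import threading


class LazyClient:
    """Holds connection settings and connects only on request."""

    def __init__(self, hostname="127.0.0.1", port=6379):
        self.hostname = hostname
        self.port = port
        self._connection = None

    @property
    def is_connected(self):
        return self._connection is not None

    def connect(self):
        # A lock stands in for a live client: like a socket, it cannot be pickled
        self._connection = threading.Lock()

    def disconnect(self):
        self._connection = None

    def __getstate__(self):
        # Copy the settings but never the live connection
        state = self.__dict__.copy()
        state["_connection"] = None
        return state


client = LazyClient()
client.connect()
copy = pickle.loads(pickle.dumps(client))  # works despite the live connection
```

The copy arrives disconnected, so the receiving process would call connect() itself before using it.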

flush(*args, **kwargs) → Any

Remove all existing results from the queues

property is_connected

colmena.queue.base

Base classes for queues and related functions

class colmena.queue.base.ColmenaQueues(topics: Collection[str], serialization_method: str | SerializationMethod = SerializationMethod.JSON, keep_inputs: bool = True, proxystore_name: str | Dict[str, str] | None = None, proxystore_threshold: int | Dict[str, int] | None = None)

Bases: object

Base class for a queue used in Colmena.

Follows the basic get and put semantics of most queues, with the addition of a “topic” used by Colmena to separate task requests or objects used for different purposes.

Parameters:
  • topics – Names of topics that are known for this queue

  • serialization_method – Method used to serialize task inputs and results

  • keep_inputs – Whether to return task inputs with the result object

  • proxystore_name (str, dict) – Name of a registered ProxyStore Store instance. This can be a single name such that the corresponding Store is used for all topics or a mapping of topics to registered Store names. If a mapping is provided but a topic is not in the mapping, ProxyStore will not be used.

  • proxystore_threshold (int, dict) – Size threshold in bytes for using ProxyStore to transfer objects. Pass a dict mapping topics to thresholds to use different values for different topics. A None value in the mapping disables ProxyStore for that topic.
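The rules for proxystore_name and proxystore_threshold (a single value applies to every topic; a mapping applies per topic, with missing or None entries disabling ProxyStore) can be sketched as a small helper. `resolve_per_topic` is a hypothetical function written for this illustration, and including a "default" topic is an assumption.

```python
from typing import Dict, Iterable, Optional, Union


def resolve_per_topic(
    value: Union[int, str, Dict[str, Optional[Union[int, str]]], None],
    topics: Iterable[str],
) -> dict:
    """Expand a single setting or a per-topic mapping into one entry per topic.

    A None entry in the output means ProxyStore is not used for that topic.
    """
    all_topics = {"default", *topics}  # assumption: 'default' always exists
    if isinstance(value, dict):
        # Topics missing from the mapping fall back to None (no ProxyStore)
        return {topic: value.get(topic) for topic in all_topics}
    return {topic: value for topic in all_topics}


same_everywhere = resolve_per_topic(10_000, ["sim"])
per_topic = resolve_per_topic({"sim": 500, "train": None}, ["sim", "train"])
```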

property active_count: int

Number of active tasks

abstract flush()

Remove all existing results from the queues

get_result(topic: str = 'default', timeout: float | None = None) → Result | None

Get a completed result

Parameters:
  • topic – Which topic of task to wait for

  • timeout – Timeout for waiting for a value

Returns:

(Result) Result from a computation

Raises:

TimeoutException if the timeout is met

get_task(timeout: float | None = None) → Tuple[str, Result]

Get a task object

Parameters:

timeout (float) – Timeout for waiting for a task

Returns:

  • (str) Topic of the calculation. Used in defining which queue to use to send the results

  • (Result) Task description

Raises:

TimeoutException if the timeout is met

send_inputs(*input_args: Any, method: str | None = None, input_kwargs: Dict[str, Any] | None = None, keep_inputs: bool | None = None, resources: ResourceRequirements | dict | None = None, topic: str = 'default', task_info: Dict[str, Any] | None = None) → str

Send a task request

Parameters:
  • *input_args (Any) – Positional arguments to a function

  • method (str) – Name of the method to run. Optional

  • input_kwargs (dict) – Any keyword arguments for the function being run

  • keep_inputs (bool) – Whether to override the queue-level keep_inputs setting for this task

  • topic (str) – Topic for the queue, which sets the topic for the result

  • resources – Suggestions for how many resources to use for the task

  • task_info (dict) – Any information used for task tracking

Returns:

Task ID
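Because send_inputs returns a task ID, a client can keep a map from IDs to the inputs it submitted. The helper below is a sketch that uses only the documented keyword arguments; `submit_batch` and the `_RecordingQueues` stub are hypothetical, and the stub's ID format is invented for the example.

```python
def submit_batch(queues, inputs, method, topic="default"):
    """Send one task per input and map each returned task ID to its input."""
    return {
        queues.send_inputs(x, method=method, topic=topic, task_info={"index": i}): x
        for i, x in enumerate(inputs)
    }


class _RecordingQueues:
    """Hypothetical stand-in that records requests instead of sending them."""

    def __init__(self):
        self.sent = []

    def send_inputs(self, *input_args, method=None, input_kwargs=None,
                    keep_inputs=None, resources=None, topic="default",
                    task_info=None):
        task_id = f"task-{len(self.sent)}"  # invented ID format
        self.sent.append((task_id, input_args, method, topic, task_info))
        return task_id


stub = _RecordingQueues()
pending = submit_batch(stub, [1.0, 2.0], method="simulate", topic="sim")
```

With a real queue object, `pending` lets the client match each incoming result back to the request that produced it, provided the result carries the same task ID.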

send_kill_signal()

Send the kill signal to the task server

send_result(result: Result)

Send a value to a client

Parameters:
  • result (Result) – Result object to communicate back

  • topic (str) – Topic of the calculation
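On the server side, get_task and send_result pair naturally into a loop. The sketch below assumes only the documented call shapes; `serve_until_idle` is hypothetical, the exception class that signals "no task arrived" is passed in rather than assumed, and the stub uses plain dicts in place of Result objects, whose own API is not covered here.

```python
def serve_until_idle(queues, handle, idle_exception, timeout=1.0):
    """Process tasks until get_task signals that none arrived in time."""
    handled = 0
    while True:
        try:
            topic, task = queues.get_task(timeout=timeout)
        except idle_exception:
            return handled
        handle(topic, task)        # caller records the outcome on the task
        queues.send_result(task)   # hand the completed task back
        handled += 1


class _StubServerQueues:
    """Hypothetical stand-in with a fixed list of (topic, task) pairs."""

    def __init__(self, tasks):
        self._tasks = list(tasks)
        self.returned = []

    def get_task(self, timeout=None):
        if not self._tasks:
            raise TimeoutError()   # built-in used as the idle signal here
        return self._tasks.pop(0)

    def send_result(self, result):
        self.returned.append(result)


def square(topic, task):
    task["value"] = task["x"] ** 2


server = _StubServerQueues([("default", {"x": 2}), ("sim", {"x": 3})])
n_handled = serve_until_idle(server, square, TimeoutError, timeout=0.1)
```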

set_role(role: QueueRole | str)

Define the role of this queue.

Controls whether users will be warned away from performing actions that are disallowed by a certain queue role, such as sending results from a client or issuing requests from a server

wait_until_done(timeout: float | None = None)

Wait until all outgoing tasks have completed

Returns:

Whether the event was set within the timeout
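The combination of active_count and a boolean returned from wait_until_done resembles a counter guarded by a `threading.Event`, whose `wait(timeout)` also returns whether the event was set in time. The sketch below shows that pattern under this assumption; `ActiveTaskTracker` and its method names are hypothetical and not taken from the ColmenaQueues source.

```python
import threading


class ActiveTaskTracker:
    """Counts outstanding tasks and signals when none remain."""

    def __init__(self):
        self._lock = threading.Lock()
        self._active = 0
        self._all_done = threading.Event()
        self._all_done.set()  # nothing is outstanding at the start

    @property
    def active_count(self):
        return self._active

    def task_sent(self):
        with self._lock:
            self._active += 1
            self._all_done.clear()

    def result_received(self):
        with self._lock:
            self._active -= 1
            if self._active == 0:
                self._all_done.set()

    def wait_until_done(self, timeout=None):
        # Event.wait returns True if the event was set before the timeout
        return self._all_done.wait(timeout)


tracker = ActiveTaskTracker()
tracker.task_sent()
still_waiting = tracker.wait_until_done(timeout=0.01)  # one task outstanding
tracker.result_received()
finished = tracker.wait_until_done(timeout=0.01)       # nothing outstanding
```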

class colmena.queue.base.QueueRole(value)

Bases: str, Enum

Role a queue is used for

ANY = 'any'
CLIENT = 'client'
SERVER = 'server'
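Because QueueRole derives from both str and Enum, a role can be given either as an enum member or as its string value, which matches the `QueueRole | str` annotation on set_role. The sketch below re-declares the enum locally so that it runs without Colmena; `check_role` is a hypothetical helper illustrating the kind of warning described for set_role, not the library's own check.

```python
import warnings
from enum import Enum


class QueueRole(str, Enum):
    """Local copy of the documented enum, for illustration only."""
    ANY = "any"
    CLIENT = "client"
    SERVER = "server"


def check_role(current, allowed, action):
    """Warn and return False if `current` should not perform `action`."""
    current, allowed = QueueRole(current), QueueRole(allowed)
    if current not in (QueueRole.ANY, allowed):
        warnings.warn(
            f"{action} is not expected from a queue with role '{current.value}'"
        )
        return False
    return True


# Strings and enum members are interchangeable
ok = check_role("server", QueueRole.SERVER, "send_result")
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    blocked = check_role("client", QueueRole.SERVER, "send_result")
```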