colmena.thinker

Base classes for ‘thinking’ applications that respond to tasks completing

class colmena.thinker.BaseThinker(queue: ColmenaQueues, resource_counter: ResourceCounter | None = None, daemon: bool = True, logger_name: str | None = None, **kwargs)

Bases: Thread

Base class for a dataflow program that steers a Colmena application

The intent of this class is to simplify writing dataflow programs using Colmena. When implementing a subclass, write each operation in the program as a method of the class. Each method should take no inputs and produce no output, and can be thought of as an “operation” or “agent” that will run as a thread.

Each agent communicates with other agents via queues or other threading objects, and with the Colmena task server via the queue wrapper supplied as the queue parameter. The only communication method available by default is a class attribute named done, which is used to signal that the program should terminate.

Denote each of these agents with the agent() decorator.
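As a rough illustration of the pattern, here is a standard-library-only model. It is not Colmena's implementation, and the `agent` decorator, `MiniThinker`, and `ExampleThinker` are hypothetical stand-ins. The decorator tags methods, a `list_agents()`-style class method discovers them, and each tagged method runs in its own thread when the thinker thread starts. In a real application you would subclass `BaseThinker` and use `colmena.thinker.agent` instead.

```python
import queue
import threading
from typing import Callable, List


def agent(func: Callable) -> Callable:
    """Hypothetical stand-in for colmena.thinker.agent: tag a method as an agent."""
    func._is_agent = True
    return func


class MiniThinker(threading.Thread):
    """Hypothetical, simplified model of BaseThinker (not Colmena's code)."""

    def __init__(self):
        super().__init__(daemon=True)
        self.done = threading.Event()  # default signal that the program should stop

    @classmethod
    def list_agents(cls) -> List[Callable]:
        # Every attribute (including inherited ones) tagged by @agent, in name order
        attrs = (getattr(cls, name, None) for name in dir(cls))
        return [f for f in attrs if getattr(f, '_is_agent', False)]

    def run(self):
        # Launch one thread per agent method, then wait for all of them
        threads = [threading.Thread(target=f, args=(self,), name=f.__name__)
                   for f in self.list_agents()]
        for t in threads:
            t.start()
        for t in threads:
            t.join()


class ExampleThinker(MiniThinker):
    def __init__(self):
        super().__init__()
        self.items = queue.Queue()  # agents talk to each other through ordinary queues
        self.consumed = []

    @agent
    def producer(self):
        for i in range(3):
            self.items.put(i)
        self.items.put(None)  # sentinel: nothing more to produce

    @agent
    def consumer(self):
        while (item := self.items.get()) is not None:
            self.consumed.append(item)
        self.done.set()  # tell everyone the program is finished


thinker = ExampleThinker()
thinker.start()
thinker.join()
print(thinker.consumed)  # [0, 1, 2]
```

The agents share state only through the queue and the done event, which keeps each method free of inputs and outputs as the class expects.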

The decorator tells Colmena to launch that method as a separate thread when the “Thinker” thread is started. Colmena also creates a distinct logger for each agent, accessible through the logger property.

Start the thinker by calling .start()

Parameters:
  • queue – Queue wrapper used to communicate with task server

  • resource_counter – Utility used to track resource utilization

  • daemon – Whether to launch this as a daemon thread

  • logger_name – An optional name to give to the root logger for this thinker

  • **kwargs – Options passed to Thread

property agent_name

Name of the agent

classmethod list_agents() → List[Callable]

List all functions that map to operations within the thinker application

Returns:

List of methods that define agent threads

property logger: Logger

Get the logger for the active thread
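One plausible way to give each agent thread its own logger is sketched below with the standard library only. It assumes child loggers are named after the running thread, which may not match Colmena's actual naming scheme. `PerThreadLogging` is a hypothetical helper.

```python
import logging
import threading


class PerThreadLogging:
    """Hypothetical helper: one child logger per agent thread (not Colmena's code)."""

    def __init__(self, logger_name: str = 'thinker'):
        self.root_logger = logging.getLogger(logger_name)

    @property
    def logger(self) -> logging.Logger:
        # getChild() yields '<root>.<thread name>', so records propagate to the root's handlers
        return self.root_logger.getChild(threading.current_thread().name)


helper = PerThreadLogging('my_thinker')
names = {}

def agent_body():
    names[threading.current_thread().name] = helper.logger.name

threads = [threading.Thread(target=agent_body, name=n) for n in ('simulate', 'train')]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(names.items()))
# [('simulate', 'my_thinker.simulate'), ('train', 'my_thinker.train')]
```

Because the loggers form a hierarchy, a handler attached to the root logger sees the messages from every agent.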

make_logger(name: str | None = None)

Make a sub-logger for our application

Parameters:

name – Name to use for the sub-logger

Returns:

Logger with an appropriate name

prepare_agent()

Logic run before launching an agent.

Override to define how to set up an agent. Consider using local_details() to store any agent-specific objects

run()

Launch all operation threads and wait until all complete

Sets the done flag when a thread completes, then waits for all other threads to finish.

Does not raise exceptions if a thread exits with an exception. Exception and traceback information are printed using logging at the WARNING level.
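The behavior of run() can be modeled with the standard library, together with the startup option of agent(). The sketch below is a hypothetical stand-in: `run_agents` is not part of Colmena, and it assumes each agent is a zero-argument function paired with a startup flag. A non-startup agent returning sets done, and so does any agent raising. Exceptions are logged at WARNING level rather than re-raised, and the call returns only after every agent thread has exited.

```python
import logging
import threading
import time
from typing import Callable, Dict, Tuple

logger = logging.getLogger('mini_thinker')


def run_agents(agents: Dict[str, Tuple[Callable[[], None], bool]],
               done: threading.Event) -> Dict[str, BaseException]:
    """Hypothetical model of BaseThinker.run() semantics (not Colmena's code).

    ``agents`` maps a name to ``(function, startup)``; each runs in its own thread.
    """
    errors: Dict[str, BaseException] = {}

    def wrapper(name: str, func: Callable[[], None], startup: bool) -> None:
        try:
            func()
        except Exception as exc:
            errors[name] = exc
            logger.warning('Agent %s raised an exception', name, exc_info=True)
            done.set()  # any failure stops everyone
        else:
            if not startup:  # a startup agent finishing normally does not stop the others
                done.set()

    threads = [threading.Thread(target=wrapper, args=(name, func, startup), name=name)
               for name, (func, startup) in agents.items()]
    for t in threads:
        t.start()
    for t in threads:  # wait for every agent, not just the first to finish
        t.join()
    return errors


done = threading.Event()
log = []

def startup_agent():
    log.append('startup work finished')  # returns early, but is marked startup=True

def worker():
    while not done.wait(timeout=0.01):  # keep working until another agent sets done
        pass
    log.append('worker stopped')

def failing_agent():
    time.sleep(0.05)
    raise RuntimeError('simulated failure')  # logged, then triggers shutdown

errors = run_agents({'startup': (startup_agent, True),
                     'worker': (worker, False),
                     'failing': (failing_agent, False)}, done)
print(sorted(log))   # ['startup work finished', 'worker stopped']
print(list(errors))  # ['failing']
```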

tear_down_agent()

Logic run after an agent completes.

Override to define any tear down logic.

colmena.thinker.agent(func: Callable | None = None, startup: bool = False)

Decorator that denotes a function as an “agent” thread that is launched when a Thinker process is started

Parameters:
  • func – Do not directly pass this variable. It is used as an argument to the decorator

  • startup – Whether this is a startup agent. If True, this agent exiting normally does not trigger other agents to stop; all agents will still stop if it exits with an exception

colmena.thinker.event_responder(func: Callable | None = None, event_name: str | None = None, reallocate_resources: bool = False, gather_from: str | None = None, gather_to: str | None = None, disperse_to: str | None = None, max_slots: int | str | None = None, slot_step: int = 1)

Decorator that builds agents which respond to an event being set.

The Thinker associated with this agent must have a class attribute that is an Event with the same name as event_name. The agent runs once the event is set and resets the event once the function completes (i.e., it calls event.clear()). If more than one agent is started by an event, the event is reset only after all of them finish.
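The set/run/clear cycle can be modeled with a plain threading.Event. This sketch assumes a single responder per event and no resource reallocation. `event_responder_loop` is a hypothetical helper, not Colmena's code.

```python
import threading
import time
from typing import Callable


def event_responder_loop(event: threading.Event, done: threading.Event,
                         func: Callable[[], None], poll: float = 0.01) -> None:
    """Hypothetical model of an event_responder agent (not Colmena's code)."""
    while not done.is_set():
        if event.wait(timeout=poll):  # True once some other agent sets the event
            func()
            event.clear()             # reset only after the response has finished


event = threading.Event()  # stands in for a hypothetical Event attribute on the thinker
done = threading.Event()
calls = []

def retrain():
    calls.append('retrained')
    if len(calls) == 2:
        done.set()  # stop after responding twice

responder = threading.Thread(target=event_responder_loop, args=(event, done, retrain))
responder.start()
for _ in range(2):
    event.set()
    while event.is_set():  # wait for the responder to clear the event
        time.sleep(0.005)
responder.join()
print(calls)  # ['retrained', 'retrained']
```

Polling with a short timeout lets the loop notice done even when the event is never set again.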

The event responder can also launch a thread that acquires resources temporarily. The thread is created if you set reallocate_resources=True in the decorator. It transfers resources into a specific pool until the decorated function completes or a user-defined resource cap is reached. You must configure from where these resources are acquired (gather_from), in which resource pool they are placed (gather_to), and where they are re-allocated after the function completes (disperse_to). The thread gathers up to the maximum number of slots defined (max_slots) and, once the function completes, reallocates all slots in that pool to the designated pool.

Parameters:
  • func – Do not directly pass this variable. It is used as an argument to the decorator

  • event_name – Name of the event to watch

  • reallocate_resources – Whether to re-allocate resources while function is running

  • gather_from – Name of a resource pool from which to acquire resources

  • gather_to – Name of the resource pool to place re-allocated resources

  • disperse_to – Name of the resource pool to move resources to after function completes

  • max_slots – Maximum number of resource slots to acquire. Can be an integer, the name of a class attribute of the thinker, or None if no maximum is needed

  • slot_step – Number of slots to acquire per request

colmena.thinker.result_processor(func: Callable | None = None, topic: str = 'default')

Decorator that builds agents which respond to results becoming available in a queue

Decorated functions must take a single argument: a result object

Parameters:
  • func – Do not directly pass this variable. It is used as an argument to the decorator

  • topic – Topic of the queue to pull from
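A result processor can be modeled as a loop that pulls from the queue for one topic and hands each result to the decorated function. In this sketch, `result_processor_loop` and the dictionary of per-topic `queue.Queue` objects are hypothetical stand-ins for Colmena's queues.

```python
import queue
import threading
from typing import Callable, Dict


def result_processor_loop(queues: Dict[str, queue.Queue], topic: str,
                          done: threading.Event, func: Callable[[object], None],
                          poll: float = 0.01) -> None:
    """Hypothetical model of a result_processor agent (not Colmena's code)."""
    while not done.is_set():
        try:
            result = queues[topic].get(timeout=poll)  # read only this topic's queue
        except queue.Empty:
            continue  # re-check done periodically instead of blocking forever
        func(result)


queues = {'default': queue.Queue(), 'simulate': queue.Queue()}
done = threading.Event()
seen = []

def store(result):
    seen.append(result)
    if len(seen) == 3:
        done.set()

for value in (1.0, 2.0, 3.0):
    queues['simulate'].put(value)
queues['default'].put('ignored')  # different topic: never read by this agent

processor = threading.Thread(target=result_processor_loop,
                             args=(queues, 'simulate', done, store))
processor.start()
processor.join()
print(seen)  # [1.0, 2.0, 3.0]
```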

colmena.thinker.task_submitter(func: Callable | None = None, task_type: str | None = None, n_slots: int | str = 1)

Decorator that builds agents which respond to computing resources becoming available

Decorated functions should assume that resources are available and reserved when the function is called

Parameters:
  • func – Do not directly pass this variable. It is used as an argument to the decorator

  • task_type – Name of the task pool from which to request resources

  • n_slots – Number of resources to request. Must be either an integer or the name of a class attribute
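The key property of a task submitter is that the decorated function runs only after a slot is reserved. The sketch below models this assuming each task needs exactly one slot and the pool is a `threading.Semaphore`. `task_submitter_loop` and the timer-based "task" are hypothetical.

```python
import threading
from typing import Callable


def task_submitter_loop(slots: threading.Semaphore, done: threading.Event,
                        submit: Callable[[], None], poll: float = 0.01) -> None:
    """Hypothetical model of a task_submitter agent using one slot per task."""
    while not done.is_set():
        if slots.acquire(timeout=poll):  # wait (briefly) for a free slot
            submit()                     # the slot is already reserved here


slots = threading.Semaphore(2)  # two slots in this hypothetical task pool
done = threading.Event()
lock = threading.Lock()
state = {'submitted': 0, 'in_flight': 0, 'max_in_flight': 0}

def complete() -> None:
    # Stand-in for a result processor: the task finished, so free its slot
    with lock:
        state['in_flight'] -= 1
    slots.release()

def submit() -> None:
    with lock:
        state['submitted'] += 1
        state['in_flight'] += 1
        state['max_in_flight'] = max(state['max_in_flight'], state['in_flight'])
        if state['submitted'] == 5:
            done.set()
    threading.Timer(0.01, complete).start()  # pretend the task runs for 10 ms

submitter = threading.Thread(target=task_submitter_loop, args=(slots, done, submit))
submitter.start()
submitter.join()
print(state['submitted'], state['max_in_flight'] <= 2)  # 5 True
```

Releasing the slot when the result arrives, rather than inside the submitter, is what keeps the number of tasks in flight bounded by the pool size.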

colmena.thinker.resources

Utilities for tracking resources

class colmena.thinker.resources.ReallocatorThread(resource_counter: ResourceCounter, gather_from: str | None, gather_to: str | None, disperse_to: str | None, max_slots: int | None = None, stop_event: Event | None = None, slot_step: int = 1, logger_name: str | None = None)

Bases: Thread

Thread that reallocates resources until an event is set.

Create a thread by defining the procedure the thread should follow for reallocation (e.g., from where to gather resources, where to store them, where to put them when done).

The resource allocation thread is stopped by calling obj.stop_event.set(). Note that you can provide an Event object to the initializer to use instead of the stop_event attribute.

Runs as a daemon thread.

Parameters:
  • resource_counter – Resource counter used to track resources

  • stop_event – Event which controls when the thread should give resources back. If not provided, a new Event is created.

  • logger_name – Name of the logger, if desired

  • gather_from – Name of a resource pool from which to acquire resources

  • gather_to – Name of the resource pool to place re-allocated resources

  • disperse_to – Name of the resource pool to move resources to after the thread is stopped

  • max_slots – Maximum number of resource slots to acquire

  • slot_step – Number of slots to acquire per request
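The gather, hold, and disperse procedure can be sketched with a toy pool tracker. This is a simplified model, not Colmena's code. `Pools` and `MiniReallocator` are hypothetical, and `Pools` only counts free slots per pool, without ResourceCounter's distinction between allocated and available slots.

```python
import threading
import time
from typing import Dict, Optional


class Pools:
    """Hypothetical toy tracker of free slots per pool (not Colmena's ResourceCounter)."""

    def __init__(self, sizes: Dict[str, int]):
        self.free = dict(sizes)
        self.cond = threading.Condition()

    def take(self, pool: str, n: int, timeout: Optional[float]) -> bool:
        with self.cond:
            ok = self.cond.wait_for(lambda: self.free[pool] >= n, timeout=timeout)
            if ok:
                self.free[pool] -= n
            return ok

    def give(self, pool: str, n: int) -> None:
        with self.cond:
            self.free[pool] += n
            self.cond.notify_all()


class MiniReallocator(threading.Thread):
    """Hypothetical model of the ReallocatorThread procedure (not Colmena's code)."""

    def __init__(self, pools: Pools, gather_from: str, gather_to: str, disperse_to: str,
                 max_slots: Optional[int] = None, slot_step: int = 1,
                 stop_event: Optional[threading.Event] = None):
        super().__init__(daemon=True)
        self.pools = pools
        self.gather_from, self.gather_to, self.disperse_to = gather_from, gather_to, disperse_to
        self.max_slots, self.slot_step = max_slots, slot_step
        self.stop_event = stop_event if stop_event is not None else threading.Event()
        self.gathered = 0

    def run(self) -> None:
        # 1. Move slot_step slots at a time until stopped or max_slots is reached
        while not self.stop_event.is_set():
            if self.max_slots is not None and self.gathered >= self.max_slots:
                break
            step = self.slot_step
            if self.max_slots is not None:
                step = min(step, self.max_slots - self.gathered)
            if self.pools.take(self.gather_from, step, timeout=0.01):
                self.pools.give(self.gather_to, step)
                self.gathered += step
        # 2. Hold the gathered slots until asked to give them back
        self.stop_event.wait()
        # 3. Move the gathered slots to disperse_to once they are free again
        if self.gathered:
            self.pools.take(self.gather_to, self.gathered, timeout=None)
            self.pools.give(self.disperse_to, self.gathered)


pools = Pools({'simulate': 4, 'train': 0})
stop = threading.Event()
mover = MiniReallocator(pools, gather_from='simulate', gather_to='train',
                        disperse_to='simulate', max_slots=3, slot_step=2, stop_event=stop)
mover.start()
while mover.gathered < 3:  # steps of 2, then 1, because of max_slots=3
    time.sleep(0.005)
print(pools.free)  # {'simulate': 1, 'train': 3}
stop.set()
mover.join()
print(pools.free)  # {'simulate': 4, 'train': 0}
```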

run() → None

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class colmena.thinker.resources.ResourceCounter(total_slots: int, task_types: List[str] = ())

Bases: object

Utility class for keeping track of resources available for different tasks.

The class manages two pieces of state: the amount of resources allocated to a certain task, and the amount of resources that are currently available for that task. Users of this class can change either state using a series of thread-safe methods.

Tracking Allocations: The resource counter is initialized with a certain count of resources, which represent the total number of a certain computing device available (e.g., node, GPU). They all begin as “unallocated” for any task.

Users change the amount of resources dedicated to tasks by “reallocating” them from one task to another. The reallocate() method achieves this by requesting a certain number of units from one task and adding them to a second task’s available resources once those units are marked as available.

Tracking Utilization: The amount of resources in use for a certain task is tracked by an internal counter. Users of this class request the use of a certain number of resources by calling the acquire() method. The method blocks until either the request is completely fulfilled (i.e., the specified amount of resources are marked as available) or the operation times out.

Resources are marked as available again using the release() method. The release method marks those resources as available to be re-used for other tasks of the same type. Moving resources to a different task type requires re-allocating them with reallocate().

Implementation: All of the operations described above are thread-safe. Resource utilization is tracked using a semaphore so that threads can acquire and release resources simultaneously. Resources are acquired on a first-come, first-served basis by using a lock to control access to the “acquire” function of the resource utilization semaphore.
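The semaphore-plus-lock design described above can be sketched as a much simplified, hypothetical `MiniResourceCounter`. It is not Colmena's ResourceCounter and omits callbacks, cancellation, and "all". Each pool (None meaning unallocated) gets a semaphore of free slots. A per-pool lock makes a multi-slot request hold its place in line until it has every slot, or until it times out and returns the partial acquisition. `Semaphore.release(n)` requires Python 3.9 or later.

```python
import threading
import time
from typing import Dict, Iterable, Optional


class MiniResourceCounter:
    """Hypothetical, simplified model of ResourceCounter (not Colmena's code)."""

    def __init__(self, total_slots: int, task_types: Iterable[str] = ()):
        pools = [None, *task_types]  # None is the unallocated pool
        self._free = {t: threading.Semaphore(0) for t in pools}
        self._queue_lock = {t: threading.Lock() for t in pools}
        self._allocated: Dict[Optional[str], int] = {t: 0 for t in pools}
        self._alloc_lock = threading.Lock()
        # Every slot starts out unallocated (and free)
        self._allocated[None] = total_slots
        if total_slots > 0:
            self._free[None].release(total_slots)  # release(n) needs Python 3.9+

    def acquire(self, task: Optional[str], n_slots: int, timeout: float = -1.0) -> bool:
        deadline = None if timeout < 0 else time.monotonic() + timeout
        with self._queue_lock[task]:  # first come, first served
            got = 0
            while got < n_slots:
                wait = None if deadline is None else max(0.0, deadline - time.monotonic())
                if not self._free[task].acquire(timeout=wait):
                    if got:  # timed out: hand back the partial acquisition
                        self._free[task].release(got)
                    return False
                got += 1
            return True

    def release(self, task: Optional[str], n_slots: int = 1) -> None:
        self._free[task].release(n_slots)

    def reallocate(self, task_from: Optional[str], task_to: Optional[str],
                   n_slots: int, timeout: float = -1.0) -> bool:
        # Wait until n_slots are free in the source pool, then move them
        if not self.acquire(task_from, n_slots, timeout):
            return False
        with self._alloc_lock:
            self._allocated[task_from] -= n_slots
            self._allocated[task_to] += n_slots
        self._free[task_to].release(n_slots)
        return True

    def allocated_slots(self, task: Optional[str]) -> int:
        return self._allocated[task]


counter = MiniResourceCounter(total_slots=4, task_types=['simulate', 'train'])
counter.reallocate(None, 'simulate', 3)  # 3 slots now belong to 'simulate'
counter.reallocate(None, 'train', 1)     # the last unallocated slot goes to 'train'
print(counter.allocated_slots('simulate'), counter.allocated_slots(None))  # 3 0
print(counter.acquire('simulate', 2))               # True: 1 'simulate' slot still free
print(counter.acquire('simulate', 2, timeout=0.1))  # False: only 1 free, request rolled back
counter.release('simulate', 2)
print(counter.acquire('simulate', 3, timeout=0.1))  # True
```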

Parameters:
  • total_slots – Total number of resource slots (e.g., nodes) to manage

  • task_types – Names of task types

acquire(task: str | None, n_slots: int, timeout: float = -1.0, cancel_if: Event | None = None) → bool

Request a certain number of nodes for a particular task

Draws only from the pool of nodes allocated to this task

Blocks until the request completes

Parameters:
  • task – Name of the task

  • n_slots – Number of slots to request

  • timeout – Maximum time to wait for the request to be filled

  • cancel_if – Cancel the request if this event is set

Returns:

Whether the request was fulfilled
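To honor both timeout and cancel_if, a request can poll in short intervals instead of blocking indefinitely. The sketch below shows that idea on a single semaphore. `acquire_cancellable` is hypothetical and is not Colmena's implementation. As in the documented method, a negative timeout means "wait indefinitely", and slots taken before giving up are returned.

```python
import threading
import time
from typing import Optional


def acquire_cancellable(free: threading.Semaphore, n_slots: int, timeout: float = -1.0,
                        cancel_if: Optional[threading.Event] = None,
                        poll: float = 0.01) -> bool:
    """Hypothetical sketch of acquire(..., timeout, cancel_if) on one pool."""
    deadline = None if timeout < 0 else time.monotonic() + timeout
    got = 0
    while got < n_slots:
        if cancel_if is not None and cancel_if.is_set():
            break  # cancelled by another thread
        if deadline is not None and time.monotonic() >= deadline:
            break  # timed out
        if free.acquire(timeout=poll):
            got += 1
    if got == n_slots:
        return True
    if got:
        free.release(got)  # give back partial acquisition; release(n) needs Python 3.9+
    return False


free = threading.Semaphore(1)  # only one free slot in this hypothetical pool
cancel = threading.Event()
threading.Timer(0.05, cancel.set).start()  # e.g., the thinker is shutting down
print(acquire_cancellable(free, n_slots=2, cancel_if=cancel))  # False
print(free.acquire(blocking=False))  # True: the partially acquired slot was returned
```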

allocated_slots(task: str) → int

Number of slots allocated to a certain task

Parameters:

task – Name of the task

available_slots(task: str | None) → int

Get the number of nodes available for a certain task

Parameters:

task – Name of the task

Returns:

Number of slots available for that task

reallocate(task_from: str | None, task_to: str | None, n_slots: int | str, block: bool = True, callback: Callable[[], Any] | None = None, timeout: float = -1, cancel_if: Event | None = None) → bool

Transfer computing resources from one task to another

Blocks until complete, unless block is set to False

Parameters:
  • task_from – Which task to pull resources from (None to request un-allocated nodes)

  • task_to – Which task to add resources to (None to de-allocate nodes)

  • n_slots – Number of nodes to request. Set to “all” to reallocate all slots (all allocated slots, not just all available slots)

  • block – Whether to block until the transfer completes

  • callback – Callback function. Only used if the call is non-blocking

  • timeout – Maximum time to wait for the request to be filled

  • cancel_if – Cancel the request if this event is set

Returns:

Whether request was fulfilled. Always True if block==False

release(task: str | None = None, n_slots: int = 1, rerequest: bool = False, timeout: float = -1) → bool | None

Register that nodes for a particular task are available and, optionally, re-request those nodes for the same task.

Blocks until the task request completes

Parameters:
  • task – Name of the task

  • n_slots – Number of slots to mark as available

  • rerequest – Whether to re-request nodes immediately after releasing them

  • timeout – Maximum time to wait for the request to be filled

Returns:

Whether the re-request was fulfilled

property unallocated_slots: int

Number of unallocated slots