colmena.thinker
Base classes for ‘thinking’ applications that respond to tasks completing
- class colmena.thinker.BaseThinker(queue: ColmenaQueues, resource_counter: ResourceCounter | None = None, daemon: bool = True, logger_name: str | None = None, **kwargs)
Bases: Thread
Base class for a dataflow program that steers a Colmena application
The intent of this class is to simplify writing dataflow programs using Colmena. When implementing a subclass, write each operation in the program as a class method. Each method should take no inputs and produce no output, and can be thought of as an “operation” or “agent” that will run as a thread.
Each agent communicates with other agents via queues or other threading objects, and with the Colmena task server via the ClientQueues. The only communication method available by default is a class attribute named done that is used to signal that the program should terminate.
Denote each of these agents with the agent() decorator. The decorator tells Colmena to launch that method as a separate thread when the “Thinker” thread is started. Colmena also creates a distinct logger for each agent that is accessible as the logger property.
Start the thinker by calling .start().
- Parameters:
queue – Queue wrapper used to communicate with task server
resource_counter – Utility used to track resource utilization
daemon – Whether to launch this as a daemon thread
logger_name – An optional name to give to the root logger for this thinker
**kwargs – Options passed to Thread
- property agent_name
Name of the agent
- classmethod list_agents() → List[Callable]
List all functions that map to operations within the thinker application
- Returns:
List of methods that define agent threads
- make_logger(name: str | None = None)
Make a sub-logger for our application
- Parameters:
name – Name to use for the sub-logger
- Returns:
Logger with an appropriate name
- prepare_agent()
Logic run before launching an agent.
Override to define how to set up an agent. Consider using local_details() to store any agent-specific objects.
- run()
Launch all operation threads and wait until all complete
Sets the done flag when a thread completes, then waits for all other threads to finish.
Does not raise an exception if a thread exits with one. Instead, the exception and traceback information are logged at the WARNING level.
- tear_down_agent()
Logic run after an agent completes.
Override to define any tear down logic.
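To make the agent life cycle and the run() behavior concrete, here is a toy model built only on the standard library. MiniThinker, Demo, and the simplified agent tagger are hypothetical names, not Colmena classes. The sketch ignores the startup option, resource counters, and task-server queues; it only mirrors the documented behavior: each tagged method runs in its own thread, any agent finishing sets done, and failures are logged at WARNING instead of being raised.

```python
import logging
import threading
from typing import Callable, List


def agent(func: Callable) -> Callable:
    # Hypothetical stand-in for colmena.thinker.agent: it only tags the method
    func._is_agent = True
    return func


class MiniThinker(threading.Thread):
    """Toy model of the BaseThinker pattern (not Colmena's implementation)."""

    def __init__(self):
        super().__init__(daemon=True)
        self.done = threading.Event()  # shared "please stop" flag
        self.logger = logging.getLogger(type(self).__name__)

    @classmethod
    def list_agents(cls) -> List[Callable]:
        # Every attribute tagged by @agent, including inherited ones
        members = (getattr(cls, name) for name in dir(cls))
        return [m for m in members if getattr(m, '_is_agent', False)]

    def _launch(self, func: Callable) -> threading.Thread:
        def runner():
            try:
                func(self)
            except Exception:
                # Report the failure at WARNING level instead of re-raising
                self.logger.warning('Agent %s failed', func.__name__, exc_info=True)
            finally:
                self.done.set()  # one agent finishing tells the rest to stop
        thread = threading.Thread(target=runner, name=func.__name__, daemon=True)
        thread.start()
        return thread

    def run(self):
        threads = [self._launch(f) for f in self.list_agents()]
        for thread in threads:
            thread.join()  # wait for every agent to wind down


class Demo(MiniThinker):
    def __init__(self):
        super().__init__()
        self.produced = []

    @agent
    def producer(self):
        self.produced.extend(range(3))  # returning ends this agent

    @agent
    def waiter(self):
        self.done.wait()  # idles until another agent sets done


thinker = Demo()
thinker.start()
thinker.join(timeout=5)
print(thinker.produced)  # [0, 1, 2]
```

The waiter agent shows the usual idiom for long-running agents: loop or block on done so the whole program shuts down as soon as any agent returns.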
- colmena.thinker.agent(func: Callable | None = None, startup: bool = False)
Decorator that denotes a function as an “agent” thread that is launched when a Thinker process is started
- Parameters:
func – Do not directly pass this variable. It is used as an argument to the decorator
startup – Whether this agent exiting normally should trigger other agents to stop. All agents will still stop if it exits with an exception
- colmena.thinker.event_responder(func: Callable | None = None, event_name: str | None = None, reallocate_resources: bool = False, gather_from: str | None = None, gather_to: str | None = None, disperse_to: str | None = None, max_slots: int | str | None = None, slot_step: int = 1)
Decorator that builds agents which respond to an event being set.
The Thinker associated with this agent must have a class attribute that is an Event with the same name as event_name. The agent will run once the event is set and will reset the event once the function completes (i.e., by calling event.clear()). If more than one agent is started by an event, the event will be reset only after all agents finish.
The event responder can launch a thread to acquire resources temporarily. The thread is created if you set reallocate_resources=True in the decorator. It transfers resources to a specific pool until the decorated function completes or a user-defined resource cap is reached. You must configure where these resources are acquired from, which resource pool they are placed in, and where they are re-allocated after the thread completes. The thread will allocate up to the maximum number of slots defined and, once the function completes, reallocate all slots available to that pool to the disperse_to pool.
- Parameters:
func – Do not directly pass this variable. It is used as an argument to the decorator
event_name – Name of the event to watch
reallocate_resources – Whether to re-allocate resources while function is running
gather_from – Name of a resource pool from which to acquire resources
gather_to – Name of the resource pool to place re-allocated resources
disperse_to – Name of the resource pool to move resources to after function completes
max_slots – Maximum number of resource slots to acquire. Can be an integer, the name of a class attribute of the thinker, or ‘none’ if no maximum is needed
slot_step – Number of slots to acquire per request
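The wait, run, and reset cycle described above can be sketched with plain threading primitives. respond_to_event is a hypothetical helper, not part of Colmena; it covers a single responder and leaves out resource reallocation entirely.

```python
import threading
import time
from typing import Callable


def respond_to_event(func: Callable[[], None], event: threading.Event,
                     done: threading.Event, poll: float = 0.05) -> None:
    """Hypothetical event-responder loop without resource reallocation."""
    while not done.is_set():
        # Wake up periodically so the loop notices when done is set
        if event.wait(timeout=poll):
            func()         # run the responder once per trigger
            event.clear()  # reset only after the function completes


calls = []
retrain = threading.Event()  # plays the role of the event named by event_name
done = threading.Event()
worker = threading.Thread(target=respond_to_event,
                          args=(lambda: calls.append('retrained'), retrain, done))
worker.start()

retrain.set()              # trigger the responder
while retrain.is_set():    # wait until it has run and cleared the event
    time.sleep(0.01)
done.set()
worker.join()
print(calls)  # ['retrained']
```

Clearing the event after func() returns, rather than before it starts, means repeated set() calls during a run are absorbed into that run instead of queuing extra ones.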
- colmena.thinker.result_processor(func: Callable | None = None, topic: str = 'default')
Decorator that builds agents which respond to results becoming available in a queue
Decorated functions must take a single argument: a result object
- Parameters:
func – Do not directly pass this variable. It is used as an argument to the decorator
topic – Topic of the queue to pull from
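A result processor is essentially a consumer loop over one topic's queue. The sketch below assumes a standard queue.Queue stands in for Colmena's per-topic result queue; process_results is a hypothetical name, and the loop polls so it can exit once done is set.

```python
import queue
import threading
from typing import Any, Callable


def process_results(func: Callable[[Any], None], results: queue.Queue,
                    done: threading.Event, poll: float = 0.05) -> None:
    """Hypothetical result-processor loop: call func on each result as it arrives."""
    while not done.is_set():
        try:
            result = results.get(timeout=poll)
        except queue.Empty:
            continue  # nothing yet; re-check the done flag
        func(result)
        results.task_done()


seen = []
results = queue.Queue()  # stands in for the result queue of one topic
done = threading.Event()
worker = threading.Thread(target=process_results, args=(seen.append, results, done))
worker.start()
for value in (1.0, 2.5, 4.0):
    results.put(value)
results.join()  # block until every result has been processed
done.set()
worker.join()
print(seen)  # [1.0, 2.5, 4.0]
```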
- colmena.thinker.task_submitter(func: Callable | None = None, task_type: str | None = None, n_slots: int | str = 1)
Decorator that builds agents which respond to computing resources becoming available
Decorated functions should assume that resources are available and reserved when the function is called
- Parameters:
func – Do not directly pass this variable. It is used as an argument to the decorator
task_type – Name of task pool from which to request resources
n_slots – Number of resources to request. Must be either an integer or the name of a class attribute
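The guarantee that resources are "available and reserved" when the decorated function is called amounts to acquiring n_slots before every call. In this hypothetical sketch a threading.Semaphore stands in for one task pool of the resource counter; submit_when_free is not a Colmena function, and releasing slots when a task finishes is left to the caller.

```python
import threading
import time
from typing import Callable


def submit_when_free(func: Callable[[], None], slots: threading.Semaphore,
                     n_slots: int, done: threading.Event, poll: float = 0.05) -> None:
    """Hypothetical task-submitter loop.

    Reserves n_slots before every call, so func may assume the resources
    are already held; whoever finishes the task must release them.
    """
    while not done.is_set():
        acquired = 0
        while acquired < n_slots and not done.is_set():
            if slots.acquire(timeout=poll):
                acquired += 1
        if acquired < n_slots:
            # Shutting down mid-request: hand back any partial reservation
            for _ in range(acquired):
                slots.release()
            return
        func()


submitted = []
slots = threading.Semaphore(4)  # four free slots in this task pool
done = threading.Event()
worker = threading.Thread(
    target=submit_when_free,
    args=(lambda: submitted.append(len(submitted)), slots, 2, done))
worker.start()
while len(submitted) < 2:  # 4 slots / 2 per task -> two submissions, then it waits
    time.sleep(0.01)
done.set()
worker.join()
print(submitted)  # [0, 1]
```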
colmena.thinker.resources
Utilities for tracking resources
- class colmena.thinker.resources.ReallocatorThread(resource_counter: ResourceCounter, gather_from: str | None, gather_to: str | None, disperse_to: str | None, max_slots: int | None = None, stop_event: Event | None = None, slot_step: int = 1, logger_name: str | None = None)
Bases: Thread
Thread that reallocates resources until an event is set.
Create a thread by defining the procedure the thread should follow for reallocation (e.g., from where to gather resources, where to store them, where to put them when done).
The resource allocation thread is stopped by calling obj.stop_event.set(). You can instead pass an existing Event to the initializer (stop_event) to be used as the stop_event attribute.
Runs as a daemon thread.
- Parameters:
resource_counter – Resource counter used to track resources
stop_event – Event which controls when the thread should give resources back. If not provided, a new Event is created.
logger_name – Name of the logger, if desired
gather_from – Name of a resource pool from which to acquire resources
gather_to – Name of the resource pool to place re-allocated resources
disperse_to – Name of the resource pool to move resources to after function completes
max_slots – Maximum number of resource slots to acquire
slot_step – Number of slots to acquire per request
- run() → None
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
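The gather, hold, and disperse procedure can be modeled with a single Condition guarding per-pool counts. This is a simplified sketch, not Colmena's code: Pools and reallocate_until_stopped are hypothetical names, and the counts track free slots only, whereas the real ResourceCounter also distinguishes allocated from in-use slots.

```python
import threading
import time
from typing import Dict, Optional


class Pools:
    """Hypothetical free-slot counts per pool (None = unallocated), guarded by a Condition."""

    def __init__(self, counts: Dict[Optional[str], int]):
        self.counts = dict(counts)
        self.cond = threading.Condition()

    def get(self, pool: Optional[str]) -> int:
        with self.cond:
            return self.counts.get(pool, 0)

    def move(self, src, dst, n: int, stop: threading.Event, poll: float = 0.05) -> bool:
        """Move n free slots from src to dst; give up if stop is set first."""
        with self.cond:
            while self.counts.get(src, 0) < n:
                if stop.is_set():
                    return False
                self.cond.wait(timeout=poll)
            self.counts[src] = self.counts.get(src, 0) - n
            self.counts[dst] = self.counts.get(dst, 0) + n
            self.cond.notify_all()
            return True


def reallocate_until_stopped(pools: Pools, gather_from, gather_to, disperse_to,
                             stop_event: threading.Event,
                             max_slots: Optional[int] = None, slot_step: int = 1) -> None:
    """Sketch of the gather / hold / disperse procedure (not Colmena's code)."""
    gathered = 0
    # Gather: pull slot_step slots at a time until stopped or the cap is hit
    while not stop_event.is_set() and (max_slots is None or gathered < max_slots):
        step = slot_step if max_slots is None else min(slot_step, max_slots - gathered)
        if pools.move(gather_from, gather_to, step, stop_event):
            gathered += step
    stop_event.wait()  # hold the slots until told to give them back
    # Disperse: hand every free slot in gather_to over to disperse_to
    with pools.cond:
        n_free = pools.counts.get(gather_to, 0)
        pools.counts[gather_to] = 0
        pools.counts[disperse_to] = pools.counts.get(disperse_to, 0) + n_free
        pools.cond.notify_all()


pools = Pools({'simulation': 4, 'ml': 0})
stop = threading.Event()
mover = threading.Thread(target=reallocate_until_stopped,
                         args=(pools, 'simulation', 'ml', 'simulation', stop),
                         kwargs={'max_slots': 3})
mover.start()
while pools.get('ml') < 3:  # wait for the cap to be reached
    time.sleep(0.01)
print(pools.get('simulation'), pools.get('ml'))  # 1 3
stop.set()
mover.join()
print(pools.get('simulation'), pools.get('ml'))  # 4 0
```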
- class colmena.thinker.resources.ResourceCounter(total_slots: int, task_types: List[str] = ())
Bases: object
Utility class for keeping track of resources available for different tasks.
The class manages two pieces of state: the amount of resources allocated to a certain task, and the amount of resources that are currently available for that task. Users of this class can change either state using a series of thread-safe methods.
Tracking Allocations: The resource counter is initialized with a certain count of resources, which represent the total number of a certain computing device available (e.g., node, GPU). They all begin as “unallocated” for any task.
Users change the amount of resources dedicated to tasks by “reallocating” them from one task to another. The reallocate() method achieves this by requesting a certain number of units from one task and adding them to a second task’s available resources once those units are marked as available.
Tracking Utilization: The amount of resources in use for a certain task is tracked by an internal counter. Users of this class request the use of a certain number of resources by calling the acquire() method. The method blocks until either the request is completely fulfilled (i.e., the specified amount of resources are marked as available) or the operation times out.
Resources are marked as available again using the release() method, which makes them available to be re-used by other tasks of the same type. To use them for a different task type, they must first be re-allocated using reallocate().
Implementation: All of the operations described above are thread-safe. Resource utilization is tracked using a semaphore so that threads can acquire and release resources simultaneously. Resources are acquired on a first-come-first-served basis by using a lock to control access to the “acquire” function of the resource utilization semaphore.
- Parameters:
total_slots – Total number of slots (e.g., nodes) managed by the counter
task_types – Names of task types
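The bookkeeping described above (one semaphore of free slots per pool, a lock for first-come-first-served multi-slot acquires, and a separate allocation tally) fits in a small class. MiniResourceCounter is a hypothetical, simplified model: it omits available_slots(), the “all” option, cancel_if, non-blocking reallocation, and rerequest, and it is not the library's implementation.

```python
import threading
import time
from typing import Dict, List, Optional


class MiniResourceCounter:
    """Toy model of the allocation/utilization bookkeeping (not Colmena's code).

    Each pool (task types plus None for unallocated) has a semaphore counting
    its free slots and a lock that serializes multi-slot acquires.
    """

    def __init__(self, total_slots: int, task_types: List[str] = ()):
        pools: List[Optional[str]] = [None, *task_types]
        self._free = {p: threading.Semaphore(0) for p in pools}
        self._acquire_lock = {p: threading.Lock() for p in pools}
        self._allocated: Dict[Optional[str], int] = {p: 0 for p in pools}
        self._alloc_lock = threading.Lock()
        self._allocated[None] = total_slots  # everything starts unallocated
        self.release(None, total_slots)

    def allocated_slots(self, task: Optional[str]) -> int:
        with self._alloc_lock:
            return self._allocated[task]

    def acquire(self, task: Optional[str], n_slots: int, timeout: float = -1.0) -> bool:
        """Take n_slots free slots from task; undo a partial grab on timeout."""
        deadline = None if timeout < 0 else time.monotonic() + timeout
        with self._acquire_lock[task]:  # first come, first served
            got = 0
            while got < n_slots:
                remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
                if not self._free[task].acquire(timeout=remaining):
                    break
                got += 1
            if got < n_slots:
                self.release(task, got)
                return False
            return True

    def release(self, task: Optional[str], n_slots: int = 1) -> None:
        for _ in range(n_slots):
            self._free[task].release()

    def reallocate(self, task_from: Optional[str], task_to: Optional[str],
                   n_slots: int, timeout: float = -1.0) -> bool:
        """Wait for n_slots to be free in task_from, then move them to task_to."""
        if not self.acquire(task_from, n_slots, timeout):
            return False
        with self._alloc_lock:
            self._allocated[task_from] -= n_slots
            self._allocated[task_to] += n_slots
        self.release(task_to, n_slots)
        return True


counter = MiniResourceCounter(8, ['simulation', 'ml'])
counter.reallocate(None, 'simulation', 6)
counter.reallocate(None, 'ml', 2)
print(counter.acquire('ml', 2))               # True: both ML slots now in use
print(counter.acquire('ml', 1, timeout=0.1))  # False: no ML slot is free
counter.release('ml', 2)                      # finished; slots are free again
print(counter.allocated_slots('simulation'), counter.allocated_slots('ml'))  # 6 2
```

Because reallocate() first acquires the slots from the source pool, a slot that is in use cannot change pools until its holder releases it, which matches the “once those units are marked as available” wording.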
- acquire(task: str | None, n_slots: int, timeout: float = -1.0, cancel_if: Event | None = None) → bool
Request a certain number of nodes for a particular task
Draws only from the pool of nodes allocated to this task
Blocks until the request completes
- Parameters:
task – Name of the task
n_slots – Number of slots to request
timeout – Maximum time to wait for the request to be filled
cancel_if – Cancel the request if this event happens
- Returns:
Whether the request was fulfilled
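One plausible way to honor both timeout and cancel_if while collecting several slots is to poll with short waits and return any partial grab on failure. acquire_slots below is a hypothetical helper operating on a bare threading.Semaphore; it is an assumption about the technique, not the library's code.

```python
import threading
import time
from typing import Optional


def acquire_slots(free: threading.Semaphore, n_slots: int, timeout: float = -1.0,
                  cancel_if: Optional[threading.Event] = None, poll: float = 0.02) -> bool:
    """Hypothetical cancellable multi-slot acquire.

    Takes slots one at a time, waking every poll seconds to check
    cancel_if; on timeout or cancellation it returns any partial grab.
    """
    deadline = None if timeout < 0 else time.monotonic() + timeout
    got = 0
    while got < n_slots:
        if cancel_if is not None and cancel_if.is_set():
            break
        wait = poll
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            wait = min(poll, remaining)
        if free.acquire(timeout=wait):
            got += 1
    if got < n_slots:
        for _ in range(got):
            free.release()
        return False
    return True


free = threading.Semaphore(1)
print(acquire_slots(free, 1))               # True: the only slot is now held
print(acquire_slots(free, 1, timeout=0.1))  # False: timed out
cancel = threading.Event()
threading.Timer(0.1, cancel.set).start()
print(acquire_slots(free, 1, cancel_if=cancel))  # False: cancelled after about 0.1 s
```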
- allocated_slots(task: str) → int
Number of slots allocated to a certain task
- Parameters:
task – Name of the task
- available_slots(task: str | None) → int
Get the number of nodes available for a certain task
- Parameters:
task – Name of the task
- Returns:
Number of slots available for that task
- reallocate(task_from: str | None, task_to: str | None, n_slots: int | str, block: bool = True, callback: Callable[[], Any] | None = None, timeout: float = -1, cancel_if: Event | None = None) → bool
Transfer computer resources from one task to another
Blocks until complete, unless block is set to False
- Parameters:
task_from – Which task to pull resources from (None to request un-allocated nodes)
task_to – Which task to add resources to (None to de-allocate nodes)
n_slots – Number of nodes to request. Set to “all” to reallocate all slots (all allocated slots, not just all available slots)
block – Whether to block until the reallocation completes
callback – Callback function. Only used if the call is non-blocking
timeout – Maximum time to wait for the request to be filled
cancel_if – Cancel the request if this event happens
- Returns:
Whether the request was fulfilled. Always True if block==False
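The block=False contract (return True immediately, invoke the callback later) can be illustrated by running the blocking request in a background thread. run_nonblocking is a hypothetical wrapper, and the assumption that the callback fires after the request finishes is inferred from the parameter description, not confirmed by the source.

```python
import threading
from typing import Any, Callable, Optional


def run_nonblocking(request: Callable[[], bool],
                    callback: Optional[Callable[[], Any]] = None) -> bool:
    """Hypothetical wrapper mimicking block=False.

    Runs the blocking request in a background thread, calls callback
    after it finishes, and returns True right away.
    """
    def worker():
        request()
        if callback is not None:
            callback()

    threading.Thread(target=worker, daemon=True).start()
    return True  # the caller cannot know the outcome yet


slots_freed = threading.Event()  # stands in for "enough slots became available"
finished = threading.Event()
print(run_nonblocking(slots_freed.wait, finished.set))  # True, immediately
print(finished.is_set())  # False: the request is still pending
slots_freed.set()
print(finished.wait(timeout=5))  # True once the callback has run
```

This is why the return value carries no information when block==False: the caller has to rely on the callback (or on polling the counter) to learn when the slots actually moved.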
- release(task: str | None = None, n_slots: int = 1, rerequest: bool = False, timeout: float = -1) → bool | None
Register that nodes for a particular task are available and, optionally, re-request those nodes for the same task.
If rerequest is set, blocks until the re-request completes
- Parameters:
task – Name of the task
n_slots – Number of slots to mark as available
rerequest – Whether to re-request nodes immediately after releasing them
timeout – Maximum time to wait for the request to be filled
- Returns:
Whether the re-request was fulfilled