Advanced Features for Thinkers
The core of a Colmena application is a “Thinker” process that controls the use of computational resources by submitting new tasks in response to new data being acquired. This portion of the guide delves into the advanced features for building a “Thinker” application and builds on the steps described in the previous page of the guide.
Configuring General Agents
Agent threads in Colmena take a few different configuration options. For example, the startup keyword argument means that the self.done event will not be set when this agent completes. See agent() for more details.
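As a rough mental model of what the startup option changes, the behavior can be sketched with plain threads. This is not Colmena's implementation: the names run_agent, load_initial_tasks, and main_loop are hypothetical, and the module-level done event only stands in for self.done.

```python
import threading

done = threading.Event()   # stands in for the thinker's self.done event
log = []                   # records what each agent did


def run_agent(func, startup=False):
    """Run an agent function and flag completion unless it is a startup agent."""
    func()
    if not startup:
        done.set()


def load_initial_tasks():
    # A startup agent: performs one-off work and returns early
    log.append('initial tasks loaded')


def main_loop():
    # A regular agent: its completion signals that the application should stop
    log.append('main loop finished')


# The startup agent finishes first and must not stop the application
startup_thread = threading.Thread(target=run_agent, args=(load_initial_tasks, True))
startup_thread.start()
startup_thread.join()
done_after_startup = done.is_set()

# A regular agent finishing does set the event
main_thread = threading.Thread(target=run_agent, args=(main_loop,))
main_thread.start()
main_thread.join()
done_after_main = done.is_set()
```

In this sketch the startup agent returns without touching the event, so other agents keep running until a regular agent completes.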
Setup and Teardown Logic
Some agents require expensive operations that only need to run once per application, or must ensure that resources are cleaned up after completion. For example, some may connect to a database to store results persistently between runs of an application.
Override the prepare_agent() and tear_down_agent() methods to define this logic, and remember to use thread-local storage because these methods are run by every agent.
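The reason for thread-local storage can be illustrated with the standard library alone. The sketch below assumes a hypothetical SketchThinker class that is not derived from any Colmena class. It borrows only the method names prepare_agent and tear_down_agent, and uses a dictionary as a placeholder for a real database connection.

```python
import threading


class SketchThinker:
    """Hypothetical stand-in for a thinker; not a Colmena class."""

    def __init__(self):
        self.local = threading.local()   # one independent namespace per thread
        self.closed = []                 # names of threads that cleaned up
        self.lock = threading.Lock()

    def prepare_agent(self):
        # Runs in every agent thread, so each thread gets its own "connection"
        self.local.connection = {'owner': threading.current_thread().name}

    def tear_down_agent(self):
        # Only touches the resource owned by the calling thread
        with self.lock:
            self.closed.append(self.local.connection['owner'])
        del self.local.connection

    def run_agent(self):
        self.prepare_agent()
        try:
            # The agent only ever sees the connection it created itself
            assert self.local.connection['owner'] == threading.current_thread().name
        finally:
            self.tear_down_agent()


thinker = SketchThinker()
threads = [threading.Thread(target=thinker.run_agent, name=f'agent-{i}') for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Storing the connection as an ordinary attribute instead would let each agent overwrite the one created by the previous agent, which is the failure mode thread-local storage avoids.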
Special-Purpose Agents
There are a few common patterns of agents within Colmena, such as agents that wait for results to become available in a queue. We provide decorators that simplify creating agents for such tasks.
The reallocation example application demonstrates all three of these agent types.
Result Processing Agents
The colmena.thinker.result_processor() decorator is for agents that respond to results becoming available. It takes a single argument that defines which topic queue the agent is associated with, and it must decorate a function that takes a Result object as input.
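from colmena.models import Result
from colmena.thinker import BaseThinker, result_processor

class Thinker(BaseThinker):
    @result_processor(topic='simulation')
    def process(self, result: Result):
        # Store each completed simulation result
        self.database.append(result)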
The above example runs the process function whenever a completed task with a “simulation” topic is received.
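Conceptually, a result processor behaves like a loop that waits on a queue and calls the decorated function for each item. The following standard-library sketch shows that pattern under simplifying assumptions: result_loop and poll_interval are hypothetical names, a queue.Queue stands in for the “simulation” topic queue, and Colmena's own loop may differ in its details.

```python
import queue
import threading

results = queue.Queue()     # stands in for the "simulation" topic queue
done = threading.Event()    # stands in for the thinker's done flag
database = []


def process(result):
    database.append(result)


def result_loop(func, poll_interval=0.05):
    """Call func on each result; exit once done is set and the queue is empty."""
    while True:
        try:
            result = results.get(timeout=poll_interval)
        except queue.Empty:
            if done.is_set():
                return
            continue
        func(result)


worker = threading.Thread(target=result_loop, args=(process,))
worker.start()
for value in (1, 2, 3):
    results.put(value)
done.set()
worker.join()
```

Polling with a timeout lets the loop notice the done flag even when no further results arrive.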
Task Submission Agents
Task submission agents execute a function as soon as resources are available. The agent runs a decorated function once resources are acquired from a certain resource pool. Task submission agents are often paired with a result processor that receives the result and marks resources as available once a task completes.
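from colmena.thinker import BaseThinker, task_submitter

class Thinker(BaseThinker):
    @task_submitter(task_type="sim", n_slots=1)
    def submit(self):
        # Take the next task from the front of the task queue
        task = self.queue.pop(0)
        self.queues.send_inputs(task, method='simulate', topic='simulation')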
The above function submits a task from the front of a task queue once one slot is available from the “sim” resource pool.
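The pairing of a submitter with a result processor can be modeled with a semaphore that counts free slots. This is a minimal sketch using only the standard library, not Colmena's resource tracking: the names submitter, worker_and_processor, pending, and inputs are hypothetical, and the "task" is just an upper-casing of a string.

```python
import queue
import threading

slots = threading.Semaphore(1)   # one slot in a hypothetical "sim" pool
pending = ['a', 'b', 'c']        # task queue held by the thinker
inputs = queue.Queue()           # stands in for the queue to the task server
in_flight = []                   # number of running tasks seen at each submission
running = 0
lock = threading.Lock()


def submitter():
    """Submit the next task each time a slot can be acquired."""
    global running
    while pending:
        slots.acquire()          # blocks until a slot is free
        with lock:
            running += 1
            in_flight.append(running)
        inputs.put(pending.pop(0))
    inputs.put(None)             # sentinel: nothing left to submit


def worker_and_processor(completed):
    """Pretend to run each task, then release its slot as a result processor would."""
    global running
    while (task := inputs.get()) is not None:
        completed.append(task.upper())
        with lock:
            running -= 1
        slots.release()          # mark the slot as available again


completed = []
threads = [threading.Thread(target=submitter),
           threading.Thread(target=worker_and_processor, args=(completed,))]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because the slot is only released after a task completes, the submitter never has more than one task running at a time in this sketch.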
Event Responder Agents
The colmena.thinker.event_responder() decorator runs a function when a certain event is triggered. Event responder agents can be configured to request resources in a background thread; those resources are then deallocated after the function completes.
from colmena.thinker import BaseThinker, event_responder

class Thinker(BaseThinker):
    @event_responder(event_name='retrain_now', reallocate_resources=True,
                     gather_from="sim", gather_to="ml", disperse_to="sim", max_slots=1)
    def reorder(self):
        # Submit a task to re-order the task queue given the database of results
        self.rec.allocate('ml', 1)  # Blocks until resources are free
        self.queues.send_inputs(self.database, self.queue, method='reorder', topic='plan')

        # Wait for task to complete
        result = self.queues.get_result(topic='plan')
        self.rec.release('ml', 1)  # Mark that resources are unneeded

        # Store the new task queue
        self.queue = result.value
The above example performs a task to reorder the task queue when the retrain_now event is set. Colmena will automatically re-allocate resources from simulation to machine learning when the event is set and then re-allocate them back to simulation after the function completes. The Thinker class will also reset the flag once all functions triggered by the event complete.
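The wait, run, and reset cycle of an event responder can be sketched with threading.Event. The example below is a simplified model with a single responder and no resource reallocation. The names respond_to, handled, and poll_interval are hypothetical and exist only for this illustration.

```python
import threading

retrain_now = threading.Event()   # stands in for the named event on the thinker
done = threading.Event()          # stands in for the thinker's done flag
handled = threading.Event()       # used only by this demo to know the call finished
calls = []


def reorder():
    calls.append('reorder')


def respond_to(event, func, poll_interval=0.05):
    """Run func each time event is set, then clear it; exit once done is set."""
    while not done.is_set():
        if event.wait(timeout=poll_interval):
            func()
            event.clear()         # reset the flag so the event can fire again
            handled.set()


responder = threading.Thread(target=respond_to, args=(retrain_now, reorder))
responder.start()
retrain_now.set()                 # trigger the responder once
was_handled = handled.wait(timeout=5)
flag_after = retrain_now.is_set()
done.set()
responder.join()
```

Clearing the event only after the function returns mirrors the behavior described above, where the flag is reset once the triggered work has completed.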