Design
Colmena is a library for building applications that steer ensembles of simulations running on distributed computing resources.
Key Concepts
Applications based on Colmena have two parts: a “Thinker” and a “Doer”. The Thinker determines which computations to perform and delegates them to the Doer.
“Thinker”: Planning Agent
The “Thinker” defines the strategy for a computational campaign. The strategy is expressed by a series of “agents” that identify which computations to run and adapt to their results. As demonstrated in our optimization examples, complex strategies are simple if broken into many agents.
“Doer”: Task Server
The “Doer” accepts task specifications from the Thinker, deploys the tasks on remote services, and sends results back to the Thinker. Doers are interfaces to workflow engines, such as Parsl or Globus Compute.
Implementation
The Thinker and Doer from Colmena run as separate Python processes that interact over queues.
Client
The “Thinker” process is a Python program that runs a separate thread for each agent.
Agents are functions that define which computations to run by sending task requests to the task server and reading results from a queue. Results are returned in the order they complete.
A simple “run a large batch in parallel” strategy can be defined with a single agent:
class Thinker(BaseThinker):
    # ...

    @agent
    def run_batch(self):
        # Submit computations
        for x in self.to_run:
            self.queues.send_inputs(x, method='f')
        # Collect results
        results = [self.queues.get_result() for _ in range(len(self.to_run))]
        # Find best
        best_ind = np.argmin([r.value for r in results])
        print(f'Best result: {results[best_ind].args}')
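The agent-and-queue pattern above can be sketched with only the standard library, without Colmena itself: each agent runs in its own thread and exchanges work and results over queues. The names `work_queue`, `result_queue`, and `worker` here are illustrative, not part of the Colmena API.

```python
import threading
import queue

def f(x):
    # Example computation: f(x) = x**2 - 3
    return x ** 2 - 3

work_queue = queue.Queue()
result_queue = queue.Queue()

def worker():
    # Stand-in for the Task Server: run each request, return (input, output)
    while True:
        x = work_queue.get()
        if x is None:  # Sentinel value shuts the worker down
            break
        result_queue.put((x, f(x)))

def run_batch(to_run):
    # The "agent": submit every input, then collect results as they finish
    for x in to_run:
        work_queue.put(x)
    results = [result_queue.get() for _ in range(len(to_run))]
    return min(results, key=lambda pair: pair[1])

thread = threading.Thread(target=worker, daemon=True)
thread.start()
best_args, best_value = run_batch([-2, 0, 1, 3])
work_queue.put(None)  # Stop the worker
```

In a real Colmena application, the worker side runs in a separate process (the Task Server) and the queues cross process boundaries, but the flow of requests and results is the same.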
We provide a Python API for the message format, Result, which provides utility operations for tasks, including accessing the positional or keyword arguments of a task and serializing the inputs and results.
Task Server
We support Task Servers that use different workflow engines, but all follow the same pattern. Each are defined by registering computations (often expressed as Python functions) to be run, a set of available computational resources, and a queue to communicate with the client.
The best Task Server to start with is the Parsl-based ParslTaskServer. Having it run tasks locally can be achieved by:
# Function
def f(x):
    return x ** 2 - 3

# Compute configuration
from parsl.configs.htex_local import config

# Communicator
queues = PipeQueues()

# Doer
doer = ParslTaskServer([f], queues, config)
Communication
Task requests and results are communicated between the Thinker and Doer via queues. A Thinker submits task requests to one queue and receives each result from a second queue as soon as it completes. Users can also tag tasks with a “topic” to separate tasks used by different agents.
The easiest-to-configure queue, PipeQueues, is based on Python’s multiprocessing Pipes. Creating it requires no other services or configuration beyond the topics:
queues = PipeQueues(topics=['steer', 'simulate'])
queues.send_inputs(1, method='expensive_func', topic='simulate')
result = queues.get_result(topic='simulate')
Task inputs are serialized using Pickle (we support most Python objects this way), and task information is communicated over queues as JSON-serialized objects.
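The two-layer scheme (pickled task inputs carried inside a JSON message) can be illustrated with a minimal sketch; the helper functions `pack_request` and `unpack_request` are hypothetical, not part of Colmena.

```python
import json
import pickle

def pack_request(method, *args, **kwargs):
    # Task inputs are pickled, which handles most Python objects...
    payload = pickle.dumps((list(args), kwargs)).hex()
    # ...while the surrounding message travels over the queue as JSON
    return json.dumps({"method": method, "inputs": payload,
                       "serialization_method": "pickle"})

def unpack_request(message):
    record = json.loads(message)
    args, kwargs = pickle.loads(bytes.fromhex(record["inputs"]))
    return record["method"], args, kwargs

msg = pack_request("expensive_func", 1, scale=2.0)
method, args, kwargs = unpack_request(msg)
```

Keeping the envelope as JSON lets the Task Server route messages (read the method and topic) without deserializing the potentially large pickled payload.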
Other implementations of the queue, such as a Redis-backed version (RedisQueue), are available.
Life-Cycle of a Task
We describe the life-cycle of a task to illustrate how all of the components of Colmena work together, using a typical Result object as a guide.
{
  "inputs": [[1, 1], {"operator": "add"}],
  "serialization_method": "pickle",
  "method": "reduce",
  "value": 2,
  "success": true,
  "timestamps": {
    "created": 1593498015.132477,
    "input_received": 1593498015.13357,
    "compute_started": 1593498018.856764,
    "result_sent": 1593498018.858268,
    "result_received": 1593498018.860002
  },
  "time": {
    "running": 1.8e-05,
    "serialize_inputs": 4.07e-05,
    "deserialize_inputs": 4.28e-05,
    "serialize_results": 3.32e-05,
    "deserialize_results": 3.30e-05
  }
}
Launching Tasks: A client creates a task request at timestamp.created and adds the input specification (method and inputs) to an “outbound” queue. The task request is formatted in the JSON format defined above with only the method, inputs, and timestamp.created fields populated. The task inputs are then serialized (time.serialize_inputs records the execution time) and passed via the queue to the Task Server.
Task Routing: The Task Server reads the task request from the outbound queue at timestamp.input_received and submits the task to the distributed workflow engine. The method definitions in the Task Server denote the resources on which each can run, and Parsl chooses when and to which resource to submit each task.
Computation: A Parsl worker starts a task at timestamp.compute_started. The task inputs are deserialized (time.deserialize_inputs), the requested work is executed (time.running), and the results are serialized (time.serialize_results).
Result Communication: The Task Server adds the result to the task specification (value) and sends it back to the client via an “inbound” queue at timestamp.result_sent.
Result Retrieval: The client retrieves the message from the inbound queue. The result is deserialized (time.deserialize_results) and returned to the client at timestamp.result_received.
The overall efficiency of the task system can be approximated by comparing time.running, which denotes the actual time spent executing the task on the workers, to the difference between timestamp.created and timestamp.result_received (i.e., the round-trip time).
The overhead specific to Colmena (i.e., not Parsl) can be measured by assessing the communication time for each step. For example, the outbound queue can be assessed by comparing timestamp.created and timestamp.input_received.
The communication times for Parsl can be measured through the difference between timestamp.input_received and timestamp.compute_started, provided the task does not wait for a worker to become available.
The communication times related to serialization are also stored (e.g., time.serialize_results).
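Using the timestamps from the example Result above, these quantities can be computed directly; this is plain arithmetic on the recorded fields, not a Colmena API.

```python
# Timestamps copied from the example Result record above
timestamps = {
    "created": 1593498015.132477,
    "input_received": 1593498015.13357,
    "compute_started": 1593498018.856764,
    "result_sent": 1593498018.858268,
    "result_received": 1593498018.860002,
}
running = 1.8e-05  # time.running from the same record

# Round-trip time: request creation to result receipt
round_trip = timestamps["result_received"] - timestamps["created"]
# Outbound queue overhead (Colmena-specific)
outbound_queue = timestamps["input_received"] - timestamps["created"]
# Parsl dispatch time, assuming no wait for a free worker
dispatch = timestamps["compute_started"] - timestamps["input_received"]
# Fraction of the round trip spent on useful computation
efficiency = running / round_trip
```

For this (trivially cheap) task the efficiency is far below 1%, which is expected: the overheads shown here are roughly fixed per task, so efficiency approaches 1 as the task's running time grows well beyond the few seconds of dispatch and queueing overhead.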