Quickstart

A short introduction to building a Colmena app.

For this exercise, our goal is to find a point that minimizes \(f(x) = x^2\) using a simple search algorithm: pick a new point within ±0.5 from minimum so far.

This tutorial will explain our multi-agent-thinker.py demo application.

0. Write functions

Translating our target function, f(x), and search algorithm into Python yields:

def target_function(x: float) -> float:
    return x ** 2


def task_generator(best_to_date: float) -> float:
    from random import random
    return best_to_date + random() - 0.5

1. Define Communication

_images/overview.svg

Colmena applications are split into a “thinker” application generates tasks that are executed on remote resources. The easiest way to connect them is using Python’s native interprocess communication via our PipeQueues:

queues = PipeQueues(keep_inputs=True, topics=['generate', 'simulate'])

This command creates separates queue for simulation and task generation results, and ensures that the task inputs will get sent back to the client with the result.

Using ProxyStore

Colmena can use ProxyStore to efficiently transfer large objects, typically on the order of 100KB or larger, between the thinker and workers directly. Enable ProxyStore by initializing a Store then passing the name (proxystore_name) and threshold size (proxystore_threshold) for the store to make_queue_pairs. Any input/output object of a target function larger than proxystore_threshold will be automatically passed via ProxyStore.

For example, a common use case is to initialize ProxyStore to use a Redis server to communicate data directly to workers

from proxystore.connectors.redis import RedisConnector
from proxystore.store import Store
from proxystore.store import register_store
from colmena.queue import PipeQueues

store = Store('redis', RedisConnector('localhost', 6379))
register_store(store)

queue = PipeQueues(
    proxystore_name='redis',
    proxystore_threshold=100000
)

Any object larger than 100kB will get sent via Redis, reducing the communication costs of your application.

Learn more about ProxyStore and find the “Getting Started” guides at docs.proxystore.dev.

2. Build a task server

The “task server” in Colmena distributes request to run functions across distributed resources. We create one by defining a list of functions and the resources to run them across.

Colmena uses Parsl to manage executing tasks. Parsl can scale to 1000s of parallel workers and run on HPC or cloud, but we will define it to only use up to 4 processes on a single machine:

config = Config(executors=[HighThroughputExecutor(max_workers=4)])

Build a task server by providing a list of methods and resources:

doer = ParslTaskServer([target_function, task_generator], queues, config)

3. Write the thinker

Colmena provides a “BaseThinker” class to create steering applications. These applications run multiple operations (called agents) that send tasks and receive results from the task server.

Our example thinker has two agents that each are class methods marked with the @agent decorator:

class Thinker(BaseThinker):

    def __init__(self, queue):
        super().__init__(queue)
        self.remaining_guesses = 10
        self.parallel_guesses = 4
        self.best_guess = 10
        self.best_result = inf

    @agent
    def consumer(self):
        for _ in range(self.remaining_guesses):
            # Update the current guess with the
            result = self.queues.get_result(topic='simulate')
            if result.value < self.best_result:
                self.best_result = result.value
                self.best_guess = result.args[0]

    @agent
    def producer(self):
        while not self.done.is_set():
            # Make a new guess
            self.queues.send_inputs(self.best_guess, method='task_generator', topic='generate')

            # Get the result, push new task to queue
            result = self.queues.get_result(topic='generate')
            self.logger.info(f'Created a new guess: {result.value:.2f}')
            self.queues.send_inputs(result.value, method='target_function', topic='simulate')

“Producer” creates new tasks by calling the “task_generator” method (defined with the task server) and then using that new task as input to the “target_function.”

“Consumer” retrieves completed tasks and determines whether to update the best result so far.

A few things to note:

  1. Tasks are run as threads and share class attributes (e.g., self.best_guess)

  2. The queue takes arguments, method name and topic name as inputs to send a task

  3. The self.done attribute tracks if any thread has completed.

  4. The thinker class provides a logger: self.logger

4. Launching the application

The task server and thinker objects are run asynchronously. Accordingly, we call their .start() methods to launch them.

try:
    # Launch the servers
    doer.start()
    thinker.start()
    logging.info('Launched the servers')

    # Wait for the task generator to complete
    thinker.join()
    logging.info('Task generator has completed')
finally:
    queues.send_kill_signal()

# Wait for the task server to complete
doer.join()

5. Running the application

Launch the Colmena application by running it with Python: python multi-agent-thinker.py

The application will produce log messages from many components, including:

  1. Log items from the thinker that mark the agent which wrote them:

    ... - thinker.producer - INFO - Created a new guess: 9.51

  2. Messages from the Colmena queue or task server

    ... - colmena.queue.base - INFO - Client received a task_generator result with topic generate`

  3. Parsl workflow engine status messages

    ... - parsl.dataflow.dflow - INFO - Task 45 completed

6. Learning more

We recommend reading more from our how-to guide next. With that knowledge in hand, try improving the optimization algorithm from this example.