colmena.task_server

Implementations of the task server

colmena.task_server.parsl

Parsl task server and related utilities

class colmena.task_server.parsl.ParslTaskServer(methods: List[Callable | Tuple[Callable, Dict]], queues: ColmenaQueues, config: Config, timeout: int | None = None, default_executors: str | List[str] = 'all')

Bases: FutureBasedTaskServer

Task server based on Parsl

Create a Parsl task server by first creating a resource configuration following the recommendations in the Parsl documentation. Then instantiate a task server with a list of Python functions, configurations defining which Parsl executors each function can run on, and the Parsl resource configuration. The executor(s) for each function can be defined with a combination of per-method specifications

ParslTaskServer([(f, {'executors': ['a']})], queue, config)

and also using a default executor

ParslTaskServer([f], queue, config, default_executors=['a'])

Further configuration options for each method can be defined in the list of methods.
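For orientation, a minimal end-to-end sketch is shown below. It assumes a single local HighThroughputExecutor labeled 'a' and Colmena's PipeQueues; a production configuration would instead describe your actual compute resources.

from parsl.config import Config
from parsl.executors import HighThroughputExecutor

from colmena.queue.python import PipeQueues
from colmena.task_server.parsl import ParslTaskServer


def f(x: float) -> float:
    return x + 1


if __name__ == '__main__':
    # One local executor labeled 'a'; real deployments also describe providers, nodes, etc.
    config = Config(executors=[HighThroughputExecutor(label='a')])
    queues = PipeQueues()

    server = ParslTaskServer([f], queues, config, default_executors=['a'])
    server.start()  # The task server runs in a separate process

Clients then submit work and read results through the same queues object.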

Technical Details

The task server stores each of the supplied methods as Parsl “Apps”. Tasks are launched on remote workers by calling these Apps, and results are placed in the result queue by callbacks attached to the resultant Parsl Futures.

The behavior of an ExecutableTask involves several Apps and callbacks.

  1. A PythonApp to invoke the “preprocessing” function that is given the Result.

    The app produces a path to a temporary run directory containing the input files, content for the standard input of the executable, and an updated copy of the Result object containing timing information.

    Note that the Result object returned by this app lacks the inputs to reduce communication costs.

    Once complete (successfully or not), it invokes a callback which launches the next two tasks and creates the next callback. In the event of an unsuccessful execution, the callback function returns the failure information to the client and exits.

  2. A BashApp to run the executable that is given the path to the run directory and the list of resources required for executing the task.

    There is no callback for this app.

  3. A PythonApp to store the results of the execution that is given the exit code of the executable (should be 0), a copy of the Result object produced by the preprocessing, the path to the run directory, and a serialized version of the inputs to the app.

    The application parses the outputs from the execution, stores them in the Result object, and then serializes the results for transmission back to the client. The application also re-inserts the inputs if they need to be sent back to the client.

    The callback for this function submits the outputs, if successful, or any failure information, if not, to the result queue.

All of these Apps run on the remote system, as each may involve manipulating files stored there.
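To make the callback chaining concrete, the following is a heavily simplified sketch of the same three-step pattern written directly against Parsl's python_app and bash_app interfaces. It illustrates the pattern only and is not Colmena's implementation; the function names, the single input file, and the thread-based Parsl configuration are all hypothetical.

from concurrent.futures import Future

import parsl
from parsl import bash_app, python_app
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor

parsl.load(Config(executors=[ThreadPoolExecutor()]))


@python_app
def preprocess(x: float) -> str:
    # Step 1: write the input file to a temporary run directory and return its path
    import pathlib
    import tempfile
    run_dir = tempfile.mkdtemp()
    (pathlib.Path(run_dir) / 'input.txt').write_text(str(x))
    return run_dir


@bash_app
def run_executable(run_dir: str) -> str:
    # Step 2: return the shell command to run; the app's result is its exit code
    return f'cd {run_dir} && cat input.txt > output.txt'


@python_app
def postprocess(exit_code: int, run_dir: str) -> float:
    # Step 3: parse the outputs once the executable has finished successfully
    import pathlib
    assert exit_code == 0
    return float((pathlib.Path(run_dir) / 'output.txt').read_text())


done: Future = Future()  # Lets the main thread block until the whole chain finishes


def after_preprocess(future: Future):
    # Callback attached to step 1: launches steps 2 and 3 and attaches the final callback
    run_dir = future.result()
    exe_future = run_executable(run_dir)
    post_future = postprocess(exe_future, run_dir)  # waits on the executable's exit code
    post_future.add_done_callback(lambda f: done.set_result(f.result()))


pre_future = preprocess(1.0)
pre_future.add_done_callback(after_preprocess)
print('Parsed output:', done.result())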

Parameters:
  • methods (list) – List of methods to be served. Each element in the list is either a function or a tuple where the first element is a function and the second is a dictionary of arguments used to create the Parsl app (see the Parsl documentation).

  • queues – Queues for the task server

  • config – Parsl configuration

  • timeout (int) – Timeout, if desired

  • default_executors – Executor or list of executors to use by default.

colmena.task_server.globus

Task server based on Globus Compute

Globus Compute provides the ability to execute functions on remote “endpoints” that provide access to computational resources (e.g., cloud providers, HPC). Tasks and results are communicated to/from the endpoint through a cloud service secured using Globus Auth.

class colmena.task_server.globus.GlobusComputeTaskServer(methods: Dict[Callable, str], funcx_client: Client, queues: PipeQueues, timeout: int | None = None, batch_size: int = 128)

Bases: FutureBasedTaskServer

Task server that uses Globus Compute to execute tasks on remote systems

Create a task server by providing a dictionary of functions mapped to the endpoint on which each should run. The task server will wrap the provided function in an interface that tracks execution information (e.g., runtime) and registers the wrapped function with Globus Compute. You must also provide a Globus Compute Client that the task server will use to authenticate with the web service.

The task server works using Globus Compute’s Executor to communicate with the web service over a web socket. The functions used by the executor are registered when you create the task server, and the Executor is launched when you start the task server.
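A minimal construction sketch follows. The endpoint UUID is a placeholder to replace with one of your own, and the use of Colmena's PipeQueues is illustrative.

from globus_compute_sdk import Client

from colmena.queue.python import PipeQueues
from colmena.task_server.globus import GlobusComputeTaskServer


def simulate(x: float) -> float:
    return x ** 2


if __name__ == '__main__':
    queues = PipeQueues()
    endpoint_id = '00000000-0000-0000-0000-000000000000'  # Replace with your endpoint's UUID
    # The wrapped function is registered here; the Executor launches on start()
    server = GlobusComputeTaskServer({simulate: endpoint_id}, Client(), queues)
    server.start()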

Parameters:
  • methods – Map of functions to the endpoint on which each will run

  • funcx_client – Authenticated Globus Compute client

  • queues – Queues used to communicate with the thinker

  • timeout – Timeout for requests from the task queue

  • batch_size – Maximum number of task requests to receive before submitting

perform_callback(future: Future, result: Result, topic: str)

Send a completed result back to the queue. Used as a callback for completed tasks

Parameters:
  • future – Future for a task

  • result – Initial result object. Used if the future throws an exception

  • topic – Topic used to send back to the user

colmena.task_server.local

Use Python’s Executor to run workers on a local system

class colmena.task_server.local.LocalTaskServer(queues: ColmenaQueues, methods: Collection[Callable | ColmenaMethod], threads: bool = True, num_workers: int | None = None)

Bases: FutureBasedTaskServer

Use Python’s native concurrent libraries to execute tasks

Parameters:
  • methods – Methods to be served

  • queues – Queues used to communicate with the thinker

  • threads – Use threads instead of separate processes

  • num_workers – Number of workers to deploy.
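The sketch below shows a full local round trip. The client-side queue calls (send_inputs, get_result, send_kill_signal) follow Colmena's queue interface; treat the exact call pattern as illustrative rather than definitive.

from colmena.queue.python import PipeQueues
from colmena.task_server.local import LocalTaskServer


def add(x: float, y: float) -> float:
    return x + y


if __name__ == '__main__':
    # The task server runs in a separate process, so guard the launch code
    queues = PipeQueues()
    server = LocalTaskServer(queues, [add], num_workers=2)
    server.start()

    # Client side: submit a task, then block until its result arrives
    queues.send_inputs(1., 2., method='add')
    result = queues.get_result()
    print(result.value)  # 3.0

    queues.send_kill_signal()  # Pushes the None that tells the server to shut down
    server.join()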

colmena.task_server.base

Base classes for the Task Server and associated functions

class colmena.task_server.base.BaseTaskServer(queues: ColmenaQueues, method_names: Collection[str], timeout: int | None = None)

Bases: Process

Abstract class for the Colmena Task Server, which manages the execution of tasks

Start the task server by first instantiating it and then calling start() to launch the server in a separate process. Clients submit task requests to the server by pushing them to a Redis queue, and then receive results from a second queue.

The task server can be stopped by pushing a None to the task queue, signaling that no new tasks will be incoming. Results for any remaining tasks will continue to be pushed to the output queue.

## Implementing a Task Server

Different implementations vary in how the queue is processed.

Each implementation must provide the process_queue() function, which is responsible for executing tasks supplied on the task queue and ensuring completed results are written back to the result queue. Tasks must first be wrapped with the run_and_record_timing() decorator function to capture the runtime information.

Implementations should also provide a _cleanup function that releases any resources reserved by the task server.

Parameters:
  • queues (ColmenaQueues) – Queues for the task server

  • timeout (int) – Timeout for reading from the task queue, if desired

listen_and_launch()

abstract process_queue(topic: str, task: Result)

Execute a single task from the task queue

Parameters:
  • topic – Which task queue this result came from

  • task – Task description

run() → None

Launch the thread and start running tasks

Blocks until the inputs queue is closed and all tasks have completed

class colmena.task_server.base.FutureBasedTaskServer(queues: ColmenaQueues, method_names: Collection[str], timeout: int | None = None)

Bases: BaseTaskServer

Base class for workflow engines that use Python’s native Future object

Implementations need to specify a function, _submit(), that creates the Future; FutureBasedTaskServer’s implementation of process_queue() will then add a callback that submits the output to the result queue. Note that implementations are still responsible for adding the run_and_record_timing() decorator.
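As a rough illustration, the sketch below runs each task on a thread pool. The (task, topic) signature of _submit(), the ColmenaMethod name attribute, and the convention of calling a wrapped method with the Result object are assumptions inferred from this page rather than a documented contract.

from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional

from colmena.models import Result
from colmena.task_server.base import FutureBasedTaskServer, convert_to_colmena_method


class ThreadedTaskServer(FutureBasedTaskServer):
    """Run each task on a local thread pool (illustrative sketch only)."""

    def __init__(self, queues, functions, num_workers: int = 4):
        methods = [convert_to_colmena_method(f) for f in functions]
        super().__init__(queues, [m.name for m in methods])
        self._methods = {m.name: m for m in methods}
        self._num_workers = num_workers
        self._pool: Optional[ThreadPoolExecutor] = None

    def _submit(self, task: Result, topic: str) -> Future:
        # Create the pool lazily so it lives inside the server process
        if self._pool is None:
            self._pool = ThreadPoolExecutor(self._num_workers)
        # process_queue() in the base class attaches the callback that returns results
        return self._pool.submit(self._methods[task.method], task)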

Parameters:
  • queues (ColmenaQueues) – Queues for the task server

  • timeout (int) – Timeout for reading from the task queue, if desired

perform_callback(future: Future, result: Result, topic: str)

Send a completed result back to the queue. Used as a callback for completed tasks

Parameters:
  • future – Future for a task

  • result – Initial result object. Used if the future throws an exception

  • topic – Topic used to send back to the user

process_queue(topic: str, task: Result)

Execute a single task from the task queue

Parameters:
  • topic – Which task queue this result came from

  • task – Task description

colmena.task_server.base.convert_to_colmena_method(function: Callable | ColmenaMethod) → ColmenaMethod

Wrap user-supplied functions in the task model wrapper, if needed

Parameters:
  • function – User-provided function

Returns:
  The function wrapped in the appropriate ColmenaMethod subclass
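For example (the pass-through behavior for an already-wrapped method is inferred from the “if needed” above):

from colmena.task_server.base import convert_to_colmena_method


def double(x: float) -> float:
    return 2 * x


method = convert_to_colmena_method(double)  # Plain callable -> ColmenaMethod wrapper
same = convert_to_colmena_method(method)    # Already a ColmenaMethod, so no re-wrapping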