colmena.task_server
Implementations of the task server
colmena.task_server.parsl
Parsl task server and related utilities
- class colmena.task_server.parsl.ParslTaskServer(methods: List[Callable | Tuple[Callable, Dict]], queues: ColmenaQueues, config: Config, timeout: int | None = None, default_executors: str | List[str] = 'all')
Bases:
FutureBasedTaskServer
Task server based on Parsl
Create a Parsl task server by first creating a resource configuration following the recommendations in the Parsl documentation. Then instantiate a task server with a list of Python functions, configurations defining which Parsl executors each function can run on, and the Parsl resource configuration. The executor(s) for each function can be defined with a combination of per-method specifications
ParslTaskServer([(f, {'executors': ['a']})], queue, config)
and also using a default executor
ParslTaskServer([f], queue, config, default_executors=['a'])
Further configuration options for each method can be defined in the list of methods.
Technical Details
The task server stores each of the supplied methods as Parsl “Apps”. Tasks are launched on remote workers by calling these Apps, and results are placed in the result queue by callbacks attached to the resultant Parsl Futures.
The behavior of an ExecutableTask involves several Apps and callbacks:
1. A PythonApp that invokes the “preprocessing” function, which is given the Result. The app produces a path to a temporary run directory containing the input files, content for the standard input of the executable, and an updated copy of the Result object containing timing information. Note that the Result object returned by this app lacks the inputs, to reduce communication costs. Once complete (successfully or not), it invokes a callback which launches the next two tasks and creates the next callback. In the event of an unsuccessful execution, the callback function returns the failure information to the client and exits.
2. A BashApp that runs the executable, which is given the path to the run directory and the list of resources required for executing the task. There is no callback for this app.
3. A PythonApp that stores the results of the execution, which is given the exit code of the executable (which should be 0), a copy of the Result object produced by preprocessing, the path to the run directory, and a serialized version of the inputs to the app. The application parses the outputs from the execution, stores them in the Result object, and then serializes the results for transmission back to the client. The application also re-inserts the inputs if they are required to be sent back to the client. The callback for this app submits the outputs, if successful, or any failure information, if not, to the result queue.
All of these Apps run on the remote system, as they may involve manipulating files there.
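The chain of Apps and callbacks above can be sketched with Python's native futures. This is a simplified illustration, not Colmena's actual implementation: the function names (`preprocess`, `execute`, `postprocess`) and the dictionary standing in for the `Result` object are hypothetical.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=2)
result_queue = queue.Queue()  # stand-in for Colmena's result queue


def preprocess(x):
    # Stand-in for the preprocessing PythonApp: stage inputs for the run
    return {'value': x}


def execute(task):
    # Stand-in for the BashApp: "run" the executable and record its exit code
    task['exit_code'] = 0
    return task


def postprocess(task):
    # Stand-in for the result-storing PythonApp: parse and return outputs
    assert task['exit_code'] == 0
    return task['value'] * 2


def on_preprocess_done(future):
    # Callback attached to the preprocessing Future: on failure, report the
    # error to the client; on success, launch the remaining two apps
    if future.exception() is not None:
        result_queue.put(future.exception())
        return
    exec_future = pool.submit(execute, future.result())  # no callback here
    post_future = pool.submit(postprocess, exec_future.result())
    post_future.add_done_callback(lambda f: result_queue.put(f.result()))


task = pool.submit(preprocess, 21)
task.add_done_callback(on_preprocess_done)
final = result_queue.get()  # blocks until the callback chain finishes
pool.shutdown()
print(final)  # 42
```

The key design point mirrored here is that the client never polls: each stage's completion triggers the next stage through a callback, and only the final callback touches the result queue.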
- Parameters:
methods (list) – List of methods to be served. Each element in the list is either a function or a tuple where the first element is a function and the second is a dictionary of the arguments used to create the Parsl app (see the Parsl documentation).
queues – Queues for the task server
config – Parsl configuration
timeout (int) – Timeout, if desired
default_executors – Executor or list of executors to use by default.
colmena.task_server.globus
Task server based on Globus Compute
Globus Compute provides the ability to execute functions on remote “endpoints” that provide access to computational resources (e.g., cloud providers, HPC). Tasks and results are communicated to/from the endpoint through a cloud service secured using Globus Auth.
- class colmena.task_server.globus.GlobusComputeTaskServer(methods: Dict[Callable, str], funcx_client: Client, queues: PipeQueues, timeout: int | None = None, batch_size: int = 128)
Bases:
FutureBasedTaskServer
Task server that uses Globus Compute to execute tasks on remote systems
Create a task server by providing a dictionary of functions mapped to the endpoint on which each should run. The task server will wrap each provided function in an interface that tracks execution information (e.g., runtime) and registers the wrapped function with Globus Compute. You must also provide a Globus Compute Client that the task server will use to authenticate with the web service.
The task server uses Globus Compute’s Executor to communicate with the web service over a web socket. The functions used by the executor are registered when you create the task server, and the Executor is launched when you start the task server.
- Parameters:
methods – Map of functions to the endpoint on which each will run
funcx_client – Authenticated Globus Compute client
queues – Queues used to communicate with thinker
timeout – Timeout for requests from the task queue
batch_size – Maximum number of task requests to receive before submitting
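The batching behavior controlled by batch_size can be illustrated with a short standard-library sketch. The `drain_batch` helper is hypothetical, not part of Colmena or Globus Compute; it only shows the idea of collecting up to a maximum number of requests before submitting them together.

```python
import queue


def drain_batch(task_queue, batch_size):
    """Collect up to batch_size task requests before submitting them as one batch."""
    batch = []
    while len(batch) < batch_size:
        try:
            batch.append(task_queue.get(block=False))
        except queue.Empty:
            break  # submit a partial batch instead of waiting for more tasks
    return batch


tasks = queue.Queue()
for i in range(5):
    tasks.put(i)
first = drain_batch(tasks, batch_size=3)
second = drain_batch(tasks, batch_size=3)
print(first, second)  # [0, 1, 2] [3, 4]
```

Batching amortizes the per-request overhead of talking to the cloud service when many small task requests arrive at once.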
colmena.task_server.local
Use Python’s Executor to run workers on a local system
- class colmena.task_server.local.LocalTaskServer(queues: ColmenaQueues, methods: Collection[Callable | ColmenaMethod], threads: bool = True, num_workers: int | None = None)
Bases:
FutureBasedTaskServer
Use Python’s native concurrent libraries to execute tasks
- Parameters:
methods – Methods to be served
queues – Queues used to communicate with thinker
threads – Use threads instead of processes
num_workers – Number of workers to deploy.
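The choice between thread and process workers can be sketched with Python's native pools. The `make_pool` helper below is hypothetical, not LocalTaskServer's actual code; it only mirrors the decision the threads flag controls.

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def make_pool(threads=True, num_workers=None):
    # Hypothetical helper mirroring LocalTaskServer's backend choice:
    # threads avoid serialization overhead, processes sidestep the GIL
    if threads:
        return ThreadPoolExecutor(max_workers=num_workers)
    return ProcessPoolExecutor(max_workers=num_workers)


with make_pool(threads=True, num_workers=2) as pool:
    squares = list(pool.map(lambda x: x * x, range(4)))
print(squares)  # [0, 1, 4, 9]
```

Note that process pools require task functions and arguments to be picklable, which is why threads are a convenient default for local testing.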
colmena.task_server.base
Base classes for the Task Server and associated functions
- class colmena.task_server.base.BaseTaskServer(queues: ColmenaQueues, method_names: Collection[str], timeout: int | None = None)
Bases:
Process
Abstract class for the Colmena Task Server, which manages the execution of tasks
Start the task server by first instantiating it and then calling start() to launch the server in a separate process. Clients submit task requests to the server by pushing them to a Redis queue, and then receive results from a second queue.
The task server can be stopped by pushing a None to the task queue, signaling that no new tasks will be incoming. The remaining tasks will continue to be pushed to the output queue.
Implementing a Task Server
Different implementations vary in how the queue is processed.
Each implementation must provide the process_queue() function, which is responsible for executing tasks supplied on the task queue and ensuring completed results are written back to the result queue. Tasks must first be wrapped in the run_and_record_timing() decorator function to capture runtime information. Implementations should also provide a _cleanup function that releases any resources reserved by the task server.
- Parameters:
queues (TaskServerQueues) – Queues for the task server
timeout (int) – Timeout for reading from the task queue, if desired
- listen_and_launch()
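The task-processing loop with its None shutdown sentinel can be sketched in a few lines. This is a simplified illustration using standard-library queues, not BaseTaskServer's actual listen_and_launch() implementation.

```python
import queue


def listen_and_launch(task_queue, result_queue):
    # Process task requests until a None sentinel signals shutdown
    while True:
        task = task_queue.get()
        if task is None:
            break  # no new tasks will be incoming
        result_queue.put(task * 2)  # stand-in for launching a served method


tasks, results = queue.Queue(), queue.Queue()
for item in (1, 2, None):
    tasks.put(item)
listen_and_launch(tasks, results)
collected = [results.get_nowait() for _ in range(2)]
print(collected)  # [2, 4]
```

Using an in-band sentinel rather than an out-of-band signal guarantees that all tasks queued before the shutdown request are still processed.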
- class colmena.task_server.base.FutureBasedTaskServer(queues: ColmenaQueues, method_names: Collection[str], timeout: int | None = None)
Bases:
BaseTaskServer
Base class for workflow engines that use Python’s native Future object
Implementations need to specify a function, _submit(), that creates the Future. FutureBasedTaskServer’s implementation of process_queue() will add a callback that submits the output to the result queue. Note that implementations are still responsible for adding the run_and_record_timing() decorator.
- Parameters:
queues (TaskServerQueues) – Queues for the task server
timeout (int) – Timeout for reading from the task queue, if desired
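The _submit-plus-callback pattern can be sketched with Python's native futures. The `_submit` and `process_task` functions below are hypothetical stand-ins, not FutureBasedTaskServer's actual code; they only show how a done-callback forwards a Future's result to the result queue.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)
result_queue = queue.Queue()


def _submit(task):
    # Hypothetical _submit: create a Future for the task request
    return pool.submit(lambda: task + 1)


def process_task(task):
    # Mirrors the role of process_queue: attach a callback that forwards
    # the Future's result to the result queue once the task completes
    future = _submit(task)
    future.add_done_callback(lambda f: result_queue.put(f.result()))


process_task(41)
out = result_queue.get()  # blocks until the callback fires
pool.shutdown()
print(out)  # 42
```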
- colmena.task_server.base.convert_to_colmena_method(function: Callable | ColmenaMethod) ColmenaMethod
Wrap user-supplied functions in the task model wrapper, if needed
- Parameters:
function – User-provided function
- Returns:
The function as the appropriate subclass of the Colmena task wrapper
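The wrap-if-needed pattern behind this function can be sketched as follows. The `Method` class and `convert` function are simplified hypothetical stand-ins for ColmenaMethod and convert_to_colmena_method, not Colmena's actual implementation.

```python
class Method:
    # Stand-in for ColmenaMethod: a callable wrapper that records a name
    def __init__(self, function):
        self.function = function
        self.name = function.__name__

    def __call__(self, *args, **kwargs):
        return self.function(*args, **kwargs)


def convert(function):
    # Hypothetical version of convert_to_colmena_method:
    # wrap plain functions, pass existing wrappers through unchanged
    return function if isinstance(function, Method) else Method(function)


def add(x, y):
    return x + y


method = convert(add)
print(method(1, 2), method.name)  # 3 add
print(convert(method) is method)  # True: already wrapped, returned as-is
```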