colmena.models

Models used for different aspects of a Colmena application

class colmena.models.FailureInformation(*, exception: str, traceback: str | None = None)

Bases: BaseModel

Stores information about a task failure

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

exception: str
classmethod from_exception(exc: BaseException) FailureInformation
traceback: str | None
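As an illustration of the from_exception pattern, here is a minimal stdlib sketch (a dataclass stand-in for the real pydantic model; the class name FailureInfoSketch is hypothetical) that captures an exception's repr and formatted traceback:

```python
import traceback
from dataclasses import dataclass
from typing import Optional


@dataclass
class FailureInfoSketch:
    # Stdlib stand-in for FailureInformation (the real class is a pydantic BaseModel)
    exception: str
    traceback: Optional[str] = None

    @classmethod
    def from_exception(cls, exc: BaseException) -> "FailureInfoSketch":
        # Record the repr of the exception and, when available, its formatted traceback
        tb = None
        if exc.__traceback__ is not None:
            tb = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))
        return cls(exception=repr(exc), traceback=tb)


try:
    1 / 0
except ZeroDivisionError as err:
    info = FailureInfoSketch.from_exception(err)
```

The actual Colmena class stores the same two string fields, so failures can be serialized and shipped back to the client alongside the Result.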
class colmena.models.ResourceRequirements(*, node_count: int = 1, cpu_processes: int = 1, cpu_threads: int = 1)

Bases: BaseModel

Resource requirements for tasks. Used by some Colmena backends to allocate resources to the task

Follows the naming conventions of RADICAL-Pilot.

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

cpu_processes: int
cpu_threads: int
node_count: int
property total_ranks: int

Total number of MPI ranks
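A short sketch of how these fields combine, assuming total_ranks is the product of node_count and cpu_processes (one MPI rank per CPU process on every node); ResourceSketch is a hypothetical stdlib stand-in for the pydantic model:

```python
from dataclasses import dataclass


@dataclass
class ResourceSketch:
    # Mirrors the fields of ResourceRequirements
    node_count: int = 1
    cpu_processes: int = 1
    cpu_threads: int = 1

    @property
    def total_ranks(self) -> int:
        # One MPI rank per CPU process on every node
        return self.node_count * self.cpu_processes


res = ResourceSketch(node_count=2, cpu_processes=3)
```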

class colmena.models.Result(inputs: Tuple[Tuple[Any], Dict[str, Any]], *, task_id: str = None, value: Any = None, method: str | None = None, success: bool | None = None, complete: bool | None = None, task_info: Dict[str, Any] | None = None, resources: ResourceRequirements = None, failure_info: FailureInformation | None = None, worker_info: WorkerInformation | None = None, message_sizes: Dict[str, int] = None, timestamp: Timestamps = None, time: TimeSpans = None, serialization_method: SerializationMethod = SerializationMethod.JSON, keep_inputs: bool = True, proxystore_name: str | None = None, proxystore_config: Dict | None = None, proxystore_threshold: int | None = None, topic: str | None = None)

Bases: BaseModel

A class which describes the inputs and results of the calculations evaluated by the MethodServer

Each instance of this class stores the inputs and outputs to the function along with some tracking information allowing for performance analysis (e.g., time submitted to Queue, time received by client). All times are listed as Unix timestamps.

The Result class also handles serialization of the data to be transmitted over a RedisQueue

Parameters:

inputs (Tuple[Tuple[Any], Dict[str, Any]]) – Inputs to a function, separated into positional and keyword arguments

property args: Tuple[Any]
complete: bool | None
deserialize() float

De-serialize the input and value fields

Returns:

(float) The time required to deserialize

failure_info: FailureInformation | None
classmethod from_args_and_kwargs(fn_args: Sequence[Any], fn_kwargs: Dict[str, Any] | None = None, **kwargs)

Create a result object from the arguments and kwargs for the function

Keyword arguments to this function are passed to the initializer for Result.

Parameters:
  • fn_args – Positional arguments to the function

  • fn_kwargs – Keyword arguments to the function

Returns:

Result object storing the provided inputs
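A sketch of how the positional and keyword arguments end up in the inputs field, assuming they are packed into a single (args, kwargs) pair; pack_inputs is a hypothetical helper, not part of the Colmena API:

```python
from typing import Any, Dict, Optional, Sequence, Tuple


def pack_inputs(fn_args: Sequence[Any],
                fn_kwargs: Optional[Dict[str, Any]] = None
                ) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
    # Store positional and keyword arguments as the single (args, kwargs)
    # pair that Result keeps in its ``inputs`` field
    return tuple(fn_args), dict(fn_kwargs or {})


inputs = pack_inputs([1, 2], {'n': 3})
```

The args and kwargs properties then simply unpack the two halves of this pair.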

inputs: Tuple[Tuple[Any, ...], Dict[str, Any]] | str
json(**kwargs: Dict[str, Any]) str

Override json encoder to use a custom encoder with proxy support

keep_inputs: bool
property kwargs: Dict[str, Any]
mark_compute_ended()

Mark when the task finished executing

mark_compute_started()

Mark that the compute for a method has started

mark_input_received()

Mark that a task server has received a value

mark_result_received()

Mark that a completed computation was received by a client

mark_result_sent()

Mark when a result is sent from the task server

mark_start_task_submission()

Mark when the Task Server submits a task to the engine

mark_task_received()

Mark when the Task Server receives the task from the engine

message_sizes: Dict[str, int]
method: str | None
proxystore_config: Dict | None
proxystore_name: str | None
proxystore_threshold: int | None
resources: ResourceRequirements
serialization_method: SerializationMethod
serialize() Tuple[float, List[Proxy]]

Stores the input and value fields as pickled objects

Returns:

  • (float) Time to serialize

  • List of any proxies that were created

set_result(result: Any, runtime: float = nan, intermediate: bool = False)

Set the value of this computation

Automatically sets the “time_result_completed” field and, if known, defines the runtime.

Will delete the inputs to the function if the user sets self.keep_inputs == False. Removing the inputs once the result is known can save communication time

Parameters:
  • result – Result to be stored

  • runtime – Runtime for the function

  • intermediate – If this result is not the final one in a workflow
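A minimal sketch of the behavior described above, assuming a dataclass stand-in (ResultSketch is hypothetical): a final result marks the record complete, while an intermediate one does not, and the inputs are dropped when keep_inputs is False:

```python
import math
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ResultSketch:
    inputs: Any
    keep_inputs: bool = True
    value: Any = None
    runtime: float = math.nan
    complete: Optional[bool] = None

    def set_result(self, result: Any, runtime: float = math.nan,
                   intermediate: bool = False) -> None:
        self.value = result
        self.runtime = runtime
        # Intermediate (streamed) results do not mark the record complete
        self.complete = not intermediate
        if not self.keep_inputs:
            # Drop the inputs once the result is known to save communication time
            self.inputs = ((), {})


record = ResultSketch(inputs=((4,), {}), keep_inputs=False)
record.set_result(16, runtime=0.1)
```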

success: bool | None
task_id: str
task_info: Dict[str, Any] | None
time: TimeSpans
timestamp: Timestamps
topic: str | None
value: Any
worker_info: WorkerInformation | None
class colmena.models.SerializationMethod(value)

Bases: str, Enum

Serialization options

JSON = 'json'
PICKLE = 'pickle'
static deserialize(method: SerializationMethod, message: str) Any

Deserialize an object

Parameters:
  • method – Method used to serialize

  • message – Message to deserialize

Returns:

Result object

static serialize(method: SerializationMethod, data: Any) str

Serialize an object using a specified method

Parameters:
  • method – Method used to serialize the object

  • data – Object to be serialized

Returns:

Serialized data
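A stdlib sketch of the two serialization options, assuming the pickle path base64-encodes the bytes to yield a string payload (the real Colmena encoding may differ; SerializationSketch is a hypothetical stand-in):

```python
import base64
import json
import pickle
from enum import Enum
from typing import Any


class SerializationSketch(str, Enum):
    JSON = 'json'
    PICKLE = 'pickle'

    @staticmethod
    def serialize(method: 'SerializationSketch', data: Any) -> str:
        if method == SerializationSketch.JSON:
            return json.dumps(data)
        # Pickle yields bytes; base64-encode so the payload is a string
        return base64.b64encode(pickle.dumps(data)).decode('ascii')

    @staticmethod
    def deserialize(method: 'SerializationSketch', message: str) -> Any:
        if method == SerializationSketch.JSON:
            return json.loads(message)
        return pickle.loads(base64.b64decode(message))


message = SerializationSketch.serialize(SerializationSketch.PICKLE, {'x': 1})
```

JSON keeps messages human-readable; pickle handles arbitrary Python objects at the cost of opacity.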

colmena.models.methods

Base classes used by Colmena to describe functions being executed by a workflow engine

class colmena.models.methods.ColmenaMethod

Base wrapper for a Python function run as part of a Colmena workflow

The wrapper handles the parts of running a Colmena task that are beyond running the function, such as serialization, timing, and interfaces to ProxyStore.

name: str

Name used to identify the function

function(*args, **kwargs) Any

Function provided by the Colmena user

class colmena.models.methods.PythonMethod(function: Callable, name: str | None = None)

A Python function to be executed on a single worker

Parameters:
  • function – Function to be executed

  • name – Name of the function. Defaults to function.__name__

class colmena.models.methods.PythonGeneratorMethod(function: Callable[[...], Generator | Iterable], name: str | None = None, store_return_value: bool = False, streaming_queue: ColmenaQueues | None = None)

Python function which runs on a single worker and generates results iteratively

Generator functions support streaming each iteration of the generator to the Thinker when a streaming_queue is provided.

Parameters:
  • function – Generator function to be executed

  • name – Name of the function. Defaults to function.__name__

  • store_return_value – Whether to capture the return value of the generator and store it in the Result object.

stream_result(y: Any, result: Result, start_time: float)

Send an intermediate result using the task queue

Parameters:
  • y – Yielded data from the generator function

  • result – Result package carrying task metadata

  • start_time – Start time of the algorithm, used to report the runtime of each streamed result

function(*args, _result: Result, **kwargs) Any

Run the Colmena task and collect intermediate results to provide as a list
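A sketch of the driving loop such a wrapper might use, assuming streamed values are sent through a callback and the generator's return value is recovered from StopIteration (drive_generator and count_up are hypothetical names, not Colmena API):

```python
from typing import Any, Callable, Generator, List, Optional


def drive_generator(func: Callable[..., Generator], *args,
                    stream: Optional[Callable[[Any], None]] = None,
                    store_return_value: bool = False) -> Any:
    # Run the generator, streaming each yielded value as it is produced
    gen = func(*args)
    yielded: List[Any] = []
    while True:
        try:
            y = next(gen)
        except StopIteration as stop:
            # StopIteration.value carries the generator's ``return`` value
            return (yielded, stop.value) if store_return_value else yielded
        yielded.append(y)
        if stream is not None:
            stream(y)  # stands in for stream_result() pushing to the task queue


def count_up(n: int) -> Generator[int, None, str]:
    for i in range(n):
        yield i
    return 'done'


seen: List[int] = []
out = drive_generator(count_up, 3, stream=seen.append, store_return_value=True)
```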

class colmena.models.methods.ExecutableMethod(executable: List[str], name: str | None = None, mpi: bool = False, mpi_command_string: str | None = None)

Task that involves running an executable using a system call.

Such tasks often include a “pre-processing” step in Python that prepares inputs for the executable and a “post-processing” step which stores the outputs (either produced from stdout or written to files) as Python objects.

Separating the task into these two functions and a system call for launching the program simplifies development (shorter functions that are easier to test) and allows some workflow engines to improve performance by running processing and execution tasks separately.

Implement a new ExecutableMethod by defining the executable, a preprocessing method (preprocess()), and a postprocessing method (postprocess()).

Use the ExecutableMethod by instantiating a copy of your new class and then passing it to the task server as you would with any other function.

MPI Executables

Launching an MPI executable requires two parts: a path to an executable and a preamble defining how to launch it. Define an MPI application using the instructions described above, then set the mpi attribute to True. This tells the Colmena task server to look for a “preamble” describing how to launch the application.

You may need to supply an MPI command invocation recipe for your particular cluster, depending on your choice of task server. Supply a template as the mpi_command_string field, which will be converted by Python’s string format function to produce a version of the command with the specific resource requirements of your task by the render_mpi_launch() method. The attributes of this class (e.g., node_count, total_ranks) will be used as arguments to format. For example, a template of aprun -N {total_ranks} -n {cpu_processes} will produce aprun -N 6 -n 3 if you specify node_count=2 and cpu_processes=3.
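The template rendering described above is plain str.format; the example from the text works out as:

```python
# Template from the example above, rendered with Python's str.format
template = 'aprun -N {total_ranks} -n {cpu_processes}'

# node_count=2 and cpu_processes=3 give total_ranks = 2 * 3 = 6
launch = template.format(total_ranks=6, cpu_processes=3)
```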

Parameters:
  • executable – List of executable arguments

  • name – Name used for the task. Defaults to executable[0]

  • mpi – Whether to use MPI to launch the executable

  • mpi_command_string – Template for MPI launcher. See mpi_command_string.

executable: List[str]

Command used to launch the executable

mpi: bool = False

Whether this is an MPI executable

mpi_command_string: str | None = None

Template string defining how to launch this application using MPI. Should include placeholders named after the fields in ResourceRequirements marked using {}’s. Example: mpirun -np {total_ranks}

render_mpi_launch(resources: ResourceRequirements) str

Create an MPI launch command given the configuration

Returns:

MPI launch configuration

preprocess(run_dir: Path, args: Tuple[Any], kwargs: Dict[str, Any]) Tuple[List[str], str | None]

Perform preprocessing steps necessary to prepare for the executable to be started.

These may include writing files to the local directory, creating CLI arguments, or composing standard input to be passed to the executable

Parameters:
  • run_dir – Path to a directory in which to write files used by an executable

  • args – Arguments to the task, control how the run is set up

  • kwargs – Keyword arguments to the function

Returns:

  • Options to be passed as command line arguments to the executable

  • Values to pass to the standard in of the executable

execute(run_dir: Path, arguments: List[str], stdin: str | None, resources: ResourceRequirements | None = None) float

Run an executable

Parameters:
  • run_dir – Directory in which to execute the code

  • arguments – Command line arguments

  • stdin – Content to pass in via standard in

  • resources – Amount of resources to use for the application

Returns:

Runtime (unit: s)

assemble_shell_cmd(arguments: List[str], resources: ResourceRequirements) List[str]

Assemble the shell command to be launched

Parameters:
  • arguments – Command line arguments

  • resources – Resource requirements

Returns:

Components of the shell command

postprocess(run_dir: Path) Any

Extract results after execution completes

Parameters:

run_dir – Run directory for the executable. Stdout will be written to run_dir/colmena.stdout and stderr to run_dir/colmena.stderr
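Pulling the pieces together, here is a hedged stdlib sketch of the preprocess → execute → postprocess lifecycle, assuming a trivial `echo` executable (EchoTaskSketch is hypothetical and omits MPI, resources, and error handling):

```python
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple


class EchoTaskSketch:
    # Hypothetical executable task: runs `echo` and parses its stdout
    executable = ['echo']

    def preprocess(self, run_dir: Path, args: Tuple[Any, ...],
                   kwargs: Dict[str, Any]) -> Tuple[List[str], Optional[str]]:
        # Turn the task arguments into CLI options; no stdin needed
        return [str(a) for a in args], None

    def execute(self, run_dir: Path, arguments: List[str],
                stdin: Optional[str]) -> None:
        # Capture stdout/stderr to the files postprocess() expects
        with open(run_dir / 'colmena.stdout', 'w') as out, \
             open(run_dir / 'colmena.stderr', 'w') as err:
            subprocess.run(self.executable + arguments, cwd=run_dir,
                           input=stdin, stdout=out, stderr=err,
                           text=True, check=True)

    def postprocess(self, run_dir: Path) -> Any:
        # Recover the result from the captured standard output
        return (run_dir / 'colmena.stdout').read_text().strip()

    def __call__(self, *args) -> Any:
        with tempfile.TemporaryDirectory() as tmp:
            run_dir = Path(tmp)
            cli_args, stdin = self.preprocess(run_dir, args, {})
            self.execute(run_dir, cli_args, stdin)
            return self.postprocess(run_dir)


result = EchoTaskSketch()('hello', 'world')
```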

function(*args, _resources: ResourceRequirements | None = None, **kwargs)

Execute the function

Parameters:
  • args – Positional arguments

  • kwargs – Keyword arguments

  • _resources – Resources available. Optional. Only used for MPI tasks.