colmena.models
Models used for different aspects of a Colmena application
- class colmena.models.FailureInformation(*, exception: str, traceback: str | None = None)
Bases:
BaseModel
Stores information about a task failure
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- classmethod from_exception(exc: BaseException) FailureInformation
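The failure record is built from a caught exception. A minimal sketch of the same idea in plain Python (the `capture_failure` helper and the dict layout are illustrative stand-ins, not the Colmena API):

```python
import traceback

def capture_failure(exc: BaseException) -> dict:
    # Mimics FailureInformation.from_exception: keep the exception text
    # and the formatted traceback (field names here are illustrative)
    return {
        "exception": repr(exc),
        "traceback": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }

try:
    1 / 0
except ZeroDivisionError as exc:
    info = capture_failure(exc)
```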
- class colmena.models.ResourceRequirements(*, node_count: int = 1, cpu_processes: int = 1, cpu_threads: int = 1)
Bases:
BaseModel
Resource requirements for tasks. Used by some Colmena backends to allocate resources to the task
Follows the naming conventions of RADICAL-Pilot.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
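As a rough illustration of the naming conventions, the total rank count implied by these fields can be sketched with a plain dataclass (this `Resources` stand-in and its `total_ranks` property are illustrative; consult ResourceRequirements itself for the authoritative fields):

```python
from dataclasses import dataclass

@dataclass
class Resources:
    """Illustrative stand-in for ResourceRequirements."""
    node_count: int = 1      # nodes allocated to the task
    cpu_processes: int = 1   # processes (e.g., MPI ranks) per node
    cpu_threads: int = 1     # threads per process

    @property
    def total_ranks(self) -> int:
        # Ranks span every node: processes per node times node count
        return self.node_count * self.cpu_processes

res = Resources(node_count=2, cpu_processes=3)
```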
- class colmena.models.Result(inputs: Tuple[Tuple[Any], Dict[str, Any]], *, task_id: str = None, value: Any = None, method: str | None = None, success: bool | None = None, complete: bool | None = None, task_info: Dict[str, Any] | None = None, resources: ResourceRequirements = None, failure_info: FailureInformation | None = None, worker_info: WorkerInformation | None = None, message_sizes: Dict[str, int] = None, timestamp: Timestamps = None, time: TimeSpans = None, serialization_method: SerializationMethod = SerializationMethod.JSON, keep_inputs: bool = True, proxystore_name: str | None = None, proxystore_config: Dict | None = None, proxystore_threshold: int | None = None, topic: str | None = None)
Bases:
BaseModel
A class which describes the inputs and results of the calculations evaluated by the MethodServer
Each instance of this class stores the inputs and outputs to the function along with some tracking information allowing for performance analysis (e.g., time submitted to Queue, time received by client). All times are listed as Unix timestamps.
The Result class also handles serialization of the data to be transmitted over a RedisQueue
- Parameters:
inputs (Any, Dict) – Inputs to a function. Separated into positional and keyword arguments
- deserialize() float
De-serialize the input and value fields
- Returns:
(float) The time required to deserialize
- failure_info: FailureInformation | None
- classmethod from_args_and_kwargs(fn_args: Sequence[Any], fn_kwargs: Dict[str, Any] | None = None, **kwargs)
Create a Result object from the arguments and kwargs for the function
Keyword arguments to this function are passed to the initializer for Result.
- Parameters:
fn_args – Positional arguments to the function
fn_kwargs – Keyword arguments to the function
- Returns:
Result object built from the provided inputs
- json(**kwargs: Dict[str, Any]) str
Override json encoder to use a custom encoder with proxy support
- mark_compute_ended()
Mark when the task finished executing
- mark_compute_started()
Mark that the compute for a method has started
- mark_input_received()
Mark that a task server has received a value
- mark_result_received()
Mark that a completed computation was received by a client
- mark_result_sent()
Mark when a result is sent from the task server
- mark_start_task_submission()
Mark when the Task Server submits a task to the engine
- mark_task_received()
Mark when the Task Server receives the task from the engine
- resources: ResourceRequirements
- serialization_method: SerializationMethod
- serialize() Tuple[float, List[Proxy]]
Stores the input and value fields as pickled objects
- Returns:
(float) Time to serialize
List of any proxies that were created
- set_result(result: Any, runtime: float = nan, intermediate: bool = False)
Set the value of this computation
Automatically sets the “time_result_completed” field and, if known, defines the runtime.
Will delete the inputs to the function if the user specifies self.return_inputs == False. Removing the inputs once the result is known can save communication time.
- Parameters:
result – Result to be stored
runtime – Runtime for the function
intermediate – If this result is not the final one in a workflow
- time: TimeSpans
- timestamp: Timestamps
- class colmena.models.SerializationMethod(value)
Serialization options
- JSON = 'json'
- PICKLE = 'pickle'
- static deserialize(method: SerializationMethod, message: str) Any
Deserialize an object
- Parameters:
method – Method used to serialize
message – Message to deserialize
- Returns:
Deserialized object
- static serialize(method: SerializationMethod, data: Any) str
Serialize an object using a specified method
- Parameters:
method – Method used to serialize the object
data – Object to be serialized
- Returns:
Serialized data
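The two options behave roughly as below. This is a sketch of the logic, not Colmena's implementation: hex is used here to make the pickled bytes string-safe, and the actual byte-to-string encoding in Colmena may differ.

```python
import json
import pickle

def serialize(method: str, data) -> str:
    """Sketch of the two serialization options."""
    if method == "json":
        return json.dumps(data)
    if method == "pickle":
        # Pickled bytes rendered as a hex string for transport
        return pickle.dumps(data).hex()
    raise ValueError(f"Unknown method: {method}")

def deserialize(method: str, message: str):
    """Reverse each serialization option."""
    if method == "json":
        return json.loads(message)
    if method == "pickle":
        return pickle.loads(bytes.fromhex(message))
    raise ValueError(f"Unknown method: {method}")
```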
colmena.models.methods
Base classes used by Colmena to describe functions being executed by a workflow engine
- class colmena.models.methods.ColmenaMethod
Base wrapper for a Python function run as part of a Colmena workflow
The wrapper handles the parts of running a Colmena task that are beyond running the function, such as serialization, timing, interfaces to ProxyStore.
- class colmena.models.methods.PythonMethod(function: Callable, name: str | None = None)
A Python function to be executed on a single worker
- Parameters:
function – Function to be executed
name – Name of the function. Defaults to function.__name__
- class colmena.models.methods.PythonGeneratorMethod(function: Callable[[...], Generator | Iterable], name: str | None = None, store_return_value: bool = False, streaming_queue: ColmenaQueues | None = None)
Python function which runs on a single worker and generates results iteratively
Generator functions support streaming each iteration of the generator to the Thinker when a streaming_queue is provided.
- Parameters:
function – Generator function to be executed
name – Name of the function. Defaults to function.__name__
store_return_value – Whether to capture the return value of the generator and store it in the Result object.
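A function suitable for wrapping might look like the following (the `optimize` name and its logic are invented for illustration):

```python
def optimize(n_steps: int):
    """Toy generator: refine a 'best' value over several steps."""
    best = float("inf")
    for step in range(n_steps):
        best = min(best, 1.0 / (step + 1))
        yield best  # each yielded value can be streamed to the Thinker
    return best  # the return value is kept only if store_return_value=True
```

Wrapping it would then look like `PythonGeneratorMethod(function=optimize, store_return_value=True)`, with a streaming_queue supplied when intermediate values should be streamed.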
- class colmena.models.methods.ExecutableMethod(executable: List[str], name: str | None = None, mpi: bool = False, mpi_command_string: str | None = None)
Task that involves running an executable using a system call.
Such tasks often include a “pre-processing” step in Python that prepares inputs for the executable and a “post-processing” step which stores the outputs (either produced from stdout or written to files) as Python objects.
Separating the task into these two functions and a system call for launching the program simplifies development (shorter functions that are easier to test), and allows some workflow engines to improve performance by running processing and execution tasks separately.
Implement a new ExecutableTask by defining the executable, a preprocessing method (preprocess()), and a postprocessing method (postprocess()). Use the ExecutableTask by instantiating a copy of your new class and then passing it to the task server as you would with any other function.
MPI Executables
Launching an MPI executable requires two parts: a path to an executable and a preamble defining how to launch it. Define an MPI application using the instructions described above and then set the mpi attribute to True. This will tell the Colmena task server to look for a "preamble" for how to launch the application.
You may need to supply an MPI command invocation recipe for your particular cluster, depending on your choice of task server. Supply a template as the mpi_command_string field, which will be converted by Python's string format function to produce a version of the command with the specific resource requirements of your task by the render_mpi_launch() method. The attributes of this class (e.g., node_count, total_ranks) will be used as arguments to format. For example, a template of aprun -N {total_ranks} -n {cpu_processes} will produce aprun -N 6 -n 3 if you specify node_count=2 and cpu_processes=3.
- Parameters:
executable – List of executable arguments
name – Name used for the task. Defaults to executable[0]
mpi – Whether to use MPI to launch the executable
mpi_command_string – Template for MPI launcher. See mpi_command_string.
- mpi_command_string: str | None = None
Template string defining how to launch this application using MPI. Should include placeholders named after the fields in ResourceRequirements marked using {}’s. Example: mpirun -np {total_ranks}
- render_mpi_launch(resources: ResourceRequirements) str
Create an MPI launch command given the configuration
- Returns:
MPI launch configuration
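The rendering boils down to Python's str.format applied over the resource fields. A standalone sketch (the template and the plain keyword values are illustrative; render_mpi_launch fills them from the task's ResourceRequirements):

```python
# Placeholders are named after ResourceRequirements fields
template = "mpirun -np {total_ranks} -N {cpu_processes}"

# render_mpi_launch fills these from the task's resources;
# literal values are used here for illustration
cmd = template.format(total_ranks=6, cpu_processes=3)
```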
- preprocess(run_dir: Path, args: Tuple[Any], kwargs: Dict[str, Any]) Tuple[List[str], str | None]
Perform preprocessing steps necessary to prepare for the executable to be started.
These may include writing files to the local directory, creating CLI arguments, or standard input to be passed to the executable
- Parameters:
run_dir – Path to a directory in which to write files used by an executable
args – Arguments to the task, control how the run is set up
kwargs – Keyword arguments to the function
- Returns:
Options to be passed as command line arguments to the executable
Values to pass to the standard in of the executable
- execute(run_dir: Path, arguments: List[str], stdin: str | None, resources: ResourceRequirements | None = None) float
Run an executable
- Parameters:
run_dir – Directory in which to execute the code
arguments – Command line arguments
stdin – Content to pass in via standard in
resources – Amount of resources to use for the application
- Returns:
Runtime (unit: s)
- assemble_shell_cmd(arguments: List[str], resources: ResourceRequirements) List[str]
Assemble the shell command to be launched
- Parameters:
arguments – Command line arguments
resources – Resource requirements
- Returns:
Components of the shell command
- postprocess(run_dir: Path) Any
Extract results after execution completes
- Parameters:
run_dir – Run directory for the executable. Stdout will be written to run_dir/colmena.stdout and stderr to run_dir/colmena.stderr
- function(*args, _resources: ResourceRequirements | None = None, **kwargs)
Execute the function
- Parameters:
args – Positional arguments
kwargs – Keyword arguments
_resources – Resources available. Optional. Only used for MPI tasks.
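The preprocess/execute/postprocess lifecycle can be mimicked without Colmena. In this sketch, SimulateEcho is an invented stand-in for an ExecutableMethod subclass; the stdout file name follows the colmena.stdout convention noted above, and the launch step is reduced to a bare subprocess call.

```python
import subprocess
import tempfile
from pathlib import Path

class SimulateEcho:
    """Illustrative stand-in for an ExecutableMethod subclass."""
    executable = ["echo"]

    def preprocess(self, run_dir: Path, args: tuple, kwargs: dict):
        # Turn task inputs into CLI arguments; no stdin needed here
        return [str(a) for a in args], None

    def postprocess(self, run_dir: Path):
        # Colmena writes stdout to run_dir/colmena.stdout
        return (run_dir / "colmena.stdout").read_text().strip()

run_dir = Path(tempfile.mkdtemp())
task = SimulateEcho()
cli_args, stdin = task.preprocess(run_dir, ("hello",), {})
with open(run_dir / "colmena.stdout", "w") as fp:
    subprocess.run(task.executable + cli_args, stdout=fp, check=True)
output = task.postprocess(run_dir)
```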