Implementing a Task Runner

Practically speaking, a task runner is a continuously running process that listens for new tasks to execute. More than one task runner process can be started to execute tasks concurrently. Once a task runner receives a task, it executes it and then reports the results back to the workflow orchestrator. The task runner also handles any errors that occur during task execution and reports those back to the orchestrator as well.

For a task to be executed, at least one task runner must be running and available to execute it. Otherwise, tasks remain in a queue until an eligible task runner becomes available.

Implementing a task runner consists of the following steps:

  1. Connecting to the Tilebox Workflows API: instantiate a client connected to the Tilebox Workflows API.
  2. Selecting a cluster: select or create a cluster, then specify its slug when creating a task runner.
  3. Registering tasks: pass the task classes that the task runner can execute as a list to the runner method.
  4. Listening for new tasks: call the run_forever method of the task runner to listen for new tasks until the process is shut down.

A simple example of those steps put together might look like this:
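(The sketch below assumes the Python client and Task base class are importable from tilebox.workflows; the example task, the cluster slug and the file name are placeholders for your own.)

```python
# task_runner.py
from tilebox.workflows import Client, ExecutionContext, Task  # assumed import path


class MyTask(Task):
    """A placeholder task; register your own task classes here instead."""

    def execute(self, context: ExecutionContext) -> None:
        print("Hello from a task runner!")


def main() -> None:
    # 1. Connect to the Tilebox Workflows API (the API key is read from the environment)
    client = Client()

    # 2. + 3. Create a task runner for a cluster and register the tasks it can execute
    runner = client.runner("dev-cluster", tasks=[MyTask])

    # 4. Listen for new tasks until the process is shut down
    runner.run_forever()


if __name__ == "__main__":
    main()
```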

To start the task runner locally, run it as a Python script:

> python task_runner.py

Task Selection

For a task runner to pick up a submitted task, certain conditions must be met:

  1. The cluster that a task was submitted to must match the task runner’s cluster
  2. The task runner must have a registered task matching the task identifier of the submitted task
  3. The version of the task runner’s registered task must be compatible with the submitted task’s version

If there is a task that meets these conditions, the task runner picks it up and executes it. Otherwise, the task runner idles until a matching task is submitted.

Often, many submitted tasks match these conditions for a given task runner. In this case, the task runner picks one of them to execute, and the rest remain in the queue until the selected task is finished or another task runner becomes available to execute them.

Parallelism

Many task runner instances can be started in parallel to execute tasks concurrently. Each task runner listens for new tasks and executes them as they become available. This allows for a high degree of parallelism and can be used to scale task execution to handle large workloads.

To try this out, you can run an arbitrary number of task runners in parallel on your local machine by starting the task runner script in different terminal windows, or by using a tool such as call-in-parallel to start the script multiple times.

For example, to start 5 task runners in parallel, you can use the following command:

> call-in-parallel -n 5 -- python task_runner.py

Deploying Task Runners

Since task runners are continuously running processes, they can be deployed in many different compute environments. The only prerequisites are that task runners have access to the Tilebox Workflows API and that they are configured to join a specific cluster. Once these requirements are met, task runners can be deployed in almost any compute environment, including:

  • On-premise servers
  • Cloud-based virtual machines
  • Cloud-based auto-scaling clusters

Scaling

One of the key benefits of task runners is their ability to scale, even while workflows are already running. Even though a workflow is already being executed, new task runners can be started at any time and immediately begin picking up its queued tasks. It’s not necessary for a whole processing cluster to be available at the start of a workflow, as task runners can be started and stopped at any time.

This is especially useful in cloud-based environments, where task runners can be started and stopped automatically based on the current workload, determined by metrics such as CPU usage. That scenario might look like this:

  1. A single instance of a task runner is running in a cloud-based environment, and it’s currently idling, waiting for new tasks to execute.
  2. A large workload is submitted to the workflow orchestrator, and the task runner picks up the first task to execute.
  3. The first task results in new sub-tasks being submitted to the orchestrator, and the task runner picks up these sub-tasks to execute as well.
  4. Due to the large workload, the CPU usage of the task runner increases, and new instances of the task runner are automatically started by the cloud-based environment.
  5. Newly started task runners immediately begin picking up queued tasks to execute, and the workload is distributed between all available task runners.
  6. As the workload decreases, the CPU usage of the task runners decreases, and the cloud-based environment automatically stops task runners again.
  7. The first task runner continues to execute the remaining tasks until the workload is complete.
  8. The first task runner idles until new tasks are submitted to the orchestrator.

CPU usage based scaling is just one example of how task runners can be scaled. Other metrics, such as memory usage or network bandwidth, are also supported out of the box by cloud-based environments. Additionally, a configuration option to scale task runners based on custom metrics, such as the number of tasks in the queue or the number of tasks executed per second, is planned for a future release.

Distributed Execution

Another benefit of task runners is that they can be distributed across different compute environments. Imagine a scenario where some data is stored on-premise, while other data is available in a cloud environment. A workflow could consist of tasks that pre-process the on-premise data, filter out irrelevant data points, and publish the results to the cloud. The workflow could then continue in the cloud, where the data is processed further and combined with other data sources. In this scenario, task runners could be deployed in both the on-premise environment and the cloud environment, and they would be able to pick up tasks from the same workflow, even though they are running in different environments.

Another scenario where distributed task runners are useful is when a workflow requires specific hardware for certain tasks. For example, a workflow might require a task to be executed on a machine with a GPU, and another task to be executed on a machine with a large amount of memory.

Below is an example of a distributed workflow:
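(The task definitions for such a workflow might look like the sketch below; the module name is a placeholder, and the depends_on parameter used to express that ProcessData waits for DownloadData is an assumption about the SDK.)

```python
# distributed_workflow.py (placeholder module name)
from tilebox.workflows import ExecutionContext, Task  # assumed import path


class DownloadData(Task):
    """Needs a good network connection, but no special hardware."""

    def execute(self, context: ExecutionContext) -> None:
        ...  # download the input data and stage it for further processing


class ProcessData(Task):
    """Needs access to a GPU."""

    def execute(self, context: ExecutionContext) -> None:
        ...  # run the GPU-heavy processing on the downloaded data


class DistributedWorkflow(Task):
    """Root task of the workflow; has no specific hardware requirements."""

    def execute(self, context: ExecutionContext) -> None:
        download = context.submit_subtask(DownloadData())
        # ProcessData must wait for the download to finish (depends_on is an assumed parameter name)
        context.submit_subtask(ProcessData(), depends_on=[download])
```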

To achieve a distributed execution of this workflow, no single task runner is set up that is capable of executing all three tasks. Instead, two task runners are set up, each capable of executing one of the hardware-specific tasks. The first task runner runs in an environment with a good network connection, and the second in an environment with access to a GPU. When the workflow is executed, the first task runner picks up the DownloadData task, and the second task runner picks up the ProcessData task. The DistributedWorkflow task itself does not have any specific hardware requirements, so it is registered with both runners and gets executed by either one of them.
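(The two runner scripts for this setup might look like the following sketches; the cluster slug and the module import are placeholders.)

```python
# download_task_runner.py - deployed in an environment with a good network connection
from tilebox.workflows import Client  # assumed import path

from distributed_workflow import DistributedWorkflow, DownloadData

runner = Client().runner("dev-cluster", tasks=[DistributedWorkflow, DownloadData])
runner.run_forever()
```

```python
# gpu_task_runner.py - deployed in an environment with access to a GPU
from tilebox.workflows import Client  # assumed import path

from distributed_workflow import DistributedWorkflow, ProcessData

runner = Client().runner("dev-cluster", tasks=[DistributedWorkflow, ProcessData])
runner.run_forever()
```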

Now, both download_task_runner.py and gpu_task_runner.py are started in parallel, on different machines with the required hardware for each. As soon as DistributedWorkflow is submitted, it runs on either of the task runners, and its sub-tasks are executed by the appropriate task runner.

In this example, since ProcessData depends on DownloadData, the GPU task runner idles until the download task is completed, and then picks up the process task.

You could also differentiate between the task runners by specifying different clusters, and then specifying explicit clusters to run on when submitting sub-tasks. To learn more about this, and the use case for clusters, head over to the Clusters section.

Task Failures

If an uncaught exception occurs during task execution, the task runner catches the exception and reports it back to the workflow orchestrator. The orchestrator then marks the task as failed, which results in the cancellation of the job. This prevents the execution of further tasks in the job that may no longer be relevant due to the failure.

But a task failure does not mean that all the work done by the job so far is lost. If the failure is fixable, either by fixing a bug in the task implementation, by providing the task with the necessary resources, or simply by retrying it after something like a flaky network connection, it may make sense to retry the job.

Retrying a job adds all failed tasks back to the queue, so a task runner may pick them up again and attempt to execute them. If task execution then succeeds, the job continues as if nothing happened. If it fails again, the task is marked as failed once more and can be retried again if desired.
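(As a rough sketch, retrying a job from the Python client might look like this; the jobs sub-client and its submit and retry methods are assumptions about the SDK, so refer to the Jobs documentation for the exact API.)

```python
from tilebox.workflows import Client  # assumed import path

from task_runner import MyTask  # the placeholder task from the earlier sketch

client = Client()
job_client = client.jobs()  # assumed jobs sub-client

# submit a job (the job name, task and cluster slug are placeholders)
job = job_client.submit("process-data", MyTask(), cluster="dev-cluster")

# ... later, after the job failed and the underlying issue was fixed:
job_client.retry(job)  # puts all failed tasks back into the queue
```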

In case a failure is fixable by modifying the task implementation, it’s important to fix the bug and deploy the new implementation to the task runners before retrying the job. Otherwise, a task runner with the original, broken implementation may pick up the task again and fail again.

Task idempotency

Since a task may be retried, it’s possible that a task is executed more than once. Depending on where in its execution the task failed, it may have already performed some side effects, such as writing to a database or sending a message to a queue. This is why it’s important to make tasks idempotent: a task is idempotent if it can be executed multiple times without changing the result beyond the initial successful execution.
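(As an illustration, here is a minimal sketch of an idempotent task: the output location is derived deterministically from the task’s input, so a retry overwrites the same file instead of creating a duplicate. The task class and its side effect are hypothetical.)

```python
from pathlib import Path

from tilebox.workflows import ExecutionContext, Task  # assumed import path


class IngestGranule(Task):
    """Hypothetical task that writes one result file per input granule."""

    granule_id: str  # task input

    def execute(self, context: ExecutionContext) -> None:
        # The output path depends only on the input, so re-executing the task
        # overwrites the same file rather than adding a second copy.
        output = Path("results") / f"{self.granule_id}.json"
        output.parent.mkdir(parents=True, exist_ok=True)
        output.write_text('{"status": "processed"}')  # placeholder side effect
```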

One special case of idempotency is the submission of sub-tasks. Sub-tasks are submitted as a single batch when a task completes. That means that if a task fails after calling context.submit_subtask and is then retried, it is guaranteed not to have any pending sub-tasks, so they can be safely submitted again.

Runner Crashes

Tilebox Workflows has a built-in mechanism to handle even the case of task runners crashing unexpectedly. When a task runner picks up a task, it periodically sends a heartbeat to the workflow orchestrator while executing it. If the orchestrator does not receive a heartbeat from the task runner for a certain period of time, the task is marked as failed and retried automatically, up to 10 times. As a result, another task runner can pick up the task and execute it, and the job continues as if nothing happened.

This covers even cases such as a power outage, a hardware failure, or a dropped network connection, where the runner is unable to report to the workflow orchestrator that the current task has failed. No task ever remains in a running state indefinitely.

Observability

Task runners are continuously running processes, so it’s important to observe their health and performance. This can be achieved by collecting and analyzing logs, metrics, and traces from the task runners. Tilebox Workflows provides a set of tools to easily collect and analyze this observability data. To learn how to configure task runners to enable this, head over to the Observability section.