# Understanding and Creating Tasks

<Accordion title="What is a Task?">
  A Task is the smallest unit of work, designed to perform a specific operation. Each task represents a distinct operation or process that can be executed, such as processing data, performing calculations, or managing resources. Tasks can operate independently or as components of a more complex set of connected tasks known as a Workflow. Tasks are defined by their code, inputs, and dependencies on other tasks. To create tasks, you need to define the input parameters and specify the action to be performed during execution.
</Accordion>

## Creating a Task

To create a task in Tilebox, define a class that extends the `Task` base class and implements the `execute` method. The `execute` method is the entry point for the task where its logic is defined. It's called when the task is executed.

<CodeGroup>
  ```python Python theme={"system"}
  from tilebox.workflows import Task, ExecutionContext

  class MyFirstTask(Task):
      def execute(self, context: ExecutionContext):
          print("Hello World!")
  ```

  ```go Go theme={"system"}
  type MyFirstTask struct{}

  func (t *MyFirstTask) Execute(ctx context.Context) error {
  	slog.Info("Hello World!")
  	return nil
  }
  ```
</CodeGroup>

This example demonstrates a simple task that prints "Hello World!" to the console.

For Python, the key components of this task are:

<AccordionGroup>
  <Accordion title="class MyFirstTask(Task)">
    `MyFirstTask` is a subclass of the `Task` class, which serves as the base class for all defined tasks. It provides the essential structure for a task. Inheriting from `Task` automatically makes the class a `dataclass`, which is useful [for specifying inputs](#input-parameters). Additionally, by inheriting from `Task`, the task is automatically assigned an [identifier based on the class name](#task-identifiers).
  </Accordion>

  <Accordion title="def execute">
    The `execute` method is the entry point for executing the task. This is where the task's logic is defined. It's invoked by a [task runner](/workflows/concepts/task-runners) when the task runs and performs the task's operation.
  </Accordion>

  <Accordion title="context: ExecutionContext">
    The `context` argument is an `ExecutionContext` instance that provides access to an [API for submitting new tasks](/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask) as part of the same job, [task logging](/api-reference/python/tilebox.workflows/ExecutionContext.logger), [custom tracing](/api-reference/python/tilebox.workflows/ExecutionContext.tracer), and features like [shared caching](/api-reference/python/tilebox.workflows/ExecutionContext.job_cache).
  </Accordion>
</AccordionGroup>

For Go, the key components are:

<AccordionGroup>
  <Accordion title="type MyFirstTask struct{}">
    `MyFirstTask` is a struct that implements the `Task` interface. It represents the task to be executed.
  </Accordion>

  <Accordion title="func (t *MyFirstTask) Execute(ctx context.Context) error">
    The `Execute` method is the entry point for executing the task. This is where the task's logic is defined. It's invoked by a [task runner](/workflows/concepts/task-runners) when the task runs and performs the task's operation.
  </Accordion>
</AccordionGroup>

<Note>
  The code samples on this page don't show how to execute a task, because executing tasks is a separate concern from implementing them.
  Execution is covered in the [next section on task runners](/workflows/concepts/task-runners).
</Note>

## Input Parameters

Tasks often require input parameters to operate. These inputs can range from simple values to complex data structures. In Python, a class that inherits from `Task` is treated as a `dataclass`, so input parameters can be defined as class attributes.

<Info>
  Tasks must be **serializable to JSON or to protobuf** because they may be distributed across a cluster of [task runners](/workflows/concepts/task-runners).
</Info>

<Info>
  In Go, task parameters must be exported fields of the task struct (starting with an uppercase letter), otherwise they will not be serialized to JSON.
</Info>

Supported types for input parameters include:

* Basic types such as `str`, `int`, `float`, `bool`
* Lists and dictionaries of basic types
* Nested data classes that are also JSON-serializable or protobuf-serializable

<CodeGroup>
  ```python Python theme={"system"}
  class ParametrizableTask(Task):
      message: str
      number: int
      data: dict[str, str]

      def execute(self, context: ExecutionContext):
          print(self.message * self.number)

  task = ParametrizableTask("Hello", 3, {"key": "value"})
  ```

  ```go Go theme={"system"}
  type ParametrizableTask struct {
    Message string
    Number  int
    Data    map[string]string
  }

  func (t *ParametrizableTask) Execute(context.Context) error {
    slog.Info(strings.Repeat(t.Message, t.Number))
    return nil
  }

  task := &ParametrizableTask{
    Message: "Hello",
    Number:  3,
    Data:    map[string]string{"key": "value"},
  }
  ```
</CodeGroup>
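
To see why inputs need to be serializable, it can help to round-trip a set of parameters through JSON by hand. The sketch below uses only the standard library. `ParametrizableInputs`, `to_json`, and `from_json` are hypothetical names for illustration, and this is not how Tilebox serializes tasks internally.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ParametrizableInputs:  # hypothetical stand-in, not a Tilebox Task
    message: str
    number: int
    data: dict[str, str]


def to_json(inputs: ParametrizableInputs) -> str:
    # Turn the fields into a JSON string that could be sent to another machine
    return json.dumps(asdict(inputs))


def from_json(payload: str) -> ParametrizableInputs:
    # Rebuild an equal instance from the serialized fields
    return ParametrizableInputs(**json.loads(payload))


original = ParametrizableInputs("Hello", 3, {"key": "value"})
restored = from_json(to_json(original))
print(restored == original)  # True
```

A field holding something JSON can't represent, such as an open file handle, would fail at the `json.dumps` step, which is the kind of input the list of supported types rules out.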

## Task Composition and subtasks

Until now, tasks have performed only a single operation. But tasks can be more powerful. **Tasks can submit other tasks as subtasks.** This allows for a modular workflow design, breaking down complex operations into simpler, manageable parts. Additionally, the execution of subtasks is automatically parallelized whenever possible.

<CodeGroup>
  ```python Python theme={"system"}
  class ParentTask(Task):
      num_subtasks: int

      def execute(self, context: ExecutionContext) -> None:
          for i in range(self.num_subtasks):
              context.submit_subtask(ChildTask(i))

  class ChildTask(Task):
      index: int

      def execute(self, context: ExecutionContext) -> None:
          context.logger.info("Executing child task", index=self.index)

  # after submitting this task, a task runner may pick it up and execute it
  # which will result in 5 ChildTasks being submitted and executed as well
  task = ParentTask(5)
  ```

  ```go Go theme={"system"}
  type ParentTask struct {
  	NumSubtasks int
  }

  func (t *ParentTask) Execute(ctx context.Context) error {
  	for i := range t.NumSubtasks {
  		_, err := workflows.SubmitSubtask(ctx, &ChildTask{Index: i})
  		if err != nil {
  			return err
  		}
  	}

  	return nil
  }

  type ChildTask struct {
  	Index int
  }

  func (t *ChildTask) Execute(context.Context) error {
  	slog.Info("Executing ChildTask", slog.Int("index", t.Index))
  	
  	return nil
  }

  // after submitting this task, a task runner may pick it up and execute it 
  // which will result in 5 ChildTasks being submitted and executed as well
  task := &ParentTask{NumSubtasks: 5}
  ```
</CodeGroup>

In this example, a `ParentTask` submits `ChildTask` tasks as subtasks. The number of subtasks to be submitted is based on the `num_subtasks` attribute of the `ParentTask`. The `submit_subtask` method takes an instance of a task as its argument, meaning the task to be submitted must be instantiated with concrete parameters first.

Parent tasks do not have access to the results of their subtasks. Instead, tasks can use [shared caching](/workflows/caches#storing-and-retrieving-data) to share data with each other.

<Info>
  By submitting a task as a subtask, its execution is scheduled as part of the same job as the parent task. Compared to just directly invoking the subtask's `execute` method, this allows the subtask's execution to occur on a different machine or in parallel with other subtasks. To learn more about how tasks are executed, see the section on [task runners](/workflows/concepts/task-runners).
</Info>

### Larger subtasks example

A practical example helps illustrate task composition. The tasks below form a workflow that downloads a given number of random dog images from the internet. It uses the [Dog API](https://thedogapi.com/) to get the image URLs and then downloads each image. Implemented with task composition, this could look like this:

<CodeGroup title="Task Composition">
  ```python Python theme={"system"}
  import httpx  # pip install httpx
  from pathlib import Path

  class DownloadRandomDogImages(Task):
      num_images: int

      def execute(self, context: ExecutionContext) -> None:
          url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}"
          response = httpx.get(url)
          for dog_image in response.json():
              context.submit_subtask(DownloadImage(dog_image["url"]))

  class DownloadImage(Task):
      url: str

      def execute(self, context: ExecutionContext) -> None:
          file = Path("dogs") / self.url.split("/")[-1]
          # make sure the output directory exists before writing
          file.parent.mkdir(parents=True, exist_ok=True)
          response = httpx.get(self.url)
          file.write_bytes(response.content)
  ```

  ```go Go theme={"system"}
  package dogs

  import (
  	"context"
  	"encoding/json"
  	"fmt"
  	"io"
  	"net/http"
  	"os"
  	"strings"

  	"github.com/tilebox/tilebox-go/workflows/v1"
  )

  type DogImage struct {
  	ID     string `json:"id"`
  	URL    string `json:"url"`
  	Width  *int   `json:"width"`
  	Height *int   `json:"height"`
  }

  type DownloadRandomDogImages struct {
  	NumImages int
  }

  func (t *DownloadRandomDogImages) Execute(ctx context.Context) error {
  	url := fmt.Sprintf("https://api.thedogapi.com/v1/images/search?limit=%d", t.NumImages)
  	response, err := http.Get(url)
  	if err != nil {
  		return fmt.Errorf("failed to download images: %w", err)
  	}

  	defer response.Body.Close()
  	body, err := io.ReadAll(response.Body)
  	if err != nil {
  		return fmt.Errorf("failed to read response: %w", err)
  	}

  	var dogImages []DogImage
  	err = json.Unmarshal(body, &dogImages)
  	if err != nil {
  		return err
  	}

  	for _, dogImage := range dogImages {
  		_, err := workflows.SubmitSubtask(ctx, &DownloadImage{URL: dogImage.URL})
  		if err != nil {
  			return err
  		}
  	}
  	return nil
  }

  type DownloadImage struct {
  	URL string
  }

  func (t *DownloadImage) Execute(context.Context) error {
  	response, err := http.Get(t.URL)
  	if err != nil {
  		return fmt.Errorf("failed to download image: %w", err)
  	}

  	defer response.Body.Close()
  	body, err := io.ReadAll(response.Body)
  	if err != nil {
  		return fmt.Errorf("failed to read response: %w", err)
  	}

  	err = os.MkdirAll("dogs", 0o755)
  	if err != nil {
  		return fmt.Errorf("failed to create dogs directory: %w", err)
  	}

  	elements := strings.Split(t.URL, "/")
  	file := fmt.Sprintf("dogs/%s", elements[len(elements)-1])

  	return os.WriteFile(file, body, 0o600)
  }
  ```
</CodeGroup>

This example consists of the following tasks:

<AccordionGroup>
  <Accordion title="DownloadRandomDogImages">
    `DownloadRandomDogImages` fetches a specific number of random dog image URLs from an API. It then submits a `DownloadImage` task for each received image URL.
  </Accordion>

  <Accordion title="DownloadImage">
    `DownloadImage` downloads an image from a specified URL and saves it to a file.
  </Accordion>
</AccordionGroup>

Together, these tasks create a workflow that downloads random dog images from the internet. The relationship between the two tasks and their formation as a workflow becomes clear when `DownloadRandomDogImages` submits `DownloadImage` tasks as subtasks.

Visualizing the execution of such a workflow is akin to a tree structure where the `DownloadRandomDogImages` task is the root, and the `DownloadImage` tasks are the leaves. For instance, when downloading five random dog images, the following tasks are executed.

<CodeGroup>
  ```python Python theme={"system"}
  from tilebox.workflows import Client

  client = Client()
  jobs = client.jobs()
  job = jobs.submit(
      "download-dog-images",
      DownloadRandomDogImages(5),
  )

  # now our deployed task runners will pick up the task and execute it

  jobs.display(job)
  ```

  ```go Go theme={"system"}
  ctx := context.Background()
  client := workflows.NewClient()

  job, err := client.Jobs.Submit(ctx, "download-dog-images",
    []workflows.Task{
  	&dogs.DownloadRandomDogImages{
  		NumImages: 5,
  	},
    },
  )
  if err != nil {
    slog.Error("Failed to submit job", slog.Any("error", err))
    return
  }

  // now our deployed task runners will pick up the task and execute it
  ```
</CodeGroup>

<Frame>
  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/download-dog-images.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=26598eb2fdaddc1e8d3e226de73c901d" alt="Download Dog Images Workflow" className="dark:hidden" width="1127" height="314" data-path="assets/workflows/diagrams/svg/download-dog-images.svg" />

  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/download-dog-images.dark.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=0ad9cbad69964509218c633e47083896" alt="Download Dog Images Workflow" className="hidden dark:block" width="1127" height="314" data-path="assets/workflows/diagrams/svg/download-dog-images.dark.svg" />
</Frame>

In total, six tasks are executed: the `DownloadRandomDogImages` task and five `DownloadImage` tasks. The `DownloadImage` tasks can execute in parallel, as they are independent. If more than one task runner is available, the Tilebox Workflow Orchestrator **automatically parallelizes** the execution of these tasks.

<Tip>
  Check out [job\_client.display](/workflows/concepts/jobs#visualization) to learn how this visualization was automatically generated from the task executions.
</Tip>

## Task States

Every task goes through a set of states during its lifetime.

* When submitted, either as a job or as a subtask, it starts in the `QUEUED` state and transitions to `RUNNING` when a task runner picks it up.
* If the task executes successfully, it transitions to `COMPUTED`.
* If the task fails, it transitions to `FAILED`, unless it's an [optional task](#optional-tasks), or nested within an [optional task](#nested-optional-tasks), in which case it transitions to `FAILED_OPTIONAL`.
* As soon as all subtasks of a task are `COMPUTED` (or `FAILED_OPTIONAL`), the task is considered `COMPLETED`, allowing dependent tasks to be executed.

The table below summarizes the different task states and their meanings.

| Task State            | Description                                                                                                                                                                                                                                   |
| --------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Queued**            | The task is queued and waiting for execution. Any [eligible](/workflows/concepts/task-runners#task-selection) task runner can pick it up and execute it, as soon as its parent task is `COMPUTED` and all its dependencies are `COMPLETED`.   |
| **Running**           | The task is currently being executed by a task runner.                                                                                                                                                                                        |
| **Computed**          | The task has successfully been computed, but still has outstanding subtasks.                                                                                                                                                                  |
| **Completed**         | The task has successfully been computed, and all its subtasks are also computed, making it `COMPLETED`. This is the final state of a task. Dependent tasks can be executed only once a task is `COMPLETED`.                                   |
| **Failed**            | The task has been executed but encountered an error.                                                                                                                                                                                          |
| **Failed (Optional)** | The task has been executed but encountered an error. Since the task was [marked as optional](#optional-tasks), the job continues executing.                                                                                                   |
| **Skipped**           | The task was skipped because it's a subtask of an optional task and one of its siblings failed.                                                                                                                                               |

<Frame>
  <img src="https://mintcdn.com/tilebox/2pxSsTbsN4VhVA9Y/assets/workflows/diagrams/svg/task-states.svg?fit=max&auto=format&n=2pxSsTbsN4VhVA9Y&q=85&s=8f8ee29b588e9a99a37461ac35c5cb00" alt="Task States" className="dark:hidden" width="1680" height="512" data-path="assets/workflows/diagrams/svg/task-states.svg" />

  <img src="https://mintcdn.com/tilebox/2pxSsTbsN4VhVA9Y/assets/workflows/diagrams/svg/task-states.dark.svg?fit=max&auto=format&n=2pxSsTbsN4VhVA9Y&q=85&s=c81c2b3975c8c093996d3b0a198cbfa0" alt="Task States" className="hidden dark:block" width="1680" height="512" data-path="assets/workflows/diagrams/svg/task-states.dark.svg" />
</Frame>
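
The transitions described above can be written down as a small lookup table. This is a minimal sketch for reasoning about the states, not part of the Tilebox API: `TaskState`, `TRANSITIONS`, and `can_transition` are hypothetical names, retries are not modeled, and the `QUEUED` to `SKIPPED` transition is an assumption, since the text doesn't say from which state a task is skipped.

```python
from enum import Enum


class TaskState(Enum):
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    COMPUTED = "COMPUTED"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    FAILED_OPTIONAL = "FAILED_OPTIONAL"
    SKIPPED = "SKIPPED"


# Transitions as described in the list and table above.
# Assumption: a task is skipped while it is still queued.
TRANSITIONS: dict[TaskState, set[TaskState]] = {
    TaskState.QUEUED: {TaskState.RUNNING, TaskState.SKIPPED},
    TaskState.RUNNING: {
        TaskState.COMPUTED,
        TaskState.FAILED,
        TaskState.FAILED_OPTIONAL,
    },
    TaskState.COMPUTED: {TaskState.COMPLETED},
    TaskState.COMPLETED: set(),
    TaskState.FAILED: set(),
    TaskState.FAILED_OPTIONAL: set(),
    TaskState.SKIPPED: set(),
}


def can_transition(current: TaskState, target: TaskState) -> bool:
    # A transition is valid only if the table lists it for the current state
    return target in TRANSITIONS[current]


print(can_transition(TaskState.QUEUED, TaskState.RUNNING))    # True
print(can_transition(TaskState.QUEUED, TaskState.COMPLETED))  # False
```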

## Map-Reduce Pattern

Often, the input to a task is a list whose elements should be **mapped** to individual subtasks, with the results later aggregated in a **reduce** step. This pattern is known as [MapReduce](https://en.wikipedia.org/wiki/MapReduce) and is common in workflows. In Tilebox, the reduce step is typically defined as a separate task that depends on all the map tasks.

For example, the workflow below applies this pattern to a list of numbers to calculate the sum of all squares of the numbers. The `Square` task takes a single number and squares it, and the `Sum` task reduces the list of squared numbers to a single sum.

<CodeGroup title="Map-Reduce">
  ```python Python theme={"system"}
  class SumOfSquares(Task):
      numbers: list[int]

      def execute(self, context: ExecutionContext) -> None:
          # 1. Map
          square_tasks = context.submit_subtasks(
              [Square(num) for num in self.numbers]
          )
          # 2. Reduce
          sum_task = context.submit_subtask(Sum(), depends_on=square_tasks)


  class Square(Task):  # The map step
      num: int

      def execute(self, context: ExecutionContext) -> None:
          result = self.num ** 2
          # typically the output of a task is a large dataset,
          # so we save individual results into a shared cache
          context.job_cache.group("squares")[str(self.num)] = str(result).encode()
          context.current_task.display = f"Square({self.num})"

  class Sum(Task):  # The reduce step
      def execute(self, context: ExecutionContext) -> None:
          result = 0
          # access our cached results from the map step
          squares = context.job_cache.group("squares")
          for key in squares:
              result += int(squares[key].decode())

          context.logger.info("Computed sum of squares", result=result)
  ```
</CodeGroup>

Submitting a job of the `SumOfSquares` task and running it with a task runner can be done as follows:

<CodeGroup>
  ```python Python theme={"system"}
  from tilebox.workflows import Client
  from tilebox.workflows.cache import InMemoryCache

  client = Client()
  jobs = client.jobs()
  job = jobs.submit(
      "sum-of-squares",
      SumOfSquares([12, 345, 453, 21, 45, 98]),
  )

  client.runner(tasks=[SumOfSquares, Square, Sum], cache=InMemoryCache()).run_all()

  jobs.display(job)
  ```
</CodeGroup>

```plaintext Logs theme={"system"}
Computed sum of squares result=336448
```

<Frame>
  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/map-reduce-sum-squares.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=6d74ffef26170b06764e0fca4606be28" alt="Sum of squares workflow using the map-reduce pattern" className="dark:hidden" width="1138" height="480" data-path="assets/workflows/diagrams/svg/map-reduce-sum-squares.svg" />

  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/map-reduce-sum-squares.dark.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=5b2c595f4886cca9b6f48c1f6154ae4f" alt="Sum of squares workflow using the map-reduce pattern" className="hidden dark:block" width="1138" height="480" data-path="assets/workflows/diagrams/svg/map-reduce-sum-squares.dark.svg" />
</Frame>
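
The logged result can be checked without a task runner. The sketch below mirrors the map and reduce steps with plain functions and uses a regular `dict` as a stand-in for the job cache. The names `square` and `sum_cached` are hypothetical and not part of Tilebox.

```python
def square(num: int, cache: dict[str, bytes]) -> None:
    # Map step: store each result under a key derived from the input
    cache[str(num)] = str(num ** 2).encode()


def sum_cached(cache: dict[str, bytes]) -> int:
    # Reduce step: read every cached result back and add them up
    return sum(int(value.decode()) for value in cache.values())


numbers = [12, 345, 453, 21, 45, 98]
cache: dict[str, bytes] = {}
for num in numbers:
    square(num, cache)

print(sum_cached(cache))  # 336448
```

Because results are keyed by the input value, as in the `Square` task above, a number that appears twice in the input overwrites its own entry and is counted only once. Keying by list index instead would avoid that.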

## Recursive subtasks

Tasks can not only submit other tasks as subtasks, but also instances of themselves. This allows for a recursive breakdown of a task into smaller chunks. Such recursive decomposition algorithms are referred to as [divide and conquer algorithms](https://en.wikipedia.org/wiki/Divide-and-conquer_algorithm).
For example, the `RecursiveTask` below is a valid task that submits smaller instances of itself as subtasks.

<Warning>When implementing a recursive task, it's important to define a base case that stops the recursion. Otherwise, the task will keep submitting subtasks indefinitely, resulting in an infinite loop.</Warning>

<CodeGroup title="Recursive Subtasks">
  ```python Python theme={"system"}
  class RecursiveTask(Task):
      num: int

      def execute(self, context: ExecutionContext) -> None:
          context.logger.info("Executing recursive task", num=self.num)
          # if num < 2, we reached the base case and stop recursion
          if self.num >= 2:
              context.submit_subtask(RecursiveTask(self.num // 2))
  ```

  ```go Go theme={"system"}
  type RecursiveTask struct {
    Num int
  }

  func (t *RecursiveTask) Execute(ctx context.Context) error {
    slog.Info("Executing RecursiveTask", slog.Int("num", t.Num))
    // if num < 2, we reached the base case and stop recursion
    if t.Num >= 2 {
      _, err := workflows.SubmitSubtask(ctx, &RecursiveTask{Num: t.Num / 2})
      if err != nil {
        return err
      }
    }
    return nil
  }
  ```
</CodeGroup>

### Recursive subtask example

An example of this is the [random dog images workflow](#larger-subtasks-example) mentioned earlier. In the previous implementation, downloading images was already parallelized. But the initial orchestration of the individual download tasks was not, because `DownloadRandomDogImages` was responsible for fetching all random dog image URLs and only submitted the individual download tasks once all URLs were retrieved. For a large number of images, this setup can bottleneck the entire workflow.

To improve this, recursive subtask submission decomposes a `DownloadRandomDogImages` task with a high number of images into two smaller `DownloadRandomDogImages` tasks, each fetching half. This process can be repeated until a specified threshold is met, at which point the Dog API can be queried directly for image URLs. That way, image downloads start as soon as the first URLs are retrieved, without initial waiting.

An implementation of this recursive submission may look like this:

<CodeGroup title="Task Composition">
  ```python Python theme={"system"}
  class DownloadRandomDogImages(Task):
      num_images: int

      def execute(self, context: ExecutionContext) -> None:
          if self.num_images > 4:
              half = self.num_images // 2
              remaining = self.num_images - half  # account for odd numbers
              context.submit_subtask(DownloadRandomDogImages(half))
              context.submit_subtask(DownloadRandomDogImages(remaining))
          else:
              url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}"
              response = httpx.get(url)
              for dog_image in response.json()[:self.num_images]:
                  context.submit_subtask(DownloadImage(dog_image["url"]))
  ```

  ```go Go theme={"system"}
  type DownloadRandomDogImages struct {
  	NumImages int
  }

  func (t *DownloadRandomDogImages) Execute(ctx context.Context) error {
  	if t.NumImages > 4 {
  		half := t.NumImages / 2
  		remaining := t.NumImages - half // account for odd numbers
  		_, err := workflows.SubmitSubtask(ctx, &DownloadRandomDogImages{NumImages: half})
  		if err != nil {
  			return err
  		}
  		_, err = workflows.SubmitSubtask(ctx, &DownloadRandomDogImages{NumImages: remaining})
  		if err != nil {
  			return err
  		}
  	} else {
  		url := fmt.Sprintf("https://api.thedogapi.com/v1/images/search?limit=%d", t.NumImages)
  		response, err := http.Get(url)
  		if err != nil {
  			return fmt.Errorf("failed to download images: %w", err)
  		}

  		defer response.Body.Close()
  		body, err := io.ReadAll(response.Body)
  		if err != nil {
  			return fmt.Errorf("failed to read response: %w", err)
  		}

  		var dogImages []DogImage
  		err = json.Unmarshal(body, &dogImages)
  		if err != nil {
  			return err
  		}

  		for _, dogImage := range dogImages {
  			_, err := workflows.SubmitSubtask(ctx, &DownloadImage{URL: dogImage.URL})
  			if err != nil {
  				return err
  			}
  		}
  	}
  	return nil
  }
  ```
</CodeGroup>

With this implementation, downloading a large number of images (for example, 9) results in the following tasks being executed:

<Frame>
  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/download-dog-images-recursive.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=a17f3adab4bf2b6926394cb6520cf8e7" alt="Download Dog Images Workflow implemented recursively" className="dark:hidden" width="1995" height="646" data-path="assets/workflows/diagrams/svg/download-dog-images-recursive.svg" />

  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/download-dog-images-recursive.dark.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=8ab07652916f543fb181d7e74b09f5f7" alt="Download Dog Images Workflow implemented recursively" className="hidden dark:block" width="1995" height="646" data-path="assets/workflows/diagrams/svg/download-dog-images-recursive.dark.svg" />
</Frame>
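
To get a feel for how the split unfolds, the recursion can be simulated without any network calls. The following sketch counts how many tasks of each kind would be submitted, assuming the same threshold of 4 and assuming the API returns exactly the requested number of URLs. `simulate` is a hypothetical helper, not part of Tilebox.

```python
from collections import Counter


def simulate(num_images: int, threshold: int = 4) -> Counter:
    # Every call stands for one DownloadRandomDogImages task
    counts = Counter({"DownloadRandomDogImages": 1})
    if num_images > threshold:
        half = num_images // 2
        remaining = num_images - half  # account for odd numbers
        counts += simulate(half, threshold)
        counts += simulate(remaining, threshold)
    else:
        # Base case: one DownloadImage subtask per image URL
        counts["DownloadImage"] += num_images
    return counts


result = simulate(9)
print(result["DownloadRandomDogImages"], result["DownloadImage"])  # 5 9
```

For nine images, the request is split into 4 and 5, and the 5 is split again into 2 and 3, giving five `DownloadRandomDogImages` tasks and nine `DownloadImage` tasks under these assumptions.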

## Retry Handling

By default, when a task fails to execute, it's marked as failed. In some cases, it may be useful to retry the task multiple times before marking it as a failure. This is particularly useful for tasks dependent on external services that might be temporarily unavailable.

Tilebox Workflows allows you to specify the number of retries for a task using the `max_retries` argument of the `submit_subtask` method.

Check out the example below to see what this might look like in practice.

<Note>
  A failed task may be picked up by any available runner and not necessarily the same one that it failed on.
</Note>

<CodeGroup title="Submitting Subtasks">
  ```python Python theme={"system"}
  import random

  class RootTask(Task):
      def execute(self, context: ExecutionContext) -> None:
          context.submit_subtask(FlakyTask(), max_retries=5)

  class FlakyTask(Task):
      def execute(self, context: ExecutionContext) -> None:
          context.logger.info("Executing flaky task")

          if random.random() < 0.1:
              raise Exception("FlakyTask failed randomly")
  ```

  ```go Go theme={"system"}
  package flaky

  import (
  	"context"
  	"errors"
  	"log/slog"
  	"math/rand/v2"

  	"github.com/tilebox/tilebox-go/workflows/v1"
  	"github.com/tilebox/tilebox-go/workflows/v1/subtask"
  )

  type RootTask struct{}

  func (t *RootTask) Execute(ctx context.Context) error {
  	_, err := workflows.SubmitSubtask(ctx, &FlakyTask{},
  		subtask.WithMaxRetries(5),
  	)
  	return err
  }

  type FlakyTask struct{}

  func (t *FlakyTask) Execute(context.Context) error {
  	slog.Info("Executing FlakyTask")

  	if rand.Float64() < 0.1 {
  		return errors.New("FlakyTask failed randomly")
  	}
  	return nil
  }
  ```
</CodeGroup>
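
The effect of a retry limit can be illustrated with a plain loop. This is a sketch of the general idea only, not the orchestrator's implementation. It assumes that `max_retries` counts additional attempts after the first one, so a task runs at most `max_retries + 1` times. `run_with_retries` and `FailsTwice` are hypothetical names.

```python
from typing import Callable


def run_with_retries(task: Callable[[], None], max_retries: int) -> int:
    """Run the task, retrying on failure, and return the number of attempts."""
    attempts = 0
    while True:
        attempts += 1
        try:
            task()
            return attempts
        except Exception:
            # Assumption: give up once the first attempt and all retries failed
            if attempts > max_retries:
                raise


class FailsTwice:
    """A deterministic stand-in for a flaky task: fails on its first two calls."""

    def __init__(self) -> None:
        self.calls = 0

    def __call__(self) -> None:
        self.calls += 1
        if self.calls <= 2:
            raise RuntimeError("temporary failure")


print(run_with_retries(FailsTwice(), max_retries=5))  # 3
```

With `max_retries=1`, the same task would fail for good, because its second failure already exhausts the single retry.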

## Dependencies

Tasks often rely on other tasks. For example, a task that processes data might depend on a task that fetches that data. **Tasks can express their dependencies on other tasks** by using the `depends_on` argument of the [`submit_subtask`](/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask) method. This means that a dependent task will only execute after the task it relies on has successfully completed.

<Note>
  The `depends_on` argument accepts a list of tasks, enabling a task to depend on multiple other tasks.
</Note>

A workflow with dependencies might look like this:

<CodeGroup title="Task Composition">
  ```python Python theme={"system"}
  class RootTask(Task):
      def execute(self, context: ExecutionContext) -> None:
          first_task = context.submit_subtask(
              PrintTask("Executing first")
          )
          second_task = context.submit_subtask(
              PrintTask("Executing second"),
              depends_on=[first_task],
          )
          third_task = context.submit_subtask(
              PrintTask("Executing last"),
              depends_on=[second_task],
          )

  class PrintTask(Task):
      message: str

      def execute(self, context: ExecutionContext) -> None:
          context.logger.info("Print task executed", message=self.message)
  ```

  ```go Go theme={"system"}
  type RootTask struct{}

  func (t *RootTask) Execute(ctx context.Context) error {
  	firstTask, err := workflows.SubmitSubtask(
  		ctx,
  		&PrintTask{Message: "Executing first"},
  	)
  	if err != nil {
  		return err
  	}

  	secondTask, err := workflows.SubmitSubtask(
  		ctx,
  		&PrintTask{Message: "Executing second"},
  		subtask.WithDependencies(firstTask),
  	)
  	if err != nil {
  		return err
  	}

  	_, err = workflows.SubmitSubtask(
  		ctx,
  		&PrintTask{Message: "Executing last"},
  		subtask.WithDependencies(secondTask),
  	)
  	if err != nil {
  		return err
  	}

  	return nil
  }

  type PrintTask struct {
  	Message string
  }

  func (t *PrintTask) Execute(context.Context) error {
  	slog.Info("PrintTask", slog.String("message", t.Message))
  	return nil
  }
  ```
</CodeGroup>

The `RootTask` submits three `PrintTask` tasks as subtasks. These tasks depend on each other, meaning the second task executes only after the first task has successfully completed, and the third only executes after the second completes. The tasks are executed sequentially.

<Note>
  If a task upon which another task depends submits subtasks, those subtasks must also execute before the dependent task begins execution.
</Note>
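
Dependencies form a directed graph, and the order in which tasks become ready follows from it. The sketch below uses Python's standard `graphlib` module (Python 3.9+) to group tasks into waves, where every task in a wave has all of its dependencies satisfied. It only illustrates the ordering idea and is not how Tilebox schedules tasks. `execution_waves` and the task names are hypothetical.

```python
from graphlib import TopologicalSorter


def execution_waves(depends_on: dict[str, set[str]]) -> list[list[str]]:
    # Group tasks into waves; tasks within one wave don't depend on each other
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


# A chain like the three print tasks above: strictly sequential
chain = {"first": set(), "second": {"first"}, "third": {"second"}}
print(execution_waves(chain))  # [['first'], ['second'], ['third']]

# Two tasks depending on the same task can run side by side
fan_out = {"fetch": set(), "report_a": {"fetch"}, "report_b": {"fetch"}}
print(execution_waves(fan_out))  # [['fetch'], ['report_a', 'report_b']]
```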

### Dependencies Example

A practical example is a workflow that fetches news articles from the [News API](https://newsapi.org/) and then processes them.

<CodeGroup title="Task Dependencies">
  ```python Python theme={"system"}
  from pathlib import Path
  import json
  from collections import Counter
  import httpx  # pip install httpx

  class NewsWorkflow(Task):
      category: str
      max_articles: int

      def execute(self, context: ExecutionContext) -> None:
          fetch_task = context.submit_subtask(FetchNews(self.category, self.max_articles))
          context.submit_subtask(PrintHeadlines(), depends_on=[fetch_task])
          context.submit_subtask(MostFrequentAuthors(), depends_on=[fetch_task])

  class FetchNews(Task):
      category: str
      max_articles: int

      def execute(self, context: ExecutionContext) -> None:
          url = f"https://newsapi.org/v2/top-headlines?category={self.category}&pageSize={self.max_articles}&country=us&apiKey=API_KEY"
          with context.tracer.span("fetch-news") as span:
              span.set_attribute("category", self.category)
              span.set_attribute("max_articles", self.max_articles)
              news = httpx.get(url).json()
          # check out our documentation page on caches to learn
          # about a better way of passing data between tasks
          Path("news.json").write_text(json.dumps(news))
          context.logger.info(
              "Fetched news articles",
              category=self.category,
              article_count=len(news["articles"]),
          )

  class PrintHeadlines(Task):
      def execute(self, context: ExecutionContext) -> None:
          news = json.loads(Path("news.json").read_text())
          for article in news["articles"]:
              context.logger.info(
                  "News headline",
                  published_at=article["publishedAt"][:10],
                  title=article["title"],
              )

  class MostFrequentAuthors(Task):
      def execute(self, context: ExecutionContext) -> None:
          news = json.loads(Path("news.json").read_text())
          authors = [article["author"] for article in news["articles"]]
          for author, count in Counter(authors).most_common():
              context.logger.info("Author article count", author=author, count=count)

  # now submit a job, and then visualize it
  job = job_client.submit("process-news",
      NewsWorkflow(category="science", max_articles=5),
  )
  ```

  ```go Go theme={"system"}
  package news

  import (
  	"context"
  	"encoding/json"
  	"fmt"
  	"io"
  	"log/slog"
  	"net/http"
  	"os"
  	"time"

  	"github.com/tilebox/tilebox-go/workflows/v1"
  	"github.com/tilebox/tilebox-go/workflows/v1/subtask"
  )

  const newsAPIKey = "YOUR_API_KEY"

  type NewsWorkflow struct {
  	Category    string
  	MaxArticles int
  }

  func (t *NewsWorkflow) Execute(ctx context.Context) error {
  	fetchTask, err := workflows.SubmitSubtask(ctx, &FetchNews{
  		Category:    t.Category,
  		MaxArticles: t.MaxArticles,
  	})
  	if err != nil {
  		return err
  	}

  	_, err = workflows.SubmitSubtask(ctx, &PrintHeadlines{}, subtask.WithDependencies(fetchTask))
  	if err != nil {
  		return err
  	}

  	_, err = workflows.SubmitSubtask(ctx, &MostFrequentAuthors{}, subtask.WithDependencies(fetchTask))
  	if err != nil {
  		return err
  	}

  	return nil
  }

  type News struct {
  	Status       string `json:"status"`
  	TotalResults int    `json:"totalResults"`
  	Articles     []struct {
  		Source struct {
  			ID   *string `json:"id"`
  			Name string  `json:"name"`
  		} `json:"source"`
  		Author      *string   `json:"author"`
  		Title       string    `json:"title"`
  		Description *string   `json:"description"`
  		URL         string    `json:"url"`
  		URLToImage  *string   `json:"urlToImage"`
  		PublishedAt time.Time `json:"publishedAt"`
  		Content     *string   `json:"content"`
  	} `json:"articles"`
  }

  type FetchNews struct {
  	Category    string
  	MaxArticles int
  }

  func (t *FetchNews) Execute(context.Context) error {
  	url := fmt.Sprintf("https://newsapi.org/v2/top-headlines?category=%s&pageSize=%d&country=us&apiKey=%s", t.Category, t.MaxArticles, newsAPIKey)
  	response, err := http.Get(url)
  	if err != nil {
  		return fmt.Errorf("failed to download news: %w", err)
  	}

  	defer response.Body.Close()
  	body, err := io.ReadAll(response.Body)
  	if err != nil {
  		return fmt.Errorf("failed to read response: %w", err)
  	}

  	// check out our documentation page on caches to learn
  	// about a better way of passing data between tasks
  	return os.WriteFile("news.json", body, 0o600)
  }

  type PrintHeadlines struct{}

  func (t *PrintHeadlines) Execute(context.Context) error {
  	newsBytes, err := os.ReadFile("news.json")
  	if err != nil {
  		return fmt.Errorf("failed to read news: %w", err)
  	}

  	var news News
  	err = json.Unmarshal(newsBytes, &news)
  	if err != nil {
  		return fmt.Errorf("failed to unmarshal news: %w", err)
  	}

  	for _, article := range news.Articles {
  		slog.Info("Article", slog.Time("published_at", article.PublishedAt), slog.String("title", article.Title))
  	}

  	return nil
  }

  type MostFrequentAuthors struct{}

  func (t *MostFrequentAuthors) Execute(context.Context) error {
  	newsBytes, err := os.ReadFile("news.json")
  	if err != nil {
  		return fmt.Errorf("failed to read news: %w", err)
  	}

  	var news News
  	err = json.Unmarshal(newsBytes, &news)
  	if err != nil {
  		return fmt.Errorf("failed to unmarshal news: %w", err)
  	}

  	authors := make(map[string]int)
  	for _, article := range news.Articles {
  		if article.Author == nil {
  			continue
  		}
  		authors[*article.Author]++
  	}

  	for author, count := range authors {
  		slog.Info("Author", slog.String("author", author), slog.Int("count", count))
  	}

  	return nil
  }

  // in main now submit a job, and then visualize it
  /*
  job, err := client.Jobs.Submit(ctx, "process-news",
  	[]workflows.Task{
  		&NewsWorkflow{
  			Category:    "science",
  			MaxArticles: 5,
  		},
  	},
  )
  */
  ```
</CodeGroup>

```plaintext Logs theme={"system"}
News headline published_at=2024-02-15 title="NASA selects ultraviolet astronomy mission but delays its launch two years - SpaceNews"
News headline published_at=2024-02-15 title="SpaceX launches Space Force mission from Cape Canaveral - Orlando Sentinel"
News headline published_at=2024-02-14 title="Saturn's largest moon most likely uninhabitable - Phys.org"
News headline published_at=2024-02-14 title="AI Unveils Mysteries of Unknown Proteins' Functions - Neuroscience News"
News headline published_at=2024-02-14 title="Anthropologists' research unveils early stone plaza in the Andes - Phys.org"
Author article count author="Jeff Foust" count=1
Author article count author="Richard Tribou" count=1
Author article count author="Jeff Renaud" count=1
Author article count author="Neuroscience News" count=1
Author article count author="Science X" count=1
```

<Frame>
  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/process-news.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=7c95d472185325b99f3236749ecf1cd0" alt="Process News Workflow" className="dark:hidden" width="707" height="262" data-path="assets/workflows/diagrams/svg/process-news.svg" />

  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/process-news.dark.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=62383486f4b4be9b5a96a645608f3d81" alt="Process News Workflow" className="hidden dark:block" width="707" height="262" data-path="assets/workflows/diagrams/svg/process-news.dark.svg" />
</Frame>

This workflow consists of four tasks:

| Task                | Dependencies | Description                                                                                                             |
| ------------------- | ------------ | ----------------------------------------------------------------------------------------------------------------------- |
| NewsWorkflow        | -            | The root task of the workflow. It spawns the other tasks and sets up the dependencies between them.                     |
| FetchNews           | -            | A task that fetches news articles from the API and writes the results to a file, which is then read by dependent tasks. |
| PrintHeadlines      | FetchNews    | A task that logs the headlines of the news articles.                                                                    |
| MostFrequentAuthors | FetchNews    | A task that counts the number of articles each author has written and logs the result.                                  |

An important aspect is that there is no dependency between the `PrintHeadlines` and `MostFrequentAuthors` tasks. This means they can execute in parallel, which the Tilebox Workflow Orchestrator will do, provided multiple task runners are available.
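The ordering constraints of this example can be sketched with Python's standard-library `graphlib`. This only illustrates which tasks become ready at the same time; it is not how the Tilebox Workflow Orchestrator is implemented, and the `dependencies` mapping is written by hand to mirror the table above.

```python
from graphlib import TopologicalSorter

# Hand-written mirror of the dependency table: task -> tasks it depends on.
# NewsWorkflow is left out because it is the parent, not a dependency.
dependencies = {
    "FetchNews": set(),
    "PrintHeadlines": {"FetchNews"},
    "MostFrequentAuthors": {"FetchNews"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

waves = []
while sorter.is_active():
    # Tasks whose dependencies have all completed; these could run in parallel.
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)

print(waves)  # [['FetchNews'], ['MostFrequentAuthors', 'PrintHeadlines']]
```

The second wave contains both `PrintHeadlines` and `MostFrequentAuthors`, which is why they can run in parallel when more than one task runner is available.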

<Tip>
  In this example, the results from `FetchNews` are stored in a file. This is not the recommended method for passing data between tasks. When executing on a distributed cluster, dependent tasks may run on a different machine, so a file written by `FetchNews` is not guaranteed to exist where they run. Instead, use a [shared cache](/workflows/caches).
</Tip>

## Optional Tasks

By default, if any task in a job fails (after exhausting all [retries](#retry-handling)), the entire job is marked as failed and all remaining queued tasks are canceled. In some workflows though, certain tasks are not critical. Their failure should not prevent the rest of the job from completing. For these cases, you can mark a subtask as **optional**.

An optional task has the following behavior:

* If it **succeeds**, the job continues as normal. There is no difference from a regular task.
* If it **fails**, the job is **not** canceled. Instead:
  * The failed task is marked with the state `FAILED_OPTIONAL` instead of `FAILED`.
  * Tasks that [depend on](#dependencies) the optional task **still execute**, even though the optional task failed.
  * The parent task and the rest of the job continue as normal.

Some scenarios where optional tasks are useful are:

* **Data enrichment**: A task responsible for fetching auxiliary data that is not critical for the job to complete.
* **Reporting**: If a task is a notification or logging task, its failure should not prevent the rest of the job from completing.
* **Fault tolerance**: If a task is known to be flaky and may fail intermittently, marking it as optional can help ensure the job continues to make progress.
* **Aggregation workflows**: If a workflow is composed of multiple independent subtasks, and an aggregation task summarizing the results, not every subtask needs to succeed for the aggregation task to run.
* **Cleanup tasks**: If certain tasks must always run at the end of a job, for example to send a notification or to clean up temporary resources, marking the tasks they depend on as optional ensures they always run.

<Note>
  Optional tasks can be combined with [retry handling](#retry-handling). An optional task is only marked as `FAILED_OPTIONAL` after all retries have been exhausted. For example, `context.submit_subtask(FlakyTask(), optional=True, max_retries=3)` will retry up to 3 times before being treated as a failed optional task.
</Note>
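The interplay between retries and the optional flag can be sketched in plain Python. This is a simplified model, not the Tilebox implementation: `final_state` is a hypothetical helper, `"SUCCEEDED"` is a placeholder label rather than an official state name, and the sketch assumes `max_retries` counts retries after the initial attempt.

```python
from typing import Callable


def final_state(run: Callable[[], None], *, optional: bool = False, max_retries: int = 0) -> str:
    """Run `run` up to 1 + max_retries times and return a state label."""
    for _ in range(1 + max_retries):
        try:
            run()
        except Exception:
            continue  # this attempt failed; try again if attempts remain
        return "SUCCEEDED"  # placeholder label, not an official state name
    # All attempts failed: only now does the optional flag matter.
    return "FAILED_OPTIONAL" if optional else "FAILED"


attempts: list[int] = []


def flaky() -> None:
    """A task body that fails on every attempt."""
    attempts.append(1)
    raise RuntimeError("still failing")


state = final_state(flaky, optional=True, max_retries=3)
print(state, len(attempts))  # FAILED_OPTIONAL 4
```

The optional flag is only consulted once every attempt has failed, which matches the behavior described in the note above.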

### Submitting Optional Tasks

To mark a subtask as optional, use the `optional` parameter when submitting it:

<CodeGroup>
  ```python Python theme={"system"}
  class RootTask(Task):
      def execute(self, context: ExecutionContext) -> None:
          required_step = context.submit_subtask(
              RequiredTask(),
          )
          optional_step = context.submit_subtask(
              FlakyTask(), optional=True
          )
          context.submit_subtask(
              FinalTask(), depends_on=[required_step, optional_step]
          )

  class RequiredTask(Task):
      def execute(self, context: ExecutionContext) -> None:
          # this task is required to succeed, otherwise the job would stop
          context.logger.info("Required task completed")

  class FlakyTask(Task):
      def execute(self, context: ExecutionContext) -> None:
          # this task may fail, but the job will continue regardless
          context.logger.info("Attempting flaky operation")

  class FinalTask(Task):
      def execute(self, context: ExecutionContext) -> None:
          # this task runs even if FlakyTask failed
          context.logger.info("Running final step")
  ```

  ```go Go theme={"system"}
  type RootTask struct{}

  func (t *RootTask) Execute(ctx context.Context) error {
  	requiredStep, err := workflows.SubmitSubtask(ctx, &RequiredTask{})
  	if err != nil {
  		return err
  	}

  	optionalStep, err := workflows.SubmitSubtask(ctx, &FlakyTask{},
  		subtask.WithOptional(),
  	)
  	if err != nil {
  		return err
  	}

  	_, err = workflows.SubmitSubtask(ctx, &FinalTask{},
  		subtask.WithDependencies(requiredStep, optionalStep),
  	)
  	return err
  }

  type RequiredTask struct{}

  func (t *RequiredTask) Execute(context.Context) error {
  	slog.Info("This task is required to succeed, otherwise the job would stop")
  	return nil
  }

  type FlakyTask struct{}

  func (t *FlakyTask) Execute(context.Context) error {
  	// this task may fail, but the job will continue regardless
  	slog.Info("Attempting flaky operation...")
  	return nil
  }

  type FinalTask struct{}

  func (t *FinalTask) Execute(context.Context) error {
  	// this task runs even if FlakyTask failed
  	slog.Info("Running final step")
  	return nil
  }
  ```
</CodeGroup>

In this example, `FlakyTask` is submitted as an optional subtask. If it fails, `FinalTask` still executes, because the failure of an optional dependency does not block the tasks that depend on it. The job completes successfully. A visualization of such a job is shown below.

<Frame>
  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/optional-subtasks.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=9a5169297ffb9e84fb4bc840522658d4" alt="Optional Subtasks Workflow" className="dark:hidden" width="371" height="456" data-path="assets/workflows/diagrams/svg/optional-subtasks.svg" />

  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/optional-subtasks.dark.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=01066f715d6ae575b9ff065ccc48fe62" alt="Optional Subtasks Workflow" className="hidden dark:block" width="371" height="456" data-path="assets/workflows/diagrams/svg/optional-subtasks.dark.svg" />
</Frame>

### Nested Optional Tasks

When an optional task itself submits subtasks, those subtasks and, recursively, their subtasks are also considered optional. If any of those tasks fail, all remaining queued tasks that are nested within the same optional root task are automatically **skipped**. This ensures that the failure does not propagate beyond the optional boundary and the parent job continues normally.

<CodeGroup>
  ```python Python theme={"system"}
  class Pipeline(Task):
      def execute(self, context: ExecutionContext) -> None:
          context.submit_subtask(
              OptionalProcessing(), optional=True
          )
          context.submit_subtask(AlwaysRuns())

  class OptionalProcessing(Task):
      def execute(self, context: ExecutionContext) -> None:
          first = context.submit_subtask(Step1())
          context.submit_subtask(Step2(), depends_on=[first])

  class Step1(Task):
      def execute(self, context: ExecutionContext) -> None:
          context.submit_subtask(Step1A())
          context.submit_subtask(Step1B())
          context.submit_subtask(Step1C())

  class Step1A(Task):
      def execute(self, context: ExecutionContext) -> None:
          context.logger.info("Step1A executed successfully")

  class Step1B(Task):
      def execute(self, context: ExecutionContext) -> None:
          raise ValueError("something went wrong")

  class Step1C(Task):
      def execute(self, context: ExecutionContext) -> None:
          context.logger.info("This will be skipped because Step1B failed")

  class Step2(Task):
      def execute(self, context: ExecutionContext) -> None:
          context.logger.info("This will be skipped because Step1B failed")

  class AlwaysRuns(Task):
      def execute(self, context: ExecutionContext) -> None:
          context.logger.info("This runs regardless")
  ```

  ```go Go theme={"system"}
  type Pipeline struct{}

  func (t *Pipeline) Execute(ctx context.Context) error {
  	_, err := workflows.SubmitSubtask(ctx, &OptionalProcessing{},
  		subtask.WithOptional(),
  	)
  	if err != nil {
  		return err
  	}

  	_, err = workflows.SubmitSubtask(ctx, &AlwaysRuns{})
  	return err
  }

  type OptionalProcessing struct{}

  func (t *OptionalProcessing) Execute(ctx context.Context) error {
  	first, err := workflows.SubmitSubtask(ctx, &Step1{})
  	if err != nil {
  		return err
  	}

  	_, err = workflows.SubmitSubtask(ctx, &Step2{},
  		subtask.WithDependencies(first),
  	)
  	return err
  }

  type Step1 struct{}

  func (t *Step1) Execute(ctx context.Context) error {
  	if _, err := workflows.SubmitSubtask(ctx, &Step1A{}); err != nil {
  		return err
  	}
  	if _, err := workflows.SubmitSubtask(ctx, &Step1B{}); err != nil {
  		return err
  	}
  	_, err := workflows.SubmitSubtask(ctx, &Step1C{})
  	return err
  }

  type Step1A struct{}

  func (t *Step1A) Execute(context.Context) error {
  	slog.Info("Step1A executed successfully")
  	return nil
  }

  type Step1B struct{}

  func (t *Step1B) Execute(context.Context) error {
  	return errors.New("something went wrong")
  }

  type Step1C struct{}

  func (t *Step1C) Execute(context.Context) error {
  	slog.Info("This will be skipped because Step1B failed")
  	return nil
  }

  type Step2 struct{}

  func (t *Step2) Execute(context.Context) error {
  	slog.Info("This will be skipped because Step1B failed")
  	return nil
  }

  type AlwaysRuns struct{}

  func (t *AlwaysRuns) Execute(context.Context) error {
  	slog.Info("This runs regardless")
  	return nil
  }
  ```
</CodeGroup>

In this example, `Step1B` fails. Since it's an indirect subtask of the optional `OptionalProcessing` subtask, both `Step1C` and `Step2` are skipped, while `AlwaysRuns` still executes. The job completes successfully.

<Frame>
  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/nested-optional-subtasks.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=bf360bbcb67e4d76e8ca5233eb304a29" alt="Optional Subtree Workflow" className="dark:hidden" width="592" height="844" data-path="assets/workflows/diagrams/svg/nested-optional-subtasks.svg" />

  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/nested-optional-subtasks.dark.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=354a75067abb2e39436f6329fadfb148" alt="Optional Subtree Workflow" className="hidden dark:block" width="592" height="844" data-path="assets/workflows/diagrams/svg/nested-optional-subtasks.dark.svg" />
</Frame>

If `Step1B` were instead also marked as optional, `Step1C` and `Step2` would still be executed, and only after that would `AlwaysRuns` execute. This means that optional subtasks can have other optional subtasks nested within them.

<Frame>
  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/nested-optional-subtasks-recursive.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=6c707b404c8601952b06ab8b9fda0b81" alt="Optional Subtree Workflow" className="dark:hidden" width="507" height="844" data-path="assets/workflows/diagrams/svg/nested-optional-subtasks-recursive.svg" />

  <img src="https://mintcdn.com/tilebox/1gnXQkA0KGk_HxFE/assets/workflows/diagrams/svg/nested-optional-subtasks-recursive.dark.svg?fit=max&auto=format&n=1gnXQkA0KGk_HxFE&q=85&s=1ded151ab40383fd1944c58b83ec1e37" alt="Optional Subtree Workflow" className="hidden dark:block" width="507" height="844" data-path="assets/workflows/diagrams/svg/nested-optional-subtasks-recursive.dark.svg" />
</Frame>
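Both scenarios can be reproduced with a small model of the task tree. This is a sketch of the skipping rule as described above, not of the orchestrator itself: the `parent` mapping, the single-runner `order`, and the helper functions are all hypothetical and written by hand for this example.

```python
from typing import Optional

# Hypothetical model of the example: task -> parent task (None for the root).
parent = {
    "Pipeline": None,
    "OptionalProcessing": "Pipeline",
    "AlwaysRuns": "Pipeline",
    "Step1": "OptionalProcessing",
    "Step2": "OptionalProcessing",
    "Step1A": "Step1",
    "Step1B": "Step1",
    "Step1C": "Step1",
}

# One possible execution order on a single task runner (an assumption).
order = ["Pipeline", "OptionalProcessing", "Step1", "Step1A",
         "Step1B", "Step1C", "Step2", "AlwaysRuns"]


def optional_boundary(task: Optional[str], optional: set[str]) -> Optional[str]:
    """Walk up from `task` and return the closest task marked optional, if any."""
    while task is not None:
        if task in optional:
            return task
        task = parent[task]
    return None


def is_inside(task: Optional[str], root: str) -> bool:
    """Return True if `task` is `root` or nested anywhere below it."""
    while task is not None:
        if task == root:
            return True
        task = parent[task]
    return False


def skipped_after_failure(failed: str, optional: set[str]) -> Optional[list[str]]:
    """Return the tasks skipped when `failed` fails, or None if the whole job fails."""
    root = optional_boundary(failed, optional)
    if root is None:
        return None  # no optional boundary contains the failure
    remaining = order[order.index(failed) + 1:]
    return [task for task in remaining if is_inside(task, root)]


print(skipped_after_failure("Step1B", {"OptionalProcessing"}))            # ['Step1C', 'Step2']
print(skipped_after_failure("Step1B", {"OptionalProcessing", "Step1B"}))  # []
```

In the first call the optional boundary is `OptionalProcessing`, so everything still queued below it is skipped. In the second call the boundary is `Step1B` itself, so nothing else is affected.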

## Task Identifiers

A task identifier is a unique string used by the Tilebox Workflow Orchestrator to identify the task. It's used by [task runners](/workflows/concepts/task-runners) to map submitted tasks to a task class and execute them. It also serves as the default name in execution visualizations.

If unspecified, the identifier of a task defaults to the class name. For instance, the identifier of the `PrintHeadlines` task in the previous example is `"PrintHeadlines"`. This is good for prototyping, but not recommended for production, as changing the class name also changes the identifier, which can lead to issues during refactoring. It also prevents different tasks from sharing the same class name.

To address this, Tilebox Workflows offers a way to explicitly specify the identifier of a task. This is done by overriding the `identifier` method of the `Task` class. This method should return a unique string identifying the task. This decouples the task's identifier from its class name, allowing you to change the identifier without renaming the class. It also allows tasks with the same class name to have different identifiers. The `identifier` method can also specify a version number for the task; see the section on [semantic versioning](#semantic-versioning) below for more details.

<CodeGroup title="Overriding the Task Identifier">
  ```python Python theme={"system"}
  class MyTask(Task):
      def execute(self, context: ExecutionContext) -> None:
          pass

  # MyTask has the identifier "MyTask" and the default version of "v0.0"

  class MyTask2(Task):
      @staticmethod
      def identifier() -> tuple[str, str]:
          return "tilebox.com/example_workflow/MyTask", "v1.0"

      def execute(self, context: ExecutionContext) -> None:
          pass

  # MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0"
  ```

  ```go Go theme={"system"}
  type MyTask struct{}

  func (t *MyTask) Execute(context.Context) error {
  	return nil
  }

  // MyTask has the identifier "MyTask" and the default version of "v0.0"

  type MyTask2 struct{}

  func (t *MyTask2) Identifier() workflows.TaskIdentifier {
  	return workflows.NewTaskIdentifier("tilebox.com/example_workflow/MyTask", "v1.0")
  }

  func (t *MyTask2) Execute(context.Context) error {
  	return nil
  }

  // MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0"
  ```
</CodeGroup>

<Note>
  In Python, the `identifier` method must be defined as either a `classmethod` or a `staticmethod`, meaning it can be called without instantiating the class.
</Note>
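Why the method must be callable on the class can be sketched without Tilebox at all. In the following illustration, `BaseTask` and `registry` are hypothetical stand-ins (not the Tilebox `Task` class or task runner); the sketch assumes the default identifier is derived from the class name with version `"v0.0"`, as described above.

```python
class BaseTask:
    """Hypothetical stand-in for a task base class, not the Tilebox `Task`."""

    @classmethod
    def identifier(cls) -> tuple[str, str]:
        # Default: the class name plus the default version.
        return cls.__name__, "v0.0"


class MyTask(BaseTask):
    pass


class MyTask2(BaseTask):
    @staticmethod
    def identifier() -> tuple[str, str]:
        return "tilebox.com/example_workflow/MyTask", "v1.0"


# A runner-like lookup table can be built from the classes alone,
# because identifier() is callable without creating an instance.
registry = {cls.identifier(): cls for cls in (MyTask, MyTask2)}

print(registry[("MyTask", "v0.0")].__name__)  # MyTask
print(registry[("tilebox.com/example_workflow/MyTask", "v1.0")].__name__)  # MyTask2
```

Renaming `MyTask2` would leave its registry key unchanged, whereas renaming `MyTask` would change its key.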

## Semantic Versioning

As seen in the previous section, the `identifier` method can return a tuple of two strings, where the first string is the identifier and the second string is the version number. This allows for semantic versioning of tasks.

Versioning is important for managing changes to a task's execution method. It allows for new features, bug fixes, and changes while ensuring existing workflows operate as expected. Additionally, it enables multiple versions of a task to coexist, enabling gradual rollout of changes without interrupting production deployments.

You assign a version number by overriding the `identifier` method of the task class. It must return a tuple of two strings: the first is the [identifier](#task-identifiers) and the second is the version number, which must match the pattern `vX.Y` (where `X` and `Y` are non-negative integers). `X` is the major version number and `Y` is the minor version.

For example, this task has the identifier `"tilebox.com/example_workflow/MyTask"` and the version `"v1.3"`:

<CodeGroup title="Overriding the Task Identifier">
  ```python Python theme={"system"}
  class MyTask(Task):
      @staticmethod
      def identifier() -> tuple[str, str]:
          return "tilebox.com/example_workflow/MyTask", "v1.3"

      def execute(self, context: ExecutionContext) -> None:
          pass
  ```

  ```go Go theme={"system"}
  type MyTask struct{}

  func (t *MyTask) Identifier() workflows.TaskIdentifier {
  	return workflows.NewTaskIdentifier("tilebox.com/example_workflow/MyTask", "v1.3")
  }

  func (t *MyTask) Execute(context.Context) error {
  	return nil
  }
  ```
</CodeGroup>

When a task is submitted as part of a job, the version it was submitted with is recorded. This version may differ from the version registered on the task runner that executes the task.

When task runners execute a task, they require a registered task with a matching identifier and compatible version number. A compatible version is where the major version number on the task runner matches that of the submitted task, and the minor version number on the task runner is equal to or greater than that of the submitted task.

For example, suppose `MyTask` is submitted as part of a job with version `"v1.3"`:

* A task runner with version `"v1.3"` of `MyTask` would execute this task.
* A task runner with version `"v1.5"` of `MyTask` would also execute this task.
* A task runner with version `"v1.2"` of `MyTask` would not execute this task, as its minor version is lower than that of the submitted task.
* A task runner with version `"v2.5"` of `MyTask` would not execute this task, as its major version differs from that of the submitted task.
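The compatibility rule above can be expressed as a short check. This is a minimal sketch of the rule as stated in this section, not the code task runners actually use; `parse_version` and `is_compatible` are hypothetical helper names.

```python
import re

# Versions must match the pattern vX.Y with non-negative integers X and Y.
_VERSION = re.compile(r"v(\d+)\.(\d+)")


def parse_version(version: str) -> tuple[int, int]:
    """Split a version string like 'v1.3' into (major, minor)."""
    match = _VERSION.fullmatch(version)
    if match is None:
        raise ValueError(f"expected a version like 'v1.3', got {version!r}")
    return int(match.group(1)), int(match.group(2))


def is_compatible(runner_version: str, submitted_version: str) -> bool:
    """Same major version, and runner minor >= submitted minor."""
    runner_major, runner_minor = parse_version(runner_version)
    submitted_major, submitted_minor = parse_version(submitted_version)
    return runner_major == submitted_major and runner_minor >= submitted_minor


for runner in ("v1.3", "v1.5", "v1.2", "v2.5"):
    print(runner, is_compatible(runner, "v1.3"))
# v1.3 True
# v1.5 True
# v1.2 False
# v2.5 False
```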

## Conclusion

Tasks form the foundation of Tilebox Workflows. By understanding how to create and manage tasks, you can leverage Tilebox's capabilities to automate and optimize your workflows. Experiment with defining your own tasks, utilizing subtasks, managing dependencies, and employing semantic versioning to develop robust and efficient workflows.
