# AI Assistance
Large Language Models (LLMs) are powerful tools for exploring and learning about Tilebox. This section explains how to provide them with Tilebox-specific context for tailored, relevant and up-to-date responses.
## Providing Tilebox-specific context
AI assistants and Large Language Models (LLMs) can answer questions, guide you in using Tilebox, suggest relevant sections in the documentation, and assist you in creating workflows.
Download the complete Tilebox documentation as a plain text file to share with your AI assistant or language model.
The full content of the Tilebox documentation is available in plain markdown format at [https://docs.tilebox.com/llms-full.txt](https://docs.tilebox.com/llms-full.txt). Upload this document to your AI assistant or LLM to provide it with Tilebox-specific context.
The [Documentation Context](https://docs.tilebox.com/llms-full.txt) is updated whenever the documentation changes. If you download the file, refresh it occasionally to stay up-to-date.
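If you keep a local copy of the documentation context, here is a minimal sketch for downloading the latest version with Python's standard library (the output filename is just an example):
```python Python
from urllib.request import urlretrieve

# fetch the latest documentation context and save it next to your project
urlretrieve("https://docs.tilebox.com/llms-full.txt", "llms-full.txt")
```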
## Example prompt
After you upload the [Documentation Context](https://docs.tilebox.com/llms-full.txt) to your AI assistant or LLM, you can ask it questions and receive tailored, up-to-date responses.
Here's an example prompt to get you started.
```txt Example prompt
Generate a script that
- Creates a cluster
- Configures console logging
- Contains a Fibonacci calculator workflow that uses a local filesystem cache.
Make sure you get task dependencies right.
Make sure to only print the final result as the last step of the workflow.
Write logs to let us know what is happening.
- Submits a job for the number 7
- Starts a processing runner
Do not invent APIs. All available Tilebox APIs are documented.
```
## Claude
[Claude 3.5 Sonnet](https://docs.anthropic.com/en/docs/about-claude/models) is a great choice of AI assistant for working with Tilebox. To provide Claude with Tilebox-specific context, create a [new project](https://support.anthropic.com/en/articles/9517075-what-are-projects) and upload the [llms-full.txt](https://docs.tilebox.com/llms-full.txt) as project knowledge. You can then ask questions or use it to generate scripts.
## ChatGPT
Experiments with [GPT-4o](https://chatgpt.com/?model=gpt-4o) have shown mixed results. While it effectively answers questions, it has difficulties generating workflow scripts, often misapplying or inventing APIs. Consider using [Claude 3.5 Sonnet](#claude) for a better experience.
# Client
```python
class Client(url: str, token: str)
```
Create a Tilebox datasets client.
## Parameters
`url`: Tilebox API URL. Defaults to `https://api.tilebox.com`.
`token`: The API key to authenticate with. If not set, the `TILEBOX_API_KEY` environment variable is used.
```python Python
from tilebox.datasets import Client
# read token from an environment variable
client = Client()
# or provide connection details directly
client = Client(
url="https://api.tilebox.com",
token="YOUR_TILEBOX_API_KEY"
)
```
# Client.dataset
```python
def Client.dataset(slug: str) -> Dataset
```
Get a dataset by its slug.
## Parameters
`slug`: The slug of the dataset.
## Returns
A dataset object.
```python Python
s1_sar = client.dataset(
"open_data.copernicus.sentinel1_sar"
)
```
## Errors
`NotFoundError`: The specified dataset does not exist.
# Client.datasets
```python
def Client.datasets() -> Datasets
```
Fetch all available datasets.
## Returns
An object containing all available datasets, accessible via dot notation.
```python Python
datasets = client.datasets()
s1_sar = datasets.open_data.copernicus.sentinel1_sar
```
# Collection.find
```python
def Collection.find(
datapoint_id: str,
skip_data: bool = False
) -> xarray.Dataset
```
Find a specific datapoint in a collection by its id.
## Parameters
`datapoint_id`: The id of the datapoint to find.
`skip_data`: Whether to skip loading the data for the datapoint. If `True`, only the metadata for the datapoint is loaded.
## Returns
An [`xarray.Dataset`](/sdks/python/xarray) containing the requested data point.
## Errors
`NotFoundError`: The specified datapoint does not exist in this collection.
```python Python
data = collection.find(
"0186d6b6-66cc-fcfd-91df-bbbff72499c3",
skip_data = False,
)
```
# Collection.info
```python
def Collection.info() -> CollectionInfo
```
Fetch metadata about the datapoints in this collection.
## Returns
A collection info object.
```python Python
info = collection.info()
```
# Collection.load
```python
def Collection.load(
time_or_interval: TimeIntervalLike,
skip_data: bool = False,
show_progress: bool = False
) -> xarray.Dataset
```
Load a range of data points from this collection within a specified time interval.
If no data exists for the requested time or interval, an empty `xarray.Dataset` is returned.
## Parameters
`time_or_interval`: The time or time interval for which to load data. This can be a single time scalar, a tuple of two time scalars, or an array of time scalars.
Valid time scalars are: `datetime.datetime` objects, strings in ISO 8601 format, or Unix timestamps in seconds.
Behavior for each input type:
* **TimeScalar**: If a single time scalar is provided, `load` returns all data points for that exact millisecond.
* **TimeInterval**: If a time interval is provided, `load` returns all data points in that interval. Intervals can be a tuple of two `TimeScalars` or a `TimeInterval` object. Tuples are interpreted as a half-open interval `[start, end)`. With a `TimeInterval` object, the `start_exclusive` and `end_inclusive` parameters control whether the start and end time are inclusive or exclusive.
* **Iterable\[TimeScalar]**: If an array of time scalars is provided, `load` constructs a time interval from the first and last time scalar in the array. Here, both the `start` and `end` times are inclusive.
`skip_data`: If `True`, the response contains only the [datapoint metadata](/datasets/timeseries) without the actual dataset-specific fields. Defaults to `False`.
`show_progress`: If `True`, a progress bar is displayed when pagination is required. Defaults to `False`.
## Returns
An [`xarray.Dataset`](/sdks/python/xarray) containing the requested data points.
```python Python
from datetime import datetime
from tilebox.datasets.data import TimeInterval
# loading a specific time
time = "2023-05-01 12:45:33.423"
data = collection.load(time)
# loading a time interval
interval = ("2023-05-01", "2023-08-01")
data = collection.load(interval, show_progress=True)
# loading a time interval with TimeInterval
interval = TimeInterval(
start=datetime(2023, 5, 1),
end=datetime(2023, 8, 1),
start_exclusive=False,
end_inclusive=False,
)
data = collection.load(interval, show_progress=True)
# loading with an iterable
meta_data = collection.load(..., skip_data=True)
first_50 = collection.load(meta_data.time[:50], skip_data=False)
```
# Dataset.collection
```python
def Dataset.collection(name: str) -> Collection
```
Access a specific collection by its name.
## Parameters
`name`: The name of the collection.
## Returns
A collection object.
```python Python
collection = dataset.collection("My-collection")
```
## Errors
`NotFoundError`: The specified collection does not exist in the dataset.
# Dataset.collections
```python
def Dataset.collections() -> dict[str, Collection]
```
List the available collections in a dataset.
## Returns
A dictionary mapping collection names to collection objects.
```python Python
collections = dataset.collections()
```
# Client
```python
class Client(url: str, token: str)
```
Create a Tilebox workflows client.
## Parameters
`url`: Tilebox API URL. Defaults to `https://api.tilebox.com`.
`token`: The API key to authenticate with. If not set, the `TILEBOX_API_KEY` environment variable is used.
## Sub clients
The workflows client exposes sub clients for interacting with different parts of the Tilebox workflows API.
```python
def Client.jobs() -> JobClient
```
A client for interacting with jobs.
```python
def Client.clusters() -> ClusterClient
```
A client for managing clusters.
```python
def Client.recurrent_tasks() -> RecurrentTaskClient
```
A client for scheduling recurrent tasks.
## Task runners
```python
def Client.runner(...) -> TaskRunner
```
A client is also used to instantiate task runners. Check out the [`Client.runner` API reference](/api-reference/tilebox.workflows/Client.runner) for more information.
```python Python
from tilebox.workflows import Client
# read token from an environment variable
client = Client()
# or provide connection details directly
client = Client(
url="https://api.tilebox.com",
token="YOUR_TILEBOX_API_KEY"
)
# access sub clients
job_client = client.jobs()
cluster_client = client.clusters()
recurrent_task_client = client.recurrent_tasks()
# or instantiate a task runner
runner = client.runner("dev-cluster", tasks=[...])
```
# Client.runner
```python
def Client.runner(
cluster: ClusterSlugLike,
tasks: list[type[Task]],
cache: JobCache | None = None
) -> TaskRunner
```
Initialize a task runner.
## Parameters
`cluster`: The [cluster slug](/workflows/concepts/clusters#managing-clusters) for the cluster associated with this task runner.
`tasks`: A list of task classes that this runner can execute.
`cache`: An optional [job cache](/workflows/caches) for caching results from tasks and sharing data between tasks.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.cache import LocalFileSystemCache
client = Client()
runner = client.runner(
"my-cluster-EdsdUozYprBJDL",
[MyFirstTask, MySubtask],
# optional:
cache=LocalFileSystemCache("cache_directory"),
)
```
# Context.job_cache
```python
ExecutionContext.job_cache: JobCache
```
Access the job cache for a task.
```python Python
# within a Task's execute method, access the job cache
# def execute(self, context: ExecutionContext):
context.job_cache["some-key"] = b"my-value"
```
# Context.submit_subtask
```python
def ExecutionContext.submit_subtask(
task: Task,
depends_on: list[FutureTask] = None,
cluster: str | None = None,
max_retries: int = 0
) -> FutureTask
```
Submit a [subtask](/workflows/concepts/tasks#subtasks-and-task-composition) from a currently executing task.
## Parameters
`task`: The task to submit as a subtask.
`depends_on`: An optional list of tasks already submitted within the same context that this subtask depends on.
`cluster`: An optional [cluster slug](/workflows/concepts/clusters#managing-clusters) for running the subtask. If not provided, the subtask runs on the same cluster as the parent task.
`max_retries`: The maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtask in case of failure. The default is 0.
## Returns
A `FutureTask` object that represents the submitted subtask. Can be used to set up dependencies between tasks.
```python Python
# within the execute method of a Task:
subtask = context.submit_subtask(MySubtask())
dependent_subtask = context.submit_subtask(
MyOtherSubtask(), depends_on=[subtask]
)
gpu_task = context.submit_subtask(
MyGPUTask(),
cluster="gpu-cluster-slug"
)
flaky_task = context.submit_subtask(
MyFlakyTask(),
max_retries=5
)
```
# JobCache.__iter__
```python
def JobCache.__iter__() -> Iterator[str]
```
List all available keys in a job cache group. Only keys that are direct children of the current group are returned.
For nested groups, first access the group and then iterate over its keys.
```python Python
# within a Task's execute method, access the job cache
# def execute(self, context: ExecutionContext):
cache = context.job_cache
# iterate over all keys
for key in cache:
print(cache[key])
# or collect as list
keys = list(cache)
# also works with nested groups
for key in cache.group("some-group"):
print(cache[key])
```
# JobCache.group
```python
def JobCache.group(name: str) -> JobCache
```
You can nest caches in a hierarchical manner using [groups](/workflows/caches#groups-and-hierarchical-keys).
Groups are separated by a forward slash (/) in the key. This hierarchical structure functions similarly to a file system.
## Parameters
`name`: The name of the cache group.
```python Python
# within a Task's execute method, access the job cache
# def execute(self, context: ExecutionContext):
cache = context.job_cache
# use groups to nest related cache values
group = cache.group("some-group")
group["value"] = b"my-value"
nested = group.group("nested")
nested["value"] = b"nested-value"
# access nested groups directly via / notation
value = cache["some-group/nested/value"]
```
# JobClient.cancel
```python
def JobClient.cancel(job_or_id: Job | str)
```
Cancel a job. Once a job is canceled, its queued tasks will not be picked up and executed by task runners, even if runners are idle. Tasks that are already executing will run to completion and will not be interrupted. Any subtasks spawned by such tasks after the cancellation will also not be picked up by task runners.
## Parameters
`job_or_id`: The job or job id to cancel.
```python Python
job_client.cancel(job)
```
# JobClient.retry
```python
def JobClient.retry(job_or_id: Job | str)
```
Retry a job. All failed tasks will become queued again, and queued tasks will be picked up by task runners again.
## Parameters
`job_or_id`: The job or job id to retry.
```python Python
job_client.retry(job)
```
# JobClient.submit
```python
def JobClient.submit(
job_name: str,
root_task_or_tasks: Task | Iterable[Task],
cluster: str | Cluster | Iterable[str | Cluster],
max_retries: int = 0
) -> Job
```
Submit a job.
## Parameters
`job_name`: The name of the job.
`root_task_or_tasks`: The root task for the job. This task is executed first and can submit subtasks to manage the entire workflow. A job can optionally consist of multiple root tasks.
`cluster`: The [cluster slug](/workflows/concepts/clusters#managing-clusters) for the cluster to run the root task on. In case of multiple root tasks, a list of cluster slugs can be provided.
`max_retries`: The maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the root task in case it fails. Defaults to 0.
```python Python
from my_workflow import MyTask
job = job_client.submit(
"my-job",
MyTask(
message="Hello, World!",
value=42,
data={"key": "value"}
),
"my-cluster-EdsdUozYprBJDL",
max_retries=0,
)
```
# JobClient.visualize
```python
def JobClient.visualize(
job_or_id: Job | str,
direction: str = "down",
layout: str = "dagre",
sketchy: bool = True
)
```
Create a visualization of a job as a diagram.
If you are working in an interactive environment, such as Jupyter notebooks, you can use the `display` method to display the visualized job diagram directly in the notebook.
## Parameters
`job_or_id`: The job to visualize.
`direction`: The direction for the diagram to flow. For more details, see the relevant section in the [D2 documentation](https://d2lang.com/tour/layouts/#direction). Valid values are `up`, `down`, `right`, and `left`. The default value is `down`.
`layout`: The layout engine for the diagram. For further information, refer to the [D2 layout engines](https://d2lang.com/tour/layouts). Valid values are `dagre` and `elk`, with the default being `dagre`.
`sketchy`: Indicates whether to use a sketchy, hand-drawn style for the diagram. The default is `True`.
```python Python
from pathlib import Path

svg = job_client.visualize(job)
Path("diagram.svg").write_text(svg)
# in a notebook
job_client.display(job)
```
# Task
```python
class Task:
def execute(context: ExecutionContext) -> None
@staticmethod
def identifier() -> tuple[str, str]
```
Base class for Tilebox workflows [tasks](/workflows/concepts/tasks). Inherit from this class to create a task.
Inheriting also automatically applies the dataclass decorator.
## Methods
```python
def Task.execute(context: ExecutionContext) -> None
```
The entry point for the execution of the task.
```python
@staticmethod
def Task.identifier() -> tuple[str, str]
```
Override a task identifier and specify its version. If not overridden, the identifier of a task defaults to the class name, and the version to `v0.0`.
## Task Input Parameters
```python
class MyTask(Task):
message: str
value: int
data: dict[str, int]
```
Optional task [input parameters](/workflows/concepts/tasks#input-parameters), defined as class attributes. Supported types
are `str`, `int`, `float`, `bool`, as well as `lists` and `dicts` thereof.
```python Python
from tilebox.workflows import Task, ExecutionContext
class MyFirstTask(Task):
def execute(self, context: ExecutionContext):
print(f"Hello World!")
@staticmethod
def identifier() -> tuple[str, str]:
return ("tilebox.workflows.MyTask", "v3.2")
class MyFirstParameterizedTask(Task):
name: str
greet: bool
data: dict[str, str]
def execute(self, context: ExecutionContext):
if self.greet:
print(f"Hello {self.name}!")
```
# TaskRunner.run_all
```python
def TaskRunner.run_all()
```
Run the task runner and execute tasks until there are no more tasks available.
```python Python
# run all tasks until no more tasks are available
runner.run_all()
```
# TaskRunner.run_forever
```python
def TaskRunner.run_forever()
```
Run the task runner forever. This will poll for new tasks and execute them as they come in.
If no tasks are available, it will sleep for a short time and then try again.
```python Python
# run forever, until the current process is stopped
runner.run_forever()
```
# Authentication
To access the Tilebox API, you must authenticate your requests. This guide explains how authentication works, focusing on API keys used as bearer tokens.
## Creating an API key
To create an API key, log into the [Tilebox Console](https://console.tilebox.com). Navigate to [Account -> API Keys](https://console.tilebox.com/account/api-keys) and click the "Create API Key" button.
Keep your API keys secure. Deactivate any keys if you suspect they have been compromised.
## Bearer token authentication
The Tilebox API uses bearer tokens for authentication. You need to pass your API key as the `token` parameter when creating an instance of the client.
```python Python
from tilebox.datasets import Client as DatasetsClient
from tilebox.workflows import Client as WorkflowsClient
datasets_client = DatasetsClient(token="YOUR_TILEBOX_API_KEY")
workflows_client = WorkflowsClient(token="YOUR_TILEBOX_API_KEY")
```
If you set your API key as an environment variable named `TILEBOX_API_KEY`, you can skip the token parameter when creating a client.
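For illustration, a minimal sketch of relying on the environment variable (in practice you would set `TILEBOX_API_KEY` in your shell or deployment environment rather than in code):
```python Python
import os

from tilebox.datasets import Client

os.environ["TILEBOX_API_KEY"] = "YOUR_TILEBOX_API_KEY"  # for illustration only

client = Client()  # picks up the key from the TILEBOX_API_KEY environment variable
```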
# Console
Explore your datasets and workflows with the Tilebox Console
The [Tilebox Console](https://console.tilebox.com) is a web interface that enables you to explore your datasets and workflows, manage your account and API keys, add collaborators to your team and monitor your usage.
## Datasets
The datasets explorer lets you explore available datasets for your team. You can select a dataset, view its collections, and load data for a collection within a specified time range.
When you click a specific event time in the data point list view, a detailed view of that data point will appear.
### Export as Code
After selecting a dataset, collection, and time range, you can export the current selection as a Python code snippet. This will copy a code snippet like the one below to your clipboard.
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
sentinel2_msi = datasets.open_data.copernicus.sentinel2_msi
data = sentinel2_msi.collection("S2A_S2MSI1C").load(
("2024-07-12", "2024-07-26"),
show_progress=True,
)
```
Paste the snippet into a [notebook](/sdks/python/sample-notebooks) to interactively explore the
[`xarray.Dataset`](/sdks/python/xarray) that is returned.
## Workflows
The workflows section of the console allows you to view jobs, create clusters and create recurring and near real-time tasks.
## Account
### API Keys
The API Keys page enables you to manage your API keys. You can create new API keys, revoke existing ones, and view currently active API keys.
### Usage
The Usage page allows you to view your current usage of the Tilebox API.
# Collections
Learn about time series dataset collections
Collections group data points within a dataset. They help represent logical groupings of data points that are commonly queried together. For example, if your dataset includes data from a specific instrument on different satellites, you can group the data points from each satellite into a collection.
## Overview
This section provides a quick overview of the API for listing and accessing collections. Below are some usage examples for different scenarios.
| Method | API Reference | Description |
| --------------------- | ---------------------------------------------------------------------------- | --------------------------------------------- |
| `dataset.collections` | [Listing collections](/api-reference/tilebox.datasets/Dataset.collections) | List all available collections for a dataset. |
| `dataset.collection` | [Accessing a collection](/api-reference/tilebox.datasets/Dataset.collection) | Access an individual collection by its name. |
| `collection.info` | [Collection information](/api-reference/tilebox.datasets/Collection.info) | Request information about a collection. |
Refer to the examples below for common use cases when working with collections. These examples assume that you have already [created a client](/datasets/introduction#creating-a-datasets-client) and [listed the available datasets](/api-reference/tilebox.datasets/Client.datasets).
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
```
## Listing collections
To list the collections for a dataset, use the `collections` method on the dataset object.
```python Python
dataset = datasets.open_data.copernicus.landsat8_oli_tirs
collections = dataset.collections()
print(collections)
```
```plaintext Output
{'L1GT': Collection L1GT: [2013-03-25T12:08:43.699 UTC, 2024-08-19T12:57:32.456 UTC] (154288 data points),
'L1T': Collection L1T: [2013-03-26T09:33:19.763 UTC, 2020-08-24T03:21:50.000 UTC] (87958 data points),
'L1TP': Collection L1TP: [2013-03-24T00:25:55.457 UTC, 2024-08-19T12:58:20.229 UTC] (322041 data points),
'L2SP': Collection L2SP: [2015-01-01T07:53:35.391 UTC, 2024-08-12T12:52:03.243 UTC] (191110 data points)}
```
[dataset.collections](/api-reference/tilebox.datasets/Dataset.collections) returns a dictionary mapping collection names to their corresponding collection objects. Each collection has a unique name within its dataset.
## Accessing individual collections
Once you have listed the collections for a dataset using [dataset.collections()](/api-reference/tilebox.datasets/Dataset.collections), you can access a specific collection by retrieving it from the resulting dictionary with its name. Use [collection.info()](/api-reference/tilebox.datasets/Collection.info) to get details (name, availability, and count) about it.
```python Python
collections = dataset.collections()
terrain_correction = collections["L1GT"]
collection_info = terrain_correction.info()
print(collection_info)
```
```plaintext Output
L1GT: [2013-03-25T12:08:43.699 UTC, 2024-08-19T12:57:32.456 UTC] (154288 data points)
```
You can also access a specific collection directly using the [dataset.collection](/api-reference/tilebox.datasets/Dataset.collection) method on the dataset object. This method allows you to get the collection without having to list all collections first.
```python Python
terrain_correction = dataset.collection("L1GT")
collection_info = terrain_correction.info()
print(collection_info)
```
```plaintext Output
L1GT: [2013-03-25T12:08:43.699 UTC, 2024-08-19T12:57:32.456 UTC] (154288 data points)
```
## Errors you may encounter
### NotFoundError
If you attempt to access a collection with a non-existent name, a `NotFoundError` is raised. For example:
```python Python
dataset.collection("Sat-X").info() # raises NotFoundError: 'No such collection Sat-X'
```
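One way to avoid the exception is to check the collection listing first. A minimal sketch, using only the `collections()` dictionary shown earlier (the collection name is just an example):
```python Python
collections = dataset.collections()

name = "Sat-X"
if name in collections:
    print(collections[name].info())
else:
    print(f"No collection named {name} in this dataset")
```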
## Next steps
Learn how to load data points from a collection.
# Introduction
Learn about Tilebox Datasets
Time series datasets refer to datasets where each data point is linked to a timestamp. This format is common for data collected over time, such as satellite data.
This section covers:
* Discover available time series datasets and learn how to list them.
* Understand the common fields shared by all time series datasets.
* Learn what collections are and how to access them.
* Find out how to access data from a collection for specific time intervals.
For a quick reference to API methods or specific parameter meanings, [check out the complete time series API Reference](/api-reference/tilebox.datasets/Client).
## Terminology
Get familiar with some key terms when working with time series datasets.
Time series data points are individual entities that form a dataset. Each data point has a timestamp and consists of a set of fixed [metadata fields](/datasets/timeseries#common-fields) along with dataset-specific fields.
Time series datasets act as containers for data points. All data points in a dataset share the same type and fields.
Collections group data points within a dataset. They help represent logical groupings of data points that are often queried together.
## Creating a datasets client
Prerequisites
* You have [installed](/sdks/python/install) the `tilebox-datasets` package.
* You have [created](/authentication) a Tilebox API key.
After meeting these prerequisites, you can create a client instance to interact with Tilebox Datasets.
```python Python
from tilebox.datasets import Client
client = Client(token="YOUR_TILEBOX_API_KEY")
```
You can also set the `TILEBOX_API_KEY` environment variable to your API key. You can then instantiate the client without passing the `token` argument, and the client will automatically use the environment variable for authentication.
```python Python
from tilebox.datasets import Client
# requires a TILEBOX_API_KEY environment variable
client = Client()
```
Tilebox datasets provide a standard synchronous API by default, but also offer an [asynchronous client](/sdks/python/async) if needed.
### Exploring datasets
After creating a client instance, you can start exploring available datasets. A straightforward way to do this in an interactive environment is to [list all datasets](/api-reference/tilebox.datasets/Client.datasets) and use the autocomplete feature in your Jupyter notebook.
```python Python
datasets = client.datasets()
datasets. # trigger autocomplete here to view available datasets
```
The Console also provides an [overview](https://console.tilebox.com/datasets/explorer) of all available datasets.
### Errors you might encounter
#### AuthenticationError
`AuthenticationError` occurs when the client fails to authenticate with the Tilebox API. This may happen if the provided API key is invalid or expired. A client instantiated with an invalid API key won't raise an error immediately, but an error will occur when making a request to the API.
```python Python
client = Client(token="invalid-key") # runs without error
datasets = client.datasets() # raises AuthenticationError
```
## Next steps
Learn about [collections](/datasets/collections) and how to [load data](/datasets/loading-data) from them.
# Loading Time Series Data
Learn how to load data from Time Series Dataset collections.
## Overview
This section provides an overview of the API for loading data from a collection. It includes usage examples for many common scenarios.
| Method | API Reference | Description |
| ----------------- | ----------------------------------------------------------------------- | ---------------------------------------------------- |
| `collection.load` | [Loading data](/api-reference/tilebox.datasets/Collection.load) | Load data points from a collection. |
| `collection.find` | [Loading a data point](/api-reference/tilebox.datasets/Collection.find) | Find a specific datapoint in a collection by its id. |
Check out the examples below for common scenarios when loading data from collections. The examples assume you have already [created a client](/datasets/introduction#creating-a-datasets-client) and [accessed a specific dataset collection](/datasets/collections).
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
collections = datasets.open_data.copernicus.sentinel1_sar.collections()
collection = collections["S1A_IW_RAW__0S"]
```
## Loading data
To load data points from a dataset collection, use the [load](/api-reference/tilebox.datasets/Collection.load) method. It requires a `time_or_interval` parameter to specify the time or time interval for loading.
### TimeInterval
To load data for a specific time interval, use a `tuple` in the form `(start, end)` as the `time_or_interval` parameter. Both `start` and `end` must be [TimeScalars](#time-scalars), which can be `datetime` objects or strings in ISO 8601 format.
```python Python
interval = ("2017-01-01", "2023-01-01")
data = collection.load(interval, show_progress=True)
```
```plaintext Output
Size: 725MB
Dimensions: (time: 1109597, latlon: 2)
Coordinates:
ingestion_time (time) datetime64[ns] 9MB 2024-06-21T11:03:33.8524...
id (time) ...
```
The `show_progress` parameter is optional and can be used to display a [tqdm](https://tqdm.github.io/) progress bar while loading data.
A time interval specified as a tuple is interpreted as a half-closed interval. This means the start time is inclusive, and the end time is exclusive. For instance, using an end time of `2023-01-01` includes data points up to `2022-12-31 23:59:59.999`, but excludes those from `2023-01-01 00:00:00.000`. This behavior mimics the Python `range` function and is useful for chaining time intervals.
```python Python
import xarray as xr
data = []
for year in [2017, 2018, 2019, 2020, 2021, 2022]:
interval = (f"{year}-01-01", f"{year + 1}-01-01")
data.append(collection.load(interval, show_progress=True))
# Concatenate the data into a single dataset, which is equivalent
# to the result of the single request in the code example above.
data = xr.concat(data, dim="time")
```
The example above demonstrates how to split a large time interval into smaller chunks and load the data in separate requests. Typically this is not necessary, since the datasets client automatically paginates large intervals.
### TimeInterval objects
For greater control over the inclusivity of start and end times, you can pass a `TimeInterval` dataclass to `load` instead of a tuple. This class allows you to specify the `start` and `end` times, as well as their inclusivity. Here's an example of creating equivalent `TimeInterval` objects in two different ways.
```python Python
from datetime import datetime
from tilebox.datasets.data import TimeInterval
interval1 = TimeInterval(
datetime(2017, 1, 1), datetime(2023, 1, 1),
end_inclusive=False
)
interval2 = TimeInterval(
datetime(2017, 1, 1), datetime(2022, 12, 31, 23, 59, 59, 999999),
end_inclusive=True
)
print("Inclusivity is indicated by interval notation: ( and [")
print(interval1)
print(interval2)
print(f"They are equivalent: {interval1 == interval2}")
print(interval2.to_half_open())
# Same operation as above
data = collection.load(interval1, show_progress=True)
```
```plaintext Output
Inclusivity is indicated by interval notation: ( and [
[2017-01-01T00:00:00.000 UTC, 2023-01-01T00:00:00.000 UTC)
[2017-01-01T00:00:00.000 UTC, 2022-12-31T23:59:59.999 UTC]
They are equivalent: True
[2017-01-01T00:00:00.000 UTC, 2023-01-01T00:00:00.000 UTC)
```
### Time scalars
You can load all points for a specific time using a `TimeScalar` for the `time_or_interval` parameter to `load`. A `TimeScalar` can be a `datetime` object or a string in ISO 8601 format. When passed to the `load` method, it retrieves all data points matching the specified time. Note that the `time` field of data points in a collection may not be unique, so multiple data points could be returned. If you want to fetch only a single data point, use [find](#loading-a-data-point-by-id) instead.
Here's how to load a data point at a specific time from a [collection](/datasets/collections).
```python Python
data = collection.load("2024-08-01 00:00:01.362")
print(data)
```
```plaintext Output
Size: 721B
Dimensions: (time: 1, latlon: 2)
Coordinates:
ingestion_time (time) datetime64[ns] 8B 2024-08-01T08:53:08.450499
id (time) ...
```
Tilebox uses millisecond precision for timestamps. To load all data points within a specific second, make a [time interval](/datasets/loading-data#time-intervals) request instead, as shown below.
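For example, a half-open one-second interval returns every data point within that second (the timestamps are illustrative):
```python Python
# all data points from 00:00:01.000 (inclusive) up to 00:00:02.000 (exclusive)
data = collection.load(("2024-08-01 00:00:01", "2024-08-01 00:00:02"))
```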
The output of the `load` method is an `xarray.Dataset` object. To learn more about Xarray, visit the dedicated [Xarray page](/sdks/python/xarray).
### Time iterables
You can specify a time interval by using an iterable of `TimeScalar`s as the `time_or_interval` parameter. This is especially useful when you want to use the output of a previous `load` call as input for another load. Here's how that works.
```python Python
interval = ("2017-01-01", "2023-01-01")
meta_data = collection.load(interval, skip_data=True)
first_50_data_points = collection.load(meta_data.time[:50], skip_data=False)
print(first_50_data_points)
```
```plaintext Output
Size: 33kB
Dimensions: (time: 50, latlon: 2)
Coordinates:
ingestion_time (time) datetime64[ns] 400B 2024-06-21T11:03:33.852...
id (time) ...
```
```python Python
data = collection.load("2024-08-01 00:00:01.362", skip_data=True)
print(data)
```
```plaintext Output
Size: 160B
Dimensions: (time: 1)
Coordinates:
ingestion_time (time) datetime64[ns] 8B 2024-08-01T08:53:08.450499
id (time) ...
```
```python Python
time_with_no_data_points = "1997-02-06 10:21:00"
data = collection.load(time_with_no_data_points)
print(data)
```
```plaintext Output
Size: 0B
Dimensions: ()
Data variables:
*empty*
```
## Timezone handling
When a `TimeScalar` is specified as a string, the time is treated as UTC. If you want to load data for a specific time in another timezone, use a `datetime` object. In this case, the Tilebox API will convert the datetime to `UTC` before making the request. The output will always contain UTC timestamps, which will need to be converted again if a different timezone is required.
```python Python
from datetime import datetime
import pytz
# Tokyo has a UTC+9 hours offset, so this is the same as
# 2017-01-01 02:45:25.679 UTC
tokyo_time = pytz.timezone('Asia/Tokyo').localize(
datetime(2017, 1, 1, 11, 45, 25, 679000)
)
print(tokyo_time)
data = collection.load(tokyo_time)
print(data) # time is in UTC since API always returns UTC timestamps
```
```plaintext Output
2017-01-01 11:45:25.679000+09:00
Size: 725B
Dimensions: (time: 1, latlon: 2)
Coordinates:
ingestion_time (time) datetime64[ns] 8B 2024-06-21T11:03:33.852435
id (time) ...
```

## Loading a data point by id
Each data point in a collection has a unique `id`. You can load a single data point directly by its id using the [find](/api-reference/tilebox.datasets/Collection.find) method.
```python Python
datapoint_id = "01916d89-ba23-64c9-e383-3152644bcbde"
datapoint = collection.find(datapoint_id)
print(datapoint)
```
```plaintext Output
Size: 725B
Dimensions: (latlon: 2)
Coordinates:
ingestion_time datetime64[ns] 8B 2024-08-20T05:53:08.600528
id ...
```
You can also set the `skip_data` parameter when calling `find` to load only the metadata of the data point, same as for `load`.
### Possible exceptions
* `NotFoundError`: raised if no data point with the given ID is found in the collection
* `ValueError`: raised if the specified `datapoint_id` is not a valid UUID
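A minimal sketch that validates the ID locally before calling `find`, using the standard library `uuid` module (the ID value is just an example):
```python Python
from uuid import UUID

datapoint_id = "01916d89-ba23-64c9-e383-3152644bcbde"
try:
    UUID(datapoint_id)  # find raises ValueError for IDs that are not valid UUIDs
except ValueError:
    print(f"{datapoint_id} is not a valid UUID")
else:
    datapoint = collection.find(datapoint_id, skip_data=True)  # metadata only
```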
# Open Data
Learn about the Open data available in Tilebox.
Tilebox not only provides access to your own private datasets, but also to a growing number of public datasets. These datasets are available to all users of Tilebox and are a great way to get started and prototype your applications even before data from your own satellites is available.
If there is a public dataset you would like to see in Tilebox,
please get in touch.
## Accessing Open Data through Tilebox
Accessing open datasets in Tilebox is straightforward and as easy as accessing your own datasets. Tilebox has already ingested the required metadata for each available dataset, so you can query, preview, and download the data seamlessly. This setup enables you to leverage performance optimizations and simplifies your workflows.
By using the [datasets API](/datasets), you can start prototyping your applications and workflows easily.
## Available datasets
The Tilebox Console contains in-depth descriptions of each dataset, including many code-snippets to help you get started. Check out the [Sentinel 5P Tropomi](https://console.tilebox.com/datasets/descriptions/feb2bcc9-8fdf-4714-8a63-395ee9d3f323) documentation as an example.
### Copernicus Data Space
The [Copernicus Data Space](https://dataspace.copernicus.eu/) is an open ecosystem that provides free instant access to data and services from the Copernicus Sentinel missions.
Tilebox currently supports the following datasets from the Copernicus Data Space:
The Sentinel-1 mission is the European Radar Observatory for the Copernicus
joint initiative of the European Commission (EC) and the European Space
Agency (ESA). The Sentinel-1 mission includes C-band imaging operating in
four exclusive imaging modes with different resolution (down to 5 m) and
coverage (up to 400 km). It provides dual polarization capability, short
revisit times and rapid product delivery.
Sentinel-2 is equipped with an optical instrument payload that samples 13
spectral bands: four bands at 10 m, six bands at 20 m and three bands at 60
m spatial resolution.
Sentinel-3 is equipped with multiple instruments whose data is available in Tilebox.
`OLCI` (Ocean and Land Color Instrument) is an optical instrument used to provide
data continuity for ENVISAT MERIS.
`SLSTR` (Sea and Land Surface Temperature Radiometer) is a dual-view scanning
temperature radiometer, which flies in low Earth orbit (800 - 830 km
altitude).
The `SRAL` (SAR Radar Altimeter) instrument comprises one nadir-looking antenna,
and a central electronic chain composed of a Digital Processing Unit (DPU)
and a Radio Frequency Unit (RFU).
OLCI, in conjunction with the SLSTR instrument, provides the `SYN` products,
providing continuity with SPOT VEGETATION.
The primary goal of `TROPOMI` is to provide daily global observations of key
atmospheric constituents related to monitoring and forecasting air quality,
the ozone layer, and climate change.
Landsat-8 is part of the long-running Landsat programme led by USGS and NASA and
carries the Operational Land Imager (OLI) and the Thermal Infrared Sensor
(TIRS). The Operational Land Imager (OLI), on board Landsat-8 measures in
the VIS, NIR and SWIR portions of the spectrum. Its images have 15 m
panchromatic and 30 m multi-spectral spatial resolutions along a 185 km wide
swath, covering wide areas of the Earth's landscape while providing
high enough resolution to distinguish features like urban centres, farms,
forests and other land uses. The entire Earth falls within view once every
16 days due to Landsat-8's near-polar orbit. The Thermal Infra-Red Sensor
instrument, on board Landsat-8, is a thermal imager operating in pushbroom
mode with two Infra-Red channels: 10.8 µm and 12 µm with 100 m spatial
resolution.
### Alaska Satellite Facility (ASF)
The [Alaska Satellite Facility (ASF)](https://asf.alaska.edu/) is a NASA-funded research center at the University of Alaska Fairbanks. ASF supports a wide variety of research and applications using synthetic aperture radar (SAR) and related remote sensing technologies.
Tilebox currently supports the following datasets from the Alaska Satellite Facility:
European Remote Sensing Satellite (ERS) Synthetic Aperture Radar (SAR) Granules
### Umbra Space
[Umbra](https://umbra.space/) satellites provide up to 16 cm resolution Synthetic Aperture Radar (SAR) imagery from space. The Umbra Open Data Program (ODP) features over twenty diverse time-series locations that are frequently updated, allowing users to explore SAR's capabilities.
Tilebox currently supports the following datasets from Umbra Space:
Time-series SAR data provided as Opendata by Umbra Space.
# Storage Clients
Learn about the different storage clients available in Tilebox to access open data.
Tilebox does not host the actual open data satellite products; instead it relies on publicly accessible storage providers for data access. Tilebox ingests the available metadata as [datasets](/datasets/timeseries) to enable high-performance querying and structured access to the data as [xarray.Dataset](/sdks/python/xarray).
Below is a list of the storage providers currently supported by Tilebox.
## Copernicus Data Space
The [Copernicus Data Space](https://dataspace.copernicus.eu/) is an open ecosystem that provides free instant access to data and services from the Copernicus Sentinel missions. Check out the [Copernicus Open Data datasets](/datasets/open-data#copernicus-data-space) that are available in Tilebox.
### Access Copernicus data
To download data products from the Copernicus Data Space after querying them via the Tilebox API, you need to [create an account](https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/auth?client_id=cdse-public\&response_type=code\&scope=openid\&redirect_uri=https%3A//dataspace.copernicus.eu/account/confirmed/1) and then generate [S3 credentials here](https://eodata-s3keysmanager.dataspace.copernicus.eu/panel/s3-credentials).
The following code snippet demonstrates how to query and download Copernicus data using the Tilebox Python SDK.
```python Python {4,9-13,27}
from pathlib import Path
from tilebox.datasets import Client
from tilebox.storage import CopernicusStorageClient
# Creating clients
client = Client(token="YOUR_TILEBOX_API_KEY")
datasets = client.datasets()
storage_client = CopernicusStorageClient(
access_key="YOUR_ACCESS_KEY",
secret_access_key="YOUR_SECRET_ACCESS_KEY",
cache_directory=Path("./data")
)
# Choosing the dataset and collection
s2_dataset = datasets.open_data.copernicus.sentinel2_msi
collections = s2_dataset.collections()
collection = collections["S2A_S2MSI2A"]
# Loading metadata
s2_data = collection.load(("2024-08-01", "2024-08-02"), show_progress=True)
# Selecting a data point to download
selected = s2_data.isel(time=0) # index 0 selected
# Downloading the data
downloaded_data = storage_client.download(selected)
print(f"Downloaded granule: {downloaded_data.name} to {downloaded_data}")
print("Contents: ")
for content in downloaded_data.iterdir():
print(f" - {content.relative_to(downloaded_data)}")
```
```plaintext Output
Downloaded granule: S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544.SAFE to data/Sentinel-2/MSI/L2A/2024/08/01/S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544.SAFE
Contents:
- manifest.safe
- GRANULE
- INSPIRE.xml
- MTD_MSIL2A.xml
- DATASTRIP
- HTML
- rep_info
- S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544-ql.jpg
```
## Alaska Satellite Facility (ASF)
The [Alaska Satellite Facility (ASF)](https://asf.alaska.edu/) is a NASA-funded research center at the University of Alaska Fairbanks. Check out the [ASF Open Data datasets](/datasets/open-data#alaska-satellite-facility-asf) that are available in Tilebox.
### Accessing ASF Data
You can query ASF metadata without needing an account, as Tilebox has indexed and ingested the relevant metadata. To access and download the actual satellite products, you will need an ASF account.
You can create an ASF account in the [ASF Vertex Search Tool](https://search.asf.alaska.edu/).
The following code snippet demonstrates how to query and download ASF data using the Tilebox Python SDK.
```python Python {4,9-13,27}
from pathlib import Path
from tilebox.datasets import Client
from tilebox.storage import ASFStorageClient
# Creating clients
client = Client(token="YOUR_TILEBOX_API_KEY")
datasets = client.datasets()
storage_client = ASFStorageClient(
user="YOUR_ASF_USER",
password="YOUR_ASF_PASSWORD",
cache_directory=Path("./data")
)
# Choosing the dataset and collection
ers_dataset = datasets.open_data.asf.ers_sar
collections = ers_dataset.collections()
collection = collections["ERS-2"]
# Loading metadata
ers_data = collection.load(("2009-01-01", "2009-01-02"), show_progress=True)
# Selecting a data point to download
selected = ers_data.isel(time=0) # index 0 selected
# Downloading the data
downloaded_data = storage_client.download(selected, extract=True)
print(f"Downloaded granule: {downloaded_data.name} to {downloaded_data}")
print("Contents: ")
for content in downloaded_data.iterdir():
print(f" - {content.relative_to(downloaded_data)}")
```
```plaintext Output
Downloaded granule: E2_71629_STD_L0_F183 to data/ASF/E2_71629_STD_F183/E2_71629_STD_L0_F183
Contents:
- E2_71629_STD_L0_F183.000.vol
- E2_71629_STD_L0_F183.000.meta
- E2_71629_STD_L0_F183.000.raw
- E2_71629_STD_L0_F183.000.pi
- E2_71629_STD_L0_F183.000.nul
- E2_71629_STD_L0_F183.000.ldr
```
## Umbra Space
[Umbra](https://umbra.space/) satellites provide high resolution Synthetic Aperture Radar (SAR) imagery from space. Check out the [Umbra datasets](/datasets/open-data#umbra-space) that are available in Tilebox.
### Accessing Umbra data
You don't need an account to access Umbra data. All data is provided under a Creative Commons License (CC BY 4.0), allowing you to freely use it.
The following code snippet demonstrates how to query and download Umbra data using the Tilebox Python SDK.
```python Python {4,9,23}
from pathlib import Path
from tilebox.datasets import Client
from tilebox.storage import UmbraStorageClient
# Creating clients
client = Client(token="YOUR_TILEBOX_API_KEY")
datasets = client.datasets()
storage_client = UmbraStorageClient(cache_directory=Path("./data"))
# Choosing the dataset and collection
umbra_dataset = datasets.open_data.umbra.sar
collections = umbra_dataset.collections()
collection = collections["SAR"]
# Loading metadata
umbra_data = collection.load(("2024-01-05", "2024-01-06"), show_progress=True)
# Selecting a data point to download
selected = umbra_data.isel(time=0) # index 0 selected
# Downloading the data
downloaded_data = storage_client.download(selected)
print(f"Downloaded granule: {downloaded_data.name} to {downloaded_data}")
print("Contents: ")
for content in downloaded_data.iterdir():
print(f" - {content.relative_to(downloaded_data)}")
```
```plaintext Output
Downloaded granule: 2024-01-05-01-53-37_UMBRA-07 to data/Umbra/ad hoc/Yi_Sun_sin_Bridge_SK/6cf02931-ca2e-4744-b389-4844ddc701cd/2024-01-05-01-53-37_UMBRA-07
Contents:
- 2024-01-05-01-53-37_UMBRA-07_SIDD.nitf
- 2024-01-05-01-53-37_UMBRA-07_SICD.nitf
- 2024-01-05-01-53-37_UMBRA-07_CSI-SIDD.nitf
- 2024-01-05-01-53-37_UMBRA-07_METADATA.json
- 2024-01-05-01-53-37_UMBRA-07_GEC.tif
- 2024-01-05-01-53-37_UMBRA-07_CSI.tif
```
# Time series data
Learn about time series datasets
Time series datasets act as containers for data points. All data points in a dataset share the same type and fields.
Additionally, all time series datasets include a few [common fields](#common-fields). One of these fields, the `time` field, allows you to perform time-based data queries on a dataset.
## Listing datasets
You can use [your client instance](/datasets/introduction#creating-a-datasets-client) to access the datasets available to you. For example, to access the `sentinel1_sar` dataset in the `open_data.copernicus` dataset group, use the following code.
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
dataset = datasets.open_data.copernicus.sentinel1_sar
```
Once you have your dataset object, you can use it to [list the available collections](/datasets/collections) for the dataset.
If you're using an IDE or an interactive environment with auto-complete, you can use it on your client instance to discover the datasets available to you. Type `client.` and trigger auto-complete after the dot to do so.
## Common fields
While the specific data fields between different time series datasets can vary, there are common fields that all time series datasets share.
`time`: The timestamp associated with each data point. Tilebox uses millisecond precision for storing and indexing data points. Timestamps are always in UTC.
`id`: A [universally unique identifier (UUID)](https://en.wikipedia.org/wiki/Universally_unique_identifier) that uniquely identifies each data point. IDs are generated so that sorting them lexicographically also sorts them by their time field.
`ingestion_time`: The time the data point was ingested into the Tilebox API. Timestamps are always in UTC.
These fields are present in all time series datasets. Together, they make up the metadata of a data point. Each dataset also has its own set of fields that are specific to that dataset.
Tilebox uses millisecond precision timestamps for storing and indexing data points. If multiple data points share the same timestamp within one millisecond, they will all display the same timestamp. Each data point can have any number of timestamp fields with a higher precision. For example, telemetry data commonly includes timestamp fields with nanosecond precision.
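Since the common fields are returned as coordinates of the resulting [`xarray.Dataset`](/sdks/python/xarray), you can access them directly after loading data. A short sketch, continuing from the `dataset` object above (collection name and time range are just examples):
```python Python
collection = dataset.collection("S1A_IW_RAW__0S")
data = collection.load(("2024-08-01", "2024-08-02"))

print(data.time.values[:3])            # millisecond-precision UTC timestamps
print(data.id.values[:3])              # UUIDs; lexicographic order matches time order
print(data.ingestion_time.values[:3])  # when each data point was ingested
```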
## Example data point
Below is an example data point from a time series dataset represented as an [`xarray.Dataset`](/sdks/python/xarray). It contains the common fields. When using the Tilebox Python client library, you receive the data in this format.
```plaintext Example timeseries datapoint
Dimensions: ()
Coordinates:
time datetime64[ns] 2023-03-12 16:45:23.532
id ...
```
# Tilebox
The Solar System's #1 developer tool for space data management
Tilebox is a space data native distributed computing tool. It provides a framework that simplifies access, processing, and distribution of space data across different environments enabling efficient multi-language, multi-cluster workflows. Tilebox integrates seamlessly with your existing infrastructure, ensuring that you maintain complete control over your data and algorithms.
## Modules
Tilebox consists of two primary modules: Datasets and Workflows.
## Getting Started
To get started, check out some of the following resources:
## Guides
You can also start by looking through these guides:
* Learn about time-series datasets and their structure.
* Discover how to query and load data from a dataset.
* Gain a deeper understanding of how to create tasks using the Tilebox Workflow Orchestrator.
* Find out how to deploy Task Runners to run workflows in a parallel, distributed manner.
# Quickstart
This guide helps you set up and get started using Tilebox. It covers how to install a Tilebox client for your preferred language and how to use it to query data from a dataset and run a workflow.
Select your preferred language and follow the steps below to get started.
## Start in a Notebook
Explore the provided [Sample Notebooks](/sdks/python/sample-notebooks) to begin your journey with Tilebox. These notebooks offer a step-by-step guide to using the API and showcase many features supported by Tilebox Python clients. You can also use these notebooks as a foundation for your own projects.
## Start on Your Device
If you prefer to work locally, follow these steps to get started.
Install the Tilebox Python packages. The easiest way to do this is using `pip`:
```
pip install tilebox-datasets tilebox-workflows tilebox-storage
```
Create an API key by logging into the [Tilebox Console](https://console.tilebox.com), navigating to [Account -> API Keys](https://console.tilebox.com/account/api-keys), and clicking the "Create API Key" button.
Copy the API key and keep it somewhere safe. You will need it to authenticate your requests.
Create a cluster by logging into the [Tilebox Console](https://console.tilebox.com), navigating to [Workflows -> Clusters](https://console.tilebox.com/workflows/clusters), and clicking the "Create cluster" button.
Copy the cluster slug, you will need it to run your workflows.
Use the datasets client to query data from a dataset.
```python Python
from tilebox.datasets import Client
client = Client(token="YOUR_TILEBOX_API_KEY")
# select a dataset
datasets = client.datasets()
dataset = datasets.open_data.copernicus.sentinel2_msi
# and load data from a collection in a given time range
collection = dataset.collection("S2A_S2MSI1C")
data_january_2022 = collection.load(("2022-01-01", "2022-02-01"))
```
Use the workflows client to create a task and submit it as a job.
```python Python
from tilebox.workflows import Client, Task
# Replace with your actual cluster and token
cluster = "YOUR_COMPUTE_CLUSTER"
client = Client(token="YOUR_TILEBOX_API_KEY")
class HelloWorldTask(Task):
greeting: str = "Hello"
name: str = "World"
def execute(self, context):
print(f"{self.greeting} {self.name}, from the main task!")
context.submit_subtask(HelloSubtask(name=self.name))
class HelloSubtask(Task):
name: str
def execute(self, context):
print(f"Hello from the subtask, {self.name}!")
# Initiate the job
jobs = client.jobs()
jobs.submit("parameterized-hello-world", HelloWorldTask(greeting="Greetings", name="Universe"), cluster)
# Run the tasks
runner = client.runner(cluster, tasks=[HelloWorldTask, HelloSubtask])
runner.run_all()
```
Review the following guides to learn more about the modules that make up Tilebox:
# Introduction
Learn about the Tilebox GO SDK
Hang tight - Go support for Tilebox is coming soon.
# Async support
Tilebox offers a standard synchronous API by default but also provides an async client option when needed.
## Why use async?
When working with external datasets, such as [Tilebox datasets](/datasets/timeseries), loading data may take some time. To speed up this process, you can run requests in parallel. While you can use multi-threading or multi-processing, these approaches can be complex; a simpler option is often to perform data loading tasks asynchronously using coroutines and `asyncio`.
## Switching to an async datasets client
To switch to the async client, change the import statement for the `Client`. The example below illustrates this change.
```python Python (Sync)
from tilebox.datasets import Client
# This client is synchronous
client = Client()
```
```python Python (Async)
from tilebox.datasets.aio import Client
# This client is asynchronous
client = Client()
```
After switching to the async client, use `await` for operations that interact with the Tilebox API.
```python Python (Sync)
# Listing datasets
datasets = client.datasets()
# Listing collections
dataset = datasets.open_data.copernicus.sentinel1_sar
collections = dataset.collections()
# Collection information
collection = collections["S1A_IW_RAW__0S"]
info = collection.info()
print(f"Data for My-collection is available for {info.availability}")
# Loading data
data = collection.load(("2022-05-01", "2022-06-01"), show_progress=True)
# Finding a specific datapoint
datapoint_uuid = "01910b3c-8552-7671-3345-b902cc0813f3"
datapoint = collection.find(datapoint_uuid)
```
```python Python (Async)
# Listing datasets
datasets = await client.datasets()
# Listing collections
dataset = datasets.open_data.copernicus.sentinel1_sar
collections = await dataset.collections()
# Collection information
collection = collections["S1A_IW_RAW__0S"]
info = await collection.info()
print(f"Data for My-collection is available for {info.availability}")
# Loading data
data = await collection.load(("2022-05-01", "2022-06-01"), show_progress=True)
# Finding a specific datapoint
datapoint_uuid = "01910b3c-8552-7671-3345-b902cc0813f3"
datapoint = await collection.find(datapoint_uuid)
```
Jupyter notebooks and similar interactive environments support asynchronous code execution. You can use
`await some_async_call()` as the output of a code cell.
## Fetching data concurrently
The primary benefit of the async client is that it allows concurrent requests, enhancing performance.
In the example below, data is fetched from multiple collections. The synchronous approach retrieves data sequentially, while the async approach does so concurrently, resulting in faster execution.
```python Python (Sync)
# Example: fetching data sequentially
# switch to the async example to compare the differences
import time
from tilebox.datasets import Client
from tilebox.datasets.sync.timeseries import TimeseriesCollection
client = Client()
datasets = client.datasets()
collections = datasets.open_data.copernicus.landsat8_oli_tirs.collections()
def stats_for_2020(collection: TimeseriesCollection) -> tuple[str, int]:
    """Fetch data for 2020 and return the collection name and number of loaded data points."""
data = collection.load(("2020-01-01", "2021-01-01"), show_progress=True)
n = data.sizes['time'] if 'time' in data else 0
return (collection.name, n)
start = time.monotonic()
results = [stats_for_2020(collections[name]) for name in collections]
duration = time.monotonic() - start
for collection_name, n in results:
print(f"There are {n} datapoints in {collection_name} for 2020.")
print(f"Fetching data took {duration:.2f} seconds")
```
```python Python (Async)
# Example: fetching data concurrently
import asyncio
import time
from tilebox.datasets.aio import Client
from tilebox.datasets.aio.timeseries import TimeseriesCollection
client = Client()
datasets = await client.datasets()
collections = await datasets.open_data.copernicus.landsat8_oli_tirs.collections()
async def stats_for_2020(collection: TimeseriesCollection) -> tuple[str, int]:
    """Fetch data for 2020 and return the collection name and number of loaded data points."""
data = await collection.load(("2020-01-01", "2021-01-01"), show_progress=True)
n = data.sizes['time'] if 'time' in data else 0
return (collection.name, n)
start = time.monotonic()
# Initiate all requests concurrently
requests = [stats_for_2020(collections[name]) for name in collections]
# Wait for all requests to finish in parallel
results = await asyncio.gather(*requests)
duration = time.monotonic() - start
for collection_name, n in results:
print(f"There are {n} datapoints in {collection_name} for 2020.")
print(f"Fetching data took {duration:.2f} seconds")
```
The output demonstrates that the async approach runs approximately 30% faster for this example. With `show_progress` enabled, the progress bars update concurrently.
```plaintext Python (Sync)
There are 19624 datapoints in L1GT for 2020.
There are 1281 datapoints in L1T for 2020.
There are 65313 datapoints in L1TP for 2020.
There are 25375 datapoints in L2SP for 2020.
Fetching data took 10.92 seconds
```
```plaintext Python (Async)
There are 19624 datapoints in L1GT for 2020.
There are 1281 datapoints in L1T for 2020.
There are 65313 datapoints in L1TP for 2020.
There are 25375 datapoints in L2SP for 2020.
Fetching data took 7.45 seconds
```
## Async workflows
The Tilebox workflows Python client does not offer an async client, because workflows are designed for distributed and concurrent execution outside a single async event loop. Within a single task, however, you may still use `async` code to take advantage of asynchronous execution, such as parallel data loading. You can achieve this by wrapping your async code in `asyncio.run`.
Below is an example of using async code within a workflow task.
```python Python (Async)
import asyncio
import xarray as xr
from tilebox.datasets.aio import Client as DatasetsClient
from tilebox.datasets.data import TimeIntervalLike
from tilebox.workflows import Task, ExecutionContext
class FetchData(Task):
def execute(self, context: ExecutionContext) -> None:
# The task execution itself is synchronous
# But we can leverage async code within the task using asyncio.run
# This will fetch three months of data in parallel
data_jan, data_feb, data_mar = asyncio.run(load_first_three_months())
async def load_data(interval: TimeIntervalLike):
datasets = await DatasetsClient().datasets()
collections = await datasets.open_data.copernicus.landsat8_oli_tirs.collections()
return await collections["L1T"].load(interval)
async def load_first_three_months() -> tuple[xr.Dataset, xr.Dataset, xr.Dataset]:
jan = load_data(("2020-01-01", "2020-02-01"))
feb = load_data(("2020-02-01", "2020-03-01"))
mar = load_data(("2020-03-01", "2020-04-01"))
# load the three months in parallel
jan, feb, mar = await asyncio.gather(jan, feb, mar)
return jan, feb, mar
```
If you encounter an error like `RuntimeError: asyncio.run() cannot be called from a running event loop`, it means you're trying to start another asyncio event loop (with `asyncio.run`) from within an existing one. This often happens in Jupyter notebooks since they automatically start an event loop. A way to resolve this is by using [nest-asyncio](https://pypi.org/project/nest-asyncio/).
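If you run into this in a notebook, a minimal sketch of the workaround looks like this, reusing the `load_first_three_months` coroutine from the example above:
```python Using nest-asyncio in a notebook
# pip install nest-asyncio
import asyncio

import nest_asyncio

nest_asyncio.apply()  # patch the already running event loop (e.g. in Jupyter)

# asyncio.run can now be nested inside the notebook's existing event loop
data_jan, data_feb, data_mar = asyncio.run(load_first_three_months())
```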
# Geometries
How geometries are handled in the Tilebox Python client.
Many datasets consist of granules that represent specific geographical areas on the Earth's surface. Often, a polygon defining the outline of these areas—a footprint—accompanies other granule metadata in time series datasets. Tilebox provides built-in support for working with geometries.
Here's an example that loads some granules from the `ERS SAR` Opendata dataset, which contains geometries.
```python Loading ERS data
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
ers_collection = datasets.open_data.asf.ers_sar.collection("ERS-2")
ers_data = ers_collection.load(("2008-02-10T21:00", "2008-02-10T22:00"))
```
## Shapely
In the `ers_data` dataset, each granule includes a `geometry` field that represents the footprint of each granule as a polygon. Tilebox automatically converts geometry fields to `Polygon` or `MultiPolygon` objects from the [Shapely](https://shapely.readthedocs.io/en/stable/manual.html) library. By integrating with Shapely, you can use the rich set of libraries and tools it provides. That includes support for computing polygon characteristics such as total area, intersection checks, and conversion to other formats.
```python Printing geometries
geometries = ers_data.geometry.values
print(geometries)
```
Each geometry is a [shapely.Geometry](https://shapely.readthedocs.io/en/stable/geometry.html#geometry).
```plaintext Output
[<POLYGON ((-150.753 74.25, -152.032 73.336, -149.184 73.002, -147.769 73.899, ...))>
 ...]
```
Geometries are not always [Polygon](https://shapely.readthedocs.io/en/stable/reference/shapely.Polygon.html#shapely.Polygon) objects. More complex footprint geometries are represented as [MultiPolygon](https://shapely.readthedocs.io/en/stable/reference/shapely.MultiPolygon.html#shapely.MultiPolygon) objects.
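If your code needs to handle both cases uniformly, a small helper like the following sketch can normalize any footprint into a list of polygons (`iter_polygons` is just an illustrative name):
```python Handling Polygon and MultiPolygon footprints
from shapely import MultiPolygon, Polygon

def iter_polygons(geometry: Polygon | MultiPolygon) -> list[Polygon]:
    """Return the individual polygons that make up a footprint geometry."""
    if isinstance(geometry, MultiPolygon):
        return list(geometry.geoms)
    return [geometry]

# works for both simple and multi-part footprints
for poly in iter_polygons(geometries[0]):
    print(poly.area)
```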
### Accessing Coordinates
You can select a polygon from the geometries and access the underlying coordinates and an automatically computed centroid point.
```python Accessing coordinates and computing a centroid point
polygon = geometries[0]
lon, lat = polygon.exterior.coords.xy
center, = list(polygon.centroid.coords)
print(lon)
print(lat)
print(center)
```
```plaintext Output
array('d', [-150.753244, -152.031574, -149.183655, -147.769339, -150.753244])
array('d', [74.250081, 73.336051, 73.001748, 73.899483, 74.250081])
(-149.92927907414239, 73.62538063474753)
```
Interactive environments such as [Jupyter Notebooks](/sdks/python/sample-notebooks) can visualize Polygon shapes graphically. Just type `polygon` in an empty cell and execute it for a visual representation of the polygon shape.
### Visualization on a Map
To visualize polygons on a map, you can use [Folium](https://pypi.org/project/folium/). Below is a helper function that produces an OpenStreetMap with the Polygon overlaid.
```python visualize helper function
# pip install folium
from collections.abc import Iterable

from folium import Figure, GeoJson, Map
from folium.plugins import MiniMap
from shapely import Polygon, to_geojson

def visualize(poly: Polygon | Iterable[Polygon], zoom=4):
    """Visualize a polygon or a list of polygons on a map"""
    if not isinstance(poly, Iterable):
        poly = [poly]
    poly = list(poly)
    fig = Figure(width=600, height=600)
    # center the map on the centroid of the middle polygon (note the lat, lon order flip)
    center = poly[len(poly) // 2].centroid.coords[0][::-1]
    map = Map(location=center, zoom_start=zoom, control_scale=True)
    map.add_child(MiniMap())
    fig.add_child(map)
    for p in poly:
        map.add_child(GeoJson(to_geojson(p)))
    return fig
```
Here's how to use it.
```python Visualizing a polygon
visualize(polygon)
```
The `visualize` helper function supports a list of polygons, which can display the data layout of the ERS granules.
```python Visualizing multiple polygons
visualize(geometries)
```
## Format conversion
Shapely supports converting Polygons to some common formats, such as [GeoJSON](https://geojson.org/) or [Well-Known Text (WKT)](https://docs.ogc.org/is/18-010r7/18-010r7.html).
```python Converting to GeoJSON
from shapely import to_geojson
print(to_geojson(polygon))
```
```plaintext Output
{"type":"Polygon","coordinates":[[[-150.753244,74.250081],[-152.031574,73.336051],[-149.183655,73.001748],[-147.769339,73.899483],[-150.753244,74.250081]]]}
```
```python Converting to WKT
from shapely import to_wkt
print(to_wkt(polygon))
```
```plaintext Output
POLYGON ((-150.753244 74.250081, -152.031574 73.336051, -149.183655 73.001748, -147.769339 73.899483, -150.753244 74.250081))
```
## Checking intersections
One common task when working with geometries is checking if a given geometry falls into a specific area of interest. Shapely provides an `intersects` method for this purpose.
```python Checking intersections
from shapely import box
# Box representing the rectangular area lon=(-160, -150) and lat=(69, 70)
area_of_interest = box(-160, 69, -150, 70)
for i, polygon in enumerate(geometries):
if area_of_interest.intersects(polygon):
print(f"{ers_data.granule_name[i].item()} intersects the area of interest!")
else:
print(f"{ers_data.granule_name[i].item()} doesn't intersect the area of interest!")
```
```plaintext Output
E2_66974_STD_F264 doesn't intersect the area of interest!
E2_66974_STD_F265 doesn't intersect the area of interest!
E2_66974_STD_F267 doesn't intersect the area of interest!
E2_66974_STD_F269 doesn't intersect the area of interest!
E2_66974_STD_F271 doesn't intersect the area of interest!
E2_66974_STD_F273 intersects the area of interest!
E2_66974_STD_F275 intersects the area of interest!
E2_66974_STD_F277 intersects the area of interest!
E2_66974_STD_F279 doesn't intersect the area of interest!
E2_66974_STD_F281 doesn't intersect the area of interest!
E2_66974_STD_F283 doesn't intersect the area of interest!
E2_66974_STD_F285 doesn't intersect the area of interest!
E2_66974_STD_F289 doesn't intersect the area of interest!
```
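One way to narrow the loaded dataset down to only the intersecting granules is to build a boolean mask from these checks and index the `time` dimension with it. Here's a sketch, assuming the `time` dimension indexes the granules as in the load output above:
```python Selecting only intersecting granules
import numpy as np

# True for every granule whose footprint intersects the area of interest
mask = np.array([area_of_interest.intersects(polygon) for polygon in geometries])

# keep only the matching granules in the xarray dataset
intersecting_granules = ers_data.isel(time=np.flatnonzero(mask))
print(intersecting_granules.granule_name.values)
```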
## Combining polygons
As shown in the visualization of the granule footprints, the granules collectively form an orbit from pole to pole. Measurements are often combined during processing. You can do the same with geometries by combining them into a single polygon, which represents the hull around all individual footprints using [shapely.unary\_union](https://shapely.readthedocs.io/en/stable/reference/shapely.unary_union.html).
```python Combining multiple polygons
from shapely.ops import unary_union
hull = unary_union(geometries)
visualize(hull)
```
The computed hull consists of two polygons due to a gap (probably a missing granule) in the geometries. Such geometries are represented as [Multi Polygons](#multi-polygons).
## Multi Polygons
A collection of one or more non-overlapping polygons combined into a single geometry is called a [MultiPolygon](https://shapely.readthedocs.io/en/latest/reference/shapely.MultiPolygon.html). Footprint geometries can be of type `MultiPolygon` due to gaps or pole discontinuities. The computed hull in the previous example is a `MultiPolygon`.
```python Accessing individual polygons of a MultiPolygon
print(f"The computed hull of type {type(hull).__name__} consists of {len(hull.geoms)} sub polygons")
for i, poly in enumerate(hull.geoms):
print(f"Sub polygon {i} has an area of {poly.area}")
```
```plaintext Output
The computed hull of type MultiPolygon consists of 2 sub polygons
Sub polygon 0 has an area of 2.025230449898011
Sub polygon 1 has an area of 24.389998081651527
```
## Antimeridian Crossings
A common issue with `longitude / latitude` geometries is crossings of the 180-degree meridian, or the antimeridian. For example, the coordinates of a `LineString` from Japan to the United States might look like this:
`140, 141, 142, ..., 179, 180, -179, -178, ..., -125, -124`
Libraries like Shapely are not designed to handle spherical coordinate systems, so caution is necessary with such geometries.
Here's an `ERS` granule demonstrating this issue.
```python Antimeridian Crossing
# A granule that crosses the antimeridian
granule = ers_collection.find("0119bb86-0260-5819-6aab-f99796417155")
polygon = granule.geometry.item()
print(polygon.exterior.coords.xy)
visualize(polygon)
```
```plaintext Output
array('d', [177.993407, 176.605009, 179.563047, -178.904076, 177.993407])
array('d', [74.983185, 74.074615, 73.727752, 74.61847, 74.983185])
```
This 2D visualization appears incorrect. Both the visualization and any calculations performed may yield inaccurate results. For instance, testing whether the granule intersects the 0-degree meridian provides a false positive.
```python Problems with calculating intersections
from shapely import LineString
null_meridian = LineString([(0, -90), (0, 90)])
print(polygon.intersects(null_meridian)) # True - but this is incorrect!
```
The GeoJSON specification offers a solution for this problem. In the section [Antimeridian Cutting](https://datatracker.ietf.org/doc/html/rfc7946#section-3.1.9), it suggests always cutting lines and polygons into two parts—one for the eastern hemisphere and one for the western hemisphere.
In Python, this can be achieved using the [antimeridian](https://pypi.org/project/antimeridian/) package.
```python Cutting the polygon along the antimeridian
# pip install antimeridian
import antimeridian
fixed_polygon = antimeridian.fix_polygon(polygon)
visualize(fixed_polygon)
print(fixed_polygon.intersects(null_meridian)) # False - this is correct now
```
Since Shapely is unaware of the spherical nature of this data, the **centroid** of the fixed polygon **is still incorrect**. The antimeridian package also includes a function to correct this.
```python Calculating the centroid of a cut polygon crossing the antimeridian
print("Wrongly computed centroid coordinates (Shapely)")
print(list(fixed_polygon.centroid.coords))
print("Correct centroid coordinates (Antimeridian taken into account)")
print(list(antimeridian.centroid(fixed_polygon).coords))
```
```plaintext Output
Wrongly computed centroid coordinates (shapely)
[(139.8766350146937, 74.3747116658462)]
Correct centroid coordinates (antimeridian taken into account)
[(178.7782777050171, 74.3747116658462)]
```
## Spherical Geometry
Another approach to handle the antimeridian issue is performing all coordinate-related calculations, such as polygon intersections, in a [spherical coordinate system](https://en.wikipedia.org/wiki/Spherical_coordinate_system).
One useful library for this is [spherical\_geometry](https://spherical-geometry.readthedocs.io/en/latest/). Here's an example.
```python Spherical Geometry
# pip install spherical-geometry
from spherical_geometry.polygon import SphericalPolygon
from spherical_geometry.vector import lonlat_to_vector
lon, lat = polygon.exterior.coords.xy
spherical_poly = SphericalPolygon.from_lonlat(lon, lat)
# Let's check the x, y, z coordinates of the spherical polygon:
print(list(spherical_poly.points))
```
```plaintext Output
[array([[-0.25894363, 0.00907234, 0.96584983],
[-0.2651968 , -0.00507317, 0.96418096],
[-0.28019363, 0.00213687, 0.95994112],
[-0.27390375, 0.01624885, 0.96161984],
[-0.25894363, 0.00907234, 0.96584983]])]
```
Now, you can compute intersections or check if a particular point is within the polygon. You can compare the incorrect calculation using `shapely` with the correct version when using `spherical_geometry`.
```python Correct calculations using spherical geometry
from shapely import Point

# A point on the null-meridian, way off from our polygon
null_meridian_point = 0, 74.4
# A point actually inside our polygon
point_inside = 178.8, 74.4
print("Shapely results:")
print("- Null meridian point inside:", polygon.contains(Point(*null_meridian_point)))
print("- Actual inside point inside:", polygon.contains(Point(*point_inside)))
print("Spherical geometry results:")
print("- Null meridian point inside:", spherical_poly.contains_lonlat(*null_meridian_point))
print("- Actual inside point inside:", spherical_poly.contains_lonlat(*point_inside))
```
```plaintext Output
Shapely results:
- Null meridian point inside: True
- Actual inside point inside: False
Spherical geometry results:
- Null meridian point inside: False
- Actual inside point inside: True
```
# Installation
Install the Tilebox Python Packages
## Package Overview
Tilebox offers a Python SDK for accessing Tilebox services. The SDK includes separate packages that can be installed individually based on the services you wish to use, or all together for a comprehensive experience.
* `tilebox-datasets`: Access Tilebox datasets from Python
* `tilebox-workflows`: Workflow client and task runner for Tilebox
## Installation
Install the Tilebox python packages using your preferred package manager:
```bash pip
pip install tilebox-datasets tilebox-workflows tilebox-storage
```
```bash uv
uv add tilebox-datasets tilebox-workflows tilebox-storage
```
```bash poetry
poetry add tilebox-datasets="*" tilebox-workflows="*" tilebox-storage="*"
```
```bash pipenv
pipenv install tilebox-datasets tilebox-workflows tilebox-storage
```
## Setting up a local JupyterLab environment
To get started quickly, you can also use an existing Jupyter-compatible cloud environment such as [Google Colab](https://colab.research.google.com/).
If you want to set up a local Jupyter environment to explore the SDK or to run the [Sample notebooks](/sdks/python/sample-notebooks) locally, install [JupyterLab](https://github.com/jupyterlab/jupyterlab) for a browser-based development environment. It's advised to install the Tilebox packages along with JupyterLab, [ipywidgets](https://ipywidgets.readthedocs.io/en/latest/user_install.html), and [tqdm](https://tqdm.github.io/) for an enhanced experience.
```bash uv
mkdir tilebox-exploration
cd tilebox-exploration
uv init --no-package
uv add tilebox-datasets tilebox-workflows tilebox-storage
uv add jupyterlab ipywidgets tqdm
uv run jupyter lab
```
### Trying it out
After installation, create a new notebook and paste the following code snippet to verify your installation. If you're new to Jupyter, you can refer to the guide on [interactive environments](/sdks/python/sample-notebooks#interactive-environments).
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
collection = datasets.open_data.copernicus.landsat8_oli_tirs.collection("L1T")
data = collection.load(("2015-01-01", "2020-01-01"), show_progress=True)
data
```
If the installation is successful, the output shows the loaded data as an `xarray.Dataset`.
# Sample notebooks
Sample code maintained to use and learn from.
To quickly become familiar with the Python client, you can explore some sample notebooks. Each notebook can be executed standalone from top to bottom.
## Sample notebooks
You can access the sample notebooks on [Google Drive](https://drive.google.com/drive/folders/1I7G35LLeB2SmKQJFsyhpSVNyZK9uOeJt).
Right click a notebook in Google Drive and select `Open with -> Google Colaboratory` to open it directly in the browser using [Google Colab](https://colab.research.google.com/).
### Notebook overview
This notebook demonstrates how to use the Python client to query metadata from the ERS-SAR Opendata dataset. It shows how to filter results by geographical location and download product data for a specific granule.
[Open in Colab](https://colab.research.google.com/drive/1LTYhLKy8m9psMhu0DvANs7hyS3ViVpas)
This notebook illustrates how to use the Python client to query the S5P Tropomi Opendata dataset for methane products. It also explains how to filter results based on geographical location and download product data for a specific granule.
[Open in Colab](https://colab.research.google.com/drive/1eVYARNFnTIeQqBs6gqeay01EDvRk2EI4)
Execute cells one by one using `Shift+Enter`. Most commonly used libraries are pre-installed.
All demo notebooks require Python 3.10 or higher.
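A quick way to verify the Python version of your notebook environment is to run the following in a cell:
```python Checking the Python version
import sys

print(sys.version)
assert sys.version_info >= (3, 10), "The sample notebooks require Python 3.10 or higher"
```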
## Interactive environments
Jupyter, Google Colab, and JetBrains Datalore are interactive environments that simplify the development and sharing of algorithmic code. They allow users to work with notebooks, which combine code and rich text elements like figures, links, and equations. Notebooks require no setup and can be easily shared.
[Jupyter notebooks](https://jupyter.org/) are the original interactive environment for Python. They are useful but require local installation.
[Google Colab](https://colab.research.google.com/) is a free tool that provides a hosted interactive Python environment. It easily connects to local Jupyter instances and allows code sharing using Google credentials or within organizations using Google Workspace.
[JetBrains Datalore](https://datalore.jetbrains.com/) is a free platform for collaborative testing, development, and sharing of Python code and algorithms. It has built-in secret management for storing credentials. Datalore also features advanced JetBrains syntax highlighting and autocompletion. Currently, it only supports Python 3.8, which is not compatible with the Tilebox Python client.
Since Colab is a hosted free tool that meets all requirements, including Python ≥3.10, it's recommended for use.
## Installing packages
Within your interactive environment, you can install missing packages using pip in "magic" cells, which start with an exclamation mark.
```bash
# pip is already installed in your interactive environment
!pip3 install ....
```
All APIs or commands that require authentication can be accessed through client libraries that hide tokens, allowing notebooks to be shared without exposing personal credentials.
## Executing code
Execute code by clicking the play button in the top left corner of the cell or by pressing `Shift + Enter`. While the code is running, a spinning icon appears. When the execution finishes, the icon changes to a number, indicating the order of execution. The output displays below the code.
## Authorization
When sharing notebooks, avoid directly sharing your Tilebox API key. Instead, use one of two methods to authenticate the Tilebox Python client in interactive environments: through environment variables or interactively.
```python Using environment variables to store your API key
# Define an environment variable "TILEBOX_API_KEY" that contains your API key
import os
token = os.getenv("TILEBOX_API_KEY")
```
**Interactive** authorization is possible using the built-in `getpass` module. This prompts the user for the API key when running the code, storing it in memory without sharing it when the notebook is shared.
```python Interactively providing your API key
from getpass import getpass
token = getpass("API key:")
```
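With either approach, the resulting `token` variable can then be passed to the Tilebox client when instantiating it:
```python Authenticating the client with the token
from tilebox.datasets import Client

client = Client(token=token)
datasets = client.datasets()
```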
# Xarray
Overview of the Xarray library, common use cases, and implementation details.
[example_satellite_data.nc]: https://github.com/tilebox/docs/raw/main/assets/data/example_satellite_data.nc
[Xarray](https://xarray.dev/) is a library designed for working with labeled multi-dimensional arrays. Built on top of [NumPy](https://numpy.org/) and [Pandas](https://pandas.pydata.org/), Xarray adds labels in the form of dimensions, coordinates, and attributes, enhancing the usability of raw NumPy-like arrays. This enables a more intuitive, concise, and less error-prone development experience. The library also includes a large and expanding collection of functions for advanced analytics and visualization.
An overview of the Xarray library and its suitability for N-dimensional data (such as Tilebox time series datasets) is available in the official [Why Xarray? documentation page](https://xarray.pydata.org/en/stable/why-xarray.html).
The Tilebox Python client provides access to satellite data as an [xarray.Dataset](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.html#xarray.Dataset). This approach offers a great number of benefits over custom Tilebox-specific data structures:
Xarray is based on NumPy and Pandas—two of the most widely used Python libraries for scientific computing. Familiarity with these libraries translates well to using Xarray.
Leveraging NumPy, which is built on C and Fortran, Xarray benefits from extensive performance optimizations. This allows Xarray to efficiently handle large datasets.
As a widely used library, Xarray easily integrates with many other libraries. Many third-party libraries are also available to expand Xarray's capabilities for diverse use cases.
Xarray is versatile and supports a broad range of applications. It's also easy to extend with custom features.
## Example dataset
To understand how Xarray functions, below is a quick look at a sample dataset as it might be retrieved from a [Tilebox datasets](/datasets/timeseries) client.
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
collection = datasets.open_data.copernicus.landsat8_oli_tirs.collection("L1GT")
satellite_data = collection.load(("2022-05-01", "2022-06-01"), show_progress=True)
print(satellite_data)
```
```plaintext Output
<xarray.Dataset> Size: 305kB
Dimensions:         (time: 514, latlon: 2)
Coordinates:
    ingestion_time  (time) datetime64[ns] 4kB 2024-07-22T09:06:43.5586...
    id              (time) ...
  * time            (time) datetime64[ns] ...
Data variables:
    ...
```
This basic dataset illustrates common use cases for Xarray. To follow along, you can [download the dataset as a NetCDF file][example_satellite_data.nc]. The [Reading and writing files section](/sdks/python/xarray#reading-and-writing-files) explains how to save and load Xarray datasets from NetCDF files.
Here's an overview of the output:
* The `satellite_data` **dataset** contains **dimensions**, **coordinates**, and **variables**.
* The `time` **dimension** has 514 elements, indicating that there are 514 data points in the dataset.
* The `time` **dimension coordinate** contains datetime values representing when the data was measured. The `*` indicates a dimension coordinate, which enables label-based indexing and alignment.
* The `ingestion_time` **non-dimension coordinate** holds datetime values for when the data was ingested into Tilebox. Non-dimension coordinates carry coordinate data but are not used for label-based indexing. They can even be [multidimensional](https://docs.xarray.dev/en/stable/examples/multidimensional-coords.html).
* The dataset includes 28 **variables**.
* The `bands` **variable** contains integers indicating how many bands the data contains.
* The `sun_elevation` **variable** contains floating-point values representing the sun's elevation when the data was measured.
Explore the [xarray terminology overview](https://docs.xarray.dev/en/stable/user-guide/terminology.html) to broaden your understanding of **datasets**, **dimensions**, **coordinates**, and **variables**.
## Accessing data in a dataset
### By index
You can access data in different ways. The Xarray documentation offers a [comprehensive overview](https://docs.xarray.dev/en/stable/user-guide/indexing.html) of these methods.
To access the `sun_elevation` variable:
```python Accessing values
# Print the first sun elevation value
print(satellite_data.sun_elevation[0])
```
```plaintext Output
<xarray.DataArray 'sun_elevation' ()> Size: 8B
array(44.19904463)
Coordinates:
    ingestion_time  datetime64[ns] 8B 2024-07-22T09:06:43.558629
    id              ...
    ...
```
### Filtering
You can filter a dataset by a boolean condition on one of its variables. For example, to select all data points with a sun elevation between 45 and 90 degrees:
```python Filtering by sun elevation
data_filter = (
    (satellite_data.sun_elevation > 45) &
    (satellite_data.sun_elevation < 90)
)
filtered_sun_elevations = satellite_data.sun_elevation[data_filter]
print(filtered_sun_elevations)
```
```plaintext Output
<xarray.DataArray 'sun_elevation' (time: 27)> Size: 216B
array([63.89629314, 63.35038654, ..., 64.37400345, 64.37400345])
Coordinates:
    ingestion_time  (time) datetime64[ns] 216B 2024-07-22T09:06:43.558629 ...
    id              (time) ...
    ...
```
### Selecting data by value
You can also select data points by the value of a dimension coordinate, such as a specific timestamp.
Selecting data by value requires unique coordinate values. In case of duplicates, you will encounter an `InvalidIndexError`. To avoid this, you can [drop duplicates](#dropping-duplicates).
```python Indexing by time
specific_datapoint = satellite_data.sel(time="2022-05-01T11:28:28.249000")
print(specific_datapoint)
```
```plaintext Output
<xarray.Dataset> Size: 665B
Dimensions:         (latlon: 2)
Coordinates:
    ingestion_time  datetime64[ns] 8B 2024-07-22T09:06:43.558629
    id              ...
    ...
```
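Selecting a timestamp that has no exact match in the dataset raises a `KeyError`. For example, the following sketch uses a timestamp that differs from the existing datapoint by a fraction of a second:
```python Selecting a non-existing value
# 11:28:28.000000 differs from the existing datapoint at 11:28:28.249000
satellite_data.sel(time="2022-05-01T11:28:28.000000")
# >>> raises KeyError: "2022-05-01T11:28:28.000000"
```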
To return the closest value instead of raising an error, specify a `method`.
```python Finding the closest data point
nearest_datapoint = satellite_data.sel(time="2022-05-01T11:28:28.000000", method="nearest")
assert nearest_datapoint.equals(specific_datapoint) # passes
```
## Dropping duplicates
Xarray allows you to drop duplicate values from a dataset. For example, to drop duplicate timestamps:
```python Dropping duplicates
deduped = satellite_data.drop_duplicates("time")
```
De-duped datasets are required for certain operations, like [selecting data by value](#selecting-data-by-value).
## Statistics
Xarray and NumPy include a wide range of statistical functions that you can apply to a dataset or DataArray. Here are some examples:
```python Computing dataset statistics
cloud_cover = satellite_data.cloud_cover
min_meas = cloud_cover.min().item()
max_meas = cloud_cover.max().item()
mean_meas = cloud_cover.mean().item()
std_meas = cloud_cover.std().item()
print(f"Cloud cover ranges from {min_meas:.2f} to {max_meas:.2f} with a mean of {mean_meas:.2f} and a standard deviation of {std_meas:.2f}")
```
```plaintext Output
Cloud cover ranges from 0.00 to 100.00 with a mean of 76.48 and a standard deviation of 34.17
```
You can also directly apply many NumPy functions to datasets or DataArrays. For example, to find out how many unique bands the data contains, use [np.unique](https://numpy.org/doc/stable/reference/generated/numpy.unique.html):
```python Finding unique values
import numpy as np
print("Sensors:", np.unique(satellite_data.bands))
```
```plaintext Output
Bands: [12]
```
## Reading and writing files
Xarray provides a simple method for saving and loading datasets from files. This is useful for sharing your data or storing it for future use. Xarray supports many different file formats, including NetCDF, Zarr, GRIB, and more. For a complete list of supported formats, refer to the [official documentation page](https://docs.xarray.dev/en/stable/user-guide/io.html).
To save the example dataset as a NetCDF file:
You may need to install the `netcdf4` package first.
```python Saving a dataset to a file
satellite_data.to_netcdf("example_satellite_data.nc")
```
This creates a file named `example_satellite_data.nc` in your current directory. You can then load this file back into memory with:
```python Loading a dataset from a file
import xarray as xr
satellite_data = xr.open_dataset("example_satellite_data.nc")
```
If you would like to follow along with the examples in this section, you can download the example dataset as a NetCDF file [here][example_satellite_data.nc].
## Further reading
This section covers only a few common use cases for Xarray. The library offers many more functions and features. For more information, please see the [Xarray documentation](https://xarray.pydata.org/en/stable/) or explore the [Xarray Tutorials](https://tutorial.xarray.dev/intro.html).
Some useful capabilities not covered in this section include grouping, resampling, interpolation, and plotting.
# Caches
Sharing data between tasks is crucial for workflows, especially in satellite imagery processing, where large datasets are the norm. Tilebox Workflows offers a straightforward API for storing and retrieving data from a shared cache.
The cache API is currently experimental and may undergo changes in the future. Many more features and new [backends](#cache-backends) are on the roadmap. There might be breaking changes to the Cache API in the future.
Caches are configured at the [task runner](/workflows/concepts/task-runners) level. Because task runners can be deployed across multiple locations, caches must be accessible from all task runners contributing to a workflow.
Currently, the default cache implementation uses a Google Cloud Storage bucket, providing a scalable method to share data between tasks. For quick prototyping and local development, you can also use a local file system cache, which is included by default.
If needed, you can create your own cache backend by implementing the `Cache` interface.
## Configuring a Cache
You can configure a cache while creating a task runner by passing a cache instance to the `cache` parameter. To use an in-memory cache, use `tilebox.workflows.cache.InMemoryCache`. This implementation is helpful for local development and quick testing. For alternatives, see the supported [cache backends](#cache-backends).
```python Python
from tilebox.workflows import Client
from tilebox.workflows.cache import InMemoryCache
client = Client()
runner = client.runner(
"dev-cluster",
tasks=[...],
cache=InMemoryCache(),
)
```
By configuring such a cache, the `context` object that is passed to the execution of each task gains access to a `job_cache`
attribute that can be used to [store and retrieve data](#storing-and-retrieving-data) from the cache.
## Cache Backends
Tilebox Workflows comes with four cache implementations out of the box, each backed by a different storage system.
### Google Storage Cache
A cache implementation backed by a Google Cloud Storage bucket to store and retrieve data. It's a reliable method for sharing data across tasks, suitable for production environments. You will need access to a GCP project and a bucket.
The Tilebox Workflow orchestrator uses the official Python client for Google Cloud Storage. To set up authentication, refer to the official documentation.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.cache import GoogleStorageCache
from google.cloud.storage import Client as StorageClient
storage_client = StorageClient(project="gcp-project")
bucket = storage_client.bucket("cache-bucket")
client = Client()
runner = client.runner(
"dev-cluster",
tasks=[...],
cache=GoogleStorageCache(bucket, prefix="jobs"),
)
```
The `prefix` parameter is optional and can be used to set a common prefix for all cache keys, which helps organize objects within a bucket when re-using the same bucket for other purposes.
### Amazon S3 Cache
A cache implementation backed by an Amazon S3 bucket to store and retrieve data. Like the Google Cloud Storage option, it's reliable and scalable for production use.
Tilebox utilizes the `boto3` library to communicate with Amazon S3. For the necessary authentication setup, refer to [its documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration).
```python Python
from tilebox.workflows import Client
from tilebox.workflows.cache import AmazonS3Cache
client = Client()
runner = client.runner(
"dev-cluster",
tasks=[...],
cache=AmazonS3Cache("my-bucket-name", prefix="jobs")
)
```
The `prefix` parameter is optional and can be used to set a common prefix for all cache keys, which helps organize objects within a bucket when re-using the same bucket for other purposes.
### Local File System Cache
A cache implementation backed by a local file system. It's suitable for quick prototyping and local development, assuming all task runners share the same machine or access the same file system.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.cache import LocalFileSystemCache
client = Client()
runner = client.runner(
"dev-cluster",
tasks=[...],
cache=LocalFileSystemCache("/path/to/cache/directory"),
)
```
Local file system caches can also be used with network-attached storage or similar shared file systems, making them a viable option for distributed setups as well.
### In-Memory Cache
A simple in-memory cache useful for quick prototyping and development. The data is not shared between task runners and is lost upon task runner restarts. Use this cache only for workflows executed on a single task runner.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.cache import InMemoryCache
client = Client()
runner = client.runner(
"dev-cluster",
tasks=[...],
cache=InMemoryCache(),
)
```
## Data Isolation
Caches are isolated per job, meaning that each job's cache data is only accessible to tasks within that job. This setup prevents key conflicts between different job executions. Currently, accessing cache data of other jobs is not supported.
The capability to share data across jobs is planned for future updates. This feature will be beneficial for real-time processing workflows or workflows requiring auxiliary data from external sources.
## Storing and Retrieving Data
The job cache can be accessed via the `ExecutionContext` passed to a tasks `execute` function. This [`job_cache`](/api-reference/tilebox.workflows/ExecutionContext.job_cache) object provides methods to handle data storage and retrieval from the cache. The specifics of data storage depend on the chosen cache backend.
The cache API is designed to be simple and can handle all types of data, supporting binary data in the form of `bytes`, identified by `str` cache keys. This allows for storing many different data types, such as pickled Python objects, serialized JSON, UTF-8, or binary data.
The following snippet illustrates storing and retrieving data from the cache.
```python Python
from tilebox.workflows import Task, ExecutionContext
class ProducerTask(Task):
def execute(self, context: ExecutionContext) -> None:
# store data in the cache
context.job_cache["data"] = b"my_binary_data_to_store"
context.submit_subtask(ConsumerTask())
class ConsumerTask(Task):
def execute(self, context: ExecutionContext) -> None:
data = context.job_cache["data"]
print(f"Read {data} from cache")
```
In this example, data stored under the key `"data"` can be any size that fits the cache backend constraints. Ensure the key remains unique within the job's scope to avoid conflicts.
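Since the cache stores raw bytes, structured data must be serialized before storing and deserialized after reading. Below is a minimal sketch using JSON; the `StoreConfig` and `ReadConfig` tasks are hypothetical:
```python Storing JSON data in the cache
import json

from tilebox.workflows import Task, ExecutionContext

class StoreConfig(Task):
    def execute(self, context: ExecutionContext) -> None:
        config = {"resolution": 10, "bands": ["B04", "B08"]}
        # serialize the dict to bytes before storing it in the cache
        context.job_cache["config"] = json.dumps(config).encode()
        context.submit_subtask(ReadConfig())

class ReadConfig(Task):
    def execute(self, context: ExecutionContext) -> None:
        # read the bytes back and deserialize them into a dict
        config = json.loads(context.job_cache["config"].decode())
        print(f"Processing bands {config['bands']} at {config['resolution']}m resolution")
```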
To test the workflow, you can start a local task runner that uses a `LocalFileSystemCache`. Then, submit a job to execute the `ProducerTask` and observe the output of the `ConsumerTask`.
```python Python
# submit a job to test our workflow
job_client = client.jobs()
job_client.submit("testing-cache-access", ProducerTask(), cluster="dev-cluster")
# start a runner to execute it
runner = client.runner(
"dev-cluster",
tasks=[ProducerTask, ConsumerTask],
cache=LocalFileSystemCache("/path/to/cache/directory"),
)
runner.run_forever()
```
```plaintext Output
Read b'my_binary_data_to_store' from cache
```
## Groups and Hierarchical Keys
The cache API supports groups and hierarchical keys, analogous to directories and files in a file system. Groups help organize cache keys hierarchically, preventing key conflicts and allowing data to be structured better. Additionally, groups are iterable, enabling retrieval of all keys within the group. This feature is useful when multiple tasks create cache data, and a later task needs to list all produced data by earlier tasks.
The following code shows an example of how cache groups can be used.
```python Python {39,44-45}
from tilebox.workflows import Task, ExecutionContext
import random
class CacheGroupDemoWorkflow(Task):
n: int
def execute(self, context: ExecutionContext):
# define a cache group key for subtasks
group_key = "random_numbers"
produce_numbers = context.submit_subtask(
ProduceRandomNumbers(self.n, group_key)
)
sum_task = context.submit_subtask(
PrintSum(group_key),
depends_on=[produce_numbers],
)
class ProduceRandomNumbers(Task):
n: int
group_key: str
def execute(self, context: ExecutionContext):
for i in range(self.n):
context.submit_subtask(ProduceRandomNumber(i, self.group_key))
class ProduceRandomNumber(Task):
index: int
group_key: str
def execute(self, context: ExecutionContext) -> None:
number = random.randint(0, 100)
group = context.job_cache.group(self.group_key)
group[f"key_{self.index}"] = str(number).encode()
class PrintSum(Task):
group_key: str
def execute(self, context: ExecutionContext) -> None:
group = context.job_cache.group(self.group_key)
# PrintSum doesn't know how many numbers were produced,
# instead it iterates over all keys in the cache group
numbers = []
for key in group: # iterate over all stored numbers
number = group[key] # read data from cache
numbers.append(int(number.decode())) # convert bytes back to int
print(f"Sum of all numbers: {sum(numbers)}")
```
Submitting a job of the `CacheGroupDemoWorkflow` and running it with a task runner can be done as follows:
```python Python
# submit a job to test our workflow
job_client = client.jobs()
job_client.submit("cache-groups", CacheGroupDemoWorkflow(5), cluster="dev-cluster")
# start a runner to execute it
runner = client.runner(
"dev-cluster",
tasks=[CacheGroupDemoWorkflow, ProduceRandomNumbers, ProduceRandomNumber, PrintSum],
cache=LocalFileSystemCache("/path/to/cache/directory"),
)
runner.run_forever()
```
```plaintext Output
Sum of all numbers: 284
```
# Clusters
Clusters are a logical grouping for [task runners](/workflows/concepts/task-runners). Using clusters, you can scope certain tasks to a specific group of task runners. Tasks, which are always submitted to a specific cluster, are only executed on task runners assigned to the same cluster.
## Use Cases
Use clusters to organize [task runners](/workflows/concepts/task-runners) into logical groups, which can help with:
* Targeting specific task runners for a particular job
* Reserving a group of task runners for specific purposes, such as running certain types of batch jobs
* Setting up different clusters for different environments (like development and production)
Even within the same cluster, task runners may have different capabilities, such as different registered tasks.
If multiple task runners have the same set of registered tasks, you can assign them to different clusters to target specific task runners for a particular job.
### Adding Task Runners to a Cluster
You can add task runners to a cluster by specifying the [cluster's slug](#cluster-slug) when [registering a task runner](/workflows/concepts/task-runners).
Each task runner must always be assigned to a cluster.
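For example, a task runner could be assigned to a specific cluster like this (the slug and the tasks are illustrative):
```python Assigning a task runner to a cluster
from tilebox.workflows import Client
# your own workflow tasks
from my_workflow import MyTask, OtherTask

client = Client()
runner = client.runner(
    "testing-CvufcSxcC9SKfe",  # the slug of the cluster this runner should join
    tasks=[MyTask, OtherTask],
)
runner.run_forever()
```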
## Managing Clusters
Before registering a task runner or submitting a job, you must create a cluster. You can also list, fetch, and delete clusters as needed. The following sections explain how to do this.
To manage clusters, first instantiate a cluster client using the `clusters` method in the workflows client.
```python Python
from tilebox.workflows import Client
client = Client()
clusters = client.clusters()
```
### Creating a Cluster
To create a cluster, use the `create` method on the cluster client and provide a name for the cluster.
```python Python
cluster = clusters.create("testing")
print(cluster)
```
```plaintext Output
Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing')
```
### Cluster Slug
Each cluster has a unique identifier, combining the cluster's name and an automatically generated identifier. Use this slug to reference the cluster for other operations, like submitting a job or subtasks.
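For example, the slug of a newly created cluster can be passed along when submitting a job. Below is a sketch assuming the `cluster` object from above and a hypothetical `MyTask`:
```python Submitting a job to a specific cluster
from my_workflow import MyTask  # hypothetical task

job_client = client.jobs()
# cluster.slug is the unique identifier shown in the output above
job = job_client.submit("my-job", MyTask(), cluster=cluster.slug)
```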
### Listing Clusters
To list all available clusters, use the `all` method:
```python Python
all_clusters = clusters.all()
print(all_clusters)
```
```plaintext Output
[Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing'),
Cluster(slug='production-EifhUozDpwAJDL', display_name='Production')]
```
### Fetching a Specific Cluster
To fetch a specific cluster, use the `find` method and pass the cluster's slug:
```python Python
cluster = clusters.find("testing-CvufcSxcC9SKfe")
print(cluster)
```
```plaintext Output
Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing')
```
### Deleting a Cluster
To delete a cluster, use the `delete` method and pass the cluster's slug:
```python Python
clusters.delete("testing-CvufcSxcC9SKfe")
```
## Jobs Across Different Clusters
When [submitting a job](/workflows/concepts/jobs), you need to specify which cluster the job's root task should be executed on. This allows you to direct the job to a specific set of task runners. By default, all sub-tasks within a job are also submitted to the same cluster, but this can be overridden to submit sub-tasks to different clusters if needed. See the example below for a job that spans across multiple clusters.
```python Python
from tilebox.workflows import Task, ExecutionContext, Client
class MultiClusterWorkflow(Task):
def execute(self, context: ExecutionContext) -> None:
# this submits a task to the same cluster as the one currently executing this task
same_cluster = context.submit_subtask(DummyTask())
other_cluster = context.submit_subtask(
DummyTask(),
# this task runs only on a task runner in the "other-cluster" cluster
cluster="other-cluster-As3dcSb3D9SAdK",
# dependencies can be specified across clusters
depends_on=[same_cluster],
)
class DummyTask(Task):
def execute(self, context: ExecutionContext) -> None:
pass
# submit a job to the "testing" cluster
client = Client()
job_client = client.jobs()
job = job_client.submit(
MultiClusterWorkflow(),
cluster="testing-CvufcSxcC9SKfe",
)
```
This workflow requires at least two task runners to complete. One must be in the "testing" cluster, and the other must be in the "other-cluster" cluster. If no task runners are available in the "other-cluster," the task submitted to that cluster will remain queued until a task runner is available. It won't execute on a task runner in the "testing" cluster, even if the task runner has the `DummyTask` registered.
# Jobs
A job is a specific execution of a workflow with designated input parameters. It consists of one or more tasks that can run in parallel or sequentially, based on their dependencies. Submitting a job involves creating a root task with specific input parameters, which may trigger the execution of other tasks within the same job.
## Submission
To execute a [task](/workflows/concepts/tasks), it must be initialized with concrete inputs and submitted as a job. The task will then run within the context of the job, and if it generates sub-tasks, those will also execute as part of the same job.
After submitting a job, the root task is scheduled for execution, and any [eligible task runner](/workflows/concepts/task-runners#task-selection) can pick it up and execute it.
First, instantiate a job client by calling the `jobs` method on the workflow client.
```python Python
from tilebox.workflows import Client
client = Client()
job_client = client.jobs()
```
After obtaining a job client, submit a job using the [submit](/api-reference/tilebox.workflows/JobClient.submit) method. You need to provide a name for the job, an instance of the root [task](/workflows/concepts/tasks), and a [cluster](/workflows/concepts/clusters) to execute the root task on.
```python Python
# import your own workflow
from my_workflow import MyTask
cluster = "dev-cluster"
job = job_client.submit('my-job', MyTask("some", "parameters"), cluster)
```
Once a job is submitted, it's immediately scheduled for execution. The root task will be picked up and executed as soon as an [eligible task runner](/workflows/concepts/task-runners#task-selection) is available.
## Retry Handling
[Tasks support retry handling](/workflows/concepts/tasks#retry-handling) for failed executions. This applies to the root task of a job as well, where you can specify the number of retries using the `max_retries` argument of the `submit` method.
```python Python
from my_workflow import MyFlakyTask
cluster = "dev-cluster"
job = job_client.submit('my-job', MyFlakyTask(), cluster, max_retries=5)
```
In this example, if `MyFlakyTask` fails, it will be retried up to five times before being marked as failed.
## Retrieving a specific job
When you submit a job, it's assigned a unique identifier that can be used to retrieve it later.
You can use the `find` method on the job client to get a job by its ID.
```python Python
job = job_client.submit('my-job', MyTask("some", "parameters"), cluster)
print(job.id) # 018dd029-58ca-74e5-8b58-b4f99d610f9a
# Later, in another process or machine, retrieve job info
job = job_client.find("018dd029-58ca-74e5-8b58-b4f99d610f9a")
```
`find` is also a useful tool for fetching a job's state after some time, to check whether it's still running or has already completed.
## Visualization
Visualizing the execution of a job can be helpful. The Tilebox workflow orchestrator tracks all tasks in a job, including [sub-tasks](/workflows/concepts/tasks#task-composition-and-subtasks) and [dependencies](/workflows/concepts/tasks#dependencies). This enables the visualization of the execution of a job as a graph diagram.
`display` is designed for use in an [interactive environment](/sdks/python/sample-notebooks#interactive-environments) such as a Jupyter notebook. In non-interactive environments, use [visualize](/api-reference/tilebox.workflows/JobClient.visualize), which returns the rendered diagram as an SVG string.
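In a non-interactive environment, a minimal sketch for saving the rendered diagram to a file could look like this:
```python Saving the diagram as an SVG file
from pathlib import Path

svg = job_client.visualize(job)  # returns the diagram as an SVG string
Path("job_diagram.svg").write_text(svg)
```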
```python Python
job = job_client.find("some-job-id") # or a recently submitted job
# Then visualize it
job_client.display(job)
```
The following diagram represents the job execution as a graph. Each task is shown as a node, with edges indicating sub-task relationships. The diagram also uses color coding to display the status of each task.
The color codes for task states are:
| Task State | Color        | Description                                                                                                                                   |
| ---------- | ------------ | --------------------------------------------------------------------------------------------------------------------------------------------- |
| Queued     | SalmonYellow | The task is queued and waiting for execution.                                                                                                 |
| Running    | Blue         | The task is currently being executed.                                                                                                         |
| Computed   | Green        | The task has successfully been computed. If a task is computed and all of its sub-tasks are also computed, the task is considered completed.  |
| Failed     | Red          | The task has been executed but encountered an error.                                                                                          |
Below is another visualization of a job currently being executed by multiple task runners.
This visualization shows:
* The root task, `MyTask`, has executed and spawned three sub-tasks.
* At least three task runners are available, as three tasks currently are executed simultaneously.
* The `SubTask` that is still executing has not generated any sub-tasks yet, as sub-tasks are queued for execution only after the parent task finishes and becomes computed.
* The queued `DependentTask` requires the `LeafTask` to complete before it can be executed.
Job visualizations are meant for development and debugging. They are not suitable for large jobs with hundreds of tasks, as the diagrams may become too complex. Currently, visualizations are limited to jobs with a maximum of 200 tasks.
### Customizing Task Display Names
The text representing a task in the diagram defaults to the task's class name. You can customize it by setting the `display` field of the `current_task` object in the task's execution context. The maximum length for a display name is 1024 characters; any overflow is truncated. Line breaks using `\n` are supported as well.
```python Python
from tilebox.workflows import Task, ExecutionContext
class RootTask(Task):
num_subtasks: int
def execute(self, context: ExecutionContext):
context.current_task.display = f"Root({self.num_subtasks})"
for i in range(self.num_subtasks):
context.submit_subtask(SubTask(i))
class SubTask(Task):
index: int
def execute(self, context: ExecutionContext):
context.current_task.display = f"Leaf Nr. {self.index}"
job = job_client.submit('custom-display-names', RootTask(3), "dev-cluster")
job_client.display(job)
```
## Cancellation
You can cancel a job at any time. When a job is canceled, its queued tasks will no longer be picked up by task runners, even if runners are idle. Tasks that are already being executed will finish and are not interrupted. Sub-tasks spawned by such tasks after the cancellation will not be picked up either.
Use the `cancel` method on the job client to cancel a job.
```python Python
job = job_client.submit('my-job', MyTask(), "dev-cluster")
# After a short while, the job gets canceled
job_client.cancel(job)
```
A canceled job can be resumed at any time by [retrying](#retries) it.
If any task in a job fails, the job is automatically canceled to avoid executing irrelevant tasks. Future releases will allow configuring this behavior for each task to meet specific requirements.
## Retries
If a task fails due to a bug or lack of resources, there is no need to resubmit the entire job. You can simply retry the job, and it will resume from the point of failure. This ensures that all the work that was already done up until the point of the failure isn't lost.
Future releases may introduce automatic retries for certain failure conditions, which can be useful for handling temporary issues.
Below is an example of a failing job due to a bug in the task's implementation. The following workflow processes a list of movie titles and queries the [OMDb API](http://www.omdbapi.com/) for each movie's release date.
```python Python
from urllib.parse import urlencode
import httpx
from tilebox.workflows import Task, ExecutionContext
class MoviesStats(Task):
titles: list[str]
def execute(self, context: ExecutionContext) -> None:
for title in self.titles:
context.submit_subtask(PrintMovieStats(title))
class PrintMovieStats(Task):
title: str
def execute(self, context: ExecutionContext) -> None:
params = {"t": self.title, "apikey": ""}
url = "http://www.omdbapi.com/?" + urlencode(params)
response = httpx.get(url).json()
# set the display name of the task to the title of the movie:
context.current_task.display = response["Title"]
print(f"{response['Title']} was released on {response['Released']}")
```
Submitting the workflow as a job reveals a bug in the `PrintMovieStats` task.
```python Python
job = job_client.submit('movies-stats', MoviesStats([
"The Matrix",
"Shrek 2",
"Tilebox - The Movie",
"The Avengers",
]), "dev-cluster")
job_client.display(job)
```
One of the `PrintMovieStats` tasks fails with a `KeyError`. This error occurs when a movie title is not found by the [OMDb API](http://www.omdbapi.com/), leading to a response without the `Title` and `Released` fields.
Console output from the task runners confirms this:
```plaintext Output
The Matrix was released on 31 Mar 1999
Shrek 2 was released on 19 May 2004
ERROR: Task PrintMovieStats failed with exception: KeyError('Title')
```
The corrected version of `PrintMovieStats` is as follows:
```python Python
class PrintMovieStats(Task):
title: str
def execute(self, context: ExecutionContext) -> None:
params = {"t": self.title, "apikey": ""}
url = "http://www.omdbapi.com/?" + urlencode(params)
response = httpx.get(url).json()
if "Title" in response and "Released" in response:
context.current_task.display = response["Title"]
print(f"{response['Title']} was released on {response['Released']}")
else:
context.current_task.display = f"NotFound: {self.title}"
print(f"Could not find the release date for {self.title}")
```
With this fix, and after redeploying the task runners with the updated `PrintMovieStats` implementation, you can retry the job:
```python Python
job_client.retry(job)
job_client.display(job)
```
Now the console output shows:
```plaintext Output
Could not find the release date for Tilebox - The Movie
The Avengers was released on 04 May 2012
```
The output confirms that only two tasks were executed, resuming from the point of failure instead of re-executing all tasks.
The job was retried and succeeded. The two tasks that completed before the failure were not re-executed.
# Task Runners
Task runners are the execution agents within the Tilebox Workflows ecosystem that execute tasks. They can be deployed in different computing environments, including on-premise servers and cloud-based auto-scaling clusters. Task runners execute tasks as scheduled by the workflow orchestrator, ensuring they have the necessary resources and environment for effective execution.
## Implementing a Task Runner
A task runner is a continuously running process that listens for new tasks to execute. You can start multiple task runner processes to execute tasks concurrently. When a task runner receives a task, it executes it and reports the results back to the workflow orchestrator. The task runner also handles any errors that occur during task execution, reporting them to the orchestrator as well.
To execute a task, at least one task runner must be running and available. If no task runners are available, tasks will remain queued until one becomes available.
To create and start a task runner, follow these steps:
1. Instantiate a client connected to the Tilebox Workflows API.
2. Select or create a [cluster](/workflows/concepts/clusters) and specify its slug when creating a task runner.
3. Register tasks by passing the task classes that the task runner can execute as a list to the `runner` method.
4. Call the `run_forever` method of the task runner to listen for new tasks until the task runner process is shut down.
Here is a simple example demonstrating these steps:
```python Python
from tilebox.workflows import Client
# your own workflow:
from my_workflow import MyTask, OtherTask
def main():
client = Client() # 1. connect to the Tilebox Workflows API
cluster = "dev-cluster" # 2. select a cluster to join
runner = client.runner(
cluster,
tasks=[MyTask, OtherTask] # 3. register tasks
)
runner.run_forever() # 4. listen for new tasks to execute
if __name__ == "__main__":
main()
```
To start the task runner locally, run it as a Python script:
```bash
> python task_runner.py
```
## Task Selection
For a task runner to pick up a submitted task, the following conditions must be met:
1. The [cluster](/workflows/concepts/clusters) where the task was submitted must match the task runner's cluster.
2. The task runner must have a registered task that matches the [task identifier](/workflows/concepts/tasks#task-identifiers) of the submitted task.
3. The version of the task runner's registered task must be [compatible](/workflows/concepts/tasks#semantic-versioning) with the submitted task's version.
If a task meets these conditions, the task runner executes it. Otherwise, the task runner remains idle until a matching task is available.
Often, multiple submitted tasks match the conditions for execution. In that case, the task runner selects one of the tasks to execute, and the remaining tasks stay in a queue until the selected task is completed or another task runner becomes available.
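As a minimal sketch of these rules, assuming a second cluster with the slug `gpu-cluster` exists alongside `dev-cluster`, and reusing the `MyTask` class from the example above:
```python Python
from tilebox.workflows import Client
# your own workflow:
from my_workflow import MyTask

client = Client()
job_client = client.jobs()

# the root task is submitted to the (hypothetical) "gpu-cluster"
job = job_client.submit("task-selection-example", MyTask(), "gpu-cluster")

# this runner joins "dev-cluster", so it never picks up the job above,
# even though MyTask is registered with it
dev_runner = client.runner("dev-cluster", tasks=[MyTask])

# this runner matches both the cluster and the registered task identifier,
# so it's eligible to execute the submitted task
gpu_runner = client.runner("gpu-cluster", tasks=[MyTask])
gpu_runner.run_forever()
```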
## Parallelism
You can start multiple task runner instances in parallel to execute tasks concurrently. Each task runner listens for new tasks and executes them as they become available. This allows for high parallelism and can be used to scale the execution of tasks to handle large workloads.
To test this, run multiple instances of the task runner script in different terminal windows on your local machine, or use a tool like [call-in-parallel](https://github.com/tilebox/call-in-parallel) to start the task runner script multiple times.
For example, to start five task runners in parallel, use the following command:
```bash
> call-in-parallel -n 5 -- python task_runner.py
```
## Deploying Task Runners
Task runners are continuously running processes that can be deployed in different computing environments. The only requirement for deploying task runners is access to the Tilebox Workflows API. Once this is met, task runners can be deployed in many different environments, including:
* On-premise servers
* Cloud-based virtual machines
* Cloud-based auto-scaling clusters
## Scaling
One key benefit of task runners is their **ability to scale even while workflows are executing**. You can start new task runners at any time, and they can immediately pick up queued tasks to execute. It's not necessary to have an entire processing cluster available at the start of a workflow, as task runners can be started and stopped as needed.
This is particularly beneficial in cloud environments, where task runners can be automatically started and stopped based on current workload, measured by metrics such as CPU usage. Here's an example scenario:
1. A single instance of a task runner is actively waiting for work in a cloud environment.
2. A large workload is submitted to the workflow orchestrator, prompting the task runner to pick up the first task.
3. The first task creates new sub-tasks for processing, which the task runner also picks up.
4. As the workload increases, the task runner's CPU usage rises, triggering the cloud environment to automatically start new task runner instances.
5. Newly started task runners begin executing queued tasks, distributing the workload among all available task runners.
6. Once the workload decreases, the cloud environment automatically stops some task runners.
7. The first task runner completes the remaining work until everything is done.
8. The first task runner remains idle until new tasks arrive.
CPU usage-based auto scaling is just one method to scale task runners. Other metrics, such as memory usage or network bandwidth, are also supported by many cloud environments. In a future release, configuration options for scaling task runners based on custom metrics (for example the number of queued tasks) are planned.
## Distributed Execution
Task runners can be distributed across different compute environments. For instance, some data stored on-premise may need pre-processing, while further processing occurs in the cloud. A job might involve tasks that filter relevant on-premise data and publish it to the cloud, and other tasks that read data from the cloud and process it. In such scenarios, one task runner can run on-premise and another in a cloud environment, effectively collaborating on the same job.
Another advantage of distributed task runners is executing workflows that require specific hardware for certain tasks. For example, one task might need a GPU, while another requires extensive memory.
Here's an example of a distributed workflow:
```python Python
from tilebox.workflows import Task, ExecutionContext
class DistributedWorkflow(Task):
def execute(self, context: ExecutionContext) -> None:
download_task = context.submit_subtask(DownloadData())
process_task = context.submit_subtask(
ProcessData(),
depends_on=[download_task],
)
class DownloadData(Task):
"""
Download a dataset and store it in a shared internal bucket.
Requires a good network connection for high download bandwidth.
"""
def execute(self, context: ExecutionContext) -> None:
pass
class ProcessData(Task):
"""
Perform compute-intensive processing of a dataset.
The dataset must be available in an internal bucket.
Requires access to a GPU for optimal performance.
"""
def execute(self, context: ExecutionContext) -> None:
pass
```
To achieve distributed execution for this workflow, no single task runner needs to be capable of executing all three tasks. Instead, two task runners are set up, each capable of executing one of the hardware-specific tasks: one in a high-speed network environment and the other with GPU access. When the distributed workflow runs, the first task runner picks up the `DownloadData` task, while the second picks up the `ProcessData` task. The `DistributedWorkflow` itself does not require specific hardware, so it can be registered with both runners and executed by either one.
```python download_task_runner.py
from tilebox.workflows import Client
client = Client()
high_network_speed_runner = client.runner(
"dev-cluster",
tasks=[DownloadData, DistributedWorkflow]
)
high_network_speed_runner.run_forever()
```
```python gpu_task_runner.py
from tilebox.workflows import Client
client = Client()
gpu_runner = client.runner(
"dev-cluster",
tasks=[ProcessData, DistributedWorkflow]
)
gpu_runner.run_forever()
```
Now, both `download_task_runner.py` and `gpu_task_runner.py` are started in parallel, on different machines with the required hardware for each. When `DistributedWorkflow` is submitted, it executes on one of the two runners, and its submitted sub-tasks are handled by the appropriate runner.
In this case, since `ProcessData` depends on `DownloadData`, the GPU task runner remains idle until the download completes, then picks up the processing task.
You can also differentiate between task runners by specifying different [clusters](/workflows/concepts/clusters) and choosing specific clusters for sub-task submissions. For more details, see the [Clusters](/workflows/concepts/clusters) section.
## Task Failures
If an unhandled exception occurs during task execution, the task runner captures it and reports it back to the workflow orchestrator. The orchestrator then marks the task as failed, leading to [job cancellation](/workflows/concepts/jobs#cancellation) to prevent further tasks of the same job, which may no longer be relevant, from being executed.
A task failure does not result in losing all previous work done by the job. If the failure is fixable—by fixing a bug in a task implementation, ensuring the task has necessary resources, or simply retrying it due to a flaky network connection—it may be worth [retrying](/workflows/concepts/jobs#retries) the job.
When retrying a job, all failed tasks are added back to the queue, allowing a task runner to potentially execute them. If execution then succeeds, the job continues smoothly. Otherwise, the task will remain marked as failed and can be retried again if desired.
If fixing a failure requires modifying the task implementation, it's important to deploy the updated version to the [task runners](/workflows/concepts/task-runners) before retrying the job. Otherwise, a task runner could pick up the original, faulty implementation again, leading to another failure.
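As a minimal sketch, assuming `job` is the handle returned when the job was originally submitted and the fixed task implementation has already been redeployed to the task runners:
```python Python
# all failed tasks of the job are queued again and picked up by a task runner
# that has the updated task implementation registered
job_client.retry(job)

# inspect the job to verify that it now completes successfully
job_client.display(job)
```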
## Task idempotency
Since a task may be retried, it's possible that a task is executed more than once. Depending on where in its execution it failed, it may have already performed side effects, such as writing to a database or sending a message to a queue. Because of that, it's crucial to ensure that tasks are [idempotent](https://en.wikipedia.org/wiki/Idempotence). Idempotent tasks can be executed multiple times without altering the outcome beyond the first successful execution.
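For example, a task that writes its output to a path derived only from its input stays idempotent: a retry simply overwrites the same file instead of producing a duplicate. The following is a minimal sketch; the `reports.example.com` endpoint and the `reports` folder are purely illustrative.
```python Python
from pathlib import Path

import httpx  # pip install httpx
from tilebox.workflows import Task, ExecutionContext

class FetchReport(Task):
    report_id: str

    def execute(self, context: ExecutionContext) -> None:
        # hypothetical endpoint, for illustration only
        response = httpx.get(f"https://reports.example.com/{self.report_id}.json")
        # the output path depends only on the task's input, so executing the
        # task again overwrites the same file rather than creating a new one
        target = Path("reports") / f"{self.report_id}.json"
        target.parent.mkdir(exist_ok=True)
        target.write_bytes(response.content)
```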
A special case of idempotency involves submitting sub-tasks. If a task calls `context.submit_subtask` and then fails, the sub-tasks submitted during that failed execution are automatically removed, ensuring that they can be safely submitted again when the task is retried.
## Runner Crashes
Tilebox Workflows has an internal mechanism to handle unexpected task runner crashes. When a task runner picks up a task, it periodically sends a heartbeat to the workflow orchestrator. If the orchestrator does not receive this heartbeat for a defined duration, it marks the task as failed and automatically attempts to [retry](/workflows/concepts/jobs#retries) it up to 10 times. This allows another task runner to pick up the task and continue executing the job.
This mechanism ensures that scenarios such as power outages, hardware failures, or dropped network connections are handled effectively, preventing any task from remaining in a running state indefinitely.
## Observability
Task runners are continuously running processes, making it essential to monitor their health and performance. You can achieve observability by collecting and analyzing logs, metrics, and traces from task runners. Tilebox Workflows provides tools to enable this data collection and analysis. To learn how to configure task runners for observability, head over to the [Observability](/workflows/observability) section.
# Understanding and Creating Tasks
A Task is the smallest unit of work, designed to perform a specific operation. Each task represents a distinct operation or process that can be executed, such as processing data, performing calculations, or managing resources. Tasks can operate independently or as components of a more complex set of connected tasks known as a Workflow. Tasks are defined by their code, inputs, and dependencies on other tasks. To create tasks, you need to define the input parameters and specify the action to be performed during execution.
## Creating a Task
To create a task in Tilebox, define a class that extends the `Task` base class and implements the `execute` method. The `execute` method is the entry point for the task where its logic is defined. It's called when the task is executed.
```python Python
from tilebox.workflows import Task, ExecutionContext
class MyFirstTask(Task):
def execute(self, context: ExecutionContext):
print(f"Hello World!")
```
This example demonstrates a simple task that prints "Hello World!" to the console. The key components of this task are:
`MyFirstTask` is a subclass of the `Task` class, which serves as the base class for all defined tasks. It provides the essential structure for a task. Inheriting from `Task` automatically makes the class a `dataclass`, which is useful [for specifying inputs](#input-parameters). Additionally, by inheriting from `Task`, the task is automatically assigned an [identifier based on the class name](#task-identifiers).
The `execute` method is the entry point for executing the task. This is where the task's logic is defined. It's invoked by a [task runner](/workflows/concepts/task-runners) when the task runs and performs the task's operation.
The `context` argument is an `ExecutionContext` instance that provides access to an [API for submitting new tasks](/api-reference/tilebox.workflows/ExecutionContext.submit_subtask) as part of the same job and features like [shared caching](/api-reference/tilebox.workflows/ExecutionContext.job_cache).
The code samples on this page do not show how to execute tasks. That is covered in the [next section on task runners](/workflows/concepts/task-runners), since executing tasks is a separate concern from implementing them.
## Input Parameters
Tasks often require input parameters to operate. These inputs can range from simple values to complex data structures. By inheriting from the `Task` class, the task is treated as a Python `dataclass`, allowing input parameters to be defined as class attributes.
Tasks must be **serializable to JSON** because they may be distributed across a cluster of [task runners](/workflows/concepts/task-runners).
Supported types for input parameters include:
* Basic types such as `str`, `int`, `float`, `bool`
* Lists and dictionaries of basic types
* Nested data classes that are also JSON-serializable
```python Python
class ParametrizableTask(Task):
message: str
number: int
data: dict[str, str]
def execute(self, context: ExecutionContext):
print(self.message * self.number)
task = ParametrizableTask("Hello", 3, {"key": "value"})
```
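Nested data classes are supported as long as they are JSON-serializable themselves. Here is a small sketch of what that could look like; the `TimeRange` and `IngestData` names are illustrative.
```python Python
from dataclasses import dataclass

from tilebox.workflows import Task, ExecutionContext

@dataclass
class TimeRange:
    # ISO 8601 strings keep the nested value JSON-serializable
    start: str
    end: str

class IngestData(Task):
    source: str
    time_range: TimeRange

    def execute(self, context: ExecutionContext) -> None:
        print(f"Ingesting {self.source} from {self.time_range.start} to {self.time_range.end}")

task = IngestData("sentinel-1", TimeRange("2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z"))
```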
## Task Composition and subtasks
Until now, tasks have performed only a single operation. But tasks can be more powerful. **Tasks can submit other tasks as subtasks.** This allows for a modular workflow design, breaking down complex operations into simpler, manageable parts. Additionally, the execution of subtasks is automatically parallelized whenever possible.
```python Python
class ParentTask(Task):
num_subtasks: int
def execute(self, context: ExecutionContext) -> None:
for i in range(self.num_subtasks):
context.submit_subtask(ChildTask(i))
class ChildTask(Task):
index: int
def execute(self, context: ExecutionContext) -> None:
print(f"Executing ChildTask {self.index}")
# after submitting this task, a task runner may pick it up and execute it
# which will result in 5 ChildTasks being submitted and executed as well
task = ParentTask(5)
```
In this example, a `ParentTask` submits `ChildTask` tasks as subtasks. The number of subtasks to be submitted is based on the `num_subtasks` attribute of the `ParentTask`. The `submit_subtask` method takes an instance of a task as its argument, meaning the task to be submitted must be instantiated with concrete parameters first.
By submitting a task as a subtask, its execution is scheduled as part of the same job as the parent task. Compared to just directly invoking the subtask's `execute` method, this allows the subtask's execution to occur on a different machine or in parallel with other subtasks. To learn more about how tasks are executed, see the section on [task runners](/workflows/concepts/task-runners).
### Larger subtasks example
A practical workflow example showcasing task composition helps illustrate the capabilities of tasks. Below is a set of tasks forming a workflow that downloads a given number of random dog images from the internet. The [Dog API](https://thedogapi.com/) provides the image URLs, which are then downloaded. Implementing this using task composition could look like this:
```python Python
import httpx # pip install httpx
from pathlib import Path
class DownloadRandomDogImages(Task):
num_images: int
def execute(self, context: ExecutionContext) -> None:
url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}"
response = httpx.get(url)
for dog_image in response.json():
context.submit_subtask(DownloadImage(dog_image["url"]))
class DownloadImage(Task):
url: str
def execute(self, context: ExecutionContext) -> None:
        path = Path("dogs") / self.url.split("/")[-1]
        path.parent.mkdir(exist_ok=True)  # ensure the output folder exists
        response = httpx.get(self.url)
        path.write_bytes(response.content)
```
This example consists of the following tasks:
`DownloadRandomDogImages` fetches a specific number of random dog image URLs from an API. It then submits a `DownloadImage` task for each received image URL.
`DownloadImage` downloads an image from a specified URL and saves it to a file.
Together, these tasks create a workflow that downloads random dog images from the internet. The relationship between the two tasks and their formation as a workflow becomes clear when `DownloadRandomDogImages` submits `DownloadImage` tasks as subtasks.
Visualizing the execution of such a workflow resembles a tree structure, where the `DownloadRandomDogImages` task is the root and the `DownloadImage` tasks are the leaves. For instance, to download five random dog images, submit a job like this:
```python Python
from tilebox.workflows import Client
client = Client()
jobs = client.jobs()
job = jobs.submit(
"download-dog-images",
DownloadRandomDogImages(5),
"dev-cluster",
)
# now our deployed task runners will pick up the task and execute it
jobs.display(job)
```
In total, six tasks are executed: the `DownloadRandomDogImages` task and five `DownloadImage` tasks. The `DownloadImage` tasks can execute in parallel, as they are independent. If more than one task runner is available, the Tilebox Workflow Orchestrator **automatically parallelizes** the execution of these tasks.
Check out [job\_client.display](/workflows/concepts/jobs#visualization) to learn how this visualization was automatically generated from the task executions.
Currently, a limit of `64` subtasks per task is in place to discourage creating workflows where individual parent tasks submit a large number of subtasks, which can lead to performance issues since those parent tasks are not parallelized. If you need to submit more than `64` subtasks, consider using [recursive subtask submission](#recursive-subtasks) instead.
## Recursive subtasks
Tasks can submit other tasks as subtasks, allowing for complex workflows. Sometimes the input to a task is a list, with elements that can be **mapped** to individual subtasks, whose outputs are then aggregated in a **reduce** step. This pattern is commonly known as **MapReduce**.
Often, the initial map step—submitting the individual subtasks—is itself an expensive operation. Since it runs within a single task, it's not parallelizable, which can bottleneck the entire workflow.
Fortunately, Tilebox Workflows offers a solution through **recursive subtask submission**. A task can submit instances of itself as subtasks, enabling a recursive breakdown into smaller tasks.
For example, the `RecursiveTask` below is a valid task that submits smaller instances of itself as subtasks.
```python Python
class RecursiveTask(Task):
num: int
def execute(self, context: ExecutionContext) -> None:
print(f"Executing RecursiveTask with num={self.num}")
if self.num >= 2:
context.submit_subtask(RecursiveTask(self.num // 2))
```
### Recursive subtask example
An example of this is the [random dog images workflow](#larger-subtasks-example) mentioned earlier. In the previous implementation, downloading images was already parallelized, but the initial orchestration of the individual download tasks was not: `DownloadRandomDogImages` was responsible for fetching all random dog image URLs and only submitted the individual download tasks once all URLs were retrieved. For a large number of images, this setup can bottleneck the entire workflow.
To improve this, recursive subtask submission decomposes a `DownloadRandomDogImages` task with a high number of images into two smaller `DownloadRandomDogImages` tasks, each fetching half. This process can be repeated until a specified threshold is met, at which point the Dog API can be queried directly for image URLs. That way, image downloads start as soon as the first URLs are retrieved, without initial waiting.
An implementation of this recursive submission may look like this:
```python Python
class DownloadRandomDogImages(Task):
num_images: int
def execute(self, context: ExecutionContext) -> None:
if self.num_images > 4:
half = self.num_images // 2
remaining = self.num_images - half # account for odd numbers
context.submit_subtask(DownloadRandomDogImages(half))
context.submit_subtask(DownloadRandomDogImages(remaining))
else:
url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}"
response = httpx.get(url)
for dog_image in response.json()[:self.num_images]:
context.submit_subtask(DownloadImage(dog_image["url"]))
```
With this implementation, downloading a large number of images (for example, 9) results in a tree of recursively submitted `DownloadRandomDogImages` tasks, with the individual `DownloadImage` tasks as the leaves.
## Retry Handling
By default, when a task fails to execute, it's marked as failed. In some cases, it may be useful to retry the task multiple times before marking it as a failure. This is particularly useful for tasks dependent on external services that might be temporarily unavailable.
Tilebox Workflows allows you to specify the number of retries for a task using the `max_retries` argument of the `submit_subtask` method.
Check out the example below to see how this might look in practice.
A failed task may be retried by any available task runner, not necessarily the one it originally failed on.
```python Python
import random
class RootTask(Task):
def execute(self, context: ExecutionContext) -> None:
context.submit_subtask(FlakyTask(), max_retries=5)
class FlakyTask(Task):
def execute(self, context: ExecutionContext) -> None:
print(f"Executing FlakyTask")
if random.random() < 0.1:
raise Exception("FlakyTask failed randomly")
```
## Dependencies
Tasks often rely on other tasks. For example, a task that processes data might depend on a task that fetches that data. **Tasks can express their dependencies on other tasks** by using the `depends_on` argument of the [submit\_subtask](/api-reference/tilebox.workflows/ExecutionContext.submit_subtask) method. This means that a dependent task will only execute after the task it relies on has successfully completed.
The `depends_on` argument accepts a list of tasks, enabling a task to depend on multiple other tasks.
A workflow with dependencies might look like this:
```python Python
class RootTask(Task):
def execute(self, context: ExecutionContext) -> None:
first_task = context.submit_subtask(
PrintTask("Executing first")
)
second_task = context.submit_subtask(
PrintTask("Executing second"),
depends_on=[first_task],
)
third_task = context.submit_subtask(
PrintTask("Executing last"),
depends_on=[second_task],
)
class PrintTask(Task):
message: str
def execute(self, context: ExecutionContext) -> None:
print(self.message)
```
The `RootTask` submits three `PrintTask` tasks as subtasks. These tasks depend on each other, meaning the second task executes only after the first task has successfully completed, and the third only executes after the second completes. The tasks are executed sequentially.
If a task upon which another task depends submits subtasks, those subtasks must also complete before the dependent task begins execution.
### Dependencies Example
A practical example is a workflow that fetches news articles from an API and processes them using the [News API](https://newsapi.org/).
```python Python
from pathlib import Path
import json
from collections import Counter
import httpx # pip install httpx
class NewsWorkflow(Task):
category: str
max_articles: int
def execute(self, context: ExecutionContext) -> None:
fetch_task = context.submit_subtask(FetchNews(self.category, self.max_articles))
context.submit_subtask(PrintHeadlines(), depends_on=[fetch_task])
context.submit_subtask(MostFrequentAuthors(), depends_on=[fetch_task])
class FetchNews(Task):
category: str
max_articles: int
def execute(self, context: ExecutionContext) -> None:
url = f"https://newsapi.org/v2/top-headlines?category={self.category}&pageSize={self.max_articles}&country=us&apiKey=API_KEY"
news = httpx.get(url).json()
# check out our documentation page on caches to learn
# about a better way of passing data between tasks
Path("news.json").write_text(json.dumps(news))
class PrintHeadlines(Task):
def execute(self, context: ExecutionContext) -> None:
news = json.loads(Path("news.json").read_text())
for article in news["articles"]:
print(f"{article['publishedAt'][:10]}: {article['title']}")
class MostFrequentAuthors(Task):
def execute(self, context: ExecutionContext) -> None:
news = json.loads(Path("news.json").read_text())
authors = [article["author"] for article in news["articles"]]
for author, count in Counter(authors).most_common():
print(f"Author {author} has written {count} articles")
# now submit a job, and then visualize it
job = job_client.submit("process-news",
NewsWorkflow(category="science", max_articles=5),
"dev-cluster"
)
```
```plaintext Output
2024-02-15: NASA selects ultraviolet astronomy mission but delays its launch two years - SpaceNews
2024-02-15: SpaceX launches Space Force mission from Cape Canaveral - Orlando Sentinel
2024-02-14: Saturn's largest moon most likely uninhabitable - Phys.org
2024-02-14: AI Unveils Mysteries of Unknown Proteins' Functions - Neuroscience News
2024-02-14: Anthropologists' research unveils early stone plaza in the Andes - Phys.org
Author Jeff Foust has written 1 articles
Author Richard Tribou has written 1 articles
Author Jeff Renaud has written 1 articles
Author Neuroscience News has written 1 articles
Author Science X has written 1 articles
```
This workflow consists of four tasks:
| Task | Dependencies | Description |
| ------------------- | ------------ | ----------------------------------------------------------------------------------------------------------------------- |
| NewsWorkflow | - | The root task of the workflow. It spawns the other tasks and sets up the dependencies between them. |
| FetchNews | - | A task that fetches news articles from the API and writes the results to a file, which is then read by dependent tasks. |
| PrintHeadlines | FetchNews | A task that prints the headlines of the news articles to the console. |
| MostFrequentAuthors | FetchNews | A task that counts the number of articles each author has written and prints the result to the console. |
An important aspect is that there is no dependency between the `PrintHeadlines` and `MostFrequentAuthors` tasks. This means they can execute in parallel, which the Tilebox Workflow Orchestrator will do, provided multiple task runners are available.
In this example, the results from `FetchNews` are stored in a local file. This is not the recommended method for passing data between tasks: when executing on a distributed cluster, a file written by one task is not guaranteed to be accessible to the tasks that depend on it, since they may run on different machines. Instead, it's better to use a [shared cache](/workflows/caches).
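As a hedged sketch of that approach, assuming the dict-like, bytes-valued `job_cache` interface referenced in the [shared caching](/api-reference/tilebox.workflows/ExecutionContext.job_cache) API, `FetchNews` could hand its results to the dependent tasks like this:
```python Python
import json

import httpx  # pip install httpx
from tilebox.workflows import Task, ExecutionContext

# A sketch only: it assumes context.job_cache accepts string keys and bytes values.
class FetchNews(Task):
    category: str
    max_articles: int

    def execute(self, context: ExecutionContext) -> None:
        url = f"https://newsapi.org/v2/top-headlines?category={self.category}&pageSize={self.max_articles}&country=us&apiKey=API_KEY"
        # store the raw response bytes in the job cache instead of a local file,
        # so dependent tasks can read them even when running on another task runner
        context.job_cache["news"] = httpx.get(url).content

class PrintHeadlines(Task):
    def execute(self, context: ExecutionContext) -> None:
        news = json.loads(context.job_cache["news"])
        for article in news["articles"]:
            print(f"{article['publishedAt'][:10]}: {article['title']}")
```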
## Task Identifiers
A task identifier is a unique string used by the Tilebox Workflow Orchestrator to identify the task. It's used by [task runners](/workflows/concepts/task-runners) to map submitted tasks to a task class and execute them. It also serves as the default name for a task in job visualizations.
If unspecified, the identifier of a task defaults to the class name. For instance, the identifier of the `PrintHeadlines` task in the previous example is `"PrintHeadlines"`. This is good for prototyping, but not recommended for production, as changing the class name also changes the identifier, which can lead to issues during refactoring. It also prevents different tasks from sharing the same class name.
To address this, Tilebox Workflows offers a way to explicitly specify the identifier of a task. This is done by overriding the `identifier` method of the `Task` class. This method should return a unique string identifying the task. This decouples the task's identifier from its class name, allowing you to change the identifier without renaming the class. It also allows tasks with the same class name to have different identifiers. The `identifier` method can also specify a version number for the task—see the section on [semantic versioning](#semantic-versioning) below for more details.
```python Python
class MyTask(Task):
def execute(self, context: ExecutionContext) -> None:
pass
# MyTask has the identifier "MyTask" and the default version of "v0.0"
class MyTask2(Task):
@staticmethod
def identifier() -> tuple[str, str]:
return "tilebox.com/example_workflow/MyTask", "v1.0"
def execute(self, context: ExecutionContext) -> None:
pass
# MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0"
```
The `identifier` method must be defined as either a `classmethod` or a `staticmethod`, meaning it can be called without instantiating the class.
## Semantic Versioning
As seen in the previous section, the `identifier` method can return a tuple of two strings, where the first string is the identifier and the second string is the version number. This allows for semantic versioning of tasks.
Versioning is important for managing changes to a task's execution method. It allows for new features, bug fixes, and changes while ensuring existing workflows operate as expected. Additionally, it enables multiple versions of a task to coexist, enabling gradual rollout of changes without interrupting production deployments.
You assign a version number by overriding the `identifier` method of the task class. It must return a tuple of two strings: the first is the [identifier](#task-identifiers) and the second is the version number, which must match the pattern `vX.Y` (where `X` and `Y` are non-negative integers). `X` is the major version number and `Y` is the minor version.
For example, this task has the identifier `"tilebox.com/example_workflow/MyTask"` and the version `"v1.3"`:
```python Python
class MyTask(Task):
@staticmethod
def identifier() -> tuple[str, str]:
return "tilebox.com/example_workflow/MyTask", "v1.3"
def execute(self, context: ExecutionContext) -> None:
pass
```
When a task is submitted as part of a job, the version from which it's submitted is recorded and may differ from the version on the task runner executing the task.
When task runners execute a task, they require a registered task with a matching identifier and compatible version number. A compatible version is where the major version number on the task runner matches that of the submitted task, and the minor version number on the task runner is equal to or greater than that of the submitted task.
Here's how version compatibility works in practice. Suppose `MyTask` is submitted as part of a job with version `"v1.3"`:
* A task runner with version `"v1.3"` of `MyTask` would execute this task.
* A task runner with version `"v1.5"` of `MyTask` would also execute this task.
* A task runner with version `"v1.2"` of `MyTask` would not execute this task, as its minor version is lower than that of the submitted task.
* A task runner with version `"v2.5"` of `MyTask` would not execute this task, as its major version differs from that of the submitted task.
## Conclusion
Tasks form the foundation of Tilebox Workflows. By understanding how to create and manage tasks, you can leverage Tilebox's capabilities to automate and optimize your workflows. Experiment with defining your own tasks, utilizing subtasks, managing dependencies, and employing semantic versioning to develop robust and efficient workflows.
# Tilebox Workflows
The Tilebox workflow orchestrator is a parallel processing engine. It simplifies the creation of dynamic tasks that can be executed across various computing environments, including on-premise and auto-scaling clusters in public clouds.
This section provides guides showcasing how to use the Tilebox workflow orchestrator effectively. Here are some of the key learning areas:
Create tasks using the Tilebox Workflow Orchestrator.
Learn how to submit jobs to the workflow orchestrator, which schedules tasks for execution.
Learn how to set up task runners to execute tasks in a distributed manner.
Understand how to gain insights into task executions using observability features like tracing and logging.
Learn to configure shared data access for all tasks of a job using caches.
Trigger jobs based on events or schedules, such as new data availability or CRON schedules.
## Terminology
Before exploring Tilebox Workflows in depth, familiarize yourself with some common terms used throughout this section.
A Task is the smallest unit of work, designed to perform a specific operation. Each task represents a distinct operation or process that can be executed, such as processing data, performing calculations, or managing resources. Tasks can operate independently or as components of a more complex set of connected tasks known as a Workflow. Tasks are defined by their code, inputs, and dependencies on other tasks. To create tasks, you need to define the input parameters and specify the action to be performed during execution.
A job is a specific execution of a workflow with designated input parameters. It consists of one or more tasks that can run in parallel or sequentially, based on their dependencies. Submitting a job involves creating a root task with specific input parameters, which may trigger the execution of other tasks within the same job.
Task runners are the execution agents within the Tilebox Workflows ecosystem that execute tasks. They can be deployed in different computing environments, including on-premise servers and cloud-based auto-scaling clusters. Task runners execute tasks as scheduled by the workflow orchestrator, ensuring they have the necessary resources and environment for effective execution.
Clusters are a logical grouping for task runners. Using clusters, you can scope certain tasks to a specific group of task runners. Tasks, which are always submitted to a specific cluster, are only executed on task runners assigned to the same cluster.
Caches are shared storage that enable data storage and retrieval across tasks within a single job. They store intermediate results and share data among tasks, enabling distributed computing and reducing redundant data processing.
Observability refers to the feature set in Tilebox Workflows that provides visibility into the execution of tasks and jobs. Tools like tracing and logging allow users to monitor performance, diagnose issues, and gain insights into job operations, enabling efficient troubleshooting and optimization.
# Cron triggers
Trigger jobs based on a Cron schedule.
## Creating Cron tasks
Cron tasks run repeatedly on a specified [cron](https://en.wikipedia.org/wiki/Cron) schedule. To create a Cron task, use `tilebox.workflows.recurrent_tasks.CronTask` as your task's base class instead of the regular `tilebox.workflows.Task`.
```python Python
from tilebox.workflows import ExecutionContext
from tilebox.workflows.recurrent_tasks import CronTask
class MyCronTask(CronTask):
message: str
def execute(self, context: ExecutionContext) -> None:
print(f"Hello {self.message} from a Cron Task!")
# self.trigger is an attribute of the CronTask class,
# which contains information about the trigger event
# that caused this task to be submitted as part of a job
print(f"This task was triggered at {self.trigger.time}")
```
## Registering a Cron trigger
After implementing a Cron task, register it to be triggered according to a Cron schedule. When the Cron expression matches, a new job is submitted consisting of a single task instance derived from the Cron task prototype.
```python Python
from tilebox.workflows import Client
client = Client()
recurrent_tasks = client.recurrent_tasks()
cron_task = recurrent_tasks.create_recurring_cron_task(
"my-cron-task", # name of the recurring cron task
"dev-cluster", # cluster slug to submit jobs to
MyCronTask(message="World"), # the task (and its input parameters) to run repeatedly
cron_triggers=[
"12 * * * *", # run every hour at minute 12
"45 18 * * *", # run every day at 18:45
"30 13 * * 3", # run every Wednesday at 13:30
],
)
```
The syntax for specifying the cron triggers is a [CRON expression](https://en.wikipedia.org/wiki/Cron#CRON_expression).
A helpful tool to test your cron expressions is [crontab.guru](https://crontab.guru/).
## Starting a Cron Task Runner
With the Cron task registered, a job is submitted whenever the Cron expression matches. But unless a [task runner](/workflows/concepts/task-runners) is available to execute the Cron task, the submitted jobs remain in a task queue.
Once an [eligible task runner](/workflows/concepts/task-runners#task-selection) becomes available, all jobs in the queue are executed.
```python Python
from tilebox.workflows import Client
client = Client()
runner = client.runner("dev-cluster", tasks=[MyCronTask])
runner.run_forever()
```
If this task runner runs continuously, its output may resemble the following:
```plaintext Output
Hello World from a Cron Task!
This task was triggered at 2023-09-25 16:12:00
Hello World from a Cron Task!
This task was triggered at 2023-09-25 17:12:00
Hello World from a Cron Task!
This task was triggered at 2023-09-25 18:12:00
Hello World from a Cron Task!
This task was triggered at 2023-09-25 18:45:00
Hello World from a Cron Task!
This task was triggered at 2023-09-25 19:12:00
```
## Inspecting in the Console
The [Tilebox Console](https://console.tilebox.com/workflows/recurrent-tasks) provides a straightforward way to inspect all registered Cron tasks.
Use the console to view, edit, and delete the registered Cron tasks.
## Deleting Cron triggers
To delete a registered Cron task, use `recurrent_tasks.delete`. After deletion, no new jobs will be submitted by that Cron trigger. Past jobs already triggered will still remain queued.
```python Python
from tilebox.workflows import Client
client = Client()
recurrent_tasks = client.recurrent_tasks()
# delete the task as returned by create_recurring_cron_task
recurrent_tasks.delete(cron_task)
# or manually by id:
recurrent_tasks.delete("0190bafc-b3b8-88c4-008b-a5db044380d0")
```
## Submitting Cron jobs manually
You can submit Cron tasks as regular tasks for testing purposes or as part of a larger workflow. To do so, instantiate the task with a specific trigger time using the `once` method.
Submitting a job with a Cron task using `once` immediately schedules the task, and a runner may pick it up and execute it. The trigger time set in the `once` method does not influence the execution time; it only sets the `self.trigger.time` attribute for the Cron task.
```python Python
from datetime import datetime, timezone
job_client = client.jobs()
# create a Cron task prototype
task = MyCronTask(message="Hello")
# submitting it directly won't work: raises ValueError:
# CronTask cannot be submitted without being triggered. Use task.once().
job_client.submit("manual-cron-job", task, cluster="dev-cluster")
# specify a trigger time to submit the task as a regular task
triggered_task = task.once() # same as task.once(datetime.now())
job_client.submit("manual-cron-job", triggered_task, cluster="dev-cluster")
# simulate a trigger at a specific time
triggered_task = task.once(datetime(2030, 12, 12, 15, 15, tzinfo=timezone.utc))
# the task will be scheduled to run immediately, even with a future trigger time
# but the self.trigger.time will be 2030-12-12T15:15:00Z for the task instance
job_client.submit("manual-cron-job", triggered_task, cluster="dev-cluster")
```
# Recurrent Tasks
Process data in near-real-time by triggering jobs based on external events
## Introduction
Tilebox Workflows can execute jobs in two ways: one-time execution triggered by a user, typically for batch processing, and near-real-time execution triggered by specific external events. By defining trigger conditions, you can automatically submit jobs based on external events.
Tilebox Workflows currently supports the following trigger conditions:
Trigger jobs based on a Cron schedule.
Trigger jobs after objects are created or modified in a storage location such as a cloud bucket.
Dataset Event Triggers, which will trigger jobs when new data points are ingested into a Tilebox dataset, are on the roadmap. Stay tuned for updates.
## Recurrent Tasks
To create a trigger, define a special task that serves as a prototype. When a trigger condition is met, this task is submitted as a new job. Such tasks are referred to as recurrent tasks.
Each recurrent task has a [task identifier](/workflows/concepts/tasks#task-identifiers), a [version](/workflows/concepts/tasks#semantic-versioning), and [input parameters](/workflows/concepts/tasks#input-parameters), just like regular tasks.
Recurrent tasks also automatically provide a special `trigger` attribute that contains information about the event that initiated the task's execution.
## Recurrent Task Client
The Tilebox Workflows client includes a sub-client for managing recurrent tasks. You can create this sub-client by calling the `recurrent_tasks` method on the main client instance.
### Listing Registered Recurrent Tasks
To list all registered recurrent tasks, use the `all` method on the recurrent task client.
```python Python
from tilebox.workflows import Client
client = Client()
recurrent_tasks = client.recurrent_tasks()
recurrent_tasks = recurrent_tasks.all()
print(recurrent_tasks)
```
```plaintext Output
[
RecurrentTaskPrototype(
name='Run MyCronTask every hour at 15 minutes past the hour',
prototype=TaskSubmission(
cluster_slug='dev-cluster',
identifier=TaskIdentifier(name='MyCronTask', version='v0.0'),
input=b'{"message": "Hello"}',
dependencies=[],
display='MyCronTask',
max_retries=0),
storage_event_triggers=[],
cron_triggers=[CronTrigger(schedule='15 * * * *')],
)
]
```
### Registering Recurrent Tasks
To register a recurrent task, use the `create_recurring_...` methods specific to each trigger type provided by the recurrent task client. Refer to the documentation for each trigger type for more details.
## Overview in the Tilebox Console
You can also use the Tilebox Console to manage recurrent tasks. Visit [the recurrent tasks section](https://console.tilebox.com/workflows/recurrent-tasks) to check it out.
You can also register new recurrent tasks or edit and delete existing ones directly from the console.
# Storage Event Triggers
Trigger jobs after objects are created or modified in a storage location
## Creating a Storage Event Task
Storage Event Tasks are recurring tasks triggered when objects are created or modified in a [storage location](#storage-locations). To create a Storage Event task, use `tilebox.workflows.recurrent_tasks.StorageEventTask` as your task's base class instead of the regular `tilebox.workflows.Task`.
```python Python
from tilebox.workflows import ExecutionContext
from tilebox.workflows.recurrent_tasks import StorageEventTask, StorageEventType
from tilebox.workflows.observability.logging import get_logger
logger = get_logger()
class LogObjectCreation(StorageEventTask):
head_bytes: int
def execute(self, context: ExecutionContext) -> None:
# self.trigger is an attribute of the StorageEventTask class,
# which contains information about the storage event that triggered this task
if self.trigger.type == StorageEventType.CREATED:
path = self.trigger.location
logger.info(f"A new object was created: {path}")
# trigger.storage is a storage client for interacting with the storage
# location that triggered the event
# using it, we can read the object to get its content as a bytes object
data = self.trigger.storage.read(path)
logger.info(f"The object's file size is {len(data)} bytes")
logger.info(f"The object's first {self.head_bytes} bytes are: {data[:self.head_bytes]}")
```
## Storage Locations
Storage Event tasks are triggered when objects are created or modified in a storage location. Tilebox supports Google Cloud Storage buckets, Amazon S3 buckets, and local file systems as storage locations.
### Registering a Storage Location
To make a storage location available within Tilebox workflows, it must be registered first. This involves specifying the location and setting up a notification system that forwards events to Tilebox, enabling task triggering. The setup varies depending on the storage location type.
For example, a GCP storage bucket is integrated by setting up a [PubSub Notification with a push subscription](https://cloud.google.com/storage/docs/pubsub-notifications). A local file system requires installing a filesystem watcher. To set up a storage location registered with Tilebox, please [get in touch](mailto:engineering@tilebox.com).
### Listing Available Storage Locations
To list all available storage locations, use the `storage_locations` method on the recurrent task client.
```python Python
from tilebox.workflows import Client
client = Client()
recurrent_tasks_client = client.recurrent_tasks()
storage_locations = recurrent_tasks_client.storage_locations()
print(storage_locations)
```
```plaintext Output
[
StorageLocation(
location="gcp-project:gcs-bucket-fab3fa2",
type=StorageType.GCS,
),
StorageLocation(
location="s3-bucket-263af15",
type=StorageType.S3,
),
StorageLocation(
location='/path/to/a/local/folder',
type=StorageType.FS,
),
]
```
### Reading Files from a Storage Location
Once a storage location is registered, you can read files from it using the `read` method on the storage client.
```python Python
gcs_bucket = storage_locations[0]
s3_bucket = storage_locations[1]
local_folder = storage_locations[2]
gcs_object = gcs_bucket.read("my-object.txt")
s3_object = s3_bucket.read("my-object.txt")
local_object = local_folder.read("my-object.txt")
```
The `read` method instantiates a client for the specific storage location. This requires that
the storage location is accessible by a task runner and may require credentials for cloud storage
or physical/network access to a locally mounted file system.
To set up authentication and enable access to a GCS storage bucket, check out the [Google Client docs for authentication](https://cloud.google.com/docs/authentication/client-libraries#python).
## Registering a Storage Event Trigger
After implementing a Storage Event task, register it so that each matching storage event submits a new job, consisting of a single task instance derived from the registered Storage Event task prototype.
```python Python
from tilebox.workflows import Client
client = Client()
recurrent_tasks = client.recurrent_tasks()
storage_event_task = recurrent_tasks.create_recurring_storage_event_task(
"log-object-creations", # name of the recurring storage event task
"dev-cluster", # cluster slug to submit jobs to
LogObjectCreation(head_bytes=20), # the task (and its input parameters) to run repeatedly
triggers=[
# you can specify a glob pattern:
# run every time a .txt file is created anywhere in the gcs bucket
(gcs_bucket, "**.txt"),
],
)
```
The syntax for specifying glob patterns follows [Standard Wildcards](https://tldp.org/LDP/GNU-Linux-Tools-Summary/html/x11655.htm).
Additionally, you can use `**` as a super-asterisk, a matching operator not sensitive to slash separators.
Here are some examples of valid glob patterns:
| Pattern | Matches |
| ----------- | ---------------------------------------------------------------------------- |
| `*.ext` | Any file ending in `.ext` in the root directory |
| `**/*.ext` | Any file ending in `.ext` in any subdirectory, but not in the root directory |
| `**.ext` | Any file ending in `.ext` in any subdirectory, including the root directory |
| `folder/*` | Any file directly in a `folder` subdirectory |
| `folder/**` | Any file directly or recursively part of a `folder` subdirectory |
| `[a-z].txt` | Matches `a.txt`, `b.txt`, etc. |
## Start a Storage Event Task Runner
With the Storage Event task registered, a job is submitted whenever a matching storage event occurs. But unless a [task runner](/workflows/concepts/task-runners) is available to execute the Storage Event task, the submitted jobs remain in a task queue.
Once an [eligible task runner](/workflows/concepts/task-runners#task-selection) becomes available, all jobs in the queue are executed.
```python Python
from tilebox.workflows import Client
client = Client()
runner = client.runner("dev-cluster", tasks=[LogObjectCreation])
runner.run_forever()
```
### Triggering an Event
Creating an object in the bucket where the task is registered results in a job being submitted:
```bash Creating an object
echo "Hello World" > my-object.txt
gcloud storage cp my-object.txt gs://gcs-bucket-fab3fa2
```
Inspecting the task runner output reveals that the job was submitted and the task executed:
```plaintext Output
2024-09-25 16:51:45,621 INFO A new object was created: my-object.txt
2024-09-25 16:51:45,857 INFO The object's file size is 12 bytes
2024-09-25 16:51:45,858 INFO The object's first 20 bytes are: b'Hello World\n'
```
## Inspecting in the Console
The [Tilebox Console](https://console.tilebox.com/workflows/recurrent-tasks) provides an easy way to inspect all registered storage event tasks.
## Deleting Storage Event triggers
To delete a registered storage event task, use `recurrent_tasks.delete`. After deletion, no new jobs will be submitted by the storage event trigger. Past jobs already triggered will still remain queued.
```python Python
from tilebox.workflows import Client
client = Client()
recurrent_tasks = client.recurrent_tasks()
# delete the task as returned by create_recurring_storage_event_task
recurrent_tasks.delete(storage_event_task)
# or manually by id:
recurrent_tasks.delete("0190bafc-b3b8-88c4-008b-a5db044380d0")
```
## Submitting Storage Event jobs manually
You can submit Storage event tasks as regular tasks for testing purposes or as part of a larger workflow. To do so, instantiate the task with a specific storage location and object name using the `once` method.
```python Python
job_client = client.jobs()
task = LogObjectCreation(head_bytes=20)
# submitting it directly won't work; raises ValueError:
# StorageEventTask cannot be submitted without being triggered. Use task.once().
job_client.submit(
"manual-storage-event-job",
task,
cluster="dev-cluster"
)
# instead, specify a trigger,
# so that we can submit the task as a regular task
triggered_task = task.once(gcs_bucket, "my-object.txt")
job_client.submit(
"manual-storage-event-job",
triggered_task,
cluster="dev-cluster"
)
```
# Logging
Set up distributed logging using the OpenTelemetry logging protocol
## Overview
Tilebox workflows are designed for distributed execution, making it essential to set up logging to a centralized system. Tilebox supports OpenTelemetry logging, which simplifies sending log messages from your tasks to a chosen backend, such as [Axiom](https://axiom.co/), where logs from a distributed cluster of task runners can be collected and visualized in one place.
## Configure logging
The Tilebox workflow SDKs include support for exporting OpenTelemetry logs. To enable logging, call the appropriate configuration functions during the startup of your [task runner](/workflows/concepts/task-runners). Then, use the provided `logger` to send log messages from your tasks.
To configure logging with Axiom, you first need to create an [Axiom Dataset](https://axiom.co/docs/reference/datasets) to export your workflow logs to. You will also need an [Axiom API key](https://axiom.co/docs/reference/tokens) with the necessary write permissions for that dataset.
```python Python
from tilebox.workflows import Client, Task, ExecutionContext
from tilebox.workflows.observability.logging import configure_otel_logging_axiom
# your own workflow:
from my_workflow import MyTask
def main():
configure_otel_logging_axiom(
# specify an Axiom dataset to export logs to
dataset="my-axiom-logs-dataset",
# along with an Axiom API key with ingest permissions for that dataset
api_key="my-axiom-api-key",
)
# the task runner will export logs from
# the executed tasks to the specified dataset
client = Client()
runner = client.runner("dev-cluster", tasks=[MyTask])
runner.run_forever()
if __name__ == "__main__":
main()
```
Setting the environment variables `AXIOM_API_KEY` and `AXIOM_LOGS_DATASET` allows you to omit these arguments in the `configure_otel_logging_axiom` function.
If you are using another OpenTelemetry-compatible backend besides Axiom, such as OpenTelemetry Collector or Jaeger, you can configure logging by specifying the URL endpoint to export log messages to.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.logging import configure_otel_logging
# your own workflow:
from my_workflow import MyTask
def main():
configure_otel_logging(
# specify an endpoint to export logs to, such as a
# locally running instance of OpenTelemetry Collector
endpoint="http://localhost:4318/v1/logs",
# optional headers for each request
headers={"Authorization": "Bearer some-api-key"},
)
# the task runner will export logs from
# the executed tasks to the specified endpoint
client = Client()
runner = client.runner("dev-cluster", tasks=[MyTask])
runner.run_forever()
if __name__ == "__main__":
main()
```
If you set the environment variable `OTEL_LOGS_ENDPOINT`, you can omit that argument in the `configure_otel_logging` function.
To log messages to the standard console output, use the `configure_console_logging` function.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.logging import configure_console_logging
# your own workflow:
from my_workflow import MyTask
def main():
configure_console_logging()
# the task runner will print log messages from
# the executed tasks to the console
client = Client()
runner = client.runner("dev-cluster", tasks=[MyTask])
runner.run_forever()
if __name__ == "__main__":
main()
```
The console logging backend is not recommended for production use. Log messages will be emitted to the standard output of each task runner rather than a centralized logging system. It is intended for local development and testing of workflows.
## Emitting log messages
Use the logger provided by the Tilebox SDK to emit log messages from your tasks to the [configured logging backend](#configure-logging).
Log messages emitted within a task's `execute` method are also automatically recorded as span events for the current [job trace](/workflows/observability/tracing).
```python Python
import logging
from tilebox.workflows import Task, ExecutionContext
from tilebox.workflows.observability.logging import get_logger
logger = get_logger()
class MyTask(Task):
def execute(self, context: ExecutionContext) -> None:
# emit a log message to the configured OpenTelemetry backend
logger.info("Hello world from configured logger!")
```
## Logging task runner internals
Tilebox task runners also internally use a logger. By default, it's set to the WARNING level, but you can change it by explicitly configuring a logger for the workflows client when constructing the task runner.
```python Python
import logging
from tilebox.workflows import Client
from tilebox.workflows.observability.logging import configure_otel_logging_axiom
from tilebox.workflows.observability.logging import get_logger
# your own workflow:
from my_workflow import MyTask
# configure Axiom or another logging backend
configure_otel_logging_axiom(
dataset="my-axiom-logs-dataset",
api_key="my-axiom-api-key",
)
# configure a logger for the Tilebox client at the INFO level
client = Client()
client.configure_logger(get_logger(level=logging.INFO))
# now the task runner inherits this logger and uses
# it to emit its own internal log messages as well
runner = client.runner("dev-cluster", tasks=[MyTask])
runner.run_forever()
```
# OpenTelemetry Integration
Integrate OpenTelemetry into your Tilebox Workflows
## Observability
Effective observability is essential for building reliable workflows. Understanding and monitoring the execution of workflows and their tasks helps ensure correctness and efficiency. This section describes methods to gain insights into your workflow's execution.
## OpenTelemetry
Tilebox Workflows is designed with [OpenTelemetry](https://opentelemetry.io/) in mind, which provides a set of APIs and libraries for instrumenting, generating, collecting, and exporting telemetry data (metrics, logs, and traces) in distributed systems.
Tilebox Workflows currently supports OpenTelemetry for tracing and logging, with plans to include metrics in the future.
## Integrations
Tilebox exports telemetry data using the [OpenTelemetry Protocol](https://opentelemetry.io/docs/specs/otlp/).
Axiom, a cloud-based observability and telemetry platform, supports this protocol. Tilebox Workflows has built-in support for Axiom, and the examples and screenshots in this section come from this integration.
Additionally, any other OpenTelemetry-compatible backend, such as OpenTelemetry Collector or Jaeger, can be used to collect telemetry data generated by Tilebox Workflows.
# Tracing
Record the execution of your workflow tasks as OpenTelemetry traces and spans
## Overview
Applying [OpenTelemetry traces](https://opentelemetry.io/docs/concepts/signals/traces/) to the concept of workflows allows you to monitor the execution of your jobs and their individual tasks, for example by visualizing a job's trace in a tool like [Axiom](https://axiom.co/).
Tracing your workflows enables you to easily observe:
* The order of task execution
* Which tasks run in parallel
* The [task runner](/workflows/concepts/task-runners) handling each task
* The duration of each task
* The outcome of each task (success or failure)
This information helps identify bottlenecks and performance issues, ensuring that your workflows execute correctly.
## Configure tracing
The Tilebox workflow SDKs have built-in support for exporting OpenTelemetry traces. To enable tracing, call the appropriate configuration functions during the startup of your [task runner](/workflows/concepts/task-runners).
To configure tracing with Axiom, you first need to create an [Axiom Dataset](https://axiom.co/docs/reference/datasets) to export your workflow traces to. You will also need an [Axiom API key](https://axiom.co/docs/reference/tokens) with the necessary write permissions for that dataset.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.tracing import configure_otel_tracing_axiom
# your own workflow:
from my_workflow import MyTask
def main():
configure_otel_tracing_axiom(
# specify an Axiom dataset to export traces to
dataset="my-axiom-traces-dataset",
# along with an Axiom API key for ingest permissions
api_key="my-axiom-api-key",
)
# the following task runner generates traces for executed tasks and
# exports trace and span data to the specified Axiom dataset
client = Client()
runner = client.runner("dev-cluster", tasks=[MyTask])
runner.run_forever()
if __name__ == "__main__":
main()
```
Set the environment variables `AXIOM_API_KEY` and `AXIOM_TRACES_DATASET` to omit those arguments
in the `configure_otel_tracing_axiom` function.
If you are using another OpenTelemetry-compatible backend besides Axiom, like OpenTelemetry Collector or Jaeger, you can configure tracing by specifying the URL endpoint to export traces to.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.tracing import configure_otel_tracing
# your own workflow:
from my_workflow import MyTask
def main():
configure_otel_tracing(
# specify an endpoint for trace ingestion, such as a
# locally running instance of OpenTelemetry Collector
endpoint="http://localhost:4318/v1/traces",
# optional headers for each request
headers={"Authorization": "Bearer some-api-key"},
)
# the following task runner generates traces for executed tasks and
# exports trace and span data to the specified endpoint
client = Client()
runner = client.runner("dev-cluster", tasks=[MyTask])
runner.run_forever()
if __name__ == "__main__":
main()
```
Set the environment variable `OTEL_TRACES_ENDPOINT` to omit that argument
in the `configure_otel_tracing` function.
Once the runner picks up tasks and executes them, corresponding traces and spans are automatically generated and exported to the configured backend.