# AI Assistance Large Language Models (LLMs) are powerful tools for exploring and learning about Tilebox. This section explains how to provide them with Tilebox-specific context for tailored, relevant and up-to-date responses. ## Providing Tilebox-specific context AI assistants and Large Language Models (LLMs) can answer questions, guide you in using Tilebox, suggest relevant sections in the documentation, and assist you in creating workflows. Download the complete Tilebox documentation as a plain text file to share with your AI assistant or language model. The full content of the Tilebox documentation is available in plain markdown format at [https://docs.tilebox.com/llms-full.txt](https://docs.tilebox.com/llms-full.txt). Upload this document to your AI assistant or LLM to provide it with Tilebox-specific context. The [Documentation Context](https://docs.tilebox.com/llms-full.txt) is updated whenever the documentation changes. If you download the file, refresh it occasionally to stay up-to-date. ## Example prompt After you upload the [Documentation Context](https://docs.tilebox.com/llms-full.txt) to your AI assistant or LLM, you can ask it questions and receive tailored, up-to-date responses. Here's an example prompt to get you started. ```txt Example prompt Generate a script that - Creates a cluster - Configures console logging - Contains a Fibonacci calculator workflow that is using a local filesystem cache. Make sure you get task dependencies right. Make sure to only print the final result as the last step of the workflow. Write logs to let us know what is happening. - Submits a job for the number 7 - Starts a processing runner Do not invent APIs. All available Tilebox APIs are documented. ``` ## Claude [Claude 3.5 Sonnet](https://docs.anthropic.com/en/docs/about-claude/models) is a great choice for an AI assistant for using Tilebox. To provide Claude with Tilebox-specific context, create a [new project](https://support.anthropic.com/en/articles/9517075-what-are-projects), and upload the [llms-full.txt](https://docs.tilebox.com/llms-full.txt) as project knowledge. You can then ask questions or use it to generate scripts. ## ChatGPT Experiments with [GPT-4o](https://chatgpt.com/?model=gpt-4o) have shown mixed results. While it effectively answers questions, it has difficulties generating workflow scripts, often misapplying or inventing APIs. Consider using [Claude 3.5 Sonnet](#claude) for a better experience. # Client ```python class Client(url: str, token: str) ``` Create a Tilebox datasets client. ## Parameters Tilebox API Url. Defaults to `https://api.tilebox.com`. The API Key to authenticate with. If not set the `TILEBOX_API_KEY` environment variable will be used. ```python Python from tilebox.datasets import Client # read token from an environment variable client = Client() # or provide connection details directly client = Client( url="https://api.tilebox.com", token="YOUR_TILEBOX_API_KEY" ) ``` # Client.dataset ```python def Client.dataset(slug: str) -> Dataset ``` Get a dataset by its slug. ## Parameters The slug of the dataset ## Returns A dataset object. ```python Python s1_sar = client.dataset( "open_data.copernicus.sentinel1_sar" ) ``` ## Errors The specified dataset does not exist. # Client.datasets ```python def Client.datasets() -> Datasets ``` Fetch all available datasets. ## Returns An object containing all available datasets, accessible via dot notation. 
```python Python datasets = client.datasets() s1_sar = datasets.open_data.copernicus.sentinel1_sar ``` # Collection.find ```python def Collection.find( datapoint_id: str, skip_data: bool = False ) -> xarray.Dataset ``` Find a specific datapoint in a collection by its id. ## Parameters The id of the datapoint to find. Whether to skip loading the data for the datapoint. If `True`, only the metadata for the datapoint is loaded. ## Returns An [`xarray.Dataset`](/sdks/python/xarray) containing the requested data point. ## Errors The specified datapoint does not exist in this collection. ```python Python data = collection.find( "0186d6b6-66cc-fcfd-91df-bbbff72499c3", skip_data = False, ) ``` # Collection.info ```python def Collection.info() -> CollectionInfo ``` Fetch metadata about the datapoints in this collection. ## Returns A collection info object. ```python Python info = collection.info() ``` # Collection.load ```python def Collection.load( time_or_interval: TimeIntervalLike, skip_data: bool = False, show_progress: bool = False ) -> xarray.Dataset ``` Load a range of data points in this collection in a specified interval. If no data exists for the requested time or interval, an empty `xarray.Dataset` is returned. ## Parameters The time or time interval for which to load data. This can be a single time scalar, a tuple of two time scalars, or an array of time scalars. Valid time scalars are: `datetime.datetime` objects, strings in ISO 8601 format, or Unix timestamps in seconds. Behavior for each input type: * **TimeScalar**: If a single time scalar is provided, `load` returns all data points for that exact millisecond. * **TimeInterval**: If a time interval is provided, `load` returns all data points in that interval. Intervals can be a tuple of two `TimeScalars` or a `TimeInterval` object. Tuples are interpreted as a half-open interval `[start, end)`. With a `TimeInterval` object, the `start_exclusive` and `end_inclusive` parameters control whether the start and end time are inclusive or exclusive. * **Iterable\[TimeScalar]**: If an array of time scalars is provided, `load` constructs a time interval from the first and last time scalar in the array. Here, both the `start` and `end` times are inclusive. If `True`, the response contains only the [datapoint metadata](/datasets/timeseries) without the actual dataset-specific fields. Defaults to `False`. If `True`, a progress bar is displayed when pagination is required. Defaults to `False`. ## Returns An [`xarray.Dataset`](/sdks/python/xarray) containing the requested data points. ```python Python from datetime import datetime from tilebox.clients.core.data import TimeInterval # loading a specific time time = "2023-05-01 12:45:33.423" data = collection.load(time) # loading a time interval interval = ("2023-05-01", "2023-08-01") data = collection.load(interval, show_progress=True) # loading a time interval with TimeInterval interval = TimeInterval( start=datetime(2023, 5, 1), end=datetime(2023, 8, 1), start_exclusive=False, end_inclusive=False, ) data = collection.load(interval, show_progress=True) # loading with an iterable meta_data = collection.load(..., skip_data=True) first_50 = collection.load(meta_data.time[:50], skip_data=False) ``` # Dataset.collection ```python def Dataset.collection(name: str) -> Collection ``` Access a specific collection by its name. ## Parameters The name of the collection ## Returns A collection object. 
```python Python collections = dataset.collection("My-collection") ``` ## Errors The specified collection does not exist in the dataset. # Dataset.collections ```python def Dataset.collections() -> dict[str, Collection] ``` List the available collections in a dataset. ## Returns A dictionary mapping collection names to collection objects. ```python Python collections = dataset.collections() ``` # Client ```python class Client(url: str, token: str) ``` Create a Tilebox workflows client. ## Parameters Tilebox API Url. Defaults to `https://api.tilebox.com`. The API Key to authenticate with. If not set the `TILEBOX_API_KEY` environment variable will be used. ## Sub clients The workflows client exposes sub clients for interacting with different parts of the Tilebox workflows API. ```python def Client.jobs() -> JobClient ``` A client for interacting with jobs. ```python def Client.clusters() -> ClusterClient ``` A client for managing clusters. ```python def Client.recurrent_tasks() -> RecurrentTaskClient ``` A client for scheduling recurrent tasks. ## Task runners ```python def Client.runner(...) -> TaskRunner ``` A client is also used to instantiate task runners. Check out the [`Client.runner` API reference](/api-reference/tilebox.workflows/Client.runner) for more information. ```python Python from tilebox.workflows import Client # read token from an environment variable client = Client() # or provide connection details directly client = Client( url="https://api.tilebox.com", token="YOUR_TILEBOX_API_KEY" ) # access sub clients job_client = client.jobs() cluster_client = client.clusters() recurrent_task_client = client.recurrent_tasks() # or instantiate a task runner runner = client.runner("dev-cluster", tasks=[...]) ``` # Client.runner ```python def Client.runner( cluster: ClusterSlugLike, tasks: list[type[Task]], cache: JobCache | None = None ) -> TaskRunner ``` Initialize a task runner. ## Parameters The [cluster slug](/workflows/concepts/clusters#managing-clusters) for the cluster associated with this task runner. A list of task classes that this runner can execute. An optional [job cache](/workflows/caches) for caching results from tasks and sharing data between tasks. ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import LocalFileSystemCache client = Client() runner = client.runner( "my-cluster-EdsdUozYprBJDL", [MyFirstTask, MySubtask], # optional: cache=LocalFileSystemCache("cache_directory"), ) ``` # Context.job_cache ```python ExecutionContext.job_cache: JobCache ``` Access the job cache for a task. ```python Python # within a Tasks execute method, access the job cache # def execute(self, context: ExecutionContext): context.job_cache["some-key"] = b"my-value" ``` # Context.submit_subtask ```python def ExecutionContext.submit_subtask( task: Task, depends_on: list[FutureTask] = None, cluster: str | None = None, max_retries: int = 0 ) -> FutureTask ``` Submit a [subtask](/workflows/concepts/tasks#subtasks-and-task-composition) from a currently executing task. ## Parameters The task to submit as a subtask. An optional list of tasks already submitted within the same context that this subtask depends on. An optional [cluster slug](/workflows/concepts/clusters#managing-clusters) for running the subtask. If not provided, the subtask runs on the same cluster as the parent task. Specify the maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtask in case of failure. The default is 0. 
## Returns A `FutureTask` object that represents the submitted subtask. Can be used to set up dependencies between tasks. ```python Python # within the execute method of a Task: subtask = context.submit_subtask(MySubtask()) dependent_subtask = context.submit_subtask( MyOtherSubtask(), depends_on=[subtask] ) gpu_task = context.submit_subtask( MyGPUTask(), cluster="gpu-cluster-slug" ) flaky_task = context.submit_subtask( MyFlakyTask(), max_retries=5 ) ``` # JobCache.__iter__ ```python def JobCache.__iter__() -> Iterator[str] ``` List all available keys in a job cache group. Only keys that are direct children of the current group are returned. For nested groups, first access the group and then iterate over its keys. ```python Python # within a Tasks execute method, access the job cache # def execute(context: ExecutionContext) cache = context.job_cache # iterate over all keys for key in cache: print(cache[key]) # or collect as list keys = list(cache) # also works with nested groups for key in cache.group("some-group"): print(cache[key]) ``` # JobCache.group ```python def JobCache.group(name: str) -> JobCache ``` You can nest caches in a hierarchical manner using [groups](/workflows/caches#groups-and-hierarchical-keys). Groups are separated by a forward slash (/) in the key. This hierarchical structure functions similarly to a file system. ## Parameters The name of the cache group. ```python Python # within a Tasks execute method, access the job cache # def execute(context: ExecutionContext) cache = context.job_cache # use groups to nest related cache values group = cache.group("some-group") group["value"] = b"my-value" nested = group.group("nested") nested["value"] = b"nested-value" # access nested groups directly via / notation value = cache["some-group/nested/value"] ``` # JobClient.cancel ```python def JobClient.cancel(job_or_id: Job | str) ``` Cancel a job. When a job is canceled, no queued tasks will be picked up by task runners and executed even if task runners are idle. Tasks that are already being executed will finish their execution and not be interrupted. All sub-tasks spawned from such tasks after the cancellation will not be picked up by task runners. ## Parameters The job or job id to cancel. ```python Python job_client.cancel(job) ``` # JobClient.retry ```python def JobClient.retry(job_or_id: Job | str) ``` Retry a job. All failed tasks will become queued again, and queued tasks will be picked up by task runners again. ## Parameters The job or job id to retry. ```python Python job_client.retry(job) ``` # JobClient.submit ```python def JobClient.submit( job_name: str, root_task_or_tasks: Task | Iterable[Task], cluster: str | Cluster | Iterable[str | Cluster], max_retries: int = 0 ) -> Job ``` ## Parameters The name of the job. The root task for the job. This task is executed first and can submit subtasks to manage the entire workflow. A job can have optionally consist of multiple root tasks. The [cluster slug](/workflows/concepts/clusters#managing-clusters) for the cluster to run the root task on. In case of multiple root tasks, a list of cluster slugs can be provided. The maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtask in case it fails. Defaults to 0. 
```python Python from my_workflow import MyTask job = job_client.submit( "my-job", MyTask( message="Hello, World!", value=42, data={"key": "value"} ), "my-cluster-EdsdUozYprBJDL", max_retries=0, ) ``` # JobClient.visualize ```python def JobClient.visualize( job_or_id: Job | str, direction: str = "down", layout: str = "dagre", sketchy: bool = True ) ``` Create a visualization of a job as a diagram. If you are working in an interactive environment, such as Jupyter notebooks, you can use the `display` method to display the visualized job diagram directly in the notebook. ## Parameters The job to visualize. The direction for the diagram to flow. For more details, see the relevant section in the [D2 documentation](https://d2lang.com/tour/layouts/#direction). Valid values are `up`, `down`, `right`, and `left`. The default value is `down`. The layout engine for the diagram. For further information, refer to the [D2 layout engines](https://d2lang.com/tour/layouts). Valid values are `dagre` and `elk`, with the default being `dagre`. Indicates whether to use a sketchy, hand-drawn style for the diagram. The default is `True`. ```python Python svg = job_client.visualize(job) Path("diagram.svg").write_text(svg) # in a notebook job_client.display(job) ``` # Task ```python class Task: def execute(context: ExecutionContext) -> None @staticmethod def identifier() -> tuple[str, str] ``` Base class for Tilebox workflows [tasks](/workflows/concepts/tasks). Inherit from this class to create a task. Inheriting also automatically applies the dataclass decorator. ## Methods ```python def Task.execute(context: ExecutionContext) -> None ``` The entry point for the execution of the task. ```python @staticmethod def Task.identifier() -> tuple[str, str] ``` Override a task identifier and specify its version. If not overridden, the identifier of a task defaults to the class name, and the version to `v0.0`. ## Task Input Parameters ```python class MyTask(Task): message: str value: int data: dict[str, int] ``` Optional task [input parameters](/workflows/concepts/tasks#input-parameters), defined as class attributes. Supported types are `str`, `int`, `float`, `bool`, as well as `lists` and `dicts` thereof. ```python Python from tilebox.workflows import Task, ExecutionContext class MyFirstTask(Task): def execute(self, context: ExecutionContext): print(f"Hello World!") @staticmethod def identifier() -> tuple[str, str]: return ("tilebox.workflows.MyTask", "v3.2") class MyFirstParameterizedTask(Task): name: str greet: bool data: dict[str, str] def execute(self, context: ExecutionContext): if self.greet: print(f"Hello {self.name}!") ``` # TaskRunner.run_all ```python def TaskRunner.run_all() ``` Run the task runner and execute all tasks, until there are no more tasks available. ```python Python # run all tasks until no more tasks are available runner.run_all() ``` # TaskRunner.run_forever ```python def TaskRunner.run_forever() ``` Run the task runner forever. This will poll for new tasks and execute them as they come in. If no tasks are available, it will sleep for a short time and then try again. ```python Python # run forever, until the current process is stopped runner.run_forever() ``` # Authentication To access the Tilebox API, you must authenticate your requests. This guide explains how authentication works, focusing on API keys used as bearer tokens. ## Creating an API key To create an API key, log into the [Tilebox Console](https://console.tilebox.com). 
Navigate to [Account -> API Keys](https://console.tilebox.com/account/api-keys) and click the "Create API Key" button. Keep your API keys secure. Deactivate any keys if you suspect they have been compromised. ## Bearer token authentication The Tilebox API uses bearer tokens for authentication. You need to pass your API key as the `token` parameter when creating an instance of the client. ```python Python from tilebox.datasets import Client as DatasetsClient from tilebox.workflows import Client as WorkflowsClient datasets_client = DatasetsClient(token="YOUR_TILEBOX_API_KEY") workflows_client = WorkflowsClient(token="YOUR_TILEBOX_API_KEY") ``` If you set your API key as an environment variable named `TILEBOX_API_KEY`, you can skip the token parameter when creating a client. # Console Explore your datasets and workflows with the Tilebox Console The [Tilebox Console](https://console.tilebox.com) is a web interface that enables you to explore your datasets and workflows, manage your account and API keys, add collaborators to your team and monitor your usage. ## Datasets The datasets explorer lets you explore available datasets for your team. You can select a dataset, view its collections, and load data for a collection within a specified time range. Tilebox Console Tilebox Console When you click a specific event time in the data point list view, a detailed view of that data point will appear. Tilebox Console Tilebox Console ### Export as Code After selecting a dataset, collection, and time range, you can export the current selection as a Python code snippet. This will copy a code snippet like the one below to your clipboard. ```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() sentinel2_msi = datasets.open_data.copernicus.sentinel2_msi data = sentinel2_msi.collection("S2A_S2MSI1C").load( ("2024-07-12", "2024-07-26"), show_progress=True, ) ``` Paste the snippet into a [notebook](/sdks/python/sample-notebooks) to interactively explore the [`xarray.Dataset`](/sdks/python/xarray) that is returned. ## Workflows The workflows section of the console allows you to view jobs, create clusters and create recurring and near real-time tasks. Tilebox Console Tilebox Console ## Account ### API Keys The API Keys page enables you to manage your API keys. You can create new API keys, revoke existing ones, and view currently active API keys. Tilebox Console Tilebox Console ### Usage The Usage page allows you to view your current usage of the Tilebox API. Tilebox Console Tilebox Console # Collections Learn about time series dataset collections Collections group data points within a dataset. They help represent logical groupings of data points that are commonly queried together. For example, if your dataset includes data from a specific instrument on different satellites, you can group the data points from each satellite into a collection. ## Overview This section provides a quick overview of the API for listing and accessing collections. Below are some usage examples for different scenarios. | Method | API Reference | Description | | --------------------- | ---------------------------------------------------------------------------- | --------------------------------------------- | | `dataset.collections` | [Listing collections](/api-reference/tilebox.datasets/Dataset.collections) | List all available collections for a dataset. 
| | `dataset.collection` | [Accessing a collection](/api-reference/tilebox.datasets/Dataset.collection) | Access an individual collection by its name. | | `collection.info` | [Collection information](/api-reference/tilebox.datasets/Collection.info) | Request information about a collection. | Refer to the examples below for common use cases when working with collections. These examples assume that you have already [created a client](/datasets/introduction#creating-a-datasets-client) and [listed the available datasets](/api-reference/tilebox.datasets/Client.datasets). ```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() ``` ## Listing collections To list the collections for a dataset, use the `collections` method on the dataset object. ```python Python dataset = datasets.open_data.copernicus.landsat8_oli_tirs collections = dataset.collections() print(collections) ``` ```plaintext Output {'L1GT': Collection L1GT: [2013-03-25T12:08:43.699 UTC, 2024-08-19T12:57:32.456 UTC] (154288 data points), 'L1T': Collection L1T: [2013-03-26T09:33:19.763 UTC, 2020-08-24T03:21:50.000 UTC] (87958 data points), 'L1TP': Collection L1TP: [2013-03-24T00:25:55.457 UTC, 2024-08-19T12:58:20.229 UTC] (322041 data points), 'L2SP': Collection L2SP: [2015-01-01T07:53:35.391 UTC, 2024-08-12T12:52:03.243 UTC] (191110 data points)} ``` [dataset.collections](/api-reference/tilebox.datasets/Dataset.collections) returns a dictionary mapping collection names to their corresponding collection objects. Each collection has a unique name within its dataset. ## Accessing individual collections Once you have listed the collections for a dataset using [dataset.collections()](/api-reference/tilebox.datasets/Dataset.collections), you can access a specific collection by retrieving it from the resulting dictionary with its name. Use [collection.info()](/api-reference/tilebox.datasets/Collection.info) to get details (name, availability, and count) about it. ```python Python collections = dataset.collections() terrain_correction = collections["L1GT"] collection_info = terrain_correction.info() print(collection_info) ``` ```plaintext Output L1GT: [2013-03-25T12:08:43.699 UTC, 2024-08-19T12:57:32.456 UTC] (154288 data points) ``` You can also access a specific collection directly using the [dataset.collection](/api-reference/tilebox.datasets/Dataset.collection) method on the dataset object. This method allows you to get the collection without having to list all collections first. ```python Python terrain_correction = dataset.collection("L1GT") collection_info = terrain_correction.info() print(collection_info) ``` ```plaintext Output L1GT: [2013-03-25T12:08:43.699 UTC, 2024-08-19T12:57:32.456 UTC] (154288 data points) ``` ## Errors you may encounter ### NotFoundError If you attempt to access a collection with a non-existent name, a `NotFoundError` is raised. For example: ```python Python dataset.collection("Sat-X").info() # raises NotFoundError: 'No such collection Sat-X' ``` ## Next steps Learn how to load data points from a collection. # Introduction Learn about Tilebox Datasets Time series datasets refer to datasets where each data point is linked to a timestamp. This format is common for data collected over time, such as satellite data. This section covers: Discover available time series datasets and learn how to list them. Understand the common fields shared by all time series datasets. Learn what collections are and how to access them. 
Find out how to access data from a collection for specific time intervals. For a quick reference to API methods or specific parameter meanings, [check out the complete time series API Reference](/api-reference/tilebox.datasets/Client). ## Terminology Get familiar with some key terms when working with time series datasets. Time series data points are individual entities that form a dataset. Each data point has a timestamp and consists of a set of fixed [metadata fields](/datasets/timeseries#common-fields) along with dataset-specific fields. Time series datasets act as containers for data points. All data points in a dataset share the same type and fields. Collections group data points within a dataset. They help represent logical groupings of data points that are often queried together. ## Creating a datasets client Prerequisites * You have [installed](/sdks/python/install) the `tilebox-datasets` package. * You have [created](/authentication) a Tilebox API key. After meeting these prerequisites, you can create a client instance to interact with Tilebox Datasets. ```python Python from tilebox.datasets import Client client = Client(token="YOUR_TILEBOX_API_KEY") ``` You can also set the `TILEBOX_API_KEY` environment variable to your API key. You can then instantiate the client without passing the `token` argument. Python will automatically use this environment variable for authentication. ```python Python from tilebox.datasets import Client # requires a TILEBOX_API_KEY environment variable client = Client() ``` Tilebox datasets provide a standard synchronous API by default but also offers an [asynchronous client](/sdks/python/async) if needed. ### Exploring datasets After creating a client instance, you can start exploring available datasets. A straightforward way to do this in an interactive environment is to [list all datasets](/api-reference/tilebox.datasets/Client.datasets) and use the autocomplete feature in your Jupyter notebook. ```python Python datasets = client.datasets() datasets. # trigger autocomplete here to view available datasets ``` The Console also provides an [overview](https://console.tilebox.com/datasets/explorer) of all available datasets. ### Errors you might encounter #### AuthenticationError `AuthenticationError` occurs when the client fails to authenticate with the Tilebox API. This may happen if the provided API key is invalid or expired. A client instantiated with an invalid API key won't raise an error immediately, but an error will occur when making a request to the API. ```python Python client = Client(token="invalid-key") # runs without error datasets = client.datasets() # raises AuthenticationError ``` ## Next steps # Loading Time Series Data Learn how to load data from Time Series Dataset collections. ## Overview This section provides an overview of the API for loading data from a collection. It includes usage examples for many common scenarios. | Method | API Reference | Description | | ----------------- | ----------------------------------------------------------------------- | ---------------------------------------------------- | | `collection.load` | [Loading data](/api-reference/tilebox.datasets/Collection.load) | Load data points from a collection. | | `collection.find` | [Loading a data point](/api-reference/tilebox.datasets/Collection.find) | Find a specific datapoint in a collection by its id. | Check out the examples below for common scenarios when loading data from collections. 
The examples assume you have already [created a client](/datasets/introduction#creating-a-datasets-client) and [accessed a specific dataset collection](/datasets/collections). ```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() collections = datasets.open_data.copernicus.sentinel1_sar.collections() collection = collections["S1A_IW_RAW__0S"] ``` ## Loading data To load data points from a dataset collection, use the [load](/api-reference/tilebox.datasets/Collection.load) method. It requires a `time_or_interval` parameter to specify the time or time interval for loading. ### TimeInterval To load data for a specific time interval, use a `tuple` in the form `(start, end)` as the `time_or_interval` parameter. Both `start` and `end` must be [TimeScalars](#time-scalars), which can be `datetime` objects or strings in ISO 8601 format. ```python Python interval = ("2017-01-01", "2023-01-01") data = collection.load(interval, show_progress=True) ``` ```plaintext Output Size: 725MB Dimensions: (time: 1109597, latlon: 2) Coordinates: ingestion_time (time) datetime64[ns] 9MB 2024-06-21T11:03:33.8524... id (time) The `show_progress` parameter is optional and can be used to display a [tqdm](https://tqdm.github.io/) progress bar while loading data. A time interval specified as a tuple is interpreted as a half-closed interval. This means the start time is inclusive, and the end time is exclusive. For instance, using an end time of `2023-01-01` includes data points up to `2022-12-31 23:59:59.999`, but excludes those from `2023-01-01 00:00:00.000`. This behavior mimics the Python `range` function and is useful for chaining time intervals. ```python Python import xarray as xr data = [] for year in [2017, 2018, 2019, 2020, 2021, 2022]: interval = (f"{year}-01-01", f"{year + 1}-01-01") data.append(collection.load(interval, show_progress=True)) # Concatenate the data into a single dataset, which is equivalent # to the result of the single request in the code example above. data = xr.concat(data, dim="time") ``` Above example demonstrates how to split a large time interval into smaller chunks while loading data in separate requests. Typically, this is not necessary as the datasets client auto-paginates large intervals. ### TimeInterval objects For greater control over inclusivity of start and end times, you can use the `TimeInterval` dataclass instead of a tuple with the `load` parameter. This class allows you to specify the `start` and `end` times, as well as their inclusivity. Here's an example of creating equivalent `TimeInterval` objects in two different ways. 
```python Python from datetime import datetime from tilebox.datasets.data import TimeInterval interval1 = TimeInterval( datetime(2017, 1, 1), datetime(2023, 1, 1), end_inclusive=False ) interval2 = TimeInterval( datetime(2017, 1, 1), datetime(2022, 12, 31, 23, 59, 59, 999999), end_inclusive=True ) print("Inclusivity is indicated by interval notation: ( and [") print(interval1) print(interval2) print(f"They are equivalent: {interval1 == interval2}") print(interval2.to_half_open()) # Same operation as above data = collection.load(interval1, show_progress=True) ``` ```plaintext Output Inclusivity is indicated by interval notation: ( and [ [2017-01-01T00:00:00.000 UTC, 2023-01-01T00:00:00.000 UTC) [2017-01-01T00:00:00.000 UTC, 2022-12-31T23:59:59.999 UTC] They are equivalent: True [2017-01-01T00:00:00.000 UTC, 2023-01-01T00:00:00.000 UTC) ``` ### Time scalars You can load all points for a specific time using a `TimeScalar` for the `time_or_interval` parameter to `load`. A `TimeScalar` can be a `datetime` object or a string in ISO 8601 format. When passed to the `load` method, it retrieves all data points matching the specified time. Note that the `time` field of data points in a collection may not be unique, so multiple data points could be returned. If you want to fetch only a single data point, use [find](#loading-a-data-point-by-id) instead. Here's how to load a data point at a specific time from a [collection](/datasets/collections). ```python Python data = collection.load("2024-08-01 00:00:01.362") print(data) ``` ```plaintext Output Size: 721B Dimensions: (time: 1, latlon: 2) Coordinates: ingestion_time (time) datetime64[ns] 8B 2024-08-01T08:53:08.450499 id (time) Tilebox uses millisecond precision for timestamps. To load all data points for a specific second, it's a [time interval](/datasets/loading-data#time-intervals) request. Refer to the examples below for details. The output of the `load` method is an `xarray.Dataset` object. To learn more about Xarray, visit the dedicated [Xarray page](/sdks/python/xarray). ### Time iterables You can specify a time interval by using an iterable of `TimeScalar`s as the `time_or_interval` parameter. This is especially useful when you want to use the output of a previous `load` call as input for another load. Here's how that works. ```python Python interval = ("2017-01-01", "2023-01-01") meta_data = collection.load(interval, skip_data=True) first_50_data_points = collection.load(meta_data.time[:50], skip_data=False) print(first_50_data_points) ``` ```plaintext Output Size: 33kB Dimensions: (time: 50, latlon: 2) Coordinates: ingestion_time (time) datetime64[ns] 400B 2024-06-21T11:03:33.852... id (time) ```python Python data = collection.load("2024-08-01 00:00:01.362", skip_data=True) print(data) ``` ```plaintext Output Size: 160B Dimensions: (time: 1) Coordinates: ingestion_time (time) datetime64[ns] 8B 2024-08-01T08:53:08.450499 id (time) ```python Python time_with_no_data_points = "1997-02-06 10:21:00" data = collection.load(time_with_no_data_points) print(data) ``` ```plaintext Output Size: 0B Dimensions: () Data variables: *empty* ``` ## Timezone handling When a `TimeScalar` is specified as a string, the time is treated as UTC. If you want to load data for a specific time in another timezone, use a `datetime` object. In this case, the Tilebox API will convert the datetime to `UTC` before making the request. The output will always contain UTC timestamps, which will need to be converted again if a different timezone is required. 
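For reference, converting the UTC timestamps returned by `load` back into another timezone could look like the following sketch. This is not part of the Tilebox API: it assumes `data` is an `xarray.Dataset` returned by one of the `collection.load` calls above, and it uses pandas (already available as an xarray dependency) for the conversion.

```python Python
import pandas as pd

# Tilebox always returns UTC timestamps; label them as UTC explicitly
utc_times = pd.to_datetime(data.time.values).tz_localize("UTC")

# convert them to another timezone for display, for example Tokyo (UTC+9)
tokyo_times = utc_times.tz_convert("Asia/Tokyo")
print(tokyo_times[:3])
```

The example below shows the opposite direction: passing a timezone-aware `datetime` to `load` and receiving UTC timestamps back.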
```python Python from datetime import datetime import pytz # Tokyo has a UTC+9 hours offset, so this is the same as # 2017-01-01 02:45:25.679 UTC tokyo_time = pytz.timezone('Asia/Tokyo').localize( datetime(2017, 1, 1, 11, 45, 25, 679000) ) print(tokyo_time) data = collection.load(tokyo_time) print(data) # time is in UTC since API always returns UTC timestamps ``` ```plaintext Output 2017-01-01 11:45:25.679000+09:00 Size: 725B Dimensions: (time: 1, latlon: 2) Coordinates: ingestion_time (time) datetime64[ns] 8B 2024-06-21T11:03:33.852435 id (time) ```python Python datapoint_id = "01916d89-ba23-64c9-e383-3152644bcbde" datapoint = collection.find(datapoint_id) print(datapoint) ``` ```plaintext Output Size: 725B Dimensions: (latlon: 2) Coordinates: ingestion_time datetime64[ns] 8B 2024-08-20T05:53:08.600528 id You can also set the `skip_data` parameter when calling `find` to load only the metadata of the data point, same as for `load`. ### Possible exceptions * `NotFoundError`: raised if no data point with the given ID is found in the collection * `ValueError`: raised if the specified `datapoint_id` is not a valid UUID # Open Data Learn about the Open data available in Tilebox. Tilebox not only provides access to your own, private datasets but also to a growing number of public datasets. These datasets are available to all users of Tilebox and are a great way to get started and prototype your applications even before data from your own satellites is available. If there is a public dataset you would like to see in Tilebox, please get in touch. ## Accessing Open Data through Tilebox Accessing open datasets in Tilebox is straightforward and as easy as accessing your own datasets. Tilebox has already ingested the required metadata for each available dataset, so you can query, preview, and download the data seamlessly. This setup enables you to leverage performance optimizations and simplifies your workflows. By using the [datasets API](/datasets), you can start prototyping your applications and workflows easily. ## Available datasets The Tilebox Console contains in-depth descriptions of each dataset, including many code-snippets to help you get started. Check out the [Sentinel 5P Tropomi](https://console.tilebox.com/datasets/descriptions/feb2bcc9-8fdf-4714-8a63-395ee9d3f323) documentation as an example. ### Copernicus Data Space The [Copernicus Data Space](https://dataspace.copernicus.eu/) is an open ecosystem that provides free instant access to data and services from the Copernicus Sentinel missions. Tilebox currently supports the following datasets from the Copernicus Data Space: The Sentinel-1 mission is the European Radar Observatory for the Copernicus joint initiative of the European Commission (EC) and the European Space Agency (ESA). The Sentinel-1 mission includes C-band imaging operating in four exclusive imaging modes with different resolution (down to 5 m) and coverage (up to 400 km). It provides dual polarization capability, short revisit times and rapid product delivery. Sentinel-2 is equipped with an optical instrument payload that samples 13 spectral bands: four bands at 10 m, six bands at 20 m and three bands at 60 m spatial resolution. Sentinel-3 is equipped with multiple instruments whose data is available in Tilebox. `OLCI` (Ocean and Land Color Instrument) is an optical instrument used to provide data continuity for ENVISAT MERIS. 
`SLSTR` (Sea and Land Surface Temperature Radiometer) is a dual-view scanning temperature radiometer, which flies in low Earth orbit (800 - 830 km altitude). The `SRAL` (SAR Radar Altimeter) instrument comprises one nadir-looking antenna, and a central electronic chain composed of a Digital Processing Unit (DPU) and a Radio Frequency Unit (RFU). OLCI, in conjunction with the SLSTR instrument, provides the `SYN` products, delivering continuity with SPOT VEGETATION. The primary goal of `TROPOMI` is to provide daily global observations of key atmospheric constituents related to monitoring and forecasting air quality, the ozone layer, and climate change. Landsat-8 is part of the long-running Landsat programme led by USGS and NASA and carries the Operational Land Imager (OLI) and the Thermal Infrared Sensor (TIRS). The Operational Land Imager (OLI), on board Landsat-8, measures in the VIS, NIR and SWIR portions of the spectrum. Its images have 15 m panchromatic and 30 m multi-spectral spatial resolutions along a 185 km wide swath, covering wide areas of the Earth's landscape while providing high enough resolution to distinguish features like urban centres, farms, forests and other land uses. The entire Earth falls within view once every 16 days due to Landsat-8's near-polar orbit. The Thermal Infra-Red Sensor instrument, on board Landsat-8, is a thermal imager operating in pushbroom mode with two Infra-Red channels, 10.8 µm and 12 µm, at 100 m spatial resolution.

### Alaska Satellite Facility (ASF)

The [Alaska Satellite Facility (ASF)](https://asf.alaska.edu/) is a NASA-funded research center at the University of Alaska Fairbanks. ASF supports a wide variety of research and applications using synthetic aperture radar (SAR) and related remote sensing technologies. Tilebox currently supports the following datasets from the Alaska Satellite Facility: European Remote Sensing Satellite (ERS) Synthetic Aperture Radar (SAR) Granules

### Umbra Space

[Umbra](https://umbra.space/) satellites provide up to 16 cm resolution Synthetic Aperture Radar (SAR) imagery from space. The Umbra Open Data Program (ODP) features over twenty diverse time-series locations that are frequently updated, allowing users to explore SAR's capabilities. Tilebox currently supports the following datasets from Umbra Space: Time-series SAR data provided as Opendata by Umbra Space.

# Storage Clients

Learn about the different storage clients available in Tilebox to access open data. Tilebox does not host the actual open data satellite products itself; instead, it relies on publicly accessible storage providers for data access and ingests the available metadata as [datasets](/datasets/timeseries) to enable high-performance querying and structured access to the data as [xarray.Dataset](/sdks/python/xarray). Below is a list of the storage providers currently supported by Tilebox.

## Copernicus Data Space

The [Copernicus Data Space](https://dataspace.copernicus.eu/) is an open ecosystem that provides free instant access to data and services from the Copernicus Sentinel missions. Check out the [Copernicus Open Data datasets](/datasets/open-data#copernicus-data-space) that are available in Tilebox.
### Access Copernicus data To download data products from the Copernicus Data Space after querying them via the Tilebox API, you need to [create an account](https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/auth?client_id=cdse-public\&response_type=code\&scope=openid\&redirect_uri=https%3A//dataspace.copernicus.eu/account/confirmed/1) and then generate [S3 credentials here](https://eodata-s3keysmanager.dataspace.copernicus.eu/panel/s3-credentials). The following code snippet demonstrates how to query and download Copernicus data using the Tilebox Python SDK. ```python Python {4,9-13,27} from pathlib import Path from tilebox.datasets import Client from tilebox.storage import CopernicusStorageClient # Creating clients client = Client(token="YOUR_TILEBOX_API_KEY") datasets = client.datasets() storage_client = CopernicusStorageClient( access_key="YOUR_ACCESS_KEY", secret_access_key="YOUR_SECRET_ACCESS_KEY", cache_directory=Path("./data") ) # Choosing the dataset and collection s2_dataset = datasets.open_data.copernicus.sentinel2_msi collections = s2_dataset.collections() collection = collections["S2A_S2MSI2A"] # Loading metadata s2_data = collection.load(("2024-08-01", "2024-08-02"), show_progress=True) # Selecting a data point to download selected = s2_data.isel(time=0) # index 0 selected # Downloading the data downloaded_data = storage_client.download(selected) print(f"Downloaded granule: {downloaded_data.name} to {downloaded_data}") print("Contents: ") for content in downloaded_data.iterdir(): print(f" - {content.relative_to(downloaded_data)}") ``` ```plaintext Output Downloaded granule: S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544.SAFE to data/Sentinel-2/MSI/L2A/2024/08/01/S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544.SAFE Contents: - manifest.safe - GRANULE - INSPIRE.xml - MTD_MSIL2A.xml - DATASTRIP - HTML - rep_info - S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544-ql.jpg ``` ## Alaska Satellite Facility (ASF) The [Alaska Satellite Facility (ASF)](https://asf.alaska.edu/) is a NASA-funded research center at the University of Alaska Fairbanks. Check out the [ASF Open Data datasets](/datasets/open-data#alaska-satellite-facility-asf) that are available in Tilebox. ### Accessing ASF Data You can query ASF metadata without needing an account, as Tilebox has indexed and ingested the relevant metadata. To access and download the actual satellite products, you will need an ASF account. You can create an ASF account in the [ASF Vertex Search Tool](https://search.asf.alaska.edu/). The following code snippet demonstrates how to query and download ASF data using the Tilebox Python SDK. 
```python Python {4,9-13,27} from pathlib import Path from tilebox.datasets import Client from tilebox.storage import ASFStorageClient # Creating clients client = Client(token="YOUR_TILEBOX_API_KEY") datasets = client.datasets() storage_client = ASFStorageClient( user="YOUR_ASF_USER", password="YOUR_ASF_PASSWORD", cache_directory=Path("./data") ) # Choosing the dataset and collection ers_dataset = datasets.open_data.asf.ers_sar collections = ers_dataset.collections() collection = collections["ERS-2"] # Loading metadata ers_data = collection.load(("2009-01-01", "2009-01-02"), show_progress=True) # Selecting a data point to download selected = ers_data.isel(time=0) # index 0 selected # Downloading the data downloaded_data = storage_client.download(selected, extract=True) print(f"Downloaded granule: {downloaded_data.name} to {downloaded_data}") print("Contents: ") for content in downloaded_data.iterdir(): print(f" - {content.relative_to(downloaded_data)}") ``` ```plaintext Output Downloaded granule: E2_71629_STD_L0_F183 to data/ASF/E2_71629_STD_F183/E2_71629_STD_L0_F183 Contents: - E2_71629_STD_L0_F183.000.vol - E2_71629_STD_L0_F183.000.meta - E2_71629_STD_L0_F183.000.raw - E2_71629_STD_L0_F183.000.pi - E2_71629_STD_L0_F183.000.nul - E2_71629_STD_L0_F183.000.ldr ``` ### Further Reading ## Umbra Space [Umbra](https://umbra.space/) satellites provide high resolution Synthetic Aperture Radar (SAR) imagery from space. Check out the [Umbra datasets](/datasets/open-data#umbra-space) that are available in Tilebox. ### Accessing Umbra data You don't need an account to access Umbra data. All data is provided under a Creative Commons License (CC BY 4.0), allowing you to freely use it. The following code snippet demonstrates how to query and download Copernicus data using the Tilebox Python SDK. ```python Python {4,9,23} from pathlib import Path from tilebox.datasets import Client from tilebox.storage import UmbraStorageClient # Creating clients client = Client(token="YOUR_TILEBOX_API_KEY") datasets = client.datasets() storage_client = UmbraStorageClient(cache_directory=Path("./data")) # Choosing the dataset and collection umbra_dataset = datasets.open_data.umbra.sar collections = umbra_dataset.collections() collection = collections["SAR"] # Loading metadata umbra_data = collection.load(("2024-01-05", "2024-01-06"), show_progress=True) # Selecting a data point to download selected = umbra_data.isel(time=0) # index 0 selected # Downloading the data downloaded_data = storage_client.download(selected) print(f"Downloaded granule: {downloaded_data.name} to {downloaded_data}") print("Contents: ") for content in downloaded_data.iterdir(): print(f" - {content.relative_to(downloaded_data)}") ``` ```plaintext Output Downloaded granule: 2024-01-05-01-53-37_UMBRA-07 to data/Umbra/ad hoc/Yi_Sun_sin_Bridge_SK/6cf02931-ca2e-4744-b389-4844ddc701cd/2024-01-05-01-53-37_UMBRA-07 Contents: - 2024-01-05-01-53-37_UMBRA-07_SIDD.nitf - 2024-01-05-01-53-37_UMBRA-07_SICD.nitf - 2024-01-05-01-53-37_UMBRA-07_CSI-SIDD.nitf - 2024-01-05-01-53-37_UMBRA-07_METADATA.json - 2024-01-05-01-53-37_UMBRA-07_GEC.tif - 2024-01-05-01-53-37_UMBRA-07_CSI.tif ``` # Time series data Learn about time series datasets Time series datasets act as containers for data points. All data points in a dataset share the same type and fields. Additionally, all time series datasets include a few [common fields](#common-fields). One of these fields, the `time` field, allows you to perform time-based data queries on a dataset. 
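As a quick preview of what such a time-based query looks like, the sketch below loads one month of data from a collection. It assumes a `collection` object; the sections below explain how to list datasets and obtain their collections.

```python Python
# load all data points recorded in March 2023
# (a tuple is interpreted as a half-open interval: start inclusive, end exclusive)
data = collection.load(("2023-03-01", "2023-04-01"))
```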
## Listing datasets

You can use [your client instance](/datasets/introduction#creating-a-datasets-client) to access the datasets available to you. For example, to access the `sentinel1_sar` dataset in the `open_data.copernicus` dataset group, use the following code.

```python Python
from tilebox.datasets import Client

client = Client()
datasets = client.datasets()
dataset = datasets.open_data.copernicus.sentinel1_sar
```

Once you have your dataset object, you can use it to [list the available collections](/datasets/collections) for the dataset. If you're using an IDE or an interactive environment with auto-complete, you can use it on your client instance to discover the datasets available to you. Type `client.` and trigger auto-complete after the dot to do so.

## Common fields

While the specific data fields between different time series datasets can vary, there are common fields that all time series datasets share:

* `time`: The timestamp associated with each data point. Tilebox uses millisecond precision for storing and indexing data points. Timestamps are always in UTC.
* `id`: A [universally unique identifier (UUID)](https://en.wikipedia.org/wiki/Universally_unique_identifier) that uniquely identifies each data point. IDs are generated so that sorting them lexicographically also sorts them by their time field.
* `ingestion_time`: The time the data point was ingested into the Tilebox API. Timestamps are always in UTC.

These fields are present in all time series datasets. Together, they make up the metadata of a data point. Each dataset also has its own set of fields that are specific to that dataset. Tilebox uses millisecond precision timestamps for storing and indexing data points. If multiple data points share the same timestamp within one millisecond, they will all display the same timestamp. Each data point can have any number of timestamp fields with a higher precision. For example, telemetry data commonly includes timestamp fields with nanosecond precision.

## Example data point

Below is an example data point from a time series dataset represented as an [`xarray.Dataset`](/sdks/python/xarray). It contains the common fields. When using the Tilebox Python client library, you receive the data in this format.

```plaintext Example timeseries datapoint
Dimensions:  ()
Coordinates:
    time     datetime64[ns] 2023-03-12 16:45:23.532
    id       ...
```

# Tilebox

The Solar System's #1 developer tool for space data management
Tilebox is a space data native distributed computing tool. It provides a framework that simplifies access, processing, and distribution of space data across different environments, enabling efficient multi-language, multi-cluster workflows. Tilebox integrates seamlessly with your existing infrastructure, ensuring that you maintain complete control over your data and algorithms.

## Modules

Tilebox consists of two primary modules: Datasets and Workflows.

## Getting Started

To get started, check out some of the following resources:

## Guides

You can also start by looking through these guides:

* Learn about time-series datasets and their structure.
* Discover how to query and load data from a dataset.
* Gain a deeper understanding of how to create tasks using the Tilebox Workflow Orchestrator.
* Find out how to deploy Task Runners to run workflows in a parallel, distributed manner.

# Quickstart

This guide helps you set up and get started using Tilebox. It covers how to install a Tilebox client for your preferred language and how to use it to query data from a dataset and run a workflow. Select your preferred language and follow the steps below to get started.

## Start in a Notebook

Explore the provided [Sample Notebooks](/sdks/python/sample-notebooks) to begin your journey with Tilebox. These notebooks offer a step-by-step guide to using the API and showcase many features supported by Tilebox Python clients. You can also use these notebooks as a foundation for your own projects.

## Start on Your Device

If you prefer to work locally, follow these steps to get started.

Install the Tilebox Python packages. The easiest way to do this is using `pip`:

```
pip install tilebox-datasets tilebox-workflows tilebox-storage
```

Create an API key by logging into the [Tilebox Console](https://console.tilebox.com), navigating to [Account -> API Keys](https://console.tilebox.com/account/api-keys), and clicking the "Create API Key" button. Copy the API key and keep it somewhere safe. You will need it to authenticate your requests.

Create a cluster by logging into the [Tilebox Console](https://console.tilebox.com), navigating to [Workflows -> Clusters](https://console.tilebox.com/workflows/clusters), and clicking the "Create cluster" button. Copy the cluster slug; you will need it to run your workflows.

Use the datasets client to query data from a dataset.

```python Python
from tilebox.datasets import Client

client = Client(token="YOUR_TILEBOX_API_KEY")

# select a dataset
datasets = client.datasets()
dataset = datasets.open_data.copernicus.sentinel2_msi

# and load data from a collection in a given time range
collection = dataset.collection("S2A_S2MSI1C")
data_january_2022 = collection.load(("2022-01-01", "2022-02-01"))
```

Use the workflows client to create a task and submit it as a job.
```python Python from tilebox.workflows import Client, Task # Replace with your actual cluster and token cluster = "YOUR_COMPUTE_CLUSTER" client = Client(token="YOUR_TILEBOX_API_KEY") class HelloWorldTask(Task): greeting: str = "Hello" name: str = "World" def execute(self, context): print(f"{self.greeting} {self.name}, from the main task!") context.submit_subtask(HelloSubtask(name=self.name)) class HelloSubtask(Task): name: str def execute(self, context): print(f"Hello from the subtask, {self.name}!") # Initiate the job jobs = client.jobs() jobs.submit("parameterized-hello-world", HelloWorldTask(greeting="Greetings", name="Universe"), cluster) # Run the tasks runner = client.runner(cluster, tasks=[HelloWorldTask, HelloSubtask]) runner.run_all() ``` Review the following guides to learn more about the modules that make up Tilebox: # Introduction Learn about the Tilebox GO SDK Hang tight - Go support for Tilebox is coming soon. # Async support Tilebox offers a standard synchronous API by default but also provides an async client option when needed. ## Why use async? When working with external datasets, such as [Tilebox datasets](/datasets/timeseries), loading data may take some time. To speed up this process, you can run requests in parallel. While you can use multi-threading or multi-processing, which can be complex, often times a simpler option is to perform data loading tasks asynchronously using coroutines and `asyncio`. ## Switching to an async datasets client To switch to the async client, change the import statement for the `Client`. The example below illustrates this change. ```python Python (Sync) from tilebox.datasets import Client # This client is synchronous client = Client() ``` ```python Python (Async) from tilebox.datasets.aio import Client # This client is asynchronous client = Client() ``` After switching to the async client, use `await` for operations that interact with the Tilebox API. ```python Python (Sync) # Listing datasets datasets = client.datasets() # Listing collections dataset = datasets.open_data.copernicus.sentinel1_sar collections = dataset.collections() # Collection information collection = collections["S1A_IW_RAW__0S"] info = collection.info() print(f"Data for My-collection is available for {info.availability}") # Loading data data = collection.load(("2022-05-01", "2022-06-01"), show_progress=True) # Finding a specific datapoint datapoint_uuid = "01910b3c-8552-7671-3345-b902cc0813f3" datapoint = collection.find(datapoint_uuid) ``` ```python Python (Async) # Listing datasets datasets = await client.datasets() # Listing collections dataset = datasets.open_data.copernicus.sentinel1_sar collections = await dataset.collections() # Collection information collection = collections["S1A_IW_RAW__0S"] info = await collection.info() print(f"Data for My-collection is available for {info.availability}") # Loading data data = await collection.load(("2022-05-01", "2022-06-01"), show_progress=True) # Finding a specific datapoint datapoint_uuid = "01910b3c-8552-7671-3345-b902cc0813f3" datapoint = await collection.find(datapoint_uuid) ``` Jupyter notebooks and similar interactive environments support asynchronous code execution. You can use `await some_async_call()` as the output of a code cell. ## Fetching data concurrently The primary benefit of the async client is that it allows concurrent requests, enhancing performance. In below example, data is fetched from multiple collections. 
The synchronous approach retrieves data sequentially, while the async approach does so concurrently, resulting in faster execution. ```python Python (Sync) # Example: fetching data sequentially # switch to the async example to compare the differences import time from tilebox.datasets import Client from tilebox.datasets.sync.timeseries import TimeseriesCollection client = Client() datasets = client.datasets() collections = datasets.open_data.copernicus.landsat8_oli_tirs.collections() def stats_for_2020(collection: TimeseriesCollection) -> None: """Fetch data for 2020 and print the number of data points that were loaded.""" data = collection.load(("2020-01-01", "2021-01-01"), show_progress=True) n = data.sizes['time'] if 'time' in data else 0 return (collection.name, n) start = time.monotonic() results = [stats_for_2020(collections[name]) for name in collections] duration = time.monotonic() - start for collection_name, n in results: print(f"There are {n} datapoints in {collection_name} for 2020.") print(f"Fetching data took {duration:.2f} seconds") ``` ```python Python (Async) # Example: fetching data concurrently import asyncio import time from tilebox.datasets.aio import Client from tilebox.datasets.aio.timeseries import TimeseriesCollection client = Client() datasets = await client.datasets() collections = await datasets.open_data.copernicus.landsat8_oli_tirs.collections() async def stats_for_2020(collection: TimeseriesCollection) -> None: """Fetch data for 2020 and print the number of data points that were loaded.""" data = await collection.load(("2020-01-01", "2021-01-01"), show_progress=True) n = data.sizes['time'] if 'time' in data else 0 return (collection.name, n) start = time.monotonic() # Initiate all requests concurrently requests = [stats_for_2020(collections[name]) for name in collections] # Wait for all requests to finish in parallel results = await asyncio.gather(*requests) duration = time.monotonic() - start for collection_name, n in results: print(f"There are {n} datapoints in {collection_name} for 2020.") print(f"Fetching data took {duration:.2f} seconds") ``` The output demonstrates that the async approach runs approximately 30% faster for this example. With `show_progress` enabled, the progress bars update concurrently. ```plaintext Python (Sync) There are 19624 datapoints in L1GT for 2020. There are 1281 datapoints in L1T for 2020. There are 65313 datapoints in L1TP for 2020. There are 25375 datapoints in L2SP for 2020. Fetching data took 10.92 seconds ``` ```plaintext Python (Async) There are 19624 datapoints in L1GT for 2020. There are 1281 datapoints in L1T for 2020. There are 65313 datapoints in L1TP for 2020. There are 25375 datapoints in L2SP for 2020. Fetching data took 7.45 seconds ``` ## Async workflows The Tilebox workflows Python client does not have an async client. This is because workflows are designed for distributed and concurrent execution outside a single async event loop. But within a single task, you may use still use`async` code to take advantage of asynchronous execution, such as parallel data loading. You can achieve this by wrapping your async code in `asyncio.run`. Below is an example of using async code within a workflow task. 
```python Python (Async) import asyncio import xarray as xr from tilebox.datasets.aio import Client as DatasetsClient from tilebox.datasets.data import TimeIntervalLike from tilebox.workflows import Task, ExecutionContext class FetchData(Task): def execute(self, context: ExecutionContext) -> None: # The task execution itself is synchronous # But we can leverage async code within the task using asyncio.run # This will fetch three months of data in parallel data_jan, data_feb, data_mar = asyncio.run(load_first_three_months()) async def load_data(interval: TimeIntervalLike): datasets = await DatasetsClient().datasets() collections = await datasets.open_data.copernicus.landsat8_oli_tirs.collections() return await collections["L1T"].load(interval) async def load_first_three_months() -> tuple[xr.Dataset, xr.Dataset, xr.Dataset]: jan = load_data(("2020-01-01", "2020-02-01")) feb = load_data(("2020-02-01", "2020-03-01")) mar = load_data(("2020-03-01", "2020-04-01")) # load the three months in parallel jan, feb, mar = await asyncio.gather(jan, feb, mar) return jan, feb, mar ``` If you encounter an error like `RuntimeError: asyncio.run() cannot be called from a running event loop`, it means you're trying to start another asyncio event loop (with `asyncio.run`) from within an existing one. This often happens in Jupyter notebooks since they automatically start an event loop. A way to resolve this is by using [nest-asyncio](https://pypi.org/project/nest-asyncio/). # Geometries How geometries are handled in the Tilebox Python client. Many datasets consist of granules that represent specific geographical areas on the Earth's surface. Often, a polygon defining the outline of these areas—a footprint—accompanies other granule metadata in time series datasets. Tilebox provides built-in support for working with geometries. Here's an example that loads some granules from the `ERS SAR` Opendata dataset, which contains geometries. ```python Loading ERS data from tilebox.datasets import Client client = Client() datasets = client.datasets() ers_collection = datasets.open_data.asf.ers_sar.collection("ERS-2") ers_data = ers_collection.load(("2008-02-10T21:00", "2008-02-10T22:00")) ``` ## Shapely In the `ers_data` dataset, each granule includes a `geometry` field that represents the footprint of each granule as a polygon. Tilebox automatically converts geometry fields to `Polygon` or `MultiPolygon` objects from the [Shapely](https://shapely.readthedocs.io/en/stable/manual.html) library. By integrating with Shapely, you can use the rich set of libraries and tools it provides. That includes support for computing polygon characteristics such as total area, intersection checks, and conversion to other formats. ```python Printing geometries geometries = ers_data.geometry.values print(geometries) ``` Each geometry is a [shapely.Geometry](https://shapely.readthedocs.io/en/stable/geometry.html#geometry). ```plaintext Output [ ] ``` Geometries are not always [Polygon](https://shapely.readthedocs.io/en/stable/reference/shapely.Polygon.html#shapely.Polygon) objects. More complex footprint geometries are represented as [MultiPolygon](https://shapely.readthedocs.io/en/stable/reference/shapely.MultiPolygon.html#shapely.MultiPolygon) objects. ### Accessing Coordinates You can select a polygon from the geometries and access the underlying coordinates and an automatically computed centroid point. 
```python Accessing coordinates and computing a centroid point polygon = geometries[0] lon, lat = polygon.exterior.coords.xy center, = list(polygon.centroid.coords) print(lon) print(lat) print(center) ``` ```plaintext Output array('d', [-150.753244, -152.031574, -149.183655, -147.769339, -150.753244]) array('d', [74.250081, 73.336051, 73.001748, 73.899483, 74.250081]) (-149.92927907414239, 73.62538063474753) ``` Interactive environments such as [Jupyter Notebooks](/sdks/python/sample-notebooks) can visualize Polygon shapes graphically. Just type `polygon` in an empty cell and execute it for a visual representation of the polygon shape. ### Visualization on a Map To visualize polygons on a map, you can use [Folium](https://pypi.org/project/folium/). Below is a helper function that produces an OpenStreetMap with the Polygon overlaid. ```python visualize helper function # pip install folium from folium import Figure, Map, Polygon as FoliumPolygon, GeoJson, TileLayer from folium.plugins import MiniMap from shapely import Polygon, to_geojson from collections.abc import Iterable def visualize(poly: Polygon | Iterable[Polygon], zoom=4): """Visualize a polygon or a list of polygons on a map""" if not isinstance(poly, Iterable): poly = [poly] fig = Figure(width=600, height=600) map = Map(location=geometries[len(geometries)//2].centroid.coords[0][::-1], zoom_start=zoom, control_scale=True) map.add_child(MiniMap()) fig.add_child(map) for p in poly: map.add_child(GeoJson(to_geojson(p))) return fig ``` Here's how to use it. ```python Visualizing a polygon visualize(polygon) ``` Single visualization The `visualize` helper function supports a list of polygons, which can display the data layout of the ERS granules. ```python Visualizing multiple polygons visualize(geometries) ``` Granules visualization ## Format conversion Shapely supports converting Polygons to some common formats, such as [GeoJSON](https://geojson.org/) or [Well-Known Text (WKT)](https://docs.ogc.org/is/18-010r7/18-010r7.html). ```python Converting to GeoJSON from shapely import to_geojson print(to_geojson(polygon)) ``` ```plaintext Output {"type":"Polygon","coordinates":[[[-150.753244,74.250081],[-152.031574,73.336051],[-149.183655,73.001748],[-147.769339,73.899483],[-150.753244,74.250081]]]} ``` ```python Converting to WKT from shapely import to_wkt print(to_wkt(polygon)) ``` ```plaintext Output POLYGON ((-150.753244 74.250081, -152.031574 73.336051, -149.183655 73.001748, -147.769339 73.899483, -150.753244 74.250081)) ``` ## Checking intersections One common task when working with geometries is checking if a given geometry falls into a specific area of interest. Shapely provides an `intersects` method for this purpose. ```python Checking intersections from shapely import box # Box representing the rectangular area lon=(-160, -150) and lat=(69, 70) area_of_interest = box(-160, 69, -150, 70) for i, polygon in enumerate(geometries): if area_of_interest.intersects(polygon): print(f"{ers_data.granule_name[i].item()} intersects the area of interest!") else: print(f"{ers_data.granule_name[i].item()} doesn't intersect the area of interest!") ``` ```plaintext Output E2_66974_STD_F264 doesn't intersect the area of interest! E2_66974_STD_F265 doesn't intersect the area of interest! E2_66974_STD_F267 doesn't intersect the area of interest! E2_66974_STD_F269 doesn't intersect the area of interest! E2_66974_STD_F271 doesn't intersect the area of interest! E2_66974_STD_F273 intersects the area of interest! 
E2_66974_STD_F275 intersects the area of interest! E2_66974_STD_F277 intersects the area of interest! E2_66974_STD_F279 doesn't intersect the area of interest! E2_66974_STD_F281 doesn't intersect the area of interest! E2_66974_STD_F283 doesn't intersect the area of interest! E2_66974_STD_F285 doesn't intersect the area of interest! E2_66974_STD_F289 doesn't intersect the area of interest! ``` ## Combining polygons As shown in the visualization of the granule footprints, the granules collectively form an orbit from pole to pole. Measurements are often combined during processing. You can do the same with geometries by combining them into a single polygon, which represents the hull around all individual footprints using [shapely.unary\_union](https://shapely.readthedocs.io/en/stable/reference/shapely.unary_union.html). ```python Combining multiple polygons from shapely.ops import unary_union hull = unary_union(geometries) visualize(hull) ``` The computed hull consists of two polygons due to a gap (probably a missing granule) in the geometries. Such geometries are represented as [Multi Polygons](#multi-polygons). Unary union visualization ## Multi Polygons A collection of one or more non-overlapping polygons combined into a single geometry is called a [MultiPolygon](https://shapely.readthedocs.io/en/latest/reference/shapely.MultiPolygon.html). Footprint geometries can be of type `MultiPolygon` due to gaps or pole discontinuities. The computed hull in the previous example is a `MultiPolygon`. ```python Accessing individual polygons of a MultiPolygon print(f"The computed hull of type {type(hull).__name__} consists of {len(hull.geoms)} sub polygons") for i, poly in enumerate(hull.geoms): print(f"Sub polygon {i} has an area of {poly.area}") ``` ```plaintext Output The computed hull of type MultiPolygon consists of 2 sub polygons Sub polygon 0 has an area of 2.025230449898011 Sub polygon 1 has an area of 24.389998081651527 ``` ## Antimeridian Crossings A common issue with `longitude / latitude` geometries is crossings of the 180-degree meridian, or the antimeridian. For example, the coordinates of a `LineString` from Japan to the United States might look like this: `140, 141, 142, ..., 179, 180, -179, -178, ..., -125, -124` Libraries like Shapely are not designed to handle spherical coordinate systems, so caution is necessary with such geometries. Here's an `ERS` granule demonstrating this issue. ```python Antimeridian Crossing # A granule that crosses the antimeridian granule = ers_collection.find("0119bb86-0260-5819-6aab-f99796417155") polygon = granule.geometry.item() print(polygon.exterior.coords.xy) visualize(polygon) ``` ```plaintext Output array('d', [177.993407, 176.605009, 179.563047, -178.904076, 177.993407]) array('d', [74.983185, 74.074615, 73.727752, 74.61847, 74.983185]) ``` Antimeridian buggy visualization This 2D visualization appears incorrect. Both the visualization and any calculations performed may yield inaccurate results. For instance, testing whether the granule intersects the 0-degree meridian provides a false positive. ```python Problems with calculating intersections from shapely import LineString null_meridian = LineString([(0, -90), (0, 90)]) print(polygon.intersects(null_meridian)) # True - but this is incorrect! ``` The GeoJSON specification offers a solution for this problem. 
In the section [Antimeridian Cutting](https://datatracker.ietf.org/doc/html/rfc7946#section-3.1.9), it suggests always cutting lines and polygons into two parts—one for the eastern hemisphere and one for the western hemisphere. In Python, this can be achieved using the [antimeridian](https://pypi.org/project/antimeridian/) package.

```python Cutting the polygon along the antimeridian
# pip install antimeridian

import antimeridian

fixed_polygon = antimeridian.fix_polygon(polygon)
visualize(fixed_polygon)

print(fixed_polygon.intersects(null_meridian))
# False - this is correct now
```

Antimeridian fixed visualization

Since Shapely is unaware of the spherical nature of this data, the **centroid** of the fixed polygon **is still incorrect**. The antimeridian package also includes a function to correct this.

```python Calculating the centroid of a cut polygon crossing the antimeridian
print("Wrongly computed centroid coordinates (Shapely)")
print(list(fixed_polygon.centroid.coords))
print("Correct centroid coordinates (Antimeridian taken into account)")
print(list(antimeridian.centroid(fixed_polygon).coords))
```

```plaintext Output
Wrongly computed centroid coordinates (Shapely)
[(139.8766350146937, 74.3747116658462)]
Correct centroid coordinates (Antimeridian taken into account)
[(178.7782777050171, 74.3747116658462)]
```

## Spherical Geometry

Another approach to handle the antimeridian issue is performing all coordinate-related calculations, such as polygon intersections, in a [spherical coordinate system](https://en.wikipedia.org/wiki/Spherical_coordinate_system). One useful library for this is [spherical\_geometry](https://spherical-geometry.readthedocs.io/en/latest/). Here's an example.

```python Spherical Geometry
# pip install spherical-geometry

from spherical_geometry.polygon import SphericalPolygon
from spherical_geometry.vector import lonlat_to_vector

lon, lat = polygon.exterior.coords.xy
spherical_poly = SphericalPolygon.from_lonlat(lon, lat)

# Let's check the x, y, z coordinates of the spherical polygon:
print(list(spherical_poly.points))
```

```plaintext Output
[array([[-0.25894363,  0.00907234,  0.96584983],
       [-0.2651968 , -0.00507317,  0.96418096],
       [-0.28019363,  0.00213687,  0.95994112],
       [-0.27390375,  0.01624885,  0.96161984],
       [-0.25894363,  0.00907234,  0.96584983]])]
```

Now, you can compute intersections or check if a particular point is within the polygon. You can compare the incorrect calculation using `shapely` with the correct version when using `spherical_geometry`.

```python Correct calculations using spherical geometry
from shapely import Point

# A point on the null-meridian, way off from our polygon
null_meridian_point = 0, 74.4
# A point actually inside our polygon
point_inside = 178.8, 74.4

print("Shapely results:")
print("- Null meridian point inside:", polygon.contains(Point(*null_meridian_point)))
print("- Actual inside point inside:", polygon.contains(Point(*point_inside)))

print("Spherical geometry results:")
print("- Null meridian point inside:", spherical_poly.contains_lonlat(*null_meridian_point))
print("- Actual inside point inside:", spherical_poly.contains_lonlat(*point_inside))
```

```plaintext Output
Shapely results:
- Null meridian point inside: True
- Actual inside point inside: False
Spherical geometry results:
- Null meridian point inside: False
- Actual inside point inside: True
```

# Installation

Install the Tilebox Python Packages

## Package Overview

Tilebox offers a Python SDK for accessing Tilebox services.
The SDK includes separate packages that can be installed individually based on the services you wish to use, or all together for a comprehensive experience.

Access Tilebox datasets from Python

Workflow client and task runner for Tilebox

## Installation

Install the Tilebox Python packages using your preferred package manager:

```bash pip
pip install tilebox-datasets tilebox-workflows tilebox-storage
```

```bash uv
uv add tilebox-datasets tilebox-workflows tilebox-storage
```

```bash poetry
poetry add tilebox-datasets="*" tilebox-workflows="*" tilebox-storage="*"
```

```bash pipenv
pipenv install tilebox-datasets tilebox-workflows tilebox-storage
```

## Setting up a local JupyterLab environment

To get started quickly, you can also use an existing Jupyter-compatible cloud environment such as [Google Colab](https://colab.research.google.com/).

If you want to set up a local Jupyter environment to explore the SDK or to run the [Sample notebooks](/sdks/python/sample-notebooks) locally, install [JupyterLab](https://github.com/jupyterlab/jupyterlab) for a browser-based development environment. It's advised to install the Tilebox packages along with JupyterLab, [ipywidgets](https://ipywidgets.readthedocs.io/en/latest/user_install.html), and [tqdm](https://tqdm.github.io/) for an enhanced experience.

```uv
mkdir tilebox-exploration
cd tilebox-exploration

uv init --no-package
uv add tilebox-datasets tilebox-workflows tilebox-storage
uv add jupyterlab ipywidgets tqdm

uv run jupyter lab
```

### Trying it out

After installation, create a new notebook and paste the following code snippet to verify your installation. If you're new to Jupyter, you can refer to the guide on [interactive environments](/sdks/python/sample-notebooks#interactive-environments).

```python Python
from tilebox.datasets import Client

client = Client()
datasets = client.datasets()

collection = datasets.open_data.copernicus.landsat8_oli_tirs.collection("L1T")
data = collection.load(("2015-01-01", "2020-01-01"), show_progress=True)
data
```

If the installation is successful, the output should appear as follows.

Local JupyterLab

# Sample notebooks

Sample code maintained for you to use and learn from.

To quickly become familiar with the Python client, you can explore some sample notebooks. Each notebook can be executed standalone from top to bottom.

## Sample notebooks

You can access the sample notebooks on [Google Drive](https://drive.google.com/drive/folders/1I7G35LLeB2SmKQJFsyhpSVNyZK9uOeJt). Right click a notebook in Google Drive and select `Open with -> Google Colaboratory` to open it directly in the browser using [Google Colab](https://colab.research.google.com/).

### Notebook overview

This notebook demonstrates how to use the Python client to query metadata from the ERS-SAR Opendata dataset. It shows how to filter results by geographical location and download product data for a specific granule. [Open in Colab](https://colab.research.google.com/drive/1LTYhLKy8m9psMhu0DvANs7hyS3ViVpas)

This notebook illustrates how to use the Python client to query the S5P Tropomi Opendata dataset for methane products. It also explains how to filter results based on geographical location and download product data for a specific granule. [Open in Colab](https://colab.research.google.com/drive/1eVYARNFnTIeQqBs6gqeay01EDvRk2EI4)

Execute cells one by one using `Shift+Enter`. Most commonly used libraries are pre-installed. All demo notebooks require Python 3.10 or higher.
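If you are unsure which Python version your notebook kernel is running, a quick check such as the following can confirm that it meets this requirement. This is a minimal sketch using only the standard library; it is not part of the Tilebox SDK.

```python Checking the Python version
import sys

# The sample notebooks require Python 3.10 or higher
print(sys.version)
assert sys.version_info >= (3, 10), "Please switch to a kernel running Python 3.10 or higher"
```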
## Interactive environments Jupyter, Google Colab, and JetBrains Datalore are interactive environments that simplify the development and sharing of algorithmic code. They allow users to work with notebooks, which combine code and rich text elements like figures, links, and equations. Notebooks require no setup and can be easily shared. [Jupyter notebooks](https://jupyter.org/) are the original interactive environment for Python. They are useful but require local installation. [Google Colab](https://colab.research.google.com/) is a free tool that provides a hosted interactive Python environment. It easily connects to local Jupyter instances and allows code sharing using Google credentials or within organizations using Google Workspace. [JetBrains Datalore](https://datalore.jetbrains.com/) is a free platform for collaborative testing, development, and sharing of Python code and algorithms. It has built-in secret management for storing credentials. Datalore also features advanced JetBrains syntax highlighting and autocompletion. Currently, it only supports Python 3.8, which is not compatible with the Tilebox Python client. Since Colab is a hosted free tool that meets all requirements, including Python ≥3.10, it's recommended for use. ## Installing packages Within your interactive environment, you can install missing packages using pip in "magic" cells, which start with an exclamation mark. ```bash # pip is already installed in your interactive environment !pip3 install .... ``` All APIs or commands that require authentication can be accessed through client libraries that hide tokens, allowing notebooks to be shared without exposing personal credentials. ## Executing code Execute code by clicking the play button in the top left corner of the cell or by pressing `Shift + Enter`. While the code is running, a spinning icon appears. When the execution finishes, the icon changes to a number, indicating the order of execution. The output displays below the code. ## Authorization When sharing notebooks, avoid directly sharing your Tilebox API key. Instead, use one of two methods to authenticate the Tilebox Python client in interactive environments: through environment variables or interactively. ```python Using environment variables to store your API key # Define an environment variable "TILEBOX_API_KEY" that contains your API key import os token = os.getenv("TILEBOX_API_KEY") ``` **Interactive** authorization is possible using the built-in `getpass` module. This prompts the user for the API key when running the code, storing it in memory without sharing it when the notebook is shared. ```python Interactively providing your API key from getpass import getpass token = getpass("API key:") ``` # Xarray Overview of the Xarray library, common use cases, and implementation details. [example_satellite_data.nc]: https://github.com/tilebox/docs/raw/main/assets/data/example_satellite_data.nc [Xarray](https://xarray.dev/) is a library designed for working with labeled multi-dimensional arrays. Built on top of [NumPy](https://numpy.org/) and [Pandas](https://pandas.pydata.org/), Xarray adds labels in the form of dimensions, coordinates, and attributes, enhancing the usability of raw NumPy-like arrays. This enables a more intuitive, concise, and less error-prone development experience. The library also includes a large and expanding collection of functions for advanced analytics and visualization. 
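To make the idea of labeled dimensions and coordinates concrete before looking at Tilebox data, here is a minimal, self-contained sketch with made-up values; the variable names and numbers are purely illustrative.

```python A minimal labeled array
import numpy as np
import pandas as pd
import xarray as xr

# A small 2D array labeled with a "time" dimension and a "station" dimension
temperature = xr.DataArray(
    np.array([[2.5, 3.1], [1.8, 2.9], [3.3, 3.0]]),
    dims=("time", "station"),
    coords={
        "time": pd.date_range("2024-01-01", periods=3),
        "station": ["A", "B"],
    },
    name="temperature",
)

# Select by label instead of by positional index
print(temperature.sel(time="2024-01-02", station="A").item())
```

Label-based selection like this is what the Tilebox datasets client gives you for time series data, as the following sections show.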
Overview of the Xarray data structure

An overview of the Xarray library and its suitability for N-dimensional data (such as Tilebox time series datasets) is available in the official [Why Xarray? documentation page](https://xarray.pydata.org/en/stable/why-xarray.html).

The Tilebox Python client provides access to satellite data as an [xarray.Dataset](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.html#xarray.Dataset). This approach offers a great number of benefits over custom Tilebox-specific data structures:

* Xarray is based on NumPy and Pandas—two of the most widely used Python libraries for scientific computing. Familiarity with these libraries translates well to using Xarray.
* Leveraging NumPy, which is built on C and Fortran, Xarray benefits from extensive performance optimizations. This allows Xarray to efficiently handle large datasets.
* As a widely used library, Xarray easily integrates with many other libraries. Many third-party libraries are also available to expand Xarray's capabilities for diverse use cases.
* Xarray is versatile and supports a broad range of applications. It's also easy to extend with custom features.

## Example dataset

To understand how Xarray functions, below is a quick look at a sample dataset as it might be retrieved from a [Tilebox datasets](/datasets/timeseries) client.

```python Python
from tilebox.datasets import Client

client = Client()
datasets = client.datasets()

collection = datasets.open_data.copernicus.landsat8_oli_tirs.collection("L1GT")
satellite_data = collection.load(("2022-05-01", "2022-06-01"), show_progress=True)
print(satellite_data)
```

```plaintext Output
<xarray.Dataset> Size: 305kB
Dimensions:         (time: 514, latlon: 2)
Coordinates:
    ingestion_time  (time) datetime64[ns] 4kB 2024-07-22T09:06:43.5586...
    id              (time) ...
    ...
```

This basic dataset illustrates common use cases for Xarray. To follow along, you can [download the dataset as a NetCDF file][example_satellite_data.nc]. The [Reading and writing files section](/sdks/python/xarray#reading-and-writing-files) explains how to save and load Xarray datasets from NetCDF files.

Here's an overview of the output:

* The `satellite_data` **dataset** contains **dimensions**, **coordinates**, and **variables**.
* The `time` **dimension** has 514 elements, indicating that there are 514 data points in the dataset.
* The `time` **dimension coordinate** contains datetime values representing when the data was measured. The `*` indicates a dimension coordinate, which enables label-based indexing and alignment.
* The `ingestion_time` **non-dimension coordinate** holds datetime values for when the data was ingested into Tilebox. Non-dimension coordinates carry coordinate data but are not used for label-based indexing. They can even be [multidimensional](https://docs.xarray.dev/en/stable/examples/multidimensional-coords.html).
* The dataset includes 28 **variables**.
* The `bands` **variable** contains integers indicating how many bands the data contains.
* The `sun_elevation` **variable** contains floating-point values representing the sun's elevation when the data was measured.

Explore the [xarray terminology overview](https://docs.xarray.dev/en/stable/user-guide/terminology.html) to broaden your understanding of **datasets**, **dimensions**, **coordinates**, and **variables**.

## Accessing data in a dataset

### By index

You can access data in different ways. The Xarray documentation offers a [comprehensive overview](https://docs.xarray.dev/en/stable/user-guide/indexing.html) of these methods.
To access the `sun_elevation` variable:

```python Accessing values
# Print the first sun elevation value
print(satellite_data.sun_elevation[0])
```

```plaintext Output
<xarray.DataArray 'sun_elevation' ()> Size: 8B
array(44.19904463)
Coordinates:
    ingestion_time  datetime64[ns] 8B 2024-07-22T09:06:43.558629
    id              ...
    ...
```

You can also filter data points based on a condition, for example all data points with a sun elevation between 45 and 90 degrees:

```python Filtering values
data_filter = (
    (satellite_data.sun_elevation > 45) & (satellite_data.sun_elevation < 90)
)
filtered_sun_elevations = satellite_data.sun_elevation[data_filter]
print(filtered_sun_elevations)
```

```plaintext Output
<xarray.DataArray 'sun_elevation' (time: 27)> Size: 216B
array([63.89629314, ..., 64.37400345, 64.37400345])
Coordinates:
    ingestion_time  (time) datetime64[ns] 216B 2024-07-22T09:06:43.558629 ...
    id              (time) ...
    ...
```

### Selecting data by value

Selecting data by value requires unique coordinate values. In case of duplicates, you will encounter an `InvalidIndexError`. To avoid this, you can [drop duplicates](#dropping-duplicates).

```python Indexing by time
specific_datapoint = satellite_data.sel(time="2022-05-01T11:28:28.249000")
print(specific_datapoint)
```

```plaintext Output
<xarray.Dataset> Size: 665B
Dimensions:         (latlon: 2)
Coordinates:
    ingestion_time  datetime64[ns] 8B 2024-07-22T09:06:43.558629
    id              ...
    ...
```

Selecting a time that does not exist in the dataset raises a `KeyError`:

```python Indexing by a non-existent time
satellite_data.sel(time="2022-05-01T11:28:28.000000")
# >>> raises KeyError: "2022-05-01T11:28:28.000000"
```

To return the closest value instead of raising an error, specify a `method`.

```python Finding the closest data point
nearest_datapoint = satellite_data.sel(time="2022-05-01T11:28:28.000000", method="nearest")
assert nearest_datapoint.equals(specific_datapoint)  # passes
```

## Dropping duplicates

Xarray allows you to drop duplicate values from a dataset. For example, to drop duplicate timestamps:

```python Dropping duplicates
deduped = satellite_data.drop_duplicates("time")
```

De-duped datasets are required for certain operations, like [selecting data by value](#selecting-data-by-value).

## Statistics

Xarray and NumPy include a wide range of statistical functions that you can apply to a dataset or DataArray. Here are some examples:

```python Computing dataset statistics
cloud_cover = satellite_data.cloud_cover
min_meas = cloud_cover.min().item()
max_meas = cloud_cover.max().item()
mean_meas = cloud_cover.mean().item()
std_meas = cloud_cover.std().item()
print(f"Cloud cover ranges from {min_meas:.2f} to {max_meas:.2f} with a mean of {mean_meas:.2f} and a standard deviation of {std_meas:.2f}")
```

```plaintext Output
Cloud cover ranges from 0.00 to 100.00 with a mean of 76.48 and a standard deviation of 34.17
```

You can also directly apply many NumPy functions to datasets or DataArrays. For example, to find out how many unique bands the data contains, use [np.unique](https://numpy.org/doc/stable/reference/generated/numpy.unique.html):

```python Finding unique values
import numpy as np
print("Sensors:", np.unique(satellite_data.bands))
```

```plaintext Output
Sensors: [12]
```

## Reading and writing files

Xarray provides a simple method for saving and loading datasets from files. This is useful for sharing your data or storing it for future use. Xarray supports many different file formats, including NetCDF, Zarr, GRIB, and more. For a complete list of supported formats, refer to the [official documentation page](https://docs.xarray.dev/en/stable/user-guide/io.html).
To save the example dataset as a NetCDF file: You may need to install the `netcdf4` package first. ```python Saving a dataset to a file satellite_data.to_netcdf("example_satellite_data.nc") ``` This creates a file named `example_satellite_data.nc` in your current directory. You can then load this file back into memory with: ```python Loading a dataset from a file import xarray as xr satellite_data = xr.open_dataset("example_satellite_data.nc") ``` If you would like to follow along with the examples in this section, you can download the example dataset as a NetCDF file [here][example_satellite_data.nc]. ## Further reading This section covers only a few common use cases for Xarray. The library offers many more functions and features. For more information, please see the [Xarray documentation](https://xarray.pydata.org/en/stable/) or explore the [Xarray Tutorials](https://tutorial.xarray.dev/intro.html). Some useful capabilities not covered in this section include: # Caches Sharing data between tasks is crucial for workflows, especially in satellite imagery processing, where large datasets are the norm. Tilebox Workflows offers a straightforward API for storing and retrieving data from a shared cache. The cache API is currently experimental and may undergo changes in the future. Many more features and new [backends](#cache-backends) are on the roadmap. There might be breaking changes to the Cache API in the future. Caches are configured at the [task runner](/workflows/concepts/task-runners) level. Because task runners can be deployed across multiple locations, caches must be accessible from all task runners contributing to a workflow. Currently, the default cache implementation uses a Google Cloud Storage bucket, providing a scalable method to share data between tasks. For quick prototyping and local development, you can also use a local file system cache, which is included by default. If needed, you can create your own cache backend by implementing the `Cache` interface. ## Configuring a Cache You can configure a cache while creating a task runner by passing a cache instance to the `cache` parameter. To use an in-memory cache, use `tilebox.workflows.cache.InMemoryCache`. This implementation is helpful for local development and quick testing. For alternatives, see the supported [cache backends](#cache-backends). ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import InMemoryCache client = Client() runner = client.runner( "dev-cluster", tasks=[...], cache=InMemoryCache(), ) ``` By configuring such a cache, the `context` object that is passed to the execution of each task gains access to a `job_cache` attribute that can be used to [store and retrieve data](#storing-and-retrieving-data) from the cache. ## Cache Backends Tilebox Workflows comes with four cache implementations out of the box, each backed by a different storage system. ### Google Storage Cache A cache implementation backed by a Google cloud Storage bucket to store and retrieve data. It's a reliable method for sharing data across tasks, suitable for production environments. You will need access to a GCP project and a bucket. The Tilebox Workflow orchestrator uses the official Python Client for Google Cloud Storage. To set up authentication, refer to the official documentation. 
```python Python from tilebox.workflows import Client from tilebox.workflows.cache import GoogleStorageCache from google.cloud.storage import Client as StorageClient storage_client = StorageClient(project="gcp-project") bucket = storage_client.bucket("cache-bucket") client = Client() runner = client.runner( "dev-cluster", tasks=[...], cache=GoogleStorageCache(bucket, prefix="jobs"), ) ``` The `prefix` parameter is optional and can be used to set a common prefix for all cache keys, which helps organize objects within a bucket when re-using the same bucket for other purposes. ### Amazon S3 Cache A cache implementation backed by an Amazon S3 bucket to store and retrieve data. Like the Google Cloud Storage option, it's reliable and scalable for production use. Tilebox utilizes the `boto3` library to communicate with Amazon S3. For the necessary authentication setup, refer to [its documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration). ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import AmazonS3Cache client = Client() runner = client.runner( "dev-cluster", tasks=[...], cache=AmazonS3Cache("my-bucket-name", prefix="jobs") ) ``` The `prefix` parameter is optional and can be used to set a common prefix for all cache keys, which helps organize objects within a bucket when re-using the same bucket for other purposes. ### Local File System Cache A cache implementation backed by a local file system. It's suitable for quick prototyping and local development, assuming all task runners share the same machine or access the same file system. ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import LocalFileSystemCache client = Client() runner = client.runner( "dev-cluster", tasks=[...], cache=LocalFileSystemCache("/path/to/cache/directory"), ) ``` Local file system caches can also be used in conjunction with network attached storage or similar tools, making it a viable option also for distributed setups. ### In-Memory Cache A simple in-memory cache useful for quick prototyping and development. The data is not shared between task runners and is lost upon task runner restarts. Use this cache only for workflows executed on a single task runner. ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import InMemoryCache client = Client() runner = client.runner( "dev-cluster", tasks=[...], cache=InMemoryCache(), ) ``` ## Data Isolation Caches are isolated per job, meaning that each job's cache data is only accessible to tasks within that job. This setup prevents key conflicts between different job executions. Currently, accessing cache data of other jobs is not supported. The capability to share data across jobs is planned for future updates. This feature will be beneficial for real-time processing workflows or workflows requiring auxiliary data from external sources. ## Storing and Retrieving Data The job cache can be accessed via the `ExecutionContext` passed to a tasks `execute` function. This [`job_cache`](/api-reference/tilebox.workflows/ExecutionContext.job_cache) object provides methods to handle data storage and retrieval from the cache. The specifics of data storage depend on the chosen cache backend. The cache API is designed to be simple and can handle all types of data, supporting binary data in the form of `bytes`, identified by `str` cache keys. 
This allows for storing many different data types, such as pickled Python objects, serialized JSON, UTF-8, or binary data. The following snippet illustrates storing and retrieving data from the cache. ```python Python from tilebox.workflows import Task, ExecutionContext class ProducerTask(Task): def execute(self, context: ExecutionContext) -> None: # store data in the cache context.job_cache["data"] = b"my_binary_data_to_store" context.submit_subtask(ConsumerTask()) class ConsumerTask(Task): def execute(self, context: ExecutionContext) -> None: data = context.job_cache["data"] print(f"Read {data} from cache") ``` In this example, data stored under the key `"data"` can be any size that fits the cache backend constraints. Ensure the key remains unique within the job's scope to avoid conflicts. To test the workflow, you can start a local task runner using the `InMemoryCache` backend. Then, submit a job to execute the `ProducerTask` and observe the output of the `ConsumerTask`. ```python Python # submit a job to test our workflow job_client = client.jobs() job_client.submit("testing-cache-access", ProducerTask(), cluster="dev-cluster") # start a runner to execute it runner = client.runner( "dev-cluster", tasks=[ProducerTask, ConsumerTask], cache=LocalFileSystemCache("/path/to/cache/directory"), ) runner.run_forever() ``` ```plaintext Output Read b'my_binary_data_to_store' from cache ``` ## Groups and Hierarchical Keys The cache API supports groups and hierarchical keys, analogous to directories and files in a file system. Groups help organize cache keys hierarchically, preventing key conflicts and allowing data to be structured better. Additionally, groups are iterable, enabling retrieval of all keys within the group. This feature is useful when multiple tasks create cache data, and a later task needs to list all produced data by earlier tasks. The following code shows an example of how cache groups can be used. 
```python Python {39,44-45} from tilebox.workflows import Task, ExecutionContext import random class CacheGroupDemoWorkflow(Task): n: int def execute(self, context: ExecutionContext): # define a cache group key for subtasks group_key = "random_numbers" produce_numbers = context.submit_subtask( ProduceRandomNumbers(self.n, group_key) ) sum_task = context.submit_subtask( PrintSum(group_key), depends_on=[produce_numbers], ) class ProduceRandomNumbers(Task): n: int group_key: str def execute(self, context: ExecutionContext): for i in range(self.n): context.submit_subtask(ProduceRandomNumber(i, self.group_key)) class ProduceRandomNumber(Task): index: int group_key: str def execute(self, context: ExecutionContext) -> None: number = random.randint(0, 100) group = context.job_cache.group(self.group_key) group[f"key_{self.index}"] = str(number).encode() class PrintSum(Task): group_key: str def execute(self, context: ExecutionContext) -> None: group = context.job_cache.group(self.group_key) # PrintSum doesn't know how many numbers were produced, # instead it iterates over all keys in the cache group numbers = [] for key in group: # iterate over all stored numbers number = group[key] # read data from cache numbers.append(int(number.decode())) # convert bytes back to int print(f"Sum of all numbers: {sum(numbers)}") ``` Submitting a job of the `CacheGroupDemoWorkflow` and running it with a task runner can be done as follows: ```python Python # submit a job to test our workflow job_client = client.jobs() job_client.submit("cache-groups", CacheGroupDemoWorkflow(5), cluster="dev-cluster") # start a runner to execute it runner = client.runner( "dev-cluster", tasks=[CacheGroupDemoWorkflow, ProduceRandomNumbers, ProduceRandomNumber, PrintSum], cache=LocalFileSystemCache("/path/to/cache/directory"), ) runner.run_forever() ``` ```plaintext Output Sum of all numbers: 284 ``` # Clusters Clusters are a logical grouping for [task runners](/workflows/concepts/task-runners). Using clusters, you can scope certain tasks to a specific group of task runners. Tasks, which are always submitted to a specific cluster, are only executed on task runners assigned to the same cluster. ## Use Cases Use clusters to organize [task runners](/workflows/concepts/clusters) into logical groups, which can help with: * Targeting specific task runners for a particular job * Reserving a group of task runners for specific purposes, such as running certain types of batch jobs * Setting up different clusters for different environments (like development and production) Even when using different clusters, task runners within the same cluster may still have different capabilities, such as different registered tasks. If multiple task runners have the same set of registered tasks, you can assign them to different clusters to target specific task runners for a particular job. ### Adding Task Runners to a Cluster You can add task runners to a cluster by specifying the [cluster's slug](#cluster-slug) when [registering a task runner](/workflows/concepts/task-runners). Each task runner must always be assigned to a cluster. ## Managing Clusters Before registering a task runner or submitting a job, you must create a cluster. You can also list, fetch, and delete clusters as needed. The following sections explain how to do this. To manage clusters, first instantiate a cluster client using the `clusters` method in the workflows client. 
```python Python from tilebox.workflows import Client client = Client() clusters = client.clusters() ``` ### Creating a Cluster To create a cluster, use the `create` method on the cluster client and provide a name for the cluster. ```python Python cluster = clusters.create("testing") print(cluster) ``` ```plaintext Output Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing') ``` ### Cluster Slug Each cluster has a unique identifier, combining the cluster's name and an automatically generated identifier. Use this slug to reference the cluster for other operations, like submitting a job or subtasks. ### Listing Clusters To list all available clusters, use the `all` method: ```python Python all_clusters = clusters.all() print(all_clusters) ``` ```plaintext Output [Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing'), Cluster(slug='production-EifhUozDpwAJDL', display_name='Production')] ``` ### Fetching a Specific Cluster To fetch a specific cluster, use the `find` method and pass the cluster's slug: ```python Python cluster = clusters.find("testing-CvufcSxcC9SKfe") print(cluster) ``` ```plaintext Output Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing') ``` ### Deleting a Cluster To delete a cluster, use the `delete` method and pass the cluster's slug: ```python Python clusters.delete("testing-CvufcSxcC9SKfe") ``` ## Jobs Across Different Clusters When [submitting a job](/workflows/concepts/jobs), you need to specify which cluster the job's root task should be executed on. This allows you to direct the job to a specific set of task runners. By default, all sub-tasks within a job are also submitted to the same cluster, but this can be overridden to submit sub-tasks to different clusters if needed. See the example below for a job that spans across multiple clusters. ```python Python from tilebox.workflows import Task, ExecutionContext, Client class MultiClusterWorkflow(Task): def execute(self, context: ExecutionContext) -> None: # this submits a task to the same cluster as the one currently executing this task same_cluster = context.submit_subtask(DummyTask()) other_cluster = context.submit_subtask( DummyTask(), # this task runs only on a task runner in the "other-cluster" cluster cluster="other-cluster-As3dcSb3D9SAdK", # dependencies can be specified across clusters depends_on=[same_cluster], ) class DummyTask(Task): def execute(self, context: ExecutionContext) -> None: pass # submit a job to the "testing" cluster client = Client() job_client = client.jobs() job = job_client.submit( MultiClusterWorkflow(), cluster="testing-CvufcSxcC9SKfe", ) ``` This workflow requires at least two task runners to complete. One must be in the "testing" cluster, and the other must be in the "other-cluster" cluster. If no task runners are available in the "other-cluster," the task submitted to that cluster will remain queued until a task runner is available. It won't execute on a task runner in the "testing" cluster, even if the task runner has the `DummyTask` registered. # Jobs A job is a specific execution of a workflow with designated input parameters. It consists of one or more tasks that can run in parallel or sequentially, based on their dependencies. Submitting a job involves creating a root task with specific input parameters, which may trigger the execution of other tasks within the same job. ## Submission To execute a [task](/workflows/concepts/tasks), it must be initialized with concrete inputs and submitted as a job. 
The task will then run within the context of the job, and if it generates sub-tasks, those will also execute as part of the same job. After submitting a job, the root task is scheduled for execution, and any [eligible task runner](/workflows/concepts/task-runners#task-selection) can pick it up and execute it. First, instantiate a job client by calling the `jobs` method on the workflow client. ```python Python from tilebox.workflows import Client client = Client() job_client = client.jobs() ``` After obtaining a job client, submit a job using the [submit](/api-reference/tilebox.workflows/JobClient.submit) method. You need to provide a name for the job, an instance of the root [task](/workflows/concepts/tasks), and a [cluster](/workflows/concepts/clusters) to execute the root task on. ```python Python # import your own workflow from my_workflow import MyTask cluster = "dev-cluster" job = job_client.submit('my-job', MyTask("some", "parameters"), cluster) ``` Once a job is submitted, it's immediately scheduled for execution. The root task will be picked up and executed as soon as an [eligible task runner](/workflows/concepts/task-runners#task-selection) is available. ## Retry Handling [Tasks support retry handling](/workflows/concepts/tasks#retry-handling) for failed executions. This applies to the root task of a job as well, where you can specify the number of retries using the `max_retries` argument of the `submit` method. ```python Python from my_workflow import MyFlakyTask cluster = "dev-cluster" job = job_client.submit('my-job', MyFlakyTask(), cluster, max_retries=5) ``` In this example, if `MyFlakyTask` fails, it will be retried up to five times before being marked as failed. ## Retrieving a specific job When you submit a job, it's assigned a unique identifier that can be used to retrieve it later. You can use the `find` method on the job client to get a job by its ID. ```python Python job = job_client.submit('my-job', MyTask("some", "parameters"), cluster) print(job.id) # 018dd029-58ca-74e5-8b58-b4f99d610f9a # Later, in another process or machine, retrieve job info job = job_client.find("018dd029-58ca-74e5-8b58-b4f99d610f9a") ``` `find` is also a useful tool for fetching a jobs state after a while, to check if it's still running or has already completed. ## Visualization Visualizing the execution of a job can be helpful. The Tilebox workflow orchestrator tracks all tasks in a job, including [sub-tasks](/workflows/concepts/tasks#task-composition-and-subtasks) and [dependencies](/workflows/concepts/tasks#dependencies). This enables the visualization of the execution of a job as a graph diagram. `display` is designed for use in an [interactive environment](/sdks/python/sample-notebooks#interactive-environments) such as a Jupyter notebook. In non-interactive environments, use [visualize](/api-reference/tilebox.workflows/JobClient.visualize), which returns the rendered diagram as an SVG string. ```python Python job = job_client.find("some-job-id") # or a recently submitted job # Then visualize it job_client.display(job) ``` The following diagram represents the job execution as a graph. Each task is shown as a node, with edges indicating sub-task relationships. The diagram also uses color coding to display the status of each task. 
Color coding of task states

The color codes for task states are:

| Task State | Color | Description |
| ---------- | ------------ | ----------- |
| Queued | SalmonYellow | The task is queued and waiting for execution. |
| Running | Blue | The task is currently being executed. |
| Computed | Green | The task has successfully been computed. If a task is computed, and all its sub-tasks are also computed, the task is considered completed. |
| Failed | Red | The task has been executed but encountered an error. |

Below is another visualization of a job currently being executed by multiple task runners.

Job being executed by multiple runners

This visualization shows:

* The root task, `MyTask`, has executed and spawned three sub-tasks.
* At least three task runners are available, as three tasks are currently being executed simultaneously.
* The `SubTask` that is still executing has not generated any sub-tasks yet, as sub-tasks are queued for execution only after the parent task finishes and becomes computed.
* The queued `DependentTask` requires the `LeafTask` to complete before it can be executed.

Job visualizations are meant for development and debugging. They are not suitable for large jobs with hundreds of tasks, as the diagrams may become too complex. Currently, visualizations are limited to jobs with a maximum of 200 tasks.

### Customizing Task Display Names

The text representing a task in the diagram defaults to a task's class name. You can customize this by modifying the `display` field of the `current_task` object in the task's execution context. The maximum length for a display name is 1024 characters, with any overflow truncated. Line breaks using `\n` are supported as well.

```python Python
from tilebox.workflows import Task, ExecutionContext

class RootTask(Task):
    num_subtasks: int

    def execute(self, context: ExecutionContext):
        context.current_task.display = f"Root({self.num_subtasks})"
        for i in range(self.num_subtasks):
            context.submit_subtask(SubTask(i))

class SubTask(Task):
    index: int

    def execute(self, context: ExecutionContext):
        context.current_task.display = f"Leaf Nr. {self.index}"

job = job_client.submit('custom-display-names', RootTask(3), "dev-cluster")
job_client.display(job)
```

Customize Tasks Display Names

## Cancellation

You can cancel a job at any time. When a job is canceled, no queued tasks will be picked up by task runners and executed, even if task runners are idle. Tasks that are already being executed will finish their execution and not be interrupted. All sub-tasks spawned from such tasks after the cancellation will not be picked up by task runners.

Use the `cancel` method on the job client to cancel a job.

```python Python
job = job_client.submit('my-job', MyTask(), "dev-cluster")

# After a short while, the job gets canceled
job_client.cancel(job)
```

A canceled job can be resumed at any time by [retrying](#retries) it.

If any task in a job fails, the job is automatically canceled to avoid executing irrelevant tasks. Future releases will allow configuring this behavior for each task to meet specific requirements.

## Retries

If a task fails due to a bug or lack of resources, there is no need to resubmit the entire job.
You can simply retry the job, and it will resume from the point of failure. This ensures that all the work that was already done up until the point of the failure isn't lost. Future releases may introduce automatic retries for certain failure conditions, which can be useful for handling temporary issues. Below is an example of a failing job due to a bug in the task's implementation. The following workflow processes a list of movie titles and queries the [OMDb API](http://www.omdbapi.com/) for each movie's release date. ```python Python from urllib.parse import urlencode import httpx from tilebox.workflows import Task, ExecutionContext class MoviesStats(Task): titles: list[str] def execute(self, context: ExecutionContext) -> None: for title in self.titles: context.submit_subtask(PrintMovieStats(title)) class PrintMovieStats(Task): title: str def execute(self, context: ExecutionContext) -> None: params = {"t": self.title, "apikey": ""} url = "http://www.omdbapi.com/?" + urlencode(params) response = httpx.get(url).json() # set the display name of the task to the title of the movie: context.current_task.display = response["Title"] print(f"{response['Title']} was released on {response['Released']}") ``` Submitting the workflow as a job reveals a bug in the `PrintMovieStats` task. ```python Python job = job_client.submit('movies-stats', MoviesStats([ "The Matrix", "Shrek 2", "Tilebox - The Movie", "The Avengers", ]), "dev-cluster") job_client.display(job) ``` Job that failed due to a bug Job that failed due to a bug One of the `PrintMovieStats` tasks fails with a `KeyError`. This error occurs when a movie title is not found by the [OMDb API](http://www.omdbapi.com/), leading to a response without the `Title` and `Released` fields. Console output from the task runners confirms this: ```plaintext Output The Matrix was released on 31 Mar 1999 Shrek 2 was released on 19 May 2004 ERROR: Task PrintMovieStats failed with exception: KeyError('Title') ``` The corrected version of `PrintMovieStats` is as follows: ```python Python class PrintMovieStats(Task): title: str def execute(self, context: ExecutionContext) -> None: params = {"t": self.title, "apikey": ""} url = "http://www.omdbapi.com/?" + urlencode(params) response = httpx.get(url).json() if "Title" in response and "Released" in response: context.current_task.display = response["Title"] print(f"{response['Title']} was released on {response['Released']}") else: context.current_task.display = f"NotFound: {self.title}" print(f"Could not find the release date for {self.title}") ``` With this fix, and after redeploying the task runners with the updated `PrintMovieStats` implementation, you can retry the job: ```python Python job_client.retry(job) job_client.display(job) ``` Job retried successfully Job retried successfully Now the console output shows: ```plaintext Output Could not find the release date for Tilebox - The Movie The Avengers was released on 04 May 2012 ``` The output confirms that only two tasks were executed, resuming from the point of failure instead of re-executing all tasks. The job was retried and succeeded. The two tasks that completed before the failure were not re-executed. # Task Runners Task runners are the execution agents within the Tilebox Workflows ecosystem that execute tasks. They can be deployed in different computing environments, including on-premise servers and cloud-based auto-scaling clusters. 
Task runners execute tasks as scheduled by the workflow orchestrator, ensuring they have the necessary resources and environment for effective execution. ## Implementing a Task Runner A task runner is a continuously running process that listens for new tasks to execute. You can start multiple task runner processes to execute tasks concurrently. When a task runner receives a task, it executes it and reports the results back to the workflow orchestrator. The task runner also handles any errors that occur during task execution, reporting them to the orchestrator as well. To execute a task, at least one task runner must be running and available. If no task runners are available, tasks will remain queued until one becomes available. To create and start a task runner, follow these steps: Instantiate a client connected to the Tilebox Workflows API. Select or create a [cluster](/workflows/concepts/clusters) and specify its slug when creating a task runner. Register tasks by specifying the task classes that the task runner can execute as a list to the `runner` method. Call the `run_forever` method of the task runner to listen for new tasks until the task runner process is shut down. Here is a simple example demonstrating these steps: ```python Python from tilebox.workflows import Client # your own workflow: from my_workflow import MyTask, OtherTask def main(): client = Client() # 1. connect to the Tilebox Workflows API cluster = "dev-cluster" # 2. select a cluster to join runner = client.runner( cluster, tasks=[MyTask, OtherTask] # 3. register tasks ) runner.run_forever() # 4. listen for new tasks to execute if __name__ == "__main__": main() ``` To start the task runner locally, run it as a Python script: ```bash > python task_runner.py ``` ## Task Selection For a task runner to pick up a submitted task, the following conditions must be met: 1. The [cluster](/workflows/concepts/clusters) where the task was submitted must match the task runner's cluster. 2. The task runner must have a registered task that matches the [task identifier](/workflows/concepts/tasks#task-identifiers) of the submitted task. 3. The version of the task runner's registered task must be [compatible](/workflows/concepts/tasks#semantic-versioning) with the submitted task's version. If a task meets these conditions, the task runner executes it. Otherwise, the task runner remains idle until a matching task is available. Often, multiple submitted tasks match the conditions for execution. In that case, the task runner selects one of the tasks to execute, and the remaining tasks stay in a queue until the selected task is completed or another task runner becomes available. ## Parallelism You can start multiple task runner instances in parallel to execute tasks concurrently. Each task runner listens for new tasks and executes them as they become available. This allows for high parallelism and can be used to scale the execution of tasks to handle large workloads. To test this, run multiple instances of the task runner script in different terminal windows on your local machine, or use a tool like [call-in-parallel](https://github.com/tilebox/call-in-parallel) to start the task runner script multiple times. For example, to start five task runners in parallel, use the following command: ```bash > call-in-parallel -n 5 -- python task_runner.py ``` ## Deploying Task Runners Task runners are continuously running processes that can be deployed in different computing environments. 
The only requirement for deploying task runners is access to the Tilebox Workflows API. Once this is met, task runners can be deployed in many different environments, including:

* On-premise servers
* Cloud-based virtual machines
* Cloud-based auto-scaling clusters

## Scaling

One key benefit of task runners is their **ability to scale even while workflows are executing**. You can start new task runners at any time, and they can immediately pick up queued tasks to execute. It's not necessary to have an entire processing cluster available at the start of a workflow, as task runners can be started and stopped as needed. This is particularly beneficial in cloud environments, where task runners can be automatically started and stopped based on the current workload, measured by metrics such as CPU usage.

Here's an example scenario:

1. A single instance of a task runner is actively waiting for work in a cloud environment.
2. A large workload is submitted to the workflow orchestrator, prompting the task runner to pick up the first task.
3. The first task creates new sub-tasks for processing, which the task runner also picks up.
4. As the workload increases, the task runner's CPU usage rises, triggering the cloud environment to automatically start new task runner instances.
5. Newly started task runners begin executing queued tasks, distributing the workload among all available task runners.
6. Once the workload decreases, the cloud environment automatically stops some task runners.
7. The first task runner completes the remaining work until everything is done.
8. The first task runner remains idle until new tasks arrive.

CPU usage-based auto-scaling is just one method to scale task runners. Other metrics, such as memory usage or network bandwidth, are also supported by many cloud environments. In a future release, configuration options for scaling task runners based on custom metrics (for example, the number of queued tasks) are planned.

## Distributed Execution

Task runners can be distributed across different compute environments. For instance, some data stored on-premise may need pre-processing, while further processing occurs in the cloud. A job might involve tasks that filter relevant on-premise data and publish it to the cloud, and other tasks that read data from the cloud and process it. In such scenarios, one task runner can run on-premise and another in a cloud environment, effectively collaborating on the same job.

Another advantage of distributed task runners is executing workflows that require specific hardware for certain tasks. For example, one task might need a GPU, while another requires extensive memory.

Here's an example of a distributed workflow:

```python Python
from tilebox.workflows import Task, ExecutionContext

class DistributedWorkflow(Task):
    def execute(self, context: ExecutionContext) -> None:
        download_task = context.submit_subtask(DownloadData())
        process_task = context.submit_subtask(
            ProcessData(),
            depends_on=[download_task],
        )

class DownloadData(Task):
    """
    Download a dataset and store it in a shared internal bucket.
    Requires a good network connection for high download bandwidth.
    """
    def execute(self, context: ExecutionContext) -> None:
        pass

class ProcessData(Task):
    """
    Perform compute-intensive processing of a dataset.
    The dataset must be available in an internal bucket.
    Requires access to a GPU for optimal performance.
""" def execute(self, context: ExecutionContext) -> None: pass ``` To achieve distributed execution for this workflow, no single task runner capable of executing all three of the tasks is set up. Instead, two task runners, each capable of executing one of the tasks are set up: one in a high-speed network environment and the other with GPU access. When the distributed workflow runs, the first task runner picks up the `DownloadData` task, while the second picks up the `ProcessData` task. The `DistributedWorkflow` does not require specific hardware, so it can be registered with both runners and executed by either one. ```python download_task_runner.py from tilebox.workflows import Client client = Client() high_network_speed_runner = client.runner( "dev-cluster", tasks=[DownloadData, DistributedWorkflow] ) high_network_speed_runner.run_forever() ``` ```python gpu_task_runner.py from tilebox.workflows import Client client = Client() gpu_runner = client.runner( "dev-cluster", tasks=[ProcessData, DistributedWorkflow] ) gpu_runner.run_forever() ``` Now, both `download_task_runner.py` and `gpu_task_runner.py` are started, in parallel, on different machines with the required hardware for each. When `DistributedWorkflow` is submitted, it executes on one of the two runners, and it's submitted sub-tasks are handled by the appropriate runner. In this case, since `ProcessData` depends on `DownloadData`, the GPU task runner remains idle until the download completion, then picks up the processing task. You can also differentiate between task runners by specifying different [clusters](/workflows/concepts/clusters) and choosing specific clusters for sub-task submissions. For more details, see the [Clusters](/workflows/concepts/clusters) section. ## Task Failures If an unhandled exception occurs during task execution, the task runner captures it and reports it back to the workflow orchestrator. The orchestrator then marks the task as failed, leading to [job cancellation](/workflows/concepts/jobs#cancellation) to prevent further tasks of the same job-that may not be relevant anymore-from being executed. A task failure does not result in losing all previous work done by the job. If the failure is fixable—by fixing a bug in a task implementation, ensuring the task has necessary resources, or simply retrying it due to a flaky network connection—it may be worth [retrying](/workflows/concepts/jobs#retries) the job. When retrying a job, all failed tasks are added back to the queue, allowing a task runner to potentially execute them. If execution then succeeds, the job continues smoothly. Otherwise, the task will remain marked as failed and can be retried again if desired. If fixing a failure requires modifying the task implementation, it's important to deploy the updated version to the [task runners](/workflows/concepts/task-runners) before retrying the job. Otherwise, a task runner could pick up the original, faulty implementation again, leading to another failure. ## Task idempotency Since a task may be retried, it's possible that a task is executed more than once. Depending on where in the execution of the task it failed, it may have already performed some side effects, such as writing to a database, or sending a message to a queue. Because of that it's crucial to ensure that tasks are [idempotent](https://en.wikipedia.org/wiki/Idempotence). Idempotent tasks can be executed multiple times without altering the outcome beyond the first successful execution. 
A special case of idempotency involves submitting sub-tasks. After a task calls `context.submit_subtask` and then fails and is retried, those submitted sub-tasks of an earlier failed execution are automatically removed, ensuring that they can be safely submitted again when the task is retried. ## Runner Crashes Tilebox Workflows has an internal mechanism to handle unexpected task runner crashes. When a task runner picks up a task, it periodically sends a heartbeat to the workflow orchestrator. If the orchestrator does not receive this heartbeat for a defined duration, it marks the task as failed and automatically attempts to [retry](/workflows/concepts/jobs#retries) it up to 10 times. This allows another task runner to pick up the task and continue executing the job. This mechanism ensures that scenarios such as power outages, hardware failures, or dropped network connections are handled effectively, preventing any task from remaining in a running state indefinitely. ## Observability Task runners are continuously running processes, making it essential to monitor their health and performance. You can achieve observability by collecting and analyzing logs, metrics, and traces from task runners. Tilebox Workflows provides tools to enable this data collection and analysis. To learn how to configure task runners for observability, head over to the [Observability](/workflows/observability) section. # Understanding and Creating Tasks A Task is the smallest unit of work, designed to perform a specific operation. Each task represents a distinct operation or process that can be executed, such as processing data, performing calculations, or managing resources. Tasks can operate independently or as components of a more complex set of connected tasks known as a Workflow. Tasks are defined by their code, inputs, and dependencies on other tasks. To create tasks, you need to define the input parameters and specify the action to be performed during execution. ## Creating a Task To create a task in Tilebox, define a class that extends the `Task` base class and implements the `execute` method. The `execute` method is the entry point for the task where its logic is defined. It's called when the task is executed. ```python Python from tilebox.workflows import Task, ExecutionContext class MyFirstTask(Task): def execute(self, context: ExecutionContext): print(f"Hello World!") ``` This example demonstrates a simple task that prints "Hello World!" to the console. The key components of this task are: `MyFirstTask` is a subclass of the `Task` class, which serves as the base class for all defined tasks. It provides the essential structure for a task. Inheriting from `Task` automatically makes the class a `dataclass`, which is useful [for specifying inputs](#input-parameters). Additionally, by inheriting from `Task`, the task is automatically assigned an [identifier based on the class name](#task-identifiers). The `execute` method is the entry point for executing the task. This is where the task's logic is defined. It's invoked by a [task runner](/workflows/concepts/task-runners) when the task runs and performs the task's operation. The `context` argument is an `ExecutionContext` instance that provides access to an [API for submitting new tasks](/api-reference/tilebox.workflows/ExecutionContext.submit_subtask) as part of the same job and features like [shared caching](/api-reference/tilebox.workflows/ExecutionContext.job_cache). The code samples on this page do not illustrate how to execute the task. 
That will be covered in the [next section on task runners](/workflows/concepts/task-runners). The reason for that is that executing tasks is a separate concern from implementing tasks. ## Input Parameters Tasks often require input parameters to operate. These inputs can range from simple values to complex data structures. By inheriting from the `Task` class, the task is treated as a Python `dataclass`, allowing input parameters to be defined as class attributes. Tasks must be **serializable to JSON** because they may be distributed across a cluster of [task runners](/workflows/concepts/task-runners). Supported types for input parameters include: * Basic types such as `str`, `int`, `float`, `bool` * Lists and dictionaries of basic types * Nested data classes that are also JSON-serializable ```python Python class ParametrizableTask(Task): message: str number: int data: dict[str, str] def execute(self, context: ExecutionContext): print(self.message * self.number) task = ParametrizableTask("Hello", 3, {"key": "value"}) ``` ## Task Composition and subtasks Until now, tasks have performed only a single operation. But tasks can be more powerful. **Tasks can submit other tasks as subtasks.** This allows for a modular workflow design, breaking down complex operations into simpler, manageable parts. Additionally, the execution of subtasks is automatically parallelized whenever possible. ```python Python class ParentTask(Task): num_subtasks: int def execute(self, context: ExecutionContext) -> None: for i in range(self.num_subtasks): context.submit_subtask(ChildTask(i)) class ChildTask(Task): index: int def execute(self, context: ExecutionContext) -> None: print(f"Executing ChildTask {self.index}") # after submitting this task, a task runner may pick it up and execute it # which will result in 5 ChildTasks being submitted and executed as well task = ParentTask(5) ``` In this example, a `ParentTask` submits `ChildTask` tasks as subtasks. The number of subtasks to be submitted is based on the `num_subtasks` attribute of the `ParentTask`. The `submit_subtask` method takes an instance of a task as its argument, meaning the task to be submitted must be instantiated with concrete parameters first. By submitting a task as a subtask, its execution is scheduled as part of the same job as the parent task. Compared to just directly invoking the subtask's `execute` method, this allows the subtask's execution to occur on a different machine or in parallel with other subtasks. To learn more about how tasks are executed, see the section on [task runners](/workflows/concepts/task-runners). ### Larger subtasks example A practical workflow example showcasing task composition might help illustrate the capabilities of tasks. Below is an example of a set of tasks forming a workflow capable of downloading a set number of random dog images from the internet. The [Dog API](https://thedogapi.com/) can be used to get the image URLs, and then download them. 
Implementing this using Task Composition could look like this: ```python Python import httpx # pip install httpx from pathlib import Path class DownloadRandomDogImages(Task): num_images: int def execute(self, context: ExecutionContext) -> None: url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}" response = httpx.get(url) for dog_image in response.json(): context.submit_subtask(DownloadImage(dog_image["url"])) class DownloadImage(Task): url: str def execute(self, context: ExecutionContext) -> None: file = Path("dogs") / self.url.split("/")[-1] response = httpx.get(self.url) with file.open("wb") as file: file.write(response.content) ``` This example consists of the following tasks: `DownloadRandomDogImages` fetches a specific number of random dog image URLs from an API. It then submits a `DownloadImage` task for each received image URL. `DownloadImage` downloads an image from a specified URL and saves it to a file. Together, these tasks create a workflow that downloads random dog images from the internet. The relationship between the two tasks and their formation as a workflow becomes clear when `DownloadRandomDogImages` submits `DownloadImage` tasks as subtasks. Visualizing the execution of such a workflow is akin to a tree structure where the `DownloadRandomDogImages` task is the root, and the `DownloadImage` tasks are the leaves. For instance, when downloading five random dog images, the following tasks are executed. ```python Python from tilebox.workflows import Client client = Client() jobs = client.jobs() job = jobs.submit( "download-dog-images", DownloadRandomDogImages(5), "dev-cluster", ) # now our deployed task runners will pick up the task and execute it jobs.display(job) ``` Download Dog Images Workflow Download Dog Images Workflow In total, six tasks are executed: the `DownloadRandomDogImages` task and five `DownloadImage` tasks. The `DownloadImage` tasks can execute in parallel, as they are independent. If more than one task runner is available, the Tilebox Workflow Orchestrator **automatically parallelizes** the execution of these tasks. Check out [job\_client.display](/workflows/concepts/jobs#visualization) to learn how this visualization was automatically generated from the task executions. Currently, a limit of `64` subtasks per task is in place to discourage creating workflows where individual parent tasks submit a large number of subtasks, which can lead to performance issues since those parent tasks are not parallelized. If you need to submit more than `64` subtasks, consider using [recursive subtask submission](#recursive-subtasks) instead. ## Recursive subtasks Tasks can submit other tasks as subtasks, allowing for complex workflows. Sometimes the input to a task is a list, with elements that can be **mapped** to individual subtasks, whose outputs are then aggregated in a **reduce** step. This pattern is commonly known as **MapReduce**. Often times the initial map step—submitting the individual subtasks—might already be an expensive operation. Since this is executed within a single task, it's not parallelizable, which can bottleneck the entire workflow. Fortunately, Tilebox Workflows offers a solution through **recursive subtask submission**. A task can submit instances of itself as subtasks, enabling a recursive breakdown into smaller tasks. For example, the `RecursiveTask` below is a valid task that submits smaller instances of itself as subtasks. 
```python Python class RecursiveTask(Task): num: int def execute(self, context: ExecutionContext) -> None: print(f"Executing RecursiveTask with num={self.num}") if self.num >= 2: context.submit_subtask(RecursiveTask(self.num // 2)) ``` ### Recursive subtask example An example for this is the [random dog images workflow](#larger-subtasks-example) mentioned earlier. In the previous implementation, downloading images was already parallelized. But the initial orchestration of the individual download tasks was not parallelized, because `DownloadRandomDogImages` was responsible for fetching all random dog image URLs and only submitted the individual download tasks once all URLs were retrieved. For a large number of images this setup can bottleneck the entire workflow. To improve this, recursive subtask submission decomposes a `DownloadRandomDogImages` task with a high number of images into two smaller `DownloadRandomDogImages` tasks, each fetching half. This process can be repeated until a specified threshold is met, at which point the Dog API can be queried directly for image URLs. That way, image downloads start as soon as the first URLs are retrieved, without initial waiting. An implementation of this recursive submission may look like this: ```python Python class DownloadRandomDogImages(Task): num_images: int def execute(self, context: ExecutionContext) -> None: if self.num_images > 4: half = self.num_images // 2 remaining = self.num_images - half # account for odd numbers context.submit_subtask(DownloadRandomDogImages(half)) context.submit_subtask(DownloadRandomDogImages(remaining)) else: url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}" response = httpx.get(url) for dog_image in response.json()[:self.num_images]: context.submit_subtask(DownloadImage(dog_image["url"])) ``` With this implementation, downloading a large number of images (for example, 9) results in the following tasks being executed: Download Dog Images Workflow implemented recursively Download Dog Images Workflow implemented recursively ## Retry Handling By default, when a task fails to execute, it's marked as failed. In some cases, it may be useful to retry the task multiple times before marking it as a failure. This is particularly useful for tasks dependent on external services that might be temporarily unavailable. Tilebox Workflows allows you to specify the number of retries for a task using the `max_retries` argument of the `submit_subtask` method. Check out the example below to see how this might look like in practice. A failed task may be picked up by any available runner and not necessarily the same one that it failed on. ```python Python import random class RootTask(Task): def execute(self, context: ExecutionContext) -> None: context.submit_subtask(FlakyTask(), max_retries=5) class FlakyTask(Task): def execute(self, context: ExecutionContext) -> None: print(f"Executing FlakyTask") if random.random() < 0.1: raise Exception("FlakyTask failed randomly") ``` ## Dependencies Tasks often rely on other tasks. For example, a task that processes data might depend on a task that fetches that data. **Tasks can express their dependencies on other tasks** by using the `depends_on` argument of the [submit\_subtask](/api-reference/tilebox.workflows/ExecutionContext.submit_subtask) method. This means that a dependent task will only execute after the task it relies on has successfully completed. The `depends_on` argument accepts a list of tasks, enabling a task to depend on multiple other tasks. 
A workflow with dependencies might look like this: ```python Python class RootTask(Task): def execute(self, context: ExecutionContext) -> None: first_task = context.submit_subtask( PrintTask("Executing first") ) second_task = context.submit_subtask( PrintTask("Executing second"), depends_on=[first_task], ) third_task = context.submit_subtask( PrintTask("Executing last"), depends_on=[second_task], ) class PrintTask(Task): message: str def execute(self, context: ExecutionContext) -> None: print(self.message) ``` The `RootTask` submits three `PrintTask` tasks as subtasks. These tasks depend on each other, meaning the second task executes only after the first task has successfully completed, and the third only executes after the second completes. The tasks are executed sequentially. If a task upon which another task depends submits subtasks, those subtasks must also execute before the dependent task begins execution. ### Dependencies Example A practical example is a workflow that fetches news articles from an API and processes them using the [News API](https://newsapi.org/). ```python Python from pathlib import Path import json from collections import Counter import httpx # pip install httpx class NewsWorkflow(Task): category: str max_articles: int def execute(self, context: ExecutionContext) -> None: fetch_task = context.submit_subtask(FetchNews(self.category, self.max_articles)) context.submit_subtask(PrintHeadlines(), depends_on=[fetch_task]) context.submit_subtask(MostFrequentAuthors(), depends_on=[fetch_task]) class FetchNews(Task): category: str max_articles: int def execute(self, context: ExecutionContext) -> None: url = f"https://newsapi.org/v2/top-headlines?category={self.category}&pageSize={self.max_articles}&country=us&apiKey=API_KEY" news = httpx.get(url).json() # check out our documentation page on caches to learn # about a better way of passing data between tasks Path("news.json").write_text(json.dumps(news)) class PrintHeadlines(Task): def execute(self, context: ExecutionContext) -> None: news = json.loads(Path("news.json").read_text()) for article in news["articles"]: print(f"{article['publishedAt'][:10]}: {article['title']}") class MostFrequentAuthors(Task): def execute(self, context: ExecutionContext) -> None: news = json.loads(Path("news.json").read_text()) authors = [article["author"] for article in news["articles"]] for author, count in Counter(authors).most_common(): print(f"Author {author} has written {count} articles") # now submit a job, and then visualize it job = job_client.submit("process-news", NewsWorkflow(category="science", max_articles=5), "dev-cluster" ) ``` ```plaintext Output 2024-02-15: NASA selects ultraviolet astronomy mission but delays its launch two years - SpaceNews 2024-02-15: SpaceX launches Space Force mission from Cape Canaveral - Orlando Sentinel 2024-02-14: Saturn's largest moon most likely uninhabitable - Phys.org 2024-02-14: AI Unveils Mysteries of Unknown Proteins' Functions - Neuroscience News 2024-02-14: Anthropologists' research unveils early stone plaza in the Andes - Phys.org Author Jeff Foust has written 1 articles Author Richard Tribou has written 1 articles Author Jeff Renaud has written 1 articles Author Neuroscience News has written 1 articles Author Science X has written 1 articles ``` Process News Workflow Process News Workflow This workflow consists of four tasks: | Task | Dependencies | Description | | ------------------- | ------------ | 
----------------------------------------------------------------------------------------------------------------------- | | NewsWorkflow | - | The root task of the workflow. It spawns the other tasks and sets up the dependencies between them. | | FetchNews | - | A task that fetches news articles from the API and writes the results to a file, which is then read by dependent tasks. | | PrintHeadlines | FetchNews | A task that prints the headlines of the news articles to the console. | | MostFrequentAuthors | FetchNews | A task that counts the number of articles each author has written and prints the result to the console. | An important aspect is that there is no dependency between the `PrintHeadlines` and `MostFrequentAuthors` tasks. This means they can execute in parallel, which the Tilebox Workflow Orchestrator will do, provided multiple task runners are available. In this example, the results from `FetchNews` are stored in a file. This is not the recommended method for passing data between tasks. When executing on a distributed cluster, the existence of a file written by a dependent task cannot be guaranteed. Instead, it's better to use a [shared cache](/workflows/caches). ## Task Identifiers A task identifier is a unique string used by the Tilebox Workflow Orchestrator to identify the task. It's used by [task runners](/workflows/concepts/task-runners) to map submitted tasks to a task class and execute them. It also serves as the default name in execution visualizations as a tree of tasks. If unspecified, the identifier of a task defaults to the class name. For instance, the identifier of the `PrintHeadlines` task in the previous example is `"PrintHeadlines"`. This is good for prototyping, but not recommended for production, as changing the class name also changes the identifier, which can lead to issues during refactoring. It also prevents different tasks from sharing the same class name. To address this, Tilebox Workflows offers a way to explicitly specify the identifier of a task. This is done by overriding the `identifier` method of the `Task` class. This method should return a unique string identifying the task. This decouples the task's identifier from its class name, allowing you to change the identifier without renaming the class. It also allows tasks with the same class name to have different identifiers. The `identifier` method can also specify a version number for the task—see the section on [semantic versioning](#semantic-versioning) below for more details. ```python Python class MyTask(Task): def execute(self, context: ExecutionContext) -> None: pass # MyTask has the identifier "MyTask" and the default version of "v0.0" class MyTask2(Task): @staticmethod def identifier() -> tuple[str, str]: return "tilebox.com/example_workflow/MyTask", "v1.0" def execute(self, context: ExecutionContext) -> None: pass # MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0" ``` The `identifier` method must be defined as either a `classmethod` or a `staticmethod`, meaning it can be called without instantiating the class. ## Semantic Versioning As seen in the previous section, the `identifier` method can return a tuple of two strings, where the first string is the identifier and the second string is the version number. This allows for semantic versioning of tasks. Versioning is important for managing changes to a task's execution method. It allows for new features, bug fixes, and changes while ensuring existing workflows operate as expected. 
Additionally, it allows multiple versions of a task to coexist, enabling a gradual rollout of changes without interrupting production deployments.

You assign a version number by overriding the `identifier` method of the task class. It must return a tuple of two strings: the first is the [identifier](#task-identifiers) and the second is the version number, which must match the pattern `vX.Y` (where `X` and `Y` are non-negative integers). `X` is the major version number and `Y` is the minor version number.

For example, this task has the identifier `"tilebox.com/example_workflow/MyTask"` and the version `"v1.3"`:

```python Python
class MyTask(Task):
    @staticmethod
    def identifier() -> tuple[str, str]:
        return "tilebox.com/example_workflow/MyTask", "v1.3"

    def execute(self, context: ExecutionContext) -> None:
        pass
```

When a task is submitted as part of a job, the version from which it's submitted is recorded and may differ from the version on the task runner executing the task. When task runners execute a task, they require a registered task with a matching identifier and a compatible version number. A compatible version is one where the major version number on the task runner matches that of the submitted task, and the minor version number on the task runner is equal to or greater than that of the submitted task.

For example, suppose `MyTask` is submitted as part of a job with version `"v1.3"`:

* A task runner with version `"v1.3"` of `MyTask` would execute this task.
* A task runner with version `"v1.5"` of `MyTask` would also execute this task.
* A task runner with version `"v1.2"` of `MyTask` would not execute this task, as its minor version is lower than that of the submitted task.
* A task runner with version `"v2.5"` of `MyTask` would not execute this task, as its major version differs from that of the submitted task.

## Conclusion

Tasks form the foundation of Tilebox Workflows. By understanding how to create and manage tasks, you can leverage Tilebox's capabilities to automate and optimize your workflows. Experiment with defining your own tasks, utilizing subtasks, managing dependencies, and employing semantic versioning to develop robust and efficient workflows.

# Tilebox Workflows

The Tilebox workflow orchestrator is a parallel processing engine. It simplifies the creation of dynamic tasks that can be executed across various computing environments, including on-premise and auto-scaling clusters in public clouds.

This section provides guides showcasing how to use the Tilebox workflow orchestrator effectively. Here are some of the key learning areas:

* Create tasks using the Tilebox Workflow Orchestrator.
* Learn how to submit jobs to the workflow orchestrator, which schedules tasks for execution.
* Learn how to set up task runners to execute tasks in a distributed manner.
* Understand how to gain insights into task executions using observability features like tracing and logging.
* Learn to configure shared data access for all tasks of a job using caches.
* Trigger jobs based on events or schedules, such as new data availability or CRON schedules.

## Terminology

Before exploring Tilebox Workflows in depth, familiarize yourself with some common terms used throughout this section.

A Task is the smallest unit of work, designed to perform a specific operation. Each task represents a distinct operation or process that can be executed, such as processing data, performing calculations, or managing resources.
Tasks can operate independently or as components of a more complex set of connected tasks known as a Workflow. Tasks are defined by their code, inputs, and dependencies on other tasks. To create tasks, you need to define the input parameters and specify the action to be performed during execution. A job is a specific execution of a workflow with designated input parameters. It consists of one or more tasks that can run in parallel or sequentially, based on their dependencies. Submitting a job involves creating a root task with specific input parameters, which may trigger the execution of other tasks within the same job. Task runners are the execution agents within the Tilebox Workflows ecosystem that execute tasks. They can be deployed in different computing environments, including on-premise servers and cloud-based auto-scaling clusters. Task runners execute tasks as scheduled by the workflow orchestrator, ensuring they have the necessary resources and environment for effective execution. Clusters are a logical grouping for task runners. Using clusters, you can scope certain tasks to a specific group of task runners. Tasks, which are always submitted to a specific cluster, are only executed on task runners assigned to the same cluster. Caches are shared storage that enable data storage and retrieval across tasks within a single job. They store intermediate results and share data among tasks, enabling distributed computing and reducing redundant data processing. Observability refers to the feature set in Tilebox Workflows that provides visibility into the execution of tasks and jobs. Tools like tracing and logging allow users to monitor performance, diagnose issues, and gain insights into job operations, enabling efficient troubleshooting and optimization. # Cron triggers Trigger jobs based on a Cron schedule. ## Creating Cron tasks Cron tasks run repeatedly on a specified [cron](https://en.wikipedia.org/wiki/Cron) schedule. To create a Cron task, use `tilebox.workflows.recurrent_tasks.CronTask` as your tasks base class instead of the regular `tilebox.workflows.Task`. ```python Python from tilebox.workflows import ExecutionContext from tilebox.workflows.recurrent_tasks import CronTask class MyCronTask(CronTask): message: str def execute(self, context: ExecutionContext) -> None: print(f"Hello {self.message} from a Cron Task!") # self.trigger is an attribute of the CronTask class, # which contains information about the trigger event # that caused this task to be submitted as part of a job print(f"This task was triggered at {self.trigger.time}") ``` ## Registering a Cron trigger After implementing a Cron task, register it to be triggered according to a Cron schedule. When the Cron expression matches, a new job is submitted consisting of a single task instance derived from the Cron task prototype. ```python Python from tilebox.workflows import Client client = Client() recurrent_tasks = client.recurrent_tasks() cron_task = recurrent_tasks.create_recurring_cron_task( "my-cron-task", # name of the recurring cron task "dev-cluster", # cluster slug to submit jobs to MyCronTask(message="World"), # the task (and its input parameters) to run repeatedly cron_triggers=[ "12 * * * *", # run every hour at minute 12 "45 18 * * *", # run every day at 18:45 "30 13 * * 3", # run every Wednesday at 13:30 ], ) ``` The syntax for specifying the cron triggers is a [CRON expression](https://en.wikipedia.org/wiki/Cron#CRON_expression). 
A helpful tool to test your cron expressions is [crontab.guru](https://crontab.guru/). ## Starting a Cron Task Runner With the Cron task registered, a job is submitted whenever the Cron expression matches. But unless a [task runner](/workflows/concepts/task-runners) is available to execute the Cron task the submitted jobs remain in a task queue. Once an [eligible task runner](/workflows/concepts/task-runners#task-selection) becomes available, all jobs in the queue are executed. ```python Python from tilebox.workflows import Client client = Client() runner = client.runner("dev-cluster", tasks=[MyCronTask]) runner.run_forever() ``` If this task runner runs continuously, its output may resemble the following: ```plaintext Output Hello World from a Cron Task! This task was triggered at 2023-09-25 16:12:00 Hello World from a Cron Task! This task was triggered at 2023-09-25 17:12:00 Hello World from a Cron Task! This task was triggered at 2023-09-25 18:12:00 Hello World from a Cron Task! This task was triggered at 2023-09-25 18:45:00 Hello World from a Cron Task! This task was triggered at 2023-09-25 19:12:00 ``` ## Inspecting in the Console The [Tilebox Console](https://console.tilebox.com/workflows/recurrent-tasks) provides a straightforward way to inspect all registered Cron tasks. Tilebox Workflows recurrent tasks in the Tilebox Console Tilebox Workflows recurrent tasks in the Tilebox Console Use the console to view, edit, and delete the registered Cron tasks. ## Deleting Cron triggers To delete a registered Cron task, use `recurrent_tasks.delete`. After deletion, no new jobs will be submitted by that Cron trigger. Past jobs already triggered will still remain queued. ```python Python from tilebox.workflows import Client client = Client() recurrent_tasks = client.recurrent_tasks() # delete the task as returned by create_recurring_cron_task recurrent_tasks.delete(cron_task) # or manually by id: recurrent_tasks.delete("0190bafc-b3b8-88c4-008b-a5db044380d0") ``` ## Submitting Cron jobs manually You can submit Cron tasks as regular tasks for testing purposes or as part of a larger workflow. To do so, instantiate the task with a specific trigger time using the `once` method. Submitting a job with a Cron task using `once` immediately schedules the task, and a runner may pick it up and execute it. The trigger time set in the `once` method does not influence the execution time; it only sets the `self.trigger.time` attribute for the Cron task. ```python Python from datetime import datetime, timezone job_client = client.jobs() # create a Cron task prototype task = MyCronTask(message="Hello") # submitting it directly won't work: raises ValueError: # CronTask cannot be submitted without being triggered. Use task.once(). 
job_client.submit("manual-cron-job", task, cluster="dev-cluster") # specify a trigger time to submit the task as a regular task triggered_task = task.once() # same as task.once(datetime.now()) job_client.submit("manual-cron-job", triggered_task, cluster="dev-cluster") # simulate a trigger at a specific time triggered_task = task.once(datetime(2030, 12, 12, 15, 15, tzinfo=timezone.utc)) # the task will be scheduled to run immediately, even with a future trigger time # but the self.trigger.time will be 2023-12-12T15:15:00Z for the task instance job_client.submit("manual-cron-job", triggered_task, cluster="dev-cluster") ``` # Recurrent Tasks Process data in near-real-time by triggering jobs based on external events ## Introduction Tilebox Workflows can execute jobs in two ways: a one-time execution triggered by a user, typically a batch processing, and near-real-time execution based on specific external events. By defining trigger conditions, you can automatically submit jobs based on external events. Tilebox Workflows currently supports the following trigger conditions: Trigger jobs based on a Cron schedule. Trigger jobs after objects are created or modified in a storage location such as a cloud bucket. Dataset Event Triggers, which will trigger jobs when new data points are ingested into a Tilebox dataset, are on the roadmap. Stay tuned for updates. ## Recurrent Tasks To create a trigger, define a special task that serves as a prototype. In response to a trigger condition met, this task will be submitted as a new job. Such tasks are referred to as recurrent tasks. Each recurrent task has a [task identifier](/workflows/concepts/tasks#task-identifiers), a [version](/workflows/concepts/tasks#semantic-versioning), and [input parameters](/workflows/concepts/tasks#input-parameters), just like regular tasks. Recurrent tasks also automatically provide a special `trigger` attribute that contains information about the event that initiated the task's execution. ## Recurrent Task Client The Tilebox Workflows client includes a sub-client for managing recurrent tasks. You can create this sub-client by calling the `recurrent_tasks` method on the main client instance. ### Listing Registered Recurrent Tasks To list all registered recurrent tasks, use the `all` method on the recurrent task client. ```python Python from tilebox.workflows import Client client = Client() recurrent_tasks = client.recurrent_tasks() recurrent_tasks = recurrent_tasks.all() print(recurrent_tasks) ``` ```plaintext Output [ RecurrentTaskPrototype( name='Run MyCronTask every hour at 15 minutes past the hour', prototype=TaskSubmission( cluster_slug='dev-cluster', identifier=TaskIdentifier(name='MyCronTask', version='v0.0'), input=b'{"message": "Hello"}, dependencies=[], display='MyCronTask', max_retries=0), storage_event_triggers=[], cron_triggers=[CronTrigger(schedule='15 * * * *')], ) ] ``` ### Registering Recurrent Tasks To register a recurrent task, use the `create_recurring_...` methods specific to each trigger type provided by the recurrent task client. Refer to the documentation for each trigger type for more details. ## Overview in the Tilebox Console You can also use the Tilebox Console to manage recurrent tasks. Visit [the recurrent tasks section](https://console.tilebox.com/workflows/recurrent-tasks) to check it out. 
Tilebox Workflows recurrent tasks in the Tilebox Console

You can also register new recurrent tasks or edit and delete existing ones directly from the console.

Tilebox Workflows recurrent tasks in the Tilebox Console

# Storage Event Triggers

Trigger jobs after objects are created or modified in a storage location

## Creating a Storage Event Task

Storage Event Tasks are recurring tasks triggered when objects are created or modified in a [storage location](#storage-locations). To create a Storage Event task, use `tilebox.workflows.recurrent_tasks.StorageEventTask` as your task's base class instead of the regular `tilebox.workflows.Task`.

```python Python
from tilebox.workflows import ExecutionContext
from tilebox.workflows.recurrent_tasks import StorageEventTask, StorageEventType
from tilebox.workflows.observability.logging import get_logger

logger = get_logger()

class LogObjectCreation(StorageEventTask):
    head_bytes: int

    def execute(self, context: ExecutionContext) -> None:
        # self.trigger is an attribute of the StorageEventTask class,
        # which contains information about the storage event that triggered this task
        if self.trigger.type == StorageEventType.CREATED:
            path = self.trigger.location
            logger.info(f"A new object was created: {path}")

            # trigger.storage is a storage client for interacting with the storage
            # location that triggered the event
            # using it, we can read the object to get its content as a bytes object
            data = self.trigger.storage.read(path)
            logger.info(f"The object's file size is {len(data)} bytes")
            logger.info(f"The object's first {self.head_bytes} bytes are: {data[:self.head_bytes]}")
```

## Storage Locations

Storage Event tasks are triggered when objects are created or modified in a storage location. This location can be a cloud storage bucket or a local file system. Tilebox supports Google Cloud Storage buckets, Amazon S3 buckets, and local file systems as storage locations.

### Registering a Storage Location

To make a storage location available within Tilebox workflows, it must be registered first. This involves specifying the location and setting up a notification system that forwards events to Tilebox, enabling task triggering. The setup varies depending on the storage location type. For example, a GCP storage bucket is integrated by setting up a [PubSub Notification with a push subscription](https://cloud.google.com/storage/docs/pubsub-notifications), while a local file system requires installing a filesystem watcher.

To register a storage location with Tilebox, please [get in touch](mailto:engineering@tilebox.com).

### Listing Available Storage Locations

To list all available storage locations, use the `storage_locations` method on the recurrent task client.

```python Python
from tilebox.workflows import Client

client = Client()
recurrent_tasks_client = client.recurrent_tasks()
storage_locations = recurrent_tasks_client.storage_locations()
print(storage_locations)
```

```plaintext Output
[
  StorageLocation(
    location="gcp-project:gcs-bucket-fab3fa2",
    type=StorageType.GCS,
  ),
  StorageLocation(
    location="s3-bucket-263af15",
    type=StorageType.S3,
  ),
  StorageLocation(
    location='/path/to/a/local/folder',
    type=StorageType.FS,
  ),
]
```

### Reading Files from a Storage Location

Once a storage location is registered, you can read files from it using the `read` method on the storage client.
```python Python gcs_bucket = storage_locations[0] s3_bucket = storage_locations[1] local_folder = storage_locations[2] gcs_object = gcs_bucket.read("my-object.txt") s3_object = s3_bucket.read("my-object.txt") local_object = local_folder.read("my-object.txt") ``` The `read` method instantiates a client for the specific storage location. This requires that the storage location is accessible by a task runner and may require credentials for cloud storage or physical/network access to a locally mounted file system. To set up authentication and enable access to a GCS storage bucket, check out the [Google Client docs for authentication](https://cloud.google.com/docs/authentication/client-libraries#python). ## Registering a Storage Event Trigger After implementing a Storage Event task, register it to trigger each time a storage event occurs. This registration submits a new job consisting of a single task instance derived from the registered Storage Event task prototype. ```python Python from tilebox.workflows import Client client = Client() recurrent_tasks = client.recurrent_tasks() storage_event_task = recurrent_tasks.create_recurring_storage_event_task( "log-object-creations", # name of the recurring storage event task "dev-cluster", # cluster slug to submit jobs to LogObjectCreation(head_bytes=20), # the task (and its input parameters) to run repeatedly triggers=[ # you can specify a glob pattern: # run every time a .txt file is created anywhere in the gcs bucket (gcs_bucket, "**.txt"), ], ) ``` The syntax for specifying glob patterns follows [Standard Wildcards](https://tldp.org/LDP/GNU-Linux-Tools-Summary/html/x11655.htm). Additionally, you can use `**` as a super-asterisk, a matching operator not sensitive to slash separators. Here are some examples of valid glob patterns: | Pattern | Matches | | ----------- | ---------------------------------------------------------------------------- | | `*.ext` | Any file ending in `.ext` in the root directory | | `**/*.ext` | Any file ending in `.ext` in any subdirectory, but not in the root directory | | `**.ext` | Any file ending in `.ext` in any subdirectory, including the root directory | | `folder/*` | Any file directly in a `folder` subdirectory | | `folder/**` | Any file directly or recursively part of a `folder` subdirectory | | `[a-z].txt` | Matches `a.txt`, `b.txt`, etc. | ## Start a Storage Event Task Runner With the Storage Event task registered, a job is submitted whenever a storage event occurs. But unless a [task runner](/workflows/concepts/task-runners) is available to execute the Storage Event task the submitted jobs remain in a task queue. Once an [eligible task runner](/workflows/concepts/task-runners#task-selection) becomes available, all jobs in the queue are executed. 
```python Python from tilebox.workflows import Client client = Client() runner = client.runner("dev-cluster", tasks=[LogObjectCreation]) runner.run_forever() ``` ### Triggering an Event Creating an object in the bucket where the task is registered results in a job being submitted: ```bash Creating an object echo "Hello World" > my-object.txt gcloud storage cp my-object.txt gs://gcs-bucket-fab3fa2 ``` Inspecting the task runner output reveals that the job was submitted and the task executed: ```plaintext Output 2024-09-25 16:51:45,621 INFO A new object was created: my-object.txt 2024-09-25 16:51:45,857 INFO The object's file size is 12 bytes 2024-09-25 16:51:45,858 INFO The object's first 20 bytes are: b'Hello World\n' ``` ## Inspecting in the Console The [Tilebox Console](https://console.tilebox.com/workflows/recurrent-tasks) provides an easy way to inspect all registered storage event tasks. Tilebox Workflows recurrent tasks in the Tilebox Console Tilebox Workflows recurrent tasks in the Tilebox Console ## Deleting Storage Event triggers To delete a registered storage event task, use `recurrent_tasks.delete`. After deletion, no new jobs will be submitted by the storage event trigger. Past jobs already triggered will still remain queued. ```python Python from tilebox.workflows import Client client = Client() recurrent_tasks = client.recurrent_tasks() # delete the task as returned by create_recurring_storage_event_task recurrent_tasks.delete(storage_event_task) # or manually by id: recurrent_tasks.delete("0190bafc-b3b8-88c4-008b-a5db044380d0") ``` ## Submitting Storage Event jobs manually You can submit Storage event tasks as regular tasks for testing purposes or as part of a larger workflow. To do so, instantiate the task with a specific storage location and object name using the `once` method. ```python Python job_client = client.jobs() task = LogObjectCreation(head_bytes=20) # submitting it directly won't work; raises ValueError: # StorageEventTask cannot be submitted without being triggered. Use task.once(). job_client.submit( "manual-storage-event-job", task, cluster="dev-cluster" ) # instead, specify a trigger, # so that we can submit the task as a regular task triggered_task = task.once(gcs_bucket, "my-object.txt") job_client.submit( "manual-storage-event-job", triggered_task, cluster="dev-cluster" ) ``` # Logging Set up distributed logging using the OpenTelemetry logging protocol ## Overview Tilebox workflows are designed for distributed execution, making it essential to set up logging to a centralized system. Tilebox supports OpenTelemetry logging, which simplifies sending log messages from your tasks to a chosen backend. Collecting and visualizing logs from a distributed cluster of task runners in a tool like [Axiom](https://axiom.co/) can look like this: Tilebox Workflows logging in Axiom Tilebox Workflows logging in Axiom ## Configure logging The Tilebox workflow SDKs include support for exporting OpenTelemetry logs. To enable logging, call the appropriate configuration functions during the startup of your[task runner](/workflows/concepts/task-runners). Then, use the provided `logger` to send log messages from your tasks. To configure logging with Axiom, you first need to create a [Axiom Dataset](https://axiom.co/docs/reference/datasets) to export your workflow logs to. You will also need an [Axiom API key](https://axiom.co/docs/reference/tokens) with the necessary write permissions for your Axiom dataset. 
```python Python from tilebox.workflows import Client, Task, ExecutionContext from tilebox.workflows.observability.logging import configure_otel_logging_axiom # your own workflow: from my_workflow import MyTask def main(): configure_otel_logging_axiom( # specify an Axiom dataset to export logs to dataset="my-axiom-logs-dataset", # along with an Axiom API key with ingest permissions for that dataset api_key="my-axiom-api-key", ) # the task runner will export logs from # the executed tasks to the specified dataset client = Client() runner = client.runner("dev-cluster", tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": main() ``` Setting the environment variables `AXIOM_API_KEY` and `AXIOM_LOGS_DATASET` allows you to omit these arguments in the `configure_otel_logging_axiom` function. If you are using another OpenTelemetry-compatible backend besides Axiom, such as OpenTelemetry Collector or Jaeger, you can configure logging by specifying the URL endpoint to export log messages to. ```python Python from tilebox.workflows import Client from tilebox.workflows.observability.logging import configure_otel_logging # your own workflow: from my_workflow import MyTask def main(): configure_otel_logging( # specify an endpoint to export logs to, such as a # locally running instance of OpenTelemetry Collector endpoint="http://localhost:4318/v1/logs", # optional headers for each request headers={"Authorization": "Bearer some-api-key"}, ) # the task runner will export logs from # the executed tasks to the specified endpoint client = Client() runner = client.runner("dev-cluster", tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": main() ``` If you set the environment variable `OTEL_LOGS_ENDPOINT`, you can omit that argument in the `configure_otel_logging` function. To log messages to the standard console output, use the `configure_console_logging` function. ```python Python from tilebox.workflows import Client from tilebox.workflows.observability.logging import configure_console_logging # your own workflow: from my_workflow import MyTask def main(): configure_console_logging() # the task runner will print log messages from # the executed tasks to the console client = Client() runner = client.runner("dev-cluster", tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": main() ``` The console logging backend is not recommended for production use. Log messages will be emitted to the standard output of each task runner rather than a centralized logging system. It is intended for local development and testing of workflows. ## Emitting log messages Use the logger provided by the Tilebox SDK to emit log messages from your tasks. You can then use it to send log messages to the [configured logging backend](#configure-logging). Log messages emitted within a task's `execute` method are also automatically recorded as span events for the current [job trace](/workflows/observability/tracing). ```python Python import logging from tilebox.workflows import Task, ExecutionContext from tilebox.workflows.observability.logging import get_logger logger = get_logger() class MyTask(Task): def execute(self, context: ExecutionContext) -> None: # emit a log message to the configured OpenTelemetry backend logger.info("Hello world from configured logger!") ``` ## Logging task runner internals Tilebox task runners also internally use a logger. By default, it's set to the WARNING level, but you can change it by explicitly configuring a logger for the workflows client when constructing the task runner. 
```python Python
import logging

from tilebox.workflows import Client
from tilebox.workflows.observability.logging import configure_otel_logging_axiom
from tilebox.workflows.observability.logging import get_logger

# your own workflow:
from my_workflow import MyTask

# configure Axiom or another logging backend
configure_otel_logging_axiom(
    dataset="my-axiom-logs-dataset",
    api_key="my-axiom-api-key",
)

# configure a logger for the Tilebox client at the INFO level
client = Client()
client.configure_logger(get_logger(level=logging.INFO))

# now the task runner inherits this logger and uses
# it to emit its own internal log messages as well
runner = client.runner("dev-cluster", tasks=[MyTask])
runner.run_forever()
```

# OpenTelemetry Integration

Integrate OpenTelemetry into your Tilebox Workflows

## Observability

Effective observability is essential for building reliable workflows. Understanding and monitoring the execution of workflows and their tasks helps ensure correctness and efficiency. This section describes methods to gain insights into your workflow's execution.

## OpenTelemetry

Tilebox Workflows is designed with [OpenTelemetry](https://opentelemetry.io/) in mind, which provides a set of APIs and libraries for instrumenting, generating, collecting, and exporting telemetry data (metrics, logs, and traces) in distributed systems. Tilebox Workflows currently supports OpenTelemetry for tracing and logging, with plans to include metrics in the future.

## Integrations

Tilebox exports telemetry data using the [OpenTelemetry Protocol](https://opentelemetry.io/docs/specs/otlp/). Tilebox is pre-integrated with [Axiom](https://axiom.co/), a cloud-based observability and telemetry platform that supports this protocol; the examples and screenshots in this section come from this integration. Additionally, any other OpenTelemetry-compatible backend, such as OpenTelemetry Collector or Jaeger, can be used to collect telemetry data generated by Tilebox Workflows.

# Tracing

Record the execution of your workflow tasks as OpenTelemetry traces and spans

## Overview

Applying [OpenTelemetry traces](https://opentelemetry.io/docs/concepts/signals/traces/) to the concept of workflows allows you to monitor the execution of your jobs and their individual tasks. Visualizing the trace for a job in a tool like [Axiom](https://axiom.co/) may look like this:

Tilebox Workflows tracing in Axiom

Tracing your workflows enables you to easily observe:

* The order of task execution
* Which tasks run in parallel
* The [task runner](/workflows/concepts/task-runners) handling each task
* The duration of each task
* The outcome of each task (success or failure)

This information helps identify bottlenecks and performance issues, ensuring that your workflows execute correctly.

## Configure tracing

The Tilebox workflow SDKs have built-in support for exporting OpenTelemetry traces. To enable tracing, call the appropriate configuration functions during the startup of your [task runner](/workflows/concepts/task-runners).

To configure tracing with Axiom, you first need to create an [Axiom Dataset](https://axiom.co/docs/reference/datasets) to export your workflow traces to. You will also need an [Axiom API key](https://axiom.co/docs/reference/tokens) with the necessary write permissions for your Axiom dataset.
```python Python from tilebox.workflows import Client from tilebox.workflows.observability.tracing import configure_otel_tracing_axiom # your own workflow: from my_workflow import MyTask def main(): configure_otel_tracing_axiom( # specify an Axiom dataset to export traces to dataset="my-axiom-traces-dataset", # along with an Axiom API key for ingest permissions api_key="my-axiom-api-key", ) # the following task runner generates traces for executed tasks and # exports trace and span data to the specified Axiom dataset client = Client() runner = client.runner("dev-cluster", tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": main() ``` Set the environment variables `AXIOM_API_KEY` and `AXIOM_TRACES_DATASET` to omit those arguments in the `configure_otel_tracing_axiom` function. If you are using another OpenTelemetry-compatible backend besides Axiom, like OpenTelemetry Collector or Jaeger, you can configure tracing by specifying the URL endpoint to export traces to. ```python Python from tilebox.workflows import Client from tilebox.workflows.observability.tracing import configure_otel_tracing # your own workflow: from my_workflow import MyTask def main(): configure_otel_tracing( # specify an endpoint for trace ingestion, such as a # locally running instance of OpenTelemetry Collector endpoint="http://localhost:4318/v1/traces", # optional headers for each request headers={"Authorization": "Bearer some-api-key"}, ) # the following task runner generates traces for executed tasks and # exports trace and span data to the specified endpoint client = Client() runner = client.runner("dev-cluster", tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": main() ``` Set the environment variable `OTEL_TRACES_ENDPOINT` to omit that argument in the `configure_otel_tracing` function. Once the runner picks up tasks and executes them, corresponding traces and spans are automatically generated and exported to the configured backend.
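Tracing and logging can also be enabled together for the same task runner. Here is a brief sketch combining the configuration functions shown in this section (the dataset names and API key are placeholders):

```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.logging import configure_otel_logging_axiom
from tilebox.workflows.observability.tracing import configure_otel_tracing_axiom

# your own workflow:
from my_workflow import MyTask

def main():
    # export traces and logs to separate Axiom datasets
    configure_otel_tracing_axiom(
        dataset="my-axiom-traces-dataset",
        api_key="my-axiom-api-key",
    )
    configure_otel_logging_axiom(
        dataset="my-axiom-logs-dataset",
        api_key="my-axiom-api-key",
    )

    client = Client()
    runner = client.runner("dev-cluster", tasks=[MyTask])
    runner.run_forever()

if __name__ == "__main__":
    main()
```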