# AI Assistance
Source: https://docs.tilebox.com/ai-assistance
Large Language Models (LLMs) are powerful tools for exploring and learning about Tilebox. This section explains how to provide them with Tilebox-specific context for tailored, relevant and up-to-date responses.
## Providing complete Tilebox context
AI assistants and Large Language Models (LLMs) can answer questions, guide you in using Tilebox, suggest relevant sections in the documentation, and assist you in creating workflows.
Download the complete Tilebox documentation as a plain text file to share with your AI assistant or language model.
The full content of the Tilebox documentation is available in plain markdown format at [https://docs.tilebox.com/llms-full.txt](https://docs.tilebox.com/llms-full.txt). Upload this document to your AI assistant or LLM to provide it with Tilebox-specific context.
The [Documentation Context](https://docs.tilebox.com/llms-full.txt) is updated whenever the documentation changes. If you download the file, refresh it occasionally to stay up-to-date.
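For example, you could fetch the latest version of the context file before uploading it (a minimal sketch using Python's standard library; the output filename is arbitrary):
```python Python
import urllib.request

# download the full documentation context to a local file
urllib.request.urlretrieve(
    "https://docs.tilebox.com/llms-full.txt",
    "tilebox-docs-context.txt",
)
```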
## Providing tailored context via MCP
Tilebox Docs can be installed as an MCP tool, so an MCP client can ask for detailed context on specific topics.
Run the following command to generate an MCP server for Tilebox Docs.
```bash
npx mint-mcp add tilebox
```
The command-line tool will guide you through the installation process for Cursor, Windsurf, Claude Code, Augment Code, or other MCP clients.
The MCP server always retrieves the most up-to-date version of the documentation.
# As
Source: https://docs.tilebox.com/api-reference/go/datasets/As
```go
func As[T proto.Message](seq iter.Seq2[[]byte, error]) iter.Seq2[T, error]
```
Convert a sequence of bytes into a sequence of `proto.Message`.
Useful to convert the output of [`Datapoints.Query`](/api-reference/go/datasets/Datapoints.Query) into a sequence of `proto.Message`.
## Parameters
The sequence of bytes to convert
## Returns
A sequence of `proto.Message` or an error if any.
```go Go
import (
"time"
datasets "github.com/tilebox/tilebox-go/datasets/v1"
"github.com/tilebox/tilebox-go/query"
)
startDate := time.Date(2014, 10, 4, 0, 0, 0, 0, time.UTC)
endDate := time.Date(2021, 2, 24, 0, 0, 0, 0, time.UTC)
queryInterval := query.NewTimeInterval(startDate, endDate)
seq := datasets.As[*v1.Sentinel1Sar](
client.Datapoints.Query(ctx, []uuid.UUID{collectionID}, datasets.WithTemporalExtent(queryInterval)),
)
```
# Collect
Source: https://docs.tilebox.com/api-reference/go/datasets/Collect
```go
func Collect[K any](seq iter.Seq2[K, error]) ([]K, error)
```
Convert any sequence into a slice.
It returns an error if any element in the sequence has a non-nil error.
## Parameters
The sequence to convert
## Returns
A slice of `K` or an error if any.
```go Go
import (
"time"
datasets "github.com/tilebox/tilebox-go/datasets/v1"
"github.com/tilebox/tilebox-go/query"
)
startDate := time.Date(2014, 10, 4, 0, 0, 0, 0, time.UTC)
endDate := time.Date(2021, 2, 24, 0, 0, 0, 0, time.UTC)
queryInterval := query.NewTimeInterval(startDate, endDate)
datapoints, err := datasets.Collect(datasets.As[*v1.Sentinel1Sar](
client.Datapoints.Query(ctx, []uuid.UUID{collectionID}, datasets.WithTemporalExtent(queryInterval)),
))
```
# CollectAs
Source: https://docs.tilebox.com/api-reference/go/datasets/CollectAs
```go
func CollectAs[T proto.Message](seq iter.Seq2[[]byte, error]) ([]T, error)
```
Convert a sequence of bytes into a slice of `proto.Message`.
Useful to convert the output of [`Datapoints.Query`](/api-reference/go/datasets/Datapoints.Query) into a slice of `proto.Message`.
This is a convenience function for `Collect(As[T](seq))`.
## Parameters
The sequence of bytes to convert
## Returns
A slice of `proto.Message` or an error if any.
```go Go
import (
"time"
datasets "github.com/tilebox/tilebox-go/datasets/v1"
"github.com/tilebox/tilebox-go/query"
)
startDate := time.Date(2014, 10, 4, 0, 0, 0, 0, time.UTC)
endDate := time.Date(2021, 2, 24, 0, 0, 0, 0, time.UTC)
queryInterval := query.NewTimeInterval(startDate, endDate)
datapoints, err := datasets.CollectAs[*v1.Sentinel1Sar](
client.Datapoints.Query(ctx, []uuid.UUID{collectionID}, datasets.WithTemporalExtent(queryInterval)),
)
```
# Client.Collections.Create
Source: https://docs.tilebox.com/api-reference/go/datasets/Collections.Create
```go
func (collectionClient) Create(
ctx context.Context,
datasetID uuid.UUID,
collectionName string,
) (*datasets.Collection, error)
```
Create a collection in the dataset.
## Parameters
The id of the dataset
The name of the collection
## Returns
The created collection object.
```go Go
collection, err := client.Collections.Create(ctx,
datasetID,
"My-collection",
)
```
# Client.Collections.Delete
Source: https://docs.tilebox.com/api-reference/go/datasets/Collections.Delete
```go
func (collectionClient) Delete(
ctx context.Context,
datasetID uuid.UUID,
collectionID uuid.UUID,
) error
```
Delete a collection by its id.
## Parameters
The id of the dataset
The id of the collection
## Returns
An error if the collection could not be deleted.
```go Go
err := client.Collections.Delete(ctx,
datasetID,
collectionID,
)
```
## Errors
The specified dataset does not exist.
The specified collection does not exist.
# Client.Collections.Get
Source: https://docs.tilebox.com/api-reference/go/datasets/Collections.Get
```go
func (collectionClient) Get(
ctx context.Context,
datasetID uuid.UUID,
name string,
) (*datasets.Collection, error)
```
Get a collection by its name.
## Parameters
The id of the dataset
The name of the collection
## Returns
A collection object.
```go Go
collection, err := client.Collections.Get(ctx,
datasetID,
"My-collection",
)
```
## Errors
The specified dataset does not exist.
# Client.Collections.GetOrCreate
Source: https://docs.tilebox.com/api-reference/go/datasets/Collections.GetOrCreate
```go
func (collectionClient) GetOrCreate(
ctx context.Context,
datasetID uuid.UUID,
name string,
) (*datasets.Collection, error)
```
Get or create a collection by its name. If the collection does not exist, it will be created.
## Parameters
The id of the dataset
The name of the collection
## Returns
A collection object.
```go Go
collection, err := client.Collections.GetOrCreate(ctx,
datasetID,
"My-collection",
)
```
# Client.Collections.List
Source: https://docs.tilebox.com/api-reference/go/datasets/Collections.List
```go
func (collectionClient) List(
ctx context.Context,
datasetID uuid.UUID,
) ([]*datasets.Collection, error)
```
List the available collections in a dataset.
## Parameters
The id of the dataset
## Returns
A list of collection objects.
```go Go
collections, err := client.Collections.List(ctx,
datasetID,
)
```
## Errors
The specified dataset does not exist.
# Client.Datapoints.Delete
Source: https://docs.tilebox.com/api-reference/go/datasets/Datapoints.Delete
```go
func (datapointClient) Delete(
ctx context.Context,
collectionID uuid.UUID,
datapoints any,
) (int64, error)
```
Delete data points from a collection.
Data points are identified and deleted by their ids.
## Parameters
The id of the collection
The datapoints to delete from the collection
## Returns
The number of data points that were deleted.
```go Go
var datapoints []*v1.Sentinel1Sar
// assuming the slice is filled with datapoints
numDeleted, err := client.Datapoints.Delete(ctx,
collectionID,
datapoints,
)
```
# Client.Datapoints.DeleteIDs
Source: https://docs.tilebox.com/api-reference/go/datasets/Datapoints.DeleteIDs
```go
func (datapointClient) DeleteIDs(
ctx context.Context,
collectionID uuid.UUID,
datapointIDs []uuid.UUID,
) (int64, error)
```
Delete data points from a collection.
## Parameters
The id of the collection
The ids of the data points to delete from the collection
## Returns
The number of data points that were deleted.
```go Go
numDeleted, err := client.Datapoints.DeleteIDs(ctx,
collectionID,
[]uuid.UUID{
uuid.MustParse("0195c87a-49f6-5ffa-e3cb-92215d057ea6"),
uuid.MustParse("0195c87b-bd0e-3998-05cf-af6538f34957"),
},
)
```
# Client.Datapoints.GetInto
Source: https://docs.tilebox.com/api-reference/go/datasets/Datapoints.GetInto
```go
func (datapointClient) GetInto(
ctx context.Context,
collectionIDs []uuid.UUID,
datapointID uuid.UUID,
datapoint proto.Message,
options ...QueryOption,
) error
```
Get a data point by its id from one of the specified collections.
The data point is stored in the `datapoint` parameter.
## Parameters
The ids of the collections to query
The id of the datapoint to query
The datapoint to query into
Options for querying data points.
## Options
Skip the data when querying the datapoint.
If set, only the required and auto-generated fields will be returned.
## Returns
An error if the data point could not be queried.
```go Go
var datapoint v1.Sentinel1Sar
err = client.Datapoints.GetInto(ctx,
[]uuid.UUID{collection.ID}, datapointID, &datapoint,
)
```
# Client.Datapoints.Ingest
Source: https://docs.tilebox.com/api-reference/go/datasets/Datapoints.Ingest
```go
func (datapointClient) Ingest(
ctx context.Context,
collectionID uuid.UUID,
datapoints any,
allowExisting bool,
) (*datasets.IngestResponse, error)
```
Ingest data points into a collection.
## Parameters
The id of the collection
The datapoints to ingest
Datapoint fields are used to generate a deterministic unique `UUID` for each
datapoint in a collection. Duplicate data points result in the same ID being generated.
If `allowExisting` is `true`, `ingest` will skip those datapoints, since they already exist.
If `allowExisting` is `false`, `ingest` will return an error if any of the generated datapoint IDs already exist.
## Returns
The list of datapoint ids that were ingested, including the IDs of existing data points in case of duplicates and
`allowExisting=true`.
```go Go
datapoints := []*v1.Modis{
v1.Modis_builder{
Time: timestamppb.New(time.Now()),
GranuleName: proto.String("Granule 1"),
}.Build(),
v1.Modis_builder{
Time: timestamppb.New(time.Now().Add(-5 * time.Hour)),
GranuleName: proto.String("Past Granule 2"),
}.Build(),
}
ingestResponse, err := client.Datapoints.Ingest(ctx,
collectionID,
&datapoints,
false,
)
```
## Errors
If `allowExisting` is `false` and any of the datapoints being ingested already exist.
# Client.Datapoints.Query
Source: https://docs.tilebox.com/api-reference/go/datasets/Datapoints.Query
```go
func (datapointClient) Query(
ctx context.Context,
collectionIDs []uuid.UUID,
options ...datasets.QueryOption,
) iter.Seq2[[]byte, error]
```
Query a range of data points from the specified collections within a given time interval.
The datapoints are lazily queried and returned as a sequence of bytes.
The output sequence can be transformed into a typed `proto.Message` using [CollectAs](/api-reference/go/datasets/CollectAs) or [As](/api-reference/go/datasets/As) functions.
## Parameters
The ids of the collections to query
Options for querying data points
## Options
Specify the time interval for which data should be queried.
Right now, a temporal extent is required for every query.
Specify the geographical extent in which to query data.
Optional, if not specified the query will return all results found globally.
Skip the data when querying datapoints.
If set, only the required and auto-generated fields will be returned.
## Returns
A sequence of the requested data points, serialized as bytes.
```go Go
import (
"time"
datasets "github.com/tilebox/tilebox-go/datasets/v1"
"github.com/tilebox/tilebox-go/query"
)
startDate := time.Date(2014, 10, 4, 0, 0, 0, 0, time.UTC)
endDate := time.Date(2021, 2, 24, 0, 0, 0, 0, time.UTC)
queryInterval := query.NewTimeInterval(startDate, endDate)
datapoints, err := datasets.CollectAs[*v1.Sentinel1Sar](
client.Datapoints.Query(ctx, []uuid.UUID{collection.ID}, datasets.WithTemporalExtent(queryInterval)),
)
```
# Client.Datapoints.QueryInto
Source: https://docs.tilebox.com/api-reference/go/datasets/Datapoints.QueryInto
```go
func (datapointClient) QueryInto(
ctx context.Context,
collectionIDs []uuid.UUID,
datapoints any,
options ...datasets.QueryOption,
) error
```
Query a range of data points from the specified collections within a given time interval.
QueryInto is a convenience function for [Query](/api-reference/go/datasets/Datapoints.Query), when no manual pagination or custom iteration is required.
## Parameters
The ids of the collections to query
The datapoints to query into
Options for querying data points
## Options
Specify the time interval for which data should be queried.
Right now, a temporal extent is required for every query.
Specify the geographical extent in which to query data.
Optional, if not specified the query will return all results found globally.
Skip the data when querying datapoints.
If set, only the required and auto-generated fields will be returned.
## Returns
An error if the data points could not be queried.
```go Go
import (
"time"
datasets "github.com/tilebox/tilebox-go/datasets/v1"
"github.com/tilebox/tilebox-go/query"
)
startDate := time.Date(2014, 10, 4, 0, 0, 0, 0, time.UTC)
endDate := time.Date(2021, 2, 24, 0, 0, 0, 0, time.UTC)
queryInterval := query.NewTimeInterval(startDate, endDate)
var datapoints []*v1.Sentinel1Sar
err := client.Datapoints.QueryInto(ctx,
[]uuid.UUID{collection.ID},
&datapoints,
datasets.WithTemporalExtent(queryInterval),
)
```
# Client.Datasets.Get
Source: https://docs.tilebox.com/api-reference/go/datasets/Get
```go
func (datasetClient) Get(
ctx context.Context,
slug string,
) (*datasets.Dataset, error)
```
Get a dataset by its slug.
## Parameters
The slug of the dataset
## Returns
A dataset object.
```go Go
s1_sar, err := client.Datasets.Get(ctx,
"open_data.copernicus.sentinel1_sar"
)
```
## Errors
The specified dataset does not exist.
# Client.Datasets.List
Source: https://docs.tilebox.com/api-reference/go/datasets/List
```go
func (datasetClient) List(ctx context.Context) ([]*datasets.Dataset, error)
```
Fetch all available datasets.
## Returns
A list of all available datasets.
```go Go
datasets, err := client.Datasets.List(ctx)
```
# Client.Clusters.Create
Source: https://docs.tilebox.com/api-reference/go/workflows/Clusters.Create
```go
func (*ClusterClient) Create(
ctx context.Context,
name string,
) (*workflows.Cluster, error)
```
Create a cluster.
## Parameters
A display name for the cluster
## Returns
The created cluster object.
```go Go
cluster, err := client.Clusters.Create(ctx,
"My cluster"
)
```
# Client.Clusters.Delete
Source: https://docs.tilebox.com/api-reference/go/workflows/Clusters.Delete
```go
func (*ClusterClient) Delete(ctx context.Context, slug string) error
```
Delete a cluster by its slug.
## Parameters
The slug of the cluster to delete
## Returns
An error if the cluster could not be deleted.
```go Go
err := client.Clusters.Delete(ctx,
"my-cluster-tZD9Ca1qsqt3V"
)
```
## Errors
The specified cluster does not exist.
# Client.Clusters.Get
Source: https://docs.tilebox.com/api-reference/go/workflows/Clusters.Get
```go
func (*ClusterClient) Get(
ctx context.Context,
slug string,
) (*workflows.Cluster, error)
```
Get a cluster by its slug.
## Parameters
The slug of the cluster
## Returns
A cluster object.
```go Go
cluster, err := client.Clusters.Get(ctx,
"my-cluster-tZD9Ca1qsqt3V"
)
```
## Errors
The specified cluster does not exist.
# Client.Clusters.List
Source: https://docs.tilebox.com/api-reference/go/workflows/Clusters.List
```go
func (*ClusterClient) List(ctx context.Context) ([]*workflows.Cluster, error)
```
Fetch all available clusters.
## Returns
A list of all available clusters.
```go Go
clusters, err := client.Clusters.List(ctx)
```
# Collect
Source: https://docs.tilebox.com/api-reference/go/workflows/Collect
```go
func Collect[K any](seq iter.Seq2[K, error]) ([]K, error)
```
Convert any sequence into a slice.
It returns an error if any element in the sequence has a non-nil error.
## Parameters
The sequence to convert
## Returns
A slice of `K` or an error if any.
```go Go
jobs, err := workflows.Collect(
client.Jobs.List(ctx, interval),
)
```
# workflows.GetCurrentCluster
Source: https://docs.tilebox.com/api-reference/go/workflows/GetCurrentCluster
```go
workflows.GetCurrentCluster(ctx context.Context) (string, error)
```
Get the current cluster slug.
This function is intended to be used in tasks.
## Returns
The current cluster slug.
```go Go
type Task struct{}
func (t *Task) Execute(ctx context.Context) error {
clusterSlug, err := workflows.GetCurrentCluster(ctx)
if err != nil {
return fmt.Errorf("failed to get current cluster: %w", err)
}
return nil
}
```
# Client.Jobs.Cancel
Source: https://docs.tilebox.com/api-reference/go/workflows/Jobs.Cancel
```go
func (*JobClient) Cancel(ctx context.Context, jobID uuid.UUID) error
```
Cancel a job. When a job is canceled, no queued tasks will be picked up by task runners and executed even if task runners are idle. Tasks that are already being executed will finish their execution and not be interrupted. All sub-tasks spawned from such tasks after the cancellation will not be picked up by task runners.
## Parameters
The id of the job
## Returns
An error if the job could not be cancelled.
```go Go
err := client.Jobs.Cancel(ctx,
uuid.MustParse("0195c87a-49f6-5ffa-e3cb-92215d057ea6"),
)
```
# Client.Jobs.Get
Source: https://docs.tilebox.com/api-reference/go/workflows/Jobs.Get
```go
func (*JobClient) Get(
ctx context.Context,
jobID uuid.UUID,
) (*workflows.Job, error)
```
Get a job by its id.
## Parameters
The id of the job
## Returns
A job object.
```go Go
job, err := client.Jobs.Get(ctx,
uuid.MustParse("0195c87a-49f6-5ffa-e3cb-92215d057ea6"),
)
```
## Errors
The specified job does not exist.
# Client.Jobs.Query
Source: https://docs.tilebox.com/api-reference/go/workflows/Jobs.Query
```go
func (*JobClient) Query(
ctx context.Context,
options ...job.QueryOption,
) iter.Seq2[*workflows.Job, error]
```
Query jobs in the specified interval.
The jobs are lazily loaded and returned as a sequence of Jobs.
The jobs are returned sorted by creation time in reverse order.
The output sequence can be transformed into a slice of Job using [Collect](/api-reference/go/workflows/Collect) function.
## Parameters
Options for querying jobs
## Options
Specify the time interval for which data should be queried.
Right now, a temporal extent is required for every query.
Specify the automation id for which data should be queried.
Only jobs that were created by the specified automation will be returned.
## Returns
A sequence of jobs.
```go Go
import (
"time"
workflows "github.com/tilebox/tilebox-go/workflows/v1"
"github.com/tilebox/tilebox-go/workflows/v1/job"
"github.com/tilebox/tilebox-go/query"
)
interval := query.NewTimeInterval(
time.Now().Add(-24 * time.Hour),
time.Now(),
)
jobs, err := workflows.Collect(
client.Jobs.Query(ctx,
job.WithTemporalExtent(interval),
),
)
```
# Client.Jobs.Retry
Source: https://docs.tilebox.com/api-reference/go/workflows/Jobs.Retry
```go
func (*JobClient) Retry(
ctx context.Context,
jobID uuid.UUID,
) (int64, error)
```
Retry a job. All failed tasks will become queued again, and queued tasks will be picked up by task runners again.
## Parameters
The id of the job to retry
## Returns
The number of tasks that were rescheduled.
```go Go
nbRescheduled, err := client.Jobs.Retry(ctx,
uuid.MustParse("0195c87a-49f6-5ffa-e3cb-92215d057ea6"),
)
```
## Errors
The specified job does not exist.
# Client.Jobs.Submit
Source: https://docs.tilebox.com/api-reference/go/workflows/Jobs.Submit
```go
func (*JobClient) Submit(
ctx context.Context,
jobName string,
tasks []workflows.Task,
	options ...job.SubmitOption,
) (*workflows.Job, error)
```
Submit a job.
## Parameters
The name of the job
The root task for the job. This task is executed first and can submit subtasks to manage the entire workflow. A job can optionally consist of multiple root tasks.
Options for the job
## Options
Set the maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the root task in case it fails
The [cluster](/workflows/concepts/clusters#managing-clusters) to run the root task on. If not provided, the default cluster is used.
## Returns
A job object.
```go Go
job, err := client.Jobs.Submit(ctx,
"My job",
[]workflows.Task{rootTask},
)
```
# Client.NewTaskRunner
Source: https://docs.tilebox.com/api-reference/go/workflows/NewTaskRunner
```go
func (*Client) NewTaskRunner(
ctx context.Context,
options ...runner.Option,
) (*workflows.TaskRunner, error)
```
Initialize a task runner.
## Parameters
Options for initializing the task runner
## Options
The [cluster](/workflows/concepts/clusters#managing-clusters) to connect to. If not provided, the default cluster is used.
Set the logger to use for the task runner
Disable OpenTelemetry metrics for the task runner
## Returns
The created task runner object.
```go Go
runner, err := client.NewTaskRunner(ctx)
```
# workflows.SubmitSubtask
Source: https://docs.tilebox.com/api-reference/go/workflows/SubmitSubtask
```go
workflows.SubmitSubtask(
ctx context.Context,
task workflows.Task,
options ...subtask.SubmitOption,
) (subtask.FutureTask, error)
```
Submit a subtask to the task runner.
This function is intended to be used in tasks.
## Parameters
A subtask to submit
Options for the subtask
## Options
Set dependencies for the task
Set the cluster slug of the cluster where the task will be executed.
Set the maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtask in case it fails
## Returns
A future task that can be used to set dependencies between tasks.
```go Go
type MySubTask struct {
Sensor string
Value float64
}
type Task struct{}
func (t *Task) Execute(ctx context.Context) error {
_, err := workflows.SubmitSubtask(ctx,
&MySubTask{
Sensor: "A",
Value: 42,
},
)
if err != nil {
return fmt.Errorf("failed to submit subtasks: %w", err)
}
return nil
}
```
# workflows.SubmitSubtasks
Source: https://docs.tilebox.com/api-reference/go/workflows/SubmitSubtasks
```go
workflows.SubmitSubtasks(
ctx context.Context,
tasks []workflows.Task,
options ...subtask.SubmitOption,
) ([]subtask.FutureTask, error)
```
Submit multiple subtasks to the task runner. Same as [SubmitSubtask](/api-reference/go/workflows/SubmitSubtask), but accepts a list of tasks.
This function is intended to be used in tasks.
## Parameters
A list of tasks to submit
Options for the subtasks
## Options
Set dependencies for the tasks
Set the cluster slug of the cluster where the tasks will be executed.
Set the maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtasks in case they fail
## Returns
A list of future tasks that can be used to set dependencies between tasks.
```go Go
type MySubTask struct {
Sensor string
Value float64
}
type Task struct{}
func (t *Task) Execute(ctx context.Context) error {
_, err := workflows.SubmitSubtasks(ctx,
[]workflows.Task{
&MySubTask{
Sensor: "A",
Value: 42,
},
&MySubTask{
Sensor: "B",
Value: 42,
},
},
)
if err != nil {
return fmt.Errorf("failed to submit subtasks: %w", err)
}
return nil
}
```
# Task
Source: https://docs.tilebox.com/api-reference/go/workflows/Task
```go
type Task interface{}
```
Base interface for Tilebox workflows [tasks](/workflows/concepts/tasks).
It doesn't need to be identifiable or executable, but it can be both (see below).
## Methods
```go
Task.Execute(ctx context.Context) error
```
The entry point for the execution of the task.
If not defined, the task can't be registered with a task runner but can still be submitted.
```go
Task.Identifier() TaskIdentifier
```
Provides a user-defined task identifier.
The identifier is used to uniquely identify the task and specify its version.
If not defined, the task runner will generate an identifier for it using reflection.
## JSON-serializable task
```go
type SampleTask struct {
Message string
Depth int
BranchFactor int
}
```
Optional task [input parameters](/workflows/concepts/tasks#input-parameters), defined as struct fields.
Supported types are all types supported by [json.Marshal](https://pkg.go.dev/encoding/json#Marshal).
## Protobuf-serializable task
```go
type SampleTask struct {
examplesv1.SpawnWorkflowTreeTask
}
```
A task can also be defined as a protobuf message.
An example using task protobuf messages can be found [here](https://github.com/tilebox/tilebox-go/tree/main/examples/sampleworkflow).
```go Go
package helloworld
import (
"context"
"fmt"
"github.com/tilebox/tilebox-go/workflows/v1"
)
type MyFirstTask struct{}
func (t *MyFirstTask) Execute(ctx context.Context) error {
fmt.Println("Hello World!")
return nil
}
func (t *MyFirstTask) Identifier() workflows.TaskIdentifier {
return workflows.NewTaskIdentifier("tilebox.workflows.MyTask", "v3.2")
}
type MyFirstParameterizedTask struct {
Name string
Greet bool
Data map[string]string
}
func (t *MyFirstParameterizedTask) Execute(ctx context.Context) error {
if t.Greet {
fmt.Printf("Hello %s!\n", t.Name)
}
return nil
}
```
# TaskRunner.GetRegisteredTask
Source: https://docs.tilebox.com/api-reference/go/workflows/TaskRunner.GetRegisteredTask
```go
func (*TaskRunner) GetRegisteredTask(
identifier workflows.TaskIdentifier,
) (workflows.ExecutableTask, bool)
```
Get the task with the given identifier.
## Parameters
The identifier of the task to look up
## Returns
The registered task, and a boolean indicating whether a task with the given identifier was found.
```go Go
identifier := workflows.NewTaskIdentifier("my-task", "v1.0")
task, found := runner.GetRegisteredTask(
identifier,
)
```
# TaskRunner.RegisterTasks
Source: https://docs.tilebox.com/api-reference/go/workflows/TaskRunner.RegisterTasks
```go
func (*TaskRunner) RegisterTasks(tasks ...workflows.ExecutableTask) error
```
Register tasks that can be executed by this task runner.
## Parameters
A list of task classes that this runner can execute
## Returns
An error if the tasks could not be registered.
```go Go
err := runner.RegisterTasks(
&MyTask{},
&MyOtherTask{},
)
```
# TaskRunner.Run
Source: https://docs.tilebox.com/api-reference/go/workflows/TaskRunner.Run
```go
func (*TaskRunner) Run(ctx context.Context)
```
Run the task runner forever, executing available tasks and polling for new ones when idle.
```go Go
runner.Run(ctx)
```
# workflows.WithTaskSpan
Source: https://docs.tilebox.com/api-reference/go/workflows/WithTaskSpan
```go
workflows.WithTaskSpan(
ctx context.Context,
name string,
f func(ctx context.Context) error,
) error
```
Wrap a function with a [tracing span](/workflows/observability/tracing).
## Parameters
The name of the span
The function to wrap
## Returns
An error if any.
```go Go
type Task struct{}
func (t *Task) Execute(ctx context.Context) error {
err := workflows.WithTaskSpan(ctx, "Database insert", func(ctx context.Context) error {
// Do something
return nil
})
if err != nil {
return fmt.Errorf("failed to insert into database: %w", err)
}
return nil
}
```
# workflows.WithTaskSpanResult
Source: https://docs.tilebox.com/api-reference/go/workflows/WithTaskSpanResult
```go
workflows.WithTaskSpanResult[Result any](
ctx context.Context,
name string,
f func(ctx context.Context) (Result, error),
) (Result, error)
```
Wrap a function with a [tracing span](/workflows/observability/tracing).
## Parameters
The name of the span
The function to wrap
## Returns
The result of the function and an error if any.
```go Go
type Task struct{}
func (t *Task) Execute(ctx context.Context) error {
result, err := workflows.WithTaskSpanResult(ctx, "Expensive Compute", func(ctx context.Context) (int, error) {
return 6 * 7, nil
})
if err != nil {
return fmt.Errorf("failed to compute: %w", err)
}
fmt.Println("computed result:", result)
return nil
}
```
# Client
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Client
```python
class Client(url: str, token: str)
```
Create a Tilebox datasets client.
## Parameters
Tilebox API URL. Defaults to `https://api.tilebox.com`.
The API Key to authenticate with. If not set, the `TILEBOX_API_KEY` environment variable will be used.
```python Python
from tilebox.datasets import Client
# read token from an environment variable
client = Client()
# or provide connection details directly
client = Client(
url="https://api.tilebox.com",
token="YOUR_TILEBOX_API_KEY"
)
```
# Client.dataset
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Client.dataset
```python
def Client.dataset(slug: str) -> Dataset
```
Get a dataset by its slug.
## Parameters
The slug of the dataset
## Returns
A dataset object.
```python Python
s1_sar = client.dataset(
"open_data.copernicus.sentinel1_sar"
)
```
## Errors
The specified dataset does not exist.
# Client.datasets
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Client.datasets
```python
def Client.datasets() -> Datasets
```
Fetch all available datasets.
## Returns
An object containing all available datasets, accessible via dot notation.
```python Python
datasets = client.datasets()
s1_sar = datasets.open_data.copernicus.sentinel1_sar
```
# Collection.delete
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Collection.delete
```python
def Collection.delete(datapoints: DatapointIDs) -> int
```
Delete data points from the collection.
Data points are identified and deleted by their ids.
You need to have write permission on the collection to be able to delete data points.
## Parameters
Datapoint IDs to delete from the collection.
Supported `DatapointIDs` types are:
* A `pandas.DataFrame` containing an `id` column.
* A `pandas.Series` containing datapoint IDs.
* An `xarray.Dataset` containing an "id" variable.
* An `xarray.DataArray` containing datapoint IDs.
* A `numpy.ndarray` containing datapoint IDs.
* A `Collection[UUID]` containing datapoint IDs as python built-in `UUID` objects, e.g. `list[UUID]`.
* A `Collection[str]` containing datapoint IDs as strings, e.g. `list[str]`.
## Returns
The number of data points that were deleted.
```python Python
collection.delete([
"0195c87a-49f6-5ffa-e3cb-92215d057ea6",
"0195c87b-bd0e-3998-05cf-af6538f34957",
])
```
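Because query results are supported as `DatapointIDs`, you can also combine `delete` with `load` to remove every data point in a time range (a sketch; the time interval is just an example):
```python Python
# load data points for an interval, then delete them by their ids
to_delete = collection.load(("2023-05-01", "2023-06-01"))
num_deleted = collection.delete(to_delete)  # an xarray.Dataset containing an "id" variable
```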
## Errors
One of the data points is not found in the collection. If any of the data points are not found,
nothing will be deleted.
One of the specified ids is not a valid UUID.
# Collection.find
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Collection.find
```python
def Collection.find(
datapoint_id: str,
skip_data: bool = False
) -> xarray.Dataset
```
Find a specific datapoint in a collection by its id.
## Parameters
The id of the datapoint to find.
Whether to skip loading the data for the datapoint. If `True`, only the metadata for the datapoint is loaded.
## Returns
An [`xarray.Dataset`](/sdks/python/xarray) containing the requested data point.
Since it returns only a single data point, the output xarray dataset does not include a `time` dimension.
## Errors
The specified datapoint does not exist in this collection.
```python Python
data = collection.find(
"0186d6b6-66cc-fcfd-91df-bbbff72499c3",
skip_data = False,
)
```
# Collection.info
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Collection.info
```python
def Collection.info() -> CollectionInfo
```
Fetch metadata about the data points in this collection.
## Returns
A collection info object.
```python Python
info = collection.info()
```
# Collection.ingest
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Collection.ingest
```python
def Collection.ingest(
data: IngestionData,
allow_existing: bool = True
) -> list[UUID]
```
Ingest data into a collection.
You need to have write permission on the collection to be able to ingest data points.
## Parameters
The data to ingest.
Supported `IngestionData` data types are:
* A `pandas.DataFrame`, mapping the column names to dataset fields.
* An `xarray.Dataset`, mapping variables and coordinates to dataset fields.
* An `Iterable`, `dict` or `nd-array`: ingest any object that can be converted to a `pandas.DataFrame` using
  its constructor, equivalent to `ingest(pd.DataFrame(data))`.
Datapoint fields are used to generate a deterministic unique `UUID` for each
datapoint in a collection. Duplicate data points result in the same ID being generated.
If `allow_existing` is `True`, `ingest` will skip those data points, since they already exist.
If `allow_existing` is `False`, `ingest` will raise an error if any of the generated datapoint IDs already exist.
Defaults to `True`.
## Returns
List of datapoint ids that were ingested, including the IDs of existing data points in case of duplicates and
`allow_existing=True`.
```python Python
import pandas as pd
collection.ingest(pd.DataFrame({
"time": [
"2023-05-01T12:00:00Z",
"2023-05-02T12:00:00Z",
],
"value": [1, 2],
"sensor": ["A", "B"],
}))
```
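An `xarray.Dataset` works as well, for example data previously loaded from another collection (a sketch; `other_collection` is a hypothetical source collection):
```python Python
# copy data points from another collection, erroring if any of them already exist
data = other_collection.load(("2023-05-01", "2023-06-01"))
collection.ingest(data, allow_existing=False)
```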
## Errors
If `allow_existing` is `False` and any of the datapoints being ingested already exist.
# Collection.load
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Collection.load
```python
def Collection.load(
time_or_interval: TimeIntervalLike,
skip_data: bool = False,
show_progress: bool = False
) -> xarray.Dataset
```
Load a range of data points in this collection in a specified interval.
If no data exists for the requested time or interval, an empty `xarray.Dataset` is returned.
## Parameters
The time or time interval for which to load data. This can be a single time scalar, a tuple of two time scalars, or an array of time scalars.
Valid time scalars are: `datetime.datetime` objects, strings in ISO 8601 format, or Unix timestamps in seconds.
Behavior for each input type:
* **TimeScalar**: If a single time scalar is provided, `load` returns all data points for that exact millisecond.
* **TimeInterval**: If a time interval is provided, `load` returns all data points in that interval. Intervals can be a tuple of two `TimeScalars` or a `TimeInterval` object. Tuples are interpreted as a half-open interval `[start, end)`. With a `TimeInterval` object, the `start_exclusive` and `end_inclusive` parameters control whether the start and end time are inclusive or exclusive.
* **Iterable\[TimeScalar]**: If an array of time scalars is provided, `load` constructs a time interval from the first and last time scalar in the array. Here, both the `start` and `end` times are inclusive.
If `True`, the response contains only the [required fields for the dataset type](/datasets/types/timeseries) without the actual dataset-specific fields. Defaults to `False`.
If `True`, a progress bar is displayed when pagination is required. Defaults to `False`.
## Returns
An [`xarray.Dataset`](/sdks/python/xarray) containing the requested data points.
```python Python
from datetime import datetime
from tilebox.clients.core.data import TimeInterval
# loading a specific time
time = "2023-05-01 12:45:33.423"
data = collection.load(time)
# loading a time interval
interval = ("2023-05-01", "2023-08-01")
data = collection.load(interval, show_progress=True)
# loading a time interval with TimeInterval
interval = TimeInterval(
start=datetime(2023, 5, 1),
end=datetime(2023, 8, 1),
start_exclusive=False,
end_inclusive=False,
)
data = collection.load(interval, show_progress=True)
# loading with an iterable
meta_data = collection.load(..., skip_data=True)
first_50 = collection.load(meta_data.time[:50], skip_data=False)
```
# Dataset.collection
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Dataset.collection
```python
def Dataset.collection(name: str) -> Collection
```
Access a specific collection by its name.
## Parameters
The name of the collection
## Returns
A collection object.
```python Python
collection = dataset.collection("My-collection")
```
## Errors
The specified collection does not exist in the dataset.
# Dataset.collections
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Dataset.collections
```python
def Dataset.collections() -> dict[str, Collection]
```
List the available collections in a dataset.
## Returns
A dictionary mapping collection names to collection objects.
```python Python
collections = dataset.collections()
```
# Dataset.create_collection
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Dataset.create_collection
```python
def Dataset.create_collection(name: str) -> Collection
```
Create a collection in the dataset.
## Parameters
The name of the collection
## Returns
The created collection object.
```python Python
collection = dataset.create_collection("My-collection")
```
# Dataset.delete_collection
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Dataset.delete_collection
```python
def Dataset.delete_collection(collection: str | UUID | CollectionClient) -> None
```
Delete a collection in the dataset.
## Parameters
The collection to delete. Can be specified by name, id, or as a collection object.
```python Python
dataset.delete_collection("My-collection")
```
# Dataset.get_or_create_collection
Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Dataset.get_or_create_collection
```python
def Dataset.get_or_create_collection(name: str) -> Collection
```
Get a collection by its name. If the collection does not exist, it will be created.
## Parameters
The name of the collection
## Returns
A collection object.
```python Python
collection = dataset.get_or_create_collection("My-collection")
```
# Client
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/Client
```python
class Client(url: str, token: str)
```
Create a Tilebox workflows client.
## Parameters
Tilebox API URL. Defaults to `https://api.tilebox.com`.
The API Key to authenticate with. If not set, the `TILEBOX_API_KEY` environment variable will be used.
## Sub clients
The workflows client exposes sub clients for interacting with different parts of the Tilebox workflows API.
```python
def Client.jobs() -> JobClient
```
A client for interacting with jobs.
```python
def Client.clusters() -> ClusterClient
```
A client for managing clusters.
```python
def Client.automations() -> AutomationClient
```
A client for scheduling automations.
## Task runners
```python
def Client.runner(...) -> TaskRunner
```
A client is also used to instantiate task runners. Check out the [`Client.runner` API reference](/api-reference/python/tilebox.workflows/Client.runner) for more information.
```python Python
from tilebox.workflows import Client
# read token from an environment variable
client = Client()
# or provide connection details directly
client = Client(
url="https://api.tilebox.com",
token="YOUR_TILEBOX_API_KEY"
)
# access sub clients
job_client = client.jobs()
cluster_client = client.clusters()
automation_client = client.automations()
# or instantiate a task runner
runner = client.runner(tasks=[...])
```
# Client.runner
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/Client.runner
```python
def Client.runner(
cluster: ClusterSlugLike | None = None,
tasks: list[type[Task]],
cache: JobCache | None = None
) -> TaskRunner
```
Initialize a task runner.
## Parameters
The [cluster slug](/workflows/concepts/clusters#managing-clusters) for the cluster associated with this task runner.
If not provided, the default cluster is used.
A list of task classes that this runner can execute.
An optional [job cache](/workflows/caches) for caching results from tasks and sharing data between tasks.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.cache import LocalFileSystemCache
client = Client()
runner = client.runner(
tasks=[MyFirstTask, MySubtask],
# optional:
cache=LocalFileSystemCache("cache_directory"),
)
```
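To pin the runner to a specific cluster instead of the default one, pass its slug (a sketch; the slug is a hypothetical value):
```python Python
runner = client.runner(
    cluster="my-cluster-CvufcSxcC9SKfe",  # hypothetical cluster slug
    tasks=[MyFirstTask, MySubtask],
)
```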
# ClusterClient.all
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/ClusterClient.all
```python
def ClusterClient.all() -> list[Cluster]
```
List all available clusters.
## Returns
A list of all available clusters.
```python Python
clusters = cluster_client.all()
```
# ClusterClient.create
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/ClusterClient.create
```python
def ClusterClient.create(name: str) -> Cluster
```
Create a cluster.
## Parameters
A display name for the cluster
## Returns
The created cluster object.
```python Python
cluster = cluster_client.create("My cluster")
```
# ClusterClient.delete
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/ClusterClient.delete
```python
def ClusterClient.delete(cluster_or_slug: Cluster | str)
```
Delete a cluster.
## Parameters
The cluster or cluster slug to delete.
```python Python
cluster_client.delete(cluster)
```
# ClusterClient.find
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/ClusterClient.find
```python
def ClusterClient.find(cluster_or_slug: Cluster | str) -> Cluster
```
Get a cluster by its slug.
## Parameters
The cluster or cluster slug to get.
## Returns
A cluster object.
```python Python
cluster = cluster_client.find("my-cluster-CvufcSxcC9SKfe")
```
# Context.job_cache
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/ExecutionContext.job_cache
```python
ExecutionContext.job_cache: JobCache
```
Access the job cache for a task.
```python Python
# within a Tasks execute method, access the job cache
# def execute(self, context: ExecutionContext):
context.job_cache["some-key"] = b"my-value"
```
# Context.submit_subtask
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask
```python
def ExecutionContext.submit_subtask(
task: Task,
depends_on: list[FutureTask] = None,
cluster: str | None = None,
max_retries: int = 0
) -> FutureTask
```
Submit a [subtask](/workflows/concepts/tasks#subtasks-and-task-composition) from a currently executing task.
## Parameters
The task to submit as a subtask.
An optional list of tasks already submitted within the same context that this subtask depends on.
An optional [cluster slug](/workflows/concepts/clusters#managing-clusters) for running the subtask. If not provided, the subtask runs on the same cluster as the parent task.
Specify the maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtask in case of failure. The default is 0.
## Returns
A `FutureTask` object that represents the submitted subtask. Can be used to set up dependencies between tasks.
```python Python
# within the execute method of a Task:
subtask = context.submit_subtask(MySubtask())
dependent_subtask = context.submit_subtask(
MyOtherSubtask(), depends_on=[subtask]
)
gpu_task = context.submit_subtask(
MyGPUTask(),
cluster="gpu-cluster-slug"
)
flaky_task = context.submit_subtask(
MyFlakyTask(),
max_retries=5
)
```
# JobCache.__iter__
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobCache.__iter__
```python
def JobCache.__iter__() -> Iterator[str]
```
List all available keys in a job cache group. Only keys that are direct children of the current group are returned.
For nested groups, first access the group and then iterate over its keys.
```python Python
# within a Tasks execute method, access the job cache
# def execute(context: ExecutionContext)
cache = context.job_cache
# iterate over all keys
for key in cache:
print(cache[key])
# or collect as list
keys = list(cache)
# also works with nested groups
for key in cache.group("some-group"):
print(cache[key])
```
# JobCache.group
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobCache.group
```python
def JobCache.group(name: str) -> JobCache
```
You can nest caches in a hierarchical manner using [groups](/workflows/caches#groups-and-hierarchical-keys).
Groups are separated by a forward slash (/) in the key. This hierarchical structure functions similarly to a file system.
## Parameters
The name of the cache group.
```python Python
# within a Tasks execute method, access the job cache
# def execute(context: ExecutionContext)
cache = context.job_cache
# use groups to nest related cache values
group = cache.group("some-group")
group["value"] = b"my-value"
nested = group.group("nested")
nested["value"] = b"nested-value"
# access nested groups directly via / notation
value = cache["some-group/nested/value"]
```
# JobClient.cancel
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobClient.cancel
```python
def JobClient.cancel(job_or_id: Job | str)
```
Cancel a job. When a job is canceled, no queued tasks will be picked up by task runners and executed even if task runners are idle. Tasks that are already being executed will finish their execution and not be interrupted. All sub-tasks spawned from such tasks after the cancellation will not be picked up by task runners.
## Parameters
The job or job id to cancel.
```python Python
job_client.cancel(job)
```
# JobClient.find
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobClient.find
```python
def JobClient.find(job_or_id: Job | str) -> Job
```
Get a job by its id. Can also be an existing job object, in which case this method acts as a refresh operation to fetch the latest job details.
## Parameters
The job or job id to get.
## Returns
A job object.
```python Python
job = job_client.find("0195c87a-49f6-5ffa-e3cb-92215d057ea6")
```
# JobClient.query
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobClient.query
```python
def JobClient.query(
temporal_extent: TimeIntervalLike | IDIntervalLike,
automation_id: UUID | None = None,
) -> list[Job]
```
Query jobs in the specified interval.
## Parameters
The temporal extent to filter jobs by. If an `IDInterval` is given, jobs are filtered by their job id instead of their creation time.
It can be specified in the following ways:
* TimeInterval: interval -> Use the time interval as it is given
* DatetimeScalar: \[time, time] -> Construct a TimeInterval with start and end time set to the given
value and the end time inclusive
* tuple of two DatetimeScalar: \[start, end) -> Construct a TimeInterval with the given start and
end time
* `IDInterval`: interval -> Use the ID interval as it is given
* tuple of two UUIDs: \[start, end) -> Construct an `IDInterval` with the given start and end id
* tuple of two strings: \[start, end) -> Construct an `IDInterval` with the given start and end id
parsed from the strings
The automation id to filter jobs by. If not provided, jobs from all automations are returned.
## Returns
A list of jobs.
```python Python
jobs = job_client.query(("2025-01-01", "2025-02-01"))
```
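To narrow the query to jobs created by a specific automation, pass an `automation_id` as well (a sketch; the UUID is a hypothetical automation id):
```python Python
from uuid import UUID

jobs = job_client.query(
    ("2025-01-01", "2025-02-01"),
    automation_id=UUID("0195c87a-49f6-5ffa-e3cb-92215d057ea6"),  # hypothetical
)
```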
# JobClient.retry
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobClient.retry
```python
def JobClient.retry(job_or_id: Job | str) -> int
```
Retry a job. All failed tasks will become queued again, and queued tasks will be picked up by task runners again.
## Parameters
The job or job id to retry.
## Returns
The number of tasks that were rescheduled.
```python Python
nb_rescheduled = job_client.retry(job)
```
# JobClient.submit
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobClient.submit
```python
def JobClient.submit(
job_name: str,
root_task_or_tasks: Task | Iterable[Task],
cluster: str | Cluster | Iterable[str | Cluster] | None = None,
max_retries: int = 0
) -> Job
```
Submit a job.
## Parameters
The name of the job.
The root task for the job. This task is executed first and can submit subtasks to manage the entire workflow. A job can optionally consist of multiple root tasks.
The [cluster slug](/workflows/concepts/clusters#managing-clusters) for the cluster to run the root task on. In case of multiple root tasks, a list of cluster slugs can be provided.
If not provided, the default cluster is used.
The maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the root task in case it fails. Defaults to 0.
```python Python
from my_workflow import MyTask
job = job_client.submit(
"my-job",
MyTask(
message="Hello, World!",
value=42,
data={"key": "value"}
),
)
```
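The optional `cluster` and `max_retries` parameters can be set on submission as well (a sketch; the cluster slug is a hypothetical value):
```python Python
job = job_client.submit(
    "my-job",
    MyTask(message="Hello, World!", value=42, data={"key": "value"}),
    cluster="my-cluster-CvufcSxcC9SKfe",  # hypothetical cluster slug
    max_retries=5,  # retry the root task up to 5 times on failure
)
```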
# JobClient.visualize
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobClient.visualize
```python
def JobClient.visualize(
job_or_id: Job | str,
direction: str = "down",
layout: str = "dagre",
sketchy: bool = True
)
```
Create a visualization of a job as a diagram.
If you are working in an interactive environment, such as Jupyter notebooks, you can use the `display` method to display the visualized job diagram directly in the notebook.
## Parameters
The job to visualize.
The direction for the diagram to flow. For more details, see the relevant section in the [D2 documentation](https://d2lang.com/tour/layouts/#direction). Valid values are `up`, `down`, `right`, and `left`. The default value is `down`.
The layout engine for the diagram. For further information, refer to the [D2 layout engines](https://d2lang.com/tour/layouts). Valid values are `dagre` and `elk`, with the default being `dagre`.
Indicates whether to use a sketchy, hand-drawn style for the diagram. The default is `True`.
```python Python
svg = job_client.visualize(job)
Path("diagram.svg").write_text(svg)
# in a notebook
job_client.display(job)
```
# Task
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/Task
```python
class Task:
def execute(context: ExecutionContext) -> None
@staticmethod
def identifier() -> tuple[str, str]
```
Base class for Tilebox workflows [tasks](/workflows/concepts/tasks). Inherit from this class to create a task.
Inheriting also automatically applies the dataclass decorator.
## Methods
```python
def Task.execute(context: ExecutionContext) -> None
```
The entry point for the execution of the task.
```python
@staticmethod
def Task.identifier() -> tuple[str, str]
```
Override a task identifier and specify its version. If not overridden, the identifier of a task defaults to the class name, and the version to `v0.0`.
## Task Input Parameters
```python
class MyTask(Task):
message: str
value: int
data: dict[str, int]
```
Optional task [input parameters](/workflows/concepts/tasks#input-parameters), defined as class attributes. Supported types
are `str`, `int`, `float`, `bool`, as well as `lists` and `dicts` thereof.
```python Python
from tilebox.workflows import Task, ExecutionContext
class MyFirstTask(Task):
def execute(self, context: ExecutionContext):
print(f"Hello World!")
@staticmethod
def identifier() -> tuple[str, str]:
return ("tilebox.workflows.MyTask", "v3.2")
class MyFirstParameterizedTask(Task):
name: str
greet: bool
data: dict[str, str]
def execute(self, context: ExecutionContext):
if self.greet:
print(f"Hello {self.name}!")
```
# TaskRunner.run_all
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/TaskRunner.run_all
```python
def TaskRunner.run_all()
```
Run the task runner and execute all tasks until there are no more tasks available.
```python Python
# run all tasks until no more tasks are available
runner.run_all()
```
# TaskRunner.run_forever
Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/TaskRunner.run_forever
```python
def TaskRunner.run_forever()
```
Run the task runner forever. This will poll for new tasks and execute them as they come in.
If no tasks are available, it will sleep for a short time and then try again.
```python Python
# run forever, until the current process is stopped
runner.run_forever()
```
# Authentication
Source: https://docs.tilebox.com/authentication
To access the Tilebox API, you must authenticate your requests. This guide explains how authentication works, focusing on API keys used as bearer tokens.
## Creating an API key
To create an API key, log into the [Tilebox Console](https://console.tilebox.com). Navigate to [Account -> API Keys](https://console.tilebox.com/account/api-keys) and click the "Create API Key" button.
Keep your API keys secure. Deactivate any keys if you suspect they have been compromised.
## Bearer token authentication
The Tilebox API uses bearer tokens for authentication. You need to pass your API key as the `token` parameter when creating an instance of the client.
```python Python
from tilebox.datasets import Client as DatasetsClient
from tilebox.workflows import Client as WorkflowsClient
datasets_client = DatasetsClient(token="YOUR_TILEBOX_API_KEY")
workflows_client = WorkflowsClient(token="YOUR_TILEBOX_API_KEY")
```
```go Go
import (
"github.com/tilebox/tilebox-go/datasets/v1"
"github.com/tilebox/tilebox-go/workflows/v1"
)
datasetsClient := datasets.NewClient(datasets.WithAPIKey("YOUR_TILEBOX_API_KEY"))
workflowsClient := workflows.NewClient(workflows.WithAPIKey("YOUR_TILEBOX_API_KEY"))
```
If you set your API key as an environment variable named `TILEBOX_API_KEY`, you can skip the token parameter when creating a client.
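With the environment variable set, client creation reduces to the following (a minimal sketch):
```python Python
# assumes TILEBOX_API_KEY is set in the environment
from tilebox.datasets import Client

client = Client()  # the token is read from TILEBOX_API_KEY
```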
# Product Updates
Source: https://docs.tilebox.com/changelog
New features, updates and improvements
## Go Client

Excited to announce the release of the Tilebox Go client!
**Features**
* Datasets client
* Statically typed dataset types
* CLI to generate dataset types
* Workflows client
* Go task runners
To get started, check out the [Go SDK documentation](/sdks/go/install).
## Spatio-Temporal datasets

Spatio-temporal datasets are officially out, fully supported in all languages and available as a category to create in custom datasets!
The core problems that spatio-temporal datasets solve are
* finding relevant data quickly (e.g. all Sentinel 2 granules along the US coastline, last year),
* storing auxiliary geographically coded data (e.g. weather station data, ground truth data),
* cataloging higher level data and results
[Here's a short video](https://share.descript.com/view/khO3QJslhgU) on performance and core capabilities.
We're excited about this as cataloging has until now been an unsexy but hard problem, and it's great to finally have a solution out there.
**More information**
* [Spatio Temporal datasets documentation](https://docs.tilebox.com/datasets/types/spatiotemporal)
* All open data now supports spatio-temporal queries
* [Create your own spatio-temporal datasets](/guides/datasets/create)
* [Ingesting spatio-temporal data](/datasets/ingest)
## Custom Datasets

Create your own custom datasets!
* statically typed
* with clients in Python and Go
Use it to organize anything from telemetry, raw payload metadata, auxiliary sensor data, configuration data, or
internal data catalogs.
**Quickstart**
1. Specify the data type in the Console
2. Create a collection
3. Use `client.ingest()` to ingest an `xarray.Dataset` or `pandas.DataFrame`
4. Query away!
For detailed instructions, check out the [Creating a dataset](/guides/datasets/create) how-to guide.
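A minimal sketch of steps 2 to 4 in Python, assuming a hypothetical custom dataset slug and fields:
```python Python
import pandas as pd
from tilebox.datasets import Client

client = Client()
# "my_org.my_custom_dataset" and its fields are hypothetical examples
dataset = client.dataset("my_org.my_custom_dataset")
collection = dataset.get_or_create_collection("Measurements")
collection.ingest(pd.DataFrame({
    "time": ["2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z"],
    "value": [1.5, 2.5],
}))
data = collection.load(("2024-01-01", "2024-02-01"))
```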
# Console
Source: https://docs.tilebox.com/console
Explore your datasets and workflows with the Tilebox Console
The [Tilebox Console](https://console.tilebox.com) is a web interface that enables you to explore your datasets and workflows, manage your account and API keys, add collaborators to your team and monitor your usage.
## Datasets
The datasets explorer lets you explore available datasets for your team. You can select a dataset, view its collections, and load data for a collection within a specified time range.
When you click a specific event time in the data point list view, a detailed view of that data point will appear.
### Export as Code
After selecting a dataset, collection, and time range, you can export the current selection as a Python code snippet. This will copy a code snippet like the one below to your clipboard.
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
sentinel2_msi = datasets.open_data.copernicus.sentinel2_msi
data = sentinel2_msi.collection("S2A_S2MSI1C").query(
temporal_extent=("2024-07-12", "2024-07-26"),
show_progress=True,
)
```
```go Go
ctx := context.Background()
client := datasets.NewClient()
dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel2_msi")
if err != nil {
log.Fatalf("Failed to get dataset: %v", err)
}
collection, err := client.Collections.Get(ctx, dataset.ID, "S2A_S2MSI1C")
if err != nil {
log.Fatalf("Failed to get collection: %v", err)
}
startDate := time.Date(2024, 7, 12, 0, 0, 0, 0, time.UTC)
endDate := time.Date(2024, 7, 26, 0, 0, 0, 0, time.UTC)
timeInterval := query.NewTimeInterval(startDate, endDate)
var datapoints []*v1.Sentinel2Msi
err = client.Datapoints.QueryInto(ctx,
[]uuid.UUID{collection.ID}, &datapoints,
datasets.WithTemporalExtent(timeInterval),
)
if err != nil {
log.Fatalf("Failed to query datapoints: %v", err)
}
```
Paste the snippet into a [notebook](/sdks/python/sample-notebooks) to interactively explore the
[`xarray.Dataset`](/sdks/python/xarray) that is returned.
## Workflows
The workflows section of the console allows you to view jobs, create clusters and create near real-time automations.
## Account
### API Keys
The API Keys page enables you to manage your API keys. You can create new API keys, revoke existing ones, and view currently active API keys.
### Usage
The Usage page allows you to view your current usage of the Tilebox API.
# Collections
Source: https://docs.tilebox.com/datasets/concepts/collections
Learn about dataset collections
Collections group data points within a dataset. They help represent logical groupings of data points that are commonly queried together.
For example, if your dataset includes data from a specific instrument on different satellites, you can group the data points from each satellite into a collection.
Refer to the examples below for common use cases when working with collections.
These examples assume that you have already created a client and selected a dataset as shown below.
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
dataset = datasets.open_data.copernicus.landsat8_oli_tirs
```
```go Go
package main
import (
"context"
"log"
"github.com/tilebox/tilebox-go/datasets/v1"
)
func main() {
ctx := context.Background()
client := datasets.NewClient()
dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.landsat8_oli_tirs")
if err != nil {
log.Fatalf("Failed to get dataset: %v", err)
}
}
```
## Listing collections
To list the collections for a dataset, use the `collections` method on the dataset object.
```python Python
collections = dataset.collections()
print(collections)
```
```go Go
collections, err := client.Collections.List(ctx, dataset.ID)
if err != nil {
log.Fatalf("Failed to list collections: %v", err)
}
log.Println(collections)
```
```plaintext Output
{'L1GT': Collection L1GT: [2013-03-25T12:08:43.699 UTC, 2024-08-19T12:57:32.456 UTC] (154288 data points),
'L1T': Collection L1T: [2013-03-26T09:33:19.763 UTC, 2020-08-24T03:21:50.000 UTC] (87958 data points),
'L1TP': Collection L1TP: [2013-03-24T00:25:55.457 UTC, 2024-08-19T12:58:20.229 UTC] (322041 data points),
'L2SP': Collection L2SP: [2015-01-01T07:53:35.391 UTC, 2024-08-12T12:52:03.243 UTC] (191110 data points)}
```
[dataset.collections](/api-reference/python/tilebox.datasets/Dataset.collections) returns a dictionary mapping collection names to their corresponding collection objects. Each collection has a unique name within its dataset.
## Creating collections
To create a collection in a dataset, use [dataset.create\_collection()](/api-reference/python/tilebox.datasets/Dataset.create_collection). This method returns the created collection object.
```python Python
collection = dataset.create_collection("My-collection")
```
```go Go
collection, err := client.Collections.Create(ctx, dataset.ID, "My-collection")
```
You can use [dataset.get\_or\_create\_collection()](/api-reference/python/tilebox.datasets/Dataset.get_or_create_collection) to get a collection by its name. If the collection does not exist, it will be created.
```python Python
collection = dataset.get_or_create_collection("My-collection")
```
```go Go
collection, err := client.Collections.GetOrCreate(ctx, dataset.ID, "My-collection")
```
## Accessing individual collections
Once you have listed the collections for a dataset using [dataset.collections()](/api-reference/python/tilebox.datasets/Dataset.collections), you can access a specific collection by retrieving it from the resulting dictionary with its name. Use [collection.info()](/api-reference/python/tilebox.datasets/Collection.info) in Python or `String()` in Go to get details (name, availability, and count) about it.
```python Python
collections = dataset.collections()
terrain_correction = collections["L1GT"]
collection_info = terrain_correction.info()
print(collection_info)
```
```go Go
dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.landsat8_oli_tirs")
if err != nil {
log.Fatalf("Failed to get dataset: %v", err)
}
collection, err := client.Collections.Get(ctx, dataset.ID, "L1GT")
if err != nil {
log.Fatalf("Failed to get collection: %v", err)
}
log.Println(collection.String())
```
```plaintext Output
L1GT: [2013-03-25T12:08:43.699 UTC, 2024-08-19T12:57:32.456 UTC] (154288 data points)
```
You can also access a specific collection directly using the [dataset.collection](/api-reference/python/tilebox.datasets/Dataset.collection) method on the dataset object. This method allows you to get the collection without having to list all collections first.
```python Python
terrain_correction = dataset.collection("L1GT")
collection_info = terrain_correction.info()
print(collection_info)
```
```go Go
dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.landsat8_oli_tirs")
if err != nil {
log.Fatalf("Failed to get dataset: %v", err)
}
collection, err := client.Collections.Get(ctx, dataset.ID, "L1GT")
if err != nil {
log.Fatalf("Failed to get collection: %v", err)
}
log.Println(collection.String())
```
```plaintext Output
L1GT: [2013-03-25T12:08:43.699 UTC, 2024-08-19T12:57:32.456 UTC] (154288 data points)
```
## Deleting collections
Collections can be deleted from a dataset using the `delete_collection` method.
To delete a collection, you need to have write permission on the dataset.
Deleting a collection will delete all data points in the collection.
```python Python
dataset.delete_collection(collection)
```
```go Go
err := client.Collections.Delete(ctx, dataset.ID, collection.ID)
```
## Errors you may encounter
### NotFoundError
If you attempt to access a collection with a non-existent name, a `NotFoundError` is raised. For example:
```python Python
dataset.collection("Sat-X").info() # raises NotFoundError: 'No such collection Sat-X'
```
```go Go
collection, err := client.Collections.Get(ctx, dataset.ID, "Sat-X")
if err != nil {
log.Fatal(err) // prints 'failed to get collections: not_found: no such collection'
}
```
## Next steps
Learn how to query data from a collection.
# Datasets
Source: https://docs.tilebox.com/datasets/concepts/datasets
Tilebox Datasets act as containers for data points. All data points in a dataset share the same type and fields.
You can create your own Custom Datasets via the [Tilebox Console](/console).
## Related Guides
Learn how to create a Timeseries dataset using the Tilebox Console.
Learn how to ingest an existing CSV dataset into a Timeseries dataset collection.
## Dataset types
Each dataset is of a specific type. Each dataset type comes with a set of required fields for each data point.
The dataset type also determines the query capabilities of a dataset, for example whether it supports only time-based queries
or also spatially filtered queries.
To find out which fields are required for each dataset type, check out the documentation for the available dataset types
below.
Each data point is linked to a specific point in time. Common for satellite telemetry, or other time-based data.
Supports efficient time-based queries.
Each data point is linked to a specific point in time and a location on the Earth's surface. Common for satellite
imagery. Supports efficient time-based and spatially filtered queries.
## Dataset specific fields
Additionally, each dataset has a set of fields that are specific to that dataset. Fields are defined during dataset
creation. That way, all data points in a dataset are strongly typed and are validated during ingestion.
The required fields of the dataset type, as well as the custom fields specific to each dataset together make up the
**dataset schema**.
Once data has been ingested into a dataset, existing fields of its **dataset schema** can no longer be removed or edited.
You can always add new fields to a dataset, since all fields are always optional.
The only exception to this rule is empty datasets. If you empty all collections in a dataset, you can freely
edit the dataset schema, since no conflicts with existing data points can occur.
## Field types
When defining the data schema, you can specify each field's type. The following field types are supported.
### Primitives
| Type | Description | Example value |
| ----------------------------------------------------------------------------------------- | ------------------------------------------- | ------------- |
| string | A string of characters of arbitrary length. | `Some string` |
| int64 | A 64-bit signed integer. | `123` |
| uint64 | A 64-bit unsigned integer. | `123` |
| float64 | A 64-bit floating-point number. | `123.45` |
| bool | A boolean. | `true` |
| bytes | A sequence of arbitrary length bytes. | `0xAF1E28D4` |
### Time
| Type | Description | Example value |
| ------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------- |
| Duration | A signed, fixed-length span of time represented as a count of seconds and fractions of seconds at nanosecond resolution. See [Duration](https://protobuf.dev/reference/protobuf/google.protobuf/#duration) for more information. | `12s 345ms` |
| Timestamp | A point in time, represented as seconds and fractions of seconds at nanosecond resolution in UTC Epoch time. See [Timestamp](https://protobuf.dev/reference/protobuf/google.protobuf/#timestamp) for more information. | `2023-05-17T14:30:00Z` |
### Identifier
| Type | Description | Example value |
| -------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------ | -------------------------------------- |
| UUID | A [universally unique identifier (UUID)](https://en.wikipedia.org/wiki/Universally_unique_identifier). | `126a2531-c98d-4e06-815a-34bc5b1228cc` |
### Geospatial
| Type | Description | Example value |
| ------------------------------------------------------------------------------------------ | ------------------------------------------------------------------------- | --------------------------------------- |
| Geometry | Geospatial geometries of type Point, LineString, Polygon or MultiPolygon. | `POLYGON ((12.3 -5.4, 12.5 -5.4, ...))` |
### Arrays
Every type is also available as an array, allowing you to ingest multiple values of the underlying type for each data point. The size of the array is flexible and can be different for each data point.
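For illustration, an array field can hold a different number of values per data point. Below is a minimal sketch using `pandas`, assuming a dataset with a float64 array field named `sensor_history` (the same field used in the ingestion examples later in this documentation):
```python Python
import pandas as pd

# each data point carries a different number of sensor_history values
data = pd.DataFrame({
    "time": ["2025-03-28T11:44:23Z", "2025-03-28T11:45:19Z"],
    "sensor_history": [
        [-12.15, 13.45, -8.2, 16.5, 45.16],  # 5 values
        [300.16, 280.12, 273.15],  # 3 values
    ],
})
```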
## Creating a dataset
You can create a dataset in Tilebox using the [Tilebox Console](/console). Check out the [Creating a dataset](/guides/datasets/create) guide for an example of how to achieve this.
## Listing datasets
You can use [your client instance](/datasets/introduction#creating-a-datasets-client) to access the datasets available to you. To list all available datasets, use the `datasets` method of the client.
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
print(datasets)
```
```go Go
package main
import (
"context"
"log"
"github.com/tilebox/tilebox-go/datasets/v1"
)
func main() {
ctx := context.Background()
client := datasets.NewClient()
allDatasets, err := client.Datasets.List(ctx)
if err != nil {
log.Fatalf("Failed to list datasets: %v", err)
}
for _, dataset := range allDatasets {
log.Println(dataset)
}
}
```
```plaintext Output
open_data:
asf:
ers_sar: European Remote Sensing Satellite (ERS) Synthetic Aperture Radar (SAR) Granules
copernicus:
landsat8_oli_tirs: Landsat-8 is part of the long-running Landsat programme ...
sentinel1_sar: The Sentinel-1 mission is the European Radar Observatory for the ...
sentinel2_msi: Sentinel-2 is equipped with an optical instrument payload that samples ...
sentinel3_olci: OLCI (Ocean and Land Colour Instrument) is an optical instrument used to ...
...
```
Once you have your dataset object, you can use it to [list the available collections](/datasets/concepts/collections) for the dataset.
In Python, if you're using an IDE or an interactive environment with auto-complete, you can use it on your client instance to discover the datasets available to you. Type `client.` and trigger auto-complete after the dot to do so.
## Accessing a dataset
Each dataset has an automatically generated *slug* that can be used to access it. The *slug* is the name of the group, followed by a dot, followed by the dataset *code name*.
For example, the *slug* for the Sentinel-2 MSI dataset, which is part of the `open_data.copernicus` group, is `open_data.copernicus.sentinel2_msi`.
To access a dataset, use the `dataset` method of your client instance and pass the *slug* of the dataset as an argument.
```python Python
from tilebox.datasets import Client
client = Client()
s2_msi_dataset = client.dataset("open_data.copernicus.sentinel2_msi")
```
```go Go
s2MsiDataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel2_msi")
```
Once you have your dataset object, you can use it to [access available collections](/datasets/concepts/collections) for the dataset.
## Deleting a dataset
Datasets can be deleted through the [Tilebox Console](/console) by clicking the `Delete` button in the dataset page.
Empty datasets will be deleted right away. A dataset is considered empty if none of its collections contain any data points.
A non-empty dataset can also be deleted, but doing so is a destructive operation:
every data point in every collection of the dataset will be deleted.
As a safety measure, Tilebox soft-deletes the dataset for 7 days before permanently deleting it.
Please [get in touch](mailto:support@tilebox.com) if you deleted a dataset by accident and want to restore it.
# Deleting Data
Source: https://docs.tilebox.com/datasets/delete
Learn how to delete data points from Tilebox datasets.
You need to have write permission on the collection to be able to delete datapoints.
Check out the examples below for common scenarios of deleting data from a collection.
## Deleting data by datapoint IDs
To delete data from a collection, use the [delete](/api-reference/python/tilebox.datasets/Collection.delete) method. This method accepts a list of datapoint IDs to delete.
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
collections = datasets.my_custom_dataset.collections()
collection = collections["Sensor-1"]
n_deleted = collection.delete([
"0195c87a-49f6-5ffa-e3cb-92215d057ea6",
"0195c87b-bd0e-3998-05cf-af6538f34957",
])
print(f"Deleted {n_deleted} data points.")
```
```go Go
package main
import (
"context"
"log"
"log/slog"
"github.com/google/uuid"
"github.com/tilebox/tilebox-go/datasets/v1"
)
func main() {
ctx := context.Background()
client := datasets.NewClient()
dataset, err := client.Datasets.Get(ctx, "my_custom_dataset")
if err != nil {
log.Fatalf("Failed to get dataset: %v", err)
}
collection, err := client.Collections.Get(ctx, dataset.ID, "Sensor-1")
if err != nil {
log.Fatalf("Failed to create collection: %v", err)
}
datapointIDs := []uuid.UUID{
uuid.MustParse("0195c87a-49f6-5ffa-e3cb-92215d057ea6"),
uuid.MustParse("0195c87b-bd0e-3998-05cf-af6538f34957"),
}
numDeleted, err := client.Datapoints.DeleteIDs(ctx, collection.ID, datapointIDs)
if err != nil {
log.Fatalf("Failed to delete datapoints: %v", err)
}
slog.Info("Deleted data points", slog.Int64("deleted", numDeleted))
}
```
```plaintext Output
Deleted 2 data points.
```
In Python, `delete` not only accepts a list of datapoint IDs as strings, but also supports a wide range of other useful input types.
See the [delete](/api-reference/python/tilebox.datasets/Collection.delete) API documentation for more details.
### Possible errors
* `NotFoundError`: raised if any of the data points are not found in the collection. In that case, nothing
will be deleted
* `ValueError`: raised if one of the specified ids is not a valid UUID
## Deleting a time interval
One common way to delete data is to first query it from a collection and then forward the result to the `delete` method. For
this use case it is often a good idea to query the datapoints with `skip_data=True`, since you only need the datapoint IDs
and can avoid loading the data fields. See [fetching only metadata](/datasets/query#fetching-only-metadata) for more details.
```python Python
to_delete = collection.load(("2023-05-01", "2023-06-01"), skip_data=True)
n_deleted = collection.delete(to_delete)
print(f"Deleted {n_deleted} data points.")
```
```go Go
collectionID := uuid.MustParse("c5145c99-1843-4816-9221-970f9ce3ac93")
startDate := time.Date(2023, time.May, 1, 0, 0, 0, 0, time.UTC)
endDate := time.Date(2023, time.June, 1, 0, 0, 0, 0, time.UTC)
may2023 := query.NewTimeInterval(startDate, endDate)
var toDelete []*v1.Sentinel2Msi
err := client.Datapoints.QueryInto(ctx,
[]uuid.UUID{collectionID}, &toDelete,
datasets.WithTemporalExtent(may2023),
datasets.WithSkipData(),
)
if err != nil {
log.Fatalf("Failed to query datapoints: %v", err)
}
numDeleted, err := client.Datapoints.Delete(ctx, collectionID, toDelete)
if err != nil {
log.Fatalf("Failed to delete datapoints: %v", err)
}
slog.Info("Deleted data points", slog.Int64("deleted", numDeleted))
```
```plaintext Output
Deleted 104 data points.
```
## Automatic batching
Tilebox automatically batches the delete requests for you, so you don't have to worry about the maximum request size.
# Ingesting Data
Source: https://docs.tilebox.com/datasets/ingest
Learn how to ingest data into a Tilebox dataset.
You need to have write permission on the collection to be able to ingest data.
Check out the examples below for common scenarios of ingesting data into a collection.
## Dataset schema
Tilebox Datasets are strongly typed. This means you can only ingest data that matches the schema of a dataset. The schema is defined at dataset creation time.
The examples on this page assume that you have access to a [Timeseries dataset](/datasets/types/timeseries) that has the following schema:
Check out the [Creating a dataset](/guides/datasets/create) guide for an example of how to create such a dataset.
**MyCustomDataset schema**
| Field name | Type | Description |
| ---------------- | ------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------- |
| `time` | Timestamp | Timestamp of the data point. Required by the [Timeseries dataset](/datasets/types/timeseries) type. |
| `id` | UUID | Auto-generated UUID for each datapoint. |
| `ingestion_time` | Timestamp | Auto-generated timestamp for when the data point was ingested into the Tilebox API. |
| `value` | float64 | A numeric measurement value. |
| `sensor` | string | A name of the sensor that generated the data point. |
| `precise_time` | Timestamp | A precise measurement time in nanosecond precision. |
| `sensor_history` | Array\[float64] | The last few measurements of the sensor. |
A full overview of available field types can be found [here](/datasets/concepts/datasets#field-types).
Once you've defined the schema and created a dataset, you can access it and create a collection to ingest data into.
```python Python
from tilebox.datasets import Client
client = Client()
dataset = client.dataset("my_org.my_custom_dataset")
collection = dataset.get_or_create_collection("Measurements")
```
```go Go
package main
import (
"context"
"log"
"github.com/tilebox/tilebox-go/datasets/v1"
)
func main() {
ctx := context.Background()
client := datasets.NewClient()
dataset, err := client.Datasets.Get(ctx, "my_org.my_custom_dataset")
if err != nil {
log.Fatalf("Failed to get dataset: %v", err)
}
collection, err := client.Collections.GetOrCreate(ctx, dataset.ID, "Measurements")
if err != nil {
log.Fatalf("Failed to get collection: %v", err)
}
}
```
## Preparing data for ingestion
Ingestion can be done either in Python or Go.
### Python
[`collection.ingest`](/api-reference/python/tilebox.datasets/Collection.ingest) supports a wide range of input types. Below is an example of using either a `pandas.DataFrame` or an `xarray.Dataset` as input.
#### pandas.DataFrame
A [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) is a representation of two-dimensional, potentially heterogeneous tabular data. It's a powerful tool for working with structured data, and Tilebox supports it as input for `ingest`.
The example below shows how to construct a `pandas.DataFrame` from scratch that matches the schema of the `MyCustomDataset` dataset and can be ingested into it.
```python Python
import pandas as pd
data = pd.DataFrame({
"time": [
"2025-03-28T11:44:23Z",
"2025-03-28T11:45:19Z",
],
"value": [45.16, 273.15],
"sensor": ["A", "B"],
"precise_time": [
"2025-03-28T11:44:23.345761444Z",
"2025-03-28T11:45:19.128742312Z",
],
"sensor_history": [
[-12.15, 13.45, -8.2, 16.5, 45.16],
[300.16, 280.12, 273.15],
],
})
print(data)
```
```plaintext Output
time value sensor precise_time sensor_history
0 2025-03-28T11:44:23Z 45.16 A 2025-03-28T11:44:23.345761444Z [-12.15, 13.45, -8.2, 16.5, 45.16]
1 2025-03-28T11:45:19Z 273.15 B 2025-03-28T11:45:19.128742312Z [300.16, 280.12, 273.15]
```
Once you have the data ready in this format, you can `ingest` it into a collection.
```python Python
# now that we have the data frame in the correct format
# we can ingest it into the Tilebox dataset
collection.ingest(data)
# To verify it now contains the 2 data points
print(collection.info())
```
```plaintext Output
Measurements: [2025-03-28T11:44:23.000 UTC, 2025-03-28T11:45:19.000 UTC] (2 data points)
```
You can now also head on over to the [Tilebox Console](/console) and view the newly ingested data points there.
#### xarray.Dataset
[`xarray.Dataset`](/sdks/python/xarray) is the default format in which Tilebox Datasets returns data when
[querying data](/datasets/query) from a collection.
Tilebox also supports it as input for ingestion. The example below shows how to construct an `xarray.Dataset`
from scratch that matches the schema of the `MyCustomDataset` dataset and can then be ingested into it.
To learn more about `xarray.Dataset`, visit Tilebox's dedicated [Xarray documentation page](/sdks/python/xarray).
```python Python
import numpy as np
import xarray as xr
data = xr.Dataset({
"time": ("time", [
"2025-03-28T11:46:13Z",
"2025-03-28T11:46:54Z",
]),
"value": ("time", [48.1, 290.12]),
"sensor": ("time", ["A", "B"]),
"precise_time": ("time", [
"2025-03-28T11:46:13.345761444Z",
"2025-03-28T11:46:54.128742312Z",
]),
"sensor_history": (("time", "n_sensor_history"), [
[13.45, -8.2, 16.5, 45.16, 48.1],
[280.12, 273.15, 290.12, np.nan, np.nan],
]),
})
print(data)
```
```plaintext Output
Size: 504B
Dimensions: (time: 2, n_sensor_history: 5)
Coordinates:
* time (time)
```
Array fields manifest in xarray using an extra dimension, in this case `n_sensor_history`. In case
of different array sizes for each data point, remaining values are filled up with a fill value, depending on the
`dtype` of the array. For `float64` this is `np.nan` (not a number).
Don't worry - when ingesting data into a Tilebox dataset, Tilebox will automatically skip those padding fill values
and not store them in the dataset.
Now that you have the `xarray.Dataset` in the correct format, you can ingest it into the Tilebox dataset collection.
```python Python
collection = dataset.get_or_create_collection("OtherMeasurements")
collection.ingest(data)
# To verify it now contains the 2 data points
print(collection.info())
```
```plaintext Output
OtherMeasurements: [2025-03-28T11:46:13.000 UTC, 2025-03-28T11:46:54.000 UTC] (2 data points)
```
### Go
[`Client.Datapoints.Ingest`](/api-reference/go/datasets/Datapoints.Ingest) supports ingestion of data points in the form of a slice of protobuf messages.
#### Protobuf
Protobuf is Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data.
More details on protobuf can be found in the [protobuf section](/sdks/go/protobuf).
In the example below, the `v1.Modis` type has been generated using [tilebox-generate](https://github.com/tilebox/tilebox-generate) as described in the [protobuf section](/sdks/go/protobuf).
```go Go
datapoints := []*v1.Modis{
v1.Modis_builder{
Time: timestamppb.New(time.Now()),
GranuleName: proto.String("Granule 1"),
}.Build(),
v1.Modis_builder{
Time: timestamppb.New(time.Now().Add(-5 * time.Hour)),
GranuleName: proto.String("Past Granule 2"),
}.Build(),
}
ingestResponse, err := client.Datapoints.Ingest(ctx,
collectionID,
&datapoints,
false,
)
```
## Copying or moving data
Since `ingest` accepts the output of a query as input, you can easily copy or move data from one collection to another.
Copying data like this also works across datasets, as long as the dataset schemas are compatible.
```python Python
src_collection = dataset.collection("Measurements")
data_to_copy = src_collection.load(("2025-03-28", "2025-03-29"))
dest_collection = dataset.collection("OtherMeasurements")
dest_collection.ingest(data_to_copy) # copy the data to the other collection
# To verify it now contains 4 datapoints (2 we ingested already, and 2 we copied just now)
print(dest_collection.info())
```
```go Go
dataset, err := client.Datasets.Get(ctx, "my_org.my_custom_dataset")
if err != nil {
log.Fatalf("Failed to get dataset: %v", err)
}
srcCollection, err := client.Collections.GetOrCreate(ctx, dataset.ID, "Measurements")
if err != nil {
log.Fatalf("Failed to get collection: %v", err)
}
startDate := time.Date(2025, time.March, 28, 0, 0, 0, 0, time.UTC)
endDate := time.Date(2025, time.March, 29, 0, 0, 0, 0, time.UTC)
var dataToCopy []*v1.MyCustomDataset
err = client.Datapoints.QueryInto(ctx,
[]uuid.UUID{srcCollection.ID}, &dataToCopy,
datasets.WithTemporalExtent(query.NewTimeInterval(startDate, endDate)),
)
if err != nil {
log.Fatalf("Failed to query datapoints: %v", err)
}
destCollection, err := client.Collections.GetOrCreate(ctx, dataset.ID, "OtherMeasurements")
if err != nil {
log.Fatalf("Failed to get collection: %v", err)
}
// copy the data to the other collection
_, err = client.Datapoints.Ingest(ctx, destCollection.ID, &dataToCopy, false)
if err != nil {
log.Fatalf("Failed to ingest datapoints: %v", err)
}
// To verify it now contains 4 datapoints (2 we ingested already, and 2 we copied just now)
updatedDestCollection, err := client.Collections.Get(ctx, dataset.ID, "OtherMeasurements")
if err != nil {
log.Fatalf("Failed to get collection: %v", err)
}
slog.Info("Updated collection", slog.String("collection", updatedDestCollection.String()))
```
```plaintext Output
OtherMeasurements: [2025-03-28T11:44:23.000 UTC, 2025-03-28T11:46:54.000 UTC] (4 data points)
```
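The example above copies the data. To move it instead, you could delete the queried data points from the source collection after ingesting them into the destination. A minimal Python sketch, reusing `src_collection`, `dest_collection` and `data_to_copy` from the Python example above, and assuming write permission on both collections:
```python Python
# ingest into the destination collection, then remove from the source
dest_collection.ingest(data_to_copy)
n_deleted = src_collection.delete(data_to_copy)
print(f"Moved {n_deleted} data points.")
```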
## Automatic batching
Tilebox automatically batches the ingestion requests for you, so you don't have to worry about the maximum request size.
## Idempotency
Tilebox auto-generates datapoint IDs based on the values of all fields - except for the auto-generated
`ingestion_time` - so ingesting the same data twice will result in the same ID being generated. By default, Tilebox
silently skips any data points that are duplicates of existing ones in a collection. This behavior is especially
useful when implementing idempotent algorithms: re-executions of ingestion tasks due to retries
or other reasons will never result in duplicate data points.
You can also request that an error be raised if any of the generated datapoint IDs already exist,
by setting the `allow_existing` parameter to `False`.
```python Python
data = pd.DataFrame({
"time": [
"2025-03-28T11:45:19Z",
],
"value": [45.16],
"sensor": ["A"],
"precise_time": [
"2025-03-28T11:44:23.345761444Z",
],
"sensor_history": [
[-12.15, 13.45, -8.2, 16.5, 45.16],
],
})
# we already ingested the same data point previously
collection.ingest(data, allow_existing=False)
# we can still ingest it, by setting allow_existing=True
# but the total number of datapoints will still be the same
# as before in that case, since it already exists and therefore
# will be skipped
collection.ingest(data, allow_existing=True) # no-op
```
```go Go
datapoints := []*v1.MyCustomDataset{
v1.MyCustomDataset_builder{
Time: timestamppb.New(time.Date(2025, time.March, 28, 11, 45, 19, 0, time.UTC)),
Value: proto.Float64(45.16),
Sensor: proto.String("A"),
PreciseTime: timestamppb.New(time.Date(2025, time.March, 28, 11, 44, 23, 345761444, time.UTC)),
SensorHistory: []float64{-12.15, 13.45, -8.2, 16.5, 45.16},
}.Build(),
}
// we already ingested the same data point previously
_, err = client.Datapoints.Ingest(ctx, collection.ID, &datapoints, false)
if err != nil {
log.Fatalf("Failed to ingest datapoints: %v", err)
}
// we can still ingest it, by setting allowExisting to true
// but the total number of datapoints will still be the same
// as before in that case, since it already exists and therefore
// will be skipped
_, err = client.Datapoints.Ingest(ctx, collection.ID, &datapoints, true) // no-op
if err != nil {
log.Fatalf("Failed to ingest datapoints: %v", err)
}
```
```plaintext Output
ArgumentError: found existing datapoints with same id, refusing to ingest with "allow_existing=false"
```
## Ingestion from common file formats
Using `xarray` and `pandas`, you can also easily ingest existing datasets available in common file
formats, such as CSV, [Parquet](https://parquet.apache.org/), [Feather](https://arrow.apache.org/docs/python/feather.html) and more.
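As a quick illustration, ingesting a CSV file could look like the sketch below; the file name `measurements.csv` is hypothetical and its columns are assumed to match the collection's dataset schema:
```python Python
import pandas as pd

# hypothetical CSV file whose columns match the dataset schema
data = pd.read_csv("measurements.csv")
# parse the timestamp column so it is ingested as a Timestamp field
data["time"] = pd.to_datetime(data["time"])

collection.ingest(data)
```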
Check out the [Ingestion from common file formats](/guides/datasets/ingest-format) guide for examples of how to achieve this.
# Tilebox Datasets
Source: https://docs.tilebox.com/datasets/introduction
Tilebox Datasets provides structured and high-performance satellite metadata access.
Tilebox Datasets ingests and structures metadata for efficient querying, reducing data transfer and storage costs.
Create your own [Custom Datasets](/datasets/concepts/datasets) and easily set up a private, custom, strongly typed and highly available catalogue, or
explore the wide range of [public open data datasets](/datasets/open-data) available on Tilebox.
Learn more about datasets by exploring the following sections:
Learn what dataset types are available on Tilebox and how to create, list and access them.
Learn what collections are and how to access them.
Find out how to access data from a collection for specific time intervals.
Learn how to ingest data into a collection.
For a quick reference to API methods or specific parameter meanings, [check out the complete datasets API Reference](/api-reference/python/tilebox.datasets/Client).
## Terminology
Get familiar with some key terms when working with time series datasets.
Data points are the individual entities that form a dataset. Each data point has a set of required [fields](/datasets/types/timeseries) determined by the dataset type, and can have custom user-defined fields.
Datasets act as containers for data points. All data points in a dataset share the same type and fields. Tilebox supports different types of datasets, currently those are [Timeseries](/datasets/types/timeseries) and [Spatio-temporal](/datasets/types/spatiotemporal) datasets.
Collections group data points within a dataset. They help represent logical groupings of data points that are often queried together.
## Creating a datasets client
Prerequisites
* You have installed the [Python](/sdks/python/install) `tilebox-datasets` package or the [Go](/sdks/go/install) library.
* You have [created](/authentication) a Tilebox API key.
After meeting these prerequisites, you can create a client instance to interact with Tilebox Datasets.
```python Python
from tilebox.datasets import Client
client = Client(token="YOUR_TILEBOX_API_KEY")
```
```go Go
import (
"github.com/tilebox/tilebox-go/datasets/v1"
)
client := datasets.NewClient(
datasets.WithAPIKey("YOUR_TILEBOX_API_KEY"),
)
```
You can also set the `TILEBOX_API_KEY` environment variable to your API key and then instantiate the client without passing the `token` argument. The client will automatically use this environment variable for authentication.
```python Python
from tilebox.datasets import Client
# requires a TILEBOX_API_KEY environment variable
client = Client()
```
```go Go
import (
"github.com/tilebox/tilebox-go/datasets/v1"
)
// requires a TILEBOX_API_KEY environment variable
client := datasets.NewClient()
```
Tilebox Datasets provides a standard synchronous API by default, but also offers an [asynchronous client](/sdks/python/async) if needed.
### Exploring datasets
After creating a client instance, you can start exploring available datasets. A straightforward way to do this in an interactive environment is to [list all datasets](/api-reference/python/tilebox.datasets/Client.datasets) and use the autocomplete feature in your Jupyter notebook.
```python Python
datasets = client.datasets()
datasets. # trigger autocomplete here to view available datasets
```
```go Go
package main
import (
"context"
"github.com/tilebox/tilebox-go/datasets/v1"
"log"
)
func main() {
client := datasets.NewClient()
ctx := context.Background()
allDatasets, err := client.Datasets.List(ctx)
if err != nil {
log.Fatalf("Failed to list datasets: %v", err)
}
for _, dataset := range allDatasets {
log.Printf("Dataset: %s", dataset.Name)
}
}
```
The Console also provides an [overview](https://console.tilebox.com/datasets/explorer) of all available datasets.
### Errors you might encounter
#### AuthenticationError
`AuthenticationError` occurs when the client fails to authenticate with the Tilebox API. This may happen if the provided API key is invalid or expired. A client instantiated with an invalid API key won't raise an error immediately, but an error will occur when making a request to the API.
```python Python
client = Client(token="invalid-key") # runs without error
datasets = client.datasets() # raises AuthenticationError
```
```go Go
package main
import (
"context"
"github.com/tilebox/tilebox-go/datasets/v1"
"log"
)
func main() {
// runs without error
client := datasets.NewClient(datasets.WithAPIKey("invalid-key"))
// returns an error
_, err := client.Datasets.List(context.Background())
if err != nil {
log.Fatalf("Failed to list datasets: %v", err)
}
}
```
## Next steps
# Open Data
Source: https://docs.tilebox.com/datasets/open-data
Learn about the Open data available in Tilebox.
On top of access to your own, private datasets, Tilebox provides access to a growing number of public datasets.
These datasets are available to all users of Tilebox and are a great way to get started and prototype your applications even before data from your own satellites is available.
If there is a dataset you would like to see in Tilebox, you can request it in the [Console open data page](https://console.tilebox.com/datasets/open-data).
## Accessing Open Data through Tilebox
Accessing open datasets in Tilebox is straightforward and as easy as accessing your own datasets. Tilebox has already ingested the required metadata for each available dataset, so you can query, preview, and download the data seamlessly. This setup enables you to leverage performance optimizations and simplifies your workflows.
By using the [datasets API](/datasets), you can start prototyping your applications and workflows easily.
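For example, loading a day of Sentinel-2 metadata works exactly like loading data from one of your own datasets; a minimal sketch using collection names that appear elsewhere in this documentation:
```python Python
from tilebox.datasets import Client

client = Client()
datasets = client.datasets()

# open data datasets are accessed just like your own datasets
collection = datasets.open_data.copernicus.sentinel2_msi.collection("S2A_S2MSI1C")
data = collection.load(("2024-07-12", "2024-07-13"), show_progress=True)
print(data)
```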
## Available datasets
The Tilebox Console contains in-depth descriptions of each dataset. Check out the [Sentinel 5P Tropomi](https://console.tilebox.com/datasets/explorer/bb394de4-b47f-4069-bc4c-6e6a2c9f0641?view=documentation) documentation as an example.
### Copernicus Data Space
The [Copernicus Data Space](https://dataspace.copernicus.eu/) is an open ecosystem that provides free instant access to data and services from the Copernicus Sentinel missions.
Tilebox currently supports the following datasets from the Copernicus Data Space:
The Sentinel-1 mission is the European Radar Observatory for the Copernicus
joint initiative of the European Commission (EC) and the European Space
Agency (ESA). The Sentinel-1 mission includes C-band imaging operating in
four exclusive imaging modes with different resolution (down to 5 m) and
coverage (up to 400 km). It provides dual polarization capability, short
revisit times and rapid product delivery.
Sentinel-2 is equipped with an optical instrument payload that samples 13
spectral bands: four bands at 10 m, six bands at 20 m and three bands at 60
m spatial resolution.
Sentinel-3 is equipped with multiple instruments whose data is available in Tilebox.
`OLCI` (Ocean and Land Color Instrument) is an optical instrument used to provide
data continuity for ENVISAT MERIS.
`SLSTR` (Sea and Land Surface Temperature Radiometer) is a dual-view scanning
temperature radiometer, which flies in low Earth orbit (800 - 830 km
altitude).
The `SRAL` (SAR Radar Altimeter) instrument comprises one nadir-looking antenna,
and a central electronic chain composed of a Digital Processing Unit (DPU)
and a Radio Frequency Unit (RFU).
OLCI, in conjunction with the SLSTR instrument, provides the `SYN` products,
providing continuity with SPOT VEGETATION.
The primary goal of `TROPOMI` is to provide daily global observations of key
atmospheric constituents related to monitoring and forecasting air quality,
the ozone layer, and climate change.
Landsat-8 is part of the long-running Landsat programme led by USGS and NASA and
carries the Operational Land Imager (OLI) and the Thermal Infrared Sensor
(TIRS). The Operational Land Imager (OLI), on board Landsat-8 measures in
the VIS, NIR and SWIR portions of the spectrum. Its images have 15 m
panchromatic and 30 m multi-spectral spatial resolutions along a 185 km wide
swath, covering wide areas of the Earth's landscape while providing
high enough resolution to distinguish features like urban centres, farms,
forests and other land uses. The entire Earth falls within view once every
16 days due to Landsat-8's near-polar orbit. The Thermal Infra-Red Sensor
instrument, on board Landsat-8, is a thermal imager operating in pushbroom
mode with two Infra-Red channels: 10.8 µm and 12 µm with 100 m spatial
resolution.
### Alaska Satellite Facility (ASF)
The [Alaska Satellite Facility (ASF)](https://asf.alaska.edu/) is a NASA-funded research center at the University of Alaska Fairbanks. ASF supports a wide variety of research and applications using synthetic aperture radar (SAR) and related remote sensing technologies.
Tilebox currently supports the following datasets from the Alaska Satellite Facility:
European Remote Sensing Satellite (ERS) Synthetic Aperture Radar (SAR) Granules
### Umbra Space
[Umbra](https://umbra.space/) satellites provide up to 16 cm resolution Synthetic Aperture Radar (SAR) imagery from space. The Umbra Open Data Program (ODP) features over twenty diverse time-series locations that are frequently updated, allowing users to explore SAR's capabilities.
Tilebox currently supports the following datasets from Umbra Space:
Time-series SAR data provided as Opendata by Umbra Space.
# Querying data
Source: https://docs.tilebox.com/datasets/query
Learn how to query and load data from Tilebox datasets.
Check out the examples below for common scenarios when loading data from collections.
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
collections = datasets.open_data.copernicus.sentinel1_sar.collections()
collection = collections["S1A_IW_RAW__0S"]
```
```go Go
package main
import (
"context"
"log"
"github.com/tilebox/tilebox-go/datasets/v1"
)
func main() {
ctx := context.Background()
client := datasets.NewClient()
dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel1_sar")
if err != nil {
log.Fatalf("Failed to get dataset: %v", err)
}
collection, err := client.Collections.Get(ctx, dataset.ID, "S1A_IW_RAW__0S")
if err != nil {
log.Fatalf("Failed to get collection: %v", err)
}
}
```
To load data points from a dataset collection, use the [load](/api-reference/python/tilebox.datasets/Collection.load) method. It requires a `time_or_interval` parameter to specify the time or time interval for loading.
## Filtering by time
### Time interval
To load data for a specific time interval, use a `tuple` in the form `(start, end)` as the `time_or_interval` parameter. Both `start` and `end` must be [TimeScalars](#time-scalars), which can be `datetime` objects or strings in ISO 8601 format.
```python Python
interval = ("2017-01-01", "2023-01-01")
data = collection.load(interval, show_progress=True)
```
```go Go
startDate := time.Date(2017, time.January, 1, 0, 0, 0, 0, time.UTC)
endDate := time.Date(2023, time.January, 1, 0, 0, 0, 0, time.UTC)
interval := query.NewTimeInterval(startDate, endDate)
var datapoints []*v1.Sentinel1Sar
err = client.Datapoints.QueryInto(ctx,
[]uuid.UUID{collection.ID},
&datapoints,
datasets.WithTemporalExtent(interval),
)
if err != nil {
log.Fatalf("Failed to query datapoints: %v", err)
}
log.Printf("Queried %d datapoints", len(datapoints))
```
Output
```plaintext Python
Size: 725MB
Dimensions: (time: 1109597, latlon: 2)
Coordinates:
ingestion_time (time) datetime64[ns] 9MB 2024-06-21T11:03:33.8524...
id (time)
```
The `show_progress` parameter is optional and can be used to display a [tqdm](https://tqdm.github.io/) progress bar while loading data.
A time interval specified as a tuple is interpreted as a half-closed interval. This means the start time is inclusive, and the end time is exclusive.
For instance, using an end time of `2023-01-01` includes data points up to `2022-12-31 23:59:59.999`, but excludes those from `2023-01-01 00:00:00.000`.
This behavior mimics the Python `range` function and is useful for chaining time intervals.
```python Python
import xarray as xr
data = []
for year in [2017, 2018, 2019, 2020, 2021, 2022]:
interval = (f"{year}-01-01", f"{year + 1}-01-01")
data.append(collection.load(interval, show_progress=True))
# Concatenate the data into a single dataset, which is equivalent
# to the result of the single request in the code example above.
data = xr.concat(data, dim="time")
```
```go Go
var datapoints []*v1.Sentinel1Sar
for year := 2017; year <= 2022; year++ {
startDate := time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC)
interval := query.NewTimeInterval(startDate, startDate.AddDate(1, 0, 0))
var partialDatapoints []*v1.Sentinel1Sar
err = client.Datapoints.QueryInto(ctx,
[]uuid.UUID{collection.ID},
&partialDatapoints,
datasets.WithTemporalExtent(interval),
)
if err != nil {
log.Fatalf("Failed to query datapoints: %v", err)
}
// Concatenate the data into a single dataset, which is equivalent
// to the result of the single request in the code example above.
datapoints = append(datapoints, partialDatapoints...)
}
```
The above example demonstrates how to split a large time interval into smaller chunks and load the data in separate requests. Typically, this is not necessary, as the datasets client automatically paginates large intervals.
### Endpoint inclusivity
For greater control over inclusivity of start and end times, you can use the `TimeInterval` dataclass instead of a tuple of two [TimeScalars](#time-scalars). This class allows you to specify the `start` and `end` times, as well as their inclusivity. Here's an example of creating equivalent `TimeInterval` objects in two different ways.
```python Python
from datetime import datetime
from tilebox.datasets.data import TimeInterval
interval1 = TimeInterval(
datetime(2017, 1, 1), datetime(2023, 1, 1),
end_inclusive=False
)
interval2 = TimeInterval(
# python datetime granularity is in microseconds
datetime(2017, 1, 1), datetime(2022, 12, 31, 23, 59, 59, 999999),
end_inclusive=True
)
print("Inclusivity is indicated by interval notation: ( and [")
print(interval1)
print(interval2)
print(f"They are equivalent: {interval1 == interval2}")
print(interval2.to_half_open())
# Query data for a time interval
data = collection.load(interval1, show_progress=True)
```
```go Go
interval1 := query.TimeInterval{
Start: time.Date(2017, time.January, 1, 0, 0, 0, 0, time.UTC),
End: time.Date(2023, time.January, 1, 0, 0, 0, 0, time.UTC),
EndInclusive: false,
}
interval2 := query.TimeInterval{
Start: time.Date(2017, time.January, 1, 0, 0, 0, 0, time.UTC),
End: time.Date(2022, time.December, 31, 23, 59, 59, 999999999, time.UTC),
EndInclusive: true,
}
log.Println("Inclusivity is indicated by interval notation: ( and [")
log.Println(interval1.String())
log.Println(interval2.String())
log.Println("They are equivalent:", interval1.Equal(&interval2))
log.Println(interval2.ToHalfOpen().String())
// Query data for a time interval
var datapoints []*v1.Sentinel1Sar
err = client.Datapoints.QueryInto(ctx,
[]uuid.UUID{collection.ID},
&datapoints,
datasets.WithTemporalExtent(interval1),
)
```
```plaintext Output
Inclusivity is indicated by interval notation: ( and [
[2017-01-01T00:00:00.000 UTC, 2023-01-01T00:00:00.000 UTC)
[2017-01-01T00:00:00.000 UTC, 2022-12-31T23:59:59.999 UTC]
They are equivalent: True
[2017-01-01T00:00:00.000 UTC, 2023-01-01T00:00:00.000 UTC)
```
### Time scalars
You can load all datapoints linked to a specific timestamp by specifying a `TimeScalar` as the time query argument. A `TimeScalar` can be a `datetime` object or a string in ISO 8601 format. When passed to the `load` method, it retrieves all data points matching exactly that specified time, with millisecond precision.
A collection may contain multiple datapoints for one millisecond, so multiple data points could still be returned. If you want to fetch only a single data point, [query the collection by id](#loading-a-data-point-by-id) instead.
Here's how to load a data point at a specific millisecond from a [collection](/datasets/concepts/collections).
```python Python
data = collection.load("2024-08-01 00:00:01.362")
print(data)
```
```go Go
temporalExtent := query.NewPointInTime(time.Date(2024, time.August, 1, 0, 0, 1, 362000000, time.UTC))
var datapoints []*v1.Sentinel1Sar
err = client.Datapoints.QueryInto(ctx,
[]uuid.UUID{collection.ID}, &datapoints,
datasets.WithTemporalExtent(temporalExtent),
)
log.Printf("Queried %d datapoints", len(datapoints))
log.Printf("First datapoint time: %s", datapoints[0].GetTime().AsTime())
```
Output
```plaintext Python
Size: 721B
Dimensions: (time: 1, latlon: 2)
Coordinates:
ingestion_time (time) datetime64[ns] 8B 2024-08-01T08:53:08.450499
id (time)
```
Tilebox uses millisecond precision for timestamps. To load all data points for a specific second, use a [time interval](/datasets/query#time-interval) request instead. Refer to the examples below for details.
The output of the `load` method is an `xarray.Dataset` object. To learn more about Xarray, visit the dedicated [Xarray page](/sdks/python/xarray).
### Time iterables (Python only)
You can specify a time interval by using an iterable of `TimeScalar`s as the `time_or_interval` parameter. This is especially useful when you want to use the output of a previous `load` call as input for another load. Here's how that works.
```python Python
interval = ("2017-01-01", "2023-01-01")
meta_data = collection.load(interval, skip_data=True)
first_50_data_points = collection.load(meta_data.time[:50], skip_data=False)
print(first_50_data_points)
```
```plaintext Output
Size: 33kB
Dimensions: (time: 50, latlon: 2)
Coordinates:
ingestion_time (time) datetime64[ns] 400B 2024-06-21T11:03:33.852...
id (time)
```
### Time zone handling
`TimeScalar`s specified with an explicit time zone are also supported; they are converted to UTC for querying, and all timestamps returned by the API are in UTC.
```python Python
from datetime import datetime
import pytz
# Tokyo has a UTC+9 hours offset, so this is the same as
# 2017-01-01 02:45:25.679 UTC
tokyo_time = pytz.timezone('Asia/Tokyo').localize(
datetime(2017, 1, 1, 11, 45, 25, 679000)
)
print(tokyo_time)
data = collection.load(tokyo_time)
print(data)
```
```go Go
// Tokyo has a UTC+9 hours offset, so this is the same as
// 2017-01-01 02:45:25.679 UTC
location, _ := time.LoadLocation("Asia/Tokyo")
tokyoTime := query.NewPointInTime(time.Date(2017, 1, 1, 11, 45, 25, 679000000, location))
log.Println(tokyoTime)
var datapoints []*v1.Sentinel1Sar
err = client.Datapoints.QueryInto(ctx,
[]uuid.UUID{collection.ID}, &datapoints,
datasets.WithTemporalExtent(tokyoTime),
)
if err != nil {
log.Fatalf("Failed to query datapoints: %v", err)
}
log.Printf("Queried %d datapoints", len(datapoints))
// time is in UTC since API always returns UTC timestamps
log.Printf("First datapoint time: %s", datapoints[0].GetTime().AsTime())
```
Output
```plaintext Python
2017-01-01 11:45:25.679000+09:00
Size: 725B
Dimensions: (time: 1, latlon: 2)
Coordinates:
ingestion_time (time) datetime64[ns] 8B 2024-06-21T11:03:33.852435
id (time)
```
## Filtering by area of interest
[Spatio-temporal datasets](/datasets/types/spatiotemporal) also come with spatial filtering capabilities. When querying, you can specify a time interval and additionally a bounding box or a polygon as an area of interest to filter by.
Here is how to query Sentinel-2 `S2A_S2MSI2A` data over Colorado for April 2025.
```python Python
from shapely import MultiPolygon
from tilebox.datasets import Client
area = MultiPolygon(
[
(((-109.10, 40.98), (-102.01, 40.95), (-102.06, 36.82), (-109.06, 36.96), (-109.10, 40.98)),),
]
)
client = Client()
datasets = client.datasets()
sentinel2_msi = datasets.open_data.copernicus.sentinel2_msi
data = sentinel2_msi.collection("S2A_S2MSI2A").query(
temporal_extent=("2025-04-01", "2025-05-02"),
spatial_extent=area,
show_progress=True,
)
```
```go Go
ctx := context.Background()
client := datasets.NewClient()
dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel2_msi")
if err != nil {
log.Fatalf("Failed to get dataset: %v", err)
}
collection, err := client.Collections.Get(ctx, dataset.ID, "S2A_S2MSI2A")
if err != nil {
log.Fatalf("Failed to get collection: %v", err)
}
startDate := time.Date(2025, 4, 1, 0, 0, 0, 0, time.UTC)
endDate := time.Date(2025, 5, 2, 0, 0, 0, 0, time.UTC)
timeInterval := query.NewTimeInterval(startDate, endDate)
area := orb.MultiPolygon{
{
{{-109.10, 40.98}, {-102.01, 40.95}, {-102.06, 36.82}, {-109.06, 36.96}, {-109.10, 40.98}},
},
}
var datapoints []*v1.Sentinel2Msi
err = client.Datapoints.QueryInto(ctx,
[]uuid.UUID{collection.ID}, &datapoints,
datasets.WithTemporalExtent(timeInterval),
datasets.WithSpatialExtent(area),
)
if err != nil {
log.Fatalf("Failed to query datapoints: %v", err)
}
```
## Fetching only metadata
Sometimes, it may be useful to load only dataset metadata fields without the actual data fields. This can be done by setting the `skip_data` parameter to `True`.
For example, when only checking if a datapoint exists, you may want to use `skip_data=True` to avoid loading the data fields.
If this flag is set, the response will only include the required fields for the given dataset type, but no custom data fields.
```python Python
data = collection.load("2024-08-01 00:00:01.362", skip_data=True)
print(data)
```
```go Go
temporalExtent := query.NewPointInTime(time.Date(2024, time.August, 1, 0, 0, 1, 362000000, time.UTC))
var datapoints []*v1.Sentinel1Sar
err = client.Datapoints.QueryInto(ctx,
[]uuid.UUID{collection.ID}, &datapoints,
datasets.WithTemporalExtent(temporalExtent),
datasets.WithSkipData(),
)
if err != nil {
log.Fatalf("Failed to query datapoints: %v", err)
}
log.Printf("Queried %d datapoints", len(datapoints))
log.Printf("First datapoint time: %s", datapoints[0].GetTime().AsTime())
```
Output
```plaintext Python
Size: 160B
Dimensions: (time: 1)
Coordinates:
ingestion_time (time) datetime64[ns] 8B 2024-08-01T08:53:08.450499
id (time)
```
## Empty response
The `load` method always returns an `xarray.Dataset` object, even if there are no data points for the specified query. In such cases, the returned dataset will be empty, but no error will be raised.
```python Python
time_with_no_data_points = "1997-02-06 10:21:00"
data = collection.load(time_with_no_data_points)
print(data)
```
```go Go
timeWithNoDatapoints := query.NewPointInTime(time.Date(1997, time.February, 6, 10, 21, 0, 0, time.UTC))
var datapoints []*v1.Sentinel1Sar
err = client.Datapoints.QueryInto(ctx,
[]uuid.UUID{collection.ID}, &datapoints,
datasets.WithTemporalExtent(timeWithNoDatapoints),
)
if err != nil {
log.Fatalf("Failed to query datapoints: %v", err)
}
log.Printf("Queried %d datapoints", len(datapoints))
```
Output
```plaintext Python
Size: 0B
Dimensions: ()
Data variables:
*empty*
```
```plaintext Go
Queried 0 datapoints
```
## By datapoint ID
If you know the ID of the data point you want to load, you can use [find](/api-reference/python/tilebox.datasets/Collection.find).
This method always returns a single data point or raises an exception if no data point with the specified ID exists.
```python Python
datapoint_id = "01916d89-ba23-64c9-e383-3152644bcbde"
datapoint = collection.find(datapoint_id)
print(datapoint)
```
```go Go
datapointID := uuid.MustParse("01910b3c-8552-424d-e116-81d0c3402ccc")
var datapoint v1.Sentinel1Sar
err = client.Datapoints.GetInto(ctx,
[]uuid.UUID{collection.ID}, datapointID, &datapoint,
)
if err != nil {
log.Fatalf("Failed to query datapoint: %v", err)
}
fmt.Println(protojson.Format(&datapoint))
```
Output
```plaintext Python
Size: 725B
Dimensions: (latlon: 2)
Coordinates:
ingestion_time datetime64[ns] 8B 2024-08-20T05:53:08.600528
id
```
You can also set the `skip_data` parameter when calling `find` to query only the required fields of the data point, same as for `load`.
## Automatic pagination
Querying large time intervals can return a large number of data points.
Tilebox automatically handles pagination for you by sending paginated requests to the server.
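In Python, the optional `show_progress` flag displays a progress bar while those paginated requests are being made; a small sketch, assuming a collection object as in the examples above:
```python Python
# pagination happens automatically; the progress bar simply visualizes it
data = collection.load(("2017-01-01", "2023-01-01"), show_progress=True)
```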
# Spatio-temporal
Source: https://docs.tilebox.com/datasets/types/spatiotemporal
Spatio-temporal datasets link each data point to a specific point in time and a location on the Earth's surface.
Each spatio-temporal dataset comes with a set of required and auto-generated fields for each data point.
## Required fields
While the specific data fields between different spatio-temporal datasets can vary, there are common fields that all spatio-temporal datasets share.
The timestamp associated with each data point. Timestamps are always in UTC.
For indexing and querying, Tilebox truncates timestamps to millisecond precision. But datasets may contain arbitrary custom `Timestamp` fields that store timestamps up to nanosecond precision.
A location on the earth's surface associated with each data point. Supported geometry types are `Polygon`, `MultiPolygon`, `Point` and `MultiPoint`.
## Auto-generated fields
A [universally unique identifier (UUID)](https://en.wikipedia.org/wiki/Universally_unique_identifier) that uniquely identifies each data point. IDs are generated so that sorting them lexicographically also sorts them by time.
IDs generated by Tilebox are deterministic, meaning that ingesting the exact same data values into the same collection will always result in the same ID.
The time the data point was ingested into the Tilebox API.
## Creating a spatio-temporal dataset
To create a spatio-temporal dataset, use the [Tilebox Console](/console) and select `Spatio-temporal Dataset` as the dataset type. The required and auto-generated fields
already outlined will be automatically added to the dataset schema.
## Spatio-temporal queries
Spatio-temporal datasets support efficient time-based and spatially filtered queries. To query a specific location in a given time interval,
specify a time range and a geometry when [querying data points](/datasets/query) from a collection.
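For example, a minimal Python sketch mirroring the query examples earlier in this documentation; it assumes that a single `shapely` Polygon is accepted as the spatial extent (the earlier example uses a MultiPolygon):
```python Python
from shapely import Polygon
from tilebox.datasets import Client

# rough polygon around Colorado, in (longitude, latitude) order
area = Polygon([
    (-109.10, 40.98), (-102.01, 40.95), (-102.06, 36.82), (-109.06, 36.96), (-109.10, 40.98),
])

client = Client()
collection = client.dataset("open_data.copernicus.sentinel2_msi").collection("S2A_S2MSI2A")
data = collection.query(
    temporal_extent=("2025-04-01", "2025-05-02"),
    spatial_extent=area,
    show_progress=True,
)
```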
# Timeseries
Source: https://docs.tilebox.com/datasets/types/timeseries
Timeseries datasets link each data point to a specific point in time.
Each timeseries dataset comes with a set of required and auto-generated fields for each data point.
## Required fields
While the specific data fields between different time series datasets can vary, there are common fields that all time series datasets share.
The timestamp associated with each data point. Timestamps are always in UTC.
For indexing and querying, Tilebox truncates timestamps to millisecond precision. But Timeseries datasets may contain arbitrary custom `Timestamp` fields that store timestamps up to nanosecond precision.
## Auto-generated fields
A [universally unique identifier (UUID)](https://en.wikipedia.org/wiki/Universally_unique_identifier) that uniquely identifies each data point. IDs are generated so that sorting them lexicographically also sorts them by time.
IDs generated by Tilebox are deterministic, meaning that ingesting the exact same data values into the same collection will always result in the same ID.
The time the data point was ingested into the Tilebox API.
## Creating a timeseries dataset
To create a timeseries dataset, use the [Tilebox Console](/console) and select `Timeseries Dataset` as the dataset type. The required and auto-generated fields
already outlined will be automatically added to the dataset schema.
## Time-based queries
Timeseries datasets support time-based queries. To query a specific time interval, specify a time range when [querying data](/datasets/query) from a collection.
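For example, a minimal Python sketch using a Sentinel-1 collection that appears elsewhere in this documentation:
```python Python
from tilebox.datasets import Client

client = Client()
collection = client.dataset("open_data.copernicus.sentinel1_sar").collection("S1A_IW_RAW__0S")

# query all data points recorded in January 2024
data = collection.query(temporal_extent=("2024-01-01", "2024-02-01"))
```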
# Creating a dataset
Source: https://docs.tilebox.com/guides/datasets/create
Learn how to create a dataset
This page guides you through the process of creating a dataset in Tilebox using the [Tilebox Console](/console).
## Related documentation
Learn about Tilebox datasets and how to use them.
Learn about Timeseries datasets, which link each data point to a specific point in time.
## Creating a dataset in the Console
Create a dataset in Tilebox by going to [My datasets](https://console.tilebox.com/datasets/my-datasets) and clicking the `Create dataset` button.
Choose a [dataset kind](/datasets/concepts/datasets#dataset-types) from the dropdown menu. Required fields for the selected dataset kind are automatically added.
Complete these fields:
* `Name` is the name of your dataset.
* `Code name` is a unique identifier for the dataset within your team. It's automatically generated, but you can adjust it if needed.
* `Description` is a brief description of the dataset purpose.
Specify the fields for your dataset.
Each field has these properties:
* `Name` is the name of the field (it should be `snake_case`).
* `Type` and `Array` let you specify the field data type and whether it's an array. See below for an explanation of the available data types.
* `Description` is an optional brief description of the field. You can use it to provide more context and details about the data.
* `Example value` is an optional example for this field. It can be useful for documentation purposes.
Once you're done completing the fields, click `Create` to create and save the dataset. You'll be redirected to your newly created dataset.
## Automatic dataset schema documentation
When you specify the fields for your dataset, including the data type, description, and an example value for each one, Tilebox
automatically generates a documentation page for your dataset schema.
## Adding extra documentation
You can also add custom documentation to your dataset, providing more context and details about the included data.
This documentation supports rich formatting, including links, tables, code snippets, and more.
To add documentation, click the edit pencil button on the dataset page to open the documentation editor.
The documentation editor uses Markdown, so you can include links, tables, code snippets, and other formatting features.
If you don't see the edit pencil button, you don't have the required permissions to edit the
documentation.
Once you are done editing the documentation, click the `Save` button to save your changes.
## Changing the dataset schema
You can always add new fields to a dataset.
If you want to remove or edit existing fields, you'll first need to empty all collections in the dataset.
Then, you can edit the dataset schema in the console.
# Ingesting data
Source: https://docs.tilebox.com/guides/datasets/ingest
Learn how to ingest an existing dataset into Tilebox
This guide is also available as a Google Colab notebook. Click here for an interactive version.
This page guides you through the process of ingesting data into a Tilebox dataset. Starting from an existing
dataset available as a file in the [GeoParquet](https://geoparquet.org/) format, you'll go through the process of
ingesting that data into Tilebox as a [Timeseries](/datasets/types/timeseries) dataset.
If you have your data in a different format, check out the [Ingesting from common file formats](/guides/datasets/ingest-format) examples of how to ingest it.
## Related documentation
Learn about Tilebox datasets and how to use them.
Learn how to ingest data into a Tilebox dataset.
## Downloading the example dataset
The dataset used in this example is available as a [GeoParquet](https://geoparquet.org/) file. You can download it
from here: [modis\_MCD12Q1.geoparquet](https://storage.googleapis.com/tbx-web-assets-2bad228/docs/data-samples/modis_MCD12Q1.geoparquet).
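If you prefer to download the file programmatically, a short snippet using the Python standard library works as well.
```python Python
# Download the example GeoParquet file (any other download method works just as well)
import urllib.request

url = "https://storage.googleapis.com/tbx-web-assets-2bad228/docs/data-samples/modis_MCD12Q1.geoparquet"
urllib.request.urlretrieve(url, "modis_MCD12Q1.geoparquet")
```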
## Installing the necessary packages
This example uses a couple of Python packages for reading parquet files and for visualizing the dataset. Install the
required packages using your preferred package manager. For new projects, Tilebox recommends using [uv](https://docs.astral.sh/uv/).
```bash uv
uv add tilebox-datasets geopandas folium matplotlib mapclassify
```
```bash pip
pip install tilebox-datasets geopandas folium matplotlib mapclassify
```
```bash poetry
poetry add tilebox-datasets="*" geopandas="*" folium="*" matplotlib="*" mapclassify="*"
```
```bash pipenv
pipenv install tilebox-datasets geopandas folium matplotlib mapclassify
```
## Reading and previewing the dataset
The dataset is available as a [GeoParquet](https://geoparquet.org/) file. You can read it using the `geopandas.read_parquet` function.
```python Python
import geopandas as gpd
modis_data = gpd.read_parquet("modis_MCD12Q1.geoparquet")
modis_data.head(5)
```
```plaintext Output
time end_time granule_name geometry horizontal_tile_number vertical_tile_number tile_id file_size checksum checksum_type day_night_flag browse_granule_id published_at
0 2001-01-01 00:00:00+00:00 2001-12-31 23:59:59+00:00 MCD12Q1.A2001001.h00v08.061.2022146024956.hdf POLYGON ((-180 10, -180 0, -170 0, -172.62252 ... 0 8 51000008 275957 941243048 CKSUM Day None 2022-06-23 10:54:43.824000+00:00
1 2001-01-01 00:00:00+00:00 2001-12-31 23:59:59+00:00 MCD12Q1.A2001001.h00v09.061.2022146024922.hdf POLYGON ((-180 0, -180 -10, -172.62252 -10, -1... 0 9 51000009 285389 3014510714 CKSUM Day None 2022-06-23 10:54:44.697000+00:00
2 2001-01-01 00:00:00+00:00 2001-12-31 23:59:59+00:00 MCD12Q1.A2001001.h00v10.061.2022146032851.hdf POLYGON ((-180 -10, -180 -20, -180 -20, -172.6... 0 10 51000010 358728 2908215698 CKSUM Day None 2022-06-23 10:54:44.669000+00:00
3 2001-01-01 00:00:00+00:00 2001-12-31 23:59:59+00:00 MCD12Q1.A2001001.h01v08.061.2022146025203.hdf POLYGON ((-172.62252 10, -170 0, -160 0, -162.... 1 8 51001008 146979 1397661843 CKSUM Day None 2022-06-23 10:54:44.309000+00:00
4 2001-01-01 00:00:00+00:00 2001-12-31 23:59:59+00:00 MCD12Q1.A2001001.h01v09.061.2022146025902.hdf POLYGON ((-170 0, -172.62252 -10, -162.46826 -... 1 9 51001009 148935 2314263965 CKSUM Day None 2022-06-23 10:54:44.023000+00:00
```
## Exploring it visually
GeoPandas comes with a built-in explorer for visually exploring the dataset.
```python Python
modis_data.head(1000).explore(width=800, height=600)
```
## Create a Tilebox dataset
Now you'll create a [Timeseries](/datasets/types/timeseries) dataset with the same schema as the given MODIS dataset.
To do so, you'll use the [Tilebox Console](/console), navigate to `My Datasets` and click `Create Dataset`. Then select
`Timeseries Dataset` as the dataset type.
For a step-by-step walkthrough, check out the [Creating a dataset](/guides/datasets/create) guide.
Now, to match the given MODIS dataset, you'll specify the following fields:
| Field | Type | Note |
| ------------------------ | --------- | ---------------------------------------- |
| granule\_name | string | MODIS granule name |
| geometry | Geometry | Tile boundary coordinates of the granule |
| end\_time | Timestamp | Measurement end time |
| horizontal\_tile\_number | int64 | Horizontal modis tile number (0-35) |
| vertical\_tile\_number | int64 | Vertical modis tile number (0-17) |
| tile\_id | int64 | Modis Tile ID |
| file\_size | uint64 | File size of the product in bytes |
| checksum | string | Hash checksum of the file |
| checksum\_type | string | Checksum algorithm (MD5 / CKSUM) |
| day\_night\_flag | int64 | Day / Night / Both |
| browse\_granule\_id | string | Optional granule ID for browsing |
| published\_at | Timestamp | The time the product was published |
In the console, this will look like the following:
## Access the dataset from Python
Your newly created dataset is now available. You can access it from Python. For this, you'll need to know the dataset slug,
which was assigned automatically based on the specified `code_name`. To find out the slug, navigate to the dataset overview
in the console.
You can now instantiate the dataset client and access the dataset.
```python Python
from tilebox.datasets import Client
client = Client()
dataset = client.dataset("tilebox.modis") # replace with your dataset slug
```
## Create a collection
Next, you'll create a collection to insert your data into.
```python Python
collection = dataset.get_or_create_collection("MCD12Q1")
```
## Ingest the data
Now, you'll finally ingest the MODIS data into the collection.
```python Python
datapoint_ids = collection.ingest(modis_data)
print(f"Successfully ingested {len(datapoint_ids)} datapoints!")
```
```plaintext Output
Successfully ingested 7245 datapoints!
```
## Query the newly ingested data
You can now query the newly ingested data, for example a subset for a specific time range.
Since the data is now stored directly in the Tilebox dataset, you can query and access it from anywhere.
```python Python
data = collection.load(("2015-01-01", "2020-01-01"))
data
```
```plaintext Output
Size: 403kB
Dimensions: (time: 1575)
Coordinates:
* time (time) datetime64[ns] 13kB 2015-01-01 ... 2019-01-01
Data variables: (12/14)
id (time)
For more information on accessing and querying data, check out [querying data](/datasets/query).
## View the data in the console
You can also view your data in the Console by navigating to the dataset, selecting the collection, and then clicking
on one of the data points.
## Next steps
Congrats. You've successfully ingested data into Tilebox. You can now explore the data in the console and use it for
further processing and analysis.
Learn all about [querying your newly created dataset](https://docs.tilebox.com/datasets/query)
Explore the different dataset types available in Tilebox
Check out a growing number of publicly available open data datasets on Tilebox
# Ingesting from common file formats
Source: https://docs.tilebox.com/guides/datasets/ingest-format
Learn how to ingest data from common file formats into Tilebox
Using `xarray` and `pandas`, you can also easily ingest existing datasets available in file
formats, such as CSV, [Parquet](https://parquet.apache.org/), [Feather](https://arrow.apache.org/docs/python/feather.html) and more.
## CSV
Comma-separated values (CSV) is a common file format for tabular data. It's widely used in data science. Tilebox
supports CSV ingestion using the `pandas.read_csv` function.
Assume you have a CSV file named `ingestion_data.csv` with the following content. If you want to follow along, you can
download the file [here](https://storage.googleapis.com/tbx-web-assets-2bad228/docs/data-samples/ingestion_data.csv).
```csv ingestion_data.csv
time,value,sensor,precise_time,sensor_history,some_unwanted_column
2025-03-28T11:44:23Z,45.16,A,2025-03-28T11:44:23.345761444Z,"[-12.15, 13.45, -8.2, 16.5, 45.16]","Unsupported"
2025-03-28T11:45:19Z,273.15,B,2025-03-28T11:45:19.128742312Z,"[300.16, 280.12, 273.15]","Unsupported"
```
This data already conforms to the schema of the `MyCustomDataset` dataset, except for `some_unwanted_column` which
you want to drop before you ingest it. Here is how that could look:
```python Python
import pandas as pd
data = pd.read_csv("ingestion_data.csv")
data = data.drop(columns=["some_unwanted_column"])
collection = dataset.get_or_create_collection("CSVMeasurements")
collection.ingest(data)
```
## Parquet
[Apache Parquet](https://parquet.apache.org/) is an open source, column-oriented data file format designed for efficient data storage and retrieval.
Tilebox supports Parquet ingestion using the `pandas.read_parquet` function.
The parquet file used in this example [is available here](https://storage.googleapis.com/tbx-web-assets-2bad228/docs/data-samples/ingestion_data.parquet).
```python Python
import pandas as pd
data = pd.read_parquet("ingestion_data.parquet")
# our data already conforms to the schema of the MyCustomDataset
# dataset, so let's ingest it
collection = dataset.get_or_create_collection("ParquetMeasurements")
collection.ingest(data)
```
## Feather
[Feather](https://arrow.apache.org/docs/python/feather.html) is a file format originating from the Apache Arrow project,
designed for storing tabular data in a fast and memory-efficient way. It's supported by many programming languages,
including Python. Tilebox supports Feather ingestion using the `pandas.read_feather` function.
The feather file used in this example [is available here](https://storage.googleapis.com/tbx-web-assets-2bad228/docs/data-samples/ingestion_data.feather).
```python Python
import pandas as pd
data = pd.read_feather("ingestion_data.feather")
# our data already conforms to the schema of the MyCustomDataset
# dataset, so let's ingest it
collection = dataset.get_or_create_collection("FeatherMeasurements")
collection.ingest(data)
```
## GeoParquet
Please check out the [Ingesting data](/guides/datasets/ingest) guide for an example of ingesting a GeoParquet file.
# Multi-language Workflows
Source: https://docs.tilebox.com/guides/workflows/multi-language
Learn how to create workflows that use tasks written in different languages
The code for this guide is available on GitHub.
## Tilebox languages and SDKs
Tilebox supports multiple languages and SDKs for running workflows.
All Tilebox SDKs and workflows are designed to be interoperable, which means it's possible to have a workflow where individual tasks are executed in different languages.
Check out [Languages & SDKs](/sdks/introduction) to learn more about currently available programming SDKs.
## Why multi-language workflows?
You might need to use multiple languages in a single workflow for many reasons, such as:
* You want to use a language that is better suited for a specific task (for example Python for data processing, Go for a backend API)
* You want to use a library that is only available in a specific language (for example xarray in Python)
* You started prototyping in Python, but need to start migrating the compute-intensive parts of your workflow to a different language for performance reasons
## Multi-language workflow example
This guide will tackle the first use case: you have a tasking server in Go and want to offload some of the processing to Python.
## Defining tasks in Python and Go
```python Python
from tilebox.workflows import Task, ExecutionContext

class ScheduleImageCapture(Task):
# The input parameters must match the ones defined in the Go task
location: tuple[float, float] # lat_lon
resolution_m: int
spectral_bands: list[float] # spectral bands in nm
def execute(self, context: ExecutionContext) -> None:
# Here you can implement your task logic, submit subtasks, etc.
print(f"Image captured for {self.location} with {self.resolution_m}m resolution and bands {self.spectral_bands}")
@staticmethod
def identifier() -> tuple[str, str]:
# The identifier must match the one defined in the Go task
return "tilebox.com/schedule_image_capture", "v1.0"
```
```go Go
type ScheduleImageCapture struct {
// json tags must match the Python task definition
Location [2]float64 `json:"location"` // lat_lon
ResolutionM int `json:"resolution_m"`
SpectralBands []float64 `json:"spectral_bands"` // spectral bands in nm
}
// No need to define the Execute method since we're only submitting the task
// Identifier must match with the task identifier in the Python runner
func (t *ScheduleImageCapture) Identifier() workflows.TaskIdentifier {
return workflows.NewTaskIdentifier("tilebox.com/schedule_image_capture", "v1.0")
}
```
A couple of important points to note:
The dataclass parameters in Python must match the struct fields in Go, including the types and the names (through the JSON tags in Go).
Because Go and Python use different naming conventions, it's recommended to use JSON tags in the Go struct to map the Go field names to the Python dataclass field names while respecting each language's conventions.
Go fields must start with an uppercase letter to be serialized to JSON.
The JSON tags in the preceding Go code are currently necessary, but this requirement might be removed in the future.
The execute method is defined in the Python task but not in the Go task, since Go is only used to submit the task, not execute it.
It's necessary to define the `identifier` method in both the Python and Go tasks and to make sure they match.
The `identifier` method returns two values: an arbitrary unique task identifier and a version in the `v{major}.{minor}` format.
## Creating a Go server that submits jobs
Write a simple HTTP tasking server in Go with a `/submit` endpoint that accepts requests to submit a `ScheduleImageCapture` job.
Both the Go and Python code use `test-cluster-tZD9Ca2qsqt4V` as the cluster slug. You should replace it with your own cluster slug, which you can create in the [Tilebox Console](https://console.tilebox.com/workflows/clusters).
```go Go
func main() {
log.Println("Server starting on http://localhost:8080")
client := workflows.NewClient()
http.HandleFunc("/submit", submitHandler(client))
log.Fatal(http.ListenAndServe(":8080", nil))
}
// Submit a job based on some query parameters
func submitHandler(client *workflows.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
latArg := r.URL.Query().Get("lat")
lonArg := r.URL.Query().Get("lon")
resolutionArg := r.URL.Query().Get("resolution")
bandsArg := r.URL.Query().Get("bands[]")
latFloat, err := strconv.ParseFloat(latArg, 64)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
lonFloat, err := strconv.ParseFloat(lonArg, 64)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
resolutionM, err := strconv.Atoi(resolutionArg)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var spectralBands []float64
for _, bandArg := range strings.Split(bandsArg, ",") {
band, err := strconv.ParseFloat(bandArg, 64)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
spectralBands = append(spectralBands, band)
}
job, err := client.Jobs.Submit(r.Context(), "Schedule Image capture",
[]workflows.Task{
&ScheduleImageCapture{
Location: [2]float64{latFloat, lonFloat},
ResolutionM: resolutionM,
SpectralBands: spectralBands,
},
},
)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, _ = io.WriteString(w, fmt.Sprintf("Job submitted: %s\n", job.ID))
}
}
```
In the same way that you can submit jobs across languages, you can also submit subtasks across languages, as shown in the sketch below.
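As a rough sketch of what that could look like from the Python side, a task can submit a subtask that is implemented and executed by a Go runner by declaring a stub task whose identifier matches the Go implementation. The `ProcessInGo` stub and its identifier below are hypothetical; only the identifier and the input fields need to line up with the Go task.
```python Python
from tilebox.workflows import Task, ExecutionContext

class ProcessInGo(Task):
    # input fields must match the Go task definition (hypothetical example)
    granule_name: str

    def execute(self, context: ExecutionContext) -> None:
        # never executed by this runner; a Go runner registers the same identifier
        raise NotImplementedError

    @staticmethod
    def identifier() -> tuple[str, str]:
        # must match the identifier of the corresponding Go task
        return "tilebox.com/process_in_go", "v1.0"

class Orchestrate(Task):
    def execute(self, context: ExecutionContext) -> None:
        # the subtask is picked up by whichever runner registered its identifier
        context.submit_subtask(ProcessInGo(granule_name="example-granule"))
```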
## Creating a Python runner
Write a Python script that starts a task runner and registers the `ScheduleImageCapture` task.
```python Python
from tilebox.workflows import Client
def main():
client = Client()
runner = client.runner(tasks=[ScheduleImageCapture])
runner.run_forever()
if __name__ == "__main__":
main()
```
## Testing it
Start the Go server.
```bash Shell
go run .
```
In another terminal, start the Python runner.
```bash Shell
uv run runner.py
```
Submit a job to the Go server.
```bash Shell
curl -g "http://localhost:8080/submit?lat=40.75&lon=-73.98&resolution=30&bands[]=489.0,560.6,666.5"
```
Check the Python runner output; it should print the following line:
```plaintext Output
Image captured for [40.75, -73.98] with 30m resolution and bands [489, 560.6, 666.5]
```
## Next Steps
The code for this guide is available on GitHub.
As a learning exercise, you can try to change the [News API Workflow](/workflows/concepts/tasks#dependencies-example) to replace the `FetchNews` task with a Go task and keep all the other tasks in Python.
You'll learn how to submit a subtask in another language than what the current task is executed in.
# Tilebox
Source: https://docs.tilebox.com/introduction
The Solar System's #1 developer tool for space data management
Tilebox is lightweight space data management and orchestration software - on ground and in orbit. It provides a framework that simplifies access, processing, and distribution of space data across different environments, enabling efficient multi-language, multi-cluster workflows. Tilebox integrates seamlessly with your existing infrastructure, ensuring that you maintain complete control over your data and algorithms.
## Modules
Tilebox consists of two primary modules:
## Getting Started
To get started, check out some of the following resources:
## Guides
You can also start by looking through these guides:
Learn about time-series datasets and their structure.
Discover how to query and load data from a dataset.
Gain a deeper understanding of how to create tasks using the Tilebox Workflow Orchestrator.
Find out how to deploy Task Runners to run workflows in a parallel, distributed manner.
# Quickstart
Source: https://docs.tilebox.com/quickstart
This guide helps you set up and get started using Tilebox. It covers how to install a Tilebox client for your preferred language and how to use it to query data from a dataset and run a workflow.
Select your preferred language and follow the steps below to get started.
## Start in a Notebook
Explore the provided [Sample Notebooks](/sdks/python/sample-notebooks) to begin your journey with Tilebox. These notebooks offer a step-by-step guide to using the API and showcase many features supported by Tilebox Python clients. You can also use these notebooks as a foundation for your own projects.
## Start on Your Device
If you prefer to work locally, follow these steps to get started.
Install the Tilebox Python packages.
```bash uv
uv add tilebox-datasets tilebox-workflows tilebox-storage
```
```bash pip
pip install tilebox-datasets tilebox-workflows tilebox-storage
```
```bash poetry
poetry add tilebox-datasets="*" tilebox-workflows="*" tilebox-storage="*"
```
```bash pipenv
pipenv install tilebox-datasets tilebox-workflows tilebox-storage
```
For new projects we recommend using [uv](https://docs.astral.sh/uv/). More information about installing the Tilebox Python SDKs can be found in the [Installation](/sdks/python/install) section.
Create an API key by logging into the [Tilebox Console](https://console.tilebox.com), navigating to [Account -> API Keys](https://console.tilebox.com/account/api-keys), and clicking the "Create API Key" button.
Copy the API key and keep it somewhere safe. You will need it to authenticate your requests.
Use the datasets client to query data from a dataset.
```python Python
from tilebox.datasets import Client
client = Client(token="YOUR_TILEBOX_API_KEY")
# select a dataset
datasets = client.datasets()
dataset = datasets.open_data.copernicus.sentinel2_msi
# and load data from a collection in a given time range
collection = dataset.collection("S2A_S2MSI1C")
data_january_2022 = collection.load(("2022-01-01", "2022-02-01"))
```
Use the workflows client to create a task and submit it as a job.
```python Python
from tilebox.workflows import Client, Task
# Replace with your actual token
client = Client(token="YOUR_TILEBOX_API_KEY")
class HelloWorldTask(Task):
greeting: str = "Hello"
name: str = "World"
def execute(self, context):
print(f"{self.greeting} {self.name}, from the main task!")
context.submit_subtask(HelloSubtask(name=self.name))
class HelloSubtask(Task):
name: str
def execute(self, context):
print(f"Hello from the subtask, {self.name}!")
# Initiate the job
jobs = client.jobs()
jobs.submit("parameterized-hello-world", HelloWorldTask(greeting="Greetings", name="Universe"))
# Run the tasks
runner = client.runner(tasks=[HelloWorldTask, HelloSubtask])
runner.run_all()
```
Review the following guides to learn more about the modules that make up Tilebox:
Learn how to create a Timeseries dataset using the Tilebox Console.
Learn how to ingest an existing CSV dataset into a Timeseries dataset collection.
## Start with Examples
Explore the provided [Examples](/sdks/go/examples) to begin your journey with Tilebox. These examples offer a step-by-step guide to using the API and showcase many features supported by Tilebox Go clients. You can also use these examples as a foundation for your own projects.
## Start on Your Device
If you prefer to work locally, follow these steps to get started.
Add the Tilebox library in your project.
```bash Shell
go get github.com/tilebox/tilebox-go
```
Install the [tilebox-generate](https://github.com/tilebox/tilebox-generate) command-line tool on your machine.
It's used to generate Go structs for Tilebox datasets.
```bash Shell
go install github.com/tilebox/tilebox-generate@latest
```
Create an API key by logging into the [Tilebox Console](https://console.tilebox.com), navigating to [Account -> API Keys](https://console.tilebox.com/account/api-keys), and clicking the "Create API Key" button.
Copy the API key and keep it somewhere safe. You will need it to authenticate your requests.
Run [tilebox-generate](https://github.com/tilebox/tilebox-generate) in the root directory of your Go project.
This generates the dataset type for the Sentinel-2 MSI dataset and produces a `./protogen/tilebox/v1/sentinel2_msi.pb.go` file.
```bash Shell
tilebox-generate --dataset open_data.copernicus.sentinel2_msi --tilebox-api-key $TILEBOX_API_KEY
```
Use the datasets client to query data from a dataset.
```go Go
package main
import (
"context"
"log"
"log/slog"
"time"
"github.com/google/uuid"
"github.com/paulmach/orb"
"github.com/paulmach/orb/encoding/wkt"
"github.com/tilebox/tilebox-go/datasets/v1"
"github.com/tilebox/tilebox-go/query"
)
func main() {
ctx := context.Background()
client := datasets.NewClient()
// select a dataset
dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel2_msi")
if err != nil {
log.Fatalf("Failed to get dataset: %v", err)
}
// select a collection
collection, err := client.Collections.Get(ctx, dataset.ID, "S2A_S2MSI1C")
if err != nil {
log.Fatalf("Failed to get collection: %v", err)
}
// load data from a collection in a given time range and spatial extent
colorado := orb.Polygon{
{{-109.05, 37.09}, {-102.06, 37.09}, {-102.06, 41.59}, {-109.05, 41.59}, {-109.05, 37.09}},
}
startDate := time.Date(2025, time.March, 1, 0, 0, 0, 0, time.UTC)
endDate := time.Date(2025, time.April, 1, 0, 0, 0, 0, time.UTC)
march2025 := query.NewTimeInterval(startDate, endDate)
// You have to use tilebox-generate to generate the dataset type
var datapointsOverColorado []*v1.Sentinel2Msi
err = client.Datapoints.QueryInto(ctx,
[]uuid.UUID{collection.ID}, &datapointsOverColorado,
datasets.WithTemporalExtent(march2025),
datasets.WithSpatialExtent(colorado),
)
if err != nil {
log.Fatalf("Failed to query datapoints: %v", err)
}
slog.Info("Found datapoints over Colorado in March 2025", slog.Int("count", len(datapointsOverColorado)))
slog.Info("First datapoint over Colorado",
slog.String("id", datapointsOverColorado[0].GetId().AsUUID().String()),
slog.Time("event time", datapointsOverColorado[0].GetTime().AsTime()),
slog.Time("ingestion time", datapointsOverColorado[0].GetIngestionTime().AsTime()),
slog.String("geometry", wkt.MarshalString(datapointsOverColorado[0].GetGeometry().AsGeometry())),
slog.String("granule name", datapointsOverColorado[0].GetGranuleName()),
slog.String("processing level", datapointsOverColorado[0].GetProcessingLevel().String()),
slog.String("product type", datapointsOverColorado[0].GetProductType()),
// and so on...
)
}
```
Use the workflows client to create a task and submit it as a job.
```go Go
package main
import (
"context"
"log/slog"
"github.com/tilebox/tilebox-go/workflows/v1"
)
type HelloTask struct {
Greeting string
Name string
}
func (t *HelloTask) Execute(ctx context.Context) error {
slog.InfoContext(ctx, "Hello from the main task!", slog.String("Greeting", t.Greeting), slog.String("Name", t.Name))
err := workflows.SubmitSubtasks(ctx, &HelloSubtask{Name: t.Name})
if err != nil {
return err
}
return nil
}
type HelloSubtask struct {
Name string
}
func (t *HelloSubtask) Execute(context.Context) error {
slog.Info("Hello from the subtask!", slog.String("Name", t.Name))
return nil
}
func main() {
ctx := context.Background()
// Replace with your actual token
client := workflows.NewClient()
job, err := client.Jobs.Submit(ctx, "hello-world",
[]workflows.Task{
&HelloTask{
Greeting: "Greetings",
Name: "Tilebox",
},
},
)
if err != nil {
slog.ErrorContext(ctx, "Failed to submit job", slog.Any("error", err))
return
}
slog.InfoContext(ctx, "Job submitted", slog.String("job_id", job.ID.String()))
runner, err := client.NewTaskRunner(ctx)
if err != nil {
slog.Error("failed to create task runner", slog.Any("error", err))
return
}
err = runner.RegisterTasks(
&HelloTask{},
&HelloSubtask{},
)
if err != nil {
slog.Error("failed to register task", slog.Any("error", err))
return
}
runner.Run(ctx)
}
```
Review the following guides to learn more about the modules that make up Tilebox:
Learn how to create a Timeseries dataset using the Tilebox Console.
Learn how to ingest an existing CSV dataset into a Timeseries dataset collection.
# Examples
Source: https://docs.tilebox.com/sdks/go/examples
Examples maintained to use and learn from.
To quickly become familiar with the Go client, you can explore some standalone examples.
You can access the examples on [GitHub](https://github.com/tilebox/tilebox-go/tree/main/examples).
More examples can be found throughout the docs.
## Workflows examples
How to use Tilebox Workflows to submit and execute a simple task.
How to submit a task and run a workflow using protobuf messages.
How to set up tracing and logging for workflows using Axiom observability platform.
How to set up tracing and logging for workflows using OpenTelemetry.
## Datasets examples
How to query datapoints from a Tilebox dataset.
How to create a collection, ingest datapoints, and then delete them.
# Installation
Source: https://docs.tilebox.com/sdks/go/install
Install the Tilebox Go library
## Package Overview
Tilebox offers a Go SDK for accessing Tilebox services. It additionally includes a command-line tool (tilebox-generate) that can be installed separately.
Datasets and workflows client for Tilebox
Command-line tool to generate Tilebox datasets types for Go
## Installation
Add `tilebox-go` to your project.
```bash Shell
go get github.com/tilebox/tilebox-go
```
Install the `tilebox-generate` command-line tool on your machine.
```bash Shell
go install github.com/tilebox/tilebox-generate@latest
```
# Protobuf
Source: https://docs.tilebox.com/sdks/go/protobuf
Overview of protobuf, common use cases, and implementation details.
Tilebox uses [Protocol Buffers](https://protobuf.dev/), with a custom generation tool, combined with standard Go data structures.
[Protocol Buffers](https://protobuf.dev/) (often referred to as `protobuf`) is a schema definition language with an efficient binary format and native language support for lots of languages, including Go.
Protocol buffers have been open source since 2008 and are maintained by Google.
## tilebox-generate
Protobuf schemas are typically defined in a `.proto` file, and then converted to a native Go struct using the protobuf compiler.
Tilebox datasets also define a protobuf schema, and the generation of Go structs for existing datasets is automated through the `tilebox-generate` command-line tool.
See [Installation](/sdks/go/install) for more details on how to install `tilebox-generate`.
```sh
tilebox-generate --dataset open_data.copernicus.sentinel1_sar
```
The preceding command will generate a `./protogen/tilebox/v1/sentinel1_sar.pb.go` file. More flags can be set to change the default output folders, package name, etc.
This file contains everything needed to work with the [Sentinel-1 SAR](https://console.tilebox.com/datasets/explorer/e27e6a58-c149-4379-9fdf-9d43903cba74) dataset.
It's recommended to check the generated files you use into your version control system.
If you open this file, you will see that it starts with `// Code generated by protoc-gen-go. DO NOT EDIT.`.
It means that the file was generated by the `protoc-gen-go` tool, which is part of the protobuf compiler.
After editing a dataset, you can call the generate command again to ensure that the changes are reflected in the generated file.
The file contains a `Sentinel1Sar` struct, which is a Go struct that represents a datapoint in the dataset.
```go Go
type Sentinel1Sar struct {
xxx_hidden_GranuleName *string `protobuf:"bytes,1,opt,name=granule_name,json=granuleName"`
xxx_hidden_ProcessingLevel v1.ProcessingLevel `protobuf:"varint,2,opt,name=processing_level,json=processingLevel,enum=datasets.v1.ProcessingLevel"`
// more fields
}
```
Notice that the fields are private (starting with a lowercase letter), so they are not accessible.
Protobuf hides the fields and provides getters and setters to access them.
## Protobuf 101
### Initializing a message
Here is how to initialize a `v1.Sentinel1Sar` message.
```go Go
import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
datapoint := v1.Sentinel1Sar_builder{
Time: timestamppb.New(time.Now()),
GranuleName: proto.String("S1A_EW_GRDH_1SSH_20141004T020507_20141004T020611_002673_002FAF_8645_COG.SAFE"),
ProductType: proto.String("EW_GRDH_1S-COG"),
FileSize: proto.Int64(488383473),
}.Build()
```
Protobuf fields are private, so protobuf provides a builder pattern to create a message.
`proto.String` is a helper function that converts `string` to `*string`.
This allows protobuf to differentiate between a field that is set to an empty string and a field that is not set (nil).
An exhaustive list of those helper functions can be found [here](https://github.com/golang/protobuf/blob/master/proto/wrappers.go).
Only primitives have a `proto.XXX` helper function.
Complex types such as timestamps, durations, UUIDs, and geometries have a [constructor function](#constructors).
### Getters and setters
Protobuf provides methods to get, set, clear and check if a field is set.
```go Go
fmt.Println(datapoint.GetGranuleName())
datapoint.SetGranuleName("my amazing granule")
datapoint.ClearGranuleName()
if datapoint.HasGranuleName() {
fmt.Println("Granule name is set")
}
```
Getters for primitive types will return a Go native type (for example, int64, string, etc.).
Getters for complex types such as timestamps, durations, UUIDs, and geometries can also be converted to more standard types using [AsXXX](#asxxx-methods) methods.
## Well known types
Besides Go primitives, Tilebox supports some well known types:
* Duration: A duration of time. See [Duration](https://protobuf.dev/reference/protobuf/google.protobuf/#duration) for more information.
* Timestamp: A point in time. See [Timestamp](https://protobuf.dev/reference/protobuf/google.protobuf/#timestamp) for more information.
* UUID: A [universally unique identifier (UUID)](https://en.wikipedia.org/wiki/Universally_unique_identifier).
* Geometry: Geospatial geometries of type Point, LineString, Polygon or MultiPolygon.
They have a couple of useful methods to work with them.
### Constructors
```go Go
import (
"github.com/paulmach/orb"
datasetsv1 "github.com/tilebox/tilebox-go/protogen/go/datasets/v1"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
timestamppb.New(time.Now())
durationpb.New(10 * time.Second)
datasetsv1.NewUUID(uuid.New())
datasetsv1.NewGeometry(orb.Point{1, 2})
```
### `CheckValid` method
`CheckValid` returns an error if the field is invalid.
```go Go
err := datapoint.GetTime().CheckValid()
if err != nil {
fmt.Println(err)
}
```
### `IsValid` method
`IsValid` reports whether the field is valid. It's equivalent to `CheckValid == nil`.
```go Go
if datapoint.GetTime().IsValid() {
fmt.Println("Valid")
}
```
### `AsXXX` methods
`AsXXX` methods convert the field to a more user-friendly type.
* `AsUUID` will convert a `datasetsv1.UUID` field to a [uuid.UUID](https://pkg.go.dev/github.com/google/uuid#UUID) type
* `AsTime` will convert a `timestamppb.Timestamp` field to a [time.Time](https://pkg.go.dev/time#Time) type
* `AsDuration` will convert a `durationpb.Duration` field to a [time.Duration](https://pkg.go.dev/time#Duration) type
* `AsGeometry` will convert a `datasetsv1.Geometry` field to an [orb.Geometry](https://github.com/paulmach/orb?tab=readme-ov-file#shared-geometry-interface) interface
```go Go
datapoint.GetId().AsUUID() // uuid.UUID
datapoint.GetTime().AsTime() // time.Time
datapoint.GetDuration().AsDuration() // time.Duration
datapoint.GetGeometry().AsGeometry() // orb.Geometry
```
Those methods perform conversion on a best-effort basis. Type validity must be checked beforehand using the `IsValid` or `CheckValid` methods.
## Common data operations
Datapoints are contained in a standard Go slice, so all the usual [slice operations](https://gobyexample.com/slices) and [slice functions](https://pkg.go.dev/slices) can be used.
The usual pattern to iterate over data in Go is by using a `for` loop.
As an example, here is how to extract the `copernicus_id` fields from the datapoints.
```go Go
// assuming datapoints has been filled using `client.Datapoints.QueryInto` method
var datapoints []*v1.Sentinel1Sar
copernicusIDs := make([]uuid.UUID, len(datapoints))
for i, dp := range datapoints {
copernicusIDs[i] = dp.GetCopernicusId().AsUUID()
}
```
Here is an example of filtering out datapoints that were published before January 2000 or that are not from the Sentinel-1C platform.
```go Go
jan2000 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
// slice of length of 0, but preallocate a capacity of len(datapoints)
s1cDatapoints := make([]*v1.Sentinel1Sar, 0, len(datapoints))
for _, dp := range datapoints {
if dp.GetPublished().AsTime().Before(jan2000) {
continue
}
if dp.GetPlatform() != "S1C" {
continue
}
s1cDatapoints = append(s1cDatapoints, proto.CloneOf(dp)) // Copy the underlying data
}
```
## Converting to JSON
Protobuf messages can be converted to JSON without loss of information. This is useful for interoperability with other systems that don't use protobuf.
A guide on the protoJSON format can be found here: [https://protobuf.dev/programming-guides/json/](https://protobuf.dev/programming-guides/json/)
```go Go
originalDatapoint := datapoints[0]
// Convert proto.Message to JSON as bytes
jsonDatapoint, err := protojson.Marshal(originalDatapoint)
if err != nil {
log.Fatalf("Failed to marshal datapoint: %v", err)
}
fmt.Println(string(jsonDatapoint))
```
```plaintext Output
{"time":"2001-01-01T00:00:00Z","id":{"uuid":"AOPHpzQAAmV2MZ4+Zv+JGg=="},"ingestionTime":"2025-03-25T10:26:10.577385176Z","granuleName":"MCD12Q1.A2001001.h02v08.061.2022146033342.hdf","geometry":{"wkb":"AQMAAAABAAAABQAAAFIi9vf7TmTAXsX3////I0Bexff///9jwAAAAAAAAAAACUn4//+/YsAAAAAAAAAAAC7AdjgMCmPAXsX3////I0BSIvb3+05kwF7F9////yNA"},"endTime":"2001-12-31T23:59:59Z","horizontalTileNumber":"2","verticalTileNumber":"8","tileId":"51002008","fileSize":"176215","checksum":"771212892","checksumType":"CKSUM","dayNightFlag":"Day","publishedAt":"2022-06-23T10:58:13.895Z"}
```
It can also be converted back to a `proto.Message`.
```go Go
// Convert JSON bytes to proto.Message
unmarshalledDatapoint := &v1.Sentinel1Sar{}
err = protojson.Unmarshal(jsonDatapoint, unmarshalledDatapoint)
if err != nil {
log.Fatalf("Failed to unmarshal datapoint: %v", err)
}
fmt.Println("Are both equal?", proto.Equal(unmarshalledDatapoint, originalDatapoint))
```
```plaintext Output
Are both equal? true
```
# Tilebox languages and SDKs
Source: https://docs.tilebox.com/sdks/introduction
Tilebox supports multiple languages and SDKs for accessing datasets and running workflows.
The following language SDKs are currently available for Tilebox. Select one to learn more.
# Async support
Source: https://docs.tilebox.com/sdks/python/async
Tilebox offers a standard synchronous API by default but also provides an async client option when needed.
## Why use async?
When working with external datasets, such as [Tilebox datasets](/datasets/concepts/datasets), loading data may take some time. To speed up this process, you can run requests in parallel. While you could use multi-threading or multi-processing, which can be complex, a simpler option is often to perform data loading tasks asynchronously using coroutines and `asyncio`.
## Switching to an async datasets client
To switch to the async client, change the import statement for the `Client`. The example below illustrates this change.
```python Python (Sync)
from tilebox.datasets import Client
# This client is synchronous
client = Client()
```
```python Python (Async)
from tilebox.datasets.aio import Client
# This client is asynchronous
client = Client()
```
After switching to the async client, use `await` for operations that interact with the Tilebox API.
```python Python (Sync)
# Listing datasets
datasets = client.datasets()
# Listing collections
dataset = datasets.open_data.copernicus.sentinel1_sar
collections = dataset.collections()
# Collection information
collection = collections["S1A_IW_RAW__0S"]
info = collection.info()
print(f"Data for My-collection is available for {info.availability}")
# Loading data
data = collection.load(("2022-05-01", "2022-06-01"), show_progress=True)
# Finding a specific datapoint
datapoint_uuid = "01910b3c-8552-7671-3345-b902cc0813f3"
datapoint = collection.find(datapoint_uuid)
```
```python Python (Async)
# Listing datasets
datasets = await client.datasets()
# Listing collections
dataset = datasets.open_data.copernicus.sentinel1_sar
collections = await dataset.collections()
# Collection information
collection = collections["S1A_IW_RAW__0S"]
info = await collection.info()
print(f"Data for My-collection is available for {info.availability}")
# Loading data
data = await collection.load(("2022-05-01", "2022-06-01"), show_progress=True)
# Finding a specific datapoint
datapoint_uuid = "01910b3c-8552-7671-3345-b902cc0813f3"
datapoint = await collection.find(datapoint_uuid)
```
Jupyter notebooks and similar interactive environments support asynchronous code execution. You can use
`await some_async_call()` directly at the top level of a code cell.
## Fetching data concurrently
The primary benefit of the async client is that it allows concurrent requests, enhancing performance.
In the example below, data is fetched from multiple collections. The synchronous approach retrieves data sequentially, while the async approach does so concurrently, resulting in faster execution.
```python Python (Sync)
# Example: fetching data sequentially
# switch to the async example to compare the differences
import time
from tilebox.datasets import Client
from tilebox.datasets.sync.timeseries import TimeseriesCollection
client = Client()
datasets = client.datasets()
collections = datasets.open_data.copernicus.landsat8_oli_tirs.collections()
def stats_for_2020(collection: TimeseriesCollection) -> tuple[str, int]:
    """Fetch data for 2020 and return the collection name and the number of data points that were loaded."""
data = collection.load(("2020-01-01", "2021-01-01"), show_progress=True)
n = data.sizes['time'] if 'time' in data else 0
return (collection.name, n)
start = time.monotonic()
results = [stats_for_2020(collections[name]) for name in collections]
duration = time.monotonic() - start
for collection_name, n in results:
print(f"There are {n} datapoints in {collection_name} for 2020.")
print(f"Fetching data took {duration:.2f} seconds")
```
```python Python (Async)
# Example: fetching data concurrently
import asyncio
import time
from tilebox.datasets.aio import Client
from tilebox.datasets.aio.timeseries import TimeseriesCollection
client = Client()
datasets = await client.datasets()
collections = await datasets.open_data.copernicus.landsat8_oli_tirs.collections()
async def stats_for_2020(collection: TimeseriesCollection) -> tuple[str, int]:
    """Fetch data for 2020 and return the collection name and the number of data points that were loaded."""
data = await collection.load(("2020-01-01", "2021-01-01"), show_progress=True)
n = data.sizes['time'] if 'time' in data else 0
return (collection.name, n)
start = time.monotonic()
# Initiate all requests concurrently
requests = [stats_for_2020(collections[name]) for name in collections]
# Wait for all requests to finish in parallel
results = await asyncio.gather(*requests)
duration = time.monotonic() - start
for collection_name, n in results:
print(f"There are {n} datapoints in {collection_name} for 2020.")
print(f"Fetching data took {duration:.2f} seconds")
```
The output demonstrates that the async approach runs approximately 30% faster for this example. With `show_progress` enabled, the progress bars update concurrently.
```plaintext Python (Sync)
There are 19624 datapoints in L1GT for 2020.
There are 1281 datapoints in L1T for 2020.
There are 65313 datapoints in L1TP for 2020.
There are 25375 datapoints in L2SP for 2020.
Fetching data took 10.92 seconds
```
```plaintext Python (Async)
There are 19624 datapoints in L1GT for 2020.
There are 1281 datapoints in L1T for 2020.
There are 65313 datapoints in L1TP for 2020.
There are 25375 datapoints in L2SP for 2020.
Fetching data took 7.45 seconds
```
## Async workflows
The Tilebox workflows Python client does not have an async client. This is because workflows are designed for distributed and concurrent execution outside a single async event loop. But within a single task, you may still use `async` code to take advantage of asynchronous execution, such as parallel data loading. You can achieve this by wrapping your async code in `asyncio.run`.
Below is an example of using async code within a workflow task.
```python Python (Async)
import asyncio
import xarray as xr
from tilebox.datasets.aio import Client as DatasetsClient
from tilebox.datasets.data import TimeIntervalLike
from tilebox.workflows import Task, ExecutionContext
class FetchData(Task):
def execute(self, context: ExecutionContext) -> None:
# The task execution itself is synchronous
# But we can leverage async code within the task using asyncio.run
# This will fetch three months of data in parallel
data_jan, data_feb, data_mar = asyncio.run(load_first_three_months())
async def load_data(interval: TimeIntervalLike):
datasets = await DatasetsClient().datasets()
collections = await datasets.open_data.copernicus.landsat8_oli_tirs.collections()
return await collections["L1T"].load(interval)
async def load_first_three_months() -> tuple[xr.Dataset, xr.Dataset, xr.Dataset]:
jan = load_data(("2020-01-01", "2020-02-01"))
feb = load_data(("2020-02-01", "2020-03-01"))
mar = load_data(("2020-03-01", "2020-04-01"))
# load the three months in parallel
jan, feb, mar = await asyncio.gather(jan, feb, mar)
return jan, feb, mar
```
If you encounter an error like `RuntimeError: asyncio.run() cannot be called from a running event loop`, it means you're trying to start another asyncio event loop (with `asyncio.run`) from within an existing one. This often happens in Jupyter notebooks since they automatically start an event loop. A way to resolve this is by using [nest-asyncio](https://pypi.org/project/nest-asyncio/).
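For reference, here is a minimal sketch of applying `nest-asyncio` so that `asyncio.run` can be nested inside a notebook's event loop.
```python Python
# pip install nest-asyncio
import nest_asyncio

nest_asyncio.apply()  # patch the running event loop so asyncio.run can be nested
# asyncio.run(...) calls, for example inside a task's execute method, now work in notebooks
```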
# Geometries
Source: https://docs.tilebox.com/sdks/python/geometries
How geometries are handled in the Tilebox Python client.
Many datasets consist of granules that represent specific geographical areas on the Earth's surface. Often, a polygon defining the outline of these areas (a footprint) accompanies other granule metadata in time series datasets. Tilebox provides built-in support for working with geometries.
Here's an example that loads some granules from the `ERS SAR` Opendata dataset, which contains geometries.
```python Loading ERS data
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
ers_collection = datasets.open_data.asf.ers_sar.collection("ERS-2")
ers_data = ers_collection.load(("2008-02-10T21:00", "2008-02-10T22:00"))
```
## Shapely
In the `ers_data` dataset, each granule includes a `geometry` field that represents the footprint of each granule as a polygon. Tilebox automatically converts geometry fields to `Polygon` or `MultiPolygon` objects from the [Shapely](https://shapely.readthedocs.io/en/stable/manual.html) library. By integrating with Shapely, you can use the rich set of libraries and tools it provides. That includes support for computing polygon characteristics such as total area, intersection checks, and conversion to other formats.
```python Printing geometries
geometries = ers_data.geometry.values
print(geometries)
```
Each geometry is a [shapely.Geometry](https://shapely.readthedocs.io/en/stable/geometry.html#geometry).
```plaintext Output
[]
```
Geometries are not always [Polygon](https://shapely.readthedocs.io/en/stable/reference/shapely.Polygon.html#shapely.Polygon) objects. More complex footprint geometries are represented as [MultiPolygon](https://shapely.readthedocs.io/en/stable/reference/shapely.MultiPolygon.html#shapely.MultiPolygon) objects.
### Accessing Coordinates
You can select a polygon from the geometries and access the underlying coordinates and an automatically computed centroid point.
```python Accessing coordinates and computing a centroid point
polygon = geometries[0]
lon, lat = polygon.exterior.coords.xy
center, = list(polygon.centroid.coords)
print(lon)
print(lat)
print(center)
```
```plaintext Output
array('d', [-150.753244, -152.031574, -149.183655, -147.769339, -150.753244])
array('d', [74.250081, 73.336051, 73.001748, 73.899483, 74.250081])
(-149.92927907414239, 73.62538063474753)
```
Interactive environments such as [Jupyter Notebooks](/sdks/python/sample-notebooks) can visualize Polygon shapes graphically. Just type `polygon` in an empty cell and execute it for a visual representation of the polygon shape.
### Visualization on a Map
To visualize polygons on a map, you can use [Folium](https://pypi.org/project/folium/). Below is a helper function that produces an OpenStreetMap with the Polygon overlaid.
```python visualize helper function
# pip install folium
from folium import Figure, Map, Polygon as FoliumPolygon, GeoJson, TileLayer
from folium.plugins import MiniMap
from shapely import Polygon, to_geojson
from collections.abc import Iterable
def visualize(poly: Polygon | Iterable[Polygon], zoom=4):
"""Visualize a polygon or a list of polygons on a map"""
if not isinstance(poly, Iterable):
poly = [poly]
fig = Figure(width=600, height=600)
    map = Map(location=poly[0].centroid.coords[0][::-1], zoom_start=zoom, control_scale=True)  # center on the first polygon
map.add_child(MiniMap())
fig.add_child(map)
for p in poly:
map.add_child(GeoJson(to_geojson(p)))
return fig
```
Here's how to use it.
```python Visualizing a polygon
visualize(polygon)
```
The `visualize` helper function supports a list of polygons, which can display the data layout of the ERS granules.
```python Visualizing multiple polygons
visualize(geometries)
```
## Format conversion
Shapely supports converting Polygons to some common formats, such as [GeoJSON](https://geojson.org/) or [Well-Known Text (WKT)](https://docs.ogc.org/is/18-010r7/18-010r7.html).
```python Converting to GeoJSON
from shapely import to_geojson
print(to_geojson(polygon))
```
```plaintext Output
{"type":"Polygon","coordinates":[[[-150.753244,74.250081],[-152.031574,73.336051],[-149.183655,73.001748],[-147.769339,73.899483],[-150.753244,74.250081]]]}
```
```python Converting to WKT
from shapely import to_wkt
print(to_wkt(polygon))
```
```plaintext Output
POLYGON ((-150.753244 74.250081, -152.031574 73.336051, -149.183655 73.001748, -147.769339 73.899483, -150.753244 74.250081))
```
## Checking intersections
One common task when working with geometries is checking if a given geometry falls into a specific area of interest. Shapely provides an `intersects` method for this purpose.
```python Checking intersections
from shapely import box
# Box representing the rectangular area lon=(-160, -150) and lat=(69, 70)
area_of_interest = box(-160, 69, -150, 70)
for i, polygon in enumerate(geometries):
if area_of_interest.intersects(polygon):
print(f"{ers_data.granule_name[i].item()} intersects the area of interest!")
else:
print(f"{ers_data.granule_name[i].item()} doesn't intersect the area of interest!")
```
```plaintext Output
E2_66974_STD_F264 doesn't intersect the area of interest!
E2_66974_STD_F265 doesn't intersect the area of interest!
E2_66974_STD_F267 doesn't intersect the area of interest!
E2_66974_STD_F269 doesn't intersect the area of interest!
E2_66974_STD_F271 doesn't intersect the area of interest!
E2_66974_STD_F273 intersects the area of interest!
E2_66974_STD_F275 intersects the area of interest!
E2_66974_STD_F277 intersects the area of interest!
E2_66974_STD_F279 doesn't intersect the area of interest!
E2_66974_STD_F281 doesn't intersect the area of interest!
E2_66974_STD_F283 doesn't intersect the area of interest!
E2_66974_STD_F285 doesn't intersect the area of interest!
E2_66974_STD_F289 doesn't intersect the area of interest!
```
## Combining polygons
As shown in the visualization of the granule footprints, the granules collectively form an orbit from pole to pole. Measurements are often combined during processing. You can do the same with geometries by combining them into a single polygon, which represents the hull around all individual footprints using [shapely.unary\_union](https://shapely.readthedocs.io/en/stable/reference/shapely.unary_union.html).
```python Combining multiple polygons
from shapely.ops import unary_union
hull = unary_union(geometries)
visualize(hull)
```
The computed hull consists of two polygons due to a gap (probably a missing granule) in the geometries. Such geometries are represented as [Multi Polygons](#multi-polygons).
## Multi Polygons
A collection of one or more non-overlapping polygons combined into a single geometry is called a [MultiPolygon](https://shapely.readthedocs.io/en/latest/reference/shapely.MultiPolygon.html). Footprint geometries can be of type `MultiPolygon` due to gaps or pole discontinuities. The computed hull in the previous example is a `MultiPolygon`.
```python Accessing individual polygons of a MultiPolygon
print(f"The computed hull of type {type(hull).__name__} consists of {len(hull.geoms)} sub polygons")
for i, poly in enumerate(hull.geoms):
print(f"Sub polygon {i} has an area of {poly.area}")
```
```plaintext Output
The computed hull of type MultiPolygon consists of 2 sub polygons
Sub polygon 0 has an area of 2.025230449898011
Sub polygon 1 has an area of 24.389998081651527
```
## Antimeridian Crossings
A common issue with `longitude / latitude` geometries is crossings of the 180-degree meridian, or the antimeridian. For example, the coordinates of a `LineString` from Japan to the United States might look like this:
`140, 141, 142, ..., 179, 180, -179, -178, ..., -125, -124`
Libraries like Shapely are not designed to handle spherical coordinate systems, so caution is necessary with such geometries.
Here's an `ERS` granule demonstrating this issue.
```python Antimeridian Crossing
# A granule that crosses the antimeridian
granule = ers_collection.find("0119bb86-0260-5819-6aab-f99796417155")
polygon = granule.geometry.item()
print(polygon.exterior.coords.xy)
visualize(polygon)
```
```plaintext Output
array('d', [177.993407, 176.605009, 179.563047, -178.904076, 177.993407])
array('d', [74.983185, 74.074615, 73.727752, 74.61847, 74.983185])
```
This 2D visualization appears incorrect. Both the visualization and any calculations performed may yield inaccurate results. For instance, testing whether the granule intersects the 0-degree meridian provides a false positive.
```python Problems with calculating intersections
from shapely import LineString
null_meridian = LineString([(0, -90), (0, 90)])
print(polygon.intersects(null_meridian)) # True - but this is incorrect!
```
The GeoJSON specification offers a solution for this problem. In the section [Antimeridian Cutting](https://datatracker.ietf.org/doc/html/rfc7946#section-3.1.9), it suggests always cutting lines and polygons into two parts: one for the eastern hemisphere and one for the western hemisphere.
In Python, this can be achieved using the [antimeridian](https://pypi.org/project/antimeridian/) package.
```python Cutting the polygon along the antimeridian
# pip install antimeridian
import antimeridian
fixed_polygon = antimeridian.fix_polygon(polygon)
visualize(fixed_polygon)
print(fixed_polygon.intersects(null_meridian)) # False - this is correct now
```
Since Shapely is unaware of the spherical nature of this data, the **centroid** of the fixed polygon **is still incorrect**. The antimeridian package also includes a function to correct this.
```python Calculating the centroid of a cut polygon crossing the antimeridian
print("Wrongly computed centroid coordinates (Shapely)")
print(list(fixed_polygon.centroid.coords))
print("Correct centroid coordinates (Antimeridian taken into account)")
print(list(antimeridian.centroid(fixed_polygon).coords))
```
```plaintext Output
Wrongly computed centroid coordinates (shapely)
[(139.8766350146937, 74.3747116658462)]
Correct centroid coordinates (antimeridian taken into account)
[(178.7782777050171, 74.3747116658462)]
```
## Spherical Geometry
Another approach to handle the antimeridian issue is performing all coordinate-related calculations, such as polygon intersections, in a [spherical coordinate system](https://en.wikipedia.org/wiki/Spherical_coordinate_system).
One useful library for this is [spherical\_geometry](https://spherical-geometry.readthedocs.io/en/latest/). Here's an example.
```python Spherical Geometry
# pip install spherical-geometry
from spherical_geometry.polygon import SphericalPolygon
from spherical_geometry.vector import lonlat_to_vector
lon, lat = polygon.exterior.coords.xy
spherical_poly = SphericalPolygon.from_lonlat(lon, lat)
# Let's check the x, y, z coordinates of the spherical polygon:
print(list(spherical_poly.points))
```
```plaintext Output
[array([[-0.25894363, 0.00907234, 0.96584983],
[-0.2651968 , -0.00507317, 0.96418096],
[-0.28019363, 0.00213687, 0.95994112],
[-0.27390375, 0.01624885, 0.96161984],
[-0.25894363, 0.00907234, 0.96584983]])]
```
Now, you can compute intersections or check if a particular point is within the polygon. You can compare the incorrect calculation using `shapely` with the correct version when using `spherical_geometry`.
```python Correct calculations using spherical geometry
from shapely import Point

# A point on the null-meridian, way off from our polygon
null_meridian_point = 0, 74.4
# A point actually inside our polygon
point_inside = 178.8, 74.4
print("Shapely results:")
print("- Null meridian point inside:", polygon.contains(Point(*null_meridian_point)))
print("- Actual inside point inside:", polygon.contains(Point(*point_inside)))
print("Spherical geometry results:")
print("- Null meridian point inside:", spherical_poly.contains_lonlat(*null_meridian_point))
print("- Actual inside point inside:", spherical_poly.contains_lonlat(*point_inside))
```
```plaintext Output
Shapely results:
- Null meridian point inside: True
- Actual inside point inside: False
Spherical geometry results:
- Null meridian point inside: False
- Actual inside point inside: True
```
# Installation
Source: https://docs.tilebox.com/sdks/python/install
Install the Tilebox Python Packages
## Package Overview
Tilebox offers a Python SDK for accessing Tilebox services. The SDK includes separate packages that can be installed individually based on the services you wish to use, or all together for a comprehensive experience.
Access Tilebox datasets from Python
Workflow client and task runner for Tilebox
## Installation
Install the Tilebox python packages using your preferred package manager.
For new projects, Tilebox recommends using [uv](https://docs.astral.sh/uv/).
```bash uv
uv add tilebox-datasets tilebox-workflows tilebox-storage
```
```bash pip
pip install tilebox-datasets tilebox-workflows tilebox-storage
```
```bash poetry
poetry add tilebox-datasets="*" tilebox-workflows="*" tilebox-storage="*"
```
```bash pipenv
pipenv install tilebox-datasets tilebox-workflows tilebox-storage
```
## Setting up a local JupyterLab environment
To get started quickly, you can also use an existing Jupyter-compatible cloud environment such as [Google Colab](https://colab.research.google.com/).
If you want to set up a local Jupyter environment to explore the SDK or to run the [Sample notebooks](/sdks/python/sample-notebooks) locally, install [JupyterLab](https://github.com/jupyterlab/jupyterlab) for a browser-based development environment. It's advised to install the Tilebox packages along with JupyterLab, [ipywidgets](https://ipywidgets.readthedocs.io/en/latest/user_install.html), and [tqdm](https://tqdm.github.io/) for an enhanced experience.
```bash uv
mkdir tilebox-exploration
cd tilebox-exploration
uv init --no-package
uv add tilebox-datasets tilebox-workflows tilebox-storage
uv add jupyterlab ipywidgets tqdm
uv run jupyter lab
```
### Trying it out
After installation, create a new notebook and paste the following code snippet to verify your installation. If you're new to Jupyter, you can refer to the guide on [interactive environments](/sdks/python/sample-notebooks#interactive-environments).
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
collection = datasets.open_data.copernicus.landsat8_oli_tirs.collection("L1T")
data = collection.load(("2015-01-01", "2020-01-01"), show_progress=True)
data
```
If the installation is successful, the output should appear as follows.
# Sample notebooks
Source: https://docs.tilebox.com/sdks/python/sample-notebooks
Sample code maintained to use and learn from.
To quickly become familiar with the Python client, you can explore some sample notebooks. Each notebook can be executed standalone from top to bottom.
## Sample notebooks
You can access the sample notebooks on [Google Drive](https://drive.google.com/drive/folders/1I7G35LLeB2SmKQJFsyhpSVNyZK9uOeJt).
Right click a notebook in Google Drive and select `Open with -> Google Colaboratory` to open it directly in the browser using [Google Colab](https://colab.research.google.com/).
More examples can be found throughout the docs.
### Notebook overview
{/*
This notebook demonstrates how to use the Python client to query metadata from the ERS-SAR Opendata dataset. It shows how to filter results by geographical location and download product data for a specific granule.
[ Open in
Colab](https://colab.research.google.com/drive/1LTYhLKy8m9psMhu0DvANs7hyS3ViVpas)
*/}
This notebook illustrates how to use the Python client to query the S5P Tropomi Opendata dataset for methane products. It also explains how to filter results based on geographical location and download product data for a specific granule.
[Open in Colab](https://colab.research.google.com/drive/1eVYARNFnTIeQqBs6gqeay01EDvRk2EI4)
This notebook demonstrates how to ingest data into a Custom Dataset. In this case it's using a sample dataset from the [MODIS instrument](https://lpdaac.usgs.gov/products/mcd12q1v006/) which
is already prepared.
[Open in Colab](https://colab.research.google.com/drive/1QS-srlWPMJg4csc0ycn36yCX9U6mvIpW)
Created with Tilebox Workflows, this 10m resolution mosaic highlights distributed, auto-parallelizing capabilities.
Data from `Copernicus Dataspace` was reprojected on `CloudFerro` (intermediate products on AWS S3), and the final composite was built locally using auto-parallelized team notebooks.
[Open the Mosaic](https://examples.tilebox.com/sentinel2_mosaic)
[Open in Github](https://github.com/tilebox/examples/tree/main/s2-cloudfree-mosaic)
Execute cells one by one using `Shift+Enter`. Most commonly used libraries are pre-installed.
All demo notebooks require Python 3.10 or higher.
## Interactive environments
Jupyter, Google Colab, and JetBrains Datalore are interactive environments that simplify the development and sharing of algorithmic code. They allow users to work with notebooks, which combine code and rich text elements like figures, links, and equations. Notebooks require no setup and can be easily shared.
[Jupyter notebooks](https://jupyter.org/) are the original interactive environment for Python. They are useful but require local installation.
[Google Colab](https://colab.research.google.com/) is a free tool that provides a hosted interactive Python environment. It easily connects to local Jupyter instances and allows code sharing using Google credentials or within organizations using Google Workspace.
[JetBrains Datalore](https://datalore.jetbrains.com/) is a free platform for collaborative testing, development, and sharing of Python code and algorithms. It has built-in secret management for storing credentials. Datalore also features advanced JetBrains syntax highlighting and autocompletion. Currently, it only supports Python 3.8, which is not compatible with the Tilebox Python client.
Since Colab is a hosted free tool that meets all requirements, including Python 3.10 or higher, it's recommended for use.
## Installing packages
Within your interactive environment, you can install missing packages using pip in "magic" cells, which start with an exclamation mark.
```bash
# pip is already installed in your interactive environment
!pip3 install ....
```
All APIs or commands that require authentication can be accessed through client libraries that hide tokens, allowing notebooks to be shared without exposing personal credentials.
## Executing code
Execute code by clicking the play button in the top left corner of the cell or by pressing `Shift + Enter`. While the code is running, a spinning icon appears. When the execution finishes, the icon changes to a number, indicating the order of execution. The output displays below the code.
## Authorization
When sharing notebooks, avoid directly sharing your Tilebox API key. Instead, use one of two methods to authenticate the Tilebox Python client in interactive environments: through environment variables or interactively.
```python Using environment variables to store your API key
# Define an environment variable "TILEBOX_API_KEY" that contains your API key
import os
token = os.getenv("TILEBOX_API_KEY")
```
**Interactive** authorization is possible using the built-in `getpass` module. This prompts the user for the API key when running the code, storing it in memory without sharing it when the notebook is shared.
```python Interactively providing your API key
from getpass import getpass
token = getpass("API key:")
```
# Xarray
Source: https://docs.tilebox.com/sdks/python/xarray
Overview of the Xarray library, common use cases, and implementation details.
[example_satellite_data.nc]: https://github.com/tilebox/docs/raw/main/assets/data/example_satellite_data.nc
[Xarray](https://xarray.dev/) is a library designed for working with labeled multi-dimensional arrays. Built on top of [NumPy](https://numpy.org/) and [Pandas](https://pandas.pydata.org/), Xarray adds labels in the form of dimensions, coordinates, and attributes, enhancing the usability of raw NumPy-like arrays. This enables a more intuitive, concise, and less error-prone development experience. The library also includes a large and expanding collection of functions for advanced analytics and visualization.
An overview of the Xarray library and its suitability for N-dimensional data (such as Tilebox time series datasets) is available in the official [Why Xarray? documentation page](https://xarray.pydata.org/en/stable/why-xarray.html).
The Tilebox Python client provides access to satellite data as an [xarray.Dataset](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.html#xarray.Dataset). This approach offers a great number of benefits over custom Tilebox-specific data structures:
Xarray is based on NumPy and Pandas, two of the most widely used Python libraries for scientific computing. Familiarity with these libraries translates well to using Xarray.
Leveraging NumPy, which is built on C and Fortran, Xarray benefits from extensive performance optimizations. This allows Xarray to efficiently handle large datasets.
As a widely used library, Xarray easily integrates with many other libraries. Many third-party libraries are also available to expand Xarray's capabilities for diverse use cases.
Xarray is versatile and supports a broad range of applications. It's also easy to extend with custom features.
## Example dataset
To understand how Xarray functions, below is a quick look at a sample dataset as it might be retrieved from a [Tilebox datasets](/datasets/concepts/datasets) client.
```python Python
from tilebox.datasets import Client
client = Client()
datasets = client.datasets()
collection = datasets.open_data.copernicus.landsat8_oli_tirs.collection("L1GT")
satellite_data = collection.load(("2022-05-01", "2022-06-01"), show_progress=True)
print(satellite_data)
```
```plaintext Output
Size: 305kB
Dimensions:         (time: 514, latlon: 2)
Coordinates:
    ingestion_time  (time) datetime64[ns] 4kB 2024-07-22T09:06:43.5586...
    id              (time) ...
    ...
```
This basic dataset illustrates common use cases for Xarray. To follow along, you can [download the dataset as a NetCDF file][example_satellite_data.nc]. The [Reading and writing files section](/sdks/python/xarray#reading-and-writing-files) explains how to save and load Xarray datasets from NetCDF files.
Here's an overview of the output:
* The `satellite_data` **dataset** contains **dimensions**, **coordinates**, and **variables**.
* The `time` **dimension** has 514 elements, indicating that there are 514 data points in the dataset.
* The `time` **dimension coordinate** contains datetime values representing when the data was measured. The `*` indicates a dimension coordinate, which enables label-based indexing and alignment.
* The `ingestion_time` **non-dimension coordinate** holds datetime values for when the data was ingested into Tilebox. Non-dimension coordinates carry coordinate data but are not used for label-based indexing. They can even be [multidimensional](https://docs.xarray.dev/en/stable/examples/multidimensional-coords.html).
* The dataset includes 28 **variables**.
* The `bands` **variable** contains integers indicating how many bands the data contains.
* The `sun_elevation` **variable** contains floating-point values representing the sun's elevation when the data was measured.
Explore the [xarray terminology overview](https://docs.xarray.dev/en/stable/user-guide/terminology.html) to broaden your understanding of **datasets**, **dimensions**, **coordinates**, and **variables**.
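To make these terms concrete, here's a small sketch (assuming the `satellite_data` dataset loaded above) that inspects the dimensions, coordinates, and variables programmatically:
```python Inspecting the dataset structure
# dimension names and their sizes, e.g. {'time': 514, 'latlon': 2}
print(satellite_data.sizes)

# coordinate names (both dimension and non-dimension coordinates)
print(list(satellite_data.coords))

# the data variables contained in the dataset
print(list(satellite_data.data_vars))
```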
## Accessing data in a dataset
### By index
You can access data in different ways. The Xarray documentation offers a [comprehensive overview](https://docs.xarray.dev/en/stable/user-guide/indexing.html) of these methods.
To access the `sun_elevation` variable:
```python Accessing values
# Print the first sun elevation value
print(satellite_data.sun_elevation[0])
```
```plaintext Output
Size: 8B
array(44.19904463)
Coordinates:
    ingestion_time  datetime64[ns] 8B 2024-07-22T09:06:43.558629
    ...
```
You can also filter a dataset with a boolean condition, for example to select only the data points where the sun elevation lies between 45 and 90 degrees.
```python Filtering by sun elevation
data_filter = (
    (satellite_data.sun_elevation > 45) &
    (satellite_data.sun_elevation < 90)
)
filtered_sun_elevations = satellite_data.sun_elevation[data_filter]
print(filtered_sun_elevations)
```
```plaintext Output
Size: 216B
array([63.89629314, 63.35038654, ..., 64.37400345, 64.37400345])
Coordinates:
    ingestion_time  (time) datetime64[ns] 216B 2024-07-22T09:06:43.558629 ...
    ...
```
### Selecting data by value
You can also select data points by the value of a coordinate, such as a specific timestamp, using the `sel` method.
Selecting data by value requires unique coordinate values. In case of duplicates, you will encounter an `InvalidIndexError`. To avoid this, you can [drop duplicates](#dropping-duplicates).
```python Indexing by time
specific_datapoint = satellite_data.sel(time="2022-05-01T11:28:28.249000")
print(specific_datapoint)
```
```plaintext Output
Size: 665B
Dimensions:     (latlon: 2)
Coordinates:
    ingestion_time  datetime64[ns] 8B 2024-07-22T09:06:43.558629
    ...
```
Selecting a time value that is not present in the dataset raises a `KeyError`.
```python Selecting a non-existent time
satellite_data.sel(time="2022-05-01T11:28:28.000000")
# >>> raises KeyError: "2022-05-01T11:28:28.000000"
```
To return the closest value instead of raising an error, specify a `method`.
```python Finding the closest data point
nearest_datapoint = satellite_data.sel(time="2022-05-01T11:28:28.000000", method="nearest")
assert nearest_datapoint.equals(specific_datapoint) # passes
```
## Dropping duplicates
Xarray allows you to drop duplicate values from a dataset. For example, to drop duplicate timestamps:
```python Dropping duplicates
deduped = satellite_data.drop_duplicates("time")
```
De-duped datasets are required for certain operations, like [selecting data by value](#selecting-data-by-value).
## Statistics
Xarray and NumPy include a wide range of statistical functions that you can apply to a dataset or DataArray. Here are some examples:
```python Computing dataset statistics
cloud_cover = satellite_data.cloud_cover
min_meas = cloud_cover.min().item()
max_meas = cloud_cover.max().item()
mean_meas = cloud_cover.mean().item()
std_meas = cloud_cover.std().item()
print(f"Cloud cover ranges from {min_meas:.2f} to {max_meas:.2f} with a mean of {mean_meas:.2f} and a standard deviation of {std_meas:.2f}")
```
```plaintext Output
Cloud cover ranges from 0.00 to 100.00 with a mean of 76.48 and a standard deviation of 34.17
```
You can also directly apply many NumPy functions to datasets or DataArrays. For example, to find out how many unique bands the data contains, use [np.unique](https://numpy.org/doc/stable/reference/generated/numpy.unique.html):
```python Finding unique values
import numpy as np
print("Sensors:", np.unique(satellite_data.bands))
```
```plaintext Output
Bands: [12]
```
## Reading and writing files
Xarray provides a simple method for saving and loading datasets from files. This is useful for sharing your data or storing it for future use. Xarray supports many different file formats, including NetCDF, Zarr, GRIB, and more. For a complete list of supported formats, refer to the [official documentation page](https://docs.xarray.dev/en/stable/user-guide/io.html).
To save the example dataset as a NetCDF file:
You may need to install the `netcdf4` package first.
```python Saving a dataset to a file
satellite_data.to_netcdf("example_satellite_data.nc")
```
This creates a file named `example_satellite_data.nc` in your current directory. You can then load this file back into memory with:
```python Loading a dataset from a file
import xarray as xr
satellite_data = xr.open_dataset("example_satellite_data.nc")
```
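Other supported formats work in a similar way. For example, here's a quick sketch of writing and reading the same dataset as a Zarr store (assuming the `zarr` package is installed):
```python Saving and loading a Zarr store
import xarray as xr

# write the dataset to a local Zarr store
satellite_data.to_zarr("example_satellite_data.zarr", mode="w")

# and read it back into memory
satellite_data = xr.open_zarr("example_satellite_data.zarr")
```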
If you would like to follow along with the examples in this section, you can download the example dataset as a NetCDF file [here][example_satellite_data.nc].
## Further reading
This section covers only a few common use cases for Xarray. The library offers many more functions and features. For more information, please see the [Xarray documentation](https://xarray.pydata.org/en/stable/) or explore the [Xarray Tutorials](https://tutorial.xarray.dev/intro.html).
Some useful capabilities not covered in this section include:
# Storage Clients
Source: https://docs.tilebox.com/storage/clients
Learn about the different storage clients available in Tilebox to access open data.
Tilebox does not host the actual open data satellite products but instead relies on publicly accessible storage providers for data access.
Tilebox ingests available metadata as [datasets](/datasets/concepts/datasets) to enable high performance querying and structured access of the data as [xarray.Dataset](/sdks/python/xarray).
Below is a list of the storage providers currently supported by Tilebox.
This feature is only available in the Python SDK.
## Copernicus Data Space
The [Copernicus Data Space](https://dataspace.copernicus.eu/) is an open ecosystem that provides free instant access to data and services from the Copernicus Sentinel missions. Check out the [Copernicus Data Space datasets](/datasets/open-data#copernicus-data-space) that are available in Tilebox.
### Access Copernicus data
To download data products from the Copernicus Data Space after querying them via the Tilebox API, you need to [create an account](https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/auth?client_id=cdse-public\&response_type=code\&scope=openid\&redirect_uri=https%3A//dataspace.copernicus.eu/account/confirmed/1) and then generate [S3 credentials here](https://eodata-s3keysmanager.dataspace.copernicus.eu/panel/s3-credentials).
The following code snippet demonstrates how to query and download Copernicus data using the Tilebox Python SDK.
```python Python {4,9-13,27}
from pathlib import Path
from tilebox.datasets import Client
from tilebox.storage import CopernicusStorageClient
# Creating clients
client = Client()
datasets = client.datasets()
storage_client = CopernicusStorageClient(
access_key="YOUR_ACCESS_KEY",
secret_access_key="YOUR_SECRET_ACCESS_KEY",
cache_directory=Path("./data")
)
# Choosing the dataset and collection
s2_dataset = datasets.open_data.copernicus.sentinel2_msi
collections = s2_dataset.collections()
collection = collections["S2A_S2MSI2A"]
# Loading metadata
s2_data = collection.load(("2024-08-01", "2024-08-02"), show_progress=True)
# Selecting a data point to download
selected = s2_data.isel(time=0) # index 0 selected
# Downloading the data
downloaded_data = storage_client.download(selected)
print(f"Downloaded granule: {downloaded_data.name} to {downloaded_data}")
print("Contents: ")
for content in downloaded_data.iterdir():
print(f" - {content.relative_to(downloaded_data)}")
```
```plaintext Output
Downloaded granule: S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544.SAFE to data/Sentinel-2/MSI/L2A/2024/08/01/S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544.SAFE
Contents:
- manifest.safe
- GRANULE
- INSPIRE.xml
- MTD_MSIL2A.xml
- DATASTRIP
- HTML
- rep_info
- S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544-ql.jpg
```
### Partial product downloads
For cases where only a subset of the available file objects for a product is needed, you may restrict your download to just that subset. First, list available objects using `list_objects`, filter them, and then download using `download_objects`.
For example, a Sentinel-2 L2A product includes many files such as metadata, different bands in multiple resolutions, masks, and quicklook images. The following example shows how to download only specific files from a Sentinel-2 L2A product.
```python Python {4, 15}
collection = datasets.open_data.copernicus.sentinel2_msi.collections()["S2A_S2MSI2A"]
s2_data = collection.load(("2024-08-01", "2024-08-02"), show_progress=True)
selected = s2_data.isel(time=0) # download the first granule in the given time range
objects = storage_client.list_objects(selected)
print(f"Granule {selected.granule_name.item()} consists of {len(objects)} individual objects.")
# only select specific objects to download
want_products = ["B02_10m", "B03_10m", "B08_10m"]
objects = [obj for obj in objects if any(prod in obj for prod in want_products)] # remove all other objects
print(f"Downloading {len(objects)} objects.")
for obj in objects:
print(f" - {obj}")
# Finally, download the selected data
downloaded_data = storage_client.download_objects(selected, objects)
```
```plaintext Output
Granule S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544.SAFE consists of 95 individual objects.
Downloading 3 objects.
- GRANULE/L2A_T58WET_A047575_20240801T002608/IMG_DATA/R10m/T58WET_20240801T002611_B02_10m.jp2
- GRANULE/L2A_T58WET_A047575_20240801T002608/IMG_DATA/R10m/T58WET_20240801T002611_B03_10m.jp2
- GRANULE/L2A_T58WET_A047575_20240801T002608/IMG_DATA/R10m/T58WET_20240801T002611_B08_10m.jp2
```
## Alaska Satellite Facility (ASF)
The [Alaska Satellite Facility (ASF)](https://asf.alaska.edu/) is a NASA-funded research center at the University of Alaska Fairbanks. Check out the [ASF Open Data datasets](/datasets/open-data#alaska-satellite-facility-asf) that are available in Tilebox.
### Accessing ASF Data
You can query ASF metadata without needing an account, as Tilebox has indexed and ingested the relevant metadata. To access and download the actual satellite products, you will need an ASF account.
You can create an ASF account in the [ASF Vertex Search Tool](https://search.asf.alaska.edu/).
The following code snippet demonstrates how to query and download ASF data using the Tilebox Python SDK.
```python Python {4,9-13,27}
from pathlib import Path
from tilebox.datasets import Client
from tilebox.storage import ASFStorageClient
# Creating clients
client = Client()
datasets = client.datasets()
storage_client = ASFStorageClient(
user="YOUR_ASF_USER",
password="YOUR_ASF_PASSWORD",
cache_directory=Path("./data")
)
# Choosing the dataset and collection
ers_dataset = datasets.open_data.asf.ers_sar
collections = ers_dataset.collections()
collection = collections["ERS-2"]
# Loading metadata
ers_data = collection.load(("2009-01-01", "2009-01-02"), show_progress=True)
# Selecting a data point to download
selected = ers_data.isel(time=0) # index 0 selected
# Downloading the data
downloaded_data = storage_client.download(selected, extract=True)
print(f"Downloaded granule: {downloaded_data.name} to {downloaded_data}")
print("Contents: ")
for content in downloaded_data.iterdir():
print(f" - {content.relative_to(downloaded_data)}")
```
```plaintext Output
Downloaded granule: E2_71629_STD_L0_F183 to data/ASF/E2_71629_STD_F183/E2_71629_STD_L0_F183
Contents:
- E2_71629_STD_L0_F183.000.vol
- E2_71629_STD_L0_F183.000.meta
- E2_71629_STD_L0_F183.000.raw
- E2_71629_STD_L0_F183.000.pi
- E2_71629_STD_L0_F183.000.nul
- E2_71629_STD_L0_F183.000.ldr
```
### Further Reading
## Umbra Space
[Umbra](https://umbra.space/) satellites provide high resolution Synthetic Aperture Radar (SAR) imagery from space. Check out the [Umbra datasets](/datasets/open-data#umbra-space) that are available in Tilebox.
### Accessing Umbra data
No account is needed to access Umbra data. All data is under a Creative Commons License (CC BY 4.0), allowing you to use it freely.
The following code snippet demonstrates how to query and download Umbra data using the Tilebox Python SDK.
```python Python {4,9,23}
from pathlib import Path
from tilebox.datasets import Client
from tilebox.storage import UmbraStorageClient
# Creating clients
client = Client()
datasets = client.datasets()
storage_client = UmbraStorageClient(cache_directory=Path("./data"))
# Choosing the dataset and collection
umbra_dataset = datasets.open_data.umbra.sar
collections = umbra_dataset.collections()
collection = collections["SAR"]
# Loading metadata
umbra_data = collection.load(("2024-01-05", "2024-01-06"), show_progress=True)
# Selecting a data point to download
selected = umbra_data.isel(time=0) # index 0 selected
# Downloading the data
downloaded_data = storage_client.download(selected)
print(f"Downloaded granule: {downloaded_data.name} to {downloaded_data}")
print("Contents: ")
for content in downloaded_data.iterdir():
print(f" - {content.relative_to(downloaded_data)}")
```
```plaintext Output
Downloaded granule: 2024-01-05-01-53-37_UMBRA-07 to data/Umbra/ad hoc/Yi_Sun_sin_Bridge_SK/6cf02931-ca2e-4744-b389-4844ddc701cd/2024-01-05-01-53-37_UMBRA-07
Contents:
- 2024-01-05-01-53-37_UMBRA-07_SIDD.nitf
- 2024-01-05-01-53-37_UMBRA-07_SICD.nitf
- 2024-01-05-01-53-37_UMBRA-07_CSI-SIDD.nitf
- 2024-01-05-01-53-37_UMBRA-07_METADATA.json
- 2024-01-05-01-53-37_UMBRA-07_GEC.tif
- 2024-01-05-01-53-37_UMBRA-07_CSI.tif
```
### Partial product downloads
For cases where only a subset of the available file objects for a given Umbra data point is necessary, you can limit your download to just that subset. First, list available objects using `list_objects`, filter the list, and then use `download_objects`.
The below example shows how to download only the metadata file for a given data point.
```python Python {4, 15}
collection = datasets.open_data.umbra.sar.collections()["SAR"]
umbra_data = collection.load(("2024-01-05", "2024-01-06"), show_progress=True)
# Selecting a data point to download
selected = umbra_data.isel(time=0) # index 0 selected
objects = storage_client.list_objects(selected)
print(f"Data point {selected.granule_name.item()} consists of {len(objects)} individual objects.")
# only select specific objects to download
objects = [obj for obj in objects if "METADATA" in obj] # remove all other objects
print(f"Downloading {len(objects)} object.")
print(objects)
# Finally, download the selected data
downloaded_data = storage_client.download_objects(selected, objects)
```
```plaintext Output
Data point 2024-01-05-01-53-37_UMBRA-07 consists of 6 individual objects.
Downloading 1 object.
['2024-01-05-01-53-37_UMBRA-07_METADATA.json']
```
# Caches
Source: https://docs.tilebox.com/workflows/caches
Sharing data between tasks is crucial for workflows, especially in satellite imagery processing, where large datasets are the norm. Tilebox Workflows offers a straightforward API for storing and retrieving data from a shared cache.
The cache API is currently experimental and may undergo changes in the future. Many more features and new [backends](#cache-backends) are on the roadmap. There might be breaking changes to the Cache API in the future.
Caches are configured at the [task runner](/workflows/concepts/task-runners) level. Because task runners can be deployed across multiple locations, caches must be accessible from all task runners contributing to a workflow.
Currently, the default cache implementation uses a Google Cloud Storage bucket, providing a scalable method to share data between tasks. For quick prototyping and local development, you can also use a local file system cache, which is included by default.
If needed, you can create your own cache backend by implementing the `Cache` interface.
## Configuring a Cache
You can configure a cache while creating a task runner by passing a cache instance to the `cache` parameter. To use an in-memory cache, use `tilebox.workflows.cache.InMemoryCache`. This implementation is helpful for local development and quick testing. For alternatives, see the supported [cache backends](#cache-backends).
```python Python
from tilebox.workflows import Client
from tilebox.workflows.cache import InMemoryCache
client = Client()
runner = client.runner(
tasks=[...],
cache=InMemoryCache(),
)
```
By configuring such a cache, the `context` object that is passed to the execution of each task gains access to a `job_cache`
attribute that can be used to [store and retrieve data](#storing-and-retrieving-data) from the cache.
## Cache Backends
Tilebox Workflows comes with four cache implementations out of the box, each backed by a different storage system.
### Google Storage Cache
A cache implementation backed by a Google Cloud Storage bucket to store and retrieve data. It's a reliable method for sharing data across tasks, suitable for production environments. You will need access to a GCP project and a bucket.
The Tilebox Workflow orchestrator uses the official Python Client for Google Cloud Storage. To set up authentication, refer to the official documentation.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.cache import GoogleStorageCache
from google.cloud.storage import Client as StorageClient
storage_client = StorageClient(project="gcp-project")
bucket = storage_client.bucket("cache-bucket")
client = Client()
runner = client.runner(
tasks=[...],
cache=GoogleStorageCache(bucket, prefix="jobs"),
)
```
The `prefix` parameter is optional and can be used to set a common prefix for all cache keys, which helps organize objects within a bucket when re-using the same bucket for other purposes.
### Amazon S3 Cache
A cache implementation backed by an Amazon S3 bucket to store and retrieve data. Like the Google Cloud Storage option, it's reliable and scalable for production use.
Tilebox utilizes the `boto3` library to communicate with Amazon S3. For the necessary authentication setup, refer to [its documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration).
```python Python
from tilebox.workflows import Client
from tilebox.workflows.cache import AmazonS3Cache
client = Client()
runner = client.runner(
tasks=[...],
cache=AmazonS3Cache("my-bucket-name", prefix="jobs")
)
```
The `prefix` parameter is optional and can be used to set a common prefix for all cache keys, which helps organize objects within a bucket when re-using the same bucket for other purposes.
### Local File System Cache
A cache implementation backed by a local file system. It's suitable for quick prototyping and local development, assuming all task runners share the same machine or access the same file system.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.cache import LocalFileSystemCache
client = Client()
runner = client.runner(
tasks=[...],
cache=LocalFileSystemCache("/path/to/cache/directory"),
)
```
Local file system caches can also be used with network-attached storage or similar tools, making them a viable option for distributed setups as well.
### In-Memory Cache
A simple in-memory cache useful for quick prototyping and development. The data is not shared between task runners and is lost upon task runner restarts. Use this cache only for workflows executed on a single task runner.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.cache import InMemoryCache
client = Client()
runner = client.runner(
tasks=[...],
cache=InMemoryCache(),
)
```
## Data Isolation
Caches are isolated per job, meaning that each job's cache data is only accessible to tasks within that job. This setup prevents key conflicts between different job executions. Currently, accessing cache data of other jobs is not supported.
The capability to share data across jobs is planned for future updates. This feature will be beneficial for real-time processing workflows or workflows requiring auxiliary data from external sources.
## Storing and Retrieving Data
The job cache can be accessed via the `ExecutionContext` passed to a tasks `execute` function. This [`job_cache`](/api-reference/python/tilebox.workflows/ExecutionContext.job_cache) object provides methods to handle data storage and retrieval from the cache. The specifics of data storage depend on the chosen cache backend.
The cache API is designed to be simple and can handle all types of data, supporting binary data in the form of `bytes`, identified by `str` cache keys. This allows for storing many different data types, such as pickled Python objects, serialized JSON, UTF-8, or binary data.
The following snippet illustrates storing and retrieving data from the cache.
```python Python
from tilebox.workflows import Task, ExecutionContext
class ProducerTask(Task):
def execute(self, context: ExecutionContext) -> None:
# store data in the cache
context.job_cache["data"] = b"my_binary_data_to_store"
context.submit_subtask(ConsumerTask())
class ConsumerTask(Task):
def execute(self, context: ExecutionContext) -> None:
data = context.job_cache["data"]
print(f"Read {data} from cache")
```
In this example, data stored under the key `"data"` can be any size that fits the cache backend constraints. Ensure the key remains unique within the job's scope to avoid conflicts.
To test the workflow, you can start a local task runner using a local cache backend such as `LocalFileSystemCache`. Then, submit a job to execute the `ProducerTask` and observe the output of the `ConsumerTask`.
```python Python
# submit a job to test our workflow
job_client = client.jobs()
job_client.submit("testing-cache-access", ProducerTask())
# start a runner to execute it
runner = client.runner(
tasks=[ProducerTask, ConsumerTask],
cache=LocalFileSystemCache("/path/to/cache/directory"),
)
runner.run_forever()
```
```plaintext Output
Read b'my_binary_data_to_store' from cache
```
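Because the cache stores plain `bytes` under `str` keys, structured data just needs to be serialized before storing and deserialized after reading. Here's a minimal sketch using JSON (the task names and the stored configuration are made up for illustration):
```python Python
import json
from tilebox.workflows import Task, ExecutionContext

class ProduceConfig(Task):
    def execute(self, context: ExecutionContext) -> None:
        config = {"bands": [2, 3, 8], "resolution_m": 10}
        # serialize the dict to UTF-8 encoded JSON bytes before caching it
        context.job_cache["config"] = json.dumps(config).encode("utf-8")
        context.submit_subtask(ConsumeConfig())

class ConsumeConfig(Task):
    def execute(self, context: ExecutionContext) -> None:
        # decode the cached bytes back into a Python dict
        config = json.loads(context.job_cache["config"].decode("utf-8"))
        print(f"Processing bands {config['bands']} at {config['resolution_m']}m resolution")
```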
## Groups and Hierarchical Keys
The cache API supports groups and hierarchical keys, analogous to directories and files in a file system. Groups help organize cache keys hierarchically, preventing key conflicts and allowing data to be structured better. Additionally, groups are iterable, enabling retrieval of all keys within the group. This feature is useful when multiple tasks create cache data, and a later task needs to list all produced data by earlier tasks.
The following code shows an example of how cache groups can be used.
```python Python {39,44-45}
from tilebox.workflows import Task, ExecutionContext
import random
class CacheGroupDemo(Task):
n: int
def execute(self, context: ExecutionContext):
# define a cache group key for subtasks
group_key = "random_numbers"
produce_numbers = context.submit_subtask(
ProduceRandomNumbers(self.n, group_key)
)
sum_task = context.submit_subtask(
PrintSum(group_key),
depends_on=[produce_numbers],
)
class ProduceRandomNumbers(Task):
n: int
group_key: str
def execute(self, context: ExecutionContext):
for i in range(self.n):
context.submit_subtask(ProduceRandomNumber(i, self.group_key))
class ProduceRandomNumber(Task):
index: int
group_key: str
def execute(self, context: ExecutionContext) -> None:
number = random.randint(0, 100)
group = context.job_cache.group(self.group_key)
group[f"key_{self.index}"] = str(number).encode()
class PrintSum(Task):
group_key: str
def execute(self, context: ExecutionContext) -> None:
group = context.job_cache.group(self.group_key)
# PrintSum doesn't know how many numbers were produced,
# instead it iterates over all keys in the cache group
numbers = []
for key in group: # iterate over all stored numbers
number = group[key] # read data from cache
numbers.append(int(number.decode())) # convert bytes back to int
print(f"Sum of all numbers: {sum(numbers)}")
```
Submitting a job of the `CacheGroupDemo` and running it with a task runner can be done as follows:
```python Python
# submit a job to test our workflow
job_client = client.jobs()
job_client.submit("cache-groups", CacheGroupDemo(5))
# start a runner to execute it
runner = client.runner(
tasks=[CacheGroupDemo, ProduceRandomNumbers, ProduceRandomNumber, PrintSum],
cache=LocalFileSystemCache("/path/to/cache/directory"),
)
runner.run_forever()
```
```plaintext Output
Sum of all numbers: 284
```
# Clusters
Source: https://docs.tilebox.com/workflows/concepts/clusters
Clusters are a logical grouping for [task runners](/workflows/concepts/task-runners).
Using clusters, you can scope certain tasks to a specific group of task runners.
Tasks, which are always submitted to a specific cluster, are only executed on task runners assigned to the same cluster.
## Use Cases
Use clusters to organize [task runners](/workflows/concepts/task-runners) into logical groups, which can help with:
* Targeting specific task runners for a particular job
* Reserving a group of task runners for specific purposes, such as running certain types of batch jobs
* Setting up different clusters for different environments (like development and production)
Even within the same cluster, task runners may have different capabilities, such as a different set of registered tasks.
If multiple task runners have the same set of registered tasks, you can assign them to different clusters to target specific task runners for a particular job.
### Adding Task Runners to a Cluster
You can add task runners to a cluster by specifying the [cluster's slug](#cluster-slug) when [registering a task runner](/workflows/concepts/task-runners).
Each task runner must always be assigned to a cluster.
## Default Cluster
Each team has a default cluster that is automatically created for them.
This cluster is used when no cluster is specified when [registering a task runner](/workflows/concepts/task-runners) or [submitting a job](/workflows/concepts/jobs).
This is useful when you are just getting started and don't need to create any custom clusters yet.
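For example, omitting the cluster when submitting a job schedules its root task on the default cluster. A minimal sketch (with `MyTask` standing in for one of your own tasks):
```python Python
from tilebox.workflows import Client

client = Client()
job_client = client.jobs()

# no cluster specified: the root task runs on your team's default cluster
job = job_client.submit("my-job", MyTask())
```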
## Managing Clusters
To use a custom cluster, you first need to create it. You can also list, fetch, and delete clusters as needed. The following sections explain how to do this.
To manage clusters, first instantiate a cluster client using the `clusters` method in the workflows client.
```python Python
from tilebox.workflows import Client
client = Client()
clusters = client.clusters()
```
```go Go
import "github.com/tilebox/tilebox-go/workflows/v1"
client := workflows.NewClient()
clusterClient := client.Clusters
```
### Creating a Cluster
To create a cluster, use the `create` method on the cluster client and provide a name for the cluster.
```python Python
cluster = clusters.create("testing")
print(cluster)
```
```go Go
cluster, err := client.Clusters.Create(ctx, "testing")
if err != nil {
	slog.Error("failed to create cluster", slog.Any("error", err))
	return
}
fmt.Println(cluster)
```
```plaintext Python
Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing')
```
```go Go
&{testing-CvufcSxcC9SKfe testing}
```
### Cluster Slug
Each cluster has a unique identifier, combining the cluster's name and an automatically generated identifier.
Use this slug to reference the cluster for other operations, like submitting a job or subtasks.
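The slug is available on the cluster object returned by the cluster client. A short sketch (reusing the `cluster` created above; `MyTask` is a placeholder for one of your own tasks):
```python Python
print(cluster.slug)  # for example "testing-CvufcSxcC9SKfe"

# pass the slug wherever a cluster is expected, for example when submitting a job
job_client = client.jobs()
job = job_client.submit("my-job", MyTask(), cluster=cluster.slug)
```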
### Listing Clusters
To list all available clusters, use the `all` method:
```python Python
all_clusters = clusters.all()
print(all_clusters)
```
```go Go
clusters, err := client.Clusters.List(ctx)
if err != nil {
slog.Error("failed to list clusters", slog.Any("error", err))
return
}
for _, cluster := range clusters {
fmt.Println(cluster)
}
```
```plaintext Python
[Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing'),
Cluster(slug='production-EifhUozDpwAJDL', display_name='Production')]
```
```go Go
&{testing-CvufcSxcC9SKfe testing}
&{production-EifhUozDpwAJDL Production}
```
### Fetching a Specific Cluster
To fetch a specific cluster, use the `find` method and pass the cluster's slug:
```python Python
cluster = clusters.find("testing-CvufcSxcC9SKfe")
print(cluster)
```
```go Go
cluster, err := client.Clusters.Get(ctx, "testing-CvufcSxcC9SKfe")
if err != nil {
slog.Error("failed to get cluster", slog.Any("error", err))
return
}
fmt.Println(cluster)
```
```plaintext Python
Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing')
```
```go Go
&{testing-CvufcSxcC9SKfe testing}
```
### Deleting a Cluster
To delete a cluster, use the `delete` method and pass the cluster's slug:
```python Python
clusters.delete("testing-CvufcSxcC9SKfe")
```
```go Go
err := client.Clusters.Delete(ctx, "testing-CvufcSxcC9SKfe")
```
## Jobs Across Different Clusters
When [submitting a job](/workflows/concepts/jobs), you need to specify which cluster the job's root task should be executed on.
This allows you to direct the job to a specific set of task runners.
By default, all sub-tasks within a job are also submitted to the same cluster, but this can be overridden to submit sub-tasks to different clusters if needed.
See the example below for a job that spans across multiple clusters.
```python Python
from tilebox.workflows import Task, ExecutionContext, Client
class MultiCluster(Task):
def execute(self, context: ExecutionContext) -> None:
# this submits a task to the same cluster as the one currently executing this task
same_cluster = context.submit_subtask(DummyTask())
other_cluster = context.submit_subtask(
DummyTask(),
# this task runs only on a task runner in the "other-cluster" cluster
cluster="other-cluster-As3dcSb3D9SAdK",
# dependencies can be specified across clusters
depends_on=[same_cluster],
)
class DummyTask(Task):
def execute(self, context: ExecutionContext) -> None:
pass
# submit a job to the "testing" cluster
client = Client()
job_client = client.jobs()
job = job_client.submit(
"my-job",
MultiCluster(),
cluster="testing-CvufcSxcC9SKfe",
)
```
```go Go
package main
import (
	"context"

	"github.com/tilebox/tilebox-go/workflows/v1"
	"github.com/tilebox/tilebox-go/workflows/v1/job"
	"github.com/tilebox/tilebox-go/workflows/v1/subtask"
)
type MultiCluster struct{}
func (t *MultiCluster) Execute(ctx context.Context) error {
// this submits a task to the same cluster as the one currently executing this task
sameCluster, err := workflows.SubmitSubtask(ctx, &DummyTask{})
if err != nil {
return err
}
otherCluster, err := workflows.SubmitSubtask(
ctx,
&DummyTask{},
// this task runs only on a task runner in the "other-cluster" cluster
subtask.WithClusterSlug("other-cluster-As3dcSb3D9SAdK"),
// dependencies can be specified across clusters
subtask.WithDependencies(sameCluster),
)
if err != nil {
return err
}
_ = otherCluster
return nil
}
type DummyTask struct{}

func (t *DummyTask) Execute(ctx context.Context) error {
	return nil
}
func main() {
ctx := context.Background()
client := workflows.NewClient()
// submit a job to the "testing" cluster
_, _ = client.Jobs.Submit(
ctx,
"my-job",
[]workflows.Task{
&MultiCluster{},
},
job.WithClusterSlug("testing-CvufcSxcC9SKfe"),
)
}
```
This workflow requires at least two task runners to complete. One must be in the "testing" cluster, and the other must be in the "other-cluster" cluster.
If no task runners are available in the "other-cluster," the task submitted to that cluster will remain queued until a task runner is available.
It won't execute on a task runner in the "testing" cluster, even if the task runner has the `DummyTask` registered.
# Jobs
Source: https://docs.tilebox.com/workflows/concepts/jobs
A job is a specific execution of a workflow with designated input parameters. It consists of one or more tasks that can run in parallel or sequentially, based on their dependencies. Submitting a job involves creating a root task with specific input parameters, which may trigger the execution of other tasks within the same job.
## Submission
To execute a [task](/workflows/concepts/tasks), it must be initialized with concrete inputs and submitted as a job. The task will then run within the context of the job, and if it generates sub-tasks, those will also execute as part of the same job.
After submitting a job, the root task is scheduled for execution, and any [eligible task runner](/workflows/concepts/task-runners#task-selection) can pick it up and execute it.
First, instantiate a job client by calling the `jobs` method on the workflow client.
```python Python
from tilebox.workflows import Client
client = Client()
job_client = client.jobs()
```
```go Go
import "github.com/tilebox/tilebox-go/workflows/v1"
client := workflows.NewClient()
jobClient := client.Jobs
```
After obtaining a job client, submit a job using the [submit](/api-reference/python/tilebox.workflows/JobClient.submit) method. You need to provide a name for the job, an instance of the root [task](/workflows/concepts/tasks), and an optional [cluster](/workflows/concepts/clusters) to execute the root task on.
```python Python
# import your own workflow
from my_workflow import MyTask
job = job_client.submit('my-job', MyTask("some", "parameters"))
```
```go Go
job, err := client.Jobs.Submit(ctx, "my-job",
[]workflows.Task{
&MyTask{
Some: "parameters",
},
},
)
if err != nil {
slog.Error("Failed to submit job", slog.Any("error", err))
return
}
```
Once a job is submitted, it's immediately scheduled for execution. The root task will be picked up and executed as soon as an [eligible task runner](/workflows/concepts/task-runners#task-selection) is available.
## Retry Handling
[Tasks support retry handling](/workflows/concepts/tasks#retry-handling) for failed executions. This applies to the root task of a job as well, where you can specify the number of retries using the `max_retries` argument of the `submit` method.
```python Python
from my_workflow import MyFlakyTask
job = job_client.submit('my-job', MyFlakyTask(), max_retries=5)
```
```go Go
myJob, err := client.Jobs.Submit(ctx, "my-job",
[]workflows.Task{
&MyFlakyTask{},
},
job.WithMaxRetries(5),
)
```
In this example, if `MyFlakyTask` fails, it will be retried up to five times before being marked as failed.
## Submitting to a specific cluster
Jobs default to running on the [default cluster](/workflows/concepts/clusters#default-cluster).
You can specify another cluster to run the root task on using the `cluster` argument of the `submit` method.
```python Python
from my_workflow import MyFlakyTask
job = job_client.submit('my-job', MyFlakyTask(), cluster="dev-cluster")
```
```go Go
myJob, err := client.Jobs.Submit(ctx, "my-job",
[]workflows.Task{
&MyFlakyTask{},
},
job.WithClusterSlug("dev-cluster"),
)
```
Only runners listening on the specified cluster can pick up the task.
## Querying jobs
You can query jobs in a given time range using the `query` method on the job client.
```python Python
jobs = job_client.query(("2025-01-01", "2025-02-01"))
print(jobs)
```
```go Go
import (
"time"
workflows "github.com/tilebox/tilebox-go/workflows/v1"
"github.com/tilebox/tilebox-go/workflows/v1/job"
"github.com/tilebox/tilebox-go/query"
)
interval := query.NewTimeInterval(
time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2025, 2, 1, 0, 0, 0, 0, time.UTC),
)
jobs, err := workflows.Collect(client.Jobs.Query(ctx,
job.WithTemporalExtent(interval),
))
if err != nil {
slog.Error("Failed to query jobs", slog.Any("error", err))
return
}
for _, job := range jobs {
fmt.Println(job)
}
```
## Retrieving a specific job
When you submit a job, it's assigned a unique identifier that can be used to retrieve it later.
You can use the `find` method on the job client to get a job by its ID.
```python Python
job = job_client.submit('my-job', MyTask("some", "parameters"))
print(job.id) # 018dd029-58ca-74e5-8b58-b4f99d610f9a
# Later, in another process or machine, retrieve job info
job = job_client.find("018dd029-58ca-74e5-8b58-b4f99d610f9a")
```
```go Go
myJob, err := client.Jobs.Submit(ctx, "my-job",
[]workflows.Task{
&helloworld.HelloTask{
Some: "parameters",
},
},
)
if err != nil {
slog.Error("Failed to submit job", slog.Any("error", err))
return
}
// 018dd029-58ca-74e5-8b58-b4f99d610f9a
slog.Info("Job submitted", slog.String("job_id", myJob.ID.String()))
// Later, in another process or machine, retrieve job info
job, err := client.Jobs.Get(ctx, uuid.MustParse("018dd029-58ca-74e5-8b58-b4f99d610f9a"))
```
`find` is also a useful tool for fetching a job's state later on, to check whether it's still running or has already completed.
## States
A job can be in one of the following states:
* `QUEUED`: the job is queued and waiting for execution
* `STARTED`: at least one task of the job has been started
* `COMPLETED`: all tasks of the job have been completed
```python Python
from tilebox.workflows.data import JobState
job = job_client.find("018dd029-58ca-74e5-8b58-b4f99d610f9a")
print("Job is queued:", job.state == JobState.QUEUED)
```
```go Go
job, err := client.Jobs.Get(ctx, uuid.MustParse("018dd029-58ca-74e5-8b58-b4f99d610f9a"))
fmt.Println("Job is queued:", job.State == workflows.JobQueued)
```
```plaintext Output
Job is queued: True
```
## Visualization
Visualizing the execution of a job can be helpful. The Tilebox workflow orchestrator tracks all tasks in a job, including [sub-tasks](/workflows/concepts/tasks#task-composition-and-subtasks) and [dependencies](/workflows/concepts/tasks#dependencies). This enables the visualization of the execution of a job as a graph diagram.
`display` is designed for use in an [interactive environment](/sdks/python/sample-notebooks#interactive-environments) such as a Jupyter notebook. In non-interactive environments, use [visualize](/api-reference/python/tilebox.workflows/JobClient.visualize), which returns the rendered diagram as an SVG string.
Visualization isn't supported in Go yet.
```python Python
job = job_client.find("some-job-id") # or a recently submitted job
# Then visualize it
job_client.display(job)
```
The following diagram represents the job execution as a graph. Each task is shown as a node, with edges indicating sub-task relationships. The diagram also uses color coding to display the status of each task.
The color codes for task states are:
| Task State | Color | Description |
| ---------- | -------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------- |
| Queued | SalmonYellow | The task is queued and waiting for execution. |
| Running | Blue | The task is currently being executed. |
| Computed | Green | The task has successfully been computed. If a task is computed, and all its sub-tasks are also computed, the task is considered completed. |
| Failed | Red | The task has been executed but encountered an error. |
Below is another visualization of a job currently being executed by multiple task runners.
This visualization shows:
* The root task, `MyTask`, has executed and spawned three sub-tasks.
* At least three task runners are available, as three tasks currently are executed simultaneously.
* The `SubTask` that is still executing has not generated any sub-tasks yet, as sub-tasks are queued for execution only after the parent task finishes and becomes computed.
* The queued `DependentTask` requires the `LeafTask` to complete before it can be executed.
Job visualizations are meant for development and debugging. They are not suitable for large jobs with hundreds of tasks, as the diagrams may become too complex. Currently, visualizations are limited to jobs with a maximum of 200 tasks.
### Customizing Task Display Names
The text representing a task in the diagram defaults to a task's class name. You can customize this by modifying the `display` field of the `current_task` object in the task's execution context. The maximum length for a display name is 1024 characters, with any overflow truncated. Line breaks using `\n` are supported as well.
```python Python
from tilebox.workflows import Task, ExecutionContext
class RootTask(Task):
num_subtasks: int
def execute(self, context: ExecutionContext):
context.current_task.display = f"Root({self.num_subtasks})"
for i in range(self.num_subtasks):
context.submit_subtask(SubTask(i))
class SubTask(Task):
index: int
def execute(self, context: ExecutionContext):
context.current_task.display = f"Leaf Nr. {self.index}"
job = job_client.submit('custom-display-names', RootTask(3))
job_client.display(job)
```
```go Go
type RootTask struct {
NumSubtasks int
}
func (t *RootTask) Execute(ctx context.Context) error {
err := workflows.SetTaskDisplay(ctx, fmt.Sprintf("Root(%d)", t.NumSubtasks))
if err != nil {
return fmt.Errorf("failed to set task display: %w", err)
}
for i := range t.NumSubtasks {
_, err := workflows.SubmitSubtask(ctx, &SubTask{Index: i})
if err != nil {
return fmt.Errorf("failed to submit subtask: %w", err)
}
}
return nil
}
type SubTask struct {
Index int
}
func (t *SubTask) Execute(ctx context.Context) error {
err := workflows.SetTaskDisplay(ctx, fmt.Sprintf("Leaf Nr. %d", t.Index))
if err != nil {
return fmt.Errorf("failed to set task display: %w", err)
}
return nil
}
// in main
job, err := client.Jobs.Submit(ctx, "custom-display-names",
[]workflows.Task{&RootTask{
NumSubtasks: 3,
}},
)
```
## Cancellation
You can cancel a job at any time. When a job is canceled, no queued tasks will be picked up by task runners and executed even if task runners are idle. Tasks that are already being executed will finish their execution and not be interrupted. All sub-tasks spawned from such tasks after the cancellation will not be picked up by task runners.
Use the `cancel` method on the job client to cancel a job.
```python Python
job = job_client.submit('my-job', MyTask())
# After a short while, the job gets canceled
job_client.cancel(job)
```
```go Go
job, err := client.Jobs.Submit(ctx, "my-job",
[]workflows.Task{&MyTask{}},
)
if err != nil {
slog.Error("Failed to submit job", slog.Any("error", err))
return
}
// After a short while, the job gets canceled
err = client.Jobs.Cancel(ctx, job.ID)
```
A canceled job can be resumed at any time by [retrying](#retries) it.
If any task in a job fails, the job is automatically canceled to avoid executing irrelevant tasks. Future releases will allow configuring this behavior for each task to meet specific requirements.
## Retries
If a task fails due to a bug or lack of resources, there is no need to resubmit the entire job. You can simply retry the job, and it will resume from the point of failure. This ensures that all the work that was already done up until the point of the failure isn't lost.
Future releases may introduce automatic retries for certain failure conditions, which can be useful for handling temporary issues.
Below is an example of a failing job due to a bug in the task's implementation. The following workflow processes a list of movie titles and queries the [OMDb API](http://www.omdbapi.com/) for each movie's release date.
```python Python
from urllib.parse import urlencode
import httpx
from tilebox.workflows import Task, ExecutionContext
class MoviesStats(Task):
titles: list[str]
def execute(self, context: ExecutionContext) -> None:
for title in self.titles:
context.submit_subtask(PrintMovieStats(title))
class PrintMovieStats(Task):
title: str
def execute(self, context: ExecutionContext) -> None:
params = {"t": self.title, "apikey": ""}
url = "http://www.omdbapi.com/?" + urlencode(params)
response = httpx.get(url).json()
# set the display name of the task to the title of the movie:
context.current_task.display = response["Title"]
print(f"{response['Title']} was released on {response['Released']}")
```
```go Go
package movie
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"github.com/tilebox/tilebox-go/workflows/v1"
)
type MoviesStats struct {
Titles []string
}
func (t *MoviesStats) Execute(ctx context.Context) error {
for _, title := range t.Titles {
_, err := workflows.SubmitSubtask(ctx, &PrintMovieStats{Title: title})
if err != nil {
return fmt.Errorf("failed to submit subtask: %w", err)
}
}
return nil
}
type Movie struct {
Title *string `json:"Title"`
Released *string `json:"Released"`
}
type PrintMovieStats struct {
Title string
}
func (t *PrintMovieStats) Execute(ctx context.Context) error {
apiURL := fmt.Sprintf("http://www.omdbapi.com/?t=%s&apikey=%s", url.QueryEscape(t.Title), "")
response, err := http.Get(apiURL)
if err != nil {
return fmt.Errorf("failed to fetch movie: %w", err)
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return fmt.Errorf("failed to read response: %w", err)
}
var movie Movie
err = json.Unmarshal(body, &movie)
if err != nil {
return fmt.Errorf("failed to unmarshal response: %w", err)
}
// set the display name of the task to the title of the movie:
	err = workflows.SetTaskDisplay(ctx, *movie.Title)
if err != nil {
return fmt.Errorf("failed to set task display: %w", err)
}
fmt.Printf("%s was released on %s\n", *movie.Title, *movie.Released)
return nil
}
```
Submitting the workflow as a job reveals a bug in the `PrintMovieStats` task.
```python Python
job = job_client.submit('movies-stats', MoviesStats([
"The Matrix",
"Shrek 2",
"Tilebox - The Movie",
"The Avengers",
]))
job_client.display(job)
```
```go Go
job, err := client.Jobs.Submit(ctx, "movies-stats",
[]workflows.Task{&MoviesStats{
Titles: []string{
"The Matrix",
"Shrek 2",
"Tilebox - The Movie",
"The Avengers",
},
}},
)
```
One of the `PrintMovieStats` tasks fails with a `KeyError`. This error occurs when a movie title is not found by the [OMDb API](http://www.omdbapi.com/), leading to a response without the `Title` and `Released` fields.
Console output from the task runners confirms this:
```plaintext Output
The Matrix was released on 31 Mar 1999
Shrek 2 was released on 19 May 2004
ERROR: Task PrintMovieStats failed with exception: KeyError('Title')
```
The corrected version of `PrintMovieStats` is as follows:
```python Python
class PrintMovieStats(Task):
title: str
def execute(self, context: ExecutionContext) -> None:
params = {"t": self.title, "apikey": ""}
url = "http://www.omdbapi.com/?" + urlencode(params)
response = httpx.get(url).json()
if "Title" in response and "Released" in response:
context.current_task.display = response["Title"]
print(f"{response['Title']} was released on {response['Released']}")
else:
context.current_task.display = f"NotFound: {self.title}"
print(f"Could not find the release date for {self.title}")
```
```go Go
type PrintMovieStats struct {
Title string
}
func (t *PrintMovieStats) Execute(ctx context.Context) error {
	apiURL := fmt.Sprintf("http://www.omdbapi.com/?t=%s&apikey=%s", url.QueryEscape(t.Title), "")
	response, err := http.Get(apiURL)
if err != nil {
return fmt.Errorf("failed to fetch movie: %w", err)
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return fmt.Errorf("failed to read response: %w", err)
}
var movie Movie
err = json.Unmarshal(body, &movie)
if err != nil {
return fmt.Errorf("failed to unmarshal response: %w", err)
}
if movie.Released != nil && movie.Title != nil {
err := workflows.SetTaskDisplay(ctx, *movie.Title)
if err != nil {
return fmt.Errorf("failed to set task display: %w", err)
}
fmt.Printf("%s was released on %s\n", *movie.Title, *movie.Released)
} else {
err := workflows.SetTaskDisplay(ctx, fmt.Sprintf("NotFound: %s", t.Title))
if err != nil {
return fmt.Errorf("failed to set task display: %w", err)
}
fmt.Printf("Could not find the release date for %s\n", t.Title)
}
return nil
}
```
With this fix, and after redeploying the task runners with the updated `PrintMovieStats` implementation, you can retry the job:
```python Python
job_client.retry(job)
job_client.display(job)
```
```go Go
_, err := client.Jobs.Retry(ctx, job.ID)
```
Now the console output shows:
```plaintext Output
Could not find the release date for Tilebox - The Movie
The Avengers was released on 04 May 2012
```
The output confirms that the job resumed from the point of failure: only the two remaining tasks were executed, while the two tasks that had already completed before the failure were not re-executed. The retried job then succeeded.
# Task Runners
Source: https://docs.tilebox.com/workflows/concepts/task-runners
Task runners are the execution agents within the Tilebox Workflows ecosystem that execute tasks. They can be deployed in different computing environments, including on-premise servers and cloud-based auto-scaling clusters. Task runners execute tasks as scheduled by the workflow orchestrator, ensuring they have the necessary resources and environment for effective execution.
## Implementing a Task Runner
A task runner is a continuously running process that listens for new tasks to execute. You can start multiple task runner processes to execute tasks concurrently. When a task runner receives a task, it executes it and reports the results back to the workflow orchestrator. The task runner also handles any errors that occur during task execution, reporting them to the orchestrator as well.
To execute a task, at least one task runner must be running and available. If no task runners are available, tasks will remain queued until one becomes available.
To create and start a task runner, follow these steps:
1. Instantiate a client connected to the Tilebox Workflows API.
2. Select or create a [cluster](/workflows/concepts/clusters) and specify its slug when creating the task runner. If no cluster is specified, the task runner uses the default cluster.
3. Register the tasks the task runner can execute by passing the task classes as a list to the `runner` method.
4. Call the `run_forever` method of the task runner to listen for new tasks until the task runner process is shut down.
Here is a simple example demonstrating these steps:
```python Python
from tilebox.workflows import Client
# your own workflow:
from my_workflow import MyTask, OtherTask
def main():
client = Client() # 1. connect to the Tilebox Workflows API
runner = client.runner(
cluster= "dev-cluster" # 2. select a cluster to join (optional, omit to use the default cluster)
tasks=[MyTask, OtherTask] # 3. register tasks
)
runner.run_forever() # 4. listen for new tasks to execute
if __name__ == "__main__":
main()
```
```go Go
package main
import (
"context"
"log/slog"
"github.com/tilebox/tilebox-go/workflows/v1"
"github.com/tilebox/tilebox-go/workflows/v1/runner"
// your own workflow:
"github.com/my_org/myworkflow"
)
func main() {
ctx := context.Background()
// 1. connect to the Tilebox Workflows API
client := workflows.NewClient()
// 2. select a cluster to join (optional, omit to use the default cluster)
runner, err := client.NewTaskRunner(ctx, runner.WithClusterSlug("dev-cluster"))
if err != nil {
slog.Error("failed to create task runner", slog.Any("error", err))
return
}
// 3. register tasks
err = runner.RegisterTasks(
&myworkflow.MyTask{},
&myworkflow.OtherTask{},
)
if err != nil {
slog.Error("failed to register task", slog.Any("error", err))
return
}
// 4. listen for new tasks to execute
runner.Run(ctx)
}
```
To start the task runner locally, run it as a script:
```bash Python
> python task_runner.py
```
```bash Go
> go run .
```
## Task Selection
For a task runner to pick up a submitted task, the following conditions must be met:
1. The [cluster](/workflows/concepts/clusters) where the task was submitted must match the task runner's cluster.
2. The task runner must have a registered task that matches the [task identifier](/workflows/concepts/tasks#task-identifiers) of the submitted task.
3. The version of the task runner's registered task must be [compatible](/workflows/concepts/tasks#semantic-versioning) with the submitted task's version.
If a task meets these conditions, the task runner executes it. Otherwise, the task runner remains idle until a matching task is available.
Often, multiple submitted tasks match the conditions for execution. In that case, the task runner selects one of the tasks to execute, and the remaining tasks stay in a queue until the selected task is completed or another task runner becomes available.
## Parallelism
You can start multiple task runner instances in parallel to execute tasks concurrently. Each task runner listens for new tasks and executes them as they become available. This allows for high parallelism and can be used to scale the execution of tasks to handle large workloads.
To test this, run multiple instances of the task runner script in different terminal windows on your local machine, or use a tool like [call-in-parallel](https://github.com/tilebox/call-in-parallel) to start the task runner script multiple times.
For example, to start five task runners in parallel, use the following command:
```bash
> call-in-parallel -n 5 -- python task_runner.py
```
## Deploying Task Runners
Task runners are continuously running processes that can be deployed in different computing environments. The only requirement for deploying task runners is access to the Tilebox Workflows API. Once this is met, task runners can be deployed in many different environments, including:
* On-premise servers
* Cloud-based virtual machines
* Cloud-based auto-scaling clusters
## Scaling
One key benefit of task runners is their **ability to scale even while workflows are executing**. You can start new task runners at any time, and they can immediately pick up queued tasks to execute. It's not necessary to have an entire processing cluster available at the start of a workflow, as task runners can be started and stopped as needed.
This is particularly beneficial in cloud environments, where task runners can be automatically started and stopped based on current workload, measured by metrics such as CPU usage. Here's an example scenario:
1. A single instance of a task runner is actively waiting for work in a cloud environment.
2. A large workload is submitted to the workflow orchestrator, prompting the task runner to pick up the first task.
3. The first task creates new sub-tasks for processing, which the task runner also picks up.
4. As the workload increases, the task runner's CPU usage rises, triggering the cloud environment to automatically start new task runner instances.
5. Newly started task runners begin executing queued tasks, distributing the workload among all available task runners.
6. Once the workload decreases, the cloud environment automatically stops some task runners.
7. The first task runner completes the remaining work until everything is done.
8. The first task runner remains idle until new tasks arrive.
CPU usage-based auto scaling is just one method to scale task runners. Other metrics, such as memory usage or network bandwidth, are also supported by many cloud environments. In a future release, configuration options for scaling task runners based on custom metrics (for example the number of queued tasks) are planned.
## Distributed Execution
Task runners can be distributed across different compute environments. For instance, some data stored on-premise may need pre-processing, while further processing occurs in the cloud. A job might involve tasks that filter relevant on-premise data and publish it to the cloud, and other tasks that read data from the cloud and process it. In such scenarios, one task runner can run on-premise and another in a cloud environment, effectively collaborating on the same job.
Another advantage of distributed task runners is executing workflows that require specific hardware for certain tasks. For example, one task might need a GPU, while another requires extensive memory.
Here's an example of a distributed workflow:
```python Python
from tilebox.workflows import Task, ExecutionContext
class DistributedWorkflow(Task):
def execute(self, context: ExecutionContext) -> None:
download_task = context.submit_subtask(DownloadData())
process_task = context.submit_subtask(
ProcessData(),
depends_on=[download_task],
)
class DownloadData(Task):
"""
Download a dataset and store it in a shared internal bucket.
Requires a good network connection for high download bandwidth.
"""
def execute(self, context: ExecutionContext) -> None:
pass
class ProcessData(Task):
"""
Perform compute-intensive processing of a dataset.
The dataset must be available in an internal bucket.
Requires access to a GPU for optimal performance.
"""
def execute(self, context: ExecutionContext) -> None:
pass
```
```go Go
package distributed
import (
"context"
"fmt"
"github.com/tilebox/tilebox-go/workflows/v1"
"github.com/tilebox/tilebox-go/workflows/v1/subtask"
)
type DistributedWorkflow struct{}
func (t *DistributedWorkflow) Execute(ctx context.Context) error {
downloadTask, err := workflows.SubmitSubtask(ctx, &DownloadData{})
if err != nil {
return fmt.Errorf("failed to submit download subtask: %w", err)
}
_, err = workflows.SubmitSubtask(ctx, &ProcessData{}, subtask.WithDependencies(downloadTask))
if err != nil {
return fmt.Errorf("failed to submit process subtask: %w", err)
}
return nil
}
// DownloadData Download a dataset and store it in a shared internal bucket.
// Requires a good network connection for high download bandwidth.
type DownloadData struct{}
func (t *DownloadData) Execute(ctx context.Context) error {
return nil
}
// ProcessData Perform compute-intensive processing of a dataset.
// The dataset must be available in an internal bucket.
// Requires access to a GPU for optimal performance.
type ProcessData struct{}
func (t *ProcessData) Execute(ctx context.Context) error {
return nil
}
```
To achieve distributed execution for this workflow, no single task runner is set up to execute all three tasks.
Instead, two task runners are set up, one in a high-speed network environment and the other with GPU access, each registering only the tasks suited to its environment.
When the distributed workflow runs, the first task runner picks up the `DownloadData` task, while the second picks up the `ProcessData` task.
The `DistributedWorkflow` does not require specific hardware, so it can be registered with both runners and executed by either one.
```python Python
from tilebox.workflows import Client
client = Client()
high_network_speed_runner = client.runner(
tasks=[DownloadData, DistributedWorkflow]
)
high_network_speed_runner.run_forever()
```
```go Go
package main
import (
"context"
"log/slog"
"github.com/tilebox/tilebox-go/workflows/v1"
)
func main() {
ctx := context.Background()
client := workflows.NewClient()
highNetworkSpeedRunner, err := client.NewTaskRunner(ctx)
if err != nil {
slog.Error("failed to create task runner", slog.Any("error", err))
return
}
err = highNetworkSpeedRunner.RegisterTasks(
&DownloadData{},
&DistributedWorkflow{},
)
if err != nil {
slog.Error("failed to register tasks", slog.Any("error", err))
return
}
highNetworkSpeedRunner.RunForever(ctx)
}
```
```python Python
from tilebox.workflows import Client
client = Client()
gpu_runner = client.runner(
tasks=[ProcessData, DistributedWorkflow]
)
gpu_runner.run_forever()
```
```go Go
package main
import (
"context"
"log/slog"
"github.com/tilebox/tilebox-go/workflows/v1"
)
func main() {
ctx := context.Background()
client := workflows.NewClient()
gpuRunner, err := client.NewTaskRunner(ctx)
if err != nil {
slog.Error("failed to create task runner", slog.Any("error", err))
return
}
err = gpuRunner.RegisterTasks(
&ProcessData{},
&DistributedWorkflow{},
)
if err != nil {
slog.Error("failed to register tasks", slog.Any("error", err))
return
}
gpuRunner.RunForever(ctx)
}
```
Now, both `download_task_runner.py` and `gpu_task_runner.py` are started in parallel, on different machines with the required hardware for each. When `DistributedWorkflow` is submitted, it executes on one of the two runners, and its submitted sub-tasks are handled by the appropriate runner.
In this case, since `ProcessData` depends on `DownloadData`, the GPU task runner remains idle until the download completes, then picks up the processing task.
You can also differentiate between task runners by specifying different [clusters](/workflows/concepts/clusters) and choosing specific clusters for sub-task submissions. For more details, see the [Clusters](/workflows/concepts/clusters) section.
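As an illustration, here is a minimal sketch of scoping sub-tasks to specific clusters, reusing the tasks from the example above. The cluster slugs are hypothetical, and the exact parameter name of `submit_subtask` for cluster selection is an assumption; see the [Clusters](/workflows/concepts/clusters) section for the authoritative API.

```python Python
from tilebox.workflows import Task, ExecutionContext

class ClusterScopedWorkflow(Task):
    def execute(self, context: ExecutionContext) -> None:
        # hypothetical cluster slugs; replace with your own clusters
        download_task = context.submit_subtask(
            DownloadData(),
            cluster="high-bandwidth-cluster",  # assumed parameter name, see the Clusters section
        )
        context.submit_subtask(
            ProcessData(),
            depends_on=[download_task],
            cluster="gpu-cluster",
        )
```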
## Task Failures
If an unhandled exception occurs during task execution, the task runner captures it and reports it back to the workflow orchestrator. The orchestrator then marks the task as failed, which leads to [job cancellation](/workflows/concepts/jobs#cancellation) so that further tasks of the same job, which may no longer be relevant, are not executed.
A task failure does not result in losing all previous work done by the job. If the failure is fixable, for example by fixing a bug in the task implementation, ensuring the task has the necessary resources, or simply retrying after a flaky network connection, it may be worth [retrying](/workflows/concepts/jobs#retries) the job.
When retrying a job, all failed tasks are added back to the queue, allowing a task runner to potentially execute them. If execution then succeeds, the job continues smoothly. Otherwise, the task will remain marked as failed and can be retried again if desired.
If fixing a failure requires modifying the task implementation, it's important to deploy the updated version to the [task runners](/workflows/concepts/task-runners) before retrying the job. Otherwise, a task runner could pick up the original, faulty implementation again, leading to another failure.
## Task idempotency
Since a task may be retried, it's possible that a task is executed more than once. Depending on where in the execution of the task it failed, it may have already performed some side effects, such as writing to a database, or sending a message to a queue. Because of that it's crucial to ensure that tasks are [idempotent](https://en.wikipedia.org/wiki/Idempotence). Idempotent tasks can be executed multiple times without altering the outcome beyond the first successful execution.
A special case of idempotency involves submitting sub-tasks. If a task calls `context.submit_subtask` and then fails and is retried, the sub-tasks submitted by the earlier, failed execution are automatically removed, so they can be safely submitted again when the task is retried.
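As an illustration, here is a minimal sketch of an idempotent task in Python: it writes its output to a deterministic path derived from its input and skips the work if that output already exists, so re-running it after a retry has no additional effect. The file layout shown here is purely hypothetical.

```python Python
from pathlib import Path

import httpx
from tilebox.workflows import Task, ExecutionContext

class IdempotentDownload(Task):
    url: str

    def execute(self, context: ExecutionContext) -> None:
        # deterministic output path derived from the input, so retries target the same file
        output = Path("downloads") / self.url.split("/")[-1]
        if output.exists():
            return  # already done in a previous (possibly partially failed) execution
        output.parent.mkdir(parents=True, exist_ok=True)
        output.write_bytes(httpx.get(self.url).content)
```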
## Runner Crashes
Tilebox Workflows has an internal mechanism to handle unexpected task runner crashes. When a task runner picks up a task, it periodically sends a heartbeat to the workflow orchestrator. If the orchestrator does not receive this heartbeat for a defined duration, it marks the task as failed and automatically attempts to [retry](/workflows/concepts/jobs#retries) it up to 10 times. This allows another task runner to pick up the task and continue executing the job.
This mechanism ensures that scenarios such as power outages, hardware failures, or dropped network connections are handled effectively, preventing any task from remaining in a running state indefinitely.
## Observability
Task runners are continuously running processes, making it essential to monitor their health and performance. You can achieve observability by collecting and analyzing logs, metrics, and traces from task runners. Tilebox Workflows provides tools to enable this data collection and analysis. To learn how to configure task runners for observability, head over to the [Observability](/workflows/observability) section.
# Understanding and Creating Tasks
Source: https://docs.tilebox.com/workflows/concepts/tasks
A Task is the smallest unit of work, designed to perform a specific operation. Each task represents a distinct operation or process that can be executed, such as processing data, performing calculations, or managing resources. Tasks can operate independently or as components of a more complex set of connected tasks known as a Workflow. Tasks are defined by their code, inputs, and dependencies on other tasks. To create tasks, you need to define the input parameters and specify the action to be performed during execution.
## Creating a Task
To create a task in Tilebox, define a class that extends the `Task` base class and implements the `execute` method. The `execute` method is the entry point for the task where its logic is defined. It's called when the task is executed.
```python Python
from tilebox.workflows import Task, ExecutionContext
class MyFirstTask(Task):
def execute(self, context: ExecutionContext):
print("Hello World!")
```
```go Go
type MyFirstTask struct{}
func (t *MyFirstTask) Execute(ctx context.Context) error {
slog.Info("Hello World!")
return nil
}
```
This example demonstrates a simple task that prints "Hello World!" to the console.
For Python, the key components of this task are:
* `MyFirstTask` is a subclass of the `Task` class, which serves as the base class for all defined tasks. It provides the essential structure for a task. Inheriting from `Task` automatically makes the class a `dataclass`, which is useful [for specifying inputs](#input-parameters). Additionally, by inheriting from `Task`, the task is automatically assigned an [identifier based on the class name](#task-identifiers).
* The `execute` method is the entry point for executing the task. This is where the task's logic is defined. It's invoked by a [task runner](/workflows/concepts/task-runners) when the task runs and performs the task's operation.
* The `context` argument is an `ExecutionContext` instance that provides access to an [API for submitting new tasks](/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask) as part of the same job and features like [shared caching](/api-reference/python/tilebox.workflows/ExecutionContext.job_cache).
For Go, the key components are:
* `MyFirstTask` is a struct that implements the `Task` interface. It represents the task to be executed.
* The `Execute` method is the entry point for executing the task. This is where the task's logic is defined. It's invoked by a [task runner](/workflows/concepts/task-runners) when the task runs and performs the task's operation.
The code samples on this page do not show how to execute the task. That is covered in the
[next section on task runners](/workflows/concepts/task-runners), since executing tasks is a separate concern from implementing them.
## Input Parameters
Tasks often require input parameters to operate. These inputs can range from simple values to complex data structures. By inheriting from the `Task` class, the task is treated as a Python `dataclass`, allowing input parameters to be defined as class attributes.
Tasks must be **serializable to JSON or to protobuf** because they may be distributed across a cluster of [task runners](/workflows/concepts/task-runners).
In Go, task parameters must be exported fields of the task struct (starting with an uppercase letter), otherwise they will not be serialized to JSON.
Supported types for input parameters include:
* Basic types such as `str`, `int`, `float`, `bool`
* Lists and dictionaries of basic types
* Nested data classes that are also JSON-serializable or protobuf-serializable
```python Python
class ParametrizableTask(Task):
message: str
number: int
data: dict[str, str]
def execute(self, context: ExecutionContext):
print(self.message * self.number)
task = ParametrizableTask("Hello", 3, {"key": "value"})
```
```go Go
type ParametrizableTask struct {
Message string
Number int
Data map[string]string
}
func (t *ParametrizableTask) Execute(context.Context) error {
slog.Info(strings.Repeat(t.Message, t.Number))
return nil
}
task := &ParametrizableTask{
message: "Hello",
number: 3,
data: map[string]string{"key": "value"},
}
```
## Task Composition and subtasks
Until now, tasks have performed only a single operation. But tasks can be more powerful. **Tasks can submit other tasks as subtasks.** This allows for a modular workflow design, breaking down complex operations into simpler, manageable parts. Additionally, the execution of subtasks is automatically parallelized whenever possible.
```python Python
class ParentTask(Task):
num_subtasks: int
def execute(self, context: ExecutionContext) -> None:
for i in range(self.num_subtasks):
context.submit_subtask(ChildTask(i))
class ChildTask(Task):
index: int
def execute(self, context: ExecutionContext) -> None:
print(f"Executing ChildTask {self.index}")
# after submitting this task, a task runner may pick it up and execute it
# which will result in 5 ChildTasks being submitted and executed as well
task = ParentTask(5)
```
```go Go
type ParentTask struct {
NumSubtasks int
}
func (t *ParentTask) Execute(ctx context.Context) error {
for i := range t.NumSubtasks {
_, err := workflows.SubmitSubtask(ctx, &ChildTask{Index: i})
if err != nil {
return err
}
}
return nil
}
type ChildTask struct {
Index int
}
func (t *ChildTask) Execute(context.Context) error {
slog.Info("Executing ChildTask", slog.Int("index", t.Index))
return nil
}
// after submitting this task, a task runner may pick it up and execute it
// which will result in 5 ChildTasks being submitted and executed as well
task := &ParentTask{NumSubtasks: 5}
```
In this example, a `ParentTask` submits `ChildTask` tasks as subtasks. The number of subtasks to be submitted is based on the `num_subtasks` attribute of the `ParentTask`. The `submit_subtask` method takes an instance of a task as its argument, meaning the task to be submitted must be instantiated with concrete parameters first.
Parent tasks do not have access to the results of their subtasks. Instead, tasks can use [shared caching](/workflows/caches#storing-and-retrieving-data) to share data between tasks.
By submitting a task as a subtask, its execution is scheduled as part of the same job as the parent task. Compared to just directly invoking the subtask's `execute` method, this allows the subtask's execution to occur on a different machine or in parallel with other subtasks. To learn more about how tasks are executed, see the section on [task runners](/workflows/concepts/task-runners).
### Larger subtasks example
A practical example helps illustrate task composition. Below is a set of tasks forming a workflow that downloads a given number of random dog images from the internet. The [Dog API](https://thedogapi.com/) is used to fetch the image URLs, which are then downloaded. Implemented using task composition, this could look like the following:
```python Python
import httpx # pip install httpx
from pathlib import Path
class DownloadRandomDogImages(Task):
num_images: int
def execute(self, context: ExecutionContext) -> None:
url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}"
response = httpx.get(url)
for dog_image in response.json():
context.submit_subtask(DownloadImage(dog_image["url"]))
class DownloadImage(Task):
url: str
def execute(self, context: ExecutionContext) -> None:
file = Path("dogs") / self.url.split("/")[-1]
response = httpx.get(self.url)
with file.open("wb") as file:
file.write(response.content)
```
```go Go
package dogs
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"github.com/tilebox/tilebox-go/workflows/v1"
)
type DogImage struct {
ID string `json:"id"`
URL string `json:"url"`
Width *int `json:"width"`
Height *int `json:"height"`
}
type DownloadRandomDogImages struct {
NumImages int
}
func (t *DownloadRandomDogImages) Execute(ctx context.Context) error {
url := fmt.Sprintf("https://api.thedogapi.com/v1/images/search?limit=%d", t.NumImages)
response, err := http.Get(url)
if err != nil {
return fmt.Errorf("failed to download images: %w", err)
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return fmt.Errorf("failed to read response: %w", err)
}
var dogImages []DogImage
err = json.Unmarshal(body, &dogImages)
if err != nil {
return err
}
for _, dogImage := range dogImages {
_, err := workflows.SubmitSubtask(ctx, &DownloadImage{URL: dogImage.URL})
if err != nil {
return err
}
}
return nil
}
type DownloadImage struct {
URL string
}
func (t *DownloadImage) Execute(context.Context) error {
response, err := http.Get(t.URL)
if err != nil {
return fmt.Errorf("failed to download image: %w", err)
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return fmt.Errorf("failed to read response: %w", err)
}
err = os.MkdirAll("dogs", 0o755)
if err != nil {
return fmt.Errorf("failed to create dogs directory: %w", err)
}
elements := strings.Split(t.URL, "/")
file := fmt.Sprintf("dogs/%s", elements[len(elements)-1])
return os.WriteFile(file, body, 0o600)
}
```
This example consists of the following tasks:
* `DownloadRandomDogImages` fetches a specific number of random dog image URLs from an API. It then submits a `DownloadImage` task for each received image URL.
* `DownloadImage` downloads an image from a specified URL and saves it to a file.
Together, these tasks create a workflow that downloads random dog images from the internet. The relationship between the two tasks and their formation as a workflow becomes clear when `DownloadRandomDogImages` submits `DownloadImage` tasks as subtasks.
Visualizing the execution of such a workflow is akin to a tree structure where the `DownloadRandomDogImages` task is the root, and the `DownloadImage` tasks are the leaves. For instance, when downloading five random dog images, the following tasks are executed.
```python Python
from tilebox.workflows import Client
client = Client()
jobs = client.jobs()
job = jobs.submit(
"download-dog-images",
DownloadRandomDogImages(5),
)
# now our deployed task runners will pick up the task and execute it
jobs.display(job)
```
```go Go
ctx := context.Background()
client := workflows.NewClient()
job, err := client.Jobs.Submit(ctx, "download-dog-images",
[]workflows.Task{
&helloworld.DownloadRandomDogImages{
NumImages: 5,
},
},
)
if err != nil {
slog.Error("Failed to submit job", slog.Any("error", err))
return
}
// now our deployed task runners will pick up the task and execute it
```
In total, six tasks are executed: the `DownloadRandomDogImages` task and five `DownloadImage` tasks. The `DownloadImage` tasks can execute in parallel, as they are independent. If more than one task runner is available, the Tilebox Workflow Orchestrator **automatically parallelizes** the execution of these tasks.
Check out [job\_client.display](/workflows/concepts/jobs#visualization) to learn how this visualization was automatically generated from the task executions.
Currently, a limit of `64` subtasks per task is in place to discourage creating workflows where individual parent tasks submit a large number of subtasks, which can lead to performance issues since those parent tasks are not parallelized. If you need to submit more than `64` subtasks, consider using [recursive subtask submission](#recursive-subtasks) instead.
## Recursive subtasks
Tasks can submit other tasks as subtasks, allowing for complex workflows. Sometimes the input to a task is a list, with elements that can be **mapped** to individual subtasks, whose outputs are then aggregated in a **reduce** step. This pattern is commonly known as **MapReduce**.
Often, the initial map step (submitting the individual subtasks) might itself be an expensive operation. Since it's executed within a single task, it isn't parallelizable, which can bottleneck the entire workflow.
Fortunately, Tilebox Workflows offers a solution through **recursive subtask submission**. A task can submit instances of itself as subtasks, enabling a recursive breakdown into smaller tasks.
For example, the `RecursiveTask` below is a valid task that submits smaller instances of itself as subtasks.
```python Python
class RecursiveTask(Task):
num: int
def execute(self, context: ExecutionContext) -> None:
print(f"Executing RecursiveTask with num={self.num}")
if self.num >= 2:
context.submit_subtask(RecursiveTask(self.num // 2))
```
```go Go
type RecursiveTask struct {
Num int
}
func (t *RecursiveTask) Execute(ctx context.Context) error {
slog.Info("Executing RecursiveTask", slog.Int("num", t.Num))
if t.Num >= 2 {
_, err := workflows.SubmitSubtask(ctx, &RecursiveTask{Num: t.Num / 2})
if err != nil {
return err
}
}
return nil
}
```
### Recursive subtask example
An example for this is the [random dog images workflow](#larger-subtasks-example) mentioned earlier. In the previous implementation, downloading images was already parallelized. But the initial orchestration of the individual download tasks was not parallelized, because `DownloadRandomDogImages` was responsible for fetching all random dog image URLs and only submitted the individual download tasks once all URLs were retrieved. For a large number of images this setup can bottleneck the entire workflow.
To improve this, recursive subtask submission decomposes a `DownloadRandomDogImages` task with a high number of images into two smaller `DownloadRandomDogImages` tasks, each fetching half. This process can be repeated until a specified threshold is met, at which point the Dog API can be queried directly for image URLs. That way, image downloads start as soon as the first URLs are retrieved, without initial waiting.
An implementation of this recursive submission may look like this:
```python Python
class DownloadRandomDogImages(Task):
num_images: int
def execute(self, context: ExecutionContext) -> None:
if self.num_images > 4:
half = self.num_images // 2
remaining = self.num_images - half # account for odd numbers
context.submit_subtask(DownloadRandomDogImages(half))
context.submit_subtask(DownloadRandomDogImages(remaining))
else:
url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}"
response = httpx.get(url)
for dog_image in response.json()[:self.num_images]:
context.submit_subtask(DownloadImage(dog_image["url"]))
```
```go Go
type DownloadRandomDogImages struct {
NumImages int
}
func (t *DownloadRandomDogImages) Execute(ctx context.Context) error {
if t.NumImages > 4 {
half := t.NumImages / 2
remaining := t.NumImages - half // account for odd numbers
_, err := workflows.SubmitSubtask(ctx, &DownloadRandomDogImages{NumImages: half})
if err != nil {
return err
}
_, err = workflows.SubmitSubtask(ctx, &DownloadRandomDogImages{NumImages: remaining})
if err != nil {
return err
}
} else {
url := fmt.Sprintf("https://api.thedogapi.com/v1/images/search?limit=%d", t.NumImages)
response, err := http.Get(url)
if err != nil {
return fmt.Errorf("failed to download images: %w", err)
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return fmt.Errorf("failed to read response: %w", err)
}
var dogImages []DogImage
err = json.Unmarshal(body, &dogImages)
if err != nil {
return err
}
for _, dogImage := range dogImages {
_, err := workflows.SubmitSubtask(ctx, &DownloadImage{URL: dogImage.URL})
if err != nil {
return err
}
}
}
return nil
}
```
With this implementation, downloading a large number of images (for example, 9) results in the following tasks being executed:
## Retry Handling
By default, when a task fails to execute, it's marked as failed. In some cases, it may be useful to retry the task multiple times before marking it as a failure. This is particularly useful for tasks dependent on external services that might be temporarily unavailable.
Tilebox Workflows allows you to specify the number of retries for a task using the `max_retries` argument of the `submit_subtask` method.
Check out the example below to see how this might look in practice.
A failed task may be picked up by any available runner and not necessarily the same one that it failed on.
```python Python
import random
class RootTask(Task):
def execute(self, context: ExecutionContext) -> None:
context.submit_subtask(FlakyTask(), max_retries=5)
class FlakyTask(Task):
def execute(self, context: ExecutionContext) -> None:
print(f"Executing FlakyTask")
if random.random() < 0.1:
raise Exception("FlakyTask failed randomly")
```
```go Go
package flaky
import (
"context"
"errors"
"log/slog"
"math/rand/v2"
"github.com/tilebox/tilebox-go/workflows/v1"
"github.com/tilebox/tilebox-go/workflows/v1/subtask"
)
type RootTask struct{}
func (t *RootTask) Execute(ctx context.Context) error {
_, err := workflows.SubmitSubtask(ctx, &FlakyTask{},
subtask.WithMaxRetries(5),
)
return err
}
type FlakyTask struct{}
func (t *FlakyTask) Execute(context.Context) error {
slog.Info("Executing FlakyTask")
if rand.Float64() < 0.1 {
return errors.New("FlakyTask failed randomly")
}
return nil
}
```
## Dependencies
Tasks often rely on other tasks. For example, a task that processes data might depend on a task that fetches that data. **Tasks can express their dependencies on other tasks** by using the `depends_on` argument of the [`submit_subtask`](/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask) method. This means that a dependent task will only execute after the task it relies on has successfully completed.
The `depends_on` argument accepts a list of tasks, enabling a task to depend on multiple other tasks.
A workflow with dependencies might look like this:
```python Python
class RootTask(Task):
def execute(self, context: ExecutionContext) -> None:
first_task = context.submit_subtask(
PrintTask("Executing first")
)
second_task = context.submit_subtask(
PrintTask("Executing second"),
depends_on=[first_task],
)
third_task = context.submit_subtask(
PrintTask("Executing last"),
depends_on=[second_task],
)
class PrintTask(Task):
message: str
def execute(self, context: ExecutionContext) -> None:
print(self.message)
```
```go Go
type RootTask struct{}
func (t *RootTask) Execute(ctx context.Context) error {
firstTask, err := workflows.SubmitSubtask(
ctx,
&PrintTask{Message: "Executing first"},
)
if err != nil {
return err
}
secondTask, err := workflows.SubmitSubtask(
ctx,
&PrintTask{Message: "Executing second"},
subtask.WithDependencies(firstTask),
)
if err != nil {
return err
}
_, err = workflows.SubmitSubtask(
ctx,
&PrintTask{Message: "Executing last"},
subtask.WithDependencies(secondTask),
)
if err != nil {
return err
}
return nil
}
type PrintTask struct {
Message string
}
func (t *PrintTask) Execute(context.Context) error {
slog.Info("PrintTask", slog.String("message", t.Message))
return nil
}
```
The `RootTask` submits three `PrintTask` tasks as subtasks. These tasks depend on each other, meaning the second task executes only after the first task has successfully completed, and the third only executes after the second completes. The tasks are executed sequentially.
If a task upon which another task depends submits subtasks, those subtasks must also execute before the dependent task begins execution.
### Dependencies Example
A practical example is a workflow that fetches news articles from an API and processes them using the [News API](https://newsapi.org/).
```python Python
from pathlib import Path
import json
from collections import Counter
import httpx # pip install httpx
class NewsWorkflow(Task):
category: str
max_articles: int
def execute(self, context: ExecutionContext) -> None:
fetch_task = context.submit_subtask(FetchNews(self.category, self.max_articles))
context.submit_subtask(PrintHeadlines(), depends_on=[fetch_task])
context.submit_subtask(MostFrequentAuthors(), depends_on=[fetch_task])
class FetchNews(Task):
category: str
max_articles: int
def execute(self, context: ExecutionContext) -> None:
url = f"https://newsapi.org/v2/top-headlines?category={self.category}&pageSize={self.max_articles}&country=us&apiKey=API_KEY"
news = httpx.get(url).json()
# check out our documentation page on caches to learn
# about a better way of passing data between tasks
Path("news.json").write_text(json.dumps(news))
class PrintHeadlines(Task):
def execute(self, context: ExecutionContext) -> None:
news = json.loads(Path("news.json").read_text())
for article in news["articles"]:
print(f"{article['publishedAt'][:10]}: {article['title']}")
class MostFrequentAuthors(Task):
def execute(self, context: ExecutionContext) -> None:
news = json.loads(Path("news.json").read_text())
authors = [article["author"] for article in news["articles"]]
for author, count in Counter(authors).most_common():
print(f"Author {author} has written {count} articles")
# now submit a job, and then visualize it
job = job_client.submit("process-news",
NewsWorkflow(category="science", max_articles=5),
)
```
```go Go
package news
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"time"
"github.com/tilebox/tilebox-go/workflows/v1"
"github.com/tilebox/tilebox-go/workflows/v1/subtask"
)
const newsAPIKey = "YOUR_API_KEY"
type NewsWorkflow struct {
Category string
MaxArticles int
}
func (t *NewsWorkflow) Execute(ctx context.Context) error {
fetchTask, err := workflows.SubmitSubtask(ctx, &FetchNews{
Category: t.Category,
MaxArticles: t.MaxArticles,
})
if err != nil {
return err
}
_, err = workflows.SubmitSubtask(ctx, &PrintHeadlines{}, subtask.WithDependencies(fetchTask))
if err != nil {
return err
}
_, err = workflows.SubmitSubtask(ctx, &MostFrequentAuthors{}, subtask.WithDependencies(fetchTask))
if err != nil {
return err
}
return nil
}
type News struct {
Status string `json:"status"`
TotalResults int `json:"totalResults"`
Articles []struct {
Source struct {
ID *string `json:"id"`
Name string `json:"name"`
} `json:"source"`
Author *string `json:"author"`
Title string `json:"title"`
Description *string `json:"description"`
URL string `json:"url"`
URLToImage *string `json:"urlToImage"`
PublishedAt time.Time `json:"publishedAt"`
Content *string `json:"content"`
} `json:"articles"`
}
type FetchNews struct {
Category string
MaxArticles int
}
func (t *FetchNews) Execute(context.Context) error {
url := fmt.Sprintf("https://newsapi.org/v2/top-headlines?category=%s&pageSize=%d&country=us&apiKey=%s", t.Category, t.MaxArticles, newsAPIKey)
response, err := http.Get(url)
if err != nil {
return fmt.Errorf("failed to download news: %w", err)
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return fmt.Errorf("failed to read response: %w", err)
}
// check out our documentation page on caches to learn
// about a better way of passing data between tasks
return os.WriteFile("news.json", body, 0o600)
}
type PrintHeadlines struct{}
func (t *PrintHeadlines) Execute(context.Context) error {
newsBytes, err := os.ReadFile("news.json")
if err != nil {
return fmt.Errorf("failed to read news: %w", err)
}
var news News
err = json.Unmarshal(newsBytes, &news)
if err != nil {
return fmt.Errorf("failed to unmarshal news: %w", err)
}
for _, article := range news.Articles {
slog.Info("Article", slog.Time("published_at", article.PublishedAt), slog.String("title", article.Title))
}
return nil
}
type MostFrequentAuthors struct{}
func (t *MostFrequentAuthors) Execute(context.Context) error {
newsBytes, err := os.ReadFile("news.json")
if err != nil {
return fmt.Errorf("failed to read news: %w", err)
}
var news News
err = json.Unmarshal(newsBytes, &news)
if err != nil {
return fmt.Errorf("failed to unmarshal news: %w", err)
}
authors := make(map[string]int)
for _, article := range news.Articles {
if article.Author == nil {
continue
}
authors[*article.Author]++
}
for author, count := range authors {
slog.Info("Author", slog.String("author", author), slog.Int("count", count))
}
return nil
}
// in main now submit a job, and then visualize it
/*
job, err := client.Jobs.Submit(ctx, "process-news",
[]workflows.Task{
&NewsWorkflow{
Category: "science",
MaxArticles: 5,
},
},
)
*/
```
```plaintext Output
2024-02-15: NASA selects ultraviolet astronomy mission but delays its launch two years - SpaceNews
2024-02-15: SpaceX launches Space Force mission from Cape Canaveral - Orlando Sentinel
2024-02-14: Saturn's largest moon most likely uninhabitable - Phys.org
2024-02-14: AI Unveils Mysteries of Unknown Proteins' Functions - Neuroscience News
2024-02-14: Anthropologists' research unveils early stone plaza in the Andes - Phys.org
Author Jeff Foust has written 1 articles
Author Richard Tribou has written 1 articles
Author Jeff Renaud has written 1 articles
Author Neuroscience News has written 1 articles
Author Science X has written 1 articles
```
This workflow consists of four tasks:
| Task | Dependencies | Description |
| ------------------- | ------------ | ----------------------------------------------------------------------------------------------------------------------- |
| NewsWorkflow | - | The root task of the workflow. It spawns the other tasks and sets up the dependencies between them. |
| FetchNews | - | A task that fetches news articles from the API and writes the results to a file, which is then read by dependent tasks. |
| PrintHeadlines | FetchNews | A task that prints the headlines of the news articles to the console. |
| MostFrequentAuthors | FetchNews | A task that counts the number of articles each author has written and prints the result to the console. |
An important aspect is that there is no dependency between the `PrintHeadlines` and `MostFrequentAuthors` tasks. This means they can execute in parallel, which the Tilebox Workflow Orchestrator will do, provided multiple task runners are available.
In this example, the results from `FetchNews` are stored in a file. This is not the recommended method for passing data between tasks. When executing on a distributed cluster, the existence of a file written by a dependent task cannot be guaranteed. Instead, it's better to use a [shared cache](/workflows/caches).
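As a sketch of that approach, assuming the dictionary-style `job_cache` API described in the [Caches](/workflows/caches) section, `FetchNews` could store the raw API response in the job cache, and the dependent tasks could read it from there instead of from a local file:

```python Python
import json

import httpx
from tilebox.workflows import Task, ExecutionContext

class FetchNews(Task):
    category: str
    max_articles: int

    def execute(self, context: ExecutionContext) -> None:
        url = f"https://newsapi.org/v2/top-headlines?category={self.category}&pageSize={self.max_articles}&country=us&apiKey=API_KEY"
        # store the raw response bytes in the job cache instead of a local file
        context.job_cache["news"] = httpx.get(url).content

class PrintHeadlines(Task):
    def execute(self, context: ExecutionContext) -> None:
        news = json.loads(context.job_cache["news"])
        for article in news["articles"]:
            print(f"{article['publishedAt'][:10]}: {article['title']}")
```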
## Task Identifiers
A task identifier is a unique string used by the Tilebox Workflow Orchestrator to identify the task. It's used by [task runners](/workflows/concepts/task-runners) to map submitted tasks to a task class and execute them. It also serves as the default display name when visualizing a job's execution as a tree of tasks.
If unspecified, the identifier of a task defaults to the class name. For instance, the identifier of the `PrintHeadlines` task in the previous example is `"PrintHeadlines"`. This is good for prototyping, but not recommended for production, as changing the class name also changes the identifier, which can lead to issues during refactoring. It also prevents different tasks from sharing the same class name.
To address this, Tilebox Workflows offers a way to explicitly specify the identifier of a task. This is done by overriding the `identifier` method of the `Task` class. This method should return a unique string identifying the task. This decouples the task's identifier from its class name, allowing you to change the identifier without renaming the class. It also allows tasks with the same class name to have different identifiers. The `identifier` method can also specify a version number for the task; see the section on [semantic versioning](#semantic-versioning) below for more details.
```python Python
class MyTask(Task):
def execute(self, context: ExecutionContext) -> None:
pass
# MyTask has the identifier "MyTask" and the default version of "v0.0"
class MyTask2(Task):
@staticmethod
def identifier() -> tuple[str, str]:
return "tilebox.com/example_workflow/MyTask", "v1.0"
def execute(self, context: ExecutionContext) -> None:
pass
# MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0"
```
```go Go
type MyTask struct{}
func (t *MyTask) Execute(context.Context) error {
return nil
}
// MyTask has the identifier "MyTask" and the default version of "v0.0"
type MyTask2 struct{}
func (t *MyTask2) Identifier() workflows.TaskIdentifier {
return workflows.NewTaskIdentifier("tilebox.com/example_workflow/MyTask", "v1.0")
}
func (t *MyTask2) Execute(context.Context) error {
return nil
}
// MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0"
```
In Python, the `identifier` method must be defined as either a `classmethod` or a `staticmethod`, meaning it can be called without instantiating the class.
## Semantic Versioning
As seen in the previous section, the `identifier` method can return a tuple of two strings, where the first string is the identifier and the second string is the version number. This allows for semantic versioning of tasks.
Versioning is important for managing changes to a task's execution method. It allows for new features, bug fixes, and changes while ensuring existing workflows operate as expected. Additionally, it enables multiple versions of a task to coexist, enabling gradual rollout of changes without interrupting production deployments.
You assign a version number by overriding the `identifier` method of the task class. It must return a tuple of two strings: the first is the [identifier](#task-identifiers) and the second is the version number, which must match the pattern `vX.Y` (where `X` and `Y` are non-negative integers). `X` is the major version number and `Y` is the minor version.
For example, this task has the identifier `"tilebox.com/example_workflow/MyTask"` and the version `"v1.3"`:
```python Python
class MyTask(Task):
@staticmethod
def identifier() -> tuple[str, str]:
return "tilebox.com/example_workflow/MyTask", "v1.3"
def execute(self, context: ExecutionContext) -> None:
pass
```
```go Go
type MyTask struct{}
func (t *MyTask) Identifier() workflows.TaskIdentifier {
return workflows.NewTaskIdentifier("tilebox.com/example_workflow/MyTask", "v1.3")
}
func (t *MyTask) Execute(context.Context) error {
return nil
}
```
When a task is submitted as part of a job, the version of the task at submission time is recorded. It may differ from the version registered on the task runner that executes the task.
When task runners execute a task, they require a registered task with a matching identifier and compatible version number. A compatible version is where the major version number on the task runner matches that of the submitted task, and the minor version number on the task runner is equal to or greater than that of the submitted task.
For example, suppose `MyTask` is submitted as part of a job with version `"v1.3"`:
* A task runner with version `"v1.3"` of `MyTask` would execute this task.
* A task runner with version `"v1.5"` of `MyTask` would also execute this task.
* A task runner with version `"v1.2"` of `MyTask` would not execute this task, since its minor version is lower than that of the submitted task.
* A task runner with version `"v2.5"` of `MyTask` would not execute this task, since its major version differs from that of the submitted task.
## States
A task can be in one of the following states:
* `QUEUED`: the task is queued and waiting to be run
* `RUNNING`: the task is currently running on some task runner
* `COMPUTED`: the task has been computed and the output is available. Once in this state, the task will never transition to any other state
* `FAILED`: the task has failed
* `CANCELLED`: the task has been cancelled due to user request
## Conclusion
Tasks form the foundation of Tilebox Workflows. By understanding how to create and manage tasks, you can leverage Tilebox's capabilities to automate and optimize your workflows. Experiment with defining your own tasks, utilizing subtasks, managing dependencies, and employing semantic versioning to develop robust and efficient workflows.
# Tilebox Workflows
Source: https://docs.tilebox.com/workflows/introduction
The Tilebox workflow orchestrator is a parallel processing engine. It simplifies the creation of dynamic tasks that can be executed across various computing environments, including on-premise and auto-scaling clusters in public clouds.
This section provides guides showcasing how to use the Tilebox workflow orchestrator effectively. Here are some of the key learning areas:
Create tasks using the Tilebox Workflow Orchestrator.
Learn how to submit jobs to the workflow orchestrator, which schedules tasks for execution.
Learn how to set up task runners to execute tasks in a distributed manner.
Understand how to gain insights into task executions using observability features like tracing and logging.
Learn to configure shared data access for all tasks of a job using caches.
Trigger jobs based on events or schedules, such as new data availability or CRON schedules.
## Terminology
Before exploring Tilebox Workflows in depth, familiarize yourself with some common terms used throughout this section.
A Task is the smallest unit of work, designed to perform a specific operation. Each task represents a distinct operation or process that can be executed, such as processing data, performing calculations, or managing resources. Tasks can operate independently or as components of a more complex set of connected tasks known as a Workflow. Tasks are defined by their code, inputs, and dependencies on other tasks. To create tasks, you need to define the input parameters and specify the action to be performed during execution.
A job is a specific execution of a workflow with designated input parameters. It consists of one or more tasks that can run in parallel or sequentially, based on their dependencies. Submitting a job involves creating a root task with specific input parameters, which may trigger the execution of other tasks within the same job.
Task runners are the execution agents within the Tilebox Workflows ecosystem that execute tasks. They can be deployed in different computing environments, including on-premise servers and cloud-based auto-scaling clusters. Task runners execute tasks as scheduled by the workflow orchestrator, ensuring they have the necessary resources and environment for effective execution.
Clusters are a logical grouping for task runners. Using clusters, you can scope certain tasks to a specific group of task runners. Tasks, which are always submitted to a specific cluster, are only executed on task runners assigned to the same cluster.
Caches are shared storage that enable data storage and retrieval across tasks within a single job. They store intermediate results and share data among tasks, enabling distributed computing and reducing redundant data processing.
Observability refers to the feature set in Tilebox Workflows that provides visibility into the execution of tasks and jobs. Tools like tracing and logging allow users to monitor performance, diagnose issues, and gain insights into job operations, enabling efficient troubleshooting and optimization.
# Automations
Source: https://docs.tilebox.com/workflows/near-real-time/automations
Process data in near-real-time by triggering jobs based on external events
This feature is only available in the Python SDK.
## Introduction
Tilebox Workflows can execute jobs in two ways: a one-time execution triggered by a user, typically a batch processing, and near-real-time execution based on specific external events.
By defining trigger conditions, you can automatically submit jobs based on external events.
Tilebox Workflows currently supports the following trigger conditions:
Trigger jobs based on a Cron schedule.
Trigger jobs after objects are created or modified in a storage location such as a cloud bucket.
Dataset Event Triggers, which will trigger jobs when new data points are ingested into a Tilebox dataset, are on the roadmap. Stay tuned for updates.
## Automations
To create a trigger, define a special task that serves as a prototype. When a trigger condition is met, this task is submitted as a new job. Such tasks are referred to as automations.
Each automation has a [task identifier](/workflows/concepts/tasks#task-identifiers), a [version](/workflows/concepts/tasks#semantic-versioning), and [input parameters](/workflows/concepts/tasks#input-parameters), just like regular tasks.
Automations also automatically provide a special `trigger` attribute that contains information about the event that initiated the task's execution.
Go doesn't support registering automations yet, please use python or the console instead.
## Automation Client
The Tilebox Workflows client includes a sub-client for managing automations. You can create this sub-client by calling the `automations` method on the main client instance.
### Listing Registered Automations
To list all registered automations, use the `all` method on the automation client.
```python Python
from tilebox.workflows import Client
client = Client()
automations = client.automations()
automations = automations.all()
print(automations)
```
```plaintext Output
[
AutomationPrototype(
name='Run MyCronTask every hour at 15 minutes past the hour',
prototype=TaskSubmission(
cluster_slug='dev-cluster',
identifier=TaskIdentifier(name='MyCronTask', version='v0.0'),
input=b'{"message": "Hello"},
dependencies=[],
display='MyCronTask',
max_retries=0),
storage_event_triggers=[],
cron_triggers=[CronTrigger(schedule='15 * * * *')],
)
]
```
### Registering Automations
To register an automation, use the `create_*_automation` methods specific to each trigger type provided by the automation client. Refer to the documentation for each trigger type for more details.
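For example, registering a Cron-based automation might look like the following minimal sketch, using the `MyCronTask` prototype defined on the [Cron triggers](/workflows/near-real-time/cron) page:

```python Python
from tilebox.workflows import Client

client = Client()
automations = client.automations()

automation = automations.create_cron_automation(
    "my-cron-automation",         # name of the automation
    "dev-cluster",                # cluster slug to submit jobs to
    MyCronTask(message="World"),  # the prototype task to submit when triggered
    cron_triggers=["15 * * * *"], # run every hour at minute 15
)
```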
## Overview in the Tilebox Console
You can also use the Tilebox Console to manage automations. Visit [the automations section](https://console.tilebox.com/workflows/automations) to check it out.
You can also register new automations or edit and delete existing ones directly from the console.
# Cron triggers
Source: https://docs.tilebox.com/workflows/near-real-time/cron
Trigger jobs based on a Cron schedule.
This feature is only available in the Python SDK.
## Creating Cron tasks
Cron tasks run repeatedly on a specified [cron](https://en.wikipedia.org/wiki/Cron) schedule.
To create a Cron task, use `tilebox.workflows.automations.CronTask` as your task's base class instead of the regular `tilebox.workflows.Task`.
```python Python
from tilebox.workflows import ExecutionContext
from tilebox.workflows.automations import CronTask
class MyCronTask(CronTask):
message: str
def execute(self, context: ExecutionContext) -> None:
print(f"Hello {self.message} from a Cron Task!")
# self.trigger is an attribute of the CronTask class,
# which contains information about the trigger event
# that caused this task to be submitted as part of a job
print(f"This task was triggered at {self.trigger.time}")
```
## Registering a Cron trigger
After implementing a Cron task, register it to be triggered according to a Cron schedule.
When the Cron expression matches, a new job is submitted consisting of a single task instance derived from the Cron task prototype.
```python Python
from tilebox.workflows import Client
client = Client()
automations = client.automations()
cron_automation = automations.create_cron_automation(
"my-cron-automation", # name of the cron automation
"dev-cluster", # cluster slug to submit jobs to
MyCronTask(message="World"), # the task (and its input parameters) to run repeatedly
cron_triggers=[
"12 * * * *", # run every hour at minute 12
"45 18 * * *", # run every day at 18:45
"30 13 * * 3", # run every Wednesday at 13:30
],
)
```
The syntax for specifying the cron triggers is a [CRON expression](https://en.wikipedia.org/wiki/Cron#CRON_expression).
A helpful tool to test your cron expressions is [crontab.guru](https://crontab.guru/).
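As a quick reference, the sketch below restates the standard five-field cron layout and annotates the schedules from the example above (the variable names are illustrative only).
```python Python
# standard cron fields, left to right:
#   minute (0-59)  hour (0-23)  day of month (1-31)  month (1-12)  day of week (0-6, Sunday = 0)
every_hour_at_12 = "12 * * * *"   # minute 12 of every hour
daily_at_18_45 = "45 18 * * *"    # every day at 18:45
wednesdays_13_30 = "30 13 * * 3"  # every Wednesday at 13:30
```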
## Starting a Cron Task Runner
With the Cron automation registered, a job is submitted whenever the Cron expression matches. But unless a [task runner](/workflows/concepts/task-runners) is available to execute the Cron task, the submitted jobs remain in a task queue.
Once an [eligible task runner](/workflows/concepts/task-runners#task-selection) becomes available, all jobs in the queue are executed.
```python Python
from tilebox.workflows import Client
client = Client()
runner = client.runner("dev-cluster", tasks=[MyCronTask])
runner.run_forever()
```
If this task runner runs continuously, its output may resemble the following:
```plaintext Output
Hello World from a Cron Task!
This task was triggered at 2023-09-25 16:12:00
Hello World from a Cron Task!
This task was triggered at 2023-09-25 17:12:00
Hello World from a Cron Task!
This task was triggered at 2023-09-25 18:12:00
Hello World from a Cron Task!
This task was triggered at 2023-09-25 18:45:00
Hello World from a Cron Task!
This task was triggered at 2023-09-25 19:12:00
```
## Inspecting in the Console
The [Tilebox Console](https://console.tilebox.com/workflows/automations) provides a straightforward way to inspect all registered Cron automations.
Use the console to view, edit, and delete the registered Cron automations.
## Deleting Cron automations
To delete a registered Cron automation, use `automations.delete`. After deletion, no new jobs will be submitted by that Cron trigger. Jobs that were already triggered before the deletion remain queued.
```python Python
from tilebox.workflows import Client
client = Client()
automations = client.automations()
# delete the automation as returned by create_cron_automation
automations.delete(cron_automation)
# or manually by id:
automations.delete("0190bafc-b3b8-88c4-008b-a5db044380d0")
```
## Submitting Cron jobs manually
You can submit Cron tasks as regular tasks for testing purposes or as part of a larger workflow. To do so, instantiate the task with a specific trigger time using the `once` method.
Submitting a job with a Cron task using `once` immediately schedules the task, and a runner may pick it up and execute it. The trigger time set in the `once` method does not influence the execution time; it only sets the `self.trigger.time` attribute for the Cron task.
```python Python
from datetime import datetime, timezone
job_client = client.jobs()
# create a Cron task prototype
task = MyCronTask(message="Hello")
# submitting it directly won't work: raises ValueError:
# CronTask cannot be submitted without being triggered. Use task.once().
job_client.submit("manual-cron-job", task, cluster="dev-cluster")
# specify a trigger time to submit the task as a regular task
triggered_task = task.once() # same as task.once(datetime.now())
job_client.submit("manual-cron-job", triggered_task, cluster="dev-cluster")
# simulate a trigger at a specific time
triggered_task = task.once(datetime(2030, 12, 12, 15, 15, tzinfo=timezone.utc))
# the task will be scheduled to run immediately, even with a future trigger time
# but the self.trigger.time will be 2030-12-12T15:15:00Z for the task instance
job_client.submit("manual-cron-job", triggered_task, cluster="dev-cluster")
```
# Storage Event Triggers
Source: https://docs.tilebox.com/workflows/near-real-time/storage-events
Trigger jobs after objects are created or modified in a storage location
This feature is only available in the Python SDK.
## Creating a Storage Event Task
Storage Event Tasks are automations triggered when objects are created or modified in a [storage location](#storage-locations).
To create a Storage Event task, use `tilebox.workflows.automations.StorageEventTask` as your task's base class instead of the regular `tilebox.workflows.Task`.
```python Python
from tilebox.workflows import ExecutionContext
from tilebox.workflows.automations import StorageEventTask, StorageEventType
from tilebox.workflows.observability.logging import get_logger
logger = get_logger()
class LogObjectCreation(StorageEventTask):
head_bytes: int
def execute(self, context: ExecutionContext) -> None:
# self.trigger is an attribute of the StorageEventTask class,
# which contains information about the storage event that triggered this task
if self.trigger.type == StorageEventType.CREATED:
path = self.trigger.location
logger.info(f"A new object was created: {path}")
# trigger.storage is a storage client for interacting with the storage
# location that triggered the event
# using it, we can read the object to get its content as a bytes object
data = self.trigger.storage.read(path)
logger.info(f"The object's file size is {len(data)} bytes")
logger.info(f"The object's first {self.head_bytes} bytes are: {data[:self.head_bytes]}")
```
## Storage Locations
Storage Event tasks are triggered when objects are created or modified in a storage location. This location can be a cloud storage bucket or a local file system. Tilebox supports Google Cloud Storage (GCS) buckets, Amazon S3 buckets, and local file systems as storage locations.
### Registering a Storage Location
To make a storage location available within Tilebox workflows, it must be registered first. This involves specifying the location and setting up a notification system that forwards events to Tilebox, enabling task triggering. The setup varies depending on the storage location type.
For example, a GCP storage bucket is integrated by setting up a [PubSub Notification with a push subscription](https://cloud.google.com/storage/docs/pubsub-notifications), while a local file system requires installing a filesystem watcher. To get a storage location registered with Tilebox, please [get in touch](mailto:engineering@tilebox.com).
### Listing Available Storage Locations
To list all available storage locations, use the `storage_locations` method on the automation client.
```python Python
from tilebox.workflows import Client
client = Client()
automations_client = client.automations()
storage_locations = automations_client.storage_locations()
print(storage_locations)
```
```plaintext Output
[
StorageLocation(
location="gcp-project:gcs-bucket-fab3fa2",
type=StorageType.GCS,
),
StorageLocation(
location="s3-bucket-263af15",
type=StorageType.S3,
),
StorageLocation(
location='/path/to/a/local/folder',
type=StorageType.FS,
),
]
```
### Reading Files from a Storage Location
Once a storage location is registered, you can read files from it using the `read` method on the storage client.
```python Python
gcs_bucket = storage_locations[0]
s3_bucket = storage_locations[1]
local_folder = storage_locations[2]
gcs_object = gcs_bucket.read("my-object.txt")
s3_object = s3_bucket.read("my-object.txt")
local_object = local_folder.read("my-object.txt")
```
The `read` method instantiates a client for the specific storage location. This requires that
the storage location is accessible by a task runner and may require credentials for cloud storage
or physical/network access to a locally mounted file system.
To set up authentication and enable access to a GCS storage bucket, check out the [Google Client docs for authentication](https://cloud.google.com/docs/authentication/client-libraries#python).
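As a hedged illustration (not part of the Tilebox API), one common way to provide such credentials on a task runner host is Google's Application Default Credentials, for example by pointing `GOOGLE_APPLICATION_CREDENTIALS` at a service account key file; the key path below is hypothetical.
```python Python
import os

# hypothetical key path: make Application Default Credentials available to this process,
# so that reads from the GCS storage location can authenticate
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account-key.json"

# gcs_bucket is the storage location from the example above
gcs_object = gcs_bucket.read("my-object.txt")
```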
## Registering a Storage Event Trigger
After implementing a Storage Event task, register it so that it triggers each time a matching storage event occurs. Each such event submits a new job consisting of a single task instance derived from the registered Storage Event task prototype.
```python Python
from tilebox.workflows import Client
client = Client()
automations = client.automations()
storage_event_automation = automations.create_storage_event_automation(
"log-object-creations", # name of the storage event automation
"dev-cluster", # cluster slug to submit jobs to
LogObjectCreation(head_bytes=20), # the task (and its input parameters) to run repeatedly
triggers=[
# you can specify a glob pattern:
# run every time a .txt file is created anywhere in the gcs bucket
(gcs_bucket, "**.txt"),
],
)
```
The syntax for specifying glob patterns follows [Standard Wildcards](https://tldp.org/LDP/GNU-Linux-Tools-Summary/html/x11655.htm).
Additionally, you can use `**` as a super-asterisk, a matching operator not sensitive to slash separators.
Here are some examples of valid glob patterns:
| Pattern | Matches |
| ----------- | ---------------------------------------------------------------------------- |
| `*.ext` | Any file ending in `.ext` in the root directory |
| `**/*.ext` | Any file ending in `.ext` in any subdirectory, but not in the root directory |
| `**.ext` | Any file ending in `.ext` in any subdirectory, including the root directory |
| `folder/*` | Any file directly in a `folder` subdirectory |
| `folder/**` | Any file directly or recursively part of a `folder` subdirectory |
| `[a-z].txt` | Matches `a.txt`, `b.txt`, etc. |
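As an illustrative sketch (the automation name and object prefix below are made up, and the storage locations and task are reused from the examples above), multiple `(storage_location, pattern)` tuples can be combined in a single registration:
```python Python
storage_event_automation = automations.create_storage_event_automation(
    "log-text-and-imagery-objects",  # hypothetical automation name
    "dev-cluster",
    LogObjectCreation(head_bytes=20),
    triggers=[
        (gcs_bucket, "**.txt"),     # any .txt file anywhere in the GCS bucket
        (s3_bucket, "imagery/**"),  # any object under a hypothetical imagery/ prefix in the S3 bucket
    ],
)
```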
## Start a Storage Event Task Runner
With the Storage Event automation registered, a job is submitted whenever a storage event occurs. But unless a [task runner](/workflows/concepts/task-runners) is available to execute the Storage Event task, the submitted jobs remain in a task queue.
Once an [eligible task runner](/workflows/concepts/task-runners#task-selection) becomes available, all jobs in the queue are executed.
```python Python
from tilebox.workflows import Client
client = Client()
runner = client.runner("dev-cluster", tasks=[LogObjectCreation])
runner.run_forever()
```
### Triggering an Event
Creating an object in the bucket where the task is registered results in a job being submitted:
```bash Creating an object
echo "Hello World" > my-object.txt
gcloud storage cp my-object.txt gs://gcs-bucket-fab3fa2
```
Inspecting the task runner output reveals that the job was submitted and the task executed:
```plaintext Output
2024-09-25 16:51:45,621 INFO A new object was created: my-object.txt
2024-09-25 16:51:45,857 INFO The object's file size is 12 bytes
2024-09-25 16:51:45,858 INFO The object's first 20 bytes are: b'Hello World\n'
```
## Inspecting in the Console
The [Tilebox Console](https://console.tilebox.com/workflows/automations) provides an easy way to inspect all registered storage event automations.
## Deleting Storage Event automations
To delete a registered storage event automation, use `automations.delete`. After deletion, no new jobs will be submitted by the storage event trigger. Jobs that were already triggered before the deletion remain queued.
```python Python
from tilebox.workflows import Client
client = Client()
automations = client.automations()
# delete the automation as returned by create_storage_event_automation
automations.delete(storage_event_automation)
# or manually by id:
automations.delete("0190bafc-b3b8-88c4-008b-a5db044380d0")
```
## Submitting Storage Event jobs manually
You can submit Storage Event tasks as regular tasks for testing purposes or as part of a larger workflow. To do so, instantiate the task with a specific storage location and object name using the `once` method.
```python Python
job_client = client.jobs()
task = LogObjectCreation(head_bytes=20)
# submitting it directly won't work; raises ValueError:
# StorageEventTask cannot be submitted without being triggered. Use task.once().
job_client.submit(
"manual-storage-event-job",
task,
cluster="dev-cluster"
)
# instead, specify a trigger,
# so that we can submit the task as a regular task
triggered_task = task.once(gcs_bucket, "my-object.txt")
job_client.submit(
"manual-storage-event-job",
triggered_task,
cluster="dev-cluster"
)
```
# Logging
Source: https://docs.tilebox.com/workflows/observability/logging
Set up distributed logging using the OpenTelemetry logging protocol
## Overview
Tilebox workflows are designed for distributed execution, making it essential to set up logging to a centralized system. Tilebox supports OpenTelemetry logging, which simplifies sending log messages from your tasks to a chosen backend, where logs from a distributed cluster of task runners can be collected and visualized in a tool like [Axiom](https://axiom.co/).
## Configure logging
The Tilebox workflow SDKs include support for exporting OpenTelemetry logs. To enable logging, call the appropriate configuration functions during the startup of your [task runner](/workflows/concepts/task-runners). Then, use the provided `logger` to send log messages from your tasks.
To configure logging with Axiom, you first need to create an [Axiom Dataset](https://axiom.co/docs/reference/datasets) to export your workflow logs to. You will also need an [Axiom API key](https://axiom.co/docs/reference/tokens) with the necessary write permissions for that dataset.
```python Python
from tilebox.workflows import Client, Task, ExecutionContext
from tilebox.workflows.observability.logging import configure_otel_logging_axiom
# your own workflow:
from my_workflow import MyTask
def main():
configure_otel_logging_axiom(
# specify an Axiom dataset to export logs to
dataset="my-axiom-logs-dataset",
# along with an Axiom API key with ingest permissions for that dataset
api_key="my-axiom-api-key",
)
# the task runner will export logs from
# the executed tasks to the specified dataset
client = Client()
runner = client.runner(tasks=[MyTask])
runner.run_forever()
if __name__ == "__main__":
main()
```
```go Go
package main
import (
"context"
"log/slog"
"github.com/tilebox/tilebox-go/examples/workflows/axiom"
"github.com/tilebox/tilebox-go/observability"
"github.com/tilebox/tilebox-go/observability/logger"
"github.com/tilebox/tilebox-go/workflows/v1"
)
// specify a service name and version to identify the instrumenting application in traces and logs
var service = &observability.Service{Name: "task-runner", Version: "dev"}
func main() {
ctx := context.Background()
// Setup OpenTelemetry logging and slog
// It uses AXIOM_API_KEY and AXIOM_LOGS_DATASET from the environment
axiomHandler, shutdownLogger, err := logger.NewAxiomHandlerFromEnv(ctx, service,
logger.WithLevel(slog.LevelInfo), // export logs at info level and above as OTEL logs
)
defer shutdownLogger(ctx)
if err != nil {
slog.Error("failed to set up axiom log handler", slog.Any("error", err))
return
}
tileboxLogger := logger.New( // initialize a slog.Logger
axiomHandler, // export logs to the Axiom handler
logger.NewConsoleHandler(logger.WithLevel(slog.LevelWarn)), // and additionally, export WARN and ERROR logs to stdout
)
slog.SetDefault(tileboxLogger) // all future slog calls will be forwarded to the tilebox logger
client := workflows.NewClient()
taskRunner, err := client.NewTaskRunner(ctx)
if err != nil {
slog.Error("failed to create task runner", slog.Any("error", err))
return
}
err = taskRunner.RegisterTasks(&MyTask{})
if err != nil {
slog.Error("failed to register tasks", slog.Any("error", err))
return
}
taskRunner.RunForever(ctx)
}
```
Setting the environment variables `AXIOM_API_KEY` and `AXIOM_LOGS_DATASET` allows you to omit these arguments in the `configure_otel_logging_axiom` function.
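For instance, a minimal sketch of the same configuration relying purely on the environment (assuming both variables are exported before the runner starts) reduces to:
```python Python
from tilebox.workflows.observability.logging import configure_otel_logging_axiom

# AXIOM_LOGS_DATASET and AXIOM_API_KEY are read from the environment
configure_otel_logging_axiom()
```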
If you are using another OpenTelemetry-compatible backend besides Axiom, such as OpenTelemetry Collector or Jaeger, you can configure logging by specifying the URL endpoint to export log messages to.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.logging import configure_otel_logging
# your own workflow:
from my_workflow import MyTask
def main():
configure_otel_logging(
# specify an endpoint to export logs to, such as a
# locally running instance of OpenTelemetry Collector
endpoint="http://localhost:4318/v1/logs",
# optional headers for each request
headers={"Authorization": "Bearer some-api-key"},
)
# the task runner will export logs from
# the executed tasks to the specified endpoint
client = Client()
runner = client.runner(tasks=[MyTask])
runner.run_forever()
if __name__ == "__main__":
main()
```
```go Go
package main
import (
"context"
"log/slog"
"github.com/tilebox/tilebox-go/examples/workflows/opentelemetry"
"github.com/tilebox/tilebox-go/observability"
"github.com/tilebox/tilebox-go/observability/logger"
"github.com/tilebox/tilebox-go/workflows/v1"
)
// specify a service name and version to identify the instrumenting application in traces and logs
var service = &observability.Service{Name: "task-runner", Version: "dev"}
func main() {
ctx := context.Background()
endpoint := "http://localhost:4318"
headers := map[string]string{
"Authorization": "Bearer ",
}
// Setup an OpenTelemetry log handler, exporting logs to an OTEL compatible log endpoint
otelHandler, shutdownLogger, err := logger.NewOtelHandler(ctx, service,
logger.WithEndpointURL(endpoint),
logger.WithHeaders(headers),
logger.WithLevel(slog.LevelInfo), // export logs at info level and above as OTEL logs
)
defer shutdownLogger(ctx)
if err != nil {
slog.Error("failed to set up otel log handler", slog.Any("error", err))
return
}
tileboxLogger := logger.New( // initialize a slog.Logger
otelHandler, // export logs to the OTEL handler
logger.NewConsoleHandler(logger.WithLevel(slog.LevelWarn)), // and additionally, export WARN and ERROR logs to stdout
)
slog.SetDefault(tileboxLogger) // all future slog calls will be forwarded to the tilebox logger
client := workflows.NewClient()
taskRunner, err := client.NewTaskRunner(ctx)
if err != nil {
slog.Error("failed to create task runner", slog.Any("error", err))
return
}
err = taskRunner.RegisterTasks(&MyTask{})
if err != nil {
slog.Error("failed to register tasks", slog.Any("error", err))
return
}
taskRunner.RunForever(ctx)
}
```
If you set the environment variable `OTEL_LOGS_ENDPOINT`, you can omit that argument in the `configure_otel_logging` function.
To log messages to the standard console output, use the `configure_console_logging` function.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.logging import configure_console_logging
# your own workflow:
from my_workflow import MyTask
def main():
configure_console_logging()
# the task runner will print log messages from
# the executed tasks to the console
client = Client()
runner = client.runner(tasks=[MyTask])
runner.run_forever()
if __name__ == "__main__":
main()
```
```go Go
package main
import (
"context"
"log/slog"
"github.com/tilebox/tilebox-go/examples/workflows/opentelemetry"
"github.com/tilebox/tilebox-go/observability/logger"
"github.com/tilebox/tilebox-go/workflows/v1"
)
func main() {
ctx := context.Background()
tileboxLogger := logger.New(logger.NewConsoleHandler(logger.WithLevel(slog.LevelWarn)))
slog.SetDefault(tileboxLogger) // all future slog calls will be forwarded to the tilebox logger
client := workflows.NewClient()
taskRunner, err := client.NewTaskRunner(ctx)
if err != nil {
slog.Error("failed to create task runner", slog.Any("error", err))
return
}
err = taskRunner.RegisterTasks(&MyTask{})
if err != nil {
slog.Error("failed to register tasks", slog.Any("error", err))
return
}
taskRunner.RunForever(ctx)
}
```
The console logging backend is not recommended for production use. Log messages will be emitted to the standard output of each task runner rather than a centralized logging system. It is intended for local development and testing of workflows.
## Emitting log messages
Use the logger provided by the Tilebox SDK to emit log messages from your tasks to the [configured logging backend](#configure-logging).
Log messages emitted within a task's `execute` method are also automatically recorded as span events for the current [job trace](/workflows/observability/tracing).
```python Python
import logging
from tilebox.workflows import Task, ExecutionContext
from tilebox.workflows.observability.logging import get_logger
logger = get_logger()
class MyTask(Task):
def execute(self, context: ExecutionContext) -> None:
# emit a log message to the configured OpenTelemetry backend
logger.info("Hello world from configured logger!")
```
```go Go
package tasks
import (
"context"
"log/slog"
)
type MyTask struct{}
func (t *MyTask) Execute(context.Context) error {
// emit a log message to the configured OpenTelemetry backend
slog.Info("Hello world from configured logger!")
return nil
}
```
## Logging task runner internals
In Python, Tilebox task runners also use a logger internally. By default, it's set to the WARNING level, but you can change that by explicitly configuring a logger for the workflows client before constructing the task runner.
```python Python
import logging
from tilebox.workflows import Client
from tilebox.workflows.observability.logging import configure_otel_logging_axiom, get_logger
# your own workflow:
from my_workflow import MyTask
# configure Axiom or another logging backend
configure_otel_logging_axiom(
dataset="my-axiom-logs-dataset",
api_key="my-axiom-api-key",
)
# configure a logger for the Tilebox client at the INFO level
client = Client()
client.configure_logger(get_logger(level=logging.INFO))
# now the task runner inherits this logger and uses
# it to emit its own internal log messages as well
runner = client.runner(tasks=[MyTask])
runner.run_forever()
```
# OpenTelemetry Integration
Source: https://docs.tilebox.com/workflows/observability/open-telemetry
Integrate OpenTelemetry into your Tilebox Workflows
## Observability
Effective observability is essential for building reliable workflows. Understanding and monitoring the execution of workflows and their tasks helps ensure correctness and efficiency. This section describes methods to gain insights into your workflow's execution.
## OpenTelemetry
Tilebox Workflows is designed with [OpenTelemetry](https://opentelemetry.io/) in mind, which provides a set of APIs and libraries for instrumenting, generating, collecting, and exporting telemetry data (metrics, logs, and traces) in distributed systems.
Tilebox Workflows currently supports OpenTelemetry for tracing and logging, with plans to include metrics in the future.
## Integrations
Tilebox exports telemetry data using the [OpenTelemetry Protocol](https://opentelemetry.io/docs/specs/otlp/).
Axiom, a cloud-based observability and telemetry platform, supports this protocol. Tilebox Workflows has built-in support for Axiom, and the examples and screenshots in this section come from this integration.
Additionally, any other OpenTelemetry-compatible backend, such as OpenTelemetry Collector or Jaeger, can be used to collect telemetry data generated by Tilebox Workflows.
# Tracing
Source: https://docs.tilebox.com/workflows/observability/tracing
Record the execution of your workflow tasks as OpenTelemetry traces and spans
## Overview
Applying [OpenTelemetry traces](https://opentelemetry.io/docs/concepts/signals/traces/) to the concept of workflows allows you to monitor the execution of your jobs and their individual tasks, and to visualize the trace for a job in a tool like [Axiom](https://axiom.co/).
Tracing your workflows enables you to easily observe:
* The order of task execution
* Which tasks run in parallel
* The [task runner](/workflows/concepts/task-runners) handling each task
* The duration of each task
* The outcome of each task (success or failure)
This information helps identify bottlenecks and performance issues, ensuring that your workflows execute correctly.
## Configure tracing
The Tilebox workflow SDKs have built-in support for exporting OpenTelemetry traces. To enable tracing, call the appropriate configuration functions during the startup of your [task runner](/workflows/concepts/task-runners).
To configure tracing with Axiom, you first need to create an [Axiom Dataset](https://axiom.co/docs/reference/datasets) to export your workflow traces to. You will also need an [Axiom API key](https://axiom.co/docs/reference/tokens) with the necessary write permissions for that dataset.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.tracing import configure_otel_tracing_axiom
# your own workflow:
from my_workflow import MyTask
def main():
configure_otel_tracing_axiom(
# specify an Axiom dataset to export traces to
dataset="my-axiom-traces-dataset",
# along with an Axiom API key for ingest permissions
api_key="my-axiom-api-key",
)
# the following task runner generates traces for executed tasks and
# exports trace and span data to the specified Axiom dataset
client = Client()
runner = client.runner(tasks=[MyTask])
runner.run_forever()
if __name__ == "__main__":
main()
```
```go Go
package main
import (
"context"
"log/slog"
"github.com/tilebox/tilebox-go/examples/workflows/axiom"
"github.com/tilebox/tilebox-go/observability"
"github.com/tilebox/tilebox-go/observability/tracer"
"github.com/tilebox/tilebox-go/workflows/v1"
"go.opentelemetry.io/otel"
)
// specify a service name and version to identify the instrumenting application in traces and logs
var service = &observability.Service{Name: "task-runner", Version: "dev"}
func main() {
ctx := context.Background()
// Setup an OpenTelemetry trace span processor, exporting traces and spans to Axiom
// It uses AXIOM_API_KEY and AXIOM_TRACES_DATASET from the environment
tileboxTracerProvider, shutdown, err := tracer.NewAxiomProviderFromEnv(ctx, service)
defer shutdown(ctx)
if err != nil {
slog.Error("failed to set up axiom tracer provider", slog.Any("error", err))
return
}
otel.SetTracerProvider(tileboxTracerProvider) // set the tilebox tracer provider as the global OTEL tracer provider
client := workflows.NewClient()
taskRunner, err := client.NewTaskRunner(ctx)
if err != nil {
slog.Error("failed to create task runner", slog.Any("error", err))
return
}
err = taskRunner.RegisterTasks(&MyTask{})
if err != nil {
slog.Error("failed to register tasks", slog.Any("error", err))
return
}
taskRunner.RunForever(ctx)
}
```
Set the environment variables `AXIOM_API_KEY` and `AXIOM_TRACES_DATASET` to omit those arguments
in the `configure_otel_tracing_axiom` function.
If you are using another OpenTelemetry-compatible backend besides Axiom, like OpenTelemetry Collector or Jaeger, you can configure tracing by specifying the URL endpoint to export traces to.
```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.tracing import configure_otel_tracing
# your own workflow:
from my_workflow import MyTask
def main():
configure_otel_tracing(
# specify an endpoint for trace ingestion, such as a
# locally running instance of OpenTelemetry Collector
endpoint="http://localhost:4318/v1/traces",
# optional headers for each request
headers={"Authorization": "Bearer some-api-key"},
)
# the following task runner generates traces for executed tasks and
# exports trace and span data to the specified endpoint
client = Client()
runner = client.runner(tasks=[MyTask])
runner.run_forever()
if __name__ == "__main__":
main()
```
```go Go
package main
import (
"context"
"log/slog"
"github.com/tilebox/tilebox-go/examples/workflows/opentelemetry"
"github.com/tilebox/tilebox-go/observability"
"github.com/tilebox/tilebox-go/observability/tracer"
"github.com/tilebox/tilebox-go/workflows/v1"
"go.opentelemetry.io/otel"
)
// specify a service name and version to identify the instrumenting application in traces and logs
var service = &observability.Service{Name: "task-runner", Version: "dev"}
func main() {
ctx := context.Background()
endpoint := "http://localhost:4318"
headers := map[string]string{
"Authorization": "Bearer ",
}
// Setup an OpenTelemetry trace span processor, exporting traces and spans to an OTEL compatible trace endpoint
tileboxTracerProvider, shutdown, err := tracer.NewOtelProvider(ctx, service,
tracer.WithEndpointURL(endpoint),
tracer.WithHeaders(headers),
)
defer shutdown(ctx)
if err != nil {
slog.Error("failed to set up otel span processor", slog.Any("error", err))
return
}
otel.SetTracerProvider(tileboxTracerProvider) // set the tilebox tracer provider as the global OTEL tracer provider
client := workflows.NewClient()
taskRunner, err := client.NewTaskRunner(ctx)
if err != nil {
slog.Error("failed to create task runner", slog.Any("error", err))
return
}
err = taskRunner.RegisterTasks(&MyTask{})
if err != nil {
slog.Error("failed to register tasks", slog.Any("error", err))
return
}
taskRunner.RunForever(ctx)
}
```
Set the environment variable `OTEL_TRACES_ENDPOINT` to omit that argument
in the `configure_otel_tracing` function.
Once the runner picks up tasks and executes them, corresponding traces and spans are automatically generated and exported to the configured backend.