# AI Assistance Source: https://docs.tilebox.com/ai-assistance Large Language Models (LLMs) are powerful tools for exploring and learning about Tilebox. This section explains how to provide them with Tilebox-specific context for tailored, relevant and up-to-date responses. ## Providing complete Tilebox context AI assistants and Large Language Models (LLMs) can answer questions, guide you in using Tilebox, suggest relevant sections in the documentation, and assist you in creating workflows. Download the complete Tilebox documentation as a plain text file to share with your AI assistant or language model. The full content of the Tilebox documentation is available in plain markdown format at [https://docs.tilebox.com/llms-full.txt](https://docs.tilebox.com/llms-full.txt). Upload this document to your AI assistant or LLM to provide it with Tilebox-specific context. The [Documentation Context](https://docs.tilebox.com/llms-full.txt) is updated whenever the documentation changes. If you download the file, refresh it occasionally to stay up-to-date. ## Providing tailored context via MCP Tilebox Docs can be installed as an MCP tool, so an MCP client can ask for detailed context on specific topics. Run the following command to generate an MCP server for Tilebox Docs. ```bash npx mint-mcp add tilebox ``` The command line tool will guide you along the installation process for Cursor, Windsurf, Claude Code, Augment Code or other MCP clients. The MCP server always retrieves the most up to date version of the documentation. # As Source: https://docs.tilebox.com/api-reference/go/datasets/As ```go func As[T proto.Message](seq iter.Seq2[[]byte, error]) iter.Seq2[T, error] ``` Convert a sequence of bytes into a sequence of `proto.Message`. Useful to convert the output of [`Datapoints.Query`](/api-reference/go/datasets/Datapoints.Query) into a sequence of `proto.Message`. ## Parameters The sequence of bytes to convert ## Returns A sequence of `proto.Message` or an error if any. ```go Go import ( "time" datasets "github.com/tilebox/tilebox-go/datasets/v1" "github.com/tilebox/tilebox-go/query" ) startDate := time.Date(2014, 10, 4, 0, 0, 0, 0, time.UTC) endDate := time.Date(2021, 2, 24, 0, 0, 0, 0, time.UTC) queryInterval := query.NewTimeInterval(startDate, endDate) seq := datasets.As[*v1.Sentinel1Sar]( client.Datapoints.Query(ctx, collectionID, datasets.WithTemporalExtent(queryInterval)), ) ``` # Collect Source: https://docs.tilebox.com/api-reference/go/datasets/Collect ```go func Collect[K any](seq iter.Seq2[K, error]) ([]K, error) ``` Convert any sequence into a slice. It return an error if any of the elements in the sequence has a non-nil error. ## Parameters The sequence of bytes to convert ## Returns A slice of `K` or an error if any. ```go Go import ( "time" datasets "github.com/tilebox/tilebox-go/datasets/v1" "github.com/tilebox/tilebox-go/query" ) startDate := time.Date(2014, 10, 4, 0, 0, 0, 0, time.UTC) endDate := time.Date(2021, 2, 24, 0, 0, 0, 0, time.UTC) queryInterval := query.NewTimeInterval(startDate, endDate) datapoints, err := datasets.Collect(datasets.As[*v1.Sentinel1Sar]( client.Datapoints.Query(ctx, collectionID, datasets.WithTemporalExtent(queryInterval)), )) ``` # CollectAs Source: https://docs.tilebox.com/api-reference/go/datasets/CollectAs ```go func CollectAs[T proto.Message](seq iter.Seq2[[]byte, error]) ([]T, error) ``` Convert a sequence of bytes into a slice of `proto.Message`. 
Useful to convert the output of [`Datapoints.Query`](/api-reference/go/datasets/Datapoints.Query) into a slice of `proto.Message`. This a convenience function for `Collect(As[T](seq))`. ## Parameters The sequence of bytes to convert ## Returns A slice of `proto.Message` or an error if any. ```go Go import ( "time" datasets "github.com/tilebox/tilebox-go/datasets/v1" "github.com/tilebox/tilebox-go/query" ) startDate := time.Date(2014, 10, 4, 0, 0, 0, 0, time.UTC) endDate := time.Date(2021, 2, 24, 0, 0, 0, 0, time.UTC) queryInterval := query.NewTimeInterval(startDate, endDate) datapoints, err := datasets.CollectAs[*v1.Sentinel1Sar]( client.Datapoints.Query(ctx, collectionID, datasets.WithTemporalExtent(queryInterval)), ) ``` # Client.Collections.Create Source: https://docs.tilebox.com/api-reference/go/datasets/Collections.Create ```go func (collectionClient) Create( ctx context.Context, datasetID uuid.UUID, collectionName string, ) (*datasets.Collection, error) ``` Create a collection in the dataset. ## Parameters The id of the dataset The name of the collection ## Returns The created collection object. ```go Go collection, err := client.Collections.Create(ctx, datasetID, "My-collection", ) ``` # Client.Collections.Delete Source: https://docs.tilebox.com/api-reference/go/datasets/Collections.Delete ```go func (collectionClient) Delete( ctx context.Context, datasetID uuid.UUID, collectionID uuid.UUID, ) error ``` Delete a collection by its id. ## Parameters The id of the dataset The id of the collection ## Returns An error if the collection could not be deleted. ```go Go err := client.Collections.Delete(ctx, datasetID, collectionID, ) ``` ## Errors The specified dataset does not exist. The specified collection does not exist. # Client.Collections.Get Source: https://docs.tilebox.com/api-reference/go/datasets/Collections.Get ```go func (collectionClient) Get( ctx context.Context, datasetID uuid.UUID, name string, ) (*datasets.Collection, error) ``` Get a dataset by its slug. ## Parameters The id of the dataset The name of the collection ## Returns The created collection object. ```go Go collection, err := client.Collections.Get(ctx, datasetID, "My-collection", ) ``` ## Errors The specified dataset does not exist. # Client.Collections.GetOrCreate Source: https://docs.tilebox.com/api-reference/go/datasets/Collections.GetOrCreate ```go func (collectionClient) GetOrCreate( ctx context.Context, datasetID uuid.UUID, name string, ) (*datasets.Collection, error) ``` Get or create a collection by its name. If the collection does not exist, it will be created. ## Parameters The id of the dataset The name of the collection ## Returns A collection object. ```go Go collection, err := client.Collections.GetOrCreate(ctx, datasetID, "My-collection", ) ``` # Client.Collections.List Source: https://docs.tilebox.com/api-reference/go/datasets/Collections.List ```go func (collectionClient) List( ctx context.Context, datasetID uuid.UUID, ) ([]*datasets.Collection, error) ``` List the available collections in a dataset. ## Parameters The id of the dataset ## Returns A list of collection objects. ```go Go collections, err := client.Collections.List(ctx, datasetID, ) ``` ## Errors The specified dataset does not exist. # Client.Datapoints.Delete Source: https://docs.tilebox.com/api-reference/go/datasets/Datapoints.Delete ```go func (datapointClient) Delete( ctx context.Context, collectionID uuid.UUID, datapoints any, ) (int64, error) ``` Delete data points from a collection. 
Data points are identified and deleted by their ids. ## Parameters The id of the collection The datapoints to delete from the collection ## Returns The number of data points that were deleted. ```go Go var datapoints []*v1.Sentinel1Sar // assuming the slice is filled with datapoints numDeleted, err := client.Datapoints.Delete(ctx, collectionID, datapoints, ) ``` # Client.Datapoints.DeleteIDs Source: https://docs.tilebox.com/api-reference/go/datasets/Datapoints.DeleteIDs ```go func (datapointClient) DeleteIDs( ctx context.Context, collectionID uuid.UUID, datapointIDs []uuid.UUID, ) (int64, error) ``` Delete data points from a collection. ## Parameters The id of the collection The ids of the data points to delete from the collection ## Returns The number of data points that were deleted. ```go Go numDeleted, err := client.Datapoints.DeleteIDs(ctx, collectionID, []uuid.UUID{ uuid.MustParse("0195c87a-49f6-5ffa-e3cb-92215d057ea6"), uuid.MustParse("0195c87b-bd0e-3998-05cf-af6538f34957"), }, ) ``` # Client.Datapoints.GetInto Source: https://docs.tilebox.com/api-reference/go/datasets/Datapoints.GetInto ```go func (datapointClient) GetInto( ctx context.Context, collectionIDs []uuid.UUID, datapointID uuid.UUID, datapoint proto.Message, options ...QueryOption, ) error ``` Get a data point by its id from one of the specified collections. The data point is stored in the `datapoint` parameter. ## Parameters The ids of the collections to query The id of the datapoint to query The datapoint to query into Options for querying data points. ## Options Skip the data when querying datapoint. If set, only the required and auto-generated fields will be returned. ## Returns An error if data point could not be queried. ```go Go var datapoint v1.Sentinel1Sar err = client.Datapoints.GetInto(ctx, []uuid.UUID{collection.ID}, datapointID, &datapoint, ) ``` # Client.Datapoints.Ingest Source: https://docs.tilebox.com/api-reference/go/datasets/Datapoints.Ingest ```go func (datapointClient) Ingest( ctx context.Context, collectionID uuid.UUID, datapoints any, allowExisting bool, ) (*datasets.IngestResponse, error) ``` Ingest data points into a collection. ## Parameters The id of the collection The datapoints to ingest Datapoint fields are used to generate a deterministic unique `UUID` for each datapoint in a collection. Duplicate data points result in the same ID being generated. If `allowExisting` is `true`, `ingest` will skip those datapoints, since they already exist. If `allowExisting` is `false`, `ingest` will raise an error if any of the generated datapoint IDs already exist. ## Returns The list of datapoint ids that were ingested, including the IDs of existing data points in case of duplicates and `allowExisting=true`. ```go Go datapoints := []*v1.Modis{ v1.Modis_builder{ Time: timestamppb.New(time.Now()), GranuleName: proto.String("Granule 1"), }.Build(), v1.Modis_builder{ Time: timestamppb.New(time.Now().Add(-5 * time.Hour)), GranuleName: proto.String("Past Granule 2"), }.Build(), } ingestResponse, err := client.Datapoints.Ingest(ctx, collectionID, &datapoints false, ) ``` ## Errors If `allowExisting` is `False` and any of the datapoints attempting to ingest already exist. # Client.Datapoints.Query Source: https://docs.tilebox.com/api-reference/go/datasets/Datapoints.Query ```go func (datapointClient) Query( ctx context.Context, collectionIDs []uuid.UUID, options ...datasets.QueryOption, ) iter.Seq2[[]byte, error] ``` Query a range of data points in the specified collections in a specified interval. 
The datapoints are lazily queried and returned as a sequence of bytes. The output sequence can be transformed into a typed `proto.Message` using [CollectAs](/api-reference/go/datasets/CollectAs) or [As](/api-reference/go/datasets/As) functions. ## Parameters The ids of the collections to query Options for querying data points ## Options Specify the time interval for which data should be queried. Right now, a temporal extent is required for every query. Specify the geographical extent in which to query data. Optional, if not specified the query will return all results found globally. Skip the data when querying datapoints. If set, only the required and auto-generated fields will be returned. ## Returns A sequence of bytes containing the requested data points as bytes. ```go Go import ( "time" datasets "github.com/tilebox/tilebox-go/datasets/v1" "github.com/tilebox/tilebox-go/query" ) startDate := time.Date(2014, 10, 4, 0, 0, 0, 0, time.UTC) endDate := time.Date(2021, 2, 24, 0, 0, 0, 0, time.UTC) queryInterval := query.NewTimeInterval(startDate, endDate) datapoints, err := datasets.CollectAs[*v1.Sentinel1Sar]( client.Datapoints.Query(ctx, []uuid.UUID{collection.ID}, datasets.WithTemporalExtent(queryInterval)), ) ``` # Client.Datapoints.QueryInto Source: https://docs.tilebox.com/api-reference/go/datasets/Datapoints.QueryInto ```go func (datapointClient) QueryInto( ctx context.Context, collectionIDs []uuid.UUID, datapoints any, options ...datasets.QueryOption, ) error ``` Query a range of data points in the specified collections in a specified interval. QueryInto is a convenience function for [Query](/api-reference/go/datasets/Datapoints.Query), when no manual pagination or custom iteration is required. ## Parameters The ids of the collections to query The datapoints to query into Options for querying data points ## Options Specify the time interval for which data should be queried. Right now, a temporal extent is required for every query. Specify the geographical extent in which to query data. Optional, if not specified the query will return all results found globally. Skip the data when querying datapoints. If set, only the required and auto-generated fields will be returned. ## Returns An error if data points could not be queried. ```go Go import ( "time" datasets "github.com/tilebox/tilebox-go/datasets/v1" "github.com/tilebox/tilebox-go/query" ) startDate := time.Date(2014, 10, 4, 0, 0, 0, 0, time.UTC) endDate := time.Date(2021, 2, 24, 0, 0, 0, 0, time.UTC) queryInterval := query.NewTimeInterval(startDate, endDate) var datapoints []*v1.Sentinel1Sar err := client.Datapoints.QueryInto(ctx, []uuid.UUID{collection.ID}, &datapoints, datasets.WithTemporalExtent(queryInterval), ) ``` # Client.Datasets.Get Source: https://docs.tilebox.com/api-reference/go/datasets/Get ```go func (datasetClient) Get( ctx context.Context, slug string, ) (*datasets.Dataset, error) ``` Get a dataset by its slug. ## Parameters The slug of the dataset ## Returns A dataset object. ```go Go s1_sar, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel1_sar" ) ``` ## Errors The specified dataset does not exist. # Client.Datasets.List Source: https://docs.tilebox.com/api-reference/go/datasets/List ```go func (datasetClient) List(ctx context.Context) ([]*datasets.Dataset, error) ``` Fetch all available datasets. ## Returns A list of all available datasets. 
```go Go datasets, err := client.Datasets.List(ctx) ``` # Client.Clusters.Create Source: https://docs.tilebox.com/api-reference/go/workflows/Clusters.Create ```go func (*ClusterClient) Create( ctx context.Context, name string, ) (*workflows.Cluster, error) ``` Create a cluster. ## Parameters A display name for the cluster ## Returns The created cluster object. ```go Go cluster, err := client.Clusters.Create(ctx, "My cluster" ) ``` # Client.Clusters.Delete Source: https://docs.tilebox.com/api-reference/go/workflows/Clusters.Delete ```go func (*ClusterClient) Delete(ctx context.Context, slug string) error ``` Delete a cluster by its slug. ## Parameters The slug of the cluster to delete ## Returns An error if the cluster could not be deleted. ```go Go err := client.Clusters.Delete(ctx, "my-cluster-tZD9Ca1qsqt3V" ) ``` ## Errors The specified cluster does not exist. # Client.Clusters.Get Source: https://docs.tilebox.com/api-reference/go/workflows/Clusters.Get ```go func (*ClusterClient) Get( ctx context.Context, slug string, ) (*workflows.Cluster, error) ``` Get a cluster by its slug. ## Parameters The slug of the cluster ## Returns A cluster object. ```go Go cluster, err := client.Clusters.Get(ctx, "my-cluster-tZD9Ca1qsqt3V" ) ``` ## Errors The specified cluster does not exist. # Client.Clusters.List Source: https://docs.tilebox.com/api-reference/go/workflows/Clusters.List ```go func (*ClusterClient) List(ctx context.Context) ([]*workflows.Cluster, error) ``` Fetch all available clusters. ## Returns A list of all available clusters. ```go Go clusters, err := client.Clusters.List(ctx) ``` # Collect Source: https://docs.tilebox.com/api-reference/go/workflows/Collect ```go func Collect[K any](seq iter.Seq2[K, error]) ([]K, error) ``` Convert any sequence into a slice. It return an error if any of the elements in the sequence has a non-nil error. ## Parameters The sequence of bytes to convert ## Returns A slice of `K` or an error if any. ```go Go jobs, err := workflows.Collect( client.Jobs.List(ctx, interval), ) ``` # workflows.GetCurrentCluster Source: https://docs.tilebox.com/api-reference/go/workflows/GetCurrentCluster ```go workflows.GetCurrentCluster(ctx context.Context) (string, error) ``` Get the current cluster slug. This function is intended to be used in tasks. ## Returns The current cluster slug. ```go Go type Task struct{} func (t *Task) Execute(ctx context.Context) error { clusterSlug, err := workflows.GetCurrentCluster(ctx) if err != nil { return fmt.Errorf("failed to get current cluster: %w", err) } return nil } ``` # Client.Jobs.Cancel Source: https://docs.tilebox.com/api-reference/go/workflows/Jobs.Cancel ```go func (*JobClient) Cancel(ctx context.Context, jobID uuid.UUID) error ``` Cancel a job. When a job is canceled, no queued tasks will be picked up by task runners and executed even if task runners are idle. Tasks that are already being executed will finish their execution and not be interrupted. All sub-tasks spawned from such tasks after the cancellation will not be picked up by task runners. ## Parameters The id of the job ## Returns An error if the job could not be cancelled. ```go Go err := client.Jobs.Cancel(ctx, uuid.MustParse("0195c87a-49f6-5ffa-e3cb-92215d057ea6"), ) ``` # Client.Jobs.Get Source: https://docs.tilebox.com/api-reference/go/workflows/Jobs.Get ```go func (*JobClient) Get( ctx context.Context, jobID uuid.UUID, ) (*workflows.Job, error) ``` Get a job by its id. ## Parameters The id of the job ## Returns A job object. 
```go Go job, err := client.Jobs.Get(ctx, uuid.MustParse("0195c87a-49f6-5ffa-e3cb-92215d057ea6"), ) ``` ## Errors The specified job does not exist. # Client.Jobs.Query Source: https://docs.tilebox.com/api-reference/go/workflows/Jobs.Query ```go func (*JobClient) Query( ctx context.Context, options ...job.QueryOption, ) iter.Seq2[*workflows.Job, error] ``` Query jobs in the specified interval. The jobs are lazily loaded and returned as a sequence of Jobs. The jobs are returned sorted by creation time in reverse order. The output sequence can be transformed into a slice of Job using [Collect](/api-reference/go/workflows/Collect) function. ## Parameters Options for querying jobs ## Options Specify the time interval for which data should be queried. Right now, a temporal extent is required for every query. Specify the automation id for which data should be queried. Only jobs that were created by the specified automation will be returned. ## Returns A sequence of jobs. ```go Go import ( "time" workflows "github.com/tilebox/tilebox-go/workflows/v1" "github.com/tilebox/tilebox-go/workflows/v1/job" "github.com/tilebox/tilebox-go/query" ) interval := query.NewTimeInterval( time.Now().Add(-24 * time.Hour), time.Now(), ) jobs, err := workflows.Collect( client.Jobs.Query(ctx, job.WithTemporalExtent(interval), ), ) ``` # Client.Jobs.Retry Source: https://docs.tilebox.com/api-reference/go/workflows/Jobs.Retry ```go func (*JobClient) Retry( ctx context.Context, jobID uuid.UUID, ) (int64, error) ``` Retry a job. All failed tasks will become queued again, and queued tasks will be picked up by task runners again. ## Parameters The id of the job to retry ## Returns The number of tasks that were rescheduled. ```go Go nbRescheduled, err := client.Jobs.Retry(ctx, uuid.MustParse("0195c87a-49f6-5ffa-e3cb-92215d057ea6"), ) ``` ## Errors The specified job does not exist. # Client.Jobs.Submit Source: https://docs.tilebox.com/api-reference/go/workflows/Jobs.Submit ```go func (*JobClient) Submit( ctx context.Context, jobName string, tasks []workflows.Task, options ...job.SubmitOption ) (*workflows.Job, error) ``` Submit a job. ## Parameters The name of the job The root task for the job. This task is executed first and can submit subtasks to manage the entire workflow. A job can have optionally consist of multiple root tasks. Options for the job ## Options Set the maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtask in case it fails The [cluster](/workflows/concepts/clusters#managing-clusters) to run the root task on. If not provided, the default cluster is used. ## Returns A job object. ```go Go job, err := client.Jobs.Submit(ctx, "My job", []workflows.Task{rootTask}, ) ``` # Client.NewTaskRunner Source: https://docs.tilebox.com/api-reference/go/workflows/NewTaskRunner ```go func (*Client) NewTaskRunner( ctx context.Context, options ...runner.Option, ) (*workflows.TaskRunner, error) ``` Initialize a task runner. ## Parameters Options for initializing the task runner ## Options The [cluster](/workflows/concepts/clusters#managing-clusters) to connect to. If not provided, the default cluster is used. Set the logger to use for the task runner Disable OpenTelemetry metrics for the task runner ## Returns The created task runner object. 
```go Go runner, err := client.NewTaskRunner(ctx) ``` # workflows.SubmitSubtask Source: https://docs.tilebox.com/api-reference/go/workflows/SubmitSubtask ```go workflows.SubmitSubtask( ctx context.Context, task workflows.Task, options ...subtask.SubmitOption, ) (subtask.FutureTask, error) ``` Submit a subtask to the task runner. This function is intended to be used in tasks. ## Parameters A subtask to submit Options for the subtask ## Options Set dependencies for the task Set the cluster slug of the cluster where the task will be executed. Set the maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtask in case it fails ## Returns A future task that can be used to set dependencies between tasks. ```go Go type MySubTask struct { Sensor string Value float64 } type Task struct{} func (t *Task) Execute(ctx context.Context) error { err := workflows.SubmitSubtask(ctx, &MySubTask{ Sensor: "A", Value: 42, }, ) if err != nil { return fmt.Errorf("failed to submit subtasks: %w", err) } return nil } ``` # workflows.SubmitSubtasks Source: https://docs.tilebox.com/api-reference/go/workflows/SubmitSubtasks ```go workflows.SubmitSubtasks( ctx context.Context, tasks []workflows.Task, options ...subtask.SubmitOption, ) ([]subtask.FutureTask, error) ``` Submit multiple subtasks to the task runner. Same as [SubmitSubtask](/api-reference/go/workflows/SubmitSubtask), but accepts a list of tasks. This function is intended to be used in tasks. ## Parameters A list of tasks to submit Options for the subtasks ## Options Set dependencies for the tasks Set the cluster slug of the cluster where the tasks will be executed. Set the maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtasks in case it fails ## Returns A list of future tasks that can be used to set dependencies between tasks. ```go Go type MySubTask struct { Sensor string Value float64 } type Task struct{} func (t *Task) Execute(ctx context.Context) error { err := workflows.SubmitSubtasks(ctx, []workflows.Task{ &MySubTask{ Sensor: "A", Value: 42, }, &MySubTask{ Sensor: "B", Value: 42, } }, ) if err != nil { return fmt.Errorf("failed to submit subtasks: %w", err) } return nil } ``` # Task Source: https://docs.tilebox.com/api-reference/go/workflows/Task ```go type Task interface{} ``` Base interface for Tilebox workflows [tasks](/workflows/concepts/tasks). It doesn't need to be identifiable or executable, but it can be both (see below). ## Methods ```go Task.Execute(ctx context.Context) error ``` The entry point for the execution of the task. If not defined, the task can't be registered with a task runner but can still be submitted. ```go Task.Identifier() TaskIdentifier ``` Provides a user-defined task identifier. The identifier is used to uniquely identify the task and specify its version. If not defined, the task runner will generate an identifier for it using reflection. ## JSON-serializable task ```go type SampleTask struct { Message string Depth int BranchFactor int } ``` Optional task [input parameters](/workflows/concepts/tasks#input-parameters), defined as struct fields. Supported types are all types supported by [json.Marshal](https://pkg.go.dev/encoding/json#Marshal). ## Protobuf-serializable task ```go type SampleTask struct { examplesv1.SpawnWorkflowTreeTask } ``` Task can also be defined as a protobuf message. An example using task protobuf messages can be found [here](https://github.com/tilebox/tilebox-go/tree/main/examples/sampleworkflow). 
```go Go package helloworld import ( "context" "fmt" "github.com/tilebox/tilebox-go/workflows/v1" ) type MyFirstTask struct{} func (t *MyFirstTask) Execute(ctx context.Context) error { fmt.Println("Hello World!") return nil } func (t *MyFirstTask) Identifier() workflows.TaskIdentifier { return workflows.NewTaskIdentifier("tilebox.workflows.MyTask", "v3.2") } type MyFirstParameterizedTask struct { Name string Greet bool Data map[string]string } func (t *MyFirstParameterizedTask) Execute(ctx context.Context) error { if t.Greet { fmt.Printf("Hello %s!\n", t.Name) } return nil } ``` # TaskRunner.GetRegisteredTask Source: https://docs.tilebox.com/api-reference/go/workflows/TaskRunner.GetRegisteredTask ```go func (*TaskRunner) GetRegisteredTask( identifier workflows.TaskIdentifier, ) (workflows.ExecutableTask, bool) ``` Get the task with the given identifier. ## Parameters A display name for the cluster ## Returns The registered task. Returns `false` if not found. ```go Go identifier := workflows.NewTaskIdentifier("my-task", "v1.0") task, found := runner.GetRegisteredTask( identifier, ) ``` # TaskRunner.RegisterTasks Source: https://docs.tilebox.com/api-reference/go/workflows/TaskRunner.RegisterTasks ```go func (*TaskRunner) RegisterTasks(tasks ...workflows.ExecutableTask) error ``` Register tasks that can be executed by this task runner. ## Parameters A list of task classes that this runner can execute ## Returns An error if the tasks could not be registered. ```go Go err := runner.RegisterTasks( &MyTask{}, &MyOtherTask{}, ) ``` # TaskRunner.Run Source: https://docs.tilebox.com/api-reference/go/workflows/TaskRunner.Run ```go func (*TaskRunner) Run(ctx context.Context) ``` Run the task runner forever, looking for new tasks to run and polling for new tasks when idle. ```go Go runner.Run(ctx) ``` # workflows.WithTaskSpan Source: https://docs.tilebox.com/api-reference/go/workflows/WithTaskSpan ```go workflows.WithTaskSpan( ctx context.Context, name string, f func(ctx context.Context) error, ) error ``` Wrap a function with a [tracing span](/workflows/observability/tracing). ## Parameters The name of the span The function to wrap ## Returns An error if any. ```go Go type Task struct{} func (t *Task) Execute(ctx context.Context) error { err := workflows.WithTaskSpan(ctx, "Database insert", func(ctx context.Context) error { // Do something return nil }) if err != nil { return fmt.Errorf("failed to insert into database: %w", err) } return nil } ``` # workflows.WithTaskSpanResult Source: https://docs.tilebox.com/api-reference/go/workflows/WithTaskSpanResult ```go workflows.WithTaskSpanResult[Result any]( ctx context.Context, name string, f func(ctx context.Context) (Result, error), ) (Result, error) ``` Wrap a function with a [tracing span](/workflows/observability/tracing). ## Parameters The name of the span The function to wrap ## Returns The result of the function and an error if any. ```go Go type Task struct{} func (t *Task) Execute(ctx context.Context) error { result, err := workflows.WithTaskSpanResult(ctx, "Expensive Compute", func(ctx context.Context) (int, error) { return 6 * 7, nil }) if err != nil { return fmt.Errorf("failed to compute: %w", err) } return nil } ``` # Client Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Client ```python class Client(url: str, token: str) ``` Create a Tilebox datasets client. ## Parameters Tilebox API Url. Defaults to `https://api.tilebox.com`. The API Key to authenticate with. 
If not set the `TILEBOX_API_KEY` environment variable will be used. ```python Python from tilebox.datasets import Client # read token from an environment variable client = Client() # or provide connection details directly client = Client( url="https://api.tilebox.com", token="YOUR_TILEBOX_API_KEY" ) ``` # Client.dataset Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Client.dataset ```python def Client.dataset(slug: str) -> Dataset ``` Get a dataset by its slug. ## Parameters The slug of the dataset ## Returns A dataset object. ```python Python s1_sar = client.dataset( "open_data.copernicus.sentinel1_sar" ) ``` ## Errors The specified dataset does not exist. # Client.datasets Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Client.datasets ```python def Client.datasets() -> Datasets ``` Fetch all available datasets. ## Returns An object containing all available datasets, accessible via dot notation. ```python Python datasets = client.datasets() s1_sar = datasets.open_data.copernicus.sentinel1_sar ``` # Collection.delete Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Collection.delete ```python def Collection.delete(datapoints: DatapointIDs) -> int ``` Delete data points from the collection. Data points are identified and deleted by their ids. You need to have write permission on the collection to be able to delete data points. ## Parameters Datapoint IDs to delete from the collection. Supported `DatapointIDs` types are. * A `pandas.DataFrame` containing an `id` column. * A `pandas.Series` containing datapoint IDs. * An `xarray.Dataset` containing an "id" variable. * An `xarray.DataArray` containing datapoint IDs. * A `numpy.ndarray` containing datapoint IDs. * A `Collection[UUID]` containing datapoint IDs as python built-in `UUID` objects, e.g. `list[UUID]`. * A `Collection[str]` containing datapoint IDs as strings, e.g. `list[str]`. ## Returns The number of data points that were deleted. ```python Python collection.delete([ "0195c87a-49f6-5ffa-e3cb-92215d057ea6", "0195c87b-bd0e-3998-05cf-af6538f34957", ]) ``` ## Errors One of the data points is not found in the collection. If any of the data points are not found, nothing will be deleted. One of the specified ids is not a valid UUID # Collection.find Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Collection.find ```python def Collection.find( datapoint_id: str, skip_data: bool = False ) -> xarray.Dataset ``` Find a specific datapoint in a collection by its id. ## Parameters The id of the datapoint to find. Whether to skip loading the data for the datapoint. If `True`, only the metadata for the datapoint is loaded. ## Returns An [`xarray.Dataset`](/sdks/python/xarray) containing the requested data point. Since it returns only a single data point, the output xarray dataset does not include a `time` dimension. ## Errors The specified datapoint does not exist in this collection. ```python Python data = collection.find( "0186d6b6-66cc-fcfd-91df-bbbff72499c3", skip_data = False, ) ``` # Collection.info Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Collection.info ```python def Collection.info() -> CollectionInfo ``` Fetch metadata about the data points in this collection. ## Returns A collection info object. 
```python Python info = collection.info() ``` # Collection.ingest Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Collection.ingest ```python def Collection.ingest( data: IngestionData, allow_existing: bool = True ) -> list[UUID] ``` Ingest data into a collection. You need to have write permission on the collection to be able to delete data points. ## Parameters The data to ingest. Supported `IngestionData` data types are: * A `pandas.DataFrame`, mapping the column names to dataset fields. * An `xarray.Dataset`, mapping variables and coordinates to dataset fields. * An `Iterable`, `dict` or `nd-array`: ingest any object that can be converted to a `pandas.DataFrame` using it's constructor, equivalent to `ingest(pd.DataFrame(data))`. Datapoint fields are used to generate a deterministic unique `UUID` for each datapoint in a collection. Duplicate data points result in the same ID being generated. If `allow_existing` is `True`, `ingest` will skip those data points, since they already exist. If `allow_existing` is `False`, `ingest` will raise an error if any of the generated datapoint IDs already exist. Defaults to `True`. ## Returns List of datapoint ids that were ingested, including the IDs of existing data points in case of duplicates and `allow_existing=True`. ```python Python import pandas as pd collection.ingest(pd.DataFrame({ "time": [ "2023-05-01T12:00:00Z", "2023-05-02T12:00:00Z", ], "value": [1, 2], "sensor": ["A", "B"], })) ``` ## Errors If `allow_existing` is `False` and any of the datapoints attempting to ingest already exist. # Collection.load Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Collection.load ```python def Collection.load( time_or_interval: TimeIntervalLike, skip_data: bool = False, show_progress: bool = False ) -> xarray.Dataset ``` Load a range of data points in this collection in a specified interval. If no data exists for the requested time or interval, an empty `xarray.Dataset` is returned. ## Parameters The time or time interval for which to load data. This can be a single time scalar, a tuple of two time scalars, or an array of time scalars. Valid time scalars are: `datetime.datetime` objects, strings in ISO 8601 format, or Unix timestamps in seconds. Behavior for each input type: * **TimeScalar**: If a single time scalar is provided, `load` returns all data points for that exact millisecond. * **TimeInterval**: If a time interval is provided, `load` returns all data points in that interval. Intervals can be a tuple of two `TimeScalars` or a `TimeInterval` object. Tuples are interpreted as a half-open interval `[start, end)`. With a `TimeInterval` object, the `start_exclusive` and `end_inclusive` parameters control whether the start and end time are inclusive or exclusive. * **Iterable\[TimeScalar]**: If an array of time scalars is provided, `load` constructs a time interval from the first and last time scalar in the array. Here, both the `start` and `end` times are inclusive. If `True`, the response contains only the [required fields for the dataset type](/datasets/types/timeseries) without the actual dataset-specific fields. Defaults to `False`. If `True`, a progress bar is displayed when pagination is required. Defaults to `False`. ## Returns An [`xarray.Dataset`](/sdks/python/xarray) containing the requested data points. 
```python Python from datetime import datetime from tilebox.clients.core.data import TimeInterval # loading a specific time time = "2023-05-01 12:45:33.423" data = collection.load(time) # loading a time interval interval = ("2023-05-01", "2023-08-01") data = collection.load(interval, show_progress=True) # loading a time interval with TimeInterval interval = TimeInterval( start=datetime(2023, 5, 1), end=datetime(2023, 8, 1), start_exclusive=False, end_inclusive=False, ) data = collection.load(interval, show_progress=True) # loading with an iterable meta_data = collection.load(..., skip_data=True) first_50 = collection.load(meta_data.time[:50], skip_data=False) ``` # Dataset.collection Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Dataset.collection ```python def Dataset.collection(name: str) -> Collection ``` Access a specific collection by its name. ## Parameters The name of the collection ## Returns A collection object. ```python Python collection = dataset.collection("My-collection") ``` ## Errors The specified collection does not exist in the dataset. # Dataset.collections Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Dataset.collections ```python def Dataset.collections() -> dict[str, Collection] ``` List the available collections in a dataset. ## Returns A dictionary mapping collection names to collection objects. ```python Python collections = dataset.collections() ``` # Dataset.create_collection Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Dataset.create_collection ```python def Dataset.create_collection(name: str) -> Collection ``` Create a collection in the dataset. ## Parameters The name of the collection ## Returns The created collection object. ```python Python collection = dataset.create_collection("My-collection") ``` # Dataset.delete_collection Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Dataset.delete_collection ```python def Dataset.delete_collection(collection: str | UUID | CollectionClient) -> None ``` Delete a collection in the dataset. ## Parameters The collection to delete. Can be specified by name, id, or as a collection object. ```python Python dataset.delete_collection("My-collection") ``` # Dataset.get_or_create_collection Source: https://docs.tilebox.com/api-reference/python/tilebox.datasets/Dataset.get_or_create_collection ```python def Dataset.get_or_create_collection(name: str) -> Collection ``` Get a collection by its name. If the collection does not exist, it will be created. ## Parameters The name of the collection ## Returns A collection object. ```python Python collection = dataset.get_or_create_collection("My-collection") ``` # Client Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/Client ```python class Client(url: str, token: str) ``` Create a Tilebox workflows client. ## Parameters Tilebox API Url. Defaults to `https://api.tilebox.com`. The API Key to authenticate with. If not set the `TILEBOX_API_KEY` environment variable will be used. ## Sub clients The workflows client exposes sub clients for interacting with different parts of the Tilebox workflows API. ```python def Client.jobs() -> JobClient ``` A client for interacting with jobs. ```python def Client.clusters() -> ClusterClient ``` A client for managing clusters. ```python def Client.automations() -> AutomationClient ``` A client for scheduling automations. ## Task runners ```python def Client.runner(...) 
-> TaskRunner ``` A client is also used to instantiate task runners. Check out the [`Client.runner` API reference](/api-reference/python/tilebox.workflows/Client.runner) for more information. ```python Python from tilebox.workflows import Client # read token from an environment variable client = Client() # or provide connection details directly client = Client( url="https://api.tilebox.com", token="YOUR_TILEBOX_API_KEY" ) # access sub clients job_client = client.jobs() cluster_client = client.clusters() automation_client = client.automations() # or instantiate a task runner runner = client.runner(tasks=[...]) ``` # Client.runner Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/Client.runner ```python def Client.runner( cluster: ClusterSlugLike | None = None, tasks: list[type[Task]], cache: JobCache | None = None ) -> TaskRunner ``` Initialize a task runner. ## Parameters The [cluster slug](/workflows/concepts/clusters#managing-clusters) for the cluster associated with this task runner. If not provided, the default cluster is used. A list of task classes that this runner can execute. An optional [job cache](/workflows/caches) for caching results from tasks and sharing data between tasks. ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import LocalFileSystemCache client = Client() runner = client.runner( tasks=[MyFirstTask, MySubtask], # optional: cache=LocalFileSystemCache("cache_directory"), ) ``` # ClusterClient.all Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/ClusterClient.all ```python def ClusterClient.all(): list[Cluster] ``` List all available clusters. ## Returns A list of all available clusters. ```python Python clusters = cluster_client.all() ``` # ClusterClient.create Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/ClusterClient.create ```python def ClusterClient.create(name: str): Cluster ``` Create a cluster. ## Parameters A display name for the cluster ## Returns The created cluster object. ```python Python cluster = cluster_client.create("My cluster") ``` # ClusterClient.delete Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/ClusterClient.delete ```python def ClusterClient.delete(cluster_or_slug: Cluster | str) ``` Delete a cluster. ## Parameters The cluster or cluster slug to delete. ```python Python cluster_client.delete(cluster) ``` # ClusterClient.find Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/ClusterClient.find ```python def ClusterClient.find(cluster_or_slug: Cluster | str): Cluster ``` Get a cluster by its slug. ## Parameters The cluster or cluster slug to get. ## Returns A cluster object. ```python Python cluster = cluster_client.find("my-cluster-CvufcSxcC9SKfe") ``` # Context.job_cache Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/ExecutionContext.job_cache ```python ExecutionContext.job_cache: JobCache ``` Access the job cache for a task. 
```python Python # within a Tasks execute method, access the job cache # def execute(self, context: ExecutionContext): context.job_cache["some-key"] = b"my-value" ``` # Context.submit_subtask Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask ```python def ExecutionContext.submit_subtask( task: Task, depends_on: list[FutureTask] = None, cluster: str | None = None, max_retries: int = 0 ) -> FutureTask ``` Submit a [subtask](/workflows/concepts/tasks#subtasks-and-task-composition) from a currently executing task. ## Parameters The task to submit as a subtask. An optional list of tasks already submitted within the same context that this subtask depends on. An optional [cluster slug](/workflows/concepts/clusters#managing-clusters) for running the subtask. If not provided, the subtask runs on the same cluster as the parent task. Specify the maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtask in case of failure. The default is 0. ## Returns A `FutureTask` object that represents the submitted subtask. Can be used to set up dependencies between tasks. ```python Python # within the execute method of a Task: subtask = context.submit_subtask(MySubtask()) dependent_subtask = context.submit_subtask( MyOtherSubtask(), depends_on=[subtask] ) gpu_task = context.submit_subtask( MyGPUTask(), cluster="gpu-cluster-slug" ) flaky_task = context.submit_subtask( MyFlakyTask(), max_retries=5 ) ``` # JobCache.__iter__ Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobCache.__iter__ ```python def JobCache.__iter__() -> Iterator[str] ``` List all available keys in a job cache group. Only keys that are direct children of the current group are returned. For nested groups, first access the group and then iterate over its keys. ```python Python # within a Tasks execute method, access the job cache # def execute(context: ExecutionContext) cache = context.job_cache # iterate over all keys for key in cache: print(cache[key]) # or collect as list keys = list(cache) # also works with nested groups for key in cache.group("some-group"): print(cache[key]) ``` # JobCache.group Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobCache.group ```python def JobCache.group(name: str) -> JobCache ``` You can nest caches in a hierarchical manner using [groups](/workflows/caches#groups-and-hierarchical-keys). Groups are separated by a forward slash (/) in the key. This hierarchical structure functions similarly to a file system. ## Parameters The name of the cache group. ```python Python # within a Tasks execute method, access the job cache # def execute(context: ExecutionContext) cache = context.job_cache # use groups to nest related cache values group = cache.group("some-group") group["value"] = b"my-value" nested = group.group("nested") nested["value"] = b"nested-value" # access nested groups directly via / notation value = cache["some-group/nested/value"] ``` # JobClient.cancel Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobClient.cancel ```python def JobClient.cancel(job_or_id: Job | str) ``` Cancel a job. When a job is canceled, no queued tasks will be picked up by task runners and executed even if task runners are idle. Tasks that are already being executed will finish their execution and not be interrupted. All sub-tasks spawned from such tasks after the cancellation will not be picked up by task runners. ## Parameters The job or job id to cancel. 
```python Python job_client.cancel(job) ``` # JobClient.find Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobClient.find ```python def JobClient.find(job_or_id: Job | str) -> Job ``` Get a job by its id. Can also be an existing job object, in which case this method acts as a refresh operation to fetch the latest job details. ## Parameters The job or job id to get. ## Returns A job object. ```python Python job = job_client.find("0195c87a-49f6-5ffa-e3cb-92215d057ea6") ``` # JobClient.query Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobClient.query ```python def JobClient.query( temporal_extent: TimeIntervalLike | IDIntervalLike, automation_id: UUID | None = None, ) -> list[Job] ``` Query jobs in the specified interval. ## Parameters The temporal extent to filter jobs by. If an `IDInterval` is given, jobs are filtered by their job id instead of their creation time. It can be specified in the following ways: * TimeInterval: interval -> Use the time interval as its given * DatetimeScalar: \[time, time] -> Construct a TimeInterval with start and end time set to the given value and the end time inclusive * tuple of two DatetimeScalar: \[start, end) -> Construct a TimeInterval with the given start and end time * `IDInterval`: interval -> Use the ID interval as its given * tuple of two UUIDs: \[start, end) -> Construct an `IDInterval` with the given start and end id * tuple of two strings: \[start, end) -> Construct an `IDInterval` with the given start and end id parsed from the strings The automation id to filter jobs by. If not provided, jobs from all automations are returned. ## Returns A list of jobs. ```python Python jobs = job_client.query(("2025-01-01", "2025-02-01")) ``` # JobClient.retry Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobClient.retry ```python def JobClient.retry(job_or_id: Job | str) -> int ``` Retry a job. All failed tasks will become queued again, and queued tasks will be picked up by task runners again. ## Parameters The job or job id to retry. ## Returns The number of tasks that were rescheduled. ```python Python nb_rescheduled = job_client.retry(job) ``` # JobClient.submit Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobClient.submit ```python def JobClient.submit( job_name: str, root_task_or_tasks: Task | Iterable[Task], cluster: str | Cluster | Iterable[str | Cluster] | None = None, max_retries: int = 0 ) -> Job ``` Submit a job. ## Parameters The name of the job. The root task for the job. This task is executed first and can submit subtasks to manage the entire workflow. A job can have optionally consist of multiple root tasks. The [cluster slug](/workflows/concepts/clusters#managing-clusters) for the cluster to run the root task on. In case of multiple root tasks, a list of cluster slugs can be provided. If not provided, the default cluster is used. The maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtask in case it fails. Defaults to 0. ```python Python from my_workflow import MyTask job = job_client.submit( "my-job", MyTask( message="Hello, World!", value=42, data={"key": "value"} ), ) ``` # JobClient.visualize Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/JobClient.visualize ```python def JobClient.visualize( job_or_id: Job | str, direction: str = "down", layout: str = "dagre", sketchy: bool = True ) ``` Create a visualization of a job as a diagram. 
If you are working in an interactive environment, such as Jupyter notebooks, you can use the `display` method to display the visualized job diagram directly in the notebook. ## Parameters The job to visualize. The direction for the diagram to flow. For more details, see the relevant section in the [D2 documentation](https://d2lang.com/tour/layouts/#direction). Valid values are `up`, `down`, `right`, and `left`. The default value is `down`. The layout engine for the diagram. For further information, refer to the [D2 layout engines](https://d2lang.com/tour/layouts). Valid values are `dagre` and `elk`, with the default being `dagre`. Indicates whether to use a sketchy, hand-drawn style for the diagram. The default is `True`. ```python Python svg = job_client.visualize(job) Path("diagram.svg").write_text(svg) # in a notebook job_client.display(job) ``` # Task Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/Task ```python class Task: def execute(context: ExecutionContext) -> None @staticmethod def identifier() -> tuple[str, str] ``` Base class for Tilebox workflows [tasks](/workflows/concepts/tasks). Inherit from this class to create a task. Inheriting also automatically applies the dataclass decorator. ## Methods ```python def Task.execute(context: ExecutionContext) -> None ``` The entry point for the execution of the task. ```python @staticmethod def Task.identifier() -> tuple[str, str] ``` Override a task identifier and specify its version. If not overridden, the identifier of a task defaults to the class name, and the version to `v0.0`. ## Task Input Parameters ```python class MyTask(Task): message: str value: int data: dict[str, int] ``` Optional task [input parameters](/workflows/concepts/tasks#input-parameters), defined as class attributes. Supported types are `str`, `int`, `float`, `bool`, as well as `lists` and `dicts` thereof. ```python Python from tilebox.workflows import Task, ExecutionContext class MyFirstTask(Task): def execute(self, context: ExecutionContext): print(f"Hello World!") @staticmethod def identifier() -> tuple[str, str]: return ("tilebox.workflows.MyTask", "v3.2") class MyFirstParameterizedTask(Task): name: str greet: bool data: dict[str, str] def execute(self, context: ExecutionContext): if self.greet: print(f"Hello {self.name}!") ``` # TaskRunner.run_all Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/TaskRunner.run_all ```python def TaskRunner.run_all() ``` Run the task runner and execute all tasks, until there are no more tasks available. ```python Python # run all tasks until no more tasks are available runner.run_all() ``` # TaskRunner.run_forever Source: https://docs.tilebox.com/api-reference/python/tilebox.workflows/TaskRunner.run_forever ```python def TaskRunner.run_forever() ``` Run the task runner forever. This will poll for new tasks and execute them as they come in. If no tasks are available, it will sleep for a short time and then try again. ```python Python # run forever, until the current process is stopped runner.run_forever() ``` # Authentication Source: https://docs.tilebox.com/authentication To access the Tilebox API, you must authenticate your requests. This guide explains how authentication works, focusing on API keys used as bearer tokens. ## Creating an API key To create an API key, log into the [Tilebox Console](https://console.tilebox.com). Navigate to [Account -> API Keys](https://console.tilebox.com/account/api-keys) and click the "Create API Key" button. Keep your API keys secure. 
Deactivate any keys if you suspect they have been compromised. ## Bearer token authentication The Tilebox API uses bearer tokens for authentication. You need to pass your API key as the `token` parameter when creating an instance of the client. ```python Python from tilebox.datasets import Client as DatasetsClient from tilebox.workflows import Client as WorkflowsClient datasets_client = DatasetsClient(token="YOUR_TILEBOX_API_KEY") workflows_client = WorkflowsClient(token="YOUR_TILEBOX_API_KEY") ``` ```go Go import ( "github.com/tilebox/tilebox-go/datasets/v1" "github.com/tilebox/tilebox-go/workflows/v1" ) datasetsClient := datasets.NewClient(datasets.WithAPIKey("YOUR_TILEBOX_API_KEY")) workflowsClient := workflows.NewClient(workflows.WithAPIKey("YOUR_TILEBOX_API_KEY")) ``` If you set your API key as an environment variable named `TILEBOX_API_KEY`, you can skip the token parameter when creating a client. # Product Updates Source: https://docs.tilebox.com/changelog New features, updates and improvements ## Go Client ![Tilebox Go Client](https://mintlify.s3.us-west-1.amazonaws.com/tilebox/assets/changelog/2025-05-tilebox-banner-go.svg) Excited to announce the release of the Tilebox Go client! **Features** * Datasets client * Statically typed dataset types * CLI to generate dataset types * Workflows client * Go task runners To get started, check out the [Go SDK documentation](/sdks/go/install). ## Spatio-Temporal datasets ![Spatio-Temporal Datasets](https://mintlify.s3.us-west-1.amazonaws.com/tilebox/assets/changelog/2025-04-spatio-temporal.png) Spatio-temporal datasets are officially out, fully supported in all languages and available as a category to create in custom datasets! The core problems that spatio-temporal datasets solve are * finding relevant data quickly (e.g. all Sentinel 2 granules along the US coastline, last year), * storing auxiliary geographically coded data (e.g. weather station data, ground truth data), * cataloging higher level data and results [Here's a short video](https://share.descript.com/view/khO3QJslhgU) on performance and core capabilities. We're excited about this as cataloging has until now been an unsexy but hard problem, and it's great to finally have a solution out there **More information** * [Spatio Temporal datasets documentation](https://docs.tilebox.com/datasets/types/spatiotemporal) * All open data now supports spatio-temporal queries * [Create your own spatio-temporal datasets](/guides/datasets/create) * [Ingesting spatio-temporal data](/datasets/ingest) ## Custom Datasets ![Custom datasets](https://mintlify.s3.us-west-1.amazonaws.com/tilebox/assets/changelog/2025-03-custom-datasets.png) Create your own custom datasets! * statically typed * with clients in Python and Go Use it to organize anything from telemetry, raw payload metadata, auxiliary sensor data, configuration data, or internal data catalogs. **Quickstart** 1. Specify the data type in the Console 2. Create a collection 3. Use client.ingest() to ingest a `xarray.Dataset` or `pandas.DataFrame` 4. Query away! For detailed instructions, check out the [Creating a dataset](/guides/datasets/create) how-to guide. # Console Source: https://docs.tilebox.com/console Explore your datasets and workflows with the Tilebox Console The [Tilebox Console](https://console.tilebox.com) is a web interface that enables you to explore your datasets and workflows, manage your account and API keys, add collaborators to your team and monitor your usage. 
## Datasets The datasets explorer lets you explore available datasets for your team. You can select a dataset, view its collections, and load data for a collection within a specified time range. Tilebox Console Tilebox Console When you click a specific event time in the data point list view, a detailed view of that data point will appear. Tilebox Console Tilebox Console ### Export as Code After selecting a dataset, collection, and time range, you can export the current selection as a Python code snippet. This will copy a code snippet like the one below to your clipboard. ```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() sentinel2_msi = datasets.open_data.copernicus.sentinel2_msi data = sentinel2_msi.collection("S2A_S2MSI1C").query( temporal_extent=("2024-07-12", "2024-07-26"), show_progress=True, ) ``` ```go Go ctx := context.Background() client := datasets.NewClient() dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel2_msi") if err != nil { log.Fatalf("Failed to get dataset: %v", err) } collection, err := client.Collections.Get(ctx, dataset.ID, "S2A_S2MSI1C") if err != nil { log.Fatalf("Failed to get collection: %v", err) } startDate := time.Date(2024, 7, 12, 0, 0, 0, 0, time.UTC) endDate := time.Date(2024, 7, 26, 0, 0, 0, 0, time.UTC) timeInterval := query.NewTimeInterval(startDate, endDate) var datapoints []*v1.Sentinel2Msi err = client.Datapoints.QueryInto(ctx, []uuid.UUID{collection.ID}, &datapoints, datasets.WithTemporalExtent(timeInterval), ) if err != nil { log.Fatalf("Failed to query datapoints: %v", err) } ``` Paste the snippet into a [notebook](/sdks/python/sample-notebooks) to interactively explore the [`xarray.Dataset`](/sdks/python/xarray) that is returned. ## Workflows The workflows section of the console allows you to view jobs, create clusters and create near real-time automations. Tilebox Console Tilebox Console ## Account ### API Keys The API Keys page enables you to manage your API keys. You can create new API keys, revoke existing ones, and view currently active API keys. Tilebox Console Tilebox Console ### Usage The Usage page allows you to view your current usage of the Tilebox API. Tilebox Console Tilebox Console # Collections Source: https://docs.tilebox.com/datasets/concepts/collections Learn about dataset collections Collections group data points within a dataset. They help represent logical groupings of data points that are commonly queried together. For example, if your dataset includes data from a specific instrument on different satellites, you can group the data points from each satellite into a collection. Refer to the examples below for common use cases when working with collections. These examples assume that you have already created a client and selected a dataset as shown below. ```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() dataset = datasets.open_data.copernicus.landsat8_oli_tirs ``` ```go Go package main import ( "context" "log" "github.com/tilebox/tilebox-go/datasets/v1" ) func main() { ctx := context.Background() client := datasets.NewClient() dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.landsat8_oli_tirs") if err != nil { log.Fatalf("Failed to get dataset: %v", err) } } ``` ## Listing collections To list the collections for a dataset, use the `collections` method on the dataset object. 
```python Python collections = dataset.collections() print(collections) ``` ```go Go collections, err := client.Collections.List(ctx, dataset.ID) if err != nil { log.Fatalf("Failed to list collections: %v", err) } log.Println(collections) ``` ```plaintext Output {'L1GT': Collection L1GT: [2013-03-25T12:08:43.699 UTC, 2024-08-19T12:57:32.456 UTC] (154288 data points), 'L1T': Collection L1T: [2013-03-26T09:33:19.763 UTC, 2020-08-24T03:21:50.000 UTC] (87958 data points), 'L1TP': Collection L1TP: [2013-03-24T00:25:55.457 UTC, 2024-08-19T12:58:20.229 UTC] (322041 data points), 'L2SP': Collection L2SP: [2015-01-01T07:53:35.391 UTC, 2024-08-12T12:52:03.243 UTC] (191110 data points)} ``` [dataset.collections](/api-reference/python/tilebox.datasets/Dataset.collections) returns a dictionary mapping collection names to their corresponding collection objects. Each collection has a unique name within its dataset. ## Creating collections To create a collection in a dataset, use [dataset.create\_collection()](/api-reference/python/tilebox.datasets/Dataset.create_collection). This method returns the created collection object. ```python Python collection = dataset.create_collection("My-collection") ``` ```go Go collection, err := client.Collections.Create(ctx, dataset.ID, "My-collection") ``` You can use [dataset.get\_or\_create\_collection()](/api-reference/python/tilebox.datasets/Dataset.get_or_create_collection) to get a collection by its name. If the collection does not exist, it will be created. ```python Python collection = dataset.get_or_create_collection("My-collection") ``` ```go Go collection, err := client.Collections.GetOrCreate(ctx, dataset.ID, "My-collection") ``` ## Accessing individual collections Once you have listed the collections for a dataset using [dataset.collections()](/api-reference/python/tilebox.datasets/Dataset.collections), you can access a specific collection by retrieving it from the resulting dictionary with its name. Use [collection.info()](/api-reference/python/tilebox.datasets/Collection.info) in Python or `String()` in Go to get details (name, availability, and count) about it. ```python Python collections = dataset.collections() terrain_correction = collections["L1GT"] collection_info = terrain_correction.info() print(collection_info) ``` ```go Go dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.landsat8_oli_tirs") if err != nil { log.Fatalf("Failed to get dataset: %v", err) } collection, err := client.Collections.Get(ctx, dataset.ID, "L1GT") if err != nil { log.Fatalf("Failed to get collection: %v", err) } log.Println(collection.String()) ``` ```plaintext Output L1GT: [2013-03-25T12:08:43.699 UTC, 2024-08-19T12:57:32.456 UTC] (154288 data points) ``` You can also access a specific collection directly using the [dataset.collection](/api-reference/python/tilebox.datasets/Dataset.collection) method on the dataset object. This method allows you to get the collection without having to list all collections first. 
```python Python terrain_correction = dataset.collection("L1GT") collection_info = terrain_correction.info() print(collection_info) ``` ```go Go dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.landsat8_oli_tirs") if err != nil { log.Fatalf("Failed to get dataset: %v", err) } collection, err := client.Collections.Get(ctx, dataset.ID, "L1GT") if err != nil { log.Fatalf("Failed to get collection: %v", err) } log.Println(collection.String()) ``` ```plaintext Output L1GT: [2013-03-25T12:08:43.699 UTC, 2024-08-19T12:57:32.456 UTC] (154288 data points) ``` ## Deleting collections Collections can be deleted from a dataset using the `delete_collection` method. To delete a collection, you need to have write permission on the dataset. Deleting a collection will delete all data points in the collection. ```python Python dataset.delete_collection(collection) ``` ```go Go err := client.Collections.Delete(ctx, dataset.ID, collection.ID) ``` ## Errors you may encounter ### NotFoundError If you attempt to access a collection with a non-existent name, a `NotFoundError` is raised. For example: ```python Python dataset.collection("Sat-X").info() # raises NotFoundError: 'No such collection Sat-X' ``` ```go Go collection, err := client.Collections.Get(ctx, dataset.ID, "Sat-X") if err != nil { log.Fatal(err) // prints 'failed to get collections: not_found: no such collection' } ``` ## Next steps Learn how to query data from a collection. # Datasets Source: https://docs.tilebox.com/datasets/concepts/datasets Tilebox Datasets act as containers for data points. All data points in a dataset share the same type and fields. You can create your own, Custom Datasets via the [Tilebox Console](/console). ## Related Guides Learn how to create a Timeseries dataset using the Tilebox Console. Learn how to ingest an existing CSV dataset into a Timeseries dataset collection. ## Dataset types Each dataset is of a specific type. Each dataset type comes with a set of required fields for each data point. The dataset type also determines the query capabilities for a dataset, for example, whether a dataset supports time-based queries or additionally also spatially filtered queries. To find out which fields are required for each dataset type check out the documentation for the available dataset types below. Each data point is linked to a specific point in time. Common for satellite telemetry, or other time-based data. Supports efficient time-based queries. Each data point is linked to a specific point in time and a location on the Earth's surface. Common for satellite imagery. Supports efficient time-based and spatially filtered queries. ## Dataset specific fields Additionally, each dataset has a set of fields that are specific to that dataset. Fields are defined during dataset creation. That way, all data points in a dataset are strongly typed and are validated during ingestion. The required fields of the dataset type, as well as the custom fields specific to each dataset together make up the **dataset schema**. Once a **dataset schema** is defined, existing fields cannot be removed or edited as soon as data has been ingested into it. You can always add new fields to a dataset, since all fields are always optional. The only exception to this rule are empty datasets. If you empty all collections in a dataset, you can freely edit the data schema, since no conflicts with existing data points can occur. ## Field types When defining the data schema, you can specify each field's type. The following field types are supported. 
### Primitives | Type | Description | Example value | | ----------------------------------------------------------------------------------------- | ------------------------------------------- | ------------- | | string | A string of characters of arbitrary length. | `Some string` | | int64 | A 64-bit signed integer. | `123` | | uint64 | A 64-bit unsigned integer. | `123` | | float64 | A 64-bit floating-point number. | `123.45` | | bool | A boolean. | `true` | | bytes | A sequence of arbitrary length bytes. | `0xAF1E28D4` | ### Time | Type | Description | Example value | | ------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------- | | Duration | A signed, fixed-length span of time represented as a count of seconds and fractions of seconds at nanosecond resolution. See [Duration](https://protobuf.dev/reference/protobuf/google.protobuf/#duration) for more information. | `12s 345ms` | | Timestamp | A point in time, represented as seconds and fractions of seconds at nanosecond resolution in UTC Epoch time. See [Timestamp](https://protobuf.dev/reference/protobuf/google.protobuf/#timestamp) for more information. | `2023-05-17T14:30:00Z` | ### Identifier | Type | Description | Example value | | -------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------ | -------------------------------------- | | UUID | A [universally unique identifier (UUID)](https://en.wikipedia.org/wiki/Universally_unique_identifier). | `126a2531-c98d-4e06-815a-34bc5b1228cc` | ### Geospatial | Type | Description | Example value | | ------------------------------------------------------------------------------------------ | ------------------------------------------------------------------------- | --------------------------------------- | | Geometry | Geospatial geometries of type Point, LineString, Polygon or MultiPolygon. | `POLYGON ((12.3 -5.4, 12.5 -5.4, ...))` | ### Arrays Every type is also available as an array, allowing to ingest multiple values of the underlying type for each data point. The size of the array is flexible, and can be different for each data point. ## Creating a dataset You can create a dataset in Tilebox using the [Tilebox Console](/console). Check out the [Creating a dataset](/guides/datasets/create) guide for an example of how to achieve this. ## Listing datasets You can use [your client instance](/datasets/introduction#creating-a-datasets-client) to access the datasets available to you. To list all available datasets, use the `datasets` method of the client. 
```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() print(datasets) ``` ```go Go package main import ( "context" "log" "github.com/tilebox/tilebox-go/datasets/v1" ) func main() { ctx := context.Background() client := datasets.NewClient() allDatasets, err := client.Datasets.List(ctx) if err != nil { log.Fatalf("Failed to list datasets: %v", err) } for _, dataset := range allDatasets { log.Println(dataset) } } ``` ```plaintext Output open_data: asf: ers_sar: European Remote Sensing Satellite (ERS) Synthetic Aperture Radar (SAR) Granules copernicus: landsat8_oli_tirs: Landsat-8 is part of the long-running Landsat programme ... sentinel1_sar: The Sentinel-1 mission is the European Radar Observatory for the ... sentinel2_msi: Sentinel-2 is equipped with an optical instrument payload that samples ... sentinel3_olci: OLCI (Ocean and Land Colour Instrument) is an optical instrument used to ... ... ``` Once you have your dataset object, you can use it to [list the available collections](/datasets/concepts/collections) for the dataset. In python, if you're using an IDE or an interactive environment with auto-complete, you can use it on your client instance to discover the datasets available to you. Type `client.` and trigger auto-complete after the dot to do so. ## Accessing a dataset Each dataset has an automatically generated *slug* that can be used to access it. The *slug* is the name of the group, followed by a dot, followed by the dataset *code name*. For example, the *slug* for the Sentinel-2 MSI dataset, which is part of the `open_data.copernicus` group, is `open_data.copernicus.sentinel2_msi`. To access a dataset, use the `dataset` method of your client instance and pass the *slug* of the dataset as an argument. ```python Python from tilebox.datasets import Client client = Client() s2_msi_dataset = client.dataset("open_data.copernicus.sentinel2_msi") ``` ```go Go s2MsiDataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel2_msi") ``` Once you have your dataset object, you can use it to [access available collections](/datasets/concepts/collections) for the dataset. ## Deleting a dataset Datasets can be deleted through the [Tilebox Console](/console) by clicking the `Delete` button in the dataset page. Empty datasets will be deleted right away. A dataset is considered empty if none of its collections contain any data points. A non-empty dataset can also be deleted, but is a destructive operation. Every data point in every collection of the dataset will be deleted. As a safety measure, Tilebox soft-deletes the dataset for 7 days before permanently deleting it. Please [get in touch](mailto:support@tilebox.com) if you deleted a dataset by accident and want to restore it. # Deleting Data Source: https://docs.tilebox.com/datasets/delete Learn how to delete data points from Tilebox datasets. You need to have write permission on the collection to be able to delete datapoints. Check out the examples below for common scenarios of deleting data from a collection. ## Deleting data by datapoint IDs To delete data from a collection, use the [delete](/api-reference/python/tilebox.datasets/Collection.delete) method. This method accepts a list of datapoint IDs to delete. 
```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() collections = datasets.my_custom_dataset.collections() collection = collections["Sensor-1"] n_deleted = collection.delete([ "0195c87a-49f6-5ffa-e3cb-92215d057ea6", "0195c87b-bd0e-3998-05cf-af6538f34957", ]) print(f"Deleted {n_deleted} data points.") ``` ```go Go package main import ( "context" "log" "log/slog" "github.com/google/uuid" "github.com/tilebox/tilebox-go/datasets/v1" ) func main() { ctx := context.Background() client := datasets.NewClient() dataset, err := client.Datasets.Get(ctx, "my_custom_dataset") if err != nil { log.Fatalf("Failed to get dataset: %v", err) } collection, err := client.Collections.Get(ctx, dataset.ID, "Sensor-1") if err != nil { log.Fatalf("Failed to get collection: %v", err) } datapointIDs := []uuid.UUID{ uuid.MustParse("0195c87a-49f6-5ffa-e3cb-92215d057ea6"), uuid.MustParse("0195c87b-bd0e-3998-05cf-af6538f34957"), } numDeleted, err := client.Datapoints.DeleteIDs(ctx, collection.ID, datapointIDs) if err != nil { log.Fatalf("Failed to delete datapoints: %v", err) } slog.Info("Deleted data points", slog.Int64("deleted", numDeleted)) } ``` ```plaintext Output Deleted 2 data points. ``` In Python, `delete` not only accepts a list of datapoint IDs as strings, but also supports a wide range of other useful input types. See the [delete](/api-reference/python/tilebox.datasets/Collection.delete) API documentation for more details. ### Possible errors * `NotFoundError`: raised if one of the data points is not found in the collection. If any of the data points are not found, nothing will be deleted. * `ValueError`: raised if one of the specified IDs is not a valid UUID ## Deleting a time interval One common way to delete data is to first load it from a collection and then forward it to the `delete` method. For this use case, it's often a good idea to query the datapoints with `skip_data=True` to avoid loading the data fields, since you only need the datapoint IDs. See [fetching only metadata](/datasets/query#fetching-only-metadata) for more details. ```python Python to_delete = collection.load(("2023-05-01", "2023-06-01"), skip_data=True) n_deleted = collection.delete(to_delete) print(f"Deleted {n_deleted} data points.") ``` ```go Go collectionID := uuid.MustParse("c5145c99-1843-4816-9221-970f9ce3ac93") startDate := time.Date(2023, time.May, 1, 0, 0, 0, 0, time.UTC) endDate := time.Date(2023, time.June, 1, 0, 0, 0, 0, time.UTC) may2023 := query.NewTimeInterval(startDate, endDate) var toDelete []*v1.Sentinel2Msi err := client.Datapoints.QueryInto(ctx, []uuid.UUID{collectionID}, &toDelete, datasets.WithTemporalExtent(may2023), datasets.WithSkipData(), ) if err != nil { log.Fatalf("Failed to query datapoints: %v", err) } numDeleted, err := client.Datapoints.Delete(ctx, collectionID, toDelete) if err != nil { log.Fatalf("Failed to delete datapoints: %v", err) } slog.Info("Deleted data points", slog.Int64("deleted", numDeleted)) ``` ```plaintext Output Deleted 104 data points. ``` ## Automatic batching Tilebox automatically batches the delete requests for you, so you don't have to worry about the maximum request size. # Ingesting Data Source: https://docs.tilebox.com/datasets/ingest Learn how to ingest data into a Tilebox dataset. You need to have write permission on the collection to be able to ingest data. Check out the examples below for common scenarios of ingesting data into a collection. ## Dataset schema Tilebox Datasets are strongly typed. 
This means you can only ingest data that matches the schema of a dataset. The schema is defined during dataset creation time. The examples on this page assume that you have access to a [Timeseries dataset](/datasets/types/timeseries) that has the following schema: Check out the [Creating a dataset](/guides/datasets/create) guide for an example of how to create such a dataset. **MyCustomDataset schema** | Field name | Type | Description | | ---------------- | ------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------- | | `time` | Timestamp | Timestamp of the data point. Required by the [Timeseries dataset](/datasets/types/timeseries) type. | | `id` | UUID | Auto-generated UUID for each datapoint. | | `ingestion_time` | Timestamp | Auto-generated timestamp for when the data point was ingested into the Tilebox API. | | `value` | float64 | A numeric measurement value. | | `sensor` | string | A name of the sensor that generated the data point. | | `precise_time` | Timestamp | A precise measurement time in nanosecond precision. | | `sensor_history` | Array\[float64] | The last few measurements of the sensor. | A full overview of available data types can be found in the [here](/datasets/concepts/datasets#field-types). Once you've defined the schema and created a dataset, you can access it and create a collection to ingest data into. ```python Python from tilebox.datasets import Client client = Client() dataset = client.dataset("my_org.my_custom_dataset") collection = dataset.get_or_create_collection("Measurements") ``` ```go Go package main import ( "context" "log" "github.com/tilebox/tilebox-go/datasets/v1" ) func main() { ctx := context.Background() client := datasets.NewClient() dataset, err := client.Datasets.Get(ctx, "my_org.my_custom_dataset") if err != nil { log.Fatalf("Failed to get dataset: %v", err) } collection, err := client.Collections.GetOrCreate(ctx, dataset.ID, "Measurements") if err != nil { log.Fatalf("Failed to get collection: %v", err) } } ``` ## Preparing data for ingestion Ingestion can be done either in Python or Go. ### Python [`collection.ingest`](/api-reference/python/tilebox.datasets/Collection.ingest) supports a wide range of input types. Below is an example of using either a `pandas.DataFrame` or an `xarray.Dataset` as input. #### pandas.DataFrame A [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) is a representation of two-dimensional, potentially heterogeneous tabular data. It's a powerful tool for working with structured data, and Tilebox supports it as input for `ingest`. The example below shows how to construct a `pandas.DataFrame` from scratch, that matches the schema of the `MyCustomDataset` dataset and can be ingested into it. 
```python Python import pandas as pd data = pd.DataFrame({ "time": [ "2025-03-28T11:44:23Z", "2025-03-28T11:45:19Z", ], "value": [45.16, 273.15], "sensor": ["A", "B"], "precise_time": [ "2025-03-28T11:44:23.345761444Z", "2025-03-28T11:45:19.128742312Z", ], "sensor_history": [ [-12.15, 13.45, -8.2, 16.5, 45.16], [300.16, 280.12, 273.15], ], }) print(data) ``` ```plaintext Output time value sensor precise_time sensor_history 0 2025-03-28T11:44:23Z 45.16 A 2025-03-28T11:44:23.345761444Z [-12.15, 13.45, -8.2, 16.5, 45.16] 1 2025-03-28T11:45:19Z 273.15 B 2025-03-28T11:45:19.128742312Z [300.16, 280.12, 273.15] ``` Once you have the data ready in this format, you can `ingest` it into a collection. ```python Python # now that we have the data frame in the correct format # we can ingest it into the Tilebox dataset collection.ingest(data) # To verify it now contains the 2 data points print(collection.info()) ``` ```plaintext Output Measurements: [2025-03-28T11:44:23.000 UTC, 2025-03-28T11:45:19.000 UTC] (2 data points) ``` You can now also head on over to the [Tilebox Console](/console) and view the newly ingested data points there. #### xarray.Dataset [`xarray.Dataset`](/sdks/python/xarray) is the default format in which Tilebox Datasets returns data when [querying data](/datasets/query) from a collection. Tilebox also supports it as input for ingestion. The example below shows how to construct an `xarray.Dataset` from scratch that matches the schema of the `MyCustomDataset` dataset and can then be ingested into it. To learn more about `xarray.Dataset`, visit the dedicated [Xarray documentation page](/sdks/python/xarray). ```python Python import numpy as np import xarray as xr data = xr.Dataset({ "time": ("time", [ "2025-03-28T11:46:13Z", "2025-03-28T11:46:54Z", ]), "value": ("time", [48.1, 290.12]), "sensor": ("time", ["A", "B"]), "precise_time": ("time", [ "2025-03-28T11:46:13.345761444Z", "2025-03-28T11:46:54.128742312Z", ]), "sensor_history": (("time", "n_sensor_history"), [ [13.45, -8.2, 16.5, 45.16, 48.1], [280.12, 273.15, 290.12, np.nan, np.nan], ]), }) print(data) ``` ```plaintext Output Size: 504B Dimensions: (time: 2, n_sensor_history: 5) Coordinates: * time (time) ... ``` Array fields manifest in xarray using an extra dimension, in this case `n_sensor_history`. In case of different array sizes for each data point, the remaining values are filled up with a fill value, depending on the `dtype` of the array. For `float64` this is `np.nan` (not a number). Don't worry - when ingesting data into a Tilebox dataset, Tilebox will automatically skip those padding fill values and not store them in the dataset. Now that you have the `xarray.Dataset` in the correct format, you can ingest it into the Tilebox dataset collection. ```python Python collection = dataset.get_or_create_collection("OtherMeasurements") collection.ingest(data) # To verify it now contains the 2 data points print(collection.info()) ``` ```plaintext Output OtherMeasurements: [2025-03-28T11:46:13.000 UTC, 2025-03-28T11:46:54.000 UTC] (2 data points) ``` ### Go [`Client.Datapoints.Ingest`](/api-reference/go/datasets/Datapoints.Ingest) supports ingestion of data points in the form of a slice of protobuf messages. #### Protobuf Protobuf is Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. More details on protobuf can be found in the [protobuf section](/sdks/go/protobuf). 
In the example below, the `v1.Modis` type has been generated using [tilebox-generate](https://github.com/tilebox/tilebox-generate) as described in the [protobuf section](/sdks/go/protobuf). ```go Go datapoints := []*v1.Modis{ v1.Modis_builder{ Time: timestamppb.New(time.Now()), GranuleName: proto.String("Granule 1"), }.Build(), v1.Modis_builder{ Time: timestamppb.New(time.Now().Add(-5 * time.Hour)), GranuleName: proto.String("Past Granule 2"), }.Build(), } ingestResponse, err := client.Datapoints.Ingest(ctx, collectionID, &datapoints, false, ) ``` ## Copying or moving data Since `ingest` takes `query`'s output as input, you can easily copy or move data from one collection to another. Copying data like this also works across datasets in case the dataset schemas are compatible. ```python Python src_collection = dataset.collection("Measurements") data_to_copy = src_collection.load(("2025-03-28", "2025-03-29")) dest_collection = dataset.collection("OtherMeasurements") dest_collection.ingest(data_to_copy) # copy the data to the other collection # To verify it now contains 4 datapoints (2 we ingested already, and 2 we copied just now) print(dest_collection.info()) ``` ```go Go dataset, err := client.Datasets.Get(ctx, "my_org.my_custom_dataset") if err != nil { log.Fatalf("Failed to get dataset: %v", err) } srcCollection, err := client.Collections.GetOrCreate(ctx, dataset.ID, "Measurements") if err != nil { log.Fatalf("Failed to get collection: %v", err) } startDate := time.Date(2025, time.March, 28, 0, 0, 0, 0, time.UTC) endDate := time.Date(2025, time.March, 29, 0, 0, 0, 0, time.UTC) var dataToCopy []*v1.MyCustomDataset err = client.Datapoints.QueryInto(ctx, []uuid.UUID{srcCollection.ID}, &dataToCopy, datasets.WithTemporalExtent(query.NewTimeInterval(startDate, endDate)), ) if err != nil { log.Fatalf("Failed to query datapoints: %v", err) } destCollection, err := client.Collections.GetOrCreate(ctx, dataset.ID, "OtherMeasurements") if err != nil { log.Fatalf("Failed to get collection: %v", err) } // copy the data to the other collection _, err = client.Datapoints.Ingest(ctx, destCollection.ID, &dataToCopy, false) if err != nil { log.Fatalf("Failed to ingest datapoints: %v", err) } // To verify it now contains 4 datapoints (2 we ingested already, and 2 we copied just now) updatedDestCollection, err := client.Collections.Get(ctx, dataset.ID, "OtherMeasurements") if err != nil { log.Fatalf("Failed to get collection: %v", err) } slog.Info("Updated collection", slog.String("collection", updatedDestCollection.String())) ``` ```plaintext Output OtherMeasurements: [2025-03-28T11:44:23.000 UTC, 2025-03-28T11:46:54.000 UTC] (4 data points) ``` ## Automatic batching Tilebox automatically batches the ingestion requests for you, so you don't have to worry about the maximum request size. ## Idempotency Tilebox auto-generates datapoint IDs based on the values of all of a datapoint's fields - except for the auto-generated `ingestion_time` - so ingesting the same data twice will result in the same ID being generated. By default, Tilebox will silently skip any data points that are duplicates of existing ones in a collection. This behavior is especially useful when implementing idempotent algorithms. That way, re-executions of certain ingestion tasks due to retries or other reasons will never result in duplicate data points. You can instead also request an error to be raised if any of the generated datapoint IDs already exist. This can be done by setting the `allow_existing` parameter to `False`. 
```python Python data = pd.DataFrame({ "time": [ "2025-03-28T11:45:19Z", ], "value": [45.16], "sensor": ["A"], "precise_time": [ "2025-03-28T11:44:23.345761444Z", ], "sensor_history": [ [-12.15, 13.45, -8.2, 16.5, 45.16], ], }) # we already ingested the same data point previously collection.ingest(data, allow_existing=False) # we can still ingest it, by setting allow_existing=True # but the total number of datapoints will still be the same # as before in that case, since it already exists and therefore # will be skipped collection.ingest(data, allow_existing=True) # no-op ``` ```go Go datapoints := []*v1.MyCustomDataset{ v1.MyCustomDataset_builder{ Time: timestamppb.New(time.Date(2025, time.March, 28, 11, 45, 19, 0, time.UTC)), Value: proto.Float64(45.16), Sensor: proto.String("A"), PreciseTime: timestamppb.New(time.Date(2025, time.March, 28, 11, 44, 23, 345761444, time.UTC)), SensorHistory: []float64{-12.15, 13.45, -8.2, 16.5, 45.16}, }.Build(), } // we already ingested the same data point previously _, err = client.Datapoints.Ingest(ctx, collection.ID, &datapoints, false) if err != nil { log.Fatalf("Failed to ingest datapoints: %v", err) } // we can still ingest it, by setting allowExisting to true // but the total number of datapoints will still be the same // as before in that case, since it already exists and therefore // will be skipped _, err = client.Datapoints.Ingest(ctx, collection.ID, &datapoints, true) // no-op if err != nil { log.Fatalf("Failed to ingest datapoints: %v", err) } ``` ```plaintext Output ArgumentError: found existing datapoints with same id, refusing to ingest with "allow_existing=false" ``` ## Ingestion from common file formats Through the usage of `xarray` and `pandas` you can also easily ingest existing datasets available in file formats, such as CSV, [Parquet](https://parquet.apache.org/), [Feather](https://arrow.apache.org/docs/python/feather.html) and more. Check out the [Ingestion from common file formats](/guides/datasets/ingest-format) guide for examples of how to achieve this. # Tilebox Datasets Source: https://docs.tilebox.com/datasets/introduction Tilebox Datasets provides structured and high-performance satellite metadata access. Tilebox Datasets ingests and structures metadata for efficient querying, reducing data transfer and storage costs. Create your own [Custom Datasets](/datasets/concepts/datasets) and easily set up a private, custom, strongly typed and highly available catalogue, or explore any of the wide range of [available public open data datasets](/datasets/open-data) available on Tilebox. Learn more about datasets by exploring the following sections: Learn what dataset types are available on Tilebox and how to create, list and access them. Learn what collections are and how to access them. Find out how to access data from a collection for specific time intervals. Learn how to ingest data into a collection. For a quick reference to API methods or specific parameter meanings, [check out the complete datasets API Reference](/api-reference/python/tilebox.datasets/Client). ## Terminology Get familiar with some key terms when working with time series datasets. Data points are the individual entities that form a dataset. Each data point has a set of required [fields](/datasets/types/timeseries) determined by the dataset type, and can have custom user-defined fields. Datasets act as containers for data points. All data points in a dataset share the same type and fields. 
Tilebox supports different types of datasets, currently those are [Timeseries](/datasets/types/timeseries) and [Spatio-temporal](/datasets/types/spatiotemporal) datasets. Collections group data points within a dataset. They help represent logical groupings of data points that are often queried together. ## Creating a datasets client Prerequisites * You have installed the [python](/sdks/python/install) `tilebox-datasets` package or [go](/sdks/go/install) library. * You have [created](/authentication) a Tilebox API key. After meeting these prerequisites, you can create a client instance to interact with Tilebox Datasets. ```python Python from tilebox.datasets import Client client = Client(token="YOUR_TILEBOX_API_KEY") ``` ```go Go import ( "github.com/tilebox/tilebox-go/datasets/v1" ) client := datasets.NewClient( datasets.WithAPIKey("YOUR_TILEBOX_API_KEY"), ) ``` You can also set the `TILEBOX_API_KEY` environment variable to your API key. You can then instantiate the client without passing the `token` argument. Python will automatically use this environment variable for authentication. ```python Python from tilebox.datasets import Client # requires a TILEBOX_API_KEY environment variable client = Client() ``` ```go Go import ( "github.com/tilebox/tilebox-go/datasets/v1" ) // requires a TILEBOX_API_KEY environment variable client := datasets.NewClient() ``` Tilebox datasets provide a standard synchronous API by default but also offers an [asynchronous client](/sdks/python/async) if needed. ### Exploring datasets After creating a client instance, you can start exploring available datasets. A straightforward way to do this in an interactive environment is to [list all datasets](/api-reference/python/tilebox.datasets/Client.datasets) and use the autocomplete feature in your Jupyter notebook. ```python Python datasets = client.datasets() datasets. # trigger autocomplete here to view available datasets ``` ```go Go package main import ( "context" "github.com/tilebox/tilebox-go/datasets/v1" "log" ) func main() { client := datasets.NewClient() ctx := context.Background() allDatasets, err := client.Datasets.List(ctx) if err != nil { log.Fatalf("Failed to list datasets: %v", err) } for _, dataset := range allDatasets { log.Printf("Dataset: %s", dataset.Name) } } ``` The Console also provides an [overview](https://console.tilebox.com/datasets/explorer) of all available datasets. ### Errors you might encounter #### AuthenticationError `AuthenticationError` occurs when the client fails to authenticate with the Tilebox API. This may happen if the provided API key is invalid or expired. A client instantiated with an invalid API key won't raise an error immediately, but an error will occur when making a request to the API. ```python Python client = Client(token="invalid-key") # runs without error datasets = client.datasets() # raises AuthenticationError ``` ```go Go package main import ( "context" "github.com/tilebox/tilebox-go/datasets/v1" "log" ) func main() { // runs without error client := datasets.NewClient(datasets.WithAPIKey("invalid-key")) // returns an error _, err := client.Datasets.List(context.Background()) if err != nil { log.Fatalf("Failed to list datasets: %v", err) } } ``` ## Next steps # Open Data Source: https://docs.tilebox.com/datasets/open-data Learn about the Open data available in Tilebox. On top of access to your own, private datasets, Tilebox provides access to a growing number of public datasets. 
These datasets are available to all users of Tilebox and are a great way to get started and prototype your applications even before data from your own satellites is available. If there is a dataset you would like to see in Tilebox, you can request it in the [Console open data page](https://console.tilebox.com/datasets/open-data). ## Accessing Open Data through Tilebox Accessing open datasets in Tilebox is straightforward and as easy as accessing your own datasets. Tilebox has already ingested the required metadata for each available dataset, so you can query, preview, and download the data seamlessly. This setup enables you to leverage performance optimizations and simplifies your workflows. By using the [datasets API](/datasets), you can start prototyping your applications and workflows easily. ## Available datasets The Tilebox Console contains in-depth descriptions of each dataset. Check out the [Sentinel 5P Tropomi](https://console.tilebox.com/datasets/explorer/bb394de4-b47f-4069-bc4c-6e6a2c9f0641?view=documentation) documentation as an example. ### Copernicus Data Space The [Copernicus Data Space](https://dataspace.copernicus.eu/) is an open ecosystem that provides free instant access to data and services from the Copernicus Sentinel missions. Tilebox currently supports the following datasets from the Copernicus Data Space: The Sentinel-1 mission is the European Radar Observatory for the Copernicus joint initiative of the European Commission (EC) and the European Space Agency (ESA). The Sentinel-1 mission includes C-band imaging operating in four exclusive imaging modes with different resolution (down to 5 m) and coverage (up to 400 km). It provides dual polarization capability, short revisit times and rapid product delivery. Sentinel-2 is equipped with an optical instrument payload that samples 13 spectral bands: four bands at 10 m, six bands at 20 m and three bands at 60 m spatial resolution. Sentinel-3 is equipped with multiple instruments whose data is available in Tilebox. `OLCI` (Ocean and Land Color Instrument) is an optical instrument used to provide data continuity for ENVISAT MERIS. `SLSTR` (Sea and Land Surface Temperature Radiometer) is a dual-view scanning temperature radiometer, which flies in low Earth orbit (800 - 830 km altitude). The `SRAL` (SAR Radar Altimeter) instrument comprises one nadir-looking antenna, and a central electronic chain composed of a Digital Processing Unit (DPU) and a Radio Frequency Unit (RFU). OLCI, in conjunction with the SLSTR instrument, provides the `SYN` products, providing continuity with SPOT VEGETATION. The primary goal of `TROPOMI` is to provide daily global observations of key atmospheric constituents related to monitoring and forecasting air quality, the ozone layer, and climate change. Landsat-8 is part of the long-running Landsat programme led by USGS and NASA and carries the Operational Land Imager (OLI) and the Thermal Infrared Sensor (TIRS). The Operational Land Imager (OLI), on board Landsat-8 measures in the VIS, NIR and SWIR portions of the spectrum. Its images have 15 m panchromatic and 30 m multi-spectral spatial resolutions along a 185 km wide swath, covering wide areas of the Earth's landscape while providing high enough resolution to distinguish features like urban centres, farms, forests and other land uses. The entire Earth falls within view once every 16 days due to Landsat-8's near-polar orbit. 
The Thermal Infra-Red Sensor instrument, on board Landsat-8, is a thermal imager operating in pushbroom mode with two Infra-Red channels: 10.8 µm and 12 µm with 100 m spatial resolution. ### Alaska Satellite Facility (ASF) The [Alaska Satellite Facility (ASF)](https://asf.alaska.edu/) is a NASA-funded research center at the University of Alaska Fairbanks. ASF supports a wide variety of research and applications using synthetic aperture radar (SAR) and related remote sensing technologies. Tilebox currently supports the following datasets from the Alaska Satellite Facility: European Remote Sensing Satellite (ERS) Synthetic Aperture Radar (SAR) Granules ### Umbra Space [Umbra](https://umbra.space/) satellites provide up to 16 cm resolution Synthetic Aperture Radar (SAR) imagery from space. The Umbra Open Data Program (ODP) features over twenty diverse time-series locations that are frequently updated, allowing users to explore SAR's capabilities. Tilebox currently supports the following datasets from Umbra Space: Time-series SAR data provided as Opendata by Umbra Space. # Querying data Source: https://docs.tilebox.com/datasets/query Learn how to query and load data from Tilebox datasets. Check out the examples below for common scenarios when loading data from collections. ```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() collections = datasets.open_data.copernicus.sentinel1_sar.collections() collection = collections["S1A_IW_RAW__0S"] ``` ```go Go package main import ( "context" "log" "github.com/tilebox/tilebox-go/datasets/v1" ) func main() { ctx := context.Background() client := datasets.NewClient() dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel1_sar") if err != nil { log.Fatalf("Failed to get dataset: %v", err) } collection, err := client.Collections.Get(ctx, dataset.ID, "S1A_IW_RAW__0S") if err != nil { log.Fatalf("Failed to get collection: %v", err) } } ``` To load data points from a dataset collection, use the [load](/api-reference/python/tilebox.datasets/Collection.load) method. It requires a `time_or_interval` parameter to specify the time or time interval for loading. ## Filtering by time ### Time interval To load data for a specific time interval, use a `tuple` in the form `(start, end)` as the `time_or_interval` parameter. Both `start` and `end` must be [TimeScalars](#time-scalars), which can be `datetime` objects or strings in ISO 8601 format. ```python Python interval = ("2017-01-01", "2023-01-01") data = collection.load(interval, show_progress=True) ``` ```go Go startDate := time.Date(2017, time.January, 1, 0, 0, 0, 0, time.UTC) endDate := time.Date(2023, time.January, 1, 0, 0, 0, 0, time.UTC) interval := query.NewTimeInterval(startDate, endDate) var datapoints []*v1.Sentinel1Sar err = client.Datapoints.QueryInto(ctx, []uuid.UUID{collection.ID}, &datapoints, datasets.WithTemporalExtent(interval), ) if err != nil { log.Fatalf("Failed to query datapoints: %v", err) } log.Printf("Queried %d datapoints", len(datapoints)) ``` Output ```plaintext Python Size: 725MB Dimensions: (time: 1109597, latlon: 2) Coordinates: ingestion_time (time) datetime64[ns] 9MB 2024-06-21T11:03:33.8524... id (time) The `show_progress` parameter is optional and can be used to display a [tqdm](https://tqdm.github.io/) progress bar while loading data. A time interval specified as a tuple is interpreted as a half-closed interval. This means the start time is inclusive, and the end time is exclusive. 
For instance, using an end time of `2023-01-01` includes data points up to `2022-12-31 23:59:59.999`, but excludes those from `2023-01-01 00:00:00.000`. This behavior mimics the Python `range` function and is useful for chaining time intervals. ```python Python import xarray as xr data = [] for year in [2017, 2018, 2019, 2020, 2021, 2022]: interval = (f"{year}-01-01", f"{year + 1}-01-01") data.append(collection.load(interval, show_progress=True)) # Concatenate the data into a single dataset, which is equivalent # to the result of the single request in the code example above. data = xr.concat(data, dim="time") ``` ```go Go var datapoints []*v1.Sentinel1Sar for year := 2017; year <= 2022; year++ { startDate := time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC) interval := query.NewTimeInterval(startDate, startDate.AddDate(1, 0, 0)) var partialDatapoints []*v1.Sentinel1Sar err = client.Datapoints.QueryInto(ctx, []uuid.UUID{collection.ID}, &partialDatapoints, datasets.WithTemporalExtent(interval), ) if err != nil { log.Fatalf("Failed to query datapoints: %v", err) } // Concatenate the data into a single dataset, which is equivalent // to the result of the single request in the code example above. datapoints = append(datapoints, partialDatapoints...) } ``` Above example demonstrates how to split a large time interval into smaller chunks while loading data in separate requests. Typically, this is not necessary as the datasets client auto-paginates large intervals. ### Endpoint inclusivity For greater control over inclusivity of start and end times, you can use the `TimeInterval` dataclass instead of a tuple of two [TimeScalars](#time-scalars). This class allows you to specify the `start` and `end` times, as well as their inclusivity. Here's an example of creating equivalent `TimeInterval` objects in two different ways. 
```python Python from datetime import datetime from tilebox.datasets.data import TimeInterval interval1 = TimeInterval( datetime(2017, 1, 1), datetime(2023, 1, 1), end_inclusive=False ) interval2 = TimeInterval( # python datetime granularity is in milliseconds datetime(2017, 1, 1), datetime(2022, 12, 31, 23, 59, 59, 999999), end_inclusive=True ) print("Inclusivity is indicated by interval notation: ( and [") print(interval1) print(interval2) print(f"They are equivalent: {interval1 == interval2}") print(interval2.to_half_open()) # Query data for a time interval data = collection.load(interval1, show_progress=True) ``` ```go Go interval1 := query.TimeInterval{ Start: time.Date(2017, time.January, 1, 0, 0, 0, 0, time.UTC), End: time.Date(2023, time.January, 1, 0, 0, 0, 0, time.UTC), EndInclusive: false, } interval2 := query.TimeInterval{ Start: time.Date(2017, time.January, 1, 0, 0, 0, 0, time.UTC), End: time.Date(2022, time.December, 31, 23, 59, 59, 999999999, time.UTC), EndInclusive: true, } log.Println("Inclusivity is indicated by interval notation: ( and [") log.Println(interval1.String()) log.Println(interval2.String()) log.Println("They are equivalent:", interval1.Equal(&interval2)) log.Println(interval2.ToHalfOpen().String()) // Query data for a time interval var datapoints []*v1.Sentinel1Sar err = client.Datapoints.QueryInto(ctx, []uuid.UUID{collection.ID}, &datapoints, datasets.WithTemporalExtent(interval1), ) ``` ```plaintext Output Inclusivity is indicated by interval notation: ( and [ [2017-01-01T00:00:00.000 UTC, 2023-01-01T00:00:00.000 UTC) [2017-01-01T00:00:00.000 UTC, 2022-12-31T23:59:59.999 UTC] They are equivalent: True [2017-01-01T00:00:00.000 UTC, 2023-01-01T00:00:00.000 UTC) ``` ### Time scalars You can load all datapoints linked to a specific timestamp by specifying a `TimeScalar` as the time query argument. A `TimeScalar` can be a `datetime` object or a string in ISO 8601 format. When passed to the `load` method, it retrieves all data points matching exactly that specified time, with a millisecond precision. A collection may contain multiple datapoints for one millisecond, so multiple data points could still be returned. If you want to fetch only a single data point, [query the collection by id](#loading-a-data-point-by-id) instead. Here's how to load a data point at a specific millisecond from a [collection](/datasets/concepts/collections). ```python Python data = collection.load("2024-08-01 00:00:01.362") print(data) ``` ```go Go temporalExtent := query.NewPointInTime(time.Date(2024, time.August, 1, 0, 0, 1, 362000000, time.UTC)) var datapoints []*v1.Sentinel1Sar err = client.Datapoints.QueryInto(ctx, []uuid.UUID{collection.ID}, &datapoints, datasets.WithTemporalExtent(temporalExtent), ) log.Printf("Queried %d datapoints", len(datapoints)) log.Printf("First datapoint time: %s", datapoints[0].GetTime().AsTime()) ``` Output ```plaintext Python Size: 721B Dimensions: (time: 1, latlon: 2) Coordinates: ingestion_time (time) datetime64[ns] 8B 2024-08-01T08:53:08.450499 id (time) Tilebox uses millisecond precision for timestamps. To load all data points for a specific second, it's a [time interval](/datasets/query#time-interval) request. Refer to the examples below for details. The output of the `load` method is an `xarray.Dataset` object. To learn more about Xarray, visit the dedicated [Xarray page](/sdks/python/xarray). ### Time iterables (Python only) You can specify a time interval by using an iterable of `TimeScalar`s as the `time_or_interval` parameter. 
This is especially useful when you want to use the output of a previous `load` call as input for another load. Here's how that works. ```python Python interval = ("2017-01-01", "2023-01-01") meta_data = collection.load(interval, skip_data=True) first_50_data_points = collection.load(meta_data.time[:50], skip_data=False) print(first_50_data_points) ``` ```plaintext Output Size: 33kB Dimensions: (time: 50, latlon: 2) Coordinates: ingestion_time (time) datetime64[ns] 400B 2024-06-21T11:03:33.852... id (time) ```python Python from datetime import datetime import pytz # Tokyo has a UTC+9 hours offset, so this is the same as # 2017-01-01 02:45:25.679 UTC tokyo_time = pytz.timezone('Asia/Tokyo').localize( datetime(2017, 1, 1, 11, 45, 25, 679000) ) print(tokyo_time) data = collection.load(tokyo_time) print(data) ``` ```go Go // Tokyo has a UTC+9 hours offset, so this is the same as // 2017-01-01 02:45:25.679 UTC location, _ := time.LoadLocation("Asia/Tokyo") tokyoTime := query.NewPointInTime(time.Date(2017, 1, 1, 11, 45, 25, 679000000, location)) log.Println(tokyoTime) var datapoints []*v1.Sentinel1Sar err = client.Datapoints.QueryInto(ctx, []uuid.UUID{collection.ID}, &datapoints, datasets.WithTemporalExtent(tokyoTime), ) if err != nil { log.Fatalf("Failed to query datapoints: %v", err) } log.Printf("Queried %d datapoints", len(datapoints)) // time is in UTC since API always returns UTC timestamps log.Printf("First datapoint time: %s", datapoints[0].GetTime().AsTime()) ``` Output ```plaintext Python 2017-01-01 11:45:25.679000+09:00 Size: 725B Dimensions: (time: 1, latlon: 2) Coordinates: ingestion_time (time) datetime64[ns] 8B 2024-06-21T11:03:33.852435 id (time) ## Filtering by area of interest [Spatio-temporal](/datasets/types/spatiotemporal) also come with spatial filtering capabilities. When querying, you can specify a time interval, and additionally also specify a bounding box or a polygon as an area of interest to filter by. Here is how to query Sentinel-2 `S2A_S2MSI2A` data over Colorado for April 2025. ```python Python from shapely import MultiPolygon from tilebox.datasets import Client area = MultiPolygon( [ (((-109.10, 40.98), (-102.01, 40.95), (-102.06, 36.82), (-109.06, 36.96), (-109.10, 40.98)),), ] ) client = Client() datasets = client.datasets() sentinel2_msi = datasets.open_data.copernicus.sentinel2_msi data = sentinel2_msi.collection("S2A_S2MSI2A").query( temporal_extent=("2025-04-01", "2025-05-02"), spatial_extent=area, show_progress=True, ) ``` ```go Go ctx := context.Background() client := datasets.NewClient() dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel2_msi") if err != nil { log.Fatalf("Failed to get dataset: %v", err) } collection, err := client.Collections.Get(ctx, dataset.ID, "S2A_S2MSI2A") if err != nil { log.Fatalf("Failed to get collection: %v", err) } startDate := time.Date(2025, 4, 1, 0, 0, 0, 0, time.UTC) endDate := time.Date(2025, 5, 2, 0, 0, 0, 0, time.UTC) timeInterval := query.NewTimeInterval(startDate, endDate) area := orb.MultiPolygon{ { {{-109.10, 40.98}, {-102.01, 40.95}, {-102.06, 36.82}, {-109.06, 36.96}, {-109.10, 40.98}}, }, } var datapoints []*v1.Sentinel2Msi err = client.Datapoints.QueryInto(ctx, []uuid.UUID{collection.ID}, &datapoints, datasets.WithTemporalExtent(timeInterval), datasets.WithSpatialExtent(area), ) if err != nil { log.Fatalf("Failed to query datapoints: %v", err) } ``` ## Fetching only metadata Sometimes, it may be useful to load only dataset metadata fields without the actual data fields. 
This can be done by setting the `skip_data` parameter to `True`. For example, when only checking if a datapoint exists, you may want to use `skip_data=True` to avoid loading the data fields. If this flag is set, the response will only include the required fields for the given dataset type, but no custom data fields. ```python Python data = collection.load("2024-08-01 00:00:01.362", skip_data=True) print(data) ``` ```go Go temporalExtent := query.NewPointInTime(time.Date(2024, time.August, 1, 0, 0, 1, 362000000, time.UTC)) var datapoints []*v1.Sentinel1Sar err = client.Datapoints.QueryInto(ctx, []uuid.UUID{collection.ID}, &datapoints, datasets.WithTemporalExtent(temporalExtent), datasets.WithSkipData(), ) if err != nil { log.Fatalf("Failed to query datapoints: %v", err) } log.Printf("Queried %d datapoints", len(datapoints)) log.Printf("First datapoint time: %s", datapoints[0].GetTime().AsTime()) ``` Output ```plaintext Python Size: 160B Dimensions: (time: 1) Coordinates: ingestion_time (time) datetime64[ns] 8B 2024-08-01T08:53:08.450499 id (time) ## Empty response The `load` method always returns an `xarray.Dataset` object, even if there are no data points for the specified query. In such cases, the returned dataset will be empty, but no error will be raised. ```python Python time_with_no_data_points = "1997-02-06 10:21:00" data = collection.load(time_with_no_data_points) print(data) ``` ```go Go timeWithNoDatapoints := query.NewPointInTime(time.Date(1997, time.February, 6, 10, 21, 0, 0, time.UTC)) var datapoints []*v1.Sentinel1Sar err = client.Datapoints.QueryInto(ctx, []uuid.UUID{collection.ID}, &datapoints, datasets.WithTemporalExtent(timeWithNoDatapoints), ) if err != nil { log.Fatalf("Failed to query datapoints: %v", err) } log.Printf("Queried %d datapoints", len(datapoints)) ``` Output ```plaintext Python Size: 0B Dimensions: () Data variables: *empty* ``` ```plaintext Go Queried 0 datapoints ``` ## By datapoint ID If you know the ID of the data point you want to load, you can use [find](/api-reference/python/tilebox.datasets/Collection.find). This method always returns a single data point or raises an exception if no data point with the specified ID exists. ```python Python datapoint_id = "01916d89-ba23-64c9-e383-3152644bcbde" datapoint = collection.find(datapoint_id) print(datapoint) ``` ```go Go datapointID := uuid.MustParse("01910b3c-8552-424d-e116-81d0c3402ccc") var datapoint v1.Sentinel1Sar err = client.Datapoints.GetInto(ctx, []uuid.UUID{collection.ID}, datapointID, &datapoint, ) if err != nil { log.Fatalf("Failed to query datapoint: %v", err) } fmt.Println(protojson.Format(&datapoint)) ``` Output ```plaintext Python Size: 725B Dimensions: (latlon: 2) Coordinates: ingestion_time datetime64[ns] 8B 2024-08-20T05:53:08.600528 id You can also set the `skip_data` parameter when calling `find` to query only the required fields of the data point, same as for `load`. ## Automatic pagination Querying large time intervals can return a large number of data points. Tilebox automatically handles pagination for you by sending paginated requests to the server. # Spatio-temporal Source: https://docs.tilebox.com/datasets/types/spatiotemporal Spatio-temporal datasets link each data point to a specific point in time and a location on the Earth's surface. Spatio-temporal datasets are currently in development and not available yet. Stay tuned for updates Each spatio-temporal dataset comes with a set of required and auto-generated fields for each data point. 
## Required fields While the specific data fields between different time series datasets can vary, there are common fields that all time series datasets share. The timestamp associated with each data point. Timestamps are always in UTC. For indexing and querying, Tilebox truncates timestamps to millisecond precision. But Timeseries datasets may contain arbitrary custom `Timestamp` fields that store timestamps up to a nanosecond precision. A location on the earth's surface associated with each data point. Supported geometry types are `Polygon`, `MultiPolygon`, `Point` and `MultiPoint`. ## Auto-generated fields A [universally unique identifier (UUID)](https://en.wikipedia.org/wiki/Universally_unique_identifier) that uniquely identifies each data point. IDs are generated so that sorting them lexicographically also sorts them by time. IDs generated by Tilebox are deterministic, meaning that ingesting the exact same data values into the same collection will always result in the same ID. The time the data point was ingested into the Tilebox API. ## Creating a spatio-temporal dataset To create a spatio-temporal dataset, use the [Tilebox Console](/console) and select `Spatio-temporal Dataset` as the dataset type. The required and auto-generated fields already outlined will be automatically added to the dataset schema. ## Spatio-temporal queries Spatio-temporal datasets support efficient time-based and spatially filtered queries. To query a specific location in a given time interval, specify a time range and a geometry when [querying data points](/datasets/query) from a collection. # Timeseries Source: https://docs.tilebox.com/datasets/types/timeseries Timeseries datasets link each data point to a specific point in time. Each timeseries dataset comes with a set of required and auto-generated fields for each data point. ## Required fields While the specific data fields between different time series datasets can vary, there are common fields that all time series datasets share. The timestamp associated with each data point. Timestamps are always in UTC. For indexing and querying, Tilebox truncates timestamps to millisecond precision. But Timeseries datasets may contain arbitrary custom `Timestamp` fields that store timestamps up to a nanosecond precision. ## Auto-generated fields A [universally unique identifier (UUID)](https://en.wikipedia.org/wiki/Universally_unique_identifier) that uniquely identifies each data point. IDs are generated so that sorting them lexicographically also sorts them by time. IDs generated by Tilebox are deterministic, meaning that ingesting the exact same data values into the same collection will always result in the same ID. The time the data point was ingested into the Tilebox API. ## Creating a timeseries dataset To create a timeseries dataset, use the [Tilebox Console](/console) and select `Timeseries Dataset` as the dataset type. The required and auto-generated fields already outlined will be automatically added to the dataset schema. ## Time-based queries Timeseries datasets support time-based queries. To query a specific time interval, specify a time range when [querying data](/datasets/query) from a collection. # Creating a dataset Source: https://docs.tilebox.com/guides/datasets/create Learn how to create a dataset This page guides you through the process of creating a dataset in Tilebox using the [Tilebox Console](/console). ## Related documentation Learn about Tilebox datasets and how to use them. 
Learn about Timeseries datasets, which link each data point to a specific point in time. ## Creating a dataset in the Console Create a dataset in Tilebox by going to [My datasets](https://console.tilebox.com/datasets/my-datasets) and clicking the `Create dataset` button. Choose a [dataset kind](/datasets/concepts/datasets#dataset-types) from the dropdown menu. Required fields for the selected dataset kind are automatically added. Tilebox Console Tilebox Console Complete these fields: * `Name` is the name of your dataset. * `Code name` is a unique identifier for the dataset within your team. It's automatically generated, but you can adjust it if needed. * `Description` is a brief description of the dataset's purpose. Tilebox Console Tilebox Console Specify the fields for your dataset. Each field has these properties: * `Name` is the name of the field (it should be `snake_case`). * `Type` and `Array` let you specify the field data type and whether it's an array. See below for an explanation of the available data types. * `Description` is an optional brief description of the field. You can use it to provide more context and details about the data. * `Example value` is an optional example for this field. It can be useful for documentation purposes. Tilebox Console Tilebox Console Once you're done completing the fields, click "Create" to create and save the dataset. You are redirected to your newly created dataset. ## Automatic dataset schema documentation By specifying the fields for your dataset, including the data type, description, and an example value for each one, Tilebox is capable of automatically generating a documentation page for your dataset schema. Dataset schema overview Dataset schema overview ## Adding extra documentation You can also add custom documentation to your dataset, providing more context and details about the included data. This documentation supports rich formatting, including links, tables, code snippets, and more. Tilebox Console Tilebox Console To add documentation, click the edit pencil button on the dataset page to open the documentation editor. You can use Markdown to format your documentation; you can include links, tables, code snippets, and other Markdown features. If you don't see the edit pencil button, you don't have the required permissions to edit the documentation. Once you are done editing the documentation, click the `Save` button to save your changes. ## Changing the dataset schema You can always add new fields to a dataset. If you want to remove or edit existing fields, you'll first need to empty all collections in the dataset. Then, you can edit the dataset schema in the console. # Ingesting data Source: https://docs.tilebox.com/guides/datasets/ingest Learn how to ingest an existing dataset into Tilebox This guide is also available as a Google Colab notebook. Click here for an interactive version. This page guides you through the process of ingesting data into a Tilebox dataset. Starting from an existing dataset available as a file in the [GeoParquet](https://geoparquet.org/) format, you'll go through the process of ingesting that data into Tilebox as a [Timeseries](/datasets/types/timeseries) dataset. If you have your data in a different format, check out the [Ingesting from common file formats](/guides/datasets/ingest-format) guide for examples of how to ingest it. ## Related documentation Learn about Tilebox datasets and how to use them. Learn how to ingest data into a Tilebox dataset. 
## Downloading the example dataset The dataset used in this example is available as a [GeoParquet](https://geoparquet.org/) file. You can download it from here: [modis\_MCD12Q1.geoparquet](https://storage.googleapis.com/tbx-web-assets-2bad228/docs/data-samples/modis_MCD12Q1.geoparquet). ## Installing the necessary packages This example uses a couple of python packages for reading parquet files and for visualizing the dataset. Install the required packages using your preferred package manager. For new projects, Tilebox recommend using [uv](https://docs.astral.sh/uv/). ```bash uv uv add tilebox-datasets geopandas folium matplotlib mapclassify ``` ```bash pip pip install tilebox-datasets geopandas folium matplotlib mapclassify ``` ```bash poetry poetry add tilebox-datasets="*" geopandas="*" folium="*" matplotlib="*" mapclassify="*" ``` ```bash pipenv pipenv install tilebox-datasets geopandas folium matplotlib mapclassify ``` ## Reading and previewing the dataset The dataset is available as a [GeoParquet](https://geoparquet.org/) file. You can read it using the `geopandas.read_parquet` function. ```python Python import geopandas as gpd modis_data = gpd.read_parquet("modis_MCD12Q1.geoparquet") modis_data.head(5) ``` ```plaintext Output time end_time granule_name geometry horizontal_tile_number vertical_tile_number tile_id file_size checksum checksum_type day_night_flag browse_granule_id published_at 0 2001-01-01 00:00:00+00:00 2001-12-31 23:59:59+00:00 MCD12Q1.A2001001.h00v08.061.2022146024956.hdf POLYGON ((-180 10, -180 0, -170 0, -172.62252 ... 0 8 51000008 275957 941243048 CKSUM Day None 2022-06-23 10:54:43.824000+00:00 1 2001-01-01 00:00:00+00:00 2001-12-31 23:59:59+00:00 MCD12Q1.A2001001.h00v09.061.2022146024922.hdf POLYGON ((-180 0, -180 -10, -172.62252 -10, -1... 0 9 51000009 285389 3014510714 CKSUM Day None 2022-06-23 10:54:44.697000+00:00 2 2001-01-01 00:00:00+00:00 2001-12-31 23:59:59+00:00 MCD12Q1.A2001001.h00v10.061.2022146032851.hdf POLYGON ((-180 -10, -180 -20, -180 -20, -172.6... 0 10 51000010 358728 2908215698 CKSUM Day None 2022-06-23 10:54:44.669000+00:00 3 2001-01-01 00:00:00+00:00 2001-12-31 23:59:59+00:00 MCD12Q1.A2001001.h01v08.061.2022146025203.hdf POLYGON ((-172.62252 10, -170 0, -160 0, -162.... 1 8 51001008 146979 1397661843 CKSUM Day None 2022-06-23 10:54:44.309000+00:00 4 2001-01-01 00:00:00+00:00 2001-12-31 23:59:59+00:00 MCD12Q1.A2001001.h01v09.061.2022146025902.hdf POLYGON ((-170 0, -172.62252 -10, -162.46826 -... 1 9 51001009 148935 2314263965 CKSUM Day None 2022-06-23 10:54:44.023000+00:00 ``` ## Exploring it visually Geopandas comes with a built in explorer to visually explore the dataset. ```python Python modis_data.head(1000).explore(width=800, height=600) ``` Explore the MODIS dataset Explore the MODIS dataset ## Create a Tilebox dataset Now you'll create a [Timeseries](/datasets/types/timeseries) dataset with the same schema as the given MODIS dataset. To do so, you'll use the [Tilebox Console](/console), navigate to `My Datasets` and click `Create Dataset`. Then select `Timeseries Dataset` as the dataset type. For more information on creating a dataset, check out the [Creating a dataset](/guides/datasets/create) guide for a Step by step guide. 
Now, to match the given MODIS dataset, you'll specify the following fields:

| Field                    | Type      | Note                                      |
| ------------------------ | --------- | ----------------------------------------- |
| granule\_name            | string    | MODIS granule name                        |
| geometry                 | Geometry  | Tile boundary coordinates of the granule  |
| end\_time                | Timestamp | Measurement end time                      |
| horizontal\_tile\_number | int64     | Horizontal MODIS tile number (0-35)       |
| vertical\_tile\_number   | int64     | Vertical MODIS tile number (0-17)         |
| tile\_id                 | int64     | MODIS tile ID                             |
| file\_size               | uint64    | File size of the product in bytes         |
| checksum                 | string    | Hash checksum of the file                 |
| checksum\_type           | string    | Checksum algorithm (MD5 / CKSUM)          |
| day\_night\_flag         | int64     | Day / Night / Both                        |
| browse\_granule\_id      | string    | Optional granule ID for browsing          |
| published\_at            | Timestamp | The time the product was published        |

In the console, this will look like the following:

Tilebox Console

## Access the dataset from Python

Your newly created dataset is now available, and you can access it from Python. For this, you'll need to know the dataset slug, which was assigned automatically based on the specified `code_name`. To find out the slug, navigate to the dataset overview in the console.

Explore the MODIS dataset

You can now instantiate the dataset client and access the dataset.

```python Python
from tilebox.datasets import Client

client = Client()
dataset = client.dataset("tilebox.modis")  # replace with your dataset slug
```

## Create a collection

Next, you'll create a collection to insert your data into.

```python Python
collection = dataset.get_or_create_collection("MCD12Q1")
```

## Ingest the data

Now, you'll finally ingest the MODIS data into the collection.

```python Python
datapoint_ids = collection.ingest(modis_data)
print(f"Successfully ingested {len(datapoint_ids)} datapoints!")
```

```plaintext Output
Successfully ingested 7245 datapoints!
```

## Query the newly ingested data

You can now query a subset of the newly ingested data for a specific time range. Since the data is now stored directly in the Tilebox dataset, you can query and access it from anywhere.

```python Python
data = collection.load(("2015-01-01", "2020-01-01"))
data
```

```plaintext Output
<xarray.Dataset> Size: 403kB
Dimensions:  (time: 1575)
Coordinates:
  * time     (time) datetime64[ns] 13kB 2015-01-01 ... 2019-01-01
Data variables: (12/14)
    id       (time) ...
    ...
```

For more information on accessing and querying data, check out [querying data](/datasets/query).

## View the data in the console

You can also view your data in the Console by navigating to the dataset, selecting the collection, and then clicking on one of the data points.

Explore the MODIS dataset

## Next steps

Congratulations! You've successfully ingested data into Tilebox. You can now explore the data in the console and use it for further processing and analysis.
Learn all about [querying your newly created dataset](https://docs.tilebox.com/datasets/query) Explore the different dataset types available in Tilebox Check out a growing number of publicly available open data datasets on Tilebox # Ingesting from common file formats Source: https://docs.tilebox.com/guides/datasets/ingest-format Learn how to ingest data from common file formats into Tilebox Through the usage of `xarray` and `pandas` you can also easily ingest existing datasets available in file formats, such as CSV, [Parquet](https://parquet.apache.org/), [Feather](https://arrow.apache.org/docs/python/feather.html) and more. ## CSV Comma-separated values (CSV) is a common file format for tabular data. It's widely used in data science. Tilebox supports CSV ingestion using the `pandas.read_csv` function. Assuming you have a CSV file named `data.csv` with the following content. If you want to follow along, you can download the file [here](https://storage.googleapis.com/tbx-web-assets-2bad228/docs/data-samples/ingestion_data.csv). ```csv ingestion_data.csv time,value,sensor,precise_time,sensor_history,some_unwanted_column 2025-03-28T11:44:23Z,45.16,A,2025-03-28T11:44:23.345761444Z,"[-12.15, 13.45, -8.2, 16.5, 45.16]","Unsupported" 2025-03-28T11:45:19Z,273.15,B,2025-03-28T11:45:19.128742312Z,"[300.16, 280.12, 273.15]","Unsupported" ``` This data already conforms to the schema of the `MyCustomDataset` dataset, except for `some_unwanted_column` which you want to drop before you ingest it. Here is how this could look like: ```python Python import pandas as pd data = pd.read_csv("ingestion_data.csv") data = data.drop(columns=["some_unwanted_column"]) collection = dataset.get_or_create_collection("CSVMeasurements") collection.ingest(data) ``` ## Parquet [Apache Parquet](https://parquet.apache.org/) is an open source, column-oriented data file format designed for efficient data storage and retrieval. Tilebox supports Parquet ingestion using the `pandas.read_parquet` function. The parquet file used in this example [is available here](https://storage.googleapis.com/tbx-web-assets-2bad228/docs/data-samples/ingestion_data.parquet). ```python Python import pandas as pd data = pd.read_parquet("ingestion_data.parquet") # our data already conforms to the schema of the MyCustomDataset # dataset, so lets ingest it collection = dataset.get_or_create_collection("ParquetMeasurements") collection.ingest(data) ``` ## Feather [Feather](https://arrow.apache.org/docs/python/feather.html) is a file format originating from the Apache Arrow project, designed for storing tabular data in a fast and memory-efficient way. It's supported by many programming languages, including Python. Tilebox supports Feather ingestion using the `pandas.read_feather` function. The feather file used in this example [is available here](https://storage.googleapis.com/tbx-web-assets-2bad228/docs/data-samples/ingestion_data.feather). ```python Python import pandas as pd data = pd.read_feather("ingestion_data.feather") # our data already conforms to the schema of the MyCustomDataset # dataset, so lets ingest it collection = dataset.get_or_create_collection("FeatherMeasurements") collection.ingest(data) ``` ## GeoParquet Please check out the [Ingesting data](/guides/datasets/ingest) guide for an example of ingesting a GeoParquet file. # Multi-language Workflows Source: https://docs.tilebox.com/guides/workflows/multi-language Learn how to create workflows that use tasks written in different languages The code for this guide is available on GitHub. 
## Tilebox languages and SDKs Tilebox supports multiple languages and SDKs for running workflows. All Tilebox SDKs and workflows are designed to be interoperable, which means it's possible to have a workflow where individual tasks are executed in different languages. Check out [Languages & SDKs](/sdks/introduction) to learn more about currently available programming SDKs. ## Why multi-language workflows? You might need to use multiple languages in a single workflow for many reasons, such as: * You want to use a language that is better suited for a specific task (for example Python for data processing, Go for a backend API) * You want to use a library that is only available in a specific language (for example xarray in Python) * You started prototyping in Python, but need to start migrating the compute-intensive parts of your workflow to a different language for performance reasons ## Multi-language workflow example This guide will tackle the first use case: you have a tasking server in Go and want to offload some of the processing to Python. ## Defining tasks in Python and Go ```python Python class ScheduleImageCapture(Task): # The input parameters must match the ones defined in the Go task location: tuple[float, float] # lat_lon resolution_m: int spectral_bands: list[float] # spectral bands in nm def execute(self, context: ExecutionContext) -> None: # Here you can implement your task logic, submit subtasks, etc. print(f"Image captured for {self.location} with {self.resolution_m}m resolution and bands {self.spectral_bands}") @staticmethod def identifier() -> tuple[str, str]: # The identifier must match the one defined in the Go task return "tilebox.com/schedule_image_capture", "v1.0" ``` ```go Go type ScheduleImageCapture struct { // json tags must match the Python task definition Location [2]float64 `json:"location"` // lat_lon ResolutionM int `json:"resolution_m"` SpectralBands []float64 `json:"spectral_bands"` // spectral bands in nm } // No need to define the Execute method since we're only submitting the task // Identifier must match with the task identifier in the Python runner func (t *ScheduleImageCapture) Identifier() workflows.TaskIdentifier { return workflows.NewTaskIdentifier("tilebox.com/schedule_image_capture", "v1.0") } ``` A couple important points to note: The dataclass parameters in Python must match the struct fields in Go, including the types and the names (through the JSON tags in Go). Due to Go and Python having different naming conventions, it's recommended to use JSON tags in the Go struct to match the Python dataclass field names to respect the language-specific conventions. Go fields must start with an uppercase letter to be serialized to JSON. The need for JSON tags in the preceding Go code is currently necessary but might be removed in the future. The execute method is defined in the Python task but not in the Go task since Go will only be used to submit the task, not executing it. It's necessary to define the `identifier` method in both the Python and Go tasks and to make sure they match. The `identifier` method has two values, the first being an arbitrary unique task identifier and the second being the version in the `v{major}.{minor}` format. ## Creating a Go server that submits jobs Write a simple HTTP tasking server in Go with a `/submit` endpoint that accepts requests to submit a `ScheduleImageCapture` job. Both Go and Python code are using `test-cluster-tZD9Ca2qsqt4V` as the cluster slug. 
You should replace it with your own cluster slug, which you can create in the [Tilebox Console](https://console.tilebox.com/workflows/clusters). ```go Go func main() { log.Println("Server starting on http://localhost:8080") client := workflows.NewClient() http.HandleFunc("/submit", submitHandler(client)) log.Fatal(http.ListenAndServe(":8080", nil)) } // Submit a job based on some query parameters func submitHandler(client *workflows.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { latArg := r.URL.Query().Get("lat") lonArg := r.URL.Query().Get("lon") resolutionArg := r.URL.Query().Get("resolution") bandsArg := r.URL.Query().Get("bands[]") latFloat, err := strconv.ParseFloat(latArg, 64) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } lonFloat, err := strconv.ParseFloat(lonArg, 64) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } resolutionM, err := strconv.Atoi(resolutionArg) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } var spectralBands []float64 for _, bandArg := range strings.Split(bandsArg, ",") { band, err := strconv.ParseFloat(bandArg, 64) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } spectralBands = append(spectralBands, band) } job, err := client.Jobs.Submit(r.Context(), "Schedule Image capture", []workflows.Task{ &ScheduleImageCapture{ Location: [2]float64{latFloat, lonFloat}, ResolutionM: resolutionM, SpectralBands: spectralBands, }, }, ) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } _, _ = io.WriteString(w, fmt.Sprintf("Job submitted: %s\n", job.ID)) } } ``` In the same way that you can submit jobs across languages you can also submit subtasks across languages. ## Creating a Python runner Write a Python script that starts a task runner and registers the `ScheduleImageCapture` task. ```python Python from tilebox.workflows import Client def main(): client = Client() runner = client.runner(tasks=[ScheduleImageCapture]) runner.run_forever() if __name__ == "__main__": main() ``` ## Testing it Start the Go server. ```bash Shell go run . ``` In another terminal, start the Python runner. ```bash Shell uv run runner.py ``` Submit a job to the Go server. ```bash Shell curl http://localhost:8080/submit?lat=40.75&lon=-73.98&resolution=30&bands[]=489.0,560.6,666.5 ``` Check the Python runner output, it should print the following line: ```plaintext Output Image captured for [40.75, -73.98] with 30m resolution and bands [489, 560.6, 666.5] ``` ## Next Steps The code for this guide is available on GitHub. As a learning exercise, you can try to change the [News API Workflow](/workflows/concepts/tasks#dependencies-example) to replace the `FetchNews` task with a Go task and keep all the other tasks in Python. You'll learn how to submit a subtask in another language than what the current task is executed in. # Tilebox Source: https://docs.tilebox.com/introduction The Solar System's #1 developer tool for space data management export const HeroCard = ({children, title, description, href}) => { return {children}


; }; Tilebox is a lightweight space data management and orchestration software - on ground and in orbit. It provides a framework that simplifies access, processing, and distribution of space data across different environments enabling efficient multi-language, multi-cluster workflows. Tilebox integrates seamlessly with your existing infrastructure, ensuring that you maintain complete control over your data and algorithms. ## Modules Tilebox consists of two primary modules: Datasets Workflows ## Getting Started To get started, check out some of the following resources: ## Guides You can also start by looking through these guides: Learn about time-series datasets and their structure. Discover how to query and load data from a dataset. Gain a deeper understanding of how to create tasks using the Tilebox Workflow Orchestrator. Find out how to deploy Task Runners to run workflows in a parallel, distributed manner. # Quickstart Source: https://docs.tilebox.com/quickstart This guide helps you set up and get started using Tilebox. It covers how to install a Tilebox client for your preferred language and how to use it to query data from a dataset and run a workflow. Select your preferred language and follow the steps below to get started. ## Start in a Notebook Explore the provided [Sample Notebooks](/sdks/python/sample-notebooks) to begin your journey with Tilebox. These notebooks offer a step-by-step guide to using the API and showcase many features supported by Tilebox Python clients. You can also use these notebooks as a foundation for your own projects. ## Start on Your Device If you prefer to work locally, follow these steps to get started. Install the Tilebox Python packages. ```bash uv uv add tilebox-datasets tilebox-workflows tilebox-storage ``` ```bash pip pip install tilebox-datasets tilebox-workflows tilebox-storage ``` ```bash poetry poetry add tilebox-datasets="*" tilebox-workflows="*" tilebox-storage="*" ``` ```bash pipenv pipenv install tilebox-datasets tilebox-workflows tilebox-storage ``` For new projects we recommend using [uv](https://docs.astral.sh/uv/). More information about installing the Tilebox Python SDKs can be found in the [Installation](/sdks/python/install) section. Create an API key by logging into the [Tilebox Console](https://console.tilebox.com), navigating to [Account -> API Keys](https://console.tilebox.com/account/api-keys), and clicking the "Create API Key" button. Tilebox Console Tilebox Console Copy the API key and keep it somewhere safe. You will need it to authenticate your requests. Use the datasets client to query data from a dataset. ```python Python from tilebox.datasets import Client client = Client(token="YOUR_TILEBOX_API_KEY") # select a dataset datasets = client.datasets() dataset = datasets.open_data.copernicus.sentinel2_msi # and load data from a collection in a given time range collection = dataset.collection("S2A_S2MSI1C") data_january_2022 = collection.load(("2022-01-01", "2022-02-01")) ``` Use the workflows client to create a task and submit it as a job. 
```python Python from tilebox.workflows import Client, Task # Replace with your actual token client = Client(token="YOUR_TILEBOX_API_KEY") class HelloWorldTask(Task): greeting: str = "Hello" name: str = "World" def execute(self, context): print(f"{self.greeting} {self.name}, from the main task!") context.submit_subtask(HelloSubtask(name=self.name)) class HelloSubtask(Task): name: str def execute(self, context): print(f"Hello from the subtask, {self.name}!") # Initiate the job jobs = client.jobs() jobs.submit("parameterized-hello-world", HelloWorldTask(greeting="Greetings", name="Universe")) # Run the tasks runner = client.runner(tasks=[HelloWorldTask, HelloSubtask]) runner.run_all() ``` Review the following guides to learn more about the modules that make up Tilebox: Learn how to create a Timeseries dataset using the Tilebox Console. Learn how to ingest an existing CSV dataset into a Timeseries dataset collection. ## Start with Examples Explore the provided [Examples](/sdks/go/examples) to begin your journey with Tilebox. These examples offer a step-by-step guide to using the API and showcase many features supported by Tilebox Go clients. You can also use these examples as a foundation for your own projects. ## Start on Your Device If you prefer to work locally, follow these steps to get started. Add the Tilebox library in your project. ```bash Shell go get github.com/tilebox/tilebox-go ``` Install [tilebox-generate](https://github.com/tilebox/tilebox-generate) command-line tool on your machine. It's used to generate Go structs for Tilebox datasets. ```bash Shell go install github.com/tilebox/tilebox-generate@latest ``` Create an API key by logging into the [Tilebox Console](https://console.tilebox.com), navigating to [Account -> API Keys](https://console.tilebox.com/account/api-keys), and clicking the "Create API Key" button. Tilebox Console Tilebox Console Copy the API key and keep it somewhere safe. You will need it to authenticate your requests. Run [tilebox-generate](https://github.com/tilebox/tilebox-generate) in the root directory of your Go project. It generates the dataset type for Sentinel-2 MSI dataset. It will generate a `./protogen/tilebox/v1/sentinel2_msi.pb.go` file. ```bash Shell tilebox-generate --dataset open_data.copernicus.sentinel2_msi --tilebox-api-key $TILEBOX_API_KEY ``` Use the datasets client to query data from a dataset. 
```go Go package main import ( "context" "log" "log/slog" "time" "github.com/google/uuid" "github.com/paulmach/orb" "github.com/paulmach/orb/encoding/wkt" "github.com/tilebox/tilebox-go/datasets/v1" "github.com/tilebox/tilebox-go/query" ) func main() { ctx := context.Background() client := datasets.NewClient() // select a dataset dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel2_msi") if err != nil { log.Fatalf("Failed to get dataset: %v", err) } // select a collection collection, err := client.Collections.Get(ctx, dataset.ID, "S2A_S2MSI1C") if err != nil { log.Fatalf("Failed to get collection: %v", err) } // load data from a collection in a given time range and spatial extent colorado := orb.Polygon{ {{-109.05, 37.09}, {-102.06, 37.09}, {-102.06, 41.59}, {-109.05, 41.59}, {-109.05, 37.09}}, } startDate := time.Date(2025, time.March, 1, 0, 0, 0, 0, time.UTC) endDate := time.Date(2025, time.April, 1, 0, 0, 0, 0, time.UTC) march2025 := query.NewTimeInterval(startDate, endDate) // You have to use tilebox-generate to generate the dataset type var datapointsOverColorado []*v1.Sentinel2Msi err = client.Datapoints.QueryInto(ctx, []uuid.UUID{collection.ID}, &datapointsOverColorado, datasets.WithTemporalExtent(march2025), datasets.WithSpatialExtent(colorado), ) if err != nil { log.Fatalf("Failed to query datapoints: %v", err) } slog.Info("Found datapoints over Colorado in March 2025", slog.Int("count", len(datapointsOverColorado))) slog.Info("First datapoint over Colorado", slog.String("id", datapointsOverColorado[0].GetId().AsUUID().String()), slog.Time("event time", datapointsOverColorado[0].GetTime().AsTime()), slog.Time("ingestion time", datapointsOverColorado[0].GetIngestionTime().AsTime()), slog.String("geometry", wkt.MarshalString(datapointsOverColorado[0].GetGeometry().AsGeometry())), slog.String("granule name", datapointsOverColorado[0].GetGranuleName()), slog.String("processing level", datapointsOverColorado[0].GetProcessingLevel().String()), slog.String("product type", datapointsOverColorado[0].GetProductType()), // and so on... ) } ``` Use the workflows client to create a task and submit it as a job. 
```go Go package main import ( "context" "log/slog" "github.com/tilebox/tilebox-go/workflows/v1" ) type HelloTask struct { Greeting string Name string } func (t *HelloTask) Execute(ctx context.Context) error { slog.InfoContext(ctx, "Hello from the main task!", slog.String("Greeting", t.Greeting), slog.String("Name", t.Name)) err := workflows.SubmitSubtasks(ctx, &HelloSubtask{Name: t.Name}) if err != nil { return err } return nil } type HelloSubtask struct { Name string } func (t *HelloSubtask) Execute(context.Context) error { slog.Info("Hello from the subtask!", slog.String("Name", t.Name)) return nil } func main() { ctx := context.Background() // Replace with your actual token client := workflows.NewClient() job, err := client.Jobs.Submit(ctx, "hello-world", []workflows.Task{ &HelloTask{ Greeting: "Greetings", Name: "Tilebox", }, }, ) if err != nil { slog.ErrorContext(ctx, "Failed to submit job", slog.Any("error", err)) return } slog.InfoContext(ctx, "Job submitted", slog.String("job_id", job.ID.String())) runner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return } err = runner.RegisterTasks( &HelloTask{}, &HelloSubtask{}, ) if err != nil { slog.Error("failed to register task", slog.Any("error", err)) return } runner.Run(ctx) } ``` Review the following guides to learn more about the modules that make up Tilebox: Learn how to create a Timeseries dataset using the Tilebox Console. Learn how to ingest an existing CSV dataset into a Timeseries dataset collection. # Examples Source: https://docs.tilebox.com/sdks/go/examples Examples maintained to use and learn from. To quickly become familiar with the Go client, you can explore some standalone examples. You can access the examples on [ GitHub](https://github.com/tilebox/tilebox-go/tree/main/examples). More examples can be found throughout the docs. ## Workflows examples How to use Tilebox Workflows to submit and execute a simple task. How to submit a task and run a workflow using protobuf messages. How to set up tracing and logging for workflows using Axiom observability platform. How to set up tracing and logging for workflows using OpenTelemetry. ## Datasets examples How to query datapoints from a Tilebox dataset. How to create a collection, ingest datapoints, and then delete them. # Installation Source: https://docs.tilebox.com/sdks/go/install Install the Tilebox Go library ## Package Overview Tilebox offers a Go SDK for accessing Tilebox services. It additionally includes a command-line tool (tilebox-generate) that can be installed separately. Datasets and workflows client for Tilebox Command-line tool to generate Tilebox datasets types for Go ## Installation Add `tilebox-go` to your project. ```bash Shell go get github.com/tilebox/tilebox-go ``` Install `tilebox-generate` command-line tool on your machine. ```bash Shell go install github.com/tilebox/tilebox-generate@latest ``` # Protobuf Source: https://docs.tilebox.com/sdks/go/protobuf Overview of protobuf, common use cases, and implementation details. Tilebox uses [Protocol Buffers](https://protobuf.dev/), with a custom generation tool, combined with standard Go data structures. [Protocol Buffers](https://protobuf.dev/) (often referred to as `protobuf`) is a schema definition language with an efficient binary format and native language support for lots of languages, including Go. Protocol buffers are open source since 2008 and are maintained by Google. 
## tilebox-generate Protobuf schemas are typically defined in a `.proto` file, and then converted to a native Go struct using the protobuf compiler. Tilebox datasets already define a protobuf schema as well, and automate the generation of Go structs for existing datasets through a quick `tilebox-generate` command-line tool. See [Installation](/sdks/go/install) for more details on how to install `tilebox-generate`. ```sh tilebox-generate --dataset open_data.copernicus.sentinel1_sar ``` The preceding command will generate a `./protogen/tilebox/v1/sentinel1_sar.pb.go` file. More flags can be set to change the default output folders, package name, etc. This file contains everything needed to work with the [Sentinel-1 SAR](https://console.tilebox.com/datasets/explorer/e27e6a58-c149-4379-9fdf-9d43903cba74) dataset. It's recommended to check the generated files you use in your version control system. If you open this file, you will see that it starts with `// Code generated by protoc-gen-go. DO NOT EDIT.`. It means that the file was generated by the `protoc-gen-go` tool, which is part of the protobuf compiler. After editing a dataset, you can call the generate command again to ensure that the changes are reflected in the generated file. The file contains a `Sentinel1Sar` struct, which is a Go struct that represents a datapoint in the dataset. ```go Go type Sentinel1Sar struct { xxx_hidden_GranuleName *string `protobuf:"bytes,1,opt,name=granule_name,json=granuleName"` xxx_hidden_ProcessingLevel v1.ProcessingLevel `protobuf:"varint,2,opt,name=processing_level,json=processingLevel,enum=datasets.v1.ProcessingLevel"` // more fields } ``` Notice that the fields are private (starting with a lowercase letter), so they are not accessible. Protobuf hides the fields and provides getters and setters to access them. ## Protobuf 101 ### Initializing a message Here is how to initialize a `v1.Sentinel1Sar` message. ```go Go import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" ) datapoint := v1.Sentinel1Sar_builder{ Time: timestamppb.New(time.Now()), GranuleName: proto.String("S1A_EW_GRDH_1SSH_20141004T020507_20141004T020611_002673_002FAF_8645_COG.SAFE"), ProductType: proto.String("EW_GRDH_1S-COG"), FileSize: proto.Int64(488383473), }.Build() ``` Protobuf fields are private and provides a builder pattern to create a message. `proto.String` is a helper function that converts `string` to `*string`. This allows protobuf to differentiate between a field that is set to an empty string and a field that is not set (nil). An exhaustive list of those helper functions can be found [here](https://github.com/golang/protobuf/blob/master/proto/wrappers.go). Only primitives have a `proto.XXX` helper function. Complex types such as timestamps, durations, UUIDs, and geometries have a [constructor function](#constructors). ### Getters and setters Protobuf provides methods to get, set, clear and check if a field is set. ```go Go fmt.Println(datapoint.GetGranuleName()) datapoint.SetGranuleName("my amazing granule") datapoint.ClearGranuleName() if datapoint.HasGranuleName() { fmt.Println("Granule name is set") } ``` Getters for primitive types will return a Go native type (for example, int64, string, etc.). Getters for complex types such as timestamps, durations, UUIDs, and geometries can also be converted to more standard types using [AsXXX](#asxxx-methods) methods. ## Well known types Beside Go primitives, Tilebox supports some well known types: * Duration: A duration of time. 
See [Duration](https://protobuf.dev/reference/protobuf/google.protobuf/#duration) for more information. * Timestamp: A point in time. See [Timestamp](https://protobuf.dev/reference/protobuf/google.protobuf/#timestamp) for more information. * UUID: A [universally unique identifier (UUID)](https://en.wikipedia.org/wiki/Universally_unique_identifier). * Geometry: Geospatial geometries of type Point, LineString, Polygon or MultiPolygon. They have a couple of useful methods to work with them. ### Constructors ```go Go import ( "github.com/paulmach/orb" datasetsv1 "github.com/tilebox/tilebox-go/protogen/go/datasets/v1" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) timestamppb.New(time.Now()) durationpb.New(10 * time.Second) datasetsv1.NewUUID(uuid.New()) datasetsv1.NewGeometry(orb.Point{1, 2}) ``` ### `CheckValid` method `CheckValid` returns an error if the field is invalid. ```go Go err := datapoint.GetTime().CheckValid() if err != nil { fmt.Println(err) } ``` ### `IsValid` method `IsValid` reports whether the field is valid. It's equivalent to `CheckValid == nil`. ```go Go if datapoint.GetTime().IsValid() { fmt.Println("Valid") } ``` ### `AsXXX` methods `AsXXX` methods convert the field to a more user friendly type. * `AsUUID` will convert a `datasetsv1.UUID` field to a [uuid.UUID](https://pkg.go.dev/github.com/google/uuid#UUID) type * `AsTime` will convert a `timestamppb.Timestamp` field to a [time.Time](https://pkg.go.dev/time#Time) type * `AsDuration` will convert a `durationpb.Duration` field to a [time.Duration](https://pkg.go.dev/time#Duration) type * `AsGeometry` will convert a `datasetsv1.Geometry` field to an [orb.Geometry](https://github.com/paulmach/orb?tab=readme-ov-file#shared-geometry-interface) interface ```go Go datapoint.GetId().AsUUID() // uuid.UUID datapoint.GetTime().AsTime() // time.Time datapoint.GetDuration().AsDuration() // time.Duration datapoint.GetGeometry().AsGeometry() // orb.Geometry ``` Those methods performs conversion on a best-effort basis. Type validity must be checked beforehand using `IsValid` or `CheckValid` methods. ## Common data operations Datapoints are contained in a standard Go slice so all the usual [slice operations](https://gobyexample.com/slices) and [slice functions](https://pkg.go.dev/slices) can be used. The usual pattern to iterate over data in Go is by using a `for` loop. As an example, here is how to extract the `copernicus_id` fields from the datapoints. ```go Go // assuming datapoints has been filled using `client.Datapoints.QueryInto` method var datapoints []*v1.Sentinel1Sar copernicusIDs := make([]uuid.UUID, len(datapoints)) for i, dp := range datapoints { copernicusIDs[i] = dp.GetCopernicusId().AsUUID() } ``` Here is an example of filtering out datapoints that have been published before January 2000 and are not from the Sentinel-1C platform. ```go Go jan2000 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) // slice of length of 0, but preallocate a capacity of len(datapoints) s1cDatapoints := make([]*v1.Sentinel1Sar, 0, len(datapoints)) for _, dp := range datapoints { if dp.GetPublished().AsTime().Before(jan2000) { continue } if dp.GetPlatform() != "S1C" { continue } s1cDatapoints = append(s1cDatapoints, proto.CloneOf(dp)) // Copy the underlying data } ``` ## Converting to JSON Protobuf messages can be converted to JSON without loss of information. This is useful for interoperability with other systems that doesn't use protobuf. 
A guide on protoJSON can be found format here: [https://protobuf.dev/programming-guides/json/](https://protobuf.dev/programming-guides/json/) ```go Go originalDatapoint := datapoints[0] // Convert proto.Message to JSON as bytes jsonDatapoint, err := protojson.Marshal(originalDatapoint) if err != nil { log.Fatalf("Failed to marshal datapoint: %v", err) } fmt.Println(string(jsonDatapoint)) ``` ```plaintext Output {"time":"2001-01-01T00:00:00Z","id":{"uuid":"AOPHpzQAAmV2MZ4+Zv+JGg=="},"ingestionTime":"2025-03-25T10:26:10.577385176Z","granuleName":"MCD12Q1.A2001001.h02v08.061.2022146033342.hdf","geometry":{"wkb":"AQMAAAABAAAABQAAAFIi9vf7TmTAXsX3////I0Bexff///9jwAAAAAAAAAAACUn4//+/YsAAAAAAAAAAAC7AdjgMCmPAXsX3////I0BSIvb3+05kwF7F9////yNA"},"endTime":"2001-12-31T23:59:59Z","horizontalTileNumber":"2","verticalTileNumber":"8","tileId":"51002008","fileSize":"176215","checksum":"771212892","checksumType":"CKSUM","dayNightFlag":"Day","publishedAt":"2022-06-23T10:58:13.895Z"} ``` It can also be converted back to a `proto.Message`. ```go Go // Convert JSON bytes to proto.Message unmarshalledDatapoint := &v1.Sentinel1Sar{} err = protojson.Unmarshal(jsonDatapoint, unmarshalledDatapoint) if err != nil { log.Fatalf("Failed to unmarshal datapoint: %v", err) } fmt.Println("Are both equal?", proto.Equal(unmarshalledDatapoint, originalDatapoint)) ``` ```plaintext Output Are both equal? true ``` # Tilebox languages and SDKs Source: https://docs.tilebox.com/sdks/introduction Tilebox supports multiple languages and SDKs for accessing datasets and running workflows. export const HeroCard = ({children, title, description, href}) => { return {children}


; }; The following language SDKs are currently available for Tilebox. Select one to learn more. Tilebox Python Tilebox Go # Async support Source: https://docs.tilebox.com/sdks/python/async Tilebox offers a standard synchronous API by default but also provides an async client option when needed. ## Why use async? When working with external datasets, such as [Tilebox datasets](/datasets/concepts/datasets), loading data may take some time. To speed up this process, you can run requests in parallel. While you can use multi-threading or multi-processing, which can be complex, often times a simpler option is to perform data loading tasks asynchronously using coroutines and `asyncio`. ## Switching to an async datasets client To switch to the async client, change the import statement for the `Client`. The example below illustrates this change. ```python Python (Sync) from tilebox.datasets import Client # This client is synchronous client = Client() ``` ```python Python (Async) from tilebox.datasets.aio import Client # This client is asynchronous client = Client() ``` After switching to the async client, use `await` for operations that interact with the Tilebox API. ```python Python (Sync) # Listing datasets datasets = client.datasets() # Listing collections dataset = datasets.open_data.copernicus.sentinel1_sar collections = dataset.collections() # Collection information collection = collections["S1A_IW_RAW__0S"] info = collection.info() print(f"Data for My-collection is available for {info.availability}") # Loading data data = collection.load(("2022-05-01", "2022-06-01"), show_progress=True) # Finding a specific datapoint datapoint_uuid = "01910b3c-8552-7671-3345-b902cc0813f3" datapoint = collection.find(datapoint_uuid) ``` ```python Python (Async) # Listing datasets datasets = await client.datasets() # Listing collections dataset = datasets.open_data.copernicus.sentinel1_sar collections = await dataset.collections() # Collection information collection = collections["S1A_IW_RAW__0S"] info = await collection.info() print(f"Data for My-collection is available for {info.availability}") # Loading data data = await collection.load(("2022-05-01", "2022-06-01"), show_progress=True) # Finding a specific datapoint datapoint_uuid = "01910b3c-8552-7671-3345-b902cc0813f3" datapoint = await collection.find(datapoint_uuid) ``` Jupyter notebooks and similar interactive environments support asynchronous code execution. You can use `await some_async_call()` as the output of a code cell. ## Fetching data concurrently The primary benefit of the async client is that it allows concurrent requests, enhancing performance. In below example, data is fetched from multiple collections. The synchronous approach retrieves data sequentially, while the async approach does so concurrently, resulting in faster execution. 
```python Python (Sync) # Example: fetching data sequentially # switch to the async example to compare the differences import time from tilebox.datasets import Client from tilebox.datasets.sync.timeseries import TimeseriesCollection client = Client() datasets = client.datasets() collections = datasets.open_data.copernicus.landsat8_oli_tirs.collections() def stats_for_2020(collection: TimeseriesCollection) -> None: """Fetch data for 2020 and print the number of data points that were loaded.""" data = collection.load(("2020-01-01", "2021-01-01"), show_progress=True) n = data.sizes['time'] if 'time' in data else 0 return (collection.name, n) start = time.monotonic() results = [stats_for_2020(collections[name]) for name in collections] duration = time.monotonic() - start for collection_name, n in results: print(f"There are {n} datapoints in {collection_name} for 2020.") print(f"Fetching data took {duration:.2f} seconds") ``` ```python Python (Async) # Example: fetching data concurrently import asyncio import time from tilebox.datasets.aio import Client from tilebox.datasets.aio.timeseries import TimeseriesCollection client = Client() datasets = await client.datasets() collections = await datasets.open_data.copernicus.landsat8_oli_tirs.collections() async def stats_for_2020(collection: TimeseriesCollection) -> None: """Fetch data for 2020 and print the number of data points that were loaded.""" data = await collection.load(("2020-01-01", "2021-01-01"), show_progress=True) n = data.sizes['time'] if 'time' in data else 0 return (collection.name, n) start = time.monotonic() # Initiate all requests concurrently requests = [stats_for_2020(collections[name]) for name in collections] # Wait for all requests to finish in parallel results = await asyncio.gather(*requests) duration = time.monotonic() - start for collection_name, n in results: print(f"There are {n} datapoints in {collection_name} for 2020.") print(f"Fetching data took {duration:.2f} seconds") ``` The output demonstrates that the async approach runs approximately 30% faster for this example. With `show_progress` enabled, the progress bars update concurrently. ```plaintext Python (Sync) There are 19624 datapoints in L1GT for 2020. There are 1281 datapoints in L1T for 2020. There are 65313 datapoints in L1TP for 2020. There are 25375 datapoints in L2SP for 2020. Fetching data took 10.92 seconds ``` ```plaintext Python (Async) There are 19624 datapoints in L1GT for 2020. There are 1281 datapoints in L1T for 2020. There are 65313 datapoints in L1TP for 2020. There are 25375 datapoints in L2SP for 2020. Fetching data took 7.45 seconds ``` ## Async workflows The Tilebox workflows Python client does not have an async client. This is because workflows are designed for distributed and concurrent execution outside a single async event loop. But within a single task, you may use still use`async` code to take advantage of asynchronous execution, such as parallel data loading. You can achieve this by wrapping your async code in `asyncio.run`. Below is an example of using async code within a workflow task. 
```python Python (Async) import asyncio import xarray as xr from tilebox.datasets.aio import Client as DatasetsClient from tilebox.datasets.data import TimeIntervalLike from tilebox.workflows import Task, ExecutionContext class FetchData(Task): def execute(self, context: ExecutionContext) -> None: # The task execution itself is synchronous # But we can leverage async code within the task using asyncio.run # This will fetch three months of data in parallel data_jan, data_feb, data_mar = asyncio.run(load_first_three_months()) async def load_data(interval: TimeIntervalLike): datasets = await DatasetsClient().datasets() collections = await datasets.open_data.copernicus.landsat8_oli_tirs.collections() return await collections["L1T"].load(interval) async def load_first_three_months() -> tuple[xr.Dataset, xr.Dataset, xr.Dataset]: jan = load_data(("2020-01-01", "2020-02-01")) feb = load_data(("2020-02-01", "2020-03-01")) mar = load_data(("2020-03-01", "2020-04-01")) # load the three months in parallel jan, feb, mar = await asyncio.gather(jan, feb, mar) return jan, feb, mar ``` If you encounter an error like `RuntimeError: asyncio.run() cannot be called from a running event loop`, it means you're trying to start another asyncio event loop (with `asyncio.run`) from within an existing one. This often happens in Jupyter notebooks since they automatically start an event loop. A way to resolve this is by using [nest-asyncio](https://pypi.org/project/nest-asyncio/). # Geometries Source: https://docs.tilebox.com/sdks/python/geometries How geometries are handled in the Tilebox Python client. Many datasets consist of granules that represent specific geographical areas on the Earth's surface. Often, a polygon defining the outline of these areas—a footprint—accompanies other granule metadata in time series datasets. Tilebox provides built-in support for working with geometries. Here's an example that loads some granules from the `ERS SAR` Opendata dataset, which contains geometries. ```python Loading ERS data from tilebox.datasets import Client client = Client() datasets = client.datasets() ers_collection = datasets.open_data.asf.ers_sar.collection("ERS-2") ers_data = ers_collection.load(("2008-02-10T21:00", "2008-02-10T22:00")) ``` ## Shapely In the `ers_data` dataset, each granule includes a `geometry` field that represents the footprint of each granule as a polygon. Tilebox automatically converts geometry fields to `Polygon` or `MultiPolygon` objects from the [Shapely](https://shapely.readthedocs.io/en/stable/manual.html) library. By integrating with Shapely, you can use the rich set of libraries and tools it provides. That includes support for computing polygon characteristics such as total area, intersection checks, and conversion to other formats. ```python Printing geometries geometries = ers_data.geometry.values print(geometries) ``` Each geometry is a [shapely.Geometry](https://shapely.readthedocs.io/en/stable/geometry.html#geometry). ```plaintext Output [ ] ``` Geometries are not always [Polygon](https://shapely.readthedocs.io/en/stable/reference/shapely.Polygon.html#shapely.Polygon) objects. More complex footprint geometries are represented as [MultiPolygon](https://shapely.readthedocs.io/en/stable/reference/shapely.MultiPolygon.html#shapely.MultiPolygon) objects. ### Accessing Coordinates You can select a polygon from the geometries and access the underlying coordinates and an automatically computed centroid point. 
```python Accessing coordinates and computing a centroid point polygon = geometries[0] lon, lat = polygon.exterior.coords.xy center, = list(polygon.centroid.coords) print(lon) print(lat) print(center) ``` ```plaintext Output array('d', [-150.753244, -152.031574, -149.183655, -147.769339, -150.753244]) array('d', [74.250081, 73.336051, 73.001748, 73.899483, 74.250081]) (-149.92927907414239, 73.62538063474753) ``` Interactive environments such as [Jupyter Notebooks](/sdks/python/sample-notebooks) can visualize Polygon shapes graphically. Just type `polygon` in an empty cell and execute it for a visual representation of the polygon shape. ### Visualization on a Map To visualize polygons on a map, you can use [Folium](https://pypi.org/project/folium/). Below is a helper function that produces an OpenStreetMap with the Polygon overlaid. ```python visualize helper function # pip install folium from folium import Figure, Map, Polygon as FoliumPolygon, GeoJson, TileLayer from folium.plugins import MiniMap from shapely import Polygon, to_geojson from collections.abc import Iterable def visualize(poly: Polygon | Iterable[Polygon], zoom=4): """Visualize a polygon or a list of polygons on a map""" if not isinstance(poly, Iterable): poly = [poly] fig = Figure(width=600, height=600) map = Map(location=geometries[len(geometries)//2].centroid.coords[0][::-1], zoom_start=zoom, control_scale=True) map.add_child(MiniMap()) fig.add_child(map) for p in poly: map.add_child(GeoJson(to_geojson(p))) return fig ``` Here's how to use it. ```python Visualizing a polygon visualize(polygon) ``` Single visualization The `visualize` helper function supports a list of polygons, which can display the data layout of the ERS granules. ```python Visualizing multiple polygons visualize(geometries) ``` Granules visualization ## Format conversion Shapely supports converting Polygons to some common formats, such as [GeoJSON](https://geojson.org/) or [Well-Known Text (WKT)](https://docs.ogc.org/is/18-010r7/18-010r7.html). ```python Converting to GeoJSON from shapely import to_geojson print(to_geojson(polygon)) ``` ```plaintext Output {"type":"Polygon","coordinates":[[[-150.753244,74.250081],[-152.031574,73.336051],[-149.183655,73.001748],[-147.769339,73.899483],[-150.753244,74.250081]]]} ``` ```python Converting to WKT from shapely import to_wkt print(to_wkt(polygon)) ``` ```plaintext Output POLYGON ((-150.753244 74.250081, -152.031574 73.336051, -149.183655 73.001748, -147.769339 73.899483, -150.753244 74.250081)) ``` ## Checking intersections One common task when working with geometries is checking if a given geometry falls into a specific area of interest. Shapely provides an `intersects` method for this purpose. ```python Checking intersections from shapely import box # Box representing the rectangular area lon=(-160, -150) and lat=(69, 70) area_of_interest = box(-160, 69, -150, 70) for i, polygon in enumerate(geometries): if area_of_interest.intersects(polygon): print(f"{ers_data.granule_name[i].item()} intersects the area of interest!") else: print(f"{ers_data.granule_name[i].item()} doesn't intersect the area of interest!") ``` ```plaintext Output E2_66974_STD_F264 doesn't intersect the area of interest! E2_66974_STD_F265 doesn't intersect the area of interest! E2_66974_STD_F267 doesn't intersect the area of interest! E2_66974_STD_F269 doesn't intersect the area of interest! E2_66974_STD_F271 doesn't intersect the area of interest! E2_66974_STD_F273 intersects the area of interest! 
E2_66974_STD_F275 intersects the area of interest! E2_66974_STD_F277 intersects the area of interest! E2_66974_STD_F279 doesn't intersect the area of interest! E2_66974_STD_F281 doesn't intersect the area of interest! E2_66974_STD_F283 doesn't intersect the area of interest! E2_66974_STD_F285 doesn't intersect the area of interest! E2_66974_STD_F289 doesn't intersect the area of interest! ``` ## Combining polygons As shown in the visualization of the granule footprints, the granules collectively form an orbit from pole to pole. Measurements are often combined during processing. You can do the same with geometries by combining them into a single polygon, which represents the hull around all individual footprints using [shapely.unary\_union](https://shapely.readthedocs.io/en/stable/reference/shapely.unary_union.html). ```python Combining multiple polygons from shapely.ops import unary_union hull = unary_union(geometries) visualize(hull) ``` The computed hull consists of two polygons due to a gap (probably a missing granule) in the geometries. Such geometries are represented as [Multi Polygons](#multi-polygons). Unary union visualization ## Multi Polygons A collection of one or more non-overlapping polygons combined into a single geometry is called a [MultiPolygon](https://shapely.readthedocs.io/en/latest/reference/shapely.MultiPolygon.html). Footprint geometries can be of type `MultiPolygon` due to gaps or pole discontinuities. The computed hull in the previous example is a `MultiPolygon`. ```python Accessing individual polygons of a MultiPolygon print(f"The computed hull of type {type(hull).__name__} consists of {len(hull.geoms)} sub polygons") for i, poly in enumerate(hull.geoms): print(f"Sub polygon {i} has an area of {poly.area}") ``` ```plaintext Output The computed hull of type MultiPolygon consists of 2 sub polygons Sub polygon 0 has an area of 2.025230449898011 Sub polygon 1 has an area of 24.389998081651527 ``` ## Antimeridian Crossings A common issue with `longitude / latitude` geometries is crossings of the 180-degree meridian, or the antimeridian. For example, the coordinates of a `LineString` from Japan to the United States might look like this: `140, 141, 142, ..., 179, 180, -179, -178, ..., -125, -124` Libraries like Shapely are not designed to handle spherical coordinate systems, so caution is necessary with such geometries. Here's an `ERS` granule demonstrating this issue. ```python Antimeridian Crossing # A granule that crosses the antimeridian granule = ers_collection.find("0119bb86-0260-5819-6aab-f99796417155") polygon = granule.geometry.item() print(polygon.exterior.coords.xy) visualize(polygon) ``` ```plaintext Output array('d', [177.993407, 176.605009, 179.563047, -178.904076, 177.993407]) array('d', [74.983185, 74.074615, 73.727752, 74.61847, 74.983185]) ``` Antimeridian buggy visualization This 2D visualization appears incorrect. Both the visualization and any calculations performed may yield inaccurate results. For instance, testing whether the granule intersects the 0-degree meridian provides a false positive. ```python Problems with calculating intersections from shapely import LineString null_meridian = LineString([(0, -90), (0, 90)]) print(polygon.intersects(null_meridian)) # True - but this is incorrect! ``` The GeoJSON specification offers a solution for this problem. 
In the section [Antimeridian Cutting](https://datatracker.ietf.org/doc/html/rfc7946#section-3.1.9), it suggests always cutting lines and polygons into two parts—one for the eastern hemisphere and one for the western hemisphere. In python, this can be achieved using the [antimeridian](https://pypi.org/project/antimeridian/) package. ```python Cutting the polygon along the antimeridian # pip install antimeridian import antimeridian fixed_polygon = antimeridian.fix_polygon(polygon) visualize(fixed_polygon) print(fixed_polygon.intersects(null_meridian)) # False - this is correct now ``` Antimeridian fixed visualization Since Shapely is unaware of the spherical nature of this data, the **centroid** of the fixed polygon **is still incorrect**. The antimeridian package also includes a function to correct this. ```python Calculating the centroid of a cut polygon crossing the antimeridian print("Wrongly computed centroid coordinates (Shapely)") print(list(fixed_polygon.centroid.coords)) print("Correct centroid coordinates (Antimeridian taken into account)") print(list(antimeridian.centroid(fixed_polygon).coords)) ``` ```plaintext Output Wrongly computed centroid coordinates (shapely) [(139.8766350146937, 74.3747116658462)] Correct centroid coordinates (antimeridian taken into account) [(178.7782777050171, 74.3747116658462)] ``` ## Spherical Geometry Another approach to handle the antimeridian issue is performing all coordinate-related calculations, such as polygon intersections, in a [spherical coordinate system](https://en.wikipedia.org/wiki/Spherical_coordinate_system). One useful library for this is [spherical\_geometry](https://spherical-geometry.readthedocs.io/en/latest/). Here's an example. ```python Spherical Geometry # pip install spherical-geometry from spherical_geometry.polygon import SphericalPolygon from spherical_geometry.vector import lonlat_to_vector lon, lat = polygon.exterior.coords.xy spherical_poly = SphericalPolygon.from_lonlat(lon, lat) # Let's check the x, y, z coordinates of the spherical polygon: print(list(spherical_poly.points)) ``` ```plaintext Output [array([[-0.25894363, 0.00907234, 0.96584983], [-0.2651968 , -0.00507317, 0.96418096], [-0.28019363, 0.00213687, 0.95994112], [-0.27390375, 0.01624885, 0.96161984], [-0.25894363, 0.00907234, 0.96584983]])] ``` Now, you can compute intersections or check if a particular point is within the polygon. You can compare the incorrect calculation using `shapely` with the correct version when using `spherical_geometry`. ```python Correct calculations using spherical geometry # A point on the null-meridian, way off from our polygon null_meridian_point = 0, 74.4 # A point actually inside our polygon point_inside = 178.8, 74.4 print("Shapely results:") print("- Null meridian point inside:", polygon.contains(Point(*null_meridian_point))) print("- Actual inside point inside:", polygon.contains(Point(*point_inside))) print("Spherical geometry results:") print("- Null meridian point inside:", spherical_poly.contains_lonlat(*null_meridian_point)) print("- Actual inside point inside:", spherical_poly.contains_lonlat(*point_inside)) ``` ```plaintext Output Shapely results: - Null meridian point inside: True - Actual inside point inside: False Spherical geometry results: - Null meridian point inside: False - Actual inside point inside: True ``` # Installation Source: https://docs.tilebox.com/sdks/python/install Install the Tilebox Python Packages ## Package Overview Tilebox offers a Python SDK for accessing Tilebox services. 
The SDK includes separate packages that can be installed individually based on the services you wish to use, or all together for a comprehensive experience. Access Tilebox datasets from Python Workflow client and task runner for Tilebox ## Installation Install the Tilebox python packages using your preferred package manager. For new projects Tilebox recommend using [uv](https://docs.astral.sh/uv/). ```bash uv uv add tilebox-datasets tilebox-workflows tilebox-storage ``` ```bash pip pip install tilebox-datasets tilebox-workflows tilebox-storage ``` ```bash poetry poetry add tilebox-datasets="*" tilebox-workflows="*" tilebox-storage="*" ``` ```bash pipenv pipenv install tilebox-datasets tilebox-workflows tilebox-storage ``` ## Setting up a local JupyterLab environment To get started quickly, you can also use an existing Jupyter-compatible cloud environment such as [Google Colab](https://colab.research.google.com/). If you want to set up a local Jupyter environment to explore the SDK or to run the [Sample notebooks](/sdks/python/sample-notebooks) locally, install [JupyterLab](https://github.com/jupyterlab/jupyterlab) for a browser-based development environment. It's advised to install the Tilebox packages along with JupyterLab, [ipywidgets](https://ipywidgets.readthedocs.io/en/latest/user_install.html), and [tqdm](https://tqdm.github.io/) for an enhanced experience. ```uv uv mkdir tilebox-exploration cd tilebox-exploration uv init --no-package uv add tilebox-datasets tilebox-workflows tilebox-storage uv add jupyterlab ipywidgets tqdm uv run jupyter lab ``` ### Trying it out After installation, create a new notebook and paste the following code snippet to verify your installation. If you're new to Jupyter, you can refer to the guide on [interactive environments](/sdks/python/sample-notebooks#interactive-environments). ```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() collection = datasets.open_data.copernicus.landsat8_oli_tirs.collection("L1T") data = collection.load(("2015-01-01", "2020-01-01"), show_progress=True) data ``` If the installation is successful, the output should appear as follows. Local JupyterLab Local JupyterLab # Sample notebooks Source: https://docs.tilebox.com/sdks/python/sample-notebooks Sample code maintained to use and learn from. To quickly become familiar with the Python client, you can explore some sample notebooks. Each notebook can be executed standalone from top to bottom. ## Sample notebooks You can access the sample notebooks on [ Google Drive](https://drive.google.com/drive/folders/1I7G35LLeB2SmKQJFsyhpSVNyZK9uOeJt). Right click a notebook in Google Drive and select `Open with -> Google Colaboratory` to open it directly in the browser using [Google Colab](https://colab.research.google.com/). More examples can be found throughout the docs. ### Notebook overview {/* This notebook demonstrates how to use the Python client to query metadata from the ERS-SAR Opendata dataset. It shows how to filter results by geographical location and download product data for a specific granule. [ Open in Colab](https://colab.research.google.com/drive/1LTYhLKy8m9psMhu0DvANs7hyS3ViVpas) */} This notebook illustrates how to use the Python client to query the S5P Tropomi Opendata dataset for methane products. It also explains how to filter results based on geographical location and download product data for a specific granule. 
[ Open in Colab](https://colab.research.google.com/drive/1eVYARNFnTIeQqBs6gqeay01EDvRk2EI4) This notebook demonstrates how to ingest data into a Custom Dataset. In this case, it uses an already prepared sample dataset from the [MODIS instrument](https://lpdaac.usgs.gov/products/mcd12q1v006/). [ Open in Colab](https://colab.research.google.com/drive/1QS-srlWPMJg4csc0ycn36yCX9U6mvIpW) Created with Tilebox Workflows, this 10m resolution mosaic highlights distributed, auto-parallelizing capabilities. Data from `Copernicus Dataspace` was reprojected on `CloudFerro` (intermediate products on AWS S3), and the final composite was built locally using auto-parallelized team notebooks. [ Open the Mosaic](https://examples.tilebox.com/sentinel2_mosaic) [ Open in Github](https://github.com/tilebox/examples/tree/main/s2-cloudfree-mosaic) Execute cells one by one using `Shift+Enter`. Most commonly used libraries are pre-installed. All demo notebooks require Python 3.10 or higher. ## Interactive environments Jupyter, Google Colab, and JetBrains Datalore are interactive environments that simplify the development and sharing of algorithmic code. They allow users to work with notebooks, which combine code and rich text elements like figures, links, and equations. Notebooks require no setup and can be easily shared. [Jupyter notebooks](https://jupyter.org/) are the original interactive environment for Python. They are useful but require local installation. [Google Colab](https://colab.research.google.com/) is a free tool that provides a hosted interactive Python environment. It easily connects to local Jupyter instances and allows code sharing using Google credentials or within organizations using Google Workspace. [JetBrains Datalore](https://datalore.jetbrains.com/) is a free platform for collaborative testing, development, and sharing of Python code and algorithms. It has built-in secret management for storing credentials. Datalore also features advanced JetBrains syntax highlighting and autocompletion. Currently, it only supports Python 3.8, which is not compatible with the Tilebox Python client. Since Colab is a free hosted tool that meets all requirements, including Python ≥ 3.10, it's the recommended choice. ## Installing packages Within your interactive environment, you can install missing packages using pip in "magic" cells, which start with an exclamation mark. ```bash # pip is already installed in your interactive environment !pip3 install .... ``` All APIs or commands that require authentication can be accessed through client libraries that hide tokens, allowing notebooks to be shared without exposing personal credentials. ## Executing code Execute code by clicking the play button in the top left corner of the cell or by pressing `Shift + Enter`. While the code is running, a spinning icon appears. When the execution finishes, the icon changes to a number, indicating the order of execution. The output displays below the code. ## Authorization When sharing notebooks, avoid directly sharing your Tilebox API key. Instead, use one of two methods to authenticate the Tilebox Python client in interactive environments: through environment variables or interactively. ```python Using environment variables to store your API key # Define an environment variable "TILEBOX_API_KEY" that contains your API key import os token = os.getenv("TILEBOX_API_KEY") ``` **Interactive** authorization is possible using the built-in `getpass` module.
This prompts the user for the API key when running the code, storing it only in memory so that it isn't exposed when the notebook is shared. ```python Interactively providing your API key from getpass import getpass token = getpass("API key:") ``` # Xarray Source: https://docs.tilebox.com/sdks/python/xarray Overview of the Xarray library, common use cases, and implementation details. [example_satellite_data.nc]: https://github.com/tilebox/docs/raw/main/assets/data/example_satellite_data.nc [Xarray](https://xarray.dev/) is a library designed for working with labeled multi-dimensional arrays. Built on top of [NumPy](https://numpy.org/) and [Pandas](https://pandas.pydata.org/), Xarray adds labels in the form of dimensions, coordinates, and attributes, enhancing the usability of raw NumPy-like arrays. This enables a more intuitive, concise, and less error-prone development experience. The library also includes a large and expanding collection of functions for advanced analytics and visualization. Overview of the Xarray data structure An overview of the Xarray library and its suitability for N-dimensional data (such as Tilebox time series datasets) is available in the official [Why Xarray? documentation page](https://xarray.pydata.org/en/stable/why-xarray.html). The Tilebox Python client provides access to satellite data as an [xarray.Dataset](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.html#xarray.Dataset). This approach offers several benefits over custom Tilebox-specific data structures: Xarray is based on NumPy and Pandas—two of the most widely used Python libraries for scientific computing. Familiarity with these libraries translates well to using Xarray. Leveraging NumPy, which is built on C and Fortran, Xarray benefits from extensive performance optimizations. This allows Xarray to efficiently handle large datasets. As a widely used library, Xarray easily integrates with many other libraries. Many third-party libraries are also available to expand Xarray's capabilities for diverse use cases. Xarray is versatile and supports a broad range of applications. It's also easy to extend with custom features. ## Example dataset To understand how Xarray functions, below is a quick look at a sample dataset as it might be retrieved from a [Tilebox datasets](/datasets/concepts/datasets) client. ```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() collection = datasets.open_data.copernicus.landsat8_oli_tirs.collection("L1GT") satellite_data = collection.load(("2022-05-01", "2022-06-01"), show_progress=True) print(satellite_data) ``` ```plaintext Output Size: 305kB Dimensions: (time: 514, latlon: 2) Coordinates: ingestion_time (time) datetime64[ns] 4kB 2024-07-22T09:06:43.5586... id (time) ... (remaining coordinates and data variables truncated) ``` This basic dataset illustrates common use cases for Xarray. To follow along, you can [download the dataset as a NetCDF file][example_satellite_data.nc]. The [Reading and writing files section](/sdks/python/xarray#reading-and-writing-files) explains how to save and load Xarray datasets from NetCDF files. Here's an overview of the output: * The `satellite_data` **dataset** contains **dimensions**, **coordinates**, and **variables**. * The `time` **dimension** has 514 elements, indicating that there are 514 data points in the dataset. * The `time` **dimension coordinate** contains datetime values representing when the data was measured. The `*` indicates a dimension coordinate, which enables label-based indexing and alignment.
* The `ingestion_time` **non-dimension coordinate** holds datetime values for when the data was ingested into Tilebox. Non-dimension coordinates carry coordinate data but are not used for label-based indexing. They can even be [multidimensional](https://docs.xarray.dev/en/stable/examples/multidimensional-coords.html). * The dataset includes 28 **variables**. * The `bands` **variable** contains integers indicating how many bands the data contains. * The `sun_elevation` **variable** contains floating-point values representing the sun's elevation when the data was measured. Explore the [xarray terminology overview](https://docs.xarray.dev/en/stable/user-guide/terminology.html) to broaden your understanding of **datasets**, **dimensions**, **coordinates**, and **variables**. ## Accessing data in a dataset ### By index You can access data in different ways. The Xarray documentation offers a [comprehensive overview](https://docs.xarray.dev/en/stable/user-guide/indexing.html) of these methods. To access the `sun_elevation` variable: ```python Accessing values # Print the first sun elevation value print(satellite_data.sun_elevation[0]) ``` ```plaintext Output Size: 8B array(44.19904463) Coordinates: ingestion_time datetime64[ns] 8B 2024-07-22T09:06:43.558629 id ... (output truncated) ``` You can also filter a variable with a boolean condition. For example, to select all sun elevation values between 45 and 90 degrees: ```python Filtering by sun elevation data_filter = ( (satellite_data.sun_elevation > 45) & (satellite_data.sun_elevation < 90) ) filtered_sun_elevations = satellite_data.sun_elevation[data_filter] print(filtered_sun_elevations) ``` ```plaintext Output Size: 216B array([63.89629314, 63.35038654, ..., 64.37400345, 64.37400345]) Coordinates: ingestion_time (time) datetime64[ns] 216B 2024-07-22T09:06:43.558629 ...... id (time) ... (output truncated) ``` ### Selecting data by value Selecting data by value requires unique coordinate values. In case of duplicates, you will encounter an `InvalidIndexError`. To avoid this, you can [drop duplicates](#dropping-duplicates). ```python Indexing by time specific_datapoint = satellite_data.sel(time="2022-05-01T11:28:28.249000") print(specific_datapoint) ``` ```plaintext Output Size: 665B Dimensions: (latlon: 2) Coordinates: ingestion_time datetime64[ns] 8B 2024-07-22T09:06:43.558629 id ... (output truncated) ``` Selecting a timestamp that is not present in the dataset raises an error: `satellite_data.sel(time="2022-05-01T11:28:28.000000")` raises `KeyError: "2022-05-01T11:28:28.000000"`. To return the closest value instead of raising an error, specify a `method`. ```python Finding the closest data point nearest_datapoint = satellite_data.sel(time="2022-05-01T11:28:28.000000", method="nearest") assert nearest_datapoint.equals(specific_datapoint) # passes ``` ## Dropping duplicates Xarray allows you to drop duplicate values from a dataset. For example, to drop duplicate timestamps: ```python Dropping duplicates deduped = satellite_data.drop_duplicates("time") ``` De-duped datasets are required for certain operations, like [selecting data by value](#selecting-data-by-value). ## Statistics Xarray and NumPy include a wide range of statistical functions that you can apply to a dataset or DataArray.
Here are some examples: ```python Computing dataset statistics cloud_cover = satellite_data.cloud_cover min_meas = cloud_cover.min().item() max_meas = cloud_cover.max().item() mean_meas = cloud_cover.mean().item() std_meas = cloud_cover.std().item() print(f"Cloud cover ranges from {min_meas:.2f} to {max_meas:.2f} with a mean of {mean_meas:.2f} and a standard deviation of {std_meas:.2f}") ``` ```plaintext Output Cloud cover ranges from 0.00 to 100.00 with a mean of 76.48 and a standard deviation of 34.17 ``` You can also directly apply many NumPy functions to datasets or DataArrays. For example, to find out how many unique bands the data contains, use [np.unique](https://numpy.org/doc/stable/reference/generated/numpy.unique.html): ```python Finding unique values import numpy as np print("Bands:", np.unique(satellite_data.bands)) ``` ```plaintext Output Bands: [12] ``` ## Reading and writing files Xarray provides simple methods for saving and loading datasets from files. This is useful for sharing your data or storing it for future use. Xarray supports many different file formats, including NetCDF, Zarr, GRIB, and more. For a complete list of supported formats, refer to the [official documentation page](https://docs.xarray.dev/en/stable/user-guide/io.html). To save the example dataset as a NetCDF file (you may need to install the `netcdf4` package first): ```python Saving a dataset to a file satellite_data.to_netcdf("example_satellite_data.nc") ``` This creates a file named `example_satellite_data.nc` in your current directory. You can then load this file back into memory with: ```python Loading a dataset from a file import xarray as xr satellite_data = xr.open_dataset("example_satellite_data.nc") ``` If you would like to follow along with the examples in this section, you can download the example dataset as a NetCDF file [here][example_satellite_data.nc]. ## Further reading This section covers only a few common use cases for Xarray. The library offers many more functions and features. For more information, please see the [Xarray documentation](https://xarray.pydata.org/en/stable/) or explore the [Xarray Tutorials](https://tutorial.xarray.dev/intro.html). # Storage Clients Source: https://docs.tilebox.com/storage/clients Learn about the different storage clients available in Tilebox to access open data. Tilebox does not host the actual open data satellite products but instead relies on publicly accessible storage providers for data access. Tilebox ingests available metadata as [datasets](/datasets/concepts/datasets) to enable high-performance querying and structured access to the data as [xarray.Dataset](/sdks/python/xarray). Below is a list of the storage providers currently supported by Tilebox. This feature is only available in the Python SDK. ## Copernicus Data Space The [Copernicus Data Space](https://dataspace.copernicus.eu/) is an open ecosystem that provides free instant access to data and services from the Copernicus Sentinel missions. Check out the [Copernicus Data Space datasets](/datasets/open-data#copernicus-data-space) that are available in Tilebox.
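Before downloading any products, it can help to see which collections of an ingested Copernicus dataset are available through the Tilebox datasets client. The snippet below is a minimal sketch assuming the `sentinel2_msi` open data dataset used in the example that follows; swap in any other Copernicus dataset you want to explore.

```python Listing available Copernicus collections
from tilebox.datasets import Client

# Query the ingested Copernicus metadata through the Tilebox datasets client
client = Client()
datasets = client.datasets()

# List the collections of the Sentinel-2 MSI dataset
# (the same dataset is used in the download example below)
collections = datasets.open_data.copernicus.sentinel2_msi.collections()
print(collections)
```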
### Access Copernicus data To download data products from the Copernicus Data Space after querying them via the Tilebox API, you need to [create an account](https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/auth?client_id=cdse-public\&response_type=code\&scope=openid\&redirect_uri=https%3A//dataspace.copernicus.eu/account/confirmed/1) and then generate [S3 credentials here](https://eodata-s3keysmanager.dataspace.copernicus.eu/panel/s3-credentials). The following code snippet demonstrates how to query and download Copernicus data using the Tilebox Python SDK. ```python Python {4,9-13,27} from pathlib import Path from tilebox.datasets import Client from tilebox.storage import CopernicusStorageClient # Creating clients client = Client() datasets = client.datasets() storage_client = CopernicusStorageClient( access_key="YOUR_ACCESS_KEY", secret_access_key="YOUR_SECRET_ACCESS_KEY", cache_directory=Path("./data") ) # Choosing the dataset and collection s2_dataset = datasets.open_data.copernicus.sentinel2_msi collections = s2_dataset.collections() collection = collections["S2A_S2MSI2A"] # Loading metadata s2_data = collection.load(("2024-08-01", "2024-08-02"), show_progress=True) # Selecting a data point to download selected = s2_data.isel(time=0) # index 0 selected # Downloading the data downloaded_data = storage_client.download(selected) print(f"Downloaded granule: {downloaded_data.name} to {downloaded_data}") print("Contents: ") for content in downloaded_data.iterdir(): print(f" - {content.relative_to(downloaded_data)}") ``` ```plaintext Output Downloaded granule: S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544.SAFE to data/Sentinel-2/MSI/L2A/2024/08/01/S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544.SAFE Contents: - manifest.safe - GRANULE - INSPIRE.xml - MTD_MSIL2A.xml - DATASTRIP - HTML - rep_info - S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544-ql.jpg ``` ### Partial product downloads For cases where only a subset of the available file objects for a product is needed, you may restrict your download to just that subset. First, list available objects using `list_objects`, filter them, and then download using `download_objects`. For example, a Sentinel-2 L2A product includes many files such as metadata, different bands in multiple resolutions, masks, and quicklook images. The following example shows how to download only specific files from a Sentinel-2 L2A product. ```python Python {4, 15} collection = datasets.open_data.copernicus.sentinel2_msi.collections()["S2A_S2MSI2A"] s2_data = collection.load(("2024-08-01", "2024-08-02"), show_progress=True) selected = s2_data.isel(time=0) # download the first granule in the given time range objects = storage_client.list_objects(selected) print(f"Granule {selected.granule_name.item()} consists of {len(objects)} individual objects.") # only select specific objects to download want_products = ["B02_10m", "B03_10m", "B08_10m"] objects = [obj for obj in objects if any(prod in obj for prod in want_products)] # remove all other objects print(f"Downloading {len(objects)} objects.") for obj in objects: print(f" - {obj}") # Finally, download the selected data downloaded_data = storage_client.download_objects(selected, objects) ``` ```plaintext Output Granule S2A_MSIL2A_20240801T002611_N0511_R102_T58WET_20240819T170544.SAFE consists of 95 individual objects. Downloading 3 objects. 
- GRANULE/L2A_T58WET_A047575_20240801T002608/IMG_DATA/R10m/T58WET_20240801T002611_B02_10m.jp2 - GRANULE/L2A_T58WET_A047575_20240801T002608/IMG_DATA/R10m/T58WET_20240801T002611_B03_10m.jp2 - GRANULE/L2A_T58WET_A047575_20240801T002608/IMG_DATA/R10m/T58WET_20240801T002611_B08_10m.jp2 ``` ## Alaska Satellite Facility (ASF) The [Alaska Satellite Facility (ASF)](https://asf.alaska.edu/) is a NASA-funded research center at the University of Alaska Fairbanks. Check out the [ASF Open Data datasets](/datasets/open-data#alaska-satellite-facility-asf) that are available in Tilebox. ### Accessing ASF Data You can query ASF metadata without needing an account, as Tilebox has indexed and ingested the relevant metadata. To access and download the actual satellite products, you will need an ASF account. You can create an ASF account in the [ASF Vertex Search Tool](https://search.asf.alaska.edu/). The following code snippet demonstrates how to query and download ASF data using the Tilebox Python SDK. ```python Python {4,9-13,27} from pathlib import Path from tilebox.datasets import Client from tilebox.storage import ASFStorageClient # Creating clients client = Client() datasets = client.datasets() storage_client = ASFStorageClient( user="YOUR_ASF_USER", password="YOUR_ASF_PASSWORD", cache_directory=Path("./data") ) # Choosing the dataset and collection ers_dataset = datasets.open_data.asf.ers_sar collections = ers_dataset.collections() collection = collections["ERS-2"] # Loading metadata ers_data = collection.load(("2009-01-01", "2009-01-02"), show_progress=True) # Selecting a data point to download selected = ers_data.isel(time=0) # index 0 selected # Downloading the data downloaded_data = storage_client.download(selected, extract=True) print(f"Downloaded granule: {downloaded_data.name} to {downloaded_data}") print("Contents: ") for content in downloaded_data.iterdir(): print(f" - {content.relative_to(downloaded_data)}") ``` ```plaintext Output Downloaded granule: E2_71629_STD_L0_F183 to data/ASF/E2_71629_STD_F183/E2_71629_STD_L0_F183 Contents: - E2_71629_STD_L0_F183.000.vol - E2_71629_STD_L0_F183.000.meta - E2_71629_STD_L0_F183.000.raw - E2_71629_STD_L0_F183.000.pi - E2_71629_STD_L0_F183.000.nul - E2_71629_STD_L0_F183.000.ldr ``` ### Further Reading ## Umbra Space [Umbra](https://umbra.space/) satellites provide high resolution Synthetic Aperture Radar (SAR) imagery from space. Check out the [Umbra datasets](/datasets/open-data#umbra-space) that are available in Tilebox. ### Accessing Umbra data No account is needed to access Umbra data. All data is under a Creative Commons License (CC BY 4.0), allowing you to use it freely. The following code snippet demonstrates how to query and download Umbra data using the Tilebox Python SDK. 
```python Python {4,9,23} from pathlib import Path from tilebox.datasets import Client from tilebox.storage import UmbraStorageClient # Creating clients client = Client() datasets = client.datasets() storage_client = UmbraStorageClient(cache_directory=Path("./data")) # Choosing the dataset and collection umbra_dataset = datasets.open_data.umbra.sar collections = umbra_dataset.collections() collection = collections["SAR"] # Loading metadata umbra_data = collection.load(("2024-01-05", "2024-01-06"), show_progress=True) # Selecting a data point to download selected = umbra_data.isel(time=0) # index 0 selected # Downloading the data downloaded_data = storage_client.download(selected) print(f"Downloaded granule: {downloaded_data.name} to {downloaded_data}") print("Contents: ") for content in downloaded_data.iterdir(): print(f" - {content.relative_to(downloaded_data)}") ``` ```plaintext Output Downloaded granule: 2024-01-05-01-53-37_UMBRA-07 to data/Umbra/ad hoc/Yi_Sun_sin_Bridge_SK/6cf02931-ca2e-4744-b389-4844ddc701cd/2024-01-05-01-53-37_UMBRA-07 Contents: - 2024-01-05-01-53-37_UMBRA-07_SIDD.nitf - 2024-01-05-01-53-37_UMBRA-07_SICD.nitf - 2024-01-05-01-53-37_UMBRA-07_CSI-SIDD.nitf - 2024-01-05-01-53-37_UMBRA-07_METADATA.json - 2024-01-05-01-53-37_UMBRA-07_GEC.tif - 2024-01-05-01-53-37_UMBRA-07_CSI.tif ``` ### Partial product downloads For cases where only a subset of the available file objects for a given Umbra data point is necessary, you can limit your download to just that subset. First, list available objects using `list_objects`, filter the list, and then use `download_objects`. The below example shows how to download only the metadata file for a given data point. ```python Python {4, 15} collection = datasets.open_data.umbra.sar.collections()["SAR"] umbra_data = collection.load(("2024-01-05", "2024-01-06"), show_progress=True) # Selecting a data point to download selected = umbra_data.isel(time=0) # index 0 selected objects = storage_client.list_objects(selected) print(f"Data point {selected.granule_name.item()} consists of {len(objects)} individual objects.") # only select specific objects to download objects = [obj for obj in objects if "METADATA" in obj] # remove all other objects print(f"Downloading {len(objects)} object.") print(objects) # Finally, download the selected data downloaded_data = storage_client.download_objects(selected, objects) ``` ```plaintext Output Data point 2024-01-05-01-53-37_UMBRA-07 consists of 6 individual objects. Downloading 1 object. ['2024-01-05-01-53-37_UMBRA-07_METADATA.json'] ``` # Caches Source: https://docs.tilebox.com/workflows/caches Sharing data between tasks is crucial for workflows, especially in satellite imagery processing, where large datasets are the norm. Tilebox Workflows offers a straightforward API for storing and retrieving data from a shared cache. The cache API is currently experimental and may undergo changes in the future. Many more features and new [backends](#cache-backends) are on the roadmap. There might be breaking changes to the Cache API in the future. Caches are configured at the [task runner](/workflows/concepts/task-runners) level. Because task runners can be deployed across multiple locations, caches must be accessible from all task runners contributing to a workflow. Currently, the default cache implementation uses a Google Cloud Storage bucket, providing a scalable method to share data between tasks. For quick prototyping and local development, you can also use a local file system cache, which is included by default. 
If needed, you can create your own cache backend by implementing the `Cache` interface. ## Configuring a Cache You can configure a cache while creating a task runner by passing a cache instance to the `cache` parameter. To use an in-memory cache, use `tilebox.workflows.cache.InMemoryCache`. This implementation is helpful for local development and quick testing. For alternatives, see the supported [cache backends](#cache-backends). ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import InMemoryCache client = Client() runner = client.runner( tasks=[...], cache=InMemoryCache(), ) ``` By configuring such a cache, the `context` object that is passed to the execution of each task gains access to a `job_cache` attribute that can be used to [store and retrieve data](#storing-and-retrieving-data) from the cache. ## Cache Backends Tilebox Workflows comes with four cache implementations out of the box, each backed by a different storage system. ### Google Storage Cache A cache implementation backed by a Google cloud Storage bucket to store and retrieve data. It's a reliable method for sharing data across tasks, suitable for production environments. You will need access to a GCP project and a bucket. The Tilebox Workflow orchestrator uses the official Python Client for Google Cloud Storage. To set up authentication, refer to the official documentation. ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import GoogleStorageCache from google.cloud.storage import Client as StorageClient storage_client = StorageClient(project="gcp-project") bucket = storage_client.bucket("cache-bucket") client = Client() runner = client.runner( tasks=[...], cache=GoogleStorageCache(bucket, prefix="jobs"), ) ``` The `prefix` parameter is optional and can be used to set a common prefix for all cache keys, which helps organize objects within a bucket when re-using the same bucket for other purposes. ### Amazon S3 Cache A cache implementation backed by an Amazon S3 bucket to store and retrieve data. Like the Google Cloud Storage option, it's reliable and scalable for production use. Tilebox utilizes the `boto3` library to communicate with Amazon S3. For the necessary authentication setup, refer to [its documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration). ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import AmazonS3Cache client = Client() runner = client.runner( tasks=[...], cache=AmazonS3Cache("my-bucket-name", prefix="jobs") ) ``` The `prefix` parameter is optional and can be used to set a common prefix for all cache keys, which helps organize objects within a bucket when re-using the same bucket for other purposes. ### Local File System Cache A cache implementation backed by a local file system. It's suitable for quick prototyping and local development, assuming all task runners share the same machine or access the same file system. ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import LocalFileSystemCache client = Client() runner = client.runner( tasks=[...], cache=LocalFileSystemCache("/path/to/cache/directory"), ) ``` Local file system caches can also be used in conjunction with network attached storage or similar tools, making it a viable option also for distributed setups. ### In-Memory Cache A simple in-memory cache useful for quick prototyping and development. 
The data is not shared between task runners and is lost upon task runner restarts. Use this cache only for workflows executed on a single task runner. ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import InMemoryCache client = Client() runner = client.runner( tasks=[...], cache=InMemoryCache(), ) ``` ## Data Isolation Caches are isolated per job, meaning that each job's cache data is only accessible to tasks within that job. This setup prevents key conflicts between different job executions. Currently, accessing cache data of other jobs is not supported. The capability to share data across jobs is planned for future updates. This feature will be beneficial for real-time processing workflows or workflows requiring auxiliary data from external sources. ## Storing and Retrieving Data The job cache can be accessed via the `ExecutionContext` passed to a task's `execute` function. This [`job_cache`](/api-reference/python/tilebox.workflows/ExecutionContext.job_cache) object provides methods to handle data storage and retrieval from the cache. The specifics of data storage depend on the chosen cache backend. The cache API is designed to be simple and can handle all types of data, supporting binary data in the form of `bytes`, identified by `str` cache keys. This allows for storing many different data types, such as pickled Python objects, serialized JSON, UTF-8 encoded strings, or raw binary data. The following snippet illustrates storing and retrieving data from the cache. ```python Python from tilebox.workflows import Task, ExecutionContext class ProducerTask(Task): def execute(self, context: ExecutionContext) -> None: # store data in the cache context.job_cache["data"] = b"my_binary_data_to_store" context.submit_subtask(ConsumerTask()) class ConsumerTask(Task): def execute(self, context: ExecutionContext) -> None: data = context.job_cache["data"] print(f"Read {data} from cache") ``` In this example, data stored under the key `"data"` can be any size that fits the cache backend constraints. Ensure the key remains unique within the job's scope to avoid conflicts. To test the workflow, you can start a local task runner using the `LocalFileSystemCache` backend. Then, submit a job to execute the `ProducerTask` and observe the output of the `ConsumerTask`. ```python Python # submit a job to test our workflow job_client = client.jobs() job_client.submit("testing-cache-access", ProducerTask()) # start a runner to execute it runner = client.runner( tasks=[ProducerTask, ConsumerTask], cache=LocalFileSystemCache("/path/to/cache/directory"), ) runner.run_forever() ``` ```plaintext Output Read b'my_binary_data_to_store' from cache ``` ## Groups and Hierarchical Keys The cache API supports groups and hierarchical keys, analogous to directories and files in a file system. Groups help organize cache keys hierarchically, preventing key conflicts and allowing data to be structured more clearly. Additionally, groups are iterable, enabling retrieval of all keys within a group. This feature is useful when multiple tasks create cache data, and a later task needs to list all data produced by earlier tasks. The following code shows an example of how cache groups can be used.
```python Python {39,44-45} from tilebox.workflows import Task, ExecutionContext import random class CacheGroupDemo(Task): n: int def execute(self, context: ExecutionContext): # define a cache group key for subtasks group_key = "random_numbers" produce_numbers = context.submit_subtask( ProduceRandomNumbers(self.n, group_key) ) sum_task = context.submit_subtask( PrintSum(group_key), depends_on=[produce_numbers], ) class ProduceRandomNumbers(Task): n: int group_key: str def execute(self, context: ExecutionContext): for i in range(self.n): context.submit_subtask(ProduceRandomNumber(i, self.group_key)) class ProduceRandomNumber(Task): index: int group_key: str def execute(self, context: ExecutionContext) -> None: number = random.randint(0, 100) group = context.job_cache.group(self.group_key) group[f"key_{self.index}"] = str(number).encode() class PrintSum(Task): group_key: str def execute(self, context: ExecutionContext) -> None: group = context.job_cache.group(self.group_key) # PrintSum doesn't know how many numbers were produced, # instead it iterates over all keys in the cache group numbers = [] for key in group: # iterate over all stored numbers number = group[key] # read data from cache numbers.append(int(number.decode())) # convert bytes back to int print(f"Sum of all numbers: {sum(numbers)}") ``` Submitting a job with `CacheGroupDemo` as its root task and running it with a task runner can be done as follows: ```python Python # submit a job to test our workflow job_client = client.jobs() job_client.submit("cache-groups", CacheGroupDemo(5)) # start a runner to execute it runner = client.runner( tasks=[CacheGroupDemo, ProduceRandomNumbers, ProduceRandomNumber, PrintSum], cache=LocalFileSystemCache("/path/to/cache/directory"), ) runner.run_forever() ``` ```plaintext Output Sum of all numbers: 284 ``` # Clusters Source: https://docs.tilebox.com/workflows/concepts/clusters Clusters are a logical grouping for [task runners](/workflows/concepts/task-runners). Using clusters, you can scope certain tasks to a specific group of task runners. Tasks, which are always submitted to a specific cluster, are only executed on task runners assigned to the same cluster. ## Use Cases Use clusters to organize [task runners](/workflows/concepts/task-runners) into logical groups, which can help with: * Targeting specific task runners for a particular job * Reserving a group of task runners for specific purposes, such as running certain types of batch jobs * Setting up different clusters for different environments (like development and production) Task runners within the same cluster may still differ in their capabilities, for example in which tasks they have registered. If multiple task runners share the same set of registered tasks, you can assign them to different clusters to target specific task runners for a particular job. ### Adding Task Runners to a Cluster You can add task runners to a cluster by specifying the [cluster's slug](#cluster-slug) when [registering a task runner](/workflows/concepts/task-runners). Each task runner must always be assigned to a cluster. ## Default Cluster Each team has a default cluster that is automatically created for them. This cluster is used when no cluster is specified when [registering a task runner](/workflows/concepts/task-runners) or [submitting a job](/workflows/concepts/jobs). This is useful when you are just getting started and don't need to create any custom clusters yet.
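As a quick illustration of the default cluster behavior described above, the sketch below registers a task runner and submits a job without passing any cluster slug, so both fall back to the team's default cluster. `MyTask` stands in for one of your own task classes.

```python Using the default cluster
from tilebox.workflows import Client

from my_workflow import MyTask  # placeholder for one of your own tasks

client = Client()

# no cluster slug given: the runner joins the team's default cluster
runner = client.runner(tasks=[MyTask])

# no cluster slug given: the job's root task is submitted to the default cluster
job_client = client.jobs()
job = job_client.submit("my-job", MyTask())
```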
## Managing Clusters To target a custom cluster when registering a task runner or submitting a job, you first need to create that cluster. You can also list, fetch, and delete clusters as needed. The following sections explain how to do this. To manage clusters, first instantiate a cluster client using the `clusters` method in the workflows client. ```python Python from tilebox.workflows import Client client = Client() clusters = client.clusters() ``` ```go Go import "github.com/tilebox/tilebox-go/workflows/v1" client := workflows.NewClient() clusterClient := client.Clusters ``` ### Creating a Cluster To create a cluster, use the `create` method on the cluster client and provide a name for the cluster. ```python Python cluster = clusters.create("testing") print(cluster) ``` ```go Go cluster, err := client.Clusters.Create(ctx, "testing") if err != nil { slog.Error("failed to create cluster", slog.Any("error", err)) return } fmt.Println(cluster) ``` ```plaintext Python Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing') ``` ```go Go &{testing-CvufcSxcC9SKfe testing} ``` ### Cluster Slug Each cluster has a unique identifier, combining the cluster's name and an automatically generated identifier. Use this slug to reference the cluster in other operations, like submitting a job or subtasks. ### Listing Clusters To list all available clusters, use the `all` method: ```python Python all_clusters = clusters.all() print(all_clusters) ``` ```go Go clusters, err := client.Clusters.List(ctx) if err != nil { slog.Error("failed to list clusters", slog.Any("error", err)) return } for _, cluster := range clusters { fmt.Println(cluster) } ``` ```plaintext Python [Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing'), Cluster(slug='production-EifhUozDpwAJDL', display_name='Production')] ``` ```go Go &{testing-CvufcSxcC9SKfe testing} &{production-EifhUozDpwAJDL Production} ``` ### Fetching a Specific Cluster To fetch a specific cluster, use the `find` method and pass the cluster's slug: ```python Python cluster = clusters.find("testing-CvufcSxcC9SKfe") print(cluster) ``` ```go Go cluster, err := client.Clusters.Get(ctx, "testing-CvufcSxcC9SKfe") if err != nil { slog.Error("failed to get cluster", slog.Any("error", err)) return } fmt.Println(cluster) ``` ```plaintext Python Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing') ``` ```go Go &{testing-CvufcSxcC9SKfe testing} ``` ### Deleting a Cluster To delete a cluster, use the `delete` method and pass the cluster's slug: ```python Python clusters.delete("testing-CvufcSxcC9SKfe") ``` ```go Go err := client.Clusters.Delete(ctx, "testing-CvufcSxcC9SKfe") ``` ## Jobs Across Different Clusters When [submitting a job](/workflows/concepts/jobs), you can specify which cluster the job's root task should be executed on. This allows you to direct the job to a specific set of task runners. By default, all sub-tasks within a job are also submitted to the same cluster, but this can be overridden to submit sub-tasks to different clusters if needed. See the example below for a job that spans across multiple clusters.
```python Python from tilebox.workflows import Task, ExecutionContext, Client class MultiCluster(Task): def execute(self, context: ExecutionContext) -> None: # this submits a task to the same cluster as the one currently executing this task same_cluster = context.submit_subtask(DummyTask()) other_cluster = context.submit_subtask( DummyTask(), # this task runs only on a task runner in the "other-cluster" cluster cluster="other-cluster-As3dcSb3D9SAdK", # dependencies can be specified across clusters depends_on=[same_cluster], ) class DummyTask(Task): def execute(self, context: ExecutionContext) -> None: pass # submit a job to the "testing" cluster client = Client() job_client = client.jobs() job = job_client.submit( "my-job", MultiCluster(), cluster="testing-CvufcSxcC9SKfe", ) ``` ```go Go package main import ( "context" "github.com/tilebox/tilebox-go/workflows/v1" "github.com/tilebox/tilebox-go/workflows/v1/subtask" ) type MultiCluster struct{} func (t *MultiCluster) Execute(ctx context.Context) error { // this submits a task to the same cluster as the one currently executing this task sameCluster, err := workflows.SubmitSubtask(ctx, &DummyTask{}) if err != nil { return err } otherCluster, err := workflows.SubmitSubtask( ctx, &DummyTask{}, // this task runs only on a task runner in the "other-cluster" cluster subtask.WithClusterSlug("other-cluster-As3dcSb3D9SAdK"), // dependencies can be specified across clusters subtask.WithDependencies(sameCluster), ) if err != nil { return err } _ = otherCluster return nil } type DummyTask struct{} func main() { ctx := context.Background() client := workflows.NewClient() // submit a job to the "testing" cluster _, _ = client.Jobs.Submit( ctx, "my-job", []workflows.Task{ &MultiCluster{}, }, job.WithClusterSlug("testing-CvufcSxcC9SKfe"), ) } ``` This workflow requires at least two task runners to complete. One must be in the "testing" cluster, and the other must be in the "other-cluster" cluster. If no task runners are available in the "other-cluster," the task submitted to that cluster will remain queued until a task runner is available. It won't execute on a task runner in the "testing" cluster, even if the task runner has the `DummyTask` registered. # Jobs Source: https://docs.tilebox.com/workflows/concepts/jobs A job is a specific execution of a workflow with designated input parameters. It consists of one or more tasks that can run in parallel or sequentially, based on their dependencies. Submitting a job involves creating a root task with specific input parameters, which may trigger the execution of other tasks within the same job. ## Submission To execute a [task](/workflows/concepts/tasks), it must be initialized with concrete inputs and submitted as a job. The task will then run within the context of the job, and if it generates sub-tasks, those will also execute as part of the same job. After submitting a job, the root task is scheduled for execution, and any [eligible task runner](/workflows/concepts/task-runners#task-selection) can pick it up and execute it. First, instantiate a job client by calling the `jobs` method on the workflow client. ```python Python from tilebox.workflows import Client client = Client() job_client = client.jobs() ``` ```go Go import "github.com/tilebox/tilebox-go/workflows/v1" client := workflows.NewClient() jobClient := client.Jobs ``` After obtaining a job client, submit a job using the [submit](/api-reference/python/tilebox.workflows/JobClient.submit) method. 
You need to provide a name for the job, an instance of the root [task](/workflows/concepts/tasks), and an optional [cluster](/workflows/concepts/clusters) to execute the root task on. ```python Python # import your own workflow from my_workflow import MyTask job = job_client.submit('my-job', MyTask("some", "parameters")) ``` ```go Go job, err := client.Jobs.Submit(ctx, "my-job", []workflows.Task{ &MyTask{ Some: "parameters", }, }, ) if err != nil { slog.Error("Failed to submit job", slog.Any("error", err)) return } ``` Once a job is submitted, it's immediately scheduled for execution. The root task will be picked up and executed as soon as an [eligible task runner](/workflows/concepts/task-runners#task-selection) is available. ## Retry Handling [Tasks support retry handling](/workflows/concepts/tasks#retry-handling) for failed executions. This applies to the root task of a job as well, where you can specify the number of retries using the `max_retries` argument of the `submit` method. ```python Python from my_workflow import MyFlakyTask job = job_client.submit('my-job', MyFlakyTask(), max_retries=5) ``` ```go Go myJob, err := client.Jobs.Submit(ctx, "my-job", []workflows.Task{ &MyFlakyTask{}, }, job.WithMaxRetries(5), ) ``` In this example, if `MyFlakyTask` fails, it will be retried up to five times before being marked as failed. ## Submitting to a specific cluster Jobs default to running on the [default cluster](/workflows/concepts/clusters#default-cluster). You can specify another cluster to run the root task on using the `cluster` argument of the `submit` method. ```python Python from my_workflow import MyFlakyTask job = job_client.submit('my-job', MyFlakyTask(), cluster="dev-cluster") ``` ```go Go myJob, err := client.Jobs.Submit(ctx, "my-job", []workflows.Task{ &MyFlakyTask{}, }, job.WithClusterSlug("dev-cluster"), ) ``` Only runners listening on the specified cluster can pick up the task. ## Querying jobs You can query jobs in a given time range using the `query` method on the job client. ```python Python jobs = job_client.query(("2025-01-01", "2025-02-01")) print(jobs) ``` ```go Go import ( "time" workflows "github.com/tilebox/tilebox-go/workflows/v1" "github.com/tilebox/tilebox-go/workflows/v1/job" "github.com/tilebox/tilebox-go/query" ) interval := query.NewTimeInterval( time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2025, 2, 1, 0, 0, 0, 0, time.UTC), ) jobs, err := workflows.Collect(client.Jobs.Query(ctx, job.WithTemporalExtent(interval), )) if err != nil { slog.Error("Failed to query jobs", slog.Any("error", err)) return } for _, job := range jobs { fmt.Println(job) } ``` ## Retrieving a specific job When you submit a job, it's assigned a unique identifier that can be used to retrieve it later. You can use the `find` method on the job client to get a job by its ID. 
```python Python job = job_client.submit('my-job', MyTask("some", "parameters")) print(job.id) # 018dd029-58ca-74e5-8b58-b4f99d610f9a # Later, in another process or machine, retrieve job info job = job_client.find("018dd029-58ca-74e5-8b58-b4f99d610f9a") ``` ```go Go myJob, err := client.Jobs.Submit(ctx, "my-job", []workflows.Task{ &helloworld.HelloTask{ Some: "parameters", }, }, ) if err != nil { slog.Error("Failed to submit job", slog.Any("error", err)) return } // 018dd029-58ca-74e5-8b58-b4f99d610f9a slog.Info("Job submitted", slog.String("job_id", myJob.ID.String())) // Later, in another process or machine, retrieve job info job, err := client.Jobs.Get(ctx, uuid.MustParse("018dd029-58ca-74e5-8b58-b4f99d610f9a")) ``` `find` is also a useful tool for fetching a jobs state after a while, to check if it's still running or has already completed. ## States A job can be in one of the following states: * `QUEUED`: the job is queued and waiting for execution * `STARTED`: at least one task of the job has been started * `COMPLETED`: all tasks of the job have been completed ```python Python from tilebox.workflows.data import JobState job = job_client.find("018dd029-58ca-74e5-8b58-b4f99d610f9a") print("Job is queued:", job.state == JobState.QUEUED) ``` ```go Go job, err := client.Jobs.Get(ctx, uuid.MustParse("018dd029-58ca-74e5-8b58-b4f99d610f9a")) fmt.Println("Job is queued:", job.State == workflows.JobQueued) ``` ```plaintext Output Job is queued: True ``` ## Visualization Visualizing the execution of a job can be helpful. The Tilebox workflow orchestrator tracks all tasks in a job, including [sub-tasks](/workflows/concepts/tasks#task-composition-and-subtasks) and [dependencies](/workflows/concepts/tasks#dependencies). This enables the visualization of the execution of a job as a graph diagram. `display` is designed for use in an [interactive environment](/sdks/python/sample-notebooks#interactive-environments) such as a Jupyter notebook. In non-interactive environments, use [visualize](/api-reference/python/tilebox.workflows/JobClient.visualize), which returns the rendered diagram as an SVG string. Visualization isn't supported in Go yet. ```python Python job = job_client.find("some-job-id") # or a recently submitted job # Then visualize it job_client.display(job) ``` The following diagram represents the job execution as a graph. Each task is shown as a node, with edges indicating sub-task relationships. The diagram also uses color coding to display the status of each task. Color coding of task states Color coding of task states The color codes for task states are: | Task State | Color | Description | | ---------- | -------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------- | | Queued | SalmonYellow | The task is queued and waiting for execution. | | Running | Blue | The task is currently being executed. | | Computed | Green | The task has successfully been computed. If a task is computed, and all it's sub-tasks are also computed, the task is considered completed. | | Failed | Red | The task has been executed but encountered an error. | Below is another visualization of a job currently being executed by multiple task runners. Job being executed by multiple runners Job being executed by multiple runners This visualization shows: * The root task, `MyTask`, has executed and spawned three sub-tasks. 
* At least three task runners are available, as three tasks currently are executed simultaneously. * The `SubTask` that is still executing has not generated any sub-tasks yet, as sub-tasks are queued for execution only after the parent task finishes and becomes computed. * The queued `DependentTask` requires the `LeafTask` to complete before it can be executed. Job visualizations are meant for development and debugging. They are not suitable for large jobs with hundreds of tasks, as the diagrams may become too complex. Currently, visualizations are limited to jobs with a maximum of 200 tasks. ### Customizing Task Display Names The text representing a task in the diagram defaults to a tasks class name. You can customize this by modifying the `display` field of the `current_task` object in the task's execution context. The maximum length for a display name is 1024 characters, with any overflow truncated. Line breaks using `\n` are supported as well. ```python Python from tilebox.workflows import Task, ExecutionContext class RootTask(Task): num_subtasks: int def execute(self, context: ExecutionContext): context.current_task.display = f"Root({self.num_subtasks})" for i in range(self.num_subtasks): context.submit_subtask(SubTask(i)) class SubTask(Task): index: int def execute(self, context: ExecutionContext): context.current_task.display = f"Leaf Nr. {self.index}" job = job_client.submit('custom-display-names', RootTask(3)) job_client.display(job) ``` ```go Go type RootTask struct { NumSubtasks int } func (t *RootTask) Execute(ctx context.Context) error { err := workflows.SetTaskDisplay(ctx, fmt.Sprintf("Root(%d)", t.NumSubtasks)) if err != nil { return fmt.Errorf("failed to set task display: %w", err) } for i := range t.NumSubtasks { _, err := workflows.SubmitSubtask(ctx, &SubTask{Index: i}) if err != nil { return fmt.Errorf("failed to submit subtask: %w", err) } } return nil } type SubTask struct { Index int } func (t *SubTask) Execute(ctx context.Context) error { err := workflows.SetTaskDisplay(ctx, fmt.Sprintf("Leaf Nr. %d", t.Index)) if err != nil { return fmt.Errorf("failed to set task display: %w", err) } return nil } // in main job, err := client.Jobs.Submit(ctx, "custom-display-names", []workflows.Task{&RootTask{ NumSubtasks: 3, }}, ) ``` Customize Tasks Display Names Customize Tasks Display Names ## Cancellation You can cancel a job at any time. When a job is canceled, no queued tasks will be picked up by task runners and executed even if task runners are idle. Tasks that are already being executed will finish their execution and not be interrupted. All sub-tasks spawned from such tasks after the cancellation will not be picked up by task runners. Use the `cancel` method on the job client to cancel a job. ```python Python job = job_client.submit('my-job', MyTask()) # After a short while, the job gets canceled job_client.cancel(job) ``` ```go Go job, err := client.Jobs.Submit(ctx, "my-job", []workflows.Task{&MyTask{}}, ) if err != nil { slog.Error("Failed to submit job", slog.Any("error", err)) return } // After a short while, the job gets canceled err = client.Jobs.Cancel(ctx, job.ID) ``` A canceled job can be resumed at any time by [retrying](#retries) it. If any task in a job fails, the job is automatically canceled to avoid executing irrelevant tasks. Future releases will allow configuring this behavior for each task to meet specific requirements. ## Retries If a task fails due to a bug or lack of resources, there is no need to resubmit the entire job. 
You can simply retry the job, and it will resume from the point of failure. This ensures that all the work that was already done up until the point of the failure isn't lost. Future releases may introduce automatic retries for certain failure conditions, which can be useful for handling temporary issues. Below is an example of a failing job due to a bug in the task's implementation. The following workflow processes a list of movie titles and queries the [OMDb API](http://www.omdbapi.com/) for each movie's release date. ```python Python from urllib.parse import urlencode import httpx from tilebox.workflows import Task, ExecutionContext class MoviesStats(Task): titles: list[str] def execute(self, context: ExecutionContext) -> None: for title in self.titles: context.submit_subtask(PrintMovieStats(title)) class PrintMovieStats(Task): title: str def execute(self, context: ExecutionContext) -> None: params = {"t": self.title, "apikey": ""} url = "http://www.omdbapi.com/?" + urlencode(params) response = httpx.get(url).json() # set the display name of the task to the title of the movie: context.current_task.display = response["Title"] print(f"{response['Title']} was released on {response['Released']}") ``` ```go Go package movie import ( "context" "encoding/json" "fmt" "io" "net/http" "net/url" "github.com/tilebox/tilebox-go/workflows/v1" ) type MoviesStats struct { Titles []string } func (t *MoviesStats) Execute(ctx context.Context) error { for _, title := range t.Titles { _, err := workflows.SubmitSubtask(ctx, &PrintMovieStats{Title: title}) if err != nil { return fmt.Errorf("failed to submit subtask: %w", err) } } return nil } type Movie struct { Title *string `json:"Title"` Released *string `json:"Released"` } type PrintMovieStats struct { Title string } func (t *PrintMovieStats) Execute(ctx context.Context) error { apiURL := fmt.Sprintf("http://www.omdbapi.com/?t=%s&apikey=%s", url.QueryEscape(t.Title), "") response, err := http.Get(apiURL) if err != nil { return fmt.Errorf("failed to fetch movie: %w", err) } defer response.Body.Close() body, err := io.ReadAll(response.Body) if err != nil { return fmt.Errorf("failed to read response: %w", err) } var movie Movie err = json.Unmarshal(body, &movie) if err != nil { return fmt.Errorf("failed to unmarshal response: %w", err) } // set the display name of the task to the title of the movie: err := workflows.SetTaskDisplay(ctx, *movie.Title) if err != nil { return fmt.Errorf("failed to set task display: %w", err) } fmt.Printf("%s was released on %s\n", *movie.Title, *movie.Released) return nil } ``` Submitting the workflow as a job reveals a bug in the `PrintMovieStats` task. ```python Python job = job_client.submit('movies-stats', MoviesStats([ "The Matrix", "Shrek 2", "Tilebox - The Movie", "The Avengers", ])) job_client.display(job) ``` ```go Go job, err := client.Jobs.Submit(ctx, "movies-stats", []workflows.Task{&MoviesStats{ Titles: []string{ "The Matrix", "Shrek 2", "Tilebox - The Movie", "The Avengers", }, }}, ) ``` Job that failed due to a bug Job that failed due to a bug One of the `PrintMovieStats` tasks fails with a `KeyError`. This error occurs when a movie title is not found by the [OMDb API](http://www.omdbapi.com/), leading to a response without the `Title` and `Released` fields. 
Console output from the task runners confirms this: ```plaintext Output The Matrix was released on 31 Mar 1999 Shrek 2 was released on 19 May 2004 ERROR: Task PrintMovieStats failed with exception: KeyError('Title') ``` The corrected version of `PrintMovieStats` is as follows: ```python Python class PrintMovieStats(Task): title: str def execute(self, context: ExecutionContext) -> None: params = {"t": self.title, "apikey": ""} url = "http://www.omdbapi.com/?" + urlencode(params) response = httpx.get(url).json() if "Title" in response and "Released" in response: context.current_task.display = response["Title"] print(f"{response['Title']} was released on {response['Released']}") else: context.current_task.display = f"NotFound: {self.title}" print(f"Could not find the release date for {self.title}") ``` ```go Go type PrintMovieStats struct { Title string } func (t *PrintMovieStats) Execute(ctx context.Context) error { url2 := fmt.Sprintf("http://www.omdbapi.com/?t=%s&apikey=%s", url.QueryEscape(t.Title), "") response, err := http.Get(url2) if err != nil { return fmt.Errorf("failed to fetch movie: %w", err) } defer response.Body.Close() body, err := io.ReadAll(response.Body) if err != nil { return fmt.Errorf("failed to read response: %w", err) } var movie Movie err = json.Unmarshal(body, &movie) if err != nil { return fmt.Errorf("failed to unmarshal response: %w", err) } if movie.Released != nil && movie.Title != nil { err := workflows.SetTaskDisplay(ctx, *movie.Title) if err != nil { return fmt.Errorf("failed to set task display: %w", err) } fmt.Printf("%s was released on %s\n", *movie.Title, *movie.Released) } else { err := workflows.SetTaskDisplay(ctx, fmt.Sprintf("NotFound: %s", t.Title)) if err != nil { return fmt.Errorf("failed to set task display: %w", err) } fmt.Printf("Could not find the release date for %s\n", t.Title) } return nil } ``` With this fix, and after redeploying the task runners with the updated `PrintMovieStats` implementation, you can retry the job: ```python Python job_client.retry(job) job_client.display(job) ``` ```go Go _, err := client.Jobs.Retry(ctx, job.ID) ``` Job retried successfully Job retried successfully Now the console output shows: ```plaintext Output Could not find the release date for Tilebox - The Movie The Avengers was released on 04 May 2012 ``` The output confirms that only two tasks were executed, resuming from the point of failure instead of re-executing all tasks. The job was retried and succeeded. The two tasks that completed before the failure were not re-executed. # Task Runners Source: https://docs.tilebox.com/workflows/concepts/task-runners Task runners are the execution agents within the Tilebox Workflows ecosystem that execute tasks. They can be deployed in different computing environments, including on-premise servers and cloud-based auto-scaling clusters. Task runners execute tasks as scheduled by the workflow orchestrator, ensuring they have the necessary resources and environment for effective execution. ## Implementing a Task Runner A task runner is a continuously running process that listens for new tasks to execute. You can start multiple task runner processes to execute tasks concurrently. When a task runner receives a task, it executes it and reports the results back to the workflow orchestrator. The task runner also handles any errors that occur during task execution, reporting them to the orchestrator as well. To execute a task, at least one task runner must be running and available. 
If no task runners are available, tasks will remain queued until one becomes available. To create and start a task runner, follow these steps: 1. Instantiate a client connected to the Tilebox Workflows API. 2. Select or create a [cluster](/workflows/concepts/clusters) and specify its slug when creating the task runner. If no cluster is specified, the task runner will use the default cluster. 3. Register tasks by passing the task classes that the task runner can execute as a list to the `runner` method. 4. Call the `run_forever` method of the task runner to listen for new tasks until the task runner process is shut down. Here is a simple example demonstrating these steps: ```python Python from tilebox.workflows import Client # your own workflow: from my_workflow import MyTask, OtherTask def main(): client = Client() # 1. connect to the Tilebox Workflows API runner = client.runner( cluster="dev-cluster", # 2. select a cluster to join (optional, omit to use the default cluster) tasks=[MyTask, OtherTask], # 3. register tasks ) runner.run_forever() # 4. listen for new tasks to execute if __name__ == "__main__": main() ``` ```go Go package main import ( "context" "log/slog" "github.com/tilebox/tilebox-go/workflows/v1" "github.com/tilebox/tilebox-go/workflows/v1/runner" // your own workflow: "github.com/my_org/myworkflow" ) func main() { ctx := context.Background() // 1. connect to the Tilebox Workflows API client := workflows.NewClient() // 2. select a cluster to join (optional, omit to use the default cluster) runner, err := client.NewTaskRunner(ctx, runner.WithClusterSlug("dev-cluster")) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return } // 3. register tasks err = runner.RegisterTasks( &myworkflow.MyTask{}, &myworkflow.OtherTask{}, ) if err != nil { slog.Error("failed to register tasks", slog.Any("error", err)) return } // 4. listen for new tasks to execute runner.Run(ctx) } ``` To start the task runner locally, run it as a script: ```bash Python > python task_runner.py ``` ```bash Go > go run . ``` ## Task Selection For a task runner to pick up a submitted task, the following conditions must be met: 1. The [cluster](/workflows/concepts/clusters) where the task was submitted must match the task runner's cluster. 2. The task runner must have a registered task that matches the [task identifier](/workflows/concepts/tasks#task-identifiers) of the submitted task. 3. The version of the task runner's registered task must be [compatible](/workflows/concepts/tasks#semantic-versioning) with the submitted task's version. If a task meets these conditions, the task runner executes it. Otherwise, the task runner remains idle until a matching task is available. Often, multiple submitted tasks match the conditions for execution. In that case, the task runner selects one of the tasks to execute, and the remaining tasks stay in a queue until the selected task is completed or another task runner becomes available. ## Parallelism You can start multiple task runner instances in parallel to execute tasks concurrently. Each task runner listens for new tasks and executes them as they become available. This allows for high parallelism and can be used to scale the execution of tasks to handle large workloads. To test this, run multiple instances of the task runner script in different terminal windows on your local machine, or use a tool like [call-in-parallel](https://github.com/tilebox/call-in-parallel) to start the task runner script multiple times.
For example, to start five task runners in parallel, use the following command: ```bash > call-in-parallel -n 5 -- python task_runner.py ``` ## Deploying Task Runners Task runners are continuously running processes that can be deployed in different computing environments. The only requirement for deploying task runners is access to the Tilebox Workflows API. Once this is met, task runners can be deployed in many different environments, including: * On-premise servers * Cloud-based virtual machines * Cloud-based auto-scaling clusters ## Scaling One key benefit of task runners is their **ability to scale even while workflows are executing**. You can start new task runners at any time, and they can immediately pick up queued tasks to execute. It's not necessary to have an entire processing cluster available at the start of a workflow, as task runners can be started and stopped as needed. This is particularly beneficial in cloud environments, where task runners can be automatically started and stopped based on current workload, measured by metrics such as CPU usage. Here's an example scenario: 1. A single instance of a task runner is actively waiting for work in a cloud environment. 2. A large workload is submitted to the workflow orchestrator, prompting the task runner to pick up the first task. 3. The first task creates new sub-tasks for processing, which the task runner also picks up. 4. As the workload increases, the task runner's CPU usage rises, triggering the cloud environment to automatically start new task runner instances. 5. Newly started task runners begin executing queued tasks, distributing the workload among all available task runners. 6. Once the workload decreases, the cloud environment automatically stops some task runners. 7. The first task runner completes the remaining work until everything is done. 8. The first task runner remains idle until new tasks arrive. CPU usage-based auto scaling is just one method to scale task runners. Other metrics, such as memory usage or network bandwidth, are also supported by many cloud environments. In a future release, configuration options for scaling task runners based on custom metrics (for example, the number of queued tasks) are planned. ## Distributed Execution Task runners can be distributed across different compute environments. For instance, some data stored on-premise may need pre-processing, while further processing occurs in the cloud. A job might involve tasks that filter relevant on-premise data and publish it to the cloud, and other tasks that read data from the cloud and process it. In such scenarios, one task runner can run on-premise and another in a cloud environment, effectively collaborating on the same job. Another advantage of distributed task runners is executing workflows that require specific hardware for certain tasks. For example, one task might need a GPU, while another requires extensive memory. Here's an example of a distributed workflow: ```python Python from tilebox.workflows import Task, ExecutionContext class DistributedWorkflow(Task): def execute(self, context: ExecutionContext) -> None: download_task = context.submit_subtask(DownloadData()) process_task = context.submit_subtask( ProcessData(), depends_on=[download_task], ) class DownloadData(Task): """ Download a dataset and store it in a shared internal bucket. Requires a good network connection for high download bandwidth.
""" def execute(self, context: ExecutionContext) -> None: pass class ProcessData(Task): """ Perform compute-intensive processing of a dataset. The dataset must be available in an internal bucket. Requires access to a GPU for optimal performance. """ def execute(self, context: ExecutionContext) -> None: pass ``` ```go Go package distributed import ( "context" "fmt" "github.com/tilebox/tilebox-go/workflows/v1" "github.com/tilebox/tilebox-go/workflows/v1/subtask" ) type DistributedWorkflow struct{} func (t *DistributedWorkflow) Execute(ctx context.Context) error { downloadTask, err := workflows.SubmitSubtask(ctx, &DownloadData{}) if err != nil { return fmt.Errorf("failed to submit download subtask: %w", err) } _, err = workflows.SubmitSubtask(ctx, &ProcessData{}, subtask.WithDependencies(downloadTask)) if err != nil { return fmt.Errorf("failed to submit process subtask: %w", err) } return nil } // DownloadData Download a dataset and store it in a shared internal bucket. // Requires a good network connection for high download bandwidth. type DownloadData struct{} func (t *DownloadData) Execute(ctx context.Context) error { return nil } // ProcessData Perform compute-intensive processing of a dataset. // The dataset must be available in an internal bucket. // Requires access to a GPU for optimal performance. type ProcessData struct{} func (t *ProcessData) Execute(ctx context.Context) error { return nil } ``` To achieve distributed execution for this workflow, no single task runner capable of executing all three of the tasks is set up. Instead, two task runners, each capable of executing one of the tasks are set up: one in a high-speed network environment and the other with GPU access. When the distributed workflow runs, the first task runner picks up the `DownloadData` task, while the second picks up the `ProcessData` task. The `DistributedWorkflow` does not require specific hardware, so it can be registered with both runners and executed by either one. ```python Python from tilebox.workflows import Client client = Client() high_network_speed_runner = client.runner( tasks=[DownloadData, DistributedWorkflow] ) high_network_speed_runner.run_forever() ``` ```go Go package main import ( "context" "log/slog" "github.com/tilebox/tilebox-go/workflows/v1" ) func main() { ctx := context.Background() client := workflows.NewClient() highNetworkSpeedRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return } err = highNetworkSpeedRunner.RegisterTasks( &DownloadData{}, &DistributedWorkflow{}, ) if err != nil { slog.Error("failed to register tasks", slog.Any("error", err)) return } highNetworkSpeedRunner.RunForever(ctx) } ``` ```python Python from tilebox.workflows import Client client = Client() gpu_runner = client.runner( tasks=[ProcessData, DistributedWorkflow] ) gpu_runner.run_forever() ``` ```go Go package main import ( "context" "log/slog" "github.com/tilebox/tilebox-go/workflows/v1" ) func main() { ctx := context.Background() client := workflows.NewClient() gpuRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return } err = gpuRunner.RegisterTasks( &ProcessData{}, &DistributedWorkflow{}, ) if err != nil { slog.Error("failed to register tasks", slog.Any("error", err)) return } gpuRunner.RunForever(ctx) } ``` Now, both `download_task_runner.py` and `gpu_task_runner.py` are started, in parallel, on different machines with the required hardware for each. 
When `DistributedWorkflow` is submitted, it executes on one of the two runners, and its submitted sub-tasks are handled by the appropriate runner. In this case, since `ProcessData` depends on `DownloadData`, the GPU task runner remains idle until the download completes, then picks up the processing task. You can also differentiate between task runners by specifying different [clusters](/workflows/concepts/clusters) and choosing specific clusters for sub-task submissions. For more details, see the [Clusters](/workflows/concepts/clusters) section. ## Task Failures If an unhandled exception occurs during task execution, the task runner captures it and reports it back to the workflow orchestrator. The orchestrator then marks the task as failed, leading to [job cancellation](/workflows/concepts/jobs#cancellation) to prevent further tasks of the same job, which may no longer be relevant, from being executed. A task failure does not result in losing all previous work done by the job. If the failure is fixable, for example by fixing a bug in the task implementation, ensuring the task has the necessary resources, or simply retrying after a flaky network connection, it may be worth [retrying](/workflows/concepts/jobs#retries) the job. When retrying a job, all failed tasks are added back to the queue, allowing a task runner to pick them up again. If execution then succeeds, the job continues smoothly. Otherwise, the task remains marked as failed and can be retried again if desired. If fixing a failure requires modifying the task implementation, it's important to deploy the updated version to the [task runners](/workflows/concepts/task-runners) before retrying the job. Otherwise, a task runner could pick up the original, faulty implementation again, leading to another failure. ## Task idempotency Since a task may be retried, it's possible that it is executed more than once. Depending on where in its execution the task failed, it may already have performed some side effects, such as writing to a database or sending a message to a queue. Because of that, it's crucial to ensure that tasks are [idempotent](https://en.wikipedia.org/wiki/Idempotence). Idempotent tasks can be executed multiple times without altering the outcome beyond the first successful execution. A special case of idempotency involves submitting sub-tasks. If a task calls `context.submit_subtask` and then fails and is retried, the sub-tasks submitted by the earlier, failed execution are automatically removed, so they can safely be submitted again when the task is retried. ## Runner Crashes Tilebox Workflows has an internal mechanism to handle unexpected task runner crashes. When a task runner picks up a task, it periodically sends a heartbeat to the workflow orchestrator. If the orchestrator does not receive this heartbeat for a defined duration, it marks the task as failed and automatically attempts to [retry](/workflows/concepts/jobs#retries) it up to 10 times. This allows another task runner to pick up the task and continue executing the job. This mechanism ensures that scenarios such as power outages, hardware failures, or dropped network connections are handled effectively, preventing any task from remaining in a running state indefinitely. ## Observability Task runners are continuously running processes, making it essential to monitor their health and performance. You can achieve observability by collecting and analyzing logs, metrics, and traces from task runners.
Tilebox Workflows provides tools to enable this data collection and analysis. To learn how to configure task runners for observability, head over to the [Observability](/workflows/observability) section. # Understanding and Creating Tasks Source: https://docs.tilebox.com/workflows/concepts/tasks A Task is the smallest unit of work, designed to perform a specific operation. Each task represents a distinct operation or process that can be executed, such as processing data, performing calculations, or managing resources. Tasks can operate independently or as components of a more complex set of connected tasks known as a Workflow. Tasks are defined by their code, inputs, and dependencies on other tasks. To create tasks, you need to define the input parameters and specify the action to be performed during execution. ## Creating a Task To create a task in Tilebox, define a class that extends the `Task` base class and implements the `execute` method. The `execute` method is the entry point for the task where its logic is defined. It's called when the task is executed. ```python Python from tilebox.workflows import Task, ExecutionContext class MyFirstTask(Task): def execute(self, context: ExecutionContext): print("Hello World!") ``` ```go Go type MyFirstTask struct{} func (t *MyFirstTask) Execute(ctx context.Context) error { slog.Info("Hello World!") return nil } ``` This example demonstrates a simple task that prints "Hello World!" to the console. For python, the key components of this task are: `MyFirstTask` is a subclass of the `Task` class, which serves as the base class for all defined tasks. It provides the essential structure for a task. Inheriting from `Task` automatically makes the class a `dataclass`, which is useful [for specifying inputs](#input-parameters). Additionally, by inheriting from `Task`, the task is automatically assigned an [identifier based on the class name](#task-identifiers). The `execute` method is the entry point for executing the task. This is where the task's logic is defined. It's invoked by a [task runner](/workflows/concepts/task-runners) when the task runs and performs the task's operation. The `context` argument is an `ExecutionContext` instance that provides access to an [API for submitting new tasks](/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask) as part of the same job and features like [shared caching](/api-reference/python/tilebox.workflows/ExecutionContext.job_cache). For Go, the key components are: `MyFirstTask` is a struct that implements the `Task` interface. It represents the task to be executed. The `Execute` method is the entry point for executing the task. This is where the task's logic is defined. It's invoked by a [task runner](/workflows/concepts/task-runners) when the task runs and performs the task's operation. The code samples on this page do not illustrate how to execute the task. That will be covered in the [next section on task runners](/workflows/concepts/task-runners). The reason for that is that executing tasks is a separate concern from implementing tasks. ## Input Parameters Tasks often require input parameters to operate. These inputs can range from simple values to complex data structures. By inheriting from the `Task` class, the task is treated as a Python `dataclass`, allowing input parameters to be defined as class attributes. Tasks must be **serializable to JSON or to protobuf** because they may be distributed across a cluster of [task runners](/workflows/concepts/task-runners). 
In Go, task parameters must be exported fields of the task struct (starting with an uppercase letter), otherwise they will not be serialized to JSON. Supported types for input parameters include: * Basic types such as `str`, `int`, `float`, `bool` * Lists and dictionaries of basic types * Nested data classes that are also JSON-serializable or protobuf-serializable ```python Python class ParametrizableTask(Task): message: str number: int data: dict[str, str] def execute(self, context: ExecutionContext): print(self.message * self.number) task = ParametrizableTask("Hello", 3, {"key": "value"}) ``` ```go Go type ParametrizableTask struct { Message string Number int Data map[string]string } func (t *ParametrizableTask) Execute(context.Context) error { slog.Info(strings.Repeat(t.Message, t.Number)) return nil } task := &ParametrizableTask{ Message: "Hello", Number: 3, Data: map[string]string{"key": "value"}, } ``` ## Task Composition and subtasks Until now, tasks have performed only a single operation. But tasks can be more powerful. **Tasks can submit other tasks as subtasks.** This allows for a modular workflow design, breaking down complex operations into simpler, manageable parts. Additionally, the execution of subtasks is automatically parallelized whenever possible. ```python Python class ParentTask(Task): num_subtasks: int def execute(self, context: ExecutionContext) -> None: for i in range(self.num_subtasks): context.submit_subtask(ChildTask(i)) class ChildTask(Task): index: int def execute(self, context: ExecutionContext) -> None: print(f"Executing ChildTask {self.index}") # after submitting this task, a task runner may pick it up and execute it # which will result in 5 ChildTasks being submitted and executed as well task = ParentTask(5) ``` ```go Go type ParentTask struct { NumSubtasks int } func (t *ParentTask) Execute(ctx context.Context) error { for i := range t.NumSubtasks { _, err := workflows.SubmitSubtask(ctx, &ChildTask{Index: i}) if err != nil { return err } } return nil } type ChildTask struct { Index int } func (t *ChildTask) Execute(context.Context) error { slog.Info("Executing ChildTask", slog.Int("index", t.Index)) return nil } // after submitting this task, a task runner may pick it up and execute it // which will result in 5 ChildTasks being submitted and executed as well task := &ParentTask{NumSubtasks: 5} ``` In this example, a `ParentTask` submits `ChildTask` tasks as subtasks. The number of subtasks to be submitted is based on the `num_subtasks` attribute of the `ParentTask`. The `submit_subtask` method takes an instance of a task as its argument, meaning the task to be submitted must be instantiated with concrete parameters first. Parent tasks do not have access to the results of their subtasks; instead, tasks can use [shared caching](/workflows/caches#storing-and-retrieving-data) to share data between tasks. By submitting a task as a subtask, its execution is scheduled as part of the same job as the parent task. Compared to just directly invoking the subtask's `execute` method, this allows the subtask's execution to occur on a different machine or in parallel with other subtasks. To learn more about how tasks are executed, see the section on [task runners](/workflows/concepts/task-runners). ### Larger subtasks example A practical workflow example showcasing task composition might help illustrate the capabilities of tasks. Below is an example of a set of tasks forming a workflow capable of downloading a set number of random dog images from the internet.
The [Dog API](https://thedogapi.com/) can be used to get random dog image URLs, which are then downloaded. Implementing this using Task Composition could look like this: ```python Python import httpx # pip install httpx from pathlib import Path class DownloadRandomDogImages(Task): num_images: int def execute(self, context: ExecutionContext) -> None: url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}" response = httpx.get(url) for dog_image in response.json(): context.submit_subtask(DownloadImage(dog_image["url"])) class DownloadImage(Task): url: str def execute(self, context: ExecutionContext) -> None: file = Path("dogs") / self.url.split("/")[-1] file.parent.mkdir(exist_ok=True) response = httpx.get(self.url) with file.open("wb") as f: f.write(response.content) ``` ```go Go package dogs import ( "context" "encoding/json" "fmt" "io" "net/http" "os" "strings" "github.com/tilebox/tilebox-go/workflows/v1" ) type DogImage struct { ID string `json:"id"` URL string `json:"url"` Width *int `json:"width"` Height *int `json:"height"` } type DownloadRandomDogImages struct { NumImages int } func (t *DownloadRandomDogImages) Execute(ctx context.Context) error { url := fmt.Sprintf("https://api.thedogapi.com/v1/images/search?limit=%d", t.NumImages) response, err := http.Get(url) if err != nil { return fmt.Errorf("failed to download images: %w", err) } defer response.Body.Close() body, err := io.ReadAll(response.Body) if err != nil { return fmt.Errorf("failed to read response: %w", err) } var dogImages []DogImage err = json.Unmarshal(body, &dogImages) if err != nil { return err } for _, dogImage := range dogImages { _, err := workflows.SubmitSubtask(ctx, &DownloadImage{URL: dogImage.URL}) if err != nil { return err } } return nil } type DownloadImage struct { URL string } func (t *DownloadImage) Execute(context.Context) error { response, err := http.Get(t.URL) if err != nil { return fmt.Errorf("failed to download image: %w", err) } defer response.Body.Close() body, err := io.ReadAll(response.Body) if err != nil { return fmt.Errorf("failed to read response: %w", err) } err = os.MkdirAll("dogs", 0o755) if err != nil { return fmt.Errorf("failed to create dogs directory: %w", err) } elements := strings.Split(t.URL, "/") file := fmt.Sprintf("dogs/%s", elements[len(elements)-1]) return os.WriteFile(file, body, 0o600) } ``` This example consists of the following tasks: `DownloadRandomDogImages` fetches a specific number of random dog image URLs from an API. It then submits a `DownloadImage` task for each received image URL. `DownloadImage` downloads an image from a specified URL and saves it to a file. Together, these tasks create a workflow that downloads random dog images from the internet. The relationship between the two tasks and their formation as a workflow becomes clear when `DownloadRandomDogImages` submits `DownloadImage` tasks as subtasks. The execution of such a workflow can be visualized as a tree structure, where the `DownloadRandomDogImages` task is the root and the `DownloadImage` tasks are the leaves. For instance, when downloading five random dog images, the following tasks are executed.
```python Python from tilebox.workflows import Client client = Client() jobs = client.jobs() job = jobs.submit( "download-dog-images", DownloadRandomDogImages(5), ) # now our deployed task runners will pick up the task and execute it jobs.display(job) ``` ```go Go ctx := context.Background() client := workflows.NewClient() job, err := client.Jobs.Submit(ctx, "download-dog-images", []workflows.Task{ &helloworld.DownloadRandomDogImages{ NumImages: 5, }, }, ) if err != nil { slog.Error("Failed to submit job", slog.Any("error", err)) return } // now our deployed task runners will pick up the task and execute it ``` Download Dog Images Workflow Download Dog Images Workflow In total, six tasks are executed: the `DownloadRandomDogImages` task and five `DownloadImage` tasks. The `DownloadImage` tasks can execute in parallel, as they are independent. If more than one task runner is available, the Tilebox Workflow Orchestrator **automatically parallelizes** the execution of these tasks. Check out [job\_client.display](/workflows/concepts/jobs#visualization) to learn how this visualization was automatically generated from the task executions. Currently, a limit of `64` subtasks per task is in place to discourage creating workflows where individual parent tasks submit a large number of subtasks, which can lead to performance issues since those parent tasks are not parallelized. If you need to submit more than `64` subtasks, consider using [recursive subtask submission](#recursive-subtasks) instead. ## Recursive subtasks Tasks can submit other tasks as subtasks, allowing for complex workflows. Sometimes the input to a task is a list, with elements that can be **mapped** to individual subtasks, whose outputs are then aggregated in a **reduce** step. This pattern is commonly known as **MapReduce**. Often times the initial map step—submitting the individual subtasks—might already be an expensive operation. Since this is executed within a single task, it's not parallelizable, which can bottleneck the entire workflow. Fortunately, Tilebox Workflows offers a solution through **recursive subtask submission**. A task can submit instances of itself as subtasks, enabling a recursive breakdown into smaller tasks. For example, the `RecursiveTask` below is a valid task that submits smaller instances of itself as subtasks. ```python Python class RecursiveTask(Task): num: int def execute(self, context: ExecutionContext) -> None: print(f"Executing RecursiveTask with num={self.num}") if self.num >= 2: context.submit_subtask(RecursiveTask(self.num // 2)) ``` ```go Go type RecursiveTask struct { Num int } func (t *RecursiveTask) Execute(ctx context.Context) error { slog.Info("Executing RecursiveTask", slog.Int("num", t.Num)) if t.Num >= 2 { _, err := workflows.SubmitSubtask(ctx, &RecursiveTask{Num: t.Num / 2}) if err != nil { return err } } return nil } ``` ### Recursive subtask example An example for this is the [random dog images workflow](#larger-subtasks-example) mentioned earlier. In the previous implementation, downloading images was already parallelized. But the initial orchestration of the individual download tasks was not parallelized, because `DownloadRandomDogImages` was responsible for fetching all random dog image URLs and only submitted the individual download tasks once all URLs were retrieved. For a large number of images this setup can bottleneck the entire workflow. 
To improve this, recursive subtask submission decomposes a `DownloadRandomDogImages` task with a high number of images into two smaller `DownloadRandomDogImages` tasks, each fetching half. This process can be repeated until a specified threshold is met, at which point the Dog API can be queried directly for image URLs. That way, image downloads start as soon as the first URLs are retrieved, without initial waiting. An implementation of this recursive submission may look like this: ```python Python class DownloadRandomDogImages(Task): num_images: int def execute(self, context: ExecutionContext) -> None: if self.num_images > 4: half = self.num_images // 2 remaining = self.num_images - half # account for odd numbers context.submit_subtask(DownloadRandomDogImages(half)) context.submit_subtask(DownloadRandomDogImages(remaining)) else: url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}" response = httpx.get(url) for dog_image in response.json()[:self.num_images]: context.submit_subtask(DownloadImage(dog_image["url"])) ``` ```go Go type DownloadRandomDogImages struct { NumImages int } func (t *DownloadRandomDogImages) Execute(ctx context.Context) error { if t.NumImages > 4 { half := t.NumImages / 2 remaining := t.NumImages - half // account for odd numbers _, err := workflows.SubmitSubtask(ctx, &DownloadRandomDogImages{NumImages: half}) if err != nil { return err } _, err = workflows.SubmitSubtask(ctx, &DownloadRandomDogImages{NumImages: remaining}) if err != nil { return err } } else { url := fmt.Sprintf("https://api.thedogapi.com/v1/images/search?limit=%d", t.NumImages) response, err := http.Get(url) if err != nil { return fmt.Errorf("failed to download images: %w", err) } defer response.Body.Close() body, err := io.ReadAll(response.Body) if err != nil { return fmt.Errorf("failed to read response: %w", err) } var dogImages []DogImage err = json.Unmarshal(body, &dogImages) if err != nil { return err } for _, dogImage := range dogImages { _, err := workflows.SubmitSubtask(ctx, &DownloadImage{URL: dogImage.URL}) if err != nil { return err } } } return nil } ``` With this implementation, downloading a large number of images (for example, 9) results in the following tasks being executed: Download Dog Images Workflow implemented recursively Download Dog Images Workflow implemented recursively ## Retry Handling By default, when a task fails to execute, it's marked as failed. In some cases, it may be useful to retry the task multiple times before marking it as a failure. This is particularly useful for tasks dependent on external services that might be temporarily unavailable. Tilebox Workflows allows you to specify the number of retries for a task using the `max_retries` argument of the `submit_subtask` method. Check out the example below to see how this might look like in practice. A failed task may be picked up by any available runner and not necessarily the same one that it failed on. 
```python Python import random class RootTask(Task): def execute(self, context: ExecutionContext) -> None: context.submit_subtask(FlakyTask(), max_retries=5) class FlakyTask(Task): def execute(self, context: ExecutionContext) -> None: print(f"Executing FlakyTask") if random.random() < 0.1: raise Exception("FlakyTask failed randomly") ``` ```go Go package flaky import ( "context" "errors" "log/slog" "math/rand/v2" "github.com/tilebox/tilebox-go/workflows/v1" "github.com/tilebox/tilebox-go/workflows/v1/subtask" ) type RootTask struct{} func (t *RootTask) Execute(ctx context.Context) error { _, err := workflows.SubmitSubtask(ctx, &FlakyTask{}, subtask.WithMaxRetries(5), ) return err } type FlakyTask struct{} func (t *FlakyTask) Execute(context.Context) error { slog.Info("Executing FlakyTask") if rand.Float64() < 0.1 { return errors.New("FlakyTask failed randomly") } return nil } ``` ## Dependencies Tasks often rely on other tasks. For example, a task that processes data might depend on a task that fetches that data. **Tasks can express their dependencies on other tasks** by using the `depends_on` argument of the [`submit_subtask`](/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask) method. This means that a dependent task will only execute after the task it relies on has successfully completed. The `depends_on` argument accepts a list of tasks, enabling a task to depend on multiple other tasks. A workflow with dependencies might look like this: ```python Python class RootTask(Task): def execute(self, context: ExecutionContext) -> None: first_task = context.submit_subtask( PrintTask("Executing first") ) second_task = context.submit_subtask( PrintTask("Executing second"), depends_on=[first_task], ) third_task = context.submit_subtask( PrintTask("Executing last"), depends_on=[second_task], ) class PrintTask(Task): message: str def execute(self, context: ExecutionContext) -> None: print(self.message) ``` ```go Go type RootTask struct{} func (t *RootTask) Execute(ctx context.Context) error { firstTask, err := workflows.SubmitSubtask( ctx, &PrintTask{Message: "Executing first"}, ) if err != nil { return err } secondTask, err := workflows.SubmitSubtask( ctx, &PrintTask{Message: "Executing second"}, subtask.WithDependencies(firstTask), ) if err != nil { return err } _, err = workflows.SubmitSubtask( ctx, &PrintTask{Message: "Executing last"}, subtask.WithDependencies(secondTask), ) if err != nil { return err } return nil } type PrintTask struct { Message string } func (t *PrintTask) Execute(context.Context) error { slog.Info("PrintTask", slog.String("message", t.Message)) return nil } ``` The `RootTask` submits three `PrintTask` tasks as subtasks. These tasks depend on each other, meaning the second task executes only after the first task has successfully completed, and the third only executes after the second completes. The tasks are executed sequentially. If a task upon which another task depends submits subtasks, those subtasks must also execute before the dependent task begins execution. ### Dependencies Example A practical example is a workflow that fetches news articles from an API and processes them using the [News API](https://newsapi.org/). 
```python Python from pathlib import Path import json from collections import Counter import httpx # pip install httpx class NewsWorkflow(Task): category: str max_articles: int def execute(self, context: ExecutionContext) -> None: fetch_task = context.submit_subtask(FetchNews(self.category, self.max_articles)) context.submit_subtask(PrintHeadlines(), depends_on=[fetch_task]) context.submit_subtask(MostFrequentAuthors(), depends_on=[fetch_task]) class FetchNews(Task): category: str max_articles: int def execute(self, context: ExecutionContext) -> None: url = f"https://newsapi.org/v2/top-headlines?category={self.category}&pageSize={self.max_articles}&country=us&apiKey=API_KEY" news = httpx.get(url).json() # check out our documentation page on caches to learn # about a better way of passing data between tasks Path("news.json").write_text(json.dumps(news)) class PrintHeadlines(Task): def execute(self, context: ExecutionContext) -> None: news = json.loads(Path("news.json").read_text()) for article in news["articles"]: print(f"{article['publishedAt'][:10]}: {article['title']}") class MostFrequentAuthors(Task): def execute(self, context: ExecutionContext) -> None: news = json.loads(Path("news.json").read_text()) authors = [article["author"] for article in news["articles"]] for author, count in Counter(authors).most_common(): print(f"Author {author} has written {count} articles") # now submit a job, and then visualize it job = job_client.submit("process-news", NewsWorkflow(category="science", max_articles=5), ) ``` ```go Go package news import ( "context" "encoding/json" "fmt" "io" "log/slog" "net/http" "os" "time" "github.com/tilebox/tilebox-go/workflows/v1" "github.com/tilebox/tilebox-go/workflows/v1/subtask" ) const newsAPIKey = "YOUR_API_KEY" type NewsWorkflow struct { Category string MaxArticles int } func (t *NewsWorkflow) Execute(ctx context.Context) error { fetchTask, err := workflows.SubmitSubtask(ctx, &FetchNews{ Category: t.Category, MaxArticles: t.MaxArticles, }) if err != nil { return err } _, err = workflows.SubmitSubtask(ctx, &PrintHeadlines{}, subtask.WithDependencies(fetchTask)) if err != nil { return err } _, err = workflows.SubmitSubtask(ctx, &MostFrequentAuthors{}, subtask.WithDependencies(fetchTask)) if err != nil { return err } return nil } type News struct { Status string `json:"status"` TotalResults int `json:"totalResults"` Articles []struct { Source struct { ID *string `json:"id"` Name string `json:"name"` } `json:"source"` Author *string `json:"author"` Title string `json:"title"` Description *string `json:"description"` URL string `json:"url"` URLToImage *string `json:"urlToImage"` PublishedAt time.Time `json:"publishedAt"` Content *string `json:"content"` } `json:"articles"` } type FetchNews struct { Category string MaxArticles int } func (t *FetchNews) Execute(context.Context) error { url := fmt.Sprintf("https://newsapi.org/v2/top-headlines?category=%s&pageSize=%d&country=us&apiKey=%s", t.Category, t.MaxArticles, newsAPIKey) response, err := http.Get(url) if err != nil { return fmt.Errorf("failed to download news: %w", err) } defer response.Body.Close() body, err := io.ReadAll(response.Body) if err != nil { return fmt.Errorf("failed to read response: %w", err) } // check out our documentation page on caches to learn // about a better way of passing data between tasks return os.WriteFile("news.json", body, 0o600) } type PrintHeadlines struct{} func (t *PrintHeadlines) Execute(context.Context) error { newsBytes, err := os.ReadFile("news.json") if err != nil { return 
fmt.Errorf("failed to read news: %w", err) } var news News err = json.Unmarshal(newsBytes, &news) if err != nil { return fmt.Errorf("failed to unmarshal news: %w", err) } for _, article := range news.Articles { slog.Info("Article", slog.Time("published_at", article.PublishedAt), slog.String("title", article.Title)) } return nil } type MostFrequentAuthors struct{} func (t *MostFrequentAuthors) Execute(context.Context) error { newsBytes, err := os.ReadFile("news.json") if err != nil { return fmt.Errorf("failed to read news: %w", err) } var news News err = json.Unmarshal(newsBytes, &news) if err != nil { return fmt.Errorf("failed to unmarshal news: %w", err) } authors := make(map[string]int) for _, article := range news.Articles { if article.Author == nil { continue } authors[*article.Author]++ } for author, count := range authors { slog.Info("Author", slog.String("author", author), slog.Int("count", count)) } return nil } // in main now submit a job, and then visualize it /* job, err := client.Jobs.Submit(ctx, "process-news", []workflows.Task{ &NewsWorkflow{ Category: "science", MaxArticles: 5, }, }, ) */ ``` ```plaintext Output 2024-02-15: NASA selects ultraviolet astronomy mission but delays its launch two years - SpaceNews 2024-02-15: SpaceX launches Space Force mission from Cape Canaveral - Orlando Sentinel 2024-02-14: Saturn's largest moon most likely uninhabitable - Phys.org 2024-02-14: AI Unveils Mysteries of Unknown Proteins' Functions - Neuroscience News 2024-02-14: Anthropologists' research unveils early stone plaza in the Andes - Phys.org Author Jeff Foust has written 1 articles Author Richard Tribou has written 1 articles Author Jeff Renaud has written 1 articles Author Neuroscience News has written 1 articles Author Science X has written 1 articles ``` Process News Workflow Process News Workflow This workflow consists of four tasks: | Task | Dependencies | Description | | ------------------- | ------------ | ----------------------------------------------------------------------------------------------------------------------- | | NewsWorkflow | - | The root task of the workflow. It spawns the other tasks and sets up the dependencies between them. | | FetchNews | - | A task that fetches news articles from the API and writes the results to a file, which is then read by dependent tasks. | | PrintHeadlines | FetchNews | A task that prints the headlines of the news articles to the console. | | MostFrequentAuthors | FetchNews | A task that counts the number of articles each author has written and prints the result to the console. | An important aspect is that there is no dependency between the `PrintHeadlines` and `MostFrequentAuthors` tasks. This means they can execute in parallel, which the Tilebox Workflow Orchestrator will do, provided multiple task runners are available. In this example, the results from `FetchNews` are stored in a file. This is not the recommended method for passing data between tasks. When executing on a distributed cluster, the existence of a file written by a dependent task cannot be guaranteed. Instead, it's better to use a [shared cache](/workflows/caches). ## Task Identifiers A task identifier is a unique string used by the Tilebox Workflow Orchestrator to identify the task. It's used by [task runners](/workflows/concepts/task-runners) to map submitted tasks to a task class and execute them. It also serves as the default name in execution visualizations as a tree of tasks. If unspecified, the identifier of a task defaults to the class name. 
For instance, the identifier of the `PrintHeadlines` task in the previous example is `"PrintHeadlines"`. This is good for prototyping, but not recommended for production, as changing the class name also changes the identifier, which can lead to issues during refactoring. It also prevents different tasks from sharing the same class name. To address this, Tilebox Workflows offers a way to explicitly specify the identifier of a task. This is done by overriding the `identifier` method of the `Task` class. This method should return a unique string identifying the task. This decouples the task's identifier from its class name, allowing you to change the identifier without renaming the class. It also allows tasks with the same class name to have different identifiers. The `identifier` method can also specify a version number for the task—see the section on [semantic versioning](#semantic-versioning) below for more details. ```python Python class MyTask(Task): def execute(self, context: ExecutionContext) -> None: pass # MyTask has the identifier "MyTask" and the default version of "v0.0" class MyTask2(Task): @staticmethod def identifier() -> tuple[str, str]: return "tilebox.com/example_workflow/MyTask", "v1.0" def execute(self, context: ExecutionContext) -> None: pass # MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0" ``` ```go Go type MyTask struct{} func (t *MyTask) Execute(context.Context) error { return nil } // MyTask has the identifier "MyTask" and the default version of "v0.0" type MyTask2 struct{} func (t *MyTask2) Identifier() workflows.TaskIdentifier { return workflows.NewTaskIdentifier("tilebox.com/example_workflow/MyTask", "v1.0") } func (t *MyTask2) Execute(context.Context) error { return nil } // MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0" ``` In python, the `identifier` method must be defined as either a `classmethod` or a `staticmethod`, meaning it can be called without instantiating the class. ## Semantic Versioning As seen in the previous section, the `identifier` method can return a tuple of two strings, where the first string is the identifier and the second string is the version number. This allows for semantic versioning of tasks. Versioning is important for managing changes to a task's execution method. It allows for new features, bug fixes, and changes while ensuring existing workflows operate as expected. Additionally, it enables multiple versions of a task to coexist, enabling gradual rollout of changes without interrupting production deployments. You assign a version number by overriding the `identifier` method of the task class. It must return a tuple of two strings: the first is the [identifier](#task-identifiers) and the second is the version number, which must match the pattern `vX.Y` (where `X` and `Y` are non-negative integers). `X` is the major version number and `Y` is the minor version. 
For example, this task has the identifier `"tilebox.com/example_workflow/MyTask"` and the version `"v1.3"`: ```python Python class MyTask(Task): @staticmethod def identifier() -> tuple[str, str]: return "tilebox.com/example_workflow/MyTask", "v1.3" def execute(self, context: ExecutionContext) -> None: pass ``` ```go Go type MyTask struct{} func (t *MyTask) Identifier() workflows.TaskIdentifier { return workflows.NewTaskIdentifier("tilebox.com/example_workflow/MyTask", "v1.3") } func (t *MyTask) Execute(context.Context) error { return nil } ``` When a task is submitted as part of a job, the version from which it's submitted is recorded and may differ from the version on the task runner executing the task. When task runners execute a task, they require a registered task with a matching identifier and a compatible version number. A compatible version is one where the major version number on the task runner matches that of the submitted task, and the minor version number on the task runner is equal to or greater than that of the submitted task. For example: * `MyTask` is submitted as part of a job with version `"v1.3"`. * A task runner with version `"v1.3"` of `MyTask` would execute this task. * A task runner with version `"v1.5"` of `MyTask` would also execute this task. * A task runner with version `"v1.2"` of `MyTask` would not execute this task, as its minor version is lower than that of the submitted task. * A task runner with version `"v2.5"` of `MyTask` would not execute this task, as its major version differs from that of the submitted task. ## States A task can be in one of the following states: * `QUEUED`: the task is queued and waiting to be run * `RUNNING`: the task is currently running on some task runner * `COMPUTED`: the task has been computed and the output is available. Once in this state, the task will never transition to any other state * `FAILED`: the task has failed * `CANCELLED`: the task has been cancelled due to user request ## Conclusion Tasks form the foundation of Tilebox Workflows. By understanding how to create and manage tasks, you can leverage Tilebox's capabilities to automate and optimize your workflows. Experiment with defining your own tasks, utilizing subtasks, managing dependencies, and employing semantic versioning to develop robust and efficient workflows. # Tilebox Workflows Source: https://docs.tilebox.com/workflows/introduction The Tilebox workflow orchestrator is a parallel processing engine. It simplifies the creation of dynamic tasks that can be executed across various computing environments, including on-premise and auto-scaling clusters in public clouds. This section provides guides showcasing how to use the Tilebox workflow orchestrator effectively. Here are some of the key learning areas: Create tasks using the Tilebox Workflow Orchestrator. Learn how to submit jobs to the workflow orchestrator, which schedules tasks for execution. Learn how to set up task runners to execute tasks in a distributed manner. Understand how to gain insights into task executions using observability features like tracing and logging. Learn to configure shared data access for all tasks of a job using caches. Trigger jobs based on events or schedules, such as new data availability or CRON schedules. ## Terminology Before exploring Tilebox Workflows in depth, familiarize yourself with some common terms used throughout this section. A Task is the smallest unit of work, designed to perform a specific operation.
Each task represents a distinct operation or process that can be executed, such as processing data, performing calculations, or managing resources. Tasks can operate independently or as components of a more complex set of connected tasks known as a Workflow. Tasks are defined by their code, inputs, and dependencies on other tasks. To create tasks, you need to define the input parameters and specify the action to be performed during execution. A job is a specific execution of a workflow with designated input parameters. It consists of one or more tasks that can run in parallel or sequentially, based on their dependencies. Submitting a job involves creating a root task with specific input parameters, which may trigger the execution of other tasks within the same job. Task runners are the execution agents within the Tilebox Workflows ecosystem that execute tasks. They can be deployed in different computing environments, including on-premise servers and cloud-based auto-scaling clusters. Task runners execute tasks as scheduled by the workflow orchestrator, ensuring they have the necessary resources and environment for effective execution. Clusters are a logical grouping for task runners. Using clusters, you can scope certain tasks to a specific group of task runners. Tasks, which are always submitted to a specific cluster, are only executed on task runners assigned to the same cluster. Caches are shared storage that enable data storage and retrieval across tasks within a single job. They store intermediate results and share data among tasks, enabling distributed computing and reducing redundant data processing. Observability refers to the feature set in Tilebox Workflows that provides visibility into the execution of tasks and jobs. Tools like tracing and logging allow users to monitor performance, diagnose issues, and gain insights into job operations, enabling efficient troubleshooting and optimization. # Automations Source: https://docs.tilebox.com/workflows/near-real-time/automations Process data in near-real-time by triggering jobs based on external events This feature is only available in the Python SDK. ## Introduction Tilebox Workflows can execute jobs in two ways: a one-time execution triggered by a user, typically a batch processing, and near-real-time execution based on specific external events. By defining trigger conditions, you can automatically submit jobs based on external events. Tilebox Workflows currently supports the following trigger conditions: Trigger jobs based on a Cron schedule. Trigger jobs after objects are created or modified in a storage location such as a cloud bucket. Dataset Event Triggers, which will trigger jobs when new data points are ingested into a Tilebox dataset, are on the roadmap. Stay tuned for updates. ## Automations To create a trigger, define a special task that serves as a prototype. In response to a trigger condition met, this task will be submitted as a new job. Such tasks are referred to as automations. Each automation has a [task identifier](/workflows/concepts/tasks#task-identifiers), a [version](/workflows/concepts/tasks#semantic-versioning), and [input parameters](/workflows/concepts/tasks#input-parameters), just like regular tasks. Automations also automatically provide a special `trigger` attribute that contains information about the event that initiated the task's execution. Go doesn't support registering automations yet, please use python or the console instead. 
## Automation Client The Tilebox Workflows client includes a sub-client for managing automations. You can create this sub-client by calling the `automations` method on the main client instance. ### Listing Registered Automations To list all registered automations, use the `all` method on the automation client. ```python Python from tilebox.workflows import Client client = Client() automations = client.automations() all_automations = automations.all() print(all_automations) ``` ```plaintext Output [ AutomationPrototype( name='Run MyCronTask every hour at 15 minutes past the hour', prototype=TaskSubmission( cluster_slug='dev-cluster', identifier=TaskIdentifier(name='MyCronTask', version='v0.0'), input=b'{"message": "Hello"}', dependencies=[], display='MyCronTask', max_retries=0), storage_event_triggers=[], cron_triggers=[CronTrigger(schedule='15 * * * *')], ) ] ``` ### Registering Automations To register an automation, use the trigger type-specific `create_*_automation` methods provided by the automation client. Refer to the documentation for each trigger type for more details. ## Overview in the Tilebox Console You can also use the Tilebox Console to manage automations. Visit [the automations section](https://console.tilebox.com/workflows/automations) to check it out. Tilebox Workflows automations in the Tilebox Console You can also register new automations or edit and delete existing ones directly from the console. # Cron triggers Source: https://docs.tilebox.com/workflows/near-real-time/cron Trigger jobs based on a Cron schedule. This feature is only available in the Python SDK. ## Creating Cron tasks Cron tasks run repeatedly on a specified [cron](https://en.wikipedia.org/wiki/Cron) schedule. To create a Cron task, use `tilebox.workflows.automations.CronTask` as your task's base class instead of the regular `tilebox.workflows.Task`. ```python Python from tilebox.workflows import ExecutionContext from tilebox.workflows.automations import CronTask class MyCronTask(CronTask): message: str def execute(self, context: ExecutionContext) -> None: print(f"Hello {self.message} from a Cron Task!") # self.trigger is an attribute of the CronTask class, # which contains information about the trigger event # that caused this task to be submitted as part of a job print(f"This task was triggered at {self.trigger.time}") ``` ## Registering a Cron trigger After implementing a Cron task, register it to be triggered according to a Cron schedule. When the Cron expression matches, a new job is submitted consisting of a single task instance derived from the Cron task prototype. ```python Python from tilebox.workflows import Client client = Client() automations = client.automations() cron_automation = automations.create_cron_automation( "my-cron-automation", # name of the cron automation "dev-cluster", # cluster slug to submit jobs to MyCronTask(message="World"), # the task (and its input parameters) to run repeatedly cron_triggers=[ "12 * * * *", # run every hour at minute 12 "45 18 * * *", # run every day at 18:45 "30 13 * * 3", # run every Wednesday at 13:30 ], ) ``` The syntax for specifying the cron triggers is a [CRON expression](https://en.wikipedia.org/wiki/Cron#CRON_expression). A helpful tool to test your cron expressions is [crontab.guru](https://crontab.guru/).
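For reference, a cron trigger consists of five space-separated fields: minute, hour, day of month, month, and day of week. The trigger `30 13 * * 3` from the example above breaks down as follows:

```plaintext
30 13 * * 3
│  │  │ │ └── day of week (3 = Wednesday)
│  │  │ └──── month (* = every month)
│  │  └────── day of month (* = every day)
│  └───────── hour (13)
└──────────── minute (30)
```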
## Starting a Cron Task Runner With the Cron automation registered, a job is submitted whenever the Cron expression matches. But unless a [task runner](/workflows/concepts/task-runners) is available to execute the Cron task, the submitted jobs remain in a task queue. Once an [eligible task runner](/workflows/concepts/task-runners#task-selection) becomes available, all jobs in the queue are executed. ```python Python from tilebox.workflows import Client client = Client() runner = client.runner("dev-cluster", tasks=[MyCronTask]) runner.run_forever() ``` If this task runner runs continuously, its output may resemble the following: ```plaintext Output Hello World from a Cron Task! This task was triggered at 2023-09-25 16:12:00 Hello World from a Cron Task! This task was triggered at 2023-09-25 17:12:00 Hello World from a Cron Task! This task was triggered at 2023-09-25 18:12:00 Hello World from a Cron Task! This task was triggered at 2023-09-25 18:45:00 Hello World from a Cron Task! This task was triggered at 2023-09-25 19:12:00 ``` ## Inspecting in the Console The [Tilebox Console](https://console.tilebox.com/workflows/automations) provides a straightforward way to inspect all registered Cron automations. Tilebox Workflows automations in the Tilebox Console Use the console to view, edit, and delete the registered Cron automations. ## Deleting Cron automations To delete a registered Cron automation, use `automations.delete`. After deletion, no new jobs will be submitted by that Cron trigger. Jobs that were already triggered in the past remain queued. ```python Python from tilebox.workflows import Client client = Client() automations = client.automations() # delete the automation as returned by create_cron_automation automations.delete(cron_automation) # or manually by id: automations.delete("0190bafc-b3b8-88c4-008b-a5db044380d0") ``` ## Submitting Cron jobs manually You can submit Cron tasks as regular tasks for testing purposes or as part of a larger workflow. To do so, instantiate the task with a specific trigger time using the `once` method. Submitting a job with a Cron task using `once` immediately schedules the task, and a runner may pick it up and execute it. The trigger time set in the `once` method does not influence the execution time; it only sets the `self.trigger.time` attribute for the Cron task. ```python Python from datetime import datetime, timezone job_client = client.jobs() # create a Cron task prototype task = MyCronTask(message="Hello") # submitting it directly won't work: raises ValueError: # CronTask cannot be submitted without being triggered. Use task.once(). job_client.submit("manual-cron-job", task, cluster="dev-cluster") # specify a trigger time to submit the task as a regular task triggered_task = task.once() # same as task.once(datetime.now()) job_client.submit("manual-cron-job", triggered_task, cluster="dev-cluster") # simulate a trigger at a specific time triggered_task = task.once(datetime(2030, 12, 12, 15, 15, tzinfo=timezone.utc)) # the task will be scheduled to run immediately, even with a future trigger time # but the self.trigger.time will be 2030-12-12T15:15:00Z for the task instance job_client.submit("manual-cron-job", triggered_task, cluster="dev-cluster") ``` # Storage Event Triggers Source: https://docs.tilebox.com/workflows/near-real-time/storage-events Trigger jobs after objects are created or modified in a storage location This feature is only available in the Python SDK.
## Creating a Storage Event Task Storage Event Tasks are automations triggered when objects are created or modified in a [storage location](#storage-locations). To create a Storage Event task, use `tilebox.workflows.automations.StorageEventTask` as your tasks base class instead of the regular `tilebox.workflows.Task`. ```python Python from tilebox.workflows import ExecutionContext from tilebox.workflows.automations import StorageEventTask, StorageEventType from tilebox.workflows.observability.logging import get_logger logger = get_logger() class LogObjectCreation(StorageEventTask): head_bytes: int def execute(self, context: ExecutionContext) -> None: # self.trigger is an attribute of the StorageEventTask class, # which contains information about the storage event that triggered this task if self.trigger.type == StorageEventType.CREATED: path = self.trigger.location logger.info(f"A new object was created: {path}") # trigger.storage is a storage client for interacting with the storage # location that triggered the event # using it, we can read the object to get its content as a bytes object data = self.trigger.storage.read(path) logger.info(f"The object's file size is {len(data)} bytes") logger.info(f"The object's first {self.head_bytes} bytes are: {data[:self.head_bytes]}") ``` ## Storage Locations Storage Event tasks are triggered when objects are created or modified in a storage location. This location can be a cloud storage bucket or a local file system. Tilebox supports the following storage locations: ### Registering a Storage Location To make a storage location available within Tilebox workflows, it must be registered first. This involves specifying the location and setting up a notification system that forwards events to Tilebox, enabling task triggering. The setup varies depending on the storage location type. For example, a GCP storage bucket is integrated by setting up a [PubSub Notification with a push subscription](https://cloud.google.com/storage/docs/pubsub-notifications). A local file system requires installing a filesystem watcher. To set up a storage location registered with Tilebox, please [get in touch](mailto:engineering@tilebox.com). ### Listing Available Storage Locations To list all available storage locations, use the `all` method on the storage location client. ```python Python from tilebox.workflows import Client client = Client() automations_client = client.automations() storage_locations = automations_client.storage_locations() print(storage_locations) ``` ```plaintext Output [ StorageLocation( location="gcp-project:gcs-bucket-fab3fa2", type=StorageType.GCS, ), StorageLocation( location="s3-bucket-263af15", type=StorageType.S3, ), StorageLocation( location='/path/to/a/local/folder', type=StorageType.FS, ), ] ``` ### Reading Files from a Storage Location Once a storage location is registered, you can read files from it using the `read` method on the storage client. ```python Python gcs_bucket = storage_locations[0] s3_bucket = storage_locations[1] local_folder = storage_locations[2] gcs_object = gcs_bucket.read("my-object.txt") s3_object = s3_bucket.read("my-object.txt") local_object = local_folder.read("my-object.txt") ``` The `read` method instantiates a client for the specific storage location. This requires that the storage location is accessible by a task runner and may require credentials for cloud storage or physical/network access to a locally mounted file system. 
To set up authentication and enable access to a GCS storage bucket, check out the [Google Client docs for authentication](https://cloud.google.com/docs/authentication/client-libraries#python). ## Registering a Storage Event Trigger After implementing a Storage Event task, register it to trigger each time a storage event occurs. This registration submits a new job consisting of a single task instance derived from the registered Storage Event task prototype. ```python Python from tilebox.workflows import Client client = Client() automations = client.automations() storage_event_automation = automations.create_storage_event_automation( "log-object-creations", # name of the storage event automation "dev-cluster", # cluster slug to submit jobs to LogObjectCreation(head_bytes=20), # the task (and its input parameters) to run repeatedly triggers=[ # you can specify a glob pattern: # run every time a .txt file is created anywhere in the gcs bucket (gcs_bucket, "**.txt"), ], ) ``` The syntax for specifying glob patterns follows [Standard Wildcards](https://tldp.org/LDP/GNU-Linux-Tools-Summary/html/x11655.htm). Additionally, you can use `**` as a super-asterisk, a matching operator that is not sensitive to slash separators. Here are some examples of valid glob patterns: | Pattern | Matches | | ----------- | ---------------------------------------------------------------------------- | | `*.ext` | Any file ending in `.ext` in the root directory | | `**/*.ext` | Any file ending in `.ext` in any subdirectory, but not in the root directory | | `**.ext` | Any file ending in `.ext` in any subdirectory, including the root directory | | `folder/*` | Any file directly in a `folder` subdirectory | | `folder/**` | Any file directly or recursively part of a `folder` subdirectory | | `[a-z].txt` | Matches `a.txt`, `b.txt`, etc. | ## Starting a Storage Event Task Runner With the Storage Event automation registered, a job is submitted whenever a storage event occurs. But unless a [task runner](/workflows/concepts/task-runners) is available to execute the Storage Event task, the submitted jobs remain in a task queue. Once an [eligible task runner](/workflows/concepts/task-runners#task-selection) becomes available, all jobs in the queue are executed. ```python Python from tilebox.workflows import Client client = Client() runner = client.runner("dev-cluster", tasks=[LogObjectCreation]) runner.run_forever() ``` ### Triggering an Event Creating an object in the bucket where the task is registered results in a job being submitted: ```bash Creating an object echo "Hello World" > my-object.txt gcloud storage cp my-object.txt gs://gcs-bucket-fab3fa2 ``` Inspecting the task runner output reveals that the job was submitted and the task executed: ```plaintext Output 2024-09-25 16:51:45,621 INFO A new object was created: my-object.txt 2024-09-25 16:51:45,857 INFO The object's file size is 12 bytes 2024-09-25 16:51:45,858 INFO The object's first 20 bytes are: b'Hello World\n' ``` ## Inspecting in the Console The [Tilebox Console](https://console.tilebox.com/workflows/automations) provides an easy way to inspect all registered storage event automations. Tilebox Workflows automations in the Tilebox Console ## Deleting Storage Event automations To delete a registered storage event automation, use `automations.delete`. After deletion, no new jobs will be submitted by the storage event trigger. Jobs that were already triggered remain queued.
```python Python from tilebox.workflows import Client client = Client() automations = client.automations() # delete the automation as returned by create_storage_event_automation automations.delete(storage_event_automation) # or manually by id: automations.delete("0190bafc-b3b8-88c4-008b-a5db044380d0") ``` ## Submitting Storage Event jobs manually You can submit Storage Event tasks as regular tasks for testing purposes or as part of a larger workflow. To do so, instantiate the task with a specific storage location and object name using the `once` method. ```python Python job_client = client.jobs() task = LogObjectCreation(head_bytes=20) # submitting it directly won't work; raises ValueError: # StorageEventTask cannot be submitted without being triggered. Use task.once(). job_client.submit( "manual-storage-event-job", task, cluster="dev-cluster" ) # instead, specify a trigger, # so that we can submit the task as a regular task triggered_task = task.once(gcs_bucket, "my-object.txt") job_client.submit( "manual-storage-event-job", triggered_task, cluster="dev-cluster" ) ``` # Logging Source: https://docs.tilebox.com/workflows/observability/logging Set up distributed logging using the OpenTelemetry logging protocol ## Overview Tilebox workflows are designed for distributed execution, making it essential to set up logging to a centralized system. Tilebox supports OpenTelemetry logging, which simplifies sending log messages from your tasks to a chosen backend. Collecting and visualizing logs from a distributed cluster of task runners in a tool like [Axiom](https://axiom.co/) can look like this: Tilebox Workflows logging in Axiom ## Configure logging The Tilebox workflow SDKs include support for exporting OpenTelemetry logs. To enable logging, call the appropriate configuration functions during the startup of your [task runner](/workflows/concepts/task-runners). Then, use the provided `logger` to send log messages from your tasks. To configure logging with Axiom, you first need to create an [Axiom Dataset](https://axiom.co/docs/reference/datasets) to export your workflow logs to. You will also need an [Axiom API key](https://axiom.co/docs/reference/tokens) with the necessary write permissions for your Axiom dataset.
```python Python from tilebox.workflows import Client, Task, ExecutionContext from tilebox.workflows.observability.logging import configure_otel_logging_axiom # your own workflow: from my_workflow import MyTask def main(): configure_otel_logging_axiom( # specify an Axiom dataset to export logs to dataset="my-axiom-logs-dataset", # along with an Axiom API key with ingest permissions for that dataset api_key="my-axiom-api-key", ) # the task runner will export logs from # the executed tasks to the specified dataset client = Client() runner = client.runner(tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": main() ``` ```go Go package main import ( "context" "log/slog" "github.com/tilebox/tilebox-go/examples/workflows/axiom" "github.com/tilebox/tilebox-go/observability" "github.com/tilebox/tilebox-go/observability/logger" "github.com/tilebox/tilebox-go/workflows/v1" ) // specify a service name and version to identify the instrumenting application in traces and logs var service = &observability.Service{Name: "task-runner", Version: "dev"} func main() { ctx := context.Background() // Setup OpenTelemetry logging and slog // It uses AXIOM_API_KEY and AXIOM_LOGS_DATASET from the environment axiomHandler, shutdownLogger, err := logger.NewAxiomHandlerFromEnv(ctx, service, logger.WithLevel(slog.LevelInfo), // export logs at info level and above as OTEL logs ) defer shutdownLogger(ctx) if err != nil { slog.Error("failed to set up axiom log handler", slog.Any("error", err)) return } tileboxLogger := logger.New( // initialize a slog.Logger axiomHandler, // export logs to the Axiom handler logger.NewConsoleHandler(logger.WithLevel(slog.LevelWarn)), // and additionally, export WARN and ERROR logs to stdout ) slog.SetDefault(tileboxLogger) // all future slog calls will be forwarded to the tilebox logger client := workflows.NewClient() taskRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return } err = taskRunner.RegisterTasks(&MyTask{}) if err != nil { slog.Error("failed to register tasks", slog.Any("error", err)) return } taskRunner.RunForever(ctx) } ``` Setting the environment variables `AXIOM_API_KEY` and `AXIOM_LOGS_DATASET` allows you to omit these arguments in the `configure_otel_logging_axiom` function. If you are using another OpenTelemetry-compatible backend besides Axiom, such as OpenTelemetry Collector or Jaeger, you can configure logging by specifying the URL endpoint to export log messages to. 
```python Python from tilebox.workflows import Client from tilebox.workflows.observability.logging import configure_otel_logging # your own workflow: from my_workflow import MyTask def main(): configure_otel_logging( # specify an endpoint to export logs to, such as a # locally running instance of OpenTelemetry Collector endpoint="http://localhost:4318/v1/logs", # optional headers for each request headers={"Authorization": "Bearer some-api-key"}, ) # the task runner will export logs from # the executed tasks to the specified endpoint client = Client() runner = client.runner(tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": main() ``` ```go Go package main import ( "context" "log/slog" "github.com/tilebox/tilebox-go/examples/workflows/opentelemetry" "github.com/tilebox/tilebox-go/observability" "github.com/tilebox/tilebox-go/observability/logger" "github.com/tilebox/tilebox-go/workflows/v1" ) // specify a service name and version to identify the instrumenting application in traces and logs var service = &observability.Service{Name: "task-runner", Version: "dev"} func main() { ctx := context.Background() endpoint := "http://localhost:4318" headers := map[string]string{ "Authorization": "Bearer ", } // Setup an OpenTelemetry log handler, exporting logs to an OTEL compatible log endpoint otelHandler, shutdownLogger, err := logger.NewOtelHandler(ctx, service, logger.WithEndpointURL(endpoint), logger.WithHeaders(headers), logger.WithLevel(slog.LevelInfo), // export logs at info level and above as OTEL logs ) defer shutdownLogger(ctx) if err != nil { slog.Error("failed to set up otel log handler", slog.Any("error", err)) return } tileboxLogger := logger.New( // initialize a slog.Logger otelHandler, // export logs to the OTEL handler logger.NewConsoleHandler(logger.WithLevel(slog.LevelWarn)), // and additionally, export WARN and ERROR logs to stdout ) slog.SetDefault(tileboxLogger) // all future slog calls will be forwarded to the tilebox logger client := workflows.NewClient() taskRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return } err = taskRunner.RegisterTasks(&MyTask{}) if err != nil { slog.Error("failed to register tasks", slog.Any("error", err)) return } taskRunner.RunForever(ctx) } ``` If you set the environment variable `OTEL_LOGS_ENDPOINT`, you can omit that argument in the `configure_otel_logging` function. To log messages to the standard console output, use the `configure_console_logging` function. 
```python Python from tilebox.workflows import Client from tilebox.workflows.observability.logging import configure_console_logging # your own workflow: from my_workflow import MyTask def main(): configure_console_logging() # the task runner will print log messages from # the executed tasks to the console client = Client() runner = client.runner(tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": main() ``` ```go Go package main import ( "context" "log/slog" "github.com/tilebox/tilebox-go/examples/workflows/opentelemetry" "github.com/tilebox/tilebox-go/observability/logger" "github.com/tilebox/tilebox-go/workflows/v1" ) func main() { ctx := context.Background() tileboxLogger := logger.New(logger.NewConsoleHandler(logger.WithLevel(slog.LevelWarn))) slog.SetDefault(tileboxLogger) // all future slog calls will be forwarded to the tilebox logger client := workflows.NewClient() taskRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return } err = taskRunner.RegisterTasks(&MyTask{}) if err != nil { slog.Error("failed to register tasks", slog.Any("error", err)) return } taskRunner.RunForever(ctx) } ``` The console logging backend is not recommended for production use. Log messages will be emitted to the standard output of each task runner rather than a centralized logging system. It is intended for local development and testing of workflows. ## Emitting log messages Use the logger provided by the Tilebox SDK to emit log messages from your tasks to the [configured logging backend](#configure-logging). Log messages emitted within a task's `execute` method are also automatically recorded as span events for the current [job trace](/workflows/observability/tracing). ```python Python import logging from tilebox.workflows import Task, ExecutionContext from tilebox.workflows.observability.logging import get_logger logger = get_logger() class MyTask(Task): def execute(self, context: ExecutionContext) -> None: # emit a log message to the configured OpenTelemetry backend logger.info("Hello world from configured logger!") ``` ```go Go package tasks import ( "context" "log/slog" ) type MyTask struct{} func (t *MyTask) Execute(context.Context) error { // emit a log message to the configured OpenTelemetry backend slog.Info("Hello world from configured logger!") return nil } ``` ## Logging task runner internals In Python, Tilebox task runners also use a logger internally. By default, it's set to the WARNING level, but you can change it by explicitly configuring a logger for the workflows client when constructing the task runner.
```python Python import logging from tilebox.workflows import Client from tilebox.workflows.observability.logging import configure_otel_logging_axiom from tilebox.workflows.observability.logging import get_logger # configure Axiom or another logging backend configure_otel_logging_axiom( dataset="my-axiom-logs-dataset", api_key="my-axiom-api-key", ) # configure a logger for the Tilebox client at the INFO level client = Client() client.configure_logger(get_logger(level=logging.INFO)) # now the task runner inherits this logger and uses # it to emit its own internal log messages as well runner = client.runner(tasks=[MyTask]) runner.run_forever() ``` # OpenTelemetry Integration Source: https://docs.tilebox.com/workflows/observability/open-telemetry Integrate OpenTelemetry into your Tilebox Workflows ## Observability Effective observability is essential for building reliable workflows. Understanding and monitoring the execution of workflows and their tasks helps ensure correctness and efficiency. This section describes methods to gain insights into your workflow's execution. ## OpenTelemetry Tilebox Workflows is designed with [OpenTelemetry](https://opentelemetry.io/) in mind, which provides a set of APIs and libraries for instrumenting, generating, collecting, and exporting telemetry data (metrics, logs, and traces) in distributed systems. Tilebox Workflows currently supports OpenTelemetry for tracing and logging, with plans to include metrics in the future. ## Integrations Tilebox exports telemetry data using the [OpenTelemetry Protocol](https://opentelemetry.io/docs/specs/otlp/). Tilebox is pre-integrated with [Axiom](https://axiom.co/), a cloud-based observability and telemetry platform that supports this protocol, and the examples and screenshots in this section come from this integration. Additionally, any other OpenTelemetry-compatible backend, such as OpenTelemetry Collector or Jaeger, can be used to collect telemetry data generated by Tilebox Workflows. # Tracing Source: https://docs.tilebox.com/workflows/observability/tracing Record the execution of your workflow tasks as OpenTelemetry traces and spans ## Overview Applying [OpenTelemetry traces](https://opentelemetry.io/docs/concepts/signals/traces/) to the concept of workflows allows you to monitor the execution of your jobs and their individual tasks. Visualizing the trace for a job in a tool like [Axiom](https://axiom.co/) may look like this: Tilebox Workflows tracing in Axiom Tracing your workflows enables you to easily observe: * The order of task execution * Which tasks run in parallel * The [task runner](/workflows/concepts/task-runners) handling each task * The duration of each task * The outcome of each task (success or failure) This information helps identify bottlenecks and performance issues, ensuring that your workflows execute correctly. ## Configure tracing The Tilebox workflow SDKs have built-in support for exporting OpenTelemetry traces. To enable tracing, call the appropriate configuration functions during the startup of your [task runner](/workflows/concepts/task-runners). To configure tracing with Axiom, you first need to create an [Axiom Dataset](https://axiom.co/docs/reference/datasets) to export your workflow traces to. You will also need an [Axiom API key](https://axiom.co/docs/reference/tokens) with the necessary write permissions for your Axiom dataset.
```python Python from tilebox.workflows import Client from tilebox.workflows.observability.tracing import configure_otel_tracing_axiom # your own workflow: from my_workflow import MyTask def main(): configure_otel_tracing_axiom( # specify an Axiom dataset to export traces to dataset="my-axiom-traces-dataset", # along with an Axiom API key for ingest permissions api_key="my-axiom-api-key", ) # the following task runner generates traces for executed tasks and # exports trace and span data to the specified Axiom dataset client = Client() runner = client.runner(tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": main() ``` ```go Go package main import ( "context" "log/slog" "github.com/tilebox/tilebox-go/examples/workflows/axiom" "github.com/tilebox/tilebox-go/observability" "github.com/tilebox/tilebox-go/observability/tracer" "github.com/tilebox/tilebox-go/workflows/v1" "go.opentelemetry.io/otel" ) // specify a service name and version to identify the instrumenting application in traces and logs var service = &observability.Service{Name: "task-runner", Version: "dev"} func main() { ctx := context.Background() // Setup an OpenTelemetry trace span processor, exporting traces and spans to Axiom // It uses AXIOM_API_KEY and AXIOM_TRACES_DATASET from the environment tileboxTracerProvider, shutdown, err := tracer.NewAxiomProviderFromEnv(ctx, service) defer shutdown(ctx) if err != nil { slog.Error("failed to set up axiom tracer provider", slog.Any("error", err)) return } otel.SetTracerProvider(tileboxTracerProvider) // set the tilebox tracer provider as the global OTEL tracer provider client := workflows.NewClient() taskRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return } err = taskRunner.RegisterTasks(&MyTask{}) if err != nil { slog.Error("failed to register tasks", slog.Any("error", err)) return } taskRunner.RunForever(ctx) } ``` Set the environment variables `AXIOM_API_KEY` and `AXIOM_TRACES_DATASET` to omit those arguments in the `configure_otel_tracing_axiom` function. If you are using another OpenTelemetry-compatible backend besides Axiom, like OpenTelemetry Collector or Jaeger, you can configure tracing by specifying the URL endpoint to export traces to. 
```python Python from tilebox.workflows import Client from tilebox.workflows.observability.tracing import configure_otel_tracing # your own workflow: from my_workflow import MyTask def main(): configure_otel_tracing( # specify an endpoint for trace ingestion, such as a # locally running instance of OpenTelemetry Collector endpoint="http://localhost:4318/v1/traces", # optional headers for each request headers={"Authorization": "Bearer some-api-key"}, ) # the following task runner generates traces for executed tasks and # exports trace and span data to the specified endpoint client = Client() runner = client.runner(tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": main() ``` ```go Go package main import ( "context" "log/slog" "github.com/tilebox/tilebox-go/examples/workflows/opentelemetry" "github.com/tilebox/tilebox-go/observability" "github.com/tilebox/tilebox-go/observability/tracer" "github.com/tilebox/tilebox-go/workflows/v1" "go.opentelemetry.io/otel" ) // specify a service name and version to identify the instrumenting application in traces and logs var service = &observability.Service{Name: "task-runner", Version: "dev"} func main() { ctx := context.Background() endpoint := "http://localhost:4318" headers := map[string]string{ "Authorization": "Bearer ", } // Setup an OpenTelemetry trace span processor, exporting traces and spans to an OTEL compatible trace endpoint tileboxTracerProvider, shutdown, err := tracer.NewOtelProvider(ctx, service, tracer.WithEndpointURL(endpoint), tracer.WithHeaders(headers), ) defer shutdown(ctx) if err != nil { slog.Error("failed to set up otel span processor", slog.Any("error", err)) return } otel.SetTracerProvider(tileboxTracerProvider) // set the tilebox tracer provider as the global OTEL tracer provider client := workflows.NewClient() taskRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return } err = taskRunner.RegisterTasks(&MyTask{}) if err != nil { slog.Error("failed to register tasks", slog.Any("error", err)) return } taskRunner.RunForever(ctx) } ``` Set the environment variable `OTEL_TRACES_ENDPOINT` to omit that argument in the `configure_otel_tracing` function. Once the runner picks up tasks and executes them, corresponding traces and spans are automatically generated and exported to the configured backend.
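Task spans are generated for you automatically. If you also want timing for individual steps inside a task, you can emit your own spans with the standard `opentelemetry` Python API. The sketch below is an assumption-based example using the vanilla `opentelemetry-api` package rather than a Tilebox-specific interface; whether such custom spans appear nested under the automatically generated task span depends on the tracer provider and context active when the task executes.

```python Python
from opentelemetry import trace

from tilebox.workflows import Task, ExecutionContext

# obtain a tracer from the globally configured tracer provider
tracer = trace.get_tracer("my_workflow")


class MyTask(Task):
    def execute(self, context: ExecutionContext) -> None:
        # wrap an expensive step in its own span to see its duration in the trace
        with tracer.start_as_current_span("download-input-data"):
            ...  # download or prepare input data here

        with tracer.start_as_current_span("process-data") as span:
            # attach custom attributes to make the span easier to filter in the backend
            span.set_attribute("items.count", 42)
            ...  # processing logic here
```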