# Caches

> Sharing data between tasks is crucial for workflows, especially in satellite imagery processing, where large datasets are the norm. Tilebox Workflows offers a straightforward API for storing and retrieving data from a shared cache.

<Warning>
  The cache API is currently experimental and may change in the future, possibly in breaking ways. Many more features and new [backends](#cache-backends) are on the roadmap.
</Warning>

Caches are configured at the [task runner](/workflows/concepts/task-runners) level. Because task runners can be deployed across multiple locations, caches must be accessible from all task runners contributing to a workflow.

Currently, the default cache implementation uses a Google Cloud Storage bucket, providing a scalable method to share data between tasks. For quick prototyping and local development, you can also use a local file system cache, which is included by default.

If needed, you can create your own cache backend by implementing the `Cache` interface.
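
The abstract methods of the `Cache` interface are not listed on this page, so the following is only a rough sketch of what a custom backend has to provide. `DictCache` is a hypothetical, standalone class that does not subclass anything from Tilebox. It mirrors only the operations used in the examples on this page: item assignment, item lookup, iteration over keys, and `group`. The `/`-separated key layout is an assumption made for illustration.

```python
from __future__ import annotations

from collections.abc import Iterator


class DictCache:
    """Hypothetical stand-in for a custom cache backend, backed by a plain dict.

    This is not the Tilebox `Cache` interface itself. It only mirrors the
    operations used on this page: set, get, contains, iterate and group.
    """

    def __init__(self, store: dict[str, bytes] | None = None, prefix: str = "") -> None:
        self._store = store if store is not None else {}
        # assumed layout: groups become "/"-separated key prefixes
        self._prefix = prefix

    def __setitem__(self, key: str, value: bytes) -> None:
        self._store[self._prefix + key] = value

    def __getitem__(self, key: str) -> bytes:
        return self._store[self._prefix + key]

    def __contains__(self, key: str) -> bool:
        return self._prefix + key in self._store

    def __iter__(self) -> Iterator[str]:
        # yield only keys stored directly in this group, not in nested groups
        for full_key in self._store:
            if full_key.startswith(self._prefix):
                relative = full_key[len(self._prefix):]
                if "/" not in relative:
                    yield relative

    def group(self, name: str) -> DictCache:
        # a group is a view on the same storage with an extended prefix
        return DictCache(self._store, f"{self._prefix}{name}/")


cache = DictCache()
cache["data"] = b"payload"
numbers = cache.group("random_numbers")
numbers["key_0"] = b"42"

print(cache["data"])    # b'payload'
print(sorted(numbers))  # ['key_0']
print(sorted(cache))    # ['data']
```

A real backend would replace the dictionary with calls to the underlying storage system and would need to be reachable from every task runner that contributes to the workflow.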

## Configuring a Cache

You can configure a cache while creating a task runner by passing a cache instance to the `cache` parameter. To use an in-memory cache, use `tilebox.workflows.cache.InMemoryCache`. This implementation is helpful for local development and quick testing. For alternatives, see the supported [cache backends](#cache-backends).

<CodeGroup>
  ```python Python theme={"system"}
  from tilebox.workflows import Client
  from tilebox.workflows.cache import InMemoryCache

  client = Client()
  runner = client.runner(
      tasks=[...],
      cache=InMemoryCache(),
  )
  ```
</CodeGroup>

Once a cache is configured, the `context` object passed to each task's `execute` method gains a `job_cache` attribute that can be used to [store and retrieve data](#storing-and-retrieving-data) from the cache.

## Cache Backends

Tilebox Workflows comes with four cache implementations out of the box, each backed by a different storage system.

<Columns cols={2}>
  <Card title="Google Cloud Storage" icon="google" href="#google-storage-cache" horizontal />

  <Card title="Amazon S3" icon="aws" href="#amazon-s3-cache" horizontal />

  <Card title="Local File System" icon="folder-open" href="#local-file-system-cache" horizontal />

  <Card title="In-Memory" icon="memory" href="#in-memory-cache" horizontal />
</Columns>

### Google Storage Cache

A cache implementation that stores and retrieves data in a Google Cloud Storage bucket. It's a reliable method for sharing data across tasks and is suitable for production environments. You will need access to a GCP project and a bucket.

The Tilebox Workflow orchestrator uses the official Python Client for Google Cloud Storage. To set up authentication, refer to the <Link href="https://cloud.google.com/storage/docs/reference/libraries#authentication" target="_blank">official documentation</Link>.

<CodeGroup title="GoogleStorageCache">
  ```python Python theme={"system"}
  from tilebox.workflows import Client
  from tilebox.workflows.cache import GoogleStorageCache
  from google.cloud.storage import Client as StorageClient

  storage_client = StorageClient(project="gcp-project")
  bucket = storage_client.bucket("cache-bucket")

  client = Client()
  runner = client.runner(
      tasks=[...],
      cache=GoogleStorageCache(bucket, prefix="jobs"),
  )
  ```
</CodeGroup>

<Tip>
  The `prefix` parameter is optional and can be used to set a common prefix for all cache keys, which helps organize objects within a bucket when re-using the same bucket for other purposes.
</Tip>

### Amazon S3 Cache

A cache implementation backed by an Amazon S3 bucket to store and retrieve data. Like the Google Cloud Storage option, it's reliable and scalable for production use.

Tilebox utilizes the `boto3` library to communicate with Amazon S3. For the necessary authentication setup, refer to [its documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration).

<CodeGroup>
  ```python Python theme={"system"}
  from tilebox.workflows import Client
  from tilebox.workflows.cache import AmazonS3Cache

  client = Client()
  runner = client.runner(
      tasks=[...],
      cache=AmazonS3Cache("my-bucket-name", prefix="jobs"),
  )
  ```
</CodeGroup>

<Tip>
  The `prefix` parameter is optional and can be used to set a common prefix for all cache keys, which helps organize objects within a bucket when re-using the same bucket for other purposes.
</Tip>

### Local File System Cache

A cache implementation backed by a local file system. It's suitable for quick prototyping and local development, assuming all task runners share the same machine or access the same file system.

<CodeGroup>
  ```python Python theme={"system"}
  from tilebox.workflows import Client
  from tilebox.workflows.cache import LocalFileSystemCache

  client = Client()
  runner = client.runner(
      tasks=[...],
      cache=LocalFileSystemCache("/path/to/cache/directory"),
  )
  ```
</CodeGroup>

<Tip>
  A local file system cache can also be combined with network-attached storage or similar tools, which makes it a viable option for distributed setups as well.
</Tip>

### In-Memory Cache

A simple in-memory cache useful for quick prototyping and development. The data is not shared between task runners and is lost upon task runner restarts. Use this cache only for workflows executed on a single task runner.

<CodeGroup>
  ```python Python theme={"system"}
  from tilebox.workflows import Client
  from tilebox.workflows.cache import InMemoryCache

  client = Client()
  runner = client.runner(
      tasks=[...],
      cache=InMemoryCache(),
  )
  ```
</CodeGroup>

## Data Isolation

Caches are isolated per job, meaning that each job's cache data is only accessible to tasks within that job. This setup prevents key conflicts between different job executions. Currently, accessing cache data of other jobs is not supported.

<Note>
  The capability to share data across jobs is planned for future updates. This feature will be beneficial for real-time processing workflows or workflows requiring auxiliary data from external sources.
</Note>
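
One way to picture per-job isolation is as a key namespace: every cache key is stored under a path that includes the job it belongs to. The sketch below is a conceptual model only. The `prefix/job_id/key` layout, the `object_key` helper and the job IDs are assumptions made for illustration and do not describe how the built-in backends actually name their objects.

```python
from __future__ import annotations


def object_key(prefix: str, job_id: str, key: str) -> str:
    """Compose a storage key; the layout `prefix/job_id/key` is an assumption."""
    return "/".join(part.strip("/") for part in (prefix, job_id, key) if part)


# a plain dict stands in for a bucket or a cache directory
storage: dict[str, bytes] = {}

# two different jobs write to the same cache key "data"
storage[object_key("jobs", "job-a", "data")] = b"from job a"
storage[object_key("jobs", "job-b", "data")] = b"from job b"

# both values are kept, because the job ID is part of the stored key
print(sorted(storage))  # ['jobs/job-a/data', 'jobs/job-b/data']
```

Under this model, a task only ever sees the keys below its own job ID, which is why two jobs can use the same key without overwriting each other.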

## Storing and Retrieving Data

The job cache can be accessed via the `ExecutionContext` passed to a task's `execute` method. This [`job_cache`](/api-reference/python/tilebox.workflows/ExecutionContext.job_cache) object provides methods for storing data in the cache and retrieving it again. How the data is physically stored depends on the chosen cache backend.

The cache API is deliberately simple: it stores binary data as `bytes`, identified by `str` cache keys. Anything that can be serialized to bytes can therefore be cached, such as pickled Python objects, serialized JSON, UTF-8 encoded text, or raw binary data.

The following snippet illustrates storing and retrieving data from the cache.

<CodeGroup>
  ```python Python theme={"system"}
  from tilebox.workflows import Task, ExecutionContext

  class ProducerTask(Task):
      def execute(self, context: ExecutionContext) -> None:
          # store data in the cache
          context.job_cache["data"] = b"my_binary_data_to_store"
          context.submit_subtask(ConsumerTask())

  class ConsumerTask(Task):
      def execute(self, context: ExecutionContext) -> None:
          data = context.job_cache["data"]
          context.logger.info("Read data from cache", data=data.decode())
  ```
</CodeGroup>

In this example, data stored under the key `"data"` can be any size that fits the cache backend constraints. Ensure the key remains unique within the job's scope to avoid conflicts.

To test the workflow, you can start a local task runner using the `LocalFileSystemCache` backend. Then, submit a job to execute the `ProducerTask` and inspect the logs emitted by the `ConsumerTask`.

<CodeGroup>
  ```python Python theme={"system"}
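  from tilebox.workflows import Client
  from tilebox.workflows.cache import LocalFileSystemCache

  client = Client()

  # submit a job to test our workflow (ProducerTask and ConsumerTask are defined above)
  job_client = client.jobs()
  job_client.submit("testing-cache-access", ProducerTask())

  # start a runner to execute it
  runner = client.runner(
      tasks=[ProducerTask, ConsumerTask],
      cache=LocalFileSystemCache("/path/to/cache/directory"),
  )
  runner.run_forever()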
  ```
</CodeGroup>

```plaintext Logs theme={"system"}
Read data from cache data=my_binary_data_to_store
```
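
Because the cache only accepts `bytes`, structured values have to be serialized before they are stored and deserialized after they are read. The following is a minimal sketch using only the Python standard library. A plain `dict` stands in for `context.job_cache` so the snippet runs on its own, and the keys and sample values are made up for illustration.

```python
from __future__ import annotations

import json
import pickle

# stand-in for `context.job_cache`, which is used with the same item syntax
job_cache: dict[str, bytes] = {}

# UTF-8 text: encode on write
job_cache["greeting"] = "hello".encode("utf-8")

# JSON: serialize to a string first, then encode to bytes
metadata = {"scene": "example-scene", "cloud_cover": 0.12}
job_cache["metadata"] = json.dumps(metadata).encode("utf-8")

# arbitrary Python objects: pickle produces bytes directly
job_cache["bbox"] = pickle.dumps((10.0, 45.0, 11.0, 46.0))

# reading reverses each step
text = job_cache["greeting"].decode("utf-8")
restored_metadata = json.loads(job_cache["metadata"])
restored_bbox = pickle.loads(job_cache["bbox"])

print(text)               # hello
print(restored_metadata)  # {'scene': 'example-scene', 'cloud_cover': 0.12}
print(restored_bbox)      # (10.0, 45.0, 11.0, 46.0)
```

JSON is usually the safer choice for simple structures, since unpickling executes code embedded in the data and should only be done with data from a trusted source.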

## Groups and Hierarchical Keys

The cache API supports groups and hierarchical keys, analogous to directories and files in a file system. Groups organize cache keys hierarchically, which prevents key conflicts and gives the cached data a clearer structure. Groups are also iterable, so all keys within a group can be retrieved. This is useful when multiple tasks write cache data and a later task needs to list everything the earlier tasks produced.

The following code shows an example of how cache groups can be used.

<CodeGroup>
  ```python Python {39,44-45} theme={"system"}
  from tilebox.workflows import Task, ExecutionContext
  import random

  class CacheGroupDemo(Task):
      n: int

      def execute(self, context: ExecutionContext):
          # define a cache group key for subtasks
          group_key = "random_numbers"
          produce_numbers = context.submit_subtask(
              ProduceRandomNumbers(self.n, group_key)
          )
          sum_task = context.submit_subtask(
              PrintSum(group_key),
              depends_on=[produce_numbers],
          )

  class ProduceRandomNumbers(Task):
      n: int
      group_key: str

      def execute(self, context: ExecutionContext):
          for i in range(self.n):
              context.submit_subtask(ProduceRandomNumber(i, self.group_key))

  class ProduceRandomNumber(Task):
      index: int
      group_key: str

      def execute(self, context: ExecutionContext) -> None:
          number = random.randint(0, 100)
          group = context.job_cache.group(self.group_key)
          group[f"key_{self.index}"] = str(number).encode()

  class PrintSum(Task):
      group_key: str

      def execute(self, context: ExecutionContext) -> None:
          group = context.job_cache.group(self.group_key)

          # PrintSum doesn't know how many numbers were produced,
          # instead it iterates over all keys in the cache group
          numbers = []
          for key in group:  # iterate over all stored numbers
              number = group[key]  # read data from cache
              numbers.append(int(number.decode()))  # convert bytes back to int

          context.logger.info("Computed sum of all numbers", total=sum(numbers))
  ```
</CodeGroup>

To try it out, submit a `CacheGroupDemo` job and run it with a task runner:

<CodeGroup>
  ```python Python theme={"system"}
  from tilebox.workflows import Client
  from tilebox.workflows.cache import LocalFileSystemCache

  client = Client()

  # submit a job to test our workflow (the task classes are defined above)
  job_client = client.jobs()
  job_client.submit("cache-groups", CacheGroupDemo(5))

  # start a runner to execute it
  runner = client.runner(
      tasks=[CacheGroupDemo, ProduceRandomNumbers, ProduceRandomNumber, PrintSum],
      cache=LocalFileSystemCache("/path/to/cache/directory"),
  )
  runner.run_forever()
  ```
</CodeGroup>

```plaintext Logs theme={"system"}
Computed sum of all numbers total=284
```
