The cache API is currently experimental and may undergo breaking changes in the future. Many more features and new backends are on the roadmap.

Caches are configured at the task runner level. Because task runners can be deployed across multiple locations, caches must be accessible from all task runners contributing to a workflow.

Currently, the default cache implementation uses a Google Cloud Storage bucket, providing a scalable way to share data between tasks. For quick prototyping and local development, you can also use the local file system cache, which is included out of the box.

If needed, you can create your own cache backend by implementing the Cache interface.
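
For example, a minimal sketch of a custom backend, assuming the Cache interface can be satisfied with dict-style item access for bytes values (mirroring the job_cache usage shown below). The class name and exact methods here are illustrative assumptions; consult the actual Cache interface for the methods to implement.

class MyDictCache:
    """A hypothetical cache backend keeping all data in a plain dict."""

    def __init__(self) -> None:
        self._data: dict[str, bytes] = {}

    def __setitem__(self, key: str, value: bytes) -> None:
        self._data[key] = value  # store raw bytes under a string key

    def __getitem__(self, key: str) -> bytes:
        return self._data[key]  # retrieve the stored bytes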

Configuring a Cache

You can configure a cache when creating a task runner by passing a cache instance to the cache parameter. For example, to use an in-memory cache, pass an instance of tilebox.workflows.cache.InMemoryCache. This implementation is helpful for local development and quick testing. For alternatives, see the supported cache backends.

from tilebox.workflows import Client
from tilebox.workflows.cache import InMemoryCache

client = Client()
runner = client.runner(
    "dev-cluster",
    tasks=[...],
    cache=InMemoryCache(),
)

With a cache configured, the ExecutionContext passed to each task's execute method gains a job_cache attribute that can be used to store and retrieve data from the cache.
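
For example (a minimal sketch with a hypothetical CacheDemo task; the Storing and Retrieving Data section below covers this in detail):

from tilebox.workflows import Task, ExecutionContext

class CacheDemo(Task):
    def execute(self, context: ExecutionContext) -> None:
        # store bytes in the job cache, then read them back
        context.job_cache["some-key"] = b"some-value"
        data = context.job_cache["some-key"]
        print(data)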

Cache Backends

Tilebox Workflows comes with four cache implementations out of the box, each backed by a different storage system.

Google Storage Cache

A cache implementation backed by a Google Cloud Storage bucket to store and retrieve data. It's a reliable method for sharing data across tasks and is suitable for production environments. You need access to a GCP project and a bucket.

The Tilebox Workflow orchestrator uses the official Python Client for Google Cloud Storage. To set up authentication, refer to the official documentation.

from tilebox.workflows import Client
from tilebox.workflows.cache import GoogleStorageCache
from google.cloud.storage import Client as StorageClient

storage_client = StorageClient(project="gcp-project")
bucket = storage_client.bucket("cache-bucket")

client = Client()
runner = client.runner(
    "dev-cluster",
    tasks=[...],
    cache=GoogleStorageCache(bucket, prefix="jobs"),
)

The prefix parameter is optional and can be used to set a common prefix for all cache keys, which helps organize objects within a bucket when reusing the same bucket for other purposes.
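
One common authentication setup, for example, is a service account key file picked up through Application Default Credentials. A minimal sketch, assuming a key file at a placeholder path; this is just one of the options the official client supports:

import os
from google.cloud.storage import Client as StorageClient

# point Application Default Credentials at a service account key file
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account.json"

storage_client = StorageClient(project="gcp-project")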

Amazon S3 Cache

A cache implementation backed by an Amazon S3 bucket to store and retrieve data. Like the Google Cloud Storage option, it’s reliable and scalable for production use.

Tilebox uses the boto3 library to communicate with Amazon S3. For the necessary authentication setup, refer to its documentation.

from tilebox.workflows import Client
from tilebox.workflows.cache import AmazonS3Cache

client = Client()
runner = client.runner(
    "dev-cluster",
    tasks=[...],
    cache=AmazonS3Cache("my-bucket-name", prefix="jobs"),
)

The prefix parameter is optional and can be used to set a common prefix for all cache keys, which helps organize objects within a bucket when reusing the same bucket for other purposes.
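
For example, boto3 resolves credentials through its standard credential chain, which includes environment variables. A sketch of that option, with placeholder values; shared credential files and IAM roles work as well:

import os

# one of several mechanisms boto3 supports out of the box
os.environ["AWS_ACCESS_KEY_ID"] = "your-access-key-id"
os.environ["AWS_SECRET_ACCESS_KEY"] = "your-secret-access-key"
os.environ["AWS_DEFAULT_REGION"] = "eu-central-1"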

Local File System Cache

A cache implementation backed by a local file system. It’s suitable for quick prototyping and local development, assuming all task runners share the same machine or access the same file system.

from tilebox.workflows import Client
from tilebox.workflows.cache import LocalFileSystemCache

client = Client()
runner = client.runner(
    "dev-cluster",
    tasks=[...],
    cache=LocalFileSystemCache("/path/to/cache/directory"),
)

Local file system caches can also be used in conjunction with network-attached storage or similar tools, making them a viable option for distributed setups as well.

In-Memory Cache

A simple in-memory cache useful for quick prototyping and development. The data is not shared between task runners and is lost upon task runner restarts. Use this cache only for workflows executed on a single task runner.

from tilebox.workflows import Client
from tilebox.workflows.cache import InMemoryCache

client = Client()
runner = client.runner(
    "dev-cluster",
    tasks=[...],
    cache=InMemoryCache(),
)

Data Isolation

Caches are isolated per job, meaning that each job’s cache data is only accessible to tasks within that job. This setup prevents key conflicts between different job executions. Currently, accessing cache data of other jobs is not supported.

The capability to share data across jobs is planned for future updates. This feature will be beneficial for real-time processing workflows or workflows requiring auxiliary data from external sources.
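
To illustrate the current behavior: submitting the same workflow twice creates two jobs with fully separate caches, even though both write to the same key. A sketch using the ProducerTask defined in the next section:

job_client = client.jobs()

# both jobs write to the cache key "data", but each write lands in that
# job's own isolated cache, so the two values never conflict
job_client.submit("cache-isolation-a", ProducerTask(), cluster="dev-cluster")
job_client.submit("cache-isolation-b", ProducerTask(), cluster="dev-cluster")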

Storing and Retrieving Data

The job cache can be accessed via the ExecutionContext passed to a task's execute function. The job_cache object provides methods to store data in and retrieve data from the cache. The specifics of data storage depend on the chosen cache backend.

The cache API is designed to be simple: it handles binary data in the form of bytes, identified by str cache keys. This allows storing many different kinds of data, such as pickled Python objects, serialized JSON, UTF-8 encoded strings, or raw binary data.

The following snippet illustrates storing and retrieving data from the cache.

from tilebox.workflows import Task, ExecutionContext

class ProducerTask(Task):
    def execute(self, context: ExecutionContext) -> None:
        # store data in the cache
        context.job_cache["data"] = b"my_binary_data_to_store"
        context.submit_subtask(ConsumerTask())

class ConsumerTask(Task):
    def execute(self, context: ExecutionContext) -> None:
        # read the data stored by ProducerTask
        data = context.job_cache["data"]
        print(f"Read {data} from cache")

In this example, the data stored under the key "data" can be of any size within the constraints of the cache backend. Ensure that keys remain unique within a job's scope to avoid conflicts.
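
Since cache values are raw bytes, richer objects need to be serialized before storing and deserialized after retrieval. A minimal sketch using JSON, with hypothetical task names:

import json
from tilebox.workflows import Task, ExecutionContext

class StoreConfig(Task):
    def execute(self, context: ExecutionContext) -> None:
        config = {"threshold": 0.75, "retries": 3}
        # encode the dict as UTF-8 JSON bytes before storing it
        context.job_cache["config"] = json.dumps(config).encode()
        context.submit_subtask(LoadConfig())

class LoadConfig(Task):
    def execute(self, context: ExecutionContext) -> None:
        # decode the stored bytes back into a Python dict
        config = json.loads(context.job_cache["config"].decode())
        print(f"Loaded {config} from cache")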

To test the workflow, submit a job executing the ProducerTask and start a local task runner, for example using the LocalFileSystemCache backend, to observe the output of the ConsumerTask.

# submit a job to test our workflow
job_client = client.jobs()
job_client.submit("testing-cache-access", ProducerTask(), cluster="dev-cluster")

# start a runner to execute it
runner = client.runner(
    "dev-cluster",
    tasks=[ProducerTask, ConsumerTask],
    cache=LocalFileSystemCache("/path/to/cache/directory"),
)
runner.run_forever()
Output
Read b'my_binary_data_to_store' from cache

Groups and Hierarchical Keys

The cache API supports groups and hierarchical keys, analogous to directories and files in a file system. Groups help organize cache keys hierarchically, preventing key conflicts and giving data more structure. Additionally, groups are iterable, enabling retrieval of all keys within a group. This is useful when multiple tasks produce cache data and a later task needs to list everything produced by the earlier ones.

The following code shows an example of how cache groups can be used.

from tilebox.workflows import Task, ExecutionContext
import random

class CacheGroupDemoWorkflow(Task):
    n: int

    def execute(self, context: ExecutionContext):
        # define a cache group key for subtasks
        group_key = "random_numbers"
        produce_numbers = context.submit_subtask(
            ProduceRandomNumbers(self.n, group_key)
        )
        context.submit_subtask(
            PrintSum(group_key),
            depends_on=[produce_numbers],
        )

class ProduceRandomNumbers(Task):
    n: int
    group_key: str

    def execute(self, context: ExecutionContext):
        for i in range(self.n):
            context.submit_subtask(ProduceRandomNumber(i, self.group_key))

class ProduceRandomNumber(Task):
    index: int
    group_key: str

    def execute(self, context: ExecutionContext) -> None:
        number = random.randint(0, 100)
        group = context.job_cache.group(self.group_key)
        group[f"key_{self.index}"] = str(number).encode()

class PrintSum(Task):
    group_key: str

    def execute(self, context: ExecutionContext) -> None:
        group = context.job_cache.group(self.group_key)

        # PrintSum doesn't know how many numbers were produced,
        # instead it iterates over all keys in the cache group
        numbers = []
        for key in group:  # iterate over all stored numbers
            number = group[key]  # read data from cache
            numbers.append(int(number.decode()))  # convert bytes back to int

        print(f"Sum of all numbers: {sum(numbers)}")

Submitting a CacheGroupDemoWorkflow job and running it with a task runner can be done as follows:

# submit a job to test our workflow
job_client = client.jobs()
job_client.submit("cache-groups", CacheGroupDemoWorkflow(5), cluster="dev-cluster")

# start a runner to execute it
runner = client.runner(
    "dev-cluster",
    tasks=[CacheGroupDemoWorkflow, ProduceRandomNumbers, ProduceRandomNumber, PrintSum],
    cache=LocalFileSystemCache("/path/to/cache/directory"),
)
runner.run_forever()
Output
Sum of all numbers: 284