Creating a Storage Event Task

Storage Event Tasks are recurring tasks triggered when objects are created or modified in a storage location. To create a Storage Event task, use tilebox.workflows.recurrent_tasks.StorageEventTask as your task's base class instead of the regular tilebox.workflows.Task.

Python
from tilebox.workflows import ExecutionContext
from tilebox.workflows.recurrent_tasks import StorageEventTask, StorageEventType
from tilebox.workflows.observability.logging import get_logger

logger = get_logger()

class LogObjectCreation(StorageEventTask):
    head_bytes: int

    def execute(self, context: ExecutionContext) -> None:
        # self.trigger is an attribute of the StorageEventTask class,
        # which contains information about the storage event that triggered this task
        if self.trigger.type == StorageEventType.CREATED:
            path = self.trigger.location
            logger.info(f"A new object was created: {path}")

            # trigger.storage is a storage client for interacting with the storage
            # location that triggered the event
            # using it, we can read the object to get its content as a bytes object
            data = self.trigger.storage.read(path)
            logger.info(f"The object's file size is {len(data)} bytes")
            logger.info(f"The object's first {self.head_bytes} bytes are: {data[:self.head_bytes]}")
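
The length and slicing logic in execute can be exercised without a Tilebox connection. The following is a minimal sketch that assumes the body of execute is factored into a plain function. InMemoryStorage, FakeTrigger, and describe_object are hypothetical stand-ins written for this illustration and are not part of the Tilebox API.

```python
from dataclasses import dataclass


class InMemoryStorage:
    """Hypothetical stand-in for trigger.storage: serves objects from a dict."""

    def __init__(self, objects: dict[str, bytes]) -> None:
        self._objects = objects

    def read(self, path: str) -> bytes:
        return self._objects[path]


@dataclass
class FakeTrigger:
    """Hypothetical stand-in exposing the two trigger attributes used below."""

    location: str
    storage: InMemoryStorage


def describe_object(trigger: FakeTrigger, head_bytes: int) -> list[str]:
    """Build the same three log messages as LogObjectCreation.execute."""
    path = trigger.location
    data = trigger.storage.read(path)
    return [
        f"A new object was created: {path}",
        f"The object's file size is {len(data)} bytes",
        # slicing past the end is safe: it returns at most len(data) bytes
        f"The object's first {head_bytes} bytes are: {data[:head_bytes]}",
    ]


storage = InMemoryStorage({"my-object.txt": b"Hello World\n"})
messages = describe_object(FakeTrigger("my-object.txt", storage), head_bytes=20)
for message in messages:
    print(message)
```

Because head_bytes (20) exceeds the object's length (12 bytes), the slice returns the whole object rather than raising an error.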

Storage Locations

Storage Event tasks are triggered when objects are created or modified in a storage location. This location can be a cloud storage bucket or a local file system. Tilebox supports the following storage locations:

Google Cloud Storage

Amazon S3

Local File System

Registering a Storage Location

To make a storage location available within Tilebox workflows, it must be registered first. This involves specifying the location and setting up a notification system that forwards events to Tilebox, enabling task triggering. The setup varies depending on the storage location type.

For example, a GCP storage bucket is integrated by setting up a PubSub Notification with a push subscription, while a local file system requires installing a filesystem watcher. To have a storage location registered with Tilebox, please get in touch.
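
To give a sense of what a filesystem watcher does, the sketch below detects created and modified files by comparing two snapshots of a directory. It is a conceptual illustration using only the Python standard library. The helper names snapshot and diff_snapshots are hypothetical, and this is not the watcher used by Tilebox, whose implementation is not described here.

```python
import os
import tempfile


def snapshot(root: str) -> dict[str, tuple[int, int]]:
    """Map each file path (relative to root) to its (mtime_ns, size)."""
    state = {}
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            full = os.path.join(dirpath, name)
            stat = os.stat(full)
            rel = os.path.relpath(full, root).replace(os.sep, "/")
            state[rel] = (stat.st_mtime_ns, stat.st_size)
    return state


def diff_snapshots(before: dict, after: dict) -> tuple[set[str], set[str]]:
    """Return (created, modified) paths between two snapshots."""
    created = set(after) - set(before)
    modified = {p for p in after if p in before and after[p] != before[p]}
    return created, modified


with tempfile.TemporaryDirectory() as root:
    with open(os.path.join(root, "a.txt"), "w") as f:
        f.write("first")
    before = snapshot(root)

    # modify an existing file (its size changes) and create a new one
    with open(os.path.join(root, "a.txt"), "a") as f:
        f.write(" and more")
    with open(os.path.join(root, "b.txt"), "w") as f:
        f.write("new")
    after = snapshot(root)

created, modified = diff_snapshots(before, after)
print("created:", sorted(created))
print("modified:", sorted(modified))
```

A real watcher would typically rely on operating system notifications rather than polling, and would forward each detected change as an event instead of printing it.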

Listing Available Storage Locations

To list all available storage locations, use the storage_locations method on the recurrent tasks client.

Python
from tilebox.workflows import Client

client = Client()
recurrent_tasks_client = client.recurrent_tasks()

storage_locations = recurrent_tasks_client.storage_locations()
print(storage_locations)
Output
[
    StorageLocation(
        location="gcp-project:gcs-bucket-fab3fa2",
        type=StorageType.GCS,
    ),
    StorageLocation(
        location="s3-bucket-263af15",
        type=StorageType.S3,
    ),
    StorageLocation(
        location='/path/to/a/local/folder',
        type=StorageType.FS,
    ),
]

Reading Files from a Storage Location

Once a storage location is registered, you can read files from it using the read method on the storage client.

Python
gcs_bucket = storage_locations[0]
s3_bucket = storage_locations[1]
local_folder = storage_locations[2]

gcs_object = gcs_bucket.read("my-object.txt")
s3_object = s3_bucket.read("my-object.txt")
local_object = local_folder.read("my-object.txt")

The read method instantiates a client for the specific storage location. This requires that the storage location is accessible by a task runner and may require credentials for cloud storage or physical/network access to a locally mounted file system.

To set up authentication and enable access to a GCS storage bucket, check out the Google Client docs for authentication.
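
Selecting locations by list index, as in the example above, depends on the order in which they are returned. A more robust alternative is to look a location up by name. The sketch below assumes that location objects expose a location attribute, as the printed output above suggests. The StorageLocation dataclass here is a simplified stand-in, and find_location is a hypothetical helper, not part of the Tilebox client.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StorageLocation:
    """Stand-in mirroring the two fields visible in the printed output."""

    location: str
    type: str


def find_location(locations: list[StorageLocation], name: str) -> StorageLocation:
    """Return the location with the given name, or raise LookupError."""
    for candidate in locations:
        if candidate.location == name:
            return candidate
    raise LookupError(f"no registered storage location named {name!r}")


storage_locations = [
    StorageLocation("gcp-project:gcs-bucket-fab3fa2", "GCS"),
    StorageLocation("s3-bucket-263af15", "S3"),
    StorageLocation("/path/to/a/local/folder", "FS"),
]

s3_bucket = find_location(storage_locations, "s3-bucket-263af15")
print(s3_bucket)
```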

Registering a Storage Event Trigger

After implementing a Storage Event task, register it to trigger each time a storage event occurs. Once registered, every matching storage event submits a new job consisting of a single task instance derived from the registered Storage Event task prototype.

Python
from tilebox.workflows import Client

client = Client()
recurrent_tasks = client.recurrent_tasks()
storage_event_task = recurrent_tasks.create_recurring_storage_event_task(
    "log-object-creations",  # name of the recurring storage event task
    "dev-cluster",  # cluster slug to submit jobs to
    LogObjectCreation(head_bytes=20),  # the task (and its input parameters) to run repeatedly
    triggers=[
        # you can specify a glob pattern:
        # run every time a .txt file is created anywhere in the gcs bucket
        (gcs_bucket, "**.txt"),  
    ],
)

The syntax for specifying glob patterns follows Standard Wildcards. Additionally, you can use ** as a super-asterisk: unlike a single *, it also matches across slash separators.

Here are some examples of valid glob patterns:

Pattern      Matches
*.ext        Any file ending in .ext in the root directory
**/*.ext     Any file ending in .ext in any subdirectory, but not in the root directory
**.ext       Any file ending in .ext in any subdirectory, including the root directory
folder/*     Any file directly in a folder subdirectory
folder/**    Any file directly or recursively part of a folder subdirectory
[a-z].txt    Matches a.txt, b.txt, etc.
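
To check a pattern against sample object names before registering a trigger, the rules from the table can be approximated locally. The sketch below translates a glob pattern into a regular expression in which * and ? stop at slashes and ** does not. The function names glob_to_regex and glob_matches are hypothetical, and this is an approximation for experimentation, not the matcher Tilebox uses, so edge cases may differ.

```python
import re


def glob_to_regex(pattern: str) -> re.Pattern:
    """Translate a glob pattern into a compiled regular expression."""
    parts = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if pattern.startswith("**", i):
            parts.append(".*")  # super-asterisk: may cross slash separators
            i += 2
        elif ch == "*":
            parts.append("[^/]*")  # any run of characters within one segment
            i += 1
        elif ch == "?":
            parts.append("[^/]")  # exactly one character within one segment
            i += 1
        elif ch == "[" and pattern.find("]", i + 2) != -1:
            end = pattern.find("]", i + 2)
            body = pattern[i + 1:end]
            if body.startswith("!") and len(body) > 1:
                body = "^" + body[1:]  # glob negation to regex negation
            parts.append(f"[{body}]")
            i = end + 1
        else:
            parts.append(re.escape(ch))  # literal character
            i += 1
    return re.compile("".join(parts))


def glob_matches(pattern: str, path: str) -> bool:
    """Return True if the whole path matches the glob pattern."""
    return glob_to_regex(pattern).fullmatch(path) is not None


for pattern, path in [
    ("*.ext", "file.ext"),
    ("*.ext", "sub/file.ext"),
    ("**/*.ext", "sub/file.ext"),
    ("**.ext", "sub/deeper/file.ext"),
    ("folder/*", "folder/sub/file.txt"),
    ("folder/**", "folder/sub/file.txt"),
]:
    print(f"{pattern!r} vs {path!r}: {glob_matches(pattern, path)}")
```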

Start a Storage Event Task Runner

With the Storage Event task registered, a job is submitted whenever a storage event occurs. However, unless a task runner is available to execute the Storage Event task, the submitted jobs remain in a task queue. Once an eligible task runner becomes available, all jobs in the queue are executed.

Python
from tilebox.workflows import Client

client = Client()
runner = client.runner("dev-cluster", tasks=[LogObjectCreation])
runner.run_forever()

Triggering an Event

Creating an object in the bucket where the task is registered results in a job being submitted:

Creating an object
echo "Hello World" > my-object.txt
gcloud storage cp my-object.txt gs://gcs-bucket-fab3fa2

Inspecting the task runner output reveals that the job was submitted and the task executed:

Output
2024-09-25 16:51:45,621 INFO A new object was created: my-object.txt
2024-09-25 16:51:45,857 INFO The object's file size is 12 bytes
2024-09-25 16:51:45,858 INFO The object's first 20 bytes are: b'Hello World\n'

Inspecting in the Console

The Tilebox Console provides an easy way to inspect all registered storage event tasks.

Deleting Storage Event Triggers

To delete a registered storage event task, use recurrent_tasks.delete. After deletion, the storage event trigger submits no new jobs. Jobs that were already submitted before the deletion remain queued.

Python
from tilebox.workflows import Client

client = Client()
recurrent_tasks = client.recurrent_tasks()

# delete the task as returned by create_recurring_storage_event_task
recurrent_tasks.delete(storage_event_task)

# or manually by id:
recurrent_tasks.delete("0190bafc-b3b8-88c4-008b-a5db044380d0")

Submitting Storage Event Jobs Manually

You can submit Storage Event tasks as regular tasks for testing purposes or as part of a larger workflow. To do so, instantiate the task with a specific storage location and object name using the once method.

Python
job_client = client.jobs()

task = LogObjectCreation(head_bytes=20)

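# submitting it directly won't work; it raises a ValueError:
# StorageEventTask cannot be submitted without being triggered. Use task.once().
try:
    job_client.submit(
        "manual-storage-event-job",
        task,
        cluster="dev-cluster"
    )
except ValueError as err:
    # catch the error so the rest of the example can still run
    print(f"Submission rejected: {err}")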

# instead, specify a trigger,
# so that we can submit the task as a regular task
triggered_task = task.once(gcs_bucket, "my-object.txt")
job_client.submit(
    "manual-storage-event-job",
    triggered_task,
    cluster="dev-cluster"
)