Creating a Storage Event Task

Storage Event Tasks are recurrent tasks that are triggered when objects are created or modified in a storage location. To create a Storage Event task, subclass the StorageEventTask class and override its execute method, just as you would for any other regular task.

Python
from tilebox.workflows import ExecutionContext
from tilebox.workflows.recurrent_tasks import StorageEventTask, StorageEventType
from tilebox.workflows.observability.logging import get_logger

logger = get_logger()

class LogObjectCreation(StorageEventTask):
    head_bytes: int

    def execute(self, context: ExecutionContext) -> None:
        # self.trigger is an attribute of the StorageEventTask class,
        # which contains information about the storage event that triggered this task
        if self.trigger.type == StorageEventType.CREATED:
            path = self.trigger.location
            logger.info(f"A new object was created: {path}")

            # trigger.storage is a storage client for interacting with the storage
            # location that triggered the event
            # using it, we can read the object to get its content as a bytes object
            data = self.trigger.storage.read(path)
            logger.info(f"The object's file size is {len(data)} bytes")
            logger.info(f"The object's first {self.head_bytes} bytes are: {data[:self.head_bytes]}")

Storage locations

Storage Event tasks are triggered when objects are created or modified in a storage location. This storage location can be a bucket in a cloud storage service or a local file system. Tilebox supports the following storage locations:

Google Cloud Storage

Amazon S3

Local File System

Registering a storage location

In order for a storage location to be available within Tilebox workflows, it needs to be registered first. Registering consists not only of specifying the location, but also of setting up a notification system that forwards events to Tilebox, so that tasks can be triggered. How this notifier is set up depends on the storage location type.

For example, a GCP storage bucket is integrated by setting up a PubSub Notification with a push subscription. A local file system requires the installation of a file system watcher. If you are interested in registering a storage location with Tilebox, please get in touch.

Listing available storage locations

To list all available storage locations, use the storage_locations method on the recurrent tasks client.

Python
from tilebox.workflows import Client

client = Client()
recurrent_tasks_client = client.recurrent_tasks()

storage_locations = recurrent_tasks_client.storage_locations()
print(storage_locations)
Output
[
    StorageLocation(
        location="gcp-project:gcs-bucket-fab3fa2",
        type=StorageType.GCS,
    ),
    StorageLocation(
        location="s3-bucket-263af15",
        type=StorageType.S3,
    ),
    StorageLocation(
        location="/path/to/a/local/folder",
        type=StorageType.FS,
    ),
]
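
Since each storage location carries its type, you can filter the returned list if you only need one kind of storage. Here is a minimal sketch, assuming the StorageType enum shown in the output above is importable from the same recurrent_tasks module as StorageEventTask:

Python
from tilebox.workflows.recurrent_tasks import StorageType

# keep only the Google Cloud Storage locations from the registered ones
gcs_locations = [loc for loc in storage_locations if loc.type == StorageType.GCS]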

Reading files from a storage location

Once a storage location is registered, you can read files from it using the read method on the storage client.

Python
gcs_bucket = storage_locations[0]
s3_bucket = storage_locations[1]
local_folder = storage_locations[2]

gcs_object = gcs_bucket.read("my-object.txt")
s3_object = s3_bucket.read("my-object.txt")
local_object = local_folder.read("my-object.txt")

read instantiates a client for the specific storage location. This requires that the storage location is actually accessible from the task runner: it may require credentials for cloud storage, or physical/network access to a locally mounted file system.

To set up authentication and enable access to a GCS storage bucket, check out the Google Client docs for authentication.
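
For example, one common way of providing such credentials is a service account key file. Below is a minimal sketch, assuming you have a key file of your own (the google-cloud client libraries read the standard GOOGLE_APPLICATION_CREDENTIALS environment variable):

Python
import os

# point the google-cloud client libraries at a service account key file
# (the path below is a placeholder for your own key file)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account-key.json"

# storage clients created afterwards pick up these credentials
data = gcs_bucket.read("my-object.txt")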

Registering a Storage Event Trigger

Once a Storage Event task is implemented, it can be registered to be triggered every time a storage event occurs in a storage location. This means that every time a new object is created or modified in the storage location, a new job is submitted, consisting of a single task instance derived from the registered Storage Event task prototype.

Python
from tilebox.workflows import Client

client = Client()
recurrent_tasks = client.recurrent_tasks()
storage_event_task = recurrent_tasks.create_recurring_storage_event_task(
    "log-object-creations",  # name of the recurring storage event task
    "dev-cluster",  # cluster slug to submit jobs to
    LogObjectCreation(head_bytes=20),  # the task (and its input parameters) to run repeatedly
    triggers=[
        # you can specify a glob pattern:
        # run every time a .txt file is created anywhere in the gcs bucket
        (gcs_bucket, "**.txt"),  
    ],
)

The syntax for specifying glob patterns is Standard Wildcards. Additionally, you can use ** as a super-asterisk, a matching operator not sensitive to slash separators.

Below are some examples of valid glob patterns:

Pattern      Matches
*.ext        Any file ending in .ext in the root directory
**/*.ext     Any file ending in .ext in any subdirectory, but not in the root directory
**.ext       Any file ending in .ext in any subdirectory, including the root directory
folder/*     Any file directly in a folder subdirectory
folder/**    Any file directly or recursively part of a folder subdirectory
[a-z].txt    Matches a.txt, b.txt, etc.
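
These patterns can be combined freely when registering a trigger. For instance, the following sketch reuses the create_recurring_storage_event_task call from above with two triggers; the landing/ prefix and the second trigger pattern are hypothetical:

Python
storage_event_task = recurrent_tasks.create_recurring_storage_event_task(
    "log-object-creations",  # name of the recurring storage event task
    "dev-cluster",  # cluster slug to submit jobs to
    LogObjectCreation(head_bytes=20),
    triggers=[
        # .txt files anywhere below the (hypothetical) landing/ prefix
        (gcs_bucket, "landing/**.txt"),
        # .json files in the root of the S3 bucket only
        (s3_bucket, "*.json"),
    ],
)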

Start a Task Runner capable of executing Storage Event Tasks

With the Storage Event task registered, a job is submitted whenever the storage event occurs. But for the tasks to actually run, a task runner capable of executing the Storage Event task needs to be available. If no task runner is available, the submitted jobs remain in a task queue. Once an eligible task runner becomes available, all jobs in the queue are picked up and executed.

Python
from tilebox.workflows import Client

client = Client()
runner = client.runner("dev-cluster", tasks=[LogObjectCreation])
runner.run_forever()

Triggering an event

Creating an object in the bucket that the task is registered on results in a job being submitted:

Creating an object
echo "Hello World" > my-object.txt
gcloud storage cp my-object.txt gs://gcs-bucket-fab3fa2

Inspecting the task runner output now reveals that the job was submitted and the task executed:

Output
2024-09-25 16:51:45,621 INFO A new object was created: my-object.txt
2024-09-25 16:51:45,857 INFO The object's file size is 12 bytes
2024-09-25 16:51:45,858 INFO The object's first 20 bytes are: b'Hello World\n'

Inspecting the registered Storage Event Task

The Tilebox Console offers a convenient way to inspect all registered recurrent tasks, including storage event tasks.

Once you’ve registered a storage event task, you can inspect it there.

Deleting Storage Event Triggers

To delete a registered storage event task, use recurrent_tasks.delete. After deletion, the storage event trigger no longer submits new jobs. Jobs submitted in the past that are still in the queue remain there and can still be picked up by a task runner.

Python
from tilebox.workflows import Client

client = Client()
recurrent_tasks = client.recurrent_tasks()

# delete the task as returned by create_recurring_storage_event_task
recurrent_tasks.delete(storage_event_task)

# or manually by id:
recurrent_tasks.delete("0190bafc-b3b8-88c4-008b-a5db044380d0")

Submitting Storage Event jobs manually

To test storage event tasks, they can be submitted as regular tasks. This is useful for testing purposes, or if you want to submit a storage event task as part of a larger workflow. To do so, the task needs to be instantiated with a specific trigger using the once method.

Submitting a job with a once-triggered storage event task schedules it right away, and a runner may pick it up and execute it immediately. The storage location and object path specified in the once method only influence the self.trigger attribute that the storage event task receives.

Python
job_client = client.jobs()

task = LogObjectCreation(head_bytes=20)

# submitting it directly won't work: raises ValueError:
# StorageEventTask cannot be submitted without being triggered. Use task.once().
job_client.submit(
    "manual-storage-event-job",
    task,
    cluster="dev-cluster"
)

# instead we need to specify a trigger,
# so that we can submit the task as a regular task
triggered_task = task.once(gcs_bucket, "my-object.txt")
job_client.submit(
    "manual-storage-event-job",
    triggered_task,
    cluster="dev-cluster"
)
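
The same once-triggered task can also be embedded in a larger workflow. Below is a minimal sketch, assuming the submit_subtask method on ExecutionContext and reusing gcs_bucket from above; the parent task ProcessNewObject is hypothetical:

Python
from tilebox.workflows import Task, ExecutionContext

class ProcessNewObject(Task):
    def execute(self, context: ExecutionContext) -> None:
        # run the storage event task as a subtask of this workflow,
        # with a manually specified trigger (bucket and object path)
        triggered = LogObjectCreation(head_bytes=20).once(gcs_bucket, "my-object.txt")
        context.submit_subtask(triggered)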