Skip to main content
def AutomationClient.create_storage_event_automation(
    name: str,
    task: StorageEventTask,
    triggers: list[tuple[StorageLocation, str]] | tuple[StorageLocation, str],
    cluster: ClusterSlugLike | None = None,
    max_retries: int = 0,
) -> AutomationPrototype
Create an automation that submits a task when objects are added to storage.

Parameters

name
str
required
Name of the automation to create.
task
StorageEventTask
required
Task to run when a matching storage event occurs.
triggers
list[tuple[StorageLocation, str]] | tuple[StorageLocation, str]
required
One trigger or a list of triggers. Each trigger contains a storage location and a glob pattern.
cluster
ClusterSlugLike | None
Cluster to run the task on. If not provided, the default cluster is used.
max_retries
int
Maximum number of retries for the task. Defaults to 0.

Returns

The created automation prototype.
location = client.automations().storage_locations()[0]

automation = client.automations().create_storage_event_automation(
    name="ingest-new-scenes",
    task=IngestScene(),
    triggers=(location, "incoming/**/*.json"),
)