def AutomationClient.create_storage_event_automation(
name: str,
task: StorageEventTask,
triggers: list[tuple[StorageLocation, str]] | tuple[StorageLocation, str],
cluster: ClusterSlugLike | None = None,
max_retries: int = 0,
) -> AutomationPrototype
Create an automation that submits a task when objects are added to storage.
Parameters
name
str
required
Name of the automation to create.
task
StorageEventTask
required
Task to run when a matching storage event occurs.
triggers
list[tuple[StorageLocation, str]] | tuple[StorageLocation, str]
required
One trigger or a list of triggers. Each trigger contains a storage location and a glob pattern.
cluster
ClusterSlugLike | None
Cluster to run the task on. If not provided, the default cluster is used.
max_retries
int
Maximum number of retries for the task. Defaults to 0.
Returns
The created automation prototype.
location = client.automations().storage_locations()[0]
automation = client.automations().create_storage_event_automation(
name="ingest-new-scenes",
task=IngestScene(),
triggers=(location, "incoming/**/*.json"),
)