Cron Triggers
Trigger jobs based on a Cron schedule.
Creating a Cron Task
Cron Tasks are recurrent tasks that are triggered on a Cron schedule.
To create a Cron task, subclass CronTask
class and overwrite it’s execute
method exactly
as you would any other regular task.
from tilebox.workflows import ExecutionContext
from tilebox.workflows.recurrent_tasks import CronTask
class MyCronTask(CronTask):
message: str
def execute(self, context: ExecutionContext) -> None:
print(f"Hello {self.message} from a Cron Task!")
# self.trigger is an attribute of the CronTask class,
# which contains information about the trigger event
# that caused this task to be submitted as part of a job
print(f"This task was triggered at {self.trigger.time}")
Registering a Cron Trigger
Once a Cron task is implemented, it can be registered to be triggered on a Cron schedule. This means, every minute for which the Cron expression matches, a new job gets be submitted, consisting of a single task instance derived from the Cron registered task prototype.
from tilebox.workflows import Client
client = Client()
recurrent_tasks = client.recurrent_tasks()
cron_task = recurrent_tasks.create_recurring_cron_task(
"my-cron-task", # name of the recurring cron task
"dev-cluster", # cluster slug to submit jobs to
MyCronTask(message="World"), # the task (and its input parameters) to run repeatedly
cron_triggers=[
"12 * * * *", # run every hour at minute 12
"45 18 * * *", # run every day at 18:45
"30 13 * * 3", # run every Wednesday at 13:30
],
)
The syntax for specifying the cron triggers is a CRON expression. A helpful tool to test your cron expressions is crontab.guru.
Start a Task Runner capable of executing Cron Tasks
With the Cron task registered, a job gets submitted whenever the Cron expression matches. But for the tasks to actually run, a task runner needs to be available capable of executing the Cron task. If no task runner is available, the submitted jobs remain in a task queue. Once a eligible task runner becomes available, all jobs in the queue are picked up and executed.
from tilebox.workflows import Client
client = Client()
runner = client.runner("dev-cluster", tasks=[MyCronTask])
runner.run_forever()
Leaving this task runner running, after a while it’s output looks like this:
Hello World from a Cron Task!
This task was triggered at 2023-09-25 16:12:00
Hello World from a Cron Task!
This task was triggered at 2023-09-25 17:12:00
Hello World from a Cron Task!
This task was triggered at 2023-09-25 18:12:00
Hello World from a Cron Task!
This task was triggered at 2023-09-25 18:45:00
Hello World from a Cron Task!
This task was triggered at 2023-09-25 19:12:00
Inspecting the registered Cron Task
The Tilebox Console offers a convenient way to inspect all the recurrent tasks that are registered to run on a schedule.
Once you’ve registered a cron task, you can inspect it in the console. Here is an example of what it looks like:
You can use the console to view, edit, and delete the cron tasks that are registered.
Deleting Cron Triggers
To delete a registered cron task, use recurrent_tasks.delete
. After deletion, no new jobs get submitted by
a cron trigger. But jobs submitted in the past, which are still in the queue, remain in the queue and can still be
picked up by a task runner.
from tilebox.workflows import Client
client = Client()
recurrent_tasks = client.recurrent_tasks()
# delete the task as returned by create_recurring_cron_task
recurrent_tasks.delete(cron_task)
# or manually by id:
recurrent_tasks.delete("0190bafc-b3b8-88c4-008b-a5db044380d0")
Submitting Cron jobs manually
To test cron tasks, they can be submitted as regular tasks. This is useful for testing purposes, or if you
want to submit a cron task as part of a larger workflow. To do so, it needs to be instantiated with a specific trigger
time using the once
method.
Submitting a job with a cron task once immediately schedules the task, and a runner could pick
it up and execute it immediately. The given time specified in the once
method does not influence the time at which
the task gets executed, only the self.trigger.time
attribute of the cron task corresponds to the specified time.
from datetime import datetime, timezone
job_client = client.jobs()
# create a Cron task prototype
task = MyCronTask(message="Hello")
# submitting it directly won't work: raises ValueError:
# CronTask cannot be submitted without being triggered. Use task.once().
job_client.submit("manual-cron-job", task, cluster="dev-cluster")
# we need to specify a trigger time, so we can submit the task as a regular task
triggered_task = task.once() # same as task.once(datetime.now())
job_client.submit("manual-cron-job", triggered_task, cluster="dev-cluster")
# or we can simulate a trigger at a specific time
triggered_task = task.once(datetime(2030, 12, 12, 15, 15, tzinfo=timezone.utc))
# the task will be scheduled to run immediately, even with a future trigger time
# but self.trigger.time will be 2023-12-12T15:15:00Z for the task instance
job_client.submit("manual-cron-job", triggered_task, cluster="dev-cluster")
Was this page helpful?