Trigger actions on events
Automations provide a flexible and powerful framework for automatically taking action in response to events.
Automations enable you to configure actions that execute automatically based on trigger conditions.
Potential triggers include the occurrence of events from changes in a flow run’s state—or the absence of such events. You can define your own custom trigger to fire based on a custom event defined in Python code. With Prefect Cloud you can even create webhooks that can receive data for use in actions.
Actions include starting flow runs, pausing schedules, and sending custom notifications.
Create an automation
On the Automations page, select the + icon to create a new automation. You’ll be prompted to configure:
- A trigger condition that causes the automation to execute.
- One or more actions carried out by the automation.
- Details about the automation, such as a name and description.
Manage automations
The Automations page provides an overview of all configured automations for your workspace.
Select the toggle next to an automation to pause execution of the automation.
The button next to the toggle provides commands to copy the automation ID, edit the automation, or delete the automation.
Select the name of an automation to view Details about it and relevant Events.
Triggers
Triggers specify the conditions under which your action should be performed. The Prefect UI includes templates for many common conditions, such as:
- Flow run state change (Flow Run Tags are only evaluated with
OR
criteria) - Work pool status
- Work queue status
- Deployment status
- Metric thresholds, such as average duration, lateness, or completion percentage
- Custom event triggers
Automations API
The automations API enables further programmatic customization of trigger and action policies based on arbitrary events.
Importantly, you can configure the triggers not only in reaction to events, but also proactively: in the absence of an expected event.
For example, in the case of flow run state change triggers, you might expect production flows to finish in no longer than thirty minutes. But transient infrastructure or network issues could cause your flow to get “stuck” in a running state. A trigger could kick off an action if the flow stays in a running state for more than 30 minutes.
This action could be taken on the flow itself, such as cancelling or restarting it. Or the action could take the form of a notification for someone to take manual remediation steps. Or you could set both actions to take place when the trigger occurs.
Actions
Actions specify what your automation does when its trigger criteria are met. Current action types include:
- Cancel a flow run
- Pause or resume a schedule
- Run a deployment
- Pause or resume a deployment schedule
- Pause or resume a work pool
- Pause or resume a work queue
- Pause or resume an automation
- Send a notification
- Call a webhook
- Suspend a flow run
- Change the state of a flow run
Create automations In Python code
You can create and access any automation with the Python SDK’s Automation
class and its methods.
Selected and inferred action targets
Some actions require you to either select the target of the action, or specify that the target of the action should be inferred. Selected targets are simple and useful for when you know exactly what object your action should act on. For example, the case of a cleanup flow you want to run or a specific notification you want to send.
Inferred targets are deduced from the trigger itself.
For example, if a trigger fires on a flow run that is stuck in a running state, and the action is to cancel an inferred flow run—the flow run that caused the trigger to fire.
Similarly, if a trigger fires on a work queue event and the corresponding action is to pause an inferred work queue, the inferred work queue is the one that emitted the event.
Prefect infers the relevant event whenever possible, but sometimes one does not exist.
Specify a name and, optionally, a description for the automation.
Create an automation with deployment triggers
To enable the simple configuration of event-driven deployments, Prefect provides deployment triggers—a shorthand for creating automations that are linked to specific deployments to run them based on the presence or absence of events.
Trigger definitions for deployments are supported in prefect.yaml
, .serve
, and .deploy
. At deployment time,
specified trigger definitions create linked automations triggered by events matching your chosen
grammar. Each trigger definition may include a jinja template
to render the triggering event
as the parameters
of your deployment’s flow run.
Define triggers in prefect.yaml
You can include a list of triggers on any deployment in a prefect.yaml
file:
This deployment creates a flow run when an external.resource.pinged
event and an external.resource.replied
event have been seen from my.external.resource
:
Define triggers in .serve
and .deploy
To create deployments with triggers in Python, the trigger types DeploymentEventTrigger
,
DeploymentMetricTrigger
, DeploymentCompoundTrigger
, and DeploymentSequenceTrigger
can be imported
from prefect.events
:
As with prior examples, you must supply composite triggers with a list of underlying triggers:
Pass triggers to prefect deploy
You can pass one or more --trigger
arguments to prefect deploy
as either a JSON string or a
path to a .yaml
or .json
file.
For example, a triggers.yaml
file could have many triggers defined:
Both of the above triggers would be attached to test-deployment
after running prefect deploy
.
Triggers passed to prefect deploy
will override any triggers defined in prefect.yaml
While you can define triggers in prefect.yaml
for a given deployment, triggers passed to prefect deploy
take precedence over those defined in prefect.yaml
.
Note that deployment triggers contribute to the total number of automations in your workspace.
Sending notifications with automations
Automations support sending notifications through any predefined block that is capable of and configured to send a message, including:
- Slack message to a channel
- Microsoft Teams message to a channel
- Email to an email address
Templating with Jinja
You can access templated variables with automation actions through Jinja syntax. Templated variables enable you to dynamically include details from an automation trigger, such as a flow or pool name.
Jinja templated variable syntax wraps the variable name in double curly brackets, like this: {{ variable }}
.
You can access properties of the underlying flow run objects including:
In addition to its native properties, each object includes an id
along with created
and updated
timestamps.
The flow_run|ui_url
token returns the URL to view the flow run in the UI.
Here’s an example relevant to a flow run state-based notification:
The resulting Slack webhook notification looks something like this:
You could include flow
and deployment
properties:
An automation that reports on work pool status might include notifications using work_pool
properties:
In addition to those shortcuts for flows, deployments, and work pools, you have access to the automation and the event that triggered the automation. See the Automations API for additional details.
Note that this example also illustrates the ability to use Jinja features such as iterator and for loop control structures when templating notifications.
API example
This example grabs data from an API and sends a notification based on the end state.
Create the example script
Start by pulling hypothetical user data from an endpoint and then performing data cleaning and transformations.
First create a simple extract method that pulls the data from a random user data generator endpoint:
The data cleaning workflow has visibility into each step, and sends a list of names to the next step of the pipeline.
Create a notification block in the UI
Next, send a notification based off a completed state outcome. Configure a notification that shows when to look into your workflow logic.
-
Prior to creating the automation, confirm the notification location. Create a notification block to help define where the notification is sent.
-
Navigate to the blocks page on the UI, and click into creating an email notification block.
-
Go to the automations page to create your first automation.
-
Next, find the trigger type. In this case, use a flow completion.
-
Create the actions for when the trigger is hit. In this case, create a notification to showcase the completion.
-
Now the automation is ready to be triggered from a flow run completion. Run the file locally and see that the notification is sent to your inbox after the completion. It may take a few minutes for the notification to arrive.
No deployment created
You do not need to create a deployment to trigger your automation. In the case above, the flow run state trigger fired in response to a flow run that was executed locally.
Now that you’ve seen how to create an email notification from a flow run completion, see how to kick off a deployment run in response to an event.
Event-based deployment automation
Create an automation to kick off a deployment instead of a notification. Explore how to programmatically create this automation with Prefect’s REST API.
See the REST API documentation as a reference for interacting with the automation endpoints.
Create a deployment to kick off some work based on how long a flow is running. For example, if the build_names
flow
takes too long to execute, you can kick off a deployment with the same build_names
flow, but replace the count
value
with a lower number to speed up completion.
Create a deployment with a prefect.yaml
file or a Python file that uses flow.deploy
.
Create a prefect.yaml
file like this one for our flow build_names
:
Grab your deployment_id
from this deployment with the CLI and embed it in your automation.
Find deployment_id from the CLI
Run prefect deployment ls
in an authenticated command prompt.
Create an automation with a POST call to programmatically create the automation. Ensure you have your api_key
, account_id
, and workspace_id
.
After running this function, you will see the changes that came from the post request within the UI. Keep in mind the context is “custom” on UI.
Run the underlying flow and see the deployment kick off after 30 seconds. This results in a new flow run of build_names
.
You can see this new deployment get initiated with the custom parameters outlined above.
In a few quick changes, you can programmatically create an automation that deploys workflows with custom parameters.
Use an underlying .yaml file
You can take this a step further by using your own .yaml version of the automation, and registering that file with the UI. This simplifies the requirements of the automation by declaring it in its own .yaml file, and then registering that .yaml with the API.
First start with creating the .yaml file to house the automation requirements:
Make a helper function that applies this YAML file with the REST API function.
Find a complete repo with these APIs examples in this GitHub repository.
In this example, you created the automation by registering the .yaml file with a helper function.
Kick off an automation with a custom webhook
Use webhooks to expose the events API. This allows you to extend the functionality of deployments and respond to changes in your workflow.
By exposing a webhook endpoint, you can kick off workflows that trigger deployments, all from an event created from an HTTP request.
Create this webhook in the UI to create these dynamic events.
From this input, you can create an exposed webhook endpoint.
Each webhook corresponds to a custom event created where you can react to it downstream with a separate deployment or automation.
For example, you can create a curl request that sends the endpoint information such as a run count for your deployment:
From here, you can make a webhook that is connected to pulling in parameters on the curl command. It kicks off a deployment that uses these pulled parameters:
Go into the event feed to automate straight from this event:
This allows you to create automations that respond to these webhook events. From a few clicks in the UI, you can associate an external process with the Prefect events API that can trigger downstream deployments.
Examples
Trigger a downstream deployment with an event
This example shows how to use a trigger to schedule a downstream deployment when an upstream deployment completes.
First, start the serve
process to listen for scheduled deployments runs:
Now, run the upstream deployment and see the downstream deployment kick off after it completes:
Check the event feed
You can inspect raw events in the event feed in the UI to see what related resources are available to match against.
For example, the following prefect.flow-run.Completed
event’s related resources include:
Trigger a deployment when a customer completes an order
Imagine you are running an e-commerce platform and you want to trigger a deployment when a customer completes an order.
There might be a number of events that occur during an order on your platform, for example:
order.created
order.item.added
order.payment-method.confirmed
order.shipping-method.added
order.complete
Event grammar
The above choices of event names are arbitrary. With Prefect events, you’re free to select any event grammar that best represents your use case.
In this case, we want to trigger a deployment when a user completes an order, so our trigger should:
expect
anorder.complete
eventafter
anorder.created
event- evaluate these conditions
for_each
user id
Finally, it should pass the user_id
as a parameter to the deployment. Here’s how this looks in code:
Specify multiple events or resources
The expect
and after
fields accept a set
of event names, so you can specify multiple events for each condition.
Similarly, the for_each
field accepts a set
of resource ids.
To simulate users causing order status events, run the following in a Python shell or script:
In the above example:
user_id_1
creates and then completes an order, triggering a run of our deployment.user_id_2
creates an order, but no completed event is emitted so no deployment is triggered.
Detect and respond to zombie flows
If the infrastructure a flow is running on suddenly fails (for example, the machine crashes or a container is evicted), Prefect’s orchestration engine will be unable to report state changes and the flow run will get stuck in the running state.
Fortunately, flow runs triggered via deployment can emit heartbeats as they are running, and a Prefect automation can update a flow run’s state to crashed if the server stops receiving heartbeats for that flow run.
Enable flow run heartbeat events
You will need to ensure you’re running Prefect version 3.1.8 or greater and set PREFECT_RUNNER_HEARTBEAT_FREQUENCY
to an integer greater than 30 to emit flow run heartbeat events.
To create an automation that marks zombie flow runs as crashed, run this script:
The trigger definition says after each heartbeat event for a flow run we expect to see another heartbeat event or a terminal state event for that same flow run within 90 seconds of a heartbeat event.
If PREFECT_RUNNER_HEARTBEAT_FREQUENCY
is set to 30
, the automation will trigger only after 3 heartbeats have been missed.
You can adjust within
in the trigger definition and PREFECT_RUNNER_HEARTBEAT_FREQUENCY
to change how quickly the automation
will fire after the server stops receiving flow run heartbeats.
You can also add additional actions to your automation to send a notification when zombie runs are detected.
See also
- To learn more about Prefect events, which can trigger automations, see the events docs.
- See the webhooks guide to learn how to create webhooks and receive external events.