Op hooks
If you are just getting started with Dagster, we strongly recommend you use assets rather than ops to build your data pipelines. The ops documentation is for Dagster users who need to manage existing ops, or who have complex use cases.
Op hooks let you define success and failure handling policies on ops.
Relevant APIs
| Name | Description | 
|---|---|
| @dg.failure_hook | The decorator to define a callback on op failure. | 
| @dg.success_hook | The decorator to define a callback on op success. | 
| HookContext | The context object available to a hook function. | 
| build_hook_context | A function for building a HookContext outside of execution, intended to be used when testing a hook. | 
Overview
An op hook is a function decorated with @dg.success_hook or @dg.failure_hook. Op hooks are general-purpose: they can run any logic you want at the per-op level.
Defining an op hook
import dagster as dg
@dg.success_hook(required_resource_keys={"slack"})
def slack_message_on_success(context: dg.HookContext):
    message = f"Op {context.op.name} finished successfully"
    context.resources.slack.chat_postMessage(channel="#foo", text=message)
@dg.failure_hook(required_resource_keys={"slack"})
def slack_message_on_failure(context: dg.HookContext):
    message = f"Op {context.op.name} failed"
    context.resources.slack.chat_postMessage(channel="#foo", text=message)
Hook context
The hook function takes one argument, an instance of HookContext. The available properties on this context are:
- context.job_name: the name of the job where the hook is triggered.
- context.log: the logger for emitting log messages from within the hook.
- context.hook_def: the hook that the context object belongs to.
- context.op: the op associated with the hook.
- context.op_config: the config specific to the associated op.
- context.op_exception: the exception thrown in the associated failed op.
- context.op_output_values: the computed output values of the associated op.
- context.step_key: the key for the step where the hook is triggered.
- context.resources: the resources the hook can use.
- context.required_resource_keys: the resources required by this hook.
Using hooks
You can apply a hook to every op in a job, or only to specific op instances.
Applying a hook on every op in a job
For example, suppose you want to send a Slack message to a channel whenever any op in a job fails. Applying a hook to the job applies it to every op instance within that job.
The @dg.job decorator accepts hooks as a parameter. Likewise, when creating a job from a graph, the GraphDefinition.to_job function accepts hooks as a parameter. In the example below, the slack_message_on_failure hook defined above is passed in a set to @dg.job, so a Slack message is sent when any op in the job fails.
@dg.job(resource_defs={"slack": slack_resource}, hooks={slack_message_on_failure})
def notif_all():
    # the hook "slack_message_on_failure" is applied to every op instance within this job
    a()
    b()
When you run this job, you can provide configuration to the slack resource in the run config:
resources:
  slack:
    config:
      token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token
or by using the configured API:
@dg.job(
    resource_defs={
        "slack": slack_resource.configured(
            {"token": "xoxp-1234123412341234-12341234-1234"}
        )
    },
    hooks={slack_message_on_failure},
)
def notif_all_configured():
    # the hook "slack_message_on_failure" is applied to every op instance within this job
    a()
    b()
Applying a hook on an op
Sometimes a job is a shared responsibility, or you only want to be alerted about high-priority op executions. For these cases, you can set hooks on individual op instances, which lets you apply policies on a per-op basis.
@dg.job(resource_defs={"slack": slack_resource})
def selective_notif():
    # only op "a" triggers hooks: a Slack message will be sent when it fails or succeeds
    a.with_hooks({slack_message_on_failure, slack_message_on_success})()
    # op "b" won't trigger any hooks
    b()
In this case, op "a" sends a Slack message when it fails or succeeds, while op "b" triggers no hooks.
Testing hooks
You can test the functionality of a hook by invoking the hook definition. This will run the underlying decorated function. You can construct a context to provide to the invocation using the build_hook_context function.
from unittest import mock

import dagster as dg

@dg.success_hook(required_resource_keys={"my_conn"})
def my_success_hook(context):
    context.resources.my_conn.send("foo")

def test_my_success_hook():
    my_conn = mock.MagicMock()
    # construct a HookContext with a mocked ``my_conn`` resource
    context = dg.build_hook_context(resources={"my_conn": my_conn})
    my_success_hook(context)
    assert my_conn.send.call_count == 1
Examples
Accessing failure information in a failure hook
In many cases, you might want to know details about an op failure. You can get the exception object thrown in the failed op via the op_exception property on HookContext:
from dagster import HookContext, failure_hook
import traceback
@failure_hook
def my_failure_hook(context: HookContext):
    op_exception: BaseException = context.op_exception
    # print stack trace of exception
    traceback.print_tb(op_exception.__traceback__)
Patterns
Environment-specific hooks using jobs
Hooks use resource keys to access resources. After including the resource key in its set of required_resource_keys, the body of the hook can access the corresponding resource via the resources attribute of its context object.
Because hooks reach resources by key, you can bind a different resource to that key in different jobs. For example, you can send Slack messages only while executing a production job and mock the slack resource while testing.
Because a production job and a testing job share the same core business logic, we can build both from a shared graph. The GraphDefinition.to_job method, which builds a job from a graph, lets you specify environment-specific hooks and resources.
In this case, we can mock the slack_resource using the helper function ResourceDefinition.hardcoded_resource(), so it won't send Slack messages during development.
@dg.graph
def slack_notif_all():
    a()
    b()
notif_all_dev = slack_notif_all.to_job(
    name="notif_all_dev",
    resource_defs={
        "slack": dg.ResourceDefinition.hardcoded_resource(
            slack_resource_mock, "do not send messages in dev"
        )
    },
    hooks={slack_message_on_failure},
)
notif_all_prod = slack_notif_all.to_job(
    name="notif_all_prod",
    resource_defs={"slack": slack_resource},
    hooks={slack_message_on_failure},
)
When we switch to production, we can provide the real Slack token in the run config, which enables sending messages to a Slack channel when a hook is triggered.
resources:
  slack:
    config:
      token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token
Then, we can execute the job with this config through the Python API, the CLI, or the Dagster UI.
Job-level hooks
When you add a hook to a job, the hook will be added to every op in the job individually. The hook does not track job-scoped events and only tracks op-level success or failure events.
You may also need job-level policies. For example, you may want to run some code for every job failure.
Dagster provides a way to create a sensor that reacts to job failure events. You can find details in the Sensors docs.