This guide is intended to walk through how to re-execute Dagster jobs and where the subsequent executions are found within Dagit.
If ops fail or upstream data has changed within a job execution, the job may need to be re-run starting from a particular point. Dagster calls this process re-execution
.
Imagine a machine learning job with three ops. The first op, training the model, is the most time and resource intensive. Then, we test the model, and build analyses on the results. Suppose that the job fails with the op that is testing the model. After fixing the root cause, we want to re-run our job. However, it would take much more time to create a new run of the job as we would have to repeat the first op. It would be more economical to start again from the second op, reusing the previous run's execution result for the first op.
With Dagster, the re-execution of parts of the job is grouped with the original run to make it easy to trace. The original job execution metadata is not overwritten, making re-execution a non-destructive operation.
Consider the following job which has three ops, one of which fails half of the time.
from dagster import in_process_executor, job, op
@op
def start():
return 1
@op
def unreliable(num: int) -> int:
failure_rate = 0.5
if some_random_result() < failure_rate:
raise Exception("blah")
return num
@op
def end(_num: int):
pass
@job(executor_def=in_process_executor)
def unreliable_job():
end(unreliable(start()))
Although very simple, there are inputs and outputs passed between ops. With an IO manager, re-execution is able to handle inputs and outputs stored from the initial run.
To initiate a re-execution from an existing run, navigate to the run in Dagit and you can find the re-execution option on the top right of the interface.
Under the re-execution drop down, you will see multiple options. No matter which one you choose, the re-executed job is linked to the original run.
In the above example, re-executing from failure would make sense as the failed task has a 50% chance of succeeding on the next run.
If the run succeeded but the underlying code changed, running specific ops to test the differences would be more relevant.
Within Dagit, a single or multiple ops may be selected simply by clicking them with the mouse. Alternatively, you can use the subset selector and specify your desired op names to re-run.
Re-execution can be triggered via the API as well.
Name | Description |
---|---|
execute_job | Execute a dagster job synchronously from python. |
ReexecutionOptions | Configuration to provide to execute_job allowing for reexecution from a previous dagster run. |
Again, let's revist the job unreliable_job
, which has a op named unreliable
.
from dagster import DagsterInstance, ReexecutionOptions, execute_job, reconstructable
from docs_snippets.guides.dagster.reexecution.unreliable_job import unreliable_job
instance = DagsterInstance.ephemeral()
# Initial execution
initial_result = execute_job(reconstructable(unreliable_job), instance=instance)
if not initial_result.success:
options = ReexecutionOptions.from_failure(initial_result.run_id, instance)
# re-execute the entire job
from_failure_result = execute_job(
reconstructable(unreliable_job), instance=instance, reexecution_options=options
)
Using Dagster's API, you can programmatically trigger both an execution and a reexecution. Upon an initial run failing, you may want to re-trigger a run from the point of failure, as shown above. Similarly, you can trigger a re-execution of selected ops or from a particular point.
# re-execute the job, but only the "unreliable" op and all its descendents
options = ReexecutionOptions(
parent_run_id=initial_result.run_id, step_selection=["unreliable*"]
)
result = execute_job(
reconstructable(unreliable_job),
instance=instance,
reexecution_options=options,
)
The step_selection
input is configurable, with syntax further documented in the API docs for ReexecutionOptions
.