Migrating from Spark Step Launchers to Dagster Pipes
In this guide, we’ll show you how to migrate from using step launchers to using Dagster Pipes in Dagster.
While step launchers were intended to support various runtime environments, in practice, they have only been implemented for Spark. Therefore, we will focus on Spark-related examples.
Considerations
When deciding to migrate from step launchers to Dagster Pipes, consider the following:
- Step launchers are superseded by Dagster Pipes. While they are still available (and there are no plans for their removal), they are no longer the recommended method for launching external code from Dagster ops and assets. They will not receive new features and are not under active development.
- Dagster Pipes is a more lightweight and flexible framework, but it does come with a few drawbacks:
  - The Spark runtime and the code it executes will no longer be managed by Dagster for you.
  - Dagster Pipes is not compatible with Resources and IO Managers. If you rely heavily on these features, you might want to keep using step launchers.
Steps
To migrate from step launchers to Dagster Pipes, you will have to perform the following steps.
1. Implement new CI/CD pipelines to prepare your Spark runtime environment
Alternatively, this can be done from Dagster jobs, but either way, you will need to manage the Spark runtime yourself.
When running PySpark jobs, the following changes to Python dependencies should be considered:
- drop dagster
- add dagster-pipes
You can learn more about packaging Python dependencies for PySpark in the PySpark documentation or in the AWS EMR Pipes guide.
The process of packaging the Python dependencies and scripts should be automated with a CI/CD pipeline and run before deploying the Dagster code location.
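The dependency swap listed above can be automated in the packaging step. Below is a minimal sketch, assuming the requirements are available as a list of simple requirement strings. `to_pipes_requirements` is a hypothetical helper, not part of Dagster, and it does not handle URLs or editable installs.

```python
import re


def to_pipes_requirements(requirements: list[str]) -> list[str]:
    """Drop the `dagster` package and make sure `dagster-pipes` is present.

    Hypothetical helper for illustration; it only understands simple
    requirement lines such as "dagster==1.7.0" or "pyspark>=3.4".
    """

    def package_name(line: str) -> str:
        # Take the leading name before any version specifier, extra, or marker.
        match = re.match(r"\s*([A-Za-z0-9_.-]+)", line)
        return match.group(1).lower().replace("_", "-") if match else ""

    # Remove only the core `dagster` package, not other `dagster-*` packages.
    kept = [line for line in requirements if package_name(line) != "dagster"]
    if not any(package_name(line) == "dagster-pipes" for line in kept):
        kept.append("dagster-pipes")
    return kept
```

Packages that themselves depend on `dagster` (for example, other `dagster-*` integrations) would still pull it in transitively, so they need to be reviewed separately.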
It's also possible to run Java or Scala Spark jobs with Dagster Pipes, but currently there is no official Pipes implementation for these languages. Therefore, forwarding Dagster events from these jobs is not yet supported officially (although it can be done with some custom code).
2. Update your Dagster code
The goal is to keep the same observability and orchestration features while moving compute to an external script. Suppose you have existing code using step launchers similar to this:
from typing import Any
import dagster as dg
from dagster_aws.emr import emr_pyspark_step_launcher
from pyspark.sql import DataFrame
from my_lib import MyPysparkIOManager, calculate_metric, get_data_frame
# the upstream asset will serve as an example of writing a Spark DataFrame
@dg.asset(io_manager_key="pyspark_io_manager")
def upstream(pyspark_step_launcher: dg.ResourceParam[Any]) -> DataFrame:
    return get_data_frame()
# the downstream asset will serve as an example of reading a Spark DataFrame
# and logging metadata to Dagster
@dg.asset(io_manager_key="pyspark_io_manager")
def downstream(
    context: dg.AssetExecutionContext,
    upstream: DataFrame,
    pyspark_step_launcher: dg.ResourceParam[Any],
) -> None:
    my_metric = calculate_metric(upstream)
    context.add_output_metadata({"my_metric": my_metric})
    return
definitions = dg.Definitions(
    assets=[upstream, downstream],
    resources={
        "pyspark_step_launcher": emr_pyspark_step_launcher,
        "pyspark_io_manager": MyPysparkIOManager(),
    },
)
The corresponding Pipes code will instead have two components: the Dagster asset definition, and the external PySpark job.
Let's start with the PySpark job. The upstream asset will invoke the following script:
import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
from my_lib import get_data_frame
def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3")),
    ) as pipes:
        df = get_data_frame()
        # change according to your needs
        path = "s3://" + "<my-bucket>/" + pipes.asset_key + "/.parquet"
        df.write.parquet(path)
        # this was probably previously logged by the IOManager;
        # report it only after the data has been written
        pipes.report_asset_materialization(metadata={"path": path})
if __name__ == "__main__":
    main()
Now, we have to run this script from Dagster. First, let's factor the boilerplate EMR config into a reusable function:
def make_emr_params(script_path: str) -> dict:
    return {
        # very rough configuration, please adjust to your needs
        "Name": "MyJobFlow",
        "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
        "LogUri": "s3://your-bucket/emr/logs",
        "Steps": [
            {
                "Name": "MyStep",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                        "spark-submit",
                        "--deploy-mode",
                        "cluster",
                        "--master",
                        "yarn",
                        "--files",
                        "s3://your-bucket/venv.pex",
                        "--conf",
                        "spark.pyspark.python=./venv.pex",
                        "--conf",
                        "spark.yarn.submit.waitAppCompletion=true",
                        script_path,
                    ],
                },
            },
        ],
    }
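When several jobs share most of their `spark-submit` flags, the `Args` list can also be generated rather than written out by hand. The following is a minimal sketch, not part of Dagster or the EMR API: `spark_submit_args`, `pex_uri`, and `extra_conf` are hypothetical names, and the default flag values simply mirror the configuration above.

```python
from typing import Dict, List, Optional


def spark_submit_args(
    script_path: str,
    pex_uri: str = "s3://your-bucket/venv.pex",
    extra_conf: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Build the spark-submit argument list for an EMR step (illustrative only)."""
    # The PEX file is shipped with --files, so the executors see it by file name.
    pex_name = pex_uri.rsplit("/", 1)[-1]
    conf = {
        "spark.pyspark.python": f"./{pex_name}",
        "spark.yarn.submit.waitAppCompletion": "true",
    }
    # Caller-supplied settings override or extend the defaults.
    conf.update(extra_conf or {})

    args = [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--master", "yarn",
        "--files", pex_uri,
    ]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    # The application script must come after all spark-submit options.
    args.append(script_path)
    return args
```

With the defaults, the helper returns the same list as the hard-coded `Args` above, so it could replace that list inside `make_emr_params`.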
Now, the asset body will be as follows:
import boto3
import dagster as dg
from dagster_aws.pipes import PipesEMRClient
from my_lib import make_emr_params
@dg.asset(io_manager_key="s3_io_manager")
def upstream(
    context: dg.AssetExecutionContext, pipes_emr_client: PipesEMRClient
) -> str:
    result = pipes_emr_client.run(
        context=context,
        run_job_flow_params=make_emr_params(
            "s3://your-bucket/upstream_asset_script.py"
        ),
    ).get_materialize_result()
    return result.metadata["path"].value
Since the asset now returns the Parquet file path, the IO Manager will store it, and the downstream asset will be able to load it.
Let's continue by migrating the downstream asset.
Since we can't use IO Managers in scripts launched by Pipes, we have to either write a CLI argument parser or use the handy extras feature provided by Pipes to pass this "path" value to the job. We will demonstrate the latter approach. The downstream asset turns into:
@dg.asset
def downstream(
    context: dg.AssetExecutionContext, upstream: str, pipes_emr_client: PipesEMRClient
):
    return pipes_emr_client.run(
        context=context,
        run_job_flow_params=make_emr_params(
            "s3://your-bucket/downstream_asset_script.py"
        ),
        extras={"path": upstream},
    ).get_materialize_result()
Now, let's access the path value in the PySpark job:
import boto3
import pyspark
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
from my_lib import calculate_metric
def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3")),
    ) as pipes:
        spark = pyspark.sql.SparkSession.builder.getOrCreate()
        upstream_path = pipes.get_extra("path")
        df = spark.read.parquet(upstream_path)
        my_metric = calculate_metric(df)
        pipes.report_asset_materialization(metadata={"my_metric": my_metric})
if __name__ == "__main__":
    main()
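For comparison, the CLI-argument approach mentioned earlier could look roughly like this on the job side. This is a sketch using only the standard library. The `--path` flag and the `parse_job_args` name are assumptions made for illustration, and the value would have to be appended after the script path in the `spark-submit` arguments instead of being passed through `extras`.

```python
import argparse
from typing import Optional, Sequence


def parse_job_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the arguments passed to the PySpark script (illustrative only)."""
    parser = argparse.ArgumentParser(description="Downstream PySpark job")
    parser.add_argument(
        "--path",
        required=True,
        help="Location of the upstream Parquet data",
    )
    # With argv=None, argparse falls back to sys.argv[1:].
    return parser.parse_args(argv)
```

Inside the job, `parse_job_args().path` would then take the place of `pipes.get_extra("path")`. Extras avoid this extra plumbing, which is why the guide uses them.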
Finally, provide the required resources to Definitions:
from dagster_aws.pipes import PipesS3MessageReader
from dagster_aws.s3 import S3PickleIOManager, S3Resource
definitions = dg.Definitions(
    assets=[upstream, downstream],
    resources={
        "pipes_emr_client": PipesEMRClient(
            client=boto3.client("emr"),
            message_reader=PipesS3MessageReader(
                client=boto3.client("s3"),
                bucket="your-bucket",
            ),
        ),
        "s3_io_manager": S3PickleIOManager(
            s3_resource=S3Resource(),
            s3_bucket="your-bucket",
            s3_prefix="your-prefix",
        ),
    },
)
Conclusion
In this guide, we have demonstrated how to migrate from using step launchers to using Dagster Pipes. We have shown how to launch PySpark jobs on AWS EMR using PipesEMRClient and how to pass small pieces of data between assets using Dagster's metadata and Pipes extras.
Supplementary
- Dagster Pipes
- GitHub discussion on the topic
- Dagster + Spark - an up-to-date list of Pipes Clients for various Spark providers can be found here
- AWS EMR Pipes tutorial
- PipesEMRClient API docs
Heads up! As an alternative to storing paths with an IOManager, the following utility function can be used to retrieve logged metadata values from upstream assets:
def get_latest_output_metadata_value(
    context: dg.AssetExecutionContext, asset_key: dg.AssetKey, metadata_key: str
):
    # see https://github.com/dagster-io/dagster/issues/8521 for more details about accessing upstream metadata from downstream assets and ops
    event_log_entry = (
        context.get_step_execution_context().instance.get_latest_materialization_event(
            asset_key
        )
    )
    metadata = (
        event_log_entry.dagster_event.event_specific_data.materialization.metadata
    )
    return metadata[metadata_key].value
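An asset that has never been materialized has no materialization event to read from, in which case `get_latest_materialization_event` may return `None`, and a given metadata key may also be absent. A defensive variant of the extraction step might look like the sketch below. `metadata_value_or_default` is a hypothetical helper, and it assumes the event log entry has the same attribute layout used in the function above.

```python
from typing import Any, Optional


def metadata_value_or_default(
    event_log_entry: Optional[Any], metadata_key: str, default: Any = None
) -> Any:
    """Return a logged metadata value, or `default` when it is unavailable."""
    # No materialization has been recorded for the asset yet.
    if event_log_entry is None:
        return default
    metadata = (
        event_log_entry.dagster_event.event_specific_data.materialization.metadata
    )
    # The asset was materialized, but this key was never logged.
    entry = metadata.get(metadata_key)
    return default if entry is None else entry.value
```

The caller can then decide whether a missing value should fail the run or fall back to a known location.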