Skip to main content

Build pipelines with Python scripts

Dagster provides a PythonScriptComponent that you can use to execute Python scripts as assets in your Dagster project. This component runs your Python scripts in a subprocess using Dagster Pipes, allowing you to leverage existing Python scripts while benefiting from Dagster's orchestration and observability features. This guide will walk you through how to use the PythonScriptComponent to execute your Python scripts.

1. Prepare a Dagster project

To begin, you'll need a Dagster project. You can use an existing components-ready project or create a new one:

uvx create-dagster project my-project && cd my-project/src

Activate the project virtual environment:

source ../.venv/bin/activate

2. Scaffold a Python script component definition

Now that you have a Dagster project, you can scaffold a Python script component definition. In this example, we'll create a component definition called generate_revenue_report that will execute a Python script to process sales data and generate a revenue report.

dg scaffold defs dagster.PythonScriptComponent generate_revenue_report
Creating defs at /.../my-project/src/my_project/defs/generate_revenue_report.

The scaffold call will generate a defs.yaml file:

tree my_project/defs
my_project/defs
├── __init__.py
└── generate_revenue_report
└── defs.yaml

2 directories, 2 files

3. Create a Python script (if needed)

Next, you will need to create a Python script that will be executed by the component if you do not already have an existing Python script you've already written. Dagster will orchestrate it without requiring changes to your code. For this example, we'll create a simple data processing script:

my-project/src/my_project/defs/generate_revenue_report/process_sales_data.py
import pandas as pd

# Sample sales data (in a real scenario, this might come from a database or file)
sales_data = {
"date": ["2024-01-01", "2024-01-02", "2024-01-03"],
"product": ["A", "B", "A"],
"quantity": [10, 5, 8],
"price": [100.0, 200.0, 100.0],
}

df = pd.DataFrame(sales_data)
df["revenue"] = df["quantity"] * df["price"]

# Calculate total revenue
total_revenue = df["revenue"].sum()

print(f"Generated revenue report with total revenue: ${total_revenue}")
print(f"Number of transactions: {len(df)}")
print(f"Average transaction: ${df['revenue'].mean():.2f}")

This script will be executed by Dagster in a subprocess. Any output printed to stdout/stderr will be captured and displayed in the Dagster UI logs.

4. Configure your component

Update your defs.yaml file to specify the Python script and define the assets that will be created. You can also specify properties for the asset in Dagster, such as a group name and description:

my-project/src/my_project/defs/generate_revenue_report/defs.yaml
type: dagster.PythonScriptComponent

attributes:
execution:
path: process_sales_data.py
assets:
- key: sales_revenue_report
description: "Daily sales revenue report generated from transaction data"
group_name: "analytics"
kinds: ["python", "report"]

You can run dg list defs to see the asset corresponding to your component:

dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets │ ┏━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Key ┃ Group ┃ Deps ┃ Kinds ┃ Description ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ │
│ │ │ sales_revenue_report │ analytics │ │ python │ Daily sales revenue report generated from │ │
│ │ │ │ │ │ report │ transaction data │ │
│ │ └──────────────────────┴───────────┴──────┴────────┴─────────────────────────────────────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

5. Launch your assets

Once your component is configured, you can launch your assets to execute the Python scripts:

dg dev

Navigate to the Dagster UI and you'll see your assets. To execute your Python script, click on the asset, then click Materialize. The script will run in a subprocess, and you'll be able to see the logs and metadata in the Dagster UI.

Advanced configuration

Log metadata inside Python script

For more advanced use cases, you can use Dagster Pipes to pass metadata from your Python script back to Dagster. This allows you to provide rich information about your assets directly in the Dagster UI:

my-project/src/my_project/defs/generate_revenue_report/process_sales_data.py
import pandas as pd
from dagster_pipes import open_dagster_pipes

# Sample sales data (in a real scenario, this might come from a database or file)
sales_data = {
"date": ["2024-01-01", "2024-01-02", "2024-01-03"],
"product": ["A", "B", "A"],
"quantity": [10, 5, 8],
"price": [100.0, 200.0, 100.0],
}

with open_dagster_pipes() as context:
df = pd.DataFrame(sales_data)
df["revenue"] = df["quantity"] * df["price"]

# Calculate total revenue
total_revenue = df["revenue"].sum()

# Log the result to Dagster
context.log.info(f"Generated revenue report with total revenue: ${total_revenue}")
context.log.info(f"Processed {len(df)} transactions")

# Report asset materialization with rich metadata
context.report_asset_materialization(
metadata={
"total_revenue": total_revenue,
"num_transactions": len(df),
"average_transaction": df["revenue"].mean(),
"top_product": df.loc[df["revenue"].idxmax(), "product"],
}
)

With Dagster Pipes, you can:

  • Log structured information: Use context.log.info() to send logs directly to Dagster.
  • Report asset metadata: Use context.report_asset_materialization() to attach rich metadata that appears in the Dagster UI.
  • Handle errors: Exception information is automatically captured and reported to Dagster.

Orchestrate multiple Python scripts

You can define multiple Python script component instances in a single defs.yaml file using the --- separator syntax. This allows you to run different scripts for different assets:

my-project/src/my_project/defs/generate_revenue_report/defs.yaml
type: dagster.PythonScriptComponent

attributes:
execution:
path: process_sales_data.py
assets:
- key: sales_revenue_report
description: "Daily sales revenue report"
group_name: "analytics"
---
type: dagster.PythonScriptComponent

attributes:
execution:
path: process_customer_data.py
assets:
- key: customer_summary_stats
description: "Summary statistics for customer data"
group_name: "analytics"

Each component instance runs independently and can execute different Python scripts. This approach is useful when you have multiple related data processing tasks that should be organized together, but run separately.

Set up dependencies

You can specify dependencies between assets from different scripts. Using the multiple scripts example above, you can make one script depend on another:

my-project/src/my_project/defs/generate_revenue_report/defs.yaml
type: dagster.PythonScriptComponent

attributes:
execution:
path: process_sales_data.py
assets:
- key: sales_revenue_report
description: "Daily sales revenue report"
group_name: "analytics"
---
type: dagster.PythonScriptComponent

attributes:
execution:
path: process_customer_data.py
assets:
- key: customer_summary_stats
description: "Summary statistics for customer data"
group_name: "analytics"
deps: [sales_revenue_report]

Automate Python scripts

You can configure when assets should be automatically materialized using declarative automation conditions:

my-project/src/my_project/defs/generate_revenue_report/defs.yaml
type: dagster.PythonScriptComponent

attributes:
execution:
path: process_sales_data.py
assets:
- key: sales_revenue_report
description: "Daily sales revenue report"
group_name: "analytics"
automation_condition: "{{ automation_condition.on_cron('@daily') }}"

Creating scripts in subdirectories

You can organize your scripts in subdirectories within your component:

my-project/src/my_project/defs/generate_revenue_report/
├── defs.yaml
├── scripts/
│ ├── process_sales_data.py
│ └── generate_reports.py
└── utils/
└── data_helpers.py

Reference scripts in subdirectories in your defs.yaml:

my-project/src/my_project/defs/generate_revenue_report/defs.yaml
type: dagster.PythonScriptComponent

attributes:
execution:
path: scripts/process_sales_data.py
assets:
- key: sales_revenue_report

Best practices

  • Start simple: Begin with standard Python scripts that print output for basic orchestration needs.
  • Log structured metadata and information with Dagster Pipes: Use print statements for simple cases, or leverage context.log.info() with Pipes for structured logging and use open_dagster_pipes() context manager to leverage full Pipes support, such as streaming structured asset materialization events back to Dagster.
  • Keep scripts focused: Each script should have a clear, single responsibility, and offload complex dependencies to Dagster to benefit from native observability like lineage tracking and asset metadata.