Build pipelines with Python scripts
Dagster provides a PythonScriptComponent that you can use to execute Python scripts as assets in your Dagster project. This component runs your scripts in a subprocess using Dagster Pipes, so you can leverage existing Python scripts while benefiting from Dagster's orchestration and observability features. This guide walks through how to use PythonScriptComponent to execute your Python scripts.
1. Prepare a Dagster project
To begin, you'll need a Dagster project. You can use an existing components-ready project or create a new one:
uvx create-dagster project my-project && cd my-project/src
Activate the project virtual environment:
source ../.venv/bin/activate
2. Scaffold a Python script component definition
Now that you have a Dagster project, you can scaffold a Python script component definition. In this example, we'll create a component definition called generate_revenue_report that will execute a Python script to process sales data and generate a revenue report.
dg scaffold defs dagster.PythonScriptComponent generate_revenue_report
Creating defs at /.../my-project/src/my_project/defs/generate_revenue_report.
The scaffold call will generate a defs.yaml file:
tree my_project/defs
my_project/defs
├── __init__.py
└── generate_revenue_report
└── defs.yaml
2 directories, 2 files
3. Create a Python script (if needed)
Next, create the Python script that the component will execute, if you don't already have one. Dagster will orchestrate it without requiring changes to your code. For this example, we'll create a simple data processing script and save it as process_sales_data.py:
import pandas as pd
# Sample sales data (in a real scenario, this might come from a database or file)
sales_data = {
"date": ["2024-01-01", "2024-01-02", "2024-01-03"],
"product": ["A", "B", "A"],
"quantity": [10, 5, 8],
"price": [100.0, 200.0, 100.0],
}
df = pd.DataFrame(sales_data)
df["revenue"] = df["quantity"] * df["price"]
# Calculate total revenue
total_revenue = df["revenue"].sum()
print(f"Generated revenue report with total revenue: ${total_revenue}")
print(f"Number of transactions: {len(df)}")
print(f"Average transaction: ${df['revenue'].mean():.2f}")
This script will be executed by Dagster in a subprocess. Any output printed to stdout/stderr will be captured and displayed in the Dagster UI logs.
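To build intuition for how this works, the sketch below mimics the capture mechanism with the standard library: it writes a stand-in script to a temporary file, runs it in a subprocess, and collects its stdout. This is illustrative only; Dagster's actual Pipes machinery is richer than a bare subprocess call, and the script contents here are a simplified stand-in for the sales example above.

```python
import subprocess
import sys
import tempfile
import textwrap

# A stand-in script, similar in spirit to the sales example above.
script = textwrap.dedent("""\
    quantities = [10, 5, 8]
    prices = [100.0, 200.0, 100.0]
    total = sum(q * p for q, p in zip(quantities, prices))
    print(f"Generated revenue report with total revenue: ${total}")
""")

with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
    f.write(script)
    script_path = f.name

# Run the script in a subprocess and capture its stdout, much as the
# component does when it executes your script.
result = subprocess.run(
    [sys.executable, script_path],
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout, end="")
```

Everything the child process prints ends up in result.stdout here; in a real run, Dagster surfaces that output in the UI logs instead.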
4. Configure your component
Update your defs.yaml file to specify the Python script and define the assets that will be created. You can also specify properties for the asset in Dagster, such as a group name and description:
type: dagster.PythonScriptComponent
attributes:
execution:
path: process_sales_data.py
assets:
- key: sales_revenue_report
description: "Daily sales revenue report generated from transaction data"
group_name: "analytics"
kinds: ["python", "report"]
You can run dg list defs to see the asset corresponding to your component:
dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets │ ┏━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Key ┃ Group ┃ Deps ┃ Kinds ┃ Description ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ │
│ │ │ sales_revenue_report │ analytics │ │ python │ Daily sales revenue report generated from │ │
│ │ │ │ │ │ report │ transaction data │ │
│ │ └──────────────────────┴───────────┴──────┴────────┴─────────────────────────────────────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
5. Launch your assets
Once your component is configured, you can launch your assets to execute the Python scripts:
dg dev
Navigate to the Dagster UI and you'll see your assets. To execute your Python script, click on the asset, then click Materialize. The script will run in a subprocess, and you'll be able to see the logs and metadata in the Dagster UI.
Advanced configuration
Log metadata inside Python script
For more advanced use cases, you can use Dagster Pipes to pass metadata from your Python script back to Dagster. This allows you to provide rich information about your assets directly in the Dagster UI:
import pandas as pd
from dagster_pipes import open_dagster_pipes
# Sample sales data (in a real scenario, this might come from a database or file)
sales_data = {
"date": ["2024-01-01", "2024-01-02", "2024-01-03"],
"product": ["A", "B", "A"],
"quantity": [10, 5, 8],
"price": [100.0, 200.0, 100.0],
}
with open_dagster_pipes() as context:
df = pd.DataFrame(sales_data)
df["revenue"] = df["quantity"] * df["price"]
# Calculate total revenue
total_revenue = df["revenue"].sum()
# Log the result to Dagster
context.log.info(f"Generated revenue report with total revenue: ${total_revenue}")
context.log.info(f"Processed {len(df)} transactions")
# Report asset materialization with rich metadata
context.report_asset_materialization(
metadata={
"total_revenue": total_revenue,
"num_transactions": len(df),
"average_transaction": df["revenue"].mean(),
"top_product": df.loc[df["revenue"].idxmax(), "product"],
}
)
With Dagster Pipes, you can:
- Log structured information: Use context.log.info() to send logs directly to Dagster.
- Report asset metadata: Use context.report_asset_materialization() to attach rich metadata that appears in the Dagster UI.
- Handle errors: Exception information is automatically captured and reported to Dagster.
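Under the hood, the Pipes protocol streams JSON messages from your script back to the orchestrating Dagster process, one object per line. The sketch below illustrates the general shape of that idea only; the field names and structure here are assumptions for illustration, not the exact wire format dagster_pipes uses.

```python
import json

# Illustrative only: a Pipes-style message is a JSON object streamed one per
# line. The "method"/"params" field names here are an assumption, not the
# exact dagster_pipes wire format.
message = {
    "method": "report_asset_materialization",
    "params": {
        "metadata": {"total_revenue": 2800.0, "num_transactions": 3},
    },
}
line = json.dumps(message)

# The orchestrating process reads and decodes each line as it arrives.
decoded = json.loads(line)
print(decoded["method"])
```

This is why Pipes scripts stay so lightweight: the dagster_pipes package is a thin client that serializes events, and the heavy lifting stays on the Dagster side.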
Orchestrate multiple Python scripts
You can define multiple Python script component instances in a single defs.yaml file using the --- separator syntax. This allows you to run different scripts for different assets:
type: dagster.PythonScriptComponent
attributes:
execution:
path: process_sales_data.py
assets:
- key: sales_revenue_report
description: "Daily sales revenue report"
group_name: "analytics"
---
type: dagster.PythonScriptComponent
attributes:
execution:
path: process_customer_data.py
assets:
- key: customer_summary_stats
description: "Summary statistics for customer data"
group_name: "analytics"
Each component instance runs independently and can execute different Python scripts. This approach is useful when you have multiple related data processing tasks that should be organized together, but run separately.
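For completeness, a process_customer_data.py for the second component instance might look like the following. The script name comes from the config above, but its contents are a hypothetical sketch using only the standard library:

```python
from statistics import mean

# Hypothetical customer data (in a real scenario, this might come from a
# database or file).
customers = [
    {"id": 1, "orders": 12, "lifetime_value": 1450.0},
    {"id": 2, "orders": 3, "lifetime_value": 320.0},
    {"id": 3, "orders": 7, "lifetime_value": 880.0},
]

num_customers = len(customers)
avg_orders = mean(c["orders"] for c in customers)
avg_ltv = mean(c["lifetime_value"] for c in customers)

# Anything printed here is captured by Dagster and shown in the UI logs.
print(f"Summarized {num_customers} customers")
print(f"Average orders per customer: {avg_orders:.1f}")
print(f"Average lifetime value: ${avg_ltv:.2f}")
```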
Set up dependencies
You can specify dependencies between assets from different scripts. Using the multiple scripts example above, you can make one script depend on another:
type: dagster.PythonScriptComponent
attributes:
execution:
path: process_sales_data.py
assets:
- key: sales_revenue_report
description: "Daily sales revenue report"
group_name: "analytics"
---
type: dagster.PythonScriptComponent
attributes:
execution:
path: process_customer_data.py
assets:
- key: customer_summary_stats
description: "Summary statistics for customer data"
group_name: "analytics"
deps: [sales_revenue_report]
Automate Python scripts
You can configure when assets should be automatically materialized using declarative automation conditions:
type: dagster.PythonScriptComponent
attributes:
execution:
path: process_sales_data.py
assets:
- key: sales_revenue_report
description: "Daily sales revenue report"
group_name: "analytics"
automation_condition: "{{ automation_condition.on_cron('@daily') }}"
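Cron-based scheduling is one option; other declarative automation conditions can be used the same way. For example, assuming eager materialization fits your setup, the condition below would materialize the asset whenever its upstream dependencies update:

```yaml
automation_condition: "{{ automation_condition.eager() }}"
```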
Organize scripts in subdirectories
You can organize your scripts in subdirectories within your component:
my-project/src/my_project/defs/generate_revenue_report/
├── defs.yaml
├── scripts/
│ ├── process_sales_data.py
│ └── generate_reports.py
└── utils/
└── data_helpers.py
Reference scripts in subdirectories in your defs.yaml:
type: dagster.PythonScriptComponent
attributes:
execution:
path: scripts/process_sales_data.py
assets:
- key: sales_revenue_report
Best practices
- Start simple: Begin with standard Python scripts that print output for basic orchestration needs.
- Log structured metadata and information with Dagster Pipes: Use print statements for simple cases, or use the open_dagster_pipes() context manager for full Pipes support, such as structured logging with context.log.info() and streaming asset materialization events back to Dagster.
- Keep scripts focused: Each script should have a clear, single responsibility; offload complex dependencies to Dagster to benefit from native observability like lineage tracking and asset metadata.