Usage
This guide walks through common scenarios for using Iceberg with Dagster.
Selecting specific columns in a downstream asset
At times, you might prefer not to retrieve an entire table for a downstream asset. The Iceberg I/O manager allows you to load specific columns by adding metadata to the downstream asset's input:
import pandas as pd
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager
from dagster import AssetIn, Definitions, asset
CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
"file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)
resources = {
"io_manager": PandasIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
namespace="dagster",
)
}
@asset
def iris_dataset() -> pd.DataFrame:
return pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
@asset(
ins={
"iris_sepal": AssetIn(
key="iris_dataset",
metadata={"columns": ["sepal_length_cm", "sepal_width_cm"]},
)
}
)
def sepal_data(iris_sepal: pd.DataFrame) -> pd.DataFrame:
iris_sepal["sepal_area_cm2"] = (
iris_sepal["sepal_length_cm"] * iris_sepal["sepal_width_cm"]
)
return iris_sepal
defs = Definitions(assets=[iris_dataset, sepal_data], resources=resources)
In this example, we focus exclusively on the columns containing sepal data from the iris_dataset table. To select specific columns, we can include metadata in the input asset. This is done using the metadata parameter of the AssetIn that loads the iris_dataset asset within the ins parameter. We provide the key columns along with a list of the desired column names.
When Dagster materializes sepal_data and retrieves the iris_dataset asset via the Iceberg I/O manager, it will only extract the sepal_length_cm and sepal_width_cm columns from the iris_dataset table and make them available in sepal_data as a pandas DataFrame.
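Under the hood, this load is roughly equivalent to a column-pruned PyIceberg scan. The following is an illustrative sketch rather than the I/O manager's exact implementation; it assumes the catalog configuration above and that the table is registered as dagster.iris_dataset:
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "test", **{"type": "sql", "uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
)
table = catalog.load_table("dagster.iris_dataset")  # assumed table identifier
# Only the two requested sepal columns are read from the Iceberg table
sepal_df = table.scan(
    selected_fields=("sepal_length_cm", "sepal_width_cm")
).to_pandas()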
Storing partitioned assets
The Iceberg I/O manager facilitates the storage and retrieval of partitioned data. To effectively manage data in the Iceberg table, it is essential for the Iceberg I/O manager to identify the column that specifies the partition boundaries. This information allows the I/O manager to formulate the appropriate queries for selecting or replacing data.
Below, we outline how the I/O manager generates these queries for various partition types.
For partitioning to function correctly, the partition dimension must correspond to one of the partition columns defined in the Iceberg table. Tables created through the I/O manager will be configured accordingly.
- Static partitions
- Time-based partitions
- Multi-dimensional partitions
To save static-partitioned assets in your Iceberg table, you need to set the partition_expr metadata on the asset. This informs the Iceberg I/O manager which column holds the partition data:
import pandas as pd
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager
from dagster import Definitions, StaticPartitionsDefinition, asset
CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"
resources = {
"io_manager": PandasIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
namespace="dagster",
)
}
@asset(
partitions_def=StaticPartitionsDefinition(
["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
),
metadata={"partition_expr": "species"},
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
species = context.partition_key
full_df = pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
return full_df[full_df["species"] == species]
@asset
def iris_cleaned(iris_dataset_partitioned: pd.DataFrame):
return iris_dataset_partitioned.dropna().drop_duplicates()
defs = Definitions(assets=[iris_dataset_partitioned, iris_cleaned], resources=resources)
Dagster uses the partition_expr metadata to craft the SELECT statement when loading the partition in the downstream asset. For static partitions, this is roughly equivalent to the following SQL query:
SELECT *
WHERE [partition_expr] IN ([selected partitions])
A partition must be specified when materializing the above assets, as explained in the Materializing partitioned assets documentation. For instance, the query used to materialize the Iris-setosa partition of the assets would be:
SELECT *
WHERE species = 'Iris-setosa'
As with static-partitioned assets, you can set the partition_expr metadata on the asset to tell the Iceberg I/O manager which column contains the partition data:
import datetime as dt
import random
import pandas as pd
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager
from dagster import DailyPartitionsDefinition, Definitions, asset
CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"
resources = {
"io_manager": PandasIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
namespace="dagster",
)
}
def get_iris_data_for_date(partition: str) -> pd.DataFrame:
random.seed(876)
N = 1440
d = {
"timestamp": [dt.date.fromisoformat(partition)],
"species": [
random.choice(["Iris-setosa", "Iris-virginica", "Iris-versicolor"])
for _ in range(N)
],
"sepal_length_cm": [random.uniform(0, 1) for _ in range(N)],
"sepal_width_cm": [random.uniform(0, 1) for _ in range(N)],
"petal_length_cm": [random.uniform(0, 1) for _ in range(N)],
"petal_width_cm": [random.uniform(0, 1) for _ in range(N)],
}
return pd.DataFrame.from_dict(d)
@asset(
partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
metadata={"partition_expr": "time"},
)
def iris_data_per_day(context) -> pd.DataFrame:
partition = context.partition_key
    # get_iris_data_for_date fetches all of the iris data for a given date;
    # the returned dataframe contains a column named 'time' that stores
    # the timestamp of each row (one row per minute of the partition's day)
return get_iris_data_for_date(partition)
@asset
def iris_cleaned(iris_data_per_day: pd.DataFrame):
return iris_data_per_day.dropna().drop_duplicates()
defs = Definitions(assets=[iris_data_per_day, iris_cleaned], resources=resources)
Dagster uses the partition_expr metadata to craft the SELECT statement when loading the correct partition in the downstream asset. When loading a time-based partition, the following statement is used:
SELECT *
WHERE [partition_expr] = [partition_start]
A partition must be selected when materializing the above assets, as described in the Materializing partitioned assets documentation. The [partition_start] bound is of the form YYYY-MM-DD HH:MM:SS. In this example, the query when materializing the 2023-01-02 partition of the above assets would be:
SELECT *
WHERE time = '2023-01-02 00:00:00'
The Iceberg I/O manager can also store data partitioned on multiple dimensions. To do this, specify the column for each partition as a dictionary of partition_expr metadata:
import datetime as dt
import random
import pandas as pd
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager
from dagster import (
DailyPartitionsDefinition,
Definitions,
MultiPartitionsDefinition,
StaticPartitionsDefinition,
asset,
)
CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"
resources = {
"io_manager": PandasIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
namespace="dagster",
)
}
def get_iris_data_for_date(partition: str) -> pd.DataFrame:
random.seed(876)
N = 1440
d = {
"timestamp": [dt.date.fromisoformat(partition)],
"species": [
random.choice(["Iris-setosa", "Iris-virginica", "Iris-versicolor"])
for _ in range(N)
],
"sepal_length_cm": [random.uniform(0, 1) for _ in range(N)],
"sepal_width_cm": [random.uniform(0, 1) for _ in range(N)],
"petal_length_cm": [random.uniform(0, 1) for _ in range(N)],
"petal_width_cm": [random.uniform(0, 1) for _ in range(N)],
}
return pd.DataFrame.from_dict(d)
@asset(
partitions_def=MultiPartitionsDefinition(
{
"date": DailyPartitionsDefinition(start_date="2023-01-01"),
"species": StaticPartitionsDefinition(
["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
),
}
),
metadata={"partition_expr": {"date": "time", "species": "species"}},
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
partition = context.partition_key.keys_by_dimension
species = partition["species"]
date = partition["date"]
    # get_iris_data_for_date fetches all of the iris data for a given date;
    # the returned dataframe contains a column named 'time' that stores
    # the timestamp of each row (one row per minute of the partition's day)
full_df = get_iris_data_for_date(date)
return full_df[full_df["species"] == species]
@asset
def iris_cleaned(iris_dataset_partitioned: pd.DataFrame):
return iris_dataset_partitioned.dropna().drop_duplicates()
defs = Definitions(assets=[iris_dataset_partitioned, iris_cleaned], resources=resources)
Dagster uses the partition_expr metadata to craft the SELECT statement when loading the correct partition in a downstream asset. For multi-dimensional partitions, Dagster concatenates the WHERE statements described in the static and time-based cases to craft the correct SELECT statement.
A partition must be selected when materializing the above assets, as described in the Materializing partitioned assets documentation. For example, when materializing the 2023-01-02|Iris-setosa partition of the above assets, the following query will be used:
SELECT *
WHERE species = 'Iris-setosa'
AND time = '2023-01-02 00:00:00'
Partition field naming
Partition fields are named using the column name that they correspond to, with a configurable prefix applied (defaults to "part-"). This prefixing is done in order to comply with changes introduced in pyiceberg 0.10.0 which require that partition field names do not exactly match any existing column names.
For example, an asset that is partitioned using hourly partitions on a column ingestion_time will be assigned a corresponding partition field name of part-ingestion_time.
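If you want to verify the generated names, you can inspect the table's partition spec directly with PyIceberg. This is a sketch that assumes the catalog configuration used throughout this guide and that the iris_data_per_day table from the time-based partitioning example has been materialized:
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "test", **{"type": "sql", "uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
)
table = catalog.load_table("dagster.iris_data_per_day")  # assumed table identifier
print([field.name for field in table.spec().fields])  # e.g. ['part-time']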
The user may configure the prefix in the IO manager configuration via the IcebergCatalogConfig:
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager
from dagster import Definitions
CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"
resources = {
"io_manager": PandasIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE},
partition_field_name_prefix="custom-prefix",
),
namespace="dagster",
)
}
defs = Definitions(resources=resources, assets=[...])
Users may also configure the prefix at launch time via run config if the IO manager is set up using configure_at_launch() (see the resource configuration docs for more details on this pattern).
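A minimal sketch of that pattern, assuming the same PandasIcebergIOManager as above; the concrete catalog properties and prefix are then supplied through run config when a run is launched:
from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager
from dagster import Definitions

# The I/O manager is declared without a fixed configuration and receives its
# settings (including partition_field_name_prefix) from run config at launch time.
defs = Definitions(
    assets=[...],
    resources={"io_manager": PandasIcebergIOManager.configure_at_launch()},
)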
Storing tables in multiple schemas
You may want to have different assets stored in different Iceberg schemas. The Iceberg I/O manager allows you to specify the schema in several ways.
If you want all of your assets to be stored in the same schema, you can specify the schema as configuration to the I/O manager.
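For example, reusing the catalog configuration variables and imports from the examples above, every asset handled by this I/O manager would be stored in the same schema (in this sketch we assume the namespace parameter serves as that schema configuration):
resources = {
    "io_manager": PandasIcebergIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="iris",  # all assets using this I/O manager are stored in the "iris" schema
    )
}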
If you want to store assets in different schemas, you can specify the schema as part of the asset key:
import pandas as pd
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager
from dagster import Definitions, asset
CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"
resources = {
"io_manager": PandasIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
namespace="dagster",
)
}
@asset(key_prefix=["iris"]) # will be stored in "iris" schema
def iris_dataset() -> pd.DataFrame:
return pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
@asset(key_prefix=["wine"]) # will be stored in "wine" schema
def wine_dataset() -> pd.DataFrame:
return pd.read_csv(
"https://gist.githubusercontent.com/tijptjik/9408623/raw/b237fa5848349a14a14e5d4107dc7897c21951f5/wine.csv",
names=[
"fixed_acidity",
"volatile_acidity",
"citric_acid",
"residual_sugar",
"chlorides",
"free_sulfur_dioxide",
"total_sulfur_dioxide",
"density",
"ph",
"sulphates",
"alcohol",
"quality",
],
)
defs = Definitions(assets=[iris_dataset, wine_dataset], resources=resources)
In this example, the iris_dataset asset will be stored in the iris schema, and the wine_dataset asset will be stored in the wine schema.
The two options for specifying schema are mutually exclusive. If you provide schema configuration to the I/O manager, you cannot also provide it via the asset key, and vice versa. If no schema is provided, either from configuration or asset keys, the default public schema will be used.
Using the Iceberg I/O manager with other I/O managers
You may have assets that you don't want to store in Iceberg. You can provide an I/O manager to each asset using the io_manager_key parameter in the asset decorator:
import pandas as pd
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager
from dagster import Definitions, FilesystemIOManager, asset
CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"
FS_BASE_DIR = "/home/vscode/workspace/.tmp/examples/images"
resources = {
"dwh_io_manager": PandasIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
namespace="dagster",
),
"blob_io_manager": FilesystemIOManager(base_dir=FS_BASE_DIR),
}
@asset(io_manager_key="dwh_io_manager")
def iris_dataset() -> pd.DataFrame:
return pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
@asset(io_manager_key="blob_io_manager")
def iris_plots(iris_dataset: pd.DataFrame):
# plot_data is a function we've defined somewhere else
# that plots the data in a DataFrame
return iris_dataset["sepal_length_cm"].plot.hist()
defs = Definitions(assets=[iris_dataset, iris_plots], resources=resources)
In the above example:
- The iris_dataset asset uses the I/O manager bound to the key dwh_io_manager, and iris_plots uses the I/O manager bound to the key blob_io_manager.
- We define the I/O managers for those keys in the Definitions object.
- When the assets are materialized, iris_dataset will be stored in Iceberg, and iris_plots will be saved to the local filesystem.
Using different compute engines to read from and write to Iceberg
dagster-iceberg supports several compute engines out-of-the-box. You can find detailed examples of how to use each engine in the API docs.
- PyArrow Tables
- Pandas DataFrames
- Polars DataFrames
- Daft DataFrames
PyIceberg relies heavily on Apache Arrow for efficient data transfer, so PyArrow is supported natively.
You can use the PyArrowIcebergIOManager to read and write Iceberg tables:
import pandas as pd
import pyarrow as pa
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager
from dagster import Definitions, asset
CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
"file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)
resources = {
"io_manager": PyArrowIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
namespace="dagster",
)
}
@asset
def iris_dataset() -> pa.Table:
return pa.Table.from_pandas(
pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
)
defs = Definitions(assets=[iris_dataset], resources=resources)
You can use the PandasIcebergIOManager to read and write Iceberg tables using pandas:
import pandas as pd
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager
from dagster import Definitions, asset
CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
"file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)
resources = {
"io_manager": PandasIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
namespace="dagster",
)
}
@asset
def iris_dataset() -> pd.DataFrame:
return pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
defs = Definitions(assets=[iris_dataset], resources=resources)
You can use the PolarsIcebergIOManager to read and write Iceberg tables using Polars, taking advantage of its lazily optimized query engine:
import pandas as pd
import polars as pl
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.polars import PolarsIcebergIOManager
from dagster import Definitions, asset
CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
"file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)
resources = {
"io_manager": PolarsIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
namespace="dagster",
)
}
@asset
def iris_dataset() -> pl.DataFrame:
return pl.from_pandas(
pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
)
defs = Definitions(assets=[iris_dataset], resources=resources)
You can use the DaftIcebergIOManager to read and write Iceberg tables using Daft, taking advantage of its lazily optimized query engine:
import daft as da
import pandas as pd
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.daft import DaftIcebergIOManager
from dagster import Definitions, asset
# Replace with your actual catalog and warehouse paths, or use environment variables to set these values at runtime.
CATALOG_URI = "sqlite:////path/to/your/catalog.db"
CATALOG_WAREHOUSE = "file:///path/to/your/warehouse"
resources = {
"io_manager": DaftIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
namespace="dagster",
)
}
@asset
def iris_dataset() -> da.DataFrame:
return da.from_pandas(
pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
)
defs = Definitions(assets=[iris_dataset], resources=resources)
Executing custom SQL commands
In addition to the Iceberg I/O manager, Dagster also provides an IcebergTableResource for executing custom SQL queries.
import pandas as pd
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.resource import IcebergTableResource
from dagster import Definitions, asset
CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"
@asset
def small_petals(iceberg: IcebergTableResource) -> pd.DataFrame:
return iceberg.load().scan().to_pandas()
defs = Definitions(
assets=[small_petals],
resources={
"iceberg": IcebergTableResource(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
namespace="dagster",
table="ingested_data", # assuming that `ingested_data` Iceberg table exists
)
},
)
In this example, we attach the resource to the small_petals asset. In the body of the asset function, we use the load() method to retrieve the Iceberg table object, which can then be used for further processing.
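For example, a row filter could be pushed down into the scan instead of loading the full table. This is a hedged sketch that assumes the ingested_data table has a petal_length_cm column:
import pandas as pd
from dagster_iceberg.resource import IcebergTableResource
from dagster import asset

@asset
def small_petals(iceberg: IcebergTableResource) -> pd.DataFrame:
    # Only rows matching the filter are read from the Iceberg table
    return iceberg.load().scan(row_filter="petal_length_cm < 1.0").to_pandas()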
Configuring table behavior using table properties
PyIceberg tables support table properties to configure table behavior. You can find a full list of properties in the PyIceberg documentation.
Use asset metadata to set table properties:
import pandas as pd
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager
from dagster import Definitions, asset
CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
"file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)
resources = {
"io_manager": PandasIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
namespace="dagster",
)
}
@asset
def iris_dataset() -> pd.DataFrame:
return pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
@asset(
metadata={
"table_properties": {
"write.parquet.page-size-bytes": "2097152", # 2MB
"write.parquet.page-row-limit": "10000",
}
}
)
def sepal_data(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    iris_dataset["sepal_area_cm2"] = (
        iris_dataset["sepal_length_cm"] * iris_dataset["sepal_width_cm"]
    )
    return iris_dataset
defs = Definitions(assets=[iris_dataset, sepal_data], resources=resources)
Write modes
The Iceberg I/O manager supports three write modes:
- overwrite (default): every asset materialization will overwrite the backing Iceberg table. Partitioned assets only overwrite the partitions of the Iceberg table that were part of the asset materialization.
- append: results returned from each asset materialization will be inserted into the backing Iceberg table, respecting partitions when appropriate. Not currently supported in the Spark I/O manager.
- upsert: asset materialization results will be merged into the backing Iceberg table using PyIceberg's native implementation, updating any existing records that match on a configurable join key and inserting records that do not exist in the target table. Insert and update actions can be turned on or off via configuration; for example, you may want to insert new records but not update matching ones, or vice versa (see Using upsert mode for usage details). Not currently supported in the Spark I/O manager.
The write mode is set using the write_mode metadata key, which can be set in the asset definition metadata at deployment time, or at runtime within the asset body using output metadata (see the examples in the following sections).
Setting write mode in definition metadata
import pyarrow as pa
from dagster import AssetExecutionContext, asset
@asset(metadata={"write_mode": "append"})
def user_profiles(context: AssetExecutionContext) -> pa.Table:
return pa.table(
{
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"updated_at": ["2024-01-01", "2024-01-02", "2024-01-03"],
}
)
Overriding definition metadata write mode with output metadata
Setting write mode in output metadata overrides any write mode settings in the asset definition metadata:
import pyarrow as pa
from dagster import AssetExecutionContext, asset
@asset(metadata={"write_mode": "overwrite"})
def user_profiles(context: AssetExecutionContext) -> pa.Table:
context.add_output_metadata({"write_mode": "append"})
return pa.table(
{
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"updated_at": ["2024-01-01", "2024-01-02", "2024-01-03"],
}
)
Using upsert mode
Note: upsert mode is only supported in non-Spark I/O managers.
The Iceberg I/O manager supports upsert operations, which allow you to update existing rows and insert new rows in a single operation. This is useful for maintaining slowly changing dimensions or incrementally updating tables.
Options
Upsert options can be set at deployment time via asset definition metadata, or dynamically at runtime via output metadata. Upsert options set at runtime via context.add_output_metadata() take precedence over those set in definition metadata.
Required:
- join_cols: list[str] - List of columns that make up the join key for the upsert operation.
Optional:
- when_matched_update_all: bool - Whether to update rows in the target table that join with the dataframe being upserted (default True).
- when_not_matched_insert_all: bool - Whether to insert all rows from the upsert dataframe that do not join with the target table (default True).
Any upsert_options set when write_mode is not set to upsert will be ignored, with a debug log message indicating the options were ignored. This allows a user to set the write_mode to upsert with upsert_options in the asset definition metadata while still being able to override the write mode in the output metadata.
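A minimal sketch of that pattern (the asset name and columns are hypothetical): upsert and its options are declared in the definition metadata, but a particular run falls back to append via output metadata, so the upsert_options are ignored with a debug log message:
import pyarrow as pa
from dagster import AssetExecutionContext, asset

@asset(
    metadata={
        "write_mode": "upsert",
        "upsert_options": {"join_cols": ["id"]},
    }
)
def events(context: AssetExecutionContext) -> pa.Table:
    # Output metadata overrides the definition-level write mode for this run;
    # the upsert_options above are then ignored (with a debug log message).
    context.add_output_metadata({"write_mode": "append"})
    return pa.table({"id": [1, 2], "value": ["a", "b"]})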
To use upsert mode, set the write_mode to "upsert" and provide upsert_options in the asset or output metadata:
import pyarrow as pa
from dagster import AssetExecutionContext, asset
@asset(
metadata={
"write_mode": "upsert",
"upsert_options": {
"join_cols": ["id"], # Columns to join on for matching
"when_matched_update_all": True, # Update all columns when matched
"when_not_matched_insert_all": True, # Insert all columns when not matched
},
}
)
def user_profiles(context: AssetExecutionContext) -> pa.Table:
# Returns a table with user profiles
# Rows with matching 'id' will be updated
# Rows with new 'id' values will be inserted
return pa.table(
{
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"updated_at": ["2024-01-01", "2024-01-02", "2024-01-03"],
}
)
Upsert options set in definition metadata can be overridden at runtime using output metadata:
import pyarrow as pa
from dagster import AssetExecutionContext, asset
@asset(
metadata={
"write_mode": "upsert",
"upsert_options": {
"join_cols": ["id"],
"when_matched_update_all": True,
"when_not_matched_insert_all": True,
},
}
)
def user_profiles_dynamic(context: AssetExecutionContext) -> pa.Table:
# Override upsert options at runtime based on business logic
if context.run.tags.get("upsert_join_keys") == "id_and_timestamp":
context.add_output_metadata(
{
"upsert_options": {
"join_cols": ["id", "timestamp"], # Join on multiple columns
"when_matched_update_all": False,
"when_not_matched_insert_all": False,
}
}
)
return pa.table(
{
"id": [1, 2, 3],
"timestamp": ["2024-01-01", "2024-01-01", "2024-01-01"],
"name": ["Alice", "Bob", "Charlie"],
}
)
The UpsertOptions BaseModel subclass can be used to represent upsert options in metadata, providing type validation at deployment time:
import pyarrow as pa
from dagster_iceberg.config import UpsertOptions
from dagster import AssetExecutionContext, asset
@asset(
metadata={
"write_mode": "upsert",
"upsert_options": UpsertOptions(
join_cols=["id", "timestamp"],
when_matched_update_all=True,
when_not_matched_insert_all=True,
),
}
)
def my_table_typed_upsert(context: AssetExecutionContext):
context.add_output_metadata(
{
"upsert_options": UpsertOptions(
join_cols=["id", "timestamp"],
when_matched_update_all=True,
when_not_matched_insert_all=False,
)
}
)
return pa.table(
{
"id": [1, 2, 3],
"timestamp": ["2024-01-01", "2024-01-01", "2024-01-01"],
"name": ["Alice", "Bob", "Charlie"],
}
)
Allowing updates to schema and partitions
By default, assets will error when you change the partition spec (e.g. if you change a partition from hourly to daily) or the schema (e.g. when you add a column). You can allow updates to an asset's partition spec and/or schema by setting partition_spec_update_mode and/or schema_update_mode, respectively, on the asset metadata:
@asset(
partitions_def=MultiPartitionsDefinition(
{
"date": DailyPartitionsDefinition(start_date="2023-01-01"),
"species": StaticPartitionsDefinition(
["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
),
}
),
metadata={
"partition_expr": {"date": "time", "species": "species"},
"partition_spec_update_mode": "update",
"schema_update_mode": "update",
},
)
def iris_dataset_partitioned(context) -> pd.DataFrame: ...
Using the custom I/O manager
The dagster-iceberg library leans heavily on Dagster's DbIOManager implementation. However, this I/O manager comes with some limitations, such as the lack of support for various partition mappings. A custom (experimental) DbIOManager implementation is available that supports partition mappings as long as any time-based partition is consecutive and static partitions are of string type. You can enable it as follows:
resources = {
"io_manager": PyArrowIcebergIOManager(
name="my_catalog",
config=IcebergCatalogConfig(
properties={
"type": "sql",
"uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
"warehouse": f"file://{warehouse_path}",
}
),
namespace="my_schema",
)
}
For example, a MultiToSingleDimensionPartitionMapping is supported:
@asset(
key_prefix=["my_schema"],
partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"),
ins={
"multi_partitioned_asset": AssetIn(
["my_schema", "multi_partitioned_asset_1"],
partition_mapping=MultiToSingleDimensionPartitionMapping(
partition_dimension_name="date"
),
)
},
metadata={
"partition_expr": "date_column",
},
)
def single_partitioned_asset_date(multi_partitioned_asset: pa.Table) -> pa.Table: ...
However, a SpecificPartitionsPartitionMapping is not, because these dates are not consecutive:
@asset(
partitions_def=MultiPartitionsDefinition(
partitions_defs={
"date": DailyPartitionsDefinition(
start_date="2022-01-01",
end_date="2022-01-10",
),
"letter": StaticPartitionsDefinition(["a", "b", "c"]),
},
),
key_prefix=["my_schema"],
metadata={"partition_expr": {"time": "time", "letter": "letter"}},
ins={
"multi_partitioned_asset": AssetIn(
["my_schema", "multi_partitioned_asset_1"],
partition_mapping=MultiPartitionMapping(
{
"color": DimensionPartitionMapping(
dimension_name="letter",
partition_mapping=StaticPartitionMapping(
{"blue": "a", "red": "b", "yellow": "c"}
),
),
"date": DimensionPartitionMapping(
dimension_name="date",
partition_mapping=SpecificPartitionsPartitionMapping(
["2022-01-01", "2024-01-01"]
),
),
}
),
)
},
)
def mapped_multi_partition(
context: AssetExecutionContext, multi_partitioned_asset: pa.Table
) -> pa.Table: ...