Delta Lake (dagster-deltalake)
This library provides an integration with the Delta Lake storage framework.
- dagster_deltalake.DeltaLakeIOManager IOManagerDefinition
- Base class for an IO manager definition that reads inputs from and writes outputs to Delta Lake.

  Examples:

      from typing import Sequence

      import pandas as pd
      from dagster import Definitions, asset
      from dagster._core.storage.db_io_manager import DbTypeHandler  # internal Dagster module
      from dagster_deltalake import DeltaLakeIOManager, LocalConfig
      from dagster_deltalake_pandas import DeltaLakePandasTypeHandler

      class MyDeltaLakeIOManager(DeltaLakeIOManager):
          @staticmethod
          def type_handlers() -> Sequence[DbTypeHandler]:
              return [DeltaLakePandasTypeHandler()]

      @asset(
          key_prefix=["my_schema"]  # will be used as the schema (parent folder) in Delta Lake
      )
      def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
          ...

      defs = Definitions(
          assets=[my_table],
          resources={
              "io_manager": MyDeltaLakeIOManager(
                  root_uri="/path/to/deltalake",  # required: where the tables are stored
                  storage_options=LocalConfig(),  # required: local filesystem storage
              )
          },
      )

  If you do not provide a schema, Dagster determines one from the assets and ops that use the I/O manager. For assets, the schema comes from the asset key, as in the example above. For ops, specify the schema with a “schema” entry in the output metadata. If neither is provided, the schema defaults to “public”.

      from dagster import Out, op

      @op(
          out={"my_table": Out(metadata={"schema": "my_schema"})}
      )
      def make_my_table() -> pd.DataFrame:
          ...

  To load only specific columns of a table as input to a downstream op or asset, add “columns” metadata to the In or AssetIn.

      from dagster import AssetIn, asset

      @asset(
          ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
      )
      def my_table_a(my_table: pd.DataFrame):
          # my_table will contain only the data from column "a"
          ...
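The schema and table rules above can be modelled as a small pure-Python function. This is a hypothetical helper, not part of dagster-deltalake: `resolve_table_location`, `table_path`, and `DEFAULT_SCHEMA` are illustrative names. It assumes that a schema configured on the IO manager itself takes precedence, that an op's table name is its output name, and that for a multi-part asset key the component just before the table name is the schema.

```python
from typing import Mapping, Optional, Sequence, Tuple

# Hypothetical helper modelling the documented rules; not dagster-deltalake code.
DEFAULT_SCHEMA = "public"


def resolve_table_location(
    configured_schema: Optional[str] = None,
    asset_key_path: Optional[Sequence[str]] = None,
    output_name: Optional[str] = None,
    output_metadata: Optional[Mapping[str, object]] = None,
) -> Tuple[str, str]:
    """Return (schema, table) for an asset key or an op output."""
    if asset_key_path:
        # Assets: the last key component is the table name.
        table = asset_key_path[-1]
        if configured_schema:
            schema = configured_schema
        elif len(asset_key_path) > 1:
            # Assumption: the component before the table name is the schema.
            schema = asset_key_path[-2]
        else:
            schema = DEFAULT_SCHEMA
        return schema, table

    if not output_name:
        raise ValueError("an op output needs a name")
    # Ops (assumed): the output name is the table name and a "schema"
    # metadata entry selects the schema, falling back to the default.
    metadata_schema = (output_metadata or {}).get("schema")
    schema = configured_schema or (str(metadata_schema) if metadata_schema else DEFAULT_SCHEMA)
    return schema, output_name


def table_path(root_uri: str, schema: str, table: str) -> str:
    # The schema acts as the parent folder of the table under the root URI.
    return f"{root_uri.rstrip('/')}/{schema}/{table}"


print(resolve_table_location(asset_key_path=["my_schema", "my_table"]))  # ('my_schema', 'my_table')
print(resolve_table_location(asset_key_path=["my_table"]))  # ('public', 'my_table')
print(table_path("/path/to/deltalake", "my_schema", "my_table"))  # /path/to/deltalake/my_schema/my_table
```

Modelling the rules as a pure function makes the fallback order explicit; consult the library itself for the exact behaviour when both a configured schema and a key prefix or metadata entry are present.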
- dagster_deltalake.DeltaLakePyarrowIOManager IOManagerDefinition
- Subclass of DeltaLakeIOManager with a PyArrow type handler already registered, so assets and ops can read and write PyArrow tables without defining a custom subclass. Configuration, schema resolution, and the “columns” input metadata work as described for DeltaLakeIOManager above.
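The “columns” input metadata described for DeltaLakeIOManager (and therefore shared by DeltaLakePyarrowIOManager) behaves like a column projection on the loaded table. The following is a minimal in-memory model of that behaviour, assuming a table is represented as a dict mapping column names to value lists; `project_columns` is a hypothetical name, and the sketch does not show how dagster-deltalake actually reads the Delta table from storage.

```python
from typing import Dict, List, Mapping, Sequence

# Stand-in for a DataFrame or PyArrow table: column name -> column values.
Table = Dict[str, List[object]]


def project_columns(table: Mapping[str, Sequence[object]], input_metadata: Mapping[str, object]) -> Table:
    """Keep only the columns named in the "columns" input metadata, if present."""
    columns = input_metadata.get("columns")
    if columns is None:
        # No "columns" entry: the downstream op or asset receives every column.
        return {name: list(values) for name, values in table.items()}
    if not isinstance(columns, (list, tuple)):
        raise TypeError('"columns" metadata must be a list of column names')
    missing = [name for name in columns if name not in table]
    if missing:
        raise KeyError(f"columns not found in table: {missing}")
    # Preserve the order in which the columns were requested.
    return {name: list(table[name]) for name in columns}


my_table = {"a": [1, 2, 3], "b": ["x", "y", "z"]}
print(project_columns(my_table, {"columns": ["a"]}))  # {'a': [1, 2, 3]}
```

Raising on unknown column names is a design choice of this sketch, made so that typos in the metadata surface immediately instead of silently producing an empty input.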
- dagster_deltalake.DeltaTableResource ResourceDefinition
- Resource for interacting with a Delta table.

  Examples:

      from dagster import Definitions, asset
      from dagster_deltalake import DeltaTableResource, LocalConfig

      @asset
      def my_table(delta_table: DeltaTableResource):
          df = delta_table.load().to_pandas()

      defs = Definitions(
          assets=[my_table],
          resources={
              "delta_table": DeltaTableResource(
                  url="/path/to/table",
                  storage_options=LocalConfig()
              )
          }
      )