lakehouse.etlwriter
- class lakehouse.etlwriter.ETLWriter(spark: SparkSession, **options: Dict[str, Any])
Bases: object
A generic class encapsulating the writing of data.
Use the function write() to set the write configs. Use _write() to execute the writing process.
- Override these functions as required (a minimal skeleton is sketched below):
get_replace_condition(self, sdf: DataFrame, table: str) -> str: Defines the filter used for the replaceWhere overwrite operation. Required if write(mode="replace").
get_delta_merge_builder(self, sdf: DataFrame, delta_table: DeltaTable) -> DeltaMergeBuilder: Defines the merge builder for the merge write into Delta. Required if write(mode="merge").
custom_write(self, sdf: DataFrame, table: str) -> None: Defines a custom write operation. Required if write(mode="custom").
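Example
A minimal sketch of a subclass wiring up all three hooks. The merge key id, the load_date filter, and the append sink are illustrative assumptions, not part of the library:
>>> from delta.tables import DeltaMergeBuilder, DeltaTable
>>> from pyspark.sql import DataFrame
>>> from lakehouse.etlwriter import ETLWriter
>>> class MyWriter(ETLWriter):
...     def get_replace_condition(self, sdf: DataFrame, table: str) -> str:
...         # Placeholder filter; see get_replace_condition() below.
...         return "load_date >= '2024-01-01'"
...     def get_delta_merge_builder(
...         self, sdf: DataFrame, delta_table: DeltaTable
...     ) -> DeltaMergeBuilder:
...         # Placeholder merge on an assumed "id" column; see get_delta_merge_builder() below.
...         return (delta_table.alias("target")
...                 .merge(sdf.alias("source"), "target.id = source.id")
...                 .whenMatchedUpdateAll()
...                 .whenNotMatchedInsertAll())
...     def custom_write(self, sdf: DataFrame, table: str) -> None:
...         # Placeholder custom sink.
...         sdf.write.mode("append").saveAsTable(table)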
- spark
Spark Session as provided to process the data
- Type:
SparkSession
- **options
Keyword arguments; any options provided to the class
- Type:
Dict[str, Any]
- catalog
Name of the catalog recognized by Spark, e.g. from the Hive Metastore or Unity Catalog
- Type:
str
- source_schema
Name of the source schema
- Type:
str
- target_schema
Name of the target schema
- Type:
str
- __init__(spark: SparkSession, **options: Dict[str, Any]) None
Initializes the Writer class with user-provided options.
- Parameters:
spark (SparkSession) – existing Spark Session
**options (Dict[str, Any]) – Keyword arguments; any options provided to the class
- Kwargs options:
catalog (str): Name of the catalog recognized by Spark, e.g. from the Hive Metastore or Unity Catalog. Required.
source_schema (str): Name of the source schema. Required.
target_schema (str): Name of the target schema. Required.
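Example
For illustration, a hypothetical instantiation via a subclass such as the MyWriter sketch above; the catalog and schema names are placeholders:
>>> writer = MyWriter(
...     spark,
...     catalog="main",
...     source_schema="bronze",
...     target_schema="silver",
... )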
- custom_write(sdf: DataFrame, table: str) None
Abstract function to implement a custom write of a DataFrame. Required if write(mode="custom").
- Parameters:
sdf (DataFrame) – DataFrame to be written
table (str) – name of the table
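Example
A sketch of an override that lands the data as CSV files instead of a managed table; the output path is a made-up placeholder:
>>> def custom_write(self, sdf: DataFrame, table: str) -> None:
...     # Hypothetical sink: CSV files with a header row under a placeholder path.
...     sdf.write.mode("overwrite").option("header", "true").csv(f"/tmp/exports/{table}")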
- get_delta_merge_builder(sdf: DataFrame, delta_table: DeltaTable) DeltaMergeBuilder
Abstract function to define a DeltaMergeBuilder to perform the merge in the write function.
The schema is automatically merged during the merge if merge_schema=True.
Example
>>> merge_condition = "target.primary_key = source.primary_key"
>>> builder = delta_table.alias("target").merge(sdf.alias("source"), merge_condition)
>>> builder = builder.whenMatchedUpdateAll()
>>> builder = builder.whenNotMatchedInsertAll()
>>> return builder
More examples can be found here: https://docs.delta.io/latest/delta-update.html and https://docs.delta.io/latest/api/python/spark/index.html
- Parameters:
sdf (DataFrame) – DataFrame to be written
delta_table (DeltaTable) – target delta table to write to
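The builder can also carry conditional clauses. In the sketch below, source.deleted is an assumed soft-delete flag, shown only to illustrate the DeltaMergeBuilder API:
>>> builder = delta_table.alias("target").merge(sdf.alias("source"), merge_condition)
>>> builder = builder.whenMatchedDelete(condition="source.deleted = true")
>>> builder = builder.whenMatchedUpdateAll()
>>> builder = builder.whenNotMatchedInsertAll()
>>> return builder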
- get_replace_condition(sdf: DataFrame, table: str) str
Condition as a SQL expression to overwrite specific data.
The input data has to fulfill this condition.
Example
>>> return "sample_col >= sample_value"
- Parameters:
sdf (DataFrame) – DataFrame to be written
table (str) – name of the table
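A more dynamic sketch derives the filter from the incoming batch; the load_date partition column is an assumption for illustration:
>>> def get_replace_condition(self, sdf: DataFrame, table: str) -> str:
...     # Hypothetical: replace exactly the partitions present in the incoming data.
...     dates = [row["load_date"] for row in sdf.select("load_date").distinct().collect()]
...     return "load_date IN ({})".format(", ".join(f"'{d}'" for d in dates))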
- write(**options)
Function to set the writer configs.
- Parameters:
**options (Dict[str, Any]) – Keyword arguments; any write options provided
- Kwargs options:
mode (str): Defines the mode of writing the data. One of overwrite, append, replace (define the replace filter with get_replace_condition()), merge (define the merge builder with get_delta_merge_builder()), or custom (define the custom_write() function). Default: append.
merge_schema (bool): Whether the schema should be automatically evolved/merged. Default: False.
- Returns:
self
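Example
Since write() returns self, configuration can be chained with the execution step. This is a sketch only; _write()'s parameters are not documented in this reference, so it is shown in the parameterless form used above:
>>> writer = MyWriter(spark, catalog="main", source_schema="bronze", target_schema="silver")
>>> writer.write(mode="merge", merge_schema=True)._write()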