lakehouse.etlwriter

class lakehouse.etlwriter.ETLWriter(spark: SparkSession, **options: Dict[str, Any])

Bases: object

A generic class integrating the writing of data.

Use write() to set the write configuration, then use _write() to execute the write.

Override the following functions as required:

  • get_replace_condition(self, sdf: DataFrame, table: str) -> str: Defines the filter used for the replace-where overwrite operation. Required if write(mode="replace").

  • get_delta_merge_builder(self, sdf: DataFrame, delta_table: DeltaTable) -> DeltaMergeBuilder: Defines the merge builder for the merge write into Delta. Required if write(mode="merge").

  • custom_write(self, sdf: DataFrame, table: str) -> None: Defines a custom write operation. Required if write(mode="custom").

spark

Spark session used to process the data

Type:

SparkSession

**options

Keyword arguments; any options passed to the class

Type:

Dict[str, Any]

catalog

Name of the catalog recognized by Spark, e.g. from Hive Metastore or Unity Catalog

Type:

str

source_schema

Name of the source_schema

Type:

str

target_schema

Name of the target_schema

Type:

str

__init__(spark: SparkSession, **options: Dict[str, Any]) -> None

Initializes the Writer class with user-provided options.

Parameters:
  • spark (SparkSession) – existing Spark Session

  • **options (Dict[str, Any]) – keyword arguments; any options passed to the class

Kwargs options:

  • catalog (str) – Name of the catalog recognized by Spark, e.g. from Hive Metastore or Unity Catalog. Required.

  • source_schema (str) – Name of the source schema. Required.

  • target_schema (str) – Name of the target schema. Required.
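The three required options can be checked before the writer is constructed. The sketch below is only an illustration: check_required_options and the values "main", "bronze" and "silver" are hypothetical and not part of lakehouse.etlwriter. Only the option names come from the list above.

```python
from typing import Any, Dict, Tuple

# Option names documented as required for ETLWriter.__init__.
REQUIRED_OPTIONS: Tuple[str, ...] = ("catalog", "source_schema", "target_schema")


def check_required_options(**options: Any) -> Dict[str, Any]:
    """Hypothetical helper: fail fast if a required option is missing."""
    missing = [name for name in REQUIRED_OPTIONS if name not in options]
    if missing:
        raise ValueError("missing required options: " + ", ".join(missing))
    return options


# Placeholder values; replace them with names from your own environment.
writer_options = check_required_options(
    catalog="main",
    source_schema="bronze",
    target_schema="silver",
)
# writer = ETLWriter(spark, **writer_options)  # needs a live SparkSession
```

Checking the options up front gives a clear error message before any Spark work starts. Whether ETLWriter performs a similar check internally is not stated here.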

custom_write(sdf: DataFrame, table: str) -> None

Abstract function to write a DataFrame.

Parameters:
  • sdf (DataFrame) – DataFrame

  • table (str) – name of the table

get_delta_merge_builder(sdf: DataFrame, delta_table: DeltaTable) -> DeltaMergeBuilder

Abstract function to define a DeltaMergeBuilder to perform the merge in the write function.

Performs a schema merge if merge_schema=True.

Example

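>>> def get_delta_merge_builder(self, sdf, delta_table):
...     # Match target and source rows on the primary key.
...     merge_condition = "target.primary_key = source.primary_key"
...     builder = delta_table.alias("target").merge(sdf.alias("source"), merge_condition)
...     builder = builder.whenMatchedUpdateAll()  # update rows that already exist
...     builder = builder.whenNotMatchedInsertAll()  # insert rows that are new
...     return builder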

More examples can be found here: https://docs.delta.io/latest/delta-update.html and https://docs.delta.io/latest/api/python/spark/index.html

Parameters:
  • sdf (DataFrame) – DataFrame to be written

  • delta_table (DeltaTable) – target delta table to write to
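When a table has a composite key, the merge condition string can be generated instead of written by hand. The following is a minimal sketch; build_merge_condition and the column names "id" and "region" are hypothetical. The aliases "target" and "source" follow the example above.

```python
from typing import Sequence


def build_merge_condition(
    keys: Sequence[str],
    target_alias: str = "target",
    source_alias: str = "source",
) -> str:
    """Hypothetical helper: join key-equality predicates with AND."""
    if not keys:
        raise ValueError("at least one key column is required")
    return " AND ".join(
        f"{target_alias}.{key} = {source_alias}.{key}" for key in keys
    )


condition = build_merge_condition(["id", "region"])
print(condition)
# Inside get_delta_merge_builder the string would be passed on, e.g.
# delta_table.alias("target").merge(sdf.alias("source"), condition)
```

The aliases passed to the helper must match the aliases given to the Delta table and the DataFrame.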

get_replace_condition(sdf: DataFrame, table: str) -> str

Returns a SQL expression that selects the data to overwrite.

The input data must fulfill this condition.

Example

>>> return "sample_col >= sample_value"

Parameters:
  • sdf (DataFrame) – DataFrame

  • table (str) – name of the table
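Because the input data has to fulfill the returned condition, one option is to derive the condition from the values present in the incoming DataFrame. The sketch below assumes those values have already been collected into a Python list. build_replace_condition and the column name "load_date" are hypothetical.

```python
from typing import Iterable


def build_replace_condition(column: str, values: Iterable[str]) -> str:
    """Hypothetical helper: build a SQL IN predicate over one column."""
    # Deduplicate and sort so the resulting expression is deterministic.
    unique_values = sorted(set(values))
    if not unique_values:
        raise ValueError("at least one value is required")
    # Escape single quotes by doubling them, as in standard SQL literals.
    quoted = ", ".join("'" + v.replace("'", "''") + "'" for v in unique_values)
    return f"{column} IN ({quoted})"


# In an override, the values might come from the DataFrame itself, e.g.
# [row[0] for row in sdf.select("load_date").distinct().collect()]
condition = build_replace_condition(
    "load_date", ["2024-01-02", "2024-01-01", "2024-01-01"]
)
print(condition)
```

Deriving the predicate from the incoming rows keeps the overwrite limited to the slices that are actually being rewritten.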

write(**options)

Function to set the writer configs.

Parameters:

**options (Dict[str, Any]) – keyword arguments; any write options provided

Kwargs options:

  • mode (str) – One of overwrite, append, replace, merge or custom. Default: append. Defines how the data is written: overwrite, append, replace (define the replace filter with get_replace_condition()), merge (define the merge builder with get_delta_merge_builder()) or custom (define custom_write()).

  • merge_schema (bool) – Whether the schema should be automatically evolved/merged. Default: False.

Returns:

self
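The documented modes and defaults can be captured in a small validation step. This is a sketch under stated assumptions, not the library's implementation: normalize_write_options is a hypothetical helper, and only the mode names and default values are taken from the option list above.

```python
from typing import Any, Dict

# Modes and defaults as documented for write().
ALLOWED_MODES = ("overwrite", "append", "replace", "merge", "custom")


def normalize_write_options(**options: Any) -> Dict[str, Any]:
    """Hypothetical helper: apply documented defaults and validate values."""
    mode = options.get("mode", "append")
    if mode not in ALLOWED_MODES:
        raise ValueError(f"unknown mode {mode!r}; expected one of {ALLOWED_MODES}")
    merge_schema = options.get("merge_schema", False)
    if not isinstance(merge_schema, bool):
        raise TypeError("merge_schema must be a bool")
    return {"mode": mode, "merge_schema": merge_schema}


defaults = normalize_write_options()
merge_options = normalize_write_options(mode="merge", merge_schema=True)
print(defaults, merge_options)
# With a real writer, the validated options would be passed on, e.g.
# writer.write(**merge_options)  # write() returns the writer itself
```

Since write() returns self, the configured writer can be kept in a variable or used directly in a chained call.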