lakehouse.bronze

class lakehouse.bronze.Bronze(spark: SparkSession, **options: Dict[str, Any])

Bases: ETL

A generic class providing a framework for processing data in the Bronze layer of a Medallion architecture.

Use the load(), transform() and write() functions to specify configs. Use execute() to run the defined steps.

Override these functions as required:

  • custom_load(self, table: str) -> DataFrame: Customizes how the source data is loaded. Always required in Bronze.

  • custom_transform(self, sdf: DataFrame, table: str) -> DataFrame: Can optionally be overridden to add custom transformations; only executed if transform() is defined.

  • custom_filter(self, sdf: DataFrame, table: str) -> DataFrame: Can be overridden to change the default transformations executed after the custom transformations. By default, this creates a timestamp column with the current timestamp of the transformation. Only executed if transform(ignore_defaults=False).

  • get_replace_condition(self, sdf: DataFrame, table: str) -> str: Defines the filter used for the "replace where" overwrite operation. Required if write(mode="replace").

  • get_delta_merge_builder(self, sdf: DataFrame, delta_table: DeltaTable) -> DeltaMergeBuilder: Defines the merge builder for the merge write into Delta. Required if write(mode="merge").

  • custom_write(self, sdf: DataFrame, table: str) -> None: Defines a custom write operation. Required if write(mode="custom").
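The hooks above follow the template-method pattern: execute() drives the pipeline and calls whichever hooks the subclass overrides. A minimal, pure-Python sketch of that pattern (illustration only; the real class operates on Spark DataFrames, and all names except load/transform/execute and the two hooks are hypothetical):

```python
from typing import Any, Dict, List


class BronzeSketch:
    """Template-method base: execute() calls the overridable hooks in order."""

    def __init__(self, **options: Any) -> None:
        self.options = options
        self._transform_enabled = False

    def transform(self) -> "BronzeSketch":
        # Calling transform() is what enables the custom_transform hook.
        self._transform_enabled = True
        return self

    def custom_load(self, table: str) -> List[Dict[str, Any]]:
        # Always required in Bronze -- subclasses must override this.
        raise NotImplementedError("custom_load must be overridden")

    def custom_transform(self, rows: List[Dict[str, Any]], table: str) -> List[Dict[str, Any]]:
        # Optional hook; the default is a no-op.
        return rows

    def execute(self, table: str) -> List[Dict[str, Any]]:
        rows = self.custom_load(table)
        if self._transform_enabled:
            rows = self.custom_transform(rows, table)
        return rows


class MyBronze(BronzeSketch):
    def custom_load(self, table: str) -> List[Dict[str, Any]]:
        # Stand-in for reading from a source system.
        return [{"id": 1}, {"id": 2}]

    def custom_transform(self, rows: List[Dict[str, Any]], table: str) -> List[Dict[str, Any]]:
        # Tag each record with the table it came from.
        return [{**r, "source": table} for r in rows]
```

Without transform(), only custom_load runs; with it, custom_transform is applied on top.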

spark

Spark Session as provided to process the data

Type:

SparkSession

**options

Keyword arguments; any options passed to the class

Type:

Dict[str, Any]

catalog

Name of the catalog recognized by Spark, e.g. from the Hive Metastore or Unity Catalog

Type:

str

target_schema

Name of the target schema to write to

Type:

str

data

Intermediate DataFrames, one per table, built from the specified options before execute() runs

Type:

Dict[str, DataFrame]

__init__(spark: SparkSession, **options: Dict[str, Any])

Initializes the Bronze class with user-provided options.

Parameters:
  • spark (SparkSession) – existing Spark Session

  • **options (Dict[str, Any]) – keyword arguments; any options passed to the class

Kwargs options:

  • catalog (str): Name of the catalog recognized by Spark, e.g. from the Hive Metastore or Unity Catalog. Required.

  • target_schema (str): Name of the target schema. Required.
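Since both kwargs are required, it helps to picture what a constructor-time check for them looks like. A runnable sketch (the option names come from the docs above; the helper function itself is hypothetical, not part of the library):

```python
from typing import Any, Dict


def validate_bronze_options(**options: Any) -> Dict[str, Any]:
    # catalog and target_schema are required per the docs; any other
    # options are passed through untouched.
    missing = [k for k in ("catalog", "target_schema") if k not in options]
    if missing:
        raise ValueError(f"Missing required options: {', '.join(missing)}")
    return options
```

Instantiating with only one of the two would fail fast instead of erroring later during execute().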

default_transform(sdf: DataFrame, table: str) DataFrame

Function adding the current timestamp as LH_BronzeTS to the transformed data.

Can be overridden to add further internal transformations.

Parameters:
  • sdf (DataFrame) – input DataFrame

  • table (str) – name of the table

Returns:

transformed DataFrame with internal transformations
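In plain-Python terms, the default transformation just stamps every record with the processing time. A runnable stand-in for that behavior (the real version operates on a Spark DataFrame; this mock on a list of dicts is only an illustration of the semantics):

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


def default_transform(rows: List[Dict[str, Any]], table: str) -> List[Dict[str, Any]]:
    # Stand-in for the Spark version: add an LH_BronzeTS column holding
    # the current timestamp to every record of the table being processed.
    ts = datetime.now(timezone.utc)
    return [{**row, "LH_BronzeTS": ts} for row in rows]
```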

load()

Function to set the loader configs.

For Bronze, no load options are accepted: the load mode is always custom, the filter is always all, and source_tbl is always None.

Returns:

self
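Returning self is what enables the configure-then-execute chaining described at the top of this page. A pure-Python sketch of that fluent-builder pattern (the step-recording mechanics here are hypothetical; only the load/transform/write/execute names come from the docs):

```python
from typing import List


class PipelineSketch:
    """Fluent builder: config calls record steps, execute() replays them."""

    def __init__(self) -> None:
        self.steps: List[str] = []

    def load(self) -> "PipelineSketch":
        self.steps.append("load")
        return self  # returning self is what makes chaining possible

    def transform(self) -> "PipelineSketch":
        self.steps.append("transform")
        return self

    def write(self, mode: str = "append") -> "PipelineSketch":
        self.steps.append(f"write:{mode}")
        return self

    def execute(self) -> List[str]:
        # The real class would now run the recorded steps for each table.
        return list(self.steps)
```

Usage mirrors the intended Bronze workflow: `PipelineSketch().load().transform().write(mode="merge").execute()`.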