lakehouse.silver

class lakehouse.silver.Silver(spark: SparkSession, **options: Dict[str, Any])

Bases: ETL

A generic class providing a framework for processing data in the Silver layer of a Medallion architecture.

Use the functions load(), transform() and write() to specify configs; use execute() to run the defined steps.

Overwrite functions as required:

  • custom_load(self, table: str) -> DataFrame: Customizes how, or from where, the source data is loaded. Required if load(mode="custom"), otherwise ignored.

  • custom_filter(self, sdf: DataFrame, table: str) -> DataFrame: Filters the loaded DataFrame, making use of predicate pushdown. Required if load(filter="custom"), otherwise ignored.

  • custom_transform(self, sdf: DataFrame, table: str) -> DataFrame: Can be overwritten to add custom transformations. Only executed if transform() is defined.

  • default_transform(self, sdf: DataFrame, table: str) -> DataFrame: Can be overwritten to change the default transformations executed after the custom transformations. The default creates a timestamp column with the current timestamp of the transformation. Only executed if transform(ignore_defaults=False).

  • get_replace_condition(self, sdf: DataFrame, table: str) -> str: Defines the filter used for the replace-where overwrite operation. Required if write(mode="replace").

  • get_delta_merge_builder(self, sdf: DataFrame, delta_table: DeltaTable) -> DeltaMergeBuilder: Defines the merge builder for the merge write into Delta. Required if write(mode="merge").

  • custom_write(self, sdf: DataFrame, table: str) -> None: Defines a custom write operation. Required if write(mode="custom").
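As a rough illustration of the hook contract, a subclass overriding custom_transform and get_replace_condition might look like the sketch below. This uses a hypothetical stand-in base class with plain dict rows so it runs without Spark; the real Silver class operates on Spark DataFrames, and the column names ("status", "load_date") are assumptions for the example.

```python
# Hypothetical stand-in for the Silver base class: the real one comes from
# lakehouse.silver and works on Spark DataFrames; here rows are plain dicts
# so the sketch runs without a Spark cluster.
class SilverSketch:
    def custom_transform(self, sdf, table):
        return sdf  # no-op unless overridden

    def get_replace_condition(self, sdf, table):
        # Required only when write(mode="replace") is used.
        raise NotImplementedError


class OrdersSilver(SilverSketch):
    def custom_transform(self, sdf, table):
        # Example business rule: drop cancelled orders ("status" is an
        # assumed column name).
        return [row for row in sdf if row["status"] != "cancelled"]

    def get_replace_condition(self, sdf, table):
        # Predicate for the replace-where overwrite, scoping the rewrite to
        # one partition ("load_date" is an assumed partition column).
        return "load_date = '2024-01-01'"


etl = OrdersSilver()
rows = [{"status": "open", "load_date": "2024-01-01"},
        {"status": "cancelled", "load_date": "2024-01-01"}]
kept = etl.custom_transform(rows, "orders")
```

With the real class, these hooks would be picked up by the fluent chain described above, e.g. load() ... write(mode="replace") followed by execute().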

spark

Spark Session as provided to process the data

Type:

SparkSession

\*\*options

Keyword arguments; any options provided to the class

Type:

Dict[str, Any]

catalog

Name of the catalog recognized by Spark, e.g. from the Hive Metastore or Unity Catalog

Type:

str

source_schema

Name of the source_schema

Type:

str

target_schema

Name of the target_schema

Type:

str

data

Intermediate DataFrames, one per table, built from the specified options before execute() is called

Type:

Dict[str, DataFrame]

__init__(spark: SparkSession, **options: Dict[str, Any])

Initializes the Silver class with user-provided options.

Parameters:
  • spark (SparkSession) – existing Spark Session

  • **options (Dict[str, Any]) – Keyword arguments; any options provided to the class

Kwargs options:

  • catalog (str): Name of the catalog recognized by Spark, e.g. from the Hive Metastore or Unity Catalog. Required.

  • source_schema (str): Name of the source_schema. Required.

  • target_schema (str): Name of the target_schema. Required.
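The three required kwargs can be sketched as a plain options check. This is a hypothetical helper, not part of the library; with the real class, instantiation would look like Silver(spark, catalog="main", source_schema="bronze", target_schema="silver"), where the catalog and schema names are assumed for the example.

```python
# Required keyword options as documented for __init__.
REQUIRED_OPTIONS = ("catalog", "source_schema", "target_schema")

def check_options(**options):
    # Mirror the documented requirement: catalog, source_schema and
    # target_schema must all be provided as keyword arguments.
    missing = [key for key in REQUIRED_OPTIONS if key not in options]
    if missing:
        raise ValueError(f"missing required options: {missing}")
    return options

opts = check_options(catalog="main", source_schema="bronze", target_schema="silver")
```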

default_transform(sdf: DataFrame, table: str) -> DataFrame

Function adding the current timestamp as LH_SilverTS to the transformed data.

Can be overwritten if more internal transformations should be added

Parameters:
  • sdf (DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

transformed DataFrame with internal transformations
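The documented behaviour (stamping each row with the transformation time in a LH_SilverTS column) can be sketched without a Spark cluster as below; rows are plain dicts here, and with Spark the equivalent would be sdf.withColumn("LH_SilverTS", F.current_timestamp()).

```python
from datetime import datetime, timezone

def default_transform(rows, table):
    # Spark-free sketch of the documented default: add a LH_SilverTS column
    # holding the current timestamp of the transformation. The real method
    # receives and returns a Spark DataFrame.
    ts = datetime.now(timezone.utc)
    return [{**row, "LH_SilverTS": ts} for row in rows]

out = default_transform([{"id": 1}, {"id": 2}], "demo")
```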

load(**options)

Function to set the loader configs.

In Silver, source_tbl is always None.

Parameters:

**options (Dict[str, Any]) – Kwargs, Any load options provided

Kwargs options:

  • mode (str): "default" or "custom"; default: "default". Loads the table either from source_schema.table ("default") or as defined in the custom_load function ("custom"). (In Bronze, a custom_load function is always required, so the default there is "custom".)

  • filter (str): "all" or "custom"; default: "all". With "custom", filters are applied directly on the loaded data for predicate pushdown; otherwise "all" data is loaded. (In Bronze, all data is always loaded via the custom_load function.)

Returns:

self
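The predicate-pushdown idea behind load(filter="custom") can be sketched as a custom_filter that restricts the data as early as possible. Again, this is a Spark-free stand-in on dict rows; with Spark this would be e.g. sdf.filter(F.col("ingest_date") >= "2024-01-01"), letting the engine push the predicate down to the source. The "ingest_date" column and cutoff are assumptions for the example.

```python
def custom_filter(rows, table):
    # Spark-free sketch of a custom_filter hook for load(filter="custom"):
    # keep only recent rows so downstream steps never see the rest
    # ("ingest_date" is an assumed column name).
    return [row for row in rows if row["ingest_date"] >= "2024-01-01"]

loaded = [{"ingest_date": "2023-12-31"}, {"ingest_date": "2024-01-02"}]
filtered = custom_filter(loaded, "orders")
```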