tecton.spark_batch_config
Summary
Declare a tecton.SparkBatchConfig for configuring a Batch Source with a Data Source Function. The function takes a SparkSession and, if supports_time_filtering=True, a tecton.FilterContext. It returns a DataFrame.
Example
Example defining a Data Source Function using spark_batch_config:
from tecton import spark_batch_config


@spark_batch_config(supports_time_filtering=True)
def redshift_data_source_function(spark, filter_context):
    # Imports used by the function body.
    import os
    from pyspark.sql.functions import col

    spark_format = "com.databricks.spark.redshift"
    params = {"user": "<user name>", "password": os.environ["redshift_password"]}
    endpoint = "<redshift endpoint>"
    full_connection_string = f"jdbc:redshift://{endpoint};user={params['user']};password={params['password']}"
    df_reader = (
        spark.read.format(spark_format)
        .option("url", full_connection_string)
        .option("forward_spark_s3_credentials", "true")
    )
    df_reader = df_reader.option("dbtable", "your_table_name")
    df = df_reader.load()
    # Cast the timestamp column so it can be compared against the filter bounds.
    ts_column = "timestamp"
    df = df.withColumn(ts_column, col(ts_column).cast("timestamp"))
    # Handle time filtering
    if filter_context:
        if filter_context.start_time:
            df = df.where(col(ts_column) >= filter_context.start_time)
        if filter_context.end_time:
            df = df.where(col(ts_column) < filter_context.end_time)
    return df
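The time-filtering branch in the example keeps rows whose timestamp falls in the half-open window from start_time (inclusive) to end_time (exclusive), and skips whichever bound is unset. The following is a minimal pure-Python sketch of that same logic, written so it can be run without Spark. StubFilterContext and apply_time_filter are hypothetical stand-ins for illustration only and are not part of Tecton.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class StubFilterContext:
    """Hypothetical stand-in exposing only start_time and end_time."""
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None


def apply_time_filter(rows, filter_context, ts_column="timestamp"):
    """Keep rows with start_time <= ts < end_time; an unset bound is not applied."""
    if not filter_context:
        return list(rows)
    kept = []
    for row in rows:
        ts = row[ts_column]
        if filter_context.start_time and ts < filter_context.start_time:
            continue
        if filter_context.end_time and ts >= filter_context.end_time:
            continue
        kept.append(row)
    return kept


rows = [
    {"id": 1, "timestamp": datetime(2024, 1, 1, 0, 0)},
    {"id": 2, "timestamp": datetime(2024, 1, 1, 12, 0)},
    {"id": 3, "timestamp": datetime(2024, 1, 2, 0, 0)},
]
window = StubFilterContext(start_time=datetime(2024, 1, 1), end_time=datetime(2024, 1, 2))
filtered_ids = [row["id"] for row in apply_time_filter(rows, window)]
```

Because the upper bound is exclusive, a row stamped exactly at end_time belongs to the next window rather than to both.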
Parameters
data_delay (Optional[timedelta]) – By default, incremental materialization jobs run immediately at the end of the batch schedule period. This parameter configures how long they wait after the end of the period before starting, typically to ensure that all data has landed. For example, if a feature view has a batch_schedule of 1 day and one of the data source inputs has data_delay=timedelta(hours=1) set, then incremental materialization jobs will run at 01:00 UTC. (Default: datetime.timedelta(0))
supports_time_filtering (Optional[bool]) – When set to True, the Data Source Function must take the filter_context parameter and implement time filtering logic. supports_time_filtering must be set to True if <data source>.get_dataframe() is called with start_time or end_time. supports_time_filtering must also be set to True if using tecton.declarative.FilteredSource with a Data Source when defining a FeatureView. The FeatureView will call the Data Source Function with the tecton.FilterContext, which has the start_time and end_time set. (Default: False)
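The data_delay timing rule can be sketched in plain Python. This only illustrates the arithmetic stated in the parameter description, assuming batch periods aligned to midnight UTC. materialization_start is a hypothetical helper, not a Tecton function, and Tecton's actual scheduler may involve details not shown here.

```python
from datetime import datetime, timedelta, timezone


def materialization_start(period_end, data_delay=timedelta(0)):
    """Earliest start of the job for a period: the period end plus data_delay."""
    return period_end + data_delay


# A 1-day batch_schedule period ending at midnight UTC (assumed alignment).
period_end = datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc)

# Default data_delay of zero: the job starts right at the period end.
no_delay_start = materialization_start(period_end)

# data_delay=timedelta(hours=1): the job waits one hour after the period end.
delayed_start = materialization_start(period_end, data_delay=timedelta(hours=1))
```

With these inputs, delayed_start falls at 01:00 UTC on the day after the period's data, matching the example in the data_delay description.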