Creating and Testing a Data Source Using a Data Source Function
Overview
When defining a BatchSource, you set the batch_config parameter; when defining a
StreamSource, you set the stream_config parameter (a StreamSource also accepts a
batch_config, as in the example below). The value of each of these parameters can be
either a config object (such as HiveConfig or KafkaConfig) or a Data Source Function.
Compared to a config object, a Data Source Function gives you more flexibility in connecting to the underlying data source and in specifying logic that transforms the data retrieved from it. However, if you do not need this additional flexibility, a config object is recommended.
Define a BatchSource using a Data Source Function
To define a BatchSource using a Data Source Function, use the
@spark_batch_config decorator.
The inputs to the Data Source Function are a SparkSession and an optional
filter_context:
- The SparkSession is used to connect to your data, run any transformations, and return a Spark DataFrame.
- When the @spark_batch_config decorator is defined with supports_time_filtering=True, the Data Source Function must take the filter_context parameter as input and implement time filtering logic. supports_time_filtering must be set to True if:
  - <data source>.get_dataframe() is called with start_time or end_time.
  - FilteredSource is used with a Data Source when defining a Feature View. The Feature View will call the Data Source Function with a FilterContext, which has the start_time and end_time set.
An example Batch Data Source Function that does not use time filtering
In the following example, csv_data_source_function reads from a CSV file and
returns a DataFrame.
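from tecton import BatchSource, spark_batch_config
@spark_batch_config()
def csv_data_source_function(spark):
    from pyspark.sql.functions import col
    csv_uri = "<CSV file URI>"  # replace with the path or URI of your CSV file
    ts_column = "created_at"
    # header=True uses the first row of the file as column names
    df = spark.read.csv(csv_uri, header=True)
    # CSV values are read as strings, so cast the timestamp column
    df = df.withColumn(ts_column, col(ts_column).cast("timestamp"))
    return df
csv_batch_source = BatchSource(name="csv_ds", batch_config=csv_data_source_function)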
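To make the Spark logic above concrete without a cluster, the following sketch reproduces it in plain Python on an in-memory CSV string. It is only an illustration under stated assumptions: read_csv_with_timestamp is a hypothetical helper (not part of Tecton or PySpark), csv.DictReader plays the role of header=True, and datetime.fromisoformat stands in for the timestamp cast. The sketch raises ValueError on a malformed timestamp and does not attempt to model how Spark's cast treats bad input.

```python
import csv
import io
from datetime import datetime
from typing import List


def read_csv_with_timestamp(text: str, ts_column: str = "created_at") -> List[dict]:
    """Hypothetical plain-Python analog of spark.read.csv(..., header=True) plus a cast."""
    # As with header=True, the first line supplies the column names.
    reader = csv.DictReader(io.StringIO(text))
    rows = []
    for row in reader:
        # Every CSV value arrives as a string; convert only the timestamp column.
        row[ts_column] = datetime.fromisoformat(row[ts_column])
        rows.append(row)
    return rows


sample = "user_id,created_at\nu1,2023-05-01 12:30:00\nu2,2023-05-02 08:00:00\n"
rows = read_csv_with_timestamp(sample)
print(rows[0]["created_at"])  # prints 2023-05-01 12:30:00
```

Other columns, such as user_id, stay strings, which matches the Spark version: only created_at is cast.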
An example Batch Data Source Function that uses time filtering
In the following example, redshift_data_source_function is a Data Source
Function that connects to a Redshift table. Because
supports_time_filtering=True, the function must implement time filtering using
filter_context.
from tecton import BatchSource, spark_batch_config
@spark_batch_config(supports_time_filtering=True)
def redshift_data_source_function(spark, filter_context):
    import os
    from pyspark.sql.functions import col
    spark_format = "com.databricks.spark.redshift"
    params = {"user": "<user name>", "password": os.environ["redshift_password"]}
    endpoint = "<redshift endpoint>"
    full_connection_string = f"jdbc:redshift://{endpoint};user={params['user']};password={params['password']}"
    df_reader = (
        spark.read.format(spark_format)
        .option("url", full_connection_string)
        .option("forward_spark_s3_credentials", "true")
    )
    df_reader = df_reader.option("dbtable", "<table name>")
    df = df_reader.load()
    ts_column = "timestamp"
    df = df.withColumn(ts_column, col(ts_column).cast("timestamp"))
    # Handle time filtering: keep rows with start_time <= timestamp < end_time
    if filter_context:
        if filter_context.start_time:
            df = df.where(col(ts_column) >= filter_context.start_time)
        if filter_context.end_time:
            df = df.where(col(ts_column) < filter_context.end_time)
    return df
redshift_batch_source = BatchSource(name="redshift_ds", batch_config=redshift_data_source_function)
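The time filtering block in redshift_data_source_function keeps rows whose timestamp falls in the half-open interval [start_time, end_time); a bound that is not set leaves that side open. The following pure-Python sketch mirrors that logic over a list of dicts so it can be checked without Spark. FakeFilterContext is a hypothetical stand-in that exposes only the start_time and end_time attributes used in the example; it is not Tecton's FilterContext class.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List, Optional


@dataclass
class FakeFilterContext:
    """Hypothetical stand-in for FilterContext: only the two bounds used above."""
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None


def apply_time_filter(rows: Iterable[dict], ts_column: str,
                      filter_context: Optional[FakeFilterContext]) -> List[dict]:
    """Keep rows with start_time <= ts < end_time; an unset bound is unbounded."""
    kept = []
    for row in rows:
        ts = row[ts_column]
        if filter_context is not None:
            # Inclusive lower bound, like col(ts_column) >= start_time.
            if filter_context.start_time is not None and ts < filter_context.start_time:
                continue
            # Exclusive upper bound, like col(ts_column) < end_time.
            if filter_context.end_time is not None and ts >= filter_context.end_time:
                continue
        kept.append(row)
    return kept
```

With an exclusive end_time, adjacent windows such as [day 1, day 2) and [day 2, day 3) never return the same row twice.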
Define a StreamSource using a Data Source Function
To define a StreamSource using a Data Source Function, use the
@spark_stream_config decorator. The Data Source Function takes a
SparkSession as input and returns a Spark streaming DataFrame.
In the following example, kinesis_data_source_function connects to a Kinesis
stream and returns a streaming DataFrame.
def raw_data_deserialization(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StructField, StringType
    payload_schema = StructType(
        [
            StructField("user_id", StringType(), False),
            StructField("transaction_id", StringType(), False),
            StructField("category", StringType(), False),
            StructField("amt", StringType(), False),
            StructField("timestamp", StringType(), False),
        ]
    )
    # Decode the Kinesis record payload as JSON and flatten it into columns
    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.user_id").alias("user_id"),
            col("payload.transaction_id").alias("transaction_id"),
            col("payload.category").alias("category"),
            col("payload.amt").cast("double").alias("amt"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )
from tecton import StreamSource, spark_stream_config
@spark_stream_config()
def kinesis_data_source_function(spark):
    from datetime import timedelta
    options = {
        "streamName": "<stream name>",
        "roleArn": "<role ARN>",
        "region": "<region>",
        "shardFetchInterval": "30s",
        "initialPosition": "latest",
    }
    reader = spark.readStream.format("kinesis").options(**options)
    df = reader.load()
    df = raw_data_deserialization(df)
    # total_seconds() covers the full 24 hours; timedelta(hours=24).seconds would be 0
    watermark = "{} seconds".format(int(timedelta(hours=24).total_seconds()))
    df = df.withWatermark("timestamp", watermark)
    return df
transactions_stream = StreamSource(
    name="kinesis_ds",
    stream_config=kinesis_data_source_function,
    batch_config=redshift_data_source_function,
)
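Two steps of the stream example can be checked outside Spark: deserializing each record and building the watermark string. The sketch below does both in plain Python. deserialize_record and watermark_string are hypothetical helpers for illustration; deserialize_record follows the same steps as raw_data_deserialization for one record (decode the bytes, parse the JSON, cast amt to a float) but does not try to reproduce Spark's handling of malformed input. Because from_utc_timestamp(..., "UTC") converts from UTC to UTC, the timestamp value is unchanged, so the sketch simply parses it. For the watermark, note that timedelta normalizes 24 hours to one day, so timedelta(hours=24).seconds is 0, while total_seconds() returns the full 86400 seconds.

```python
import json
from datetime import datetime, timedelta


def deserialize_record(data: bytes) -> dict:
    """Hypothetical per-record analog of raw_data_deserialization."""
    # Equivalent of cast(data as STRING) followed by from_json.
    payload = json.loads(data.decode("utf-8"))
    return {
        "user_id": payload["user_id"],
        "transaction_id": payload["transaction_id"],
        "category": payload["category"],
        # amt is a string in the payload schema and is cast to a double (float).
        "amt": float(payload["amt"]),
        # A UTC-to-UTC conversion does not shift the value, so just parse it.
        "timestamp": datetime.fromisoformat(payload["timestamp"]),
    }


def watermark_string(delay: timedelta) -> str:
    """Format a delay as the "<n> seconds" string passed to withWatermark."""
    # total_seconds() includes whole days; the .seconds attribute does not.
    return "{} seconds".format(int(delay.total_seconds()))


record = deserialize_record(
    b'{"user_id": "u1", "transaction_id": "t1", "category": "food",'
    b' "amt": "12.50", "timestamp": "2023-05-01 12:30:00"}'
)
print(record["amt"])  # prints 12.5
print(watermark_string(timedelta(hours=24)))  # prints 86400 seconds
print(timedelta(hours=24).seconds)  # prints 0
```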
Test the Data Source in a notebook
After you apply your data sources using tecton apply, you can test them with the
Tecton SDK in a notebook:
import datetime
import tecton
workspace = tecton.get_workspace("<workspace name>")
# Test the batch data source. start_time can be passed to get_dataframe because redshift_ds
# is defined using a Data Source Function that has supports_time_filtering=True.
my_batch_ds = workspace.get_data_source("redshift_ds")
df = my_batch_ds.get_dataframe(start_time=datetime.datetime(2020, 1, 1)).to_spark()
df.limit(10).show()
# Test the stream data source
my_stream_ds = workspace.get_data_source("kinesis_ds")
my_stream_ds.start_stream_preview("temp_table")
spark.sql("SELECT * FROM temp_table LIMIT 10").show()
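datetime.datetime takes its arguments in (year, month, day) order, so January 1, 2020 is datetime.datetime(2020, 1, 1), while datetime.datetime(1, 1, 2020) raises ValueError because 2020 is not a valid day. Since get_dataframe can be called with start_time or end_time, a bounded preview window can be built as in the sketch below. preview_window is a hypothetical helper, and the get_dataframe call appears only as a comment because it needs a Tecton workspace.

```python
import datetime
from typing import Tuple


def preview_window(year: int, month: int, day: int,
                   days: int = 1) -> Tuple[datetime.datetime, datetime.datetime]:
    """Hypothetical helper: return a [start_time, end_time) window spanning `days` days."""
    # datetime.datetime expects year, month, day in that order.
    start = datetime.datetime(year, month, day)
    end = start + datetime.timedelta(days=days)
    return start, end


start_time, end_time = preview_window(2020, 1, 1, days=7)
# In the notebook, the bounded preview would then be (not executed here):
# df = my_batch_ds.get_dataframe(start_time=start_time, end_time=end_time).to_spark()

# Day-month-year order fails: day=2020 is out of range.
try:
    datetime.datetime(1, 1, 2020)
except ValueError as err:
    print("ValueError:", err)
```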