Creating and Testing a Data Source Using a Data Source Function
Overview
When defining a BatchSource or StreamSource object, you set the batch_config or stream_config parameter, respectively. The value of these configs can be a config object (such as HiveConfig or KafkaConfig) or a Data Source Function.
Compared to using an object, a Data Source Function gives you more flexibility in connecting to an underlying data source and specifying logic for transforming the data retrieved from the underlying data source. However, using an object is recommended if you do not require the additional flexibility offered by a Data Source Function.
Define a BatchSource using a Data Source Function
To define a BatchSource using a Data Source Function, use the @spark_batch_config decorator.
The inputs to the Data Source Function are a SparkSession and an optional filter_context:
- The SparkSession is used to connect to your data, run any transformations, and return a Spark DataFrame.
- When the @spark_batch_config decorator is defined with supports_time_filtering=True, the Data Source Function must take the filter_context parameter as input and implement time filtering logic. supports_time_filtering must be set to True if:
  - <data source>.get_dataframe() is called with start_time or end_time.
  - FilteredSource is used with a Data Source when defining a Feature View. The Feature View calls the Data Source Function with a FilterContext that has start_time and end_time set.
An example Batch Data Source Function that does not use time filtering
In the following example, csv_data_source_function reads from a CSV file and returns a DataFrame.
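from tecton import BatchSource, spark_batch_config


@spark_batch_config()
def csv_data_source_function(spark):
    from pyspark.sql.functions import col

    # Placeholder: replace with the location of your CSV file
    csv_uri = "<CSV file URI>"
    ts_column = "created_at"
    df = spark.read.csv(csv_uri, header=True)
    # Cast the timestamp column from string to a Spark timestamp
    df = df.withColumn(ts_column, col(ts_column).cast("timestamp"))
    return df


csv_batch_source = BatchSource(name="csv_ds", batch_config=csv_data_source_function)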
An example Data Source Function that uses time filtering
In the following example, redshift_data_source_function is a Data Source Function that connects to a Redshift table. Because supports_time_filtering=True, the function is required to handle filtering using the filter_context.
from tecton import BatchSource, spark_batch_config


@spark_batch_config(supports_time_filtering=True)
def redshift_data_source_function(spark, filter_context):
    import os
    from pyspark.sql.functions import col

    spark_format = "com.databricks.spark.redshift"
    params = {"user": "<user name>", "password": os.environ["redshift_password"]}
    endpoint = "<redshift endpoint>"
    full_connection_string = f"jdbc:redshift://{endpoint};user={params['user']};password={params['password']}"

    df_reader = (
        spark.read.format(spark_format)
        .option("url", full_connection_string)
        .option("forward_spark_s3_credentials", "true")
    )
    df_reader = df_reader.option("dbtable", "<table name>")
    df = df_reader.load()

    ts_column = "timestamp"
    df = df.withColumn(ts_column, col(ts_column).cast("timestamp"))

    # Handle time filtering: keep rows with start_time <= timestamp < end_time
    if filter_context:
        if filter_context.start_time:
            df = df.where(col(ts_column) >= filter_context.start_time)
        if filter_context.end_time:
            df = df.where(col(ts_column) < filter_context.end_time)
    return df


redshift_batch_source = BatchSource(name="redshift_ds", batch_config=redshift_data_source_function)
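The time filtering branch above keeps rows whose timestamp falls in the half-open window from start_time (inclusive) to end_time (exclusive), and either bound may be missing. A minimal plain-Python sketch of the same checks, useful for reasoning about the boundaries without a Spark cluster, might look like the following. StandInFilterContext and in_window are hypothetical names introduced for illustration only; they are not part of the Tecton SDK.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class StandInFilterContext:
    """Hypothetical stand-in for the filter_context argument (illustration only)."""

    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None


def in_window(ts: datetime, filter_context: Optional[StandInFilterContext]) -> bool:
    """Apply the same checks as the Spark filter to a single timestamp."""
    if not filter_context:
        return True  # no context means no filtering
    if filter_context.start_time and ts < filter_context.start_time:
        return False  # before the inclusive lower bound
    if filter_context.end_time and ts >= filter_context.end_time:
        return False  # at or after the exclusive upper bound
    return True


rows = [datetime(2020, 1, 1), datetime(2020, 1, 2), datetime(2020, 1, 3)]
ctx = StandInFilterContext(start_time=datetime(2020, 1, 2), end_time=datetime(2020, 1, 3))
kept = [ts for ts in rows if in_window(ts, ctx)]
print(kept)
```

With these sample values only the 2020-01-02 row is kept: the row equal to start_time passes, and the row equal to end_time is dropped.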
Define a StreamSource using a Data Source Function
To define a StreamSource using a Data Source Function, use the @spark_stream_config decorator. The Data Source Function takes a SparkSession as input and returns a Spark streaming DataFrame.
In the following example, kinesis_data_source_function connects to a Kinesis stream and returns a streaming DataFrame.
from tecton import StreamSource, spark_stream_config


def raw_data_deserialization(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StringType, StructField, StructType

    # Every payload field arrives as a string; amt and timestamp are converted below
    payload_schema = StructType(
        [
            StructField("user_id", StringType(), False),
            StructField("transaction_id", StringType(), False),
            StructField("category", StringType(), False),
            StructField("amt", StringType(), False),
            StructField("timestamp", StringType(), False),
        ]
    )

    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.user_id").alias("user_id"),
            col("payload.transaction_id").alias("transaction_id"),
            col("payload.category").alias("category"),
            col("payload.amt").cast("double").alias("amt"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )
@spark_stream_config()
def kinesis_data_source_function(spark):
    from datetime import timedelta

    options = {
        "streamName": "<stream name>",
        "roleArn": "<role ARN>",
        "region": "<region>",
        "shardFetchInterval": "30s",
        "initialPosition": "latest",
    }
    reader = spark.readStream.format("kinesis").options(**options)
    df = reader.load()
    df = raw_data_deserialization(df)
    # 24-hour watermark expressed in whole seconds
    watermark = "{} seconds".format(int(timedelta(hours=24).total_seconds()))
    df = df.withWatermark("timestamp", watermark)
    return df


transactions_stream = StreamSource(
    name="kinesis_ds",
    stream_config=kinesis_data_source_function,
    batch_config=redshift_data_source_function,
)
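One detail worth checking when building a watermark string from a timedelta: the .seconds attribute holds only the seconds component left over after whole days are split off, while total_seconds() returns the full duration. The sketch below illustrates the difference; watermark_string is a hypothetical helper introduced here, not a Tecton or Spark function.

```python
from datetime import timedelta


def watermark_string(delay: timedelta) -> str:
    """Format a timedelta as '<n> seconds', using its full duration."""
    return "{} seconds".format(int(delay.total_seconds()))


one_day = timedelta(hours=24)

# timedelta normalizes 24 hours to days=1, seconds=0
print(one_day.seconds)            # prints 0
print(watermark_string(one_day))  # prints "86400 seconds"
```

A watermark of "0 seconds" would tolerate no late data at all, so total_seconds() is the safer choice whenever the delay can reach or exceed one day.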
Test the Data Source in a notebook
After you have applied your data source using tecton apply, use the Tecton SDK in a notebook to test it:
import datetime

import tecton

workspace = tecton.get_workspace("<workspace name>")

# Test the batch data source. start_time is passed to get_dataframe because
# redshift_ds is defined using a Data Source Function with supports_time_filtering=True.
my_batch_ds = workspace.get_data_source("redshift_ds")
# datetime.datetime takes (year, month, day)
df = my_batch_ds.get_dataframe(start_time=datetime.datetime(2020, 1, 1)).to_spark()
df.limit(10).show()

# Test the stream data source
my_stream_ds = workspace.get_data_source("kinesis_ds")
my_stream_ds.start_stream_preview("temp_table")
spark.sql("SELECT * FROM temp_table LIMIT 10").show()