tecton.declarative.spark_stream_config

tecton.declarative.spark_stream_config()

Declare a tecton.declarative.data_source.SparkStreamConfig for configuring a stream source with a Data Source Function. The function takes in a SparkSession and returns a streaming DataFrame.

Example defining a Data Source Function using spark_stream_config:
```python
from datetime import timedelta

from tecton import spark_stream_config


def raw_data_deserialization(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StructField, StringType

    # Schema of the JSON payload carried in each Kinesis record's data field.
    payload_schema = StructType([
        StructField("user_id", StringType(), False),
        StructField("transaction_id", StringType(), False),
        StructField("category", StringType(), False),
        StructField("amt", StringType(), False),
        StructField("timestamp", StringType(), False),
    ])
    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.user_id").alias("user_id"),
            col("payload.transaction_id").alias("transaction_id"),
            col("payload.category").alias("category"),
            col("payload.amt").cast("double").alias("amt"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


@spark_stream_config()
def kinesis_data_source_function(spark):
    options = {
        "streamName": "<stream name>",
        "roleArn": "<role ARN>",
        "region": "<region>",
        "shardFetchInterval": "30s",
        "initialPosition": "latest",
    }
    reader = spark.readStream.format("kinesis").options(**options)
    df = reader.load()
    df = raw_data_deserialization(df)
    # Allow up to 24 hours of event lateness before records are dropped.
    watermark = "{} seconds".format(timedelta(hours=24).seconds)
    df = df.withWatermark("timestamp", watermark)
    return df
```
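A Data Source Function decorated with spark_stream_config is not used on its own; it is typically registered as the stream_config of a StreamSource. The sketch below assumes a StreamSource that accepts the decorated function via its stream_config parameter alongside a batch config for backfills; the source name, Hive database, and table are illustrative placeholders, not part of this API's definition.

```python
from tecton import StreamSource, HiveConfig

# Hypothetical registration: pair the streaming Data Source Function with a
# batch config so Tecton can backfill from historical data. The HiveConfig
# database/table names here are assumptions for illustration only.
transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=kinesis_data_source_function,
    batch_config=HiveConfig(
        database="demo_fraud",
        table="transactions",
        timestamp_field="timestamp",
    ),
)
```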