tecton.declarative.StreamSource

class tecton.declarative.StreamSource(*, name, description=None, tags=None, owner=None, batch_config, stream_config)

Declare a StreamSource, used to read streaming data into Tecton.

StreamFeatureViews ingest data from StreamSources. A StreamSource contains both a batch config and a stream config.

Methods

__init__

Creates a new StreamSource.

__init__(*, name, description=None, tags=None, owner=None, batch_config, stream_config)

Creates a new StreamSource.

Parameters
  • name (str) – A unique name for the DataSource.

  • description (Optional[str]) – A human-readable description.

  • tags (Optional[Dict[str, str]]) – Tags associated with this Tecton Object (key-value pairs of arbitrary metadata).

  • owner (Optional[str]) – Owner name (typically the email of the primary maintainer).

  • batch_config (Union[FileConfig, HiveConfig, RedshiftConfig, SnowflakeConfig]) – BatchConfig object containing the configuration of the batch data source to be included in this DataSource.

  • stream_config (Union[KinesisConfig, KafkaConfig]) – StreamConfig object containing the configuration of the stream data source to be included in this DataSource.

Returns

A StreamSource class instance.

Example of a StreamSource declaration:

   import pyspark
   from tecton import KinesisConfig, HiveConfig, StreamSource
   from datetime import timedelta


   # Define our deserialization raw stream translator
   def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
       from pyspark.sql.functions import col, from_json, from_utc_timestamp
       from pyspark.sql.types import StructType, StringType

       payload_schema = (
           StructType()
               .add('amount', StringType(), False)
               .add('isFraud', StringType(), False)
               .add('timestamp', StringType(), False)
       )
       return (
           df.selectExpr('cast (data as STRING) jsonData')
           .select(from_json('jsonData', payload_schema).alias('payload'))
           .select(
               col('payload.amount').cast('long').alias('amount'),
               col('payload.isFraud').cast('long').alias('isFraud'),
               from_utc_timestamp('payload.timestamp', 'UTC').alias('timestamp')
           )
       )

   # Declare a StreamSource with both a batch_config and a stream_config as parameters
   # See the API documentation for both BatchConfig and StreamConfig
   transactions_stream = StreamSource(
       name='transactions_stream',
       stream_config=KinesisConfig(
           stream_name='transaction_events',
           region='us-west-2',
           initial_stream_position='latest',
           watermark_delay_threshold=timedelta(minutes=30),
           timestamp_field='timestamp',
           post_processor=raw_data_deserialization,
           options={'roleArn': 'arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro'}
       ),
       batch_config=HiveConfig(
           database='demo_fraud',
           table='transactions',
           timestamp_field='timestamp',
       ),
       owner='jules@tecton.ai',
       tags={'release': 'staging'}
   )
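For intuition, the transformation that the post_processor performs on each record can be sketched in plain Python over a single JSON payload. This is an illustrative sketch only: `deserialize_record` and the ISO-8601 timestamp format are assumptions for the example, not part of the Tecton or Spark APIs.

```python
import json
from datetime import datetime, timezone


def deserialize_record(raw: str) -> dict:
    # Parse the JSON payload, mirroring from_json in the Spark translator
    payload = json.loads(raw)
    return {
        # Cast string fields to integers, as the Spark version does with .cast('long')
        'amount': int(payload['amount']),
        'isFraud': int(payload['isFraud']),
        # Interpret the timestamp as UTC, like from_utc_timestamp(..., 'UTC')
        'timestamp': datetime.fromisoformat(payload['timestamp']).replace(tzinfo=timezone.utc),
    }


record = deserialize_record('{"amount": "1200", "isFraud": "0", "timestamp": "2023-01-15T10:30:00"}')
```

In the real stream translator, the same casting and timezone logic runs column-wise over a Spark DataFrame rather than record-by-record.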

Attributes

name

The name of this DataSource.

timestamp_field

The name of the timestamp column or key of this DataSource.