tecton.declarative.StreamSource

class tecton.declarative.StreamSource(*, name, description=None, tags=None, owner=None, batch_config, stream_config)

Declare a StreamSource, used to read streaming data into Tecton. StreamFeatureViews ingest data from StreamSources. A StreamSource contains both a batch config and a stream config.

Methods

__init__(*, name, description=None, tags=None, owner=None, batch_config, stream_config)

Creates a new StreamSource.
Parameters

name (str) – A unique name for the DataSource.
description (Optional[str]) – A human-readable description.
tags (Optional[Dict[str, str]]) – Tags associated with this Tecton Object (key-value pairs of arbitrary metadata).
owner (Optional[str]) – Owner name (typically the email of the primary maintainer).
batch_config (Union[FileConfig, HiveConfig, RedshiftConfig, SnowflakeConfig]) – BatchConfig object containing the configuration of the batch data source to be included in this DataSource.
stream_config (Union[KinesisConfig, KafkaConfig]) – StreamConfig object containing the configuration of the stream data source to be included in this DataSource.
Returns

A StreamSource class instance.
Example of a StreamSource declaration:
import pyspark
from tecton import KinesisConfig, HiveConfig, StreamSource
from datetime import timedelta

# Define our deserialization raw stream translator
def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StringType

    payload_schema = (
        StructType()
        .add('amount', StringType(), False)
        .add('isFraud', StringType(), False)
        .add('timestamp', StringType(), False)
    )

    return (
        df.selectExpr('cast (data as STRING) jsonData')
        .select(from_json('jsonData', payload_schema).alias('payload'))
        .select(
            col('payload.amount').cast('long').alias('amount'),
            col('payload.isFraud').cast('long').alias('isFraud'),
            from_utc_timestamp('payload.timestamp', 'UTC').alias('timestamp')
        )
    )

# Declare a StreamSource with both a batch_config and a stream_config as parameters
# See the API documentation for both BatchConfig and StreamConfig
transactions_stream = StreamSource(
    name='transactions_stream',
    stream_config=KinesisConfig(
        stream_name='transaction_events',
        region='us-west-2',
        initial_stream_position='latest',
        watermark_delay_threshold=timedelta(minutes=30),
        timestamp_field='timestamp',
        post_processor=raw_data_deserialization,
        options={'roleArn': 'arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro'}
    ),
    batch_config=HiveConfig(
        database='demo_fraud',
        table='transactions',
        timestamp_field='timestamp',
    ),
    owner='jules@tecton.ai',
    tags={'release': 'staging'}
)
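The example above pairs HiveConfig with KinesisConfig; any other combination from the Union types listed under Parameters follows the same pattern. Below is a minimal sketch of a FileConfig + KafkaConfig pairing. The source name, endpoints, and paths are hypothetical, and the FileConfig/KafkaConfig parameter names used here (uri, file_format, kafka_bootstrap_servers, topics) are assumptions, so check the API pages for those configs for the exact signatures.

from tecton import FileConfig, KafkaConfig, StreamSource

# Hypothetical StreamSource pairing a Kafka stream with a file-based batch source.
click_stream = StreamSource(
    name='click_stream',
    stream_config=KafkaConfig(
        kafka_bootstrap_servers='broker-1.example.com:9092',  # assumed broker endpoint
        topics='click_events',                                # assumed topic name
        timestamp_field='timestamp',
        post_processor=raw_data_deserialization,              # deserialization function, as in the example above
    ),
    batch_config=FileConfig(
        uri='s3://example-bucket/click_events/',              # assumed S3 path
        file_format='parquet',
        timestamp_field='timestamp',
    ),
    owner='jules@tecton.ai',
    tags={'release': 'staging'},
)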
Attributes

name
The name of this DataSource.

timestamp_field
The name of the timestamp column or key of this DataSource.
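As a minimal illustration (not part of the reference itself), these attributes can be read directly from the transactions_stream object declared in the example above; the expected values shown in the comments are assumptions based on that declaration.

# Illustrative only: attribute access on the StreamSource declared above.
print(transactions_stream.name)             # 'transactions_stream'
print(transactions_stream.timestamp_field)  # expected to reflect 'timestamp' from the configs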