Connecting to a Kinesis Stream Data Source
Overview
This example explains how you can connect Tecton to a Kinesis streaming data source.
Kinesis is an AWS global resource, which is not associated with a specific virtual private cloud (VPC). This eliminates the need to set up networking access (Security Groups, Subnets, VPC Peering, and so on). Instead, just ensure that Tecton has AWS IAM permissions to read from the Kinesis data source. Tecton's default configuration enables it to access all Kinesis streams available in the AWS account in which Tecton is deployed.
See Cross-Account Kinesis Access, below, for instructions on how to connect Tecton to a Kinesis stream in an AWS account that differs from the AWS account in which Tecton is deployed.
Once Tecton has access to your Kinesis streams, you need the following information:
- region: The AWS region in which the Kinesis stream lives (for example: us-west-1, us-east-2)
- stream_name: The unique name of the Kinesis stream
Sample Kinesis Data Source Configuration
The following example shows how to define a Stream Data Source with a KinesisConfig.
from tecton import (
    HiveConfig,
    KinesisConfig,
    StreamSource,
    BatchSource,
    DatetimePartitionColumn,
)
from datetime import timedelta


def raw_data_deserialization(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
    from pyspark.sql.types import (
        StructType,
        StructField,
        StringType,
        DoubleType,
        TimestampType,
        BooleanType,
        IntegerType,
    )

    payload_schema = StructType(
        [
            StructField("user_id", StringType(), False),
            StructField("transaction_id", StringType(), False),
            StructField("category", StringType(), False),
            StructField("amt", StringType(), False),
            StructField("is_fraud", StringType(), False),
            StructField("merchant", StringType(), False),
            StructField("merch_lat", StringType(), False),
            StructField("merch_long", StringType(), False),
            StructField("timestamp", StringType(), False),
        ]
    )
    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.user_id").alias("user_id"),
            col("payload.transaction_id").alias("transaction_id"),
            col("payload.category").alias("category"),
            col("payload.amt").cast("double").alias("amt"),
            col("payload.is_fraud").cast("long").alias("is_fraud"),
            col("payload.merchant").alias("merchant"),
            col("payload.merch_lat").cast("double").alias("merch_lat"),
            col("payload.merch_long").cast("double").alias("merch_long"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


partition_columns = [
    DatetimePartitionColumn(column_name="partition_0", datepart="year", zero_padded=True),
    DatetimePartitionColumn(column_name="partition_1", datepart="month", zero_padded=True),
    DatetimePartitionColumn(column_name="partition_2", datepart="day", zero_padded=True),
]

batch_config = HiveConfig(
    database="demo_fraud_v2",
    table="transactions",
    timestamp_field="timestamp",
    datetime_partition_columns=partition_columns,
)

transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=KinesisConfig(
        stream_name="tecton-demo-fraud-data-stream",
        region="us-west-2",
        initial_stream_position="latest",
        watermark_delay_threshold=timedelta(hours=24),
        timestamp_field="timestamp",
        post_processor=raw_data_deserialization,
        options={"roleArn": "arn:aws:iam::706752053316:role/tecton-demo-fraud-data-cross-account-kinesis-ro"},
    ),
    batch_config=batch_config,
    owner="david@tecton.ai",
    tags={"release": "production"},
)

transactions_batch = BatchSource(
    name="transactions_batch",
    batch_config=batch_config,
    owner="david@tecton.ai",
    tags={"release": "production"},
)
Cross-Account Kinesis Access
You might need access to a Kinesis stream that's in a different AWS account than Tecton's data plane. To enable cross-account access:
- Create a cross-account role in the AWS account of your Kinesis stream that allows Tecton-orchestrated Spark workers to read from your Kinesis stream.
- Configure your KinesisConfig object to use the cross-account role by setting the roleArn parameter to the AWS ARN of the cross-account IAM role.
Creating a Cross-Account Role
- In your Kinesis AWS Account, go to the IAM service and click the Roles tab.
- Click Create role. In the Select type of trusted entity panel, click Another AWS Account. Paste in the Account ID of Tecton's data plane AWS account, <deployment-acct-id>. You can get this ID by emailing support@tecton.ai.
- Click Next: Permissions and give this role permission to access Kinesis. You can provide your own JSON (a narrower example policy is sketched after these steps) or use the AmazonKinesisFullAccess policy.
- Click Next: Review and give the role a name, for example KinesisCrossAccountRole.
- Click Create role. The list of roles displays.
- In the Roles list, click KinesisCrossAccountRole and verify that the trust relationship contains a JSON policy like:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": ["arn:aws:iam::<deployment-acct-id>:root"],
                "Service": "ec2.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
- Copy the role ARN. For example: arn:aws:iam::<kinesis-owner-acct-id>:role/KinesisCrossAccountRole.
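If you choose to provide your own JSON instead of attaching AmazonKinesisFullAccess, a read-only policy scoped to a single stream might look like the sketch below. The exact set of actions required depends on your consumer, and the region, account ID, and stream name are placeholders taken from the example above; adjust them to match your environment.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-west-2:<kinesis-owner-acct-id>:stream/tecton-demo-fraud-data-stream"
        }
    ]
}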
Configuring the KinesisConfig Object
Set the roleArn value in the options argument of your KinesisConfig as shown below:
stream_config = KinesisConfig(
    ...,
    options={"roleArn": "arn:aws:iam::<kinesis-owner-acct-id>:role/KinesisCrossAccountRole"},
)
Validate Data Access
To validate that Tecton can properly access the Kinesis stream, test the stream with the Data Source's start_stream_preview function. Use the function in an interactive notebook.
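For example, in a notebook attached to a Tecton-connected Spark cluster, a preview might look like the following sketch. The workspace name ("prod") and the preview table name are assumptions for illustration; substitute the names used in your deployment.
import tecton

# Fetch the registered stream source from your workspace
# ("prod" is a placeholder workspace name).
ds = tecton.get_workspace("prod").get_data_source("transactions_stream")

# Start a streaming preview job that writes deserialized records
# into a temporary table for inspection.
ds.start_stream_preview("transactions_stream_preview")

# After letting the preview run for a minute or so, query the table.
# `spark` is the SparkSession provided by the notebook environment.
spark.sql("SELECT * FROM transactions_stream_preview LIMIT 10").show()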