Version: 0.5

Stream Feature View

info

This feature is not supported in Tecton on Snowflake.

If you are interested in this functionality, please file a feature request.

A Stream Feature View defines transformations against a Stream Data Source and can compute features in near-real-time. It processes raw data from a streaming source (e.g. Kafka or Kinesis) and can be backfilled from a batch source that contains a historical log of stream events.

Stream Feature Transformations

Stream Feature Views can run row-level Spark SQL or PySpark transformations and can apply optional time-windowed aggregations (see section below). Tecton executes these transformations as Spark jobs on your connected data platform (Databricks or EMR).

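from tecton import stream_feature_view

# transactions_stream (a Stream Data Source) and user (an Entity) are assumed
# to be defined elsewhere in the feature repository.


@stream_feature_view(source=transactions_stream, entities=[user], mode="spark_sql")
def user_last_transaction_amount(transactions_stream):
    # Row-level Spark SQL transformation: select the columns to expose as features.
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """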

Materialization to the Online and Offline Feature Store

Online Materialization

When online=True is set, Tecton will run the Stream Feature View transformation on each event that comes in from the underlying stream source and write it to the online store. Any previous values will be overwritten, so the online store only has the most recent value. Feature data will be backfilled from the Stream Data Source's log of historical events (configured via its batch_config). The feature_start_time specifies how far back to backfill features.

from tecton import stream_feature_view
from datetime import datetime


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    online=True,  # write the most recent value per key to the online store
    feature_start_time=datetime(2020, 10, 10),  # how far back to backfill
)
def user_last_transaction_amount(transactions_stream):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """
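
The overwrite behavior described above can be pictured with a small, self-contained sketch. It does not use Tecton; `online_store` and `write_online` are hypothetical names standing in for the online store and the write path, and the events are made-up sample data.

```python
from datetime import datetime

# Hypothetical in-memory stand-in for the online store: one row per entity key.
online_store = {}


def write_online(store, user_id, amount, timestamp):
    """Overwrite the stored row for user_id with the newest event."""
    store[user_id] = {"AMOUNT": amount, "TIMESTAMP": timestamp}


# Made-up events, listed in arrival order.
events = [
    ("user_1", 20.0, datetime(2023, 1, 1, 12, 0)),
    ("user_2", 5.0, datetime(2023, 1, 1, 12, 1)),
    ("user_1", 42.5, datetime(2023, 1, 1, 12, 2)),
]
for user_id, amount, ts in events:
    write_online(online_store, user_id, amount, ts)

print(online_store["user_1"]["AMOUNT"])  # 42.5
```

Only the latest row for each key survives, which is why the online store holds one value per user rather than a history.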

Offline Materialization

Feature data can also be materialized to the Offline Feature Store in order to speed up offline queries (for testing and training data generation). When offline=True is set, Tecton will run the same Stream Feature View transformation pipeline against the batch source (the historical log of stream events) that backs the stream source. The batch_schedule parameter determines how often Tecton will run offline materialization jobs.

from tecton import stream_feature_view
from datetime import datetime, timedelta


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,  # also materialize to the offline store
    feature_start_time=datetime(2020, 10, 10),
    batch_schedule=timedelta(days=1),  # how often offline materialization jobs run
)
def user_last_transaction_amount(transactions_stream):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """

Time-Windowed Aggregations

Tecton provides built-in implementations of common time-windowed aggregations that can be applied to Stream Feature Views. These time-windowed aggregations are optimized for performance and efficiency and are applied consistently online and offline.

tip

For a technical deep dive, check out our two-part blog post on Real-Time Aggregation Features for Machine Learning.

Time-windowed aggregations can be specified in the Stream Feature View decorator using the aggregations parameter. Tecton expects the Stream Feature View transformation to select the raw events (with timestamps) to be aggregated.

Aggregations can be updated either on a sliding time window or with continuous processing for fresher feature values.

For a list of available time-window aggregation functions and other information about the functions, refer to the Time-Window Aggregation Functions Reference.

note

If the aggregation you need is not supported, please make a feature request.

Sliding Time-Windows

Sliding time-windows are configured with the aggregation_interval parameter. Tecton will update the feature value in the online store after the aggregation interval has elapsed, assuming there was at least one event for that key.

from tecton import stream_feature_view, Aggregation
from datetime import datetime, timedelta


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 10, 10),
    batch_schedule=timedelta(days=1),
    aggregation_interval=timedelta(minutes=10),  # online values update once per interval
    aggregations=[
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(hours=1),
            name="average_amount_1h",
        ),
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(hours=12),
            name="average_amount_12h",
        ),
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(hours=24),
            name="average_amount_24h",
        ),
    ],
)
def user_transaction_amount_averages(transactions_stream):
    # Select the raw events (with timestamps); Tecton applies the aggregations.
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """

When using a sliding time-window, Tecton stores partial aggregations in the form of tiles. The tile size is defined by the aggregation_interval parameter. At feature request-time, Tecton's online and offline feature serving capabilities automatically roll up the persisted tiles. This design has several key benefits:

  • Significantly reduced storage requirements if you define several time windows.
  • Reduced stream job memory requirements.
  • Streaming features with a time window that exceeds the streaming source's retention period can be backfilled from the batch source (the historical log of stream events) that backs the stream source. Tecton will backfill historical tiles from the batch source and combine tiles that were written by the streaming source transparently at request time.
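
To make the tile idea concrete, here is a minimal pure-Python sketch of partial aggregation for a mean. It is an illustration under stated assumptions, not Tecton's implementation: the helper names (`tile_start`, `build_tiles`, `rolled_up_mean`), the `[sum, count]` tile layout, and the choice to end the window at the last completed tile boundary are all hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

AGGREGATION_INTERVAL = timedelta(minutes=10)  # tile size in this sketch
EPOCH = datetime(1970, 1, 1)


def tile_start(ts, interval=AGGREGATION_INTERVAL):
    """Round a timestamp down to the start of the tile that contains it."""
    return ts - (ts - EPOCH) % interval


def build_tiles(events):
    """Fold (timestamp, amount) events into per-tile [sum, count] partial aggregates."""
    tiles = defaultdict(lambda: [0.0, 0])
    for ts, amount in events:
        tile = tiles[tile_start(ts)]
        tile[0] += amount
        tile[1] += 1
    return tiles


def rolled_up_mean(tiles, request_time, time_window):
    """Combine tiles inside the window that ends at the last completed tile boundary."""
    window_end = tile_start(request_time)
    window_start = window_end - time_window
    total, count = 0.0, 0
    for start, (tile_sum, tile_count) in tiles.items():
        if window_start <= start < window_end:
            total += tile_sum
            count += tile_count
    return total / count if count else None


# Made-up events for a single user.
events = [
    (datetime(2023, 1, 1, 11, 5), 100.0),
    (datetime(2023, 1, 1, 12, 3), 10.0),
    (datetime(2023, 1, 1, 12, 7), 30.0),
    (datetime(2023, 1, 1, 12, 25), 50.0),
]
tiles = build_tiles(events)
request_time = datetime(2023, 1, 1, 13, 0)
mean_1h = rolled_up_mean(tiles, request_time, timedelta(hours=1))
mean_12h = rolled_up_mean(tiles, request_time, timedelta(hours=12))
print(len(tiles), mean_1h, mean_12h)  # 3 30.0 47.5
```

The same three tiles serve both the 1-hour and the 12-hour window, which is the storage saving the first bullet refers to: adding another time window adds no new stored data, only a different roll-up at request time.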

Continuous Processing for Low-Latency Ingestion

Continuous processing for Stream Feature Views can update feature values within a few seconds of when the event is available in the stream data source, rather than waiting for the aggregation interval to elapse. To enable continuous mode, set stream_processing_mode=StreamProcessingMode.CONTINUOUS as shown in the example below:

from tecton import stream_feature_view, Aggregation, StreamProcessingMode
from datetime import datetime, timedelta


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 10, 10),
    batch_schedule=timedelta(days=1),
    # Update feature values within seconds of each event.
    stream_processing_mode=StreamProcessingMode.CONTINUOUS,
    aggregations=[
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(hours=1),
            name="average_amount_1h",
        ),
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(hours=12),
            name="average_amount_12h",
        ),
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(hours=24),
            name="average_amount_24h",
        ),
    ],
)
def user_transaction_amount_averages(transactions_stream):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """

When using continuous processing, Tecton will store all transformed events in the online store and run the full aggregations at the time of request.
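
As a rough illustration of aggregating at request time, the sketch below keeps every event and computes the mean only when a value is requested. It is a simplified model rather than Tecton's implementation; `mean_over_window`, the half-open window boundary, and the sample events are assumptions made for the example.

```python
from datetime import datetime, timedelta


def mean_over_window(stored_events, request_time, time_window):
    """Aggregate raw stored events at request time; nothing is pre-aggregated."""
    window_start = request_time - time_window
    amounts = [
        amount
        for ts, amount in stored_events
        if window_start < ts <= request_time
    ]
    return sum(amounts) / len(amounts) if amounts else None


# Made-up events for a single user, stored individually.
stored_events = [
    (datetime(2023, 1, 1, 11, 0, 0), 90.0),
    (datetime(2023, 1, 1, 12, 30, 0), 10.0),
    (datetime(2023, 1, 1, 12, 59, 58), 50.0),  # two seconds before the request
]
request_time = datetime(2023, 1, 1, 13, 0, 0)
mean_1h = mean_over_window(stored_events, request_time, timedelta(hours=1))
mean_12h = mean_over_window(stored_events, request_time, timedelta(hours=12))
print(mean_1h, mean_12h)  # 30.0 50.0
```

The event that arrived two seconds before the request is already reflected in the result, at the price of storing and scanning individual events instead of compact partial aggregates.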

When to Use Continuous Processing

You should use continuous processing if your model performance depends on extremely fresh features. For example, it may be important for a fraud detection use-case for features to include previous transactions made even a few seconds prior.

You should not use continuous processing if your model can tolerate features updating every few minutes and you are trying to optimize costs. Continuous processing may lead to higher infrastructure costs due to more frequent feature writes and checkpointing updates. Using a longer aggregation interval can lower costs, especially if a single key has multiple events in a short period of time that can be grouped into a single interval.
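
The write-frequency side of this tradeoff can be sketched with simple counting. The model below is deliberately crude and is an assumption of this example, not a description of Tecton's billing or internals: it counts one write per event in continuous mode and one write per non-empty interval otherwise, using a hypothetical `count_online_writes` helper.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def count_online_writes(event_times, aggregation_interval=None):
    """Count online-store writes for a single key under a simplified model.

    aggregation_interval=None models continuous processing (one write per event).
    Otherwise, one write is counted per interval containing at least one event.
    """
    if aggregation_interval is None:
        return len(event_times)
    intervals = {(ts - EPOCH) // aggregation_interval for ts in event_times}
    return len(intervals)


# Made-up burst: 30 events, 10 seconds apart, then one more event at 12:15.
burst_start = datetime(2023, 1, 1, 12, 0, 0)
event_times = [burst_start + timedelta(seconds=10 * i) for i in range(30)]
event_times.append(datetime(2023, 1, 1, 12, 15, 0))

continuous_writes = count_online_writes(event_times)
sliding_writes = count_online_writes(event_times, timedelta(minutes=10))
print(continuous_writes, sliding_writes)  # 31 2
```

For a key with bursty traffic like this, grouping events into 10-minute intervals collapses most writes, which is where the potential savings come from.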

Checkpointing costs with continuous processing on EMR

Continuous mode can cause significant S3 costs for customers using Tecton with EMR due to the frequency of writing Spark Streaming checkpoints to S3. The Databricks implementation of Spark Streaming has much lower checkpointing costs.

Productionizing a Stream

For a Stream Feature View used in production where late data loss is unacceptable, we recommend setting the Stream Data Source watermark_delay_threshold to your stream retention period, or to at least 24 hours. This configures Spark Structured Streaming not to drop events that arrive late or out of order. The tradeoff of a longer watermark delay is a greater amount of in-memory state used by the streaming job.
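
The effect of the watermark delay on late events can be sketched in plain Python. This is a simplified model that assumes the watermark advances after every event (Spark Structured Streaming advances it per micro-batch), and `filter_late_events` is a hypothetical helper, not a Spark or Tecton API.

```python
from datetime import datetime, timedelta


def filter_late_events(event_times, watermark_delay_threshold):
    """Split events (in arrival order) into kept and dropped lists.

    An event is dropped when its event time is older than the watermark,
    where watermark = latest event time seen so far - watermark_delay_threshold.
    """
    kept, dropped = [], []
    max_event_time = None
    for ts in event_times:
        if (
            max_event_time is not None
            and ts < max_event_time - watermark_delay_threshold
        ):
            dropped.append(ts)
        else:
            kept.append(ts)
        if max_event_time is None or ts > max_event_time:
            max_event_time = ts
    return kept, dropped


# Made-up arrival sequence; the third event shows up about three hours late.
arrivals = [
    datetime(2023, 1, 2, 12, 0),
    datetime(2023, 1, 2, 12, 5),
    datetime(2023, 1, 2, 9, 0),
    datetime(2023, 1, 2, 12, 6),
]
_, dropped_short = filter_late_events(arrivals, timedelta(minutes=30))
_, dropped_long = filter_late_events(arrivals, timedelta(hours=24))
print(len(dropped_short), len(dropped_long))  # 1 0
```

With a 30-minute threshold the late event is discarded, while a 24-hour threshold keeps it. The longer threshold also means state for older windows must be retained longer, which matches the memory tradeoff noted above.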

The ttl (time-to-live) parameter

The value of ttl (a Stream Feature View parameter) affects the availability of feature data in the online store, the generation of training feature data, and the deletion of feature values from the online store.

For more details, see The ttl Parameter in Feature Views.

Full list of parameters

See the API reference for the full list of parameters.
