# Batch Feature View
A Batch Feature View defines transformations against one or more Batch Sources, which are data sources (e.g. S3, Hive, Redshift, Snowflake). Batch Feature Views can be scheduled to materialize new feature data to the Online and Offline Feature Stores on a regular cadence. They can also run automatic backfills when they’re first created.
With Tecton on Snowflake, a Batch Feature View can define a transformation against a Snowflake data source only.
Common Feature Examples:
- determining if a user's credit score is over a pre-defined threshold
- counting the total number of transactions over a time window in batch
- batch ingesting pre-computed feature values from an existing batch source
## Batch Feature Transformations
A Batch Feature View defines a transformation that turns your raw data into feature values. When Tecton executes the transformation, it always runs directly in the data platform you connected to Tecton (e.g. your Snowflake warehouse or your Databricks cluster).
Spark:

```python
from tecton import batch_feature_view


@batch_feature_view(sources=[transactions], entities=[user], mode="spark_sql")
def user_last_transaction_amount(transactions):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        """
```

Snowflake:

```python
from tecton import batch_feature_view


@batch_feature_view(sources=[transactions], entities=[user], mode="snowflake_sql")
def user_last_transaction_amount(transactions):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        """
```
## Materialization to the Online and Offline Feature Store
### Online Materialization
You can easily make batch features available for low-latency online retrieval to feed an online model. Simply set `online=True`, a `batch_schedule` that determines how frequently Tecton runs your transformation, and a `feature_start_time` date to backfill feature data from.
Spark:

```python
from tecton import batch_feature_view
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="spark_sql",
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
)
def user_last_transaction_amount(transactions):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        """
```

Snowflake:

```python
from tecton import batch_feature_view
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="snowflake_sql",
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
)
def user_last_transaction_amount(transactions):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        """
```
### Offline Materialization
Tecton also supports offline materialization, which can speed up some expensive queries considerably. When you set `offline=True`, Tecton will materialize offline feature data to an offline table according to the `batch_schedule` and `feature_start_time`.
This feature is not supported in Tecton on Snowflake, except when using incremental backfills.
If you are interested in this functionality, please file a feature request.
```python
from tecton import batch_feature_view
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="spark_sql",
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
)
def user_last_transaction_amount(transactions):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        """
```
If you don’t materialize Batch Feature Views offline, Tecton will always execute your transformation when you use the Tecton SDK to generate offline data. Speaking in SQL terms, a Batch Feature View without offline materialization is simply a “View”. A Batch Feature View with offline materialization is a “Materialized View”.
Offline materialization has additional benefits, including:
- Online-offline skew is minimized because the data in the online and offline store is incrementally updated at the same time.
- Offline feature data is saved so it is resilient to any losses of historical data upstream.
## Materialization Feature Data Filtering
Every materialization run is expected to produce feature values for a specific time range. This time range is known as the “materialization window”. The materialization window is different for backfills and incremental runs:
- During the initial backfill of feature data to the Online and Offline Feature Store, the materialization time window starts at `feature_start_time` and ends at Tecton's "current wall clock time" when the feature's materialization is enabled.
- On incremental runs, the materialization time window starts at the previous run's `start_time` and ends at `start_time + batch_schedule`.
Tecton only materializes feature values that fall within the materialization time window. It automatically applies a filter to the Feature View transformation, as shown in the `WHERE` clause below:
```sql
-- Tecton applies this filter to the user-provided transformation
SELECT * FROM {batch_feature_view_transformation}
WHERE {timestamp_field} >= {start_time}
  AND {timestamp_field} < {end_time}
```
The start time of the window is inclusive and the end time is exclusive. This means that a feature value whose timestamp is exactly equal to the `end_time` is not part of the window.
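As a quick illustration of the half-open window semantics (plain Python, not Tecton code):

```python
from datetime import datetime, timedelta

# Half-open materialization window: start inclusive, end exclusive.
start_time = datetime(2020, 10, 10)
end_time = start_time + timedelta(days=1)


def in_window(ts):
    return start_time <= ts < end_time


print(in_window(datetime(2020, 10, 10, 23, 59, 59)))  # True: last second of the day
print(in_window(end_time))  # False: a timestamp exactly equal to end_time is excluded
```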
## Creating Efficient Transformations using Raw Data Filtering
In many cases, running the transformation against the full set of raw data is unnecessary and inefficient. Some data platforms will push down Tecton’s feature data filter (shown above) to the raw data, only reading what is needed, but this is not always the case.
We strongly recommend adding raw data filtering to Spark-based Feature Views in order to achieve good performance.
### Filtering Using the Context Object
In your Batch Feature View transformation, you can filter for the raw data needed to produce feature values on each run by leveraging a `context` object that Tecton passes into the transformation. `context.start_time` and `context.end_time` are equal to the expected materialization time window.
The example transformation below filters for the required raw data in the `WHERE` clause.
Spark:

```python
from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    online=True,
    mode="spark_sql",
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
)
def user_last_transaction_amount(transactions, context=materialization_context()):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP("{context.start_time}")
          AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
        """
```
Snowflake (note the single-quoted literals: in Snowflake SQL, double quotes denote identifiers, so string literals must use single quotes):

```python
from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    online=True,
    mode="snowflake_sql",
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
)
def user_last_transaction_amount(transactions, context=materialization_context()):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP('{context.start_time}')
          AND TIMESTAMP < TO_TIMESTAMP('{context.end_time}')
        """
```
In cases where you read from a time-partitioned data source, like a Glue table or partitioned data on S3, you will also want to filter by partition columns.
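For example, a table partitioned by a date-string column could be pruned with a predicate built from the materialization window. This sketch (plain Python, not Tecton API; the partition column name `ds` is a hypothetical assumption) builds such a predicate:

```python
from datetime import datetime, timedelta


def partition_predicate(start, end):
    # Builds an IN-list over a hypothetical "ds" partition column covering every
    # day the materialization window touches, so the engine can prune partitions
    # instead of scanning the full table. The timestamp filter still applies on top.
    days = []
    d = start.date()
    while d <= end.date():
        days.append(f"'{d.isoformat()}'")
        d += timedelta(days=1)
    return f"ds IN ({', '.join(days)})"


print(partition_predicate(datetime(2020, 10, 10), datetime(2020, 10, 11)))
# ds IN ('2020-10-10', '2020-10-11')
```

The resulting clause can be appended to the transformation's `WHERE` clause alongside the timestamp filter.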
### Filtering Using the FilteredSource Class
For convenience, Tecton offers a `FilteredSource` class that applies timestamp and partition filtering to the data source automatically, based on parameters set on the Batch Data Source object: `timestamp_field` and `datetime_partition_columns` (if using Hive). This replaces the need for the `WHERE` clause in the transformation above. Tecton will automatically filter the data source's data based on its `timestamp_field` and, if applicable, its `datetime_partition_columns`.
Spark:

```python
from tecton import batch_feature_view, FilteredSource
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[FilteredSource(transactions)],
    entities=[user],
    mode="spark_sql",
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
)
def user_last_transaction_amount(transactions):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        """
```

Snowflake:

```python
from tecton import batch_feature_view, FilteredSource
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[FilteredSource(transactions)],
    entities=[user],
    mode="snowflake_sql",
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
)
def user_last_transaction_amount(transactions):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        """
```
By default, `FilteredSource` filters for data between `context.start_time` and `context.end_time`. If needed, a `start_time_offset` parameter can be set with a negative offset to increase the amount of raw data that gets read. For example, setting `start_time_offset` to a negative offset of 10 days (`timedelta(days=-10)`) will filter for data in the range [`context.start_time` - 10 days, `context.end_time`):
```python
FilteredSource(transactions, start_time_offset=timedelta(days=-10))
```
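To make the offset arithmetic concrete, this plain-Python sketch (not Tecton code; the window values are hypothetical) shows the effective read range:

```python
from datetime import datetime, timedelta

start_time_offset = timedelta(days=-10)

# Hypothetical materialization window values for illustration.
context_start = datetime(2020, 10, 10)
context_end = datetime(2020, 10, 11)

effective_start = context_start + start_time_offset
print(effective_start)  # 2020-09-30 00:00:00 -- data read: [effective_start, context_end)
```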
## Creating Features that use Time-Windowed Aggregations
### Using Built-in Time-Windowed Aggregations
Tecton provides built-in implementations of common time-windowed aggregations that simplify transformation logic and ensure correct feature value computation.
For a list of available time-window aggregation functions and other information about the functions, refer to the Time-Window Aggregation Functions Reference.
Time-windowed aggregations can be specified in the Batch Feature View decorator using the `aggregations` and `aggregation_interval` parameters, as shown below. Tecton expects the provided SQL query to select the raw events (with timestamps) to be aggregated.
Spark:

```python
from tecton import batch_feature_view, Aggregation, materialization_context
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="spark_sql",
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(days=30),
            name="average_amount_30d",
        ),
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(days=60),
            name="average_amount_60d",
        ),
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(days=90),
            name="average_amount_90d",
        ),
    ],
)
def user_transaction_features(transactions, context=materialization_context()):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP("{context.start_time}")
          AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
        """
```
Snowflake (with single-quoted string literals, as Snowflake SQL reserves double quotes for identifiers):

```python
from tecton import batch_feature_view, Aggregation, materialization_context
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="snowflake_sql",
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(days=30),
            name="average_amount_30d",
        ),
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(days=60),
            name="average_amount_60d",
        ),
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(days=90),
            name="average_amount_90d",
        ),
    ],
)
def user_transaction_features(transactions, context=materialization_context()):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP('{context.start_time}')
          AND TIMESTAMP < TO_TIMESTAMP('{context.end_time}')
        """
```
If the aggregation you need is not supported, please make a feature request or define your own custom aggregation.
### How Time-Windowed Aggregations are Computed
Behind the scenes, Tecton stores partial aggregations in the form of "tiles". The tile size is defined by the `aggregation_interval` parameter. At feature request time, Tecton's online and offline feature serving capabilities automatically roll up the persisted tiles to produce the final feature value over the entire time window. This has several key benefits:
- Significantly reduced storage requirements if you define several time windows, because all time windows will share the same underlying tiles
- Reduced precompute resource requirements, given that Tecton needs to only compute incremental tiles and not the entire time window
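The rollup idea can be sketched in plain Python (illustrative only, not Tecton internals): daily tiles hold partial sums and counts, and windows of any length reuse the same tiles.

```python
from datetime import date, timedelta

# One "tile" per aggregation_interval: (user_id, day) -> (amount_sum, txn_count).
# Toy data: on day d of October 2020, user "u1" made d transactions totaling 100*d.
tiles = {("u1", date(2020, 10, d)): (100.0 * d, d) for d in range(1, 11)}


def mean_over_window(user_id, end_day, window_days):
    # Roll up the daily tiles covering the window into a single mean;
    # no per-window raw data is stored or recomputed.
    total, count = 0.0, 0
    for i in range(window_days):
        s, c = tiles.get((user_id, end_day - timedelta(days=i)), (0.0, 0))
        total += s
        count += c
    return total / count if count else None


# 10-day and 5-day means share the same underlying tiles.
print(mean_over_window("u1", date(2020, 10, 10), 10))  # 100.0
print(mean_over_window("u1", date(2020, 10, 10), 5))  # 100.0
```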
### Defining Custom Time-Windowed Aggregations
If you want to use a time-windowed aggregation that is not a Tecton built-in aggregation, you can define a custom time-windowed aggregation by setting `incremental_backfills=True`. You can learn more about implementing this by referring to the Custom Batch Aggregations Reference.
Incremental backfills cannot be used with built-in time-windowed aggregations.
## Configuring a Data Source Used By a Batch Feature View to Account for Late Arriving Data
By default, incremental materialization jobs for Batch Feature Views run immediately at the end of the batch schedule period. To override this default, set the `data_delay` parameter, which is specified in the data source configuration (the `batch_config` object referenced in the `BatchSource` object used by the Batch Feature View). `data_delay` configures how long jobs wait after the end of the batch schedule period before starting. This is typically used to ensure that all data has landed. For example, if a Batch Feature View has a `batch_schedule` of 1 day and its data source input has `data_delay=timedelta(hours=1)` set, then incremental batch materialization jobs will run at 01:00 UTC.
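The scheduling rule above can be checked with simple datetime arithmetic (a sketch, not Tecton code; the calendar date is an arbitrary example):

```python
from datetime import datetime, timedelta

batch_schedule = timedelta(days=1)
data_delay = timedelta(hours=1)

# A daily schedule period ending at midnight UTC (example date).
period_end = datetime(2023, 5, 2, 0, 0)

# The incremental job starts data_delay after the period ends.
job_start = period_end + data_delay
print(job_start.strftime("%H:%M"))  # 01:00
```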
## The `ttl` (time-to-live) parameter
The value of `ttl` (a Batch Feature View parameter) affects the availability of feature data in the online store, the generation of training feature data, and the deletion of feature values from the online store.
For more details, see The ttl Parameter in Feature Views.
## Full list of parameters
See the API reference for the full list of parameters.