
Debugging Example: Slow Execution of get_historical_features()

info

This page applies to Tecton on Spark and Athena Retrieval only.

This page shows how to debug a query generated by get_historical_features() that runs much slower than expected.

The setup code below is required to run the examples that follow.

from tecton import FileConfig, BatchSource, Entity, batch_feature_view, Aggregation
from datetime import datetime, timedelta

batch_config = FileConfig(
    uri="s3://tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
    file_format="parquet",
    timestamp_field="timestamp",
)
transactions_batch = BatchSource(name="transactions_batch", batch_config=batch_config)
user = Entity(name="fraud_user", join_keys=["user_id"])


@batch_feature_view(
    sources=[transactions_batch],
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="transaction", function="count", time_window=timedelta(days=1)),
        Aggregation(column="transaction", function="count", time_window=timedelta(days=30)),
        Aggregation(column="transaction", function="count", time_window=timedelta(days=90)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 4, 30),
)
def user_transaction_counts(transactions):
    return f"""
        SELECT
            user_id,
            1 as transaction,
            timestamp
        FROM
            {transactions}
        """

The call to get_historical_features() that runs much slower than expected

Build a spine and then call get_historical_features() with the spine:

import tecton
import pandas as pd

from datetime import datetime

tecton.set_validation_mode("auto")

spine = pd.DataFrame.from_dict(
    {
        "timestamp": [datetime(2022, 10, 1, 0, 0, 0)],
        "user_id": ["user_268308151877"],
    }
)
df = user_transaction_counts.get_historical_features(spine=spine)

The debugging process

df.to_spark().show()
| user_id | timestamp | user_transaction_counts__transaction_count_1d_1d | user_transaction_counts__transaction_count_30d_1d | user_transaction_counts__transaction_count_90d_1d |
| user_268308151877 | 2022-10-01 00:00:00 | 0 | 13 | 36 |

The result is correct, but the query took much longer than expected. To investigate, show the query plan:

df.explain()
<1> RenameColsNode: Rename columns with map {'transaction_count_1d_1d': 'user_transaction_counts_no_filter__transaction_count_1d_1d', 'transaction_count_30d_1d': 'user_transaction_counts_no_filter__transaction_count_30d_1d', 'transaction_count_90d_1d': 'user_transaction_counts_no_filter__transaction_count_90d_1d'}. Drop columns ['_anchor_time'].
└── <2> RespectFeatureStartTimeNode: Respect the feature start time for all rows where '_anchor_time' < 2022-04-29T00:00:00+00:00 by setting all feature columns for those rows to NULL
└── <3> AsofJoinFullAggNode(spine, partial_aggregates): Spine asof join partial aggregates, where the join condition is partial_aggregates._anchor_time <= spine._anchor_time and partial aggregates are rolled up to compute full aggregates
├── <4> [spine] AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts_no_filter' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
│ └── <5> UserSpecifiedDataNode: User provided data with columns timestamp|user_id
└── <6> [partial_aggregates] PartialAggNode: Perform partial aggregations with column '_anchor_time' as the start time of tiles.
└── <7> EntityFilterNode(feature_data, entities): Filter feature data by entities with respect to ['user_id']:
├── <8> [feature_data] FeatureViewPipelineNode(transactions): Evaluate feature view pipeline 'user_transaction_counts_no_filter' with feature time limits [2022-07-03T00:00:00+00:00, 2022-10-02T00:00:00+00:00)
│ └── <9> [transactions] DataSourceScanNode: Scan data source 'transactions_batch'. WARNING: there is no time range filter so all rows will be returned. This can be very inefficient.
└── <10> [entities] SelectDistinctNode: Select distinct with columns ['user_id'].
└── <11> AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts_no_filter' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
└── <12> UserSpecifiedDataNode: User provided data with columns timestamp|user_id

Node 9, the DataSourceScanNode, reveals a potential issue: it scans the full transactions_batch data source without any time range filter, which is an expensive operation.
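To see why this is expensive, the contrast below shows the same scan in raw Spark, once without and once with a time range predicate. This is only an illustrative sketch (it assumes an active spark session and reuses the time limits from the plan above); it is not the query Tecton generates internally:

from pyspark.sql import functions as F

# Unfiltered scan: Spark has to read every row in the Parquet data set.
all_rows = spark.read.parquet("s3://tecton.ai.public/tutorials/fraud_demo/transactions/data.pq")

# Scan with a time range predicate: Spark can skip row groups and partitions
# outside [2022-07-03, 2022-10-02), so far less data is read.
filtered = all_rows.filter(
    (F.col("timestamp") >= "2022-07-03") & (F.col("timestamp") < "2022-10-02")
)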

Hypothesis: The root cause is that the user_transaction_counts feature view is not using a FilteredSource.

Correcting the feature view to fix the issue

You can fix the original feature view by using a FilteredSource:

from tecton import FilteredSource


@batch_feature_view(
    sources=[FilteredSource(transactions_batch)],
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="transaction", function="count", time_window=timedelta(days=1)),
        Aggregation(column="transaction", function="count", time_window=timedelta(days=30)),
        Aggregation(column="transaction", function="count", time_window=timedelta(days=90)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 4, 30),
)
def user_transaction_counts(transactions):
    return f"""
        SELECT
            user_id,
            1 as transaction,
            timestamp
        FROM
            {transactions}
        """

You can then test the original query again:

import tecton
import pandas as pd

from datetime import datetime

tecton.set_validation_mode("auto")

spine = pd.DataFrame.from_dict(
    {
        "timestamp": [datetime(2022, 10, 1, 0, 0, 0)],
        "user_id": ["user_268308151877"],
    }
)
df = user_transaction_counts.get_historical_features(spine=spine)
df.to_spark().show()
| user_id | timestamp | user_transaction_counts__transaction_count_1d_1d | user_transaction_counts__transaction_count_30d_1d | user_transaction_counts__transaction_count_90d_1d |
| user_268308151877 | 2022-10-01 00:00:00 | 0 | 13 | 36 |

This time, get_historical_features() should return much faster. You can confirm that the query now correctly filters the data source by showing the query plan again:

df.explain()
<1> RenameColsNode: Rename columns with map {'transaction_count_1d_1d': 'user_transaction_counts__transaction_count_1d_1d', 'transaction_count_30d_1d': 'user_transaction_counts__transaction_count_30d_1d', 'transaction_count_90d_1d': 'user_transaction_counts__transaction_count_90d_1d'}. Drop columns ['_anchor_time'].
└── <2> RespectFeatureStartTimeNode: Respect the feature start time for all rows where '_anchor_time' < 2022-04-29T00:00:00+00:00 by setting all feature columns for those rows to NULL
└── <3> AsofJoinFullAggNode(spine, partial_aggregates): Spine asof join partial aggregates, where the join condition is partial_aggregates._anchor_time <= spine._anchor_time and partial aggregates are rolled up to compute full aggregates
├── <4> [spine] AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
│ └── <5> UserSpecifiedDataNode: User provided data with columns timestamp|user_id
└── <6> [partial_aggregates] PartialAggNode: Perform partial aggregations with column '_anchor_time' as the start time of tiles.
└── <7> EntityFilterNode(feature_data, entities): Filter feature data by entities with respect to ['user_id']:
├── <8> [feature_data] FeatureViewPipelineNode(transactions): Evaluate feature view pipeline 'user_transaction_counts' with feature time limits [2022-07-03T00:00:00+00:00, 2022-10-02T00:00:00+00:00)
│ └── <9> [transactions] DataSourceScanNode: Scan data source 'transactions_batch' and apply time range filter [2022-07-03T00:00:00+00:00, 2022-10-02T00:00:00+00:00)
└── <10> [entities] SelectDistinctNode: Select distinct with columns ['user_id'].
└── <11> AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
└── <12> UserSpecifiedDataNode: User provided data with columns timestamp|user_id
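
If you want to quantify the improvement, a simple timing sketch like the one below (assuming the spine and feature view defined above) can be run before and after switching to FilteredSource; the absolute numbers will depend on your cluster and data volume:

import time

start = time.perf_counter()
df = user_transaction_counts.get_historical_features(spine=spine)
df.to_spark().show()  # forces the query to execute
print(f"get_historical_features() took {time.perf_counter() - start:.1f} seconds")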
