tecton.declarative.batch_feature_view

tecton.declarative.batch_feature_view(*, mode, sources, entities, aggregation_interval=None, aggregations=[], online=False, offline=False, ttl=None, feature_start_time=None, batch_schedule=None, online_serving_index=None, batch_compute=None, offline_store=ParquetConfig(), online_store=None, monitor_freshness=False, expected_feature_freshness=None, alert_email=None, description=None, owner=None, tags=None, timestamp_field=None, name=None, max_batch_aggregation_interval=None, incremental_backfills=False)

Declare a Batch Feature View.
- Parameters
  - mode (str) – Whether the annotated function is a pipeline function (“pipeline” mode) or a transformation function (“spark_sql” or “pyspark” mode). For the non-pipeline modes, an inferred transformation will also be registered.
  - sources (Sequence[Union[BatchDataSource, BatchSource, FilteredSource]]) – The data source inputs to the feature view.
  - entities (List[Union[BaseEntity, OverriddenEntity]]) – The entities this feature view is associated with.
  - aggregation_interval (Optional[timedelta]) – How frequently the feature value is updated (for example, timedelta(hours=1) or timedelta(hours=6)).
  - aggregations (List[Aggregation]) – A list of Aggregation structs.
  - online (Optional[bool]) – Whether the feature view should be materialized to the online feature store. (Default: False)
  - offline (Optional[bool]) – Whether the feature view should be materialized to the offline feature store. (Default: False)
  - ttl (Optional[timedelta]) – The TTL (or “look back window”) for features defined by this feature view. This parameter determines how long features will live in the online store and how far to “look back” relative to a training example’s timestamp when generating offline training sets. Shorter TTLs improve performance and reduce costs.
  - feature_start_time (Union[DateTime, datetime, None]) – The time from which materialization for this feature view should start. (Required if offline=True)
  - batch_schedule (Optional[timedelta]) – The interval at which batch materialization should be scheduled.
  - online_serving_index (Optional[List[str]]) – (Advanced) Defines the set of join keys that will be indexed and queryable during online serving.
  - batch_compute (Union[DatabricksClusterConfig, EMRClusterConfig, None]) – Batch materialization cluster configuration.
  - offline_store (Union[ParquetConfig, DeltaConfig, None]) – Configuration for how data is written to the offline feature store.
  - online_store (Union[DynamoConfig, RedisConfig, None]) – Configuration for how data is written to the online feature store.
  - monitor_freshness (bool) – If true, enables monitoring when feature data is materialized to the online feature store.
  - expected_feature_freshness (Optional[timedelta]) – Threshold used to determine if recently materialized feature data is stale. Data is stale if now - most_recent_feature_value_timestamp > expected_feature_freshness. For feature views using Tecton aggregations, data is stale if now - round_up_to_aggregation_interval(most_recent_feature_value_timestamp) > expected_feature_freshness, where round_up_to_aggregation_interval() rounds up the feature timestamp to the end of the aggregation_interval. Value must be at least 2 times aggregation_interval. If not specified, a value determined by the Tecton backend is used.
  - alert_email (Optional[str]) – Email address that alerts for this FeatureView will be sent to.
  - owner (Optional[str]) – Owner name (typically the email of the primary maintainer).
  - tags (Optional[Dict[str, str]]) – Tags associated with this Tecton Object (key-value pairs of arbitrary metadata).
  - timestamp_field (Optional[str]) – The name of the column that holds the timestamp for records produced by the feature view. This parameter is optional if exactly one column is a Timestamp type. This parameter is required if using Tecton on Snowflake without Snowpark.
  - name (Optional[str]) – Unique, human-friendly name that identifies the FeatureView. Defaults to the function name.
  - max_batch_aggregation_interval (Optional[timedelta]) – (Advanced) Makes the batch job scheduler group jobs together for efficiency.
  - incremental_backfills (bool) – If set to True, the feature view will be backfilled one interval at a time as if it had been updated “incrementally” since its feature_start_time. For example, if batch_schedule is 1 day and feature_start_time is 1 year prior to the current time, then the backfill will run 365 separate materialization jobs to fill the historical feature data.
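
The staleness rule described for expected_feature_freshness can be sketched in plain Python. This is only an illustration of the two inequalities stated above, not Tecton's implementation: the function is_stale is hypothetical, timestamps are assumed to be timezone-aware UTC values, aggregation intervals are assumed to be aligned to the Unix epoch, and a timestamp that already sits exactly on an interval boundary is assumed to stay unchanged when rounded up.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumption: aggregation intervals are aligned to the Unix epoch (UTC).
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def round_up_to_aggregation_interval(ts: datetime, interval: timedelta) -> datetime:
    """Round ts up to the end of the aggregation interval that contains it."""
    remainder = (ts - EPOCH) % interval
    if remainder == timedelta(0):
        # Assumption: a timestamp already on a boundary is left as-is.
        return ts
    return ts + (interval - remainder)


def is_stale(
    now: datetime,
    most_recent_feature_value_timestamp: datetime,
    expected_feature_freshness: timedelta,
    aggregation_interval: Optional[timedelta] = None,
) -> bool:
    """Hypothetical helper applying the staleness inequality from the docs."""
    ts = most_recent_feature_value_timestamp
    if aggregation_interval is not None:
        # Feature views with aggregations compare against the rounded-up timestamp.
        ts = round_up_to_aggregation_interval(ts, aggregation_interval)
    return now - ts > expected_feature_freshness


now = datetime(2022, 1, 1, 12, 0, tzinfo=timezone.utc)
latest = datetime(2022, 1, 1, 8, 30, tzinfo=timezone.utc)

# Without aggregations: 3.5 hours have elapsed, which exceeds 3 hours.
print(is_stale(now, latest, timedelta(hours=3)))
# With a 1-hour aggregation interval: 08:30 rounds up to 09:00, and
# exactly 3 hours elapsed does not exceed the 3-hour threshold.
print(is_stale(now, latest, timedelta(hours=3), aggregation_interval=timedelta(hours=1)))
```

The example threshold of 3 hours respects the constraint that expected_feature_freshness be at least 2 times aggregation_interval.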
- Returns
  An object of type tecton.declarative.feature_view.MaterializedFeatureView.
Example BatchFeatureView declaration:
    from datetime import datetime
    from datetime import timedelta

    from fraud.entities import user
    from fraud.data_sources.credit_scores_batch import credit_scores_batch
    from tecton import batch_feature_view, Aggregation, FilteredSource


    @batch_feature_view(
        sources=[FilteredSource(credit_scores_batch)],
        entities=[user],
        mode='spark_sql',
        online=True,
        offline=True,
        feature_start_time=datetime(2020, 10, 10),
        # Materialize once per day.
        batch_schedule=timedelta(days=1),
        # Keep feature values for 60 days.
        ttl=timedelta(days=60),
        description="Features about the user's most recent transaction in the past 60 days. Updated daily.",
    )
    def user_last_transaction_features(credit_scores_batch):
        return f'''
            SELECT
                USER_ID,
                TIMESTAMP,
                AMOUNT as LAST_TRANSACTION_AMOUNT,
                CATEGORY as LAST_TRANSACTION_CATEGORY
            FROM
                {credit_scores_batch}
            '''
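
To get a feel for what incremental_backfills=True would mean for a feature view like the one above (batch_schedule of 1 day, feature_start_time of datetime(2020, 10, 10)), the following sketch counts the whole batch_schedule intervals between the start time and a chosen "now". The helper count_backfill_intervals is hypothetical, and counting only whole elapsed intervals is an assumption; the number of jobs Tecton actually schedules may differ.

```python
from datetime import datetime, timedelta


def count_backfill_intervals(
    feature_start_time: datetime,
    now: datetime,
    batch_schedule: timedelta,
) -> int:
    """Hypothetical helper: whole batch_schedule intervals elapsed since feature_start_time."""
    if now <= feature_start_time:
        return 0
    # Floor division of two timedeltas yields an int.
    return (now - feature_start_time) // batch_schedule


# One year after the start time, with a daily schedule.
jobs = count_backfill_intervals(
    feature_start_time=datetime(2020, 10, 10),
    now=datetime(2021, 10, 10),
    batch_schedule=timedelta(days=1),
)
print(jobs)  # 365
```

This matches the "365 separate materialization jobs" figure given in the incremental_backfills description for a one-year history with a one-day schedule.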
Example BatchFeatureView declaration using aggregates:
    from datetime import datetime
    from datetime import timedelta

    from fraud.entities import user
    from fraud.data_sources.credit_scores_batch import credit_scores_batch
    from tecton import batch_feature_view, Aggregation, FilteredSource


    @batch_feature_view(
        sources=[FilteredSource(credit_scores_batch)],
        entities=[user],
        mode='spark_sql',
        online=True,
        offline=True,
        feature_start_time=datetime(2020, 10, 10),
        aggregations=[
            Aggregation(column="amount", function="mean", time_window=timedelta(days=1)),
            Aggregation(column="amount", function="mean", time_window=timedelta(days=30)),
        ],
        aggregation_interval=timedelta(days=1),
        description="Transaction amount statistics and total over a series of time windows, updated daily.",
    )
    def user_recent_transaction_aggregate_features(credit_scores_batch):
        return f'''
            SELECT
                USER_ID,
                AMOUNT,
                TIMESTAMP
            FROM
                {credit_scores_batch}
            '''
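
As a rough mental model of what the two "mean" aggregations in the example above produce for a single user, the sketch below computes a mean over a trailing time window in plain Python. It is not how Tecton computes aggregations: the function windowed_mean is hypothetical, the half-open window [as_of - time_window, as_of) is an assumption, and grouping by entity and aggregation_interval tiling are left out.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


def windowed_mean(
    rows: List[Tuple[datetime, float]],
    as_of: datetime,
    time_window: timedelta,
) -> Optional[float]:
    """Mean of amounts whose timestamp falls in [as_of - time_window, as_of)."""
    window_start = as_of - time_window
    amounts = [amount for ts, amount in rows if window_start <= ts < as_of]
    if not amounts:
        # No rows in the window, so there is no mean to report.
        return None
    return sum(amounts) / len(amounts)


# (TIMESTAMP, AMOUNT) rows for one hypothetical user.
rows = [
    (datetime(2022, 1, 1, 10, 0), 10.0),
    (datetime(2022, 1, 15, 0, 0), 20.0),
    (datetime(2022, 1, 30, 12, 0), 60.0),
]
as_of = datetime(2022, 1, 31)

print(windowed_mean(rows, as_of, timedelta(days=1)))   # 60.0
print(windowed_mean(rows, as_of, timedelta(days=30)))  # 30.0
```

Only the last row falls inside the 1-day window, while all three rows fall inside the 30-day window, which is why the two features differ.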