tecton.declarative.feature_view.MaterializedFeatureView¶
-
class tecton.declarative.feature_view.MaterializedFeatureView(name, pipeline_function, sources, entities, online, offline, offline_store, online_store, aggregation_interval, aggregations, ttl, feature_start_time, batch_schedule, online_serving_index, batch_compute, stream_compute, monitor_freshness, expected_feature_freshness, alert_email, description, owner, tags, inferred_transform, feature_view_type, timestamp_field, data_source_type, user_function, incremental_backfills, aggregation_mode=None, max_batch_aggregation_interval=None, output_stream=None, batch_trigger=None)¶

Stream/Batch Feature View class to include in Feature Services or to use in unit testing.
Do not instantiate this class directly. Use a decorator-based constructor, such as tecton.batch_feature_view or tecton.stream_feature_view, instead.
Methods

- __init__ – Do not use directly; internal constructor for materialized Feature Views.
- run – Run the Feature View using mock data sources.
- with_join_key_map – Rebind join keys for a Feature View used in a Feature Service.
- with_name – Rename a Feature View used in a Feature Service.
-
__init__(name, pipeline_function, sources, entities, online, offline, offline_store, online_store, aggregation_interval, aggregations, ttl, feature_start_time, batch_schedule, online_serving_index, batch_compute, stream_compute, monitor_freshness, expected_feature_freshness, alert_email, description, owner, tags, inferred_transform, feature_view_type, timestamp_field, data_source_type, user_function, incremental_backfills, aggregation_mode=None, max_batch_aggregation_interval=None, output_stream=None, batch_trigger=None)¶

Do not use this constructor directly. It is the internal constructor for materialized Feature Views.
-
run(spark, start_time, end_time, aggregation_level=None, **mock_sources)¶

Run the Feature View using mock data sources. This requires a local Spark session.
- Parameters

  - start_time (Optional[datetime]) – The start time of the time window to materialize. If not set, defaults to end_time minus batch_schedule.

  - end_time (Optional[datetime]) – The end time of the time window to materialize. If not set, defaults to start_time plus batch_schedule.

  - aggregation_level (Optional[str]) – For feature views with aggregations, aggregation_level configures what stage of the aggregation to run up to.

    The query for Aggregate Feature Views operates in three logical steps:

    1. The feature view query is run over the provided time range. The user-defined transformations are applied over the data source.
    2. The result of step 1 is aggregated into tiles the size of the aggregation_interval.
    3. The tiles from step 2 are combined to form the final feature values. The number of tiles that are combined is based on the time_window of the aggregation.

    For testing and debugging purposes, use aggregation_level="disabled" to see the output of step 1, aggregation_level="partial" for step 2, and aggregation_level="full" for step 3. aggregation_level="full" is the default behavior.

  - **mock_sources – Keyword arguments with the same keys as the Feature View's inputs parameter. Each input name maps to a Spark DataFrame that should be evaluated for that node in the pipeline.
Example:

    from datetime import datetime, timedelta

    import pandas

    from fraud.features.batch_features.user_credit_card_issuer import user_credit_card_issuer


    # The `tecton_pytest_spark_session` is a pytest fixture that provides a
    # Tecton-defined PySpark session for testing Spark transformations and
    # feature views.
    def test_user_distinct_merchant_transaction_count_30d(tecton_pytest_spark_session):
        input_pandas_df = pandas.DataFrame({
            "user_id": ["user_1", "user_2", "user_3", "user_4"],
            "signup_timestamp": [datetime(2022, 5, 1)] * 4,
            "cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000],
        })
        input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)

        # Simulate materializing features for May 1st.
        output = user_credit_card_issuer.run(
            spark=tecton_pytest_spark_session,
            start_time=datetime(2022, 5, 1),
            end_time=datetime(2022, 5, 2),
            fraud_users_batch=input_spark_df,
        )

        actual = output.toPandas()

        expected = pandas.DataFrame({
            "user_id": ["user_1", "user_2", "user_3", "user_4"],
            "signup_timestamp": [datetime(2022, 5, 1)] * 4,
            "credit_card_issuer": ["other", "Visa", "MasterCard", "Discover"],
        })

        pandas.testing.assert_frame_equal(actual, expected)
- Returns

  A tecton.TectonDataFrame object.
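The three aggregation stages described under aggregation_level can be pictured with a plain-Python simulation. This is an illustrative sketch only, not Tecton's implementation; the one-hour tile size, the two-hour time window, and the sum aggregation are assumptions chosen for the example.

```python
from datetime import datetime, timedelta

# Stage 1 output: rows produced by the user-defined transformation.
events = [
    {"user_id": "u1", "ts": datetime(2022, 5, 1, 0, 30), "amount": 10},
    {"user_id": "u1", "ts": datetime(2022, 5, 1, 1, 15), "amount": 5},
    {"user_id": "u1", "ts": datetime(2022, 5, 1, 2, 45), "amount": 7},
]

TILE = timedelta(hours=1)  # assumed aggregation_interval

def tile_start(ts):
    # Floor the timestamp to the start of its tile.
    epoch = datetime(1970, 1, 1)
    return epoch + ((ts - epoch) // TILE) * TILE

# Stage 2: partial aggregates, one tile per (user, interval).
partial = {}
for e in events:
    key = (e["user_id"], tile_start(e["ts"]))
    partial[key] = partial.get(key, 0) + e["amount"]

# Stage 3: combine the tiles that fall inside the feature time_window
# (here assumed to be the 2 hours ending at 03:00).
window_end = datetime(2022, 5, 1, 3)
window = timedelta(hours=2)
full = sum(v for (uid, t), v in partial.items()
           if uid == "u1" and window_end - window <= t < window_end)
print(full)  # tiles at 01:00 (5) and 02:00 (7) -> 12
```

Conceptually, aggregation_level="disabled" stops after `events`, "partial" stops after `partial`, and "full" (the default) returns `full`.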
-
with_join_key_map(join_key_map)¶

Used to rebind join keys for a Feature View used in a Feature Service. The keys in join_key_map should be the feature view join keys, and the values should be the feature service overrides.

    from tecton import FeatureService

    # The join key for this feature service will be "feature_service_user_id".
    feature_service = FeatureService(
        name="feature_service",
        features=[
            my_feature_view.with_join_key_map({"user_id": "feature_service_user_id"}),
        ],
    )

    # Here is a more sophisticated example. The join keys for this feature service will be
    # "transaction_id", "sender_id", and "recipient_id" and will contain three feature views
    # named "transaction_features", "sender_features", and "recipient_features".
    transaction_fraud_service = FeatureService(
        name="transaction_fraud_service",
        features=[
            # Select a subset of features from a feature view.
            transaction_features[["amount"]],
            # Rename a feature view and/or rebind its join keys. In this example, we want user
            # features for both the transaction sender and recipient, so include the feature
            # view twice and bind it to two different feature service join keys.
            user_features.with_name("sender_features").with_join_key_map({"user_id": "sender_id"}),
            user_features.with_name("recipient_features").with_join_key_map({"user_id": "recipient_id"}),
        ],
    )
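The effect of join-key rebinding can be pictured as a key translation at request time: the feature service receives its own join keys and forwards the mapped values to the feature view. This is an illustrative sketch of the semantics, not Tecton's internals; `rebind` and `request` are hypothetical names.

```python
def rebind(request_keys, join_key_map):
    # join_key_map maps feature-view key -> feature-service key,
    # matching the with_join_key_map convention described above.
    return {fv_key: request_keys[fs_key] for fv_key, fs_key in join_key_map.items()}

# A request to the feature service supplies the service-level join keys.
request = {"sender_id": "user_42", "recipient_id": "user_7"}

# Each copy of the feature view sees its own "user_id" value.
print(rebind(request, {"user_id": "sender_id"}))     # {'user_id': 'user_42'}
print(rebind(request, {"user_id": "recipient_id"}))  # {'user_id': 'user_7'}
```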
-
with_name(namespace)¶

Used to rename a Feature View used in a Feature Service.

    from tecton import FeatureService

    # The feature view in this feature service will be named "new_named_feature_view" in
    # training data dataframe columns and other metadata.
    feature_service = FeatureService(
        name="feature_service",
        features=[
            my_feature_view.with_name("new_named_feature_view"),
        ],
    )

    # Here is a more sophisticated example. The join keys for this feature service will be
    # "transaction_id", "sender_id", and "recipient_id" and will contain three feature views
    # named "transaction_features", "sender_features", and "recipient_features".
    transaction_fraud_service = FeatureService(
        name="transaction_fraud_service",
        features=[
            # Select a subset of features from a feature view.
            transaction_features[["amount"]],
            # Rename a feature view and/or rebind its join keys. In this example, we want user
            # features for both the transaction sender and recipient, so include the feature
            # view twice and bind it to two different feature service join keys.
            user_features.with_name("sender_features").with_join_key_map({"user_id": "sender_id"}),
            user_features.with_name("recipient_features").with_join_key_map({"user_id": "recipient_id"}),
        ],
    )
Attributes

- name – Name of this Tecton Object.