Batch Feature View Examples
Row-Level SQL Transformation
- Spark
- Snowflake
from tecton import batch_feature_view
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from datetime import datetime, timedelta
@batch_feature_view(
    sources=[fraud_users_batch],
    entities=[user],
    mode="spark_sql",
    online=False,
    offline=False,
    # Note: the timestamp is the signup date, hence the early feature_start_time.
    feature_start_time=datetime(2017, 1, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=3650),
    timestamp_field="signup_timestamp",
    tags={"release": "production"},
    owner="matt@tecton.ai",
    description="User credit card issuer derived from the user credit card number.",
)
def user_credit_card_issuer(fraud_users_batch):
    return f"""
        SELECT
            user_id,
            signup_timestamp,
            CASE SUBSTRING(CAST(cc_num AS STRING), 1, 1)
                WHEN '4' THEN 'Visa'
                WHEN '5' THEN 'MasterCard'
                WHEN '6' THEN 'Discover'
                ELSE 'other'
            END AS credit_card_issuer
        FROM
            {fraud_users_batch}
        """
from tecton import batch_feature_view
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from datetime import datetime, timedelta
@batch_feature_view(
    sources=[fraud_users_batch],
    entities=[user],
    mode="snowflake_sql",
    online=False,
    offline=False,
    # Note: the timestamp is the signup date, hence the early feature_start_time.
    feature_start_time=datetime(2017, 1, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=3650),
    timestamp_field="signup_timestamp",
    tags={"release": "production"},
    owner="matt@tecton.ai",
    description="User credit card issuer derived from the user credit card number.",
)
def user_credit_card_issuer(fraud_users_batch):
    return f"""
        SELECT
            user_id,
            signup_timestamp,
            CASE SUBSTRING(CAST(cc_num AS STRING), 1, 1)
                WHEN '4' THEN 'Visa'
                WHEN '5' THEN 'MasterCard'
                WHEN '6' THEN 'Discover'
                ELSE 'other'
            END AS credit_card_issuer
        FROM
            {fraud_users_batch}
        """
Time-Windowed Aggregations
- Spark
- Snowflake
from tecton import batch_feature_view, FilteredSource, Aggregation
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
@batch_feature_view(
    sources=[FilteredSource(transactions_batch)],
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="transaction", function="count", time_window=timedelta(days=1)),
        Aggregation(column="transaction", function="count", time_window=timedelta(days=30)),
        Aggregation(column="transaction", function="count", time_window=timedelta(days=90)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    tags={"release": "production"},
    owner="matt@tecton.ai",
    description="User transaction totals over a series of time windows, updated daily.",
)
def user_transaction_counts(transactions):
    return f"""
        SELECT
            user_id,
            1 AS transaction,
            timestamp
        FROM
            {transactions}
        """
from tecton import batch_feature_view, FilteredSource, Aggregation
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
@batch_feature_view(
    sources=[FilteredSource(transactions_batch)],
    entities=[user],
    mode="snowflake_sql",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="transaction", function="count", time_window=timedelta(days=1)),
        Aggregation(column="transaction", function="count", time_window=timedelta(days=30)),
        Aggregation(column="transaction", function="count", time_window=timedelta(days=90)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    tags={"release": "production"},
    owner="matt@tecton.ai",
    description="User transaction totals over a series of time windows, updated daily.",
)
def user_transaction_counts(transactions):
    return f"""
        SELECT
            user_id,
            1 AS transaction,
            timestamp
        FROM
            {transactions}
        """
Row-Level PySpark Transformation
from tecton import batch_feature_view
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from datetime import datetime, timedelta
@batch_feature_view(
    sources=[fraud_users_batch],
    entities=[user],
    mode="pyspark",
    online=False,
    offline=False,
    # Note: the timestamp is the signup date, hence the early feature_start_time.
    feature_start_time=datetime(2017, 1, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=3650),
    tags={"release": "production"},
    owner="matt@tecton.ai",
    description="User date of birth, entered at signup.",
)
def user_date_of_birth(fraud_users_batch):
    from pyspark.sql import functions as f
    return (
        fraud_users_batch.withColumn("user_date_of_birth", f.date_format(f.col("dob"), "yyyy-MM-dd"))
        .withColumnRenamed("signup_timestamp", "timestamp")
        .select("user_id", "user_date_of_birth", "timestamp")
    )
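Because the body is ordinary PySpark, the transformation logic can be sanity-checked on a hand-built DataFrame outside Tecton. A minimal standalone sketch with one illustrative row (a local SparkSession is assumed):

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.master("local[1]").getOrCreate()

# One row shaped like fraud_users_batch, with only the columns used above.
sample = spark.createDataFrame(
    [("user_1", "1990-02-14", "2021-05-01 10:00:00")],
    ["user_id", "dob", "signup_timestamp"],
)
(
    sample.withColumn("user_date_of_birth", f.date_format(f.col("dob"), "yyyy-MM-dd"))
    .withColumnRenamed("signup_timestamp", "timestamp")
    .select("user_id", "user_date_of_birth", "timestamp")
    .show()
)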
Custom Aggregations
- Spark
- Snowflake
from tecton import batch_feature_view, FilteredSource, materialization_context
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
@batch_feature_view(
    sources=[FilteredSource(transactions_batch, start_time_offset=timedelta(days=-29))],
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 4, 1),
    incremental_backfills=True,
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=2),
    owner="david@tecton.ai",
    tags={"release": "production"},
    description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context=materialization_context()):
    return f"""
        SELECT
            user_id,
            TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND AS timestamp,
            COUNT(DISTINCT merchant) AS distinct_merchant_transaction_count_30d
        FROM {transactions_batch}
        GROUP BY
            user_id
    """
from tecton import batch_feature_view, FilteredSource, materialization_context
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
@batch_feature_view(
    sources=[FilteredSource(transactions_batch, start_time_offset=timedelta(days=-29))],
    entities=[user],
    mode="snowflake_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 4, 1),
    incremental_backfills=True,
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=2),
    owner="david@tecton.ai",
    tags={"release": "production"},
    description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context=materialization_context()):
    return f"""
        SELECT
            user_id,
            TIMESTAMPADD(microsecond, -1, TO_TIMESTAMP('{context.end_time}')) AS timestamp,
            COUNT(DISTINCT merchant) AS distinct_merchant_transaction_count_30d
        FROM {transactions_batch}
        GROUP BY
            user_id
    """
Modular Transformation Pipeline
- Spark
- Snowflake
from tecton import transformation, FilteredSource, batch_feature_view, const
from ads.entities import auction
from ads.data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta
# Create a new column by splitting the string in an existing column.
@transformation(mode="spark_sql")
def str_split(input_data, column_to_split, new_column_name, delimiter):
    return f"""
    SELECT
        *,
        split({column_to_split}, {delimiter}) AS {new_column_name}
    FROM {input_data}
    """
# Create features based on the keyword array
@transformation(mode="spark_sql")
def keyword_stats(input_data, keyword_column):
    return f"""
    SELECT
        auction_id,
        timestamp,
        {keyword_column} AS keyword_list,
        size({keyword_column}) AS num_keywords,
        array_contains({keyword_column}, "bitcoin") AS keyword_contains_bitcoin
    FROM {input_data}
    """
# This feature view runs in pipeline mode to turn the keyword string into an
# array of words, then create metrics based on that array.
@batch_feature_view(
    mode="pipeline",
    sources=[FilteredSource(ad_impressions_batch)],
    entities=[auction],
    ttl=timedelta(days=1),
    batch_schedule=timedelta(days=1),
    online=False,
    offline=False,
    feature_start_time=datetime(2022, 5, 1),
    owner="derek@tecton.ai",
    tags={"release": "production"},
)
def auction_keywords(ad_impressions):
    split_keywords = str_split(ad_impressions, const("content_keyword"), const("keywords"), const("' '"))
    return keyword_stats(split_keywords, const("keywords"))
from tecton import transformation, FilteredSource, batch_feature_view, const
from ads.entities import auction
from ads.data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta
# Create a new column by splitting the string in an existing column.
@transformation(mode="snowflake_sql")
def str_split(input_data, column_to_split, new_column_name, delimiter):
    return f"""
    SELECT
        *,
        split({column_to_split}, {delimiter}) AS {new_column_name}
    FROM {input_data}
    """
# Create features based on the keyword array
@transformation(mode="snowflake_sql")
def keyword_stats(input_data, keyword_column):
    return f"""
    SELECT
        auction_id,
        timestamp,
        {keyword_column} AS keyword_list,
        array_size({keyword_column}) AS num_keywords,
        array_contains('bitcoin'::variant, {keyword_column}) AS keyword_contains_bitcoin
    FROM {input_data}
    """
# This feature view runs in pipeline mode to turn the keyword string into an
# array of words, then create metrics based on that array.
@batch_feature_view(
    mode="pipeline",
    sources=[FilteredSource(ad_impressions_batch)],
    entities=[auction],
    ttl=timedelta(days=1),
    batch_schedule=timedelta(days=1),
    online=False,
    offline=False,
    feature_start_time=datetime(2022, 5, 1),
    owner="derek@tecton.ai",
    tags={"release": "production"},
)
def auction_keywords(ad_impressions):
    split_keywords = str_split(ad_impressions, const("content_keyword"), const("keywords"), const("' '"))
    return keyword_stats(split_keywords, const("keywords"))
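In pipeline mode the feature view function only wires transformations together: keyword_stats consumes the output of str_split as a subquery. For intuition, the composed Spark SQL is roughly equivalent to the sketch below (not the exact query Tecton renders; the inner table name stands in for the filtered source):

composed_query_sketch = """
SELECT
    auction_id,
    timestamp,
    keywords AS keyword_list,
    size(keywords) AS num_keywords,
    array_contains(keywords, "bitcoin") AS keyword_contains_bitcoin
FROM (
    SELECT
        *,
        split(content_keyword, ' ') AS keywords
    FROM ad_impressions
) split_keywords
"""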
Multiple Data Sources
- Spark
- Snowflake
from tecton import batch_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
# For every transaction, the following FeatureView precomputes a feature that indicates
# whether the user was an adult as of the time of the transaction.
@batch_feature_view(
    sources=[FilteredSource(transactions_batch), fraud_users_batch],
    entities=[user],
    mode="spark_sql",
    online=False,
    offline=False,
    feature_start_time=datetime(2022, 5, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=100),
    tags={"release": "production"},
    owner="david@tecton.ai",
    description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
    return f"""
        SELECT
          timestamp,
          t.user_id,
          IF(DATEDIFF(timestamp, TO_DATE(dob)) > (18*365), 1, 0) AS user_is_adult
        FROM {transactions_batch} t JOIN {fraud_users_batch} u ON t.user_id = u.user_id
    """
from tecton import batch_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
# For every transaction, the following FeatureView precomputes a feature that indicates
# whether the user was an adult as of the time of the transaction.
@batch_feature_view(
    sources=[FilteredSource(transactions_batch), fraud_users_batch],
    entities=[user],
    mode="snowflake_sql",
    online=False,
    offline=False,
    feature_start_time=datetime(2022, 5, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=100),
    tags={"release": "production"},
    owner="david@tecton.ai",
    description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
    return f"""
        SELECT
          timestamp,
          t.user_id,
          IFF(DATEDIFF(day, TO_DATE(dob), timestamp) > (18*365), 1, 0) AS user_is_adult
        FROM {transactions_batch} t JOIN {fraud_users_batch} u ON t.user_id = u.user_id
    """