Version: Beta 🚧

Unit Testing

note

This page does not apply to Tecton on Snowflake.

If you are using Tecton on Spark and you will be running unit tests, the Tecton CLI must be installed using one of the following commands:

  • To install with Pyspark 3.1: pip install 'tecton[pyspark]'
  • To install with Pyspark 3.2: pip install 'tecton[pyspark3.2]'
  • To install with Pyspark 3.3: pip install tecton pyspark==3.3

Overview

Unit tests are stored in feature repositories, in files whose path matches the pattern **/tests/*.py.
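
As a rough illustration of what that pattern selects, the sketch below builds a throwaway directory tree and applies the same glob with Python's pathlib. The file names are made up for the example, and Tecton's own test discovery may treat edge cases differently.

```python
import tempfile
from pathlib import Path


def find_test_files(repo_root: Path) -> list:
    """Return repo-relative paths matching **/tests/*.py, sorted."""
    return sorted(
        p.relative_to(repo_root).as_posix()
        for p in repo_root.glob("**/tests/*.py")
    )


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    # Hypothetical repo layout, for illustration only.
    for rel in [
        "fraud/features/tests/test_amount.py",  # directly inside a tests directory
        "fraud/tests/test_credit.py",           # directly inside a tests directory
        "fraud/features/amount.py",             # not inside a tests directory
        "fraud/tests/data/sample.py",           # nested below tests, not directly in it
    ]:
        target = root / rel
        target.parent.mkdir(parents=True, exist_ok=True)
        target.touch()

    matched = find_test_files(root)

print(matched)
```

Only the first two files are selected: a file must sit directly inside a directory named tests, at any depth in the repository.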

Tests run when the following commands are executed:

  • tecton apply: Runs the tests and applies the repo if the tests pass.

  • tecton plan: Runs the tests and shows the changes that would be made to the repo if the changes were applied.

  • tecton test: Runs the tests, only.

On-Demand Feature View Unit Test

Testing an On-Demand Feature View is straightforward: all we need is the On-Demand Feature View and a test file located in a tests directory.

For example, let's say we have a feature view that determines whether a transaction amount is high:

from tecton import RequestSource, on_demand_feature_view
from tecton.types import Field, Float64, Int64
import pandas

# Define the request schema
transaction_request = RequestSource(schema=[Field("amount", Float64)])

# Define the output schema
output_schema = [Field("transaction_amount_is_high", Int64)]


# This On-Demand Feature View evaluates a transaction amount and declares it "high" if it is 10,000 or more
@on_demand_feature_view(
    sources=[transaction_request],
    mode="pandas",
    schema=output_schema,
    owner="matt@tecton.ai",
    description="Whether the transaction amount is considered high ($10000 or more)",
)
def transaction_amount_is_high(transaction_request: pandas.DataFrame):
    import pandas as pd

    df = pd.DataFrame()
    # 1 when the amount is at or above the threshold, 0 otherwise
    df["transaction_amount_is_high"] = (transaction_request["amount"] >= 10000).astype("int64")
    return df

With the above feature view, we can define the unit test that mocks up some sample inputs, and asserts that we're getting the expected result.

### tests/transaction_amount_is_high.py ###
from fraud.features.on_demand_feature_views.transaction_amount_is_high import transaction_amount_is_high
import pandas


# Testing the 'transaction_amount_is_high' feature which depends on request data ('amount') as input
def test_transaction_amount_is_high():
    transaction_request = pandas.DataFrame({"amount": [124, 10001, 34235436234]})

    actual = transaction_amount_is_high.test_run(transaction_request=transaction_request)
    expected = pandas.DataFrame({"transaction_amount_is_high": [0, 1, 1]})

    pandas.testing.assert_frame_equal(actual, expected)
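
The sample amounts in this test sit well away from the cutoff. Because the feature compares with >=, an amount of exactly 10000 counts as high, so a boundary value is worth adding to the test data. The sketch below mirrors the comparison in plain Python to show the expected results around the cutoff. is_high is a hypothetical helper written for this illustration and is not part of Tecton.

```python
THRESHOLD = 10000  # same cutoff the feature view compares against


def is_high(amount: float) -> int:
    """Plain-Python mirror of the feature's comparison (amount >= 10000)."""
    return int(amount >= THRESHOLD)


# Values just below, exactly at, and just above the cutoff.
boundary_cases = {9999.99: 0, 10000: 1, 10000.01: 1}

for amount, expected in boundary_cases.items():
    assert is_high(amount) == expected, f"unexpected result for {amount}"
```

The same three amounts could be added to the transaction_request DataFrame in the test, with [0, 1, 1] as the matching expected values.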

Spark Feature View Unit Test

Creating a unit test in a PySpark or Spark SQL feature view is similar to the above example, except that we also need to install the Java Development Kit (JDK) locally and provide a SparkSession in the test code.

When tests are executed with tecton test or pytest, Tecton sets the validation mode to "skip" to avoid connecting to the Tecton backend. Tecton also makes calls to feature transformation code round-trip through the function serialization process, so that the code executed in tests is representative of what will be executed in materialization.

Installing the JDK

Unit tests using a PySpark or Spark SQL feature view require JDK version 8 (u201 or later) or JDK version 11 to run.
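
One way to check a local installation against this requirement is to parse the version string that java -version reports. The sketch below is a minimal example under stated assumptions: it expects the usual formats, 1.8.0_<update> for JDK 8 and 11.x.y for JDK 11, and both function names are hypothetical helpers rather than anything Tecton provides.

```python
import re
from typing import Optional


def parse_java_version(version_output: str) -> Optional[str]:
    """Extract the quoted version string from `java -version` output."""
    match = re.search(r'version "([^"]+)"', version_output)
    return match.group(1) if match else None


def is_supported_jdk(version: str) -> bool:
    """Return True for JDK 8u201 or later, or for any JDK 11 release."""
    # JDK 8 reports itself as 1.8.0_<update>.
    jdk8 = re.match(r"1\.8\.0_(\d+)", version)
    if jdk8:
        return int(jdk8.group(1)) >= 201
    # JDK 9 and later dropped the "1." prefix, so JDK 11 starts with "11".
    return version.split(".")[0] == "11"


# Sample first lines of `java -version` output, for illustration only.
samples = [
    'openjdk version "1.8.0_372"',
    'java version "1.8.0_191"',
    'openjdk version "11.0.19" 2023-04-18',
    'openjdk version "17.0.7" 2023-04-18',
]
results = {s: is_supported_jdk(parse_java_version(s)) for s in samples}
```

Note that java -version writes to standard error, so a script that captures the output with subprocess should read stderr rather than stdout.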

caution

As noted on the Oracle web site, older versions of the JDK are provided to help developers debug issues in older systems. They are not updated with the latest security patches and are not recommended for use in production.

Unit tests using a PySpark or Spark SQL feature view are not supported in environments that have Databricks Connect installed.

Install the JDK and then set the JAVA_HOME environment variable.
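
A missing or mistyped JAVA_HOME tends to surface as an opaque error when the Spark session starts, so it can help to check the variable up front. The following is a minimal sketch, assuming a standard JDK layout with the java binary under JAVA_HOME/bin. find_java_executable is a hypothetical helper, not part of Tecton.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def find_java_executable(env: Mapping[str, str] = os.environ) -> Optional[Path]:
    """Return the java binary under JAVA_HOME, or None if it cannot be found."""
    java_home = env.get("JAVA_HOME")
    if not java_home:
        return None
    bin_dir = Path(java_home) / "bin"
    # Check the Unix name first, then the Windows name.
    for name in ("java", "java.exe"):
        candidate = bin_dir / name
        if candidate.is_file():
            return candidate
    return None


java_path = find_java_executable()
if java_path is None:
    print("JAVA_HOME is unset or does not point to a JDK installation")
else:
    print(f"Found java at {java_path}")
```

The environment is passed in as a parameter so the check can be exercised with a plain dictionary instead of the real process environment.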

Supported Methods

In unit tests, the following methods are supported:

  • On-Demand Feature View: test_run
  • Batch and Stream Feature Views: test_run, run, and get_historical_features

Examples

For example, let's say we have a feature view that determines whether a user has good credit:

### user_has_good_credit.py ###
from tecton import batch_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.credit_scores_batch import credit_scores_batch
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[FilteredSource(source=credit_scores_batch)],
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2021, 1, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=120),
)
def user_has_good_credit(credit_scores):
    return f"""
        SELECT
            user_id,
            credit_score > 670 as user_has_good_credit,
            timestamp
        FROM
            {credit_scores}
        """

Because this is a Spark SQL feature view, we'll need a SparkSession to test it. Tecton provides the tecton_pytest_spark_session pytest fixture, which creates one.

Finally, we can define the actual unit test that mocks up some sample inputs, and asserts that we're getting the expected result.

You should ensure that the mock data schema exactly matches the source schema. Any datetime partition columns that may be present need to match, too.
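
A quick way to catch schema drift between the mock data and the source is to compare the two schemas as column-to-type mappings before running the feature view. The sketch below is one possible approach. schema_mismatches is a hypothetical helper, and the two schemas are invented for illustration.

```python
from typing import Dict, List


def schema_mismatches(source: Dict[str, str], mock: Dict[str, str]) -> List[str]:
    """Describe every difference between a source schema and a mock schema.

    Both arguments map column name -> type name (for example "string").
    """
    problems = []
    for column, dtype in source.items():
        if column not in mock:
            problems.append(f"missing column: {column}")
        elif mock[column] != dtype:
            problems.append(f"type mismatch for {column}: expected {dtype}, got {mock[column]}")
    for column in mock:
        if column not in source:
            problems.append(f"unexpected column: {column}")
    return problems


# Hypothetical schemas, for illustration only.
source_schema = {"user_id": "string", "credit_score": "bigint", "timestamp": "timestamp"}
mock_schema = {"user_id": "string", "credit_score": "double", "signup_timestamp": "timestamp"}

problems = schema_mismatches(source_schema, mock_schema)
for problem in problems:
    print(problem)
```

With PySpark, dict(spark_df.dtypes) yields a mapping of this shape for a DataFrame, so the mock DataFrame's schema can be fed to the helper directly.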

run unit test

from datetime import datetime, timedelta

import pandas

from fraud.features.batch_feature_views.user_has_good_credit import user_has_good_credit


def test_user_has_good_credit(tecton_pytest_spark_session):
    # The mock columns must match what the feature view's SQL selects:
    # user_id, credit_score, and timestamp.
    input_pandas_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3"],
            "timestamp": [datetime(2022, 5, 1)] * 3,
            "credit_score": [600, 670, 700],
        }
    )
    input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)

    # Simulate materializing features for May 1st.
    output = user_has_good_credit.test_run(
        start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2), credit_scores=input_spark_df
    )

    actual = output.to_pandas()

    # A score of exactly 670 is not "good" because the feature view uses a strict comparison (> 670).
    expected = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3"],
            "timestamp": [datetime(2022, 5, 1)] * 3,
            "user_has_good_credit": [False, False, True],
        }
    )

    pandas.testing.assert_frame_equal(actual, expected)

Just like in the example above, this test will now run when we execute tecton apply, tecton plan, or tecton test.

get_historical_features unit test

note

The ability to run get_historical_features in a unit test was introduced in Tecton SDK 0.7 and does not work in prior versions.

# This test assumes that pandas and datetime are imported as in the previous example, and that
# user_credit_card_issuer (a batch feature view not shown on this page) is imported from the feature repo.
def test_user_credit_card_issuer_ghf(tecton_pytest_spark_session):
    input_pandas_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3", "user_4"],
            "signup_timestamp": [datetime(2022, 5, 1)] * 4,
            "cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000],
        }
    )
    input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)

    spine_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_1", "user_2", "user_not_found"],
            "timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
        }
    )

    # Look up feature values for each spine row, reading from the mocked data source.
    output = user_credit_card_issuer.get_historical_features(
        spine_df, mock_inputs={"fraud_users_batch": input_spark_df}
    )

    actual = output.to_pandas()

    expected = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_1", "user_2", "user_not_found"],
            "timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
            "user_credit_card_issuer__credit_card_issuer": [None, "other", "Visa", None],
        }
    )

    # NOTE: because the Spark join has non-deterministic ordering, it is important to
    # sort the dataframe to avoid test flakes.
    actual = actual.sort_values(["user_id", "timestamp"]).reset_index(drop=True)
    expected = expected.sort_values(["user_id", "timestamp"]).reset_index(drop=True)

    pandas.testing.assert_frame_equal(actual, expected)

Configuring the Local Test Spark Session

Tecton provides a session-scoped pytest fixture, tecton_pytest_spark_session. If that Spark session is not configured correctly for your tests, you can either configure the Tecton-provided fixture or create your own Spark session.

Here's an example of configuring the Tecton-provided spark session:

import pytest


@pytest.fixture(scope="module", autouse=True)
def configure_spark_session(tecton_pytest_spark_session):
    # Custom configuration for the spark session.
    tecton_pytest_spark_session.conf.set("spark.sql.session.timeZone", "UTC")
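
To see why pinning the session time zone can matter, consider how a single instant looks when rendered in different zones. The sketch below uses only Python's datetime module and illustrates the general failure mode, not Spark's internals. The half-open window check and the fixed UTC-7 offset are assumptions chosen for the example.

```python
from datetime import datetime, timedelta, timezone


def in_window(ts: datetime, start: datetime, end: datetime) -> bool:
    """Half-open window check: start <= ts < end (an assumed convention)."""
    return start <= ts < end


# Window for "May 1st", expressed as naive datetimes like the test data.
start, end = datetime(2022, 5, 1), datetime(2022, 5, 2)

# The same instant, midnight UTC on May 1st, rendered in two time zones.
instant = datetime(2022, 5, 1, tzinfo=timezone.utc)
as_utc = instant.replace(tzinfo=None)
utc_minus_7 = timezone(timedelta(hours=-7))  # fixed offset, for illustration
as_local = instant.astimezone(utc_minus_7).replace(tzinfo=None)

print(as_utc, in_window(as_utc, start, end))      # 2022-05-01 00:00:00 True
print(as_local, in_window(as_local, start, end))  # 2022-04-30 17:00:00 False
```

A timestamp that falls inside the test window in UTC lands on the previous day once it is shifted to a local zone, which is the kind of mismatch that makes a test pass on one machine and fail on another.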

And here's an example of how to create your own spark session and provide it to Tecton:

from importlib import resources

import pytest
import tecton
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def my_custom_spark_session():
    """Returns a custom spark session configured for use in Tecton unit testing."""
    with resources.path("tecton_spark.jars", "tecton-udfs-spark-3.jar") as path:
        tecton_udf_jar_path = str(path)

    spark = (
        SparkSession.builder.appName("my_custom_spark_session")
        .config("spark.jars", tecton_udf_jar_path)
        # This short-circuits Spark's attempt to auto-detect a hostname for the master address, which can lead to
        # errors on hosts with "unusual" hostnames that Spark believes are invalid.
        .config("spark.driver.host", "localhost")
        .config("spark.sql.session.timeZone", "UTC")
        .getOrCreate()
    )
    try:
        # Register the session with Tecton, then hand it to the tests.
        tecton.set_tecton_spark_session(spark)
        yield spark
    finally:
        # Stop the session once all tests in the pytest session have finished.
        spark.stop()

Skip Tests

Specifying the --skip-tests flag when running tecton apply, tecton plan, or tecton test will skip execution of Tecton tests.
