Customizing Spark Materialization jobs
This page does not apply to Tecton on Snowflake.
Overview
When defining a Batch Feature View or a Stream Feature View, you can set the batch_compute and stream_compute parameters to any of the cluster config objects listed in the following table.
Data Platform | Simple Cluster Config Object | Custom Cluster Config Object |
---|---|---|
Databricks | DatabricksClusterConfig | DatabricksJsonClusterConfig |
EMR | EMRClusterConfig | EMRJsonClusterConfig |
Snowflake | Not supported | Not supported |
Compared to the simple cluster config objects, custom cluster config objects are more flexible. With the custom config objects, you can specify a wide range of configuration settings for your Spark cluster. For example, you can specify the number of worker nodes, the amount of memory and CPU to allocate to each worker, the Spark version to use, and the type of storage to use.
An example Batch Feature View that uses a DatabricksJsonClusterConfig object is shown below.
@batch_feature_view(
    ...,
    batch_compute=DatabricksJsonClusterConfig(
        json=my_json
    ),  # my_json is a JSON string containing your Spark cluster configuration
    ...,
)
def feature_view_1():
    return ...
The JSON you use to specify your cluster config must conform to the Databricks schema or EMR schema that is required to submit jobs to the cluster.
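One way to produce the JSON string is to build a Python dictionary and serialize it with the standard json module, which avoids hand-written syntax errors. The following is a minimal sketch: the name cluster_spec is illustrative, and the field values are borrowed from the Databricks example later on this page rather than being recommended settings.

```python
import json

# Cluster settings as a plain dict. The values are illustrative and mirror
# the Databricks example later on this page.
cluster_spec = {
    "new_cluster": {
        "num_workers": 0,
        "spark_version": "11.3.x-scala2.12",
        "node_type_id": "m5.large",
        "spark_conf": {"spark.sql.sources.partitionOverwriteMode": "dynamic"},
    }
}

# Serialize to the JSON string that would be passed as json=my_json.
my_json = json.dumps(cluster_spec)

# Round-trip check: the string parses back to the same structure.
assert json.loads(my_json) == cluster_spec
```

Building the string this way only guarantees well-formed JSON; whether the fields satisfy the Databricks or EMR schema still has to be checked against the respective API documentation.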
After a custom cluster config object is instantiated (via the instantiation of a Feature View), Tecton internally updates the contents of the JSON string; some settings are appended and others are overwritten.
Using the Web UI, you can see both the original JSON string and the updated JSON string under the materialization tab of your feature view.
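If you copy both JSON strings out of the Web UI, a small helper can list which keys were added. This is a sketch under simple assumptions: added_keys is a hypothetical helper (not part of the Tecton SDK), it only descends into nested objects, and it does not compare list contents or changed values. The two sample strings are shortened stand-ins for real configurations.

```python
import json

def added_keys(original: dict, updated: dict, prefix: str = "") -> list:
    """Return slash-separated paths of keys present in `updated` but not in `original`."""
    paths = []
    for key, value in updated.items():
        path = f"{prefix}/{key}" if prefix else key
        if key not in original:
            paths.append(path)
        elif isinstance(value, dict) and isinstance(original[key], dict):
            # Descend into nested objects that exist on both sides.
            paths.extend(added_keys(original[key], value, path))
    return sorted(paths)

# Shortened stand-ins for the original and updated JSON strings.
original_json = '{"new_cluster": {"num_workers": 0}}'
updated_json = (
    '{"new_cluster": {"num_workers": 0, "init_scripts": []},'
    ' "run_name": "orch-example"}'
)

print(added_keys(json.loads(original_json), json.loads(updated_json)))
# prints ['new_cluster/init_scripts', 'run_name']
```

A slash is used as the path separator because Spark property names themselves contain dots.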
Configuring Databricks job clusters
Tecton uses the Runs Submit endpoint of the Databricks Jobs API 2.0 to programmatically create and manage materialization jobs that run on Databricks. When a DatabricksJsonClusterConfig object is instantiated, a JSON string that conforms to the Runs Submit request schema must be provided.
Use new_cluster for your cluster configurations; existing cluster reuse is not supported.
Materialization jobs are executed as a notebook_task. Other task types, such as spark_jar_task, spark_python_task, spark_submit_task, and pipeline_task, are not supported.
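The constraints above can be checked locally before applying a Feature View. The following sketch is not part of the Tecton SDK: check_databricks_config is a hypothetical helper, and it assumes cluster reuse would be requested through the existing_cluster_id field of the Runs Submit request.

```python
import json

# Task types that this page lists as unsupported for materialization jobs.
UNSUPPORTED_TASKS = (
    "spark_jar_task",
    "spark_python_task",
    "spark_submit_task",
    "pipeline_task",
)

def check_databricks_config(config_json: str) -> list:
    """Return a list of human-readable problems found in the config string."""
    config = json.loads(config_json)
    problems = []
    if "new_cluster" not in config:
        problems.append("missing 'new_cluster'")
    if "existing_cluster_id" in config:
        problems.append("'existing_cluster_id' is set; cluster reuse is not supported")
    for task in UNSUPPORTED_TASKS:
        if task in config:
            problems.append(f"unsupported task type '{task}'")
    return problems

# A config that only defines a new cluster passes the checks.
print(check_databricks_config('{"new_cluster": {"num_workers": 0}}'))
```

An empty list means none of the listed constraints were violated; it does not prove the request is otherwise valid for Databricks.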
Settings overridden by Tecton
Tecton overrides the following contents of the JSON value that is passed to a DatabricksJsonClusterConfig object: run_name, cluster_log_conf, and notebook_task.
"run_name": "orch-<your_feature_view_name>-<your_feature_view_id>-<YOUR_TASK_TYPE>_<your_task_timestamp>-<your_task_attempt_number>",
"new_cluster": {
...,
"cluster_log_conf": {
"s3": {
"destination": "s3://<your_bucket>/logs/<your_run_name>",
"region": "<your_deployment_region>"
}
}
},
"notebook_task": {
"notebook_path": "/Users/<your_user_name>/materialization_notebook.py",
"base_parameters": {
"materialization_params": "s3://<your_bucket>/intermediate-data/materialization_params/<your_run_name>"
}
}
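Because these three settings are replaced, any values you supply for them have no effect. A short sketch of how you might detect that in your own JSON follows; overridden_settings is a hypothetical helper, not a Tecton function.

```python
import json

def overridden_settings(config_json: str) -> list:
    """List user-supplied settings that Tecton will override, per this page."""
    config = json.loads(config_json)
    # Top-level settings that are replaced.
    found = [key for key in ("run_name", "notebook_task") if key in config]
    # cluster_log_conf lives inside new_cluster.
    if "cluster_log_conf" in config.get("new_cluster", {}):
        found.append("new_cluster.cluster_log_conf")
    return found

sample = '{"run_name": "my-run", "new_cluster": {"cluster_log_conf": {}}}'
print(overridden_settings(sample))
# prints ['run_name', 'new_cluster.cluster_log_conf']
```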
Settings appended by Tecton
Tecton appends the following contents to the JSON value that is passed to a DatabricksJsonClusterConfig object.
init_scripts:
"init_scripts": [
...,
{
"s3": {
"destination": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/chronocollector_init_script_v2_esr.sh",
"region": "<your_deployment_region>"
}
}
]
custom_tags, based on feature view metadata. Do not specify tags with the same keys as those shown below, since duplicate tag keys are not allowed:
"custom_tags": [
...,
{
"key": "feature_view_id",
"value": "<your_feature_view_id>"
},
{
"key": "tecton-owned",
"value": "true"
},
{
"key": "tecton-cluster-name",
"value": "<your_cluster_name>"
},
{
"key": "tecton_feature_view",
"value": "<your_feature_view_name>"
},
{
"key": "tecton_deployment",
"value": "<your_deployment_name>"
},
{
"key": "tecton_workspace",
"value": "<your_workspace_name>"
}
]
spark_conf:
"spark_conf": {
...,
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink",
"spark.app.name": "<your_feature_view_name>__<your_feature_view_id>"
}
spark_env_vars that are required by materialization jobs:
"spark_env_vars": {
...,
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
libraries that are required by materialization jobs:
"libraries": [
...,
{
"jar": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-udfs-spark-3.jar"
},
{
"jar": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-online-store-sink-spark-3.jar"
},
{
"whl": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton_materialization-latest-py3-none-any.whl"
}
]
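Since duplicate tag keys are not allowed, it can help to check your own custom_tags against the keys Tecton appends. The sketch below assumes custom_tags is a list of key/value objects inside new_cluster, as in the example on this page; reserved_tag_conflicts is a hypothetical helper.

```python
import json

# Tag keys that Tecton appends, as listed above.
RESERVED_TAG_KEYS = {
    "feature_view_id",
    "tecton-owned",
    "tecton-cluster-name",
    "tecton_feature_view",
    "tecton_deployment",
    "tecton_workspace",
}

def reserved_tag_conflicts(config_json: str) -> list:
    """Return user-supplied tag keys that collide with Tecton's appended tags."""
    config = json.loads(config_json)
    tags = config.get("new_cluster", {}).get("custom_tags", [])
    return sorted(tag["key"] for tag in tags if tag["key"] in RESERVED_TAG_KEYS)

sample = json.dumps({
    "new_cluster": {
        "custom_tags": [
            {"key": "test_key", "value": "test_value"},
            {"key": "tecton_workspace", "value": "prod"},
        ]
    }
})
print(reserved_tag_conflicts(sample))
# prints ['tecton_workspace']
```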
Example Databricks cluster configuration
The following example first shows a sample JSON value that is passed to a DatabricksJsonClusterConfig object. Next, the final JSON value (including JSON settings overridden and appended by Tecton) is shown. In the final JSON value, settings appended by Tecton are highlighted.
JSON value passed to the DatabricksJsonClusterConfig object
{
"new_cluster": {
"num_workers": 0,
"spark_version": "11.3.x-scala2.12",
"node_type_id": "m5.large",
"aws_attributes": {
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 1,
"ebs_volume_size": 100,
"instance_profile_arn": "arn:aws:iam::your_account_id:instance-profile/your-role",
"availability": "SPOT",
"zone_id": "auto"
},
"custom_tags": [
{
"key": "test_key",
"value": "test_value"
}
],
"spark_conf": {
"spark.databricks.service.server.enabled": "true",
"spark.hadoop.fs.s3a.acl.default": "BucketOwnerFullControl",
"spark.sql.sources.partitionOverwriteMode": "dynamic",
"spark.databricks.cluster.profile": "singleNode",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
"spark.master": "local[*]",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.databricks.hive.metastore.glueCatalog.enabled": "true"
},
"spark_env_vars": {
"TEST_ENV_VAR": "IGNORE_ME"
}
},
"libraries": [
{
"jar": "s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
}
]
}
Final JSON value (with settings appended by Tecton highlighted) that is passed to Databricks
{
"new_cluster": {
"num_workers": 0,
"spark_version": "11.3.x-scala2.12",
"node_type_id": "m5.large",
"aws_attributes": {
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 1,
"ebs_volume_size": 100,
"instance_profile_arn": "arn:aws:iam::your_account_id:instance-profile/your-role",
"availability": "SPOT",
"zone_id": "auto"
},
"init_scripts": [
{
"s3": {
"destination": "s3://path_to_script_store/chronocollector_init_script_v2_esr.sh",
"region": "us-west-2"
}
}
],
"cluster_log_conf": {
"s3": {
"destination": "s3://path_to_log_store/your_run_name",
"region": "us-west-2"
}
},
"custom_tags": [
{
"key": "test_key",
"value": "test_value"
},
{
"key": "feature_view_id",
"value": "fv_id"
},
{
"key": "tecton-owned",
"value": "true"
},
{
"key": "tecton-cluster-name",
"value": "dev-blair"
},
{
"key": "tecton_feature_view",
"value": "test_bfv"
},
{
"key": "tecton_deployment",
"value": "dev-blair"
},
{
"key": "tecton_workspace",
"value": "db-1"
}
],
"spark_conf": {
"spark.databricks.cluster.profile": "singleNode",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.databricks.hive.metastore.glueCatalog.enabled": "true",
"spark.hadoop.fs.s3a.acl.default": "BucketOwnerFullControl",
"spark.sql.sources.partitionOverwriteMode": "dynamic",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.master": "local[*]",
"spark.databricks.service.server.enabled": "true",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink",
"spark.app.name": "test_bfv__fv_id"
},
"spark_env_vars": {
"TEST_ENV_VAR": "IGNORE_ME",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
},
"notebook_task": {
"notebook_path": "/Users/bot@tecton.ai/materialization_notebook.py",
"base_parameters": {
"materialization_params": "s3://path_to_param_store/your_run_name"
}
},
"run_name": "orch-test_bfv-fv_id-BATCH_2022.12.28.00:00:00-1",
"libraries": [
{
"jar": "s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
},
{
"jar": "s3://path_to_script_store/tecton-udfs-spark-3.jar"
},
{
"jar": "s3://path_to_script_store/tecton-online-store-sink-spark-3.jar"
},
{
"whl": "s3://path_to_script_store/tecton_materialization-latest-py3-none-any.whl"
}
]
}
Configuring EMR job clusters
Tecton supports EMR versions from 6.5 to 6.9.
Tecton uses the RunJobFlow request action of the EMR API to start a new EMR cluster. When you instantiate an EMRJsonClusterConfig object, you must provide a JSON string that conforms to the RunJobFlow request schema.
The following parameters are required: ReleaseLabel, Instances, ServiceRole, and JobFlowRole. These parameters are explained in the EMR RunJobFlow documentation.
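The required parameters and the supported version range can be verified before the JSON is handed to Tecton. This is a sketch only: check_emr_config is a hypothetical helper, and it assumes release labels follow the emr-X.Y.Z pattern used in the example on this page.

```python
import json
import re

# Parameters this page lists as required.
REQUIRED_KEYS = ("ReleaseLabel", "Instances", "ServiceRole", "JobFlowRole")

def check_emr_config(config_json: str) -> list:
    """Return a list of problems: missing required keys or an unsupported release."""
    config = json.loads(config_json)
    problems = [f"missing '{key}'" for key in REQUIRED_KEYS if key not in config]
    label = config.get("ReleaseLabel", "")
    match = re.fullmatch(r"emr-(\d+)\.(\d+)\.\d+", label)
    if label and match is None:
        problems.append(f"unrecognized ReleaseLabel '{label}'")
    elif match is not None:
        version = (int(match.group(1)), int(match.group(2)))
        # Supported range stated on this page: 6.5 through 6.9.
        if not (6, 5) <= version <= (6, 9):
            problems.append(f"EMR {version[0]}.{version[1]} is outside 6.5 to 6.9")
    return problems

valid = json.dumps({
    "ReleaseLabel": "emr-6.9.0",
    "Instances": {},
    "ServiceRole": "your-service-role",
    "JobFlowRole": "your-job-flow-role",
})
print(check_emr_config(valid))
# prints []
```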
Settings overridden by Tecton
Tecton overrides the following contents of the JSON value that is passed to an EMRJsonClusterConfig object.
The name of the job flow:
"Name": "<your_run_name>"
LogUri, based on feature view metadata:
"LogUri": "s3://<your_bucket>/logs/<your_run_name>"
Applications, to include all required dependencies:
"Applications": [
{
"Name": "Hadoop"
},
{
"Name": "Hive"
},
{
"Name": "Spark"
},
{
"Name": "Livy"
}
]
Settings appended by Tecton
Tecton appends the following contents to the JSON value that is passed to an EMRJsonClusterConfig object.
Bootstrap actions appended by Tecton
chronocollector initialization:
{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/emr_chronocollector_init_script_v2.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/emr_chronocollector_init_script_v2.sh",
"Args": []
}
}
The Delta JAR installation for Python:
{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_delta.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_delta.sh",
"Args": [
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/delta-core.jar"
]
}
}
Installation of Tecton JARs from S3 to the destination /home/hadoop/extrajars/*, including tecton-udfs-spark-3.jar and tecton-online-store-sink-spark-3.jar, plus delta-core.jar if you're using EMR 6.5:
{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_jars_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_jars_from_s3.sh",
"Args": [
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-udfs-spark-3.jar",
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-online-store-sink-spark-3.jar",
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/delta-core.jar"
]
}
}
Installation of the materialization library from S3 to the destination /home/hadoop/extrajars/*:
{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_s3.sh",
"Args": [
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton_materialization-latest-py3-none-any.whl"
]
}
}
Installation of pyarrow==5.0.0 if you're using EMR 6.5:
{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_pypi.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_pypi.sh",
"Args": ["pyarrow==5.0.0"]
}
}
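Your own bootstrap actions use the same shape as the entries above. A minimal sketch for generating such an entry follows; bootstrap_action is a hypothetical helper, the S3 paths in the usage line are placeholders, and defaulting Name to the script path simply mirrors the convention visible in the entries Tecton appends.

```python
def bootstrap_action(script_path: str, args=None, name=None) -> dict:
    """Build one BootstrapActions entry for an EMR RunJobFlow request."""
    return {
        # Default the name to the script path, as in the entries shown above.
        "Name": name or script_path,
        "ScriptBootstrapAction": {
            "Path": script_path,
            "Args": list(args or []),
        },
    }

# Placeholder paths; substitute locations your cluster can read.
action = bootstrap_action(
    "s3://my-bucket/scripts/install_jars_from_s3.sh",
    args=["s3://my-bucket/jars/my-lib.jar"],
)
print(action["Name"])
# prints s3://my-bucket/scripts/install_jars_from_s3.sh
```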
Steps appended by Tecton
A first step to enable debugging:
{
"Name": "Enable debugging",
"ActionOnFailure": "TERMINATE_JOB_FLOW",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
}
A step for materialization, where transformation logic is run. Tecton fills in the executable and its arguments for the spark-submit command and appends several required dependency packages to the --packages and --jars arguments:
{
"Name": "Tecton Materialization",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
...,
"--packages",
"org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/extrajars/*",
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/materialization_entrypoint.py",
"--materialization-params",
"s3://<your_bucket>/internal/intermediate-data/materialization_params/<your_run_name>",
"--materialization-step",
"1",
"--spark-session-name",
"<your_feature_view_name>__<your_feature_view_id>"
]
}
}
If you would like to specify any additional library dependencies via --jars, --packages, or --py-files, see the section on attaching custom libraries to EMR materialization jobs.
A Tecton Feature Store Writer step:
{
"Name": "Tecton Feature Store Writer",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--packages",
"org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/extrajars/*",
"--conf",
"spark.driver.maxResultSize=4G",
"--conf",
"spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER",
"--conf",
"spark.serializer=org.apache.spark.serializer.KryoSerializer",
"--conf",
"spark.delta.logStore.class=io.delta.storage.DynamoDBLogStore",
"--conf",
"spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
"--conf",
"spark.delta.DynamoDBLogStore.tableName=delta",
"--conf",
"spark.delta.DynamoDBLogStore.region=us-west-2",
"--conf",
"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/materialization_entrypoint.py",
"--materialization-params",
"s3://<your_bucket>/internal/intermediate-data/materialization_params/<your_run_name>",
"--materialization-step",
"2",
"--spark-session-name",
"<your_feature_view_name>__<your_feature_view_id>"
]
}
}
Configurations appended by Tecton
Tecton appends Properties for the spark-defaults, spark-metrics, yarn-env, spark-env, and livy-env classifications. Do not specify Properties keys that duplicate those shown below, since duplicate keys are not allowed.
"Configurations": [
...,
{
"Classification": "spark-defaults",
"Configurations": [...],
"Properties": {
...,
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"spark.app.name": "<your_feature_view_name>__<your_feature_view_id>"
}
},
{
"Classification": "yarn-env",
"Configurations": [
...,
{
"Classification": "export",
"Configurations": [...],
"Properties": {
...,
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
}
],
"Properties": {...}
},
{
"Classification": "spark-metrics",
"Configurations": [...],
"Properties": {
...,
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink"
}
},
{
"Classification": "spark-env",
"Configurations": [
...,
{
"Classification": "export",
"Configurations": [...],
"Properties": {
...,
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
}
],
"Properties": {...}
},
{
"Classification": "livy-env",
"Configurations": [
...,
{
"Classification": "export",
"Configurations": [...],
"Properties": {
...,
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
}
],
"Properties": {...}
}
]
If you use a Glue or Iceberg catalog as your metastore, you may want to specify hive-site and spark-hive-site settings.
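To avoid duplicate Properties keys, you can compare your own Configurations list with the keys Tecton appends. The sketch below is not a Tecton utility: property_conflicts is a hypothetical helper, and the key sets are copied from the listing above.

```python
# Keys Tecton appends directly to a classification's Properties.
TECTON_PROPERTIES = {
    "spark-defaults": {
        "spark.sql.streaming.metricsEnabled",
        "spark.metrics.namespace",
        "spark.app.name",
    },
    "spark-metrics": {
        "*.sink.statsd.prefix",
        "*.sink.statsd.host",
        "*.sink.statsd.period",
        "*.sink.statsd.unit",
        "*.sink.statsd.port",
        "*.source.jvm.class",
        "*.sink.statsd.class",
    },
}
# Keys Tecton appends to the nested "export" classification.
TECTON_EXPORTS = {"CLUSTER_REGION", "AWS_STS_REGIONAL_ENDPOINTS", "TECTON_CLUSTER_NAME"}
ENV_CLASSIFICATIONS = {"yarn-env", "spark-env", "livy-env"}

def property_conflicts(configurations: list) -> list:
    """Return 'classification: key' strings for keys that Tecton also sets."""
    conflicts = []
    for entry in configurations:
        name = entry.get("Classification")
        props = set(entry.get("Properties", {}))
        for key in sorted(TECTON_PROPERTIES.get(name, set()) & props):
            conflicts.append(f"{name}: {key}")
        if name in ENV_CLASSIFICATIONS:
            for nested in entry.get("Configurations", []):
                if nested.get("Classification") == "export":
                    exported = set(nested.get("Properties", {}))
                    for key in sorted(TECTON_EXPORTS & exported):
                        conflicts.append(f"{name}/export: {key}")
    return conflicts

user_configurations = [
    {"Classification": "spark-defaults", "Properties": {"spark.app.name": "mine"}},
    {
        "Classification": "yarn-env",
        "Configurations": [
            {"Classification": "export", "Properties": {"TEST_ENV_VAR": "TEST_VALUE"}}
        ],
        "Properties": {},
    },
]
print(property_conflicts(user_configurations))
# prints ['spark-defaults: spark.app.name']
```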
Tags appended by Tecton
Do not specify tags with the same key as shown below, since duplicate keys are not allowed for tags.
"Tags": [
...,
{
"Key": "feature_view_id",
"Value": "<your_feature_view_id>"
},
{
"Key": "tecton-owned",
"Value": "true"
},
{
"Key": "tecton-cluster-name",
"Value": "<your_cluster_name>"
},
{
"Key": "tecton_feature_view",
"Value": "<your_feature_view_name>"
},
{
"Key": "tecton_deployment",
"Value": "<your_deployment_name>"
},
{
"Key": "tecton_workspace",
"Value": "<your_workspace_name>"
}
]
Attaching custom libraries to EMR materialization jobs
Download and install your dependency libraries to your job clusters. We recommend doing so either via a bootstrap action or a step.
Pass your dependency libraries as arguments to the spark-submit command in the Tecton Materialization step. For example, if you want to attach all custom JARs installed in the /home/hadoop/customjars/* directory, a Python library in an accessible S3 location, and the protobuf Maven package, the Tecton Materialization step config in your JSON could read as follows:
{
"Name": "Tecton Materialization",
"HadoopJarStep": {
"Args": [
"spark-submit",
"--packages",
"com.google.protobuf:protobuf-java:3.22.0",
"--jars",
"/home/hadoop/customjars/*",
"--py-files",
"s3://<path_to_your_python_library>"
]
}
}
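A step like the one above can also be generated from lists of dependencies. The following sketch assumes that multiple values for a flag are joined with commas, which matches how spark-submit accepts --packages, --jars, and --py-files; materialization_step is a hypothetical helper and the S3 path is a placeholder.

```python
def materialization_step(packages=(), jars=(), py_files=()) -> dict:
    """Build the user-supplied part of the Tecton Materialization step."""
    args = ["spark-submit"]
    for flag, values in (
        ("--packages", packages),
        ("--jars", jars),
        ("--py-files", py_files),
    ):
        if values:
            # spark-submit takes comma-separated lists for these flags.
            args += [flag, ",".join(values)]
    return {"Name": "Tecton Materialization", "HadoopJarStep": {"Args": args}}

step = materialization_step(
    packages=["com.google.protobuf:protobuf-java:3.22.0"],
    jars=["/home/hadoop/customjars/*"],
    py_files=["s3://my-bucket/libs/my_lib.whl"],  # placeholder path
)
print(step["HadoopJarStep"]["Args"])
```

Flags with no values are omitted entirely, so the step contains only the arguments you actually need.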
Example EMR cluster configuration
The following example first shows a sample JSON value that is passed to an EMRJsonClusterConfig object. Next, the final JSON value (including JSON settings overridden and appended by Tecton) is shown. In the final JSON value, settings appended by Tecton are highlighted.
JSON value passed to the EMRJsonClusterConfig object
{
"ReleaseLabel": "emr-6.9.0",
"Instances": {
"InstanceGroups": [],
"InstanceFleets": [
{
"InstanceFleetType": "CORE",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
},
{
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
}
],
"Ec2SubnetIds": ["subnet-your_net_a", "subnet-your_net_b"],
"EmrManagedMasterSecurityGroup": "sg-your_group_a",
"EmrManagedSlaveSecurityGroup": "sg-your_group_b",
"ServiceAccessSecurityGroup": "sg-your_group_c",
"AdditionalMasterSecurityGroups": [],
"AdditionalSlaveSecurityGroups": []
},
"Configurations": [
{
"Classification": "spark-defaults",
"Configurations": [],
"Properties": {
"spark.driver.maxResultSize": "4G",
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"spark.sql.catalogImplementation": "hive",
"spark.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.hive.metastore.glue.catalogid": "your_account_id",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED"
}
},
{
"Classification": "hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "spark-hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "yarn-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"TEST_ENV_VAR": "TEST_VALUE"
}
}
],
"Properties": {}
}
],
"BootstrapActions": [
{
"Name": "install_jars_from_s3",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_jars_from_s3.sh",
"Args": [
"s3://tecton.ai.public/jars/spark-snowflake_2.12-2.9.1-spark_3.0.jar",
"s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
]
}
},
{
"Name": "install_python_libraries_from_pypi",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_python_libraries_from_pypi.sh",
"Args": ["apache-sedona"]
}
}
],
"Steps": [
{
"Name": "Test Custom Step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
},
{
"Name": "Tecton Materialization",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--executor-memory",
"4g",
"--packages",
"com.google.protobuf:protobuf-java:3.22.0",
"--jars",
"/home/hadoop/customjars/*",
"--py-files",
"s3://tecton-dev-shared/blair/test.whl"
]
}
}
],
"JobFlowRole": "your-job-flow-role",
"ServiceRole": "your-service-role",
"CustomAmiId": "your-custom-AMI-ID"
}
Final JSON value (with settings appended by Tecton highlighted) that is passed to EMR
{
"Name": "orch-test_bfv-fv_id-BATCH_2022.11.02.00:00:00-1",
"LogUri": "s3://path_to_log_store/your_run_name",
"ReleaseLabel": "emr-6.9.0",
"Instances": {
"InstanceGroups": [],
"InstanceFleets": [
{
"InstanceFleetType": "CORE",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
},
{
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
}
],
"Ec2SubnetIds": ["subnet-your_net_a", "subnet-your_net_b"],
"EmrManagedMasterSecurityGroup": "sg-your_group_a",
"EmrManagedSlaveSecurityGroup": "sg-your_group_b",
"ServiceAccessSecurityGroup": "sg-your_group_c",
"AdditionalMasterSecurityGroups": [],
"AdditionalSlaveSecurityGroups": []
},
"Steps": [
{
"Name": "Enable debugging",
"ActionOnFailure": "TERMINATE_JOB_FLOW",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
},
{
"Name": "Test Custom Step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
},
{
"Name": "Tecton Materialization",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--executor-memory",
"4g",
"--packages",
"com.google.protobuf:protobuf-java:3.22.0,org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/customjars/*,/home/hadoop/extrajars/*",
"--py-files",
"s3://tecton-dev-shared/blair/test.whl",
"s3://path_to_script_store/materialization_entrypoint.py",
"--materialization-params",
"s3://path_to_param_store/your_run_name",
"--materialization-step",
"1",
"--spark-session-name",
"test_bfv__fv_id"
]
}
},
{
"Name": "Tecton Feature Store Writer",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--packages",
"org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/extrajars/*",
"--conf",
"spark.driver.maxResultSize=4G",
"--conf",
"spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER",
"--conf",
"spark.serializer=org.apache.spark.serializer.KryoSerializer",
"--conf",
"spark.sql.sources.partitionOverwriteMode=dynamic",
"--conf",
"spark.delta.logStore.class=io.delta.storage.DynamoDBLogStore",
"--conf",
"spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
"--conf",
"spark.delta.DynamoDBLogStore.tableName=delta",
"--conf",
"spark.delta.DynamoDBLogStore.region=us-west-2",
"--conf",
"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
"s3://path_to_script_store/materialization_entrypoint.py",
"--materialization-params",
"s3://path_to_param_store/your_run_name",
"--materialization-step",
"2",
"--spark-session-name",
"test_bfv__fv_id"
]
}
}
],
"BootstrapActions": [
{
"Name": "install_jars_from_s3",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_jars_from_s3.sh",
"Args": [
"s3://tecton.ai.public/jars/spark-snowflake_2.12-2.9.1-spark_3.0.jar",
"s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
]
}
},
{
"Name": "install_python_libraries_from_pypi",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_python_libraries_from_pypi.sh",
"Args": ["apache-sedona"]
}
},
{
"Name": "s3://path_to_script_store/install_python_libraries_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://path_to_script_store/install_python_libraries_from_s3.sh",
"Args": [
"s3://path_to_script_store/tecton_materialization-latest-py3-none-any.whl"
]
}
},
{
"Name": "s3://path_to_script_store/install_jars_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://path_to_script_store/install_jars_from_s3.sh",
"Args": [
"s3://path_to_script_store/tecton-udfs-spark-3.jar",
"s3://path_to_script_store/tecton-online-store-sink-spark-3.jar"
]
}
},
{
"Name": "s3://path_to_script_store/emr_chronocollector_init_script_v2.sh",
"ScriptBootstrapAction": {
"Path": "s3://path_to_script_store/emr_chronocollector_init_script_v2.sh",
"Args": []
}
}
],
"Applications": [
{
"Name": "Hadoop",
"Args": [],
"AdditionalInfo": {}
},
{
"Name": "Hive",
"Args": [],
"AdditionalInfo": {}
},
{
"Name": "Spark",
"Args": [],
"AdditionalInfo": {}
},
{
"Name": "Livy",
"Args": [],
"AdditionalInfo": {}
}
],
"Configurations": [
{
"Classification": "spark-defaults",
"Configurations": [],
"Properties": {
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"spark.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.hive.metastore.glue.catalogid": "your_account_id",
"spark.sql.catalogImplementation": "hive",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"spark.app.name": "test_bfv__fv_id"
}
},
{
"Classification": "hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "spark-hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "yarn-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"TEST_ENV_VAR": "TEST_VALUE",
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
}
],
"Properties": {}
},
{
"Classification": "spark-metrics",
"Configurations": [],
"Properties": {
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink"
}
},
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
}
],
"Properties": {}
},
{
"Classification": "livy-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
}
],
"Properties": {}
}
],
"JobFlowRole": "your-job-flow-role",
"ServiceRole": "your-service-role",
"CustomAmiId": "your-custom-AMI-ID",
"Tags": [
{
"Key": "feature_view_id",
"Value": "fv_id"
},
{
"Key": "tecton-owned",
"Value": "true"
},
{
"Key": "tecton-cluster-name",
"Value": "dev-blair"
},
{
"Key": "tecton_feature_view",
"Value": "test_bfv"
},
{
"Key": "tecton_deployment",
"Value": "dev-blair"
},
{
"Key": "tecton_workspace",
"Value": "emr-1"
},
{
"Key": "tecton-accessible:dev-shared",
"Value": "true"
}
]
}
Migrating simple cluster config objects to custom cluster config objects
To migrate simple cluster config objects to custom cluster config objects, follow these steps.
- Upgrade your Tecton SDK version to 0.6.
- For each Feature View that uses simple cluster config objects:
  - Replace the simple cluster config objects with new custom cluster config objects.
  - As needed, update batch_compute and/or stream_compute in your Feature View declaration to use the new custom cluster config objects.
- In the Tecton CLI, run tecton apply to apply your changes. The new configuration will be used for any future jobs.
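When replacing a simple config object, the settings it carried have to be re-expressed as JSON. The sketch below shows one possible translation for Databricks. The function to_databricks_json and its parameter names are hypothetical and are not taken from the Tecton SDK; only the JSON field names (num_workers, spark_version, node_type_id, spark_conf) come from the example on this page.

```python
import json

def to_databricks_json(instance_type: str, number_of_workers: int,
                       spark_version: str, spark_config=None) -> str:
    """Translate a few common cluster settings into a Runs Submit style JSON string."""
    new_cluster = {
        "num_workers": number_of_workers,
        "spark_version": spark_version,
        "node_type_id": instance_type,
    }
    if spark_config:
        # Copy so the caller's dict is not shared with the result.
        new_cluster["spark_conf"] = dict(spark_config)
    return json.dumps({"new_cluster": new_cluster})

migrated_json = to_databricks_json(
    instance_type="m5.large",
    number_of_workers=2,
    spark_version="11.3.x-scala2.12",
    spark_config={"spark.sql.sources.partitionOverwriteMode": "dynamic"},
)
print(migrated_json)
```

The resulting string would then be supplied as the json argument of DatabricksJsonClusterConfig, as in the Feature View example near the top of this page.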