Customizing Spark Materialization jobs
This page does not apply to Tecton on Snowflake.
Overview
When defining a Batch Feature View or a Stream Feature View, you can set the
batch_compute and stream_compute parameters, which can be set to cluster
config objects that are listed in the following table.
| Data Platform | Simple Cluster Config Object | Custom Cluster Config Object |
|---|---|---|
| Databricks | DatabricksClusterConfig | DatabricksJsonClusterConfig |
| EMR | EMRClusterConfig | EMRJsonClusterConfig |
| Snowflake | Not supported | Not supported |
Compared to the simple cluster config objects, custom cluster config objects are more flexible. With the custom config objects, you can specify a wide range of configuration settings for your Spark cluster. For example, you can specify the number of worker nodes, the amount of memory and CPU to allocate to each worker, the Spark version to use, and the type of storage to use.
An example Batch Feature View that uses a DatabricksJsonClusterConfig object
is shown below.
@batch_feature_view(
...,
batch_compute=DatabricksJsonClusterConfig(
json=my_json
), # my_json is your JSON string containing your spark cluster configurations
...,
)
def feature_view_1():
return ...
The JSON you use to specify your cluster config must conform to the Databricks schema or EMR schema that is required to submit jobs to the cluster.
After a custom cluster config object is instantiated (via the instantiation of a Feature View), Tecton internally updates the contents of the JSON string; some settings are appended and others are overwritten.
Using the Web UI, you can see both the original JSON string and the updated JSON string under the materialization tab of your feature view.
Configuring Databricks job clusters
Tecton uses the
Runs Submit
endpoint of the Databricks Jobs API 2.0 to programmatically create and manage
materialization jobs that run on Databricks. When a
DatabricksJsonClusterConfig object is instantiated, a JSON string that
conforms to the Runs Submit
request schema
must provided.
Use new_cluster for your cluster configurations; existing cluster reuse is not
supported.
Materialization jobs are executed as a notebook_task. Other task types, such
as spark_jar_task, spark_python_task ,spark_submit_task , and
pipeline_task are not supported.
Settings overridden by Tecton
Tecton overrides the following contents of the JSON value that is passed to a
DatabricksJsonClusterConfig object: run_name, cluster_log_conf and
notebook_task.
"run_name": "orch-<your_feature_view_name>-<your_feature_view_id>-<YOUR_TASK_TYPE>_<your_task_timestamp>-<your_task_attempt_number>",
"new_cluster": {
...,
"cluster_log_conf": {
"s3": {
"destination": "s3://<your_bucket>/logs/<your_run_name>",
"region": "<your_deployment_region>"
}
}
},
"notebook_task": {
"notebook_path": "/Users/<your_user_name>/materialization_notebook.py",
"base_parameters": {
"materialization_params": "s3://<your_bucket>/intermediate-data/materialization_params/<your_run_name>"
}
}
Settings appended by Tecton
Tecton appends the following contents to the JSON value that is passed to a
DatabricksJsonClusterConfig object.
init_scripts:"init_scripts": [
...,
{
"s3": {
"destination": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/chronocollector_init_script_v2_esr.sh",
"region": "<your_deployment_region>"
}
}
]custom_tagsbased on feature view metadata. Do not specify tags with the same key as shown below, since duplicate keys are not allowed for tags."custom_tags": [
...,
{
"key": "feature_view_id",
"value": "<your_feature_view_id>"
},
{
"key": "tecton-owned",
"value": "true"
},
{
"key": "tecton-cluster-name",
"value": "<your_cluster_name>"
},
{
"key": "tecton_feature_view",
"value": "<your_feature_view_name>"
},
{
"key": "tecton_deployment",
"value": "<your_deployment_name>"
},
{
"key": "tecton_workspace",
"value": "<your_workspace_name>"
}
]spark_conf:"spark_conf": {
...,
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink",
"spark.app.name": "<your_feature_view_name>__<your_feature_view_id>"
}spark_env_varsthat are required by materialization jobs:"spark_env_vars": {
...,
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}librariesthat are required by materialization jobs:"libraries": [
...,
{
"jar": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-udfs-spark-3.jar"
},
{
"jar": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-online-store-sink-spark-3.jar"
},
{
"whl": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton_materialization-latest-py3-none-any.whl"
}
]
Example Databricks cluster configuration
The following example first shows a sample JSON value that is passed to a
DatabricksJsonClusterConfig object. Next, the final JSON value (including JSON
settings overridden and appended by Tecton) is shown. In the final JSON value,
settings appended by Tecton are highlighted.
JSON value passed to the DatabricksJsonClusterConfig object
{
"new_cluster": {
"num_workers": 0,
"spark_version": "11.3.x-scala2.12",
"node_type_id": "m5.large",
"aws_attributes": {
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 1,
"ebs_volume_size": 100,
"instance_profile_arn": "arn:aws:iam::your_account_id:instance-profile/your-role",
"availability": "SPOT",
"zone_id": "auto"
},
"custom_tags": [
{
"key": "test_key",
"value": "test_value"
}
],
"spark_conf": {
"spark.databricks.service.server.enabled": "true",
"spark.hadoop.fs.s3a.acl.default": "BucketOwnerFullControl",
"spark.sql.sources.partitionOverwriteMode": "dynamic",
"spark.databricks.cluster.profile": "singleNode",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
"spark.master": "local[*]",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.databricks.hive.metastore.glueCatalog.enabled": "true"
},
"spark_env_vars": {
"TEST_ENV_VAR": "IGNORE_ME"
}
},
"libraries": [
{
"jar": "s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
}
]
}
Final JSON value (with settings appended by Tecton highlighted) that is passed to Databricks
{
"new_cluster": {
"num_workers": 0,
"spark_version": "11.3.x-scala2.12",
"node_type_id": "m5.large",
"aws_attributes": {
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 1,
"ebs_volume_size": 100,
"instance_profile_arn": "arn:aws:iam::your_account_id:instance-profile/your-role",
"availability": "SPOT",
"zone_id": "auto"
},
"init_scripts": [
{
"s3": {
"destination": "s3://path_to_script_store/chronocollector_init_script_v2_esr.sh",
"region": "us-west-2"
}
}
],
"cluster_log_conf": {
"s3": {
"destination": "s3://path_to_log_store/your_run_name",
"region": "us-west-2"
}
},
"custom_tags": [
{
"key": "test_key",
"value": "test_value"
},
{
"key": "feature_view_id",
"value": "fv_id"
},
{
"key": "tecton-owned",
"value": "true"
},
{
"key": "tecton-cluster-name",
"value": "dev-blair"
},
{
"key": "tecton_feature_view",
"value": "test_bfv"
},
{
"key": "tecton_deployment",
"value": "dev-blair"
},
{
"key": "tecton_workspace",
"value": "db-1"
}
],
"spark_conf": {
"spark.databricks.cluster.profile": "singleNode",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.databricks.hive.metastore.glueCatalog.enabled": "true",
"spark.hadoop.fs.s3a.acl.default": "BucketOwnerFullControl",
"spark.sql.sources.partitionOverwriteMode": "dynamic",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.master": "local[*]",
"spark.databricks.service.server.enabled": "true",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink",
"spark.app.name": "test_bfv__fv_id"
},
"spark_env_vars": {
"TEST_ENV_VAR": "IGNORE_ME",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
},
"notebook_task": {
"notebook_path": "/Users/bot@tecton.ai/materialization_notebook.py",
"base_parameters": {
"materialization_params": "s3://path_to_param_store/your_run_name"
}
},
"run_name": "orch-test_bfv-fv_id-BATCH_2022.12.28.00:00:00-1",
"libraries": [
{
"jar": "s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
},
{
"jar": "s3://path_to_script_store/tecton-udfs-spark-3.jar"
},
{
"jar": "s3://path_to_script_store/tecton-online-store-sink-spark-3.jar"
},
{
"whl": "s3://path_to_script_store/tecton_materialization-latest-py3-none-any.whl"
}
]
}
Configuring EMR job clusters
Tecton supports EMR versions from 6.5 to 6.9.
Tecton uses the RunJobFlow request action of the EMR API to start a new EMR
cluster. When you instantiate an EMRJsonClusterConfig object, you must provide
a JSON string that conforms with the
RunJobFlow request schema.
The following are required parameters: ReleaseLabel, Instances,
ServiceRole, JobFlowRole. These parameters are explained in the EMR
RunJobFlow documentation that is linked to earlier in this paragraph.
Settings overridden by Tecton
Tecton overrides the following contents of the JSON value that is passed to a
EMRJsonClusterConfig object.
The name of the job flow:
“Name”: “<your_run_name>”LogUri, based on feature view metadata:"LogUri": "s3://<your_bucket>/logs/<your_run_name>"Applicationsto include all required dependencies :"Applications": [
{
"Name": "Hadoop"
},
{
"Name": "Hive"
},
{
"Name": "Spark"
},
{
"Name": "Livy"
}
]
Settings appended by Tecton
Tecton appends the following contents to the JSON value that is passed to a
EMRJsonClusterConfig object.
Bootstrap actions appended by Tecton
chronocollectorinitialization{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/emr_chronocollector_init_script_v2.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/emr_chronocollector_init_script_v2.sh",
"Args": []
}
}The Delta JAR installation for Python:
{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_delta.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_delta.sh",
"Args": [
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/delta-core.jar"
]
}
}Installation of Tecton JARs from S3 to the destination
/home/hadoop/extrajars/*, includingtecton-udfs-spark-3.jarandtecton-online-store-sink-spark-3.jaranddelta-core.jarif you're using EMR 6.5:{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_jars_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_jars_from_s3.sh",
"Args": [
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-udfs-spark-3.jar",
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-online-store-sink-spark-3.jar",
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/delta-core.jar"
]
}
}Installation of the materialization library from S3 to the destination
/home/hadoop/extrajars/*{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_s3.sh",
"Args": [
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton_materialization-latest-py3-none-any.whl"
]
}
}Installation of
pyarrow==5.0.0if you’re using EMR 6.5:{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_pypi.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_pypi.sh",
"Args": ["pyarrow==5.0.0"]
}
}
Steps appended by Tecton
A first step to enable debugging:
{
"Name": "Enable debugging",
"ActionOnFailure": "TERMINATE_JOB_FLOW",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
}A step for materialization, where transformation logic is run. Tecton will fill in the executable and its arguments to the
spark-submitcommand and append several required dependency packages to the--packagesargument and--jarsargument:{
"Name": "Tecton Materialization",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
...,
"--packages",
"org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/extrajars/*",
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/materialization_entrypoint.py",
"--materialization-params",
"s3://<your_bucket>/internal/intermediate-data/materialization_params/<your_run_name>",
"--materialization-step",
"1",
"--spark-session-name",
"<your_feature_view_name>__<your_feature_view_id>"
]
}
}
If you would like to specify any additional library dependencies via --jars,
--packages or --py-files, see
following section.
A
Tecton Feature Store Writerstep:{
"Name": "Tecton Feature Store Writer",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--packages",
"org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/extrajars/*",
"--conf",
"spark.driver.maxResultSize=4G",
"--conf",
"spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER",
"--conf",
"spark.serializer=org.apache.spark.serializer.KryoSerializer",
"--conf",
"spark.delta.logStore.class=io.delta.storage.DynamoDBLogStore",
"--conf",
"spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
"--conf",
"spark.delta.DynamoDBLogStore.tableName=delta",
"--conf",
"spark.delta.DynamoDBLogStore.region=us-west-2",
"--conf",
"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/materialization_entrypoint.py",
"--materialization-params",
"s3://<your_bucket>/internal/intermediate-data/materialization_params/<your_run_name>",
"--materialization-step",
"2",
"--spark-session-name",
"<your_feature_view_name>__<your_feature_view_id>"
]
}
}
Configurations appended by Tecton
Properties for the spark-defaults, spark-metrics, yarn-env, spark-env
and livy-env classifications. Do not specify duplicate Properties keys as
shown below, since duplicate keys are not allowed.
"Configurations": [
...,
{
"Classification": "spark-defaults",
"Configurations": [...],
"Properties": {
...,
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"spark.app.name": "<your_feature_view_name>__<your_feature_view_id>"
}
},
{
"Classification": "yarn-env",
"Configurations": [
...,
{
"Classification": "export",
"Configurations": [...],
"Properties": {
...,
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
}
],
"Properties": {...}
},
{
"Classification": "spark-metrics",
"Configurations": [...],
"Properties": {
...,
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink"
}
},
{
"Classification": "spark-env",
"Configurations": [
...,
{
"Classification": "export",
"Configurations": [...],
"Properties": {
...,
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
}
],
"Properties": {...}
},
{
"Classification": "livy-env",
"Configurations": [
...,
{
"Classification": "export",
"Configurations": [...],
"Properties": {
...,
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
}
],
"Properties": {...}
}
]
If you use a Glue or Iceberg catalog as your metastore, you may want to specify
hive-site and spark-hive-site settings.
Tags appended by Tecton
Do not specify tags with the same key as shown below, since duplicate keys are not allowed for tags.
"Tags": [
...,
{
"Key": "feature_view_id",
"Value": "<your_feature_view_id>"
},
{
"Key": "tecton-owned",
"Value": "true"
},
{
"Key": "tecton-cluster-name",
"Value": "<your_cluster_name>"
},
{
"Key": "tecton_feature_view",
"Value": "<your_feature_view_name>"
},
{
"Key": "tecton_deployment",
"Value": "<your_deployment_name>"
},
{
"Key": "tecton_workspace",
"Value": "<your_workspace_name>"
}
]
Attaching custom libraries to EMR materialization jobs
Download and install your dependency libraries to your job clusters. We recommend doing so either via a bootstrap action or a step.
Pass your dependency libraries as arguments to the spark-submit command in
Tecton Materializationstep. For example, if you want to attach all custom jars installed in/home/hadoop/customjars/*directory, a python library in an accessible S3 location and the protobuf maven package, theTecton Materializationstep config in your JSON could read as follows:{
"Name": "Tecton Materialization",
"HadoopJarStep": {
"Args": [
"spark-submit",
"--packages",
"com.google.protobuf:protobuf-java:3.22.0",
"--jars",
"/home/hadoop/customjars/*",
"--py-files",
"s3://<path_to_your_python_library>"
]
}
}
Example EMR cluster configuration
The following example first shows a sample JSON value that is passed to a
EMRJsonClusterConfig object. Next, the final JSON value (including JSON
settings overridden and appended by Tecton) is shown. In the final JSON value,
settings appended by Tecton are highlighted.
JSON value passed to the EMRJsonClusterConfig object
{
"ReleaseLabel": "emr-6.9.0",
"Instances": {
"InstanceGroups": [],
"InstanceFleets": [
{
"InstanceFleetType": "CORE",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
},
{
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
}
],
"Ec2SubnetIds": ["subnet-your_net_a", "subnet-your_net_b"],
"EmrManagedMasterSecurityGroup": "sg-your_group_a",
"EmrManagedSlaveSecurityGroup": "sg-your_group_b",
"ServiceAccessSecurityGroup": "sg-your_group_c",
"AdditionalMasterSecurityGroups": [],
"AdditionalSlaveSecurityGroups": []
},
"Configurations": [
{
"Classification": "spark-defaults",
"Configurations": [],
"Properties": {
"spark.driver.maxResultSize": "4G",
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"spark.sql.catalogImplementation": "hive",
"spark.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.hive.metastore.glue.catalogid": "your_account_id",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED"
}
},
{
"Classification": "hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "spark-hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "yarn-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"TEST_ENV_VAR": "TEST_VALUE"
}
}
],
"Properties": {}
}
],
"BootstrapActions": [
{
"Name": "install_jars_from_s3",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_jars_from_s3.sh",
"Args": [
"s3://tecton.ai.public/jars/spark-snowflake_2.12-2.9.1-spark_3.0.jar",
"s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
]
}
},
{
"Name": "install_python_libraries_from_pypi",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_python_libraries_from_pypi.sh",
"Args": ["apache-sedona"]
}
}
],
"Steps": [
{
"Name": "Test Custom Step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
},
{
"Name": "Tecton Materialization",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--executor-memory",
"4g",
"--packages",
"com.google.protobuf:protobuf-java:3.22.0",
"--jars",
"/home/hadoop/customjars/*",
"--py-files",
"s3://tecton-dev-shared/blair/test.whl"
]
}
}
],
"JobFlowRole": "your-job-flow-role",
"ServiceRole": "your-service-role",
"CustomAmiId": "your-custom-AMI-ID"
}
Final JSON value (with settings appended by Tecton highlighted) that is passed to EMR
{
"Name": "orch-test_bfv-fv_id-BATCH_2022.11.02.00:00:00-1",
"LogUri": "s3://path_to_log_store/your_run_name",
"ReleaseLabel": "emr-6.9.0",
"Instances": {
"InstanceGroups": [],
"InstanceFleets": [
{
"InstanceFleetType": "CORE",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
},
{
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
}
],
"Ec2SubnetIds": ["subnet-your_net_a", "subnet-your_net_b"],
"EmrManagedMasterSecurityGroup": "sg-your_group_a",
"EmrManagedSlaveSecurityGroup": "sg-your_group_b",
"ServiceAccessSecurityGroup": "sg-your_group_c",
"AdditionalMasterSecurityGroups": [],
"AdditionalSlaveSecurityGroups": []
},
"Steps": [
{
"Name": "Enable debugging",
"ActionOnFailure": "TERMINATE_JOB_FLOW",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
},
{
"Name": "Test Custom Step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
},
{
"Name": "Tecton Materialization",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--executor-memory",
"4g",
"--packages",
"com.google.protobuf:protobuf-java:3.22.0,org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/customjars/*,/home/hadoop/extrajars/*",
"--py-files",
"s3://tecton-dev-shared/blair/test.whl",
"s3://path_to_script_store/materialization_entrypoint.py",
"--materialization-params",
"s3://path_to_param_store/your_run_name",
"--materialization-step",
"1",
"--spark-session-name",
"test_bfv__fv_id"
]
}
},
{
"Name": "Tecton Feature Store Writer",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--packages",
"org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/extrajars/*",
"--conf",
"spark.driver.maxResultSize=4G",
"--conf",
"spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER",
"--conf",
"spark.serializer=org.apache.spark.serializer.KryoSerializer",
"--conf",
"spark.sql.sources.partitionOverwriteMode=dynamic",
"--conf",
"spark.delta.logStore.class=io.delta.storage.DynamoDBLogStore",
"--conf",
"spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
"--conf",
"spark.delta.DynamoDBLogStore.tableName=delta",
"--conf",
"spark.delta.DynamoDBLogStore.region=us-west-2",
"--conf",
"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
"s3://path_to_script_store/materialization_entrypoint.py",
"--materialization-params",
"s3://path_to_param_store/your_run_name",
"--materialization-step",
"2",
"--spark-session-name",
"test_bfv__fv_id"
]
}
}
],
"BootstrapActions": [
{
"Name": "install_jars_from_s3",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_jars_from_s3.sh",
"Args": [
"s3://tecton.ai.public/jars/spark-snowflake_2.12-2.9.1-spark_3.0.jar",
"s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
]
}
},
{
"Name": "install_python_libraries_from_pypi",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_python_libraries_from_pypi.sh",
"Args": ["apache-sedona"]
}
},
{
"Name": "s3://path_to_script_store/install_python_libraries_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://path_to_script_store/install_python_libraries_from_s3.sh",
"Args": [
"s3://path_to_script_store/tecton_materialization-latest-py3-none-any.whl"
]
}
},
{
"Name": "s3://path_to_script_store/install_jars_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://path_to_script_store/install_jars_from_s3.sh",
"Args": [
"s3://path_to_script_store/tecton-udfs-spark-3.jar",
"s3://path_to_script_store/tecton-online-store-sink-spark-3.jar"
]
}
},
{
"Name": "s3://path_to_script_store/emr_chronocollector_init_script_v2.sh",
"ScriptBootstrapAction": {
"Path": "s3://path_to_script_store/emr_chronocollector_init_script_v2.sh",
"Args": []
}
}
],
"Applications": [
{
"Name": "Hadoop",
"Args": [],
"AdditionalInfo": {}
},
{
"Name": "Hive",
"Args": [],
"AdditionalInfo": {}
},
{
"Name": "Spark",
"Args": [],
"AdditionalInfo": {}
},
{
"Name": "Livy",
"Args": [],
"AdditionalInfo": {}
}
],
"Configurations": [
{
"Classification": "spark-defaults",
"Configurations": [],
"Properties": {
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"spark.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.hive.metastore.glue.catalogid": "your_account_id",
"spark.sql.catalogImplementation": "hive",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"spark.app.name": "test_bfv__fv_id"
}
},
{
"Classification": "hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "spark-hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "yarn-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"TEST_ENV_VAR": "TEST_VALUE",
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
}
],
"Properties": {}
},
{
"Classification": "spark-metrics",
"Configurations": [],
"Properties": {
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink"
}
},
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
}
],
"Properties": {}
},
{
"Classification": "livy-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
}
],
"Properties": {}
}
],
"JobFlowRole": "your-job-flow-role",
"ServiceRole": "your-service-role",
"CustomAmiId": "your-custom-AMI-ID",
"Tags": [
{
"Key": "feature_view_id",
"Value": "fv_id"
},
{
"Key": "tecton-owned",
"Value": "true"
},
{
"Key": "tecton-cluster-name",
"Value": "dev-blair"
},
{
"Key": "tecton_feature_view",
"Value": "test_bfv"
},
{
"Key": "tecton_deployment",
"Value": "dev-blair"
},
{
"Key": "tecton_workspace",
"Value": "emr-1"
},
{
"Key": "tecton-accessible:dev-shared",
"Value": "true"
}
]
}
Migrating simple cluster config objects to custom cluster config objects
To migrate simple cluster config objects to custom cluster config objects, follow these steps.
- Upgrade your Tecton SDK version to 0.6
- For each Feature View that uses simple cluster config objects:
- Replace the simple cluster config objects with new custom cluster config objects.
- As needed, update
batch_computeand/orstream_computein your Feature view declaration to use the new custom cluster config objects. - In the Tecton CLI, run
tecton applyto apply your changes. The new configuration will be used for any future jobs.