Exporter: improvements in jobs, DLT pipelines and policies (#3140)
This PR includes the following changes:

* Emit DBFS & Workspace file references for job task parameters (illustrated in the sketch below).
* For DLT pipelines, emit DBFS & Workspace file references, the cluster policy, and secrets from the pipeline configuration.
* Fix generation of the cluster policy definition, which was broken after the introduction of policy family overrides.
alexott authored Jan 22, 2024
1 parent 04a180b commit dcbfe25
Showing 4 changed files with 128 additions and 45 deletions.
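The first two bullets above boil down to scanning string parameters for values that look like DBFS or Workspace file paths and emitting the corresponding resources. Below is a minimal, standalone Go sketch of that classification; the prefix rules and the function names here are assumptions for illustration only — the provider's actual emitIfDbfsFile/emitIfWsfsFile helpers are called in this diff but not defined in it.

package main

import (
	"fmt"
	"strings"
)

// classify mirrors, in spirit, what the new emitFilesFromSlice/emitFilesFromMap
// helpers delegate to: DBFS paths become databricks_dbfs_file references,
// workspace paths become databricks_workspace_file references, and anything
// else is ignored.
func classify(p string) string {
	switch {
	case strings.HasPrefix(p, "dbfs:/"):
		return "databricks_dbfs_file"
	case strings.HasPrefix(p, "/Workspace/"):
		return "databricks_workspace_file"
	default:
		return "" // not a file reference, nothing to emit
	}
}

func main() {
	// The same sample values used in TestEmitFilesFromSlice further down.
	for _, p := range []string{"dbfs:/FileStore/test.txt", "/Workspace/Shared/test.txt", "nothing"} {
		fmt.Printf("%-30s -> %q\n", p, classify(p))
	}
}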
1 change: 1 addition & 0 deletions exporter/context.go
@@ -1038,6 +1038,7 @@ func (ic *importContext) dataToHcl(i importable, path []string,
case schema.TypeFloat:
body.SetAttributeValue(a, cty.NumberFloatVal(raw.(float64)))
case schema.TypeMap:
// TODO: Resolve references in maps as well, and also support different types inside map...
ov := map[string]cty.Value{}
for key, iv := range raw.(map[string]any) {
v := cty.StringVal(fmt.Sprintf("%v", iv))
130 changes: 87 additions & 43 deletions exporter/importables.go
@@ -348,51 +348,66 @@ var resourcesMap map[string]importable = map[string]importable{
fmt.Sprintf("%s_%s", name, d.Id()), "_")
},
Depends: []reference{
{Path: "email_notifications.on_duration_warning_threshold_exceeded", Resource: "databricks_user", Match: "user_name", MatchType: MatchCaseInsensitive},
{Path: "email_notifications.on_failure", Resource: "databricks_user", Match: "user_name", MatchType: MatchCaseInsensitive},
{Path: "email_notifications.on_success", Resource: "databricks_user", Match: "user_name", MatchType: MatchCaseInsensitive},
{Path: "email_notifications.on_start", Resource: "databricks_user", Match: "user_name", MatchType: MatchCaseInsensitive},
{Path: "email_notifications.on_duration_warning_threshold_exceeded", Resource: "databricks_user",
Match: "user_name", MatchType: MatchCaseInsensitive},
{Path: "email_notifications.on_success", Resource: "databricks_user", Match: "user_name", MatchType: MatchCaseInsensitive},
{Path: "job_cluster.new_cluster.aws_attributes.instance_profile_arn", Resource: "databricks_instance_profile"},
{Path: "job_cluster.new_cluster.driver_instance_pool_id", Resource: "databricks_instance_pool"},
{Path: "job_cluster.new_cluster.init_scripts.dbfs.destination", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "job_cluster.new_cluster.init_scripts.workspace.destination", Resource: "databricks_repo", Match: "workspace_path", MatchType: MatchPrefix},
{Path: "job_cluster.new_cluster.init_scripts.workspace.destination", Resource: "databricks_workspace_file"},
{Path: "job_cluster.new_cluster.instance_pool_id", Resource: "databricks_instance_pool"},
{Path: "job_cluster.new_cluster.policy_id", Resource: "databricks_cluster_policy"},
{Path: "run_as.service_principal_name", Resource: "databricks_service_principal", Match: "application_id"},
{Path: "run_as.user_name", Resource: "databricks_user", Match: "user_name", MatchType: MatchCaseInsensitive},
{Path: "task.dbt_task.warehouse_id", Resource: "databricks_sql_endpoint"},
{Path: "task.existing_cluster_id", Resource: "databricks_cluster"},
{Path: "task.library.egg", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.library.egg", Resource: "databricks_workspace_file", Match: "workspace_path"},
{Path: "task.library.jar", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.library.jar", Resource: "databricks_workspace_file", Match: "workspace_path"},
{Path: "task.library.whl", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.library.whl", Resource: "databricks_workspace_file", Match: "workspace_path"},
{Path: "task.library.whl", Resource: "databricks_repo", Match: "workspace_path", MatchType: MatchPrefix},
{Path: "task.library.egg", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.library.egg", Resource: "databricks_workspace_file", Match: "workspace_path"},
{Path: "task.spark_python_task.python_file", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.spark_python_task.python_file", Resource: "databricks_workspace_file", Match: "path"},
{Path: "task.spark_python_task.python_file", Resource: "databricks_repo", Match: "path", MatchType: MatchPrefix},
{Path: "task.spark_python_task.parameters", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.spark_jar_task.jar_uri", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.notebook_task.notebook_path", Resource: "databricks_notebook"},
{Path: "task.run_job_task.job_id", Resource: "databricks_job"},
{Path: "task.notebook_task.notebook_path", Resource: "databricks_repo", Match: "path", MatchType: MatchPrefix},
{Path: "task.pipeline_task.pipeline_id", Resource: "databricks_pipeline"},
{Path: "task.sql_task.query.query_id", Resource: "databricks_sql_query"},
{Path: "task.sql_task.dashboard.dashboard_id", Resource: "databricks_sql_dashboard"},
{Path: "task.sql_task.alert.alert_id", Resource: "databricks_sql_alert"},
{Path: "task.sql_task.warehouse_id", Resource: "databricks_sql_endpoint"},
{Path: "task.dbt_task.warehouse_id", Resource: "databricks_sql_endpoint"},
{Path: "task.library.whl", Resource: "databricks_workspace_file", Match: "workspace_path"},
{Path: "task.new_cluster.aws_attributes.instance_profile_arn", Resource: "databricks_instance_profile"},
{Path: "task.new_cluster.driver_instance_pool_id", Resource: "databricks_instance_pool"},
{Path: "task.new_cluster.init_scripts.dbfs.destination", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.new_cluster.init_scripts.workspace.destination", Resource: "databricks_repo", Match: "workspace_path", MatchType: MatchPrefix},
{Path: "task.new_cluster.init_scripts.workspace.destination", Resource: "databricks_workspace_file"},
{Path: "task.new_cluster.init_scripts.workspace.destination", Resource: "databricks_repo",
Match: "workspace_path", MatchType: MatchPrefix},
{Path: "task.new_cluster.instance_pool_id", Resource: "databricks_instance_pool"},
{Path: "task.new_cluster.driver_instance_pool_id", Resource: "databricks_instance_pool"},
{Path: "task.new_cluster.policy_id", Resource: "databricks_cluster_policy"},
{Path: "task.existing_cluster_id", Resource: "databricks_cluster"},
{Path: "job_cluster.new_cluster.aws_attributes.instance_profile_arn", Resource: "databricks_instance_profile"},
{Path: "job_cluster.new_cluster.init_scripts.dbfs.destination", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "job_cluster.new_cluster.init_scripts.workspace.destination", Resource: "databricks_workspace_file"},
{Path: "job_cluster.new_cluster.init_scripts.workspace.destination", Resource: "databricks_repo",
Match: "workspace_path", MatchType: MatchPrefix},
{Path: "job_cluster.new_cluster.instance_pool_id", Resource: "databricks_instance_pool"},
{Path: "job_cluster.new_cluster.driver_instance_pool_id", Resource: "databricks_instance_pool"},
{Path: "job_cluster.new_cluster.policy_id", Resource: "databricks_cluster_policy"},
{Path: "run_as.user_name", Resource: "databricks_user", Match: "user_name", MatchType: MatchCaseInsensitive},
{Path: "run_as.service_principal_name", Resource: "databricks_service_principal", Match: "application_id"},
{Path: "task.notebook_task.base_parameters", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.notebook_task.base_parameters", Resource: "databricks_repo", Match: "workspace_path", MatchType: MatchPrefix},
{Path: "task.notebook_task.base_parameters", Resource: "databricks_workspace_file", Match: "workspace_path"},
{Path: "task.notebook_task.notebook_path", Resource: "databricks_notebook"},
{Path: "task.notebook_task.notebook_path", Resource: "databricks_repo", Match: "path", MatchType: MatchPrefix},
{Path: "task.pipeline_task.pipeline_id", Resource: "databricks_pipeline"},
{Path: "task.python_wheel_task.named_parameters", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.python_wheel_task.named_parameters", Resource: "databricks_repo", Match: "workspace_path", MatchType: MatchPrefix},
{Path: "task.python_wheel_task.named_parameters", Resource: "databricks_workspace_file", Match: "workspace_path"},
{Path: "task.python_wheel_task.parameters", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.python_wheel_task.parameters", Resource: "databricks_repo", Match: "workspace_path", MatchType: MatchPrefix},
{Path: "task.python_wheel_task.parameters", Resource: "databricks_workspace_file", Match: "workspace_path"},
{Path: "task.run_job_task.job_id", Resource: "databricks_job"},
{Path: "task.run_job_task.job_parameters", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.run_job_task.job_parameters", Resource: "databricks_repo", Match: "workspace_path", MatchType: MatchPrefix},
{Path: "task.run_job_task.job_parameters", Resource: "databricks_workspace_file", Match: "workspace_path"},
{Path: "task.spark_jar_task.jar_uri", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.spark_jar_task.parameters", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.spark_jar_task.parameters", Resource: "databricks_repo", Match: "workspace_path", MatchType: MatchPrefix},
{Path: "task.spark_jar_task.parameters", Resource: "databricks_workspace_file", Match: "workspace_path"},
{Path: "task.spark_python_task.parameters", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.spark_python_task.python_file", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.spark_python_task.python_file", Resource: "databricks_repo", Match: "path", MatchType: MatchPrefix},
{Path: "task.spark_python_task.python_file", Resource: "databricks_workspace_file", Match: "path"},
{Path: "task.spark_submit_task.parameters", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "task.spark_submit_task.parameters", Resource: "databricks_repo", Match: "workspace_path", MatchType: MatchPrefix},
{Path: "task.spark_submit_task.parameters", Resource: "databricks_workspace_file", Match: "workspace_path"},
{Path: "task.sql_task.alert.alert_id", Resource: "databricks_sql_alert"},
{Path: "task.sql_task.dashboard.dashboard_id", Resource: "databricks_sql_dashboard"},
{Path: "task.sql_task.query.query_id", Resource: "databricks_sql_query"},
{Path: "task.sql_task.warehouse_id", Resource: "databricks_sql_endpoint"},
},
Import: func(ic *importContext, r *resource) error {
var job jobs.JobSettings
@@ -416,6 +431,7 @@ var resourcesMap map[string]importable = map[string]importable{
if task.NotebookTask.Source != "GIT" {
ic.emitNotebookOrRepo(task.NotebookTask.NotebookPath)
}
ic.emitFilesFromMap(task.NotebookTask.BaseParameters)
}
if task.PipelineTask != nil {
ic.Emit(&resource{
@@ -434,10 +450,17 @@ var resourcesMap map[string]importable = map[string]importable{
ic.emitWorkspaceFileOrRepo(task.SparkPythonTask.PythonFile)
}
}
for _, p := range task.SparkPythonTask.Parameters {
ic.emitIfDbfsFile(p)
ic.emitIfWsfsFile(p)
}
ic.emitFilesFromSlice(task.SparkPythonTask.Parameters)
}
if task.PythonWheelTask != nil {
ic.emitFilesFromSlice(task.PythonWheelTask.Parameters)
ic.emitFilesFromMap(task.PythonWheelTask.NamedParameters)
}
if task.SparkJarTask != nil {
ic.emitFilesFromSlice(task.SparkJarTask.Parameters)
}
if task.SparkSubmitTask != nil {
ic.emitFilesFromSlice(task.SparkSubmitTask.Parameters)
}
if task.SqlTask != nil {
if task.SqlTask.Query != nil {
@@ -478,6 +501,7 @@ var resourcesMap map[string]importable = map[string]importable{
Resource: "databricks_job",
ID: strconv.FormatInt(task.RunJobTask.JobID, 10),
})
ic.emitFilesFromMap(task.RunJobTask.JobParameters)
}
ic.importCluster(task.NewCluster)
ic.Emit(&resource{
@@ -704,6 +728,12 @@ var resourcesMap map[string]importable = map[string]importable{

return nil
},
ShouldOmitField: func(ic *importContext, pathString string, as *schema.Schema, d *schema.ResourceData) bool {
if pathString == "definition" {
return d.Get("policy_family_id").(string) != ""
}
return defaultShouldOmitFieldFunc(ic, pathString, as, d)
},
Depends: []reference{
{Path: "libraries.jar", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "libraries.jar", Resource: "databricks_workspace_file", Match: "workspace_path"},
@@ -1442,6 +1472,7 @@ var resourcesMap map[string]importable = map[string]importable{
ShouldOmitField: shouldOmitMd5Field,
Depends: []reference{
{Path: "source", File: true},
{Path: "path", Resource: "databricks_directory", MatchType: MatchPrefix},
{Path: "path", Resource: "databricks_user", Match: "home", MatchType: MatchPrefix},
{Path: "path", Resource: "databricks_service_principal", Match: "home", MatchType: MatchPrefix},
},
@@ -1499,6 +1530,7 @@ var resourcesMap map[string]importable = map[string]importable{
ShouldOmitField: shouldOmitMd5Field,
Depends: []reference{
{Path: "source", File: true},
{Path: "path", Resource: "databricks_directory", MatchType: MatchPrefix},
{Path: "path", Resource: "databricks_user", Match: "home", MatchType: MatchPrefix},
{Path: "path", Resource: "databricks_service_principal", Match: "home", MatchType: MatchPrefix},
},
@@ -1887,7 +1919,7 @@ var resourcesMap map[string]importable = map[string]importable{
ic.emitIfDbfsFile(lib.Jar)
ic.emitIfDbfsFile(lib.Whl)
}
// TODO: check if storage is like dbfs:/pipelines/uuid, then remove it from data
// Emit clusters
for _, cluster := range pipeline.Clusters {
if cluster.AwsAttributes != nil && cluster.AwsAttributes.InstanceProfileArn != "" {
ic.Emit(&resource{
@@ -1907,10 +1939,18 @@ var resourcesMap map[string]importable = map[string]importable{
ID: cluster.DriverInstancePoolID,
})
}
if cluster.PolicyID != "" {
ic.Emit(&resource{
Resource: "databricks_cluster_policy",
ID: cluster.PolicyID,
})
}
ic.emitInitScripts(cluster.InitScripts)
ic.emitSecretsFromSecretsPath(cluster.SparkConf)
ic.emitSecretsFromSecretsPath(cluster.SparkEnvVars)
}
ic.emitFilesFromMap(pipeline.Configuration)
ic.emitSecretsFromSecretsPath(pipeline.Configuration)

if ic.meAdmin {
ic.Emit(&resource{
@@ -1940,12 +1980,15 @@ var resourcesMap map[string]importable = map[string]importable{
},
Depends: []reference{
{Path: "cluster.aws_attributes.instance_profile_arn", Resource: "databricks_instance_profile"},
{Path: "new_cluster.init_scripts.dbfs.destination", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "new_cluster.init_scripts.workspace.destination", Resource: "databricks_workspace_file"},
{Path: "new_cluster.init_scripts.workspace.destination", Resource: "databricks_repo",
Match: "workspace_path", MatchType: MatchPrefix},
{Path: "cluster.init_scripts.dbfs.destination", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "cluster.init_scripts.workspace.destination", Resource: "databricks_workspace_file"},
{Path: "cluster.init_scripts.workspace.destination", Resource: "databricks_repo", Match: "workspace_path", MatchType: MatchPrefix},
{Path: "cluster.instance_pool_id", Resource: "databricks_instance_pool"},
{Path: "cluster.driver_instance_pool_id", Resource: "databricks_instance_pool"},
{Path: "cluster.policy_id", Resource: "databricks_cluster_policy"},
{Path: "configuration", Resource: "databricks_dbfs_file", Match: "dbfs_path"},
{Path: "configuration", Resource: "databricks_repo", Match: "workspace_path", MatchType: MatchPrefix},
{Path: "configuration", Resource: "databricks_workspace_file", Match: "workspace_path"},
{Path: "library.notebook.path", Resource: "databricks_notebook"},
{Path: "library.notebook.path", Resource: "databricks_repo", Match: "path", MatchType: MatchPrefix},
{Path: "library.file.path", Resource: "databricks_workspace_file"},
@@ -2011,6 +2054,7 @@ var resourcesMap map[string]importable = map[string]importable{
},
Body: resourceOrDataBlockBody,
Depends: []reference{
// TODO: it should try to find the longest reference to another directory object that is not itself...
{Path: "path", Resource: "databricks_user", Match: "home", MatchType: MatchPrefix},
{Path: "path", Resource: "databricks_service_principal", Match: "home", MatchType: MatchPrefix},
},
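The DLT pipeline changes above also call emitSecretsFromSecretsPath on the cluster Spark conf, Spark env vars, and the pipeline configuration map. That helper is not part of this diff; the following is a rough, standalone sketch of how {{secrets/<scope>/<key>}} references could be picked out of such a map — the regexp, the function name, and the return shape are assumptions, not the provider's actual implementation.

package main

import (
	"fmt"
	"regexp"
)

// secretRef matches the Databricks secret reference syntax
// {{secrets/<scope>/<key>}} as it appears in Spark conf or pipeline configuration.
var secretRef = regexp.MustCompile(`\{\{secrets/([^/]+)/([^}]+)\}\}`)

// secretScopes returns the secret scopes referenced in a configuration map,
// roughly the information an exporter needs in order to emit the
// corresponding secret scope resources.
func secretScopes(conf map[string]string) []string {
	var scopes []string
	for _, v := range conf {
		for _, m := range secretRef.FindAllStringSubmatch(v, -1) {
			scopes = append(scopes, m[1]) // m[1] is the scope name
		}
	}
	return scopes
}

func main() {
	conf := map[string]string{
		"my.token": "{{secrets/prod-scope/api-token}}",
		"other":    "plain value",
	}
	fmt.Println(secretScopes(conf)) // [prod-scope]
}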
27 changes: 26 additions & 1 deletion exporter/importables_test.go
@@ -1106,6 +1106,7 @@ func TestNotebookGenerationBadCharacters(t *testing.T) {
},
}, "notebooks,directories", true, func(ic *importContext) {
ic.notebooksFormat = "SOURCE"
ic.services = []string{"notebooks"}
err := resourcesMap["databricks_notebook"].List(ic)
assert.NoError(t, err)
ic.waitGroup.Wait()
@@ -1400,8 +1401,32 @@ func TestListUcAllowListSuccess(t *testing.T) {
func TestEmitSqlParent(t *testing.T) {
ic := importContextForTest()
ic.emitSqlParentDirectory("")
assert.Equal(t, len(ic.testEmits), 0)
assert.Equal(t, 0, len(ic.testEmits))
ic.emitSqlParentDirectory("folders/12345")
assert.Equal(t, 1, len(ic.testEmits))
assert.Contains(t, ic.testEmits, "databricks_directory[<unknown>] (object_id: 12345)")
}

func TestEmitFilesFromSlice(t *testing.T) {
ic := importContextForTest()
ic.emitFilesFromSlice([]string{
"dbfs:/FileStore/test.txt",
"/Workspace/Shared/test.txt",
"nothing",
})
assert.Equal(t, 2, len(ic.testEmits))
assert.Contains(t, ic.testEmits, "databricks_dbfs_file[<unknown>] (id: dbfs:/FileStore/test.txt)")
assert.Contains(t, ic.testEmits, "databricks_workspace_file[<unknown>] (id: /Shared/test.txt)")
}

func TestEmitFilesFromMap(t *testing.T) {
ic := importContextForTest()
ic.emitFilesFromMap(map[string]string{
"k1": "dbfs:/FileStore/test.txt",
"k2": "/Workspace/Shared/test.txt",
"k3": "nothing",
})
assert.Equal(t, 2, len(ic.testEmits))
assert.Contains(t, ic.testEmits, "databricks_dbfs_file[<unknown>] (id: dbfs:/FileStore/test.txt)")
assert.Contains(t, ic.testEmits, "databricks_workspace_file[<unknown>] (id: /Shared/test.txt)")
}
15 changes: 14 additions & 1 deletion exporter/util.go
@@ -43,14 +43,26 @@ func (ic *importContext) emitInitScripts(initScripts []clusters.InitScriptStorag
ic.emitWorkspaceFileOrRepo(is.Workspace.Destination)
}
}
}

func (ic *importContext) emitFilesFromSlice(slice []string) {
for _, p := range slice {
ic.emitIfDbfsFile(p)
ic.emitIfWsfsFile(p)
}
}

func (ic *importContext) emitFilesFromMap(m map[string]string) {
for _, p := range m {
ic.emitIfDbfsFile(p)
ic.emitIfWsfsFile(p)
}
}

func (ic *importContext) importCluster(c *clusters.Cluster) {
if c == nil {
return
}
ic.emitInitScripts(c.InitScripts)
if c.AwsAttributes != nil {
ic.Emit(&resource{
Resource: "databricks_instance_profile",
@@ -76,6 +88,7 @@ func (ic *importContext) importCluster(c *clusters.Cluster) {
ID: c.PolicyID,
})
}
ic.emitInitScripts(c.InitScripts)
ic.emitSecretsFromSecretsPath(c.SparkConf)
ic.emitSecretsFromSecretsPath(c.SparkEnvVars)
ic.emitUserOrServicePrincipal(c.SingleUserName)
