From 9807e71ea146d5957eb2ea0c942e69c45666491b Mon Sep 17 00:00:00 2001 From: Mark Kromer Date: Fri, 19 Apr 2019 00:54:57 -0700 Subject: [PATCH 1/2] Updating dataflow: Dedupe3 Updating dataflow: FactLoader2 Updating dataflow: dataflow_params1 --- dataflow/Dedupe3.json | 16 +++++++- dataflow/FactLoader2.json | 8 ++++ dataflow/dataflow_params1.json | 73 ++++++++++++++++++---------------- 3 files changed, 62 insertions(+), 35 deletions(-) diff --git a/dataflow/Dedupe3.json b/dataflow/Dedupe3.json index f3475199..f07b23ed 100644 --- a/dataflow/Dedupe3.json +++ b/dataflow/Dedupe3.json @@ -13,10 +13,20 @@ "script": "source(output(\n\t\tacctnum as string,\n\t\tfullname as string,\n\t\tphone as string,\n\t\tzip as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false) ~> sourceNames" } ], + "sinks": [ + { + "dataset": { + "referenceName": "Addresses_Blob", + "type": "DatasetReference" + }, + "name": "sink1", + "script": "Copy sink(input(\n\t\tAddressID as string,\n\t\tAddress1 as string,\n\t\tAddress2 as string,\n\t\tCity as string,\n\t\tProp_4 as string,\n\t\tProp_5 as string,\n\t\tProp_6 as string,\n\t\tProp_7 as string,\n\t\tProp_8 as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tpartitionBy('hash', 1),\n\trowUrlColumn:'undefined') ~> sink1" + } + ], "transformations": [ { "name": "FuzzyMatch", - "script": "sourceNames derive(SoundexValue = soundex(fullname)) ~> FuzzyMatch" + "script": "DerivedColumn1 derive(SoundexValue = soundex(fullname)) ~> FuzzyMatch" }, { "name": "CreateRowHash", @@ -41,6 +51,10 @@ { "name": "FinalResults", "script": "JoinCopyToOrig select(mapColumn(\n\t\tacctnum,\n\t\tfullname,\n\t\tphone,\n\t\tzip\n\t))~> FinalResults" + }, + { + "name": "DerivedColumn1", + "script": "sourceNames derive(myfilename = ERROR_FUNCTION('')) ~> DerivedColumn1" } ] } diff --git a/dataflow/FactLoader2.json b/dataflow/FactLoader2.json index e384178a..2c04b5a3 100644 --- a/dataflow/FactLoader2.json +++ b/dataflow/FactLoader2.json @@ -29,6 +29,14 @@ }, "name": "writeFactTable", "script": "JoinAllColumns sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tbatchSize:50,\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false,\n\trecreate:true,\n\tmapColumn(\n\t\tProjDate = OrigData@ProjDate,\n\t\temployeeID = OrigData@employeeID,\n\t\tTotalHours,\n\t\tTotalAmount,\n\t\tfacttimestamp,\n\t\tsurrogatekey,\n\t\tRegion,\n\t\tStatus,\n\t\tEmpFunction,\n\t\tLevel,\n\t\tRole,\n\t\tprocesstime\n\t),\n\tpartitionBy('roundRobin', 4)) ~> writeFactTable" + }, + { + "dataset": { + "referenceName": "Addresses_Blob", + "type": "DatasetReference" + }, + "name": "sink1", + "script": "OrigData sink(input(\n\t\tAddressID as string,\n\t\tAddress1 as string,\n\t\tAddress2 as string,\n\t\tCity as string,\n\t\tProp_4 as string,\n\t\tProp_5 as string,\n\t\tProp_6 as string,\n\t\tProp_7 as string,\n\t\tProp_8 as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tpartitionBy('hash', 1),\n\tpartitionFileNames:['file.csv']) ~> sink1" } ], "transformations": [ diff --git a/dataflow/dataflow_params1.json b/dataflow/dataflow_params1.json index 707fe219..b572fe64 100644 --- a/dataflow/dataflow_params1.json +++ b/dataflow/dataflow_params1.json @@ -4,41 +4,46 @@ "folder": { "name": "Params" }, - "sources": [ - { - "dataset": { - "referenceName": "Product_Blob", - "type": "DatasetReference" + "type": "MappingDataFlow", + "typeProperties": { + "sources": [ + { + "dataset": { + "referenceName": "EmployeeFiles", + "type": "DatasetReference" + }, + "name": "source1", + "script": "source(output(\n\t\tEmpID as string,\n\t\tRegion as string,\n\t\tStatus as string,\n\t\tFunction as string,\n\t\tLevel as string,\n\t\tRole as string,\n\t\tStartDate as string,\n\t\tEndDate as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false) ~> source1" }, - "name": "prodSource", - "script": "source(output(\n\t\tProductID as string,\n\t\tName as string,\n\t\tProductNumber as string,\n\t\tColor as string,\n\t\tStandardCost as decimal(10,0),\n\t\tListPrice as string,\n\t\tSize as string,\n\t\tWeight as string,\n\t\tProductCategoryID as string,\n\t\tProductModelID as string\n\t),\n\tallowSchemaDrift: false,\n\tvalidateSchema: false) ~> prodSource" - }, - { - "dataset": { - "referenceName": "blob_params", - "type": "DatasetReference" + { + "dataset": { + "referenceName": "EmployeeFiles", + "type": "DatasetReference" + }, + "name": "source2", + "script": "source(output(\n\t\tEmpID as string,\n\t\tRegion as string,\n\t\tStatus as string,\n\t\tFunction as string,\n\t\tLevel as string,\n\t\tRole as string,\n\t\tStartDate as string,\n\t\tEndDate as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false) ~> source2" + } + ], + "sinks": [ + { + "dataset": { + "referenceName": "Addresses_Blob", + "type": "DatasetReference" + }, + "name": "sink1", + "script": "DerivedColumn1 sink(input(\n\t\tAddressID as string,\n\t\tAddress1 as string,\n\t\tAddress2 as string,\n\t\tCity as string,\n\t\tProp_4 as string,\n\t\tProp_5 as string,\n\t\tProp_6 as string,\n\t\tProp_7 as string,\n\t\tProp_8 as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tpartitionBy('roundRobin', 20),\n\tfilePattern:'undefined') ~> sink1" + } + ], + "transformations": [ + { + "name": "Lookup1", + "script": "source1, source2 lookup(ERROR_FUNCTION('') == ERROR_FUNCTION(''),\n\tbroadcast: 'none')~> Lookup1" }, - "name": "paramsFile", - "script": "source(output(\n\t\tParamColor as string\n\t),\n\tallowSchemaDrift: false,\n\tvalidateSchema: false) ~> paramsFile" - } - ], - "transformations": [ - { - "name": "getParams", - "script": "prodSource, paramsFile lookup(Color == ParamColor,\n\tbroadcast: 'none')~> getParams" - }, - { - "name": "FilterRows", - "script": "Select1 filter(Color == ParamColor) ~> FilterRows" - }, - { - "name": "AggregateStandardCost", - "script": "FilterRows aggregate(groupBy(Color),\n\tAverageStandardCostByColor = '$'+toString(round(avg(StandardCost),2))) ~> AggregateStandardCost" - }, - { - "name": "Select1", - "script": "getParams select(mapColumn(\n\t\tProductID,\n\t\tName,\n\t\tProductNumber,\n\t\tColor,\n\t\tStandardCost,\n\t\tListPrice,\n\t\tSize,\n\t\tWeight,\n\t\tProductCategoryID,\n\t\tProductModelID,\n\t\tParamColor\n\t))~> Select1" - } - ] + { + "name": "DerivedColumn1", + "script": "Lookup1 derive(myfilename = source2@Role) ~> DerivedColumn1" + } + ] + } } } \ No newline at end of file From 6ddf72cb6d2faa7cc9435ddf0b321cec43b31119 Mon Sep 17 00:00:00 2001 From: Mark Kromer Date: Tue, 23 Apr 2019 19:02:29 -0700 Subject: [PATCH 2/2] Updating pipeline: pipeline9 Updating dataflow: soccerETL Adding pipeline: pipeline11 --- dataflow/soccerETL.json | 4 +-- pipeline/pipeline11.json | 76 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 2 deletions(-) create mode 100644 pipeline/pipeline11.json diff --git a/dataflow/soccerETL.json b/dataflow/soccerETL.json index b5027380..3e20eb46 100644 --- a/dataflow/soccerETL.json +++ b/dataflow/soccerETL.json @@ -13,7 +13,7 @@ "type": "DatasetReference" }, "name": "specifySchemaExtracts", - "script": "source(output(\n\t\tid_odsp as string,\n\t\tid_event as integer,\n\t\tsort_order as short,\n\t\ttime as short,\n\t\ttext as string,\n\t\tevent_type as integer,\n\t\tevent_type2 as integer,\n\t\tside as short,\n\t\tevent_team as string,\n\t\topponent as string,\n\t\tplayer as string,\n\t\tplayer2 as string,\n\t\tplayer_in as string,\n\t\tplayer_out as string,\n\t\tshot_place as string,\n\t\tshot_outcome as string,\n\t\tis_goal as boolean,\n\t\tlocation as string,\n\t\tbodypart as string,\n\t\tassist_method as short,\n\t\tsituation as string,\n\t\tfast_break as boolean\n\t),\n\tallowSchemaDrift: false,\n\tvalidateSchema: false) ~> specifySchemaExtracts" + "script": "source(output(\n\t\tid_odsp as string,\n\t\tid_event as string,\n\t\tsort_order as short,\n\t\ttime as short,\n\t\ttext as string,\n\t\tevent_type as integer,\n\t\tevent_type2 as integer,\n\t\tside as short,\n\t\tevent_team as string,\n\t\topponent as string,\n\t\tplayer as string,\n\t\tplayer2 as string,\n\t\tplayer_in as string,\n\t\tplayer_out as string,\n\t\tshot_place as string,\n\t\tshot_outcome as string,\n\t\tis_goal as boolean,\n\t\tlocation as string,\n\t\tbodypart as string,\n\t\tassist_method as short,\n\t\tsituation as string,\n\t\tfast_break as boolean\n\t),\n\tallowSchemaDrift: false,\n\tvalidateSchema: false) ~> specifySchemaExtracts" }, { "dataset": { @@ -31,7 +31,7 @@ "type": "DatasetReference" }, "name": "sink1", - "script": "timeBins sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tpartitionBy('hash', 1),\n\tpartitionFileNames:['']) ~> sink1" + "script": "timeBins sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tpartitionBy('hash', 1),\n\tpartitionFileNames:['soccerOut.csv']) ~> sink1" } ], "transformations": [ diff --git a/pipeline/pipeline11.json b/pipeline/pipeline11.json new file mode 100644 index 00000000..6996d15e --- /dev/null +++ b/pipeline/pipeline11.json @@ -0,0 +1,76 @@ +{ + "name": "pipeline11", + "properties": { + "activities": [ + { + "name": "Copy Data1", + "type": "Copy", + "policy": { + "timeout": "7.00:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "typeProperties": { + "enableStaging": false + } + }, + { + "name": "Notebook1", + "type": "DatabricksNotebook", + "dependsOn": [ + { + "activity": "Copy Data1", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "timeout": "7.00:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "typeProperties": { + "notebookPath": "/european_soccer_events/european_soccer_events_01_etl" + }, + "linkedServiceName": { + "referenceName": "databrickswest", + "type": "LinkedServiceReference" + } + }, + { + "name": "soccerETL", + "type": "ExecuteDataFlow", + "dependsOn": [ + { + "activity": "Notebook1", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "timeout": "7.00:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "typeProperties": { + "dataflow": { + "referenceName": "soccerETL", + "type": "DataFlowReference" + }, + "compute": { + "computeType": "General", + "coreCount": 8 + } + } + } + ] + } +} \ No newline at end of file