Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating dataflow: Dedupe3 #4

Open
wants to merge 2 commits into
base: newbranch
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion dataflow/Dedupe3.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,20 @@
"script": "source(output(\n\t\tacctnum as string,\n\t\tfullname as string,\n\t\tphone as string,\n\t\tzip as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false) ~> sourceNames"
}
],
"sinks": [
{
"dataset": {
"referenceName": "Addresses_Blob",
"type": "DatasetReference"
},
"name": "sink1",
"script": "Copy sink(input(\n\t\tAddressID as string,\n\t\tAddress1 as string,\n\t\tAddress2 as string,\n\t\tCity as string,\n\t\tProp_4 as string,\n\t\tProp_5 as string,\n\t\tProp_6 as string,\n\t\tProp_7 as string,\n\t\tProp_8 as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tpartitionBy('hash', 1),\n\trowUrlColumn:'undefined') ~> sink1"
}
],
"transformations": [
{
"name": "FuzzyMatch",
"script": "sourceNames derive(SoundexValue = soundex(fullname)) ~> FuzzyMatch"
"script": "DerivedColumn1 derive(SoundexValue = soundex(fullname)) ~> FuzzyMatch"
},
{
"name": "CreateRowHash",
Expand All @@ -41,6 +51,10 @@
{
"name": "FinalResults",
"script": "JoinCopyToOrig select(mapColumn(\n\t\tacctnum,\n\t\tfullname,\n\t\tphone,\n\t\tzip\n\t))~> FinalResults"
},
{
"name": "DerivedColumn1",
"script": "sourceNames derive(myfilename = ERROR_FUNCTION('')) ~> DerivedColumn1"
}
]
}
Expand Down
8 changes: 8 additions & 0 deletions dataflow/FactLoader2.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
},
"name": "writeFactTable",
"script": "JoinAllColumns sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tbatchSize:50,\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false,\n\trecreate:true,\n\tmapColumn(\n\t\tProjDate = OrigData@ProjDate,\n\t\temployeeID = OrigData@employeeID,\n\t\tTotalHours,\n\t\tTotalAmount,\n\t\tfacttimestamp,\n\t\tsurrogatekey,\n\t\tRegion,\n\t\tStatus,\n\t\tEmpFunction,\n\t\tLevel,\n\t\tRole,\n\t\tprocesstime\n\t),\n\tpartitionBy('roundRobin', 4)) ~> writeFactTable"
},
{
"dataset": {
"referenceName": "Addresses_Blob",
"type": "DatasetReference"
},
"name": "sink1",
"script": "OrigData sink(input(\n\t\tAddressID as string,\n\t\tAddress1 as string,\n\t\tAddress2 as string,\n\t\tCity as string,\n\t\tProp_4 as string,\n\t\tProp_5 as string,\n\t\tProp_6 as string,\n\t\tProp_7 as string,\n\t\tProp_8 as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tpartitionBy('hash', 1),\n\tpartitionFileNames:['file.csv']) ~> sink1"
}
],
"transformations": [
Expand Down
73 changes: 39 additions & 34 deletions dataflow/dataflow_params1.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,46 @@
"folder": {
"name": "Params"
},
"sources": [
{
"dataset": {
"referenceName": "Product_Blob",
"type": "DatasetReference"
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "EmployeeFiles",
"type": "DatasetReference"
},
"name": "source1",
"script": "source(output(\n\t\tEmpID as string,\n\t\tRegion as string,\n\t\tStatus as string,\n\t\tFunction as string,\n\t\tLevel as string,\n\t\tRole as string,\n\t\tStartDate as string,\n\t\tEndDate as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false) ~> source1"
},
"name": "prodSource",
"script": "source(output(\n\t\tProductID as string,\n\t\tName as string,\n\t\tProductNumber as string,\n\t\tColor as string,\n\t\tStandardCost as decimal(10,0),\n\t\tListPrice as string,\n\t\tSize as string,\n\t\tWeight as string,\n\t\tProductCategoryID as string,\n\t\tProductModelID as string\n\t),\n\tallowSchemaDrift: false,\n\tvalidateSchema: false) ~> prodSource"
},
{
"dataset": {
"referenceName": "blob_params",
"type": "DatasetReference"
{
"dataset": {
"referenceName": "EmployeeFiles",
"type": "DatasetReference"
},
"name": "source2",
"script": "source(output(\n\t\tEmpID as string,\n\t\tRegion as string,\n\t\tStatus as string,\n\t\tFunction as string,\n\t\tLevel as string,\n\t\tRole as string,\n\t\tStartDate as string,\n\t\tEndDate as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false) ~> source2"
}
],
"sinks": [
{
"dataset": {
"referenceName": "Addresses_Blob",
"type": "DatasetReference"
},
"name": "sink1",
"script": "DerivedColumn1 sink(input(\n\t\tAddressID as string,\n\t\tAddress1 as string,\n\t\tAddress2 as string,\n\t\tCity as string,\n\t\tProp_4 as string,\n\t\tProp_5 as string,\n\t\tProp_6 as string,\n\t\tProp_7 as string,\n\t\tProp_8 as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tpartitionBy('roundRobin', 20),\n\tfilePattern:'undefined') ~> sink1"
}
],
"transformations": [
{
"name": "Lookup1",
"script": "source1, source2 lookup(ERROR_FUNCTION('') == ERROR_FUNCTION(''),\n\tbroadcast: 'none')~> Lookup1"
},
"name": "paramsFile",
"script": "source(output(\n\t\tParamColor as string\n\t),\n\tallowSchemaDrift: false,\n\tvalidateSchema: false) ~> paramsFile"
}
],
"transformations": [
{
"name": "getParams",
"script": "prodSource, paramsFile lookup(Color == ParamColor,\n\tbroadcast: 'none')~> getParams"
},
{
"name": "FilterRows",
"script": "Select1 filter(Color == ParamColor) ~> FilterRows"
},
{
"name": "AggregateStandardCost",
"script": "FilterRows aggregate(groupBy(Color),\n\tAverageStandardCostByColor = '$'+toString(round(avg(StandardCost),2))) ~> AggregateStandardCost"
},
{
"name": "Select1",
"script": "getParams select(mapColumn(\n\t\tProductID,\n\t\tName,\n\t\tProductNumber,\n\t\tColor,\n\t\tStandardCost,\n\t\tListPrice,\n\t\tSize,\n\t\tWeight,\n\t\tProductCategoryID,\n\t\tProductModelID,\n\t\tParamColor\n\t))~> Select1"
}
]
{
"name": "DerivedColumn1",
"script": "Lookup1 derive(myfilename = source2@Role) ~> DerivedColumn1"
}
]
}
}
}
4 changes: 2 additions & 2 deletions dataflow/soccerETL.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"type": "DatasetReference"
},
"name": "specifySchemaExtracts",
"script": "source(output(\n\t\tid_odsp as string,\n\t\tid_event as integer,\n\t\tsort_order as short,\n\t\ttime as short,\n\t\ttext as string,\n\t\tevent_type as integer,\n\t\tevent_type2 as integer,\n\t\tside as short,\n\t\tevent_team as string,\n\t\topponent as string,\n\t\tplayer as string,\n\t\tplayer2 as string,\n\t\tplayer_in as string,\n\t\tplayer_out as string,\n\t\tshot_place as string,\n\t\tshot_outcome as string,\n\t\tis_goal as boolean,\n\t\tlocation as string,\n\t\tbodypart as string,\n\t\tassist_method as short,\n\t\tsituation as string,\n\t\tfast_break as boolean\n\t),\n\tallowSchemaDrift: false,\n\tvalidateSchema: false) ~> specifySchemaExtracts"
"script": "source(output(\n\t\tid_odsp as string,\n\t\tid_event as string,\n\t\tsort_order as short,\n\t\ttime as short,\n\t\ttext as string,\n\t\tevent_type as integer,\n\t\tevent_type2 as integer,\n\t\tside as short,\n\t\tevent_team as string,\n\t\topponent as string,\n\t\tplayer as string,\n\t\tplayer2 as string,\n\t\tplayer_in as string,\n\t\tplayer_out as string,\n\t\tshot_place as string,\n\t\tshot_outcome as string,\n\t\tis_goal as boolean,\n\t\tlocation as string,\n\t\tbodypart as string,\n\t\tassist_method as short,\n\t\tsituation as string,\n\t\tfast_break as boolean\n\t),\n\tallowSchemaDrift: false,\n\tvalidateSchema: false) ~> specifySchemaExtracts"
},
{
"dataset": {
Expand All @@ -31,7 +31,7 @@
"type": "DatasetReference"
},
"name": "sink1",
"script": "timeBins sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tpartitionBy('hash', 1),\n\tpartitionFileNames:['']) ~> sink1"
"script": "timeBins sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tpartitionBy('hash', 1),\n\tpartitionFileNames:['soccerOut.csv']) ~> sink1"
}
],
"transformations": [
Expand Down
76 changes: 76 additions & 0 deletions pipeline/pipeline11.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{
"name": "pipeline11",
"properties": {
"activities": [
{
"name": "Copy Data1",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"enableStaging": false
}
},
{
"name": "Notebook1",
"type": "DatabricksNotebook",
"dependsOn": [
{
"activity": "Copy Data1",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"notebookPath": "/european_soccer_events/european_soccer_events_01_etl"
},
"linkedServiceName": {
"referenceName": "databrickswest",
"type": "LinkedServiceReference"
}
},
{
"name": "soccerETL",
"type": "ExecuteDataFlow",
"dependsOn": [
{
"activity": "Notebook1",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"dataflow": {
"referenceName": "soccerETL",
"type": "DataFlowReference"
},
"compute": {
"computeType": "General",
"coreCount": 8
}
}
}
]
}
}