Skip to content

Commit

Permalink
Fix: Migrate IRS 990 DAGs to new environment. (#848)
Browse files Browse the repository at this point in the history
* Fix: Migrate IRS 990 DAGs to new environment.

* Fix: convert to GKEStartPodOperator.
  • Loading branch information
nlarge-google authored Oct 30, 2024
1 parent 1b3f9ec commit d1dc743
Show file tree
Hide file tree
Showing 22 changed files with 327 additions and 505 deletions.
41 changes: 35 additions & 6 deletions datasets/irs_990/pipelines/irs_990_2014/irs_990_2014_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.operators import kubernetes_engine
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

default_args = {
Expand All @@ -32,14 +32,33 @@
catchup=False,
default_view="graph",
) as dag:
create_cluster = kubernetes_engine.GKECreateClusterOperator(
task_id="create_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
body={
"name": "pdp-irs-990-2014",
"initial_node_count": 1,
"network": "{{ var.value.vpc_network }}",
"node_config": {
"machine_type": "e2-standard-16",
"oauth_scopes": [
"https://www.googleapis.com/auth/devstorage.read_write",
"https://www.googleapis.com/auth/cloud-platform",
],
},
},
)

# Run CSV transform within kubernetes pod
irs_990_transform_csv = kubernetes_pod.KubernetesPodOperator(
irs_990_transform_csv = kubernetes_engine.GKEStartPodOperator(
task_id="irs_990_transform_csv",
startup_timeout_seconds=600,
name="irs_990_2014",
service_account_name="datasets",
namespace="composer",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-irs-990-2014",
image_pull_policy="Always",
image="{{ var.json.irs_990.container_registry.run_csv_transform_kub }}",
env_vars={
Expand All @@ -52,7 +71,17 @@
"CSV_HEADERS": '["ein","tax_pd","subseccd","s501c3or4947a1cd","schdbind","politicalactvtscd","lbbyingactvtscd","subjto6033cd","dnradvisedfundscd","prptyintrcvdcd","maintwrkofartcd","crcounselingqstncd","hldassetsintermpermcd","rptlndbldgeqptcd","rptinvstothsecd","rptinvstprgrelcd","rptothasstcd","rptothliabcd","sepcnsldtfinstmtcd","sepindaudfinstmtcd","inclinfinstmtcd","operateschools170cd","frgnofficecd","frgnrevexpnscd","frgngrntscd","frgnaggragrntscd","rptprofndrsngfeescd","rptincfnndrsngcd","rptincgamingcd","operatehosptlcd","hospaudfinstmtcd","rptgrntstogovtcd","rptgrntstoindvcd","rptyestocompnstncd","txexmptbndcd","invstproceedscd","maintescrwaccntcd","actonbehalfcd","engageexcessbnftcd","awarexcessbnftcd","loantofficercd","grantoofficercd","dirbusnreltdcd","fmlybusnreltdcd","servasofficercd","recvnoncashcd","recvartcd","ceaseoperationscd","sellorexchcd","ownsepentcd","reltdorgcd","intincntrlcd","orgtrnsfrcd","conduct5percentcd","compltschocd","f1096cnt","fw2gcnt","wthldngrulescd","noemplyeesw3cnt","filerqrdrtnscd","unrelbusinccd","filedf990tcd","frgnacctcd","prohibtdtxshltrcd","prtynotifyorgcd","filedf8886tcd","solicitcntrbcd","exprstmntcd","providegoodscd","notfydnrvalcd","filedf8282cd","f8282cnt","fndsrcvdcd","premiumspaidcd","filedf8899cd","filedf1098ccd","excbushldngscd","s4966distribcd","distribtodonorcd","initiationfees","grsrcptspublicuse","grsincmembers","grsincother","filedlieuf1041cd","txexmptint","qualhlthplncd","qualhlthreqmntn","qualhlthonhnd","rcvdpdtngcd","filedf720cd","totreprtabled","totcomprelatede","totestcompf","noindiv100kcnt","nocontractor100kcnt","totcntrbgfts","prgmservcode2acd","totrev2acola","prgmservcode2bcd","totrev2bcola","prgmservcode2ccd","totrev2ccola","prgmservcode2dcd","totrev2dcola","prgmservcode2ecd","totrev2ecola","totrev2fcola","totprgmrevnue","invstmntinc","txexmptbndsproceeds","royaltsinc","grsrntsreal","grsrntsprsnl","rntlexpnsreal","rntlexpnsprsnl","rntlincreal","rntlincprsnl","netrntlinc","grsalesecur","grsalesothr","cstbasisecur","cstbasisothr","gnlsecur","gnlsothr","netgnls","grsincfndrsng","lessdirfndrsng","netincfndrsng","grsincgaming","lessdirgaming","netincgaming","grsalesinvent","lesscstofgoods","netincsales","miscrev11acd","miscrevtota","miscrev11bcd","miscrevtot11b","miscrev11ccd","miscrevtot11c","miscrevtot11d","miscrevtot11e","totrevenue","grntstogovt","grnsttoindiv","grntstofrgngovt","benifitsmembrs","compnsatncurrofcr","compnsatnandothr","othrsalwages","pensionplancontrb","othremplyeebenef","payrolltx","feesforsrvcmgmt","legalfees","accntingfees","feesforsrvclobby","profndraising","feesforsrvcinvstmgmt","feesforsrvcothr","advrtpromo","officexpns","infotech","royaltsexpns","occupancy","travel","travelofpublicoffcl","converconventmtng","interestamt","pymtoaffiliates","deprcatndepletn","insurance","othrexpnsa","othrexpnsb","othrexpnsc","othrexpnsd","othrexpnse","othrexpnsf","totfuncexpns","nonintcashend","svngstempinvend","pldgegrntrcvblend","accntsrcvblend","currfrmrcvblend","rcvbldisqualend","notesloansrcvblend","invntriesalesend","prepaidexpnsend","lndbldgsequipend","invstmntsend","invstmntsothrend","invstmntsprgmend","intangibleassetsend","othrassetsend","totassetsend","accntspayableend","grntspayableend","deferedrevnuend","txexmptbndsend","escrwaccntliabend","paybletoffcrsend","secrdmrtgsend","unsecurednotesend","othrliabend","totliabend","unrstrctnetasstsend","temprstrctnetasstsend","permrstrctnetasstsend","capitalstktrstend","paidinsurplusend","retainedearnend","totnetassetend","totnetliabastend","nonpfrea","totnooforgscnt","totsupport","gftgrntsrcvd170","txrevnuelevied170","srvcsval170","pubsuppsubtot170","exceeds2pct170","pubsupplesspct170","samepubsuppsubtot170","grsinc170","netincunreltd170","othrinc170","totsupp170","grsrcptsrelated170","totgftgrntrcvd509","grsrcptsadmissn509","grsrcptsactivities509","txrevnuelevied509","srvcsval509","pubsuppsubtot509","rcvdfrmdisqualsub509","exceeds1pct509","subtotpub509","pubsupplesub509","samepubsuppsubtot509","grsinc509","unreltxincls511tx509","subtotsuppinc509","netincunrelatd509","othrinc509","totsupp509"]',
"RENAME_MAPPINGS": '{"elf": "elf","EIN": "ein","tax_prd": "tax_pd","subseccd": "subseccd","s50Yc3or4947aYcd": "s501c3or4947a1cd","schdbind": "schdbind","politicalactvtscd": "politicalactvtscd","lbbyingactvtscd": "lbbyingactvtscd","subjto6033cd": "subjto6033cd","dnradvisedfundscd": "dnradvisedfundscd","prptyintrcvdcd": "prptyintrcvdcd","maintwrkofartcd": "maintwrkofartcd","crcounselingqstncd": "crcounselingqstncd","hldassetsintermpermcd": "hldassetsintermpermcd","rptlndbldgeqptcd": "rptlndbldgeqptcd","rptinvstothsecd": "rptinvstothsecd","rptinvstprgrelcd": "rptinvstprgrelcd","rptothasstcd": "rptothasstcd","rptothliabcd": "rptothliabcd","sepcnsldtfinstmtcd": "sepcnsldtfinstmtcd","sepindaudfinstmtcd": "sepindaudfinstmtcd","inclinfinstmtcd": "inclinfinstmtcd","operateschoolsY70cd": "operateschools170cd","frgnofficecd": "frgnofficecd","frgnrevexpnscd": "frgnrevexpnscd","frgngrntscd": "frgngrntscd","frgnaggragrntscd": "frgnaggragrntscd","rptprofndrsngfeescd": "rptprofndrsngfeescd","rptincfnndrsngcd": "rptincfnndrsngcd","rptincgamingcd": "rptincgamingcd","operatehosptlcd": "operatehosptlcd","hospaudfinstmtcd": "hospaudfinstmtcd","rptgrntstogovtcd": "rptgrntstogovtcd","rptgrntstoindvcd": "rptgrntstoindvcd","rptyestocompnstncd": "rptyestocompnstncd","txexmptbndcd": "txexmptbndcd","invstproceedscd": "invstproceedscd","maintescrwaccntcd": "maintescrwaccntcd","actonbehalfcd": "actonbehalfcd","engageexcessbnftcd": "engageexcessbnftcd","awarexcessbnftcd": "awarexcessbnftcd","loantofficercd": "loantofficercd","grantoofficercd": "grantoofficercd","dirbusnreltdcd": "dirbusnreltdcd","fmlybusnreltdcd": "fmlybusnreltdcd","servasofficercd": "servasofficercd","recvnoncashcd": "recvnoncashcd","recvartcd": "recvartcd","ceaseoperationscd": "ceaseoperationscd","sellorexchcd": "sellorexchcd","ownsepentcd": "ownsepentcd","reltdorgcd": "reltdorgcd","intincntrlcd": "intincntrlcd","orgtrnsfrcd": "orgtrnsfrcd","conduct5percentcd": "conduct5percentcd","compltschocd": "compltschocd","f1096cnt": "f1096cnt","fw2gcnt": "fw2gcnt","wthldngrulescd": "wthldngrulescd","noemplyeesw3cnt": "noemplyeesw3cnt","filerqrdrtnscd": "filerqrdrtnscd","unrelbusinccd": "unrelbusinccd","filedf990tcd": "filedf990tcd","frgnacctcd": "frgnacctcd","prohibtdtxshltrcd": "prohibtdtxshltrcd","prtynotifyorgcd": "prtynotifyorgcd","filedf8886tcd": "filedf8886tcd","solicitcntrbcd": "solicitcntrbcd","exprstmntcd": "exprstmntcd","providegoodscd": "providegoodscd","notfydnrvalcd": "notfydnrvalcd","filedf8N8Ncd": "filedf8282cd","f8282cnt": "f8282cnt","fndsrcvdcd": "fndsrcvdcd","premiumspaidcd": "premiumspaidcd","filedf8899cd": "filedf8899cd","filedfY098ccd": "filedf1098ccd","excbushldngscd": "excbushldngscd","s4966distribcd": "s4966distribcd","distribtodonorcd": "distribtodonorcd","initiationfees": "initiationfees","grsrcptspublicuse": "grsrcptspublicuse","grsincmembers": "grsincmembers","grsincother": "grsincother","filedlieufY04Ycd": "filedlieuf1041cd","txexmptint": "txexmptint","qualhlthplncd": "qualhlthplncd","qualhlthreqmntn": "qualhlthreqmntn","qualhlthonhnd": "qualhlthonhnd","rcvdpdtngcd": "rcvdpdtngcd","filedf7N0cd": "filedf720cd","totreprtabled": "totreprtabled","totcomprelatede": "totcomprelatede","totestcompf": "totestcompf","noindiv100kcnt": "noindiv100kcnt","nocontractor100kcnt": "nocontractor100kcnt","totcntrbgfts": "totcntrbgfts","prgmservcode2acd": "prgmservcode2acd","totrev2acola": "totrev2acola","prgmservcode2bcd": "prgmservcode2bcd","totrev2bcola": "totrev2bcola","prgmservcode2ccd": "prgmservcode2ccd","totrev2ccola": "totrev2ccola","prgmservcode2dcd": "prgmservcode2dcd","totrev2dcola": "totrev2dcola","prgmservcode2ecd": "prgmservcode2ecd","totrev2ecola": "totrev2ecola","totrev2fcola": "totrev2fcola","totprgmrevnue": "totprgmrevnue","invstmntinc": "invstmntinc","txexmptbndsproceeds": "txexmptbndsproceeds","royaltsinc": "royaltsinc","grsrntsreal": "grsrntsreal","grsrntsprsnl": "grsrntsprsnl","rntlexpnsreal": "rntlexpnsreal","rntlexpnsprsnl": "rntlexpnsprsnl","rntlincreal": "rntlincreal","rntlincprsnl": "rntlincprsnl","netrntlinc": "netrntlinc","grsalesecur": "grsalesecur","grsalesothr": "grsalesothr","cstbasisecur": "cstbasisecur","cstbasisothr": "cstbasisothr","gnlsecur": "gnlsecur","gnlsothr": "gnlsothr","netgnls": "netgnls","grsincfndrsng": "grsincfndrsng","lessdirfndrsng": "lessdirfndrsng","netincfndrsng": "netincfndrsng","grsincgaming": "grsincgaming","lessdirgaming": "lessdirgaming","netincgaming": "netincgaming","grsalesinvent": "grsalesinvent","lesscstofgoods": "lesscstofgoods","netincsales": "netincsales","miscrev11acd": "miscrev11acd","miscrevtota": "miscrevtota","miscrev11bcd": "miscrev11bcd","miscrevtot11b": "miscrevtot11b","miscrev11ccd": "miscrev11ccd","miscrevtot11c": "miscrevtot11c","miscrevtot11d": "miscrevtot11d","miscrevtot11e": "miscrevtot11e","totrevenue": "totrevenue","grntstogovt": "grntstogovt","grnsttoindiv": "grnsttoindiv","grntstofrgngovt": "grntstofrgngovt","benifitsmembrs": "benifitsmembrs","compnsatncurrofcr": "compnsatncurrofcr","compnsatnandothr": "compnsatnandothr","othrsalwages": "othrsalwages","pensionplancontrb": "pensionplancontrb","othremplyeebenef": "othremplyeebenef","payrolltx": "payrolltx","feesforsrvcmgmt": "feesforsrvcmgmt","legalfees": "legalfees","accntingfees": "accntingfees","feesforsrvclobby": "feesforsrvclobby","profndraising": "profndraising","feesforsrvcinvstmgmt": "feesforsrvcinvstmgmt","feesforsrvcothr": "feesforsrvcothr","advrtpromo": "advrtpromo","officexpns": "officexpns","infotech": "infotech","royaltsexpns": "royaltsexpns","occupancy": "occupancy","travel": "travel","travelofpublicoffcl": "travelofpublicoffcl","converconventmtng": "converconventmtng","interestamt": "interestamt","pymtoaffiliates": "pymtoaffiliates","deprcatndepletn": "deprcatndepletn","insurance": "insurance","othrexpnsa": "othrexpnsa","othrexpnsb": "othrexpnsb","othrexpnsc": "othrexpnsc","othrexpnsd": "othrexpnsd","othrexpnse": "othrexpnse","othrexpnsf": "othrexpnsf","totfuncexpns": "totfuncexpns","nonintcashend": "nonintcashend","svngstempinvend": "svngstempinvend","pldgegrntrcvblend": "pldgegrntrcvblend","accntsrcvblend": "accntsrcvblend","currfrmrcvblend": "currfrmrcvblend","rcvbldisqualend": "rcvbldisqualend","notesloansrcvblend": "notesloansrcvblend","invntriesalesend": "invntriesalesend","prepaidexpnsend": "prepaidexpnsend","lndbldgsequipend": "lndbldgsequipend","invstmntsend": "invstmntsend","invstmntsothrend": "invstmntsothrend","invstmntsprgmend": "invstmntsprgmend","intangibleassetsend": "intangibleassetsend","othrassetsend": "othrassetsend","totassetsend": "totassetsend","accntspayableend": "accntspayableend","grntspayableend": "grntspayableend","deferedrevnuend": "deferedrevnuend","txexmptbndsend": "txexmptbndsend","escrwaccntliabend": "escrwaccntliabend","paybletoffcrsend": "paybletoffcrsend","secrdmrtgsend": "secrdmrtgsend","unsecurednotesend": "unsecurednotesend","othrliabend": "othrliabend","totliabend": "totliabend","unrstrctnetasstsend": "unrstrctnetasstsend","temprstrctnetasstsend": "temprstrctnetasstsend","permrstrctnetasstsend": "permrstrctnetasstsend","capitalstktrstend": "capitalstktrstend","paidinsurplusend": "paidinsurplusend","retainedearnend": "retainedearnend","totnetassetend": "totnetassetend","totnetliabastend": "totnetliabastend","nonpfrea": "nonpfrea","totnooforgscnt": "totnooforgscnt","totsupport": "totsupport","gftgrntsrcvd170": "gftgrntsrcvd170","txrevnuelevied170": "txrevnuelevied170","srvcsval170": "srvcsval170","pubsuppsubtot170": "pubsuppsubtot170","exceeds2pct170": "exceeds2pct170","pubsupplesspct170": "pubsupplesspct170","samepubsuppsubtot170": "samepubsuppsubtot170","grsinc170": "grsinc170","netincunreltd170": "netincunreltd170","othrinc170": "othrinc170","totsupp170": "totsupp170","grsrcptsrelated170": "grsrcptsrelated170","totgftgrntrcvd509": "totgftgrntrcvd509","grsrcptsadmissn509": "grsrcptsadmissn509","grsrcptsactivities509": "grsrcptsactivities509","txrevnuelevied509": "txrevnuelevied509","srvcsval509": "srvcsval509","pubsuppsubtot509": "pubsuppsubtot509","rcvdfrmdisqualsub509": "rcvdfrmdisqualsub509","exceeds1pct509": "exceeds1pct509","subtotpub509": "subtotpub509","pubsupplesub509": "pubsupplesub509","samepubsuppsubtot509": "samepubsuppsubtot509","grsinc509": "grsinc509","unreltxincls511tx509": "unreltxincls511tx509","subtotsuppinc509": "subtotsuppinc509","netincunrelatd509": "netincunrelatd509","othrinc509": "othrinc509","totsupp509": "totsupp509"}',
},
resources={"request_memory": "4G", "request_cpu": "1"},
container_resources={
"memory": {"request": "16Gi"},
"cpu": {"request": "1"},
"ephemeral-storage": {"request": "10Gi"},
},
)
delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
task_id="delete_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
name="pdp-irs-990-2014",
)

# Task to load CSV data to a BigQuery table
Expand Down Expand Up @@ -313,4 +342,4 @@
],
)

irs_990_transform_csv >> load_irs_990_to_bq
create_cluster >> irs_990_transform_csv >> delete_cluster >> load_irs_990_to_bq
Loading

0 comments on commit d1dc743

Please sign in to comment.