Commit 88773ca

Add in portal code

thomasyu888 committed Sep 24, 2023
1 parent bcca034
Showing 3 changed files with 81 additions and 41 deletions.
9 changes: 9 additions & 0 deletions README.md

@@ -70,3 +70,12 @@ The RECOVER data is processed via AWS and is compressed to parquet datasets. Th
## GENIE (PoC)

The GENIE public releases are loaded into Snowflake via this [script](admin/genie_elt.py). You must have a Snowflake connection configured: copy this [template](.env_template) to `.env` and fill in your username and password.
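
For reference, a minimal sketch of what that connection setup looks like with `python-dotenv`. The key names match the ones [elt/portal_elt.py](elt/portal_elt.py) reads; the version query is just an illustrative smoke test, not part of the script:

```
from dotenv import dotenv_values
import snowflake.connector

# Read credentials from the filled-out .env template.
config = dotenv_values(".env")

ctx = snowflake.connector.connect(
    user=config["user"],
    password=config["password"],
    account=config["snowflake_account"],
)

# Illustrative smoke test: confirm the connection works.
cs = ctx.cursor()
cs.execute("select current_version();")
print(cs.fetchone())
```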


## Portals data (PoC)

The `snowflake-connector-python`, `synapseclient`, and `python-dotenv` packages must be installed as dependencies:

```
pip install "snowflake-connector-python[pandas]" "synapseclient[pandas]" python-dotenv
```
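
A load then boils down to pulling a Synapse table into a DataFrame and writing it to Snowflake. Below is a condensed sketch of what [elt/portal_elt.py](elt/portal_elt.py) does, assuming the `.env` keys above, cached Synapse credentials, and the NF portal table as the example:

```
from dotenv import dotenv_values
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import synapseclient

config = dotenv_values(".env")
syn = synapseclient.login()  # assumes cached Synapse credentials

ctx = snowflake.connector.connect(
    user=config["user"],
    password=config["password"],
    account=config["snowflake_account"],
    database="sage",
    schema="portal_raw",
    role="SYSADMIN",
)

# Pull the portal table from Synapse and load it into Snowflake.
df = syn.tableQuery("select * from syn16858331").asDataFrame()
df.reset_index(inplace=True, drop=True)
write_pandas(ctx, df, "NF", auto_create_table=True, overwrite=True)
```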
19 changes: 19 additions & 0 deletions analytics/exploration.sql

@@ -61,3 +61,22 @@ group by PASSED;
select *
from synapse_data_warehouse.synapse_raw.certifiedquiz
where USER_ID = 3324230;
-- List the tables in the portal_raw schema. SHOW output cannot be selected
-- from directly, so run SHOW first and read its result set back via RESULT_SCAN.
SHOW TABLES IN SCHEMA sage.portal_raw;
select * from table(result_scan(last_query_id()));
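
The same SHOW-then-RESULT_SCAN pattern can be driven from Python; a minimal sketch, assuming the same `.env` keys as elt/portal_elt.py below:

```
from dotenv import dotenv_values
import snowflake.connector

config = dotenv_values(".env")
ctx = snowflake.connector.connect(
    user=config["user"],
    password=config["password"],
    account=config["snowflake_account"],
)

# SHOW output is not directly selectable, so execute it first and then
# read its result set back through RESULT_SCAN.
cs = ctx.cursor()
cs.execute("SHOW TABLES IN SCHEMA sage.portal_raw;")
cs.execute("select * from table(result_scan(last_query_id()));")
print(cs.fetch_pandas_all())
```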
-- Check whether the NF table already exists in the PORTAL_RAW schema.
SELECT *
FROM sage.information_schema.tables
WHERE TABLE_SCHEMA = 'PORTAL_RAW' AND
    TABLE_NAME = 'NF';
-- Upsert the AD_temp staging table into the AD target table, matched on id.
MERGE INTO AD target_table USING AD_temp src
ON target_table.id = src.id
when matched then
    update set
        target_table.id = src.id, target_table.name = src.name, target_table.study = src.study,
        target_table.dataType = src.dataType, target_table.assay = src.assay, target_table.organ = src.organ,
        target_table.tissue = src.tissue, target_table.species = src.species, target_table.sex = src.sex,
        target_table.consortium = src.consortium, target_table."grant" = src."grant",
        target_table.modelSystemName = src.modelSystemName, target_table.treatmentType = src.treatmentType,
        target_table.specimenID = src.specimenID, target_table.individualID = src.individualID,
        target_table.individualIdSource = src.individualIdSource, target_table.specimenIdSource = src.specimenIdSource,
        target_table.resourceType = src.resourceType, target_table.dataSubtype = src.dataSubtype,
        target_table.metadataType = src.metadataType, target_table.assayTarget = src.assayTarget,
        target_table.analysisType = src.analysisType, target_table.cellType = src.cellType,
        target_table.nucleicAcidSource = src.nucleicAcidSource, target_table.fileFormat = src.fileFormat,
        target_table."group" = src."group", target_table.isModelSystem = src.isModelSystem,
        target_table.isConsortiumAnalysis = src.isConsortiumAnalysis, target_table.isMultiSpecimen = src.isMultiSpecimen,
        target_table.createdOn = src.createdOn, target_table.createdBy = src.createdBy,
        target_table.parentId = src.parentId, target_table.currentVersion = src.currentVersion,
        target_table.benefactorId = src.benefactorId, target_table.projectId = src.projectId,
        target_table.modifiedOn = src.modifiedOn, target_table.modifiedBy = src.modifiedBy,
        target_table.dataFileHandleId = src.dataFileHandleId, target_table.metaboliteType = src.metaboliteType,
        target_table.chromosome = src.chromosome, target_table.modelSystemType = src.modelSystemType,
        target_table.libraryPrep = src.libraryPrep, target_table.dataFileSizeBytes = src.dataFileSizeBytes
when not matched then
    insert (
        id, name, study, dataType, assay, organ, tissue, species, sex, consortium, "grant",
        modelSystemName, treatmentType, specimenID, individualID, individualIdSource, specimenIdSource,
        resourceType, dataSubtype, metadataType, assayTarget, analysisType, cellType, nucleicAcidSource,
        fileFormat, "group", isModelSystem, isConsortiumAnalysis, isMultiSpecimen, createdOn, createdBy,
        parentId, currentVersion, benefactorId, projectId, modifiedOn, modifiedBy, dataFileHandleId,
        metaboliteType, chromosome, modelSystemType, libraryPrep, dataFileSizeBytes
    ) values (
        src.id, src.name, src.study, src.dataType, src.assay, src.organ, src.tissue, src.species, src.sex,
        src.consortium, src."grant", src.modelSystemName, src.treatmentType, src.specimenID, src.individualID,
        src.individualIdSource, src.specimenIdSource, src.resourceType, src.dataSubtype, src.metadataType,
        src.assayTarget, src.analysisType, src.cellType, src.nucleicAcidSource, src.fileFormat, src."group",
        src.isModelSystem, src.isConsortiumAnalysis, src.isMultiSpecimen, src.createdOn, src.createdBy,
        src.parentId, src.currentVersion, src.benefactorId, src.projectId, src.modifiedOn, src.modifiedBy,
        src.dataFileHandleId, src.metaboliteType, src.chromosome, src.modelSystemType, src.libraryPrep,
        src.dataFileSizeBytes
    );
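
A column-by-column upsert like this is tedious to write by hand, which is why the (currently commented-out) loader code in elt/portal_elt.py below builds it from the DataFrame's column list. A minimal sketch of that generation step, with illustrative table and column names rather than the full 43-column set:

```
# Sketch: build a Snowflake MERGE (upsert) statement from a column list.
# Table and column names here are illustrative placeholders.
def build_merge(target: str, staging: str, cols: list[str]) -> str:
    # Quote identifiers so reserved words like "grant" and "group" are safe.
    set_clause = ", ".join(f'{target}."{c}" = {staging}."{c}"' for c in cols)
    col_list = ", ".join(f'"{c}"' for c in cols)
    values = ", ".join(f'{staging}."{c}"' for c in cols)
    return (
        f"MERGE INTO {target} USING {staging}\n"
        f"ON {target}.id = {staging}.id\n"
        f"when matched then\n    update set {set_clause}\n"
        f"when not matched then\n    insert ({col_list}) values ({values});"
    )

print(build_merge("AD", "AD_temp", ["id", "name", "study", "grant"]))
```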
94 changes: 53 additions & 41 deletions elt/portal_elt.py

@@ -1,4 +1,5 @@
from dotenv import dotenv_values
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import synapseclient
@@ -10,53 +11,64 @@

config = dotenv_values(".env")

ctx = snowflake.connector.connect(
    user=config['user'],
    password=config['password'],
    account=config['snowflake_account'],
    database="sage",
    schema="portal_raw",
    role="SYSADMIN"
)

cs = ctx.cursor()

portals = {
    "AD": "syn11346063",
    "PSYCHENCODE": "syn20821313.16",
    "NF": "syn16858331"
}
for portal_name, synapse_id in portals.items():
    # Pull the portal table from Synapse into a DataFrame.
    portal = syn.tableQuery(f"select * from {synapse_id}")
    portal_df = portal.asDataFrame()
    portal_df.reset_index(inplace=True, drop=True)

    find_table_query = f"""
    SELECT *
    FROM sage.information_schema.tables
    WHERE TABLE_SCHEMA = 'PORTAL_RAW' AND
        TABLE_NAME = '{portal_name}';
    """
    cs.execute(find_table_query)
    opt = cs.fetch_pandas_all()
    # If the table does not exist yet, let write_pandas auto-create it;
    # otherwise truncate and overwrite it. The rationale is that some tables
    # have "grant" and "group" as column headers, and those are reserved
    # keywords in Snowflake.
    auto_create_table = opt.empty
    write_pandas(ctx, portal_df, portal_name, auto_create_table=auto_create_table, overwrite=True)

    # if opt.empty:
    #     write_pandas(ctx, portal_df, portal_name, auto_create_table=True)
    # else:
    #     # Create a transient temporary table so we can upsert
    #     target_table = f"{portal_name}_TEMP"
    #     write_pandas(ctx, portal_df, target_table, auto_create_table=True, table_type="transient")
    #     # Upsert from the temporary table into the non-temporary table
    #     upsert_set = [f'"{portal_name}"."{col}" = "{target_table}"."{col}"' for col in portal_df.columns]
    #     upsert_set_str = ",".join(upsert_set)
    #     col_str = ",".join(f'"{col}"' for col in portal_df.columns)
    #     src_value_str = ",".join([f'"{target_table}"."{col}"' for col in portal_df.columns])
    #     merge_sql = f"""
    #     MERGE INTO {portal_name} USING {target_table}
    #     ON {portal_name}.id = {target_table}.id
    #     when matched then
    #     update set {upsert_set_str}
    #     when not matched then
    #     insert
    #     ({col_str}) values({src_value_str})
    #     """
    #     print(merge_sql)
    #     cs.execute(merge_sql)

    # Spot check: read back a few rows from the freshly loaded table.
    query = f"select * from {portal_name} limit 10;"

    cs.execute(query)
    opt = cs.fetch_pandas_all()
