Skip to content

Commit

Permalink
Merge pull request #1 from dataforgelabs/next
Browse files Browse the repository at this point in the history
Update from next
  • Loading branch information
dfjswanson authored Apr 23, 2024
2 parents 7229d80 + 39c8d24 commit cbabd2d
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 28 deletions.
5 changes: 3 additions & 2 deletions database/build.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/bin/bash
##please do not leave any /r characters in this file!!
##please do not leave any /r characters in this file!
#Runs in database directory
# Build Core
pgdeploy="$(pwd)/pg_deploy.sql"
echo $pgdeploy
Expand All @@ -8,4 +9,4 @@ cd schema
find -name "*.sql" -print0 | sort -k2 -t/ -n -z | xargs -r0n1 cat >> $pgdeploy
cd ../code
cat */*/*.sql */*/*/*.sql >> $pgdeploy


1 change: 1 addition & 0 deletions database/code/meta/imp-import/impc_update_test_results.sql
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ IF v_next::text = '[]' THEN
INSERT INTO log.actor_log (log_id, message, actor_path, severity, insert_datetime)
VALUES ( v_imp.log_id, 'Expressions validated, Import completed successfully. ','impc_execute', 'I', clock_timestamp());
UPDATE meta.import SET status_code = 'P' WHERE import_id = v_imp.import_id;
RETURN json_build_object('complete',true);
END IF;

RETURN json_build_object('next',v_next);
Expand Down
10 changes: 5 additions & 5 deletions database/code/meta/imp-import/impc_upsert_output.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ DECLARE
v_body jsonb;
BEGIN

IF in_o.body->>'table_name' IS NULL THEN
RETURN jsonb_build_object('error', 'table_name is undefined');
END IF;

v_body := in_o.body ||
jsonb_build_object('output_package_parameters',jsonb_build_object('table_name', COALESCE(in_o.body->>'table_name',in_o.name),
'table_schema', COALESCE(in_o.body->>'schema_name','')) ) ;

IF v_body->'error' IS NOT NULL THEN
RETURN v_body;
END IF;
'table_schema', in_o.body->>'schema_name') ) ;

IF v_id IS NULL THEN
INSERT INTO meta.output(output_type ,output_name ,active_flag ,created_userid ,create_datetime ,output_package_parameters ,retention_parameters ,output_description ,output_sub_type,
Expand Down
2 changes: 1 addition & 1 deletion database/code/meta/imp-import/impc_upsert_relations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ BEGIN
SELECT j.relation_name, j.source_id, j.related_source_id, j.expression, j.source_cardinality, j.related_source_cardinality, j.expression_parsed,
COALESCE(j.active_flag, true) active_flag, j.description, COALESCE(j.primary_flag, true) primary_flag, el rel, rel_js->'error' error
FROM meta.import_object o
CROSS JOIN jsonb_array_elements(o.body) el
CROSS JOIN jsonb_array_elements(COALESCE(NULLIF(o.body,'null'),'[]'::jsonb)) el
CROSS JOIN meta.imp_decode_relation(el,in_imp.project_id) rel_js
CROSS JOIN jsonb_populate_record(null::meta.source_relation, rel_js) j
WHERE o.object_type = 'relations' AND o.import_id = in_imp.import_id
Expand Down
11 changes: 6 additions & 5 deletions database/code/meta/imp-import/impc_upsert_source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ DECLARE
v_body jsonb;
BEGIN

v_hub_view_name := COALESCE(v_body->>'target_table_name',in_o.name);
v_body := in_o.body || jsonb_build_object('ingestion_parameters',jsonb_build_object('source_query', in_o.body->>'source_query'));

IF v_body->'error' IS NOT NULL THEN
RETURN v_body;
IF in_o.body->>'source_table' IS NULL THEN
RETURN jsonb_build_object('error', 'source_table is undefined');
END IF;

v_hub_view_name := COALESCE(in_o.body->>'target_table',in_o.name);
v_body := in_o.body || jsonb_build_object('ingestion_parameters',jsonb_build_object('source_query', in_o.body->>'source_query',
'source_table', in_o.body->>'source_table'));

IF in_o.id IS NULL THEN
INSERT INTO meta.source(source_name ,source_description ,active_flag , ingestion_parameters, create_datetime, created_userid, parsing_parameters ,cdc_refresh_parameters ,alert_parameters ,file_type ,refresh_type ,
connection_type ,initiation_type ,cost_parameters ,parser , hub_view_name , project_id)
Expand Down
108 changes: 108 additions & 0 deletions database/code/meta/svc-service/svc_generate_queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
CREATE OR REPLACE FUNCTION meta.svc_generate_queries(in_import_id int)
RETURNS json
LANGUAGE plpgsql
AS $function$

DECLARE
v_imp meta.import;
v_source_queries json;
v_output_queries json;
v_level int;
v_count int;
v_err json;
v_all_source_query text;
v_all_output_query text;
BEGIN
SELECT * INTO v_imp FROM meta.import WHERE import_id = in_import_id;

CREATE TEMP TABLE _sources ON COMMIT DROP AS
SELECT s.source_id, s.source_name, 0 as level , null::text file_name , null::text query
FROM meta.source s
WHERE s.project_id = v_imp.project_id AND
NOT EXISTS(SELECT 1 FROM meta.enrichment e JOIN meta.enrichment_parameter ep ON e.enrichment_id = ep.parent_enrichment_id
WHERE e.source_id = s.source_id AND ep.source_id <> s.source_id);


FOR v_level IN 1..20 LOOP
INSERT INTO _sources (source_id, source_name, level)
SELECT s.source_id, s.source_name, v_level
FROM meta.source s
WHERE s.project_id = v_imp.project_id AND
s.source_id NOT IN (SELECT source_id FROM _sources) AND
NOT EXISTS(SELECT 1
FROM meta.enrichment e
JOIN meta.enrichment_parameter ep ON e.enrichment_id = ep.parent_enrichment_id
JOIN meta.source_relation sr ON sr.source_relation_id = ANY(ep.source_relation_ids)
WHERE e.source_id = s.source_id
AND s.source_id NOT IN (sr.source_id, sr.related_source_id)
AND NOT EXISTS(SELECT 1 FROM _sources _s WHERE _s.source_id = sr.source_id)
AND NOT EXISTS(SELECT 1 FROM _sources _s WHERE _s.source_id = sr.related_source_id)
);
GET DIAGNOSTICS v_count = ROW_COUNT;
EXIT WHEN v_count = 0;
END LOOP;

SELECT json_agg(s.source_name)
INTO v_err
FROM meta.source s
WHERE s.project_id = v_imp.project_id
AND s.source_id NOT IN (SELECT source_id FROM _sources);

IF v_err IS NOT NULL THEN
RETURN json_build_object('error',format('Circular dependencies in sources %s. Please check rules to ensure that 2 sources do not have lookups pointing to each other',v_err));
END IF;

-- clean file names
UPDATE _sources s
SET file_name = lower(regexp_replace(left(s.source_name,245),'["<>:\/\\|?*]','_','g')),
query = meta.u_enr_query_generate_query(s.source_id);

WITH w AS (
SELECT s.source_id, ROW_NUMBER() OVER (PARTITION BY s.file_name ORDER BY s.source_id) rn
FROM _sources s
)
UPDATE _sources s
SET file_name = file_name || '_' || w.rn
FROM w
WHERE w.source_id = s.source_id AND w.rn > 1;

SELECT json_agg(json_build_object('file_name', file_name || '.sql', 'query', query ))
INTO v_source_queries
FROM _sources;

SELECT string_agg(query,E'\n\n' ORDER BY level)
INTO v_all_source_query
FROM _sources;

CREATE TEMP TABLE _outputs ON COMMIT DROP AS
SELECT o.output_id, o.output_name, 0 as level , null::text file_name , null::text query
FROM meta.output o WHERE o.project_id = v_imp.project_id;

-- clean file names
UPDATE _outputs o
SET file_name = lower(regexp_replace(left(o.output_name,245),'["<>:\/\\|?*]','_','g')),
query = meta.u_output_generate_query(o.output_id);

WITH w AS (
SELECT o.output_id, ROW_NUMBER() OVER (PARTITION BY o.file_name ORDER BY o.output_id) rn
FROM _outputs o
)
UPDATE _outputs o
SET file_name = file_name || '_' || w.rn
FROM w
WHERE w.output_id = o.output_id AND w.rn > 1;

SELECT json_agg(json_build_object('file_name', file_name || '.sql', 'query', query))
INTO v_output_queries
FROM _outputs;

SELECT string_agg(query,E'\n\n' ORDER BY level)
INTO v_all_output_query
FROM _outputs;

RETURN json_build_object('source', v_source_queries, 'output',v_output_queries, 'run', E'/*SOURCES*/\n' || v_all_source_query || E'\n/*OUTPUTs*/\n' || v_all_output_query);


END;

$function$;
2 changes: 1 addition & 1 deletion database/code/meta/svc-service/svc_import_load_object.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ BEGIN
(in_import_id, in_path, v_object_type, in_body);
ELSE
INSERT INTO log.actor_log (log_id, message, actor_path, severity, insert_datetime)
SELECT i.log_id, format('Skipped unklnown object %s. Please check you project',in_path), 'svc_import_restart', 'W', now()
SELECT i.log_id, format('Skipped unknown object %s. Please check you project',in_path), 'svc_import_restart', 'W', now()
FROM meta.import i WHERE i.import_id = in_import_id;
END IF;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ CREATE OR REPLACE FUNCTION meta.u_enr_query_generate_distinct_many_join_query(in

RETURNS text
LANGUAGE plpgsql
COST 100

AS $function$
DECLARE
v_sql text := '';
Expand All @@ -24,7 +24,7 @@ FOR v_el IN
v_sql := CASE WHEN v_sql = '' THEN '' ELSE v_sql || E',\n' END;
-- DISTINCT query
v_sql := v_sql || v_el.alias || '_DIST AS (SELECT DISTINCT ' || array_to_string(v_el.many_join_list,',') ||
' FROM ' || CASE WHEN in_cte = 0 THEN 'input' ELSE 'cte' || (in_cte - 1) END || '),
' FROM ' || CASE WHEN in_cte = 0 THEN meta.u_get_source_table_name(in_source_id) ELSE 'cte' || (in_cte - 1) END || '),
';

-- Aggregate query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ FOR v_cte IN 0 .. v_cte_max LOOP
v_sql := v_sql || COALESCE((SELECT string_agg( e.expression ||
CASE WHEN e.expression = 'T.' || e.alias THEN '' ELSE ' ' || e.alias END,', ')
FROM elements e WHERE e.type IN ('raw','system') AND cte = 0), '');

RAISE DEBUG 'Added raw attributes';
ELSEIF v_cte = v_cte_max AND in_mode = 'input' THEN
DELETE FROM elements WHERE type = 'system' AND alias = 's_input_id';
END IF;
Expand All @@ -72,7 +72,7 @@ FOR v_cte IN 0 .. v_cte_max LOOP
CASE WHEN e.expression = 'T.' || e.alias THEN '' ELSE e.alias END ,', ')
FROM elements e WHERE e.type = 'enrichment' AND e.cte = v_cte),'');
-- add FROM clause
v_sql := v_sql || E'\nFROM ' || CASE WHEN v_cte = 0 THEN 'input' ELSE 'cte' || (v_cte - 1) END
v_sql := v_sql || E'\nFROM ' || CASE WHEN v_cte = 0 THEN meta.u_get_source_table_name(in_source_id) ELSE 'cte' || (v_cte - 1) END
|| ' T';
-- Add current CTE joins
v_sql := v_sql || COALESCE((SELECT string_agg( E'\nLEFT JOIN ' || CASE WHEN in_source_id = e.source_id AND v_cte > 0 THEN 'cte' || (v_cte - 1) -- self-join
Expand All @@ -91,12 +91,6 @@ FOR v_cte IN 0 .. v_cte_max LOOP

END LOOP;

/*
v_sql := v_sql || ')' || E'\nSELECT ';
v_sql := v_sql || COALESCE((SELECT string_agg('T.' || e.alias ,', ')
FROM elements e WHERE e.type IN ('raw','system','enrichment')),'');
v_sql := v_sql || E'\nFROM cte' || v_cte_max;
*/

RETURN v_sql;

Expand All @@ -113,8 +107,15 @@ CREATE OR REPLACE FUNCTION meta.u_enr_query_generate_query(in_source_id int)
LANGUAGE plpgsql
AS $function$

DECLARE
v_hub_table_name text = meta.u_get_hub_table_name(in_source_id);
v_sql text;
BEGIN
RETURN meta.u_enr_query_generate_query(in_source_id, 'input', 0, '{}'::int[]);
v_sql := 'DROP TABLE IF EXISTS ' || v_hub_table_name || E';
CREATE TABLE ' || v_hub_table_name || ' AS
' || meta.u_enr_query_generate_query(in_source_id, 'input', 0, '{}'::int[]) || ';
';
RETURN v_sql;
END;

$function$;
17 changes: 17 additions & 0 deletions database/code/meta/u-utility/u_get_source_table_name.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

CREATE OR REPLACE FUNCTION meta.u_get_source_table_name(in_source_id int)
RETURNS text
LANGUAGE plpgsql
AS $function$

BEGIN

RETURN COALESCE(
(SELECT s.ingestion_parameters->>'source_table'
FROM meta.source s WHERE s.source_id = in_source_id),
format('Undefined source table for source `%s`', meta.u_get_source_name(in_source_id))
);

END;

$function$;
13 changes: 10 additions & 3 deletions database/code/meta/u-utility/u_output_generate_query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,19 @@ DECLARE
v_output_subtype text;
v_error text;
v_output_source_ids int[];
v_output_table text;
v_output_schema text;
v_output_full_table text;

BEGIN

SELECT o.output_type, o.output_id, o.output_sub_type
INTO v_output_type, v_output_id, v_output_subtype
SELECT o.output_type, o.output_id, o.output_sub_type, o.output_package_parameters->>'schema_name', output_package_parameters->>'table_name'
INTO v_output_type, v_output_id, v_output_subtype, v_output_schema, v_output_table
FROM meta.output o
WHERE o.output_id = in_output_id;

v_output_full_table := COALESCE(v_output_schema || '.', '') || v_output_table;

SELECT array_agg(os.output_source_id) INTO v_output_source_ids
FROM meta.output_source os WHERE os.output_id = in_output_id;

Expand Down Expand Up @@ -143,7 +148,9 @@ END IF;

END LOOP;

v_query := array_to_string(v_queries, ' UNION ALL ');
v_query := 'DROP TABLE IF EXISTS ' || v_output_full_table || ';
CREATE TABLE ' || v_output_full_table || ' AS
' || array_to_string(v_queries, ' UNION ALL ');
RETURN v_query;

EXCEPTION WHEN OTHERS THEN
Expand Down

0 comments on commit cbabd2d

Please sign in to comment.