From 16de1cd78ab4cf6121e145bc336b86ec6615db0f Mon Sep 17 00:00:00 2001 From: Ekaterina Sokolova Date: Wed, 3 Jul 2024 12:40:32 +0300 Subject: [PATCH 1/4] Add progress bar during query execution to track progress. Progress bar is run-time SQL-query progress indicator. Function progress_bar(pid) extracts the current query state from backend with specified 'pid'. Then gets the numerical values of the actual rows and total rows and count progress for the whole query tree. Function returns numeric value from 0 to 1 describing the measure of query fulfillment. This function can be used to be embedded in the PostgreSQL GUI. To intuitively track progress without using a graphical client, you can use the additionally implemented function progress_bar_visual(pid, delay). It prints state every period specified by 'delay' (in seconds). --- README.md | 114 ++++++++++++++++ init.sql | 11 ++ pg_query_state.c | 264 ++++++++++++++++++++++++++++++++++++- t/test_bad_progress_bar.pl | 67 ++++++++++ tests/common.py | 48 +++++++ tests/pg_qs_test_runner.py | 1 + tests/test_cases.py | 15 +++ 7 files changed, 516 insertions(+), 4 deletions(-) create mode 100644 t/test_bad_progress_bar.pl diff --git a/README.md b/README.md index fba15cd..62fe990 100644 --- a/README.md +++ b/README.md @@ -321,3 +321,117 @@ Do not hesitate to post your issues, questions and new ideas at the [issues](htt ## Authors [Maksim Milyutin](https://github.com/maksm90) Alexey Kondratov Postgres Professional Ltd., Russia + +## Function progress\_bar +```plpgsql +progress_bar( + integer pid +) returns FLOAT +``` +extracts the current query state from backend with specified 'pid'. Then gets the numerical values of the actual rows and total rows and count progress for the whole query tree. Function returns numeric value from 0 to 1 describing the measure of query fulfillment. If there is no information about current state of the query, or the impossibility of counting, the corresponding messages will be displayed. + +## Function progress\_bar\_visual +```plpgsql +progress_bar( + integer pid, + integer delay +) returns VOID +``` +cyclically extracts and print the current query state in numeric value from backend with specified 'pid' every period specified by 'delay' in seconds. This is the looping version of the progress\_bar function that returns void value. + +**_Warning_**: Calling role have to be superuser or member of the role whose backend is being called. Otherwise function prints ERROR message `permission denied`. + +## Examples +Assume first backend executes some function: +```sql +postgres=# insert into table_name select generate_series(1,10000000); +``` +Other backend can get the follow output: +```sql +postgres=# SELECT pid FROM pg_stat_activity where query like 'insert%'; + pid +------- + 23877 +(1 row) + +postgres=# SELECT progress_bar(23877); + progress_bar +-------------- + 0.6087927 +(1 row) +``` +Or continuous version: +```sql +postgres=# SELECT progress_bar_visual(23877, 1); +Progress = 0.043510 +Progress = 0.085242 +Progress = 0.124921 +Progress = 0.168168 +Progress = 0.213803 +Progress = 0.250362 +Progress = 0.292632 +Progress = 0.331454 +Progress = 0.367509 +Progress = 0.407450 +Progress = 0.448646 +Progress = 0.488171 +Progress = 0.530559 +Progress = 0.565558 +Progress = 0.608039 +Progress = 0.645778 +Progress = 0.654842 +Progress = 0.699006 +Progress = 0.735760 +Progress = 0.787641 +Progress = 0.832160 +Progress = 0.871077 +Progress = 0.911858 +Progress = 0.956362 +Progress = 0.995097 +Progress = 1.000000 + progress_bar_visual +--------------------- + 1 +(1 row) +``` +Also uncountable queries exist. Assume first backend executes some function: +```sql +DELETE from table_name; +``` +Other backend can get the follow output: +```sql +postgres=# SELECT pid FROM pg_stat_activity where query like 'delete%'; + pid +------- + 23877 +(1 row) + +postgres=# SELECT progress_bar(23877); +INFO: Counting Progress doesn't available + progress_bar +-------------- + -1 +(1 row) + +postgres=# SELECT progress_bar_visual(23877, 5); +INFO: Counting Progress doesn't available + progress_bar_visual +--------------------- + -1 +(1 row) +``` + +## Reinstallation +If you already have a module 'pg_query_state' without progress bar functions installed, execute this in the module's directory: +``` +make install USE_PGXS=1 +``` +It is essential to restart the PostgreSQL instance. After that, execute the following queries in psql: +```sql +DROP EXTENSION IF EXISTS pg_query_state; +CREATE EXTENSION pg_query_state; +``` + +## Authors +Ekaterina Sokolova Postgres Professional Ltd., Russia +Vyacheslav Makarov Postgres Professional Ltd., Russia diff --git a/init.sql b/init.sql index 3a9bb61..5675f1d 100644 --- a/init.sql +++ b/init.sql @@ -15,3 +15,14 @@ CREATE FUNCTION pg_query_state(pid integer , leader_pid integer) AS 'MODULE_PATHNAME' LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION progress_bar(pid integer) + RETURNS FLOAT + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION progress_bar_visual(pid integer + , delay integer = 1) + RETURNS FLOAT + AS 'MODULE_PATHNAME', 'progress_bar' + LANGUAGE C STRICT VOLATILE; diff --git a/pg_query_state.c b/pg_query_state.c index 1949643..1571d03 100644 --- a/pg_query_state.c +++ b/pg_query_state.c @@ -63,11 +63,11 @@ static shm_mq_result receive_msg_by_parts(shm_mq_handle *mqh, Size *total, void **datap, int64 timeout, int *rc, bool nowait); /* Global variables */ -List *QueryDescStack = NIL; +List *QueryDescStack = NIL; static ProcSignalReason UserIdPollReason = INVALID_PROCSIGNAL; static ProcSignalReason QueryStatePollReason = INVALID_PROCSIGNAL; static ProcSignalReason WorkerPollReason = INVALID_PROCSIGNAL; -static bool module_initialized = false; +static bool module_initialized = false; static const char *be_state_str[] = { /* BackendState -> string repr */ "undefined", /* STATE_UNDEFINED */ "idle", /* STATE_IDLE */ @@ -103,8 +103,8 @@ static List *GetRemoteBackendQueryStates(PGPROC *leader, /* Shared memory variables */ shm_toc *toc = NULL; RemoteUserIdResult *counterpart_userid = NULL; -pg_qs_params *params = NULL; -shm_mq *mq = NULL; +pg_qs_params *params = NULL; +shm_mq *mq = NULL; /* * Estimate amount of shared memory needed. @@ -1247,3 +1247,259 @@ DetachPeer(void) ereport(LOG, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("pg_query_state peer is not responding"))); } + +/* + * Count progress of query execution like ratio of + * number of received to planned rows in persent. + * Changes of this function can lead to more plausible results. + */ +static double +CountProgress(char *plan_text) +{ + char *plan; /* Copy of plan_text */ + char *node; /* Part of plan with information about single node */ + char *rows; /* Pointer to rows */ + char *actual_rows_str; /* Actual rows in string format */ + char *plan_rows_str; /* Planned rows in string format */ + int len; /* Length of rows in string format */ + double actual_rows; /* Actual rows */ + double plan_rows; /* Planned rows */ + double progress = 0; /* Summary progress on nodes */ + int node_amount = 0; /* Amount of plantree nodes using in counting progress */ + + plan = palloc(sizeof(char) * (strlen(plan_text) + 1)); + strcpy(plan, plan_text); + node = strtok(plan, "["); /* Get information about upper node */ + while (node != NULL) + { + if (strstr(node, "Seq Scan") == NULL) + { + if (strstr(node, "ModifyTable") == NULL) + { + if (strstr(node, "Result") == NULL) + { + if ((rows = strstr(node, "Rows Removed by Filter")) != NULL) + { + node_amount++; + rows = (char *) (rows + strlen("Rows Removed by Filter\": ") * sizeof(char)); + + /* + * Filter node have 2 conditions: + * 1) Was not filtered (current progress = 0) + * 2) Was filtered (current progress = 1) + */ + if (rows[0] != '0') + progress += 1; + } + else if ((rows = strstr(node, "\"Actual Rows\": ")) != NULL) + { + node_amount++; + rows = (char *) (rows + strlen("\"Actual Rows\": ") * sizeof(char)); + len = strstr(rows, "\n") - rows; + if ((strstr(rows, ",") - rows) < len) + len = strstr(rows, ",") - rows; + actual_rows_str = palloc(sizeof(char) * (len + 1)); + actual_rows_str[len] = 0; + strncpy(actual_rows_str, rows, len); + actual_rows = strtod(actual_rows_str, NULL); + pfree(actual_rows_str); + + rows = strstr(node, "\"Plan Rows\": "); + rows = (char *) (rows + strlen("\"Plan Rows\": ") * sizeof(char)); + len = strstr(rows, ",") - rows; + plan_rows_str = palloc(sizeof(char) * (len + 1)); + plan_rows_str[len] = 0; + strncpy(plan_rows_str, rows, len); + plan_rows = strtod(plan_rows_str, NULL); + pfree(plan_rows_str); + + if (plan_rows > actual_rows) + progress += actual_rows / plan_rows; + else + progress += 1; + } + } + } + } + node = strtok(NULL, "["); + } + + pfree(plan); + if (node_amount > 0) + { + progress = progress / node_amount; + if (progress == 1) + progress = 0.999999; + } + else + return -1; + return progress; +} + +static double +GetCurrentNumericState(shm_mq_msg *msg) +{ + typedef struct + { + PGPROC *proc; + ListCell *frame_cursor; + int frame_index; + List *stack; + } proc_state; + + /* multicall context type */ + typedef struct + { + ListCell *proc_cursor; + List *procs; + } pg_qs_fctx; + + pg_qs_fctx *fctx; + List *qs_stack; + proc_state *p_state; + stack_frame *frame; + char *plan_text; + + fctx = (pg_qs_fctx *) palloc(sizeof(pg_qs_fctx)); + fctx->procs = NIL; + p_state = (proc_state *) palloc(sizeof(proc_state)); + qs_stack = deserialize_stack(msg->stack, msg->stack_depth); + p_state->proc = msg->proc; + p_state->stack = qs_stack; + p_state->frame_index = 0; + p_state->frame_cursor = list_head(qs_stack); + fctx->procs = lappend(fctx->procs, p_state); + fctx->proc_cursor = list_head(fctx->procs); + frame = (stack_frame *) lfirst(p_state->frame_cursor); + plan_text = frame->plan->vl_dat; + return CountProgress(plan_text); +} + +PG_FUNCTION_INFO_V1(progress_bar); +Datum +progress_bar(PG_FUNCTION_ARGS) +{ + pid_t pid = PG_GETARG_INT32(0); + int delay = 0; + PGPROC *proc; + Oid counterpart_user_id; + shm_mq_msg *msg; + List *bg_worker_procs = NIL; + List *msgs; + double progress; + double old_progress; + + if (PG_NARGS() == 2) + { + /* + * This is continuous mode, function 'progress_bar_visual', + * we need to get delay value. + */ + delay = PG_GETARG_INT32(1); + if (delay < 1) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("the value of \"delay\" must be positive integer"))); + } + + if (!module_initialized) + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("pg_query_state wasn't initialized yet"))); + + if (pid == MyProcPid) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("attempt to extract state of current process"))); + + proc = BackendPidGetProc(pid); + if (!proc || +#if PG_VERSION_NUM >= 170000 + proc->vxid.procNumber == INVALID_PROC_NUMBER || +#else + proc->backendId == InvalidBackendId || +#endif + proc->databaseId == InvalidOid || + proc->roleId == InvalidOid) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("backend with pid=%d not found", pid))); + + counterpart_user_id = GetRemoteBackendUserId(proc); + if (!(superuser() || GetUserId() == counterpart_user_id)) + ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied"))); + + old_progress = 0; + progress = 0; + if (SRF_IS_FIRSTCALL()) + { + pg_atomic_write_u32(&counterpart_userid->n_peers, 1); + params->reqid = ++reqid; + } + + bg_worker_procs = GetRemoteBackendWorkers(proc); + msgs = GetRemoteBackendQueryStates(proc, + bg_worker_procs, + 0, 1, 0, 0, 0, + EXPLAIN_FORMAT_JSON); + if (list_length(msgs) == 0) + elog(WARNING, "backend does not reply"); + msg = (shm_mq_msg *) linitial(msgs); + + switch (msg->result_code) + { + case QUERY_NOT_RUNNING: + elog(INFO, "query not runing"); + PG_RETURN_FLOAT8((float8) -1); + break; + case STAT_DISABLED: + elog(INFO, "query execution statistics disabled"); + PG_RETURN_FLOAT8((float8) -1); + default: + break; + } + if (msg->result_code == QS_RETURNED && delay == 0) + { + progress = GetCurrentNumericState(msg); + if (progress < 0) + { + elog(INFO, "Counting Progress doesn't available"); + PG_RETURN_FLOAT8((float8) -1); + } + else + PG_RETURN_FLOAT8((float8) progress); + } + else if (msg->result_code == QS_RETURNED) + { + while (msg->result_code == QS_RETURNED) + { + progress = GetCurrentNumericState(msg); + if (progress > old_progress) + { + elog(INFO, "\rProgress = %f", progress); + old_progress = progress; + } + else if (progress < 0) + { + elog(INFO, "Counting Progress doesn't available"); + break; + } + + for (int i = 0; i < delay; i++) + { + pg_usleep(1000000); + CHECK_FOR_INTERRUPTS(); + } + + bg_worker_procs = GetRemoteBackendWorkers(proc); + msgs = GetRemoteBackendQueryStates(proc, + bg_worker_procs, + 0, 1, 0, 0, 0, + EXPLAIN_FORMAT_JSON); + if (list_length(msgs) == 0) + elog(WARNING, "backend does not reply"); + msg = (shm_mq_msg *) linitial(msgs); + } + if (progress > -1) + elog(INFO, "\rProgress = 1.000000"); + PG_RETURN_FLOAT8((float8) 1); + } + PG_RETURN_FLOAT8((float8) -1); +} diff --git a/t/test_bad_progress_bar.pl b/t/test_bad_progress_bar.pl new file mode 100644 index 0000000..e57954b --- /dev/null +++ b/t/test_bad_progress_bar.pl @@ -0,0 +1,67 @@ +# pg_query_state/t/test_bad_progress_bar.pl +# +# Check uncorrect launches of functions progress_bar(pid) +# and progress_bar_visual(pid, delay) + +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +# List of checks for bad cases: +# 1) appealing to a bad pid +# ------- requires DBI and DBD::Pg modules ------- +# 2) extracting the state of the process itself + +# Test whether we have both DBI and DBD::pg +my $dbdpg_rc = eval +{ + require DBI; + require DBD::Pg; + DBD::Pg->import(':async'); + 1; +}; + +# start backend for function progress_bar +my $node = PostgresNode->get_new_node('master'); +$node->init; +$node->start; +$node->append_conf('postgresql.conf', "shared_preload_libraries = 'pg_query_state'"); +$node->restart; +$node->psql('postgres', 'CREATE EXTENSION pg_query_state;'); + +subtest 'Extracting from bad pid' => sub { + my $stderr; + $node->psql('postgres', 'SELECT * from progress_bar(-1)', stderr => \$stderr); + is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', "appealing to a bad pid for progress_bar"); + $node->psql('postgres', 'SELECT * from progress_bar(-1)_visual', stderr => \$stderr); + is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', "appealing to a bad pid for progress_bar_visual"); +}; + +if ( not $dbdpg_rc) { + diag('DBI and DBD::Pg are not available, skip 2/3 tests'); +} + +SKIP: { + skip "DBI and DBD::Pg are not available", 2 if not $dbdpg_rc; + + my $dbh_status = DBI->connect('DBI:Pg:' . $node->connstr($_)); + if ( !defined $dbh_status ) + { + die "Cannot connect to database for dbh with progress_bar\n"; + } + + my $pid_status = $dbh_status->{pg_pid}; + + subtest 'Extracting your own status' => sub { + $dbh_status->do('SELECT * from progress_bar(' . $pid_status . ')'); + is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', "extracting the state of the process itself for progress_bar"); + $dbh_status->do('SELECT * from progress_bar_visual(' . $pid_status . ')'); + is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', "extracting the state of the process itself for progress_bar_visual"); + }; + + $dbh_status->disconnect; +} + +$node->stop('fast'); diff --git a/tests/common.py b/tests/common.py index 6dab69a..209c704 100644 --- a/tests/common.py +++ b/tests/common.py @@ -161,6 +161,54 @@ def onetime_query_state(config, async_conn, query, args={}, num_workers=0): set_guc(async_conn, 'enable_mergejoin', 'on') return result, notices +def progress_bar(config, pid): + conn = psycopg2.connect(**config) + curs = conn.cursor() + + curs.callproc('progress_bar', (pid,)) + result = curs.fetchall() + notices = conn.notices[:] + conn.close() + + return result, notices + +def onetime_progress_bar(config, async_conn, query, args={}, num_workers=0): + """ + Get intermediate state of 'query' on connection 'async_conn' after number of 'steps' + of node executions from start of query + """ + + acurs = async_conn.cursor() + + set_guc(async_conn, 'enable_mergejoin', 'off') + set_guc(async_conn, 'max_parallel_workers_per_gather', num_workers) + acurs.execute(query) + + # extract progress of current query + MAX_PG_QS_RETRIES = 10 + DELAY_BETWEEN_RETRIES = 0.1 + pg_qs_args = { + 'config': config, + 'pid': async_conn.get_backend_pid(), + } + for k, v in args.items(): + pg_qs_args[k] = v + n_retries = 0 + while True: + result, notices = progress_bar(**pg_qs_args) + n_retries += 1 + if len(result) > 0: + break + if n_retries >= MAX_PG_QS_RETRIES: + # pg_query_state callings don't return any result, more likely run + # query has completed + break + time.sleep(DELAY_BETWEEN_RETRIES) + wait(async_conn) + + set_guc(async_conn, 'enable_mergejoin', 'on') + return result, notices + def set_guc(async_conn, param, value): acurs = async_conn.cursor() acurs.execute('set %s to %s' % (param, value)) diff --git a/tests/pg_qs_test_runner.py b/tests/pg_qs_test_runner.py index f4088a9..4317ea1 100644 --- a/tests/pg_qs_test_runner.py +++ b/tests/pg_qs_test_runner.py @@ -68,6 +68,7 @@ class TeardownException(Exception): pass test_formats, test_timing_buffers_conflicts, test_insert_on_conflict, + test_progress_bar, ] def setup(con): diff --git a/tests/test_cases.py b/tests/test_cases.py index c866f58..edf5616 100644 --- a/tests/test_cases.py +++ b/tests/test_cases.py @@ -397,3 +397,18 @@ def test_timing_buffers_conflicts(config): and 'WARNING: buffers statistics disabled\n' in notices common.n_close((acon,)) + +def test_progress_bar(config): + """test progress_bar of simple query""" + + acon, = common.n_async_connect(config) + query = 'select * from foo join bar on foo.c1=bar.c1' + + qs, notices = common.onetime_progress_bar(config, acon, query) + assert qs[0][0] >= 0 and qs[0][0] < 1 + first_qs = qs[0][0] + + qs, _ = common.onetime_progress_bar(config, acon, query) + assert qs[0][0] >= first_qs and qs[0][0] < 1 + + common.n_close((acon,)) From 1e3e16306a92ceabb6f68313b08f3a7144d34356 Mon Sep 17 00:00:00 2001 From: Ekaterina Sokolova Date: Wed, 3 Jul 2024 15:31:24 +0300 Subject: [PATCH 2/4] Rename function by adding the prefix "pg_". --- README.md | 36 ++++++++++++++++++------------------ init.sql | 6 +++--- pg_query_state.c | 6 +++--- t/test_bad_progress_bar.pl | 24 ++++++++++++------------ tests/common.py | 2 +- tests/test_cases.py | 2 +- 6 files changed, 38 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 62fe990..87e0eb3 100644 --- a/README.md +++ b/README.md @@ -324,7 +324,7 @@ Alexey Kondratov Postgres Professional Ltd., Russia ## Function progress\_bar ```plpgsql -progress_bar( +pg_progress_bar( integer pid ) returns FLOAT ``` @@ -332,7 +332,7 @@ extracts the current query state from backend with specified 'pid'. Then gets th ## Function progress\_bar\_visual ```plpgsql -progress_bar( +pg_progress_bar_visual( integer pid, integer delay ) returns VOID @@ -354,15 +354,15 @@ postgres=# SELECT pid FROM pg_stat_activity where query like 'insert%'; 23877 (1 row) -postgres=# SELECT progress_bar(23877); - progress_bar --------------- - 0.6087927 +postgres=# SELECT pg_progress_bar(23877); + pg_progress_bar +----------------- + 0.6087927 (1 row) ``` Or continuous version: ```sql -postgres=# SELECT progress_bar_visual(23877, 1); +postgres=# SELECT pg_progress_bar_visual(23877, 1); Progress = 0.043510 Progress = 0.085242 Progress = 0.124921 @@ -389,9 +389,9 @@ Progress = 0.911858 Progress = 0.956362 Progress = 0.995097 Progress = 1.000000 - progress_bar_visual ---------------------- - 1 + pg_progress_bar_visual +------------------------ + 1 (1 row) ``` Also uncountable queries exist. Assume first backend executes some function: @@ -406,18 +406,18 @@ postgres=# SELECT pid FROM pg_stat_activity where query like 'delete%'; 23877 (1 row) -postgres=# SELECT progress_bar(23877); +postgres=# SELECT pg_progress_bar(23877); INFO: Counting Progress doesn't available - progress_bar --------------- - -1 + pg_progress_bar +----------------- + -1 (1 row) -postgres=# SELECT progress_bar_visual(23877, 5); +postgres=# SELECT pg_progress_bar_visual(23877, 5); INFO: Counting Progress doesn't available - progress_bar_visual ---------------------- - -1 + pg_progress_bar_visual +------------------------ + -1 (1 row) ``` diff --git a/init.sql b/init.sql index 5675f1d..2dc7363 100644 --- a/init.sql +++ b/init.sql @@ -16,13 +16,13 @@ CREATE FUNCTION pg_query_state(pid integer AS 'MODULE_PATHNAME' LANGUAGE C STRICT VOLATILE; -CREATE FUNCTION progress_bar(pid integer) +CREATE FUNCTION pg_progress_bar(pid integer) RETURNS FLOAT AS 'MODULE_PATHNAME' LANGUAGE C STRICT VOLATILE; -CREATE FUNCTION progress_bar_visual(pid integer +CREATE FUNCTION pg_progress_bar_visual(pid integer , delay integer = 1) RETURNS FLOAT - AS 'MODULE_PATHNAME', 'progress_bar' + AS 'MODULE_PATHNAME', 'pg_progress_bar' LANGUAGE C STRICT VOLATILE; diff --git a/pg_query_state.c b/pg_query_state.c index 1571d03..9329f78 100644 --- a/pg_query_state.c +++ b/pg_query_state.c @@ -1375,9 +1375,9 @@ GetCurrentNumericState(shm_mq_msg *msg) return CountProgress(plan_text); } -PG_FUNCTION_INFO_V1(progress_bar); +PG_FUNCTION_INFO_V1(pg_progress_bar); Datum -progress_bar(PG_FUNCTION_ARGS) +pg_progress_bar(PG_FUNCTION_ARGS) { pid_t pid = PG_GETARG_INT32(0); int delay = 0; @@ -1392,7 +1392,7 @@ progress_bar(PG_FUNCTION_ARGS) if (PG_NARGS() == 2) { /* - * This is continuous mode, function 'progress_bar_visual', + * This is continuous mode, function 'pg_progress_bar_visual', * we need to get delay value. */ delay = PG_GETARG_INT32(1); diff --git a/t/test_bad_progress_bar.pl b/t/test_bad_progress_bar.pl index e57954b..2f3da4e 100644 --- a/t/test_bad_progress_bar.pl +++ b/t/test_bad_progress_bar.pl @@ -1,7 +1,7 @@ # pg_query_state/t/test_bad_progress_bar.pl # -# Check uncorrect launches of functions progress_bar(pid) -# and progress_bar_visual(pid, delay) +# Check uncorrect launches of functions pg_progress_bar(pid) +# and pg_progress_bar_visual(pid, delay) use strict; use warnings; @@ -23,7 +23,7 @@ 1; }; -# start backend for function progress_bar +# start backend for function pg_progress_bar my $node = PostgresNode->get_new_node('master'); $node->init; $node->start; @@ -33,10 +33,10 @@ subtest 'Extracting from bad pid' => sub { my $stderr; - $node->psql('postgres', 'SELECT * from progress_bar(-1)', stderr => \$stderr); - is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', "appealing to a bad pid for progress_bar"); - $node->psql('postgres', 'SELECT * from progress_bar(-1)_visual', stderr => \$stderr); - is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', "appealing to a bad pid for progress_bar_visual"); + $node->psql('postgres', 'SELECT * from pg_progress_bar(-1)', stderr => \$stderr); + is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', "appealing to a bad pid for pg_progress_bar"); + $node->psql('postgres', 'SELECT * from pg_progress_bar(-1)_visual', stderr => \$stderr); + is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', "appealing to a bad pid for pg_progress_bar_visual"); }; if ( not $dbdpg_rc) { @@ -49,16 +49,16 @@ my $dbh_status = DBI->connect('DBI:Pg:' . $node->connstr($_)); if ( !defined $dbh_status ) { - die "Cannot connect to database for dbh with progress_bar\n"; + die "Cannot connect to database for dbh with pg_progress_bar\n"; } my $pid_status = $dbh_status->{pg_pid}; subtest 'Extracting your own status' => sub { - $dbh_status->do('SELECT * from progress_bar(' . $pid_status . ')'); - is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', "extracting the state of the process itself for progress_bar"); - $dbh_status->do('SELECT * from progress_bar_visual(' . $pid_status . ')'); - is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', "extracting the state of the process itself for progress_bar_visual"); + $dbh_status->do('SELECT * from pg_progress_bar(' . $pid_status . ')'); + is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', "extracting the state of the process itself for pg_progress_bar"); + $dbh_status->do('SELECT * from pg_progress_bar_visual(' . $pid_status . ')'); + is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', "extracting the state of the process itself for pg_progress_bar_visual"); }; $dbh_status->disconnect; diff --git a/tests/common.py b/tests/common.py index 209c704..c69a8c7 100644 --- a/tests/common.py +++ b/tests/common.py @@ -165,7 +165,7 @@ def progress_bar(config, pid): conn = psycopg2.connect(**config) curs = conn.cursor() - curs.callproc('progress_bar', (pid,)) + curs.callproc('pg_progress_bar', (pid,)) result = curs.fetchall() notices = conn.notices[:] conn.close() diff --git a/tests/test_cases.py b/tests/test_cases.py index edf5616..5e47d64 100644 --- a/tests/test_cases.py +++ b/tests/test_cases.py @@ -399,7 +399,7 @@ def test_timing_buffers_conflicts(config): common.n_close((acon,)) def test_progress_bar(config): - """test progress_bar of simple query""" + """test pg_progress_bar of simple query""" acon, = common.n_async_connect(config) query = 'select * from foo join bar on foo.c1=bar.c1' From ac1a53b15a15b5d2a8ebb282cd235d4665819047 Mon Sep 17 00:00:00 2001 From: Ekaterina Sokolova Date: Thu, 31 Oct 2024 11:27:39 +0300 Subject: [PATCH 3/4] Version 1.2 --- Makefile | 5 +++-- pg_query_state--1.1--1.2.sql | 14 ++++++++++++++ pg_query_state.control | 2 +- 3 files changed, 18 insertions(+), 3 deletions(-) create mode 100644 pg_query_state--1.1--1.2.sql diff --git a/Makefile b/Makefile index 4468c51..bce7a41 100644 --- a/Makefile +++ b/Makefile @@ -4,8 +4,9 @@ MODULE_big = pg_query_state OBJS = pg_query_state.o signal_handler.o $(WIN32RES) EXTENSION = pg_query_state -EXTVERSION = 1.1 -DATA = pg_query_state--1.0--1.1.sql +EXTVERSION = 1.2 +DATA = pg_query_state--1.0--1.1.sql \ + pg_query_state--1.1--1.2.sql DATA_built = $(EXTENSION)--$(EXTVERSION).sql PGFILEDESC = "pg_query_state - facility to track progress of plan execution" diff --git a/pg_query_state--1.1--1.2.sql b/pg_query_state--1.1--1.2.sql new file mode 100644 index 0000000..594d5ff --- /dev/null +++ b/pg_query_state--1.1--1.2.sql @@ -0,0 +1,14 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "ALTER EXTENSION pg_query_state UPDATE TO '1.2'" to load this file. \quit + +CREATE FUNCTION pg_progress_bar(pid integer) + RETURNS FLOAT + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION pg_progress_bar_visual(pid integer + , delay integer = 1) + RETURNS FLOAT + AS 'MODULE_PATHNAME', 'pg_progress_bar' + LANGUAGE C STRICT VOLATILE; + \ No newline at end of file diff --git a/pg_query_state.control b/pg_query_state.control index fdf637e..9373bb0 100644 --- a/pg_query_state.control +++ b/pg_query_state.control @@ -1,5 +1,5 @@ # pg_query_state extension comment = 'tool for inspection query progress' -default_version = '1.1' +default_version = '1.2' module_pathname = '$libdir/pg_query_state' relocatable = true From 54130c8ef36b31a3eeddbd2be1efe898e6779f7c Mon Sep 17 00:00:00 2001 From: Arseny Kositsin Date: Wed, 20 Nov 2024 11:56:05 +0300 Subject: [PATCH 4/4] [PGPRO-8358] Added the progress value to the pg_query_state message The progress value is equal to the actual_rows ratio to plan_rows as a percentage. Or 100% if actual_rows > plan_rows. The progress value is added for each node in the current_loop. The progress value is added to the message taking into account the format (TEXT, XML, JSON, YAML) and provided that the parameter costs = true. Tags: pg_query_state --- pg_query_state.h | 8 ++ signal_handler.c | 203 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+) diff --git a/pg_query_state.h b/pg_query_state.h index f632008..7de0c30 100644 --- a/pg_query_state.h +++ b/pg_query_state.h @@ -92,4 +92,12 @@ extern void DetachPeer(void); extern void UnlockShmem(LOCKTAG *tag); extern void LockShmem(LOCKTAG *tag, uint32 key); +/* + * The size of the buffer in which the progress message is generated. + * The length of this line is the maximum in XML format when the + * progress reaches 100%. If the size of this buffer becomes insufficient, + * it will need to be increased (e.g., when adding a new format). + */ +#define BUF_SIZE_PROGRESS 30 + #endif diff --git a/signal_handler.c b/signal_handler.c index dfe8780..85df8d7 100644 --- a/signal_handler.c +++ b/signal_handler.c @@ -37,6 +37,9 @@ typedef enum } msg_by_parts_result; static msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data); +static void append_string_info_by_ptr(StringInfo str, char *add_ptr, char *add_data, int add_data_len); +static void add_node_count_progress(ExplainState *es); +static int get_num_left_spaces(char *actual_rows, StringInfo str); /* * Get List of stack_frames as a stack of function calls starting from outermost call. @@ -91,6 +94,12 @@ runtime_explain() es->str->data[es->str->len - 1] = '}'; } + /* + * Adding progress to the node depending on the es->costs parameter + */ + if (es->costs == true) + add_node_count_progress(es); + qs_frame->plan = es->str->data; result = lcons(qs_frame, result); @@ -99,6 +108,200 @@ runtime_explain() return result; } +/* + * A structure with tags for parsing in different formats + */ +typedef struct +{ + char *node_tag; + char *plan_rows_tag; + char *actual_rows_tag; + char *current_loop_tag; + char *add_ptr_tag; + char *add_str_progress_tag; + char *add_str_prefix; +} parse_tags; + +/* + * An array of structures with tags for parsing the + * pg_query_state message in different formats + */ +static parse_tags parse_format[] = +{ + + /* TEXT format */ + { + .node_tag = "->", + .plan_rows_tag = "rows=", + .actual_rows_tag = "actual rows=", + .current_loop_tag = "Current loop:", + .add_ptr_tag = ")", + .add_str_progress_tag = "Progress: %d%%", + .add_str_prefix = "," + }, + + /* XML format */ + { + .node_tag = "", + .plan_rows_tag = "", + .actual_rows_tag = "", + .current_loop_tag = "", + .add_ptr_tag = ">", + .add_str_progress_tag = "%d%%format < 4); + + /* Get the value of the pointer depending on the format */ + form = &(parse_format[es->format]); + + /* Search for the node for which progress is being added */ + node = strstr(es->str->data, form->node_tag); + + /* Parsing the node */ + while (node != NULL) + { + if ((current_loop = strstr(node, form->current_loop_tag)) != NULL && + (plan_rows = strstr(node, form->plan_rows_tag)) != NULL && + (actual_rows = strstr(current_loop, form->actual_rows_tag)) != NULL) + { + /* Storing the pointer to the actual rows before the offset */ + actual_rows_old = actual_rows; + + /* Getting the value of plan rows */ + plan_rows = (char *) (plan_rows + strlen(form->plan_rows_tag) * sizeof(char)); + plan_rows_d = atof(plan_rows); + + /* Getting the value of actual rows */ + actual_rows = (char *) (actual_rows + strlen(form->actual_rows_tag) * sizeof(char)); + actual_rows_d = atof(actual_rows); + + /* It cannot be divided by 0 */ + if (plan_rows_d == 0) + return; + + /* Calculating the progress value */ + if (plan_rows_d > actual_rows_d) + progress = (actual_rows_d * 100) / plan_rows_d; + else + progress = 100; + + /* + * Search for a place to add progress, if there is no such place, + * then the progress is added to the end of the line + */ + if ((add_ptr = strstr(actual_rows, form->add_ptr_tag)) == NULL) + add_ptr = actual_rows + strlen(actual_rows); + + /* Checking that the maximum lenght row cannot overflow the buffer */ + Assert(strlen(parse_format[EXPLAIN_FORMAT_XML].add_str_progress_tag) + + 1 < BUF_SIZE_PROGRESS); + + /* Forming a line with the progress and add it */ + sprintf(add_str, form->add_str_progress_tag, progress); + len = strlen(add_str); + append_string_info_by_ptr(es->str, add_ptr, add_str, len); + + /* + * Adding an indent, that is important for JSON, YAML, and XML + * formats + */ + len = get_num_left_spaces(actual_rows_old, es->str); + append_string_info_by_ptr(es->str, add_ptr, actual_rows_old - len, len); + + /* Adding a prefix */ + len = strlen(form->add_str_prefix); + append_string_info_by_ptr(es->str, add_ptr, form->add_str_prefix, len); + } + + /* Moving on to the next node */ + node = strstr(node + 1, form->node_tag); + } +} + +/* + * The function returns the number of spaces + * to the left of the passed pointer + */ +static int +get_num_left_spaces(char *passed_ptr, StringInfo str) +{ + char *ptr = passed_ptr - 1; + + while ((*ptr == ' ' || *ptr == '\t') && ptr > str->data) + ptr--; + + return (passed_ptr - ptr - 1); +} + +/* + * The function adds a string to the + * StringInfo structure by pointer + */ +static void +append_string_info_by_ptr(StringInfo str, char *add_ptr, char *add_data, int add_data_len) +{ + /* + * Increasing the size of the StringInfo structure to add new information + * there + */ + enlargeStringInfo(str, add_data_len); + + /* Shifting the data in order to add a new line */ + memmove(add_ptr + add_data_len, add_ptr, strlen(add_ptr) + 1); + + /* Adding a line */ + memcpy(add_ptr, add_data, add_data_len); +} + /* * Compute length of serialized stack frame */