diff --git a/Makefile b/Makefile index 4468c51..bce7a41 100644 --- a/Makefile +++ b/Makefile @@ -4,8 +4,9 @@ MODULE_big = pg_query_state OBJS = pg_query_state.o signal_handler.o $(WIN32RES) EXTENSION = pg_query_state -EXTVERSION = 1.1 -DATA = pg_query_state--1.0--1.1.sql +EXTVERSION = 1.2 +DATA = pg_query_state--1.0--1.1.sql \ + pg_query_state--1.1--1.2.sql DATA_built = $(EXTENSION)--$(EXTVERSION).sql PGFILEDESC = "pg_query_state - facility to track progress of plan execution" diff --git a/README.md b/README.md index fba15cd..87e0eb3 100644 --- a/README.md +++ b/README.md @@ -321,3 +321,117 @@ Do not hesitate to post your issues, questions and new ideas at the [issues](htt ## Authors [Maksim Milyutin](https://github.com/maksm90) Alexey Kondratov Postgres Professional Ltd., Russia + +## Function progress\_bar +```plpgsql +pg_progress_bar( + integer pid +) returns FLOAT +``` +extracts the current query state from backend with specified 'pid'. Then gets the numerical values of the actual rows and total rows and count progress for the whole query tree. Function returns numeric value from 0 to 1 describing the measure of query fulfillment. If there is no information about current state of the query, or the impossibility of counting, the corresponding messages will be displayed. + +## Function progress\_bar\_visual +```plpgsql +pg_progress_bar_visual( + integer pid, + integer delay +) returns VOID +``` +cyclically extracts and print the current query state in numeric value from backend with specified 'pid' every period specified by 'delay' in seconds. This is the looping version of the progress\_bar function that returns void value. + +**_Warning_**: Calling role have to be superuser or member of the role whose backend is being called. Otherwise function prints ERROR message `permission denied`. + +## Examples +Assume first backend executes some function: +```sql +postgres=# insert into table_name select generate_series(1,10000000); +``` +Other backend can get the follow output: +```sql +postgres=# SELECT pid FROM pg_stat_activity where query like 'insert%'; + pid +------- + 23877 +(1 row) + +postgres=# SELECT pg_progress_bar(23877); + pg_progress_bar +----------------- + 0.6087927 +(1 row) +``` +Or continuous version: +```sql +postgres=# SELECT pg_progress_bar_visual(23877, 1); +Progress = 0.043510 +Progress = 0.085242 +Progress = 0.124921 +Progress = 0.168168 +Progress = 0.213803 +Progress = 0.250362 +Progress = 0.292632 +Progress = 0.331454 +Progress = 0.367509 +Progress = 0.407450 +Progress = 0.448646 +Progress = 0.488171 +Progress = 0.530559 +Progress = 0.565558 +Progress = 0.608039 +Progress = 0.645778 +Progress = 0.654842 +Progress = 0.699006 +Progress = 0.735760 +Progress = 0.787641 +Progress = 0.832160 +Progress = 0.871077 +Progress = 0.911858 +Progress = 0.956362 +Progress = 0.995097 +Progress = 1.000000 + pg_progress_bar_visual +------------------------ + 1 +(1 row) +``` +Also uncountable queries exist. Assume first backend executes some function: +```sql +DELETE from table_name; +``` +Other backend can get the follow output: +```sql +postgres=# SELECT pid FROM pg_stat_activity where query like 'delete%'; + pid +------- + 23877 +(1 row) + +postgres=# SELECT pg_progress_bar(23877); +INFO: Counting Progress doesn't available + pg_progress_bar +----------------- + -1 +(1 row) + +postgres=# SELECT pg_progress_bar_visual(23877, 5); +INFO: Counting Progress doesn't available + pg_progress_bar_visual +------------------------ + -1 +(1 row) +``` + +## Reinstallation +If you already have a module 'pg_query_state' without progress bar functions installed, execute this in the module's directory: +``` +make install USE_PGXS=1 +``` +It is essential to restart the PostgreSQL instance. After that, execute the following queries in psql: +```sql +DROP EXTENSION IF EXISTS pg_query_state; +CREATE EXTENSION pg_query_state; +``` + +## Authors +Ekaterina Sokolova Postgres Professional Ltd., Russia +Vyacheslav Makarov Postgres Professional Ltd., Russia diff --git a/init.sql b/init.sql index 3a9bb61..2dc7363 100644 --- a/init.sql +++ b/init.sql @@ -15,3 +15,14 @@ CREATE FUNCTION pg_query_state(pid integer , leader_pid integer) AS 'MODULE_PATHNAME' LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION pg_progress_bar(pid integer) + RETURNS FLOAT + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION pg_progress_bar_visual(pid integer + , delay integer = 1) + RETURNS FLOAT + AS 'MODULE_PATHNAME', 'pg_progress_bar' + LANGUAGE C STRICT VOLATILE; diff --git a/pg_query_state--1.1--1.2.sql b/pg_query_state--1.1--1.2.sql new file mode 100644 index 0000000..594d5ff --- /dev/null +++ b/pg_query_state--1.1--1.2.sql @@ -0,0 +1,14 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "ALTER EXTENSION pg_query_state UPDATE TO '1.2'" to load this file. \quit + +CREATE FUNCTION pg_progress_bar(pid integer) + RETURNS FLOAT + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION pg_progress_bar_visual(pid integer + , delay integer = 1) + RETURNS FLOAT + AS 'MODULE_PATHNAME', 'pg_progress_bar' + LANGUAGE C STRICT VOLATILE; + \ No newline at end of file diff --git a/pg_query_state.c b/pg_query_state.c index 1949643..9329f78 100644 --- a/pg_query_state.c +++ b/pg_query_state.c @@ -63,11 +63,11 @@ static shm_mq_result receive_msg_by_parts(shm_mq_handle *mqh, Size *total, void **datap, int64 timeout, int *rc, bool nowait); /* Global variables */ -List *QueryDescStack = NIL; +List *QueryDescStack = NIL; static ProcSignalReason UserIdPollReason = INVALID_PROCSIGNAL; static ProcSignalReason QueryStatePollReason = INVALID_PROCSIGNAL; static ProcSignalReason WorkerPollReason = INVALID_PROCSIGNAL; -static bool module_initialized = false; +static bool module_initialized = false; static const char *be_state_str[] = { /* BackendState -> string repr */ "undefined", /* STATE_UNDEFINED */ "idle", /* STATE_IDLE */ @@ -103,8 +103,8 @@ static List *GetRemoteBackendQueryStates(PGPROC *leader, /* Shared memory variables */ shm_toc *toc = NULL; RemoteUserIdResult *counterpart_userid = NULL; -pg_qs_params *params = NULL; -shm_mq *mq = NULL; +pg_qs_params *params = NULL; +shm_mq *mq = NULL; /* * Estimate amount of shared memory needed. @@ -1247,3 +1247,259 @@ DetachPeer(void) ereport(LOG, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("pg_query_state peer is not responding"))); } + +/* + * Count progress of query execution like ratio of + * number of received to planned rows in persent. + * Changes of this function can lead to more plausible results. + */ +static double +CountProgress(char *plan_text) +{ + char *plan; /* Copy of plan_text */ + char *node; /* Part of plan with information about single node */ + char *rows; /* Pointer to rows */ + char *actual_rows_str; /* Actual rows in string format */ + char *plan_rows_str; /* Planned rows in string format */ + int len; /* Length of rows in string format */ + double actual_rows; /* Actual rows */ + double plan_rows; /* Planned rows */ + double progress = 0; /* Summary progress on nodes */ + int node_amount = 0; /* Amount of plantree nodes using in counting progress */ + + plan = palloc(sizeof(char) * (strlen(plan_text) + 1)); + strcpy(plan, plan_text); + node = strtok(plan, "["); /* Get information about upper node */ + while (node != NULL) + { + if (strstr(node, "Seq Scan") == NULL) + { + if (strstr(node, "ModifyTable") == NULL) + { + if (strstr(node, "Result") == NULL) + { + if ((rows = strstr(node, "Rows Removed by Filter")) != NULL) + { + node_amount++; + rows = (char *) (rows + strlen("Rows Removed by Filter\": ") * sizeof(char)); + + /* + * Filter node have 2 conditions: + * 1) Was not filtered (current progress = 0) + * 2) Was filtered (current progress = 1) + */ + if (rows[0] != '0') + progress += 1; + } + else if ((rows = strstr(node, "\"Actual Rows\": ")) != NULL) + { + node_amount++; + rows = (char *) (rows + strlen("\"Actual Rows\": ") * sizeof(char)); + len = strstr(rows, "\n") - rows; + if ((strstr(rows, ",") - rows) < len) + len = strstr(rows, ",") - rows; + actual_rows_str = palloc(sizeof(char) * (len + 1)); + actual_rows_str[len] = 0; + strncpy(actual_rows_str, rows, len); + actual_rows = strtod(actual_rows_str, NULL); + pfree(actual_rows_str); + + rows = strstr(node, "\"Plan Rows\": "); + rows = (char *) (rows + strlen("\"Plan Rows\": ") * sizeof(char)); + len = strstr(rows, ",") - rows; + plan_rows_str = palloc(sizeof(char) * (len + 1)); + plan_rows_str[len] = 0; + strncpy(plan_rows_str, rows, len); + plan_rows = strtod(plan_rows_str, NULL); + pfree(plan_rows_str); + + if (plan_rows > actual_rows) + progress += actual_rows / plan_rows; + else + progress += 1; + } + } + } + } + node = strtok(NULL, "["); + } + + pfree(plan); + if (node_amount > 0) + { + progress = progress / node_amount; + if (progress == 1) + progress = 0.999999; + } + else + return -1; + return progress; +} + +static double +GetCurrentNumericState(shm_mq_msg *msg) +{ + typedef struct + { + PGPROC *proc; + ListCell *frame_cursor; + int frame_index; + List *stack; + } proc_state; + + /* multicall context type */ + typedef struct + { + ListCell *proc_cursor; + List *procs; + } pg_qs_fctx; + + pg_qs_fctx *fctx; + List *qs_stack; + proc_state *p_state; + stack_frame *frame; + char *plan_text; + + fctx = (pg_qs_fctx *) palloc(sizeof(pg_qs_fctx)); + fctx->procs = NIL; + p_state = (proc_state *) palloc(sizeof(proc_state)); + qs_stack = deserialize_stack(msg->stack, msg->stack_depth); + p_state->proc = msg->proc; + p_state->stack = qs_stack; + p_state->frame_index = 0; + p_state->frame_cursor = list_head(qs_stack); + fctx->procs = lappend(fctx->procs, p_state); + fctx->proc_cursor = list_head(fctx->procs); + frame = (stack_frame *) lfirst(p_state->frame_cursor); + plan_text = frame->plan->vl_dat; + return CountProgress(plan_text); +} + +PG_FUNCTION_INFO_V1(pg_progress_bar); +Datum +pg_progress_bar(PG_FUNCTION_ARGS) +{ + pid_t pid = PG_GETARG_INT32(0); + int delay = 0; + PGPROC *proc; + Oid counterpart_user_id; + shm_mq_msg *msg; + List *bg_worker_procs = NIL; + List *msgs; + double progress; + double old_progress; + + if (PG_NARGS() == 2) + { + /* + * This is continuous mode, function 'pg_progress_bar_visual', + * we need to get delay value. + */ + delay = PG_GETARG_INT32(1); + if (delay < 1) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("the value of \"delay\" must be positive integer"))); + } + + if (!module_initialized) + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("pg_query_state wasn't initialized yet"))); + + if (pid == MyProcPid) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("attempt to extract state of current process"))); + + proc = BackendPidGetProc(pid); + if (!proc || +#if PG_VERSION_NUM >= 170000 + proc->vxid.procNumber == INVALID_PROC_NUMBER || +#else + proc->backendId == InvalidBackendId || +#endif + proc->databaseId == InvalidOid || + proc->roleId == InvalidOid) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("backend with pid=%d not found", pid))); + + counterpart_user_id = GetRemoteBackendUserId(proc); + if (!(superuser() || GetUserId() == counterpart_user_id)) + ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied"))); + + old_progress = 0; + progress = 0; + if (SRF_IS_FIRSTCALL()) + { + pg_atomic_write_u32(&counterpart_userid->n_peers, 1); + params->reqid = ++reqid; + } + + bg_worker_procs = GetRemoteBackendWorkers(proc); + msgs = GetRemoteBackendQueryStates(proc, + bg_worker_procs, + 0, 1, 0, 0, 0, + EXPLAIN_FORMAT_JSON); + if (list_length(msgs) == 0) + elog(WARNING, "backend does not reply"); + msg = (shm_mq_msg *) linitial(msgs); + + switch (msg->result_code) + { + case QUERY_NOT_RUNNING: + elog(INFO, "query not runing"); + PG_RETURN_FLOAT8((float8) -1); + break; + case STAT_DISABLED: + elog(INFO, "query execution statistics disabled"); + PG_RETURN_FLOAT8((float8) -1); + default: + break; + } + if (msg->result_code == QS_RETURNED && delay == 0) + { + progress = GetCurrentNumericState(msg); + if (progress < 0) + { + elog(INFO, "Counting Progress doesn't available"); + PG_RETURN_FLOAT8((float8) -1); + } + else + PG_RETURN_FLOAT8((float8) progress); + } + else if (msg->result_code == QS_RETURNED) + { + while (msg->result_code == QS_RETURNED) + { + progress = GetCurrentNumericState(msg); + if (progress > old_progress) + { + elog(INFO, "\rProgress = %f", progress); + old_progress = progress; + } + else if (progress < 0) + { + elog(INFO, "Counting Progress doesn't available"); + break; + } + + for (int i = 0; i < delay; i++) + { + pg_usleep(1000000); + CHECK_FOR_INTERRUPTS(); + } + + bg_worker_procs = GetRemoteBackendWorkers(proc); + msgs = GetRemoteBackendQueryStates(proc, + bg_worker_procs, + 0, 1, 0, 0, 0, + EXPLAIN_FORMAT_JSON); + if (list_length(msgs) == 0) + elog(WARNING, "backend does not reply"); + msg = (shm_mq_msg *) linitial(msgs); + } + if (progress > -1) + elog(INFO, "\rProgress = 1.000000"); + PG_RETURN_FLOAT8((float8) 1); + } + PG_RETURN_FLOAT8((float8) -1); +} diff --git a/pg_query_state.control b/pg_query_state.control index fdf637e..9373bb0 100644 --- a/pg_query_state.control +++ b/pg_query_state.control @@ -1,5 +1,5 @@ # pg_query_state extension comment = 'tool for inspection query progress' -default_version = '1.1' +default_version = '1.2' module_pathname = '$libdir/pg_query_state' relocatable = true diff --git a/pg_query_state.h b/pg_query_state.h index f632008..7de0c30 100644 --- a/pg_query_state.h +++ b/pg_query_state.h @@ -92,4 +92,12 @@ extern void DetachPeer(void); extern void UnlockShmem(LOCKTAG *tag); extern void LockShmem(LOCKTAG *tag, uint32 key); +/* + * The size of the buffer in which the progress message is generated. + * The length of this line is the maximum in XML format when the + * progress reaches 100%. If the size of this buffer becomes insufficient, + * it will need to be increased (e.g., when adding a new format). + */ +#define BUF_SIZE_PROGRESS 30 + #endif diff --git a/signal_handler.c b/signal_handler.c index dfe8780..85df8d7 100644 --- a/signal_handler.c +++ b/signal_handler.c @@ -37,6 +37,9 @@ typedef enum } msg_by_parts_result; static msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data); +static void append_string_info_by_ptr(StringInfo str, char *add_ptr, char *add_data, int add_data_len); +static void add_node_count_progress(ExplainState *es); +static int get_num_left_spaces(char *actual_rows, StringInfo str); /* * Get List of stack_frames as a stack of function calls starting from outermost call. @@ -91,6 +94,12 @@ runtime_explain() es->str->data[es->str->len - 1] = '}'; } + /* + * Adding progress to the node depending on the es->costs parameter + */ + if (es->costs == true) + add_node_count_progress(es); + qs_frame->plan = es->str->data; result = lcons(qs_frame, result); @@ -99,6 +108,200 @@ runtime_explain() return result; } +/* + * A structure with tags for parsing in different formats + */ +typedef struct +{ + char *node_tag; + char *plan_rows_tag; + char *actual_rows_tag; + char *current_loop_tag; + char *add_ptr_tag; + char *add_str_progress_tag; + char *add_str_prefix; +} parse_tags; + +/* + * An array of structures with tags for parsing the + * pg_query_state message in different formats + */ +static parse_tags parse_format[] = +{ + + /* TEXT format */ + { + .node_tag = "->", + .plan_rows_tag = "rows=", + .actual_rows_tag = "actual rows=", + .current_loop_tag = "Current loop:", + .add_ptr_tag = ")", + .add_str_progress_tag = "Progress: %d%%", + .add_str_prefix = "," + }, + + /* XML format */ + { + .node_tag = "", + .plan_rows_tag = "", + .actual_rows_tag = "", + .current_loop_tag = "", + .add_ptr_tag = ">", + .add_str_progress_tag = "%d%%format < 4); + + /* Get the value of the pointer depending on the format */ + form = &(parse_format[es->format]); + + /* Search for the node for which progress is being added */ + node = strstr(es->str->data, form->node_tag); + + /* Parsing the node */ + while (node != NULL) + { + if ((current_loop = strstr(node, form->current_loop_tag)) != NULL && + (plan_rows = strstr(node, form->plan_rows_tag)) != NULL && + (actual_rows = strstr(current_loop, form->actual_rows_tag)) != NULL) + { + /* Storing the pointer to the actual rows before the offset */ + actual_rows_old = actual_rows; + + /* Getting the value of plan rows */ + plan_rows = (char *) (plan_rows + strlen(form->plan_rows_tag) * sizeof(char)); + plan_rows_d = atof(plan_rows); + + /* Getting the value of actual rows */ + actual_rows = (char *) (actual_rows + strlen(form->actual_rows_tag) * sizeof(char)); + actual_rows_d = atof(actual_rows); + + /* It cannot be divided by 0 */ + if (plan_rows_d == 0) + return; + + /* Calculating the progress value */ + if (plan_rows_d > actual_rows_d) + progress = (actual_rows_d * 100) / plan_rows_d; + else + progress = 100; + + /* + * Search for a place to add progress, if there is no such place, + * then the progress is added to the end of the line + */ + if ((add_ptr = strstr(actual_rows, form->add_ptr_tag)) == NULL) + add_ptr = actual_rows + strlen(actual_rows); + + /* Checking that the maximum lenght row cannot overflow the buffer */ + Assert(strlen(parse_format[EXPLAIN_FORMAT_XML].add_str_progress_tag) + + 1 < BUF_SIZE_PROGRESS); + + /* Forming a line with the progress and add it */ + sprintf(add_str, form->add_str_progress_tag, progress); + len = strlen(add_str); + append_string_info_by_ptr(es->str, add_ptr, add_str, len); + + /* + * Adding an indent, that is important for JSON, YAML, and XML + * formats + */ + len = get_num_left_spaces(actual_rows_old, es->str); + append_string_info_by_ptr(es->str, add_ptr, actual_rows_old - len, len); + + /* Adding a prefix */ + len = strlen(form->add_str_prefix); + append_string_info_by_ptr(es->str, add_ptr, form->add_str_prefix, len); + } + + /* Moving on to the next node */ + node = strstr(node + 1, form->node_tag); + } +} + +/* + * The function returns the number of spaces + * to the left of the passed pointer + */ +static int +get_num_left_spaces(char *passed_ptr, StringInfo str) +{ + char *ptr = passed_ptr - 1; + + while ((*ptr == ' ' || *ptr == '\t') && ptr > str->data) + ptr--; + + return (passed_ptr - ptr - 1); +} + +/* + * The function adds a string to the + * StringInfo structure by pointer + */ +static void +append_string_info_by_ptr(StringInfo str, char *add_ptr, char *add_data, int add_data_len) +{ + /* + * Increasing the size of the StringInfo structure to add new information + * there + */ + enlargeStringInfo(str, add_data_len); + + /* Shifting the data in order to add a new line */ + memmove(add_ptr + add_data_len, add_ptr, strlen(add_ptr) + 1); + + /* Adding a line */ + memcpy(add_ptr, add_data, add_data_len); +} + /* * Compute length of serialized stack frame */ diff --git a/t/test_bad_progress_bar.pl b/t/test_bad_progress_bar.pl new file mode 100644 index 0000000..2f3da4e --- /dev/null +++ b/t/test_bad_progress_bar.pl @@ -0,0 +1,67 @@ +# pg_query_state/t/test_bad_progress_bar.pl +# +# Check uncorrect launches of functions pg_progress_bar(pid) +# and pg_progress_bar_visual(pid, delay) + +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +# List of checks for bad cases: +# 1) appealing to a bad pid +# ------- requires DBI and DBD::Pg modules ------- +# 2) extracting the state of the process itself + +# Test whether we have both DBI and DBD::pg +my $dbdpg_rc = eval +{ + require DBI; + require DBD::Pg; + DBD::Pg->import(':async'); + 1; +}; + +# start backend for function pg_progress_bar +my $node = PostgresNode->get_new_node('master'); +$node->init; +$node->start; +$node->append_conf('postgresql.conf', "shared_preload_libraries = 'pg_query_state'"); +$node->restart; +$node->psql('postgres', 'CREATE EXTENSION pg_query_state;'); + +subtest 'Extracting from bad pid' => sub { + my $stderr; + $node->psql('postgres', 'SELECT * from pg_progress_bar(-1)', stderr => \$stderr); + is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', "appealing to a bad pid for pg_progress_bar"); + $node->psql('postgres', 'SELECT * from pg_progress_bar(-1)_visual', stderr => \$stderr); + is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', "appealing to a bad pid for pg_progress_bar_visual"); +}; + +if ( not $dbdpg_rc) { + diag('DBI and DBD::Pg are not available, skip 2/3 tests'); +} + +SKIP: { + skip "DBI and DBD::Pg are not available", 2 if not $dbdpg_rc; + + my $dbh_status = DBI->connect('DBI:Pg:' . $node->connstr($_)); + if ( !defined $dbh_status ) + { + die "Cannot connect to database for dbh with pg_progress_bar\n"; + } + + my $pid_status = $dbh_status->{pg_pid}; + + subtest 'Extracting your own status' => sub { + $dbh_status->do('SELECT * from pg_progress_bar(' . $pid_status . ')'); + is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', "extracting the state of the process itself for pg_progress_bar"); + $dbh_status->do('SELECT * from pg_progress_bar_visual(' . $pid_status . ')'); + is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', "extracting the state of the process itself for pg_progress_bar_visual"); + }; + + $dbh_status->disconnect; +} + +$node->stop('fast'); diff --git a/tests/common.py b/tests/common.py index 6dab69a..c69a8c7 100644 --- a/tests/common.py +++ b/tests/common.py @@ -161,6 +161,54 @@ def onetime_query_state(config, async_conn, query, args={}, num_workers=0): set_guc(async_conn, 'enable_mergejoin', 'on') return result, notices +def progress_bar(config, pid): + conn = psycopg2.connect(**config) + curs = conn.cursor() + + curs.callproc('pg_progress_bar', (pid,)) + result = curs.fetchall() + notices = conn.notices[:] + conn.close() + + return result, notices + +def onetime_progress_bar(config, async_conn, query, args={}, num_workers=0): + """ + Get intermediate state of 'query' on connection 'async_conn' after number of 'steps' + of node executions from start of query + """ + + acurs = async_conn.cursor() + + set_guc(async_conn, 'enable_mergejoin', 'off') + set_guc(async_conn, 'max_parallel_workers_per_gather', num_workers) + acurs.execute(query) + + # extract progress of current query + MAX_PG_QS_RETRIES = 10 + DELAY_BETWEEN_RETRIES = 0.1 + pg_qs_args = { + 'config': config, + 'pid': async_conn.get_backend_pid(), + } + for k, v in args.items(): + pg_qs_args[k] = v + n_retries = 0 + while True: + result, notices = progress_bar(**pg_qs_args) + n_retries += 1 + if len(result) > 0: + break + if n_retries >= MAX_PG_QS_RETRIES: + # pg_query_state callings don't return any result, more likely run + # query has completed + break + time.sleep(DELAY_BETWEEN_RETRIES) + wait(async_conn) + + set_guc(async_conn, 'enable_mergejoin', 'on') + return result, notices + def set_guc(async_conn, param, value): acurs = async_conn.cursor() acurs.execute('set %s to %s' % (param, value)) diff --git a/tests/pg_qs_test_runner.py b/tests/pg_qs_test_runner.py index f4088a9..4317ea1 100644 --- a/tests/pg_qs_test_runner.py +++ b/tests/pg_qs_test_runner.py @@ -68,6 +68,7 @@ class TeardownException(Exception): pass test_formats, test_timing_buffers_conflicts, test_insert_on_conflict, + test_progress_bar, ] def setup(con): diff --git a/tests/test_cases.py b/tests/test_cases.py index c866f58..5e47d64 100644 --- a/tests/test_cases.py +++ b/tests/test_cases.py @@ -397,3 +397,18 @@ def test_timing_buffers_conflicts(config): and 'WARNING: buffers statistics disabled\n' in notices common.n_close((acon,)) + +def test_progress_bar(config): + """test pg_progress_bar of simple query""" + + acon, = common.n_async_connect(config) + query = 'select * from foo join bar on foo.c1=bar.c1' + + qs, notices = common.onetime_progress_bar(config, acon, query) + assert qs[0][0] >= 0 and qs[0][0] < 1 + first_qs = qs[0][0] + + qs, _ = common.onetime_progress_bar(config, acon, query) + assert qs[0][0] >= first_qs and qs[0][0] < 1 + + common.n_close((acon,))