Skip to content

Commit

Permalink
Collect prefetch worker statistics whenever a query is popped by a
Browse files Browse the repository at this point in the history
worker.
  • Loading branch information
Davi Arnaut committed Apr 17, 2012
1 parent 56f7ebf commit a980f94
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 25 deletions.
50 changes: 31 additions & 19 deletions prefetch_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,29 @@ static char* convert_to_select(const std::string query, uint *length)
return NULL;
}

struct worker_stats_t
{
uint64_t popped_queries;
uint64_t old_queries;
uint64_t discarded_queries;
uint64_t converted_queries;
uint64_t executed_selects;
uint64_t error_selects;
};

static void update_stats(worker_stats_t *stats)
{
static const worker_stats_t reset= {0};
pthread_mutex_lock(&worker_mutex);
stat_popped_queries += stats->popped_queries;
stat_old_queries += stats->old_queries;
stat_discarded_queries += stats->discarded_queries;
stat_converted_queries += stats->converted_queries;
stat_executed_selects += stats->executed_selects;
stat_error_selects += stats->error_selects;
pthread_mutex_unlock(&worker_mutex);
*stats= reset;
}

void* prefetch_worker(void *worker_info)
{
Expand All @@ -96,12 +119,7 @@ void* prefetch_worker(void *worker_info)
char current_db[1024]= "";
worker_info_t *info= (worker_info_t*)worker_info;
uint worker_id= info->worker_id;
uint64_t popped_queries= 0;
uint64_t old_queries= 0;
uint64_t discarded_queries= 0;
uint64_t converted_queries= 0;
uint64_t executed_selects= 0;
uint64_t error_selects= 0;
worker_stats_t stats= {0};
my_bool reconnect= true;

mysql= mysql_init(NULL);
Expand All @@ -122,17 +140,18 @@ void* prefetch_worker(void *worker_info)

while (1)
{
update_stats(&stats);
query= queue[worker_id]->wait_and_pop();
if (query->shutdown)
{
delete query;
goto end;
}
popped_queries++;
stats.popped_queries++;

if (query->pos <= sql_thread_pos)
{
old_queries++;
stats.old_queries++;
free_query(query);
continue;
}
Expand All @@ -142,7 +161,7 @@ void* prefetch_worker(void *worker_info)
char* select_query= convert_to_select(qev->query, &select_len);
if (select_query != NULL)
{
converted_queries++;
stats.converted_queries++;
// database has changed
if (strcmp(current_db, qev->db_name.c_str()))
{
Expand All @@ -158,10 +177,10 @@ void* prefetch_worker(void *worker_info)
if (ret)
{
print_log("ERROR: Got error on query. Error code:%d message:%s. query:%s", mysql_errno(mysql),mysql_error(mysql), select_query);
error_selects++;
stats.error_selects++;
} else
{
executed_selects++;
stats.executed_selects++;
}
free_query(query, select_query);
result = mysql_store_result(mysql);
Expand All @@ -181,14 +200,7 @@ void* prefetch_worker(void *worker_info)
if (mysql)
mysql_close(mysql);
mysql_thread_end();
pthread_mutex_lock(&worker_mutex);
stat_popped_queries += popped_queries;
stat_old_queries += old_queries;
stat_discarded_queries += discarded_queries;
stat_converted_queries += converted_queries;
stat_executed_selects += executed_selects;
stat_error_selects += error_selects;
pthread_mutex_unlock(&worker_mutex);
update_stats(&stats);
pthread_exit(0);
}

24 changes: 18 additions & 6 deletions replication_booster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,18 +479,30 @@ static void print_status(FILE *stream)

static void print_statistics(FILE *stream)
{
uint64_t popped_queries, old_queries, discarded_queries;
uint64_t converted_queries, executed_selects, error_selects;

pthread_mutex_lock(&worker_mutex);
popped_queries = stat_popped_queries;
old_queries = stat_old_queries;
discarded_queries = stat_discarded_queries;
converted_queries = stat_converted_queries;
executed_selects = stat_executed_selects;
error_selects = stat_error_selects;
pthread_mutex_unlock(&worker_mutex);

fprintf(stream, "Statistics:\n");
fprintf(stream, " Parsed binlog events: %lu\n", stat_parsed_binlog_events);
fprintf(stream, " Skipped binlog events by offset: %lu\n", stat_skipped_binlog_events);
fprintf(stream, " Unrelated binlog events: %lu\n", stat_unrelated_binlog_events);
fprintf(stream, " Queries discarded in front: %lu\n", stat_discarded_in_front_queries);
fprintf(stream, " Queries pushed to workers: %lu\n", stat_pushed_queries);
fprintf(stream, " Queries popped by workers: %lu\n", stat_popped_queries);
fprintf(stream, " Old queries popped by workers: %lu\n", stat_old_queries);
fprintf(stream, " Queries discarded by workers: %lu\n", stat_discarded_queries);
fprintf(stream, " Queries converted to select: %lu\n", stat_converted_queries);
fprintf(stream, " Executed SELECT queries: %lu\n", stat_executed_selects);
fprintf(stream, " Error SELECT queries: %lu\n", stat_error_selects);
fprintf(stream, " Queries popped by workers: %lu\n", popped_queries);
fprintf(stream, " Old queries popped by workers: %lu\n", old_queries);
fprintf(stream, " Queries discarded by workers: %lu\n", discarded_queries);
fprintf(stream, " Queries converted to select: %lu\n", converted_queries);
fprintf(stream, " Executed SELECT queries: %lu\n", executed_selects);
fprintf(stream, " Error SELECT queries: %lu\n", error_selects);
fprintf(stream, " Number of times to read relay log limit: %lu\n", stat_reached_ahead_relay_log);
fprintf(stream, " Number of times to reach end of relay log: %lu\n", stat_reached_end_of_relay_log);
}
Expand Down

0 comments on commit a980f94

Please sign in to comment.