Skip to content

Commit

Permalink
Backport ETCD fix from upsync-module
Browse files Browse the repository at this point in the history
  • Loading branch information
CallMeFoxie committed Jun 9, 2016
1 parent 7d2fcaf commit e8da4e3
Showing 1 changed file with 100 additions and 56 deletions.
156 changes: 100 additions & 56 deletions src/ngx_stream_upsync_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ static void ngx_stream_upsync_recv_handler(ngx_event_t *event);
static void ngx_stream_upsync_send_handler(ngx_event_t *event);
static void ngx_stream_upsync_timeout_handler(ngx_event_t *event);
static void ngx_stream_upsync_clean_event(void *upsync_server);
static ngx_int_t ngx_stream_upsync_parse_init(void *upsync_server);
static ngx_int_t ngx_stream_upsync_etcd_parse_init(void *upsync_server);
static ngx_int_t ngx_stream_upsync_consul_parse_init(void *upsync_server);
static ngx_int_t ngx_stream_upsync_dump_server(
ngx_stream_upsync_server_t *upsync_server);
static ngx_int_t ngx_stream_upsync_init_server(ngx_event_t *event);
Expand All @@ -204,7 +205,6 @@ static void ngx_stream_upsync_event_init(ngx_stream_upstream_rr_peer_t *peer,
ngx_stream_upsync_server_t *upsync_server);

static ngx_int_t ngx_stream_http_parser_init();
static void ngx_stream_http_parser_execute(ngx_stream_upsync_ctx_t *ctx);

static int ngx_stream_http_status(http_parser *p, const char *buf, size_t len);
static int ngx_stream_http_header_field_cb(http_parser *p, const char *buf,
Expand Down Expand Up @@ -329,15 +329,15 @@ static ngx_upsync_conf_t ngx_upsync_types[] = {
NGX_STREAM_UPSYNC_CONSUL,
ngx_stream_upsync_send_handler,
ngx_stream_upsync_recv_handler,
ngx_stream_upsync_parse_init,
ngx_stream_upsync_consul_parse_init,
ngx_stream_upsync_consul_parse_json,
ngx_stream_upsync_clean_event },

{ ngx_string("etcd"),
NGX_STREAM_UPSYNC_ETCD,
ngx_stream_upsync_send_handler,
ngx_stream_upsync_recv_handler,
ngx_stream_upsync_parse_init,
ngx_stream_upsync_etcd_parse_init,
ngx_stream_upsync_etcd_parse_json,
ngx_stream_upsync_clean_event },

Expand Down Expand Up @@ -1347,6 +1347,18 @@ ngx_stream_upsync_etcd_parse_json(void *data)
"upsync_parse_json: root error");
return NGX_ERROR;
}

cJSON *errorCode = cJSON_GetObjectItem(root, "errorCode");

if (errorCode != NULL) {
if (errorCode->valueint == 401) { // trigger reload, we've gone too far with index
upsync_server->index = 0;
upsync_type_conf->clean(upsync_server);
ngx_add_timer(&upsync_server->upsync_ev, 0);
}
cJSON_Delete(root);
return NGX_ERROR;
}

cJSON *action = cJSON_GetObjectItem(root, "action");
if (action != NULL) {
Expand Down Expand Up @@ -2470,9 +2482,9 @@ ngx_stream_upsync_send_handler(ngx_event_t *event)

if (upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_ETCD) {
if (upsync_server->index != 0) {
ngx_sprintf(request, "GET %V?wait=true&recursive=true&waitIndex=%d"
ngx_sprintf(request, "GET %V?wait=true&recursive=true&waitIndex=%d"
" HTTP/1.0\r\nHost: %V\r\nAccept: */*\r\n\r\n",
&upscf->upsync_send, upsync_server->index,
&upscf->upsync_send, upsync_server->index,
&upscf->conf_server.name);

} else {
Expand Down Expand Up @@ -2637,33 +2649,100 @@ ngx_stream_upsync_recv_handler(ngx_event_t *event)


static ngx_int_t
ngx_stream_upsync_parse_init(void *data)
ngx_stream_upsync_consul_parse_init(void *data)
{
ngx_upsync_conf_t *upsync_type_conf;
char *buf;
size_t parsed;
ngx_stream_upsync_ctx_t *ctx;
ngx_stream_upsync_server_t *upsync_server = data;

upsync_type_conf = upsync_server->upscf->upsync_type_conf;
ctx = &upsync_server->ctx;

if (upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL
|| upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_ETCD)
{
if (ngx_stream_http_parser_init() == NGX_ERROR) {
return NGX_ERROR;
if (ngx_stream_http_parser_init() == NGX_ERROR) {
return NGX_ERROR;
}

buf = (char *)ctx->recv.pos;

ctx->body.pos = ctx->body.last = NULL;

parsed = http_parser_execute(parser, &settings, buf, ngx_strlen(buf));
if (parsed != ngx_strlen(buf)) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"upsync_consul_parse_init: parsed body size is wrong");
return NGX_ERROR;
}

if (ngx_strncmp(state.status, "OK", 2) == 0) {

if (ngx_strlen(state.http_body) != 0) {
ctx->body.pos = state.http_body;
ctx->body.last = state.http_body + ngx_strlen(state.http_body);

}
}

ngx_stream_http_parser_execute(ctx);
if (ctx->body.pos != ctx->body.last) {
*(ctx->body.last + 1) = '\0';
if (parser != NULL) {
ngx_free(parser);
parser = NULL;
}

if (ctx->body.pos != ctx->body.last) {
*(ctx->body.last + 1) = '\0';

} else {
return NGX_ERROR;
}

return NGX_OK;
}


static ngx_int_t
ngx_stream_upsync_etcd_parse_init(void *data)
{
char *buf;
size_t parsed;
ngx_stream_upsync_ctx_t *ctx;
ngx_stream_upsync_server_t *upsync_server = data;

ctx = &upsync_server->ctx;

if (ngx_stream_http_parser_init() == NGX_ERROR) {
return NGX_ERROR;
}

buf = (char *)ctx->recv.pos;

ctx->body.pos = ctx->body.last = NULL;

parsed = http_parser_execute(parser, &settings, buf, ngx_strlen(buf));
if (parsed != ngx_strlen(buf)) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"upsync_etcd_parse_init: parsed body size is wrong");
return NGX_ERROR;
}

if (ngx_strncmp(state.status, "OK", 2) == 0
|| ngx_strncmp(state.status, "Bad", 3) == 0) {

if (ngx_strlen(state.http_body) != 0) {
ctx->body.pos = state.http_body;
ctx->body.last = state.http_body + ngx_strlen(state.http_body);

} else {
return NGX_ERROR;
}
}

if (parser != NULL) {
ngx_free(parser);
parser = NULL;
}

if (ctx->body.pos != ctx->body.last) {
*(ctx->body.last + 1) = '\0';

} else {
ctx->body.pos = ctx->recv.pos;
ctx->body.last = ctx->recv.last;
return NGX_ERROR;
}

return NGX_OK;
Expand Down Expand Up @@ -3032,41 +3111,6 @@ ngx_stream_http_parser_init()
}


static void
ngx_stream_http_parser_execute(ngx_stream_upsync_ctx_t *ctx)
{
char *buf;
size_t parsed;

buf = (char *)ctx->recv.pos;

ctx->body.pos = ctx->body.last = NULL;

parsed = http_parser_execute(parser, &settings, buf, ngx_strlen(buf));
if (parsed != ngx_strlen(buf)) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"http_parser_execute: parsed body size is wrong");
return;
}

if (ngx_strncmp(state.status, "OK", 2) == 0) {

if (ngx_strlen(state.http_body) != 0) {
ctx->body.pos = state.http_body;
ctx->body.last = state.http_body + ngx_strlen(state.http_body);

} else if (ngx_strlen(state.http_body) == 0) {
return;
}
}

ngx_free(parser);
parser = NULL;

return;
}


static int
ngx_stream_http_status(http_parser *p, const char *buf, size_t len)
{
Expand Down

0 comments on commit e8da4e3

Please sign in to comment.