diff --git a/src/ngx_stream_upsync_module.c b/src/ngx_stream_upsync_module.c index 7bc5e28..cf6a88f 100644 --- a/src/ngx_stream_upsync_module.c +++ b/src/ngx_stream_upsync_module.c @@ -183,7 +183,8 @@ static void ngx_stream_upsync_recv_handler(ngx_event_t *event); static void ngx_stream_upsync_send_handler(ngx_event_t *event); static void ngx_stream_upsync_timeout_handler(ngx_event_t *event); static void ngx_stream_upsync_clean_event(void *upsync_server); -static ngx_int_t ngx_stream_upsync_parse_init(void *upsync_server); +static ngx_int_t ngx_stream_upsync_etcd_parse_init(void *upsync_server); +static ngx_int_t ngx_stream_upsync_consul_parse_init(void *upsync_server); static ngx_int_t ngx_stream_upsync_dump_server( ngx_stream_upsync_server_t *upsync_server); static ngx_int_t ngx_stream_upsync_init_server(ngx_event_t *event); @@ -204,7 +205,6 @@ static void ngx_stream_upsync_event_init(ngx_stream_upstream_rr_peer_t *peer, ngx_stream_upsync_server_t *upsync_server); static ngx_int_t ngx_stream_http_parser_init(); -static void ngx_stream_http_parser_execute(ngx_stream_upsync_ctx_t *ctx); static int ngx_stream_http_status(http_parser *p, const char *buf, size_t len); static int ngx_stream_http_header_field_cb(http_parser *p, const char *buf, @@ -329,7 +329,7 @@ static ngx_upsync_conf_t ngx_upsync_types[] = { NGX_STREAM_UPSYNC_CONSUL, ngx_stream_upsync_send_handler, ngx_stream_upsync_recv_handler, - ngx_stream_upsync_parse_init, + ngx_stream_upsync_consul_parse_init, ngx_stream_upsync_consul_parse_json, ngx_stream_upsync_clean_event }, @@ -337,7 +337,7 @@ static ngx_upsync_conf_t ngx_upsync_types[] = { NGX_STREAM_UPSYNC_ETCD, ngx_stream_upsync_send_handler, ngx_stream_upsync_recv_handler, - ngx_stream_upsync_parse_init, + ngx_stream_upsync_etcd_parse_init, ngx_stream_upsync_etcd_parse_json, ngx_stream_upsync_clean_event }, @@ -1347,6 +1347,18 @@ ngx_stream_upsync_etcd_parse_json(void *data) "upsync_parse_json: root error"); return NGX_ERROR; } + + cJSON *errorCode = cJSON_GetObjectItem(root, "errorCode"); + + if (errorCode != NULL) { + if (errorCode->valueint == 401) { // trigger reload, we've gone too far with index + upsync_server->index = 0; + upsync_type_conf->clean(upsync_server); + ngx_add_timer(&upsync_server->upsync_ev, 0); + } + cJSON_Delete(root); + return NGX_ERROR; + } cJSON *action = cJSON_GetObjectItem(root, "action"); if (action != NULL) { @@ -2470,9 +2482,9 @@ ngx_stream_upsync_send_handler(ngx_event_t *event) if (upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_ETCD) { if (upsync_server->index != 0) { - ngx_sprintf(request, "GET %V?wait=true&recursive=true&waitIndex=%d" + ngx_sprintf(request, "GET %V?wait=true&recursive=true&waitIndex=%d" " HTTP/1.0\r\nHost: %V\r\nAccept: */*\r\n\r\n", - &upscf->upsync_send, upsync_server->index, + &upscf->upsync_send, upsync_server->index, &upscf->conf_server.name); } else { @@ -2637,33 +2649,100 @@ ngx_stream_upsync_recv_handler(ngx_event_t *event) static ngx_int_t -ngx_stream_upsync_parse_init(void *data) +ngx_stream_upsync_consul_parse_init(void *data) { - ngx_upsync_conf_t *upsync_type_conf; + char *buf; + size_t parsed; ngx_stream_upsync_ctx_t *ctx; ngx_stream_upsync_server_t *upsync_server = data; - upsync_type_conf = upsync_server->upscf->upsync_type_conf; ctx = &upsync_server->ctx; - if (upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL - || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_ETCD) - { - if (ngx_stream_http_parser_init() == NGX_ERROR) { - return NGX_ERROR; + if (ngx_stream_http_parser_init() == NGX_ERROR) { + return NGX_ERROR; + } + + buf = (char *)ctx->recv.pos; + + ctx->body.pos = ctx->body.last = NULL; + + parsed = http_parser_execute(parser, &settings, buf, ngx_strlen(buf)); + if (parsed != ngx_strlen(buf)) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_consul_parse_init: parsed body size is wrong"); + return NGX_ERROR; + } + + if (ngx_strncmp(state.status, "OK", 2) == 0) { + + if (ngx_strlen(state.http_body) != 0) { + ctx->body.pos = state.http_body; + ctx->body.last = state.http_body + ngx_strlen(state.http_body); + } + } - ngx_stream_http_parser_execute(ctx); - if (ctx->body.pos != ctx->body.last) { - *(ctx->body.last + 1) = '\0'; + if (parser != NULL) { + ngx_free(parser); + parser = NULL; + } + + if (ctx->body.pos != ctx->body.last) { + *(ctx->body.last + 1) = '\0'; + + } else { + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_stream_upsync_etcd_parse_init(void *data) +{ + char *buf; + size_t parsed; + ngx_stream_upsync_ctx_t *ctx; + ngx_stream_upsync_server_t *upsync_server = data; + + ctx = &upsync_server->ctx; + + if (ngx_stream_http_parser_init() == NGX_ERROR) { + return NGX_ERROR; + } + + buf = (char *)ctx->recv.pos; + + ctx->body.pos = ctx->body.last = NULL; + + parsed = http_parser_execute(parser, &settings, buf, ngx_strlen(buf)); + if (parsed != ngx_strlen(buf)) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_etcd_parse_init: parsed body size is wrong"); + return NGX_ERROR; + } + + if (ngx_strncmp(state.status, "OK", 2) == 0 + || ngx_strncmp(state.status, "Bad", 3) == 0) { + + if (ngx_strlen(state.http_body) != 0) { + ctx->body.pos = state.http_body; + ctx->body.last = state.http_body + ngx_strlen(state.http_body); - } else { - return NGX_ERROR; } + } + + if (parser != NULL) { + ngx_free(parser); + parser = NULL; + } + + if (ctx->body.pos != ctx->body.last) { + *(ctx->body.last + 1) = '\0'; } else { - ctx->body.pos = ctx->recv.pos; - ctx->body.last = ctx->recv.last; + return NGX_ERROR; } return NGX_OK; @@ -3032,41 +3111,6 @@ ngx_stream_http_parser_init() } -static void -ngx_stream_http_parser_execute(ngx_stream_upsync_ctx_t *ctx) -{ - char *buf; - size_t parsed; - - buf = (char *)ctx->recv.pos; - - ctx->body.pos = ctx->body.last = NULL; - - parsed = http_parser_execute(parser, &settings, buf, ngx_strlen(buf)); - if (parsed != ngx_strlen(buf)) { - ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, - "http_parser_execute: parsed body size is wrong"); - return; - } - - if (ngx_strncmp(state.status, "OK", 2) == 0) { - - if (ngx_strlen(state.http_body) != 0) { - ctx->body.pos = state.http_body; - ctx->body.last = state.http_body + ngx_strlen(state.http_body); - - } else if (ngx_strlen(state.http_body) == 0) { - return; - } - } - - ngx_free(parser); - parser = NULL; - - return; -} - - static int ngx_stream_http_status(http_parser *p, const char *buf, size_t len) {