From 26c87acb97b2d43cad79fee432b2354eda5e32c8 Mon Sep 17 00:00:00 2001 From: "yunjin.choi" Date: Mon, 16 Jan 2017 12:32:17 +0100 Subject: [PATCH] added extra parameter to configure protocol between tcp and udp --- src/ngx_http_graphite_module.c | 94 +++++++++++++++++++++++++++++++--- 1 file changed, 88 insertions(+), 6 deletions(-) diff --git a/src/ngx_http_graphite_module.c b/src/ngx_http_graphite_module.c index c57df92..314390e 100644 --- a/src/ngx_http_graphite_module.c +++ b/src/ngx_http_graphite_module.c @@ -8,6 +8,7 @@ typedef struct { ngx_uint_t enable; ngx_str_t host; + ngx_str_t protocol; ngx_shm_zone_t *shared; char *buffer; @@ -84,6 +85,8 @@ static char *ngx_http_graphite_config_arg_shared(ngx_conf_t *cf, ngx_command_t * static char *ngx_http_graphite_config_arg_buffer(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, ngx_str_t *value); static char *ngx_http_graphite_config_arg_package(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, ngx_str_t *value); static char *ngx_http_graphite_config_arg_template(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, ngx_str_t *value); +static char *ngx_http_graphite_config_arg_protocol(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, ngx_str_t *value); + static char *ngx_http_graphite_param_arg_name(ngx_conf_t *cf, ngx_command_t *cmd, void *data, ngx_str_t *value); static char *ngx_http_graphite_param_arg_aggregate(ngx_conf_t *cf, ngx_command_t *cmd, void *data, ngx_str_t *value); @@ -218,7 +221,7 @@ typedef struct ngx_http_graphite_arg_s { ngx_str_t deflt; } ngx_http_graphite_arg_t; -#define CONFIG_ARGS_COUNT 11 +#define CONFIG_ARGS_COUNT 12 static const ngx_http_graphite_arg_t ngx_http_graphite_config_args[CONFIG_ARGS_COUNT] = { { ngx_string("prefix"), ngx_http_graphite_config_arg_prefix, ngx_null_string }, @@ -236,6 +239,7 @@ static const ngx_http_graphite_arg_t ngx_http_graphite_config_args[CONFIG_ARGS_C { ngx_string("buffer"), ngx_http_graphite_config_arg_buffer, ngx_string("64k") }, { ngx_string("package"), ngx_http_graphite_config_arg_package, ngx_string("1400") }, { ngx_string("template"), ngx_http_graphite_config_arg_template, ngx_null_string }, + { ngx_string("protocol"), ngx_http_graphite_config_arg_protocol, ngx_string("udp") }, }; #define PARAM_ARGS_COUNT 3 @@ -692,6 +696,13 @@ ngx_http_graphite_config(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "graphite config server not set"); return NGX_CONF_ERROR; } + if (gmcf->protocol.len == 0){ + ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "graphite config protocol is not specified"); + return NGX_CONF_ERROR; + } else if (ngx_strcmp(gmcf->protocol.data, "udp") != 0 && ngx_strcmp(gmcf->protocol.data, "tcp") != 0) { + ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "graphite config invalid protocol"); + return NGX_CONF_ERROR; + } if (gmcf->port < 1 || gmcf->port > 65535) { ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "graphite config port must be in range form 1 to 65535"); @@ -971,6 +982,16 @@ ngx_http_graphite_config_arg_host(ngx_conf_t *cf, ngx_command_t *cmd, void *conf return ngx_http_graphite_parse_string(cf, value, &gmcf->host); } +static char * +ngx_http_graphite_config_arg_protocol(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, ngx_str_t *value) { + + ngx_http_graphite_main_conf_t *gmcf; + + gmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_graphite_module); + + return ngx_http_graphite_parse_string(cf, value, &gmcf->protocol); +} + #define SERVER_LEN 255 static char * @@ -1794,7 +1815,13 @@ ngx_http_graphite_timer_event_handler(ngx_event_t *ev) { char *next = NULL; char *nl = NULL; - int fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + int fd; + if (ngx_strcmp(gmcf->protocol.data, "tcp") == 0) { + fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + } else { + fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + } + if (fd < 0) { ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "graphite can't create socket"); if (!(ngx_quit || ngx_terminate || ngx_exiting)) @@ -1802,6 +1829,53 @@ ngx_http_graphite_timer_event_handler(ngx_event_t *ev) { return; } + if (ngx_strcmp(gmcf->protocol.data, "tcp") == 0) { + + int flags; + + /* Set socket to non-blocking */ + if ((flags = fcntl(fd, F_GETFL, 0)) < 0) + { + ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "failed to get the current non-blocking flag"); + if (!(ngx_quit || ngx_terminate || ngx_exiting)) + ngx_add_timer(ev, gmcf->frequency); + return; + } + + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) + { + ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "failed to set the socket non-blocking"); + if (!(ngx_quit || ngx_terminate || ngx_exiting)) + ngx_add_timer(ev, gmcf->frequency); + return; + } + int res = connect(fd, (struct sockaddr *)&sin, sizeof(sin)); + if (res < 0) { + if (errno != EINPROGRESS) { + ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "Error connecting %d - %s", errno, strerror(errno)); + if (!(ngx_quit || ngx_terminate || ngx_exiting)) + ngx_add_timer(ev, gmcf->frequency); + return; + } + } + + // Set to blocking mode again... + if( (flags = fcntl(fd, F_GETFL, NULL)) < 0) { + ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "failed to get the current non-blocking flag"); + if (!(ngx_quit || ngx_terminate || ngx_exiting)) + ngx_add_timer(ev, gmcf->frequency); + return; + } + flags &= (~O_NONBLOCK); + if( fcntl(fd, F_SETFL, flags) < 0) { + ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "failed to set the socket blocking again"); + if (!(ngx_quit || ngx_terminate || ngx_exiting)) + ngx_add_timer(ev, gmcf->frequency); + return; + } + } + + while (*part) { next = part; nl = part; @@ -1810,11 +1884,19 @@ ngx_http_graphite_timer_event_handler(ngx_event_t *ev) { nl = next; next++; } - if (nl > part) { - - if (sendto(fd, part, nl - part + 1, 0, (struct sockaddr*)&sin, sizeof(sin)) == -1) - ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "graphite can't send udp packet"); + if (ngx_strcmp(gmcf->protocol.data, "udp") == 0) { + if (sendto(fd, part, nl - part + 1, 0, (struct sockaddr*)&sin, sizeof(sin)) == -1) + ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "graphite can't send udp packet"); + } else if (ngx_strcmp(gmcf->protocol.data, "tcp") == 0) { + if(send(fd, part, nl - part + 1, 0) < 0) { + if(errno == EWOULDBLOCK) { + ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "buffer is full"); + } else { + ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "graphite can't send tcp packet"); + } + } + } } else { ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "graphite package size too small, need send %z", (size_t)(next - part));