Skip to content

Commit

Permalink
add metrics for compressed/uncompressed payload size
Browse files Browse the repository at this point in the history
  • Loading branch information
wperron committed Feb 8, 2024
1 parent 915bc61 commit aad66b7
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 78 deletions.
6 changes: 5 additions & 1 deletion examples/openresty/nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ init_worker_by_lua_block {
local attr = require("opentelemetry.attribute")

-- create exporter
local exporter = otlp_exporter_new(exporter_client_new("otel-collector:4317", 3))
local use_gzip = true
local headers = {}
headers["Content-Encoding"] = "gzip"
local exporter_timeout = 10000
local exporter = otlp_exporter_new(exporter_client_new("otel-collector:4317", 3, headers), exporter_timeout, use_gzip)
-- create span processor
local batch_span_processor = batch_span_processor_new(exporter)
-- create tracer provider
Expand Down
21 changes: 7 additions & 14 deletions lib/opentelemetry/trace/exporter/http_client.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
local http = require("resty.http")
local zlib = require("zlib")
local otel_global = require("opentelemetry.global")
local exporter_request_compressed_payload_size = "otel.otlp_exporter.request_compressed_payload_size"
local exporter_request_uncompressed_payload_size = "otel.otlp_exporter.request_uncompressed_payload_size"

local _M = {
}

Expand All @@ -14,6 +18,7 @@ local mt = {
-- @timeout export request timeout second
-- @headers export request headers
-- @httpc openresty http client instance
-- @use_gzip flag to enable gzip compression on request body
-- @return http client
------------------------------------------------------------------
function _M.new(address, timeout, headers, httpc)
Expand All @@ -31,26 +36,14 @@ function _M.new(address, timeout, headers, httpc)
headers = headers,
httpc = httpc,
}

return setmetatable(self, mt)
end

function _M.do_request(self, body, encode_gzip)
function _M.do_request(self, body)
self.httpc = self.httpc or http.new()

encode_gzip = encode_gzip or false
self.httpc:set_timeout(self.timeout * 1000)

if encode_gzip then
-- Compress (deflate) request body
-- the compression should be set to Best Compression and window size
-- should be set to 15+16, see reference below:
-- https://github.com/brimworks/lua-zlib/issues/4#issuecomment-26383801
self.headers["Content-Encoding"] = "gzip"
local deflate_stream = zlib.deflate(zlib.BEST_COMPRESSION, 15+16)
local compressed_body = deflate_stream(body, "finish")
body = compressed_body
end

local res, err = self.httpc:request_uri(self.uri, {
method = "POST",
headers = self.headers,
Expand Down
30 changes: 25 additions & 5 deletions lib/opentelemetry/trace/exporter/otlp.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ local mt = {
__index = _M
}

function _M.new(http_client, timeout_ms, circuit_reset_timeout_ms, circuit_open_threshold)
function _M.new(http_client, timeout_ms, use_gzip, circuit_reset_timeout_ms, circuit_open_threshold)
local self = {
client = http_client,
timeout_ms = timeout_ms or DEFAULT_TIMEOUT_MS,
use_gzip = use_gzip or false,
circuit = circuit.new({
reset_timeout_ms = circuit_reset_timeout_ms,
failure_threshold = circuit_open_threshold
})
}),
}
return setmetatable(self, mt)
end
Expand All @@ -35,7 +36,7 @@ end
-- @return true if call succeeded; false if call failed
-- @return nil if call succeeded; error message string if call failed
--------------------------------------------------------------------------------
local function call_collector(exporter, pb_encoded_body)
local function call_collector(exporter, pb_encoded_body, use_gzip)
local start_time_ms = util.gettimeofday_ms()
local failures = 0
local res
Expand All @@ -54,8 +55,27 @@ local function call_collector(exporter, pb_encoded_body)
return false, "Circuit breaker is open"
end

if use_gzip then
-- Compress (deflate) request body
-- the compression should be set to Best Compression and window size
-- should be set to 15+16, see reference below:
-- https://github.com/brimworks/lua-zlib/issues/4#issuecomment-26383801
local deflate_stream = zlib.deflate(zlib.BEST_COMPRESSION, 15+16)
local compressed_body, _, _, bytes_out = deflate_stream(pb_encoded_body, "finish")

otel_global.metrics_reporter:record_value(
exporter_request_uncompressed_payload_size, string.len(pb_encoded_body))
otel_global.metrics_reporter:record_value(
exporter_request_compressed_payload_size, bytes_out)

pb_encoded_body = compressed_body
else
otel_global.metrics_reporter:record_value(
exporter_request_uncompressed_payload_size, string.len(pb_encoded_body))
end

-- Make request
res, res_error = exporter.client:do_request(pb_encoded_body, true)
res, res_error = exporter.client:do_request(pb_encoded_body)
local after_time = util.gettimeofday_ms()
otel_global.metrics_reporter:record_value(
exporter_request_duration_metric, after_time - current_time)
Expand Down Expand Up @@ -101,7 +121,7 @@ function _M.export_spans(self, spans)
body.resource_spans[1].instrumentation_library_spans[1].spans,
encoder.for_otlp(span))
end
return call_collector(self, pb.encode(body))
return call_collector(self, pb.encode(body), self.use_gzip)
end

function _M.shutdown(self)
Expand Down
57 changes: 0 additions & 57 deletions spec/trace/exporter/http_client_spec.lua

This file was deleted.

40 changes: 39 additions & 1 deletion spec/trace/exporter/otlp_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@ local context = require "opentelemetry.context"
local tp = Global.get_tracer_provider()
local tracer = tp:tracer("test")

local function is_gzip(_, _)
return function(value)
-- check that the value starts with the two magic bytes 0x1f, 0x8b and
-- the compression method byte set to 0x08
-- reference: https://www.ietf.org/rfc/rfc1952.txt
return string.sub(value, 1, 3) == string.from_hex("1F8B08")
end
end

assert:register("matcher", "gzip", is_gzip)

function string.from_hex(str)
return (str:gsub('..', function (cc)
return string.char(tonumber(cc, 16))
end))
end

describe("export_spans", function()
it("invokes do_request when there are no failures", function()
local span
Expand All @@ -17,7 +34,28 @@ describe("export_spans", function()
stub(ngx, "log")
cb:export_spans({ span })
ngx.log:revert()
assert.spy(c.do_request).was_called_with(c, match.is_string(), match.is_all_of(match.is_boolean(), match.is_equal(true)))
assert.spy(c.do_request).was_called_with(c, match.is_all_of(match.is_string(), match.is_not_gzip()))
end)

it("invokes do_request with gzip compression when configured", function()
local span
local ctx = context.new()
ctx, span = tracer:start(ctx, "test span")
span:finish()

local headers = {}
headers["Content-Encoding"] = "gzip"
local c = client.new("http://localhost:8080", 10, headers)
spy.on(c, "do_request")

local cb = exporter.new(c, 10000, true)

-- Supress log message, since we expect it
stub(ngx, "log")

cb:export_spans({ span })
ngx.log:revert()
assert.spy(c.do_request).was_called_with(c, match.is_all_of(match.is_string(), match.is_gzip()))
end)

it("doesn't invoke protected_call when failures is equal to retry limit", function()
Expand Down

0 comments on commit aad66b7

Please sign in to comment.