From 4a2c4cb210d273c36e8e7435e6995a077f37ae4d Mon Sep 17 00:00:00 2001 From: Primoz Kolaric <5181461+pkolaric@users.noreply.github.com> Date: Tue, 24 Sep 2024 17:16:33 +0200 Subject: [PATCH 1/5] make it possible to run fluency without runtime dependency on jackson-databind --- .../fluentd/FluencyBuilderForFluentd.java | 22 ++++++++++------- .../fluentd/ingester/FluentdIngester.java | 20 +++++++++++----- .../ingester/sender/NetworkSender.java | 24 +++++++++---------- 3 files changed, 38 insertions(+), 28 deletions(-) diff --git a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/FluencyBuilderForFluentd.java b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/FluencyBuilderForFluentd.java index eac64f4d..11dd71cc 100644 --- a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/FluencyBuilderForFluentd.java +++ b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/FluencyBuilderForFluentd.java @@ -18,11 +18,7 @@ import org.komamitsu.fluency.Fluency; import org.komamitsu.fluency.fluentd.ingester.FluentdIngester; -import org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender; -import org.komamitsu.fluency.fluentd.ingester.sender.MultiSender; -import org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender; -import org.komamitsu.fluency.fluentd.ingester.sender.SSLSender; -import org.komamitsu.fluency.fluentd.ingester.sender.TCPSender; +import org.komamitsu.fluency.fluentd.ingester.sender.*; import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.FailureDetector; import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.PhiAccrualFailureDetectStrategy; import org.komamitsu.fluency.fluentd.ingester.sender.heartbeat.SSLHeartbeater; @@ -30,9 +26,10 @@ import org.komamitsu.fluency.fluentd.ingester.sender.retry.ExponentialBackOffRetryStrategy; import org.komamitsu.fluency.fluentd.recordformat.FluentdRecordFormatter; import org.komamitsu.fluency.ingester.Ingester; +import org.komamitsu.fluency.recordformat.RecordFormatter; -import java.net.InetSocketAddress; import javax.net.ssl.SSLSocketFactory; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -47,7 +44,14 @@ public class FluencyBuilderForFluentd private SSLSocketFactory sslSocketFactory; protected Integer connectionTimeoutMilli; protected Integer readTimeoutMilli; - protected FluentdRecordFormatter recordFormatter = new FluentdRecordFormatter(); + protected RecordFormatter recordFormatter; + + public FluencyBuilderForFluentd() { + this(new FluentdRecordFormatter()); + } + public FluencyBuilderForFluentd (RecordFormatter recordFormatter) { + this.recordFormatter = recordFormatter; + } public Integer getSenderMaxRetryCount() { @@ -124,12 +128,12 @@ public void setReadTimeoutMilli(Integer readTimeoutMilli) this.readTimeoutMilli = readTimeoutMilli; } - public FluentdRecordFormatter getRecordFormatter() + public RecordFormatter getRecordFormatter() { return recordFormatter; } - public void setRecordFormatter(FluentdRecordFormatter recordFormatter) + public void setRecordFormatter(RecordFormatter recordFormatter) { this.recordFormatter = recordFormatter; } diff --git a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/FluentdIngester.java b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/FluentdIngester.java index 23a6f5a5..c9d52d93 100644 --- a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/FluentdIngester.java +++ b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/FluentdIngester.java @@ -16,14 +16,12 @@ package org.komamitsu.fluency.fluentd.ingester; -import com.fasterxml.jackson.databind.ObjectMapper; import org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender; -import org.komamitsu.fluency.fluentd.ingester.sender.RequestOption; import org.komamitsu.fluency.ingester.Ingester; import org.komamitsu.fluency.ingester.sender.Sender; +import org.msgpack.core.MessageBufferPacker; import org.msgpack.core.MessagePack; import org.msgpack.core.MessagePacker; -import org.msgpack.jackson.dataformat.MessagePackFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -37,7 +35,6 @@ public class FluentdIngester { private final Config config; private final FluentdSender sender; - private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); public FluentdIngester(FluentdSender sender) { @@ -72,7 +69,7 @@ public void ingest(String tag, ByteBuffer dataBuffer) // discussion on issue #181. String token = UUID.randomUUID().toString(); - ByteBuffer optionBuffer = ByteBuffer.wrap(objectMapper.writeValueAsBytes(new RequestOption(dataLength, token))); + ByteBuffer optionBuffer = packRequestOption(dataLength, token); List buffers = Arrays.asList(headerBuffer, dataBuffer, optionBuffer); synchronized (sender) { @@ -80,7 +77,7 @@ public void ingest(String tag, ByteBuffer dataBuffer) } } else { - ByteBuffer optionBuffer = ByteBuffer.wrap(objectMapper.writeValueAsBytes(new RequestOption(dataLength, null))); + ByteBuffer optionBuffer = packRequestOption(dataLength, null); List buffers = Arrays.asList(headerBuffer, dataBuffer, optionBuffer); synchronized (sender) { @@ -89,6 +86,17 @@ public void ingest(String tag, ByteBuffer dataBuffer) } } + private static ByteBuffer packRequestOption(int dataLength, String token) throws IOException { + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packMapHeader(token == null ? 1 : 2); + packer.packString("size").packInt(dataLength); + if (token != null) { + packer.packString("chunk").packString(token); + } + packer.close(); + return ByteBuffer.wrap(packer.toByteArray()); + } + @Override public Sender getSender() { diff --git a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java index 8be5d26a..7f2f7398 100644 --- a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java +++ b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java @@ -16,13 +16,11 @@ package org.komamitsu.fluency.fluentd.ingester.sender; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.komamitsu.fluency.fluentd.ingester.Response; import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.FailureDetector; import org.komamitsu.fluency.util.ExecutorServiceUtils; -import org.komamitsu.fluency.validation.Validatable; import org.komamitsu.fluency.validation.annotation.Min; -import org.msgpack.jackson.dataformat.MessagePackFactory; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,11 +28,7 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; public abstract class NetworkSender extends FluentdSender @@ -45,7 +39,7 @@ public abstract class NetworkSender private final Config config; private final FailureDetector failureDetector; - private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); + NetworkSender(FailureDetector failureDetector) { @@ -124,9 +118,13 @@ protected synchronized void sendInternal(List buffers, String ackTok throw new SocketTimeoutException("Socket read timeout"); } - Response response = objectMapper.readValue(optionBuffer, Response.class); - if (!ackToken.equals(response.getAck())) { - throw new UnmatchedAckException("Ack tokens don't matched: expected=" + ", got=" + response.getAck()); + try (MessageUnpacker responseUnpacker = MessagePack.newDefaultUnpacker(optionBuffer)) { + responseUnpacker.unpackMapHeader(); + responseUnpacker.unpackString(); + String responseAckToken = responseUnpacker.unpackString(); + if (!ackToken.equals(responseAckToken)) { + throw new UnmatchedAckException("Ack tokens don't matched: expected=" + ", got=" + responseAckToken); + } } } catch (IOException e) { From 344429f4593f880f4fbd4e89155260b5de80d20f Mon Sep 17 00:00:00 2001 From: Primoz Kolaric <5181461+pkolaric@users.noreply.github.com> Date: Tue, 8 Oct 2024 15:23:01 +0200 Subject: [PATCH 2/5] - formatting - try-with-resources - deserialize as an arbitrary map --- .../fluentd/FluencyBuilderForFluentd.java | 3 ++- .../fluentd/ingester/FluentdIngester.java | 14 +++++++------- .../fluentd/ingester/sender/NetworkSender.java | 17 ++++++++++++----- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/FluencyBuilderForFluentd.java b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/FluencyBuilderForFluentd.java index 11dd71cc..e61ac018 100644 --- a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/FluencyBuilderForFluentd.java +++ b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/FluencyBuilderForFluentd.java @@ -49,7 +49,8 @@ public class FluencyBuilderForFluentd public FluencyBuilderForFluentd() { this(new FluentdRecordFormatter()); } - public FluencyBuilderForFluentd (RecordFormatter recordFormatter) { + + public FluencyBuilderForFluentd(RecordFormatter recordFormatter) { this.recordFormatter = recordFormatter; } diff --git a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/FluentdIngester.java b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/FluentdIngester.java index c9d52d93..70d13e69 100644 --- a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/FluentdIngester.java +++ b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/FluentdIngester.java @@ -87,14 +87,14 @@ public void ingest(String tag, ByteBuffer dataBuffer) } private static ByteBuffer packRequestOption(int dataLength, String token) throws IOException { - MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); - packer.packMapHeader(token == null ? 1 : 2); - packer.packString("size").packInt(dataLength); - if (token != null) { - packer.packString("chunk").packString(token); + try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { + packer.packMapHeader(token == null ? 1 : 2); + packer.packString("size").packInt(dataLength); + if (token != null) { + packer.packString("chunk").packString(token); + } + return ByteBuffer.wrap(packer.toByteArray()); } - packer.close(); - return ByteBuffer.wrap(packer.toByteArray()); } @Override diff --git a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java index 7f2f7398..77754eed 100644 --- a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java +++ b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java @@ -119,13 +119,20 @@ protected synchronized void sendInternal(List buffers, String ackTok } try (MessageUnpacker responseUnpacker = MessagePack.newDefaultUnpacker(optionBuffer)) { - responseUnpacker.unpackMapHeader(); - responseUnpacker.unpackString(); - String responseAckToken = responseUnpacker.unpackString(); - if (!ackToken.equals(responseAckToken)) { - throw new UnmatchedAckException("Ack tokens don't matched: expected=" + ", got=" + responseAckToken); + for (int i=0;i Date: Fri, 11 Oct 2024 13:07:12 +0200 Subject: [PATCH 3/5] Update fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java Co-authored-by: Mitsunori Komatsu --- .../fluency/fluentd/ingester/sender/NetworkSender.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java index 77754eed..ea9bf683 100644 --- a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java +++ b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/NetworkSender.java @@ -119,7 +119,7 @@ protected synchronized void sendInternal(List buffers, String ackTok } try (MessageUnpacker responseUnpacker = MessagePack.newDefaultUnpacker(optionBuffer)) { - for (int i=0;i Date: Fri, 11 Oct 2024 13:09:13 +0200 Subject: [PATCH 4/5] removing RequestOption class --- .../ingester/sender/RequestOption.java | 56 ------------------- 1 file changed, 56 deletions(-) delete mode 100644 fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/RequestOption.java diff --git a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/RequestOption.java b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/RequestOption.java deleted file mode 100644 index 3855a259..00000000 --- a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/ingester/sender/RequestOption.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2019 Mitsunori Komatsu (komamitsu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.komamitsu.fluency.fluentd.ingester.sender; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; - -@JsonInclude(JsonInclude.Include.NON_NULL) -public class RequestOption -{ - private final int size; - private final String chunk; - - public RequestOption( - @JsonProperty("size") int size, - @JsonProperty("chunk") String chunk) - { - this.size = size; - this.chunk = chunk; - } - - @JsonProperty("size") - public int getSize() - { - return size; - } - - @JsonProperty("chunk") - public String getChunk() - { - return chunk; - } - - @Override - public String toString() - { - return "RequestOption{" + - "size=" + size + - ", chunk=" + chunk + - '}'; - } -} From 17d4694b9c12ae61aeaa1fd34283bfb1fbc8e0e5 Mon Sep 17 00:00:00 2001 From: Primoz Kolaric <5181461+pkolaric@users.noreply.github.com> Date: Fri, 11 Oct 2024 22:42:00 +0200 Subject: [PATCH 5/5] default recordFormatter value via prepareRecordFormatter() --- .../fluentd/FluencyExtBuilderForFluentd.java | 6 +---- .../fluentd/FluencyBuilderForFluentd.java | 27 ++++++++++--------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/fluency-fluentd-ext/src/main/java/org/komamitsu/fluency/fluentd/FluencyExtBuilderForFluentd.java b/fluency-fluentd-ext/src/main/java/org/komamitsu/fluency/fluentd/FluencyExtBuilderForFluentd.java index f74beb84..d6ea5b1b 100644 --- a/fluency-fluentd-ext/src/main/java/org/komamitsu/fluency/fluentd/FluencyExtBuilderForFluentd.java +++ b/fluency-fluentd-ext/src/main/java/org/komamitsu/fluency/fluentd/FluencyExtBuilderForFluentd.java @@ -18,16 +18,12 @@ import org.komamitsu.fluency.Fluency; import org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender; -import org.komamitsu.fluency.fluentd.ingester.sender.MultiSender; import org.komamitsu.fluency.fluentd.ingester.sender.UnixSocketSender; import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.FailureDetector; import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.PhiAccrualFailureDetectStrategy; import org.komamitsu.fluency.fluentd.ingester.sender.heartbeat.UnixSocketHeartbeater; -import java.net.UnixDomainSocketAddress; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; public class FluencyExtBuilderForFluentd extends FluencyBuilderForFluentd @@ -35,7 +31,7 @@ public class FluencyExtBuilderForFluentd public Fluency build(Path socketPath) { return buildFromIngester( - recordFormatter, + prepareRecordFormatter(), buildIngester(createBaseSender(socketPath, false))); } diff --git a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/FluencyBuilderForFluentd.java b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/FluencyBuilderForFluentd.java index e61ac018..d7156ead 100644 --- a/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/FluencyBuilderForFluentd.java +++ b/fluency-fluentd/src/main/java/org/komamitsu/fluency/fluentd/FluencyBuilderForFluentd.java @@ -44,15 +44,7 @@ public class FluencyBuilderForFluentd private SSLSocketFactory sslSocketFactory; protected Integer connectionTimeoutMilli; protected Integer readTimeoutMilli; - protected RecordFormatter recordFormatter; - - public FluencyBuilderForFluentd() { - this(new FluentdRecordFormatter()); - } - - public FluencyBuilderForFluentd(RecordFormatter recordFormatter) { - this.recordFormatter = recordFormatter; - } + private RecordFormatter recordFormatter; public Integer getSenderMaxRetryCount() { @@ -139,24 +131,33 @@ public void setRecordFormatter(RecordFormatter recordFormatter) this.recordFormatter = recordFormatter; } + protected RecordFormatter prepareRecordFormatter() { + if (recordFormatter != null) { + return recordFormatter; + } + else { + return new FluentdRecordFormatter(); + } + } + public Fluency build(String host, int port) { return buildFromIngester( - recordFormatter, + prepareRecordFormatter(), buildIngester(createBaseSender(host, port))); } public Fluency build(int port) { return buildFromIngester( - recordFormatter, + prepareRecordFormatter(), buildIngester(createBaseSender(null, port))); } public Fluency build() { return buildFromIngester( - recordFormatter, + prepareRecordFormatter(), buildIngester(createBaseSender(null, null))); } @@ -167,7 +168,7 @@ public Fluency build(List servers) senders.add(createBaseSender(server.getHostName(), server.getPort(), true)); } return buildFromIngester( - recordFormatter, + prepareRecordFormatter(), buildIngester(new MultiSender(senders))); }