Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make it possible to run fluency without runtime dependency on jackson #904

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,18 @@

import org.komamitsu.fluency.Fluency;
import org.komamitsu.fluency.fluentd.ingester.FluentdIngester;
import org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender;
import org.komamitsu.fluency.fluentd.ingester.sender.MultiSender;
import org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender;
import org.komamitsu.fluency.fluentd.ingester.sender.SSLSender;
import org.komamitsu.fluency.fluentd.ingester.sender.TCPSender;
import org.komamitsu.fluency.fluentd.ingester.sender.*;
import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.FailureDetector;
import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.PhiAccrualFailureDetectStrategy;
import org.komamitsu.fluency.fluentd.ingester.sender.heartbeat.SSLHeartbeater;
import org.komamitsu.fluency.fluentd.ingester.sender.heartbeat.TCPHeartbeater;
import org.komamitsu.fluency.fluentd.ingester.sender.retry.ExponentialBackOffRetryStrategy;
import org.komamitsu.fluency.fluentd.recordformat.FluentdRecordFormatter;
import org.komamitsu.fluency.ingester.Ingester;
import org.komamitsu.fluency.recordformat.RecordFormatter;

import java.net.InetSocketAddress;
import javax.net.ssl.SSLSocketFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -47,7 +44,14 @@ public class FluencyBuilderForFluentd
private SSLSocketFactory sslSocketFactory;
protected Integer connectionTimeoutMilli;
protected Integer readTimeoutMilli;
protected FluentdRecordFormatter recordFormatter = new FluentdRecordFormatter();
protected RecordFormatter recordFormatter;

public FluencyBuilderForFluentd() {
this(new FluentdRecordFormatter());
}
public FluencyBuilderForFluentd (RecordFormatter recordFormatter) {
pkolaric marked this conversation as resolved.
Show resolved Hide resolved
pkolaric marked this conversation as resolved.
Show resolved Hide resolved
this.recordFormatter = recordFormatter;
}

public Integer getSenderMaxRetryCount()
{
Expand Down Expand Up @@ -124,12 +128,12 @@ public void setReadTimeoutMilli(Integer readTimeoutMilli)
this.readTimeoutMilli = readTimeoutMilli;
}

public FluentdRecordFormatter getRecordFormatter()
public RecordFormatter getRecordFormatter()
{
return recordFormatter;
}

public void setRecordFormatter(FluentdRecordFormatter recordFormatter)
public void setRecordFormatter(RecordFormatter recordFormatter)
{
this.recordFormatter = recordFormatter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@

package org.komamitsu.fluency.fluentd.ingester;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender;
import org.komamitsu.fluency.fluentd.ingester.sender.RequestOption;
import org.komamitsu.fluency.ingester.Ingester;
import org.komamitsu.fluency.ingester.sender.Sender;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.jackson.dataformat.MessagePackFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -37,7 +35,6 @@ public class FluentdIngester
{
private final Config config;
private final FluentdSender sender;
private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());

public FluentdIngester(FluentdSender sender)
{
Expand Down Expand Up @@ -72,15 +69,15 @@ public void ingest(String tag, ByteBuffer dataBuffer)
// discussion on issue #181.
String token = UUID.randomUUID().toString();

ByteBuffer optionBuffer = ByteBuffer.wrap(objectMapper.writeValueAsBytes(new RequestOption(dataLength, token)));
ByteBuffer optionBuffer = packRequestOption(dataLength, token);
List<ByteBuffer> buffers = Arrays.asList(headerBuffer, dataBuffer, optionBuffer);

synchronized (sender) {
sender.sendWithAck(buffers, token);
}
}
else {
ByteBuffer optionBuffer = ByteBuffer.wrap(objectMapper.writeValueAsBytes(new RequestOption(dataLength, null)));
ByteBuffer optionBuffer = packRequestOption(dataLength, null);
pkolaric marked this conversation as resolved.
Show resolved Hide resolved
List<ByteBuffer> buffers = Arrays.asList(headerBuffer, dataBuffer, optionBuffer);

synchronized (sender) {
Expand All @@ -89,6 +86,17 @@ public void ingest(String tag, ByteBuffer dataBuffer)
}
}

private static ByteBuffer packRequestOption(int dataLength, String token) throws IOException {
MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
pkolaric marked this conversation as resolved.
Show resolved Hide resolved
packer.packMapHeader(token == null ? 1 : 2);
packer.packString("size").packInt(dataLength);
if (token != null) {
packer.packString("chunk").packString(token);
}
packer.close();
return ByteBuffer.wrap(packer.toByteArray());
}

@Override
public Sender getSender()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,19 @@

package org.komamitsu.fluency.fluentd.ingester.sender;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.komamitsu.fluency.fluentd.ingester.Response;
import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.FailureDetector;
import org.komamitsu.fluency.util.ExecutorServiceUtils;
import org.komamitsu.fluency.validation.Validatable;
import org.komamitsu.fluency.validation.annotation.Min;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;

public abstract class NetworkSender<T>
extends FluentdSender
Expand All @@ -45,7 +39,7 @@ public abstract class NetworkSender<T>

private final Config config;
private final FailureDetector failureDetector;
private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());


NetworkSender(FailureDetector failureDetector)
{
Expand Down Expand Up @@ -124,9 +118,13 @@ protected synchronized void sendInternal(List<ByteBuffer> buffers, String ackTok
throw new SocketTimeoutException("Socket read timeout");
}

Response response = objectMapper.readValue(optionBuffer, Response.class);
if (!ackToken.equals(response.getAck())) {
throw new UnmatchedAckException("Ack tokens don't matched: expected=" + ", got=" + response.getAck());
try (MessageUnpacker responseUnpacker = MessagePack.newDefaultUnpacker(optionBuffer)) {
responseUnpacker.unpackMapHeader();
responseUnpacker.unpackString();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack field is contained in a Map structure data, and the order and the number of fields can be changed https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#response. So, the Ack value must be obtained by specifying key ack.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed this one... Fixed.

String responseAckToken = responseUnpacker.unpackString();
if (!ackToken.equals(responseAckToken)) {
throw new UnmatchedAckException("Ack tokens don't matched: expected=" + ", got=" + responseAckToken);
}
}
}
catch (IOException e) {
Expand Down