Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make it possible to run fluency without runtime dependency on jackson #904

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,20 @@

import org.komamitsu.fluency.Fluency;
import org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender;
import org.komamitsu.fluency.fluentd.ingester.sender.MultiSender;
import org.komamitsu.fluency.fluentd.ingester.sender.UnixSocketSender;
import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.FailureDetector;
import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.PhiAccrualFailureDetectStrategy;
import org.komamitsu.fluency.fluentd.ingester.sender.heartbeat.UnixSocketHeartbeater;

import java.net.UnixDomainSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class FluencyExtBuilderForFluentd
extends FluencyBuilderForFluentd
{
public Fluency build(Path socketPath)
{
return buildFromIngester(
recordFormatter,
prepareRecordFormatter(),
buildIngester(createBaseSender(socketPath, false)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,18 @@

import org.komamitsu.fluency.Fluency;
import org.komamitsu.fluency.fluentd.ingester.FluentdIngester;
import org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender;
import org.komamitsu.fluency.fluentd.ingester.sender.MultiSender;
import org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender;
import org.komamitsu.fluency.fluentd.ingester.sender.SSLSender;
import org.komamitsu.fluency.fluentd.ingester.sender.TCPSender;
import org.komamitsu.fluency.fluentd.ingester.sender.*;
import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.FailureDetector;
import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.PhiAccrualFailureDetectStrategy;
import org.komamitsu.fluency.fluentd.ingester.sender.heartbeat.SSLHeartbeater;
import org.komamitsu.fluency.fluentd.ingester.sender.heartbeat.TCPHeartbeater;
import org.komamitsu.fluency.fluentd.ingester.sender.retry.ExponentialBackOffRetryStrategy;
import org.komamitsu.fluency.fluentd.recordformat.FluentdRecordFormatter;
import org.komamitsu.fluency.ingester.Ingester;
import org.komamitsu.fluency.recordformat.RecordFormatter;

import java.net.InetSocketAddress;
import javax.net.ssl.SSLSocketFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -47,7 +44,7 @@ public class FluencyBuilderForFluentd
private SSLSocketFactory sslSocketFactory;
protected Integer connectionTimeoutMilli;
protected Integer readTimeoutMilli;
protected FluentdRecordFormatter recordFormatter = new FluentdRecordFormatter();
private RecordFormatter recordFormatter;

public Integer getSenderMaxRetryCount()
{
Expand Down Expand Up @@ -124,34 +121,43 @@ public void setReadTimeoutMilli(Integer readTimeoutMilli)
this.readTimeoutMilli = readTimeoutMilli;
}

public FluentdRecordFormatter getRecordFormatter()
public RecordFormatter getRecordFormatter()
{
return recordFormatter;
}

public void setRecordFormatter(FluentdRecordFormatter recordFormatter)
public void setRecordFormatter(RecordFormatter recordFormatter)
{
this.recordFormatter = recordFormatter;
}

protected RecordFormatter prepareRecordFormatter() {
if (recordFormatter != null) {
return recordFormatter;
}
else {
return new FluentdRecordFormatter();
}
}

public Fluency build(String host, int port)
{
return buildFromIngester(
recordFormatter,
prepareRecordFormatter(),
buildIngester(createBaseSender(host, port)));
}

public Fluency build(int port)
{
return buildFromIngester(
recordFormatter,
prepareRecordFormatter(),
buildIngester(createBaseSender(null, port)));
}

public Fluency build()
{
return buildFromIngester(
recordFormatter,
prepareRecordFormatter(),
buildIngester(createBaseSender(null, null)));
}

Expand All @@ -162,7 +168,7 @@ public Fluency build(List<InetSocketAddress> servers)
senders.add(createBaseSender(server.getHostName(), server.getPort(), true));
}
return buildFromIngester(
recordFormatter,
prepareRecordFormatter(),
buildIngester(new MultiSender(senders)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@

package org.komamitsu.fluency.fluentd.ingester;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender;
import org.komamitsu.fluency.fluentd.ingester.sender.RequestOption;
import org.komamitsu.fluency.ingester.Ingester;
import org.komamitsu.fluency.ingester.sender.Sender;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.jackson.dataformat.MessagePackFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -37,7 +35,6 @@ public class FluentdIngester
{
private final Config config;
private final FluentdSender sender;
private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());

public FluentdIngester(FluentdSender sender)
{
Expand Down Expand Up @@ -72,15 +69,15 @@ public void ingest(String tag, ByteBuffer dataBuffer)
// discussion on issue #181.
String token = UUID.randomUUID().toString();

ByteBuffer optionBuffer = ByteBuffer.wrap(objectMapper.writeValueAsBytes(new RequestOption(dataLength, token)));
ByteBuffer optionBuffer = packRequestOption(dataLength, token);
List<ByteBuffer> buffers = Arrays.asList(headerBuffer, dataBuffer, optionBuffer);

synchronized (sender) {
sender.sendWithAck(buffers, token);
}
}
else {
ByteBuffer optionBuffer = ByteBuffer.wrap(objectMapper.writeValueAsBytes(new RequestOption(dataLength, null)));
ByteBuffer optionBuffer = packRequestOption(dataLength, null);
pkolaric marked this conversation as resolved.
Show resolved Hide resolved
List<ByteBuffer> buffers = Arrays.asList(headerBuffer, dataBuffer, optionBuffer);

synchronized (sender) {
Expand All @@ -89,6 +86,17 @@ public void ingest(String tag, ByteBuffer dataBuffer)
}
}

private static ByteBuffer packRequestOption(int dataLength, String token) throws IOException {
try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) {
packer.packMapHeader(token == null ? 1 : 2);
packer.packString("size").packInt(dataLength);
if (token != null) {
packer.packString("chunk").packString(token);
}
return ByteBuffer.wrap(packer.toByteArray());
}
}

@Override
public Sender getSender()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,19 @@

package org.komamitsu.fluency.fluentd.ingester.sender;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.komamitsu.fluency.fluentd.ingester.Response;
import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.FailureDetector;
import org.komamitsu.fluency.util.ExecutorServiceUtils;
import org.komamitsu.fluency.validation.Validatable;
import org.komamitsu.fluency.validation.annotation.Min;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;

public abstract class NetworkSender<T>
extends FluentdSender
Expand All @@ -45,7 +39,7 @@ public abstract class NetworkSender<T>

private final Config config;
private final FailureDetector failureDetector;
private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());


NetworkSender(FailureDetector failureDetector)
{
Expand Down Expand Up @@ -124,10 +118,21 @@ protected synchronized void sendInternal(List<ByteBuffer> buffers, String ackTok
throw new SocketTimeoutException("Socket read timeout");
}

Response response = objectMapper.readValue(optionBuffer, Response.class);
if (!ackToken.equals(response.getAck())) {
throw new UnmatchedAckException("Ack tokens don't matched: expected=" + ", got=" + response.getAck());
try (MessageUnpacker responseUnpacker = MessagePack.newDefaultUnpacker(optionBuffer)) {
for (int i = 0; i < responseUnpacker.unpackMapHeader(); i++) {
if (!"ack".equalsIgnoreCase(responseUnpacker.unpackString())) {
responseUnpacker.skipValue();
} else {
String responseAckToken = responseUnpacker.unpackString();
if (ackToken.equals(responseAckToken)) {
return;
} else {
throw new UnmatchedAckException("Ack tokens don't matched: expected=" + ", got=" + responseAckToken);
}
}
}
}
throw new IOException("Missing `ack` attribute in the response!");
}
catch (IOException e) {
LOG.error("Failed to send {} bytes data", totalDataSize);
Expand Down

This file was deleted.