Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve ack log messages #282

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

sokomishalov
Copy link

When working with fluent-bit and exceeding max message size on it, fluency log message is quite confusing:

com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `org.komamitsu.fluency.fluentd.ingester.Response` (although at least one Creator exists): no int/Int-argument constructor/factory method to deserialize from Number value (0)
 at [Source: (byte[])"����������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������"; line: -1, column: 0]

@komamitsu
Copy link
Owner

Hi @sokomishalov,

Could you give me an instruction including configurations to reproduce this issue?

@sokomishalov
Copy link
Author

sokomishalov commented Sep 4, 2021

Well, it's quite simple to reproduce - you just have to exceed fluent-bit buffering limits, fluency configuration is default:

@Test
fun `Stress test`() {
    val builderForFluentd = FluencyBuilderForFluentd()
    val fluency = builderForFluentd.build(24224)
    val hugeMessage = "a".repeat(1 * 1024 * 1024)
    repeat(1000) { fluency.emit("test", mapOf("foo" to hugeMessage)) }
}

Also I've discovered that same issue was created before (#171)

@komamitsu
Copy link
Owner

I tried to reproduce this issue, but didn't.
I changed Fluent Bit's buffer size down to 4KB since no error happened with the default 32KB.

Fluency test code

        FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
        Fluency fluency = builder.build();
        HashMap<String, Object> map = new HashMap<>();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sb.append("a");
        }
        map.put("name", sb.toString());
        for (int i = 0; i < 1000; i++) {
            fluency.emit("foo.bar", map);
        }
        fluency.close();

Fluent Bit

$ fluent-bit -V
Fluent Bit v1.8.6
$ cat fluentbit.conf
[INPUT]
    Name              forward
    Listen            0.0.0.0
    Port              24224
    Buffer_Chunk_Size 4K
    Buffer_Max_Size   4K

[OUTPUT]
    Name   stdout
    Match  *

$ fluent-bit -c fluentbit.conf
    :
[2021/09/05 22:29:13] [ warn] [input:forward:forward.0] fd=25 incoming data exceed limit (4000 bytes)
[2021/09/05 22:29:14] [ warn] [input:forward:forward.0] fd=25 incoming data exceed limit (4000 bytes)
[2021/09/05 22:29:14] [ warn] [input:forward:forward.0] fd=25 incoming data exceed limit (4000 bytes)
[2021/09/05 22:29:16] [ warn] [input:forward:forward.0] fd=25 incoming data exceed limit (4000 bytes)
    :

Fluency log

    :
51593 [pool-1-thread-1] ERROR o.k.f.f.i.sender.NetworkSender - Failed to send 1015025 bytes data
51593 [pool-1-thread-1] WARN  o.k.f.f.i.sender.RetryableSender - Sender failed to send data. sender=RetryableSender{baseSender=TCPSender{config=Config{host='127.0.0.1', port=24224, connectionTimeoutMilli=5000, readTimeoutMilli=5000, waitBeforeCloseMilli=1000} Config{senderErrorHandler=null}} NetworkSender{config=Config{host='127.0.0.1', port=24224, connectionTimeoutMilli=5000, readTimeoutMilli=5000, waitBeforeCloseMilli=1000} Config{senderErrorHandler=null}, failureDetector=null} org.komamitsu.fluency.fluentd.ingester.sender.TCPSender@62ffe26a, retryStrategy=ExponentialBackOffRetryStrategy{config=Config{baseIntervalMillis=400, maxIntervalMillis=30000} Config{maxRetryCount=7}} RetryStrategy{config=Config{baseIntervalMillis=400, maxIntervalMillis=30000} Config{maxRetryCount=7}}, isClosed=false} org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender@61a17cdb, retry=7
java.io.IOException: Broken pipe
    at java.base/sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
    at java.base/sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:182)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:130)
    at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493)
    at java.base/java.nio.channels.SocketChannel.write(SocketChannel.java:507)
    at org.komamitsu.fluency.fluentd.ingester.sender.TCPSender.sendBuffers(TCPSender.java:86)
    at org.komamitsu.fluency.fluentd.ingester.sender.TCPSender.sendBuffers(TCPSender.java:31)
    at org.komamitsu.fluency.fluentd.ingester.sender.NetworkSender.sendInternal(NetworkSender.java:102)
    at org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.sendInternalWithRestoreBufferPositions(FluentdSender.java:74)
    at org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.send(FluentdSender.java:56)
    at org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender.sendInternal(RetryableSender.java:77)
    at org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.sendInternalWithRestoreBufferPositions(FluentdSender.java:74)
    at org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.send(FluentdSender.java:56)
    at org.komamitsu.fluency.fluentd.ingester.FluentdIngester.ingest(FluentdIngester.java:87)
    :

@sokomishalov
Copy link
Author

Somehow I can't reproduce it too. I'll be back when either I or someone else got luck with it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants