
ensure jackson overrides are available to static initializers #16719

Open: wants to merge 2 commits into base: main
Conversation

@yaauie (Member) commented Nov 22, 2024

Release notes

Fixes an issue where Logstash could fail to read an event containing a very large string from the PQ.

What does this PR do?

Moves the application of the Jackson defaults overrides into pure Java, and applies them statically, before org.logstash.ObjectMappers has a chance to start initializing object mappers that rely on those defaults.

We replace the runner's invocation (which happened too late for the overrides to be fully applied) with a verification that the configured defaults have been applied.
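
For context, a Jackson factory snapshots the global StreamReadConstraints defaults when it is constructed, so overrides applied after org.logstash.ObjectMappers has loaded never reach the mappers it already built. A minimal standalone sketch of the ordering problem (illustrative only, not code from this PR; it assumes jackson-core 2.15+, where the default max string length is 20,000,000 chars in recent versions):

    import com.fasterxml.jackson.core.StreamReadConstraints;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ConstraintOrderingDemo {
        // Built before the override runs, as ObjectMappers' mappers were:
        // its factory captures the constraint defaults in effect right now.
        static final ObjectMapper EARLY_MAPPER = new ObjectMapper();

        public static void main(String[] args) throws Exception {
            // Too late: EARLY_MAPPER's factory already holds the old defaults.
            StreamReadConstraints.overrideDefaultStreamReadConstraints(
                    StreamReadConstraints.builder().maxStringLength(30_000_000).build());

            String bigJson = "\"" + "x".repeat(25_000_000) + "\"";
            EARLY_MAPPER.readTree(bigJson); // throws StreamConstraintsException
        }
    }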

Why is it important/What is the impact to the user?

Ensures that the configured constraints are applied to all object mappers, including the CBOR_MAPPER that is used to decode events from the PQ.
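
Conversely, a factory constructed after the static override picks up the new defaults, which is essentially what the runner's verification asserts. A hedged sketch of that idea (illustrative only, not the PR's actual verification code):

    import com.fasterxml.jackson.core.StreamReadConstraints;
    import com.fasterxml.jackson.dataformat.cbor.CBORFactory;

    // Apply the override first (statically), then construct factories:
    StreamReadConstraints.overrideDefaultStreamReadConstraints(
            StreamReadConstraints.builder().maxStringLength(30_000_000).build());

    // Factories created after the override inherit the new defaults:
    CBORFactory cborFactory = CBORFactory.builder().build();
    assert cborFactory.streamReadConstraints().getMaxStringLength() == 30_000_000;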

Checklist

  • [x] My code follows the style guidelines of this project
  • [x] I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding changes to the default configuration files (and/or docker env variables)
  • [x] I have added tests that prove my fix is effective or that my feature works

How to test this PR locally

  1. Enable the PQ:
    echo 'queue.type: persisted' > config/logstash.yml
    
  2. Configure Logstash's Jackson to handle 30MB strings:
    sed -i '/logstash.jackson.stream-read-constraints.max-string-length/d' config/jvm.options
    echo '-Dlogstash.jackson.stream-read-constraints.max-string-length=30000000' >> config/jvm.options
    
  3. Generate a 20MiB+ newline-terminated string:
    (dd if=<(base64 < /dev/urandom) bs=1K count="$(expr 20 '*' 1024)"; echo) > big.txt
    
  4. Run Logstash so that the input plugin generates an event containing the aforementioned 20MiB+ string; without the fix, reading the event back from the PQ should fail, while with it the pipeline should emit dots and shut down cleanly:
    bin/logstash -e 'input { stdin { codec => plain } } output { stdout { codec => dots } }' < big.txt
    

Related issues

  • PQ: Event containing 20+MB String cannot be deserialized for processing

@yaauie requested a review from donoghuc November 22, 2024 19:36
@donoghuc self-assigned this Nov 22, 2024

    private void assertLogObserved(final Level level, final String... messageFragments) {
        assertThat(loggingSpyResource.getLogEvents())
                .withFailMessage("Expected an {}-level log event containing {}", level, messageFragments)
@donoghuc (Member) commented on the diff:

The failed test output in CI:

    org.logstash.jackson.StreamReadConstraintsUtilTest > validatesApplication FAILED
        java.lang.AssertionError: Expected an {}-level log event containing {}
            at org.logstash.jackson.StreamReadConstraintsUtilTest.assertLogObserved(StreamReadConstraintsUtilTest.java:193)
            at org.logstash.jackson.StreamReadConstraintsUtilTest.validatesApplication(StreamReadConstraintsUtilTest.java:181)

suggests the message interpolation is not quite set up right. The AssertJ docs have an example with %s-style placeholders, which seems to fix these up locally:

diff --git a/logstash-core/src/test/java/org/logstash/jackson/StreamReadConstraintsUtilTest.java b/logstash-core/src/test/java/org/logstash/jackson/StreamReadConstraintsUtilTest.java
index 076521a16..9ace4616a 100644
--- a/logstash-core/src/test/java/org/logstash/jackson/StreamReadConstraintsUtilTest.java
+++ b/logstash-core/src/test/java/org/logstash/jackson/StreamReadConstraintsUtilTest.java
@@ -6,6 +6,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.logstash.log.LoggingSpyResource;

+import java.util.Arrays;
 import java.util.Properties;
 import java.util.function.BiConsumer;
 import org.apache.logging.log4j.Level;
@@ -188,7 +189,7 @@ public class StreamReadConstraintsUtilTest {

     private void assertLogObserved(final Level level, final String... messageFragments) {
         assertThat(loggingSpyResource.getLogEvents())
-                .withFailMessage("Expected an {}-level log event containing {}", level, messageFragments)
+                .withFailMessage("Expected an %s-level log event containing %s", level, Arrays.toString(messageFragments))
                 .filteredOn(logEvent -> logEvent.getLevel().equals(level))
                 .anySatisfy(logEvent -> assertThat(logEvent.getMessage().getFormattedMessage()).contains(messageFragments));
     }
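
For what it's worth, the literal {} output makes sense: AssertJ's withFailMessage() delegates to String.format(), which treats log4j-style {} placeholders as plain text and silently ignores the extra arguments, and a bare String[] argument would not render readably anyway. A quick illustration (not from the PR):

    String[] fragments = { "some-fragment" };

    // log4j-style placeholders are not String.format() syntax, so they pass through:
    String.format("Expected an {}-level log event containing {}", Level.WARN, fragments);
    // -> "Expected an {}-level log event containing {}"

    // %s works, and Arrays.toString() renders the varargs array readably:
    String.format("Expected an %s-level log event containing %s",
            Level.WARN, Arrays.toString(fragments));
    // -> "Expected an WARN-level log event containing [some-fragment]"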

One thing that I find curious: when I run the tests in this class in isolation (./gradlew :logstash-core:javaTests --tests org.logstash.jackson.StreamReadConstraintsUtilTest), the error is not reproduced. It only appears when running the whole suite. It's not immediately clear to me why.

@yaauie (Member Author) replied:

What I've found so far is that there are many logging configurations that get tested, and if these tests run after some tests that use an XML log config whose root wrapper is defined as <Configuration status="OFF"...>, the logs are discarded before being routed to appenders.

    assertLogObserved(Level.INFO, "override `" + MAX_NUMBER_LENGTH.getPropertyName() + "` configured to `1110`");

    assertLogObserved(Level.WARN, "override `" + StreamReadConstraintsUtil.PROP_PREFIX + "unsupported-option1` is unknown and has been ignored");
    assertLogObserved(Level.WARN, "override `" + StreamReadConstraintsUtil.PROP_PREFIX + "unsupported-option1` is unknown and has been ignored");
@donoghuc (Member) commented on the diff:

Maybe a copy-paste error? Should this be looking for option2?

Suggested change

-assertLogObserved(Level.WARN, "override `" + StreamReadConstraintsUtil.PROP_PREFIX + "unsupported-option1` is unknown and has been ignored");
+assertLogObserved(Level.WARN, "override `" + StreamReadConstraintsUtil.PROP_PREFIX + "unsupported-option2` is unknown and has been ignored");

@donoghuc (Member) commented:

I'm having a hard time reproducing the original error. Here is what I've tried:

  1. I checked out head of logstash#main (aff8d1cce)
  2. Built: ./gradlew clean bootstrap assemble installDefaultGems
  3. Configured: my config/logstash.yml has a single kv pair, queue.type: persisted
  4. Generated large test input data:

logstash git:(aff8d1cce) ✗ (dd if=<(base64 < /dev/urandom) bs=1K count="$(expr 25 '*' 1024)"; echo) > big.txt
25600+0 records in
25600+0 records out
26214400 bytes transferred in 0.057778 secs (453709024 bytes/sec)
logstash git:(aff8d1cce) ✗ ls -lh big.txt
-rw-r--r--@ 1 cas  staff    25M Nov 22 13:56 big.txt

  5. Ran the pipeline:

logstash git:(aff8d1cce) ✗ bin/logstash -e 'input { stdin { codec => plain } } output { stdout { codec => dots } }' < big.txt
Using system java: /Users/cas/.jenv/shims/java
Sending Logstash logs to /Users/cas/elastic-repos/logstash/logs which is now configured via log4j2.properties
[2024-11-22T13:57:07,543][INFO ][logstash.runner          ] Log4j configuration path used is: /Users/cas/elastic-repos/logstash/config/log4j2.properties
[2024-11-22T13:57:07,546][WARN ][logstash.runner          ] The use of JAVA_HOME has been deprecated. Logstash 8.0 and later ignores JAVA_HOME and uses the bundled JDK. Running Logstash with the bundled JDK is recommended. The bundled JDK has been verified to work with each specific version of Logstash, and generally provides best performance and reliability. If you have compelling reasons for using your own JDK (organizational-specific compliance requirements, for example), you can configure LS_JAVA_HOME to use that version instead.
[2024-11-22T13:57:07,546][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"9.0.0", "jruby.version"=>"jruby 9.4.9.0 (3.1.4) 2024-11-04 547c6b150e OpenJDK 64-Bit Server VM 21.0.5 on 21.0.5 +indy +jit [arm64-darwin]"}
[2024-11-22T13:57:07,547][INFO ][logstash.runner          ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djruby.compile.invokedynamic=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, -Dlogstash.jackson.stream-read-constraints.max-string-length=200000000, -Dlogstash.jackson.stream-read-constraints.max-number-length=10000, -Djruby.regexp.interruptible=true, -Djdk.io.File.enableADS=true, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED, -Dio.netty.allocator.maxOrder=11]
[2024-11-22T13:57:07,548][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-string-length` configured to `200000000`
[2024-11-22T13:57:07,548][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-number-length` configured to `10000`
[2024-11-22T13:57:07,561][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2024-11-22T13:57:07,724][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2024-11-22T13:57:07,778][INFO ][org.reflections.Reflections] Reflections took 39 ms to scan 1 urls, producing 149 keys and 522 values
[2024-11-22T13:57:07,858][INFO ][logstash.javapipeline    ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
[2024-11-22T13:57:07,868][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1500, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x4792dd2 /Users/cas/elastic-repos/logstash/logstash-core/lib/logstash/java_pipeline.rb:138 run>"}
[2024-11-22T13:57:08,062][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.19}
[2024-11-22T13:57:08,071][INFO ][logstash.inputs.stdin    ][main] Automatically switching from plain to line codec {:plugin=>"stdin"}
[2024-11-22T13:57:08,074][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2024-11-22T13:57:08,091][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
.[2024-11-22T13:57:09,917][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2024-11-22T13:57:10,117][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2024-11-22T13:57:10,125][INFO ][logstash.runner          ] Logstash shut down.

It seems like it handles the input without error. I did a run with debug enabled and it seems to be using a queue, but maybe the "default: memory" is not correct? (I would include the full debug run, but it prints the huge file 😅.)

[2024-11-22T14:04:31,964][DEBUG][logstash.runner          ] *queue.type: persisted (default: memory)
[2024-11-22T14:04:31,964][DEBUG][logstash.runner          ] queue.drain: false
[2024-11-22T14:04:31,964][DEBUG][logstash.runner          ] queue.page_capacity: 67108864
[2024-11-22T14:04:31,964][DEBUG][logstash.runner          ] queue.max_bytes: 1073741824
[2024-11-22T14:04:31,964][DEBUG][logstash.runner          ] queue.max_events: 0
[2024-11-22T14:04:31,964][DEBUG][logstash.runner          ] queue.checkpoint.acks: 1024
[2024-11-22T14:04:31,964][DEBUG][logstash.runner          ] queue.checkpoint.writes: 1024
[2024-11-22T14:04:31,964][DEBUG][logstash.runner          ] queue.checkpoint.interval: 1000
[2024-11-22T14:04:31,964][DEBUG][logstash.runner          ] queue.checkpoint.retry: true

I'm sure I'm missing something silly; I'll keep looking, but if you can spot it let me know!

@elasticmachine (Collaborator) commented Nov 22, 2024

@yaauie (Member Author) commented Nov 25, 2024

Logging isn't deterministic in tests; depending on which tests are run first, the root logger can be left at the OFF level. I'll need to chase this down.
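
One possible guard for tests like this (a hypothetical sketch, not part of this PR; it assumes log4j-core on the test classpath) is to pin the root logger level back before asserting on captured events; Configurator.setRootLevel(Level.INFO) is the one-line equivalent:

    import org.apache.logging.log4j.Level;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.core.LoggerContext;

    // Reset the root logger in case an earlier test's XML config left it disabled.
    LoggerContext context = (LoggerContext) LogManager.getContext(false);
    context.getConfiguration().getRootLogger().setLevel(Level.INFO);
    context.updateLoggers(); // propagate the change to active loggers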

Development

Successfully merging this pull request may close these issues:

  • PQ: Event containing 20+MB String cannot be deserialized for processing