From 89aeb187f2c9350fa51fc2b2f690c93a57e523b9 Mon Sep 17 00:00:00 2001 From: dmitryor <34167644+dmitryor@users.noreply.github.com> Date: Wed, 10 Apr 2024 01:28:23 -0700 Subject: [PATCH] Correct per-entry HashMap overhead in WindmillStateCache (#30672) --- .../windmill/state/WindmillStateCache.java | 3 ++- .../windmill/state/WindmillStateCacheTest.java | 12 ++++++------ .../state/WindmillStateInternalsTest.java | 18 +++++++++--------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java index 85c74fe8591d..c6c49134bcb5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java @@ -64,7 +64,8 @@ public class WindmillStateCache implements StatusDataProvider { // Initial size of hash tables per entry. private static final int INITIAL_HASH_MAP_CAPACITY = 4; // Overhead of each hash map entry. - private static final int HASH_MAP_ENTRY_OVERHEAD = 16; + // https://appsintheopen.com/posts/52-the-memory-overhead-of-java-ojects + private static final int HASH_MAP_ENTRY_OVERHEAD = 32; // Overhead of each StateCacheEntry. One long, plus a hash table. private static final int PER_CACHE_ENTRY_OVERHEAD = 8 + HASH_MAP_ENTRY_OVERHEAD * INITIAL_HASH_MAP_CAPACITY; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java index 1f4355b156be..446a34f73dec 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java @@ -168,15 +168,15 @@ public void testBasic() throws Exception { assertEquals(0, cache.getWeight()); keyCache.persist(); - assertEquals(254, cache.getWeight()); + assertEquals(414, cache.getWeight()); keyCache.put(triggerNamespace(0, 0), new TestStateTag("tag3"), new TestState("t3"), 2); keyCache.put(triggerNamespace(0, 0), new TestStateTag("tag2"), new TestState("t2"), 2); // Observes updated weight in entries, though cache will not know about it. - assertEquals(290, cache.getWeight()); + assertEquals(482, cache.getWeight()); keyCache.persist(); - assertEquals(290, cache.getWeight()); + assertEquals(482, cache.getWeight()); keyCache = cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 2L).forFamily(STATE_FAMILY); @@ -212,7 +212,7 @@ public void testInvalidation() throws Exception { keyCache = cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 2L).forFamily(STATE_FAMILY); - assertEquals(127, cache.getWeight()); + assertEquals(207, cache.getWeight()); assertEquals( Optional.of(new TestState("g1")), keyCache.get(StateNamespaces.global(), new TestStateTag("tag1"))); @@ -221,7 +221,7 @@ public void testInvalidation() throws Exception { cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 1L, 3L).forFamily(STATE_FAMILY); assertEquals( Optional.empty(), keyCache.get(StateNamespaces.global(), new TestStateTag("tag1"))); - assertEquals(127, cache.getWeight()); + assertEquals(207, cache.getWeight()); } /** Verifies that the cache is invalidated when the cache token changes. */ @@ -254,7 +254,7 @@ public void testStaleWorkItem() throws Exception { assertEquals(Optional.of(new TestState("w2")), keyCache.get(windowNamespace(0), tag)); assertEquals(0, cache.getWeight()); keyCache.persist(); - assertEquals(127, cache.getWeight()); + assertEquals(207, cache.getWeight()); assertEquals(Optional.of(new TestState("w2")), keyCache.get(windowNamespace(0), tag)); // Previous work token. diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index d53b1d8c3e89..a53240d64530 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -3043,7 +3043,7 @@ public void testCachedValue() throws Exception { value.write("Hi"); underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(141, cache.getWeight()); + assertEquals(221, cache.getWeight()); resetUnderTest(); value = underTest.state(NAMESPACE, addr); @@ -3051,7 +3051,7 @@ public void testCachedValue() throws Exception { value.clear(); underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(139, cache.getWeight()); + assertEquals(219, cache.getWeight()); resetUnderTest(); value = underTest.state(NAMESPACE, addr); @@ -3083,7 +3083,7 @@ public void testCachedBag() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(147, cache.getWeight()); + assertEquals(227, cache.getWeight()); resetUnderTest(); bag = underTest.state(NAMESPACE, addr); @@ -3103,7 +3103,7 @@ public void testCachedBag() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(140, cache.getWeight()); + assertEquals(220, cache.getWeight()); resetUnderTest(); bag = underTest.state(NAMESPACE, addr); @@ -3114,7 +3114,7 @@ public void testCachedBag() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(141, cache.getWeight()); + assertEquals(221, cache.getWeight()); resetUnderTest(); bag = underTest.state(NAMESPACE, addr); @@ -3145,7 +3145,7 @@ public void testCachedWatermarkHold() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(151, cache.getWeight()); + assertEquals(231, cache.getWeight()); resetUnderTest(); hold = underTest.state(NAMESPACE, addr); @@ -3154,7 +3154,7 @@ public void testCachedWatermarkHold() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(151, cache.getWeight()); + assertEquals(231, cache.getWeight()); resetUnderTest(); hold = underTest.state(NAMESPACE, addr); @@ -3185,7 +3185,7 @@ public void testCachedCombining() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(144, cache.getWeight()); + assertEquals(224, cache.getWeight()); resetUnderTest(); value = underTest.state(NAMESPACE, COMBINING_ADDR); @@ -3196,7 +3196,7 @@ public void testCachedCombining() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(143, cache.getWeight()); + assertEquals(223, cache.getWeight()); resetUnderTest(); value = underTest.state(NAMESPACE, COMBINING_ADDR);