From aa9b9476af6156087a9e437c751f4ed9c1cd9864 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 11 Jun 2024 15:16:45 -0400 Subject: [PATCH 1/8] Adjust JVM heap size for extremely large memory machine --- sdks/java/container/boot.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index ceda3d2be666..38c8acda7fc6 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -248,18 +248,18 @@ func main() { } // heapSizeLimit returns 80% of the runner limit, if provided. If not provided, -// it returns 70% of the physical memory on the machine. If it cannot determine -// that value, it returns 1GB. This is an imperfect heuristic. It aims to -// ensure there is memory for non-heap use and other overhead, while also not -// underutilizing the machine. if set_recommended_max_xmx experiment is enabled, -// sets xmx to 32G. Under 32G JVM enables CompressedOops. CompressedOops -// utilizes memory more efficiently, and has positive impact on GC performance -// and cache hit rate. +// it returns max(70% M, M - 32GB) where M the physical memory on the machine. +// If it cannot determine that value, it returns 1GB. This is an imperfect +// heuristic. It aims to ensure there is memory for non-heap use and other +// overhead, while also not underutilizing the machine. +// if set_recommended_max_xmx experiment is enabled, sets xmx to 32G. Under 32G +// JVM enables CompressedOops. CompressedOops utilizes memory more efficiently, +// and has positive impact on GC performance and cache hit rate. func heapSizeLimit(info *fnpb.ProvisionInfo, setRecommendedMaxXmx bool) uint64 { if setRecommendedMaxXmx { return 32 << 30 } else if size, err := syscallx.PhysicalMemorySize(); err == nil { - return (size * 70) / 100 + return max(size - (32 << 30), (size * 70) / 100) } return 1 << 30 } From a7bba05ad6b60f2809298ff640d391f024342d3b Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 11 Jun 2024 15:26:22 -0400 Subject: [PATCH 2/8] Update go lang level to 1.21 --- sdks/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go.mod b/sdks/go.mod index 11a17c3faf1b..848f2031dac5 100644 --- a/sdks/go.mod +++ b/sdks/go.mod @@ -20,7 +20,7 @@ // directory. module github.com/apache/beam/sdks/v2 -go 1.20 +go 1.21 require ( cloud.google.com/go/bigquery v1.60.0 From 67b9f081b4862143f13b28d7b2f26f02df784a26 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 11 Jun 2024 16:59:54 -0400 Subject: [PATCH 3/8] avoid bump go version and use if for max --- sdks/go.mod | 2 +- sdks/java/container/boot.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/sdks/go.mod b/sdks/go.mod index 848f2031dac5..11a17c3faf1b 100644 --- a/sdks/go.mod +++ b/sdks/go.mod @@ -20,7 +20,7 @@ // directory. module github.com/apache/beam/sdks/v2 -go 1.21 +go 1.20 require ( cloud.google.com/go/bigquery v1.60.0 diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 38c8acda7fc6..6b8732db4a0b 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -259,7 +259,13 @@ func heapSizeLimit(info *fnpb.ProvisionInfo, setRecommendedMaxXmx bool) uint64 { if setRecommendedMaxXmx { return 32 << 30 } else if size, err := syscallx.PhysicalMemorySize(); err == nil { - return max(size - (32 << 30), (size * 70) / 100) + var lim uint64 = (size * 70) / 100 + var limLarge uint64 = size - (32 << 30) + if lim > limLarge { + return lim + } else { + return limLarge + } } return 1 << 30 } From a28b35518a41a88814bf44b433b10fe59a2784bb Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 11 Jun 2024 17:51:41 -0400 Subject: [PATCH 4/8] style fix --- sdks/java/container/boot.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 6b8732db4a0b..0bb61f07b6a8 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -263,9 +263,8 @@ func heapSizeLimit(info *fnpb.ProvisionInfo, setRecommendedMaxXmx bool) uint64 { var limLarge uint64 = size - (32 << 30) if lim > limLarge { return lim - } else { - return limLarge } + return limLarge } return 1 << 30 } From 81651363470394536b5fadd8b82827fa906346a7 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 11 Jun 2024 21:01:05 -0400 Subject: [PATCH 5/8] Fix integer overflow --- sdks/java/container/boot.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 0bb61f07b6a8..d1534ad52002 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -260,11 +260,10 @@ func heapSizeLimit(info *fnpb.ProvisionInfo, setRecommendedMaxXmx bool) uint64 { return 32 << 30 } else if size, err := syscallx.PhysicalMemorySize(); err == nil { var lim uint64 = (size * 70) / 100 - var limLarge uint64 = size - (32 << 30) - if lim > limLarge { + if size-lim < 32<<30 { return lim } - return limLarge + return size - (32 << 30) } return 1 << 30 } From 62188ef59cddc03488221f1563f8bb4d8003c381 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 14 Jun 2024 18:11:22 -0400 Subject: [PATCH 6/8] Add test --- sdks/java/container/boot.go | 37 +++++++++++++++++++------------- sdks/java/container/boot_test.go | 12 +++++++++++ 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index d1534ad52002..742d18be8315 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -31,7 +31,6 @@ import ( "github.com/apache/beam/sdks/v2/go/container/tools" "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" - fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" @@ -159,9 +158,19 @@ func main() { cp = append(cp, filepath.Join(dir, filepath.FromSlash(name))) } - var setRecommendedMaxXmx = strings.Contains(options, "set_recommended_max_xmx") + var lim uint64 + if strings.Contains(options, "set_recommended_max_xmx") { + lim = 32 << 30 + } else { + size, err := syscallx.PhysicalMemorySize() + if err != nil { + size = 0 + } + lim = HeapSizeLimit(size) + } + args := []string{ - "-Xmx" + strconv.FormatUint(heapSizeLimit(info, setRecommendedMaxXmx), 10), + "-Xmx" + strconv.FormatUint(lim, 10), // ParallelGC the most adequate for high throughput and lower CPU utilization // It is the default GC in Java 8, but not on newer versions "-XX:+UseParallelGC", @@ -248,24 +257,22 @@ func main() { } // heapSizeLimit returns 80% of the runner limit, if provided. If not provided, -// it returns max(70% M, M - 32GB) where M the physical memory on the machine. -// If it cannot determine that value, it returns 1GB. This is an imperfect +// it returns max(70% size, size - 32GB). Set size=0 if the physical memory on +// the machine was undetermined, then it returns 1GB. This is an imperfect // heuristic. It aims to ensure there is memory for non-heap use and other // overhead, while also not underutilizing the machine. // if set_recommended_max_xmx experiment is enabled, sets xmx to 32G. Under 32G // JVM enables CompressedOops. CompressedOops utilizes memory more efficiently, // and has positive impact on GC performance and cache hit rate. -func heapSizeLimit(info *fnpb.ProvisionInfo, setRecommendedMaxXmx bool) uint64 { - if setRecommendedMaxXmx { - return 32 << 30 - } else if size, err := syscallx.PhysicalMemorySize(); err == nil { - var lim uint64 = (size * 70) / 100 - if size-lim < 32<<30 { - return lim - } - return size - (32 << 30) +func HeapSizeLimit(size uint64) uint64 { + if size == 0 { + return 1 << 30 + } + lim := (size * 70) / 100 + if size-lim < 32<<30 { + return lim } - return 1 << 30 + return size - (32 << 30) } // Options represents java VM invocation options in a simple, diff --git a/sdks/java/container/boot_test.go b/sdks/java/container/boot_test.go index 0228c53521bc..0529278daed5 100644 --- a/sdks/java/container/boot_test.go +++ b/sdks/java/container/boot_test.go @@ -78,3 +78,15 @@ func TestBuildOptions(t *testing.T) { t.Errorf("BuildOptions(%v).JavaProperties = %v, want %v", metaOptions, javaOptions.Properties, wantProperties) } } + +func TestHeapSizeLimit(t *testing.T) { + if lim := HeapSizeLimit(0); lim != 1 << 30 { + t.Errorf("HeapSizeLimit(0). Actual (%d). want 1 GB", lim) + } + if lim := HeapSizeLimit(2 << 30); lim != (2 << 30) * 7 / 10 { + t.Errorf("HeapSizeLimit(0). Actual (%d). want 700 MB", lim) + } + if lim := HeapSizeLimit(200 << 30); lim != (200 - 32) << 30 { + t.Errorf("HeapSizeLimit(0). Actual (%d). want 168 GB", lim) + } +} From 43d44ece696d42b0c14b04faf47d72c86058c81b Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 18 Jun 2024 13:11:56 -0400 Subject: [PATCH 7/8] Fix test error message --- sdks/java/container/boot_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/container/boot_test.go b/sdks/java/container/boot_test.go index 0529278daed5..61d67e93ecbb 100644 --- a/sdks/java/container/boot_test.go +++ b/sdks/java/container/boot_test.go @@ -84,9 +84,9 @@ func TestHeapSizeLimit(t *testing.T) { t.Errorf("HeapSizeLimit(0). Actual (%d). want 1 GB", lim) } if lim := HeapSizeLimit(2 << 30); lim != (2 << 30) * 7 / 10 { - t.Errorf("HeapSizeLimit(0). Actual (%d). want 700 MB", lim) + t.Errorf("HeapSizeLimit(2 GB). Actual (%d). want 1.4 GB", lim) } if lim := HeapSizeLimit(200 << 30); lim != (200 - 32) << 30 { - t.Errorf("HeapSizeLimit(0). Actual (%d). want 168 GB", lim) + t.Errorf("HeapSizeLimit(200 GB). Actual (%d). want 168 GB", lim) } } From 88a565f9f32ee89a6b755a76d70cb67b520a7776 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 18 Jun 2024 13:18:01 -0400 Subject: [PATCH 8/8] Fix format whitespace --- sdks/java/container/boot.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 742d18be8315..14e2e4311b45 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -160,14 +160,14 @@ func main() { var lim uint64 if strings.Contains(options, "set_recommended_max_xmx") { - lim = 32 << 30 - } else { - size, err := syscallx.PhysicalMemorySize() - if err != nil { - size = 0 - } - lim = HeapSizeLimit(size) - } + lim = 32 << 30 + } else { + size, err := syscallx.PhysicalMemorySize() + if err != nil { + size = 0 + } + lim = HeapSizeLimit(size) + } args := []string{ "-Xmx" + strconv.FormatUint(lim, 10),