diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index d1534ad52002..742d18be8315 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -31,7 +31,6 @@ import ( "github.com/apache/beam/sdks/v2/go/container/tools" "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" - fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" @@ -159,9 +158,19 @@ func main() { cp = append(cp, filepath.Join(dir, filepath.FromSlash(name))) } - var setRecommendedMaxXmx = strings.Contains(options, "set_recommended_max_xmx") + var lim uint64 + if strings.Contains(options, "set_recommended_max_xmx") { + lim = 32 << 30 + } else { + size, err := syscallx.PhysicalMemorySize() + if err != nil { + size = 0 + } + lim = HeapSizeLimit(size) + } + args := []string{ - "-Xmx" + strconv.FormatUint(heapSizeLimit(info, setRecommendedMaxXmx), 10), + "-Xmx" + strconv.FormatUint(lim, 10), // ParallelGC the most adequate for high throughput and lower CPU utilization // It is the default GC in Java 8, but not on newer versions "-XX:+UseParallelGC", @@ -248,24 +257,22 @@ func main() { } // heapSizeLimit returns 80% of the runner limit, if provided. If not provided, -// it returns max(70% M, M - 32GB) where M the physical memory on the machine. -// If it cannot determine that value, it returns 1GB. This is an imperfect +// it returns max(70% size, size - 32GB). Set size=0 if the physical memory on +// the machine was undetermined, then it returns 1GB. This is an imperfect // heuristic. It aims to ensure there is memory for non-heap use and other // overhead, while also not underutilizing the machine. // if set_recommended_max_xmx experiment is enabled, sets xmx to 32G. Under 32G // JVM enables CompressedOops. CompressedOops utilizes memory more efficiently, // and has positive impact on GC performance and cache hit rate. -func heapSizeLimit(info *fnpb.ProvisionInfo, setRecommendedMaxXmx bool) uint64 { - if setRecommendedMaxXmx { - return 32 << 30 - } else if size, err := syscallx.PhysicalMemorySize(); err == nil { - var lim uint64 = (size * 70) / 100 - if size-lim < 32<<30 { - return lim - } - return size - (32 << 30) +func HeapSizeLimit(size uint64) uint64 { + if size == 0 { + return 1 << 30 + } + lim := (size * 70) / 100 + if size-lim < 32<<30 { + return lim } - return 1 << 30 + return size - (32 << 30) } // Options represents java VM invocation options in a simple, diff --git a/sdks/java/container/boot_test.go b/sdks/java/container/boot_test.go index 0228c53521bc..0529278daed5 100644 --- a/sdks/java/container/boot_test.go +++ b/sdks/java/container/boot_test.go @@ -78,3 +78,15 @@ func TestBuildOptions(t *testing.T) { t.Errorf("BuildOptions(%v).JavaProperties = %v, want %v", metaOptions, javaOptions.Properties, wantProperties) } } + +func TestHeapSizeLimit(t *testing.T) { + if lim := HeapSizeLimit(0); lim != 1 << 30 { + t.Errorf("HeapSizeLimit(0). Actual (%d). want 1 GB", lim) + } + if lim := HeapSizeLimit(2 << 30); lim != (2 << 30) * 7 / 10 { + t.Errorf("HeapSizeLimit(0). Actual (%d). want 700 MB", lim) + } + if lim := HeapSizeLimit(200 << 30); lim != (200 - 32) << 30 { + t.Errorf("HeapSizeLimit(0). Actual (%d). want 168 GB", lim) + } +}