Skip to content

Commit

Permalink
Adds new resource hint for number of cpus per worker.
Browse files Browse the repository at this point in the history
  • Loading branch information
kerrydc committed Oct 5, 2023
1 parent c01b41f commit d4caf1b
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1982,5 +1982,9 @@ message StandardResourceHints {
// SDKs should convert the size to bytes, but can allow users to specify human-friendly units (e.g. GiB).
// Payload: ASCII encoded string of the base 10 representation of an integer number of bytes.
MIN_RAM_BYTES = 1 [(beam_urn) = "beam:resources:min_ram_bytes:v1"];
// Describes desired number of CPUs available in transform's execution environment.
// SDKs should accept and validate a positive integer count.
// Payload: ASCII encoded string of the base 10 representation of an integer number of CPUs.
CPU_COUNT = 2 [(beam_urn) = "beam:resources:cpu_count:v1"];
}
}
37 changes: 37 additions & 0 deletions sdks/go/pkg/beam/options/resource/hint.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,40 @@ func (h acceleratorHint) MergeWithOuter(outer Hint) Hint {
func (h acceleratorHint) String() string {
return fmt.Sprintf("accelerator=%v", h.value)
}

// CPUCount hints that this scope should be put in a machine with at least this many CPUs or vCPUs.
//
// Hints are advisory only and runners may not respect them.
//
// See https://beam.apache.org/documentation/runtime/resource-hints/ for more information about
// resource hints.
func CPUCount(v int) Hint {
return CPUCountHint{value: int(v)}
}

type CPUCountHint struct {
value int
}

func (CPUCountHint) URN() string {
return "beam:resources:cpu_count:v1"
}

func (h CPUCountHint) Payload() []byte {
// Go strings are utf8, and if the string is ascii,
// byte conversion handles that directly.
return []byte(strconv.FormatInt(h.value, 10))
}

// MergeWith an outer minRAMHints by keeping the maximum of the two cpu counts.
func (h CPUCountHint) MergeWithOuter(outer Hint) Hint {
// Intentional runtime panic from type assertion to catch hint merge errors.
if outer.(CPUCountHint).value > h.value {
return outer
}
return h
}

func (h CPUCountHint) String() string {
return fmt.Sprintf("cpu_count=%v", humanize.Bytes(int(h.value)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class ResourceHints {
private static final String MIN_RAM_URN = "beam:resources:min_ram_bytes:v1";
private static final String ACCELERATOR_URN = "beam:resources:accelerator:v1";

private static final String CPU_COUNT_URN = "beam:resources:cpu_count:v1";

// TODO: reference this from a common location in all packages that use this.
private static String getUrn(ProtocolMessageEnum value) {
return value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamUrn);
Expand All @@ -64,6 +66,8 @@ private static String getUrn(ProtocolMessageEnum value) {
.put("minRam", MIN_RAM_URN)
.put("min_ram", MIN_RAM_URN) // Courtesy alias.
.put("accelerator", ACCELERATOR_URN)
.put("cpuCount", CPU_COUNT_URN)
.put("cpu_count", CPU_COUNT_URN) // Courtesy alias.
.build();

private static ImmutableMap<String, Function<String, ResourceHint>> parsers =
Expand Down Expand Up @@ -212,6 +216,47 @@ public int hashCode() {
}
}

/*package*/ static class IntHint extends ResourceHint {
private final int value;

@Override
public boolean equals(@Nullable Object other) {
if (other == null) {
return false;
} else if (this == other) {
return true;
} else if (other instanceof IntHint) {
return ((IntHint) other).value == value;
} else {
return false;
}
}

@Override
public int hashCode() {
return Integer.hashCode(value);
}

public IntHint(int value) {
this.value = value;
}

public static int parse(String s) {
return Integer.valueOf(s);
throw new IllegalArgumentException("Unable to parse '" + s + "' as an Integer value.");
}

@Override
public ResourceHint mergeWithOuter(ResourceHint outer) {
return new IntHint(Math.max(value, ((IntHint) outer).value));
}

@Override
public byte[] toBytes() {
return String.valueOf(value).getBytes(Charsets.US_ASCII);
}
}

/**
* Sets desired minimal available RAM size to have in transform's execution environment.
*
Expand Down Expand Up @@ -264,6 +309,23 @@ public ResourceHints withHint(String urn, ResourceHint hint) {
return new ResourceHints(newHints.build());
}

/**
* Sets desired minimal CPU or vCPU count to have in transform's execution environment.
*
* @param cpuCount specifies a positive CPU count.
*/
public ResourceHints withCPUCount(int cpuCount) {
if (cpuCount <= 0) {
LOG.error(
"Encountered invalid non-positive cpu count hint value {}.\n"
+ "The value is ignored. In the future, The method will require an object Long type "
+ "and throw an IllegalArgumentException for invalid values.",
cpuCount);
return this;
}
return withHint(CPU_COUNT_URN, new IntHint(cpuCount));
}

public Map<String, ResourceHint> hints() {
return hints;
}
Expand Down
1 change: 1 addition & 0 deletions sdks/python/.python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.10.7
15 changes: 15 additions & 0 deletions sdks/python/apache_beam/transforms/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
'ResourceHint',
'AcceleratorHint',
'MinRamHint',
'CpuCountHint',
'merge_resource_hints',
'parse_resource_hints',
'resource_hints_from_options',
Expand Down Expand Up @@ -177,6 +178,20 @@ def get_merged_value(
ResourceHint.register_resource_hint('minRam', MinRamHint)


class CpuCountHint(ResourceHint):
"""Describes desired hardware accelerators in execution environment."""
urn = resource_hints.CPU_COUNT.urn

@classmethod
def get_merged_value(
cls, outer_value, inner_value): # type: (int, int) -> int
return ResourceHint._use_max(outer_value, inner_value)


ResourceHint.register_resource_hint('cpu_count', CpuCountHint)
ResourceHint.register_resource_hint('cpuCount', CpuCountHint)


def parse_resource_hints(hints): # type: (Dict[Any, Any]) -> Dict[str, bytes]
parsed_hints = {}
for hint, value in hints.items():
Expand Down

0 comments on commit d4caf1b

Please sign in to comment.