Skip to content

Commit

Permalink
fix(worker): Pass configured tolerations to AsyncOrchestratorPodProce…
Browse files Browse the repository at this point in the history
…ss (#10365)

The previous implementation of AsyncOrchestratorPodProcess did not account for the tolerations that are configured for the worker pods. As a result, if a nodeSelector was configured to isolate pods into their own private node group, the missing toleration could leave these pods unschedulable, because they would not tolerate the node taint on that node group.

Co-authored-by: Justen Walker <[email protected]>
Co-authored-by: Davin Chia <[email protected]>
  • Loading branch information
3 people committed Dec 14, 2023
1 parent 927deae commit c50599a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.TolerationPOJO;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
Expand All @@ -21,6 +22,8 @@
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.SecretVolumeSourceBuilder;
import io.fabric8.kubernetes.api.model.StatusDetails;
import io.fabric8.kubernetes.api.model.Toleration;
import io.fabric8.kubernetes.api.model.TolerationBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.VolumeMount;
Expand Down Expand Up @@ -413,7 +416,8 @@ public void create(final Map<String, String> allLabels,
final ResourceRequirements resourceRequirements,
final Map<String, String> fileMap,
final Map<Integer, Integer> portMap,
final Map<String, String> nodeSelectors) {
final Map<String, String> nodeSelectors,
final List<TolerationPOJO> tolerations) {
final List<Volume> volumes = new ArrayList<>();
final List<VolumeMount> volumeMounts = new ArrayList<>();
final List<EnvVar> envVars = new ArrayList<>();
Expand Down Expand Up @@ -523,6 +527,7 @@ public void create(final Map<String, String> allLabels,
.withInitContainers(initContainer)
.withVolumes(volumes)
.withNodeSelector(nodeSelectors)
.withTolerations(buildPodTolerations(tolerations))
.endSpec()
.build();

Expand Down Expand Up @@ -562,6 +567,19 @@ public void create(final Map<String, String> allLabels,
copyFilesToKubeConfigVolumeMain(createdPod, updatedFileMap);
}

/**
 * Translates the configured worker tolerations into Kubernetes {@link Toleration} objects.
 *
 * <p>
 * Each entry's key, effect, operator and value are copied one-to-one onto a new toleration.
 *
 * @param tolerations tolerations taken from the worker configuration; may be {@code null} or empty
 * @return one {@link Toleration} per configured entry, in the same order, or {@code null} when
 *         {@code tolerations} is {@code null} or empty
 */
private Toleration[] buildPodTolerations(final List<TolerationPOJO> tolerations) {
  final boolean nothingConfigured = tolerations == null || tolerations.isEmpty();
  if (nothingConfigured) {
    // Deliberately null (not an empty array): callers receive "no tolerations" as null.
    return null;
  }

  final Toleration[] podTolerations = new Toleration[tolerations.size()];
  int slot = 0;
  for (final TolerationPOJO configured : tolerations) {
    final TolerationBuilder builder = new TolerationBuilder();
    builder.withKey(configured.getKey());
    builder.withEffect(configured.getEffect());
    builder.withOperator(configured.getOperator());
    builder.withValue(configured.getValue());
    podTolerations[slot] = builder.build();
    slot++;
  }
  return podTolerations;
}

private static void copyFilesToKubeConfigVolumeMain(final Pod podDefinition, final Map<String, String> files) {
final List<Map.Entry<String, String>> fileEntries = new ArrayList<>(files.entrySet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,16 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
final var nodeSelectors =
isCustomConnector ? workerConfigs.getWorkerIsolatedKubeNodeSelectors().orElse(workerConfigs.getworkerKubeNodeSelectors())
: workerConfigs.getworkerKubeNodeSelectors();
final var tolerations = workerConfigs.getWorkerKubeTolerations();

try {
process.create(
allLabels,
resourceRequirements,
fileMap,
portMap,
nodeSelectors);
nodeSelectors,
tolerations);
} catch (final KubernetesClientException e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ void testAsyncOrchestratorPodProcess(final String pullPolicy) throws Interrupted

asyncProcess.create(Map.of(), new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of(
OrchestratorConstants.INIT_FILE_APPLICATION, AsyncOrchestratorPodProcess.NO_OP,
OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap, workerConfigs.getworkerKubeNodeSelectors());
OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap, workerConfigs.getworkerKubeNodeSelectors(),
workerConfigs.getWorkerKubeTolerations());

// a final activity waits until there is output from the kube pod process
asyncProcess.waitFor(10, TimeUnit.SECONDS);
Expand Down

0 comments on commit c50599a

Please sign in to comment.