From 6f5199d8476fb74879725d8a93e8dc6d080cb0a1 Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Tue, 31 Oct 2023 23:41:45 -0700 Subject: [PATCH] [GOBBLIN-1945] Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit` evaluation (#3816) * Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit` evaluation * Adjust work unit processing tuning for start-to-close timeout and nested execution branching * Rework `ProcessWorkUnitImpl` and fix `FileSystem` misuse; plus convenience abstractions to load `FileSystem`, `JobState`, and `StateStore` * Fix `FileSystem` resource lifecycle, uniquely name each workflow, and drastically reduce worker concurrent task execution * Heed findbugs advice * prep before commit * Improve processing of required props * Update comment in response to PR feedback --- .../apache/gobblin/configuration/State.java | 5 +- gobblin-temporal/build.gradle | 1 + .../ddm/activity/ProcessWorkUnit.java | 31 +++ .../activity/impl/ProcessWorkUnitImpl.java | 228 ++++++++++++++++++ .../launcher/ProcessWorkUnitsJobLauncher.java | 96 ++++++++ .../temporal/ddm/util/JobStateUtils.java | 98 ++++++++ .../AbstractEagerFsDirBackedWorkload.java | 146 +++++++++++ ...FsDirBackedWorkUnitClaimCheckWorkload.java | 55 +++++ .../temporal/ddm/work/WUProcessingSpec.java | 74 ++++++ .../temporal/ddm/work/WorkUnitClaimCheck.java | 57 +++++ .../temporal/ddm/work/assistance/Help.java | 210 ++++++++++++++++ .../ddm/work/styles/FileSystemApt.java | 32 +++ .../work/styles/FileSystemJobStateful.java | 23 ++ .../temporal/ddm/work/styles/JobStateful.java | 26 ++ .../ddm/worker/WorkFulfillmentWorker.java | 61 +++++ .../workflow/ProcessWorkUnitsWorkflow.java | 32 +++ ...tingExecOfProcessWorkUnitWorkflowImpl.java | 54 +++++ .../impl/ProcessWorkUnitsWorkflowImpl.java | 64 +++++ .../org/apache/gobblin/util/HadoopUtils.java | 9 + .../apache/gobblin/util/JobLauncherUtils.java | 7 +- .../apache/gobblin/util/PropertiesUtils.java | 14 +- 21 files changed, 1312 insertions(+), 11 deletions(-) create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ProcessWorkUnit.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemApt.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemJobStateful.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/JobStateful.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java index eab5665c97e..b3d2257100e 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java @@ -53,6 +53,7 @@ public class State implements WritableShim { private static final Joiner LIST_JOINER = Joiner.on(","); private static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings(); + private static final JsonParser JSON_PARSER = new JsonParser(); private String id; @@ -62,8 +63,6 @@ public class State implements WritableShim { @Getter private Properties specProperties; - private final JsonParser jsonParser = new JsonParser(); - public State() { this.specProperties = new Properties(); this.commonProperties = new Properties(); @@ -476,7 +475,7 @@ public boolean getPropAsBoolean(String key, boolean def) { * @return {@link JsonArray} value associated with the key */ public JsonArray getPropAsJsonArray(String key) { - JsonElement jsonElement = this.jsonParser.parse(getProp(key)); + JsonElement jsonElement = this.JSON_PARSER.parse(getProp(key)); Preconditions.checkArgument(jsonElement.isJsonArray(), "Value for key " + key + " is malformed, it must be a JsonArray: " + jsonElement); return jsonElement.getAsJsonArray(); diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle index 25f9bf835f5..1832ac909de 100644 --- a/gobblin-temporal/build.gradle +++ b/gobblin-temporal/build.gradle @@ -27,6 +27,7 @@ dependencies { compile project(":gobblin-api") compile project(":gobblin-cluster") compile project(":gobblin-core") + compile project(":gobblin-data-management") compile project(":gobblin-metrics-libs:gobblin-metrics") compile project(":gobblin-metastore") compile project(":gobblin-runtime") diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ProcessWorkUnit.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ProcessWorkUnit.java new file mode 100644 index 00000000000..bdd9772da85 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ProcessWorkUnit.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; + + +/** Activity for processing/executing a {@link org.apache.gobblin.source.workunit.WorkUnit}, provided by claim-check */ +@ActivityInterface +public interface ProcessWorkUnit { + @ActivityMethod + // CAUTION: void return type won't work, as apparently it mayn't be the return type for `io.temporal.workflow.Functions.Func1`! + int processWorkUnit(WorkUnitClaimCheck wu); +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java new file mode 100644 index 00000000000..640d78da103 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity.impl; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +import com.google.common.collect.Lists; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.copy.CopyEntity; +import org.apache.gobblin.data.management.copy.CopySource; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.runtime.AbstractTaskStateTracker; +import org.apache.gobblin.runtime.GobblinMultiTaskAttempt; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.Task; +import org.apache.gobblin.runtime.TaskCreationException; +import org.apache.gobblin.runtime.TaskExecutor; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.runtime.TaskStateTracker; +import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter; +import org.apache.gobblin.runtime.troubleshooter.NoopAutomaticTroubleshooter; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit; +import org.apache.gobblin.temporal.ddm.util.JobStateUtils; +import org.apache.gobblin.temporal.ddm.work.assistance.Help; +import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; +import org.apache.gobblin.util.JobLauncherUtils; + + +@Slf4j +public class ProcessWorkUnitImpl implements ProcessWorkUnit { + private static final int LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE = 100; + + @Override + public int processWorkUnit(WorkUnitClaimCheck wu) { + try (FileSystem fs = Help.loadFileSystemForce(wu)) { + List workUnits = loadFlattenedWorkUnits(wu, fs); + log.info("WU [{}] - loaded {} workUnits", wu.getCorrelator(), workUnits.size()); + JobState jobState = Help.loadJobState(wu, fs); + return execute(workUnits, wu, jobState, fs); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + protected List loadFlattenedWorkUnits(WorkUnitClaimCheck wu, FileSystem fs) throws IOException { + Path wuPath = new Path(wu.getWorkUnitPath()); + WorkUnit workUnit = JobLauncherUtils.createEmptyWorkUnitPerExtension(wuPath); + Help.deserializeStateWithRetries(fs, wuPath, workUnit, wu); + return JobLauncherUtils.flattenWorkUnits(Lists.newArrayList(workUnit)); + } + + /** + * NOTE: adapted from {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)} + * @return count of how many tasks executed (0 if execution ultimately failed, but we *believe* TaskState should already have been recorded beforehand) + */ + protected int execute(List workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs) throws IOException, InterruptedException { + String containerId = "container-id-for-wu-" + wu.getCorrelator(); + StateStore taskStateStore = Help.openTaskStateStore(wu, fs); + + TaskStateTracker taskStateTracker = createEssentializedTaskStateTracker(wu); + TaskExecutor taskExecutor = new TaskExecutor(new Properties()); + GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy = GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE; // as no speculative exec + + SharedResourcesBroker resourcesBroker = JobStateUtils.getSharedResourcesBroker(jobState); + AutomaticTroubleshooter troubleshooter = new NoopAutomaticTroubleshooter(); + // AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(wu.getStateConfig().getProperties())); + troubleshooter.start(); + + List fileSourcePaths = workUnits.stream() + .map(workUnit -> describeAsCopyableFile(workUnit, wu.getWorkUnitPath())) + .collect(Collectors.toList()); + log.info("WU [{}] - submitting {} workUnits for copying files: {}", wu.getCorrelator(), + workUnits.size(), fileSourcePaths); + log.debug("WU [{}] - (first) workUnit: {}", wu.getCorrelator(), workUnits.get(0).toJsonString()); + + try { + GobblinMultiTaskAttempt taskAttempt = GobblinMultiTaskAttempt.runWorkUnits( + jobState.getJobId(), containerId, jobState, workUnits, + taskStateTracker, taskExecutor, taskStateStore, multiTaskAttemptCommitPolicy, + resourcesBroker, troubleshooter.getIssueRepository(), createInterruptionPredicate(fs, jobState)); + return taskAttempt.getNumTasksCreated(); + } catch (TaskCreationException tce) { // derived type of `IOException` that ought not be caught! + throw tce; + } catch (IOException ioe) { + // presume execution already occurred, with `TaskState` written to reflect outcome + log.warn("WU [" + wu.getCorrelator() + "] - continuing on despite IOException:", ioe); + return 0; + } + } + + /** Demonstration processing, to isolate debugging of WU loading and deserialization */ + protected int countSumProperties(List workUnits, WorkUnitClaimCheck wu) { + int totalNumProps = workUnits.stream().mapToInt(workUnit -> workUnit.getPropertyNames().size()).sum(); + log.info("opened WU [{}] to find {} properties total at '{}'", wu.getCorrelator(), totalNumProps, wu.getWorkUnitPath()); + return totalNumProps; + } + + protected TaskStateTracker createEssentializedTaskStateTracker(WorkUnitClaimCheck wu) { + return new AbstractTaskStateTracker(new Properties(), log) { + @Override + public void registerNewTask(Task task) { + // TODO: shall we schedule metrics update based on config? + } + + @Override + public void onTaskRunCompletion(Task task) { + task.markTaskCompletion(); + } + + @Override + public void onTaskCommitCompletion(Task task) { + TaskState taskState = task.getTaskState(); + // TODO: if metrics configured, report them now + log.info("WU [{} = {}] - finished commit after {}ms with state {}{}", wu.getCorrelator(), task.getTaskId(), + taskState.getTaskDuration(), taskState.getWorkingState(), + taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL) + ? (" to: " + taskState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)) : ""); + log.debug("WU [{} = {}] - task state: {}", wu.getCorrelator(), task.getTaskId(), + taskState.toJsonString(shouldUseExtendedLogging(wu))); + getOptCopyableFile(taskState).ifPresent(copyableFile -> { + log.info("WU [{} = {}] - completed copyableFile: {}", wu.getCorrelator(), task.getTaskId(), + copyableFile.toJsonString(shouldUseExtendedLogging(wu))); + }); + } + }; + } + + protected String describeAsCopyableFile(WorkUnit workUnit, String workUnitPath) { + return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath) + .map(copyableFile -> copyableFile.getOrigin().getPath().toString()) + .orElse( + "<>") + + "): '" + workUnitPath + "'" + ); + } + + protected Optional getOptCopyableFile(TaskState taskState) { + return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId() + "'"); + } + + protected Optional getOptFirstCopyableFile(List workUnits, String workUnitPath) { + return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus -> + getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'") + ); + } + + protected Optional getOptCopyableFile(State state, String logDesc) { + return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> { + log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), copyEntityClass.getName()); + if (CopyableFile.class.isAssignableFrom(copyEntityClass)) { + String serialization = state.getProp(CopySource.SERIALIZED_COPYABLE_FILE); + if (serialization != null) { + return Optional.of((CopyableFile) CopyEntity.deserialize(serialization)); + } + } + return Optional.empty(); + }); + } + + protected Optional> getOptCopyEntityClass(State state, String logDesc) { + try { + return Optional.of(CopySource.getCopyEntityClass(state)); + } catch (IOException ioe) { + log.warn(logDesc + " - failed getting copy entity class:", ioe); + return Optional.empty(); + } + } + + protected Predicate createInterruptionPredicate(FileSystem fs, JobState jobState) { + // TODO - decide whether to support... and if so, employ a useful path; otherwise, just evaluate predicate to always false + Path interruptionPath = new Path("/not/a/real/path/that/should/ever/exist!"); + return createInterruptionPredicate(fs, interruptionPath); + } + + protected Predicate createInterruptionPredicate(FileSystem fs, Path interruptionPath) { + return (gmta) -> { + try { + return fs.exists(interruptionPath); + } catch (IOException ioe) { + return false; + } + }; + } + + protected boolean shouldUseExtendedLogging(WorkUnitClaimCheck wu) { + try { + return Long.parseLong(wu.getCorrelator()) % LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE == 0; + } catch (NumberFormatException nfe) { + log.warn("unexpected, non-numeric correlator: '{}'", wu.getCorrelator()); + return false; + } + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java new file mode 100644 index 00000000000..95425a64371 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.launcher; + +import java.net.URI; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import lombok.extern.slf4j.Slf4j; + +import com.typesafe.config.ConfigFactory; +import io.temporal.client.WorkflowOptions; +import org.apache.hadoop.fs.Path; + +import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.runtime.JobLauncher; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner; +import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; +import org.apache.gobblin.temporal.ddm.work.assistance.Help; +import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow; +import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher; +import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler; +import org.apache.gobblin.util.PropertiesUtils; + +import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX; + + +/** + * A {@link JobLauncher} for the initial triggering of a Temporal workflow that executes {@link WorkUnit}s to fulfill + * the work they specify. see: {@link ProcessWorkUnitsWorkflow} + * + *

+ * This class is instantiated by the {@link GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job submission to launch the Gobblin job. + * The actual task execution happens in the {@link GobblinTemporalTaskRunner}, usually in a different process. + *

+ */ +@Slf4j +public class ProcessWorkUnitsJobLauncher extends GobblinTemporalJobLauncher { + public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NAME_NODE_URI = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "name.node.uri"; + public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_UNITS_DIR = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.units.dir"; + + public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.max.branches.per.tree"; + public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.max.sub.trees.per.tree"; + + public static final String WORKFLOW_ID_BASE = "ProcessWorkUnits"; + + public ProcessWorkUnitsJobLauncher( + Properties jobProps, + Path appWorkDir, + List> metadataTags, + ConcurrentHashMap runningMap + ) throws Exception { + super(jobProps, appWorkDir, metadataTags, runningMap); + } + + @Override + public void submitJob(List workunits) { + try { + URI nameNodeUri = new URI(PropertiesUtils.getRequiredProp(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NAME_NODE_URI)); + // NOTE: `Path` is challenging for temporal to ser/de, but nonetheless do pre-construct as `Path`, to pre-validate this prop string's contents + Path workUnitsDir = new Path(PropertiesUtils.getRequiredProp(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_UNITS_DIR)); + WUProcessingSpec wuSpec = new WUProcessingSpec(nameNodeUri, workUnitsDir.toString()); + if (this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE) && + this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) { + int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE); + int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE); + wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, maxSubTreesPerTree)); + } + WorkflowOptions options = WorkflowOptions.newBuilder() + .setTaskQueue(this.queueName) + .setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps))) + .build(); + ProcessWorkUnitsWorkflow workflow = this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options); + workflow.process(wuSpec); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java new file mode 100644 index 00000000000..87d91e0c472 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.util; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import lombok.extern.slf4j.Slf4j; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.typesafe.config.ConfigFactory; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.FsStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; + + +/** + * Utilities for applying {@link JobState} info to various ends: + * - creating a {@link SharedResourcesBroker} + * - obtaining a {@link StateStore} + */ +@Slf4j +public class JobStateUtils { + private static final String OUTPUT_DIR_NAME = "output"; // following MRJobLauncher.OUTPUT_DIR_NAME + + // reuse same handle among activities executed by the same worker + private static final transient Cache> taskStateStoreByPath = CacheBuilder.newBuilder().build(); + + private JobStateUtils() {} + + public static StateStore openTaskStateStore(JobState jobState, FileSystem fs) { + try { + Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, fs); + return taskStateStoreByPath.get(taskStateStorePath, () -> + openTaskStateStoreUncached(jobState, fs) + ); + } catch (ExecutionException ee) { + throw new RuntimeException(ee); + } + } + + public static StateStore openTaskStateStoreUncached(JobState jobState, FileSystem fs) { + Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, fs); + log.info("opening FS task state store at path '{}'", taskStateStorePath); + return new FsStateStore<>(fs, taskStateStorePath.toUri().getPath(), TaskState.class); + } + + /** + * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same + * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY} + * @return path to {@link FsStateStore} backing dir + */ + public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) { + Properties jobProps = jobState.getProperties(); + Path jobOutputPath = new Path( + new Path( + new Path( + jobProps.getProperty(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY), + JobState.getJobNameFromProps(jobProps)), + JobState.getJobIdFromProps(jobProps)), + OUTPUT_DIR_NAME); + return fs.makeQualified(jobOutputPath); + } + + public static SharedResourcesBroker getSharedResourcesBroker(JobState jobState) { + SharedResourcesBroker globalBroker = + SharedResourcesBrokerFactory.createDefaultTopLevelBroker( + ConfigFactory.parseProperties(jobState.getProperties()), + GobblinScopeTypes.GLOBAL.defaultScopeInstance()); + return globalBroker.newSubscopedBuilder(new JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build(); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java new file mode 100644 index 00000000000..f6b6e05f104 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.work; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Optional; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NonNull; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import com.fasterxml.jackson.annotation.JsonIgnore; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt; +import org.apache.gobblin.temporal.util.nesting.work.SeqSliceBackedWorkSpan; +import org.apache.gobblin.temporal.util.nesting.work.Workload; +import org.apache.gobblin.util.HadoopUtils; + + +/** + * {@link Workload} of `WORK_ITEM`s (as defined by derived class) that originates from the eagerly loaded contents of + * the directory `fsDir` within the {@link FileSystem} at `nameNodeUri`. + * + * IMPORTANT: to abide by Temporal's required determinism, a derived class must provide a {@link Comparator} for the + * *total ordering* of `WORK_ITEM`s. + */ +@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@lombok.RequiredArgsConstructor +@lombok.ToString(exclude = { "stateConfig", "cachedWorkItems" }) +@Slf4j +public abstract class AbstractEagerFsDirBackedWorkload implements Workload, FileSystemApt { + + @Getter + @NonNull private URI fileSystemUri; + // NOTE: use `String` rather than `Path` to avoid: com.fasterxml.jackson.databind.exc.MismatchedInputException: + // Cannot construct instance of `org.apache.hadoop.fs.Path` (although at least one Creator exists): + // cannot deserialize from Object value (no delegate- or property-based Creator) + @NonNull private String fsDir; + @Getter(AccessLevel.PROTECTED) @Setter(AccessLevel.PROTECTED) + private transient volatile WORK_ITEM[] cachedWorkItems = null; + + @Override + public Optional> getSpan(final int startIndex, final int numElements) { + WORK_ITEM[] workItems = getCachedWorkItems(); + if (startIndex >= workItems.length || startIndex < 0) { + return Optional.empty(); + } else { + return Optional.of(new SeqSliceBackedWorkSpan<>(workItems, startIndex, numElements)); + } + } + + @Override + public boolean isIndexKnownToExceed(final int index) { + return isDefiniteSize() && cachedWorkItems != null && index >= cachedWorkItems.length; + } + + @Override + @JsonIgnore // (because no-arg method resembles 'java bean property') + public boolean isDefiniteSize() { + return true; + } + + protected abstract WORK_ITEM fromFileStatus(FileStatus fileStatus); + + /** + * IMPORTANT: to satisfy Temporal's required determinism, the `WORK_ITEM`s need a consistent total ordering + * WARNING: this works so long as dir contents are unchanged in iterim + * TODO: handle case of dir contents growing (e.g. use timestamp to filter out newer paths)... how could we handle the case of shrinking/deletion? + */ + @JsonIgnore // (because no-arg method resembles 'java bean property') + protected abstract Comparator getWorkItemComparator(); + + /** Hook for each `WORK_ITEM` to be associated with its final, post-sorting ordinal index */ + protected void acknowledgeOrdering(int index, WORK_ITEM workItem) { + // no-op + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + protected PathFilter getPathFilter() { + return f -> true; + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + protected final synchronized WORK_ITEM[] getCachedWorkItems() { + if (cachedWorkItems != null) { + return cachedWorkItems; + } + try (FileSystem fs = loadFileSystem()) { + FileStatus[] fileStatuses = fs.listStatus(new Path(fsDir), this.getPathFilter()); + log.info("loaded {} paths from '{}'", fileStatuses.length, fsDir); + WORK_ITEM[] workItems = (WORK_ITEM[])Stream.of(fileStatuses).map(this::fromFileStatus).toArray(Object[]::new); + sortWorkItems(workItems); + IntStream.range(0, workItems.length) + .forEach(i -> this.acknowledgeOrdering(i, workItems[i])); + cachedWorkItems = workItems; + return cachedWorkItems; + } catch (FileNotFoundException fnfe) { + throw new RuntimeException("directory not found: '" + fsDir + "'"); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + @Override + public State getFileSystemConfig() { + return new State(); // TODO - figure out how to truly set! + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + protected FileSystem loadFileSystem() throws IOException { + return HadoopUtils.getFileSystem(this.fileSystemUri, this.getFileSystemConfig()); + } + + private void sortWorkItems(WORK_ITEM[] workItems) { + Arrays.sort(workItems, getWorkItemComparator()); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java new file mode 100644 index 00000000000..d2c193cb101 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.work; + +import java.net.URI; +import java.util.Comparator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.hadoop.fs.FileStatus; + + +/** + * {@link AbstractEagerFsDirBackedWorkload} for {@link WorkUnitClaimCheck} `WORK_ITEM`s, which uses {@link WorkUnitClaimCheck#getWorkUnitPath()} + * for their total-ordering. + */ +@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@lombok.ToString(callSuper = true) +public class EagerFsDirBackedWorkUnitClaimCheckWorkload extends AbstractEagerFsDirBackedWorkload { + + public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String hdfsDir) { + super(fileSystemUri, hdfsDir); + } + + @Override + protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) { + // begin by setting all correlators to empty + return new WorkUnitClaimCheck("", this.getFileSystemUri(), fileStatus.getPath().toString()); + } + + @Override + @JsonIgnore // (because no-arg method resembles 'java bean property') + protected Comparator getWorkItemComparator() { + return Comparator.comparing(WorkUnitClaimCheck::getWorkUnitPath); + } + + @Override + protected void acknowledgeOrdering(int index, WorkUnitClaimCheck item) { + // later, after the post-total-ordering indices are know, use each item's index as its correlator + item.setCorrelator(Integer.toString(index)); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java new file mode 100644 index 00000000000..3b2597194e9 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.work; + +import java.net.URI; +import java.util.Optional; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.hadoop.fs.Path; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.runtime.AbstractJobLauncher; +import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt; +import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful; +import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; +import org.apache.gobblin.temporal.util.nesting.work.Workload; + + +/** + * Intended to reference multiple {@link org.apache.gobblin.source.workunit.WorkUnit}s to process, where `workUnitsDir` + * is resolved against the {@link org.apache.hadoop.fs.FileSystem} given by `nameNodeUri`. see: + */ +@Data +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@RequiredArgsConstructor +public class WUProcessingSpec implements FileSystemApt, FileSystemJobStateful { + @NonNull private URI fileSystemUri; + @NonNull private String workUnitsDir; + @NonNull private Tuning tuning = Tuning.DEFAULT; + + @JsonIgnore // (because no-arg method resembles 'java bean property') + @Override + public State getFileSystemConfig() { + return new State(); // TODO - figure out how to truly set! + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + @Override + public Path getJobStatePath() { + // TODO: decide whether wise to hard-code... (per `MRJobLauncher` conventions, we expect job state file to be sibling of WU dir) + return new Path(new Path(workUnitsDir).getParent(), AbstractJobLauncher.JOB_STATE_FILE_NAME); + } + + /** Configuration for {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(WorkflowAddr, Workload, int, int, int, Optional)}*/ + @Data + @NoArgsConstructor // IMPORTANT: for jackson (de)serialization + @RequiredArgsConstructor + public static class Tuning { + public static final int DEFAULT_MAX_BRANCHES_PER_TREE = 900; + public static final int DEFAULT_SUB_TREES_PER_TREE = 30; + + public static final Tuning DEFAULT = new Tuning(DEFAULT_MAX_BRANCHES_PER_TREE, DEFAULT_SUB_TREES_PER_TREE); + + @NonNull private int maxBranchesPerTree; + @NonNull private int maxSubTreesPerTree; + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java new file mode 100644 index 00000000000..f0321566442 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.work; + +import java.net.URI; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.hadoop.fs.Path; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.runtime.AbstractJobLauncher; +import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt; +import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful; + +/** + * Conveys a {@link org.apache.gobblin.source.workunit.WorkUnit} by claim-check, where the `workUnitPath` is resolved + * against the {@link org.apache.hadoop.fs.FileSystem} given by `nameNodeUri`. see: + * @see Claim-Check Pattern + */ +@Data +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@RequiredArgsConstructor +public class WorkUnitClaimCheck implements FileSystemApt, FileSystemJobStateful { + @NonNull private String correlator; + @NonNull private URI fileSystemUri; + @NonNull private String workUnitPath; + + @JsonIgnore // (because no-arg method resembles 'java bean property') + @Override + public State getFileSystemConfig() { + return new State(); // TODO - figure out how to truly set! + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + @Override + public Path getJobStatePath() { + // TODO: decide whether wise to hard-code... (per `MRJobLauncher` conventions, we expect job state file to be sibling of WU dir) + return new Path(new Path(workUnitPath).getParent().getParent(), AbstractJobLauncher.JOB_STATE_FILE_NAME); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java new file mode 100644 index 00000000000..f382dcec1a5 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.work.assistance; + +import java.io.IOException; +import java.net.URI; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ExecutionException; + +import lombok.extern.slf4j.Slf4j; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import com.typesafe.config.Config; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.temporal.ddm.util.JobStateUtils; +import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful; +import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt; +import org.apache.gobblin.temporal.ddm.work.styles.JobStateful; +import org.apache.gobblin.util.HadoopUtils; +import org.apache.gobblin.util.SerializationUtils; + + +/** Various capabilities useful in implementing Distributed Data Movement (DDM) */ +@Slf4j +public class Help { + public static final int MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS = 5; + public static final int LOG_CACHE_STATS_EVERY_N_ACCESSES = 1000; + public static final String AZKABAN_FLOW_EXEC_ID_KEY = "azkaban.flow.execid"; + public static final String USER_TO_PROXY_KEY = "user.to.proxy"; + + // treat `JobState` as immutable and cache, for reuse among activities executed by the same worker + private static final transient Cache jobStateByPath = CacheBuilder.newBuilder().recordStats().build(); + private static final transient AtomicInteger jobStateAccessCount = new AtomicInteger(0); + + private Help() {} + + public static String qualifyNamePerExec(String name, FileSystemJobStateful f, Config workerConfig) { + return name + "_" + calcPerExecQualifier(f, workerConfig); + } + + public static String qualifyNamePerExec(String name, Config workerConfig) { + return name + "_" + calcPerExecQualifier(workerConfig); + } + + public static String calcPerExecQualifier(FileSystemJobStateful f, Config workerConfig) { + Optional optFlowExecId = Optional.empty(); + try { + optFlowExecId = Optional.of(loadJobState(f).getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null)); + } catch (IOException e) { + log.warn("unable to loadJobState", e); + } + return optFlowExecId.map(x -> x + "_").orElse("") + calcPerExecQualifier(workerConfig); + } + + public static String calcPerExecQualifier(Config workerConfig) { + String userToProxy = workerConfig.hasPath(USER_TO_PROXY_KEY) + ? workerConfig.getString(USER_TO_PROXY_KEY) : ""; + String azFlowExecId = workerConfig.hasPath(AZKABAN_FLOW_EXEC_ID_KEY) + ? workerConfig.getString(AZKABAN_FLOW_EXEC_ID_KEY) : UUID.randomUUID().toString(); + return userToProxy + "_" + azFlowExecId; + } + + public static FileSystem loadFileSystem(FileSystemApt a) throws IOException { + // NOTE: `FileSystem.get` appears to implement caching, which should facilitate sharing among activities executing on the same worker + return loadFileSystemForUri(a.getFileSystemUri(), a.getFileSystemConfig()); + } + + public static FileSystem loadFileSystemForUri(URI fsUri, State fsConfig) throws IOException { + // TODO - determine whether this works... unclear whether it led to "FS closed", or that had another cause... + // return HadoopUtils.getFileSystem(fsUri, fsConfig); + Configuration conf = HadoopUtils.getConfFromState(fsConfig); + return FileSystem.get(fsUri, conf); + } + + public static FileSystem loadFileSystemForce(FileSystemApt a) throws IOException { + return loadFileSystemForUriForce(a.getFileSystemUri(), a.getFileSystemConfig()); + } + + public static FileSystem loadFileSystemForUriForce(URI fsUri, State fsConfig) throws IOException { + // for reasons still not fully understood, we encountered many "FS closed" failures before disabling HDFS caching--especially as num WUs increased. + // perhaps caching-facilitated reuse of the same FS across multiple WUs caused prior WU execs to leave the FS in a problematic state for subsequent execs + // TODO - more investigation to sort out the true RC... and whether caching definitively is or is not possible for use here! + // return HadoopUtils.getFileSystem(fsUri, fsConfig); + Configuration conf = HadoopUtils.getConfFromState(fsConfig); + conf.setBoolean("fs.hdfs.impl.disable.cache", true); + return FileSystem.get(fsUri, conf); + } + + public static JobState loadJobState(FileSystemJobStateful f) throws IOException { + try (FileSystem fs = loadFileSystemForce(f)) { + return loadJobState(f, fs); + } + } + + public static JobState loadJobState(JobStateful js, FileSystem fs) throws IOException { + try { + incrementJobStateAccess(); + return jobStateByPath.get(js.getJobStatePath(), () -> + loadJobStateUncached(js, fs) + ); + } catch (ExecutionException ee) { + throw new IOException(ee); + } + } + + public static JobState loadJobStateUncached(JobStateful js, FileSystem fs) throws IOException { + JobState jobState = new JobState(); + SerializationUtils.deserializeState(fs, js.getJobStatePath(), jobState); + log.info("loaded jobState from '{}': {}", js.getJobStatePath(), jobState.toJsonString(true)); + return jobState; + } + + public static JobState loadJobStateWithRetries(FileSystemJobStateful f) throws IOException { + try (FileSystem fs = loadFileSystemForce(f)) { + return loadJobStateWithRetries(f, fs); + } + } + + public static JobState loadJobStateWithRetries(FileSystemJobStateful f, FileSystem fs) throws IOException { + try { + incrementJobStateAccess(); + return jobStateByPath.get(f.getJobStatePath(), () -> + loadJobStateUncachedWithRetries(f, fs, f) + ); + } catch (ExecutionException ee) { + throw new IOException(ee); + } + } + + public static JobState loadJobStateUncachedWithRetries(JobStateful js, FileSystem fs, FileSystemApt fsApt) throws IOException { + JobState jobState = new JobState(); + deserializeStateWithRetries(fs, js.getJobStatePath(), jobState, fsApt, MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS); + log.info("loaded jobState from '{}': {}", js.getJobStatePath(), jobState.toJsonString(true)); + return jobState; + } + + public static void deserializeStateWithRetries(FileSystem fs, Path path, T state, FileSystemApt fsApt) + throws IOException { + deserializeStateWithRetries(fs, path, state, fsApt, MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS); + } + + // TODO: decide whether actually necessary... it was added in a fit of debugging "FS closed" errors + public static void deserializeStateWithRetries(FileSystem fs, Path path, T state, FileSystemApt fsApt, int maxAttempts) + throws IOException { + for (int i = 0; i < maxAttempts; ++i) { + if (i > 0) { + log.info("reopening FS '{}' to retry ({}) deserialization (attempt {})", fsApt.getFileSystemUri(), + state.getClass().getSimpleName(), i); + fs = Help.loadFileSystem(fsApt); + } + try { + SerializationUtils.deserializeState(fs, path, state); + return; + } catch (IOException ioe) { + if (ioe.getMessage().equals("Filesystem closed") && i < maxAttempts - 1) { + continue; + } else { + throw ioe; + } + } + } + } + + public static StateStore openTaskStateStore(FileSystemJobStateful f) throws IOException { + try (FileSystem fs = Help.loadFileSystem(f)) { + return JobStateUtils.openTaskStateStore(Help.loadJobState(f, fs), fs); + } + } + + public static StateStore openTaskStateStore(FileSystemJobStateful js, FileSystem fs) throws IOException { + return JobStateUtils.openTaskStateStoreUncached(loadJobState(js), fs); + // public static StateStore openTaskStateStore(JobStateful js, FileSystem fs) throws IOException { + // return JobStateUtils.openTaskStateStore(loadJobState(js, fs), fs); + } + + private static void incrementJobStateAccess() { + int numAccesses = jobStateAccessCount.getAndIncrement(); + if (numAccesses % LOG_CACHE_STATS_EVERY_N_ACCESSES == 0) { + log.info("JobState(numAccesses: {}) - {}", numAccesses, jobStateByPath.stats()); + } + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemApt.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemApt.java new file mode 100644 index 00000000000..324aae04d48 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemApt.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.work.styles; + +import java.net.URI; +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.gobblin.configuration.State; + + +/** Marks a type that can indicate a {@link org.apache.hadoop.fs.FileSystem} via its {@link URI} and configuration */ +public interface FileSystemApt { + + URI getFileSystemUri(); + + @JsonIgnore // (because no-arg method resembles 'java bean property') + State getFileSystemConfig(); +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemJobStateful.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemJobStateful.java new file mode 100644 index 00000000000..d3b0accdb9e --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemJobStateful.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.work.styles; + + +/** Marks a type that can indicate both a {@link org.apache.hadoop.fs.FileSystem} and a {@link org.apache.gobblin.runtime.JobState} */ +public interface FileSystemJobStateful extends JobStateful, FileSystemApt { +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/JobStateful.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/JobStateful.java new file mode 100644 index 00000000000..3dc7aed32b6 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/JobStateful.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.work.styles; + +import org.apache.hadoop.fs.Path; + + +/** Marks a type that can indicate a {@link org.apache.gobblin.runtime.JobState} via its {@link Path} */ +public interface JobStateful { + Path getJobStatePath(); +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java new file mode 100644 index 00000000000..9af6995b509 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.worker; + +import java.util.concurrent.TimeUnit; + +import com.typesafe.config.Config; +import io.temporal.client.WorkflowClient; +import io.temporal.worker.WorkerOptions; + +import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker; +import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl; + + +/** Worker for the {@link ProcessWorkUnitsWorkflowImpl} super-workflow */ +public class WorkFulfillmentWorker extends AbstractTemporalWorker { + public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120; + public static final int MAX_EXECUTION_CONCURRENCY = 3; + + public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient) { + super(config, workflowClient); + } + + @Override + protected Class[] getWorkflowImplClasses() { + return new Class[] { ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class }; + } + + @Override + protected Object[] getActivityImplInstances() { + return new Object[] { new ProcessWorkUnitImpl() }; + } + + @Override + protected WorkerOptions createWorkerOptions() { + return WorkerOptions.newBuilder() + // default is only 1s - WAY TOO SHORT for `o.a.hadoop.fs.FileSystem#listStatus`! + .setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS)) + .setMaxConcurrentActivityExecutionSize(MAX_EXECUTION_CONCURRENCY) + .setMaxConcurrentLocalActivityExecutionSize(MAX_EXECUTION_CONCURRENCY) + .setMaxConcurrentWorkflowTaskExecutionSize(MAX_EXECUTION_CONCURRENCY) + .build(); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java new file mode 100644 index 00000000000..ba2ccf99acf --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.workflow; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; + + +/** Temporal workflow for executing {@link WorkUnit}s to fulfill the work they specify. */ +@WorkflowInterface +public interface ProcessWorkUnitsWorkflow { + /** @return the number of {@link WorkUnit}s cumulatively processed successfully */ + @WorkflowMethod + int process(WUProcessingSpec wuSpec); +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java new file mode 100644 index 00000000000..074bb460972 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.workflow.impl; + +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; +import io.temporal.workflow.Async; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; +import java.time.Duration; + +import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit; +import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; +import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; + + +/** {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for {@link ProcessWorkUnit} */ +public class NestingExecOfProcessWorkUnitWorkflowImpl extends AbstractNestingExecWorkflowImpl { + + // RetryOptions specify how to automatically handle retries when Activities fail. + private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(3)) + .setMaximumInterval(Duration.ofSeconds(100)) + .setBackoffCoefficient(2) + .setMaximumAttempts(4) + .build(); + + private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(999)) + .setRetryOptions(ACTIVITY_RETRY_OPTS) + .build(); + + private final ProcessWorkUnit activityStub = Workflow.newActivityStub(ProcessWorkUnit.class, ACTIVITY_OPTS); + + @Override + protected Promise launchAsyncActivity(final WorkUnitClaimCheck wu) { + return Async.function(activityStub::processWorkUnit, wu); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java new file mode 100644 index 00000000000..eafc624096e --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.temporal.ddm.workflow.impl; + +import java.util.Optional; + +import com.typesafe.config.ConfigFactory; + +import io.temporal.api.enums.v1.ParentClosePolicy; +import io.temporal.workflow.ChildWorkflowOptions; +import io.temporal.workflow.Workflow; + +import org.apache.gobblin.temporal.cluster.WorkerConfig; +import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; +import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; +import org.apache.gobblin.temporal.ddm.work.assistance.Help; +import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful; +import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow; +import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload; +import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; +import org.apache.gobblin.temporal.util.nesting.work.Workload; +import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow; + + +public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow { + public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits"; + + @Override + public int process(WUProcessingSpec workSpec) { + Workload workload = createWorkload(workSpec); + NestingExecWorkflow processingWorkflow = createProcessingWorkflow(workSpec); + return processingWorkflow.performWorkload( + WorkflowAddr.ROOT, workload, 0, + workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty() + ); + } + + protected Workload createWorkload(WUProcessingSpec workSpec) { + return new EagerFsDirBackedWorkUnitClaimCheckWorkload(workSpec.getFileSystemUri(), workSpec.getWorkUnitsDir()); + } + + protected NestingExecWorkflow createProcessingWorkflow(FileSystemJobStateful f) { + ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder() + .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON) + .setWorkflowId(Help.qualifyNamePerExec(CHILD_WORKFLOW_ID_BASE, f, WorkerConfig.of(this).orElse(ConfigFactory.empty()))) + .build(); + // TODO: to incorporate multiple different concrete `NestingExecWorkflow` sub-workflows in the same super-workflow... shall we use queues?!?!? + return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts); + } +} diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java index 3e020f6f808..012795dd1be 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java @@ -1029,6 +1029,15 @@ public static FileSystem getSourceFileSystem(State state) throws IOException { return HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), conf), state); } + /** + * Get a {@link FileSystem} for `fsUri` + * @throws IOException + */ + public static FileSystem getFileSystem(URI fsUri, State state) throws IOException { + Configuration conf = HadoopUtils.getConfFromState(state, Optional.absent()); + return HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(fsUri, conf), state); + } + /** * Get a {@link FileSystem} object for the uri specified at {@link ConfigurationKeys#WRITER_FILE_SYSTEM_URI}. * @throws IOException diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java index 1f20b1b7334..89bf4d4bdce 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java @@ -130,12 +130,7 @@ public static List flattenWorkUnits(Collection workUnits) { public static List loadFlattenedWorkUnits(FileSystem fs, Path path) throws IOException { WorkUnit workUnit = JobLauncherUtils.createEmptyWorkUnitPerExtension(path); SerializationUtils.deserializeState(fs, path, workUnit); - - if (workUnit.isMultiWorkUnit()) { - return JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits()); - } else { - return Lists.newArrayList(workUnit); - } + return JobLauncherUtils.flattenWorkUnits(Lists.newArrayList(workUnit)); } /** @return an empty {@link WorkUnit}, potentially an empty {@link MultiWorkUnit}, based on the {@link Path} extension */ diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java index d6361b92779..0aaed372959 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java @@ -72,9 +72,19 @@ public static int getPropAsInt(Properties properties, String key, int defaultVal /** @throws {@link NullPointerException} when `key` not in `properties` */ public static int getRequiredPropAsInt(Properties properties, String key) { + return Integer.parseInt(getRequiredPropRaw(properties, key, Optional.of("an integer"))); + } + + /** @throws {@link NullPointerException} when `key` not in `properties` */ + public static String getRequiredProp(Properties properties, String key) { + return getRequiredPropRaw(properties, key, Optional.absent()); + } + + /** @throws {@link NullPointerException} when `key` not in `properties` */ + public static String getRequiredPropRaw(Properties properties, String key, Optional desc) { String value = properties.getProperty(key); - Preconditions.checkNotNull(value, "'" + key + "' must be set (to an integer)"); - return Integer.parseInt(value); + Preconditions.checkNotNull(value, "'" + key + "' must be set" + desc.transform(s -> " (to " + desc + ")").or("")); + return value; } public static long getPropAsLong(Properties properties, String key, long defaultValue) {