Skip to content

Commit

Permalink
[GOBBLIN-1941] Develop Temporal abstractions, including Workload fo…
Browse files Browse the repository at this point in the history
…r workflows of unbounded size through sub-workflow nesting (#3811)

* Define `Workload` abstraction for Temporal workflows of unbounded size through sub-workflow nesting

* Adjust Gobblin-Temporal configurability for consistency and abstraction

* Define `WorkerConfig`, to pass the `TemporalWorker`'s configuration to the workflows and activities it hosts

* Improve javadoc

* Javadoc fixup

* Minor changes

* Update per review suggestions

* Insert pause, to spread the load on the temporal server, before launch of each child workflow that may have direct leaves of its own

* Appease findbugs by having `SeqSliceBackedWorkSpan::next` throw `NoSuchElementException`

* Add comment
  • Loading branch information
phet authored Oct 30, 2023
1 parent 629b9cc commit da6b1df
Show file tree
Hide file tree
Showing 12 changed files with 619 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@ public interface GobblinTemporalConfigurationKeys {

String PREFIX = "gobblin.temporal.";

String WORKER_CLASS = PREFIX + "worker";
String WORKER_CLASS = PREFIX + "worker.class";
String DEFAULT_WORKER_CLASS = HelloWorldWorker.class.getName();
String GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";
String DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";

String GOBBLIN_TEMPORAL_TASK_QUEUE = PREFIX + "task.queue.name";
String DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue";
String GOBBLIN_TEMPORAL_JOB_LAUNCHER = PREFIX + "job.launcher";
String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER = HelloWorldJobLauncher.class.getName();
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX = PREFIX + "job.launcher.";
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "class";
String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = HelloWorldJobLauncher.class.getName();

String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg.";

/**
* Number of worker processes to spin up per task runner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@

package org.apache.gobblin.temporal.cluster;

import java.util.Arrays;

import com.typesafe.config.Config;

import io.temporal.client.WorkflowClient;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerOptions;

import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;


public abstract class AbstractTemporalWorker {
/** Basic boilerplate for a {@link TemporalWorker} to register its activity and workflow capabilities and listen on a particular queue */
public abstract class AbstractTemporalWorker implements TemporalWorker {
private final WorkflowClient workflowClient;
private final String queueName;
private final WorkerFactory workerFactory;
Expand All @@ -42,10 +46,13 @@ public AbstractTemporalWorker(Config cfg, WorkflowClient client) {

// Create a Worker factory that can be used to create Workers that poll specific Task Queues.
workerFactory = WorkerFactory.newInstance(workflowClient);

stashWorkerConfig(cfg);
}

@Override
public void start() {
Worker worker = workerFactory.newWorker(queueName);
Worker worker = workerFactory.newWorker(queueName, createWorkerOptions());
// This Worker hosts both Workflow and Activity implementations.
// Workflows are stateful, so you need to supply a type to create instances.
worker.registerWorkflowImplementationTypes(getWorkflowImplClasses());
Expand All @@ -55,16 +62,25 @@ public void start() {
workerFactory.start();
}

/**
* Shuts down the worker.
*/
@Override
public void shutdown() {
workerFactory.shutdown();
}

protected WorkerOptions createWorkerOptions() {
return null;
}

/** @return workflow types for *implementation* classes (not interface) */
protected abstract Class<?>[] getWorkflowImplClasses();

/** @return activity instances; NOTE: activities must be stateless and thread-safe, so a shared instance is used. */
protected abstract Object[] getActivityImplInstances();

private final void stashWorkerConfig(Config cfg) {
// stash in association with...
WorkerConfig.forWorker(this.getClass(), cfg); // the worker itself
Arrays.stream(getWorkflowImplClasses()).forEach(clazz -> WorkerConfig.withImpl(clazz, cfg)); // its workflow impls
Arrays.stream(getActivityImplInstances()).forEach(obj -> WorkerConfig.withImpl(obj.getClass(), cfg)); // its activity impls
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public class GobblinTemporalTaskRunner implements StandardMetricsBridge {
protected final String temporalQueueName;
private final boolean isMetricReportingFailureFatal;
private final boolean isEventReportingFailureFatal;
private final List<AbstractTemporalWorker> workers;
private final List<TemporalWorker> workers;

public GobblinTemporalTaskRunner(String applicationName,
String applicationId,
Expand Down Expand Up @@ -234,7 +234,7 @@ public void start()
}
}

private AbstractTemporalWorker initiateWorker() throws Exception{
private TemporalWorker initiateWorker() throws Exception {
logger.info("Starting Temporal Worker");

String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
Expand All @@ -246,8 +246,8 @@ private AbstractTemporalWorker initiateWorker() throws Exception{

String workerClassName = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
AbstractTemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor(
(Class<AbstractTemporalWorker>) Class.forName(workerClassName), clusterConfig, client);
TemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor(
(Class<TemporalWorker>)Class.forName(workerClassName), clusterConfig, client);
worker.start();
logger.info("A new worker is started.");
return worker;
Expand Down Expand Up @@ -286,9 +286,7 @@ public synchronized void stop() {
this.containerMetrics.get().stopMetricsReporting();
}

for (AbstractTemporalWorker worker : workers) {
worker.shutdown();
}
workers.forEach(TemporalWorker::shutdown);

logger.info("All services are stopped.");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.cluster;

/** Marker interface for a temporal.io "worker", with capability to `start()` and `shutdown()` */
public interface TemporalWorker {

/** Starts the worker */
void start();

/** Shuts down the worker */
void shutdown();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.cluster;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import lombok.extern.slf4j.Slf4j;

import com.typesafe.config.Config;


/**
* Static holder to stash the {@link Config} used to construct each kind of {@link org.apache.gobblin.temporal.cluster.TemporalWorker}
* (within the current JVM). Lookup may be by either the {@link Class} of the worker or of any workflow or activity implementation supplied by
* that worker. The objective is to facilitate sharing the worker's Config with workflow and activity implementations (running within that worker).
*
* ATTENTION: for sanity, construct multiple instances of the same worker always with the same {@link Config}. When this is violated, the `Config`
* given to the most-recently constructed worker "wins".
*
* NOTE: the preservation and sharing of {@link Config} is predicated entirely on its immutability. Thank you TypeSafe!
* Storage indexing uses FQ class name, not the {@link Class}, to be independent of classloader.
*/
@Slf4j
public class WorkerConfig {
private static final ConcurrentHashMap<String, Config> configByFQClassName = new ConcurrentHashMap<>();

private WorkerConfig() {}

/** @return whether initialized now (vs. being previously known) */
public static boolean forWorker(Class<? extends TemporalWorker> workerClass, Config config) {
return storeAs(workerClass.getName(), config);
}

/** @return whether initialized now (vs. being previously known) */
public static boolean withImpl(Class<?> workflowOrActivityImplClass, Config config) {
return storeAs(workflowOrActivityImplClass.getName(), config);
}

public static Optional<Config> ofWorker(Class<? extends TemporalWorker> workerClass) {
return Optional.ofNullable(configByFQClassName.get(workerClass.getName()));
}

public static Optional<Config> ofImpl(Class<?> workflowOrActivityImplClass) {
return Optional.ofNullable(configByFQClassName.get(workflowOrActivityImplClass.getName()));
}

public static Optional<Config> of(Object workflowOrActivityImpl) {
return ofImpl(workflowOrActivityImpl.getClass());
}

private static boolean storeAs(String className, Config config) {
Config prior = configByFQClassName.put(className, config);
log.info("storing config of {} values as '{}'{}", config.entrySet().size(), className, prior == null ? " (new)" : "");
return prior == null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public GobblinTemporalJobLauncher buildJobLauncher(Properties jobProps)

Class<? extends GobblinTemporalJobLauncher> jobLauncherClass =
(Class<? extends GobblinTemporalJobLauncher>) Class.forName(combinedProps.getProperty(
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER));
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS));
return GobblinConstructorUtils.invokeLongestConstructor(jobLauncherClass, combinedProps,
this.appWorkDir,
this.metadataTags,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.util.nesting.work;

import java.util.Iterator;
import java.util.List;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;


/** Logical sub-sequence of `WORK_ITEM`s, backed for simplicity's sake by an in-memory collection */
@NoArgsConstructor
@RequiredArgsConstructor
public class SeqBackedWorkSpan<WORK_ITEM> implements Workload.WorkSpan<WORK_ITEM> {

@NonNull
private List<WORK_ITEM> elems;
// CAUTION: despite the "warning: @NonNull is meaningless on a primitive @lombok.RequiredArgsConstructor"...
// if removed, no two-arg ctor is generated, so syntax error on `new CollectionBackedTaskSpan(elems, startIndex)`
@NonNull
private int startingIndex;
private transient Iterator<WORK_ITEM> statefulDelegatee = null;

@Override
public int getNumElems() {
return elems.size();
}

@Override
public boolean hasNext() {
if (statefulDelegatee == null) {
statefulDelegatee = elems.iterator();
}
return statefulDelegatee.hasNext();
}

@Override
public WORK_ITEM next() {
if (statefulDelegatee == null) {
throw new IllegalStateException("first call `hasNext()`!");
}
return statefulDelegatee.next();
}

@Override
public String toString() {
return getClassNickname() + "(" + startingIndex + "... {+" + getNumElems() + "})";
}

protected String getClassNickname() {
// return getClass().getSimpleName();
return "WorkSpan";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.util.nesting.work;

import java.util.NoSuchElementException;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import com.fasterxml.jackson.annotation.JsonIgnore;


/** Logical sub-sequence of `WORK_ITEM`s, backed for simplicity's sake by an in-memory collection, *SHARED* w/ other work spans */
@NoArgsConstructor
@RequiredArgsConstructor
public class SeqSliceBackedWorkSpan<WORK_ITEM> implements Workload.WorkSpan<WORK_ITEM> {
private static final int NOT_SET_SENTINEL = -1;

@NonNull private WORK_ITEM[] sharedElems;
// CAUTION: despite the "warning: @NonNull is meaningless on a primitive @lombok.RequiredArgsConstructor"...
// if removed, no two-arg ctor is generated, so syntax error on `new CollectionSliceBackedTaskSpan(elems, startIndex)`
@NonNull private int startingIndex;
@NonNull private int numElements;
private transient volatile int nextElemIndex = NOT_SET_SENTINEL;

@Override
public int getNumElems() {
return getEndingIndex() - startingIndex;
}

@Override
public boolean hasNext() {
if (nextElemIndex == NOT_SET_SENTINEL) {
nextElemIndex = startingIndex; // NOTE: `startingIndex` should be effectively `final` (post-deser) and always >= 0
}
return nextElemIndex < this.getEndingIndex();
}

@Override
public WORK_ITEM next() {
if (nextElemIndex >= startingIndex + numElements) {
throw new NoSuchElementException("index " + nextElemIndex + " >= " + startingIndex + " + " + numElements);
}
return sharedElems[nextElemIndex++];
}

@Override
public String toString() {
return getClassNickname() + "(" + startingIndex + "... {+" + getNumElems() + "})";
}

protected String getClassNickname() {
// return getClass().getSimpleName();
return "WorkSpan";
}

@JsonIgnore // (because no-arg method resembles 'java bean property')
protected final int getEndingIndex() {
return Math.min(startingIndex + numElements, sharedElems.length);
}
}
Loading

0 comments on commit da6b1df

Please sign in to comment.