Skip to content

Commit

Permalink
[GOBBLIN-1944] Add gobblin-temporal load generator for a single subsu…
Browse files Browse the repository at this point in the history
…ming super-workflow with a configurable number of activities nested beneath (#3815)

* Add gobblin-temporal load generator for a single subsuming super-workflow with a configurable number of activities nested beneath

* Update per findbugs advice

* Improve processing of int props
  • Loading branch information
phet authored Nov 1, 2023
1 parent da6b1df commit 96092f7
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.loadgen.activity;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import org.apache.gobblin.temporal.loadgen.work.IllustrationItem;


/**
* Activity for processing {@link IllustrationItem}s
*
* CAUTION/FINDING: an `@ActivityInterface` must not be parameterized (e.g. here, by WORK_ITEM), as doing so results in:
* io.temporal.failure.ApplicationFailure: message='class java.util.LinkedHashMap cannot be cast to class
* org.apache.gobblin.temporal.loadgen.work.IllustrationItem', type='java.lang.ClassCastException'
*/
@ActivityInterface
public interface IllustrationItemActivity {
@ActivityMethod
String handleItem(IllustrationItem item);
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.loadgen.activity.impl;


import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.loadgen.activity.IllustrationItemActivity;
import org.apache.gobblin.temporal.loadgen.work.IllustrationItem;


@Slf4j
public class IllustrationItemActivityImpl implements IllustrationItemActivity {
@Override
public String handleItem(final IllustrationItem item) {
log.info("Now illustrating - '" + item.getName() + "'");
return item.getName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.loadgen.launcher;

import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import io.temporal.client.WorkflowOptions;
import org.apache.hadoop.fs.Path;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
import org.apache.gobblin.temporal.loadgen.work.IllustrationItem;
import org.apache.gobblin.temporal.loadgen.work.SimpleGeneratedWorkload;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
import org.apache.gobblin.util.PropertiesUtils;

import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX;


/**
* A {@link JobLauncher} for the initial triggering of a Temporal workflow that generates arbitrary load of many
* activities nested beneath a single subsuming super-workflow. see: {@link NestingExecWorkflow}
*
* <p>
* This class is instantiated by the {@link GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job submission to launch the Gobblin job.
* The actual task execution happens in the {@link GobblinTemporalTaskRunner}, usually in a different process.
* </p>
*/
@Alpha
@Slf4j
public class GenArbitraryLoadJobLauncher extends GobblinTemporalJobLauncher {
public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NUM_ACTIVITIES = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "num.activities";
public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_BRANCHES_PER_TREE = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "max.branches.per.tree";
public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_SUB_TREES_PER_TREE = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "max.sub.trees.per.tree";

public GenArbitraryLoadJobLauncher(
Properties jobProps,
Path appWorkDir,
List<? extends Tag<?>> metadataTags,
ConcurrentHashMap<String, Boolean> runningMap
) throws Exception {
super(jobProps, appWorkDir, metadataTags, runningMap);
}

@Override
public void submitJob(List<WorkUnit> workunits) {
int numActivities = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NUM_ACTIVITIES);
int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_BRANCHES_PER_TREE);
int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_SUB_TREES_PER_TREE);

Workload<IllustrationItem> workload = SimpleGeneratedWorkload.createAs(numActivities);
WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(this.queueName).build();

// WARNING: although type param must agree w/ that of `workload`, it's entirely unverified by type checker!
// ...and more to the point, mismatch would occur at runtime (`performWorkload` on the workflow type given to the stub)!
NestingExecWorkflow<IllustrationItem> workflow = this.client.newWorkflowStub(NestingExecWorkflow.class, options);

workflow.performWorkload(WorkflowAddr.ROOT, workload, 0, maxBranchesPerTree, maxSubTreesPerTree, Optional.empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.loadgen.work;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;


/** Generally, this would specify what "work" needs performing plus how to perform, but for now merely a unique name (to log) */
@Data
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class IllustrationItem {
@NonNull
private String name;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.loadgen.work;

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.AccessLevel;
import org.apache.gobblin.temporal.util.nesting.work.SeqBackedWorkSpan;
import org.apache.gobblin.temporal.util.nesting.work.Workload;


/** Example, illustration workload that synthesizes its work items; genuine {@link Workload}s generally arise from query/calc */
@lombok.AllArgsConstructor(access = AccessLevel.PRIVATE)
@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@lombok.ToString
public class SimpleGeneratedWorkload implements Workload<IllustrationItem> {
private int numItems;

/** Factory method */
public static SimpleGeneratedWorkload createAs(final int numItems) {
return new SimpleGeneratedWorkload(numItems);
}

@Override
public Optional<Workload.WorkSpan<IllustrationItem>> getSpan(final int startIndex, final int numElements) {
if (startIndex >= numItems || startIndex < 0) {
return Optional.empty();
} else {
List<IllustrationItem> elems = IntStream.range(startIndex, Math.min(startIndex + numElements, numItems))
.mapToObj(n -> new IllustrationItem("item-" + n + "-of-" + numItems))
.collect(Collectors.toList());
return Optional.of(new SeqBackedWorkSpan<>(elems, startIndex));
}
}

@Override
public boolean isIndexKnownToExceed(final int index) {
return isDefiniteSize() && index >= numItems;
}

@Override
@JsonIgnore // (because no-arg method resembles 'java bean property')
public boolean isDefiniteSize() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.loadgen.worker;

import com.typesafe.config.Config;
import io.temporal.client.WorkflowClient;
import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
import org.apache.gobblin.temporal.loadgen.activity.impl.IllustrationItemActivityImpl;
import org.apache.gobblin.temporal.loadgen.workflow.impl.NestingExecOfIllustrationItemActivityWorkflowImpl;


/** Worker for {@link NestingExecOfIllustrationItemActivityWorkflowImpl} and said activity impl */
public class ArbitraryLoadWorker extends AbstractTemporalWorker {
public ArbitraryLoadWorker(Config config, WorkflowClient workflowClient) {
super(config, workflowClient);
}

@Override
protected Class<?>[] getWorkflowImplClasses() {
return new Class[] { NestingExecOfIllustrationItemActivityWorkflowImpl.class };
}

@Override
protected Object[] getActivityImplInstances() {
return new Object[] { new IllustrationItemActivityImpl() };
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.loadgen.workflow.impl;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
import java.time.Duration;
import org.apache.gobblin.temporal.loadgen.activity.IllustrationItemActivity;
import org.apache.gobblin.temporal.loadgen.work.IllustrationItem;
import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;


/** {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for {@link IllustrationItem} */
public class NestingExecOfIllustrationItemActivityWorkflowImpl
extends AbstractNestingExecWorkflowImpl<IllustrationItem, String> {

// RetryOptions specify how to automatically handle retries when Activities fail.
private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(1))
.setMaximumInterval(Duration.ofSeconds(100))
.setBackoffCoefficient(2)
.setMaximumAttempts(3)
.build();

private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(10))
.setRetryOptions(ACTIVITY_RETRY_OPTS)
.build();

private final IllustrationItemActivity activityStub =
Workflow.newActivityStub(IllustrationItemActivity.class, ACTIVITY_OPTS);

@Override
protected Promise<String> launchAsyncActivity(final IllustrationItem item) {
return Async.function(activityStub::handleItem, item);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ public static int getPropAsInt(Properties properties, String key, int defaultVal
return Integer.parseInt(properties.getProperty(key, Integer.toString(defaultValue)));
}

/** @throws {@link NullPointerException} when `key` not in `properties` */
public static int getRequiredPropAsInt(Properties properties, String key) {
String value = properties.getProperty(key);
Preconditions.checkNotNull(value, "'" + key + "' must be set (to an integer)");
return Integer.parseInt(value);
}

public static long getPropAsLong(Properties properties, String key, long defaultValue) {
return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue)));
}
Expand Down

0 comments on commit 96092f7

Please sign in to comment.