Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1700] Flink supports fallback to vanilla Flink built-in shuffle implementation #2932

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -129,12 +128,8 @@ public ShuffleIOOwnerContext createShuffleIOOwnerContext(
nettyGroup.addGroup(METRIC_GROUP_INPUT));
}

public void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds) {
throw new FlinkRuntimeException("Not implemented yet.");
}

public Collection<ResultPartitionID> getPartitionsOccupyingLocalResources() {
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved
return new ArrayList<>();
return resultPartitionManager.getUnreleasedPartitions();
}

@VisibleForTesting
Expand Down Expand Up @@ -176,13 +171,17 @@ public List<IndexedInputGate> createInputGates(
IndexedInputGate[] inputGates = new IndexedInputGate[inputGateDescriptors.size()];
for (int gateIndex = 0; gateIndex < inputGates.length; gateIndex++) {
InputGateDeploymentDescriptor igdd = inputGateDescriptors.get(gateIndex);
IndexedInputGate inputGate = createInputGateInternal(ownerContext, gateIndex, igdd);
IndexedInputGate inputGate =
createInputGateInternal(ownerContext, producerStateProvider, gateIndex, igdd);
inputGates[gateIndex] = inputGate;
}
return Arrays.asList(inputGates);
}
}

abstract IndexedInputGate createInputGateInternal(
ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd);
ShuffleIOOwnerContext ownerContext,
PartitionProducerStateProvider producerStateProvider,
int gateIndex,
InputGateDeploymentDescriptor igdd);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,24 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.shuffle.JobShuffleContext;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
Expand All @@ -43,16 +51,20 @@

import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.plugin.flink.fallback.ShuffleFallbackPolicyRunner;
import org.apache.celeborn.plugin.flink.utils.FlinkUtils;

public class RemoteShuffleMaster implements ShuffleMaster<RemoteShuffleDescriptor> {
public class RemoteShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {
private static final Logger LOG = LoggerFactory.getLogger(RemoteShuffleMaster.class);
private final CelebornConf conf;
private final ShuffleMasterContext shuffleMasterContext;
// Flink JobId -> Celeborn register shuffleIds
private final Map<JobID, Set<Integer>> jobShuffleIds = JavaUtils.newConcurrentHashMap();
private final ConcurrentHashMap.KeySetView<JobID, Boolean> nettyJobIds =
ConcurrentHashMap.newKeySet();
private String celebornAppId;
private volatile LifecycleManager lifecycleManager;
private final ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo();
Expand All @@ -62,20 +74,26 @@ public class RemoteShuffleMaster implements ShuffleMaster<RemoteShuffleDescripto
"celeborn-client-remote-shuffle-master-executor");
private final ResultPartitionAdapter resultPartitionDelegation;
private final long lifecycleManagerTimestamp;
private final NettyShuffleServiceFactory nettyShuffleServiceFactory;
private volatile NettyShuffleMaster nettyShuffleMaster;

public RemoteShuffleMaster(
ShuffleMasterContext shuffleMasterContext, ResultPartitionAdapter resultPartitionDelegation) {
ShuffleMasterContext shuffleMasterContext,
ResultPartitionAdapter resultPartitionDelegation,
@Nullable NettyShuffleServiceFactory nettyShuffleServiceFactory) {
Configuration configuration = shuffleMasterContext.getConfiguration();
checkShuffleConfig(configuration);
this.conf = FlinkUtils.toCelebornConf(configuration);
this.shuffleMasterContext = shuffleMasterContext;
this.resultPartitionDelegation = resultPartitionDelegation;
this.lifecycleManagerTimestamp = System.currentTimeMillis();
this.nettyShuffleServiceFactory = nettyShuffleServiceFactory;
}

@Override
public void registerJob(JobShuffleContext context) {
JobID jobID = context.getJobId();
LOG.info("Register job: {}.", jobID);
if (lifecycleManager == null) {
synchronized (RemoteShuffleMaster.class) {
if (lifecycleManager == null) {
Expand All @@ -87,17 +105,31 @@ public void registerJob(JobShuffleContext context) {
}
}

Set<Integer> previousShuffleIds = jobShuffleIds.putIfAbsent(jobID, new HashSet<>());
LOG.info("Register job: {}.", jobID);
if (previousShuffleIds != null) {
throw new RuntimeException("Duplicated registration job: " + jobID);
try {
if (nettyShuffleServiceFactory != null
&& ShuffleFallbackPolicyRunner.applyFallbackPolicies(context, conf, lifecycleManager)) {
LOG.warn("Fallback to vanilla Flink NettyShuffleMaster for job: {}.", jobID);
nettyJobIds.add(jobID);
nettyShuffleMaster().registerJob(context);
} else {
Set<Integer> previousShuffleIds = jobShuffleIds.putIfAbsent(jobID, new HashSet<>());
if (previousShuffleIds != null) {
throw new RuntimeException("Duplicated registration job: " + jobID);
}
shuffleResourceTracker.registerJob(context);
}
} catch (CelebornIOException e) {
throw new RuntimeException(e);
}
shuffleResourceTracker.registerJob(context);
}

@Override
public void unregisterJob(JobID jobID) {
LOG.info("Unregister job: {}.", jobID);
if (nettyJobIds.remove(jobID)) {
nettyShuffleMaster().unregisterJob(jobID);
return;
}
Set<Integer> shuffleIds = jobShuffleIds.remove(jobID);
if (shuffleIds != null) {
executor.execute(
Expand All @@ -116,46 +148,56 @@ public void unregisterJob(JobID jobID) {
}

@Override
public CompletableFuture<RemoteShuffleDescriptor> registerPartitionWithProducer(
public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
return CompletableFuture.supplyAsync(
() -> {
Set<Integer> shuffleIds = jobShuffleIds.get(jobID);
if (shuffleIds == null) {
throw new RuntimeException("Can not find job in lifecycleManager, job: " + jobID);
}
if (nettyJobIds.contains(jobID)) {
try {
return nettyShuffleMaster()
.registerPartitionWithProducer(jobID, partitionDescriptor, producerDescriptor)
.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
} else {
Set<Integer> shuffleIds = jobShuffleIds.get(jobID);
if (shuffleIds == null) {
throw new RuntimeException("Can not find job in lifecycleManager, job: " + jobID);
}

FlinkResultPartitionInfo resultPartitionInfo =
new FlinkResultPartitionInfo(jobID, partitionDescriptor, producerDescriptor);
ShuffleResourceDescriptor shuffleResourceDescriptor =
shuffleTaskInfo.genShuffleResourceDescriptor(
resultPartitionInfo.getShuffleId(),
resultPartitionInfo.getTaskId(),
resultPartitionInfo.getAttemptId());
FlinkResultPartitionInfo resultPartitionInfo =
new FlinkResultPartitionInfo(jobID, partitionDescriptor, producerDescriptor);
ShuffleResourceDescriptor shuffleResourceDescriptor =
shuffleTaskInfo.genShuffleResourceDescriptor(
resultPartitionInfo.getShuffleId(),
resultPartitionInfo.getTaskId(),
resultPartitionInfo.getAttemptId());

synchronized (shuffleIds) {
shuffleIds.add(shuffleResourceDescriptor.getShuffleId());
}
synchronized (shuffleIds) {
shuffleIds.add(shuffleResourceDescriptor.getShuffleId());
}

RemoteShuffleResource remoteShuffleResource =
new RemoteShuffleResource(
lifecycleManager.getHost(),
lifecycleManager.getPort(),
lifecycleManagerTimestamp,
shuffleResourceDescriptor);
RemoteShuffleResource remoteShuffleResource =
new RemoteShuffleResource(
lifecycleManager.getHost(),
lifecycleManager.getPort(),
lifecycleManagerTimestamp,
shuffleResourceDescriptor);

shuffleResourceTracker.addPartitionResource(
jobID,
shuffleResourceDescriptor.getShuffleId(),
shuffleResourceDescriptor.getPartitionId(),
resultPartitionInfo.getResultPartitionId());
shuffleResourceTracker.addPartitionResource(
jobID,
shuffleResourceDescriptor.getShuffleId(),
shuffleResourceDescriptor.getPartitionId(),
resultPartitionInfo.getResultPartitionId());

return new RemoteShuffleDescriptor(
celebornAppId,
jobID,
resultPartitionInfo.getShuffleId(),
resultPartitionInfo.getResultPartitionId(),
remoteShuffleResource);
return new RemoteShuffleDescriptor(
celebornAppId,
jobID,
resultPartitionInfo.getShuffleId(),
resultPartitionInfo.getResultPartitionId(),
remoteShuffleResource);
}
},
executor);
}
Expand All @@ -164,30 +206,34 @@ public CompletableFuture<RemoteShuffleDescriptor> registerPartitionWithProducer(
public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
executor.execute(
() -> {
if (!(shuffleDescriptor instanceof RemoteShuffleDescriptor)) {
if (shuffleDescriptor instanceof RemoteShuffleDescriptor) {
try {
RemoteShuffleDescriptor descriptor = (RemoteShuffleDescriptor) shuffleDescriptor;
RemoteShuffleResource shuffleResource = descriptor.getShuffleResource();
ShuffleResourceDescriptor resourceDescriptor =
shuffleResource.getMapPartitionShuffleDescriptor();
LOG.debug("release partition resource: {}.", resourceDescriptor);
lifecycleManager.releasePartition(
resourceDescriptor.getShuffleId(), resourceDescriptor.getPartitionId());
shuffleResourceTracker.removePartitionResource(
descriptor.getJobId(),
resourceDescriptor.getShuffleId(),
resourceDescriptor.getPartitionId());
} catch (Throwable throwable) {
// it is not a problem if we fail to release the target data partition
// because the session timeout mechanism will do the work for us later
LOG.debug("Failed to release data partition {}.", shuffleDescriptor, throwable);
}
} else if (shuffleDescriptor instanceof NettyShuffleDescriptor) {
nettyShuffleMaster().releasePartitionExternally(shuffleDescriptor);
} else {
LOG.error(
"Only RemoteShuffleDescriptor is supported {}.",
shuffleDescriptor.getClass().getName());
"Unsupported shuffle descriptor {}. Only supports {} and {}",
shuffleDescriptor.getClass().getName(),
RemoteShuffleDescriptor.class.getName(),
NettyShuffleDescriptor.class.getName());
shuffleMasterContext.onFatalError(
new RuntimeException("Illegal shuffle descriptor type."));
return;
}
try {
RemoteShuffleDescriptor descriptor = (RemoteShuffleDescriptor) shuffleDescriptor;
RemoteShuffleResource shuffleResource = descriptor.getShuffleResource();
ShuffleResourceDescriptor resourceDescriptor =
shuffleResource.getMapPartitionShuffleDescriptor();
LOG.debug("release partition resource: {}.", resourceDescriptor);
lifecycleManager.releasePartition(
resourceDescriptor.getShuffleId(), resourceDescriptor.getPartitionId());
shuffleResourceTracker.removePartitionResource(
descriptor.getJobId(),
resourceDescriptor.getShuffleId(),
resourceDescriptor.getPartitionId());
} catch (Throwable throwable) {
// it is not a problem if we fail to release the target data partition
// because the session timeout mechanism will do the work for us later
LOG.debug("Failed to release data partition {}.", shuffleDescriptor, throwable);
}
});
}
Expand Down Expand Up @@ -224,11 +270,16 @@ public MemorySize computeShuffleMemorySizeForTask(
@Override
public void close() throws Exception {
try {
nettyJobIds.clear();
jobShuffleIds.clear();
LifecycleManager manager = lifecycleManager;
if (null != manager) {
manager.stop();
}
if (nettyShuffleMaster != null) {
nettyShuffleMaster.close();
nettyShuffleMaster = null;
}
} catch (Exception e) {
LOG.warn("Encounter exception when shutdown: {}", e.getMessage(), e);
}
Expand All @@ -254,4 +305,20 @@ private void checkShuffleConfig(Configuration configuration) {
BatchShuffleMode.ALL_EXCHANGES_BLOCKING.name()));
}
}

/**
 * Lazily creates the vanilla Flink {@link NettyShuffleMaster} used for jobs that fall back to
 * Flink's built-in shuffle. Double-checked locking on the volatile {@code nettyShuffleMaster}
 * field ensures the master is instantiated at most once.
 *
 * <p>NOTE(review): assumes {@code nettyShuffleServiceFactory} is non-null on this path — callers
 * only reach here after a fallback decision that requires the factory; verify no other caller can
 * hit this with a null factory (it would throw a NullPointerException).
 */
private NettyShuffleMaster nettyShuffleMaster() {
if (nettyShuffleMaster == null) {
synchronized (this) {
// Re-check under the lock: another thread may have initialized it meanwhile.
if (nettyShuffleMaster == null) {
nettyShuffleMaster = nettyShuffleServiceFactory.createShuffleMaster(shuffleMasterContext);
}
}
}
return nettyShuffleMaster;
}

/**
 * Returns the live set of job IDs that have fallen back to Flink's built-in netty shuffle.
 * Exposed only as a test hook; the returned view is the internal concurrent set, not a copy.
 */
@VisibleForTesting
public ConcurrentHashMap.KeySetView<JobID, Boolean> nettyJobIds() {
return nettyJobIds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.plugin.flink.fallback;

import org.apache.flink.runtime.shuffle.JobShuffleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.FallbackPolicy;

public class ForceFallbackPolicy implements ShuffleFallbackPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(ForceFallbackPolicy.class);

  /** Shared stateless singleton instance; the policy holds no per-job state. */
  public static final ForceFallbackPolicy INSTANCE = new ForceFallbackPolicy();

  /**
   * If celeborn.client.flink.shuffle.fallback.policy is ALWAYS, fallback to flink built-in shuffle
   * implementation.
   *
   * @param shuffleContext The job shuffle context of Flink.
   * @param celebornConf The configuration of Celeborn.
   * @param lifecycleManager The {@link LifecycleManager} of Celeborn.
   * @return Return true if celeborn.client.flink.shuffle.fallback.policy is ALWAYS, otherwise
   *     false.
   */
  @Override
  public boolean needFallback(
      JobShuffleContext shuffleContext,
      CelebornConf celebornConf,
      LifecycleManager lifecycleManager) {
    // Evaluate the configured policy once instead of repeating the equality check for the
    // warn-log branch and the return value.
    boolean forceFallback =
        FallbackPolicy.ALWAYS.equals(celebornConf.flinkShuffleFallbackPolicy());
    if (forceFallback) {
      LOG.warn(
          "{} is {}, forcibly fallback to flink built-in shuffle implementation.",
          CelebornConf.FLINK_SHUFFLE_FALLBACK_POLICY().key(),
          FallbackPolicy.ALWAYS.name());
    }
    return forceFallback;
  }
}
Loading
Loading