Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce schema pull request volume #582

Open
wants to merge 8 commits into
base: palantir-cassandra-2.2.18
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 64 additions & 9 deletions src/java/org/apache/cassandra/service/MigrationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import com.palantir.logsafe.SafeArg;
import com.palantir.tracing.CloseableTracer;

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.function.BiFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -62,10 +65,13 @@ public class MigrationManager

private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();

// Tracks schema pulls that have been scheduled but not yet resolved: schema version -> endpoints
// we scheduled a pull from for that version. Entries are added when a delayed pull is scheduled
// and removed through removeEndpointFromSchemaPulls.
public static final ConcurrentHashMap<UUID, Set<InetAddress>> scheduledSchemaPulls = new ConcurrentHashMap<>();

// Delay, in milliseconds, before a scheduled (non-immediate) schema pull runnable is executed.
public static final int MIGRATION_DELAY_IN_MS = 60000;
// Upper bound on the number of endpoints tracked per schema version in scheduledSchemaPulls;
// once reached, shouldPullSchemaFrom(endpoint, version) rejects further pulls for that version.
public static final int MAX_SCHEDULED_SCHEMA_PULL_REQUESTS = 3;

private final List<MigrationListener> listeners = new CopyOnWriteArrayList<>();

private MigrationManager() {}

public void register(MigrationListener listener)
Expand Down Expand Up @@ -103,7 +109,7 @@ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetA
return;
}

if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint, theirVersion))
{
logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
return;
Expand All @@ -113,7 +119,7 @@ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetA
{
// If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
logger.debug("Submitting migration task for {}", endpoint);
submitMigrationTask(endpoint);
submitMigrationTask(endpoint, theirVersion);
}

if (onChange && Boolean.getBoolean("palantir_cassandra.unsafe_schema_migration"))
Expand All @@ -136,6 +142,7 @@ public void run()
if (epState == null)
{
logger.debug("epState vanished for {}, not submitting migration task", endpoint);
scheduledSchemaPulls.computeIfPresent(theirVersion, removeEndpointFromSchemaPulls(endpoint));
return;
}
VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
Expand All @@ -149,25 +156,42 @@ public void run()
endpoint,
theirVersion,
currentVersion);
scheduledSchemaPulls.computeIfPresent(theirVersion, removeEndpointFromSchemaPulls(endpoint));
return;
}

if (Schema.instance.getVersion().equals(currentVersion))
{
logger.debug("not submitting migration task for {} because our versions match", endpoint);
scheduledSchemaPulls.computeIfPresent(theirVersion, removeEndpointFromSchemaPulls(endpoint));
return;
}
logger.debug("submitting migration task for endpoint {}, endpoint schema version {}, and our schema version",
endpoint,
currentVersion,
Schema.instance.getVersion());
submitMigrationTask(endpoint);
logger.debug("submitting migration task for endpoint {}, endpoint schema version {}, and our schema version {}",
SafeArg.of("endpoint", endpoint),
SafeArg.of("endpointVersion", currentVersion),
SafeArg.of("schemaVersion", Schema.instance.getVersion()));
submitMigrationTask(endpoint, currentVersion);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be theirVersion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/palantir/cassandra/pull/582/files#diff-8191bf68e083d923ddd54125087c550e0fa3136ea0c62d722e79a5d1a1fbe030R149

This should prevent us from having this drift, so I think currentVersion should be fine.

}
};
scheduledSchemaPulls.computeIfAbsent(theirVersion, v -> new HashSet<>()).add(endpoint);
ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
}
}

/**
 * Builds the remapping function used with {@code scheduledSchemaPulls.computeIfPresent(version, ...)}
 * to drop {@code endpoint} from the set of endpoints tracked for a schema version.
 *
 * The returned function removes the endpoint from the set it is given and hands the same set
 * back while it still has members. Once the set becomes empty it returns {@code null}, which
 * makes {@code computeIfPresent} remove the mapping for that version altogether.
 *
 * @param endpoint the endpoint whose scheduled pull is finished or abandoned
 * @return a remapping function suitable for {@code Map.computeIfPresent}
 */
static BiFunction<UUID, Set<InetAddress>, Set<InetAddress>> removeEndpointFromSchemaPulls(InetAddress endpoint)
{
    return (schemaVersion, trackedEndpoints) ->
    {
        logger.debug("Removing endpoint from scheduled schema pulls",
                     SafeArg.of("endpoint", endpoint),
                     SafeArg.of("schemaVersion", schemaVersion),
                     SafeArg.of("scheduledPulls", trackedEndpoints));
        trackedEndpoints.remove(endpoint);
        // A null result tells computeIfPresent to delete the entry for this version.
        return trackedEndpoints.isEmpty() ? null : trackedEndpoints;
    };
}

private static Future<?> submitMigrationTask(InetAddress endpoint)
{
/*
Expand All @@ -177,17 +201,48 @@ private static Future<?> submitMigrationTask(InetAddress endpoint)
return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
}

/**
 * Submits a {@link MigrationTask} for {@code endpoint} to the MIGRATION stage, tagging the task
 * with the schema version the pull is meant to fetch.
 *
 * @param endpoint     the node to pull schema from
 * @param theirVersion the schema version associated with this pull; handed to the task
 * @return the future returned by the MIGRATION stage for the submitted task
 */
private static Future<?> submitMigrationTask(InetAddress endpoint, UUID theirVersion)
{
    MigrationTask versionedPull = new MigrationTask(endpoint, theirVersion);
    /*
     * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
     * running in the gossip stage.
     */
    return StageManager.getStage(Stage.MIGRATION).submit(versionedPull);
}

/**
 * Decides whether {@code endpoint} is an acceptable source for a schema pull.
 *
 * A pull is refused when:
 * - the endpoint's messaging version is unknown or differs from ours (it may have an incompatible schema);
 * - the endpoint is a gossip-only member (fat client).
 *
 * @param endpoint the candidate node
 * @return {@code true} if schema may be requested from {@code endpoint}
 */
public static boolean shouldPullSchemaFrom(InetAddress endpoint)
{
    // Unknown messaging version: cannot tell whether the schema is compatible.
    if (!MessagingService.instance().knowsVersion(endpoint))
    {
        return false;
    }
    // Different major version: the schema may be incompatible.
    if (MessagingService.instance().getRawVersion(endpoint) != MessagingService.current_version)
    {
        return false;
    }
    // Fat clients never serve schema.
    return !Gossiper.instance.isGossipOnlyMember(endpoint);
}

/**
 * Decides whether a schema pull for {@code theirVersion} may be requested from {@code endpoint}.
 *
 * In addition to the endpoint checks of {@link #shouldPullSchemaFrom(InetAddress)} (known and
 * matching messaging version, not a fat client), a pull is refused when:
 * - {@code theirVersion} is the empty schema version;
 * - {@code MAX_SCHEDULED_SCHEMA_PULL_REQUESTS} endpoints are already tracked for that version;
 * - this endpoint is already tracked for that version.
 *
 * NOTE(review): the previous comment also listed "bootstrapping nodes (?)"; no such check exists here.
 * NOTE(review): the tracked set is read without synchronization and appears to be a plain HashSet
 * that other code paths mutate - confirm this cannot race with concurrent add/remove.
 *
 * @param endpoint     the candidate node
 * @param theirVersion the schema version advertised by that node
 * @return {@code true} if a pull for {@code theirVersion} may be requested from {@code endpoint}
 */
public static boolean shouldPullSchemaFrom(InetAddress endpoint, UUID theirVersion)
{
    Set<InetAddress> trackedForVersion = scheduledSchemaPulls.getOrDefault(theirVersion, Collections.emptySet());
    boolean belowCap = trackedForVersion.size() < MAX_SCHEDULED_SCHEMA_PULL_REQUESTS;
    boolean mayTrackEndpoint = belowCap && !trackedForVersion.contains(endpoint);
    logger.debug("Evaluating schema pull criteria", SafeArg.of("schemaVersion", theirVersion), SafeArg.of("scheduledPulls", trackedForVersion));

    // Endpoint-level checks first, then version-level checks, in the same order as before.
    return shouldPullSchemaFrom(endpoint)
           && !Schema.emptyVersion.equals(theirVersion)
           && mayTrackEndpoint;
}

public static boolean isReadyForBootstrap()
{
return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
Expand Down
41 changes: 38 additions & 3 deletions src/java/org/apache/cassandra/service/MigrationTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.Optional;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.logsafe.SafeArg;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.IAsyncCallback;
Expand All @@ -40,10 +44,18 @@ class MigrationTask extends WrappedRunnable
private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class);

private final InetAddress endpoint;
private final Optional<UUID> version;

/**
 * Creates a schema pull task that is not associated with any schema version; such a task leaves
 * {@code version} empty, so its callbacks skip the scheduled-pull cleanup.
 *
 * @param endpoint the node to request schema from
 */
MigrationTask(InetAddress endpoint)
{
    this.version = Optional.empty();
    this.endpoint = endpoint;
}

/**
 * Creates a schema pull task tied to a specific schema version, so that its callbacks can clear
 * the matching entry in {@code MigrationManager.scheduledSchemaPulls}.
 *
 * @param endpoint the node to request schema from
 * @param version  the schema version this pull was scheduled for; must not be {@code null}
 *                 ({@code Optional.of} throws {@code NullPointerException} otherwise)
 */
MigrationTask(InetAddress endpoint, UUID version)
{
    this.version = Optional.of(version);
    this.endpoint = endpoint;
}

public void runMayThrow() throws Exception
Expand All @@ -65,13 +77,14 @@ public void runMayThrow() throws Exception

MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);

IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
IAsyncCallbackWithFailure<Collection<Mutation>> cb = new IAsyncCallbackWithFailure<Collection<Mutation>>()
{
@Override
public void response(MessageIn<Collection<Mutation>> message)
{
try
{
logger.debug("Processing response to schema pull from endpoint", SafeArg.of("endpoint", endpoint));
LegacySchemaTables.mergeSchema(message.payload);
}
catch (IOException e)
Expand All @@ -82,13 +95,35 @@ public void response(MessageIn<Collection<Mutation>> message)
{
logger.error("Configuration exception merging remote schema", e);
}
finally
{
// always attempt to clean up our outstanding schema pull request if created with a version
version.ifPresent(v -> {
logger.debug("Successfully processed response to schema pull",
SafeArg.of("endpoint", endpoint),
SafeArg.of("schemaVersion", v));
MigrationManager.scheduledSchemaPulls.computeIfPresent(v, MigrationManager.removeEndpointFromSchemaPulls(endpoint));
});
}
}

/**
 * Failure callback for the schema pull request. When this task was created with a schema
 * version, the endpoint is removed from the scheduled pulls tracked for that version;
 * tasks created without a version have nothing to clean up.
 *
 * @param from the address reported by the messaging layer (unused; the task's own endpoint is used)
 */
@Override
public void onFailure(InetAddress from)
{
    if (!version.isPresent())
    {
        return;
    }

    // always attempt to clean up our outstanding schema pull request if created with a version
    UUID pulledVersion = version.get();
    logger.debug("Timed out waiting for response to schema pull",
                 SafeArg.of("endpoint", endpoint),
                 SafeArg.of("schemaVersion", pulledVersion));
    MigrationManager.scheduledSchemaPulls.computeIfPresent(pulledVersion, MigrationManager.removeEndpointFromSchemaPulls(endpoint));
}

// Always reports false for schema pull callbacks.
// NOTE(review): presumably this keeps the response latency out of the snitch's measurements -
// confirm against the IAsyncCallback contract.
public boolean isLatencyForSnitch()
{
return false;
}
};
MessagingService.instance().sendRR(message, endpoint, cb);
}
MessagingService.instance().sendRRWithFailure(message, endpoint, cb);
}
}