Skip to content

Commit

Permalink
[controller][admin-tool][vpj][test] Add a multi-region config to cont…
Browse files Browse the repository at this point in the history
…rollers and infer many configs

1. Use multi.region config to control certain features instead of controlling them via explicit configs.
    * Native-replication - Enabled in multi-region mode. Disabled in single-region mode.
    * Admin channel consumption - Enabled in multi-region mode. Disabled in single-region mode.
        * This is still allowed to be disabled via LiveConfig for store migration purposes.
    * Controller allowing BATCH push via API - Disabled in multi-region mode. Enabled in single-region mode.
    * Active-active replication enabled on a controller - Enabled in multi-region mode. Disabled in single-region mode.
2. Commands and code to enable and disable Native-replication for a cluster have been removed, as that is the only mode.
3. The following configs are now obsolete, and removed:
    * enable.native.replication.for.batch.only
    * enable.native.replication.for.hybrid
    * enable.native.replication.as.default.for.batch.only
    * enable.native.replication.as.default.for.hybrid
    * enable.active.active.replication.as.default.for.batch.only.store
    * admin.topic.remote.consumption.enabled
    * child.controller.admin.topic.consumption.enabled
        * Only removed from controller config
        * It is still used in LiveConfigs during store migration
    * active.active.enabled.on.controller
    * controller.enable.batch.push.from.admin.in.child
4. The following configs were unused and now removed:
    * kafka.min.log.compaction.lag.ms
    * topic.cleanup.send.concurrent.delete.requests.enabled
    * topic.deletion.status.poll.interval.ms
    * topic.creation.throttling.time.window.ms
    * topic.manager.kafka.operation.timeout.ms
    * admin.consumption.timeout.minute
    * amplification.factor
    * server.ingestion.isolation.metric.request.timeout.seconds
  • Loading branch information
nisargthakkar committed Jul 24, 2024
1 parent 22a3b29 commit ccc4583
Show file tree
Hide file tree
Showing 59 changed files with 983 additions and 1,493 deletions.
756 changes: 577 additions & 179 deletions clients/venice-admin-tool/README

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -471,12 +471,6 @@ public static void main(String[] args) throws Exception {
case REMOVE_FROM_STORE_ACL:
removeFromStoreAcl(cmd);
break;
case ENABLE_NATIVE_REPLICATION_FOR_CLUSTER:
enableNativeReplicationForCluster(cmd);
break;
case DISABLE_NATIVE_REPLICATION_FOR_CLUSTER:
disableNativeReplicationForCluster(cmd);
break;
case ENABLE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER:
enableActiveActiveReplicationForCluster(cmd);
break;
Expand Down Expand Up @@ -1810,7 +1804,7 @@ public static void checkMigrationStatus(
printSystemStoreMigrationStatus(destControllerClient, storeName, printFunction);
} else {
// This is a parent controller
System.err.println("\n=================== Parent Controllers ====================");
printFunction.apply("\n=================== Parent Controllers ====================");
printMigrationStatus(srcControllerClient, storeName, printFunction);
printMigrationStatus(destControllerClient, storeName, printFunction);

Expand All @@ -1821,7 +1815,7 @@ public static void checkMigrationStatus(
Map<String, ControllerClient> destChildControllerClientMap = getControllerClientMap(destClusterName, response);

for (Map.Entry<String, ControllerClient> entry: srcChildControllerClientMap.entrySet()) {
System.err.println("\n\n=================== Child Datacenter " + entry.getKey() + " ====================");
printFunction.apply("\n\n=================== Child Datacenter " + entry.getKey() + " ====================");

ControllerClient srcChildController = entry.getValue();
ControllerClient destChildController = destChildControllerClientMap.get(entry.getKey());
Expand Down Expand Up @@ -2586,34 +2580,6 @@ private static void removeFromStoreAcl(CommandLine cmd) throws Exception {
}
}

/**
 * Handles the {@code enable-native-replication-for-cluster} command.
 *
 * <p>Reads the required {@code STORE_TYPE} argument plus the optional
 * {@code NATIVE_REPLICATION_SOURCE_FABRIC} and {@code REGIONS_FILTER} arguments. An optional argument
 * for which {@code StringUtils.isEmpty} returns true is forwarded as {@link Optional#empty()}. The
 * controller is then asked to configure native replication with the enable flag set to {@code true},
 * and the response is printed.
 *
 * @param cmd the parsed command line to read the arguments from
 */
private static void enableNativeReplicationForCluster(CommandLine cmd) {
  String storeType = getRequiredArgument(cmd, Arg.STORE_TYPE);

  // Only wrap the source region when a non-empty value was actually supplied.
  String rawSourceRegion = getOptionalArgument(cmd, Arg.NATIVE_REPLICATION_SOURCE_FABRIC);
  Optional<String> sourceRegion = Optional.empty();
  if (!StringUtils.isEmpty(rawSourceRegion)) {
    sourceRegion = Optional.of(rawSourceRegion);
  }

  // Same treatment for the regions filter.
  String rawRegionsFilter = getOptionalArgument(cmd, Arg.REGIONS_FILTER);
  Optional<String> regionsFilter = Optional.empty();
  if (!StringUtils.isEmpty(rawRegionsFilter)) {
    regionsFilter = Optional.of(rawRegionsFilter);
  }

  ControllerResponse result =
      controllerClient.configureNativeReplicationForCluster(true, storeType, sourceRegion, regionsFilter);
  printObject(result);
}

/**
 * Handles the {@code disable-native-replication-for-cluster} command.
 *
 * <p>Reads the required {@code STORE_TYPE} argument plus the optional
 * {@code NATIVE_REPLICATION_SOURCE_FABRIC} and {@code REGIONS_FILTER} arguments. An optional argument
 * for which {@code StringUtils.isEmpty} returns true is forwarded as {@link Optional#empty()}. The
 * controller is then asked to configure native replication with the enable flag set to {@code false},
 * and the response is printed.
 *
 * @param cmd the parsed command line to read the arguments from
 */
private static void disableNativeReplicationForCluster(CommandLine cmd) {
  String storeType = getRequiredArgument(cmd, Arg.STORE_TYPE);

  // Only wrap the source fabric when a non-empty value was actually supplied.
  String rawSourceFabric = getOptionalArgument(cmd, Arg.NATIVE_REPLICATION_SOURCE_FABRIC);
  Optional<String> sourceFabric = Optional.empty();
  if (!StringUtils.isEmpty(rawSourceFabric)) {
    sourceFabric = Optional.of(rawSourceFabric);
  }

  // Same treatment for the regions filter.
  String rawRegionsFilter = getOptionalArgument(cmd, Arg.REGIONS_FILTER);
  Optional<String> regionsFilter = Optional.empty();
  if (!StringUtils.isEmpty(rawRegionsFilter)) {
    regionsFilter = Optional.of(rawRegionsFilter);
  }

  ControllerResponse result =
      controllerClient.configureNativeReplicationForCluster(false, storeType, sourceFabric, regionsFilter);
  printObject(result);
}

private static void enableActiveActiveReplicationForCluster(CommandLine cmd) {
String storeType = getRequiredArgument(cmd, Arg.STORE_TYPE);
String regionsFilterParam = getOptionalArgument(cmd, Arg.REGIONS_FILTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,16 +389,6 @@ public enum Command {
"remove-from-store-acl", "Remove a principal from ACL's for an existing store",
new Arg[] { URL, STORE, PRINCIPAL }, new Arg[] { CLUSTER, READABILITY, WRITEABILITY }
),
ENABLE_NATIVE_REPLICATION_FOR_CLUSTER(
"enable-native-replication-for-cluster",
"enable native replication for certain stores (batch-only, hybrid-only, incremental-push, hybrid-or-incremental, all) in a cluster",
new Arg[] { URL, STORE_TYPE }, new Arg[] { CLUSTER, REGIONS_FILTER, NATIVE_REPLICATION_SOURCE_FABRIC }
),
DISABLE_NATIVE_REPLICATION_FOR_CLUSTER(
"disable-native-replication-for-cluster",
"disable native replication for certain stores (batch-only, hybrid-only, incremental-push, hybrid-or-incremental, all) in a cluster",
new Arg[] { URL, CLUSTER, STORE_TYPE }, new Arg[] { REGIONS_FILTER, NATIVE_REPLICATION_SOURCE_FABRIC }
),
ENABLE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER(
"enable-active-active-replication-for-cluster",
"enable active active replication for certain stores (batch-only, hybrid-only, incremental-push, hybrid-or-incremental, all) in a cluster",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_PRODUCER_DELIVERY_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.KAFKA_PRODUCER_RETRIES_CONFIG;
import static com.linkedin.venice.ConfigKeys.MULTI_REGION;
import static com.linkedin.venice.ConfigKeys.VENICE_PARTITIONERS;
import static com.linkedin.venice.VeniceConstants.DEFAULT_SSL_FACTORY_CLASS_NAME;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.ALLOW_DUPLICATE_KEY;
Expand Down Expand Up @@ -47,7 +48,6 @@
import static com.linkedin.venice.hadoop.VenicePushJobConstants.KEY_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.LEGACY_AVRO_KEY_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.LEGACY_AVRO_VALUE_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.MULTI_REGION;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.NON_CRITICAL_EXCEPTION;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.NOT_SET;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.PARENT_CONTROLLER_REGION_NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,6 @@ private VenicePushJobConstants() {
public static final String REWIND_EPOCH_TIME_BUFFER_IN_SECONDS_OVERRIDE =
"rewind.epoch.time.buffer.in.seconds.override";

/**
* This config specifies if Venice is deployed in a multi-region mode
*/
public static final String MULTI_REGION = "multi.region";

/**
* In single-region mode, this must be a comma-separated list of child controller URLs or {@literal d2://<d2ServiceNameForChildController>}
* In multi-region mode, it must be a comma-separated list of parent controller URLs or {@literal d2://<d2ServiceNameForParentController>}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.linkedin.venice.hadoop;

import static com.linkedin.venice.ConfigKeys.MULTI_REGION;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.D2_ZK_HOSTS_PREFIX;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_TOPIC;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.MULTI_REGION;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.REWIND_EPOCH_TIME_IN_SECONDS_OVERRIDE;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.SOURCE_GRID_FABRIC;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.SOURCE_KAFKA;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.linkedin.venice.hadoop;

import static com.linkedin.venice.ConfigKeys.MULTI_REGION;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.COMPRESSION_METRIC_COLLECTION_ENABLED;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.COMPRESSION_STRATEGY;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.D2_ZK_HOSTS_PREFIX;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.INPUT_PATH_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.KEY_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.MULTI_REGION;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.POLL_JOB_STATUS_INTERVAL_MS;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.PUSH_JOB_STATUS_UPLOAD_ENABLE;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.SOURCE_GRID_FABRIC;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.hadoop;

import static com.linkedin.venice.ConfigKeys.MULTI_REGION;
import static com.linkedin.venice.hadoop.VenicePushJob.getExecutionStatusFromControllerResponse;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.CONTROLLER_REQUEST_RETRY_ATTEMPTS;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.D2_ZK_HOSTS_PREFIX;
Expand All @@ -13,7 +14,6 @@
import static com.linkedin.venice.hadoop.VenicePushJobConstants.KEY_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.LEGACY_AVRO_KEY_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.LEGACY_AVRO_VALUE_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.MULTI_REGION;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.PARENT_CONTROLLER_REGION_NAME;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_ENABLE;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_SECONDS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ enable.hybrid.push.ssl.whitelist=false
default.partition.size=100
min.active.replica=1
kafka.replication.factor=1
child.cluster.allowlist=false
child.cluster.allowlist=
default.replica.factor=1
controller.add.version.via.admin.protocol=true
controller.ssl.enabled=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ public class VeniceConstants {

public static final String SYSTEM_PROPERTY_FOR_APP_RUNNING_REGION = "com.linkedin.app.env";

// public static final String TIMESTAMP_FIELD_NAME = "timestamp"; //
//
// public static final String REPLICATION_CHECKPOINT_VECTOR_FIELD = "replication_checkpoint_vector";

/**
* This is a sentinel value to be used in TopicSwitch message rewindStartTimestamp field between controller and server.
* When controller specifies this, Leader server nodes will calculate the rewind start time itself.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,6 @@ private ConfigKeys() {
*/
public static final String KAFKA_LOG_COMPACTION_FOR_HYBRID_STORES = "kafka.log.compaction.for.hybrid.stores";

/**
* For log compaction enabled topics, this config will define the minimum time a message will remain uncompacted in the log.
*/
public static final String KAFKA_MIN_LOG_COMPACTION_LAG_MS = "kafka.min.log.compaction.lag.ms";

/**
* The minimum number of in sync replicas to set for store version topics.
*
Expand Down Expand Up @@ -144,39 +139,6 @@ private ConfigKeys() {
*/
public static final String KAFKA_REPLICATION_FACTOR_RT_TOPICS = "kafka.replication.factor.rt.topics";

/**
* TODO: the following 3 configs will be deprecated after the native replication migration is changed to a two-step
* process: 1. Turn on the cluster level config that takes care of newly created stores; 2. Run admin command
* to convert existing stores to native replication.
*/
/**
* Cluster-level config to enable native replication for all batch-only stores.
*/
public static final String ENABLE_NATIVE_REPLICATION_FOR_BATCH_ONLY = "enable.native.replication.for.batch.only";

/**
* Cluster-level config to enable native replication for all hybrid stores.
*/
public static final String ENABLE_NATIVE_REPLICATION_FOR_HYBRID = "enable.native.replication.for.hybrid";

/**
* Cluster-level config to enable native replication for new batch-only stores.
*/
public static final String ENABLE_NATIVE_REPLICATION_AS_DEFAULT_FOR_BATCH_ONLY =
"enable.native.replication.as.default.for.batch.only";

/**
* Cluster-level config to enable native replication for new hybrid stores.
*/
public static final String ENABLE_NATIVE_REPLICATION_AS_DEFAULT_FOR_HYBRID =
"enable.native.replication.as.default.for.hybrid";

/**
* Cluster-level config to enable active-active replication for new batch-only stores.
*/
public static final String ENABLE_ACTIVE_ACTIVE_REPLICATION_AS_DEFAULT_FOR_BATCH_ONLY_STORE =
"enable.active.active.replication.as.default.for.batch.only.store";

/**
* Cluster-level config to enable active-active replication for new hybrid stores.
*/
Expand All @@ -189,7 +151,7 @@ private ConfigKeys() {
public static final String ENABLE_BLOB_TRANSFER = "enable.blob.transfer";

/**
* Sets the default for whether or not do schema validation for all stores
* Sets the default for whether to do schema validation or not for all stores
*/
public static final String CONTROLLER_SCHEMA_VALIDATION_ENABLED = "controller.schema.validation.enabled";

Expand All @@ -216,7 +178,6 @@ private ConfigKeys() {
public static final String PARTITION_COUNT_ROUND_UP_SIZE = "partition.count.round.up.size";
public static final String OFFLINE_JOB_START_TIMEOUT_MS = "offline.job.start.timeout.ms";
public static final String DELAY_TO_REBALANCE_MS = "delay.to.rebalance.ms";
public static final String MIN_ACTIVE_REPLICA = "min.active.replica";
public static final String CLUSTER_TO_D2 = "cluster.to.d2";
public static final String CLUSTER_TO_SERVER_D2 = "cluster.to.server.d2";
public static final String HELIX_SEND_MESSAGE_TIMEOUT_MS = "helix.send.message.timeout.ms";
Expand Down Expand Up @@ -262,13 +223,6 @@ private ConfigKeys() {
public static final String TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS =
"topic.cleanup.sleep.interval.between.topic.list.fetch.ms";
public static final String TOPIC_CLEANUP_DELAY_FACTOR = "topic.cleanup.delay.factor";
public static final String TOPIC_CLEANUP_SEND_CONCURRENT_DELETES_REQUESTS =
"topic.cleanup.send.concurrent.delete.requests.enabled";

/**
* Sleep interval for polling topic deletion status from ZK.
*/
public static final String TOPIC_DELETION_STATUS_POLL_INTERVAL_MS = "topic.deletion.status.poll.interval.ms";

/**
* The following config is to control the default retention time in milliseconds if it is not specified in store level.
Expand Down Expand Up @@ -304,17 +258,18 @@ private ConfigKeys() {
public static final String CONTROLLER_ENFORCE_SSL = "controller.enforce.ssl";

/**
* Whether child controllers will directly consume the source admin topic in the parent Kafka cluster.
* This config specifies if Venice is deployed in a multi-region mode
*/
public static final String ADMIN_TOPIC_REMOTE_CONSUMPTION_ENABLED = "admin.topic.remote.consumption.enabled";
public static final String MULTI_REGION = "multi.region";

/**
* This config defines the source region name of the admin topic
*/
public static final String ADMIN_TOPIC_SOURCE_REGION = "admin.topic.source.region";

/**
* This following config defines whether admin consumption should be enabled or not, and this config will only control the behavior in Child Controller.
* The following config defines whether admin consumption should be enabled or not, and this config will only control
* the behavior in Child Controller. This is used for store migration.
*/
public static final String CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED =
"child.controller.admin.topic.consumption.enabled";
Expand Down Expand Up @@ -744,12 +699,6 @@ private ConfigKeys() {
public static final String SERVER_INGESTION_ISOLATION_HEARTBEAT_REQUEST_TIMEOUT_SECONDS =
"server.ingestion.isolation.heartbeat.request.timeout.seconds";

/**
* Timeout for single metric request sent from main process to forked ingestion process.
*/
public static final String SERVER_INGESTION_ISOLATION_METRIC_REQUEST_TIMEOUT_SECONDS =
"server.ingestion.isolation.metric.request.timeout.seconds";

/**
* whether to enable checksum verification in the ingestion path from kafka to database persistency. If enabled it will
* keep a running checksum for all and only PUT kafka data message received in the ingestion task and periodically
Expand Down Expand Up @@ -1087,15 +1036,6 @@ private ConfigKeys() {
* */
public static final String CONTROLLER_CLUSTER_REPLICA = "controller.cluster.replica";

/**
* The time window in ms used to throttle the Kafka topic creation, during the time window, only 1 topic is allowed to
* be created.
*/
public static final String TOPIC_CREATION_THROTTLING_TIME_WINDOW_MS = "topic.creation.throttling.time.window.ms";

/** Timeout for create topic and delete topic operations. */
public static final String TOPIC_MANAGER_KAFKA_OPERATION_TIMEOUT_MS = "topic.manager.kafka.operation.timeout.ms";

/**
* This is the minimum number of Kafka topics that are guaranteed to be preserved by the leaky topic clean
* up routine. The topics with the highest version numbers will be favored by this preservative behavior.
Expand Down Expand Up @@ -1255,15 +1195,9 @@ private ConfigKeys() {
public static final String PARENT_KAFKA_CLUSTER_FABRIC_LIST = "parent.kafka.cluster.fabric.list";

/**
* Whether A/A is enabled on the controller. When it is true, all A/A required config (e.g.
* {@link #ACTIVE_ACTIVE_REAL_TIME_SOURCE_FABRIC_LIST}) must be set.
*/
public static final String ACTIVE_ACTIVE_ENABLED_ON_CONTROLLER = "active.active.enabled.on.controller";

/**
* A list of fabrics that are source(s) of the active active real time replication. When active-active replication
* is enabled on the controller {@link #ACTIVE_ACTIVE_ENABLED_ON_CONTROLLER} is true, this list should contain fabrics
* where the Venice server should consume from when it accepts the TS (TopicSwitch) message.
* A list of regions that are source(s) of the Active/Active real time replication. When running in a multi-region
* mode, this list should contain region names where the Venice server should consume from when it accepts the
* TS (TopicSwitch) message.
* Example value of this config: "dc-0, dc-1, dc-2".
*/
public static final String ACTIVE_ACTIVE_REAL_TIME_SOURCE_FABRIC_LIST = "active.active.real.time.source.fabric.list";
Expand All @@ -1276,12 +1210,6 @@ private ConfigKeys() {
public static final String PARENT_CONTROLLER_WAITING_TIME_FOR_CONSUMPTION_MS =
"parent.controller.waiting.time.for.consumption.ms";

/**
* If there is a failure in consuming from the admin topic, skip the message after retrying for this many minutes
* Default 5 days
*/
public static final String ADMIN_CONSUMPTION_TIMEOUT_MINUTES = "admin.consumption.timeout.minute";

/**
* The maximum time allowed for worker threads to execute admin messages in one cycle. A cycle is the processing of
* delegated admin messages by some number of worker thread(s) defined by {@code ADMIN_CONSUMPTION_MAX_WORKER_THREAD_POOL_SIZE}.
Expand Down Expand Up @@ -1561,16 +1489,6 @@ private ConfigKeys() {
*/
public static final String CONTROLLER_HAAS_SUPER_CLUSTER_NAME = "controller.haas.super.cluster.name";

/**
* Whether to enable batch push (including GF job) from Admin in Child Controller.
* In theory, we should disable batch push in Child Controller no matter what, but the fact is that today there are
* many tests, which are doing batch pushes to an individual cluster setup (only Child Controller), so disabling batch push from Admin
* in Child Controller will require a lot of refactoring.
* So the current strategy is to enable it by default, but disable it in EI and PROD.
*/
public static final String CONTROLLER_ENABLE_BATCH_PUSH_FROM_ADMIN_IN_CHILD =
"controller.enable.batch.push.from.admin.in.child";

/**
* A config that turns the key/value profiling stats on and off. This config can be placed in both Router and SNs and it
* is off by default. When switching it on, We will emit a fine grained histogram that reflects the distribution of
Expand All @@ -1589,11 +1507,6 @@ private ConfigKeys() {
* A config specifies which partitioning scheme should be used by VenicePushJob.
*/
public static final String PARTITIONER_CLASS = "partitioner.class";
/**
* A configs of over-partitioning factor
* number of Kafka partitions in each partition
*/
public static final String AMPLIFICATION_FACTOR = "amplification.factor";

/**
* A unique id that can represent this instance
Expand Down
Loading

0 comments on commit ccc4583

Please sign in to comment.