diff --git a/examples/compare-tokens.rs b/examples/compare-tokens.rs index 547180d878..d0d6bf2975 100644 --- a/examples/compare-tokens.rs +++ b/examples/compare-tokens.rs @@ -33,13 +33,13 @@ async fn main() -> Result<()> { ) .await?; - let t = prepared.calculate_token(&(pk,))?.unwrap().value; + let t = prepared.calculate_token(&(pk,))?.unwrap().value(); println!( "Token endpoints for query: {:?}", session .get_cluster_data() - .get_token_endpoints("examples_ks", Token { value: t }) + .get_token_endpoints("examples_ks", Token::new(t)) .iter() .map(|(node, _shard)| node.address) .collect::>() diff --git a/scylla/src/routing.rs b/scylla/src/routing.rs index 5fedecd6b0..a5788c0b10 100644 --- a/scylla/src/routing.rs +++ b/scylla/src/routing.rs @@ -5,8 +5,42 @@ use std::num::NonZeroU16; use thiserror::Error; #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)] + +/// Token is a result of computing a hash of a primary key +/// +/// It is basically an i64 with one caveat: i64::MIN is not +/// a valid token. It is used to represent infinity. +/// For this reason tokens are normalized - i64::MIN +/// is replaced with i64::MAX. See this fragment of +/// Scylla code for more information: +/// +/// +/// This struct is a wrapper over i64 that performs this normalization +/// when initialized using `new()` method. pub struct Token { - pub value: i64, + value: i64, +} + +impl Token { + /// Creates a new token with given value, normalizing the value if necessary + #[inline] + pub fn new(value: i64) -> Self { + Self { + value: if value == i64::MIN { i64::MAX } else { value }, + } + } + + /// Invalid Token - contains i64::MIN as value. + /// + /// This is (currently) only required by CDCPartitioner. + /// See the following comment: + /// https://github.com/scylladb/scylla-rust-driver/blob/049dc3546d24e45106fed0fdb985ec2511ab5192/scylla/src/transport/partitioner.rs#L312-L322 + pub(crate) const INVALID: Self = Token { value: i64::MIN }; + + #[inline] + pub fn value(&self) -> i64 { + self.value + } } pub type Shard = u32; diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index e3c1f97377..0c849bd552 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -1152,9 +1152,7 @@ mod tests { datacenter: Some(dc.to_string()), rack: None, address: id_to_invalid_addr(*id), - tokens: vec![Token { - value: *id as i64 * 100, - }], + tokens: vec![Token::new(*id as i64 * 100)], host_id: Uuid::new_v4(), }) .collect::>(); @@ -1271,7 +1269,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Two, ..Default::default() @@ -1296,7 +1294,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Two, ..Default::default() @@ -1320,7 +1318,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::LocalOne, // local Consistency forbids datacenter failover ..Default::default() @@ -1342,7 +1340,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::One, ..Default::default() @@ -1364,7 +1362,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, ..Default::default() @@ -1389,7 +1387,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, ..Default::default() @@ -1413,7 +1411,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, ..Default::default() @@ -1435,7 +1433,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::Two, ..Default::default() @@ -1459,7 +1457,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::LocalOne, // local Consistency forbids datacenter failover ..Default::default() @@ -1500,7 +1498,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: None, // no keyspace consistency: Consistency::Quorum, ..Default::default() @@ -1519,7 +1517,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, ..Default::default() @@ -1541,7 +1539,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, ..Default::default() @@ -1560,7 +1558,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, ..Default::default() @@ -1582,7 +1580,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, ..Default::default() @@ -1607,7 +1605,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::One, ..Default::default() @@ -1635,7 +1633,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 560 }), + token: Some(Token::new(560)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::Two, ..Default::default() @@ -1662,7 +1660,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::One, ..Default::default() @@ -1742,7 +1740,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Two, is_confirmed_lwt: true, @@ -1768,7 +1766,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Two, is_confirmed_lwt: true, @@ -1793,7 +1791,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::LocalOne, // local Consistency forbids datacenter failover is_confirmed_lwt: true, @@ -1816,7 +1814,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::One, is_confirmed_lwt: true, @@ -1839,7 +1837,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -1865,7 +1863,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -1890,7 +1888,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -1913,7 +1911,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::Two, is_confirmed_lwt: true, @@ -1938,7 +1936,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::LocalOne, // local Consistency forbids datacenter failover is_confirmed_lwt: true, @@ -1981,7 +1979,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: None, // no keyspace consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -2001,7 +1999,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -2024,7 +2022,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -2044,7 +2042,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -2067,7 +2065,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -2093,7 +2091,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::One, is_confirmed_lwt: true, @@ -2122,7 +2120,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 760 }), + token: Some(Token::new(760)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::Two, is_confirmed_lwt: true, @@ -2149,7 +2147,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::One, is_confirmed_lwt: true, @@ -3404,7 +3402,7 @@ mod latency_awareness { (E, too_few_measurements_slow()), ], routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, ..Default::default() @@ -3424,7 +3422,7 @@ mod latency_awareness { // Latency-awareness has old minimum average cached, so does not fire. preset_min_avg: Some(100 * min_avg), routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, ..Default::default() @@ -3453,7 +3451,7 @@ mod latency_awareness { (C, too_few_measurements_fast_leader()), ], routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, ..Default::default() @@ -3472,7 +3470,7 @@ mod latency_awareness { // No latency stats, so latency-awareness is a no-op. preset_min_avg: None, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some("invalid"), consistency: Consistency::Quorum, ..Default::default() diff --git a/scylla/src/transport/locator/mod.rs b/scylla/src/transport/locator/mod.rs index d53301b08c..f87a32770c 100644 --- a/scylla/src/transport/locator/mod.rs +++ b/scylla/src/transport/locator/mod.rs @@ -771,8 +771,7 @@ mod tests { // For each case (token, limit_to_dc, strategy), we are checking // that ReplicasOrdered yields replicas in the expected order. let check = |token, limit_to_dc, strategy, expected| { - let replica_set = - locator.replicas_for_token(Token { value: token }, strategy, limit_to_dc); + let replica_set = locator.replicas_for_token(Token::new(token), strategy, limit_to_dc); let replicas_ordered = replica_set.into_replicas_ordered(); let ids: Vec<_> = replicas_ordered .into_iter() diff --git a/scylla/src/transport/locator/precomputed_replicas.rs b/scylla/src/transport/locator/precomputed_replicas.rs index 6e0cc249a9..69851df507 100644 --- a/scylla/src/transport/locator/precomputed_replicas.rs +++ b/scylla/src/transport/locator/precomputed_replicas.rs @@ -255,10 +255,8 @@ mod tests { ); let check = |token, replication_factor, expected_node_ids| { - let replicas = precomputed_replicas.get_precomputed_simple_strategy_replicas( - Token { value: token }, - replication_factor, - ); + let replicas = precomputed_replicas + .get_precomputed_simple_strategy_replicas(Token::new(token), replication_factor); let ids: Vec = replicas .unwrap() @@ -273,7 +271,7 @@ mod tests { check(160, 1, vec![F]); check(160, 2, vec![F, A]); assert_eq!( - precomputed_replicas.get_precomputed_simple_strategy_replicas(Token { value: 160 }, 3), + precomputed_replicas.get_precomputed_simple_strategy_replicas(Token::new(160), 3), None ); @@ -300,7 +298,7 @@ mod tests { let check = |token, dc, replication_factor, expected_node_ids| { let replicas = precomputed_replicas.get_precomputed_network_strategy_replicas( - Token { value: token }, + Token::new(token), dc, replication_factor, ); @@ -320,7 +318,7 @@ mod tests { check(160, "eu", 3, vec![A, C, G]); assert_eq!( precomputed_replicas.get_precomputed_network_strategy_replicas( - Token { value: 160 }, + Token::new(160), "eu", 4 ), @@ -333,7 +331,7 @@ mod tests { check(160, "us", 3, vec![F, D, E]); assert_eq!( precomputed_replicas.get_precomputed_network_strategy_replicas( - Token { value: 160 }, + Token::new(160), "us", 4 ), diff --git a/scylla/src/transport/locator/replication_info.rs b/scylla/src/transport/locator/replication_info.rs index 2043b444fb..76747abf2c 100644 --- a/scylla/src/transport/locator/replication_info.rs +++ b/scylla/src/transport/locator/replication_info.rs @@ -220,8 +220,8 @@ mod tests { let replication_info = ReplicationInfo::new(ring); let check = |token, replication_factor, expected_node_ids| { - let replicas = replication_info - .simple_strategy_replicas(Token { value: token }, replication_factor); + let replicas = + replication_info.simple_strategy_replicas(Token::new(token), replication_factor); let ids: Vec = replicas.map(|node| node.address.port()).collect(); assert_eq!(ids, expected_node_ids); @@ -255,8 +255,7 @@ mod tests { let replication_info = ReplicationInfo::new(ring); let check = |token, dc, rf, expected| { - let replicas = - replication_info.nts_replicas_in_datacenter(Token { value: token }, dc, rf); + let replicas = replication_info.nts_replicas_in_datacenter(Token::new(token), dc, rf); let ids: Vec = replicas.map(|node| node.address.port()).collect(); assert_eq!(ids, expected); diff --git a/scylla/src/transport/locator/test.rs b/scylla/src/transport/locator/test.rs index eb72feff58..f93b2b7b8e 100644 --- a/scylla/src/transport/locator/test.rs +++ b/scylla/src/transport/locator/test.rs @@ -52,11 +52,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("eu".into()), rack: Some("r1".to_owned()), address: id_to_invalid_addr(1), - tokens: vec![ - Token { value: 50 }, - Token { value: 250 }, - Token { value: 400 }, - ], + tokens: vec![Token::new(50), Token::new(250), Token::new(400)], host_id: Uuid::new_v4(), }, Peer { @@ -64,11 +60,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("eu".into()), rack: Some("r1".to_owned()), address: id_to_invalid_addr(2), - tokens: vec![ - Token { value: 100 }, - Token { value: 600 }, - Token { value: 900 }, - ], + tokens: vec![Token::new(100), Token::new(600), Token::new(900)], host_id: Uuid::new_v4(), }, Peer { @@ -76,11 +68,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("eu".into()), rack: Some("r1".to_owned()), address: id_to_invalid_addr(3), - tokens: vec![ - Token { value: 300 }, - Token { value: 650 }, - Token { value: 700 }, - ], + tokens: vec![Token::new(300), Token::new(650), Token::new(700)], host_id: Uuid::new_v4(), }, Peer { @@ -88,7 +76,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("us".into()), rack: Some("r1".to_owned()), address: id_to_invalid_addr(4), - tokens: vec![Token { value: 350 }, Token { value: 550 }], + tokens: vec![Token::new(350), Token::new(550)], host_id: Uuid::new_v4(), }, Peer { @@ -96,7 +84,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("us".into()), rack: Some("r1".to_owned()), address: id_to_invalid_addr(5), - tokens: vec![Token { value: 150 }, Token { value: 750 }], + tokens: vec![Token::new(150), Token::new(750)], host_id: Uuid::new_v4(), }, Peer { @@ -104,7 +92,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("us".into()), rack: Some("r2".to_owned()), address: id_to_invalid_addr(6), - tokens: vec![Token { value: 200 }, Token { value: 450 }], + tokens: vec![Token::new(200), Token::new(450)], host_id: Uuid::new_v4(), }, Peer { @@ -112,7 +100,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("eu".into()), rack: Some("r2".to_owned()), address: id_to_invalid_addr(7), - tokens: vec![Token { value: 500 }, Token { value: 800 }], + tokens: vec![Token::new(500), Token::new(800)], host_id: Uuid::new_v4(), }, ]; @@ -252,7 +240,7 @@ fn test_datacenter_info(locator: &ReplicaLocator) { fn test_simple_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 450 }, + Token::new(450), &Strategy::SimpleStrategy { replication_factor: 3, }, @@ -263,7 +251,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 450 }, + Token::new(450), &Strategy::SimpleStrategy { replication_factor: 4, }, @@ -274,7 +262,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 201 }, + Token::new(201), &Strategy::SimpleStrategy { replication_factor: 4, }, @@ -285,7 +273,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 201 }, + Token::new(201), &Strategy::SimpleStrategy { replication_factor: 0, }, @@ -298,7 +286,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { // in that dc. assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 50 }, + Token::new(50), &Strategy::SimpleStrategy { replication_factor: 1, }, @@ -309,7 +297,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 50 }, + Token::new(50), &Strategy::SimpleStrategy { replication_factor: 3, }, @@ -320,7 +308,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 50 }, + Token::new(50), &Strategy::SimpleStrategy { replication_factor: 3, }, @@ -333,7 +321,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 1), ("us".to_owned(), 1)] .into_iter() @@ -346,7 +334,7 @@ fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 1), ("us".to_owned(), 1)] .into_iter() @@ -359,7 +347,7 @@ fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 1), ("us".to_owned(), 1)] .into_iter() @@ -372,7 +360,7 @@ fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 1)] .into_iter() @@ -388,7 +376,7 @@ fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("unknown".to_owned(), 2), ("us".to_owned(), 1)] .into_iter() @@ -401,7 +389,7 @@ fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 800 }, + Token::new(800), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 1), ("us".to_owned(), 1)] .into_iter() @@ -416,7 +404,7 @@ fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { fn test_replica_set_len(locator: &ReplicaLocator) { let merged_nts_len = locator .replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 1)] .into_iter() @@ -431,7 +419,7 @@ fn test_replica_set_len(locator: &ReplicaLocator) { // replica set length was limited. let capped_merged_nts_len = locator .replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 69), ("us".to_owned(), 1)] .into_iter() @@ -444,7 +432,7 @@ fn test_replica_set_len(locator: &ReplicaLocator) { let filtered_nts_len = locator .replicas_for_token( - Token { value: 450 }, + Token::new(450), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 1)] .into_iter() @@ -457,7 +445,7 @@ fn test_replica_set_len(locator: &ReplicaLocator) { let ss_len = locator .replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::SimpleStrategy { replication_factor: 3, }, @@ -469,7 +457,7 @@ fn test_replica_set_len(locator: &ReplicaLocator) { // Test if the replica set length was capped when a datacenter name was provided. let filtered_ss_len = locator .replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::SimpleStrategy { replication_factor: 3, }, @@ -494,8 +482,7 @@ fn test_replica_set_choose(locator: &ReplicaLocator) { let mut rng = ChaCha8Rng::seed_from_u64(69); for strategy in strategies { - let replica_set_generator = - || locator.replicas_for_token(Token { value: 75 }, &strategy, None); + let replica_set_generator = || locator.replicas_for_token(Token::new(75), &strategy, None); // Verify that after a certain number of random selections, the set of selected replicas // will contain all nodes in the ring (replica set was created using a strategy with @@ -535,8 +522,7 @@ fn test_replica_set_choose_filtered(locator: &ReplicaLocator) { let mut rng = ChaCha8Rng::seed_from_u64(69); for strategy in strategies { - let replica_set_generator = - || locator.replicas_for_token(Token { value: 75 }, &strategy, None); + let replica_set_generator = || locator.replicas_for_token(Token::new(75), &strategy, None); // Verify that after a certain number of random selections with a dc filter, the set of // selected replicas will contain all nodes in the specified dc ring. @@ -565,7 +551,7 @@ fn test_replica_set_choose_filtered(locator: &ReplicaLocator) { // Check that choosing from an empty set yields no value. let empty = locator .replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::LocalStrategy, Some("unknown_dc_name"), ) diff --git a/scylla/src/transport/locator/token_ring.rs b/scylla/src/transport/locator/token_ring.rs index bf1932445c..b78a0d8084 100644 --- a/scylla/src/transport/locator/token_ring.rs +++ b/scylla/src/transport/locator/token_ring.rs @@ -78,117 +78,117 @@ mod tests { fn test_token_ring() { setup_tracing(); let ring_data = [ - (Token { value: -30 }, -3), - (Token { value: -20 }, -2), - (Token { value: -10 }, -1), - (Token { value: 0 }, 0), - (Token { value: 10 }, 1), - (Token { value: 20 }, 2), - (Token { value: 30 }, 3), + (Token::new(-30), -3), + (Token::new(-20), -2), + (Token::new(-10), -1), + (Token::new(0), 0), + (Token::new(10), 1), + (Token::new(20), 2), + (Token::new(30), 3), ]; let ring: TokenRing = TokenRing::new(ring_data.into_iter()); assert_eq!( - ring.ring_range(Token { value: -35 }) + ring.ring_range(Token::new(-35)) .cloned() .collect::>(), vec![-3, -2, -1, 0, 1, 2, 3] ); assert_eq!( - ring.ring_range(Token { value: -30 }) + ring.ring_range(Token::new(-30)) .cloned() .collect::>(), vec![-3, -2, -1, 0, 1, 2, 3] ); assert_eq!( - ring.ring_range(Token { value: -25 }) + ring.ring_range(Token::new(-25)) .cloned() .collect::>(), vec![-2, -1, 0, 1, 2, 3, -3] ); assert_eq!( - ring.ring_range(Token { value: -20 }) + ring.ring_range(Token::new(-20)) .cloned() .collect::>(), vec![-2, -1, 0, 1, 2, 3, -3] ); assert_eq!( - ring.ring_range(Token { value: -15 }) + ring.ring_range(Token::new(-15)) .cloned() .collect::>(), vec![-1, 0, 1, 2, 3, -3, -2] ); assert_eq!( - ring.ring_range(Token { value: -10 }) + ring.ring_range(Token::new(-10)) .cloned() .collect::>(), vec![-1, 0, 1, 2, 3, -3, -2] ); assert_eq!( - ring.ring_range(Token { value: -5 }) + ring.ring_range(Token::new(-5)) .cloned() .collect::>(), vec![0, 1, 2, 3, -3, -2, -1] ); assert_eq!( - ring.ring_range(Token { value: 0 }) + ring.ring_range(Token::new(0)) .cloned() .collect::>(), vec![0, 1, 2, 3, -3, -2, -1] ); assert_eq!( - ring.ring_range(Token { value: 5 }) + ring.ring_range(Token::new(5)) .cloned() .collect::>(), vec![1, 2, 3, -3, -2, -1, 0] ); assert_eq!( - ring.ring_range(Token { value: 10 }) + ring.ring_range(Token::new(10)) .cloned() .collect::>(), vec![1, 2, 3, -3, -2, -1, 0] ); assert_eq!( - ring.ring_range(Token { value: 15 }) + ring.ring_range(Token::new(15)) .cloned() .collect::>(), vec![2, 3, -3, -2, -1, 0, 1] ); assert_eq!( - ring.ring_range(Token { value: 20 }) + ring.ring_range(Token::new(20)) .cloned() .collect::>(), vec![2, 3, -3, -2, -1, 0, 1] ); assert_eq!( - ring.ring_range(Token { value: 25 }) + ring.ring_range(Token::new(25)) .cloned() .collect::>(), vec![3, -3, -2, -1, 0, 1, 2] ); assert_eq!( - ring.ring_range(Token { value: 30 }) + ring.ring_range(Token::new(30)) .cloned() .collect::>(), vec![3, -3, -2, -1, 0, 1, 2] ); assert_eq!( - ring.ring_range(Token { value: 35 }) + ring.ring_range(Token::new(35)) .cloned() .collect::>(), vec![-3, -2, -1, 0, 1, 2, 3] diff --git a/scylla/src/transport/partitioner.rs b/scylla/src/transport/partitioner.rs index 5b1d008f35..ee36543b61 100644 --- a/scylla/src/transport/partitioner.rs +++ b/scylla/src/transport/partitioner.rs @@ -255,9 +255,7 @@ impl PartitionerHasher for Murmur3PartitionerHasher { h1 += h2; h2 += h1; - Token { - value: (((h2.0 as i128) << 64) | h1.0 as i128) as i64, - } + Token::new((((h2.0 as i128) << 64) | h1.0 as i128) as i64) } } @@ -303,9 +301,7 @@ impl PartitionerHasher for CDCPartitionerHasher { // If the buffer is full, we can compute and fix the token. if *len == Self::BUF_CAPACITY { - let token = Token { - value: (&mut &buf[..]).get_i64(), - }; + let token = Token::new((&mut &buf[..]).get_i64()); self.state = CDCPartitionerHasherState::Computed(token); } } @@ -315,7 +311,12 @@ impl PartitionerHasher for CDCPartitionerHasher { fn finish(&self) -> Token { match self.state { - CDCPartitionerHasherState::Feeding { .. } => Token { value: i64::MIN }, + // Looking at Scylla code it seems that here we actually want token with this value. + // If the value is too short Scylla returns `dht::minimum_token()`: + // https://github.com/scylladb/scylladb/blob/4be70bfc2bc7f133cab492b4aac7bab9c790a48c/cdc/cdc_partitioner.cc#L32 + // When you call `long_token` on `minimum_token` it will actually return `i64::MIN`: + // https://github.com/scylladb/scylladb/blob/0a7854ea4de04f20b71326ba5940b5fac6f7241a/dht/token.cc#L21-L35 + CDCPartitionerHasherState::Feeding { .. } => Token::INVALID, CDCPartitionerHasherState::Computed(token) => token, } } @@ -373,7 +374,7 @@ mod tests { use super::{CDCPartitioner, Murmur3Partitioner, Partitioner}; fn assert_correct_murmur3_hash(pk: &'static str, expected_hash: i64) { - let hash = Murmur3Partitioner.hash_one(pk.as_bytes()).value; + let hash = Murmur3Partitioner.hash_one(pk.as_bytes()).value(); assert_eq!(hash, expected_hash); } @@ -391,7 +392,7 @@ mod tests { } fn assert_correct_cdc_hash(pk: &'static str, expected_hash: i64) { - let hash = CDCPartitioner.hash_one(pk.as_bytes()).value; + let hash = CDCPartitioner.hash_one(pk.as_bytes()).value(); assert_eq!(hash, expected_hash); } diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 81e44cd78e..47ec56f845 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -1949,7 +1949,7 @@ impl RequestSpan { ); } if let Some(token) = token { - span.record("token", token.value); + span.record("token", token.value()); } Self { diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index 7fbd31aa9c..a00c4f8832 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -231,7 +231,7 @@ async fn test_prepared_statement() { .unwrap() .single_row_typed() .unwrap(); - let token = Token { value }; + let token = Token::new(value); let prepared_token = Murmur3Partitioner .hash_one(&prepared_statement.compute_partition_key(&values).unwrap()); assert_eq!(token, prepared_token); @@ -250,7 +250,7 @@ async fn test_prepared_statement() { .unwrap() .single_row_typed() .unwrap(); - let token = Token { value }; + let token = Token::new(value); let prepared_token = Murmur3Partitioner.hash_one( &prepared_complex_pk_statement .compute_partition_key(&values) @@ -500,7 +500,7 @@ async fn test_token_calculation() { .unwrap() .single_row_typed() .unwrap(); - let token = Token { value }; + let token = Token::new(value); let prepared_token = Murmur3Partitioner .hash_one(&prepared_statement.compute_partition_key(&values).unwrap()); assert_eq!(token, prepared_token); @@ -2782,7 +2782,8 @@ async fn test_manual_primary_key_computation() { .unwrap(); println!( "by_prepared: {}, by_hand: {}", - token_by_prepared.value, token_by_hand.value + token_by_prepared.value(), + token_by_hand.value() ); assert_eq!(token_by_prepared, token_by_hand); } diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index a6f3a34a9d..b468050c0b 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -429,9 +429,7 @@ impl Metadata { Peer { address: endpoint.address(), - tokens: vec![Token { - value: token as i64, - }], + tokens: vec![Token::new(token as i64)], datacenter: None, rack: None, host_id: Uuid::new_v4(), @@ -887,9 +885,7 @@ async fn create_peer_from_row( // Also, we could implement support for Cassandra's other standard partitioners // like RandomPartitioner or ByteOrderedPartitioner. trace!("Couldn't parse tokens as 64-bit integers: {}, proceeding with a dummy token. If you're using a partitioner with different token size, consider migrating to murmur3", e); - vec![Token { - value: rand::thread_rng().gen::(), - }] + vec![Token::new(rand::thread_rng().gen::())] } };