From 9d82dd2a867f7c5d41136848dd370fe5f3f174de Mon Sep 17 00:00:00 2001
From: Piotr Dulikowski
Date: Tue, 14 Mar 2023 14:04:54 +0100
Subject: [PATCH] session: add methods for creating compatible session with
 different API

In order to make migration from the old API easier and allow it to happen
gradually, some components of client programs will likely want to keep using
the old API while new components use the new API. However, in the current
design, Session and LegacySession are two different types and it is not
possible to "cast" one to the other, even though they have nearly the same
fields and implementations.

The previous commit made Cluster cloneable, based on the observation that it
is perfectly fine to clone Cluster's fields, construct a new one and treat it
as a shared facade: a handle to the same "semantic" cluster. The same applies
to Session: cloning a session would have a similar effect (though we
encourage users to keep Session in an Arc so that cloning is cheaper).

Instead of making GenericSession cloneable, we introduce methods which, in
reality, perform a clone but change the kind of the session's API. This makes
it possible to have two session objects which share the same resources but
expose different APIs. This should be very useful when migrating large
projects to the new API: components that need to use the new API can just
"convert" the session to the new interface and use that.
---
 scylla/src/transport/session.rs      | 59 +++++++++++++++++++++++++++-
 scylla/src/transport/session_test.rs | 41 +++++++++++++++++++
 2 files changed, 98 insertions(+), 2 deletions(-)

diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs
index 4db0bbde2..587e06658 100644
--- a/scylla/src/transport/session.rs
+++ b/scylla/src/transport/session.rs
@@ -186,7 +186,7 @@ where
     schema_agreement_timeout: Duration,
     schema_agreement_automatic_waiting: bool,
     refresh_metadata_on_auto_schema_agreement: bool,
-    keyspace_name: ArcSwapOption<String>,
+    keyspace_name: Arc<ArcSwapOption<String>>,
     tracing_info_fetch_attempts: NonZeroU32,
     tracing_info_fetch_interval: Duration,
     tracing_info_fetch_consistency: Consistency,
@@ -856,6 +856,32 @@ impl GenericSession<CurrentDeserializationApi> {
     ) -> Result<QueryResult, QueryError> {
         self.do_batch(batch, values).await
     }
+
+    /// Creates a new Session instance that shares resources with
+    /// the current Session but supports the legacy API.
+    ///
+    /// This method is provided in order to make migration to the new
+    /// deserialization API easier. For example, if your program in general uses
+    /// the new API but you still have some modules left that use the old one,
+    /// you can use this method to create an instance that supports the old API
+    /// and pass it to the module that you intend to migrate later.
+    pub fn make_shared_session_with_legacy_api(&self) -> LegacySession {
+        LegacySession {
+            cluster: self.cluster.clone(),
+            default_execution_profile_handle: self.default_execution_profile_handle.clone(),
+            metrics: self.metrics.clone(),
+            refresh_metadata_on_auto_schema_agreement: self
+                .refresh_metadata_on_auto_schema_agreement,
+            schema_agreement_interval: self.schema_agreement_interval,
+            keyspace_name: self.keyspace_name.clone(),
+            schema_agreement_timeout: self.schema_agreement_timeout,
+            schema_agreement_automatic_waiting: self.schema_agreement_automatic_waiting,
+            tracing_info_fetch_attempts: self.tracing_info_fetch_attempts,
+            tracing_info_fetch_interval: self.tracing_info_fetch_interval,
+            tracing_info_fetch_consistency: self.tracing_info_fetch_consistency,
+            _phantom_deser_api: PhantomData,
+        }
+    }
 }

 impl GenericSession<LegacyDeserializationApi> {
@@ -932,6 +958,35 @@ impl GenericSession<LegacyDeserializationApi> {
     ) -> Result<LegacyQueryResult, QueryError> {
         Ok(self.do_batch(batch, values).await?.into_legacy_result()?)
     }
+
+    /// Creates a new Session instance that shares resources with
+    /// the current Session but supports the new API.
+    ///
+    /// This method is provided in order to make migration to the new
+    /// deserialization API easier. For example, if your program in general uses
+    /// the old API but you want to migrate some modules to the new one, you
+    /// can use this method to create an instance that supports the new API
+    /// and pass it to the module that you intend to migrate.
+    ///
+    /// The new session object will use the same connections and cluster
+    /// metadata.
+    pub fn make_shared_session_with_new_api(&self) -> Session {
+        Session {
+            cluster: self.cluster.clone(),
+            default_execution_profile_handle: self.default_execution_profile_handle.clone(),
+            metrics: self.metrics.clone(),
+            refresh_metadata_on_auto_schema_agreement: self
+                .refresh_metadata_on_auto_schema_agreement,
+            schema_agreement_interval: self.schema_agreement_interval,
+            keyspace_name: self.keyspace_name.clone(),
+            schema_agreement_timeout: self.schema_agreement_timeout,
+            schema_agreement_automatic_waiting: self.schema_agreement_automatic_waiting,
+            tracing_info_fetch_attempts: self.tracing_info_fetch_attempts,
+            tracing_info_fetch_interval: self.tracing_info_fetch_interval,
+            tracing_info_fetch_consistency: self.tracing_info_fetch_consistency,
+            _phantom_deser_api: PhantomData,
+        }
+    }
 }

 /// Represents a CQL session, which can be used to communicate
@@ -1045,7 +1100,7 @@ where
             schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting,
             refresh_metadata_on_auto_schema_agreement: config
                 .refresh_metadata_on_auto_schema_agreement,
-            keyspace_name: ArcSwapOption::default(), // will be set by use_keyspace
+            keyspace_name: Arc::new(ArcSwapOption::default()), // will be set by use_keyspace
             tracing_info_fetch_attempts: config.tracing_info_fetch_attempts,
             tracing_info_fetch_interval: config.tracing_info_fetch_interval,
             tracing_info_fetch_consistency: config.tracing_info_fetch_consistency,
diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs
index 6c4beeb4a..2e99087a7 100644
--- a/scylla/src/transport/session_test.rs
+++ b/scylla/src/transport/session_test.rs
@@ -3150,3 +3150,44 @@ async fn test_deserialize_empty_collections() {
         .await;
     assert!(map.is_empty());
 }
+
+#[tokio::test]
+async fn test_api_migration_session_sharing() {
+    {
+        let session = create_new_session_builder().build().await.unwrap();
+        let session_shared = session.make_shared_session_with_legacy_api();
+
+        // If we are unlucky then we will race with metadata fetch/cluster update
+        // and both invocations will return different cluster data. This should be
+        // SUPER rare, but in order to reduce the chance of flakiness to a minimum
+        // we will try it three times in a row. Cluster data is updated once per
+        // minute, so this should be good enough.
+        let mut matched = false;
+        for _ in 0..3 {
+            let cd1 = session.get_cluster_data();
+            let cd2 = session_shared.get_cluster_data();
+
+            if Arc::ptr_eq(&cd1, &cd2) {
+                matched = true;
+                break;
+            }
+        }
+        assert!(matched);
+    }
+    {
+        let session = create_new_session_builder().build_legacy().await.unwrap();
+        let session_shared = session.make_shared_session_with_new_api();
+
+        let mut matched = false;
+        for _ in 0..3 {
+            let cd1 = session.get_cluster_data();
+            let cd2 = session_shared.get_cluster_data();
+
+            if Arc::ptr_eq(&cd1, &cd2) {
+                matched = true;
+                break;
+            }
+        }
+        assert!(matched);
+    }
+}
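
Usage note (not part of the patch): below is a minimal sketch of how the two conversion methods could support a gradual migration. It assumes that on this branch `SessionBuilder::build()` returns the new-API `Session` (as the test above suggests via `create_new_session_builder()`), that `LegacySession` is importable from the crate root, and that `old_component` stands in for a hypothetical not-yet-migrated module.

```rust
use scylla::{LegacySession, Session, SessionBuilder};

/// Hypothetical component that has not been migrated yet and still expects
/// the legacy deserialization API.
async fn old_component(session: &LegacySession) -> Result<(), Box<dyn std::error::Error>> {
    // Uses the old API exactly as before the migration.
    session
        .query("SELECT release_version FROM system.local", ())
        .await?;
    Ok(())
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed: `build()` produces a Session exposing the new API on this branch.
    let session: Session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .build()
        .await?;

    // Create a legacy-API view that shares connections, metrics and cluster
    // metadata with `session`, and hand it to the unmigrated component.
    let legacy_view: LegacySession = session.make_shared_session_with_legacy_api();
    old_component(&legacy_view).await?;

    // Already-migrated code keeps using the new API on the original object.
    session
        .query("SELECT release_version FROM system.local", ())
        .await?;
    Ok(())
}
```

Because both objects share the same underlying Cluster, `get_cluster_data()` on either of them returns the same `Arc`, which is exactly what the new `test_api_migration_session_sharing` test asserts with `Arc::ptr_eq`.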