session: add methods for creating compatible session with different API
In order to make migration from the old API easier and to allow doing it
gradually, some components of client programs will likely want to keep
using the old API while new components use the new API. However, in the
current design, Session and LegacySession are two different types and it
is not possible to "cast" one to the other, even though they have nearly
the same fields and implementations.

The previous commit made Cluster cloneable, based on the observation
that it is perfectly fine to clone Cluster's fields, construct a new one
and treat it as a shared facade, a handle to the same "semantic" cluster.
The same actually applies to Session: cloning a session would have a
similar effect (though we encourage users to keep Session in an Arc so
that cloning is cheaper).

Instead of making GenericSession cloneable, we introduce methods which,
in reality, perform a clone but change the kind of the session's API.
This makes it possible to have two session objects which share the same
resources but expose different APIs. This should be very useful when
migrating large projects to the new API: components that need the new
API can simply "convert" the session to the new interface and use that.
  • Loading branch information
piodul authored and wprzytula committed Nov 12, 2024
1 parent 54791dc commit 9d82dd2
Showing 2 changed files with 98 additions and 2 deletions.
59 changes: 57 additions & 2 deletions scylla/src/transport/session.rs
@@ -186,7 +186,7 @@ where
schema_agreement_timeout: Duration,
schema_agreement_automatic_waiting: bool,
refresh_metadata_on_auto_schema_agreement: bool,
-    keyspace_name: ArcSwapOption<String>,
+    keyspace_name: Arc<ArcSwapOption<String>>,
tracing_info_fetch_attempts: NonZeroU32,
tracing_info_fetch_interval: Duration,
tracing_info_fetch_consistency: Consistency,
@@ -856,6 +856,32 @@ impl GenericSession<CurrentDeserializationApi> {
) -> Result<QueryResult, QueryError> {
self.do_batch(batch, values).await
}

/// Creates a new Session instance that shares resources with
/// the current Session but supports the legacy API.
///
/// This method is provided in order to make migration to the new
/// deserialization API easier. For example, if your program in general uses
/// the new API but you still have some modules left that use the old one,
/// you can use this method to create an instance that supports the old API
/// and pass it to the module that you intend to migrate later.
pub fn make_shared_session_with_legacy_api(&self) -> LegacySession {
LegacySession {
cluster: self.cluster.clone(),
default_execution_profile_handle: self.default_execution_profile_handle.clone(),
metrics: self.metrics.clone(),
refresh_metadata_on_auto_schema_agreement: self
.refresh_metadata_on_auto_schema_agreement,
schema_agreement_interval: self.schema_agreement_interval,
keyspace_name: self.keyspace_name.clone(),
schema_agreement_timeout: self.schema_agreement_timeout,
schema_agreement_automatic_waiting: self.schema_agreement_automatic_waiting,
tracing_info_fetch_attempts: self.tracing_info_fetch_attempts,
tracing_info_fetch_interval: self.tracing_info_fetch_interval,
tracing_info_fetch_consistency: self.tracing_info_fetch_consistency,
_phantom_deser_api: PhantomData,
}
}
}

impl GenericSession<LegacyDeserializationApi> {
@@ -932,6 +958,35 @@ impl GenericSession<LegacyDeserializationApi> {
) -> Result<LegacyQueryResult, QueryError> {
Ok(self.do_batch(batch, values).await?.into_legacy_result()?)
}

/// Creates a new Session instance that shares resources with
/// the current Session but supports the new API.
///
/// This method is provided in order to make migration to the new
/// deserialization API easier. For example, if your program in general uses
/// the old API but you want to migrate some modules to the new one, you
/// can use this method to create an instance that supports the new API
/// and pass it to the module that you intend to migrate.
///
/// The new session object will use the same connections and cluster
/// metadata.
pub fn make_shared_session_with_new_api(&self) -> Session {
Session {
cluster: self.cluster.clone(),
default_execution_profile_handle: self.default_execution_profile_handle.clone(),
metrics: self.metrics.clone(),
refresh_metadata_on_auto_schema_agreement: self
.refresh_metadata_on_auto_schema_agreement,
schema_agreement_interval: self.schema_agreement_interval,
keyspace_name: self.keyspace_name.clone(),
schema_agreement_timeout: self.schema_agreement_timeout,
schema_agreement_automatic_waiting: self.schema_agreement_automatic_waiting,
tracing_info_fetch_attempts: self.tracing_info_fetch_attempts,
tracing_info_fetch_interval: self.tracing_info_fetch_interval,
tracing_info_fetch_consistency: self.tracing_info_fetch_consistency,
_phantom_deser_api: PhantomData,
}
}
}

/// Represents a CQL session, which can be used to communicate
Expand Down Expand Up @@ -1045,7 +1100,7 @@ where
schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting,
refresh_metadata_on_auto_schema_agreement: config
.refresh_metadata_on_auto_schema_agreement,
-            keyspace_name: ArcSwapOption::default(), // will be set by use_keyspace
+            keyspace_name: Arc::new(ArcSwapOption::default()), // will be set by use_keyspace
tracing_info_fetch_attempts: config.tracing_info_fetch_attempts,
tracing_info_fetch_interval: config.tracing_info_fetch_interval,
tracing_info_fetch_consistency: config.tracing_info_fetch_consistency,
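The diff also wraps `keyspace_name` in an extra `Arc`. This matters for sharing: when one of the sibling sessions switches keyspace via `use_keyspace`, the other must observe the change, so both handles need to point at the same swappable slot rather than each owning a copy. A std-only analogue of this design, using `RwLock<Option<String>>` as a stand-in for the `arc_swap` crate's `ArcSwapOption<String>` (the `SessionHandle` type and its methods are illustrative, not the driver's API):

```rust
use std::sync::{Arc, RwLock};

// Two session handles share one keyspace slot; an update made through
// either handle is visible through the other.
struct SessionHandle {
    keyspace_name: Arc<RwLock<Option<String>>>,
}

impl SessionHandle {
    fn shared_clone(&self) -> SessionHandle {
        SessionHandle {
            // Clones the Arc, not the slot: both handles see one value.
            keyspace_name: self.keyspace_name.clone(),
        }
    }
    fn use_keyspace(&self, ks: &str) {
        *self.keyspace_name.write().unwrap() = Some(ks.to_owned());
    }
    fn current_keyspace(&self) -> Option<String> {
        self.keyspace_name.read().unwrap().clone()
    }
}

fn main() {
    let a = SessionHandle {
        keyspace_name: Arc::new(RwLock::new(None)),
    };
    let b = a.shared_clone();
    a.use_keyspace("ks1");
    // The change made through `a` is visible through `b`.
    assert_eq!(b.current_keyspace().as_deref(), Some("ks1"));
}
```

Without the outer `Arc`, each converted session would get its own independent keyspace slot and the two could silently drift apart.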
41 changes: 41 additions & 0 deletions scylla/src/transport/session_test.rs
@@ -3150,3 +3150,44 @@ async fn test_deserialize_empty_collections() {
.await;
assert!(map.is_empty());
}

#[tokio::test]
async fn test_api_migration_session_sharing() {
{
let session = create_new_session_builder().build().await.unwrap();
let session_shared = session.make_shared_session_with_legacy_api();

// If we are unlucky then we will race with metadata fetch/cluster update
// and both invocations will return different cluster data. This should be
// SUPER rare, but in order to reduce the chance of flakiness to a minimum
// we will try it three times in a row. Cluster data is updated once per
// minute, so this should be good enough.
let mut matched = false;
for _ in 0..3 {
let cd1 = session.get_cluster_data();
let cd2 = session_shared.get_cluster_data();

if Arc::ptr_eq(&cd1, &cd2) {
matched = true;
break;
}
}
assert!(matched);
}
{
let session = create_new_session_builder().build_legacy().await.unwrap();
let session_shared = session.make_shared_session_with_new_api();

let mut matched = false;
for _ in 0..3 {
let cd1 = session.get_cluster_data();
let cd2 = session_shared.get_cluster_data();

if Arc::ptr_eq(&cd1, &cd2) {
matched = true;
break;
}
}
assert!(matched);
}
}
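The test above relies on `Arc::ptr_eq`, which compares allocation identity rather than contents: it returns true only when two `Arc`s point at the very same object, which is exactly what "the two sessions share one cluster data snapshot" means here. A small standalone illustration:

```rust
use std::sync::Arc;

fn main() {
    let a = Arc::new(vec![1, 2, 3]);
    let b = Arc::clone(&a); // same allocation as `a`
    let c = Arc::new(vec![1, 2, 3]); // equal contents, separate allocation

    // Identity, not equality: `b` shares `a`'s allocation, `c` does not.
    assert!(Arc::ptr_eq(&a, &b));
    assert!(!Arc::ptr_eq(&a, &c));
}
```

This is also why the test retries three times: each `get_cluster_data` call may return a freshly refreshed snapshot, so two calls can race a background metadata update and legitimately see different (yet content-equal) allocations.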
