Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster Scan - glide-core and python #1623

Merged
merged 1 commit into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/lint-rust/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ runs:
github-token: ${{ inputs.github-token }}

- uses: Swatinem/rust-cache@v2
with:
github-token: ${{ inputs.github-token }}

- run: cargo fmt --all -- --check
working-directory: ${{ inputs.cargo-toml-folder }}
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
* Python: Added FCALL_RO command ([#1721](https://github.com/aws/glide-for-redis/pull/1721))
* Python: Added WATCH and UNWATCH command ([#1736](https://github.com/aws/glide-for-redis/pull/1736))
* Python: Added LPos command ([#1740](https://github.com/aws/glide-for-redis/pull/1740))
* Python: Added SCAN command ([#1623](https://github.com/aws/glide-for-redis/pull/1623))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
1 change: 1 addition & 0 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ directories = { version = "4.0", optional = true }
once_cell = "1.18.0"
arcstr = "1.1.5"
sha1_smol = "1.0.0"
nanoid = "0.4.0"

[features]
socket-layer = ["directories", "integer-encoding", "num_cpus", "protobuf", "tokio-util"]
Expand Down
49 changes: 47 additions & 2 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
*/
mod types;

use crate::cluster_scan_container::insert_cluster_scan_cursor;
use crate::scripts_container::get_script;
use futures::FutureExt;
use logger_core::log_info;
use redis::aio::ConnectionLike;
use redis::cluster_async::ClusterConnection;
use redis::cluster_routing::{Routable, RoutingInfo, SingleNodeRoutingInfo};
use redis::{Cmd, ErrorKind, PushInfo, Value};
use redis::{RedisError, RedisResult};
use redis::{Cmd, ErrorKind, ObjectType, PushInfo, RedisError, RedisResult, ScanStateRC, Value};
pub use standalone_client::StandaloneClient;
use std::io;
use std::time::Duration;
Expand All @@ -28,6 +28,7 @@ pub const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_PERIODIC_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
pub const FINISHED_SCAN_CURSOR: &str = "finished";

pub(super) fn get_port(address: &NodeAddress) -> u16 {
const DEFAULT_PORT: u16 = 6379;
Expand Down Expand Up @@ -245,6 +246,50 @@ impl Client {
.boxed()
}

// Cluster scan is not passed to redis-rs as a regular command, so we need to handle it separately.
// We send the command to a specific function in the redis-rs cluster client, which internally handles the
// the complication of a command scan, and generate the command base on the logic in the redis-rs library.
//
// The function returns a tuple with the cursor and the keys found in the scan.
// The cursor is not a regular cursor, but an ARC to a struct that contains the cursor and the data needed
// to continue the scan called ScanState.
// In order to avoid passing Rust GC to clean the ScanState when the cursor (ref) is passed to the wrapper,
// which means that Rust layer is not aware of the cursor anymore, we need to keep the ScanState alive.
// We do that by storing the ScanState in a global container, and return a cursor-id of the cursor to the wrapper.
//
// The wrapper create an object contain the cursor-id with a drop function that will remove the cursor from the container.
// When the ref is removed from the hash-map, there's no more references to the ScanState, and the GC will clean it.
pub async fn cluster_scan<'a>(
avifenesh marked this conversation as resolved.
Show resolved Hide resolved
avifenesh marked this conversation as resolved.
Show resolved Hide resolved
avifenesh marked this conversation as resolved.
Show resolved Hide resolved
&'a mut self,
scan_state_cursor: &'a ScanStateRC,
match_pattern: &'a Option<&str>,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<Value> {
match self.internal_client {
ClientWrapper::Standalone(_) => {
unreachable!("Cluster scan is not supported in standalone mode")
}
ClientWrapper::Cluster { ref mut client } => {
let (cursor, keys) = client
.cluster_scan(
scan_state_cursor.clone(),
*match_pattern,
count,
object_type,
)
.await?;

let cluster_cursor_id = if cursor.is_finished() {
Value::BulkString(FINISHED_SCAN_CURSOR.into())
} else {
Value::BulkString(insert_cluster_scan_cursor(cursor).into())
};
Ok(Value::Array(vec![cluster_cursor_id, Value::Array(keys)]))
}
}
}

fn get_transaction_values(
pipeline: &redis::Pipeline,
mut values: Vec<Value>,
Expand Down
65 changes: 65 additions & 0 deletions glide-core/src/cluster_scan_container.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0
*/
use logger_core::log_debug;
use nanoid::nanoid;
use once_cell::sync::Lazy;
use redis::{RedisResult, ScanStateRC};
use std::{collections::HashMap, sync::Mutex};

// This is a container for storing the cursor of a cluster scan.
// The cursor for a cluster scan is a ref to the actual ScanState struct in redis-rs.
// In order to avoid dropping it when it is passed between layers of the application,
// we store it in this container and only pass the id of the cursor.
// The cursor is stored in the container and can be retrieved using the id.
// In wrapper layer we wrap the id in an object, which, when dropped, trigger the removal of the cursor from the container.
// When the ref is removed from the container, the actual ScanState struct is dropped by Rust GC.
avifenesh marked this conversation as resolved.
Show resolved Hide resolved

static CONTAINER: Lazy<Mutex<HashMap<String, ScanStateRC>>> =
avifenesh marked this conversation as resolved.
Show resolved Hide resolved
Lazy::new(|| Mutex::new(HashMap::new()));

pub fn insert_cluster_scan_cursor(scan_state: ScanStateRC) -> String {
let id = nanoid!();
CONTAINER.lock().unwrap().insert(id.clone(), scan_state);
log_debug(
"scan_state_cursor insert",
format!(
"Inserted to container scan_state_cursor with id: `{:?}`",
id
),
);
id
}

pub fn get_cluster_scan_cursor(id: String) -> RedisResult<ScanStateRC> {
let scan_state_rc = CONTAINER.lock().unwrap().get(&id).cloned();
log_debug(
"scan_state_cursor get",
format!(
"Retrieved from container scan_state_cursor with id: `{:?}`",
id
),
);
match scan_state_rc {
Some(scan_state_rc) => Ok(scan_state_rc),
None => Err(redis::RedisError::from((
redis::ErrorKind::ResponseError,
"Invalid scan_state_cursor id",
avifenesh marked this conversation as resolved.
Show resolved Hide resolved
format!(
"The scan_state_cursor sent with id: `{:?}` does not exist",
id
),
))),
}
}

pub fn remove_scan_state_cursor(id: String) {
log_debug(
"scan_state_cursor remove",
format!(
"Removed from container scan_state_cursor with id: `{:?}`",
id
),
);
CONTAINER.lock().unwrap().remove(&id);
}
1 change: 1 addition & 0 deletions glide-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ pub use socket_listener::*;
pub mod errors;
pub mod scripts_container;
pub use client::ConnectionRequest;
pub mod cluster_scan_container;
pub mod request_type;
11 changes: 10 additions & 1 deletion glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ enum RequestType {
XAutoClaim = 203;
Wait = 208;
XClaim = 209;
Scan = 210;
}

message Command {
Expand All @@ -268,13 +269,21 @@ message Transaction {
repeated Command commands = 1;
}

message ClusterScan {
avifenesh marked this conversation as resolved.
Show resolved Hide resolved
string cursor = 1;
optional string match_pattern = 2;
optional int64 count = 3;
optional string object_type = 4;
avifenesh marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit problematic. Enum in protobuf is numbers, enum in python need to choose between string to enumerate (as far as i was able when tried)
I wanted to use the same enum in python for both standalone which has to strings, and both cluster mode, so i couldn't use an enumerate enum in python.
The first try was enum as you suggest but i then i tackle this issue.
Do you have any other solution?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I deleted the comment before I saw you responded. I think adding an enum in protobuf might not be the best solution as we already have an enum in the core side and an enum in the wrapper side, it would just add an overhead to add another one in protobuf.
Ideally we would have only a single enum in the core side - and probably another one in protobuf, however because this enum is a user-facing API, we don't want to expose the protobuf enum as a part of the API, and importing the redis-rs obejctType would also require to duplicate each enum in the lib.rs file to map from the ffi-enum to the rust-enum, which wouldn't save us anything. So we probably best keeping it as you did with all other user exposed command options.

}

message RedisRequest {
uint32 callback_idx = 1;

oneof command {
Command single_command = 2;
Transaction transaction = 3;
ScriptInvocation script_invocation = 4;
ClusterScan cluster_scan = 5;
}
Routes route = 5;
Routes route = 6;
}
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ pub enum RequestType {
XAutoClaim = 203,
Wait = 208,
XClaim = 209,
Scan = 210,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -431,6 +432,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::XAutoClaim => RequestType::XAutoClaim,
ProtobufRequestType::Wait => RequestType::Wait,
ProtobufRequestType::XClaim => RequestType::XClaim,
ProtobufRequestType::Scan => RequestType::Scan,
}
}
}
Expand Down Expand Up @@ -646,6 +648,7 @@ impl RequestType {
RequestType::XAutoClaim => Some(cmd("XAUTOCLAIM")),
RequestType::Wait => Some(cmd("WAIT")),
RequestType::XClaim => Some(cmd("XCLAIM")),
RequestType::Scan => Some(cmd("SCAN")),
}
}
}
Loading
Loading