Skip to content

Commit

Permalink
Cluster Scan - glide-core and python (valkey-io#1623)
Browse files Browse the repository at this point in the history
Scan command for Glide-Core and Py
  • Loading branch information
avifenesh authored and cyip10 committed Jul 16, 2024
1 parent 754a971 commit f34722d
Show file tree
Hide file tree
Showing 20 changed files with 948 additions and 25 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/lint-rust/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ runs:
github-token: ${{ inputs.github-token }}

- uses: Swatinem/rust-cache@v2
with:
github-token: ${{ inputs.github-token }}

- run: cargo fmt --all -- --check
working-directory: ${{ inputs.cargo-toml-folder }}
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
* Python: Added FCALL_RO command ([#1721](https://github.com/aws/glide-for-redis/pull/1721))
* Python: Added WATCH and UNWATCH command ([#1736](https://github.com/aws/glide-for-redis/pull/1736))
* Python: Added LPos command ([#1740](https://github.com/aws/glide-for-redis/pull/1740))
* Python: Added SCAN command ([#1623](https://github.com/aws/glide-for-redis/pull/1623))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
1 change: 1 addition & 0 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ directories = { version = "4.0", optional = true }
once_cell = "1.18.0"
arcstr = "1.1.5"
sha1_smol = "1.0.0"
nanoid = "0.4.0"

[features]
socket-layer = ["directories", "integer-encoding", "num_cpus", "protobuf", "tokio-util"]
Expand Down
49 changes: 47 additions & 2 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
*/
mod types;

use crate::cluster_scan_container::insert_cluster_scan_cursor;
use crate::scripts_container::get_script;
use futures::FutureExt;
use logger_core::log_info;
use redis::aio::ConnectionLike;
use redis::cluster_async::ClusterConnection;
use redis::cluster_routing::{Routable, RoutingInfo, SingleNodeRoutingInfo};
use redis::{Cmd, ErrorKind, PushInfo, Value};
use redis::{RedisError, RedisResult};
use redis::{Cmd, ErrorKind, ObjectType, PushInfo, RedisError, RedisResult, ScanStateRC, Value};
pub use standalone_client::StandaloneClient;
use std::io;
use std::time::Duration;
Expand All @@ -28,6 +28,7 @@ pub const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_PERIODIC_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
/// Cursor id reported by `Client::cluster_scan` in place of a container id
/// once the underlying scan state says the scan is finished.
pub const FINISHED_SCAN_CURSOR: &str = "finished";

pub(super) fn get_port(address: &NodeAddress) -> u16 {
const DEFAULT_PORT: u16 = 6379;
Expand Down Expand Up @@ -245,6 +246,50 @@ impl Client {
.boxed()
}

// Cluster scan is not passed to redis-rs as a regular command, so we need to handle it separately.
// We send the command to a specific function in the redis-rs cluster client, which internally handles the
// the complication of a command scan, and generate the command base on the logic in the redis-rs library.
//
// The function returns a tuple with the cursor and the keys found in the scan.
// The cursor is not a regular cursor, but an ARC to a struct that contains the cursor and the data needed
// to continue the scan called ScanState.
// In order to avoid passing Rust GC to clean the ScanState when the cursor (ref) is passed to the wrapper,
// which means that Rust layer is not aware of the cursor anymore, we need to keep the ScanState alive.
// We do that by storing the ScanState in a global container, and return a cursor-id of the cursor to the wrapper.
//
// The wrapper create an object contain the cursor-id with a drop function that will remove the cursor from the container.
// When the ref is removed from the hash-map, there's no more references to the ScanState, and the GC will clean it.
pub async fn cluster_scan<'a>(
&'a mut self,
scan_state_cursor: &'a ScanStateRC,
match_pattern: &'a Option<&str>,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<Value> {
match self.internal_client {
ClientWrapper::Standalone(_) => {
unreachable!("Cluster scan is not supported in standalone mode")
}
ClientWrapper::Cluster { ref mut client } => {
let (cursor, keys) = client
.cluster_scan(
scan_state_cursor.clone(),
*match_pattern,
count,
object_type,
)
.await?;

let cluster_cursor_id = if cursor.is_finished() {
Value::BulkString(FINISHED_SCAN_CURSOR.into())
} else {
Value::BulkString(insert_cluster_scan_cursor(cursor).into())
};
Ok(Value::Array(vec![cluster_cursor_id, Value::Array(keys)]))
}
}
}

fn get_transaction_values(
pipeline: &redis::Pipeline,
mut values: Vec<Value>,
Expand Down
65 changes: 65 additions & 0 deletions glide-core/src/cluster_scan_container.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0
*/
use logger_core::log_debug;
use nanoid::nanoid;
use once_cell::sync::Lazy;
use redis::{RedisResult, ScanStateRC};
use std::{collections::HashMap, sync::Mutex};

// Global container that stores the cursors of in-progress cluster scans.
// The cursor of a cluster scan is a reference (`ScanStateRC`) to the actual ScanState struct in redis-rs.
// To avoid dropping it while it is passed between the layers of the application,
// the reference is stored in this container and only the id of the cursor is passed around.
// The cursor can later be retrieved from the container using that id.
// The wrapper layer wraps the id in an object which, when dropped, triggers the removal of the cursor from the container.
// Rust has no garbage collector: once the reference is removed from the container and no other
// reference remains, the actual ScanState struct is dropped.

// Maps a cursor id to its scan state; lazily initialised and guarded by a mutex.
static CONTAINER: Lazy<Mutex<HashMap<String, ScanStateRC>>> =
Lazy::new(|| Mutex::new(HashMap::new()));

/// Stores `scan_state` in the global container under a freshly generated id.
///
/// Returns the generated id, which can later be used to fetch or remove the cursor.
///
/// # Panics
///
/// Panics if the container mutex is poisoned.
pub fn insert_cluster_scan_cursor(scan_state: ScanStateRC) -> String {
    let cursor_id = nanoid!();
    {
        // Hold the lock only for the duration of the insertion.
        let mut cursors = CONTAINER.lock().unwrap();
        cursors.insert(cursor_id.clone(), scan_state);
    }
    let message = format!(
        "Inserted to container scan_state_cursor with id: `{:?}`",
        cursor_id
    );
    log_debug("scan_state_cursor insert", message);
    cursor_id
}

/// Looks up the scan state stored under `id` and returns a clone of the handle.
///
/// The debug log line is emitted whether or not the id was found.
///
/// # Errors
///
/// Returns a `ResponseError` if no cursor is stored under `id`.
///
/// # Panics
///
/// Panics if the container mutex is poisoned.
pub fn get_cluster_scan_cursor(id: String) -> RedisResult<ScanStateRC> {
    // The lock guard is a temporary and is released at the end of this statement.
    let lookup = CONTAINER.lock().unwrap().get(&id).cloned();
    let message = format!(
        "Retrieved from container scan_state_cursor with id: `{:?}`",
        id
    );
    log_debug("scan_state_cursor get", message);
    lookup.ok_or_else(|| {
        redis::RedisError::from((
            redis::ErrorKind::ResponseError,
            "Invalid scan_state_cursor id",
            format!(
                "The scan_state_cursor sent with id: `{:?}` does not exist",
                id
            ),
        ))
    })
}

/// Removes the cursor stored under `id` from the global container, if there is one.
///
/// The debug log line is written before the removal and regardless of whether the id exists.
///
/// # Panics
///
/// Panics if the container mutex is poisoned.
pub fn remove_scan_state_cursor(id: String) {
    let message = format!(
        "Removed from container scan_state_cursor with id: `{:?}`",
        id
    );
    log_debug("scan_state_cursor remove", message);
    let mut cursors = CONTAINER.lock().unwrap();
    cursors.remove(&id);
}
1 change: 1 addition & 0 deletions glide-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ pub use socket_listener::*;
pub mod errors;
pub mod scripts_container;
pub use client::ConnectionRequest;
pub mod cluster_scan_container;
pub mod request_type;
11 changes: 10 additions & 1 deletion glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ enum RequestType {
XAutoClaim = 203;
Wait = 208;
XClaim = 209;
Scan = 210;
}

message Command {
Expand All @@ -268,13 +269,21 @@ message Transaction {
repeated Command commands = 1;
}

// Request payload for a cluster scan.
message ClusterScan {
// Cursor identifier as a string.
// NOTE(review): presumably the id handed back by a previous cluster scan step - confirm against the request handler.
string cursor = 1;
// Optional pattern used to filter the scanned keys.
optional string match_pattern = 2;
// Optional count value for the scan; NOTE(review): exact semantics are defined by the scan implementation - confirm.
optional int64 count = 3;
// Optional object type filter, passed as a string.
optional string object_type = 4;
}

// A single request: a callback index, exactly one kind of command, and a route.
message RedisRequest {
    uint32 callback_idx = 1;

    oneof command {
        Command single_command = 2;
        Transaction transaction = 3;
        ScriptInvocation script_invocation = 4;
        ClusterScan cluster_scan = 5;
    }
    // Uses field number 6 because number 5 is taken by `cluster_scan` in the oneof above;
    // a message cannot declare the same field name or field number twice.
    Routes route = 6;
}
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ pub enum RequestType {
XAutoClaim = 203,
Wait = 208,
XClaim = 209,
Scan = 210,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -431,6 +432,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::XAutoClaim => RequestType::XAutoClaim,
ProtobufRequestType::Wait => RequestType::Wait,
ProtobufRequestType::XClaim => RequestType::XClaim,
ProtobufRequestType::Scan => RequestType::Scan,
}
}
}
Expand Down Expand Up @@ -646,6 +648,7 @@ impl RequestType {
RequestType::XAutoClaim => Some(cmd("XAUTOCLAIM")),
RequestType::Wait => Some(cmd("WAIT")),
RequestType::XClaim => Some(cmd("XCLAIM")),
RequestType::Scan => Some(cmd("SCAN")),
}
}
}
Loading

0 comments on commit f34722d

Please sign in to comment.