diff --git a/Cargo.lock b/Cargo.lock index cdc6eae453..ba109e1d38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6243,6 +6243,7 @@ dependencies = [ "solana-rpc", "solana-rpc-client-api", "solana-runtime", + "solana-runtime-plugin", "solana-sdk", "solana-send-transaction-service", "solana-stake-program", @@ -7386,6 +7387,24 @@ dependencies = [ "zstd", ] +[[package]] +name = "solana-runtime-plugin" +version = "1.18.0" +dependencies = [ + "crossbeam-channel", + "json5", + "jsonrpc-core", + "jsonrpc-core-client", + "jsonrpc-derive", + "jsonrpc-ipc-server", + "jsonrpc-server-utils", + "libloading", + "log", + "solana-runtime", + "solana-sdk", + "thiserror", +] + [[package]] name = "solana-sdk" version = "1.18.0" @@ -7907,6 +7926,7 @@ dependencies = [ "solana-rpc-client", "solana-rpc-client-api", "solana-runtime", + "solana-runtime-plugin", "solana-sdk", "solana-send-transaction-service", "solana-storage-bigtable", diff --git a/Cargo.toml b/Cargo.toml index 58db6628d8..8f8d7d736c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,6 +86,7 @@ members = [ "rpc-client-nonce-utils", "rpc-test", "runtime", + "runtime-plugin", "runtime/store-tool", "sdk", "sdk/cargo-build-bpf", @@ -370,6 +371,7 @@ solana-rpc-client = { path = "rpc-client", version = "=1.18.0", default-features solana-rpc-client-api = { path = "rpc-client-api", version = "=1.18.0" } solana-rpc-client-nonce-utils = { path = "rpc-client-nonce-utils", version = "=1.18.0" } solana-runtime = { path = "runtime", version = "=1.18.0" } +solana-runtime-plugin = { path = "runtime-plugin", version = "=1.18.0" } solana-sdk = { path = "sdk", version = "=1.18.0" } solana-sdk-macro = { path = "sdk/macro", version = "=1.18.0" } solana-send-transaction-service = { path = "send-transaction-service", version = "=1.18.0" } diff --git a/core/Cargo.toml b/core/Cargo.toml index e3377370e6..32da4c3ba1 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -69,6 +69,7 @@ solana-rayon-threadlimit = { workspace = true } solana-rpc = { workspace = true } solana-rpc-client-api = { workspace = true } solana-runtime = { workspace = true } +solana-runtime-plugin = { workspace = true } solana-sdk = { workspace = true } solana-send-transaction-service = { workspace = true } solana-streamer = { workspace = true } diff --git a/core/src/validator.rs b/core/src/validator.rs index 0c90fe09ca..bec6922485 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -106,6 +106,10 @@ use { self, clean_orphaned_account_snapshot_dirs, move_and_async_delete_path_contents, }, }, + solana_runtime_plugin::{ + runtime_plugin_admin_rpc_service::RuntimePluginManagerRpcRequest, + runtime_plugin_service::RuntimePluginService, + }, solana_sdk::{ clock::Slot, epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET, @@ -498,6 +502,10 @@ impl Validator { cluster_entrypoints: Vec, config: &ValidatorConfig, should_check_duplicate_instance: bool, + runtime_plugin_configs_and_request_rx: Option<( + Vec, + Receiver, + )>, rpc_to_plugin_manager_receiver: Option>, start_progress: Arc>, socket_addr_space: SocketAddrSpace, @@ -889,6 +897,17 @@ impl Validator { None, )); + if let Some((runtime_plugin_configs, request_rx)) = runtime_plugin_configs_and_request_rx { + RuntimePluginService::start( + &runtime_plugin_configs, + request_rx, + bank_forks.clone(), + block_commitment_cache.clone(), + exit.clone(), + ) + .unwrap(); + } + let max_slots = Arc::new(MaxSlots::default()); let (completed_data_sets_sender, completed_data_sets_receiver) = bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL); @@ -2505,6 +2524,7 @@ mod tests { vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()], &config, true, // should_check_duplicate_instance + None, None, // rpc_to_plugin_manager_receiver start_progress.clone(), SocketAddrSpace::Unspecified, @@ -2589,7 +2609,8 @@ mod tests { Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])), vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()], &config, - true, // should_check_duplicate_instance. + true, // should_check_duplicate_instance + None, None, // rpc_to_plugin_manager_receiver Arc::new(RwLock::new(ValidatorStartProgress::default())), SocketAddrSpace::Unspecified, diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index d180a4abaf..b587e1502d 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -288,6 +288,7 @@ impl LocalCluster { vec![], &leader_config, true, // should_check_duplicate_instance + None, None, // rpc_to_plugin_manager_receiver Arc::new(RwLock::new(ValidatorStartProgress::default())), socket_addr_space, @@ -503,6 +504,7 @@ impl LocalCluster { vec![LegacyContactInfo::try_from(&self.entry_point_info).unwrap()], &config, true, // should_check_duplicate_instance + None, None, // rpc_to_plugin_manager_receiver Arc::new(RwLock::new(ValidatorStartProgress::default())), socket_addr_space, @@ -900,6 +902,7 @@ impl Cluster for LocalCluster { .unwrap_or_default(), &safe_clone_config(&cluster_validator_info.config), true, // should_check_duplicate_instance + None, None, // rpc_to_plugin_manager_receiver Arc::new(RwLock::new(ValidatorStartProgress::default())), socket_addr_space, diff --git a/runtime-plugin/Cargo.toml b/runtime-plugin/Cargo.toml new file mode 100644 index 0000000000..2733ddb636 --- /dev/null +++ b/runtime-plugin/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "solana-runtime-plugin" +version = "1.18.0" +edition = "2021" + +[dependencies] +crossbeam-channel = { workspace = true } +json5 = { workspace = true } +jsonrpc-core = { workspace = true } +jsonrpc-core-client = { workspace = true, features = ["ipc"] } +jsonrpc-derive = { workspace = true } +jsonrpc-ipc-server = { workspace = true } +jsonrpc-server-utils = { workspace = true } +libloading = { workspace = true } +log = { workspace = true } +solana-runtime = { workspace = true } +solana-sdk = { workspace = true } +thiserror = { workspace = true } diff --git a/runtime-plugin/src/lib.rs b/runtime-plugin/src/lib.rs new file mode 100644 index 0000000000..477af43c9b --- /dev/null +++ b/runtime-plugin/src/lib.rs @@ -0,0 +1,4 @@ +pub mod runtime_plugin; +pub mod runtime_plugin_admin_rpc_service; +pub mod runtime_plugin_manager; +pub mod runtime_plugin_service; diff --git a/runtime-plugin/src/runtime_plugin.rs b/runtime-plugin/src/runtime_plugin.rs new file mode 100644 index 0000000000..7dc0b95fa4 --- /dev/null +++ b/runtime-plugin/src/runtime_plugin.rs @@ -0,0 +1,41 @@ +use { + solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache}, + std::{ + any::Any, + error, + fmt::Debug, + io, + sync::{atomic::AtomicBool, Arc, RwLock}, + }, + thiserror::Error, +}; + +pub type Result = std::result::Result; + +/// Errors returned by plugin calls +#[derive(Error, Debug)] +pub enum RuntimePluginError { + /// Error opening the configuration file; for example, when the file + /// is not found or when the validator process has no permission to read it. + #[error("Error opening config file. Error detail: ({0}).")] + ConfigFileOpenError(#[from] io::Error), + + /// Any custom error defined by the plugin. + #[error("Plugin-defined custom error. Error message: ({0})")] + Custom(Box), + + #[error("Failed to load a runtime plugin")] + FailedToLoadPlugin(#[from] Box), +} + +pub struct PluginDependencies { + pub bank_forks: Arc>, + pub block_commitment_cache: Arc>, + pub exit: Arc, +} + +pub trait RuntimePlugin: Any + Debug + Send + Sync { + fn name(&self) -> &'static str; + fn on_load(&mut self, config_file: &str, dependencies: PluginDependencies) -> Result<()>; + fn on_unload(&mut self); +} diff --git a/runtime-plugin/src/runtime_plugin_admin_rpc_service.rs b/runtime-plugin/src/runtime_plugin_admin_rpc_service.rs new file mode 100644 index 0000000000..fdc33b06c5 --- /dev/null +++ b/runtime-plugin/src/runtime_plugin_admin_rpc_service.rs @@ -0,0 +1,326 @@ +//! RPC interface to dynamically make changes to runtime plugins. + +use { + crossbeam_channel::Sender, + jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result as JsonRpcResult}, + jsonrpc_core_client::{transports::ipc, RpcError}, + jsonrpc_derive::rpc, + jsonrpc_ipc_server::{ + tokio::{self, sync::oneshot::channel as oneshot_channel}, + RequestContext, ServerBuilder, + }, + jsonrpc_server_utils::tokio::sync::oneshot::Sender as OneShotSender, + log::*, + solana_sdk::exit::Exit, + std::{ + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + }, +}; + +#[derive(Debug)] +pub enum RuntimePluginManagerRpcRequest { + ReloadPlugin { + name: String, + config_file: String, + response_sender: OneShotSender>, + }, + UnloadPlugin { + name: String, + response_sender: OneShotSender>, + }, + LoadPlugin { + config_file: String, + response_sender: OneShotSender>, + }, + ListPlugins { + response_sender: OneShotSender>>, + }, +} + +#[rpc] +pub trait RuntimePluginAdminRpc { + type Metadata; + + #[rpc(meta, name = "reloadPlugin")] + fn reload_plugin( + &self, + meta: Self::Metadata, + name: String, + config_file: String, + ) -> BoxFuture>; + + #[rpc(meta, name = "unloadPlugin")] + fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture>; + + #[rpc(meta, name = "loadPlugin")] + fn load_plugin( + &self, + meta: Self::Metadata, + config_file: String, + ) -> BoxFuture>; + + #[rpc(meta, name = "listPlugins")] + fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture>>; +} + +#[derive(Clone)] +pub struct RuntimePluginAdminRpcRequestMetadata { + pub rpc_request_sender: Sender, + pub validator_exit: Arc>, +} + +impl Metadata for RuntimePluginAdminRpcRequestMetadata {} + +fn rpc_path(ledger_path: &Path) -> PathBuf { + #[cfg(target_family = "windows")] + { + // More information about the wackiness of pipe names over at + // https://docs.microsoft.com/en-us/windows/win32/ipc/pipe-names + if let Some(ledger_filename) = ledger_path.file_name() { + PathBuf::from(format!( + "\\\\.\\pipe\\{}-runtime_plugin_admin.rpc", + ledger_filename.to_string_lossy() + )) + } else { + PathBuf::from("\\\\.\\pipe\\runtime_plugin_admin.rpc") + } + } + #[cfg(not(target_family = "windows"))] + { + ledger_path.join("runtime_plugin_admin.rpc") + } +} + +/// Start the Runtime Plugin Admin RPC interface. +pub fn run( + ledger_path: &Path, + metadata: RuntimePluginAdminRpcRequestMetadata, + plugin_exit: Arc, +) { + let rpc_path = rpc_path(ledger_path); + + let event_loop = tokio::runtime::Builder::new_multi_thread() + .thread_name("solRuntimePluginAdminRpc") + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + std::thread::Builder::new() + .name("solAdminRpc".to_string()) + .spawn(move || { + let mut io = MetaIoHandler::default(); + io.extend_with(RuntimePluginAdminRpcImpl.to_delegate()); + + let validator_exit = metadata.validator_exit.clone(); + + match ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| { + metadata.clone() + }) + .event_loop_executor(event_loop.handle().clone()) + .start(&format!("{}", rpc_path.display())) + { + Err(e) => { + error!("Unable to start runtime plugin admin rpc service: {e:?}, exiting"); + validator_exit.write().unwrap().exit(); + } + Ok(server) => { + info!("started runtime plugin admin rpc service!"); + let close_handle = server.close_handle(); + let c_plugin_exit = plugin_exit.clone(); + validator_exit + .write() + .unwrap() + .register_exit(Box::new(move || { + close_handle.close(); + c_plugin_exit.store(true, Ordering::Relaxed); + })); + + server.wait(); + plugin_exit.store(true, Ordering::Relaxed); + } + } + }) + .unwrap(); +} + +pub struct RuntimePluginAdminRpcImpl; +impl RuntimePluginAdminRpc for RuntimePluginAdminRpcImpl { + type Metadata = RuntimePluginAdminRpcRequestMetadata; + + fn reload_plugin( + &self, + meta: Self::Metadata, + name: String, + config_file: String, + ) -> BoxFuture> { + Box::pin(async move { + let (response_sender, response_receiver) = oneshot_channel(); + + if meta + .rpc_request_sender + .send(RuntimePluginManagerRpcRequest::ReloadPlugin { + name, + config_file, + response_sender, + }) + .is_err() + { + error!("rpc_request_sender channel closed, exiting"); + meta.validator_exit.write().unwrap().exit(); + + return Err(jsonrpc_core::Error { + code: ErrorCode::InternalError, + message: "Internal channel disconnected while sending the request".to_string(), + data: None, + }); + } + + match response_receiver.await { + Err(_) => { + error!("response_receiver channel closed, exiting"); + meta.validator_exit.write().unwrap().exit(); + Err(jsonrpc_core::Error { + code: ErrorCode::InternalError, + message: "Internal channel disconnected while awaiting the response" + .to_string(), + data: None, + }) + } + Ok(resp) => resp, + } + }) + } + + fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture> { + Box::pin(async move { + let (response_sender, response_receiver) = oneshot_channel(); + + if meta + .rpc_request_sender + .send(RuntimePluginManagerRpcRequest::UnloadPlugin { + name, + response_sender, + }) + .is_err() + { + error!("rpc_request_sender channel closed, exiting"); + meta.validator_exit.write().unwrap().exit(); + + return Err(jsonrpc_core::Error { + code: ErrorCode::InternalError, + message: "Internal channel disconnected while sending the request".to_string(), + data: None, + }); + } + + match response_receiver.await { + Err(_) => { + error!("response_receiver channel closed, exiting"); + meta.validator_exit.write().unwrap().exit(); + Err(jsonrpc_core::Error { + code: ErrorCode::InternalError, + message: "Internal channel disconnected while awaiting the response" + .to_string(), + data: None, + }) + } + Ok(resp) => resp, + } + }) + } + + fn load_plugin( + &self, + meta: Self::Metadata, + config_file: String, + ) -> BoxFuture> { + Box::pin(async move { + let (response_sender, response_receiver) = oneshot_channel(); + + if meta + .rpc_request_sender + .send(RuntimePluginManagerRpcRequest::LoadPlugin { + config_file, + response_sender, + }) + .is_err() + { + error!("rpc_request_sender channel closed, exiting"); + meta.validator_exit.write().unwrap().exit(); + + return Err(jsonrpc_core::Error { + code: ErrorCode::InternalError, + message: "Internal channel disconnected while sending the request".to_string(), + data: None, + }); + } + + match response_receiver.await { + Err(_) => { + error!("response_receiver channel closed, exiting"); + meta.validator_exit.write().unwrap().exit(); + Err(jsonrpc_core::Error { + code: ErrorCode::InternalError, + message: "Internal channel disconnected while awaiting the response" + .to_string(), + data: None, + }) + } + Ok(resp) => resp, + } + }) + } + + fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture>> { + Box::pin(async move { + let (response_sender, response_receiver) = oneshot_channel(); + + if meta + .rpc_request_sender + .send(RuntimePluginManagerRpcRequest::ListPlugins { response_sender }) + .is_err() + { + error!("rpc_request_sender channel closed, exiting"); + meta.validator_exit.write().unwrap().exit(); + + return Err(jsonrpc_core::Error { + code: ErrorCode::InternalError, + message: "Internal channel disconnected while sending the request".to_string(), + data: None, + }); + } + + match response_receiver.await { + Err(_) => { + error!("response_receiver channel closed, exiting"); + meta.validator_exit.write().unwrap().exit(); + Err(jsonrpc_core::Error { + code: ErrorCode::InternalError, + message: "Internal channel disconnected while awaiting the response" + .to_string(), + data: None, + }) + } + Ok(resp) => resp, + } + }) + } +} + +// Connect to the Runtime Plugin RPC interface +pub async fn connect(ledger_path: &Path) -> Result { + let rpc_path = rpc_path(ledger_path); + if !rpc_path.exists() { + Err(RpcError::Client(format!( + "{} does not exist", + rpc_path.display() + ))) + } else { + ipc::connect::<_, gen_client::Client>(&format!("{}", rpc_path.display())).await + } +} diff --git a/runtime-plugin/src/runtime_plugin_manager.rs b/runtime-plugin/src/runtime_plugin_manager.rs new file mode 100644 index 0000000000..13e94973ce --- /dev/null +++ b/runtime-plugin/src/runtime_plugin_manager.rs @@ -0,0 +1,278 @@ +use { + crate::runtime_plugin::{PluginDependencies, RuntimePlugin}, + jsonrpc_core::{serde_json, ErrorCode, Result as JsonRpcResult}, + libloading::Library, + log::*, + solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache}, + std::{ + fs::File, + io::Read, + path::{Path, PathBuf}, + sync::{atomic::AtomicBool, Arc, RwLock}, + }, +}; + +#[derive(thiserror::Error, Debug)] +pub enum RuntimePluginManagerError { + #[error("Cannot open the the plugin config file")] + CannotOpenConfigFile(String), + + #[error("Cannot read the the plugin config file")] + CannotReadConfigFile(String), + + #[error("The config file is not in a valid Json format")] + InvalidConfigFileFormat(String), + + #[error("Plugin library path is not specified in the config file")] + LibPathNotSet, + + #[error("Invalid plugin path")] + InvalidPluginPath, + + #[error("Cannot load plugin shared library")] + PluginLoadError(String), + + #[error("The runtime plugin {0} is already loaded shared library")] + PluginAlreadyLoaded(String), + + #[error("The RuntimePlugin on_load method failed")] + PluginStartError(String), +} + +pub struct RuntimePluginManager { + plugins: Vec>, + libs: Vec, + bank_forks: Arc>, + block_commitment_cache: Arc>, + exit: Arc, +} + +impl RuntimePluginManager { + pub fn new( + bank_forks: Arc>, + block_commitment_cache: Arc>, + exit: Arc, + ) -> Self { + Self { + plugins: vec![], + libs: vec![], + bank_forks, + block_commitment_cache, + exit, + } + } + + /// This method allows dynamic loading of a runtime plugin. + /// Adds to the existing list of loaded plugins. + pub(crate) fn load_plugin( + &mut self, + plugin_config_path: impl AsRef, + ) -> JsonRpcResult { + // First load plugin + let (mut new_plugin, new_lib, config_file) = + load_plugin_from_config(plugin_config_path.as_ref()).map_err(|e| { + jsonrpc_core::Error { + code: ErrorCode::InvalidRequest, + message: format!("Failed to load plugin: {e}"), + data: None, + } + })?; + + // Then see if a plugin with this name already exists, if so return Err. + let name = new_plugin.name(); + if self.plugins.iter().any(|plugin| name.eq(plugin.name())) { + return Err(jsonrpc_core::Error { + code: ErrorCode::InvalidRequest, + message: format!( + "There already exists a plugin named {} loaded. Did not load requested plugin", + name, + ), + data: None, + }); + } + + new_plugin + .on_load( + config_file, + PluginDependencies { + bank_forks: self.bank_forks.clone(), + block_commitment_cache: self.block_commitment_cache.clone(), + exit: self.exit.clone(), + }, + ) + .map_err(|on_load_err| jsonrpc_core::Error { + code: ErrorCode::InvalidRequest, + message: format!( + "on_load method of plugin {} failed: {on_load_err}", + new_plugin.name() + ), + data: None, + })?; + + self.plugins.push(new_plugin); + self.libs.push(new_lib); + + Ok(name.to_string()) + } + + /// Unloads the plugins and loaded plugin libraries, making sure to fire + /// their `on_plugin_unload()` methods so they can do any necessary cleanup. + pub(crate) fn unload_all_plugins(&mut self) { + (0..self.plugins.len()).for_each(|idx| { + self.try_drop_plugin(idx); + }); + } + + pub(crate) fn unload_plugin(&mut self, name: &str) -> JsonRpcResult<()> { + // Check if any plugin names match this one + let idx = if let Some(idx) = self + .plugins + .iter() + .position(|plugin| plugin.name().eq(name)) + { + idx + } else { + // If we don't find one return an error + return Err(jsonrpc_core::error::Error { + code: ErrorCode::InvalidRequest, + message: String::from("The plugin you requested to unload is not loaded"), + data: None, + }); + }; + + // Unload and drop plugin and lib + self.try_drop_plugin(idx); + + Ok(()) + } + + /// Reloads an existing plugin. + pub(crate) fn reload_plugin(&mut self, name: &str, config_file: &str) -> JsonRpcResult<()> { + // Check if any plugin names match this one + let idx = if let Some(idx) = self + .plugins + .iter() + .position(|plugin| plugin.name().eq(name)) + { + idx + } else { + // If we don't find one return an error + return Err(jsonrpc_core::error::Error { + code: ErrorCode::InvalidRequest, + message: String::from("The plugin you requested to reload is not loaded"), + data: None, + }); + }; + + self.try_drop_plugin(idx); + + // Try to load plugin, library + // SAFETY: It is up to the validator to ensure this is a valid plugin library. + let (mut new_plugin, new_lib, new_parsed_config_file) = + load_plugin_from_config(config_file.as_ref()).map_err(|err| jsonrpc_core::Error { + code: ErrorCode::InvalidRequest, + message: err.to_string(), + data: None, + })?; + + // Attempt to on_load with new plugin + match new_plugin.on_load( + new_parsed_config_file, + PluginDependencies { + bank_forks: self.bank_forks.clone(), + block_commitment_cache: self.block_commitment_cache.clone(), + exit: self.exit.clone(), + }, + ) { + // On success, push plugin and library + Ok(()) => { + self.plugins.push(new_plugin); + self.libs.push(new_lib); + Ok(()) + } + // On failure, return error + Err(err) => Err(jsonrpc_core::error::Error { + code: ErrorCode::InvalidRequest, + message: format!( + "Failed to start new plugin (previous plugin was dropped!): {err}" + ), + data: None, + }), + } + } + + pub(crate) fn list_plugins(&self) -> JsonRpcResult> { + Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect()) + } + + fn try_drop_plugin(&mut self, idx: usize) { + if idx < self.plugins.len() { + let mut plugin = self.plugins.remove(idx); + let _current_lib = self.libs.remove(idx); + plugin.on_unload(); + } else { + error!("failed to drop plugin: index {idx} out of bounds"); + } + } +} + +fn load_plugin_from_config( + plugin_config_path: &Path, +) -> Result<(Box, Library, &str), RuntimePluginManagerError> { + type PluginConstructor = unsafe fn() -> *mut dyn RuntimePlugin; + use libloading::Symbol; + + let mut file = match File::open(plugin_config_path) { + Ok(file) => file, + Err(err) => { + return Err(RuntimePluginManagerError::CannotOpenConfigFile(format!( + "Failed to open the plugin config file {plugin_config_path:?}, error: {err:?}" + ))); + } + }; + + let mut contents = String::new(); + if let Err(err) = file.read_to_string(&mut contents) { + return Err(RuntimePluginManagerError::CannotReadConfigFile(format!( + "Failed to read the plugin config file {plugin_config_path:?}, error: {err:?}" + ))); + } + + let result: serde_json::Value = match json5::from_str(&contents) { + Ok(value) => value, + Err(err) => { + return Err(RuntimePluginManagerError::InvalidConfigFileFormat(format!( + "The config file {plugin_config_path:?} is not in a valid Json5 format, error: {err:?}" + ))); + } + }; + + let libpath = result["libpath"] + .as_str() + .ok_or(RuntimePluginManagerError::LibPathNotSet)?; + let mut libpath = PathBuf::from(libpath); + if libpath.is_relative() { + let config_dir = plugin_config_path.parent().ok_or_else(|| { + RuntimePluginManagerError::CannotOpenConfigFile(format!( + "Failed to resolve parent of {plugin_config_path:?}", + )) + })?; + libpath = config_dir.join(libpath); + } + + let config_file = plugin_config_path + .as_os_str() + .to_str() + .ok_or(RuntimePluginManagerError::InvalidPluginPath)?; + + let (plugin, lib) = unsafe { + let lib = Library::new(libpath) + .map_err(|e| RuntimePluginManagerError::PluginLoadError(e.to_string()))?; + let constructor: Symbol = lib + .get(b"_create_plugin") + .map_err(|e| RuntimePluginManagerError::PluginLoadError(e.to_string()))?; + (Box::from_raw(constructor()), lib) + }; + + Ok((plugin, lib, config_file)) +} diff --git a/runtime-plugin/src/runtime_plugin_service.rs b/runtime-plugin/src/runtime_plugin_service.rs new file mode 100644 index 0000000000..c446f555a8 --- /dev/null +++ b/runtime-plugin/src/runtime_plugin_service.rs @@ -0,0 +1,121 @@ +use { + crate::{ + runtime_plugin::RuntimePluginError, + runtime_plugin_admin_rpc_service::RuntimePluginManagerRpcRequest, + runtime_plugin_manager::RuntimePluginManager, + }, + crossbeam_channel::Receiver, + log::{error, info}, + solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache}, + std::{ + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::{self, JoinHandle}, + time::Duration, + }, +}; + +pub struct RuntimePluginService { + plugin_manager: Arc>, + rpc_thread: JoinHandle<()>, +} + +impl RuntimePluginService { + pub fn start( + plugin_config_files: &[PathBuf], + rpc_receiver: Receiver, + bank_forks: Arc>, + block_commitment_cache: Arc>, + exit: Arc, + ) -> Result { + let mut plugin_manager = + RuntimePluginManager::new(bank_forks, block_commitment_cache, exit.clone()); + + for config in plugin_config_files { + let name = plugin_manager + .load_plugin(config) + .map_err(|e| RuntimePluginError::FailedToLoadPlugin(e.into()))?; + info!("Loaded Runtime Plugin: {name}"); + } + + let plugin_manager = Arc::new(RwLock::new(plugin_manager)); + let rpc_thread = + Self::start_rpc_request_handler(rpc_receiver, plugin_manager.clone(), exit); + + Ok(Self { + plugin_manager, + rpc_thread, + }) + } + + pub fn join(self) { + self.rpc_thread.join().unwrap(); + self.plugin_manager.write().unwrap().unload_all_plugins(); + } + + fn start_rpc_request_handler( + rpc_receiver: Receiver, + plugin_manager: Arc>, + exit: Arc, + ) -> JoinHandle<()> { + thread::Builder::new() + .name("solRuntimePluginRpc".to_string()) + .spawn(move || { + const TIMEOUT: Duration = Duration::from_secs(3); + while !exit.load(Ordering::Relaxed) { + if let Ok(request) = rpc_receiver.recv_timeout(TIMEOUT) { + match request { + RuntimePluginManagerRpcRequest::ListPlugins { response_sender } => { + let plugin_list = plugin_manager.read().unwrap().list_plugins(); + if response_sender.send(plugin_list).is_err() { + error!("response_sender channel disconnected"); + return; + } + } + RuntimePluginManagerRpcRequest::ReloadPlugin { + ref name, + ref config_file, + response_sender, + } => { + let reload_result = plugin_manager + .write() + .unwrap() + .reload_plugin(name, config_file); + if response_sender.send(reload_result).is_err() { + error!("response_sender channel disconnected"); + return; + } + } + RuntimePluginManagerRpcRequest::LoadPlugin { + ref config_file, + response_sender, + } => { + let load_result = + plugin_manager.write().unwrap().load_plugin(config_file); + if response_sender.send(load_result).is_err() { + error!("response_sender channel disconnected"); + return; + } + } + RuntimePluginManagerRpcRequest::UnloadPlugin { + ref name, + response_sender, + } => { + let unload_result = + plugin_manager.write().unwrap().unload_plugin(name); + if response_sender.send(unload_result).is_err() { + error!("response_sender channel disconnected"); + return; + } + } + } + } + } + plugin_manager.write().unwrap().unload_all_plugins(); + }) + .unwrap() + } +} diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index e807f80c96..8a97786494 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -979,6 +979,7 @@ impl TestValidator { vec![], &validator_config, true, // should_check_duplicate_instance + None, rpc_to_plugin_manager_receiver, config.start_progress.clone(), socket_addr_space, diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 69adeea93b..5d1a3270a3 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -54,6 +54,7 @@ solana-rpc = { workspace = true } solana-rpc-client = { workspace = true } solana-rpc-client-api = { workspace = true } solana-runtime = { workspace = true } +solana-runtime-plugin = { workspace = true } solana-sdk = { workspace = true } solana-send-transaction-service = { workspace = true } solana-storage-bigtable = { workspace = true } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 0da2e655c0..de2bf41b74 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1183,6 +1183,14 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .multiple(true) .help("Specify the configuration file for the Geyser plugin."), ) + .arg( + Arg::with_name("runtime_plugin_config") + .long("runtime-plugin-config") + .value_name("FILE") + .takes_value(true) + .multiple(true) + .help("Specify the configuration file for a Runtime plugin."), + ) .arg( Arg::with_name("snapshot_archive_format") .long("snapshot-archive-format") @@ -1698,6 +1706,48 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { SubCommand::with_name("run") .about("Run the validator") ) + .subcommand( + SubCommand::with_name("runtime-plugin") + .about("Manage and view runtime plugins") + .setting(AppSettings::SubcommandRequiredElseHelp) + .setting(AppSettings::InferSubcommands) + .subcommand( + SubCommand::with_name("list") + .about("List all current running runtime plugins") + ) + .subcommand( + SubCommand::with_name("unload") + .about("Unload a particular runtime plugin. You must specify the runtime plugin name") + .arg( + Arg::with_name("name") + .required(true) + .takes_value(true) + ) + ) + .subcommand( + SubCommand::with_name("reload") + .about("Reload a particular runtime plugin. You must specify the runtime plugin name and the new config path") + .arg( + Arg::with_name("name") + .required(true) + .takes_value(true) + ) + .arg( + Arg::with_name("config") + .required(true) + .takes_value(true) + ) + ) + .subcommand( + SubCommand::with_name("load") + .about("Load a new gesyer plugin. You must specify the config path. Fails if overwriting (use reload)") + .arg( + Arg::with_name("config") + .required(true) + .takes_value(true) + ) + ) + ) .subcommand( SubCommand::with_name("plugin") .about("Manage and view geyser plugins") @@ -2645,6 +2695,14 @@ pub fn test_app<'a>(version: &'a str, default_args: &'a DefaultTestArgs) -> App< .multiple(true) .help("Specify the configuration file for the Geyser plugin."), ) + .arg( + Arg::with_name("runtime_plugin_config") + .long("runtime-plugin-config") + .value_name("FILE") + .takes_value(true) + .multiple(true) + .help("Specify the configuration file for a Runtime plugin."), + ) .arg( Arg::with_name("deactivate_feature") .long("deactivate-feature") diff --git a/validator/src/main.rs b/validator/src/main.rs index d2c666da27..736a977e47 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1,10 +1,12 @@ #![allow(clippy::arithmetic_side_effects)] + #[cfg(not(target_env = "msvc"))] use jemallocator::Jemalloc; use { clap::{crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, ArgMatches}, console::style, crossbeam_channel::unbounded, + jsonrpc_server_utils::tokio::runtime::Runtime, log::*, rand::{seq::SliceRandom, thread_rng}, solana_accounts_db::{ @@ -57,6 +59,10 @@ use { ArchiveFormat, SnapshotVersion, }, }, + solana_runtime_plugin::{ + runtime_plugin_admin_rpc_service, + runtime_plugin_admin_rpc_service::RuntimePluginAdminRpcRequestMetadata, + }, solana_sdk::{ clock::{Slot, DEFAULT_S_PER_SLOT}, commitment_config::CommitmentConfig, @@ -85,7 +91,7 @@ use { path::{Path, PathBuf}, process::exit, str::FromStr, - sync::{Arc, Mutex, RwLock}, + sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, time::{Duration, SystemTime}, }, }; @@ -680,6 +686,85 @@ pub fn main() { _ => unreachable!(), } } + ("runtime-plugin", Some(plugin_subcommand_matches)) => { + let runtime_plugin_rpc_client = runtime_plugin_admin_rpc_service::connect(&ledger_path); + let runtime = Runtime::new().unwrap(); + match plugin_subcommand_matches.subcommand() { + ("list", _) => { + let plugins = runtime + .block_on( + async move { runtime_plugin_rpc_client.await?.list_plugins().await }, + ) + .unwrap_or_else(|err| { + println!("Failed to list plugins: {err}"); + exit(1); + }); + if !plugins.is_empty() { + println!("Currently the following plugins are loaded:"); + for (plugin, i) in plugins.into_iter().zip(1..) { + println!(" {i}) {plugin}"); + } + } else { + println!("There are currently no plugins loaded"); + } + return; + } + ("unload", Some(subcommand_matches)) => { + if let Ok(name) = value_t!(subcommand_matches, "name", String) { + runtime + .block_on(async { + runtime_plugin_rpc_client + .await? + .unload_plugin(name.clone()) + .await + }) + .unwrap_or_else(|err| { + println!("Failed to unload plugin {name}: {err:?}"); + exit(1); + }); + println!("Successfully unloaded plugin: {name}"); + } + return; + } + ("load", Some(subcommand_matches)) => { + if let Ok(config) = value_t!(subcommand_matches, "config", String) { + let name = runtime + .block_on(async { + runtime_plugin_rpc_client + .await? + .load_plugin(config.clone()) + .await + }) + .unwrap_or_else(|err| { + println!("Failed to load plugin {config}: {err:?}"); + exit(1); + }); + println!("Successfully loaded plugin: {name}"); + } + return; + } + ("reload", Some(subcommand_matches)) => { + if let Ok(name) = value_t!(subcommand_matches, "name", String) { + if let Ok(config) = value_t!(subcommand_matches, "config", String) { + runtime + .block_on(async { + runtime_plugin_rpc_client + .await? + .reload_plugin(name.clone(), config.clone()) + .await + }) + .unwrap_or_else(|err| { + println!("Failed to reload plugin {name}: {err:?}"); + exit(1); + }); + println!("Successfully reloaded plugin: {name}"); + } + } + return; + } + _ => unreachable!(), + } + } ("contact-info", Some(subcommand_matches)) => { let output_mode = subcommand_matches.value_of("output"); let admin_client = admin_rpc_service::connect(&ledger_path); @@ -1814,6 +1899,31 @@ pub fn main() { }, ); + let runtime_plugin_config_and_rpc_rx = { + let plugin_exit = Arc::new(AtomicBool::new(false)); + let (rpc_request_sender, rpc_request_receiver) = unbounded(); + solana_runtime_plugin::runtime_plugin_admin_rpc_service::run( + &ledger_path, + RuntimePluginAdminRpcRequestMetadata { + rpc_request_sender, + validator_exit: validator_config.validator_exit.clone(), + }, + plugin_exit, + ); + + if matches.is_present("runtime_plugin_config") { + ( + values_t_or_exit!(matches, "runtime_plugin_config", String) + .into_iter() + .map(PathBuf::from) + .collect(), + rpc_request_receiver, + ) + } else { + (vec![], rpc_request_receiver) + } + }; + let gossip_host: IpAddr = matches .value_of("gossip_host") .map(|gossip_host| { @@ -1979,6 +2089,7 @@ pub fn main() { cluster_entrypoints, &validator_config, should_check_duplicate_instance, + Some(runtime_plugin_config_and_rpc_rx), rpc_to_plugin_manager_receiver, start_progress, socket_addr_space,