Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
segfaultdoc committed Oct 19, 2023
1 parent e58026c commit 33abab2
Show file tree
Hide file tree
Showing 6 changed files with 774 additions and 0 deletions.
18 changes: 18 additions & 0 deletions runtime-plugin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "solana-runtime-plugin"
version = "1.18.0"
edition = "2021"

[dependencies]
crossbeam-channel = { workspace = true }
json5 = { workspace = true }
jsonrpc-core = { workspace = true }
jsonrpc-core-client = { workspace = true }
jsonrpc-derive = { workspace = true }
jsonrpc-ipc-server = { workspace = true }
jsonrpc-server-utils = { workspace = true }
libloading = { workspace = true }
log = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
thiserror = { workspace = true }
4 changes: 4 additions & 0 deletions runtime-plugin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod runtime_plugin;
pub mod runtime_plugin_admin_rpc_service;
pub mod runtime_plugin_manager;
pub mod runtime_plugin_service;
41 changes: 41 additions & 0 deletions runtime-plugin/src/runtime_plugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use {
solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache},
std::{
any::Any,
error,
fmt::Debug,
io,
sync::{atomic::AtomicBool, Arc, RwLock},
},
thiserror::Error,
};

pub type Result<T> = std::result::Result<T, RuntimePluginError>;

/// Errors returned by plugin calls
#[derive(Error, Debug)]
pub enum RuntimePluginError {
/// Error opening the configuration file; for example, when the file
/// is not found or when the validator process has no permission to read it.
#[error("Error opening config file. Error detail: ({0}).")]
ConfigFileOpenError(#[from] io::Error),

/// Any custom error defined by the plugin.
#[error("Plugin-defined custom error. Error message: ({0})")]
Custom(Box<dyn error::Error + Send + Sync>),

#[error("Failed to load a runtime plugin")]
FailedToLoadPlugin(#[from] Box<dyn std::error::Error>),
}

pub struct PluginDependencies {
pub bank_forks: Arc<RwLock<BankForks>>,
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
pub exit: Arc<AtomicBool>,
}

pub trait RuntimePlugin: Any + Debug + Send + Sync {
fn name(&self) -> &'static str;
fn on_load(&mut self, config_file: &str, dependencies: PluginDependencies) -> Result<()>;
fn on_unload(&mut self);
}
312 changes: 312 additions & 0 deletions runtime-plugin/src/runtime_plugin_admin_rpc_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
//! RPC interface to dynamically make changes to runtime plugins.
use {
crossbeam_channel::Sender,
jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result as JsonRpcResult},
jsonrpc_derive::rpc,
jsonrpc_ipc_server::{
tokio::{self, sync::oneshot::channel as oneshot_channel},
RequestContext, ServerBuilder,
},
jsonrpc_server_utils::tokio::sync::oneshot::Sender as OneShotSender,
log::*,
solana_sdk::exit::Exit,
std::{
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
},
};

#[derive(Debug)]
pub enum RuntimePluginManagerRpcRequest {
ReloadPlugin {
name: String,
config_file: String,
response_sender: OneShotSender<JsonRpcResult<()>>,
},
UnloadPlugin {
name: String,
response_sender: OneShotSender<JsonRpcResult<()>>,
},
LoadPlugin {
config_file: String,
response_sender: OneShotSender<JsonRpcResult<String>>,
},
ListPlugins {
response_sender: OneShotSender<JsonRpcResult<Vec<String>>>,
},
}

#[rpc]
pub trait RuntimePluginAdminRpc {
type Metadata;

#[rpc(meta, name = "reloadPlugin")]
fn reload_plugin(
&self,
meta: Self::Metadata,
name: String,
config_file: String,
) -> BoxFuture<JsonRpcResult<()>>;

#[rpc(meta, name = "unloadPlugin")]
fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<JsonRpcResult<()>>;

#[rpc(meta, name = "loadPlugin")]
fn load_plugin(
&self,
meta: Self::Metadata,
config_file: String,
) -> BoxFuture<JsonRpcResult<String>>;

#[rpc(meta, name = "listPlugins")]
fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<JsonRpcResult<Vec<String>>>;
}

#[derive(Clone)]
pub struct RuntimePluginAdminRpcRequestMetadata {
pub rpc_request_sender: Sender<RuntimePluginManagerRpcRequest>,
pub validator_exit: Arc<RwLock<Exit>>,
}

impl Metadata for RuntimePluginAdminRpcRequestMetadata {}

/// Start the Runtime Plugin Admin RPC interface.
pub fn run(
ledger_path: &Path,
metadata: RuntimePluginAdminRpcRequestMetadata,
plugin_exit: Arc<AtomicBool>,
) {
fn rpc_path(ledger_path: &Path) -> PathBuf {
#[cfg(target_family = "windows")]
{
// More information about the wackiness of pipe names over at
// https://docs.microsoft.com/en-us/windows/win32/ipc/pipe-names
if let Some(ledger_filename) = ledger_path.file_name() {
PathBuf::from(format!(
"\\\\.\\pipe\\{}-runtime_plugin_admin.rpc",
ledger_filename.to_string_lossy()
))
} else {
PathBuf::from("\\\\.\\pipe\\runtime_plugin_admin.rpc")
}
}
#[cfg(not(target_family = "windows"))]
{
ledger_path.join("runtime_plugin_admin.rpc")
}
}

let rpc_path = rpc_path(ledger_path);

let event_loop = tokio::runtime::Builder::new_multi_thread()
.thread_name("solRuntimePluginAdminRpc")
.worker_threads(1)
.enable_all()
.build()
.unwrap();

std::thread::Builder::new()
.name("solAdminRpc".to_string())
.spawn(move || {
let mut io = MetaIoHandler::default();
io.extend_with(RuntimePluginAdminRpcImpl.to_delegate());

let validator_exit = metadata.validator_exit.clone();

match ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| {
metadata.clone()
})
.event_loop_executor(event_loop.handle().clone())
.start(&format!("{}", rpc_path.display()))
{
Err(e) => {
error!("Unable to start runtime plugin admin rpc service: {e:?}, exiting");
validator_exit.write().unwrap().exit();
}
Ok(server) => {
info!("started runtime plugin admin rpc service!");
let close_handle = server.close_handle();
let c_plugin_exit = plugin_exit.clone();
validator_exit
.write()
.unwrap()
.register_exit(Box::new(move || {
close_handle.close();
c_plugin_exit.store(true, Ordering::Relaxed);
}));

server.wait();
plugin_exit.store(true, Ordering::Relaxed);
}
}
})
.unwrap();
}

pub struct RuntimePluginAdminRpcImpl;
impl RuntimePluginAdminRpc for RuntimePluginAdminRpcImpl {
type Metadata = RuntimePluginAdminRpcRequestMetadata;

fn reload_plugin(
&self,
meta: Self::Metadata,
name: String,
config_file: String,
) -> BoxFuture<JsonRpcResult<()>> {
Box::pin(async move {
let (response_sender, response_receiver) = oneshot_channel();

if meta
.rpc_request_sender
.send(RuntimePluginManagerRpcRequest::ReloadPlugin {
name,
config_file,
response_sender,
})
.is_err()
{
error!("rpc_request_sender channel closed, exiting");
meta.validator_exit.write().unwrap().exit();

return Err(jsonrpc_core::Error {
code: ErrorCode::InternalError,
message: "Internal channel disconnected while sending the request".to_string(),
data: None,
});
}

match response_receiver.await {
Err(_) => {
error!("response_receiver channel closed, exiting");
meta.validator_exit.write().unwrap().exit();
Err(jsonrpc_core::Error {
code: ErrorCode::InternalError,
message: "Internal channel disconnected while awaiting the response"
.to_string(),
data: None,
})
}
Ok(resp) => resp,
}
})
}

fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<JsonRpcResult<()>> {
Box::pin(async move {
let (response_sender, response_receiver) = oneshot_channel();

if meta
.rpc_request_sender
.send(RuntimePluginManagerRpcRequest::UnloadPlugin {
name,
response_sender,
})
.is_err()
{
error!("rpc_request_sender channel closed, exiting");
meta.validator_exit.write().unwrap().exit();

return Err(jsonrpc_core::Error {
code: ErrorCode::InternalError,
message: "Internal channel disconnected while sending the request".to_string(),
data: None,
});
}

match response_receiver.await {
Err(_) => {
error!("response_receiver channel closed, exiting");
meta.validator_exit.write().unwrap().exit();
Err(jsonrpc_core::Error {
code: ErrorCode::InternalError,
message: "Internal channel disconnected while awaiting the response"
.to_string(),
data: None,
})
}
Ok(resp) => resp,
}
})
}

fn load_plugin(
&self,
meta: Self::Metadata,
config_file: String,
) -> BoxFuture<JsonRpcResult<String>> {
Box::pin(async move {
let (response_sender, response_receiver) = oneshot_channel();

if meta
.rpc_request_sender
.send(RuntimePluginManagerRpcRequest::LoadPlugin {
config_file,
response_sender,
})
.is_err()
{
error!("rpc_request_sender channel closed, exiting");
meta.validator_exit.write().unwrap().exit();

return Err(jsonrpc_core::Error {
code: ErrorCode::InternalError,
message: "Internal channel disconnected while sending the request".to_string(),
data: None,
});
}

match response_receiver.await {
Err(_) => {
error!("response_receiver channel closed, exiting");
meta.validator_exit.write().unwrap().exit();
Err(jsonrpc_core::Error {
code: ErrorCode::InternalError,
message: "Internal channel disconnected while awaiting the response"
.to_string(),
data: None,
})
}
Ok(resp) => resp,
}
})
}

fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<JsonRpcResult<Vec<String>>> {
Box::pin(async move {
let (response_sender, response_receiver) = oneshot_channel();

if meta
.rpc_request_sender
.send(RuntimePluginManagerRpcRequest::ListPlugins { response_sender })
.is_err()
{
error!("rpc_request_sender channel closed, exiting");
meta.validator_exit.write().unwrap().exit();

return Err(jsonrpc_core::Error {
code: ErrorCode::InternalError,
message: "Internal channel disconnected while sending the request".to_string(),
data: None,
});
}

match response_receiver.await {
Err(_) => {
error!("response_receiver channel closed, exiting");
meta.validator_exit.write().unwrap().exit();
Err(jsonrpc_core::Error {
code: ErrorCode::InternalError,
message: "Internal channel disconnected while awaiting the response"
.to_string(),
data: None,
})
}
Ok(resp) => resp,
}
})
}
}
Loading

0 comments on commit 33abab2

Please sign in to comment.