Skip to content

Commit

Permalink
relay-builder: add read/write policy plugins
Browse files Browse the repository at this point in the history
Closes #675

Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
v0l authored and yukibtc committed Dec 12, 2024
1 parent 1fb05ba commit f21ffbd
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
* sdk: automatically resend event after NIP-42 authentication ([Yuki Kishimoto])
* relay-builder: add NIP42 support ([Yuki Kishimoto])
* relay-builder: add negentropy support ([Yuki Kishimoto])
* relay-builder: add read/write policy plugins ([v0l])

### Fixed

Expand Down Expand Up @@ -894,6 +895,7 @@ added `nostrdb` storage backend, added NIP32 and completed NIP51 support and mor
[rodant]: https://github.com/rodant
[erskingardner]: https://github.com/erskingardner
[J. Azad EMERY]: https://github.com/ethicnology
[v0l]: https://github.com/v0l

<!-- Tags -->
[Unreleased]: https://github.com/rust-nostr/nostr/compare/v0.37.0...HEAD
Expand Down
69 changes: 69 additions & 0 deletions crates/nostr-relay-builder/examples/policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2024 Rust Nostr Developers
// Distributed under the MIT software license

use std::collections::HashSet;
use std::net::SocketAddr;
use std::time::Duration;

use nostr_relay_builder::prelude::*;

/// Accept only certain event kinds
#[derive(Debug)]
struct AcceptKinds {
pub kinds: HashSet<Kind>,
}

#[async_trait]
impl WritePolicy for AcceptKinds {
async fn admit_event(&self, event: &Event, _addr: &SocketAddr) -> PolicyResult {
if self.kinds.contains(&event.kind) {
PolicyResult::Accept
} else {
PolicyResult::Reject("kind not accepted".to_string())
}
}
}

/// Reject requests if there are more than [limit] authors in the filter
#[derive(Debug)]
struct RejectAuthorLimit {
pub limit: usize,
}

#[async_trait]
impl QueryPolicy for RejectAuthorLimit {
async fn admit_query(&self, query: &[Filter], _addr: &SocketAddr) -> PolicyResult {
if query
.iter()
.any(|f| f.authors.as_ref().map(|a| a.len()).unwrap_or(0) > self.limit)
{
PolicyResult::Reject("query too expensive".to_string())
} else {
PolicyResult::Accept
}
}
}

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();

let accept_profile_data = AcceptKinds {
kinds: HashSet::from([Kind::Metadata, Kind::RelayList, Kind::ContactList]),
};

let low_author_limit = RejectAuthorLimit { limit: 2 };

let builder = RelayBuilder::default()
.write_policy(accept_profile_data)
.query_policy(low_author_limit);

let relay = LocalRelay::run(builder).await?;

println!("Url: {}", relay.url());

// Keep up the program
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
}
}
51 changes: 50 additions & 1 deletion crates/nostr-relay-builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

//! Relay Builder
use std::net::IpAddr;
use std::fmt;
use std::net::{IpAddr, SocketAddr};
#[cfg(all(feature = "tor", any(target_os = "android", target_os = "ios")))]
use std::path::Path;
#[cfg(feature = "tor")]
Expand Down Expand Up @@ -93,6 +94,28 @@ pub enum RelayBuilderMode {
PublicKey(PublicKey),
}

/// Generic plugin policy response
pub enum PolicyResult {
/// Policy enforces that the event/query should be accepted
Accept,
/// Policy enforces that the event/query should be rejected
Reject(String),
}

/// Custom policy for accepting events into the relay database
#[async_trait]
pub trait WritePolicy: fmt::Debug + Send + Sync {
/// Check if the policy should accept an event
async fn admit_event(&self, event: &Event, addr: &SocketAddr) -> PolicyResult;
}

/// Filters REQ's to the internal relay database
#[async_trait]
pub trait QueryPolicy: fmt::Debug + Send + Sync {
/// Check if the policy should accept a query
async fn admit_query(&self, query: &[Filter], addr: &SocketAddr) -> PolicyResult;
}

/// Testing options
#[derive(Debug, Clone, Default)]
pub struct RelayTestOptions {
Expand Down Expand Up @@ -157,6 +180,10 @@ pub struct RelayBuilder {
pub(crate) max_connections: Option<usize>,
/// Min POW difficulty
pub(crate) min_pow: Option<u8>,
/// Write policy plugins
pub(crate) write_plugins: Vec<Arc<dyn WritePolicy>>,
/// Query policy plugins
pub(crate) query_plugins: Vec<Arc<dyn QueryPolicy>>,
/// Test options
pub(crate) test: RelayTestOptions,
}
Expand All @@ -177,6 +204,8 @@ impl Default for RelayBuilder {
tor: None,
max_connections: None,
min_pow: None,
write_plugins: Vec::new(),
query_plugins: Vec::new(),
test: RelayTestOptions::default(),
}
}
Expand Down Expand Up @@ -250,6 +279,26 @@ impl RelayBuilder {
self
}

/// Add a write policy plugin
#[inline]
pub fn write_policy<T>(mut self, policy: T) -> Self
where
T: WritePolicy + 'static,
{
self.write_plugins.push(Arc::new(policy));
self
}

/// Add a query policy plugin
#[inline]
pub fn query_policy<T>(mut self, policy: T) -> Self
where
T: QueryPolicy + 'static,
{
self.query_plugins.push(Arc::new(policy));
self
}

/// Testing options
#[inline]
pub(crate) fn test(mut self, test: RelayTestOptions) -> Self {
Expand Down
42 changes: 40 additions & 2 deletions crates/nostr-relay-builder/src/local/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use tokio::sync::{broadcast, Semaphore};
use super::session::{Nip42Session, RateLimiterResponse, Session, Tokens};
use super::util;
use crate::builder::{
RateLimit, RelayBuilder, RelayBuilderMode, RelayBuilderNip42, RelayTestOptions,
PolicyResult, QueryPolicy, RateLimit, RelayBuilder, RelayBuilderMode, RelayBuilderNip42,
RelayTestOptions, WritePolicy,
};
use crate::error::Error;

Expand All @@ -39,6 +40,8 @@ pub(super) struct InnerLocalRelay {
min_pow: Option<u8>, // TODO: use AtomicU8 to allow to change it?
#[cfg(feature = "tor")]
hidden_service: Option<String>,
write_policy: Vec<Arc<dyn WritePolicy>>,
query_policy: Vec<Arc<dyn QueryPolicy>>,
nip42: Option<RelayBuilderNip42>,
test: RelayTestOptions,
}
Expand Down Expand Up @@ -102,6 +105,8 @@ impl InnerLocalRelay {
min_pow: builder.min_pow,
#[cfg(feature = "tor")]
hidden_service,
write_policy: builder.write_plugins,
query_policy: builder.query_plugins,
nip42: builder.nip42,
test: builder.test,
};
Expand Down Expand Up @@ -187,7 +192,7 @@ impl InnerLocalRelay {
match msg {
Message::Text(json) => {
tracing::trace!("Received {json}");
self.handle_client_msg(&mut session, &mut tx, ClientMessage::from_json(json)?)
self.handle_client_msg(&mut session, &mut tx, ClientMessage::from_json(json)?, &addr)
.await?;
}
Message::Binary(..) => {
Expand Down Expand Up @@ -238,6 +243,7 @@ impl InnerLocalRelay {
session: &mut Session,
ws_tx: &mut WsTx,
msg: ClientMessage,
addr: &SocketAddr,
) -> Result<()> {
match msg {
ClientMessage::Event(event) => {
Expand Down Expand Up @@ -311,6 +317,23 @@ impl InnerLocalRelay {
}
}

// check write policy
for policy in self.write_policy.iter() {
let event_id = event.id;
if let PolicyResult::Reject(m) = policy.admit_event(&event, addr).await {
return self
.send_msg(
ws_tx,
RelayMessage::Ok {
event_id,
status: false,
message: format!("{}: {}", MachineReadablePrefix::Blocked, m),
},
)
.await;
}
}

// Check if event already exists
let event_status = self.database.check_id(&event.id).await?;
match event_status {
Expand Down Expand Up @@ -508,6 +531,21 @@ impl InnerLocalRelay {
}
}

// check query policy plugins
for plugin in self.query_policy.iter() {
if let PolicyResult::Reject(msg) = plugin.admit_query(&filters, addr).await {
return self
.send_msg(
ws_tx,
RelayMessage::Closed {
subscription_id,
message: format!("{}: {}", MachineReadablePrefix::Error, msg),
},
)
.await;
}
}

// Update session subscriptions
session
.subscriptions
Expand Down

0 comments on commit f21ffbd

Please sign in to comment.