From 9c382bb2011667a7a43e4c8f519b7672807e1ab7 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Thu, 5 Dec 2024 12:42:51 -0700 Subject: [PATCH] feat: adds /peers endpoint With this change a new API endpoint is added for accessing connected peers. The API is similar to the /api/v0/swarm/peers endpoint however it allows for multiple addresses and does not have to remain Kubo RPC compatible. The endpoint allows posting an address and therefore connecting to a new peer. Fixes: #608 --- Cargo.lock | 1 + Cargo.toml | 5 +- api-server/.openapi-generator/FILES | 2 + api-server/README.md | 10 +- api-server/api/openapi.yaml | 105 ++++++++++ api-server/docs/Peer.md | 11 + api-server/docs/Peers.md | 10 + api-server/docs/default_api.md | 75 +++++++ api-server/examples/client/main.rs | 32 ++- api-server/examples/server/server.rs | 39 +++- api-server/src/client/mod.rs | 285 +++++++++++++++++++++++++- api-server/src/lib.rs | 76 +++++++ api-server/src/models.rs | 288 +++++++++++++++++++++++++++ api-server/src/server/mod.rs | 186 ++++++++++++++++- api/Cargo.toml | 1 + api/ceramic.yaml | 84 ++++++++ api/src/lib.rs | 2 +- api/src/server.rs | 148 ++++++++++++-- api/src/tests.rs | 198 +++++++++++++++++- core/src/lib.rs | 2 +- core/src/peer.rs | 3 +- one/src/daemon.rs | 1 + one/src/network.rs | 56 +++++- 23 files changed, 1583 insertions(+), 37 deletions(-) create mode 100644 api-server/docs/Peer.md create mode 100644 api-server/docs/Peers.md diff --git a/Cargo.lock b/Cargo.lock index 07cb06da2..fd52c5bfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2137,6 +2137,7 @@ dependencies = [ "ipld-core", "jemalloc_pprof", "mockall", + "multiaddr", "multibase 0.9.1", "object_store", "recon", diff --git a/Cargo.toml b/Cargo.toml index 8f1ab7803..f923947ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -185,7 +185,10 @@ sha3 = "0.10" smallvec = "1.10" # pragma optimize hangs forver on 0.8, possibly due to libsqlite-sys upgrade sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "chrono"] } -ssh-key = { version = "0.5.1", default-features = false } +ssh-key = { version = "0.5.1", default-features = false, features = [ + "std", + "rand_core", +] } ssi = { version = "0.7", features = ["ed25519"] } swagger = { version = "6.1", features = [ "serdejson", diff --git a/api-server/.openapi-generator/FILES b/api-server/.openapi-generator/FILES index 641a50edb..7120523a3 100644 --- a/api-server/.openapi-generator/FILES +++ b/api-server/.openapi-generator/FILES @@ -14,6 +14,8 @@ docs/Interest.md docs/InterestsGet.md docs/InterestsGetInterestsInner.md docs/NetworkInfo.md +docs/Peer.md +docs/Peers.md docs/StreamState.md docs/Version.md docs/default_api.md diff --git a/api-server/README.md b/api-server/README.md index f1649c487..a308ce791 100644 --- a/api-server/README.md +++ b/api-server/README.md @@ -15,7 +15,7 @@ To see how to make this your own, look here: [README]((https://openapi-generator.tech)) - API version: 0.45.0 -- Build date: 2024-11-25T18:44:34.261235426Z[Etc/UTC] +- Build date: 2024-12-06T08:58:05.827048302-07:00[America/Denver] @@ -82,6 +82,9 @@ cargo run --example client InterestsSortKeySortValueOptions cargo run --example client InterestsSortKeySortValuePost cargo run --example client LivenessGet cargo run --example client LivenessOptions +cargo run --example client PeersGet +cargo run --example client PeersOptions +cargo run --example client PeersPost cargo run --example client StreamsStreamIdGet cargo run --example client StreamsStreamIdOptions cargo run --example client VersionGet @@ -142,6 +145,9 @@ Method | HTTP request | Description [****](docs/default_api.md#) | **POST** /interests/{sort_key}/{sort_value} | Register interest for a sort key [****](docs/default_api.md#) | **GET** /liveness | Test the liveness of the Ceramic node [****](docs/default_api.md#) | **OPTIONS** /liveness | cors +[****](docs/default_api.md#) | **GET** /peers | Get list of connected peers +[****](docs/default_api.md#) | **OPTIONS** /peers | cors +[****](docs/default_api.md#) | **POST** /peers | Connect to a peer [****](docs/default_api.md#) | **GET** /streams/{stream_id} | Get stream state [****](docs/default_api.md#) | **OPTIONS** /streams/{stream_id} | cors [****](docs/default_api.md#) | **GET** /version | Get the version of the Ceramic node @@ -162,6 +168,8 @@ Method | HTTP request | Description - [InterestsGet](docs/InterestsGet.md) - [InterestsGetInterestsInner](docs/InterestsGetInterestsInner.md) - [NetworkInfo](docs/NetworkInfo.md) + - [Peer](docs/Peer.md) + - [Peers](docs/Peers.md) - [StreamState](docs/StreamState.md) - [Version](docs/Version.md) diff --git a/api-server/api/openapi.yaml b/api-server/api/openapi.yaml index b56749dfd..1619a06ba 100644 --- a/api-server/api/openapi.yaml +++ b/api-server/api/openapi.yaml @@ -566,6 +566,68 @@ paths: "200": description: cors summary: cors + /peers: + get: + responses: + "200": + content: + application/json: + schema: + $ref: '#/components/schemas/Peers' + description: success + "500": + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + description: Internal server error + summary: Get list of connected peers + options: + parameters: + - description: "Multiaddress of peer to connect to, at least one address must\ + \ contain the peer id." + explode: true + in: query + name: addresses + required: true + schema: + items: + type: string + type: array + style: form + responses: + "200": + description: cors + summary: cors + post: + parameters: + - description: "Multiaddress of peer to connect to, at least one address must\ + \ contain the peer id." + explode: true + in: query + name: addresses + required: true + schema: + items: + type: string + type: array + style: form + responses: + "204": + description: success + "400": + content: + application/json: + schema: + $ref: '#/components/schemas/BadRequestResponse' + description: bad request + "500": + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + description: Internal server error + summary: Connect to a peer components: requestBodies: EventData: @@ -789,6 +851,49 @@ components: - id title: State of a Ceramic stream type: object + Peers: + example: + peers: + - addresses: + - addresses + - addresses + id: id + - addresses: + - addresses + - addresses + id: id + properties: + peers: + items: + $ref: '#/components/schemas/Peer' + type: array + required: + - peers + title: List of Peers + type: object + Peer: + description: Information about a connected peer + example: + addresses: + - addresses + - addresses + id: id + properties: + id: + description: DID of peer + type: string + addresses: + description: "List of known multiaddress of peer, will always include the\ + \ peer id" + items: + description: Multiaddress where peer may be dialed + type: string + type: array + required: + - addresses + - id + title: Information about a connected peer + type: object _feed_resumeToken_get_200_response: example: resumeToken: resumeToken diff --git a/api-server/docs/Peer.md b/api-server/docs/Peer.md new file mode 100644 index 000000000..1dfd9710b --- /dev/null +++ b/api-server/docs/Peer.md @@ -0,0 +1,11 @@ +# Peer + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**id** | **String** | DID of peer | +**addresses** | **Vec** | List of known multiaddress of peer, will always include the peer id | + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/api-server/docs/Peers.md b/api-server/docs/Peers.md new file mode 100644 index 000000000..b943842c5 --- /dev/null +++ b/api-server/docs/Peers.md @@ -0,0 +1,10 @@ +# Peers + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**peers** | [**Vec**](Peer.md) | | + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/api-server/docs/default_api.md b/api-server/docs/default_api.md index 5107556b8..e614738ec 100644 --- a/api-server/docs/default_api.md +++ b/api-server/docs/default_api.md @@ -26,6 +26,9 @@ Method | HTTP request | Description ****](default_api.md#) | **POST** /interests/{sort_key}/{sort_value} | Register interest for a sort key ****](default_api.md#) | **GET** /liveness | Test the liveness of the Ceramic node ****](default_api.md#) | **OPTIONS** /liveness | cors +****](default_api.md#) | **GET** /peers | Get list of connected peers +****](default_api.md#) | **OPTIONS** /peers | cors +****](default_api.md#) | **POST** /peers | Connect to a peer ****](default_api.md#) | **GET** /streams/{stream_id} | Get stream state ****](default_api.md#) | **OPTIONS** /streams/{stream_id} | cors ****](default_api.md#) | **GET** /version | Get the version of the Ceramic node @@ -601,6 +604,78 @@ No authorization required [[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) +# **** +> models::Peers () +Get list of connected peers + +### Required Parameters +This endpoint does not need any parameter. + +### Return type + +[**models::Peers**](Peers.md) + +### Authorization + +No authorization required + +### HTTP request headers + + - **Content-Type**: Not defined + - **Accept**: application/json + +[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) + +# **** +> (addresses) +cors + +### Required Parameters + +Name | Type | Description | Notes +------------- | ------------- | ------------- | ------------- + **addresses** | [**String**](String.md)| Multiaddress of peer to connect to, at least one address must contain the peer id. | + +### Return type + + (empty response body) + +### Authorization + +No authorization required + +### HTTP request headers + + - **Content-Type**: Not defined + - **Accept**: Not defined + +[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) + +# **** +> (addresses) +Connect to a peer + +### Required Parameters + +Name | Type | Description | Notes +------------- | ------------- | ------------- | ------------- + **addresses** | [**String**](String.md)| Multiaddress of peer to connect to, at least one address must contain the peer id. | + +### Return type + + (empty response body) + +### Authorization + +No authorization required + +### HTTP request headers + + - **Content-Type**: Not defined + - **Accept**: application/json + +[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) + # **** > models::StreamState (stream_id) Get stream state diff --git a/api-server/examples/client/main.rs b/api-server/examples/client/main.rs index 6f98352c7..e010c6613 100644 --- a/api-server/examples/client/main.rs +++ b/api-server/examples/client/main.rs @@ -10,8 +10,9 @@ use ceramic_api_server::{ FeedEventsOptionsResponse, FeedResumeTokenGetResponse, FeedResumeTokenOptionsResponse, InterestsOptionsResponse, InterestsPostResponse, InterestsSortKeySortValueOptionsResponse, InterestsSortKeySortValuePostResponse, LivenessGetResponse, LivenessOptionsResponse, - StreamsStreamIdGetResponse, StreamsStreamIdOptionsResponse, VersionGetResponse, - VersionOptionsResponse, VersionPostResponse, + PeersGetResponse, PeersOptionsResponse, PeersPostResponse, StreamsStreamIdGetResponse, + StreamsStreamIdOptionsResponse, VersionGetResponse, VersionOptionsResponse, + VersionPostResponse, }; use clap::{App, Arg}; #[allow(unused_imports)] @@ -61,6 +62,9 @@ fn main() { "InterestsSortKeySortValuePost", "LivenessGet", "LivenessOptions", + "PeersGet", + "PeersOptions", + "PeersPost", "StreamsStreamIdGet", "StreamsStreamIdOptions", "VersionGet", @@ -323,6 +327,30 @@ fn main() { (client.context() as &dyn Has).get().clone() ); } + Some("PeersGet") => { + let result = rt.block_on(client.peers_get()); + info!( + "{:?} (X-Span-ID: {:?})", + result, + (client.context() as &dyn Has).get().clone() + ); + } + Some("PeersOptions") => { + let result = rt.block_on(client.peers_options(&Vec::new())); + info!( + "{:?} (X-Span-ID: {:?})", + result, + (client.context() as &dyn Has).get().clone() + ); + } + Some("PeersPost") => { + let result = rt.block_on(client.peers_post(&Vec::new())); + info!( + "{:?} (X-Span-ID: {:?})", + result, + (client.context() as &dyn Has).get().clone() + ); + } Some("StreamsStreamIdGet") => { let result = rt.block_on(client.streams_stream_id_get("stream_id_example".to_string())); info!( diff --git a/api-server/examples/server/server.rs b/api-server/examples/server/server.rs index 5201f70f7..6924d3df4 100644 --- a/api-server/examples/server/server.rs +++ b/api-server/examples/server/server.rs @@ -109,8 +109,9 @@ use ceramic_api_server::{ FeedResumeTokenGetResponse, FeedResumeTokenOptionsResponse, InterestsOptionsResponse, InterestsPostResponse, InterestsSortKeySortValueOptionsResponse, InterestsSortKeySortValuePostResponse, LivenessGetResponse, LivenessOptionsResponse, - StreamsStreamIdGetResponse, StreamsStreamIdOptionsResponse, VersionGetResponse, - VersionOptionsResponse, VersionPostResponse, + PeersGetResponse, PeersOptionsResponse, PeersPostResponse, StreamsStreamIdGetResponse, + StreamsStreamIdOptionsResponse, VersionGetResponse, VersionOptionsResponse, + VersionPostResponse, }; use std::error::Error; use swagger::ApiError; @@ -397,6 +398,40 @@ where Err(ApiError("Generic failure".into())) } + /// Get list of connected peers + async fn peers_get(&self, context: &C) -> Result { + info!("peers_get() - X-Span-ID: {:?}", context.get().0.clone()); + Err(ApiError("Generic failure".into())) + } + + /// cors + async fn peers_options( + &self, + addresses: &Vec, + context: &C, + ) -> Result { + info!( + "peers_options({:?}) - X-Span-ID: {:?}", + addresses, + context.get().0.clone() + ); + Err(ApiError("Generic failure".into())) + } + + /// Connect to a peer + async fn peers_post( + &self, + addresses: &Vec, + context: &C, + ) -> Result { + info!( + "peers_post({:?}) - X-Span-ID: {:?}", + addresses, + context.get().0.clone() + ); + Err(ApiError("Generic failure".into())) + } + /// Get stream state async fn streams_stream_id_get( &self, diff --git a/api-server/src/client/mod.rs b/api-server/src/client/mod.rs index b711d2262..aa9dfe357 100644 --- a/api-server/src/client/mod.rs +++ b/api-server/src/client/mod.rs @@ -50,8 +50,9 @@ use crate::{ FeedResumeTokenGetResponse, FeedResumeTokenOptionsResponse, InterestsOptionsResponse, InterestsPostResponse, InterestsSortKeySortValueOptionsResponse, InterestsSortKeySortValuePostResponse, LivenessGetResponse, LivenessOptionsResponse, - StreamsStreamIdGetResponse, StreamsStreamIdOptionsResponse, VersionGetResponse, - VersionOptionsResponse, VersionPostResponse, + PeersGetResponse, PeersOptionsResponse, PeersPostResponse, StreamsStreamIdGetResponse, + StreamsStreamIdOptionsResponse, VersionGetResponse, VersionOptionsResponse, + VersionPostResponse, }; /// Convert input into a base path, e.g. "http://example:123". Also checks the scheme as it goes. @@ -2406,6 +2407,286 @@ where } } + async fn peers_get(&self, context: &C) -> Result { + let mut client_service = self.client_service.clone(); + let mut uri = format!("{}/ceramic/peers", self.base_path); + + // Query parameters + let query_string = { + let mut query_string = form_urlencoded::Serializer::new("".to_owned()); + query_string.finish() + }; + if !query_string.is_empty() { + uri += "?"; + uri += &query_string; + } + + let uri = match Uri::from_str(&uri) { + Ok(uri) => uri, + Err(err) => return Err(ApiError(format!("Unable to build URI: {}", err))), + }; + + let mut request = match Request::builder() + .method("GET") + .uri(uri) + .body(Body::empty()) + { + Ok(req) => req, + Err(e) => return Err(ApiError(format!("Unable to create request: {}", e))), + }; + + let header = HeaderValue::from_str(Has::::get(context).0.as_str()); + request.headers_mut().insert( + HeaderName::from_static("x-span-id"), + match header { + Ok(h) => h, + Err(e) => { + return Err(ApiError(format!( + "Unable to create X-Span ID header value: {}", + e + ))) + } + }, + ); + + let response = client_service + .call((request, context.clone())) + .map_err(|e| ApiError(format!("No response received: {}", e))) + .await?; + + match response.status().as_u16() { + 200 => { + let body = response.into_body(); + let body = body + .into_raw() + .map_err(|e| ApiError(format!("Failed to read response: {}", e))) + .await?; + let body = str::from_utf8(&body) + .map_err(|e| ApiError(format!("Response was not valid UTF8: {}", e)))?; + let body = serde_json::from_str::(body).map_err(|e| { + ApiError(format!("Response body did not match the schema: {}", e)) + })?; + Ok(PeersGetResponse::Success(body)) + } + 500 => { + let body = response.into_body(); + let body = body + .into_raw() + .map_err(|e| ApiError(format!("Failed to read response: {}", e))) + .await?; + let body = str::from_utf8(&body) + .map_err(|e| ApiError(format!("Response was not valid UTF8: {}", e)))?; + let body = serde_json::from_str::(body).map_err(|e| { + ApiError(format!("Response body did not match the schema: {}", e)) + })?; + Ok(PeersGetResponse::InternalServerError(body)) + } + code => { + let headers = response.headers().clone(); + let body = response.into_body().take(100).into_raw().await; + Err(ApiError(format!( + "Unexpected response code {}:\n{:?}\n\n{}", + code, + headers, + match body { + Ok(body) => match String::from_utf8(body) { + Ok(body) => body, + Err(e) => format!("", e), + }, + Err(e) => format!("", e), + } + ))) + } + } + } + + async fn peers_options( + &self, + param_addresses: &Vec, + context: &C, + ) -> Result { + let mut client_service = self.client_service.clone(); + let mut uri = format!("{}/ceramic/peers", self.base_path); + + // Query parameters + let query_string = { + let mut query_string = form_urlencoded::Serializer::new("".to_owned()); + query_string.append_pair( + "addresses", + ¶m_addresses + .iter() + .map(ToString::to_string) + .collect::>() + .join(","), + ); + query_string.finish() + }; + if !query_string.is_empty() { + uri += "?"; + uri += &query_string; + } + + let uri = match Uri::from_str(&uri) { + Ok(uri) => uri, + Err(err) => return Err(ApiError(format!("Unable to build URI: {}", err))), + }; + + let mut request = match Request::builder() + .method("OPTIONS") + .uri(uri) + .body(Body::empty()) + { + Ok(req) => req, + Err(e) => return Err(ApiError(format!("Unable to create request: {}", e))), + }; + + let header = HeaderValue::from_str(Has::::get(context).0.as_str()); + request.headers_mut().insert( + HeaderName::from_static("x-span-id"), + match header { + Ok(h) => h, + Err(e) => { + return Err(ApiError(format!( + "Unable to create X-Span ID header value: {}", + e + ))) + } + }, + ); + + let response = client_service + .call((request, context.clone())) + .map_err(|e| ApiError(format!("No response received: {}", e))) + .await?; + + match response.status().as_u16() { + 200 => Ok(PeersOptionsResponse::Cors), + code => { + let headers = response.headers().clone(); + let body = response.into_body().take(100).into_raw().await; + Err(ApiError(format!( + "Unexpected response code {}:\n{:?}\n\n{}", + code, + headers, + match body { + Ok(body) => match String::from_utf8(body) { + Ok(body) => body, + Err(e) => format!("", e), + }, + Err(e) => format!("", e), + } + ))) + } + } + } + + async fn peers_post( + &self, + param_addresses: &Vec, + context: &C, + ) -> Result { + let mut client_service = self.client_service.clone(); + let mut uri = format!("{}/ceramic/peers", self.base_path); + + // Query parameters + let query_string = { + let mut query_string = form_urlencoded::Serializer::new("".to_owned()); + query_string.append_pair( + "addresses", + ¶m_addresses + .iter() + .map(ToString::to_string) + .collect::>() + .join(","), + ); + query_string.finish() + }; + if !query_string.is_empty() { + uri += "?"; + uri += &query_string; + } + + let uri = match Uri::from_str(&uri) { + Ok(uri) => uri, + Err(err) => return Err(ApiError(format!("Unable to build URI: {}", err))), + }; + + let mut request = match Request::builder() + .method("POST") + .uri(uri) + .body(Body::empty()) + { + Ok(req) => req, + Err(e) => return Err(ApiError(format!("Unable to create request: {}", e))), + }; + + let header = HeaderValue::from_str(Has::::get(context).0.as_str()); + request.headers_mut().insert( + HeaderName::from_static("x-span-id"), + match header { + Ok(h) => h, + Err(e) => { + return Err(ApiError(format!( + "Unable to create X-Span ID header value: {}", + e + ))) + } + }, + ); + + let response = client_service + .call((request, context.clone())) + .map_err(|e| ApiError(format!("No response received: {}", e))) + .await?; + + match response.status().as_u16() { + 204 => Ok(PeersPostResponse::Success), + 400 => { + let body = response.into_body(); + let body = body + .into_raw() + .map_err(|e| ApiError(format!("Failed to read response: {}", e))) + .await?; + let body = str::from_utf8(&body) + .map_err(|e| ApiError(format!("Response was not valid UTF8: {}", e)))?; + let body = + serde_json::from_str::(body).map_err(|e| { + ApiError(format!("Response body did not match the schema: {}", e)) + })?; + Ok(PeersPostResponse::BadRequest(body)) + } + 500 => { + let body = response.into_body(); + let body = body + .into_raw() + .map_err(|e| ApiError(format!("Failed to read response: {}", e))) + .await?; + let body = str::from_utf8(&body) + .map_err(|e| ApiError(format!("Response was not valid UTF8: {}", e)))?; + let body = serde_json::from_str::(body).map_err(|e| { + ApiError(format!("Response body did not match the schema: {}", e)) + })?; + Ok(PeersPostResponse::InternalServerError(body)) + } + code => { + let headers = response.headers().clone(); + let body = response.into_body().take(100).into_raw().await; + Err(ApiError(format!( + "Unexpected response code {}:\n{:?}\n\n{}", + code, + headers, + match body { + Ok(body) => match String::from_utf8(body) { + Ok(body) => body, + Err(e) => format!("", e), + }, + Err(e) => format!("", e), + } + ))) + } + } + } + async fn streams_stream_id_get( &self, param_stream_id: String, diff --git a/api-server/src/lib.rs b/api-server/src/lib.rs index 970af735e..f7c9629ae 100644 --- a/api-server/src/lib.rs +++ b/api-server/src/lib.rs @@ -205,6 +205,32 @@ pub enum LivenessOptionsResponse { Cors, } +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[must_use] +pub enum PeersGetResponse { + /// success + Success(models::Peers), + /// Internal server error + InternalServerError(models::ErrorResponse), +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub enum PeersOptionsResponse { + /// cors + Cors, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[must_use] +pub enum PeersPostResponse { + /// success + Success, + /// bad request + BadRequest(models::BadRequestResponse), + /// Internal server error + InternalServerError(models::ErrorResponse), +} + #[derive(Debug, PartialEq, Serialize, Deserialize)] #[must_use] pub enum StreamsStreamIdGetResponse { @@ -391,6 +417,23 @@ pub trait Api { /// cors async fn liveness_options(&self, context: &C) -> Result; + /// Get list of connected peers + async fn peers_get(&self, context: &C) -> Result; + + /// cors + async fn peers_options( + &self, + addresses: &Vec, + context: &C, + ) -> Result; + + /// Connect to a peer + async fn peers_post( + &self, + addresses: &Vec, + context: &C, + ) -> Result; + /// Get stream state async fn streams_stream_id_get( &self, @@ -537,6 +580,18 @@ pub trait ApiNoContext { /// cors async fn liveness_options(&self) -> Result; + /// Get list of connected peers + async fn peers_get(&self) -> Result; + + /// cors + async fn peers_options( + &self, + addresses: &Vec, + ) -> Result; + + /// Connect to a peer + async fn peers_post(&self, addresses: &Vec) -> Result; + /// Get stream state async fn streams_stream_id_get( &self, @@ -779,6 +834,27 @@ impl + Send + Sync, C: Clone + Send + Sync> ApiNoContext for Contex self.api().liveness_options(&context).await } + /// Get list of connected peers + async fn peers_get(&self) -> Result { + let context = self.context().clone(); + self.api().peers_get(&context).await + } + + /// cors + async fn peers_options( + &self, + addresses: &Vec, + ) -> Result { + let context = self.context().clone(); + self.api().peers_options(addresses, &context).await + } + + /// Connect to a peer + async fn peers_post(&self, addresses: &Vec) -> Result { + let context = self.context().clone(); + self.api().peers_post(addresses, &context).await + } + /// Get stream state async fn streams_stream_id_get( &self, diff --git a/api-server/src/models.rs b/api-server/src/models.rs index 0836e5ee7..01b6467f6 100644 --- a/api-server/src/models.rs +++ b/api-server/src/models.rs @@ -1612,6 +1612,294 @@ impl std::convert::TryFrom for header::IntoHeaderVal } } +/// Information about a connected peer +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, validator::Validate)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Peer { + /// DID of peer + #[serde(rename = "id")] + pub id: String, + + /// List of known multiaddress of peer, will always include the peer id + #[serde(rename = "addresses")] + pub addresses: Vec, +} + +impl Peer { + #[allow(clippy::new_without_default)] + pub fn new(id: String, addresses: Vec) -> Peer { + Peer { id, addresses } + } +} + +/// Converts the Peer value to the Query Parameters representation (style=form, explode=false) +/// specified in https://swagger.io/docs/specification/serialization/ +/// Should be implemented in a serde serializer +impl std::string::ToString for Peer { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("id".to_string()), + Some(self.id.to_string()), + Some("addresses".to_string()), + Some( + self.addresses + .iter() + .map(|x| x.to_string()) + .collect::>() + .join(","), + ), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a Peer value +/// as specified in https://swagger.io/docs/specification/serialization/ +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Peer { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + #[allow(dead_code)] + struct IntermediateRep { + pub id: Vec, + pub addresses: Vec>, + } + + let mut intermediate_rep = IntermediateRep::default(); + + // Parse into intermediate representation + let mut string_iter = s.split(','); + let mut key_result = string_iter.next(); + + while key_result.is_some() { + let val = match string_iter.next() { + Some(x) => x, + None => { + return std::result::Result::Err("Missing value while parsing Peer".to_string()) + } + }; + + if let Some(key) = key_result { + #[allow(clippy::match_single_binding)] + match key { + #[allow(clippy::redundant_clone)] + "id" => intermediate_rep.id.push( + ::from_str(val).map_err(|x| x.to_string())?, + ), + "addresses" => { + return std::result::Result::Err( + "Parsing a container in this style is not supported in Peer" + .to_string(), + ) + } + _ => { + return std::result::Result::Err( + "Unexpected key while parsing Peer".to_string(), + ) + } + } + } + + // Get the next key + key_result = string_iter.next(); + } + + // Use the intermediate representation to return the struct + std::result::Result::Ok(Peer { + id: intermediate_rep + .id + .into_iter() + .next() + .ok_or_else(|| "id missing in Peer".to_string())?, + addresses: intermediate_rep + .addresses + .into_iter() + .next() + .ok_or_else(|| "addresses missing in Peer".to_string())?, + }) + } +} + +// Methods for converting between header::IntoHeaderValue and hyper::header::HeaderValue + +#[cfg(any(feature = "client", feature = "server"))] +impl std::convert::TryFrom> for hyper::header::HeaderValue { + type Error = String; + + fn try_from( + hdr_value: header::IntoHeaderValue, + ) -> std::result::Result { + let hdr_value = hdr_value.to_string(); + match hyper::header::HeaderValue::from_str(&hdr_value) { + std::result::Result::Ok(value) => std::result::Result::Ok(value), + std::result::Result::Err(e) => std::result::Result::Err(format!( + "Invalid header value for Peer - value: {} is invalid {}", + hdr_value, e + )), + } + } +} + +#[cfg(any(feature = "client", feature = "server"))] +impl std::convert::TryFrom for header::IntoHeaderValue { + type Error = String; + + fn try_from(hdr_value: hyper::header::HeaderValue) -> std::result::Result { + match hdr_value.to_str() { + std::result::Result::Ok(value) => match ::from_str(value) { + std::result::Result::Ok(value) => { + std::result::Result::Ok(header::IntoHeaderValue(value)) + } + std::result::Result::Err(err) => std::result::Result::Err(format!( + "Unable to convert header value '{}' into Peer - {}", + value, err + )), + }, + std::result::Result::Err(e) => std::result::Result::Err(format!( + "Unable to convert header: {:?} to string: {}", + hdr_value, e + )), + } + } +} + +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, validator::Validate)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Peers { + #[serde(rename = "peers")] + pub peers: Vec, +} + +impl Peers { + #[allow(clippy::new_without_default)] + pub fn new(peers: Vec) -> Peers { + Peers { peers } + } +} + +/// Converts the Peers value to the Query Parameters representation (style=form, explode=false) +/// specified in https://swagger.io/docs/specification/serialization/ +/// Should be implemented in a serde serializer +impl std::string::ToString for Peers { + fn to_string(&self) -> String { + let params: Vec> = vec![ + // Skipping peers in query parameter serialization + + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a Peers value +/// as specified in https://swagger.io/docs/specification/serialization/ +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Peers { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + #[allow(dead_code)] + struct IntermediateRep { + pub peers: Vec>, + } + + let mut intermediate_rep = IntermediateRep::default(); + + // Parse into intermediate representation + let mut string_iter = s.split(','); + let mut key_result = string_iter.next(); + + while key_result.is_some() { + let val = match string_iter.next() { + Some(x) => x, + None => { + return std::result::Result::Err( + "Missing value while parsing Peers".to_string(), + ) + } + }; + + if let Some(key) = key_result { + #[allow(clippy::match_single_binding)] + match key { + "peers" => { + return std::result::Result::Err( + "Parsing a container in this style is not supported in Peers" + .to_string(), + ) + } + _ => { + return std::result::Result::Err( + "Unexpected key while parsing Peers".to_string(), + ) + } + } + } + + // Get the next key + key_result = string_iter.next(); + } + + // Use the intermediate representation to return the struct + std::result::Result::Ok(Peers { + peers: intermediate_rep + .peers + .into_iter() + .next() + .ok_or_else(|| "peers missing in Peers".to_string())?, + }) + } +} + +// Methods for converting between header::IntoHeaderValue and hyper::header::HeaderValue + +#[cfg(any(feature = "client", feature = "server"))] +impl std::convert::TryFrom> for hyper::header::HeaderValue { + type Error = String; + + fn try_from( + hdr_value: header::IntoHeaderValue, + ) -> std::result::Result { + let hdr_value = hdr_value.to_string(); + match hyper::header::HeaderValue::from_str(&hdr_value) { + std::result::Result::Ok(value) => std::result::Result::Ok(value), + std::result::Result::Err(e) => std::result::Result::Err(format!( + "Invalid header value for Peers - value: {} is invalid {}", + hdr_value, e + )), + } + } +} + +#[cfg(any(feature = "client", feature = "server"))] +impl std::convert::TryFrom for header::IntoHeaderValue { + type Error = String; + + fn try_from(hdr_value: hyper::header::HeaderValue) -> std::result::Result { + match hdr_value.to_str() { + std::result::Result::Ok(value) => match ::from_str(value) { + std::result::Result::Ok(value) => { + std::result::Result::Ok(header::IntoHeaderValue(value)) + } + std::result::Result::Err(err) => std::result::Result::Err(format!( + "Unable to convert header value '{}' into Peers - {}", + value, err + )), + }, + std::result::Result::Err(e) => std::result::Result::Err(format!( + "Unable to convert header: {:?} to string: {}", + hdr_value, e + )), + } + } +} + /// The state of a Ceramic stream as defined by the stream type aggregation and conflict resolution rules. #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, validator::Validate)] #[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] diff --git a/api-server/src/server/mod.rs b/api-server/src/server/mod.rs index 61d129f09..1bb831c44 100644 --- a/api-server/src/server/mod.rs +++ b/api-server/src/server/mod.rs @@ -31,8 +31,9 @@ use crate::{ FeedResumeTokenGetResponse, FeedResumeTokenOptionsResponse, InterestsOptionsResponse, InterestsPostResponse, InterestsSortKeySortValueOptionsResponse, InterestsSortKeySortValuePostResponse, LivenessGetResponse, LivenessOptionsResponse, - StreamsStreamIdGetResponse, StreamsStreamIdOptionsResponse, VersionGetResponse, - VersionOptionsResponse, VersionPostResponse, + PeersGetResponse, PeersOptionsResponse, PeersPostResponse, StreamsStreamIdGetResponse, + StreamsStreamIdOptionsResponse, VersionGetResponse, VersionOptionsResponse, + VersionPostResponse, }; mod paths { @@ -51,6 +52,7 @@ mod paths { r"^/ceramic/interests$", r"^/ceramic/interests/(?P[^/?#]*)/(?P[^/?#]*)$", r"^/ceramic/liveness$", + r"^/ceramic/peers$", r"^/ceramic/streams/(?P[^/?#]*)$", r"^/ceramic/version$" ]) @@ -89,14 +91,15 @@ mod paths { .expect("Unable to create regex for INTERESTS_SORT_KEY_SORT_VALUE"); } pub(crate) static ID_LIVENESS: usize = 10; - pub(crate) static ID_STREAMS_STREAM_ID: usize = 11; + pub(crate) static ID_PEERS: usize = 11; + pub(crate) static ID_STREAMS_STREAM_ID: usize = 12; lazy_static! { pub static ref REGEX_STREAMS_STREAM_ID: regex::Regex = #[allow(clippy::invalid_regex)] regex::Regex::new(r"^/ceramic/streams/(?P[^/?#]*)$") .expect("Unable to create regex for STREAMS_STREAM_ID"); } - pub(crate) static ID_VERSION: usize = 12; + pub(crate) static ID_VERSION: usize = 13; } pub struct MakeService @@ -1767,6 +1770,174 @@ where Ok(response) } + // PeersGet - GET /peers + hyper::Method::GET if path.matched(paths::ID_PEERS) => { + let result = api_impl.peers_get(&context).await; + let mut response = Response::new(Body::empty()); + response.headers_mut().insert( + HeaderName::from_static("x-span-id"), + HeaderValue::from_str( + (&context as &dyn Has) + .get() + .0 + .clone() + .as_str(), + ) + .expect("Unable to create X-Span-ID header value"), + ); + + match result { + Ok(rsp) => match rsp { + PeersGetResponse::Success(body) => { + *response.status_mut() = StatusCode::from_u16(200) + .expect("Unable to turn 200 into a StatusCode"); + response.headers_mut().insert( + CONTENT_TYPE, + HeaderValue::from_str("application/json") + .expect("Unable to create Content-Type header for PEERS_GET_SUCCESS")); + let body_content = serde_json::to_string(&body) + .expect("impossible to fail to serialize"); + *response.body_mut() = Body::from(body_content); + } + PeersGetResponse::InternalServerError(body) => { + *response.status_mut() = StatusCode::from_u16(500) + .expect("Unable to turn 500 into a StatusCode"); + response.headers_mut().insert( + CONTENT_TYPE, + HeaderValue::from_str("application/json") + .expect("Unable to create Content-Type header for PEERS_GET_INTERNAL_SERVER_ERROR")); + let body_content = serde_json::to_string(&body) + .expect("impossible to fail to serialize"); + *response.body_mut() = Body::from(body_content); + } + }, + Err(_) => { + // Application code returned an error. This should not happen, as the implementation should + // return a valid response. + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + *response.body_mut() = Body::from("An internal error occurred"); + } + } + + Ok(response) + } + + // PeersOptions - OPTIONS /peers + hyper::Method::OPTIONS if path.matched(paths::ID_PEERS) => { + // Query parameters (note that non-required or collection query parameters will ignore garbage values, rather than causing a 400 response) + let query_params = + form_urlencoded::parse(uri.query().unwrap_or_default().as_bytes()) + .collect::>(); + let param_addresses = query_params + .iter() + .filter(|e| e.0 == "addresses") + .map(|e| e.1.clone()) + .filter_map(|param_addresses| param_addresses.parse().ok()) + .collect::>(); + + let result = api_impl + .peers_options(param_addresses.as_ref(), &context) + .await; + let mut response = Response::new(Body::empty()); + response.headers_mut().insert( + HeaderName::from_static("x-span-id"), + HeaderValue::from_str( + (&context as &dyn Has) + .get() + .0 + .clone() + .as_str(), + ) + .expect("Unable to create X-Span-ID header value"), + ); + + match result { + Ok(rsp) => match rsp { + PeersOptionsResponse::Cors => { + *response.status_mut() = StatusCode::from_u16(200) + .expect("Unable to turn 200 into a StatusCode"); + } + }, + Err(_) => { + // Application code returned an error. This should not happen, as the implementation should + // return a valid response. + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + *response.body_mut() = Body::from("An internal error occurred"); + } + } + + Ok(response) + } + + // PeersPost - POST /peers + hyper::Method::POST if path.matched(paths::ID_PEERS) => { + // Query parameters (note that non-required or collection query parameters will ignore garbage values, rather than causing a 400 response) + let query_params = + form_urlencoded::parse(uri.query().unwrap_or_default().as_bytes()) + .collect::>(); + let param_addresses = query_params + .iter() + .filter(|e| e.0 == "addresses") + .map(|e| e.1.clone()) + .filter_map(|param_addresses| param_addresses.parse().ok()) + .collect::>(); + + let result = api_impl + .peers_post(param_addresses.as_ref(), &context) + .await; + let mut response = Response::new(Body::empty()); + response.headers_mut().insert( + HeaderName::from_static("x-span-id"), + HeaderValue::from_str( + (&context as &dyn Has) + .get() + .0 + .clone() + .as_str(), + ) + .expect("Unable to create X-Span-ID header value"), + ); + + match result { + Ok(rsp) => match rsp { + PeersPostResponse::Success => { + *response.status_mut() = StatusCode::from_u16(204) + .expect("Unable to turn 204 into a StatusCode"); + } + PeersPostResponse::BadRequest(body) => { + *response.status_mut() = StatusCode::from_u16(400) + .expect("Unable to turn 400 into a StatusCode"); + response.headers_mut().insert( + CONTENT_TYPE, + HeaderValue::from_str("application/json") + .expect("Unable to create Content-Type header for PEERS_POST_BAD_REQUEST")); + let body_content = serde_json::to_string(&body) + .expect("impossible to fail to serialize"); + *response.body_mut() = Body::from(body_content); + } + PeersPostResponse::InternalServerError(body) => { + *response.status_mut() = StatusCode::from_u16(500) + .expect("Unable to turn 500 into a StatusCode"); + response.headers_mut().insert( + CONTENT_TYPE, + HeaderValue::from_str("application/json") + .expect("Unable to create Content-Type header for PEERS_POST_INTERNAL_SERVER_ERROR")); + let body_content = serde_json::to_string(&body) + .expect("impossible to fail to serialize"); + *response.body_mut() = Body::from(body_content); + } + }, + Err(_) => { + // Application code returned an error. This should not happen, as the implementation should + // return a valid response. + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + *response.body_mut() = Body::from("An internal error occurred"); + } + } + + Ok(response) + } + // StreamsStreamIdGet - GET /streams/{stream_id} hyper::Method::GET if path.matched(paths::ID_STREAMS_STREAM_ID) => { // Path parameters @@ -2075,6 +2246,7 @@ where _ if path.matched(paths::ID_INTERESTS) => method_not_allowed(), _ if path.matched(paths::ID_INTERESTS_SORT_KEY_SORT_VALUE) => method_not_allowed(), _ if path.matched(paths::ID_LIVENESS) => method_not_allowed(), + _ if path.matched(paths::ID_PEERS) => method_not_allowed(), _ if path.matched(paths::ID_STREAMS_STREAM_ID) => method_not_allowed(), _ if path.matched(paths::ID_VERSION) => method_not_allowed(), _ => Ok(Response::builder() @@ -2165,6 +2337,12 @@ impl RequestParser for ApiRequestParser { hyper::Method::GET if path.matched(paths::ID_LIVENESS) => Some("LivenessGet"), // LivenessOptions - OPTIONS /liveness hyper::Method::OPTIONS if path.matched(paths::ID_LIVENESS) => Some("LivenessOptions"), + // PeersGet - GET /peers + hyper::Method::GET if path.matched(paths::ID_PEERS) => Some("PeersGet"), + // PeersOptions - OPTIONS /peers + hyper::Method::OPTIONS if path.matched(paths::ID_PEERS) => Some("PeersOptions"), + // PeersPost - POST /peers + hyper::Method::POST if path.matched(paths::ID_PEERS) => Some("PeersPost"), // StreamsStreamIdGet - GET /streams/{stream_id} hyper::Method::GET if path.matched(paths::ID_STREAMS_STREAM_ID) => { Some("StreamsStreamIdGet") diff --git a/api/Cargo.toml b/api/Cargo.toml index dcc11a614..d95b690d8 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -21,6 +21,7 @@ futures.workspace = true ipld-core.workspace = true ceramic-car.workspace = true multibase.workspace = true +multiaddr.workspace = true recon.workspace = true serde.workspace = true serde_ipld_dagcbor.workspace = true diff --git a/api/ceramic.yaml b/api/ceramic.yaml index 22f47441e..c22721045 100644 --- a/api/ceramic.yaml +++ b/api/ceramic.yaml @@ -519,6 +519,62 @@ paths: application/json: schema: $ref: "#/components/schemas/ErrorResponse" + "/peers": + options: + summary: cors + parameters: + - name: addresses + in: query + description: Multiaddress of peer to connect to, at least one address must contain the peer id. + schema: + type: array + items: + type: string + required: true + responses: + "200": + description: cors + get: + summary: Get list of connected peers + responses: + "200": + description: success + content: + application/json: + schema: + $ref: "#/components/schemas/Peers" + "500": + description: Internal server error + content: + application/json: + schema: + $ref: "#/components/schemas/ErrorResponse" + post: + summary: Connect to a peer + parameters: + - name: addresses + in: query + description: Multiaddress of peer to connect to, at least one address must contain the peer id. + schema: + type: array + items: + type: string + required: true + responses: + "204": + description: success + "400": + description: bad request + content: + application/json: + schema: + $ref: "#/components/schemas/BadRequestResponse" + "500": + description: Internal server error + content: + application/json: + schema: + $ref: "#/components/schemas/ErrorResponse" components: requestBodies: EventData: @@ -705,3 +761,31 @@ components: data: type: string description: Multibase encoding of the data of the stream. Content is stream type specific. + Peers: + title: List of Peers + type: object + required: + - peers + properties: + peers: + type: array + items: + schema: + $ref: "#/components/schemas/Peer" + Peer: + title: Information about a connected peer + description: Information about a connected peer + type: object + required: + - id + - addresses + properties: + id: + type: string + description: DID of peer + addresses: + type: array + description: List of known multiaddress of peer, will always include the peer id + items: + type: string + description: Multiaddress where peer may be dialed diff --git a/api/src/lib.rs b/api/src/lib.rs index 3be76df8a..2a3fa1fb5 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -5,7 +5,7 @@ pub use resume_token::ResumeToken; pub use server::{ ApiItem, EventDataResult, EventInsertResult, EventService, IncludeEventData, InterestService, - Server, + Multiaddr, P2PService, PeerInfo, Server, }; #[cfg(test)] diff --git a/api/src/server.rs b/api/src/server.rs index 0de34ece7..1f5d4a734 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -6,7 +6,7 @@ mod event; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::time::Duration; use std::{future::Future, ops::Range}; use std::{marker::PhantomData, ops::RangeBounds}; @@ -22,7 +22,7 @@ use std::{ use anyhow::Result; use async_trait::async_trait; -use ceramic_api_server::models::{BadRequestResponse, ErrorResponse, EventData}; +use ceramic_api_server::models::{BadRequestResponse, ErrorResponse, EventData, Peer, Peers}; use ceramic_api_server::{ models::{self, Event}, DebugHeapGetResponse, EventsEventIdGetResponse, EventsPostResponse, @@ -32,9 +32,11 @@ use ceramic_api_server::{ use ceramic_api_server::{ Api, ConfigNetworkGetResponse, ExperimentalEventsSepSepValueGetResponse, ExperimentalInterestsGetResponse, FeedEventsGetResponse, FeedResumeTokenGetResponse, - InterestsPostResponse, + InterestsPostResponse, PeersGetResponse, PeersOptionsResponse, PeersPostResponse, +}; +use ceramic_core::{ + ensure_multiaddr_has_p2p, Cid, EventId, Interest, Network, NodeId, PeerId, StreamId, }; -use ceramic_core::{Cid, EventId, Interest, Network, NodeId, PeerId, StreamId}; use ceramic_pipeline::EVENT_STATES_TABLE; use datafusion::arrow::array::{ as_dictionary_array, as_map_array, Array as _, ArrayAccessor as _, BinaryArray, @@ -48,16 +50,19 @@ use datafusion::functions_aggregate::expr_fn::last_value; use datafusion::logical_expr::expr::WindowFunction; use datafusion::logical_expr::{col, lit, BuiltInWindowFunction, Expr, ExprFunctionExt}; use futures::TryFutureExt; +use multiaddr::Protocol; use recon::Key; use swagger::{ApiError, ByteArray}; #[cfg(not(target_env = "msvc"))] use tikv_jemalloc_ctl::epoch; use tokio::sync::broadcast; -use tracing::{instrument, Level}; +use tracing::{instrument, trace, Level}; use crate::server::event::event_id_from_car; use crate::ResumeToken; +pub use multiaddr::Multiaddr; + /// How many events to try to process at once i.e. read from the channel in batches. const EVENTS_TO_RECEIVE: usize = 10; /// When the incoming events queue has at least this many items, we'll store them. @@ -145,7 +150,7 @@ impl TryFrom for ValidatedInterest { } } -/// Trait for accessing persistent storage of Interests +/// InterestService must provide access to interests #[async_trait] pub trait InterestService: Send + Sync { /// Returns true if the key was newly inserted, false if it already existed. @@ -164,6 +169,28 @@ impl InterestService for Arc { } } +#[async_trait] +pub trait P2PService: Send + Sync { + async fn peers(&self) -> Result>; + async fn peer_connect(&self, addrs: &[Multiaddr]) -> Result<()>; +} + +#[async_trait] +impl P2PService for Arc { + async fn peers(&self) -> Result> { + self.as_ref().peers().await + } + async fn peer_connect(&self, addrs: &[Multiaddr]) -> Result<()> { + self.as_ref().peer_connect(addrs).await + } +} + +/// Information about connected peers +pub struct PeerInfo { + pub id: NodeId, + pub addresses: Vec, +} + #[derive(Debug, Clone)] pub enum EventInsertResult { Success(EventId), @@ -346,11 +373,12 @@ struct InsertTask { } #[derive(Clone)] -pub struct Server { +pub struct Server { node_id: NodeId, network: Network, interest: I, model: Arc, + p2p: P, // If we need to restart this ever, we'll need a mutex. For now we want to avoid locking the channel // so we just keep track to gracefully shutdown, but if the task dies, the server is in a fatal error state. insert_task: Arc, @@ -360,16 +388,18 @@ pub struct Server { pipeline: Option, } -impl Server +impl Server where I: InterestService, M: EventService + 'static, + P: P2PService, { pub fn new( node_id: NodeId, network: Network, interest: I, model: Arc, + p2p: P, pipeline: Option, shutdown_signal: broadcast::Receiver<()>, ) -> Self { @@ -386,6 +416,7 @@ where network, interest, model, + p2p, insert_task, marker: PhantomData, authentication: false, @@ -457,7 +488,7 @@ where } async fn process_events(events: &mut Vec, event_store: &Arc, node_id: NodeId) { - tracing::debug!(count = events.len(), "process_events"); + trace!(count = events.len(), "process_events"); if events.is_empty() { return; } @@ -969,6 +1000,38 @@ where }, )) } + async fn get_peers(&self) -> Result { + let peers = + self.p2p.peers().await.map_err(|err| { + ErrorResponse::new(format!("failed to get peer information: {err}")) + })?; + Ok(PeersGetResponse::Success(Peers { + peers: peers + .into_iter() + .map(|peer| { + let peer_id = peer.id.peer_id(); + Peer { + id: peer.id.did_key(), + addresses: peer + .addresses + .into_iter() + .map(|addr| ensure_multiaddr_has_p2p(addr, peer_id).to_string()) + .collect(), + } + }) + .collect(), + })) + } + async fn peer_connect( + &self, + addrs: Vec, + ) -> Result { + self.p2p + .peer_connect(&addrs) + .await + .map_err(|err| ErrorResponse::new(format!("failed to get peer information: {err}")))?; + Ok(PeersPostResponse::Success) + } } pub(crate) fn decode_event_id(value: &str) -> Result { @@ -998,11 +1061,12 @@ pub(crate) fn decode_multibase_data(value: &str) -> Result, BadRequestRe } #[async_trait] -impl Api for Server +impl Api for Server where C: Send + Sync, I: InterestService + Sync, M: EventService + Sync + 'static, + P: P2PService, { #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))] async fn liveness_get( @@ -1080,6 +1144,7 @@ where .or_else(|err| Ok(FeedResumeTokenGetResponse::InternalServerError(err))) } + #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))] async fn experimental_interests_get( &self, peer_id: Option, @@ -1178,6 +1243,7 @@ where })) } + #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))] async fn streams_stream_id_get( &self, stream_id: String, @@ -1208,6 +1274,57 @@ where } } + #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))] + async fn peers_get(&self, _context: &C) -> Result { + self.get_peers() + .await + .or_else(|err| Ok(PeersGetResponse::InternalServerError(err))) + } + + #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))] + async fn peers_post( + &self, + addresses: &Vec, + _context: &C, + ) -> Result { + let mut addrs: Vec = Vec::new(); + for address in addresses { + match address.parse() { + Ok(a) => addrs.push(a), + Err(err) => { + return Ok(PeersPostResponse::BadRequest(BadRequestResponse::new( + format!("address is not a well formed multiaddr: {err}"), + ))) + } + } + } + let peer_ids: HashSet<_> = addrs + .iter() + .flat_map(|addr| { + addr.iter().filter_map(|protocol| { + if let Protocol::P2p(peer_id) = protocol { + Some(peer_id) + } else { + None + } + }) + }) + .collect(); + if peer_ids.is_empty() { + return Ok(PeersPostResponse::BadRequest(BadRequestResponse::new( + "at least one address must contain a peer id".to_string(), + ))); + }; + if peer_ids.len() > 1 { + return Ok(PeersPostResponse::BadRequest(BadRequestResponse::new( + "more than one unique peer id found in the addresses".to_string(), + ))); + }; + self.peer_connect(addrs) + .await + .or_else(|err| Ok(PeersPostResponse::InternalServerError(err))) + } + /// cors async fn config_network_options( &self, @@ -1293,8 +1410,14 @@ where ) -> Result { Ok(ceramic_api_server::InterestsSortKeySortValueOptionsResponse::Cors) } - - /// Test the liveness of the Ceramic node + /// cors + async fn peers_options( + &self, + _addresses: &Vec, + _context: &C, + ) -> Result { + Ok(PeersOptionsResponse::Cors) + } /// cors async fn liveness_options( @@ -1312,6 +1435,7 @@ where Ok(ceramic_api_server::VersionOptionsResponse::Cors) } + /// cors async fn streams_stream_id_options( &self, _stream_id: String, diff --git a/api/src/tests.rs b/api/src/tests.rs index 3ca80ed6f..f145a27b2 100644 --- a/api/src/tests.rs +++ b/api/src/tests.rs @@ -2,9 +2,10 @@ use std::{ops::Range, str::FromStr, sync::Arc}; -use crate::server::{decode_multibase_data, BuildResponse, Server}; +use crate::server::{decode_multibase_data, BuildResponse, P2PService, Server}; use crate::{ ApiItem, EventDataResult, EventInsertResult, EventService, IncludeEventData, InterestService, + PeerInfo, }; use anyhow::Result; @@ -28,6 +29,7 @@ use datafusion::execution::config::SessionConfig; use datafusion::execution::context::SessionContext; use expect_test::expect; use mockall::{mock, predicate}; +use multiaddr::Multiaddr; use multibase::Base; use recon::Key; use test_log::test; @@ -144,6 +146,15 @@ mock! { } } +mock! { + pub P2PService {} + #[async_trait] + impl crate::P2PService for P2PService { + async fn peers(&self) -> Result>; + async fn peer_connect(&self, addrs: &[Multiaddr]) -> Result<()>; + } +} + /// Given a mock of the EventStore, prepare it to expect calls to load the init event. pub fn mock_get_init_event(mock_store: &mut MockEventStoreTest) { // Expect two get_block calls @@ -178,19 +189,21 @@ pub fn mock_get_unsigned_init_event(mock_store: &mut MockEventStoreTest) { } /// Wrapper around server initialization that handles creating the shutdown handler -fn create_test_server( +fn create_test_server( node_id: NodeId, network: Network, interest: I, model: Arc, + p2p: P, pipeline: Option, -) -> Server +) -> Server where I: InterestService, M: EventService + 'static, + P: P2PService, { let (_, rx) = tokio::sync::broadcast::channel(1); - Server::new(node_id, network, interest, model, pipeline, rx) + Server::new(node_id, network, interest, model, p2p, pipeline, rx) } #[test(tokio::test)] @@ -227,6 +240,7 @@ async fn create_event() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), None, ); let resp = server @@ -276,6 +290,7 @@ async fn create_event_twice() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), None, ); let (resp1, resp2) = join!( @@ -334,6 +349,7 @@ async fn create_event_fails() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), None, ); let resp = server @@ -390,6 +406,7 @@ async fn register_interest_sort_value() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), None, ); let interest = models::Interest { @@ -418,6 +435,7 @@ async fn register_interest_sort_value_bad_request() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), None, ); let interest = models::Interest { @@ -473,6 +491,7 @@ async fn register_interest_sort_value_controller() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), None, ); let resp = server @@ -531,6 +550,7 @@ async fn register_interest_value_controller_stream() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), None, ); let resp = server @@ -603,6 +623,7 @@ async fn get_interests() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), None, ); let resp = server @@ -695,6 +716,7 @@ async fn get_interests_for_peer() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), None, ); let resp = server @@ -762,6 +784,7 @@ async fn get_events_for_interest_range() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), None, ); let resp = server @@ -817,6 +840,7 @@ async fn events_event_id_get_by_event_id_success() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), None, ); let result = server.events_event_id_get(event_id_str, &Context).await; @@ -850,6 +874,7 @@ async fn events_event_id_get_by_cid_success() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), None, ); let result = server @@ -865,6 +890,170 @@ async fn events_event_id_get_by_cid_success() { assert_eq!(event.data.unwrap(), event_data_base64); } +#[test(tokio::test)] +async fn peers() { + let node_id = NodeKey::random().id(); + let network = Network::InMemory; + let mut mock_p2p_svc = MockP2PService::new(); + mock_p2p_svc.expect_peers().once().returning(|| { + Ok(vec![ + PeerInfo { + id: NodeId::try_from_did_key( + "did:key:z6MktxbrtQY3yx8Wue2hNS1eA3mEXXVb5n8FDL6a7bHkdZqJ", + ) + .unwrap(), + addresses: vec!["/ip4/127.0.0.1/tcp/4111", "/ip4/127.0.0.1/udp/4111/quic-v1"] + .into_iter() + .map(Multiaddr::from_str) + .collect::, _>>() + .unwrap(), + }, + PeerInfo { + id: NodeId::try_from_did_key( + "did:key:z6Mkr5oAhjqJc2sedxmaiUQUdQGkdtNkDNpmc9M1gxopvkfP", + ) + .unwrap(), + addresses: vec!["/ip4/127.0.0.1/tcp/4112"] + .into_iter() + .map(Multiaddr::from_str) + .collect::, _>>() + .unwrap(), + }, + ]) + }); + let server = create_test_server( + node_id, + network, + MockAccessInterestStoreTest::new(), + Arc::new(MockEventStoreTest::new()), + mock_p2p_svc, + None, + ); + let peers = server.peers_get(&Context).await.unwrap(); + expect![[r#" + Success( + Peers { + peers: [ + Peer { + id: "did:key:z6MktxbrtQY3yx8Wue2hNS1eA3mEXXVb5n8FDL6a7bHkdZqJ", + addresses: [ + "/ip4/127.0.0.1/tcp/4111/p2p/12D3KooWQKhz3HsKqJuYD2hRKTo95kJ5CHGNhaxNUuYffJUdcHmr", + "/ip4/127.0.0.1/udp/4111/quic-v1/p2p/12D3KooWQKhz3HsKqJuYD2hRKTo95kJ5CHGNhaxNUuYffJUdcHmr", + ], + }, + Peer { + id: "did:key:z6Mkr5oAhjqJc2sedxmaiUQUdQGkdtNkDNpmc9M1gxopvkfP", + addresses: [ + "/ip4/127.0.0.1/tcp/4112/p2p/12D3KooWMSuHrdAaTPefwMSJfWByZ6obJe9XqBetsio7EfzhuUbw", + ], + }, + ], + }, + ) + "#]].assert_debug_eq(&peers) +} + +#[test(tokio::test)] +async fn peer_connect() { + let node_id = NodeKey::random().id(); + let network = Network::InMemory; + let mut mock_p2p_svc = MockP2PService::new(); + let addresses = vec![ + "/ip4/127.0.0.1/tcp/4101/p2p/12D3KooWPFGbRHWfDaWt5MFFeqAHBBq3v5BqeJ4X7pmn2V1t6uNs", + "/ip4/127.0.0.1/udp/4111/quic-v1", + ]; + let addrs = addresses + .iter() + .map(|addr| addr.parse()) + .collect::, _>>() + .unwrap(); + mock_p2p_svc + .expect_peer_connect() + .once() + .with(predicate::eq(addrs)) + .returning(|_| Ok(())); + let server = create_test_server( + node_id, + network, + MockAccessInterestStoreTest::new(), + Arc::new(MockEventStoreTest::new()), + mock_p2p_svc, + None, + ); + let result = server + .peers_post( + &addresses.into_iter().map(ToOwned::to_owned).collect(), + &Context, + ) + .await + .unwrap(); + expect![[r#" + Success + "#]] + .assert_debug_eq(&result); +} +#[test(tokio::test)] +async fn peer_connect_no_peer_id() { + let node_id = NodeKey::random().id(); + let network = Network::InMemory; + let addresses = vec!["/ip4/127.0.0.1/tcp/4101", "/ip4/127.0.0.1/udp/4111/quic-v1"]; + let server = create_test_server( + node_id, + network, + MockAccessInterestStoreTest::new(), + Arc::new(MockEventStoreTest::new()), + MockP2PService::new(), + None, + ); + let result = server + .peers_post( + &addresses.into_iter().map(ToOwned::to_owned).collect(), + &Context, + ) + .await + .unwrap(); + expect![[r#" + BadRequest( + BadRequestResponse { + message: "at least one address must contain a peer id", + }, + ) + "#]] + .assert_debug_eq(&result); +} +#[test(tokio::test)] +async fn peer_connect_conflicting_peer_ids() { + let node_id = NodeKey::random().id(); + let network = Network::InMemory; + let addresses = vec![ + "/ip4/127.0.0.1/tcp/4101/p2p/12D3KooWPFGbRHWfDaWt5MFFeqAHBBq3v5BqeJ4X7pmn2V1t6uNs", + "/ip4/127.0.0.1/udp/4111/quic-v1/p2p/12D3KooWFpSKYLQ6bKLnjrXN5CkGyDsQzAron9UvtHq9yLEKETVQ", + ]; + let server = create_test_server( + node_id, + network, + MockAccessInterestStoreTest::new(), + Arc::new(MockEventStoreTest::new()), + MockP2PService::new(), + None, + ); + let result = server + .peers_post( + &addresses.into_iter().map(ToOwned::to_owned).collect(), + &Context, + ) + .await + .unwrap(); + expect![[r#" + BadRequest( + BadRequestResponse { + message: "more than one unique peer id found in the addresses", + }, + ) + "#]] + .assert_debug_eq(&result); +} + #[test(tokio::test)] async fn stream_state() { let node_id = NodeKey::random().id(); @@ -890,6 +1079,7 @@ async fn stream_state() { network, mock_interest, Arc::new(mock_event_store), + MockP2PService::new(), Some(pipeline), ); let result = server diff --git a/core/src/lib.rs b/core/src/lib.rs index 61eee7a6c..4c83ac325 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -21,7 +21,7 @@ pub use interest::{Interest, PeerId}; pub use jwk::Jwk; pub use network::Network; pub use node_id::{NodeId, NodeKey}; -pub use peer::{PeerEntry, PeerKey}; +pub use peer::{ensure_multiaddr_has_p2p, PeerEntry, PeerKey}; pub use range::RangeOpen; pub use serialize_ext::SerializeExt; pub use stream_id::{StreamId, StreamIdType, METAMODEL_STREAM_ID}; diff --git a/core/src/peer.rs b/core/src/peer.rs index 7e1e9dbf6..24a0640e6 100644 --- a/core/src/peer.rs +++ b/core/src/peer.rs @@ -78,7 +78,8 @@ impl PeerEntry { } } -fn ensure_multiaddr_has_p2p(addr: Multiaddr, peer_id: PeerId) -> Multiaddr { +/// Returns a the provided multiaddr ensuring it contains the specified peer id. +pub fn ensure_multiaddr_has_p2p(addr: Multiaddr, peer_id: PeerId) -> Multiaddr { if !addr.iter().any(|protocol| match protocol { multiaddr::Protocol::P2p(id) => id == peer_id, _ => false, diff --git a/one/src/daemon.rs b/one/src/daemon.rs index af2faa00d..d2189ac56 100644 --- a/one/src/daemon.rs +++ b/one/src/daemon.rs @@ -700,6 +700,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { network, interest_api_svc, Arc::new(model_svc), + ipfs.client(), pipeline_ctx, shutdown_signal.resubscribe(), ); diff --git a/one/src/network.rs b/one/src/network.rs index 0c6ab9101..8ed52a2a3 100644 --- a/one/src/network.rs +++ b/one/src/network.rs @@ -2,12 +2,14 @@ use std::sync::Arc; -use anyhow::Result; -use ceramic_core::{EventId, Interest, NodeKey, PeerKey}; +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use ceramic_core::{EventId, Interest, NodeId, NodeKey, PeerKey}; use ceramic_kubo_rpc::{IpfsMetrics, IpfsMetricsMiddleware, IpfsService}; use ceramic_p2p::{Config as P2pConfig, Libp2pConfig, Node, PeerService}; use iroh_rpc_client::P2pClient; use iroh_rpc_types::{p2p::P2pAddr, Addr}; +use multiaddr::Protocol; use recon::{libp2p::Recon, Sha256a}; use tokio::task::{self, JoinHandle}; use tracing::{debug, error}; @@ -85,13 +87,12 @@ impl Builder { where S: iroh_bitswap::Store, { - let ipfs_service = Arc::new(IpfsService::new( - P2pClient::new(self.state.p2p.addr.clone()).await?, - block_store, - )); + let client = P2pClient::new(self.state.p2p.addr.clone()).await?; + let ipfs_service = Arc::new(IpfsService::new(client.clone(), block_store)); let ipfs_service = IpfsMetricsMiddleware::new(ipfs_service, ipfs_metrics); Ok(Ipfs { api: ipfs_service, + client, p2p: self.state.p2p, }) } @@ -100,6 +101,7 @@ impl Builder { // Provides Ipfs node implementation pub struct Ipfs { api: IpfsMetricsMiddleware>>, + client: P2pClient, p2p: Service, } @@ -110,12 +112,54 @@ impl Ipfs { pub fn api(&self) -> IpfsMetricsMiddleware>> { self.api.clone() } + pub fn client(&self) -> IpfsClient { + IpfsClient(self.client.clone()) + } pub async fn stop(self) -> Result<()> { self.p2p.stop().await?; Ok(()) } } +#[derive(Clone)] +pub struct IpfsClient(P2pClient); + +#[async_trait] +impl ceramic_api::P2PService for IpfsClient { + async fn peers(&self) -> Result> { + self.0 + .get_peers() + .await? + .into_iter() + .map(|(peer_id, addresses)| { + Ok(ceramic_api::PeerInfo { + id: NodeId::try_from_peer_id(&peer_id)?, + addresses, + }) + }) + .collect() + } + async fn peer_connect(&self, addrs: &[ceramic_api::Multiaddr]) -> Result<()> { + // Find peer id in one of the addresses + let peer_id = addrs + .iter() + .filter_map(|addr| { + addr.iter() + .filter_map(|protocol| { + if let Protocol::P2p(peer_id) = protocol { + Some(peer_id) + } else { + None + } + }) + .next() + }) + .next() + .ok_or_else(|| anyhow!("multi addr must contain a peer id"))?; + self.0.connect(peer_id, addrs.to_vec()).await + } +} + struct Service { addr: A, task: JoinHandle<()>,