Skip to content

Commit

Permalink
feat: add value support to Recon (#217)
Browse files Browse the repository at this point in the history
With this change Recon is a key value store instead of simply a key
store.

Recon synchronization is the same in principle but its underlying
protocol has been refactored.

The protocol design is now has clear initiator and responder roles.
Generally the initiator makes requests and the responder responds.
Messages themselves are small and represent a small amount of work. This
way both nodes can be actively working concurrently to synchronize.
Prior to this change nodes would be idle while the other was working and
would frequently deadlock if a single message size grew too large.

Values are synchronized by sending ValueRequests and ValueResponses
along side synchronization messages.

The API into the Recon protocol is now a Stream + Sink allowing any
system that can transport messages in full duplex a possible transport
for Recon. The protocol implementation is no longer specific to libp2p.
This means we can use HTTP in the future. However this PR removes the
HTTP implementation as its not trivial with the HTTP server we are using
to create a full duplex channel.
  • Loading branch information
nathanielc authored Jan 23, 2024
1 parent 4a55ed9 commit 5284db8
Show file tree
Hide file tree
Showing 55 changed files with 3,982 additions and 316,883 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion api-server/.openapi-generator/FILES
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ Cargo.toml
README.md
api/openapi.yaml
docs/Event.md
docs/Ring.md
docs/Version.md
docs/default_api.md
examples/ca.pem
Expand Down
4 changes: 1 addition & 3 deletions api-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ To see how to make this your own, look here:
[README]((https://openapi-generator.tech))

- API version: 0.9.0
- Build date: 2023-11-13T20:39:15.118815608Z[Etc/UTC]
- Build date: 2024-01-17T12:46:40.618372865-07:00[America/Denver]



Expand Down Expand Up @@ -100,15 +100,13 @@ Method | HTTP request | Description
------------- | ------------- | -------------
[****](docs/default_api.md#) | **POST** /events | Creates a new event
[****](docs/default_api.md#) | **GET** /liveness | Test the liveness of the Ceramic node
[****](docs/default_api.md#) | **POST** /recon | Sends a Recon message
[****](docs/default_api.md#) | **GET** /subscribe/{sort_key}/{sort_value} | Get events for a stream
[****](docs/default_api.md#) | **POST** /version | Get the version of the Ceramic node


## Documentation For Models

- [Event](docs/Event.md)
- [Ring](docs/Ring.md)
- [Version](docs/Version.md)


Expand Down
32 changes: 5 additions & 27 deletions api-server/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,28 +96,6 @@ paths:
"204":
description: success
summary: Creates a new event
/recon:
post:
parameters:
- description: Recon ring
explode: true
in: query
name: ring
required: true
schema:
$ref: '#/components/schemas/Ring'
style: form
requestBody:
$ref: '#/components/requestBodies/Message'
responses:
"200":
content:
application/cbor-seq:
schema:
format: binary
type: string
description: success
summary: Sends a Recon message
components:
requestBodies:
Event:
Expand Down Expand Up @@ -148,17 +126,17 @@ components:
description: A Ceramic event as part of a Ceramic Stream
example:
eventId: eventId
eventData: eventData
properties:
eventId:
description: Multibase encoding of event id bytes.
type: string
eventData:
description: Multibase encoding of event data.
type: string
required:
- eventData
- eventId
title: A Ceramic Event
type: object
Ring:
enum:
- interest
- model
type: string

1 change: 1 addition & 0 deletions api-server/docs/Event.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**event_id** | **String** | Multibase encoding of event id bytes. |
**event_data** | **String** | Multibase encoding of event data. |

[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)

Expand Down
27 changes: 0 additions & 27 deletions api-server/docs/default_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ Method | HTTP request | Description
------------- | ------------- | -------------
****](default_api.md#) | **POST** /events | Creates a new event
****](default_api.md#) | **GET** /liveness | Test the liveness of the Ceramic node
****](default_api.md#) | **POST** /recon | Sends a Recon message
****](default_api.md#) | **GET** /subscribe/{sort_key}/{sort_value} | Get events for a stream
****](default_api.md#) | **POST** /version | Get the version of the Ceramic node

Expand Down Expand Up @@ -58,32 +57,6 @@ No authorization required

[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md)

# ****
> swagger::ByteArray (ring, body)
Sends a Recon message

### Required Parameters

Name | Type | Description | Notes
------------- | ------------- | ------------- | -------------
**ring** | [****](.md)| Recon ring |
**body** | **swagger::ByteArray**| Recon message to send |

### Return type

[**swagger::ByteArray**](file.md)

### Authorization

No authorization required

### HTTP request headers

- **Content-Type**: application/cbor-seq
- **Accept**: application/cbor-seq

[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md)

# ****
> Vec<models::Event> (sort_key, sort_value, optional)
Get events for a stream
Expand Down
11 changes: 1 addition & 10 deletions api-server/examples/client/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#[allow(unused_imports)]
use ceramic_api_server::{
models, Api, ApiNoContext, Client, ContextWrapperExt, EventsPostResponse, LivenessGetResponse,
ReconPostResponse, SubscribeSortKeySortValueGetResponse, VersionPostResponse,
SubscribeSortKeySortValueGetResponse, VersionPostResponse,
};
use clap::{App, Arg};
#[allow(unused_imports)]
Expand Down Expand Up @@ -103,15 +103,6 @@ fn main() {
(client.context() as &dyn Has<XSpanIdString>).get().clone()
);
}
/* Disabled because there's no example.
Some("ReconPost") => {
let result = rt.block_on(client.recon_post(
???,
swagger::ByteArray(Vec::from("BYTE_ARRAY_DATA_HERE"))
));
info!("{:?} (X-Span-ID: {:?})", result, (client.context() as &dyn Has<XSpanIdString>).get().clone());
},
*/
Some("SubscribeSortKeySortValueGet") => {
let result = rt.block_on(client.subscribe_sort_key_sort_value_get(
"sort_key_example".to_string(),
Expand Down
21 changes: 2 additions & 19 deletions api-server/examples/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ impl<C> Server<C> {

use ceramic_api_server::server::MakeService;
use ceramic_api_server::{
Api, EventsPostResponse, LivenessGetResponse, ReconPostResponse,
SubscribeSortKeySortValueGetResponse, VersionPostResponse,
Api, EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse,
VersionPostResponse,
};
use std::error::Error;
use swagger::ApiError;
Expand Down Expand Up @@ -134,23 +134,6 @@ where
Err(ApiError("Generic failure".into()))
}

/// Sends a Recon message
async fn recon_post(
&self,
ring: models::Ring,
body: swagger::ByteArray,
context: &C,
) -> Result<ReconPostResponse, ApiError> {
let context = context.clone();
info!(
"recon_post({:?}, {:?}) - X-Span-ID: {:?}",
ring,
body,
context.get().0.clone()
);
Err(ApiError("Generic failure".into()))
}

/// Get events for a stream
async fn subscribe_sort_key_sort_value_get(
&self,
Expand Down
102 changes: 2 additions & 100 deletions api-server/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ const FRAGMENT_ENCODE_SET: &AsciiSet = &percent_encoding::CONTROLS
const ID_ENCODE_SET: &AsciiSet = &FRAGMENT_ENCODE_SET.add(b'|');

use crate::{
Api, EventsPostResponse, LivenessGetResponse, ReconPostResponse,
SubscribeSortKeySortValueGetResponse, VersionPostResponse,
Api, EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse,
VersionPostResponse,
};

/// Convert input into a base path, e.g. "http://example:123". Also checks the scheme as it goes.
Expand Down Expand Up @@ -547,104 +547,6 @@ where
}
}

async fn recon_post(
&self,
param_ring: models::Ring,
param_body: swagger::ByteArray,
context: &C,
) -> Result<ReconPostResponse, ApiError> {
let mut client_service = self.client_service.clone();
let mut uri = format!("{}/ceramic/recon", self.base_path);

// Query parameters
let query_string = {
let mut query_string = form_urlencoded::Serializer::new("".to_owned());
query_string.append_pair("ring", &param_ring.to_string());
query_string.finish()
};
if !query_string.is_empty() {
uri += "?";
uri += &query_string;
}

let uri = match Uri::from_str(&uri) {
Ok(uri) => uri,
Err(err) => return Err(ApiError(format!("Unable to build URI: {}", err))),
};

let mut request = match Request::builder()
.method("POST")
.uri(uri)
.body(Body::empty())
{
Ok(req) => req,
Err(e) => return Err(ApiError(format!("Unable to create request: {}", e))),
};

let body = param_body.0;
*request.body_mut() = Body::from(body);

let header = "application/cbor-seq";
request.headers_mut().insert(
CONTENT_TYPE,
match HeaderValue::from_str(header) {
Ok(h) => h,
Err(e) => {
return Err(ApiError(format!(
"Unable to create header: {} - {}",
header, e
)))
}
},
);
let header = HeaderValue::from_str(Has::<XSpanIdString>::get(context).0.as_str());
request.headers_mut().insert(
HeaderName::from_static("x-span-id"),
match header {
Ok(h) => h,
Err(e) => {
return Err(ApiError(format!(
"Unable to create X-Span ID header value: {}",
e
)))
}
},
);

let response = client_service
.call((request, context.clone()))
.map_err(|e| ApiError(format!("No response received: {}", e)))
.await?;

match response.status().as_u16() {
200 => {
let body = response.into_body();
let body = body
.into_raw()
.map_err(|e| ApiError(format!("Failed to read response: {}", e)))
.await?;
let body = swagger::ByteArray(body.to_vec());
Ok(ReconPostResponse::Success(body))
}
code => {
let headers = response.headers().clone();
let body = response.into_body().take(100).into_raw().await;
Err(ApiError(format!(
"Unexpected response code {}:\n{:?}\n\n{}",
code,
headers,
match body {
Ok(body) => match String::from_utf8(body) {
Ok(body) => body,
Err(e) => format!("<Body was not UTF8: {:?}>", e),
},
Err(e) => format!("<Failed to read body: {}>", e),
}
)))
}
}
}

async fn subscribe_sort_key_sort_value_get(
&self,
param_sort_key: String,
Expand Down
31 changes: 0 additions & 31 deletions api-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ pub enum LivenessGetResponse {
Success,
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum ReconPostResponse {
/// success
Success(swagger::ByteArray),
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum SubscribeSortKeySortValueGetResponse {
/// success
Expand Down Expand Up @@ -73,14 +67,6 @@ pub trait Api<C: Send + Sync> {
/// Test the liveness of the Ceramic node
async fn liveness_get(&self, context: &C) -> Result<LivenessGetResponse, ApiError>;

/// Sends a Recon message
async fn recon_post(
&self,
ring: models::Ring,
body: swagger::ByteArray,
context: &C,
) -> Result<ReconPostResponse, ApiError>;

/// Get events for a stream
async fn subscribe_sort_key_sort_value_get(
&self,
Expand Down Expand Up @@ -114,13 +100,6 @@ pub trait ApiNoContext<C: Send + Sync> {
/// Test the liveness of the Ceramic node
async fn liveness_get(&self) -> Result<LivenessGetResponse, ApiError>;

/// Sends a Recon message
async fn recon_post(
&self,
ring: models::Ring,
body: swagger::ByteArray,
) -> Result<ReconPostResponse, ApiError>;

/// Get events for a stream
async fn subscribe_sort_key_sort_value_get(
&self,
Expand Down Expand Up @@ -173,16 +152,6 @@ impl<T: Api<C> + Send + Sync, C: Clone + Send + Sync> ApiNoContext<C> for Contex
self.api().liveness_get(&context).await
}

/// Sends a Recon message
async fn recon_post(
&self,
ring: models::Ring,
body: swagger::ByteArray,
) -> Result<ReconPostResponse, ApiError> {
let context = self.context().clone();
self.api().recon_post(ring, body, &context).await
}

/// Get events for a stream
async fn subscribe_sort_key_sort_value_get(
&self,
Expand Down
Loading

0 comments on commit 5284db8

Please sign in to comment.