Skip to content

Commit

Permalink
s/Store/Box<Store>/ WIP, compiler is happy
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Sep 19, 2023
1 parent 6ffe648 commit 761d6da
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 60 deletions.
5 changes: 3 additions & 2 deletions node/src/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ use tracing::instrument;
mod client;
mod server;
mod utils;

use crate::exchange::client::ExchangeClientHandler;
use crate::exchange::server::ExchangeServerHandler;
use crate::p2p::P2pError;
use crate::peer_tracker::PeerTracker;
use crate::store::Store;
use crate::store::WrappedStore;
use crate::utils::{stream_protocol_id, OneshotResultSender};

/// Max request size in bytes
Expand All @@ -48,7 +49,7 @@ pub(crate) struct ExchangeBehaviour {
pub(crate) struct ExchangeConfig<'a> {
pub network_id: &'a str,
pub peer_tracker: Arc<PeerTracker>,
pub header_store: Arc<Store>,
pub header_store: WrappedStore,
}

#[derive(Debug, thiserror::Error)]
Expand Down
7 changes: 3 additions & 4 deletions node/src/exchange/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ use libp2p::{
request_response::{InboundFailure, RequestId, ResponseChannel},
PeerId,
};
use std::sync::Arc;
use tracing::instrument;

use crate::store::Store;
use crate::store::WrappedStore;

pub(super) struct ExchangeServerHandler {
_store: Arc<Store>,
_store: WrappedStore,
}

impl ExchangeServerHandler {
pub(super) fn new(store: Arc<Store>) -> Self {
pub(super) fn new(store: WrappedStore) -> Self {
ExchangeServerHandler { _store: store }
}

Expand Down
4 changes: 2 additions & 2 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use libp2p::identity::Keypair;
use libp2p::{Multiaddr, PeerId};

use crate::p2p::{P2p, P2pArgs, P2pService};
use crate::store::Store;
use crate::store::{InMemoryStore, WrappedStore};
use crate::syncer::{Syncer, SyncerArgs, SyncerService};

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -53,7 +53,7 @@ where
SyncerSrv: SyncerService<P2pSrv>,
{
pub async fn new(config: NodeConfig) -> Result<Self, NodeError<P2pSrv, SyncerSrv>> {
let store = Arc::new(Store::new());
let store: WrappedStore = Arc::new(Box::new(InMemoryStore::new()));

let p2p = Arc::new(
P2pSrv::start(P2pArgs {
Expand Down
4 changes: 2 additions & 2 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tracing::{debug, info, instrument, warn};
use crate::exchange::{ExchangeBehaviour, ExchangeConfig};
use crate::executor::{spawn, Executor};
use crate::peer_tracker::PeerTracker;
use crate::store::Store;
use crate::store::WrappedStore;
use crate::utils::{gossipsub_ident_topic, OneshotResultSender, OneshotSenderExt};
use crate::Service;

Expand Down Expand Up @@ -74,7 +74,7 @@ pub struct P2pArgs {
pub local_keypair: Keypair,
pub bootstrap_peers: Vec<Multiaddr>,
pub listen_on: Vec<Multiaddr>,
pub store: Arc<Store>,
pub store: WrappedStore,
}

#[doc(hidden)]
Expand Down
101 changes: 56 additions & 45 deletions node/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,82 +1,93 @@
use async_trait::async_trait;
use celestia_types::ExtendedHeader;
use core::fmt::Debug;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tendermint::Hash;
use thiserror::Error;
use tracing::{error, info, instrument};

trait StoreTrait {
type Header;
type Hash;
type Error;
type Result<T, E = StoreError> = std::result::Result<T, E>;

fn init(genesis: Self::Header) -> Self;
pub type WrappedStore = Arc<Box<dyn Store + Sync + Send>>;

#[async_trait]
pub trait Store: Send + Sync + Debug {
//fn start(&mut self);
//fn stop(&mut self);

// what getters
fn get_by_hash(&self, hash: &Self::Hash) -> Result<Self::Header, Self::Error>;
fn get_by_height(&self, height: u64) -> Result<Self::Header, Self::Error>;
fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader>;
fn get_by_height(&self, height: u64) -> Result<ExtendedHeader>;

fn height(&self) -> u64;
fn has(&self, hash: &Self::Hash) -> bool;
fn has(&self, hash: &Hash) -> bool;
fn has_at(&self, height: u64) -> bool;

fn append<I: IntoIterator<Item = Self::Header>>(
&mut self,
headers: I,
) -> Result<(), Self::Error>;
fn append_single(&mut self, header: ExtendedHeader) -> Result<()>; //headers: Vec<ExtendedHeader>) -> Result<()>;
}

trait StoreExt {
fn append<I: IntoIterator<Item = ExtendedHeader>>(&mut self, headers: I) -> Result<()>;
}

impl<S: Store> StoreExt for S {
fn append<I: IntoIterator<Item = ExtendedHeader>>(&mut self, headers: I) -> Result<()> {
let headers = headers.into_iter();

for (idx, header) in headers.enumerate() {
if let Err(e) = self.append_single(header) {
error!("error appending: {e}");
return Err(StoreError::ContinuousAppendFailedAt(idx));
}
}

Ok(())
}
}

impl StoreTrait for Store {
type Header = ExtendedHeader;
type Hash = Hash;
type Error = StoreError;
/*
impl StoreBuilder for InMemoryStore {
fn empty() -> WrappedStore {
Arc::new(Box::new(Self::new()))
}
fn init(genesis: Self::Header) -> Self {
Store::with_genesis(genesis)
fn with_genesis(genesis: ExtendedHeader) -> WrappedStore {
Arc::new(Box::new(Self::with_genesis(genesis)))
}
}
*/

fn get_by_hash(&self, hash: &Self::Hash) -> Result<Self::Header, Self::Error> {
impl Store for InMemoryStore {
fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader, StoreError> {
self.get_by_hash(hash)
}

fn get_by_height(&self, height: u64) -> Result<Self::Header, Self::Error> {
fn get_by_height(&self, height: u64) -> Result<ExtendedHeader, StoreError> {
self.get_by_height(height)
}

fn height(&self) -> u64 {
self.get_head_height()
}

fn has(&self, hash: &Self::Hash) -> bool {
fn has(&self, hash: &Hash) -> bool {
self.exists_by_hash(hash)
}

fn has_at(&self, height: u64) -> bool {
self.exists_by_height(height)
}

fn append<I: IntoIterator<Item = Self::Header>>(
&mut self,
headers: I,
) -> Result<(), Self::Error> {
let headers = headers.into_iter();

for (idx, header) in headers.enumerate() {
if let Err(e) = self.append_continuous(header) {
error!("error appending: {e}");
return Err(StoreError::ContinuousAppendFailedAt(idx));
}
}

Ok(())
fn append_single(&mut self, header: ExtendedHeader) -> Result<()> {
self.append_continuous(header)
}
}

#[derive(Debug)]
pub struct Store {
pub struct InMemoryStore {
headers: DashMap<Hash, ExtendedHeader>,
height_to_hash: DashMap<u64, Hash>,
head_height: AtomicU64,
Expand Down Expand Up @@ -104,9 +115,9 @@ pub enum StoreError {
LostHash(Hash),
}

impl Store {
impl InMemoryStore {
pub fn new() -> Self {
Store {
InMemoryStore {
headers: DashMap::new(),
height_to_hash: DashMap::new(),
head_height: AtomicU64::new(0),
Expand All @@ -117,7 +128,7 @@ impl Store {
let genesis_hash = genesis.hash();
let genesis_height = genesis.height().value();

Store {
InMemoryStore {
headers: DashMap::from_iter([(genesis_hash, genesis)]),
height_to_hash: DashMap::from_iter([(genesis_height, genesis_hash)]),
head_height: AtomicU64::new(genesis_height),
Expand Down Expand Up @@ -212,9 +223,9 @@ impl Store {
}
}

impl Default for Store {
impl Default for InMemoryStore {
fn default() -> Self {
Store::new()
Self::new()
}
}

Expand Down Expand Up @@ -277,8 +288,8 @@ pub mod tests {
.unwrap()
}

pub fn gen_filled_store(height: u64) -> Store {
let s = Store::new();
pub fn gen_filled_store(height: u64) -> InMemoryStore {
let s = InMemoryStore::new();

// block height is 1-indexed
for height in 1..=height {
Expand All @@ -291,7 +302,7 @@ pub mod tests {

#[test]
fn test_empty_store() {
let s = Store::new();
let s = InMemoryStore::new();
assert_eq!(s.get_head_height(), 0);
assert_eq!(s.get_head(), Err(StoreError::NotFound));
assert_eq!(s.get_by_height(1), Err(StoreError::NotFound));
Expand All @@ -303,7 +314,7 @@ pub mod tests {

#[test]
fn test_read_write() {
let s = Store::new();
let s = InMemoryStore::new();
let header = gen_extended_header(1);
s.append_continuous(header.clone()).unwrap();
assert_eq!(s.get_head_height(), 1);
Expand Down
6 changes: 3 additions & 3 deletions node/src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use async_trait::async_trait;

use crate::{p2p::P2pService, store::Store, Service};
use crate::{p2p::P2pService, store::WrappedStore, Service};

type Result<T, E = SyncerError> = std::result::Result<T, E>;

Expand All @@ -13,12 +13,12 @@ pub enum SyncerError {}
#[derive(Debug)]
pub struct Syncer<P2pSrv: P2pService> {
p2p: Arc<P2pSrv>,
store: Arc<Store>,
store: WrappedStore,
}

pub struct SyncerArgs<P2pSrv: P2pService> {
pub p2p: Arc<P2pSrv>,
pub store: Arc<Store>,
pub store: WrappedStore,
}

#[doc(hidden)]
Expand Down
4 changes: 2 additions & 2 deletions types/src/extended_header.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use core::fmt::{Display, Formatter};
use std::fmt::{Display, Formatter};
use std::time::Duration;

use celestia_proto::header::pb::ExtendedHeader as RawExtendedHeader;
Expand Down Expand Up @@ -30,7 +30,7 @@ pub struct ExtendedHeader {
}

impl Display for ExtendedHeader {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "hash: {}; height: {}", self.hash(), self.height())
}
}
Expand Down

0 comments on commit 761d6da

Please sign in to comment.