Skip to content

Commit

Permalink
Merge branch 'main' into add-clock-e2e-test
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-aranha-cw authored May 22, 2024
2 parents 9f0d8a7 + 76808a9 commit abc1724
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 69 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
!docker/
!docker/Dockerfile.run_with_importer
!src/
!.cargo/
!Cargo.toml
!Cargo.lock
!static/
Expand Down
57 changes: 33 additions & 24 deletions chaos/install-dependencies.sh
Original file line number Diff line number Diff line change
@@ -1,27 +1,36 @@
#!/bin/sh

echo "Checking OS and installing dependencies..."

if [ "$(uname)" = "Darwin" ]; then
echo "Installing dependencies for macOS..."
if ! [ -x "$(command -v kind)" ]; then
brew install kind
fi
if ! [ -x "$(command -v kubectl)" ]; then
brew install kubectl
fi
#!/bin/bash

set -e

echo "Checking if Kind cluster exists..."
if ! kind get clusters | grep -q local-testing; then
echo "Setting up Kind cluster..."
kind create cluster --name local-testing
kind get kubeconfig --name local-testing > kubeconfig.yaml
else
echo "Installing dependencies for Linux..."
if ! [ -x "$(command -v kind)" ]; then
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.11.1/kind-linux-amd64
chmod +x ./kind
sudo mv ./kind /usr/local/bin/kind
fi
if ! [ -x "$(command -v kubectl)" ]; then
curl -LO "https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x ./kubectl
sudo mv ./kubectl /usr/local/bin/kubectl
fi
echo "Kind cluster already exists."
fi

echo "Dependencies installed"
echo "Configuring kubectl to use Kind cluster..."
export KUBECONFIG=$(pwd)/kubeconfig.yaml

echo "Checking if Docker image is already built..."
if ! docker images | grep -q local/run_with_importer; then
echo "Building Docker image..."
docker build -t local/run_with_importer -f ./docker/Dockerfile.run_with_importer .
else
echo "Docker image already built."
fi

echo "Loading Docker image into Kind..."
kind load docker-image local/run_with_importer --name local-testing

echo "Deploying application..."
kubectl apply -f chaos/local-deployment.yaml
kubectl apply -f chaos/local-service.yaml

echo "Waiting for pods to be ready..."
kubectl wait --for=condition=ready pod -l app=stratus-api --timeout=180s

echo "Deployment complete. Checking pod status..."
kubectl get pods -o wide
1 change: 1 addition & 0 deletions docker/Dockerfile.run_with_importer
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ COPY .sqlx /app/.sqlx
COPY build.rs /app/build.rs
COPY Cargo.toml /app/Cargo.toml
COPY Cargo.lock /app/Cargo.lock
COPY .cargo .cargo

RUN apt update
RUN apt-get install -y libclang-dev cmake
Expand Down
23 changes: 15 additions & 8 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -550,15 +550,23 @@ local-chaos-setup:
@echo $(pwd)
@echo "Installing dependencies..."
./chaos/install-dependencies.sh
@echo "Cleaning up any existing Kind cluster..."
kind delete cluster --name local-testing || true
@echo "Setting up Kind cluster..."
kind create cluster --name local-testing
@echo "Checking if Kind cluster exists..."
if ! kind get clusters | grep -q local-testing; then \
echo "Setting up Kind cluster..."; \
kind create cluster --name local-testing; \
kind get kubeconfig --name local-testing > kubeconfig.yaml; \
else \
echo "Kind cluster already exists."; \
fi
@echo "Configuring kubectl to use Kind cluster..."
kind get kubeconfig --name local-testing > kubeconfig.yaml
export KUBECONFIG=$(pwd)/kubeconfig.yaml
@echo "Building Docker image..."
docker build -t local/run_with_importer -f ./docker/Dockerfile.run_with_importer .
@echo "Checking if Docker image is already built..."
if ! docker images | grep -q local/run_with_importer; then \
echo "Building Docker image..."; \
docker build -t local/run_with_importer -f ./docker/Dockerfile.run_with_importer .; \
else \
echo "Docker image already built."; \
fi
@echo "Loading Docker image into Kind..."
kind load docker-image local/run_with_importer --name local-testing
@echo "Deploying application..."
Expand All @@ -575,7 +583,6 @@ local-chaos-cleanup:
kind delete cluster --name local-testing
@echo "Cleanup complete."


# Chaos Testing: Run chaos test
local-chaos-test:
just local-chaos-setup
Expand Down
3 changes: 2 additions & 1 deletion src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use stratus::eth::BlockMiner;
use stratus::eth::Executor;
use stratus::ext::warn_task_cancellation;
use stratus::ext::warn_task_tx_closed;
use stratus::ext::DisplayExt;
#[cfg(feature = "metrics")]
use stratus::infra::metrics;
use stratus::infra::BlockchainClient;
Expand Down Expand Up @@ -233,7 +234,7 @@ async fn start_number_fetcher(chain: Arc<BlockchainClient>, cancellation: Cancel
Ok(number) => {
tracing::info!(
%number,
sync_interval = %humantime::Duration::from(sync_interval),
sync_interval = %sync_interval.to_string_ext(),
"fetched current block number via http. awaiting sync interval to retrieve again."
);
RPC_CURRENT_BLOCK.store(number.as_u64(), Ordering::SeqCst);
Expand Down
58 changes: 25 additions & 33 deletions src/eth/block_miner.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;

use ethereum_types::BloomInput;
use keccak_hasher::KeccakHasher;
use nonempty::NonEmpty;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;

use super::Consensus;
Expand All @@ -23,6 +24,7 @@ use crate::eth::primitives::TransactionMined;
use crate::eth::storage::StratusStorage;
use crate::ext::not;
use crate::ext::spawn_named;
use crate::ext::DisplayExt;
use crate::log_and_err;

pub struct BlockMiner {
Expand Down Expand Up @@ -64,18 +66,12 @@ impl BlockMiner {
let Some(block_time) = self.block_time else {
return log_and_err!("cannot spawn interval miner because it does not have a block time defined");
};
tracing::info!(block_time = %humantime::Duration::from(block_time), "spawning interval miner");
tracing::info!(block_time = %block_time.to_string_ext(), "spawning interval miner");

// spawn miner and ticker
let pending_blocks = Arc::new(AtomicUsize::new(0));
spawn_named(
"miner::miner",
interval_miner::run(Arc::clone(&self), Arc::clone(&pending_blocks), cancellation.child_token()),
);
spawn_named(
"miner::ticker",
interval_miner_ticker::run(block_time, Arc::clone(&pending_blocks), cancellation.child_token()),
);
let (ticks_tx, ticks_rx) = mpsc::unbounded_channel::<Instant>();
spawn_named("miner::miner", interval_miner::run(Arc::clone(&self), ticks_rx, cancellation.child_token()));
spawn_named("miner::ticker", interval_miner_ticker::run(block_time, ticks_tx, cancellation.child_token()));

Ok(())
}
Expand Down Expand Up @@ -316,36 +312,29 @@ pub fn block_from_local(number: BlockNumber, txs: NonEmpty<LocalTransactionExecu
// Miner
// -----------------------------------------------------------------------------
mod interval_miner {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use tokio::task::yield_now;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;

use crate::eth::BlockMiner;
use crate::ext::warn_task_cancellation;
use crate::ext::warn_task_rx_closed;

pub async fn run(miner: Arc<BlockMiner>, pending_blocks: Arc<AtomicUsize>, cancellation: CancellationToken) {
loop {
pub async fn run(miner: Arc<BlockMiner>, mut ticks_rx: mpsc::UnboundedReceiver<Instant>, cancellation: CancellationToken) {
while let Some(tick) = ticks_rx.recv().await {
// check cancellation
if cancellation.is_cancelled() {
warn_task_cancellation("interval miner");
return;
}

// check pending blocks and mine if necessary
let pending = pending_blocks
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| Some(n.saturating_sub(1)))
.unwrap();
if pending > 0 {
tracing::info!(%pending, "interval mining block");
mine_and_commit(&miner).await;
} else {
tracing::debug!(%pending, "waiting for block interval");
yield_now().await;
}
// mine
tracing::info!(lag_ys = %tick.elapsed().as_micros(), "interval mining block");
mine_and_commit(&miner).await;
}
warn_task_rx_closed("interval miner");
}

#[inline(always)]
Expand Down Expand Up @@ -374,19 +363,19 @@ mod interval_miner {
}

mod interval_miner_ticker {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use chrono::Timelike;
use chrono::Utc;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;

use crate::ext::warn_task_cancellation;
use crate::ext::warn_task_rx_closed;

pub async fn run(block_time: Duration, pending_blocks: Arc<AtomicUsize>, cancellation: CancellationToken) {
pub async fn run(block_time: Duration, ticks_tx: mpsc::UnboundedSender<Instant>, cancellation: CancellationToken) {
// sync to next second
let next_second = (Utc::now() + Duration::from_secs(1)).with_nanosecond(0).unwrap();
thread::sleep((next_second - Utc::now()).to_std().unwrap());
Expand All @@ -405,8 +394,11 @@ mod interval_miner_ticker {
}

// await next tick
let _ = ticker.tick().await;
let _ = pending_blocks.fetch_add(1, Ordering::SeqCst);
let tick = ticker.tick().await;
if ticks_tx.send(tick).is_err() {
warn_task_rx_closed("interval miner ticker");
break;
};
}
}
}
2 changes: 1 addition & 1 deletion src/eth/rpc/rpc_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl<F: Future<Output = MethodResponse>> Future for RpcResponse<F> {
id = %proj.id,
method = %proj.method,
function = %proj.function.clone().unwrap_or_default(),
duration_ms = %elapsed.as_millis(),
duration_ys = %elapsed.as_micros(),
success = %response.is_success(),
result = %response.as_result(),
"rpc response"
Expand Down
27 changes: 25 additions & 2 deletions src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,24 +113,47 @@ impl<T> OptionExt<T> for Option<T> {
}
}

// -----------------------------------------------------------------------------
// Display
// -----------------------------------------------------------------------------

/// Allows to implement `to_string` for types that does not have it.
pub trait DisplayExt {
/// `to_string` for types that does not have it implemented.
fn to_string_ext(&self) -> String;
}

impl DisplayExt for std::time::Duration {
fn to_string_ext(&self) -> String {
humantime::Duration::from(*self).to_string()
}
}

// -----------------------------------------------------------------------------
// Tracing
// -----------------------------------------------------------------------------

/// Emits an warnign that a task is exiting because it received a cancenllation signal.
/// Emits an warning that a task is exiting because it received a cancenllation signal.
#[track_caller]
pub fn warn_task_cancellation(task: &str) {
let message = format!("exiting {} because it received a cancellation signal", task);
tracing::warn!(%message);
}

/// Emits an warnign that a task is exiting because the tx signal it is reading was closed.
/// Emits an warning that a task is exiting because the tx side was closed.
#[track_caller]
pub fn warn_task_tx_closed(task: &str) {
let message = format!("exiting {} because the tx channel on the other side was closed", task);
tracing::warn!(%message);
}

/// Emits an warning that a task is exiting because the rx side was closed.
#[track_caller]
pub fn warn_task_rx_closed(task: &str) {
let message = format!("exiting {} because the rx channel on the other side was closed", task);
tracing::warn!(%message);
}

// -----------------------------------------------------------------------------
// Tokio
// -----------------------------------------------------------------------------
Expand Down

0 comments on commit abc1724

Please sign in to comment.