Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enha: redis historical values #1564

Merged
merged 2 commits into from
Jul 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 106 additions & 27 deletions src/eth/storage/redis/redis_permanent.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![allow(unused_variables)]

use itertools::Itertools;
use redis::Client as RedisClient;
use redis::Commands;
Expand All @@ -24,6 +22,7 @@ use crate::ext::to_json_string;
use crate::log_and_err;

type RedisVecOptString = RedisResult<Vec<Option<String>>>;
type RedisVecString = RedisResult<Vec<String>>;
type RedisOptString = RedisResult<Option<String>>;
type RedisOptUsize = RedisResult<Option<usize>>;
type RedisVoid = RedisResult<()>;
Expand Down Expand Up @@ -74,30 +73,34 @@ impl PermanentStorage for RedisPermanentStorage {

fn save_block(&self, block: Block) -> anyhow::Result<()> {
// generate block keys
let key_number = key_block_by_number(block.number());
let key_hash = key_block_by_hash(&block.hash());
let key_block_number = key_block_by_number(block.number());
let key_block_hash = key_block_by_hash(&block.hash());

// generate values
let block_json = to_json_string(&block);

// blocks
let mut redis_values = vec![
(key_number, block_json.clone()),
(key_hash, block_json.clone()),
let mut mset_values = vec![
(key_block_number, block_json.clone()),
(key_block_hash, block_json.clone()),
("block::latest".to_owned(), block_json),
];
let mut zadd_values = vec![];

// transactions
for tx in &block.transactions {
let tx_key = key_tx(&tx.input.hash);
let tx_value = to_json_string(&tx);
redis_values.push((tx_key, tx_value));
mset_values.push((tx_key, tx_value));
}

// changes
for changes in block.compact_account_changes() {
// account
let mut account = Account::default();
let mut account = Account {
address: changes.address,
..Account::default()
};
if let Some(nonce) = changes.nonce.take() {
account.nonce = nonce;
}
Expand All @@ -107,27 +110,37 @@ impl PermanentStorage for RedisPermanentStorage {
if let Some(bytecode) = changes.bytecode.take() {
account.bytecode = bytecode;
}
let account_key = key_account(&account.address);

let account_value = to_json_string(&account);
redis_values.push((account_key, account_value));
mset_values.push((key_account(&account.address), account_value.clone()));
zadd_values.push((key_account_history(&account.address), account_value, block.number().as_u64()));

// slot
for slot in changes.slots.into_values() {
if let Some(slot) = slot.take() {
let slot_key = key_slot(&account.address, &slot.index);
let slot_value = to_json_string(&slot);
redis_values.push((slot_key, slot_value));
mset_values.push((key_slot(&account.address, &slot.index), slot_value.clone()));
zadd_values.push((key_slot_history(&account.address, &slot.index), slot_value, block.number().as_u64()));
}
}
}

// execute command
// execute mset command
let mut conn = self.conn()?;
let set: RedisVoid = conn.mset(&redis_values);
match set {
Ok(_) => Ok(()),
Err(e) => log_and_err!(reason = e, "failed to write block to redis"),
let set: RedisVoid = conn.mset(&mset_values);
if let Err(e) = set {
return log_and_err!(reason = e, "failed to write block mset to redis");
}

// execute zadd commands
for (key, value, score) in zadd_values {
let zadd: RedisVoid = conn.zadd(key, value, score);
if let Err(e) = zadd {
return log_and_err!(reason = e, "failed to write block zadd to redis");
}
}

Ok(())
}

fn read_block(&self, block_filter: &BlockFilter) -> anyhow::Result<Option<Block>> {
Expand Down Expand Up @@ -231,39 +244,95 @@ impl PermanentStorage for RedisPermanentStorage {

fn read_account(&self, address: &Address, point_in_time: &crate::eth::storage::StoragePointInTime) -> anyhow::Result<Option<Account>> {
// prepare keys
let account_key = key_account(address);

// execute and parse
let mut conn = self.conn()?;
match point_in_time {
StoragePointInTime::Mined | StoragePointInTime::Pending => {
// prepare key
let account_key = key_account(address);

// execute
let redis_account: RedisOptString = conn.get(account_key);

// parse
match redis_account {
Ok(Some(json)) => Ok(Some(from_json_str(&json))),
Ok(None) => Ok(None),
Err(e) => log_and_err!(reason = e, "failed to read account from redis"),
Err(e) => log_and_err!(reason = e, "failed to read account from redis current value"),
}
}
StoragePointInTime::MinedPast(number) => {
// prepare key
let account_key = key_account_history(address);

// execute
let mut cmd = redis::cmd("ZRANGE");
cmd.arg(account_key)
.arg(number.as_u64())
.arg(0)
.arg("BYSCORE")
.arg("REV")
.arg("LIMIT")
.arg(0)
.arg(1);
let redis_account: RedisVecString = cmd.query(&mut conn);

// parse
match redis_account {
Ok(vec_json) => match vec_json.first() {
Some(json) => Ok(Some(from_json_str(json))),
None => Ok(None),
},
Err(e) => log_and_err!(reason = e, "failed to read account from redis historical value"),
}
}
StoragePointInTime::MinedPast(number) => Ok(None),
}
}

fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &crate::eth::storage::StoragePointInTime) -> anyhow::Result<Option<Slot>> {
// prepare keys
let slot_key = key_slot(address, index);

fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
// execute command and parse
let mut conn = self.conn()?;
match point_in_time {
StoragePointInTime::Mined | StoragePointInTime::Pending => {
// prepare key
let slot_key = key_slot(address, index);

// execute
let redis_slot: RedisOptString = conn.get(slot_key);

// parse
match redis_slot {
Ok(Some(json)) => Ok(Some(from_json_str(&json))),
Ok(None) => Ok(None),
Err(e) => log_and_err!(reason = e, "failed to read slot from redis"),
Err(e) => log_and_err!(reason = e, "failed to read slot from redis current value"),
}
}
StoragePointInTime::MinedPast(number) => {
// prepare key
let slot_key = key_slot_history(address, index);

// execute
let mut cmd = redis::cmd("ZRANGE");
cmd.arg(slot_key)
.arg(number.as_u64())
.arg(0)
.arg("BYSCORE")
.arg("REV")
.arg("LIMIT")
.arg(0)
.arg(1);
let redis_account: RedisVecString = cmd.query(&mut conn);

// parse
match redis_account {
Ok(vec_json) => match vec_json.first() {
Some(json) => Ok(Some(from_json_str(json))),
None => Ok(None),
},
Err(e) => log_and_err!(reason = e, "failed to read account from redis historical value"),
}
}
StoragePointInTime::MinedPast(number) => Ok(None),
}
}

Expand Down Expand Up @@ -297,11 +366,21 @@ fn key_account(address: &Address) -> String {
format!("account::{}", address)
}

/// Generates a key for accessing an account history.
fn key_account_history(address: &Address) -> String {
format!("account_history::{}", address)
}

/// Generates a key for accessing a slot.
fn key_slot(address: &Address, index: &SlotIndex) -> String {
format!("slot::{}::{}", address, index)
}

/// Generates a key for accessing a slot history.
fn key_slot_history(address: &Address, index: &SlotIndex) -> String {
format!("slot_history::{}::{}", address, index)
}

/// Generates a key for accessing a transaction.
fn key_tx(hash: &Hash) -> String {
format!("tx::{}", hash)
Expand Down