Delta protobuf (#5)
* init
* up
* fix
* fix test
* fix tests
* Add zorder
* add config for block-zorder
* rename
* update readme
* fix wrong assert
* up
* rename block-zorder to block-partition
* update table config
* update readme
vutran1710 authored Dec 6, 2023
1 parent a8e04ec commit 5025c07
Showing 7 changed files with 124 additions and 140 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -8,6 +8,7 @@
--batch 50 \
--task-limit 50 \
--channel-size 2 \
--block-partition 100000 \
--config secrets/ethereum.toml \
--request-timeout 50 --producer delta --resumer redis://localhost:6379 --block-descriptor ./ingestor/src/proto/ethereum.desc
```
@@ -43,6 +44,7 @@ There are optional & required values. Some required values are provided with def
| METRICS_PORT | --metrics-port | u32 | 8060 | | prometheus service port |
| CHANNEL_SIZE | --channel-size | u8 | 50 | 5 | Size of the in-memory pool of downloaded blocks |
| REQUEST_TIMEOUT | --request-timeout | u8 | 5 | | HTTP request timeout |
| BLOCK_PARTITION | --block-partition | u32 | 100_000 | depends on chain | blocks-per-file partitioning |
| CONFIG_FILE | --config | string | /etc/config/config.toml | use the sample_eth_config.toml in /secrets/ | config file listing RPC endpoints per chain |

Example command:
@@ -57,6 +59,7 @@ RUST_LOG=info cargo run -p ingestor -- \
--config secrets/config.toml \
--request-timeout 1 \
--resumer redis://localhost:6379 \
--block-partition 100000 # default: 100_000
```
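
How the new flag buckets blocks: the producer divides each block number by `--block-partition` to pick a partition (see the `ingestor/src/producers/delta.rs` changes below). A minimal sketch of that mapping, with illustrative values:

```rust
// Mirrors `block.get_number() / self.block_partition as u64` in delta.rs.
fn partition_for(block_number: u64, block_partition: u32) -> u64 {
    block_number / block_partition as u64
}

fn main() {
    // With the default of 100_000, blocks 0..=99_999 share partition 0.
    assert_eq!(partition_for(99_999, 100_000), 0);
    assert_eq!(partition_for(18_500_000, 100_000), 185);
}
```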

### Ingestor Mode
71 changes: 0 additions & 71 deletions common_libs/src/delta_utils.rs

This file was deleted.

1 change: 0 additions & 1 deletion common_libs/src/lib.rs
@@ -22,7 +22,6 @@ pub use tokio_stream;
pub use uuid;
pub use warp;

pub mod delta_utils;
pub mod load_balancer;
pub mod proto_utils;
pub mod utils;
8 changes: 4 additions & 4 deletions ingestor/src/config.rs
@@ -69,9 +69,9 @@ pub struct CommandConfig {
#[clap(long, env = "BLOCK_TIME", default_value_t = 20)]
#[arg(long = "block-time", default_value_t = 20)]
pub block_time: u16,
#[clap(long, env = "BLOCK_DESCRIPTOR")]
#[arg(long = "block-descriptor")]
pub block_descriptor: Option<String>,
#[clap(long, env = "BLOCK_PARTITION", default_value_t = 100000)]
#[arg(long = "block-partition", default_value_t = 100000)]
pub block_partition: u32,
}

impl Default for CommandConfig {
@@ -94,7 +94,7 @@ impl Default for CommandConfig {
resumer_key: None,
resumer: None,
producer: None,
block_descriptor: None,
block_partition: 100_000,
}
}
}
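For reference, a self-contained sketch of how the new flag parses with clap's derive API (written with clap 4's `#[arg]` attribute; assumes the `derive` and `env` features, since the struct above also reads `BLOCK_PARTITION` from the environment):

```rust
use clap::Parser;

#[derive(Parser)]
struct Cmd {
    /// Blocks-per-file partition size; falls back to the env var, then the default.
    #[arg(long = "block-partition", env = "BLOCK_PARTITION", default_value_t = 100_000)]
    block_partition: u32,
}

fn main() {
    // An explicit flag overrides both the env var and the default.
    let cmd = Cmd::parse_from(["ingestor", "--block-partition", "50000"]);
    assert_eq!(cmd.block_partition, 50_000);
}
```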
157 changes: 105 additions & 52 deletions ingestor/src/producers/delta.rs
@@ -1,11 +1,12 @@
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use std::io::BufReader;
use std::collections::HashSet;
use std::sync::Arc;

use common_libs::async_trait::async_trait;
use common_libs::delta_utils;
use common_libs::deltalake::arrow::array::*;
use common_libs::deltalake::arrow::datatypes::Schema as ArrowSchema;
use common_libs::deltalake::arrow::json::ReaderBuilder;
use common_libs::deltalake::operations::create::CreateBuilder;
use common_libs::deltalake::schema::Schema;
use common_libs::deltalake::schema::SchemaDataType;
@@ -19,11 +20,6 @@ use common_libs::deltalake::SchemaField;
use common_libs::envy;
use common_libs::log::info;
use common_libs::tokio::sync::Mutex;
use common_libs::utils;

use serde::Deserialize;
use serde::Serialize;
use serde_json::json;

use crate::config::CommandConfig;
use crate::core::ProducerTrait;
@@ -55,24 +51,54 @@ pub struct DeltaLakeProducer {
schema_ref: Arc<ArrowSchema>,
chain_name: String,
table_path: String,
block_partition: u32,
}

impl DeltaLakeProducer {
pub async fn new(cfg: &CommandConfig) -> Result<Self, ProducerError> {
assert!(
cfg.block_descriptor.is_some(),
"Block descriptor is required"
);
let block_descriptor_file =
utils::load_file(cfg.block_descriptor.clone().unwrap().as_str());
let (chain_name, table_schemas) =
delta_utils::analyze_proto_descriptor(block_descriptor_file);
let deltalake_cfg = envy::from_env::<DeltaLakeConfig>().unwrap();

// NOTE: At this point, table-path always exists! Safe to call unwrap()
let (table, is_create_new) = Self::open_table(
&deltalake_cfg.table_path,
table_schemas,
vec![
SchemaField::new(
"block_number".to_string(),
SchemaDataType::primitive("long".to_string()),
false,
HashMap::default(),
),
SchemaField::new(
"block_partition".to_string(),
SchemaDataType::primitive("long".to_string()),
false,
HashMap::default(),
),
SchemaField::new(
"hash".to_string(),
SchemaDataType::primitive("string".to_string()),
false,
HashMap::default(),
),
SchemaField::new(
"parent_hash".to_string(),
SchemaDataType::primitive("string".to_string()),
false,
HashMap::default(),
),
SchemaField::new(
"block_data".to_string(),
SchemaDataType::primitive("binary".to_string()),
false,
HashMap::default(),
),
SchemaField::new(
"created_at".to_string(),
SchemaDataType::primitive("long".to_string()),
false,
HashMap::default(),
),
],
DeltaLakeProducer::get_table_config(),
)
.await?;
@@ -83,20 +109,24 @@ impl DeltaLakeProducer {
info!("Created new table");
}

info!("block-parition = {}", cfg.block_partition);

let metadata = table
.get_metadata()
.map_err(|e| ProducerError::Initialization(format!("{:?}", e)))?;
let arrow_schema = <ArrowSchema as TryFrom<&Schema>>::try_from(&metadata.schema.clone())
.map_err(|e| ProducerError::Initialization(format!("{:?}", e)))?;

let schema_ref = Arc::new(arrow_schema);

let writer = RecordBatchWriter::for_table(&table)?;
let delta_lake_client = Self {
table: Arc::new(Mutex::new(table)),
writer: Arc::new(Mutex::new(writer)),
schema_ref,
chain_name,
chain_name: cfg.chain.to_string(),
table_path: deltalake_cfg.table_path,
block_partition: cfg.block_partition,
};
Ok(delta_lake_client)
}
@@ -106,18 +136,28 @@ impl DeltaLakeProducer {
table_config.insert(DeltaConfigKey::AppendOnly, Some("true".to_string()));
table_config.insert(
DeltaConfigKey::AutoOptimizeAutoCompact,
Some("auto".to_string()),
);
table_config.insert(
DeltaConfigKey::AutoOptimizeOptimizeWrite,
Some("true".to_string()),
);
// TODO: we should determine the exact value here
// table_config.insert(
// DeltaConfigKey::LogRetentionDuration,
// Some("100".to_string()),
// );
// table_config.insert(
// DeltaConfigKey::DeletedFileRetentionDuration,
// Some("20".to_string()),
// );
// table_config.insert(DeltaConfigKey::CheckpointInterval, Some("10".to_string()));
table_config.insert(
DeltaConfigKey::DataSkippingNumIndexedCols,
Some("2".to_string()),
);
table_config.insert(
DeltaConfigKey::LogRetentionDuration,
Some("interval 7 days".to_string()),
);
table_config.insert(
DeltaConfigKey::DeletedFileRetentionDuration,
Some("interval 2 days".to_string()),
);
table_config.insert(
DeltaConfigKey::CheckpointInterval,
Some("interval 1 hour".to_string()),
);
return table_config;
}
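
For context, assuming delta-rs maps these `DeltaConfigKey` variants onto the standard Delta table properties, the new configuration pins down what the commented-out TODO previously left open, roughly:

```
delta.appendOnly                   = true             # block data is never updated or deleted
delta.autoOptimize.autoCompact     = auto
delta.autoOptimize.optimizeWrite   = true
delta.dataSkippingNumIndexedCols   = 2                # collect stats on the first two columns only
delta.logRetentionDuration         = interval 7 days
delta.deletedFileRetentionDuration = interval 2 days
delta.checkpointInterval           = interval 1 hour
```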

@@ -126,7 +166,10 @@ impl DeltaLakeProducer {
columns: Vec<SchemaField>,
table_config: HashMap<DeltaConfigKey, Option<String>>,
) -> Result<(DeltaTable, bool), ProducerError> {
info!("Opening table at: {table_path}");
info!(
"Opening table at: {table_path}, config={} keys",
table_config.len()
);
let mut table = DeltaTableBuilder::from_uri(table_path)
.with_allow_http(true)
.build()
@@ -142,13 +185,8 @@
let table = CreateBuilder::default()
.with_object_store(table.object_store())
.with_columns(columns)
.with_column(
"created_at",
SchemaDataType::primitive("long".to_string()),
false,
None,
)
.with_configuration(table_config)
.with_partition_columns(vec!["block_partition".to_string()])
.await
.unwrap();
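
The `with_partition_columns` call is the core of the change: Delta writes Hive-style partition directories, so (assuming the standard delta-rs layout) the table ends up with one directory per block bucket, along these lines:

```
<table_path>/block_partition=184/part-00000-....parquet
<table_path>/block_partition=185/part-00000-....parquet
<table_path>/_delta_log/
```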

@@ -165,27 +203,42 @@
#[async_trait]
impl<B: BlockTrait> ProducerTrait<B> for DeltaLakeProducer {
async fn publish_blocks(&self, blocks: Vec<B>) -> Result<(), ProducerError> {
let content = blocks
.iter()
.map(|b| {
let json_val = serde_json::to_value(&b).unwrap();
let timestamp = b.get_writer_timestamp();
let mut current_value = json_val.as_object().unwrap().to_owned();
current_value.insert("created_at".to_string(), json!(timestamp));
serde_json::to_string(&current_value).unwrap()
})
.collect::<Vec<String>>()
.join("\n");

info!("Blocks serialized as json & joined as line-delimited");
let mut block_numbers = vec![];
let mut block_partitions = vec![];
let mut block_hashes = vec![];
let mut block_parent_hashes = vec![];
let mut block_data = vec![];
let mut created_ats = vec![];

let mut partition_set = HashSet::new();

for block in blocks {
block_numbers.push(block.get_number() as i64);
let partition_number = block.get_number() / self.block_partition as u64;
partition_set.insert(partition_number);
block_partitions.push(partition_number as i64);
block_hashes.push(block.get_hash());
block_parent_hashes.push(block.get_parent_hash());
let block_bytes = block.encode_to_vec();
block_data.push(block_bytes);
created_ats.push(block.get_writer_timestamp() as i64);
}

info!("block batch partition set = {:?}", partition_set);

let arrow_array: Vec<Arc<dyn Array>> = vec![
Arc::new(Int64Array::from(block_numbers)),
Arc::new(Int64Array::from(block_partitions)),
Arc::new(StringArray::from(block_hashes)),
Arc::new(StringArray::from(block_parent_hashes)),
Arc::new(BinaryArray::from_iter_values(block_data)),
Arc::new(Int64Array::from(created_ats)),
];

let batch = RecordBatch::try_new(self.schema_ref.clone(), arrow_array).unwrap();
let mut table = self.table.lock().await;
let mut writer = self.writer.lock().await;

let buf_reader = BufReader::new(content.as_bytes());
let reader = ReaderBuilder::new(self.schema_ref.clone()).with_batch_size(blocks.len());
let mut reader = reader.build(buf_reader).unwrap();
let batch = reader.next().unwrap().unwrap();
info!("RecordBatch -> rows = {}", batch.num_rows());
writer.write(batch).await?;

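`publish_blocks` now builds typed Arrow columns directly instead of round-tripping every block through line-delimited JSON and a `ReaderBuilder`. A minimal standalone sketch of that construction (field names mirror the producer schema; assumes the `deltalake` crate's `arrow` re-export used throughout this repo):

```rust
use std::sync::Arc;

use deltalake::arrow::array::{Array, BinaryArray, Int64Array, StringArray};
use deltalake::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use deltalake::arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("block_number", DataType::Int64, false),
        Field::new("block_partition", DataType::Int64, false),
        Field::new("hash", DataType::Utf8, false),
        Field::new("block_data", DataType::Binary, false),
    ]));

    // One column per schema field, all columns the same length.
    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(Int64Array::from(vec![18_500_000_i64])),
        Arc::new(Int64Array::from(vec![185_i64])),
        Arc::new(StringArray::from(vec!["0xabc".to_string()])),
        Arc::new(BinaryArray::from_iter_values([vec![0u8, 1, 2]])),
    ];

    let batch = RecordBatch::try_new(schema, columns).unwrap();
    assert_eq!(batch.num_rows(), 1);
}
```

Dropping the JSON intermediary removes a serialize/deserialize pass per batch and stores `block_data` as raw protobuf bytes rather than a JSON-escaped string.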
6 changes: 3 additions & 3 deletions ingestor/src/tests/mock_ingestor.rs
@@ -12,7 +12,7 @@ use crate::proto::BlockChain;
use crate::proto::BlockTrait;
use crate::resumers::create_resumer;
use crate::resumers::Resumer;
use common_libs::tokio::sync::Mutex;
use common_libs::tokio::sync::RwLock;
use metrics::Registry;
use std::sync::Arc;

@@ -25,7 +25,7 @@ pub async fn create_fake_ethereum_ingestor<B: BlockTrait>(
Ingestor<B>,
Arc<FakeClient>,
Arc<Resumer<B>>,
Arc<Mutex<Vec<B>>>,
Arc<RwLock<Vec<B>>>,
),
IngestorError,
>
@@ -45,7 +45,7 @@
let name_service = NameService::from((&cfg, BlockChain::Ethereum));

let stdout_producer = create_producer(&cfg, &name_service).await?;
let mut blocks = Arc::new(Mutex::new(vec![]));
let mut blocks = Arc::new(RwLock::new(vec![]));

if let Producer::StdOut(p) = &stdout_producer {
blocks = p.blocks.clone();
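The swap from `Mutex` to `RwLock` in the test harness lets assertions take shared read locks while only the producer path needs exclusive access. A small sketch of the pattern (tokio with the `rt-multi-thread` and `macros` features assumed):

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let blocks: Arc<RwLock<Vec<u64>>> = Arc::new(RwLock::new(Vec::new()));

    // Producer side: exclusive write access, as with the old Mutex.
    blocks.write().await.push(42);

    // Test side: any number of readers may hold the lock concurrently.
    let r1 = blocks.read().await;
    let r2 = blocks.read().await;
    assert_eq!(r1.len(), r2.len());
}
```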