Skip to content

Commit

Permalink
revert some not needed change
Browse files Browse the repository at this point in the history
  • Loading branch information
vutran1710 committed Nov 29, 2023
1 parent 56a055a commit f01ed98
Showing 1 changed file with 4 additions and 1 deletion.
5 changes: 4 additions & 1 deletion ingestor/src/producers/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub struct DeltaLakeConfig {
#[derive(Clone)]
pub struct DeltaLakeProducer {
table: Arc<Mutex<DeltaTable>>,
writer: Arc<Mutex<RecordBatchWriter>>,
schema_ref: Arc<ArrowSchema>,
chain_name: String,
table_path: String,
Expand Down Expand Up @@ -89,8 +90,10 @@ impl DeltaLakeProducer {
.map_err(|e| ProducerError::Initialization(format!("{:?}", e)))?;
let schema_ref = Arc::new(arrow_schema);

let writer = RecordBatchWriter::for_table(&table)?;
let delta_lake_client = Self {
table: Arc::new(Mutex::new(table)),
writer: Arc::new(Mutex::new(writer)),
schema_ref,
chain_name,
table_path: deltalake_cfg.table_path,
Expand Down Expand Up @@ -177,7 +180,7 @@ impl<B: BlockTrait> ProducerTrait<B> for DeltaLakeProducer {
info!("Blocks serialized as json & joined as line-delimited");

let mut table = self.table.lock().await;
let mut writer = RecordBatchWriter::for_table(&table)?;
let mut writer = self.writer.lock().await;

let buf_reader = BufReader::new(content.as_bytes());
let reader = ReaderBuilder::new(self.schema_ref.clone()).with_batch_size(blocks.len());
Expand Down

0 comments on commit f01ed98

Please sign in to comment.