Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): assert no read version in storage reset #14876

Merged
merged 5 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1328,14 +1328,10 @@ async fn test_gc_watermark_and_clear_shared_buffer() {
min_object_id_epoch2,
);

hummock_storage.clear_shared_buffer().await.unwrap();
drop(local_hummock_storage);

let read_version = local_hummock_storage.read_version();
hummock_storage.clear_shared_buffer().await.unwrap();

let read_version = read_version.read();
assert!(read_version.staging().imm.is_empty());
assert!(read_version.staging().sst.is_empty());
assert_eq!(read_version.committed().max_committed_epoch(), epoch1);
assert_eq!(
hummock_storage
.sstable_object_id_manager()
Expand Down
99 changes: 63 additions & 36 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use arc_swap::ArcSwap;
use await_tree::InstrumentAwait;
use itertools::Itertools;
use parking_lot::RwLock;
use prometheus::core::{AtomicU64, GenericGauge};
use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo};
Expand All @@ -37,7 +38,10 @@ use crate::hummock::event_handler::uploader::{
default_spawn_merging_task, HummockUploader, SpawnMergingTask, SpawnUploadTask, SyncedData,
UploadTaskInfo, UploadTaskPayload, UploaderEvent,
};
use crate::hummock::event_handler::{HummockEvent, HummockVersionUpdate};
use crate::hummock::event_handler::{
HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
ReadOnlyRwLockRef,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::store::version::{
HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate,
Expand Down Expand Up @@ -116,7 +120,9 @@ pub struct HummockEventHandler {
hummock_event_tx: mpsc::UnboundedSender<HummockEvent>,
hummock_event_rx: mpsc::UnboundedReceiver<HummockEvent>,
pending_sync_requests: BTreeMap<HummockEpoch, oneshot::Sender<HummockResult<SyncResult>>>,
read_version_mapping: Arc<ReadVersionMappingType>,
read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
/// A copy of `read_version_mapping` but owned by event handler
local_read_version_mapping: HashMap<LocalInstanceId, HummockReadVersionRef>,

version_update_notifier_tx: Arc<tokio::sync::watch::Sender<HummockEpoch>>,
pinned_version: Arc<ArcSwap<PinnedVersion>>,
Expand Down Expand Up @@ -231,6 +237,7 @@ impl HummockEventHandler {
pinned_version: Arc::new(ArcSwap::from_pointee(pinned_version)),
write_conflict_detector,
read_version_mapping,
local_read_version_mapping: Default::default(),
uploader,
refiller,
last_instance_id: 0,
Expand All @@ -246,8 +253,8 @@ impl HummockEventHandler {
self.pinned_version.clone()
}

pub fn read_version_mapping(&self) -> Arc<ReadVersionMappingType> {
self.read_version_mapping.clone()
pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
ReadOnlyRwLockRef::new(self.read_version_mapping.clone())
}

pub fn buffer_tracker(&self) -> &BufferTracker {
Expand All @@ -270,7 +277,7 @@ impl HummockEventHandler {
// older data first
.rev()
.for_each(|staging_sstable_info| {
Self::for_each_read_version(&self.read_version_mapping, |read_version| {
self.for_each_read_version(|read_version| {
read_version.update(VersionUpdate::Staging(StagingData::Sst(
staging_sstable_info.clone(),
)))
Expand Down Expand Up @@ -308,21 +315,15 @@ impl HummockEventHandler {

/// This function will be performed under the protection of the `read_version_mapping` read
/// lock, and add write lock on each `read_version` operation
fn for_each_read_version<F>(read_version: &Arc<ReadVersionMappingType>, mut f: F)
where
F: FnMut(&mut HummockReadVersion),
{
let read_version_mapping_guard = read_version.read();

read_version_mapping_guard
fn for_each_read_version(&self, mut f: impl FnMut(&mut HummockReadVersion)) {
self.local_read_version_mapping
.values()
.flat_map(HashMap::values)
.for_each(|read_version| f(read_version.write().deref_mut()));
.for_each(|read_version: &HummockReadVersionRef| f(read_version.write().deref_mut()));
}

fn handle_data_spilled(&mut self, staging_sstable_info: StagingSstableInfo) {
// todo: do some prune for version update
Self::for_each_read_version(&self.read_version_mapping, |read_version| {
self.for_each_read_version(|read_version| {
trace!("data_spilled. SST size {}", staging_sstable_info.imm_size());
read_version.update(VersionUpdate::Staging(StagingData::Sst(
staging_sstable_info.clone(),
Expand Down Expand Up @@ -401,6 +402,7 @@ impl HummockEventHandler {
self.uploader.max_synced_epoch(),
self.uploader.max_sealed_epoch(),
);

self.uploader.clear();

for (epoch, result_sender) in self.pending_sync_requests.extract_if(|_, _| true) {
Expand All @@ -413,11 +415,14 @@ impl HummockEventHandler {
);
}

{
Self::for_each_read_version(&self.read_version_mapping, |read_version| {
read_version.clear_uncommitted()
});
}
assert!(
self.local_read_version_mapping.is_empty(),
"read version mapping not empty when clear. remaining tables: {:?}",
self.local_read_version_mapping
.values()
.map(|read_version| read_version.read().table_id())
.collect_vec()
);

if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager {
sstable_object_id_manager
Expand Down Expand Up @@ -472,7 +477,7 @@ impl HummockEventHandler {
.store(Arc::new(new_pinned_version.clone()));

{
Self::for_each_read_version(&self.read_version_mapping, |read_version| {
self.for_each_read_version(|read_version| {
read_version.update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone()))
});
}
Expand Down Expand Up @@ -544,20 +549,19 @@ impl HummockEventHandler {

UploaderEvent::ImmMerged(merge_output) => {
// update read version for corresponding table shards
let read_guard = self.read_version_mapping.read();
if let Some(shards) = read_guard.get(&merge_output.table_id) {
shards.get(&merge_output.instance_id).map_or_else(
|| {
warn!(
"handle ImmMerged: table instance not found. table {}, instance {}",
&merge_output.table_id, &merge_output.instance_id
)
},
|read_version| {
read_version.write().update(VersionUpdate::Staging(
StagingData::MergedImmMem(merge_output.merged_imm),
));
},
if let Some(read_version) = self
.local_read_version_mapping
.get(&merge_output.instance_id)
{
read_version
.write()
.update(VersionUpdate::Staging(StagingData::MergedImmMem(
merge_output.merged_imm,
)));
} else {
warn!(
"handle ImmMerged: table instance not found. table {:?}, instance {}",
&merge_output.table_id, &merge_output.instance_id
)
}
}
Expand Down Expand Up @@ -589,6 +593,13 @@ impl HummockEventHandler {
}

HummockEvent::ImmToUploader(imm) => {
assert!(
self.local_read_version_mapping
.contains_key(&imm.instance_id),
"add imm from non-existing read version instance: instance_id: {}, table_id {}",
imm.instance_id,
imm.table_id,
);
self.uploader.add_imm(imm);
self.uploader.may_flush();
}
Expand All @@ -611,8 +622,14 @@ impl HummockEventHandler {
epoch,
opts,
table_id,
..
instance_id,
} => {
assert!(
self.local_read_version_mapping
.contains_key(&instance_id),
"seal epoch from non-existing read version instance: instance_id: {}, table_id: {}, epoch: {}",
instance_id, table_id, epoch,
);
if let Some((direction, watermarks)) = opts.table_watermarks {
self.uploader
.add_table_watermarks(epoch, table_id, watermarks, direction)
Expand Down Expand Up @@ -648,6 +665,8 @@ impl HummockEventHandler {
);

{
self.local_read_version_mapping
.insert(instance_id, basic_read_version.clone());
let mut read_version_mapping_guard = self.read_version_mapping.write();

read_version_mapping_guard
Expand Down Expand Up @@ -682,6 +701,14 @@ impl HummockEventHandler {
"read version deregister: table_id: {}, instance_id: {}",
table_id, instance_id
);
self.local_read_version_mapping
.remove(&instance_id)
.unwrap_or_else(|| {
panic!(
"DestroyHummockInstance inexist instance table_id {} instance_id {}",
table_id, instance_id
)
});
let mut read_version_mapping_guard = self.read_version_mapping.write();
let entry = read_version_mapping_guard
.get_mut(&table_id)
Expand Down
28 changes: 23 additions & 5 deletions src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;
use parking_lot::{RwLock, RwLockReadGuard};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::HummockEpoch;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -88,8 +88,7 @@ pub enum HummockEvent {

RegisterReadVersion {
table_id: TableId,
new_read_version_sender:
oneshot::Sender<(Arc<RwLock<HummockReadVersion>>, LocalInstanceGuard)>,
new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>,
is_replicated: bool,
},

Expand Down Expand Up @@ -173,8 +172,27 @@ impl std::fmt::Debug for HummockEvent {
}

pub type LocalInstanceId = u64;
pub type ReadVersionMappingType =
RwLock<HashMap<TableId, HashMap<LocalInstanceId, Arc<RwLock<HummockReadVersion>>>>>;
pub type HummockReadVersionRef = Arc<RwLock<HummockReadVersion>>;
pub type ReadVersionMappingType = HashMap<TableId, HashMap<LocalInstanceId, HummockReadVersionRef>>;
pub type ReadOnlyReadVersionMapping = ReadOnlyRwLockRef<ReadVersionMappingType>;

pub struct ReadOnlyRwLockRef<T>(Arc<RwLock<T>>);

impl<T> Clone for ReadOnlyRwLockRef<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl<T> ReadOnlyRwLockRef<T> {
pub fn new(inner: Arc<RwLock<T>>) -> Self {
Self(inner)
}

pub fn read(&self) -> RwLockReadGuard<'_, T> {
self.0.read()
}
}

pub struct LocalInstanceGuard {
pub table_id: TableId,
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
use crate::hummock::compactor::CompactorContext;
use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
use crate::hummock::event_handler::{
HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadVersionMappingType,
HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
};
use crate::hummock::local_version::pinned_version::{start_pinned_version_worker, PinnedVersion};
use crate::hummock::observer_manager::HummockObserverNode;
Expand Down Expand Up @@ -98,7 +98,7 @@ pub struct HummockStorage {

_shutdown_guard: Arc<HummockStorageShutdownGuard>,

read_version_mapping: Arc<ReadVersionMappingType>,
read_version_mapping: ReadOnlyReadVersionMapping,

backup_reader: BackupReaderRef,

Expand Down
11 changes: 5 additions & 6 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ use std::sync::Arc;

use await_tree::InstrumentAwait;
use bytes::Bytes;
use parking_lot::RwLock;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{is_empty_key_range, TableKey, TableKeyRange};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
use tokio::sync::mpsc;
use tracing::{warn, Instrument};

use super::version::{HummockReadVersion, StagingData, VersionUpdate};
use super::version::{StagingData, VersionUpdate};
use crate::error::StorageResult;
use crate::hummock::event_handler::{HummockEvent, LocalInstanceGuard};
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator,
UserIterator,
Expand Down Expand Up @@ -64,7 +63,7 @@ pub struct LocalHummockStorage {
instance_guard: LocalInstanceGuard,

/// Read handle.
read_version: Arc<RwLock<HummockReadVersion>>,
read_version: HummockReadVersionRef,

/// This indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`.
/// It's used by executors in different CNs to synchronize states.
Expand Down Expand Up @@ -517,7 +516,7 @@ impl LocalHummockStorage {
#[allow(clippy::too_many_arguments)]
pub fn new(
instance_guard: LocalInstanceGuard,
read_version: Arc<RwLock<HummockReadVersion>>,
read_version: HummockReadVersionRef,
hummock_version_reader: HummockVersionReader,
event_sender: mpsc::UnboundedSender<HummockEvent>,
memory_limiter: Arc<MemoryLimiter>,
Expand Down Expand Up @@ -548,7 +547,7 @@ impl LocalHummockStorage {
}

/// See `HummockReadVersion::update` for more details.
pub fn read_version(&self) -> Arc<RwLock<HummockReadVersion>> {
pub fn read_version(&self) -> HummockReadVersionRef {
self.read_version.clone()
}

Expand Down
17 changes: 6 additions & 11 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use tracing::Instrument;

use super::StagingDataIterator;
use crate::error::StorageResult;
use crate::hummock::event_handler::HummockReadVersionRef;
use crate::hummock::iterator::{
ConcatIterator, ForwardMergeRangeIterator, HummockIteratorUnion, MergeIterator,
SkipWatermarkIterator, UserIterator,
Expand Down Expand Up @@ -244,6 +245,10 @@ impl HummockReadVersion {
Self::new_with_replication_option(table_id, committed_version, false)
}

pub fn table_id(&self) -> TableId {
self.table_id
}

/// Updates the read version with `VersionUpdate`.
/// There will be three data types to be processed
/// `VersionUpdate::Staging`
Expand Down Expand Up @@ -405,16 +410,6 @@ impl HummockReadVersion {
}
}

pub fn clear_uncommitted(&mut self) {
self.staging.imm.clear();
self.staging.sst.clear();
self.table_watermarks = self
.committed
.table_watermark_index()
.get(&self.table_id)
.cloned()
}

pub fn add_merged_imm(&mut self, merged_imm: ImmutableMemtable) {
assert!(merged_imm.get_imm_ids().iter().rev().is_sorted());
let min_imm_id = *merged_imm.get_imm_ids().last().expect("non-empty");
Expand Down Expand Up @@ -510,7 +505,7 @@ pub fn read_filter_for_batch(
epoch: HummockEpoch, // for check
table_id: TableId,
mut key_range: TableKeyRange,
read_version_vec: Vec<Arc<RwLock<HummockReadVersion>>>,
read_version_vec: Vec<HummockReadVersionRef>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
assert!(!read_version_vec.is_empty());
let mut staging_vec = Vec::with_capacity(read_version_vec.len());
Expand Down
Loading
Loading