Skip to content

Commit

Permalink
Add FragmentType enum and field to ListActorSplitsResponse in pro…
Browse files Browse the repository at this point in the history
…to and rw_actor_splits.rs
  • Loading branch information
shanicky committed Sep 27, 2024
1 parent ea4bae6 commit 7bb6408
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 9 deletions.
7 changes: 7 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,18 @@ message ListActorStatesResponse {
message ListActorSplitsRequest {}

message ListActorSplitsResponse {
enum FragmentType {
UNSPECIFIED = 0;
NON_SHARED_SOURCE = 1;
SHARED_SOURCE = 2;
SHARED_SOURCE_BACKFILL = 3;
}
message ActorSplit {
uint32 actor_id = 1;
uint32 fragment_id = 2;
uint32 source_id = 3;
string split_id = 4;
FragmentType fragment_type = 5;
}
repeated ActorSplit actor_splits = 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_splits_response::{ActorSplit, FragmentType};

use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;
Expand All @@ -26,6 +26,7 @@ struct RwActorSplit {
split_id: String,
source_id: i32,
fragment_id: i32,
fragment_type: String,
}

impl From<ActorSplit> for RwActorSplit {
Expand All @@ -35,6 +36,10 @@ impl From<ActorSplit> for RwActorSplit {
split_id: actor_split.split_id,
source_id: actor_split.source_id as _,
fragment_id: actor_split.fragment_id as _,
fragment_type: FragmentType::try_from(actor_split.fragment_type)
.unwrap_or(FragmentType::Unspecified)
.as_str_name()
.to_string(),
}
}
}
Expand Down
38 changes: 30 additions & 8 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_meta::model::ActorId;
use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig};
use risingwave_meta_model_v2::{SourceId, StreamingParallelism};
use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
use risingwave_pb::meta::list_actor_splits_response::FragmentType;
use risingwave_pb::meta::list_table_fragments_response::{
ActorInfo, FragmentInfo, TableFragmentInfo,
};
Expand Down Expand Up @@ -438,37 +439,57 @@ impl StreamManagerService for StreamServiceImpl {
mut actor_splits,
} = self.stream_manager.source_manager.get_running_info().await;

let source_actors = mgr.catalog_controller.list_source_actors().await?;

let is_shared_source = mgr
.catalog_controller
.list_source_id_with_shared_types()
.await?;

let fragment_to_source: HashMap<_, _> =
source_fragments
.into_iter()
.flat_map(|(source_id, fragment_ids)| {
let source_type = if is_shared_source
.get(&(source_id as _))
.copied()
.unwrap_or(false)
{
FragmentType::SharedSource
} else {
FragmentType::NonSharedSource
};

fragment_ids
.into_iter()
.map(move |fragment_id| (fragment_id, source_id))
.map(move |fragment_id| (fragment_id, (source_id, source_type)))
})
.chain(backfill_fragments.into_iter().flat_map(
|(source_id, fragment_ids)| {
fragment_ids.into_iter().flat_map(
move |(fragment_id, upstream_fragment_id)| {
[
(fragment_id, source_id),
(upstream_fragment_id, source_id),
(
fragment_id,
(source_id, FragmentType::SharedSourceBackfill),
),
(
upstream_fragment_id,
(source_id, FragmentType::SharedSource),
),
]
},
)
},
))
.collect();

let source_actors = mgr.catalog_controller.list_source_actors().await?;

let actor_splits = source_actors
.into_iter()
.flat_map(|(actor_id, fragment_id)| {
let source_id = fragment_to_source
let (source_id, fragment_type) = fragment_to_source
.get(&(fragment_id as _))
.copied()
.map(|id| id as _)
.unwrap_or_default();

actor_splits
Expand All @@ -477,9 +498,10 @@ impl StreamManagerService for StreamServiceImpl {
.into_iter()
.map(move |split| list_actor_splits_response::ActorSplit {
actor_id: actor_id as _,
source_id,
source_id: source_id as _,
fragment_id: fragment_id as _,
split_id: split.id().to_string(),
fragment_type: fragment_type.into(),
})
})
.collect_vec();
Expand Down
22 changes: 22 additions & 0 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2723,6 +2723,28 @@ impl CatalogController {
inner.list_sources().await
}

// Return a hashmap to distinguish whether each source is shared or not.
pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
let inner = self.inner.read().await;
let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
.select_only()
.columns([source::Column::SourceId, source::Column::SourceInfo])
.into_tuple()
.all(&inner.db)
.await?;

Ok(source_ids
.into_iter()
.map(|(source_id, info)| {
(
source_id,
info.map(|info| info.to_protobuf().cdc_source_job)
.unwrap_or(false),
)
})
.collect())
}

pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
let inner = self.inner.read().await;
let source_ids: Vec<SourceId> = Source::find()
Expand Down

0 comments on commit 7bb6408

Please sign in to comment.