Skip to content

Commit

Permalink
feat: add rw_actor_splits (#18746)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky authored Sep 27, 2024
1 parent 8e6ebb0 commit adc30c0
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 5 deletions.
20 changes: 20 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,25 @@ message ListActorStatesResponse {
repeated ActorState states = 1;
}

// Request for the `ListActorSplits` RPC of `StreamManagerService`; it carries no fields.
message ListActorSplitsRequest {}

// Response for the `ListActorSplits` RPC: one `ActorSplit` entry per (actor, split) pair.
message ListActorSplitsResponse {
// Classifies the fragment that the actor owning a split belongs to.
enum FragmentType {
// No classification is available for the fragment.
UNSPECIFIED = 0;
// Source fragment whose source is not flagged as shared.
NON_SHARED_SOURCE = 1;
// Source fragment of a shared source.
SHARED_SOURCE = 2;
// Backfill fragment that sits downstream of a shared source fragment.
SHARED_SOURCE_BACKFILL = 3;
}
// A single split together with the actor, fragment and source it is attached to.
message ActorSplit {
// Id of the actor the split is assigned to.
uint32 actor_id = 1;
// Id of the fragment that contains the actor.
uint32 fragment_id = 2;
// Id of the source associated with the fragment.
uint32 source_id = 3;
// Identifier of the split, rendered as a string.
string split_id = 4;
// Kind of fragment the actor belongs to.
FragmentType fragment_type = 5;
}
// All splits currently tracked, flattened across actors.
repeated ActorSplit actor_splits = 1;
}

// Request for the `ListObjectDependencies` RPC of `StreamManagerService`; it carries no fields.
message ListObjectDependenciesRequest {}

message ListObjectDependenciesResponse {
Expand Down Expand Up @@ -302,6 +321,7 @@ service StreamManagerService {
rpc ListTableFragmentStates(ListTableFragmentStatesRequest) returns (ListTableFragmentStatesResponse);
rpc ListFragmentDistribution(ListFragmentDistributionRequest) returns (ListFragmentDistributionResponse);
rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse);
rpc ListActorSplits(ListActorSplitsRequest) returns (ListActorSplitsResponse);
rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse);
rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse);
rpc Recover(RecoverRequest) returns (RecoverResponse);
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,6 @@ mod rw_views;
mod rw_worker_nodes;

mod rw_actor_id_to_ddl;
mod rw_actor_splits;
mod rw_fragment_id_to_ddl;
mod rw_worker_actor_count;
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::meta::list_actor_splits_response::{ActorSplit, FragmentType};

use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;

/// One row of the `rw_catalog.rw_actor_splits` system table.
///
/// Each row describes a single split held by an actor; rows are keyed by
/// `(actor_id, split_id, source_id)`.
#[derive(Fields)]
#[primary_key(actor_id, split_id, source_id)]
struct RwActorSplit {
/// Id of the actor holding the split.
actor_id: i32,
/// Identifier of the split, as a string.
split_id: String,
/// Id of the source reported for the split.
source_id: i32,
/// Id of the fragment the actor belongs to.
fragment_id: i32,
/// Protobuf string name of the fragment type (the result of `FragmentType::as_str_name`).
fragment_type: String,
}

impl From<ActorSplit> for RwActorSplit {
fn from(actor_split: ActorSplit) -> Self {
Self {
actor_id: actor_split.actor_id as _,
split_id: actor_split.split_id,
source_id: actor_split.source_id as _,
fragment_id: actor_split.fragment_id as _,
fragment_type: FragmentType::try_from(actor_split.fragment_type)
.unwrap_or(FragmentType::Unspecified)
.as_str_name()
.to_string(),
}
}
}

/// Produces the rows of `rw_catalog.rw_actor_splits`.
///
/// Fetches the actor splits through the reader's meta client and converts each one
/// into an [`RwActorSplit`] row. Any error from the meta client is propagated.
#[system_catalog(table, "rw_catalog.rw_actor_splits")]
async fn read_rw_actor_splits(reader: &SysCatalogReaderImpl) -> Result<Vec<RwActorSplit>> {
    let fetched = reader.meta_client.list_actor_splits().await?;

    let mut rows = Vec::with_capacity(fetched.len());
    rows.extend(fetched.into_iter().map(RwActorSplit::from));
    Ok(rows)
}
7 changes: 7 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_pb::hummock::{
HummockSnapshot,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
Expand Down Expand Up @@ -68,6 +69,8 @@ pub trait FrontendMetaClient: Send + Sync {

async fn list_actor_states(&self) -> Result<Vec<ActorState>>;

/// Returns the list of `ActorSplit` records; where they come from is up to the implementor.
async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;

async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;

async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
Expand Down Expand Up @@ -178,6 +181,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.list_actor_states().await
}

// Forwards the call unchanged to the wrapped client (`self.0`).
async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
self.0.list_actor_splits().await
}

// Forwards the call unchanged to the wrapped client (`self.0`).
async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
self.0.list_object_dependencies().await
}
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use risingwave_pb::hummock::{
HummockSnapshot,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
Expand Down Expand Up @@ -973,6 +974,10 @@ impl FrontendMetaClient for MockFrontendMetaClient {
Ok(vec![])
}

// Mock behavior: always succeeds with an empty list of actor splits.
async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
Ok(vec![])
}

// Mock behavior: always succeeds with an empty list of object dependencies.
async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
Ok(vec![])
}
Expand Down
92 changes: 91 additions & 1 deletion src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_connector::source::SplitMetaData;
use risingwave_meta::manager::{LocalNotification, MetadataManager};
use risingwave_meta::model;
use risingwave_meta::model::ActorId;
use risingwave_meta::stream::ThrottleConfig;
use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig};
use risingwave_meta_model_v2::{SourceId, StreamingParallelism};
use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
use risingwave_pb::meta::list_actor_splits_response::FragmentType;
use risingwave_pb::meta::list_table_fragments_response::{
ActorInfo, FragmentInfo, TableFragmentInfo,
};
Expand Down Expand Up @@ -420,4 +422,92 @@ impl StreamManagerService for StreamServiceImpl {
.await;
Ok(Response::new(RecoverResponse {}))
}

/// Handles the `ListActorSplits` RPC.
///
/// With the V1 metadata manager the response is always empty. With V2, the source
/// manager's running info is joined with the catalog's source actors so that every
/// split of every source actor is reported together with its fragment, its source,
/// and the fragment's type.
///
/// Actors whose fragment is not known to the source manager are reported with the
/// default `(source_id, fragment_type)` pair; actors without any tracked split
/// produce no entries.
async fn list_actor_splits(
    &self,
    _request: Request<ListActorSplitsRequest>,
) -> Result<Response<ListActorSplitsResponse>, Status> {
    let mgr = match &self.metadata_manager {
        MetadataManager::V1(_) => {
            // TODO: remove this when v1 is removed
            return Ok(Response::new(ListActorSplitsResponse {
                actor_splits: vec![],
            }));
        }
        MetadataManager::V2(mgr) => mgr,
    };

    let running_info = self.stream_manager.source_manager.get_running_info().await;
    let source_actors = mgr.catalog_controller.list_source_actors().await?;
    let shared_flags = mgr
        .catalog_controller
        .list_source_id_with_shared_types()
        .await?;

    let SourceManagerRunningInfo {
        source_fragments,
        backfill_fragments,
        actor_splits: mut splits_by_actor,
    } = running_info;

    // fragment id -> (source id, fragment type)
    let mut fragment_to_source = HashMap::new();

    // Plain source fragments come first: shared or not, depending on the catalog flag.
    for (source_id, fragment_ids) in source_fragments {
        let source_type = match shared_flags.get(&(source_id as _)).copied() {
            Some(true) => FragmentType::SharedSource,
            Some(false) | None => FragmentType::NonSharedSource,
        };
        for fragment_id in fragment_ids {
            fragment_to_source.insert(fragment_id, (source_id, source_type));
        }
    }

    // Backfill information is applied afterwards, so it overrides any entry recorded
    // above for the same fragment id.
    for (source_id, fragment_pairs) in backfill_fragments {
        for (fragment_id, upstream_fragment_id) in fragment_pairs {
            fragment_to_source.insert(
                fragment_id,
                (source_id, FragmentType::SharedSourceBackfill),
            );
            fragment_to_source.insert(
                upstream_fragment_id,
                (source_id, FragmentType::SharedSource),
            );
        }
    }

    let mut actor_splits = Vec::new();
    for (actor_id, fragment_id) in source_actors {
        let (source_id, fragment_type) = fragment_to_source
            .get(&(fragment_id as _))
            .copied()
            .unwrap_or_default();

        // Each actor's splits are taken out of the map, so they are emitted at most once.
        let splits = splits_by_actor
            .remove(&(actor_id as _))
            .unwrap_or_default();

        for split in splits {
            actor_splits.push(list_actor_splits_response::ActorSplit {
                actor_id: actor_id as _,
                source_id: source_id as _,
                fragment_id: fragment_id as _,
                split_id: split.id().to_string(),
                fragment_type: fragment_type.into(),
            });
        }
    }

    Ok(Response::new(ListActorSplitsResponse { actor_splits }))
}
}
22 changes: 22 additions & 0 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2723,6 +2723,28 @@ impl CatalogController {
inner.list_sources().await
}

/// Returns, for every source in the catalog, whether the source is shared.
///
/// The flag is the `cdc_source_job` field of the source's stored info; a source
/// without stored info is reported as not shared (`false`).
pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
    let guard = self.inner.read().await;
    let rows: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
        .select_only()
        .columns([source::Column::SourceId, source::Column::SourceInfo])
        .into_tuple()
        .all(&guard.db)
        .await?;

    let mut shared_by_source = HashMap::with_capacity(rows.len());
    for (source_id, info) in rows {
        let is_shared = match info {
            Some(info) => info.to_protobuf().cdc_source_job,
            None => false,
        };
        shared_by_source.insert(source_id, is_shared);
    }

    Ok(shared_by_source)
}

pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
let inner = self.inner.read().await;
let source_ids: Vec<SourceId> = Source::find()
Expand Down
14 changes: 14 additions & 0 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,20 @@ impl CatalogController {
Ok(actor_locations)
}

/// Lists `(actor_id, fragment_id)` for every actor whose `splits` column is not null.
///
/// Database errors are propagated to the caller.
pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
    let guard = self.inner.read().await;

    let rows = Actor::find()
        .select_only()
        .filter(actor::Column::Splits.is_not_null())
        .columns([actor::Column::ActorId, actor::Column::FragmentId])
        .into_tuple::<(ActorId, FragmentId)>()
        .all(&guard.db)
        .await?;

    Ok(rows)
}

pub async fn list_fragment_descs(&self) -> MetaResult<Vec<FragmentDesc>> {
let inner = self.inner.read().await;
let fragment_descs: Vec<FragmentDesc> = Fragment::find()
Expand Down
12 changes: 10 additions & 2 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use risingwave_meta_model_v2::prelude::*;
use risingwave_meta_model_v2::{
actor, actor_dispatcher, connection, database, fragment, function, index, object,
object_dependency, schema, secret, sink, source, subscription, table, user, user_privilege,
view, ActorId, DataTypeArray, DatabaseId, FragmentId, I32Array, ObjectId, PrivilegeId,
SchemaId, SourceId, StreamNode, UserId, VnodeBitmap, WorkerId,
view, ActorId, ConnectorSplits, DataTypeArray, DatabaseId, FragmentId, I32Array, ObjectId,
PrivilegeId, SchemaId, SourceId, StreamNode, UserId, VnodeBitmap, WorkerId,
};
use risingwave_pb::catalog::{
PbConnection, PbFunction, PbIndex, PbSecret, PbSink, PbSource, PbSubscription, PbTable, PbView,
Expand Down Expand Up @@ -256,6 +256,14 @@ pub struct PartialActorLocation {
pub status: ActorStatus,
}

/// Partial model of the `Actor` entity that selects only the actor id, the fragment
/// id, and the splits column.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Actor")]
pub struct PartialActorSplits {
/// Id of the actor.
pub actor_id: ActorId,
/// Id of the fragment the actor belongs to.
pub fragment_id: FragmentId,
/// Splits stored for the actor; the column is optional, so this may be `None`.
pub splits: Option<ConnectorSplits>,
}

#[derive(FromQueryResult)]
pub struct FragmentDesc {
pub fragment_id: FragmentId,
Expand Down
15 changes: 15 additions & 0 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ pub struct SourceManagerCore {
actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

/// Owned snapshot of the source manager's in-memory bookkeeping, as returned by
/// `SourceManager::get_running_info`.
pub struct SourceManagerRunningInfo {
/// Fragment ids registered for each source.
pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
/// For each source, pairs of `(backfill fragment id, upstream fragment id)`.
pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
/// Splits currently recorded for each actor.
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

impl SourceManagerCore {
fn new(
metadata_manager: MetadataManager,
Expand Down Expand Up @@ -1101,6 +1107,15 @@ impl SourceManager {
core.actor_splits.clone()
}

/// Returns a cloned snapshot of the source/backfill fragment maps and the per-actor
/// split assignment.
///
/// All three maps are copied while the core lock is held, so they are consistent
/// with one another.
pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
    let guard = self.core.lock().await;

    let source_fragments = guard.source_fragments.clone();
    let backfill_fragments = guard.backfill_fragments.clone();
    let actor_splits = guard.actor_splits.clone();

    SourceManagerRunningInfo {
        source_fragments,
        backfill_fragments,
        actor_splits,
    }
}

/// Checks whether the external source metadata has changed, and sends a split assignment command
/// if it has.
///
Expand Down
Loading

0 comments on commit adc30c0

Please sign in to comment.