Skip to content

Commit

Permalink
Refactor rw_actor_splits.rs: Rename struct, change split_id type,…
Browse files Browse the repository at this point in the history
… add `From` impl; simplify `read_rw_actor_splits`
  • Loading branch information
shanicky committed Sep 27, 2024
1 parent 6e8efe2 commit ea4bae6
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,35 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{Fields, JsonbVal};
use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;

#[derive(Fields)]
#[primary_key(actor_id, split_id, source_id)]
struct RwActorSplits {
struct RwActorSplit {
actor_id: i32,
split_id: i32,
split_id: String,
source_id: i32,
fragment_id: i32,
}

impl From<ActorSplit> for RwActorSplit {
fn from(actor_split: ActorSplit) -> Self {
Self {
actor_id: actor_split.actor_id as _,
split_id: actor_split.split_id,
source_id: actor_split.source_id as _,
fragment_id: actor_split.fragment_id as _,
}
}
}

#[system_catalog(table, "rw_catalog.rw_actor_splits")]
async fn read_rw_actor_splits(reader: &SysCatalogReaderImpl) -> Result<Vec<RwActorSplits>> {
async fn read_rw_actor_splits(reader: &SysCatalogReaderImpl) -> Result<Vec<RwActorSplit>> {
let actor_splits = reader.meta_client.list_actor_splits().await?;

// actor_id, fragment_id, source_id, split_id

// Ok(states
// .into_iter()
// .map(|state| RwActor {
// actor_id: state.actor_id as i32,
// fragment_id: state.fragment_id as i32,
// worker_id: state.worker_id as i32,
// state: state.state().as_str_name().into(),
// })
// .collect())

Ok(vec![])
Ok(actor_splits.into_iter().map(RwActorSplit::from).collect())
}
48 changes: 16 additions & 32 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,11 @@ impl StreamManagerService for StreamServiceImpl {

async fn list_actor_splits(
&self,
request: Request<ListActorSplitsRequest>,
_request: Request<ListActorSplitsRequest>,
) -> Result<Response<ListActorSplitsResponse>, Status> {
match &self.metadata_manager {
MetadataManager::V1(_) => Ok(Response::new(ListActorSplitsResponse {
// TODO: remove this when v1 is removed
actor_splits: vec![],
})),
MetadataManager::V2(mgr) => {
Expand All @@ -437,20 +438,18 @@ impl StreamManagerService for StreamServiceImpl {
mut actor_splits,
} = self.stream_manager.source_manager.get_running_info().await;

let mut fragment_to_source = HashMap::new();

let fragment_sources =
let fragment_to_source: HashMap<_, _> =
source_fragments
.into_iter()
.flat_map(|(source_id, fragment_ids)| {
fragment_ids
.into_iter()
.map(|fragment_id| (fragment_id, source_id))
.map(move |fragment_id| (fragment_id, source_id))
})
.chain(backfill_fragments.into_iter().flat_map(
|(source_id, fragment_ids)| {
fragment_ids.into_iter().flat_map(
|(fragment_id, upstream_fragment_id)| {
move |(fragment_id, upstream_fragment_id)| {
[
(fragment_id, source_id),
(upstream_fragment_id, source_id),
Expand All @@ -459,43 +458,28 @@ impl StreamManagerService for StreamServiceImpl {
)
},
))
.collect_vec();

for (fragment_id, source_id) in fragment_sources {
if let Some(prev_source_id) = fragment_to_source.insert(fragment_id, source_id)
{
tracing::warn!(
"fragment {} has multiple sources: {} and {}",
fragment_id,
prev_source_id,
source_id
);
}
}
.collect();

let source_actors = mgr.catalog_controller.list_source_actors().await?;

let actor_splits = source_actors
.into_iter()
.flat_map(|(actor_id, fragment_id)| {
let splits = actor_splits
let source_id = fragment_to_source
.get(&(fragment_id as _))
.copied()
.map(|id| id as _)
.unwrap_or_default();

actor_splits
.remove(&(actor_id as _))
.unwrap_or_default()
.into_iter()
.map(|split| split.id())
.collect_vec();

splits
.into_iter()
.map(|split| list_actor_splits_response::ActorSplit {
.map(move |split| list_actor_splits_response::ActorSplit {
actor_id: actor_id as _,
source_id: fragment_to_source
.get(&(fragment_id as _))
.copied()
.map(|id| id as _)
.unwrap_or_default(),
source_id,
fragment_id: fragment_id as _,
split_id: split.to_string(),
split_id: split.id().to_string(),
})
})
.collect_vec();
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ use risingwave_pb::stream_plan::{
use sea_orm::sea_query::Expr;
use sea_orm::ActiveValue::Set;
use sea_orm::{
ColumnTrait, DbErr, EntityOrSelect, EntityTrait, JoinType, ModelTrait, PaginatorTrait,
QueryFilter, QuerySelect, RelationTrait, TransactionTrait, Value,
ColumnTrait, DbErr, EntityTrait, JoinType, ModelTrait, PaginatorTrait, QueryFilter,
QuerySelect, RelationTrait, TransactionTrait, Value,
};

use crate::controller::catalog::{CatalogController, CatalogControllerInner};
use crate::controller::utils::{
get_actor_dispatchers, get_fragment_mappings, rebuild_fragment_mapping_from_actors,
FragmentDesc, PartialActorLocation, PartialActorSplits, PartialFragmentStateTables,
FragmentDesc, PartialActorLocation, PartialFragmentStateTables,
};
use crate::manager::{ActorInfos, InflightFragmentInfo, LocalNotification};
use crate::model::{TableFragments, TableParallelism};
Expand Down

0 comments on commit ea4bae6

Please sign in to comment.