scheduler
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao committed Sep 4, 2024
1 parent 8bd296a commit cf8bfd5
Showing 5 changed files with 46 additions and 31 deletions.
6 changes: 6 additions & 0 deletions src/common/src/bitmap.rs
@@ -685,6 +685,12 @@ impl From<&PbBuffer> for Bitmap {
}
}

impl From<PbBuffer> for Bitmap {
fn from(buf: PbBuffer) -> Self {
Self::from(&buf)
}
}

/// Bitmap iterator.
pub struct BitmapIter<'a> {
bits: Option<&'a [usize]>,
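For illustration only (not part of this commit): with the new owned impl delegating to the existing `From<&PbBuffer>`, an optional protobuf buffer converts to a `Bitmap` with a plain `.map(Bitmap::from)`, which is what the scale.rs changes below rely on. A minimal sketch, assuming the `Bitmap` and `PbBuffer` types in scope as in bitmap.rs:

// Hypothetical helper, not from the diff: convert an optional protobuf buffer by value.
fn bitmap_from_pb(buf: Option<PbBuffer>) -> Option<Bitmap> {
    // The owned impl above borrows `buf` and forwards to `From<&PbBuffer>`.
    buf.map(Bitmap::from)
}
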
3 changes: 2 additions & 1 deletion src/meta/src/rpc/ddl_controller.rs
@@ -1612,6 +1612,7 @@ impl DdlController

let parallelism = self.resolve_stream_parallelism(specified_parallelism, &cluster_info)?;

// TODO(var-vnode): use vnode count from config
const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(VirtualNode::COUNT).unwrap();

let parallelism_limited = parallelism > MAX_PARALLELISM;
@@ -1643,7 +1644,7 @@
// Otherwise, it defaults to FIXED based on deduction.
let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
(None, DefaultParallelism::Full) if parallelism_limited => {
tracing::warn!("Parallelism limited to 256 in ADAPTIVE mode");
tracing::warn!("Parallelism limited to {MAX_PARALLELISM} in ADAPTIVE mode");
TableParallelism::Adaptive
}
(None, DefaultParallelism::Full) => TableParallelism::Adaptive,
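As a side note (hypothetical helper, not in the diff), the capping the controller applies above can be summarized as: the resolved parallelism is never allowed to exceed `MAX_PARALLELISM` (currently `VirtualNode::COUNT`, to be taken from config per the TODO), and ADAPTIVE mode logs a warning when the cap takes effect. A minimal sketch:

use std::num::NonZeroUsize;

// Sketch only: `max` stands in for `MAX_PARALLELISM` above.
fn cap_parallelism(requested: NonZeroUsize, max: NonZeroUsize) -> NonZeroUsize {
    if requested > max {
        // The controller emits a `tracing::warn!` at this point in ADAPTIVE mode.
        max
    } else {
        requested
    }
}
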
57 changes: 33 additions & 24 deletions src/meta/src/stream/scale.rs
@@ -31,7 +31,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, VirtualNode};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_meta_model_v2::{actor, fragment, ObjectId, StreamingParallelism};
use risingwave_pb::common::{Buffer, PbActorLocation, WorkerNode, WorkerType};
use risingwave_pb::common::{PbActorLocation, WorkerNode, WorkerType};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::fragment::{
@@ -126,7 +126,8 @@ pub struct CustomActorInfo {
pub fragment_id: u32,
pub dispatcher: Vec<Dispatcher>,
pub upstream_actor_id: Vec<u32>,
pub vnode_bitmap: Option<Buffer>,
/// `None` if singleton.
pub vnode_bitmap: Option<Bitmap>,
}

impl From<&PbStreamActor> for CustomActorInfo {
@@ -145,7 +146,7 @@ impl From<&PbStreamActor> for CustomActorInfo {
fragment_id: *fragment_id,
dispatcher: dispatcher.clone(),
upstream_actor_id: upstream_actor_id.clone(),
vnode_bitmap: vnode_bitmap.clone(),
vnode_bitmap: vnode_bitmap.as_ref().map(Bitmap::from),
}
}
}
@@ -252,14 +253,21 @@ pub fn rebalance_actor_vnode(
let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
assert!(target_actor_count > 0);

// `vnode_bitmap` must be set on distributed fragments.
let vnode_count = actors[0]
.vnode_bitmap
.as_ref()
.expect("vnode bitmap unset")
.len();

// represents the balance of each actor, used to sort later
#[derive(Debug)]
struct Balance {
actor_id: ActorId,
balance: i32,
builder: BitmapBuilder,
}
let (expected, mut remain) = VirtualNode::COUNT.div_rem(&target_actor_count);
let (expected, mut remain) = vnode_count.div_rem(&target_actor_count);

tracing::debug!(
"expected {}, remain {}, prev actors {}, target actors {}",
@@ -271,11 +279,11 @@

let (mut removed, mut rest): (Vec<_>, Vec<_>) = actors
.iter()
.filter_map(|actor| {
actor
.vnode_bitmap
.as_ref()
.map(|buffer| (actor.actor_id as ActorId, Bitmap::from(buffer)))
.map(|actor| {
(
actor.actor_id as ActorId,
actor.vnode_bitmap.clone().expect("vnode bitmap unset"),
)
})
.partition(|(actor_id, _)| actors_to_remove.contains(actor_id));
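To make the `div_rem` step above concrete (a standalone sketch, not code from this commit): the rebalance spreads `vnode_count` vnodes as evenly as possible over the target actors, so each actor owns `expected` vnodes and `remain` of them own one extra.

// Hypothetical helper: per-actor vnode quotas after rebalancing.
fn vnode_quotas(vnode_count: usize, target_actor_count: usize) -> Vec<usize> {
    let expected = vnode_count / target_actor_count;
    let remain = vnode_count % target_actor_count;
    // Which actors receive the extra vnode is decided by the balance bookkeeping
    // in the real code; here the first `remain` actors take it, purely for illustration.
    (0..target_actor_count)
        .map(|i| if i < remain { expected + 1 } else { expected })
        .collect()
}

// e.g. vnode_quotas(256, 5) == [52, 51, 51, 51, 51], summing back to 256.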

@@ -294,7 +302,7 @@
builder
};

let (prev_expected, _) = VirtualNode::COUNT.div_rem(&actors.len());
let (prev_expected, _) = vnode_count.div_rem(&actors.len());

let prev_remain = removed
.iter()
@@ -327,7 +335,7 @@
.map(|actor_id| Balance {
actor_id: *actor_id,
balance: -(expected as i32),
builder: BitmapBuilder::zeroed(VirtualNode::COUNT),
builder: BitmapBuilder::zeroed(vnode_count),
})
.collect_vec();

@@ -389,7 +397,7 @@
let n = min(abs(src.balance), abs(dst.balance));

let mut moved = 0;
for idx in (0..VirtualNode::COUNT).rev() {
for idx in (0..vnode_count).rev() {
if moved >= n {
break;
}
@@ -605,7 +613,7 @@ impl ScaleController {
.flatten()
.map(|id| *id as _)
.collect(),
vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
};

actor_map.insert(actor_id as _, actor_info.clone());
@@ -657,7 +665,7 @@
fragment_id: fragment_id as _,
dispatcher,
upstream_actor_id,
vnode_bitmap,
vnode_bitmap: vnode_bitmap.map(|b| b.to_protobuf()),
// todo, we need to fill this part
mview_definition: "".to_string(),
expr_context: expr_contexts
@@ -1409,9 +1417,7 @@ impl ScaleController {
if let Some(actor) = ctx.actor_map.get(actor_id) {
let bitmap = vnode_bitmap_updates.get(actor_id).unwrap();

if let Some(buffer) = actor.vnode_bitmap.as_ref() {
let prev_bitmap = Bitmap::from(buffer);

if let Some(prev_bitmap) = actor.vnode_bitmap.as_ref() {
if prev_bitmap.eq(bitmap) {
vnode_bitmap_updates.remove(actor_id);
}
@@ -1813,6 +1819,9 @@ impl ScaleController {
&self,
policy: TableResizePolicy,
) -> MetaResult<HashMap<FragmentId, WorkerReschedule>> {
// TODO(var-vnode): use vnode count from config
let max_parallelism = VirtualNode::COUNT;

let TableResizePolicy {
worker_ids,
table_parallelisms,
@@ -2096,12 +2105,12 @@
}
FragmentDistributionType::Hash => match parallelism {
TableParallelism::Adaptive => {
if all_available_slots > VirtualNode::COUNT {
tracing::warn!("available parallelism for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT");
// force limit to VirtualNode::COUNT
if all_available_slots > max_parallelism {
tracing::warn!("available parallelism for table {table_id} is larger than max parallelism, force limit to {max_parallelism}");
// force limit to `max_parallelism`
let target_worker_slots = schedule_units_for_slots(
&schedulable_worker_slots,
VirtualNode::COUNT,
max_parallelism,
table_id,
)?;

@@ -2123,10 +2132,10 @@
}
}
TableParallelism::Fixed(mut n) => {
if n > VirtualNode::COUNT {
if n > max_parallelism {
// This should be unreachable, but we still intercept it to prevent accidental modifications.
tracing::warn!("parallelism {n} for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT");
n = VirtualNode::COUNT
tracing::warn!("specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}");
n = max_parallelism
}

let target_worker_slots =
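A simplified view of the dispatch above (hypothetical helper, types reduced to plain integers): ADAPTIVE targets all available worker slots and FIXED targets the user-specified value, and both are capped at `max_parallelism`.

// Sketch only; the real code schedules concrete worker slots and warns when capping.
enum RequestedParallelism {
    Adaptive,
    Fixed(usize),
}

fn target_parallelism(
    requested: RequestedParallelism,
    all_available_slots: usize,
    max_parallelism: usize,
) -> usize {
    match requested {
        RequestedParallelism::Adaptive => all_available_slots.min(max_parallelism),
        RequestedParallelism::Fixed(n) => n.min(max_parallelism),
    }
}
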
1 change: 1 addition & 0 deletions src/meta/src/stream/stream_graph/schedule.rs
@@ -235,6 +235,7 @@ impl Scheduler {
assert_eq!(scheduled_worker_slots.len(), parallelism);

// Build the default hash mapping uniformly.
// TODO(var-vnode): use vnode count from config
let default_hash_mapping =
WorkerSlotMapping::build_from_ids(&scheduled_worker_slots, VirtualNode::COUNT);

10 changes: 4 additions & 6 deletions src/meta/src/stream/test_scale.rs
@@ -46,16 +46,14 @@ mod tests {
.iter()
.map(|actor_id| CustomActorInfo {
actor_id: *actor_id,
vnode_bitmap: actor_bitmaps
.get(actor_id)
.map(|bitmap| bitmap.to_protobuf()),
vnode_bitmap: actor_bitmaps.get(actor_id).cloned(),
..Default::default()
})
.collect()
}

fn check_affinity_for_scale_in(bitmap: &Bitmap, actor: &CustomActorInfo) {
let prev_bitmap = Bitmap::from(actor.vnode_bitmap.as_ref().unwrap());
let prev_bitmap = actor.vnode_bitmap.as_ref().unwrap();

for idx in 0..VirtualNode::COUNT_FOR_TEST {
if prev_bitmap.is_set(idx) {
@@ -125,7 +123,7 @@
.map(|actor| {
(
actor.actor_id as ActorId,
Bitmap::from(actor.vnode_bitmap.as_ref().unwrap()),
actor.vnode_bitmap.unwrap().clone(),
)
})
.collect();
@@ -226,7 +224,7 @@
}

let target_bitmap = result.get(&actor.actor_id).unwrap();
let prev_bitmap = Bitmap::from(actor.vnode_bitmap.as_ref().unwrap());
let prev_bitmap = actor.vnode_bitmap.as_ref().unwrap();
assert!(prev_bitmap.eq(target_bitmap));
}
}
