feat(serverless backfill): proto changes (#17010)
Co-authored-by: ArkBriar <[email protected]>
Co-authored-by: Shanicky Chen <[email protected]>
Co-authored-by: August <[email protected]>
4 people authored Jul 24, 2024
1 parent f2fe044 commit ffc6d4a
Showing 9 changed files with 79 additions and 2 deletions.
4 changes: 4 additions & 0 deletions proto/common.proto
@@ -82,6 +82,10 @@ message WorkerNode {
optional uint64 started_at = 9;

uint32 parallelism = 10;

// Meta may assign labels to worker nodes to partition the workload by label.
// This is used for serverless backfilling of materialized views.
string node_label = 11;
}

message Buffer {
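
The new node_label field lets the meta node partition workers by label. As a minimal sketch (not part of this commit) of how a scheduler could filter workers by that label, assuming the prost-generated risingwave_pb::common::WorkerNode type from this proto:

```rust
use risingwave_pb::common::WorkerNode;

/// Keep only the workers whose `node_label` matches the label assigned to a
/// streaming job. An empty label means the worker carries no label.
fn workers_with_label<'a>(
    workers: &'a [WorkerNode],
    label: &'a str,
) -> impl Iterator<Item = &'a WorkerNode> {
    workers.iter().filter(move |w| w.node_label == label)
}
```
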
10 changes: 10 additions & 0 deletions proto/ddl_service.proto
@@ -120,6 +120,16 @@ message DropSubscriptionResponse {
message CreateMaterializedViewRequest {
catalog.Table materialized_view = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;

// If SERVERLESS, the materialized view is created using serverless backfill:
// the controller spins up a new compute node that performs the backfill and is deleted afterwards.
// This can alleviate pressure on the cluster during the backfill process.
enum BackfillType {
UNSPECIFIED = 0;
REGULAR = 1;
SERVERLESS = 2;
}
BackfillType backfill = 3;
}

message CreateMaterializedViewResponse {
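
For reference, a hedged sketch of how a caller could opt into the new enum value. It mirrors the client change in src/rpc_client/src/meta_client.rs further down (which currently passes REGULAR); the prost-generated types and the PbBackfillType alias are the ones imported in that file:

```rust
use risingwave_pb::catalog::Table;
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::CreateMaterializedViewRequest;
use risingwave_pb::stream_plan::StreamFragmentGraph;

/// Build a CREATE MATERIALIZED VIEW request that asks the meta node to use
/// serverless backfill instead of the regular code path.
fn serverless_mv_request(table: Table, graph: StreamFragmentGraph) -> CreateMaterializedViewRequest {
    CreateMaterializedViewRequest {
        materialized_view: Some(table),
        fragment_graph: Some(graph),
        // SERVERLESS: the controller creates a temporary compute node for the
        // backfill and tears it down once the backfill is done.
        backfill: PbBackfillType::Serverless as i32,
    }
}
```
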
38 changes: 38 additions & 0 deletions proto/meta.proto
@@ -103,6 +103,13 @@ message TableFragments {

stream_plan.StreamContext ctx = 6;
TableParallelism parallelism = 7;

// Actors of a materialized view, sink, or table can only be scheduled on nodes with a matching node_label.
string node_label = 8;

// For a materialized view: true once backfill is done, otherwise false.
// For a regular table: always true.
bool backfill_done = 9;
}

/// Worker slot mapping with fragment id, used for notification.
@@ -466,7 +473,10 @@ message SubscribeResponse {
}
common.Status status = 1;
Operation operation = 2;

// Catalog version
uint64 version = 3;

oneof info {
catalog.Database database = 4;
catalog.Schema schema = 5;
@@ -548,9 +558,37 @@ message TableParallelism {
}
}

// Changes a streaming job in place by overwriting its node_label.
// This may cause re-scheduling of the streaming job's actors.
message UpdateStreamingJobNodeLabelsRequest {
// Id of the materialized view, table, or sink to update.
uint32 id = 1;

// The node_label of the streaming job identified by `id` is replaced with this value.
string node_label = 2;
}

// No explicit status field is needed here; the RPC status is sufficient.
message UpdateStreamingJobNodeLabelsResponse {}

message GetServerlessStreamingJobsStatusRequest {}

// Statuses of materialized views and sinks.
message GetServerlessStreamingJobsStatusResponse {
message Status {
uint32 table_id = 1;
string node_label = 2;
bool backfill_done = 3;
}

repeated Status streaming_job_statuses = 1;
}

service ScaleService {
rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
rpc Reschedule(RescheduleRequest) returns (RescheduleResponse);
rpc UpdateStreamingJobNodeLabels(UpdateStreamingJobNodeLabelsRequest) returns (UpdateStreamingJobNodeLabelsResponse);
rpc GetServerlessStreamingJobsStatus(GetServerlessStreamingJobsStatusRequest) returns (GetServerlessStreamingJobsStatusResponse);
}

message MembersRequest {}
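
A sketch of how an external tool could exercise the two new ScaleService RPCs (and the node_label / backfill_done information they surface, mirroring the new TableFragments fields above). It assumes the tonic-generated scale_service_client module that pairs with the scale_service_server import used in scale_service.rs below; the endpoint and label are placeholders:

```rust
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::{
    GetServerlessStreamingJobsStatusRequest, UpdateStreamingJobNodeLabelsRequest,
};

async fn relabel_and_poll(meta_addr: &str, job_id: u32) -> Result<(), Box<dyn std::error::Error>> {
    let mut client = ScaleServiceClient::connect(meta_addr.to_string()).await?;

    // Move the streaming job onto workers labelled "serverless-backfill";
    // success or failure is carried by the RPC status itself.
    client
        .update_streaming_job_node_labels(UpdateStreamingJobNodeLabelsRequest {
            id: job_id,
            node_label: "serverless-backfill".to_string(),
        })
        .await?;

    // Poll the backfill progress of serverless streaming jobs.
    let resp = client
        .get_serverless_streaming_jobs_status(GetServerlessStreamingJobsStatusRequest {})
        .await?
        .into_inner();
    for status in resp.streaming_job_statuses {
        println!(
            "table {} on '{}' backfill_done={}",
            status.table_id, status.node_label, status.backfill_done
        );
    }
    Ok(())
}
```
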
19 changes: 17 additions & 2 deletions src/meta/service/src/scale_service.rs
@@ -24,8 +24,9 @@ use risingwave_meta_model_v2::FragmentId;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
GetClusterInfoRequest, GetClusterInfoResponse, PbWorkerReschedule, RescheduleRequest,
RescheduleResponse,
GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
GetServerlessStreamingJobsStatusResponse, PbWorkerReschedule, RescheduleRequest,
RescheduleResponse, UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use tonic::{Request, Response, Status};
@@ -219,4 +220,18 @@ impl ScaleService for ScaleServiceImpl {
revision: next_revision.into(),
}))
}

async fn update_streaming_job_node_labels(
&self,
_request: Request<UpdateStreamingJobNodeLabelsRequest>,
) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
todo!()
}

async fn get_serverless_streaming_jobs_status(
&self,
_request: Request<GetServerlessStreamingJobsStatusRequest>,
) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
todo!()
}
}
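
Both handlers are left as todo!() in this commit. Purely as a sketch of the response shape, a hypothetical helper that turns (table_id, node_label, backfill_done) tuples, however the fragment manager ends up providing them, into the new response message (module path per prost's nesting convention):

```rust
use risingwave_pb::meta::get_serverless_streaming_jobs_status_response::Status as PbJobStatus;
use risingwave_pb::meta::GetServerlessStreamingJobsStatusResponse;

/// Assemble the RPC response from per-job tuples of (table_id, node_label, backfill_done).
fn build_status_response(
    jobs: Vec<(u32, String, bool)>,
) -> GetServerlessStreamingJobsStatusResponse {
    GetServerlessStreamingJobsStatusResponse {
        streaming_job_statuses: jobs
            .into_iter()
            .map(|(table_id, node_label, backfill_done)| PbJobStatus {
                table_id,
                node_label,
                backfill_done,
            })
            .collect(),
    }
}
```
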
2 changes: 2 additions & 0 deletions src/meta/src/controller/cluster.rs
@@ -88,6 +88,7 @@ impl From<WorkerInfo> for PbWorkerNode {
transactional_id: info.0.transaction_id.map(|id| id as _),
resource: info.2.resource,
started_at: info.2.started_at,
node_label: "".to_string(),
}
}
}
@@ -465,6 +466,7 @@ fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
total_cpu_cores: total_cpu_available() as _,
}),
started_at,
node_label: "".to_string(),
}
}

2 changes: 2 additions & 0 deletions src/meta/src/controller/fragment.rs
@@ -340,6 +340,8 @@ impl CatalogController {
}
.into(),
),
node_label: "".to_string(),
backfill_done: true,
};

Ok(table_fragments)
2 changes: 2 additions & 0 deletions src/meta/src/manager/cluster.rs
@@ -215,6 +215,7 @@ impl ClusterManager {
// resource doesn't need persist
resource: None,
started_at: None,
node_label: "".to_string(),
};

let mut worker = Worker::from_protobuf(worker_node.clone());
@@ -771,6 +772,7 @@ fn meta_node_info(host: &str, started_at: Option<u64>) -> WorkerNode {
}),
started_at,
parallelism: 0,
node_label: "".to_string(),
}
}

2 changes: 2 additions & 0 deletions src/meta/src/model/stream.rs
@@ -164,6 +164,8 @@ impl MetadataModel for TableFragments {
actor_splits: build_actor_connector_splits(&self.actor_splits),
ctx: Some(self.ctx.to_protobuf()),
parallelism: Some(self.assigned_parallelism.into()),
node_label: "".to_string(),
backfill_done: true,
}
}

2 changes: 2 additions & 0 deletions src/rpc_client/src/meta_client.rs
@@ -51,6 +51,7 @@ use risingwave_pb::cloud_service::*;
use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
@@ -353,6 +354,7 @@ impl MetaClient {
let request = CreateMaterializedViewRequest {
materialized_view: Some(table),
fragment_graph: Some(graph),
backfill: PbBackfillType::Regular as _,
};
let resp = self.inner.create_materialized_view(request).await?;
// TODO: handle error in `resp.status` here
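
Since backfill is a proto3 enum field, requests from older clients that never set it decode as UNSPECIFIED (0) rather than REGULAR, which is presumably why the client here sets REGULAR explicitly. A small illustration of that default, assuming the same prost-generated types:

```rust
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::CreateMaterializedViewRequest;

fn default_backfill_is_unspecified() {
    // A request built (or decoded) without the new field carries the zero
    // value, i.e. UNSPECIFIED, not REGULAR.
    let req = CreateMaterializedViewRequest::default();
    assert_eq!(req.backfill, PbBackfillType::Unspecified as i32);
}
```
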
