Commit 8fff90b
Merge branch 'main' into yiming/more-frontend-hummock-info
wenym1 committed Sep 18, 2024
2 parents e4b390c + 06d5cde
Showing 73 changed files with 737 additions and 2,903 deletions.
38 changes: 38 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -122,6 +122,7 @@ aws-smithy-types = { version = "1", default-features = false, features = [
aws-endpoint = "0.60"
aws-types = "1"
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
axum-extra = "0.9"
etcd-client = { package = "madsim-etcd-client", version = "0.6" }
futures-async-stream = "0.2.9"
hytra = "0.1"
6 changes: 6 additions & 0 deletions ci/scripts/backwards-compat-test.sh
@@ -79,6 +79,8 @@ ENABLE_BUILD_RUST=$ENABLE_BUILD
# Use target/debug for simplicity.
ENABLE_RELEASE_PROFILE=false
ENABLE_PYTHON_UDF=true
ENABLE_JS_UDF=true
EOF

# See https://github.com/risingwavelabs/risingwave/pull/15448
@@ -100,6 +102,10 @@ setup_old_cluster() {
set -e
echo "Failed to download ${OLD_VERSION} from github releases, build from source later during \`risedev d\`"
configure_rw "$OLD_VERSION" true
elif [[ $OLD_VERSION = '1.10.1' || $OLD_VERSION = '1.10.0' ]]; then
set -e
echo "1.10.x has older openssl version, build from source later during \`risedev d\`"
configure_rw "$OLD_VERSION" true
else
set -e
tar -xvf risingwave-v"${OLD_VERSION}"-x86_64-unknown-linux.tar.gz
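Note: the 1.10.x fallback above exists because those prebuilt release binaries link against an older OpenSSL than the CI image provides. A hypothetical spot check (not part of this commit) to confirm such a mismatch before running a downloaded binary:

```bash
# Hypothetical check (not in the patch): inspect which shared OpenSSL a
# downloaded release binary expects before trying to run it.
tar -xvf risingwave-v1.10.1-x86_64-unknown-linux.tar.gz
ldd ./risingwave | grep -iE 'libssl|libcrypto'
# If the required libssl/libcrypto version is absent on the CI image,
# the binary cannot start, hence the build-from-source fallback above.
```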
2 changes: 1 addition & 1 deletion ci/scripts/build.sh
@@ -60,7 +60,7 @@ cargo build \
--timings


artifacts=(risingwave sqlsmith compaction-test risingwave_regress_test risingwave_e2e_extended_mode_test risedev-dev delete-range-test)
artifacts=(risingwave sqlsmith compaction-test risingwave_regress_test risingwave_e2e_extended_mode_test risedev-dev)

echo "--- Show link info"
ldd target/"$profile"/risingwave
2 changes: 2 additions & 0 deletions ci/workflows/main-cron.yml
@@ -712,6 +712,8 @@ steps:
run: source-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
environment:
- BUILDKITE_BRANCH
- ./ci/plugins/upload-failure-logs
matrix:
setup:
2 changes: 2 additions & 0 deletions ci/workflows/pull-request.yml
@@ -711,6 +711,8 @@ steps:
run: source-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
environment:
- BUILDKITE_BRANCH
- ./ci/plugins/upload-failure-logs
matrix:
setup:
22 changes: 17 additions & 5 deletions e2e_test/backwards-compat-tests/scripts/utils.sh
@@ -69,7 +69,7 @@ check_version() {
local VERSION=$1
local raw_version=$(run_sql "SELECT version();")
echo "--- Version"
echo "$raw_version"
echo "raw_version: $raw_version"
local version=$(echo $raw_version | grep -i risingwave | sed 's/^.*risingwave-\([0-9]*\.[0-9]*\.[0-9]\).*$/\1/i')
if [[ "$version" != "$VERSION" ]]; then
echo "Version mismatch, expected $VERSION, got $version"
@@ -133,12 +133,24 @@ get_old_version() {

# Then we sort them in descending order.
echo "--- VERSIONS"
local sorted_versions=$(echo -e "$tags" | sort -t '.' -n)
local sorted_versions=$(echo -e "$tags" | sort -V)
echo "$sorted_versions"

# We handle the edge case where the current branch is the one being released.
# If so, we need to prune it from the list.
# We cannot simply use 'git branch --show-current', because buildkite checks out with the commit,
# rather than branch. So the current state is detached.
# Instead we rely on BUILDKITE_BRANCH, provided by buildkite.
local current_branch=$(echo "$BUILDKITE_BRANCH" | tr -d 'v')
echo "--- CURRENT BRANCH: $current_branch"

echo "--- PRUNED VERSIONS"
local pruned_versions=$(echo -e "$sorted_versions" | grep -v "$current_branch")
echo "$pruned_versions"

# Then we take the Nth latest version.
# We set $OLD_VERSION to this.
OLD_VERSION=$(echo -e "$sorted_versions" | tail -n $VERSION_OFFSET | head -1)
OLD_VERSION=$(echo -e "$pruned_versions" | tail -n $VERSION_OFFSET | head -1)
}
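Note: the switch from `sort -t '.' -n` to `sort -V` above matters once minor versions reach double digits. A standalone illustration (not part of this commit):

```bash
# `-n` parses only the leading "MAJOR.MINOR" prefix as one decimal number,
# so "1.10" reads as 1.1 and sorts *before* 1.8 and 1.9:
printf '1.9.1\n1.10.0\n1.8.2\n' | sort -t '.' -n
# 1.10.0
# 1.8.2
# 1.9.1
# `-V` compares version components field by field, giving the intended order:
printf '1.9.1\n1.10.0\n1.8.2\n' | sort -V
# 1.8.2
# 1.9.1
# 1.10.0
```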

get_new_version() {
@@ -182,7 +194,7 @@ seed_old_cluster() {
cp -r e2e_test/tpch/* $TEST_DIR/tpch

./risedev clean-data
./risedev d full-without-monitoring && rm .risingwave/log/*
ENABLE_PYTHON_UDF=1 ENABLE_JS_UDF=1 ./risedev d full-without-monitoring && rm .risingwave/log/*

check_version "$OLD_VERSION"

@@ -240,7 +252,7 @@ seed_old_cluster() {

validate_new_cluster() {
echo "--- Start cluster on latest"
./risedev d full-without-monitoring
ENABLE_PYTHON_UDF=1 ENABLE_JS_UDF=1 ./risedev d full-without-monitoring

echo "--- Wait ${RECOVERY_DURATION}s for Recovery on Old Cluster Data"
sleep $RECOVERY_DURATION
13 changes: 4 additions & 9 deletions proto/stream_service.proto
@@ -9,14 +9,6 @@ import "stream_plan.proto";
option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message BuildActorInfo {
stream_plan.StreamActor actor = 1;
message SubscriptionIds {
repeated uint32 subscription_ids = 1;
}
map<uint32, SubscriptionIds> related_subscriptions = 2;
}

message InjectBarrierRequest {
string request_id = 1;
stream_plan.Barrier barrier = 2;
@@ -35,7 +27,9 @@ message InjectBarrierRequest {
repeated uint32 actor_ids_to_pre_sync_barrier_mutation = 7;

repeated common.ActorInfo broadcast_info = 8;
repeated BuildActorInfo actors_to_build = 9;
repeated stream_plan.StreamActor actors_to_build = 9;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_add = 10;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_remove = 11;
}

message BarrierCompleteResponse {
@@ -74,6 +68,7 @@ message WaitEpochCommitResponse {
message StreamingControlStreamRequest {
message InitRequest {
uint64 version_id = 1;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions = 2;
}

message RemovePartialGraphRequest {
2 changes: 2 additions & 0 deletions src/common/common_service/Cargo.toml
@@ -17,12 +17,14 @@ normal = ["workspace-hack"]
[dependencies]
async-trait = "0.1"
axum = { workspace = true }
axum-extra = { workspace = true, features = ["query"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
http = "1"
prometheus = { version = "0.13" }
risingwave_common = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
serde = { version = "1", features = ["derive"] }
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal"] }
60 changes: 53 additions & 7 deletions src/common/common_service/src/metrics_manager.rs
@@ -12,21 +12,34 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::ops::Deref;
use std::sync::OnceLock;

use axum::body::Body;
use axum::handler::{Handler, HandlerWithoutStateExt};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Router};
use axum::Extension;
use axum_extra::extract::Query as ExtraQuery;
use prometheus::{Encoder, Registry, TextEncoder};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use serde::Deserialize;
use thiserror_ext::AsReport;
use tokio::net::TcpListener;
use tower_http::add_extension::AddExtensionLayer;
use tower_http::compression::CompressionLayer;
use tracing::{error, info, warn};

pub struct MetricsManager {}
/// The filter for metrics scrape handler. See [`MetricsManager::metrics`] for more details.
#[derive(Debug, Deserialize)]
struct Filter {
#[serde(default)]
include: HashSet<String>,
#[serde(default)]
exclude: HashSet<String>,
}

pub struct MetricsManager;

impl MetricsManager {
pub fn boot_metrics_service(listen_addr: String) {
@@ -41,12 +54,12 @@ impl MetricsManager {
listen_addr
);

let service = Router::new()
.fallback(Self::metrics_service)
let service = Self::metrics
.layer(AddExtensionLayer::new(
GLOBAL_METRICS_REGISTRY.deref().clone(),
))
.layer(CompressionLayer::new());
.layer(CompressionLayer::new())
.into_make_service();

let serve_future =
axum::serve(TcpListener::bind(&listen_addr).await.unwrap(), service);
@@ -64,11 +77,44 @@ impl MetricsManager {
}
}

/// Gather metrics from the global registry and encode them in the Prometheus text format.
///
/// The handler accepts the following query parameters to filter metrics. Note that `include`
/// and `exclude` should not be used together.
///
/// - `/metrics` (without filter)
/// - `/metrics?include=foo` (include one metric)
/// - `/metrics?include=foo&include=bar` (include multiple metrics)
/// - `/metrics?exclude=foo&exclude=bar` (include all but foo and bar)
///
/// One can specify parameters by configuring Prometheus scrape config like below:
/// ```yaml
/// - job_name: compute-node
/// params:
/// include: ["foo", "bar"]
/// ```
#[expect(clippy::unused_async, reason = "required by service_fn")]
async fn metrics_service(Extension(registry): Extension<Registry>) -> impl IntoResponse {
async fn metrics(
ExtraQuery(Filter { include, exclude }): ExtraQuery<Filter>,
Extension(registry): Extension<Registry>,
) -> impl IntoResponse {
let mut mf = registry.gather();

// Filter metrics by name.
// TODO: can we avoid gathering them all?
if !include.is_empty() && !exclude.is_empty() {
return Response::builder()
.status(400)
.body("should not specify both include and exclude".into())
.unwrap();
} else if !include.is_empty() {
mf.retain(|fam| include.contains(fam.get_name()));
} else if !exclude.is_empty() {
mf.retain(|fam| !exclude.contains(fam.get_name()));
}

let encoder = TextEncoder::new();
let mut buffer = vec![];
let mf = registry.gather();
encoder.encode(&mf, &mut buffer).unwrap();

Response::builder()
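Note: the new filter can be exercised by hand once a node's metrics endpoint is up. A sketch (host, port, and the family names `foo`/`bar` are assumptions for illustration, not taken from this commit):

```bash
METRICS=http://127.0.0.1:1222/metrics       # assumed metrics listen address
curl "$METRICS"                             # no filter: all metric families
curl "$METRICS?include=foo"                 # only the family named "foo"
curl "$METRICS?include=foo&include=bar"     # several families
curl "$METRICS?exclude=foo"                 # everything except "foo"
curl -i "$METRICS?include=foo&exclude=bar"  # both set: expect HTTP 400
```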
15 changes: 10 additions & 5 deletions src/compute/tests/cdc_tests.rs
@@ -43,6 +43,7 @@ use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_stream::common::table::state_table::StateTable;
use risingwave_stream::common::table::test_utils::gen_pbtable;
use risingwave_stream::error::StreamResult;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::test_utils::MockSource;
@@ -211,12 +212,16 @@ async fn test_cdc_backfill() -> StreamResult<()> {
ColumnDesc::unnamed(ColumnId::from(4), state_schema[4].data_type.clone()),
];

let state_table = StateTable::new_without_distribution(
let state_table = StateTable::from_table_catalog(
&gen_pbtable(
TableId::from(0x42),
column_descs,
vec![OrderType::ascending()],
vec![0],
0,
),
memory_state_store.clone(),
TableId::from(0x42),
column_descs.clone(),
vec![OrderType::ascending()],
vec![0_usize],
None,
)
.await;

15 changes: 10 additions & 5 deletions src/compute/tests/integration_tests.rs
@@ -48,6 +48,7 @@ use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::panic_store::PanicStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_stream::common::table::state_table::StateTable;
use risingwave_stream::common::table::test_utils::gen_pbtable;
use risingwave_stream::error::StreamResult;
use risingwave_stream::executor::dml::DmlExecutor;
use risingwave_stream::executor::monitor::StreamingMetrics;
@@ -448,12 +449,16 @@ async fn test_row_seq_scan() -> StreamResult<()> {
ColumnDesc::unnamed(ColumnId::from(2), schema[2].data_type.clone()),
];

let mut state = StateTable::new_without_distribution(
let mut state = StateTable::from_table_catalog(
&gen_pbtable(
TableId::from(0x42),
column_descs.clone(),
vec![OrderType::ascending()],
vec![0],
0,
),
memory_state_store.clone(),
TableId::from(0x42),
column_descs.clone(),
vec![OrderType::ascending()],
vec![0_usize],
None,
)
.await;
let table = StorageTable::for_test(
8 changes: 0 additions & 8 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
@@ -228,14 +228,6 @@ pub async fn sst_dump_via_sstable_store(
println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len());
println!("Key Count: {}", sstable_meta.key_count);
println!("Version: {}", sstable_meta.version);
println!(
"Monotonoic Deletes Count: {}",
sstable_meta.monotonic_tombstone_events.len()
);
for monotonic_delete in &sstable_meta.monotonic_tombstone_events {
println!("\tevent key: {:?}", monotonic_delete.event_key);
println!("\tnew epoch: {:?}", monotonic_delete.new_epoch);
}

println!("Block Count: {}", sstable.block_count());
for i in 0..sstable.block_count() {
(Diffs for the remaining changed files are not rendered.)