diff --git a/Cargo.lock b/Cargo.lock
index 1c074e276553a..1e711a211887b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8685,6 +8685,7 @@ dependencies = [
"strum_macros 0.26.1",
"task_stats_alloc",
"tempfile",
+ "thiserror-ext",
"tikv-jemallocator",
"tracing",
"vergen",
@@ -9195,6 +9196,7 @@ dependencies = [
"async-trait",
"auto_enums",
"chrono",
+ "chrono-tz",
"criterion",
"expect-test",
"fancy-regex",
diff --git a/Makefile.toml b/Makefile.toml
index 983b304d74e51..8820acf67c7bd 100644
--- a/Makefile.toml
+++ b/Makefile.toml
@@ -757,10 +757,10 @@ tmux list-windows -t risedev -F "#{window_name} #{pane_id}" \
if [[ -n $(tmux list-windows -t risedev | grep kafka) ]];
then
echo "kill kafka"
- kill_kafka
+ kill_kafka || true
echo "kill zookeeper"
- kill_zookeeper
+ kill_zookeeper || true
# Kill their tmux sessions
tmux list-windows -t risedev -F "#{pane_id}" | xargs -I {} tmux send-keys -t {} C-c C-d
diff --git a/README.md b/README.md
index 44443cfab8282..1611af1815175 100644
--- a/README.md
+++ b/README.md
@@ -72,7 +72,7 @@ Don’t have Docker? Learn how to install RisingWave on Mac, Ubuntu, and other e
## Production deployments
-For **single-node deployment**, please refer to [Docker Compose](https://docs.risingwave.com/docs/current/risingwave-trial/?method=docker-compose).
+For **single-node deployment**, please refer to [Docker Compose](https://docs.risingwave.com/docs/current/risingwave-docker-compose/).
For **distributed deployment**, please refer to [Kubernetes with Helm](https://docs.risingwave.com/docs/current/risingwave-k8s-helm/) or [Kubernetes with Operator](https://docs.risingwave.com/docs/current/risingwave-kubernetes/).
diff --git a/backwards-compat-tests/scripts/utils.sh b/backwards-compat-tests/scripts/utils.sh
index 1afbf08dd4441..5990aac026077 100644
--- a/backwards-compat-tests/scripts/utils.sh
+++ b/backwards-compat-tests/scripts/utils.sh
@@ -103,19 +103,21 @@ insert_json_kafka() {
local JSON=$1
echo "$JSON" | "$KAFKA_PATH"/bin/kafka-console-producer.sh \
--topic backwards_compat_test_kafka_source \
- --bootstrap-server localhost:29092
+ --bootstrap-server localhost:29092 \
+ --property "parse.key=true" \
+ --property "key.separator=,"
}
seed_json_kafka() {
- insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}'
- insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}'
- insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}'
- insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}'
- insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}'
- insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}'
- insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}'
- insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}'
- insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}'
+ insert_json_kafka '{"user_id": 1},{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}'
+ insert_json_kafka '{"user_id": 2},{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}'
+ insert_json_kafka '{"user_id": 3},{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}'
+ insert_json_kafka '{"user_id": 4},{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}'
+ insert_json_kafka '{"user_id": 5},{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}'
+ insert_json_kafka '{"user_id": 6},{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}'
+ insert_json_kafka '{"user_id": 7},{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}'
+ insert_json_kafka '{"user_id": 8},{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}'
+ insert_json_kafka '{"user_id": 9},{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}'
}
# https://stackoverflow.com/a/4024263
@@ -225,6 +227,12 @@ seed_old_cluster() {
create_kafka_topic
seed_json_kafka
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/seed.slt"
+ # use the old syntax for version at most 1.5.4
+ if version_le "$OLD_VERSION" "1.5.4" ; then
+ sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/upsert/deprecate_upsert.slt"
+ else
+ sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/upsert/include_key_as.slt"
+ fi
echo "--- KAFKA TEST: wait 5s for kafka to process data"
sleep 5
diff --git a/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt b/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt
new file mode 100644
index 0000000000000..55cfce886455d
--- /dev/null
+++ b/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt
@@ -0,0 +1,16 @@
+statement ok
+CREATE TABLE IF NOT EXISTS kafka_table
+(
+ action varchar,
+ user_id integer,
+ obj_id integer,
+ name varchar,
+ page_id integer,
+ age integer
+)
+WITH (
+ connector='kafka',
+ topic='backwards_compat_test_kafka_source',
+ properties.bootstrap.server='localhost:29092',
+ scan.startup.mode='earliest',
+) FORMAT UPSERT ENCODE JSON;
\ No newline at end of file
diff --git a/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt b/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt
new file mode 100644
index 0000000000000..36ef426574223
--- /dev/null
+++ b/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt
@@ -0,0 +1,18 @@
+statement ok
+CREATE TABLE IF NOT EXISTS kafka_table
+(
+ action varchar,
+ user_id integer,
+ obj_id integer,
+ name varchar,
+ page_id integer,
+ age integer,
+ primary key (_rw_key)
+)
+INCLUDE key as _rw_key
+WITH (
+ connector='kafka',
+ topic='backwards_compat_test_kafka_source',
+ properties.bootstrap.server='localhost:29092',
+ scan.startup.mode='earliest',
+) FORMAT UPSERT ENCODE JSON;
\ No newline at end of file
diff --git a/backwards-compat-tests/slt/kafka/validate_restart.slt b/backwards-compat-tests/slt/kafka/validate_restart.slt
index 7058b118f4d20..6d853007b9829 100644
--- a/backwards-compat-tests/slt/kafka/validate_restart.slt
+++ b/backwards-compat-tests/slt/kafka/validate_restart.slt
@@ -50,3 +50,16 @@ werwerwwe 8 NULL NULL 4 NULL
yjtyjtyyy 9 NULL NULL 4 NULL
yjtyjtyyy 9 NULL NULL 4 NULL
+# kafka_table should perform the upsert and overwrite the existing records
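+# The `_rw_key` column holds the raw Kafka message key, hex-encoded;
+# e.g. \x7b22757365725f6964223a20317d is the encoding of `{"user_id": 1}`.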
+query I rowsort
+SELECT action, user_id, obj_id, name, page_id, age, _rw_key FROM kafka_table;
+----
+6786745ge 6 NULL NULL 3 NULL \x7b22757365725f6964223a20367d
+erwerhghj 4 NULL NULL 2 NULL \x7b22757365725f6964223a20347d
+fgbgfnyyy 7 NULL NULL 3 NULL \x7b22757365725f6964223a20377d
+fsdfgerrg 2 NULL NULL 1 NULL \x7b22757365725f6964223a20327d
+gtrgretrg 1 NULL NULL 1 NULL \x7b22757365725f6964223a20317d
+kiku7ikkk 5 NULL NULL 2 NULL \x7b22757365725f6964223a20357d
+sdfergtth 3 NULL NULL 1 NULL \x7b22757365725f6964223a20337d
+werwerwwe 8 NULL NULL 4 NULL \x7b22757365725f6964223a20387d
+yjtyjtyyy 9 NULL NULL 4 NULL \x7b22757365725f6964223a20397d
diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml
index 4a9f2970b84c7..db017be647376 100644
--- a/ci/docker-compose.yml
+++ b/ci/docker-compose.yml
@@ -88,10 +88,27 @@ services:
- message_queue
- elasticsearch
- clickhouse-server
- - pulsar
+ - redis-server
+ - pulsar-server
+ - cassandra-server
+ - starrocks-fe-server
+ - starrocks-be-server
volumes:
- ..:/risingwave
+ sink-doris-env:
+ image: public.ecr.aws/x5u3w5h6/rw-build-env:v20231109
+ depends_on:
+ - doris-fe-server
+ - doris-be-server
+ volumes:
+ - ..:/risingwave
+ command: >
+ sh -c "sudo sysctl -w vm.max_map_count=2000000"
+ networks:
+ mynetwork:
+ ipv4_address: 172.121.0.4
+
rw-build-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240213
volumes:
@@ -159,10 +176,96 @@ services:
expose:
- 9009
-# Temporary workaround for json schema registry test since redpanda only supports
-# protobuf/avro schema registry. Should be removed after the support.
-# Related tracking issue:
-# https://github.com/redpanda-data/redpanda/issues/1878
+ redis-server:
+ container_name: redis-server
+ image: 'redis:latest'
+ expose:
+ - 6379
+ ports:
+ - 6378:6379
+ healthcheck:
+ test: ["CMD", "redis-cli", "ping"]
+ interval: 5s
+ timeout: 30s
+ retries: 50
+
+ doris-fe-server:
+ platform: linux/amd64
+ image: apache/doris:2.0.0_alpha-fe-x86_64
+ hostname: doris-fe-server
+ command: >
+ sh -c "sudo sysctl -w vm.max_map_count=2000000"
+ environment:
+ - FE_SERVERS=fe1:172.121.0.2:9010
+ - FE_ID=1
+ ports:
+ - "8030:8030"
+ - "9030:9030"
+ networks:
+ mynetwork:
+ ipv4_address: 172.121.0.2
+
+ doris-be-server:
+ platform: linux/amd64
+ image: apache/doris:2.0.0_alpha-be-x86_64
+ hostname: doris-be-server
+ command: >
+ sh -c "sudo sysctl -w vm.max_map_count=2000000"
+ environment:
+ - FE_SERVERS=fe1:172.121.0.2:9010
+ - BE_ADDR=172.121.0.3:9050
+ depends_on:
+ - doris-fe-server
+ ports:
+ - "9050:9050"
+ networks:
+ mynetwork:
+ ipv4_address: 172.121.0.3
+
+ cassandra-server:
+ container_name: cassandra-server
+ image: cassandra:4.0
+ ports:
+ - 9042:9042
+ environment:
+ - CASSANDRA_CLUSTER_NAME=cloudinfra
+
+ starrocks-fe-server:
+ container_name: starrocks-fe-server
+ image: starrocks/fe-ubuntu:3.1.7
+ hostname: starrocks-fe-server
+ command:
+ /opt/starrocks/fe/bin/start_fe.sh
+ ports:
+ - 28030:8030
+ - 29020:9020
+ - 29030:9030
+ healthcheck:
+ test: ["CMD", "curl", "-f", "http://localhost:9030"]
+ interval: 5s
+ timeout: 5s
+ retries: 30
+
+ starrocks-be-server:
+ image: starrocks/be-ubuntu:3.1.7
+ command:
+ - /bin/bash
+ - -c
+ - |
+ sleep 15s; mysql --connect-timeout 2 -h starrocks-fe-server -P9030 -uroot -e "alter system add backend \"starrocks-be-server:9050\";"
+ /opt/starrocks/be/bin/start_be.sh
+ ports:
+ - 28040:8040
+ - 29050:9050
+ hostname: starrocks-be-server
+ container_name: starrocks-be-server
+ depends_on:
+ - starrocks-fe-server
+
+# # Temporary workaround for json schema registry test since redpanda only supports
+# # protobuf/avro schema registry. Should be removed after the support.
+# # Related tracking issue:
+# # https://github.com/redpanda-data/redpanda/issues/1878
zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper:latest
@@ -201,8 +304,8 @@ services:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9093,PLAINTEXT_INTERNAL://localhost:29093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
- pulsar:
- container_name: pulsar
+ pulsar-server:
+ container_name: pulsar-server
image: apachepulsar/pulsar:latest
command: bin/pulsar standalone
ports:
@@ -216,3 +319,9 @@ services:
interval: 5s
timeout: 5s
retries: 5
+networks:
+ mynetwork:
+ ipam:
+ config:
+ - subnet: 172.121.80.0/16
+ default:
diff --git a/ci/scripts/e2e-cassandra-sink-test.sh b/ci/scripts/e2e-cassandra-sink-test.sh
new file mode 100755
index 0000000000000..c393d510d19a2
--- /dev/null
+++ b/ci/scripts/e2e-cassandra-sink-test.sh
@@ -0,0 +1,65 @@
+#!/usr/bin/env bash
+
+# Exits as soon as any line fails.
+set -euo pipefail
+
+source ci/scripts/common.sh
+
+# prepare environment
+export CONNECTOR_LIBS_PATH="./connector-node/libs"
+
+while getopts 'p:' opt; do
+ case ${opt} in
+ p )
+ profile=$OPTARG
+ ;;
+ \? )
+ echo "Invalid Option: -$OPTARG" 1>&2
+ exit 1
+ ;;
+ : )
+ echo "Invalid option: $OPTARG requires an argument" 1>&2
+ ;;
+ esac
+done
+shift $((OPTIND -1))
+
+download_and_prepare_rw "$profile" source
+
+echo "--- Download connector node package"
+buildkite-agent artifact download risingwave-connector.tar.gz ./
+mkdir ./connector-node
+tar xf ./risingwave-connector.tar.gz -C ./connector-node
+
+echo "--- starting risingwave cluster"
+cargo make ci-start ci-sink-test
+sleep 1
+
+echo "--- create cassandra table"
+curl https://downloads.apache.org/cassandra/4.1.3/apache-cassandra-4.1.3-bin.tar.gz --output apache-cassandra-4.1.3-bin.tar.gz
+tar xfvz apache-cassandra-4.1.3-bin.tar.gz
+cd apache-cassandra-4.1.3/bin
+export CQLSH_HOST=cassandra-server
+export CQLSH_PORT=9042
+./cqlsh -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;
+CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"
+
+echo "--- testing sinks"
+cd ../../
+sqllogictest -p 4566 -d dev './e2e_test/sink/cassandra_sink.slt'
+sleep 1
+cd apache-cassandra-4.1.3/bin
+./cqlsh -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
+
+if cat ./query_result.csv | awk -F "," '{
+ exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01.000+0000" && $9 == "False\r"); }'; then
+ echo "Cassandra sink check passed"
+else
+ cat ./query_result.csv
+ echo "The output is not as expected."
+ exit 1
+fi
+
+echo "--- Kill cluster"
+cd ../../
+cargo make ci-kill
\ No newline at end of file
diff --git a/ci/scripts/e2e-clickhouse-sink-test.sh b/ci/scripts/e2e-clickhouse-sink-test.sh
index 3464bd3c3c14d..5443d4e53b7fd 100755
--- a/ci/scripts/e2e-clickhouse-sink-test.sh
+++ b/ci/scripts/e2e-clickhouse-sink-test.sh
@@ -24,14 +24,14 @@ shift $((OPTIND -1))
download_and_prepare_rw "$profile" source
echo "--- starting risingwave cluster"
-cargo make ci-start ci-clickhouse-test
+cargo make ci-start ci-sink-test
sleep 1
echo "--- create clickhouse table"
curl https://clickhouse.com/ | sh
sleep 2
-./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String)ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"
+./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"
echo "--- testing sinks"
sqllogictest -p 4566 -d dev './e2e_test/sink/clickhouse_sink.slt'
@@ -41,13 +41,13 @@ sleep 5
# check sink destination using shell
if cat ./query_result.csv | sort | awk -F "," '{
-if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"") c1++;
- if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"") c2++;
- if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"") c3++;
- if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"") c4++;
- if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"") c5++;
- if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"") c6++;
- if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"") c7++; }
+if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"" && $4 == "\"A\"") c1++;
+ if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"" && $4 == "\"B\"") c2++;
+ if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "\"B\"") c3++;
+ if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"" && $4 == "\"A\"") c4++;
+ if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "\"A\"") c5++;
+ if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"" && $4 == "\"B\"") c6++;
+ if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"" && $4 == "\"A\"") c7++; }
END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1 && c6 == 1 && c7 == 1); }'; then
echo "Clickhouse sink check passed"
else
diff --git a/ci/scripts/e2e-deltalake-sink-rust-test.sh b/ci/scripts/e2e-deltalake-sink-rust-test.sh
index 71ff1eede8e4d..cc0c287e8b572 100755
--- a/ci/scripts/e2e-deltalake-sink-rust-test.sh
+++ b/ci/scripts/e2e-deltalake-sink-rust-test.sh
@@ -32,8 +32,7 @@ mkdir ./connector-node
tar xf ./risingwave-connector.tar.gz -C ./connector-node
echo "--- starting risingwave cluster"
-mkdir -p .risingwave/log
-cargo make ci-start ci-deltalake-test
+cargo make ci-start ci-sink-test
sleep 1
# prepare minio deltalake sink
diff --git a/ci/scripts/e2e-doris-sink-test.sh b/ci/scripts/e2e-doris-sink-test.sh
new file mode 100755
index 0000000000000..30bfdaf129e26
--- /dev/null
+++ b/ci/scripts/e2e-doris-sink-test.sh
@@ -0,0 +1,59 @@
+#!/usr/bin/env bash
+
+# Exits as soon as any line fails.
+set -euo pipefail
+
+source ci/scripts/common.sh
+
+while getopts 'p:' opt; do
+ case ${opt} in
+ p )
+ profile=$OPTARG
+ ;;
+ \? )
+ echo "Invalid Option: -$OPTARG" 1>&2
+ exit 1
+ ;;
+ : )
+ echo "Invalid option: $OPTARG requires an argument" 1>&2
+ ;;
+ esac
+done
+shift $((OPTIND -1))
+
+download_and_prepare_rw "$profile" source
+
+echo "--- starting risingwave cluster"
+cargo make ci-start ci-sink-test
+sleep 1
+
+echo "--- create doris table"
+apt-get update -y && apt-get install -y mysql-client
+sleep 2
+mysql -uroot -P 9030 -h doris-fe-server -e "CREATE database demo;use demo;
+CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 datev2,v8 datetime,v9 boolean) UNIQUE KEY(\`v1\`)
+DISTRIBUTED BY HASH(\`v1\`) BUCKETS 1
+PROPERTIES (
+ \"replication_allocation\" = \"tag.location.default: 1\"
+);
+CREATE USER 'users'@'%' IDENTIFIED BY '123456';
+GRANT ALL ON *.* TO 'users'@'%';"
+sleep 2
+
+echo "--- testing sinks"
+sqllogictest -p 4566 -d dev './e2e_test/sink/doris_sink.slt'
+sleep 1
+mysql -uroot -P 9030 -h doris-fe-server -e "select * from demo.demo_bhv_table" > ./query_result.csv
+
+
+if cat ./query_result.csv | sed '1d; s/\t/,/g' | awk -F "," '{
+ exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01" && $9 == 0); }'; then
+ echo "Doris sink check passed"
+else
+ cat ./query_result.csv
+ echo "The output is not as expected."
+ exit 1
+fi
+
+echo "--- Kill cluster"
+cargo make ci-kill
\ No newline at end of file
diff --git a/ci/scripts/e2e-pulsar-sink-test.sh b/ci/scripts/e2e-pulsar-sink-test.sh
index ee8848832f940..f942ad945b3e9 100755
--- a/ci/scripts/e2e-pulsar-sink-test.sh
+++ b/ci/scripts/e2e-pulsar-sink-test.sh
@@ -21,7 +21,7 @@ shift $((OPTIND -1))
download_and_prepare_rw "$profile" source
echo "--- starting risingwave cluster"
-cargo make ci-start ci-pulsar-test
+cargo make ci-start ci-sink-test
sleep 1
echo "--- waiting until pulsar is healthy"
diff --git a/ci/scripts/e2e-redis-sink-test.sh b/ci/scripts/e2e-redis-sink-test.sh
new file mode 100755
index 0000000000000..cf64662db4051
--- /dev/null
+++ b/ci/scripts/e2e-redis-sink-test.sh
@@ -0,0 +1,48 @@
+#!/usr/bin/env bash
+
+# Exits as soon as any line fails.
+set -euo pipefail
+
+source ci/scripts/common.sh
+
+while getopts 'p:' opt; do
+ case ${opt} in
+ p )
+ profile=$OPTARG
+ ;;
+ \? )
+ echo "Invalid Option: -$OPTARG" 1>&2
+ exit 1
+ ;;
+ : )
+ echo "Invalid option: $OPTARG requires an argument" 1>&2
+ ;;
+ esac
+done
+shift $((OPTIND -1))
+
+download_and_prepare_rw "$profile" source
+
+echo "--- starting risingwave cluster"
+cargo make ci-start ci-sink-test
+apt-get update -y && apt-get install -y redis-server
+sleep 1
+
+echo "--- testing sinks"
+sqllogictest -p 4566 -d dev './e2e_test/sink/redis_sink.slt'
+sleep 1
+
+redis-cli -h redis-server -p 6379 get {\"v1\":1} >> ./query_result.txt
+redis-cli -h redis-server -p 6379 get V1:1 >> ./query_result.txt
+
+# check sink destination using shell
+if cat ./query_result.txt | tr '\n' '\0' | xargs -0 -n1 bash -c '[[ "$0" == "{\"v1\":1,\"v2\":1,\"v3\":1,\"v4\":1.100000023841858,\"v5\":1.2,\"v6\":\"test\",\"v7\":734869,\"v8\":\"2013-01-01T01:01:01.000000Z\",\"v9\":false}" || "$0" == "V2:1,V3:1" ]]'; then
+ echo "Redis sink check passed"
+else
+ cat ./query_result.txt
+ echo "The output is not as expected."
+ exit 1
+fi
+
+echo "--- Kill cluster"
+cargo make ci-kill
\ No newline at end of file
diff --git a/ci/scripts/e2e-starrocks-sink-test.sh b/ci/scripts/e2e-starrocks-sink-test.sh
new file mode 100755
index 0000000000000..256f4448f9198
--- /dev/null
+++ b/ci/scripts/e2e-starrocks-sink-test.sh
@@ -0,0 +1,58 @@
+#!/usr/bin/env bash
+
+# Exits as soon as any line fails.
+set -euo pipefail
+
+source ci/scripts/common.sh
+
+while getopts 'p:' opt; do
+ case ${opt} in
+ p )
+ profile=$OPTARG
+ ;;
+ \? )
+ echo "Invalid Option: -$OPTARG" 1>&2
+ exit 1
+ ;;
+ : )
+ echo "Invalid option: $OPTARG requires an argument" 1>&2
+ ;;
+ esac
+done
+shift $((OPTIND -1))
+
+download_and_prepare_rw "$profile" source
+
+echo "--- starting risingwave cluster"
+cargo make ci-start ci-sink-test
+sleep 1
+
+
+echo "--- create starrocks table"
+apt-get update -y && apt-get install -y mysql-client
+sleep 2
+mysql -uroot -P 9030 -h starrocks-fe-server -e "CREATE database demo;use demo;
+CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 date,v8 datetime,v9 boolean,v10 json) ENGINE=OLAP
+PRIMARY KEY(\`v1\`)
+DISTRIBUTED BY HASH(\`v1\`) properties(\"replication_num\" = \"1\");
+CREATE USER 'users'@'%' IDENTIFIED BY '123456';
+GRANT ALL ON *.* TO 'users'@'%';"
+sleep 2
+
+echo "--- testing sinks"
+sqllogictest -p 4566 -d dev './e2e_test/sink/starrocks_sink.slt'
+sleep 1
+mysql -uroot -P 9030 -h starrocks-fe-server -e "select * from demo.demo_bhv_table" > ./query_result.csv
+
+
+if cat ./query_result.csv | sed '1d; s/\t/,/g' | awk -F "," '{
+  exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01" && $9 == 0 && $10 == "{\"v101\": 100}"); }'; then
+ echo "Starrocks sink check passed"
+else
+ cat ./query_result.csv
+ echo "The output is not as expected."
+ exit 1
+fi
+
+echo "--- Kill cluster"
+cargo make ci-kill
\ No newline at end of file
diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml
index 835c46fb01e60..934458bcca1bc 100644
--- a/ci/workflows/main-cron.yml
+++ b/ci/workflows/main-cron.yml
@@ -815,6 +815,94 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry
+ - label: "end-to-end redis sink test"
+ key: "e2e-redis-sink-tests"
+ command: "ci/scripts/e2e-redis-sink-test.sh -p ci-release"
+ if: |
+ !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
+ || build.pull_request.labels includes "ci/run-e2e-redis-sink-tests"
+ || build.env("CI_STEPS") =~ /(^|,)e2e-redis-sink-tests?(,|$$)/
+ depends_on:
+ - "build"
+ - "build-other"
+ plugins:
+ - docker-compose#v4.9.0:
+ run: sink-test-env
+ config: ci/docker-compose.yml
+ mount-buildkite-agent: true
+ - ./ci/plugins/upload-failure-logs
+ timeout_in_minutes: 10
+ retry: *auto-retry
+
+ - label: "set vm_max_map_count_2000000"
+ key: "set-vm_max_map_count"
+ if: |
+ !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
+ || build.pull_request.labels includes "ci/run-e2e-doris-sink-tests"
+ || build.env("CI_STEPS") =~ /(^|,)e2e-doris-sink-tests?(,|$$)/
+ command: "sudo sysctl -w vm.max_map_count=2000000"
+ depends_on:
+ - "build"
+ - "build-other"
+
+ - label: "end-to-end doris sink test"
+ key: "e2e-doris-sink-tests"
+ command: "ci/scripts/e2e-doris-sink-test.sh -p ci-release"
+ if: |
+ !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
+ || build.pull_request.labels includes "ci/run-e2e-doris-sink-tests"
+ || build.env("CI_STEPS") =~ /(^|,)e2e-doris-sink-tests?(,|$$)/
+ depends_on:
+ - "build"
+ - "build-other"
+ - "set-vm_max_map_count"
+ plugins:
+ - docker-compose#v4.9.0:
+ run: sink-doris-env
+ config: ci/docker-compose.yml
+ mount-buildkite-agent: true
+ - ./ci/plugins/upload-failure-logs
+ timeout_in_minutes: 10
+ retry: *auto-retry
+
+ - label: "end-to-end starrocks sink test"
+ key: "e2e-starrocks-sink-tests"
+ command: "ci/scripts/e2e-starrocks-sink-test.sh -p ci-release"
+ if: |
+ !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
+ || build.pull_request.labels includes "ci/run-e2e-starrocks-sink-tests"
+ || build.env("CI_STEPS") =~ /(^|,)e2e-starrocks-sink-tests?(,|$$)/
+ depends_on:
+ - "build"
+ - "build-other"
+ plugins:
+ - docker-compose#v4.9.0:
+ run: sink-test-env
+ config: ci/docker-compose.yml
+ mount-buildkite-agent: true
+ - ./ci/plugins/upload-failure-logs
+ timeout_in_minutes: 10
+ retry: *auto-retry
+
+ - label: "end-to-end cassandra sink test"
+ key: "e2e-cassandra-sink-tests"
+ command: "ci/scripts/e2e-cassandra-sink-test.sh -p ci-release"
+ if: |
+ !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
+ || build.pull_request.labels includes "ci/run-e2e-cassandra-sink-tests"
+ || build.env("CI_STEPS") =~ /(^|,)e2e-cassandra-sink-tests?(,|$$)/
+ depends_on:
+ - "build"
+ - "build-other"
+ plugins:
+ - docker-compose#v4.9.0:
+ run: sink-test-env
+ config: ci/docker-compose.yml
+ mount-buildkite-agent: true
+ - ./ci/plugins/upload-failure-logs
+ timeout_in_minutes: 10
+ retry: *auto-retry
+
- label: "end-to-end clickhouse sink test"
key: "e2e-clickhouse-sink-tests"
command: "ci/scripts/e2e-clickhouse-sink-test.sh -p ci-release"
diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml
index c48de6df64f1c..a67f915d943cc 100644
--- a/ci/workflows/pull-request.yml
+++ b/ci/workflows/pull-request.yml
@@ -292,6 +292,75 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry
+ - label: "end-to-end redis sink test"
+ if: build.pull_request.labels includes "ci/run-e2e-redis-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-redis-sink-tests?(,|$$)/
+ command: "ci/scripts/e2e-redis-sink-test.sh -p ci-dev"
+ depends_on:
+ - "build"
+ - "build-other"
+ plugins:
+ - docker-compose#v4.9.0:
+ run: sink-test-env
+ config: ci/docker-compose.yml
+ mount-buildkite-agent: true
+ - ./ci/plugins/upload-failure-logs
+ timeout_in_minutes: 10
+ retry: *auto-retry
+
+ - label: "set vm_max_map_count_2000000"
+ key: "set-vm_max_map_count"
+ if: build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-doris-sink-tests?(,|$$)/
+ command: "sudo sysctl -w vm.max_map_count=2000000"
+ depends_on:
+ - "build"
+ - "build-other"
+
+ - label: "end-to-end doris sink test"
+ if: build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-doris-sink-tests?(,|$$)/
+ command: "ci/scripts/e2e-doris-sink-test.sh -p ci-dev"
+ depends_on:
+ - "build"
+ - "build-other"
+ - "set-vm_max_map_count"
+ plugins:
+ - docker-compose#v4.9.0:
+ run: sink-doris-env
+ config: ci/docker-compose.yml
+ mount-buildkite-agent: true
+ - ./ci/plugins/upload-failure-logs
+ timeout_in_minutes: 10
+ retry: *auto-retry
+
+ - label: "end-to-end starrocks sink test"
+ if: build.pull_request.labels includes "ci/run-e2e-starrocks-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-starrocks-sink-tests?(,|$$)/
+ command: "ci/scripts/e2e-starrocks-sink-test.sh -p ci-dev"
+ depends_on:
+ - "build"
+ - "build-other"
+ plugins:
+ - docker-compose#v4.9.0:
+ run: sink-test-env
+ config: ci/docker-compose.yml
+ mount-buildkite-agent: true
+ - ./ci/plugins/upload-failure-logs
+ timeout_in_minutes: 10
+ retry: *auto-retry
+
+ - label: "end-to-end cassandra sink test"
+ if: build.pull_request.labels includes "ci/run-e2e-cassandra-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-cassandra-sink-tests?(,|$$)/
+ command: "ci/scripts/e2e-cassandra-sink-test.sh -p ci-dev"
+ depends_on:
+ - "build"
+ - "build-other"
+ plugins:
+ - docker-compose#v4.9.0:
+ run: sink-test-env
+ config: ci/docker-compose.yml
+ mount-buildkite-agent: true
+ - ./ci/plugins/upload-failure-logs
+ timeout_in_minutes: 10
+ retry: *auto-retry
+
- label: "e2e java-binding test"
if: build.pull_request.labels includes "ci/run-java-binding-tests" || build.env("CI_STEPS") =~ /(^|,)java-binding-tests?(,|$$)/
command: "ci/scripts/java-binding-test.sh -p ci-dev"
diff --git a/e2e_test/batch/basic/make_time.slt.part b/e2e_test/batch/basic/make_time.slt.part
index 7a11b837c4fdb..ff1d4453e0efd 100644
--- a/e2e_test/batch/basic/make_time.slt.part
+++ b/e2e_test/batch/basic/make_time.slt.part
@@ -9,7 +9,12 @@ SELECT make_timestamptz(1973, 07, 15, 08, 15, 55.33);
query T
SELECT make_timestamptz(-1973, 07, 15, 08, 15, 55.33);
----
--1972-07-15 08:15:55.330+00:00
+1973-07-15 08:15:55.330+00:00 BC
+
+query T
+SELECT make_timestamptz(20240, 1, 26, 14, 20, 26);
+----
+20240-01-26 14:20:26+00:00
query error Invalid parameter year, month, day: invalid date: -3-2-29
SELECT make_timestamptz(-4, 02, 29, 08, 15, 55.33);
@@ -17,7 +22,7 @@ SELECT make_timestamptz(-4, 02, 29, 08, 15, 55.33);
query T
SELECT make_timestamptz(-5, 02, 29, 08, 15, 55.33);
----
--0004-02-29 08:15:55.330+00:00
+0005-02-29 08:15:55.330+00:00 BC
query error Invalid parameter sec: invalid sec: -55.33
SELECT make_timestamptz(1973, 07, 15, 08, 15, -55.33);
@@ -105,6 +110,11 @@ SELECT make_date(2024, 1, 26);
----
2024-01-26
+query T
+SELECT make_date(20240, 1, 26);
+----
+20240-01-26
+
query T
SELECT make_date(-2024, 1, 26);
----
@@ -146,10 +156,15 @@ SELECT make_timestamp(2024, 1, 26, 14, 20, 26);
----
2024-01-26 14:20:26
+query T
+SELECT make_timestamp(20240, 1, 26, 14, 20, 26);
+----
+20240-01-26 14:20:26
+
query T
SELECT make_timestamp(-1973, 07, 15, 08, 15, 55.33);
----
--1972-07-15 08:15:55.330
+1973-07-15 08:15:55.330 BC
query error Invalid parameter year, month, day: invalid date: -3-2-29
SELECT make_timestamp(-4, 02, 29, 08, 15, 55.33);
@@ -157,4 +172,14 @@ SELECT make_timestamp(-4, 02, 29, 08, 15, 55.33);
query T
SELECT make_timestamp(-5, 02, 29, 08, 15, 55.33);
----
--0004-02-29 08:15:55.330
+0005-02-29 08:15:55.330 BC
+
+query T
+select '0001-01-01 12:34:56'::timestamp - '10 year'::interval;
+----
+0010-01-01 12:34:56 BC
+
+query T
+select '0001-01-01 12:34:56'::timestamptz - '10 year'::interval;
+----
+0010-01-01 12:34:56+00:00 BC
diff --git a/e2e_test/batch/catalog/pg_cast.slt.part b/e2e_test/batch/catalog/pg_cast.slt.part
index b8ab68a5ed5cd..b1558d1e144c4 100644
--- a/e2e_test/batch/catalog/pg_cast.slt.part
+++ b/e2e_test/batch/catalog/pg_cast.slt.part
@@ -82,8 +82,9 @@ SELECT * FROM pg_catalog.pg_cast;
78 3802 701 e
79 3802 1700 e
80 3802 1043 a
-81 1301 701 e
-82 1301 1043 a
+81 20 20 e
+82 1301 701 e
+83 1301 1043 a
query TT rowsort
SELECT s.typname, t.typname
diff --git a/e2e_test/batch/catalog/pg_class.slt.part b/e2e_test/batch/catalog/pg_class.slt.part
index 2f2ffbe016e3a..ff31c27dcc17d 100644
--- a/e2e_test/batch/catalog/pg_class.slt.part
+++ b/e2e_test/batch/catalog/pg_class.slt.part
@@ -11,7 +11,7 @@ SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class ORDER BY oid limit
8 pg_cast 1 r
9 pg_class 1 v
10 pg_collation 1 v
-11 pg_constraint 1 v
+11 pg_constraint 1 r
12 pg_conversion 1 v
13 pg_database 1 v
14 pg_depend 1 v
@@ -20,4 +20,4 @@ SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class ORDER BY oid limit
query ITIT
SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class WHERE oid = 'pg_namespace'::regclass;
----
-24 pg_namespace 1 v
+25 pg_namespace 1 v
diff --git a/e2e_test/batch/catalog/pg_constraint.slt.part b/e2e_test/batch/catalog/pg_constraint.slt.part
new file mode 100644
index 0000000000000..a2a36e73f5416
--- /dev/null
+++ b/e2e_test/batch/catalog/pg_constraint.slt.part
@@ -0,0 +1,10 @@
+statement ok
+create table t(a int, b int, c varchar, primary key(a,b));
+
+query TTTT
+select conname, contype, conkey from pg_constraint where conname='t_pkey';
+----
+t_pkey p {1,2}
+
+statement ok
+drop table t;
diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part
index 5f37db11fcb91..c8e927ba72b9f 100644
--- a/e2e_test/batch/catalog/pg_settings.slt.part
+++ b/e2e_test/batch/catalog/pg_settings.slt.part
@@ -63,6 +63,14 @@ query TT
SELECT * FROM pg_catalog.pg_settings where name='dummy';
----
+# https://github.com/risingwavelabs/risingwave/issues/15125
+query TT
+SELECT min(name) name, context FROM pg_catalog.pg_settings GROUP BY context;
+----
+application_name user
+backup_storage_directory postmaster
+block_size_kb internal
+
# Tab-completion of `SET` command
query T
SELECT name
diff --git a/e2e_test/batch/catalog/version.slt.part b/e2e_test/batch/catalog/version.slt.part
index b2ba9e2a877c5..dc3e0399b1e6a 100644
--- a/e2e_test/batch/catalog/version.slt.part
+++ b/e2e_test/batch/catalog/version.slt.part
@@ -1,4 +1,4 @@
query T
-select substring(version() from 1 for 14);
+select substring(version() from 1 for 16);
----
-PostgreSQL 9.5
+PostgreSQL 13.14
diff --git a/e2e_test/batch/functions/setting.slt.part b/e2e_test/batch/functions/setting.slt.part
index 77d1d80e46590..233399d80a025 100644
--- a/e2e_test/batch/functions/setting.slt.part
+++ b/e2e_test/batch/functions/setting.slt.part
@@ -1,12 +1,12 @@
query T
SELECT current_setting('server_version');
----
-9.5.0
+13.14.0
query I
-SELECT CAST(current_setting('server_version_num') AS INT) / 100 AS version;
+SELECT current_setting('server_version_num') AS version;
----
-905
+130014
query T
SELECT set_config('client_min_messages', 'warning', false);
diff --git a/e2e_test/error_ui/simple/main.slt b/e2e_test/error_ui/simple/main.slt
index b4cebbdfeff70..3197544b45d75 100644
--- a/e2e_test/error_ui/simple/main.slt
+++ b/e2e_test/error_ui/simple/main.slt
@@ -27,7 +27,7 @@ db error: ERROR: Failed to run the query
Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
- 2: SystemParams error: unrecognized system param "not_exist_key"
+ 2: SystemParams error: unrecognized system parameter "not_exist_key"
query error
diff --git a/e2e_test/sink/cassandra_sink.slt b/e2e_test/sink/cassandra_sink.slt
new file mode 100644
index 0000000000000..7091e8da70783
--- /dev/null
+++ b/e2e_test/sink/cassandra_sink.slt
@@ -0,0 +1,33 @@
+statement ok
+CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);
+
+statement ok
+CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
+
+statement ok
+CREATE SINK s6
+FROM
+ mv6 WITH (
+ connector = 'cassandra',
+ type = 'append-only',
+ force_append_only='true',
+ cassandra.url = 'cassandra-server:9042',
+ cassandra.keyspace = 'demo',
+ cassandra.table = 'demo_bhv_table',
+ cassandra.datacenter = 'datacenter1',
+);
+
+statement ok
+INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false);
+
+statement ok
+FLUSH;
+
+statement ok
+DROP SINK s6;
+
+statement ok
+DROP MATERIALIZED VIEW mv6;
+
+statement ok
+DROP TABLE t6;
\ No newline at end of file
diff --git a/e2e_test/sink/clickhouse_sink.slt b/e2e_test/sink/clickhouse_sink.slt
index 909bdbfd6356b..9791f484326d7 100644
--- a/e2e_test/sink/clickhouse_sink.slt
+++ b/e2e_test/sink/clickhouse_sink.slt
@@ -1,11 +1,11 @@
statement ok
-CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar);
+CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint);
statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
statement ok
-CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
+CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4 from mv6 WITH (
connector = 'clickhouse',
type = 'append-only',
force_append_only='true',
@@ -17,7 +17,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
);
statement ok
-INSERT INTO t6 VALUES (1, 50, '1-50'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2'), (8, 2, '8-2'), (13, 2, '13-2'), (21, 2, '21-2');
+INSERT INTO t6 VALUES (1, 50, '1-50', 1), (2, 2, '2-2', 2), (3, 2, '3-2', 1), (5, 2, '5-2', 2), (8, 2, '8-2', 1), (13, 2, '13-2', 2), (21, 2, '21-2', 1);
statement ok
FLUSH;
diff --git a/e2e_test/sink/doris_sink.slt b/e2e_test/sink/doris_sink.slt
new file mode 100644
index 0000000000000..2c552bbb26143
--- /dev/null
+++ b/e2e_test/sink/doris_sink.slt
@@ -0,0 +1,34 @@
+statement ok
+CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean);
+
+statement ok
+CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
+
+statement ok
+CREATE SINK s6
+FROM
+ mv6 WITH (
+ connector = 'doris',
+ type = 'append-only',
+ doris.url = 'http://doris-fe-server:8030',
+ doris.user = 'users',
+ doris.password = '123456',
+ doris.database = 'demo',
+ doris.table='demo_bhv_table',
+ force_append_only='true'
+);
+
+statement ok
+INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01' , false);
+
+statement ok
+FLUSH;
+
+statement ok
+DROP SINK s6;
+
+statement ok
+DROP MATERIALIZED VIEW mv6;
+
+statement ok
+DROP TABLE t6;
\ No newline at end of file
diff --git a/e2e_test/sink/redis_sink.slt b/e2e_test/sink/redis_sink.slt
new file mode 100644
index 0000000000000..7475a80ae696e
--- /dev/null
+++ b/e2e_test/sink/redis_sink.slt
@@ -0,0 +1,41 @@
+statement ok
+CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);
+
+statement ok
+CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
+
+statement ok
+CREATE SINK s61
+FROM
+ mv6 WITH (
+ primary_key = 'v1',
+ connector = 'redis',
+ redis.url= 'redis://redis-server:6379/',
+)FORMAT PLAIN ENCODE JSON(force_append_only='true');
+
+statement ok
+CREATE SINK s62
+FROM
+ mv6 WITH (
+ primary_key = 'v1',
+ connector = 'redis',
+ redis.url= 'redis://redis-server:6379/',
+)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'V1:{v1}', value_format = 'V2:{v2},V3:{v3}');
+
+statement ok
+INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false);
+
+statement ok
+FLUSH;
+
+statement ok
+DROP SINK s61;
+
+statement ok
+DROP SINK s62;
+
+statement ok
+DROP MATERIALIZED VIEW mv6;
+
+statement ok
+DROP TABLE t6;
\ No newline at end of file
diff --git a/e2e_test/sink/sink_into_table/basic.slt b/e2e_test/sink/sink_into_table/basic.slt
index 1bc5a47907077..890087e207fd0 100644
--- a/e2e_test/sink/sink_into_table/basic.slt
+++ b/e2e_test/sink/sink_into_table/basic.slt
@@ -362,6 +362,35 @@ drop table t_b;
statement ok
drop table t_c;
+# cycle check (with materialize view)
+
+statement ok
+create table t_a(v int primary key);
+
+statement ok
+create materialized view m_a as select v from t_a;
+
+statement ok
+create table t_b(v int primary key);
+
+statement ok
+create sink s_a into t_b as select v from m_a;
+
+statement error Creating such a sink will result in circular dependency
+create sink s_b into t_a as select v from t_b;
+
+statement ok
+drop sink s_a;
+
+statement ok
+drop table t_b;
+
+statement ok
+drop materialized view m_a;
+
+statement ok
+drop table t_a;
+
# multi sinks
statement ok
diff --git a/e2e_test/sink/starrocks_sink.slt b/e2e_test/sink/starrocks_sink.slt
new file mode 100644
index 0000000000000..a1ee1b0ffe039
--- /dev/null
+++ b/e2e_test/sink/starrocks_sink.slt
@@ -0,0 +1,36 @@
+statement ok
+CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb);
+
+statement ok
+CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
+
+statement ok
+CREATE SINK s6
+FROM
+ mv6 WITH (
+ connector = 'starrocks',
+ type = 'upsert',
+ starrocks.host = 'starrocks-fe-server',
+ starrocks.mysqlport = '9030',
+ starrocks.httpport = '8030',
+ starrocks.user = 'users',
+ starrocks.password = '123456',
+ starrocks.database = 'demo',
+ starrocks.table = 'demo_bhv_table',
+ primary_key = 'v1'
+);
+
+statement ok
+INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01' , false, '{"v101":100}');
+
+statement ok
+FLUSH;
+
+statement ok
+DROP SINK s6;
+
+statement ok
+DROP MATERIALIZED VIEW mv6;
+
+statement ok
+DROP TABLE t6;
\ No newline at end of file
diff --git a/e2e_test/source/basic/ddl.slt b/e2e_test/source/basic/ddl.slt
index 6e640e047d4c2..402cf129b86ba 100644
--- a/e2e_test/source/basic/ddl.slt
+++ b/e2e_test/source/basic/ddl.slt
@@ -28,7 +28,8 @@ db error: ERROR: Failed to run the query
Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to create source worker
- 3: missing field `properties.bootstrap.server`
+ 3: failed to parse json
+ 4: missing field `properties.bootstrap.server`
statement error
diff --git a/e2e_test/streaming/bug_fixes/issue_15198.slt b/e2e_test/streaming/bug_fixes/issue_15198.slt
new file mode 100644
index 0000000000000..a69aede18c2c9
--- /dev/null
+++ b/e2e_test/streaming/bug_fixes/issue_15198.slt
@@ -0,0 +1,23 @@
+# https://github.com/risingwavelabs/risingwave/issues/15198
+
+statement ok
+SET RW_IMPLICIT_FLUSH TO TRUE;
+
+statement ok
+create materialized view "tumble_with_offset"
+as (
+ with
+ input as (
+ select 1 as id, TO_TIMESTAMP('2024-01-01 01:30:02', 'YYYY-MM-DD HH24:MI:SS') as timestamps
+ )
+ select *
+ from tumble(input, timestamps, interval '1 DAY', '+6 HOURS')
+);
+
+query ITTT
+select * from tumble_with_offset;
+----
+1 2024-01-01 01:30:02+00:00 2023-12-31 06:00:00+00:00 2024-01-01 06:00:00+00:00
+
+statement ok
+drop materialized view tumble_with_offset;
diff --git a/e2e_test/streaming/rate_limit.slt b/e2e_test/streaming/rate_limit/basic.slt
similarity index 100%
rename from e2e_test/streaming/rate_limit.slt
rename to e2e_test/streaming/rate_limit/basic.slt
diff --git a/e2e_test/streaming/rate_limit/upstream_amplification.slt b/e2e_test/streaming/rate_limit/upstream_amplification.slt
new file mode 100644
index 0000000000000..71be801a78fc2
--- /dev/null
+++ b/e2e_test/streaming/rate_limit/upstream_amplification.slt
@@ -0,0 +1,44 @@
+# This test verifies that barrier latency does not spike
+# when there is a rate limit on the source.
+# The upstream side should backpressure the source reader,
+# but still allow barriers to flow through.
+
+statement ok
+SET STREAMING_PARALLELISM=2;
+
+statement ok
+SET STREAMING_RATE_LIMIT=1;
+
+statement ok
+CREATE TABLE source_table (i1 int)
+WITH (
+ connector = 'datagen',
+ fields.i1.start = '1',
+ fields.i1.end = '5',
+ datagen.rows.per.second = '10000'
+) FORMAT PLAIN ENCODE JSON;
+
+statement ok
+CREATE SINK sink AS
+ SELECT x.i1 as i1 FROM source_table x
+ JOIN source_table s1 ON x.i1 = s1.i1
+ JOIN source_table s2 ON x.i1 = s2.i1
+ JOIN source_table s3 ON x.i1 = s3.i1
+ WITH (connector = 'blackhole');
+
+# The following sequence of FLUSH statements should be fast, since barriers should be able to bypass the sink.
+# Otherwise, these FLUSH statements will take a long time to complete and trigger a timeout.
+statement ok
+flush;
+
+statement ok
+flush;
+
+statement ok
+flush;
+
+statement ok
+drop sink sink;
+
+statement ok
+drop table source_table;
\ No newline at end of file
diff --git a/integration_tests/http-sink/README.md b/integration_tests/http-sink/README.md
new file mode 100644
index 0000000000000..d956cb4ea95a4
--- /dev/null
+++ b/integration_tests/http-sink/README.md
@@ -0,0 +1,34 @@
+# Demo: Sinking to HTTP
+
+In this demo, we want to showcase how RisingWave is able to sink data to an HTTP endpoint. This feature depends on https://github.com/getindata/flink-http-connector.
+
+It has a few limitations:
+1. It offers only two options for the HTTP method, i.e., PUT and POST.
+2. It can only execute one request-reply round to the service (session-less).
+3. It cannot handle status codes in the SQL API.
+
+Therefore, we suggest trying a Python UDF first.
+
+### Demo:
+1. Launch the cluster:
+
+```sh
+docker-compose up -d
+```
+
+The cluster contains a RisingWave instance and its necessary dependencies, plus a datagen service that generates the data.
+
+2. Build and start an HTTP server of your own that listens on the URL configured in `create_sink.sql` (a minimal sketch is given at the end of this document)
+
+3. Execute the SQL queries in sequence:
+
+- create_source.sql
+- create_mv.sql
+- create_sink.sql
+
+4. Check the contents received by the HTTP server:
+The HTTP server will receive JSON strings, something like:
+```
+{"user_id":5,"target_id":"siFqrkdlCn"}
+```
+In total, 1000 JSON records are sent.
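+
+As a reference only, a minimal sketch of such an HTTP server, assuming Python 3 is available and reusing the port and path from `create_sink.sql`, could look like:
+
+```python
+# Hypothetical echo server for this demo: prints every JSON payload the sink POSTs and replies 200.
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+class EchoHandler(BaseHTTPRequestHandler):
+    def do_POST(self):
+        length = int(self.headers.get("Content-Length", 0))
+        body = self.rfile.read(length)
+        print(body.decode("utf-8"))  # e.g. {"user_id":5,"target_id":"siFqrkdlCn"}
+        self.send_response(200)
+        self.end_headers()
+
+HTTPServer(("0.0.0.0", 8080), EchoHandler).serve_forever()
+```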
diff --git a/integration_tests/http-sink/create_mv.sql b/integration_tests/http-sink/create_mv.sql
new file mode 100644
index 0000000000000..8a291a3c95ea7
--- /dev/null
+++ b/integration_tests/http-sink/create_mv.sql
@@ -0,0 +1,6 @@
+CREATE MATERIALIZED VIEW bhv_mv AS
+SELECT
+ user_id,
+ target_id
+FROM
+ user_behaviors;
diff --git a/integration_tests/http-sink/create_sink.sql b/integration_tests/http-sink/create_sink.sql
new file mode 100644
index 0000000000000..0644d1d51934b
--- /dev/null
+++ b/integration_tests/http-sink/create_sink.sql
@@ -0,0 +1,11 @@
+CREATE sink bhv_http_sink FROM bhv_mv WITH (
+ connector = 'http',
+ url = 'http://localhost:8080/endpoint',
+ format = 'json',
+ type = 'append-only',
+ force_append_only='true',
+ primary_key = 'user_id',
+ gid.connector.http.sink.header.Origin = '*',
+ "gid.connector.http.sink.header.X-Content-Type-Options" = 'nosniff',
+ "gid.connector.http.sink.header.Content-Type" = 'application/json'
+);
\ No newline at end of file
diff --git a/integration_tests/http-sink/create_source.sql b/integration_tests/http-sink/create_source.sql
new file mode 100644
index 0000000000000..c28c10f3616da
--- /dev/null
+++ b/integration_tests/http-sink/create_source.sql
@@ -0,0 +1,18 @@
+CREATE table user_behaviors (
+ user_id int,
+ target_id VARCHAR,
+ target_type VARCHAR,
+ event_timestamp TIMESTAMP,
+ behavior_type VARCHAR,
+ parent_target_type VARCHAR,
+ parent_target_id VARCHAR,
+ PRIMARY KEY(user_id)
+) WITH (
+ connector = 'datagen',
+ fields.user_id.kind = 'sequence',
+ fields.user_id.start = '1',
+ fields.user_id.end = '1000',
+ fields.user_name.kind = 'random',
+ fields.user_name.length = '10',
+ datagen.rows.per.second = '10'
+) FORMAT PLAIN ENCODE JSON;
\ No newline at end of file
diff --git a/integration_tests/http-sink/docker-compose.yml b/integration_tests/http-sink/docker-compose.yml
new file mode 100644
index 0000000000000..8fba5ff352dc0
--- /dev/null
+++ b/integration_tests/http-sink/docker-compose.yml
@@ -0,0 +1,37 @@
+---
+version: "3"
+services:
+ risingwave-standalone:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: risingwave-standalone
+ etcd-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: etcd-0
+ grafana-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: grafana-0
+ minio-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: minio-0
+ prometheus-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: prometheus-0
+volumes:
+ risingwave-standalone:
+ external: false
+ etcd-0:
+ external: false
+ grafana-0:
+ external: false
+ minio-0:
+ external: false
+ prometheus-0:
+ external: false
+ message_queue:
+ external: false
+name: risingwave-compose
diff --git a/integration_tests/starrocks-sink/docker-compose.yml b/integration_tests/starrocks-sink/docker-compose.yml
index 4210206aa7705..81ef7c277dad0 100644
--- a/integration_tests/starrocks-sink/docker-compose.yml
+++ b/integration_tests/starrocks-sink/docker-compose.yml
@@ -2,7 +2,7 @@
version: "3"
services:
starrocks-fe:
- image: starrocks/fe-ubuntu:latest
+ image: starrocks/fe-ubuntu:3.1.7
hostname: starrocks-fe
container_name: starrocks-fe
volumes:
@@ -19,7 +19,7 @@ services:
timeout: 5s
retries: 30
starrocks-be:
- image: starrocks/be-ubuntu:latest
+ image: starrocks/be-ubuntu:3.1.7
command:
- /bin/bash
- -c
@@ -27,6 +27,7 @@ services:
sleep 15s; mysql --connect-timeout 2 -h starrocks-fe -P9030 -uroot -e "alter system add backend \"starrocks-be:9050\";"
/opt/starrocks/be/bin/start_be.sh
ports:
+ - 9050:9050
- 8040:8040
hostname: starrocks-be
container_name: starrocks-be
diff --git a/java/connector-node/risingwave-connector-service/pom.xml b/java/connector-node/risingwave-connector-service/pom.xml
index 047c523c1c7db..d51d67497ce05 100644
--- a/java/connector-node/risingwave-connector-service/pom.xml
+++ b/java/connector-node/risingwave-connector-service/pom.xml
@@ -99,7 +99,6 @@
com.risingwave
risingwave-sink-mock-flink-http-sink
- provided
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
index 9ac3d257b2bad..7c883335cfc23 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
@@ -23,6 +23,7 @@
public class CassandraConfig extends CommonSinkConfig {
/** Required */
private String type;
+
/** Required */
private String url;
diff --git a/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java b/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java
index a969dddd620f7..d316eeae74bed 100644
--- a/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java
+++ b/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java
@@ -26,6 +26,8 @@
/**
* The `FlinkMockSinkFactory` implementation of the http sink is responsible for creating the http
* counterpart of the `DynamicTableSinkFactory`. And `validate` don't need to do anything.
+ *
+ * <p>This feature depends on https://github.com/getindata/flink-http-connector.
*/
public class HttpFlinkMockSinkFactory implements FlinkMockSinkFactory {
@Override
diff --git a/java/pom.xml b/java/pom.xml
index 5f168c48bd9ef..c6e39b34cfc0b 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -69,7 +69,7 @@
1.53.0
2.10
0.1.0-SNAPSHOT
- 2.27.1
+ 2.43.0
2.20.0
2.0.9
1.5.0
@@ -391,7 +391,7 @@
- 1.7
+ 1.20.0
diff --git a/lints/src/format_error.rs b/lints/src/format_error.rs
index 0d1df649460e8..8dcbed8cb520d 100644
--- a/lints/src/format_error.rs
+++ b/lints/src/format_error.rs
@@ -16,7 +16,7 @@ use clippy_utils::diagnostics::span_lint_and_help;
use clippy_utils::macros::{
find_format_arg_expr, find_format_args, is_format_macro, macro_backtrace,
};
-use clippy_utils::ty::implements_trait;
+use clippy_utils::ty::{implements_trait, match_type};
use clippy_utils::{
is_in_cfg_test, is_in_test_function, is_trait_method, match_def_path, match_function_call,
};
@@ -64,6 +64,7 @@ const TRACING_FIELD_DEBUG: [&str; 3] = ["tracing_core", "field", "debug"];
const TRACING_FIELD_DISPLAY: [&str; 3] = ["tracing_core", "field", "display"];
const TRACING_MACROS_EVENT: [&str; 3] = ["tracing", "macros", "event"];
const ANYHOW_MACROS_ANYHOW: [&str; 3] = ["anyhow", "macros", "anyhow"];
+const ANYHOW_ERROR: [&str; 2] = ["anyhow", "Error"];
impl<'tcx> LateLintPass<'tcx> for FormatError {
fn check_expr(&mut self, cx: &LateContext<'tcx>, expr: &'tcx Expr<'_>) {
@@ -143,7 +144,10 @@ fn check_fmt_arg_in_anyhow_error(cx: &LateContext<'_>, arg_expr: &Expr<'_>) {
check_fmt_arg_with_help(
cx,
arg_expr,
- "consider directly wrapping the error with `anyhow::anyhow!(..)` instead of formatting it",
+ (
+ "consider directly wrapping the error with `anyhow::anyhow!(..)` instead of formatting it",
+ "consider removing the redundant wrapping of `anyhow::anyhow!(..)`",
+ ),
);
}
@@ -151,12 +155,16 @@ fn check_fmt_arg_in_anyhow_context(cx: &LateContext<'_>, arg_expr: &Expr<'_>) {
check_fmt_arg_with_help(
cx,
arg_expr,
- "consider using `anyhow::Error::context`, `anyhow::Context::(with_)context` to \
+ (
+ "consider using `anyhow::Context::(with_)context` to \
attach additional message to the error and make it an error source instead",
+ "consider using `.context(..)` to \
+ attach additional message to the error and make it an error source instead",
+ ),
);
}
-fn check_fmt_arg_with_help(cx: &LateContext<'_>, arg_expr: &Expr<'_>, help: &str) {
+fn check_fmt_arg_with_help(cx: &LateContext<'_>, arg_expr: &Expr<'_>, help: impl Help) {
check_arg(cx, arg_expr, arg_expr.span, help);
}
@@ -169,27 +177,56 @@ fn check_to_string_call(cx: &LateContext<'_>, receiver: &Expr<'_>, to_string_spa
);
}
-fn check_arg(cx: &LateContext<'_>, arg_expr: &Expr<'_>, span: Span, help: &str) {
+fn check_arg(cx: &LateContext<'_>, arg_expr: &Expr<'_>, span: Span, help: impl Help) {
let Some(error_trait_id) = cx.tcx.get_diagnostic_item(sym::Error) else {
return;
};
let ty = cx.typeck_results().expr_ty(arg_expr).peel_refs();
- if implements_trait(cx, ty, error_trait_id, &[]) {
- if let Some(span) = core::iter::successors(Some(span), |s| s.parent_callsite())
- .find(|s| s.can_be_used_for_suggestions())
- {
- // TODO: applicable suggestions
- span_lint_and_help(
- cx,
- FORMAT_ERROR,
- span,
- "should not format error directly",
- None,
- help,
- );
- }
+ let help = if implements_trait(cx, ty, error_trait_id, &[]) {
+ help.normal_help()
+ } else if match_type(cx, ty, &ANYHOW_ERROR) {
+ help.anyhow_help()
+ } else {
+ return;
+ };
+
+ if let Some(span) = core::iter::successors(Some(span), |s| s.parent_callsite())
+ .find(|s| s.can_be_used_for_suggestions())
+ {
+ // TODO: applicable suggestions
+ span_lint_and_help(
+ cx,
+ FORMAT_ERROR,
+ span,
+ "should not format error directly",
+ None,
+ help,
+ );
+ }
+}
+
+trait Help {
+ fn normal_help(&self) -> &str;
+ fn anyhow_help(&self) -> &str {
+ self.normal_help()
+ }
+}
+
+impl Help for &str {
+ fn normal_help(&self) -> &str {
+ self
+ }
+}
+
+impl Help for (&str, &str) {
+ fn normal_help(&self) -> &str {
+ self.0
+ }
+
+ fn anyhow_help(&self) -> &str {
+ self.1
}
}
diff --git a/lints/src/lib.rs b/lints/src/lib.rs
index d2c78515272f4..df77538d3cf17 100644
--- a/lints/src/lib.rs
+++ b/lints/src/lib.rs
@@ -14,6 +14,7 @@
#![feature(rustc_private)]
#![feature(let_chains)]
+#![feature(lazy_cell)]
#![warn(unused_extern_crates)]
extern crate rustc_ast;
diff --git a/lints/ui/format_error.rs b/lints/ui/format_error.rs
index eeead1306ea3f..0e46c72766157 100644
--- a/lints/ui/format_error.rs
+++ b/lints/ui/format_error.rs
@@ -55,4 +55,22 @@ fn main() {
let _ = anyhow!("{:?}", err);
let _ = anyhow!("some error occurred: {}", err);
let _ = anyhow!("some error occurred: {:?}", err);
+
+    // `anyhow::Error` does not implement the `Error` trait; test the special path here.
+ let make_anyhow_err = || anyhow!("foobar");
+ let anyhow_err = make_anyhow_err();
+
+ let _ = format!("{}", anyhow_err);
+ let _ = format!("{}", &anyhow_err);
+ let _ = format!("{}", &&anyhow_err);
+ let _ = format!("{}", Box::new(&anyhow_err)); // TODO: fail to lint
+
+ tracing::field::display(&anyhow_err);
+ tracing::field::debug(make_anyhow_err());
+
+ let _ = anyhow_err.to_string();
+ let _ = (&&anyhow_err).to_string();
+
+ let _ = anyhow!("{}", anyhow_err);
+ let _ = anyhow!("some error occurred: {:?}", anyhow_err);
}
diff --git a/lints/ui/format_error.stderr b/lints/ui/format_error.stderr
index 8ec6e69b7fcf4..0eb4786380a79 100644
--- a/lints/ui/format_error.stderr
+++ b/lints/ui/format_error.stderr
@@ -262,7 +262,7 @@ error: should not format error directly
LL | let _ = anyhow!("some error occurred: {}", err);
| ^^^
|
- = help: consider using `anyhow::Error::context`, `anyhow::Context::(with_)context` to attach additional message to the error and make it an error source instead
+ = help: consider using `anyhow::Context::(with_)context` to attach additional message to the error and make it an error source instead
error: should not format error directly
--> $DIR/format_error.rs:57:50
@@ -270,7 +270,79 @@ error: should not format error directly
LL | let _ = anyhow!("some error occurred: {:?}", err);
| ^^^
|
- = help: consider using `anyhow::Error::context`, `anyhow::Context::(with_)context` to attach additional message to the error and make it an error source instead
+ = help: consider using `anyhow::Context::(with_)context` to attach additional message to the error and make it an error source instead
-error: aborting due to 34 previous errors
+error: should not format error directly
+ --> $DIR/format_error.rs:63:27
+ |
+LL | let _ = format!("{}", anyhow_err);
+ | ^^^^^^^^^^
+ |
+ = help: consider importing `thiserror_ext::AsReport` and using `.as_report()` instead
+
+error: should not format error directly
+ --> $DIR/format_error.rs:64:27
+ |
+LL | let _ = format!("{}", &anyhow_err);
+ | ^^^^^^^^^^^
+ |
+ = help: consider importing `thiserror_ext::AsReport` and using `.as_report()` instead
+
+error: should not format error directly
+ --> $DIR/format_error.rs:65:27
+ |
+LL | let _ = format!("{}", &&anyhow_err);
+ | ^^^^^^^^^^^^
+ |
+ = help: consider importing `thiserror_ext::AsReport` and using `.as_report()` instead
+
+error: should not format error directly
+ --> $DIR/format_error.rs:68:29
+ |
+LL | tracing::field::display(&anyhow_err);
+ | ^^^^^^^^^^^
+ |
+ = help: consider importing `thiserror_ext::AsReport` and recording the error as a field with `error = %.as_report()` instead
+
+error: should not format error directly
+ --> $DIR/format_error.rs:69:27
+ |
+LL | tracing::field::debug(make_anyhow_err());
+ | ^^^^^^^^^^^^^^^^^
+ |
+ = help: consider importing `thiserror_ext::AsReport` and recording the error as a field with `error = %.as_report()` instead
+
+error: should not format error directly
+ --> $DIR/format_error.rs:71:24
+ |
+LL | let _ = anyhow_err.to_string();
+ | ^^^^^^^^^^^
+ |
+ = help: consider importing `thiserror_ext::AsReport` and using `.to_report_string()` instead
+
+error: should not format error directly
+ --> $DIR/format_error.rs:72:28
+ |
+LL | let _ = (&&anyhow_err).to_string();
+ | ^^^^^^^^^^^
+ |
+ = help: consider importing `thiserror_ext::AsReport` and using `.to_report_string()` instead
+
+error: should not format error directly
+ --> $DIR/format_error.rs:74:27
+ |
+LL | let _ = anyhow!("{}", anyhow_err);
+ | ^^^^^^^^^^
+ |
+ = help: consider removing the redundant wrapping of `anyhow::anyhow!(..)`
+
+error: should not format error directly
+ --> $DIR/format_error.rs:75:50
+ |
+LL | let _ = anyhow!("some error occurred: {:?}", anyhow_err);
+ | ^^^^^^^^^^
+ |
+ = help: consider using `.context(..)` to attach additional message to the error and make it an error source instead
+
+error: aborting due to 43 previous errors
diff --git a/proto/buf.yaml b/proto/buf.yaml
index 1aa31816ce0af..abad30f04506c 100644
--- a/proto/buf.yaml
+++ b/proto/buf.yaml
@@ -1,7 +1,8 @@
version: v1
breaking:
use:
- - WIRE # https://docs.buf.build/breaking/rules
+ - WIRE_JSON # https://docs.buf.build/breaking/rules
+ # https://github.com/risingwavelabs/risingwave/issues/15030
lint:
use:
- DEFAULT
diff --git a/proto/plan_common.proto b/proto/plan_common.proto
index 82f9fbc63a0f8..79a1b1622704e 100644
--- a/proto/plan_common.proto
+++ b/proto/plan_common.proto
@@ -54,10 +54,8 @@ message ColumnDesc {
// This field is used to represent the connector-spec additional column type.
// UNSPECIFIED or unset for normal column.
- // deprecated, use AdditionalColumn instead
- // AdditionalColumnType additional_column_type = 9;
- reserved "additional_column_type";
- reserved 9;
+  // Deprecated: use AdditionalColumn instead. Kept for compatibility with v1.6.x.
+ AdditionalColumnType additional_column_type = 9;
ColumnDescVersion version = 10;
@@ -136,6 +134,7 @@ enum FormatType {
FORMAT_TYPE_CANAL = 5;
FORMAT_TYPE_UPSERT = 6;
FORMAT_TYPE_PLAIN = 7;
+ FORMAT_TYPE_NONE = 8;
}
enum EncodeType {
@@ -147,6 +146,7 @@ enum EncodeType {
ENCODE_TYPE_JSON = 5;
ENCODE_TYPE_BYTES = 6;
ENCODE_TYPE_TEMPLATE = 7;
+ ENCODE_TYPE_NONE = 8;
}
enum RowFormatType {
@@ -216,3 +216,14 @@ message AdditionalColumn {
AdditionalColumnHeaders headers = 7;
}
}
+
+enum AdditionalColumnType {
+ ADDITIONAL_COLUMN_TYPE_UNSPECIFIED = 0;
+ ADDITIONAL_COLUMN_TYPE_KEY = 1;
+ ADDITIONAL_COLUMN_TYPE_TIMESTAMP = 2;
+ ADDITIONAL_COLUMN_TYPE_PARTITION = 3;
+ ADDITIONAL_COLUMN_TYPE_OFFSET = 4;
+ ADDITIONAL_COLUMN_TYPE_HEADER = 5;
+ ADDITIONAL_COLUMN_TYPE_FILENAME = 6;
+ ADDITIONAL_COLUMN_TYPE_NORMAL = 7;
+}
diff --git a/risedev.yml b/risedev.yml
index 69b0c23b05dd3..cb352daab6cf9 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -164,6 +164,17 @@ profile:
- use: compactor
# - use: prometheus
# - use: grafana
+ fs:
+ steps:
+ # - use: etcd
+ - use: meta-node
+ - use: compute-node
+ - use: frontend
+ - use: opendal
+ engine: fs
+ - use: compactor
+ # - use: prometheus
+ # - use: grafana
webhdfs:
steps:
# - use: etcd
@@ -872,27 +883,7 @@ profile:
- use: frontend
- use: compactor
- ci-deltalake-test:
- config-path: src/config/ci.toml
- steps:
- - use: minio
- - use: meta-node
- - use: compute-node
- enable-tiered-cache: true
- - use: frontend
- - use: compactor
-
- ci-clickhouse-test:
- config-path: src/config/ci.toml
- steps:
- - use: minio
- - use: meta-node
- - use: compute-node
- enable-tiered-cache: true
- - use: frontend
- - use: compactor
-
- ci-pulsar-test:
+ ci-sink-test:
config-path: src/config/ci.toml
steps:
- use: minio
diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs
index 5631707e2f422..f4d7341cbc6dd 100644
--- a/src/batch/src/error.rs
+++ b/src/batch/src/error.rs
@@ -20,6 +20,7 @@ pub use anyhow::anyhow;
use risingwave_common::array::ArrayError;
use risingwave_common::error::BoxedError;
use risingwave_common::util::value_encoding::error::ValueEncodingError;
+use risingwave_connector::error::ConnectorError;
use risingwave_dml::error::DmlError;
use risingwave_expr::ExprError;
use risingwave_pb::PbFieldNotFound;
@@ -156,3 +157,9 @@ impl From<BatchError> for Status {
Self::from(&err)
}
}
+
+impl From<ConnectorError> for BatchError {
+ fn from(value: ConnectorError) -> Self {
+ Self::Connector(value.into())
+ }
+}
diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index caf1289220d48..9c24e554c8e1f 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -116,11 +116,7 @@ impl IcebergScanExecutor {
#[try_stream(ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
- let table = self
- .iceberg_config
- .load_table()
- .await
- .map_err(BatchError::Internal)?;
+ let table = self.iceberg_config.load_table().await?;
let table_scan: TableScan = table
.new_scan_builder()
diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index e67d4b26f850d..fe053ff63dfc8 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -87,6 +87,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
};
let source_ctrl_opts = SourceCtrlOpts {
chunk_size: source.context().get_config().developer.chunk_size,
+ rate_limit: None,
};
let column_ids: Vec<_> = source_node
diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs
index ce110c9effc17..13d5fcad5ec8c 100644
--- a/src/cmd/src/lib.rs
+++ b/src/cmd/src/lib.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use risingwave_common::error::v2::AsReport as _;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
use risingwave_ctl::CliOpts as CtlOpts;
@@ -67,13 +68,12 @@ pub fn ctl(opts: CtlOpts) {
// Note: Use a simple current thread runtime for ctl.
// When there's a heavy workload, multiple thread runtime seems to respond slowly. May need
// further investigation.
- tokio::runtime::Builder::new_current_thread()
+ if let Err(e) = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(risingwave_ctl::start(opts))
- .inspect_err(|e| {
- eprintln!("{:#?}", e);
- })
- .unwrap();
+ {
+ eprintln!("Error: {:#?}", e.as_report());
+ }
}
diff --git a/src/cmd_all/Cargo.toml b/src/cmd_all/Cargo.toml
index bb57fbfe88a09..c5f193ef8a2a3 100644
--- a/src/cmd_all/Cargo.toml
+++ b/src/cmd_all/Cargo.toml
@@ -58,6 +58,7 @@ workspace-hack = { path = "../workspace-hack" }
expect-test = "1"
[build-dependencies]
+thiserror-ext = { workspace = true }
vergen = { version = "8", default-features = false, features = [
"build",
"git",
diff --git a/src/cmd_all/build.rs b/src/cmd_all/build.rs
index a4a7c27e65685..38d9f2d7107a6 100644
--- a/src/cmd_all/build.rs
+++ b/src/cmd_all/build.rs
@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use thiserror_ext::AsReport;
use vergen::EmitBuilder;
fn main() {
if let Err(e) = EmitBuilder::builder().git_sha(true).fail_on_error().emit() {
// Leave the environment variable unset if error occurs.
- println!("cargo:warning={}", e)
+ println!("cargo:warning={}", e.as_report())
}
}
diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs
index 2c167fc1bdc20..e9173abefe1df 100644
--- a/src/cmd_all/src/bin/risingwave.rs
+++ b/src/cmd_all/src/bin/risingwave.rs
@@ -239,6 +239,7 @@ fn standalone(opts: StandaloneOpts) {
/// high level options to standalone mode node-level options.
/// We will start a standalone instance, with all nodes in the same process.
fn single_node(opts: SingleNodeOpts) {
+ opts.create_store_directories().unwrap();
let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(&opts);
let settings = risingwave_rt::LoggerSettings::from_opts(&opts)
.with_target("risingwave_storage", Level::WARN)
diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs
index b89f861f6e4fd..042a0feee9863 100644
--- a/src/cmd_all/src/single_node.rs
+++ b/src/cmd_all/src/single_node.rs
@@ -14,6 +14,7 @@
use std::sync::LazyLock;
+use anyhow::Result;
use clap::Parser;
use home::home_dir;
use risingwave_common::config::{AsyncStackTraceOption, MetaBackend};
@@ -64,7 +65,7 @@ pub struct SingleNodeOpts {
/// The store directory used by meta store and object store.
#[clap(long, env = "RW_SINGLE_NODE_STORE_DIRECTORY")]
-    store_directory: Option<String>,
+    pub store_directory: Option<String>,
/// The address of the meta node.
#[clap(long, env = "RW_SINGLE_NODE_META_ADDR")]
@@ -142,6 +143,7 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS
}
}
+// Defaults
impl SingleNodeOpts {
fn default_frontend_opts() -> FrontendOpts {
FrontendOpts {
@@ -227,3 +229,15 @@ impl SingleNodeOpts {
}
}
}
+
+impl SingleNodeOpts {
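+    /// Create the directories for the embedded meta store and state store if they do not already exist.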
+ pub fn create_store_directories(&self) -> Result<()> {
+ let store_directory = self
+ .store_directory
+ .as_ref()
+ .unwrap_or_else(|| &*DEFAULT_STORE_DIRECTORY);
+ std::fs::create_dir_all(format!("{}/meta_store", store_directory))?;
+ std::fs::create_dir_all(format!("{}/state_store", store_directory))?;
+ Ok(())
+ }
+}
diff --git a/src/common/fields-derive/src/gen/test_empty_pk.rs b/src/common/fields-derive/src/gen/test_empty_pk.rs
new file mode 100644
index 0000000000000..ffb5ff268bed1
--- /dev/null
+++ b/src/common/fields-derive/src/gen/test_empty_pk.rs
@@ -0,0 +1,29 @@
+impl ::risingwave_common::types::Fields for Data {
+ const PRIMARY_KEY: Option<&'static [usize]> = Some(&[]);
+ fn fields() -> Vec<(&'static str, ::risingwave_common::types::DataType)> {
+ vec![
+ ("v1", < i16 as ::risingwave_common::types::WithDataType >
+ ::default_data_type()), ("v2", < String as
+ ::risingwave_common::types::WithDataType > ::default_data_type())
+ ]
+ }
+ fn into_owned_row(self) -> ::risingwave_common::row::OwnedRow {
+ ::risingwave_common::row::OwnedRow::new(
+ vec![
+ ::risingwave_common::types::ToOwnedDatum::to_owned_datum(self.v1),
+ ::risingwave_common::types::ToOwnedDatum::to_owned_datum(self.v2)
+ ],
+ )
+ }
+}
+impl From<Data> for ::risingwave_common::types::ScalarImpl {
+ fn from(v: Data) -> Self {
+ ::risingwave_common::types::StructValue::new(
+ vec![
+ ::risingwave_common::types::ToOwnedDatum::to_owned_datum(v.v1),
+ ::risingwave_common::types::ToOwnedDatum::to_owned_datum(v.v2)
+ ],
+ )
+ .into()
+ }
+}
diff --git a/src/common/fields-derive/src/gen/test_no_pk.rs b/src/common/fields-derive/src/gen/test_no_pk.rs
new file mode 100644
index 0000000000000..9e1b3e7892969
--- /dev/null
+++ b/src/common/fields-derive/src/gen/test_no_pk.rs
@@ -0,0 +1,29 @@
+impl ::risingwave_common::types::Fields for Data {
+ const PRIMARY_KEY: Option<&'static [usize]> = None;
+ fn fields() -> Vec<(&'static str, ::risingwave_common::types::DataType)> {
+ vec![
+ ("v1", < i16 as ::risingwave_common::types::WithDataType >
+ ::default_data_type()), ("v2", < String as
+ ::risingwave_common::types::WithDataType > ::default_data_type())
+ ]
+ }
+ fn into_owned_row(self) -> ::risingwave_common::row::OwnedRow {
+ ::risingwave_common::row::OwnedRow::new(
+ vec![
+ ::risingwave_common::types::ToOwnedDatum::to_owned_datum(self.v1),
+ ::risingwave_common::types::ToOwnedDatum::to_owned_datum(self.v2)
+ ],
+ )
+ }
+}
+impl From<Data> for ::risingwave_common::types::ScalarImpl {
+ fn from(v: Data) -> Self {
+ ::risingwave_common::types::StructValue::new(
+ vec![
+ ::risingwave_common::types::ToOwnedDatum::to_owned_datum(v.v1),
+ ::risingwave_common::types::ToOwnedDatum::to_owned_datum(v.v2)
+ ],
+ )
+ .into()
+ }
+}
diff --git a/src/common/fields-derive/src/gen/test_output.rs b/src/common/fields-derive/src/gen/test_output.rs
index 517dcdefc7a8c..a804a379bfd4a 100644
--- a/src/common/fields-derive/src/gen/test_output.rs
+++ b/src/common/fields-derive/src/gen/test_output.rs
@@ -1,4 +1,5 @@
impl ::risingwave_common::types::Fields for Data {
+ const PRIMARY_KEY: Option<&'static [usize]> = Some(&[1usize, 0usize]);
fn fields() -> Vec<(&'static str, ::risingwave_common::types::DataType)> {
vec![
("v1", < i16 as ::risingwave_common::types::WithDataType >
@@ -21,9 +22,6 @@ impl ::risingwave_common::types::Fields for Data {
],
)
}
- fn primary_key() -> &'static [usize] {
- &[1usize, 0usize]
- }
}
impl From<Data> for ::risingwave_common::types::ScalarImpl {
fn from(v: Data) -> Self {
diff --git a/src/common/fields-derive/src/lib.rs b/src/common/fields-derive/src/lib.rs
index 86fa229a5adcd..dae648d1dc343 100644
--- a/src/common/fields-derive/src/lib.rs
+++ b/src/common/fields-derive/src/lib.rs
@@ -16,7 +16,7 @@ use proc_macro2::TokenStream;
use quote::quote;
use syn::{Data, DeriveInput, Result};
-#[proc_macro_derive(Fields, attributes(primary_key))]
+#[proc_macro_derive(Fields, attributes(primary_key, fields))]
pub fn fields(tokens: proc_macro::TokenStream) -> proc_macro::TokenStream {
inner(tokens.into()).into()
}
@@ -46,6 +46,16 @@ fn gen(tokens: TokenStream) -> Result<TokenStream> {
));
};
+ let style = get_style(&input);
+ if let Some(style) = &style {
+ if !["Title Case", "TITLE CASE", "snake_case"].contains(&style.value().as_str()) {
+ return Err(syn::Error::new_spanned(
+ style,
+ "only `Title Case`, `TITLE CASE`, and `snake_case` are supported",
+ ));
+ }
+ }
+
let fields_rw: Vec<TokenStream> = struct_
.fields
.iter()
@@ -55,6 +65,12 @@ fn gen(tokens: TokenStream) -> Result<TokenStream> {
if name.starts_with("r#") {
name = name[2..].to_string();
}
+            // convert the field name into the configured style
+ match style.as_ref().map_or(String::new(), |f| f.value()).as_str() {
+ "Title Case" => name = to_title_case(&name),
+ "TITLE CASE" => name = to_title_case(&name).to_uppercase(),
+ _ => {}
+ }
let ty = &field.ty;
quote! {
(#name, <#ty as ::risingwave_common::types::WithDataType>::default_data_type())
@@ -66,16 +82,17 @@ fn gen(tokens: TokenStream) -> Result<TokenStream> {
.iter()
.map(|field| field.ident.as_ref().expect("field no name"))
.collect::<Vec<_>>();
- let primary_key = get_primary_key(&input).map(|indices| {
- quote! {
- fn primary_key() -> &'static [usize] {
- &[#(#indices),*]
- }
- }
- });
+ let primary_key = get_primary_key(&input).map_or_else(
+ || quote! { None },
+ |indices| {
+ quote! { Some(&[#(#indices),*]) }
+ },
+ );
Ok(quote! {
impl ::risingwave_common::types::Fields for #ident {
+ const PRIMARY_KEY: Option<&'static [usize]> = #primary_key;
+
fn fields() -> Vec<(&'static str, ::risingwave_common::types::DataType)> {
vec![#(#fields_rw),*]
}
@@ -84,7 +101,6 @@ fn gen(tokens: TokenStream) -> Result<TokenStream> {
::risingwave_common::types::ToOwnedDatum::to_owned_datum(self.#names)
),*])
}
- #primary_key
}
impl From<#ident> for ::risingwave_common::types::ScalarImpl {
fn from(v: #ident) -> Self {
@@ -117,7 +133,9 @@ fn get_primary_key(input: &syn::DeriveInput) -> Option<Vec<usize>> {
return Some(
keys.to_string()
.split(',')
- .map(|s| index(s.trim()))
+ .map(|s| s.trim())
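+            // drop empty segments so that an empty `#[primary_key()]` attribute resolves to an empty key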
+ .filter(|s| !s.is_empty())
+ .map(index)
.collect(),
);
}
@@ -132,6 +150,46 @@ fn get_primary_key(input: &syn::DeriveInput) -> Option<Vec<usize>> {
None
}
+/// Get name style from `#[fields(style = "xxx")]` attribute.
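+/// Returns `None` if the attribute is absent or its value is not a string literal.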
+fn get_style(input: &syn::DeriveInput) -> Option<syn::LitStr> {
+ let style = input.attrs.iter().find_map(|attr| match &attr.meta {
+ syn::Meta::List(list) if list.path.is_ident("fields") => {
+ let name_value: syn::MetaNameValue = syn::parse2(list.tokens.clone()).ok()?;
+ if name_value.path.is_ident("style") {
+ Some(name_value.value)
+ } else {
+ None
+ }
+ }
+ _ => None,
+ })?;
+ match style {
+ syn::Expr::Lit(lit) => match lit.lit {
+ syn::Lit::Str(s) => Some(s),
+ _ => None,
+ },
+ _ => None,
+ }
+}
+
+/// Convert `snake_case` to `Title Case`.
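+/// For example, `user_name` becomes `User Name`.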
+fn to_title_case(s: &str) -> String {
+ let mut title = String::new();
+ let mut next_upper = true;
+ for c in s.chars() {
+ if c == '_' {
+ title.push(' ');
+ next_upper = true;
+ } else if next_upper {
+ title.push(c.to_uppercase().next().unwrap());
+ next_upper = false;
+ } else {
+ title.push(c);
+ }
+ }
+ title
+}
+
#[cfg(test)]
mod tests {
use indoc::indoc;
@@ -143,6 +201,18 @@ mod tests {
prettyplease::unparse(&output)
}
+ fn do_test(code: &str, expected_path: &str) {
+ let input: TokenStream = str::parse(code).unwrap();
+
+ let output = super::gen(input).unwrap();
+
+ let output = pretty_print(output);
+
+ let expected = expect_test::expect_file![expected_path];
+
+ expected.assert_eq(&output);
+ }
+
#[test]
fn test_gen() {
let code = indoc! {r#"
@@ -157,14 +227,33 @@ mod tests {
}
"#};
- let input: TokenStream = str::parse(code).unwrap();
+ do_test(code, "gen/test_output.rs");
+ }
- let output = super::gen(input).unwrap();
+ #[test]
+ fn test_no_pk() {
+ let code = indoc! {r#"
+ #[derive(Fields)]
+ struct Data {
+ v1: i16,
+ v2: String,
+ }
+ "#};
- let output = pretty_print(output);
+ do_test(code, "gen/test_no_pk.rs");
+ }
- let expected = expect_test::expect_file!["gen/test_output.rs"];
+ #[test]
+ fn test_empty_pk() {
+ let code = indoc! {r#"
+ #[derive(Fields)]
+ #[primary_key()]
+ struct Data {
+ v1: i16,
+ v2: String,
+ }
+ "#};
- expected.assert_eq(&output);
+ do_test(code, "gen/test_empty_pk.rs");
}
}
diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs
index f82e96a80c0e2..82d2f22f41cb4 100644
--- a/src/common/src/catalog/column.rs
+++ b/src/common/src/catalog/column.rs
@@ -170,6 +170,7 @@ impl ColumnDesc {
type_name: self.type_name.clone(),
generated_or_default_column: self.generated_or_default_column.clone(),
description: self.description.clone(),
+ additional_column_type: 0, // deprecated
additional_column: Some(self.additional_column.clone()),
version: self.version as i32,
}
@@ -305,6 +306,7 @@ impl From<&ColumnDesc> for PbColumnDesc {
type_name: c.type_name.clone(),
generated_or_default_column: c.generated_or_default_column.clone(),
description: c.description.clone(),
+ additional_column_type: 0, // deprecated
additional_column: c.additional_column.clone().into(),
version: c.version as i32,
}
diff --git a/src/common/src/catalog/test_utils.rs b/src/common/src/catalog/test_utils.rs
index 9930a5717b849..ae87b3a881f84 100644
--- a/src/common/src/catalog/test_utils.rs
+++ b/src/common/src/catalog/test_utils.rs
@@ -60,6 +60,7 @@ impl ColumnDescTestExt for ColumnDesc {
field_descs: fields,
generated_or_default_column: None,
description: None,
+ additional_column_type: 0, // deprecated
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
}
diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index 971fb28d208c2..b1415a00b1362 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -391,9 +391,9 @@ impl<'de> Deserialize<'de> for DefaultParallelism {
VirtualNode::COUNT
)))?
} else {
- NonZeroUsize::new(i)
- .context("default parallelism should be greater than 0")
- .map_err(|e| serde::de::Error::custom(e.to_string()))?
+ NonZeroUsize::new(i).ok_or_else(|| {
+ serde::de::Error::custom("default parallelism should be greater than 0")
+ })?
})),
}
}
diff --git a/src/common/src/field_generator/mod.rs b/src/common/src/field_generator/mod.rs
index 0309798068854..679d60ba1f188 100644
--- a/src/common/src/field_generator/mod.rs
+++ b/src/common/src/field_generator/mod.rs
@@ -18,6 +18,7 @@ mod varchar;
use std::time::Duration;
+// TODO(error-handling): use a new error type
use anyhow::{anyhow, Result};
use chrono::{DateTime, FixedOffset};
pub use numeric::*;
diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs
index 980897d5636e7..313c0bada6616 100644
--- a/src/common/src/lib.rs
+++ b/src/common/src/lib.rs
@@ -92,9 +92,9 @@ pub const UNKNOWN_GIT_SHA: &str = "unknown";
// The single source of truth of the pg parameters, used in ConfigMap and current_cluster_version.
// The version of PostgreSQL that Risingwave claims to be.
-pub const PG_VERSION: &str = "9.5.0";
+pub const PG_VERSION: &str = "13.14.0";
/// The version of PostgreSQL that Risingwave claims to be.
-pub const SERVER_VERSION_NUM: i32 = 90500;
+pub const SERVER_VERSION_NUM: i32 = 130014;
/// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time. It is also the default value of `client_encoding`.
pub const SERVER_ENCODING: &str = "UTF8";
/// see
diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs
index 278390887dd51..82677e57e9753 100644
--- a/src/common/src/system_param/mod.rs
+++ b/src/common/src/system_param/mod.rs
@@ -340,7 +340,7 @@ macro_rules! impl_set_system_param {
)*
_ => {
Err(format!(
- "unrecognized system param {:?}",
+ "unrecognized system parameter {:?}",
key
))
}
diff --git a/src/common/src/types/datetime.rs b/src/common/src/types/datetime.rs
index af6b54b057c82..c609017d06e3f 100644
--- a/src/common/src/types/datetime.rs
+++ b/src/common/src/types/datetime.rs
@@ -328,17 +328,15 @@ impl ToText for Date {
/// ```
fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
let (ce, year) = self.0.year_ce();
- if ce {
- write!(f, "{}", self.0)
- } else {
- write!(
- f,
- "{:04}-{:02}-{:02} BC",
- year,
- self.0.month(),
- self.0.day()
- )
- }
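+        // Dates before the Common Era are rendered with a " BC" suffix, matching PostgreSQL's output.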
+ let suffix = if ce { "" } else { " BC" };
+ write!(
+ f,
+ "{:04}-{:02}-{:02}{}",
+ year,
+ self.0.month(),
+ self.0.day(),
+ suffix
+ )
}
fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result {
@@ -364,7 +362,17 @@ impl ToText for Time {
impl ToText for Timestamp {
fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
- write!(f, "{}", self.0)
+ let (ce, year) = self.0.year_ce();
+ let suffix = if ce { "" } else { " BC" };
+ write!(
+ f,
+ "{:04}-{:02}-{:02} {}{}",
+ year,
+ self.0.month(),
+ self.0.day(),
+ self.0.time(),
+ suffix
+ )
}
fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result {
diff --git a/src/common/src/types/fields.rs b/src/common/src/types/fields.rs
index f52717297792e..df1795804af00 100644
--- a/src/common/src/types/fields.rs
+++ b/src/common/src/types/fields.rs
@@ -58,17 +58,18 @@ use crate::util::chunk_coalesce::DataChunkBuilder;
/// }
/// ```
pub trait Fields {
+ /// The primary key of the table.
+ ///
+ /// - `None` if the primary key is not applicable.
+ /// - `Some(&[])` if the primary key is empty, i.e., there'll be at most one row in the table.
+ const PRIMARY_KEY: Option<&'static [usize]>;
+
/// Return the schema of the struct.
fn fields() -> Vec<(&'static str, DataType)>;
/// Convert the struct to an `OwnedRow`.
fn into_owned_row(self) -> OwnedRow;
- /// The primary key of the table.
- fn primary_key() -> &'static [usize] {
- &[]
- }
-
/// Create a [`DataChunkBuilder`](crate::util::chunk_coalesce::DataChunkBuilder) with the schema of the struct.
fn data_chunk_builder(capacity: usize) -> DataChunkBuilder {
DataChunkBuilder::new(
diff --git a/src/common/src/types/serial.rs b/src/common/src/types/serial.rs
index 9bfbf5e4fcac7..5c84c95fa0f7a 100644
--- a/src/common/src/types/serial.rs
+++ b/src/common/src/types/serial.rs
@@ -26,6 +26,12 @@ use crate::util::row_id::RowId;
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Default, Hash)]
pub struct Serial(i64);
+impl From<Serial> for i64 {
+ fn from(value: Serial) -> i64 {
+ value.0
+ }
+}
+
impl From<i64> for Serial {
fn from(value: i64) -> Self {
Self(value)
diff --git a/src/common/src/types/timestamptz.rs b/src/common/src/types/timestamptz.rs
index 41359bdf84377..f14d5d2edee6e 100644
--- a/src/common/src/types/timestamptz.rs
+++ b/src/common/src/types/timestamptz.rs
@@ -17,7 +17,7 @@ use std::io::Write;
use std::str::FromStr;
use bytes::{Bytes, BytesMut};
-use chrono::{TimeZone, Utc};
+use chrono::{DateTime, Datelike, TimeZone, Utc};
use chrono_tz::Tz;
use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type};
use serde::{Deserialize, Serialize};
@@ -201,6 +201,26 @@ impl std::fmt::Display for Timestamptz {
}
}
+pub fn write_date_time_tz(
+    instant_local: DateTime<Tz>,
+ writer: &mut impl std::fmt::Write,
+) -> std::fmt::Result {
+ let date = instant_local.date_naive();
+ let (ce, year) = date.year_ce();
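+    // As with `Date` and `Timestamp`, years before the Common Era are printed with a " BC" suffix.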
+ write!(
+ writer,
+ "{:04}-{:02}-{:02} {}",
+ year,
+ date.month(),
+ date.day(),
+ instant_local.format(if ce {
+ "%H:%M:%S%.f%:z"
+ } else {
+ "%H:%M:%S%.f%:z BC"
+ })
+ )
+}
+
#[cfg(test)]
mod test {
use super::*;
diff --git a/src/connector/src/aws_utils.rs b/src/connector/src/aws_utils.rs
index cf70a90e07cda..1578c7b844422 100644
--- a/src/connector/src/aws_utils.rs
+++ b/src/connector/src/aws_utils.rs
@@ -21,6 +21,7 @@ use aws_sdk_s3::{client as s3_client, config as s3_config};
use url::Url;
use crate::common::AwsAuthProps;
+use crate::error::ConnectorResult;
const AWS_CUSTOM_CONFIG_KEY: [&str; 3] = ["retry_times", "conn_timeout", "read_timeout"];
@@ -106,7 +107,7 @@ pub fn s3_client(
pub async fn load_file_descriptor_from_s3(
location: &Url,
config: &AwsAuthProps,
-) -> anyhow::Result<Vec<u8>> {
+) -> ConnectorResult<Vec<u8>> {
let bucket = location
.domain()
.with_context(|| format!("illegal file path {}", location))?;
diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs
index 418155250e74e..d5944eb07fa3c 100644
--- a/src/connector/src/common.rs
+++ b/src/connector/src/common.rs
@@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::io::Write;
use std::time::Duration;
-use anyhow::{anyhow, Context, Ok};
+use anyhow::{anyhow, Context};
use async_nats::jetstream::consumer::DeliverPolicy;
use async_nats::jetstream::{self};
use aws_sdk_kinesis::Client as KinesisClient;
@@ -35,6 +35,7 @@ use with_options::WithOptions;
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::deserialize_duration_from_string;
+use crate::error::ConnectorResult;
use crate::sink::SinkError;
use crate::source::nats::source::NatsOffset;
// The file describes the common abstractions for each connector and can be used in both source and
@@ -72,7 +73,7 @@ pub struct AwsAuthProps {
}
impl AwsAuthProps {
-    async fn build_region(&self) -> anyhow::Result<Region> {
+    async fn build_region(&self) -> ConnectorResult<Region> {
if let Some(region_name) = &self.region {
Ok(Region::new(region_name.clone()))
} else {
@@ -85,11 +86,11 @@ impl AwsAuthProps {
.build()
.region()
.await
- .ok_or_else(|| anyhow::format_err!("region should be provided"))?)
+ .context("region should be provided")?)
}
}
-    fn build_credential_provider(&self) -> anyhow::Result<SharedCredentialsProvider> {
+    fn build_credential_provider(&self) -> ConnectorResult<SharedCredentialsProvider> {
if self.access_key.is_some() && self.secret_key.is_some() {
Ok(SharedCredentialsProvider::new(
aws_credential_types::Credentials::from_keys(
@@ -99,16 +100,14 @@ impl AwsAuthProps {
),
))
} else {
- Err(anyhow!(
- "Both \"access_key\" and \"secret_access\" are required."
- ))
+ bail!("Both \"access_key\" and \"secret_access\" are required.")
}
}
async fn with_role_provider(
&self,
credential: SharedCredentialsProvider,
-    ) -> anyhow::Result<SharedCredentialsProvider> {
+    ) -> ConnectorResult<SharedCredentialsProvider> {
if let Some(role_name) = &self.arn {
let region = self.build_region().await?;
let mut role = AssumeRoleProvider::builder(role_name)
@@ -124,7 +123,7 @@ impl AwsAuthProps {
}
}
-    pub async fn build_config(&self) -> anyhow::Result<SdkConfig> {
+    pub async fn build_config(&self) -> ConnectorResult<SdkConfig> {
let region = self.build_region().await?;
let credentials_provider = self
.with_role_provider(self.build_credential_provider()?)
@@ -386,12 +385,19 @@ pub struct PulsarOauthCommon {
pub scope: Option<String>,
}
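+/// Write the credentials to a temporary file (and sync it to disk) so they can be referenced by a local file path.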
+fn create_credential_temp_file(credentials: &[u8]) -> std::io::Result<NamedTempFile> {
+ let mut f = NamedTempFile::new()?;
+ f.write_all(credentials)?;
+ f.as_file().sync_all()?;
+ Ok(f)
+}
+
impl PulsarCommon {
pub(crate) async fn build_client(
&self,
oauth: &Option,
aws_auth_props: &AwsAuthProps,
-    ) -> anyhow::Result<Pulsar<TokioExecutor>> {
+    ) -> ConnectorResult<Pulsar<TokioExecutor>> {
let mut pulsar_builder = Pulsar::builder(&self.service_url, TokioExecutor);
let mut temp_file = None;
if let Some(oauth) = oauth.as_ref() {
@@ -399,10 +405,10 @@ impl PulsarCommon {
match url.scheme() {
"s3" => {
let credentials = load_file_descriptor_from_s3(&url, aws_auth_props).await?;
- let mut f = NamedTempFile::new()?;
- f.write_all(&credentials)?;
- f.as_file().sync_all()?;
- temp_file = Some(f);
+ temp_file = Some(
+ create_credential_temp_file(&credentials)
+ .context("failed to create temp file for pulsar credentials")?,
+ );
}
"file" => {}
_ => {
@@ -477,7 +483,7 @@ pub struct KinesisCommon {
}
impl KinesisCommon {
-    pub(crate) async fn build_client(&self) -> anyhow::Result<KinesisClient> {
+    pub(crate) async fn build_client(&self) -> ConnectorResult<KinesisClient> {
let config = AwsAuthProps {
region: Some(self.stream_region.clone()),
endpoint: self.endpoint.clone(),
@@ -539,7 +545,7 @@ pub struct NatsCommon {
}
impl NatsCommon {
-    pub(crate) async fn build_client(&self) -> anyhow::Result<async_nats::Client> {
+    pub(crate) async fn build_client(&self) -> ConnectorResult<async_nats::Client> {
let mut connect_options = async_nats::ConnectOptions::new();
match self.connect_mode.as_str() {
"user_and_password" => {
@@ -582,7 +588,7 @@ impl NatsCommon {
Ok(client)
}
-    pub(crate) async fn build_context(&self) -> anyhow::Result<jetstream::Context> {
+    pub(crate) async fn build_context(&self) -> ConnectorResult<jetstream::Context> {
let client = self.build_client().await?;
let jetstream = async_nats::jetstream::new(client);
Ok(jetstream)
@@ -593,7 +599,7 @@ impl NatsCommon {
stream: String,
split_id: String,
start_sequence: NatsOffset,
- ) -> anyhow::Result<
+ ) -> ConnectorResult<
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
> {
let context = self.build_context().await?;
@@ -612,13 +618,16 @@ impl NatsCommon {
NatsOffset::Earliest => DeliverPolicy::All,
NatsOffset::Latest => DeliverPolicy::Last,
NatsOffset::SequenceNumber(v) => {
-                let parsed = v.parse::<u64>()?;
+                let parsed = v
+                    .parse::<u64>()
+                    .context("failed to parse nats offset as sequence number")?;
DeliverPolicy::ByStartSequence {
start_sequence: 1 + parsed,
}
}
NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime {
- start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)?,
+ start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)
+ .context("invalid timestamp for nats offset")?,
},
NatsOffset::None => DeliverPolicy::All,
};
@@ -635,7 +644,7 @@ impl NatsCommon {
&self,
jetstream: jetstream::Context,
stream: String,
-    ) -> anyhow::Result<jetstream::stream::Stream> {
+    ) -> ConnectorResult<jetstream::stream::Stream> {
let subjects: Vec<String> = self.subject.split(',').map(|s| s.to_string()).collect();
let mut config = jetstream::stream::Config {
name: stream,
@@ -662,7 +671,7 @@ impl NatsCommon {
Ok(stream)
}
-    pub(crate) fn create_credential(&self, seed: &str, jwt: &str) -> anyhow::Result<String> {
+    pub(crate) fn create_credential(&self, seed: &str, jwt: &str) -> ConnectorResult<String> {
let creds = format!(
"-----BEGIN NATS USER JWT-----\n{}\n------END NATS USER JWT------\n\n\
************************* IMPORTANT *************************\n\
diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs
index 4cf36e9859d36..3dc10af3d8e7a 100644
--- a/src/connector/src/error.rs
+++ b/src/connector/src/error.rs
@@ -13,13 +13,57 @@
// limitations under the License.
use risingwave_common::error::v2::def_anyhow_newtype;
+use risingwave_pb::PbFieldNotFound;
+use risingwave_rpc_client::error::RpcError;
+
+use crate::parser::AccessError;
+use crate::schema::schema_registry::{ConcurrentRequestError, WireFormatError};
+use crate::schema::InvalidOptionError;
+use crate::sink::SinkError;
def_anyhow_newtype! {
pub ConnectorError,
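+    // A `transparent` arm forwards the source error as-is, while a string literal attaches that message as context.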
+ // Common errors
+ std::io::Error => transparent,
+
+ // Fine-grained connector errors
+ AccessError => transparent,
+ WireFormatError => transparent,
+ ConcurrentRequestError => transparent,
+ InvalidOptionError => transparent,
+ SinkError => transparent,
+ PbFieldNotFound => transparent,
+
// TODO(error-handling): Remove implicit contexts below and specify ad-hoc context for each conversion.
+
+ // Parsing errors
+ url::ParseError => "failed to parse url",
+ serde_json::Error => "failed to parse json",
+ csv::Error => "failed to parse csv",
+
+ // Connector errors
+ opendal::Error => transparent, // believed to be self-explanatory
+
mysql_async::Error => "MySQL error",
tokio_postgres::Error => "Postgres error",
+ apache_avro::Error => "Avro error",
+ rdkafka::error::KafkaError => "Kafka error",
+ pulsar::Error => "Pulsar error",
+ async_nats::jetstream::consumer::StreamError => "Nats error",
+ async_nats::jetstream::consumer::pull::MessagesError => "Nats error",
+ async_nats::jetstream::context::CreateStreamError => "Nats error",
+ async_nats::jetstream::stream::ConsumerError => "Nats error",
+ icelake::Error => "Iceberg error",
+ redis::RedisError => "Redis error",
+ arrow_schema::ArrowError => "Arrow error",
+ google_cloud_pubsub::client::google_cloud_auth::error::Error => "Google Cloud error",
}
-pub type ConnectorResult<T> = Result<T, ConnectorError>;
+pub type ConnectorResult<T> = std::result::Result<T, ConnectorError>;
+
+impl From for RpcError {
+ fn from(value: ConnectorError) -> Self {
+ RpcError::Internal(value.0)
+ }
+}
diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs
index 9a2383dbb4a96..d4a546c6a00a7 100644
--- a/src/connector/src/macros.rs
+++ b/src/connector/src/macros.rs
@@ -36,7 +36,8 @@ macro_rules! for_all_classified_sources {
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
{ PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
- { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}
+ { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
+ { Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}
}
$(
,$extra_args
@@ -167,12 +168,12 @@ macro_rules! impl_split {
$(
impl TryFrom<SplitImpl> for $split {
- type Error = anyhow::Error;
+ type Error = $crate::error::ConnectorError;
fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
match split {
SplitImpl::$variant_name(inner) => Ok(inner),
- other => Err(anyhow::anyhow!("expect {} but get {:?}", stringify!($split), other))
+                    other => risingwave_common::bail!("expected {} but got {:?}", stringify!($split), other),
}
}
}
diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs
index c1da30f788b3e..06cc061566690 100644
--- a/src/connector/src/parser/additional_columns.rs
+++ b/src/connector/src/parser/additional_columns.rs
@@ -27,6 +27,7 @@ use risingwave_pb::plan_common::{
AdditionalColumnTimestamp,
};
+use crate::error::ConnectorResult;
use crate::source::{
GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR,
S3_CONNECTOR,
@@ -86,7 +87,7 @@ pub fn build_additional_column_catalog(
inner_field_name: Option<&str>,
data_type: Option<&str>,
reject_unknown_connector: bool,
-) -> anyhow::Result<ColumnCatalog> {
+) -> ConnectorResult<ColumnCatalog> {
let compatible_columns = match (
COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name),
reject_unknown_connector,
diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs
index 5e876d2ce9324..7343f1c43118c 100644
--- a/src/connector/src/parser/avro/parser.rs
+++ b/src/connector/src/parser/avro/parser.rs
@@ -23,6 +23,7 @@ use risingwave_pb::plan_common::ColumnDesc;
use super::schema_resolver::ConfluentSchemaResolver;
use super::util::avro_schema_to_column_descs;
+use crate::error::ConnectorResult;
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
use crate::parser::unified::AccessImpl;
use crate::parser::util::bytes_from_url;
@@ -40,7 +41,7 @@ pub struct AvroAccessBuilder {
}
impl AccessBuilder for AvroAccessBuilder {
-    async fn generate_accessor(&mut self, payload: Vec<u8>) -> anyhow::Result<AccessImpl<'_, '_>> {
+    async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
self.value = self.parse_avro_value(&payload, Some(&*self.schema)).await?;
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_ref().unwrap(),
@@ -50,7 +51,7 @@ impl AccessBuilder for AvroAccessBuilder {
}
impl AvroAccessBuilder {
-    pub fn new(config: AvroParserConfig, encoding_type: EncodingType) -> anyhow::Result<Self> {
+    pub fn new(config: AvroParserConfig, encoding_type: EncodingType) -> ConnectorResult<Self> {
let AvroParserConfig {
schema,
key_schema,
@@ -71,7 +72,7 @@ impl AvroAccessBuilder {
&self,
payload: &[u8],
reader_schema: Option<&Schema>,
- ) -> anyhow::Result