From 6f9244d8bb41cf1cab3499e95a383e06a36a0b81 Mon Sep 17 00:00:00 2001 From: Dylan Date: Fri, 8 Mar 2024 13:34:45 +0800 Subject: [PATCH] feat(batch): support rest catalog for iceberg source (#15535) --- .../iceberg-source/docker/rest/config.ini | 18 ++++ .../docker/rest/docker-compose.yml | 99 +++++++++++++++++++ .../rest/spark-script/spark-connect-server.sh | 21 ++++ .../iceberg-source/python/main.py | 2 +- src/connector/src/source/iceberg/mod.rs | 3 + src/connector/with_options_source.yaml | 3 + 6 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 integration_tests/iceberg-source/docker/rest/config.ini create mode 100644 integration_tests/iceberg-source/docker/rest/docker-compose.yml create mode 100755 integration_tests/iceberg-source/docker/rest/spark-script/spark-connect-server.sh diff --git a/integration_tests/iceberg-source/docker/rest/config.ini b/integration_tests/iceberg-source/docker/rest/config.ini new file mode 100644 index 000000000000..f6ed90563dae --- /dev/null +++ b/integration_tests/iceberg-source/docker/rest/config.ini @@ -0,0 +1,18 @@ +[risingwave] +db=dev +user=root +host=127.0.0.1 +port=4566 + +[source] +connector=iceberg +s3.endpoint=http://minio-0:9301 +s3.access.key = hummockadmin +s3.secret.key = hummockadmin +s3.region = ap-southeast-1 +catalog.type=rest +catalog.name=demo +catalog.uri = http://rest:8181 +warehouse.path = s3://icebergdata/demo +database.name=s1 +table.name=t1 \ No newline at end of file diff --git a/integration_tests/iceberg-source/docker/rest/docker-compose.yml b/integration_tests/iceberg-source/docker/rest/docker-compose.yml new file mode 100644 index 000000000000..025db74c23ca --- /dev/null +++ b/integration_tests/iceberg-source/docker/rest/docker-compose.yml @@ -0,0 +1,99 @@ +version: '3.8' + +services: + rest: + image: tabulario/iceberg-rest:0.6.0 + environment: + - AWS_ACCESS_KEY_ID=hummockadmin + - AWS_SECRET_ACCESS_KEY=hummockadmin + - AWS_REGION=us-east-1 + - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog + - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory + - CATALOG_WAREHOUSE=s3://icebergdata/demo + - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO + - CATALOG_S3_ENDPOINT=http://minio-0:9301 + depends_on: + - minio-0 + networks: + - iceberg_net + links: + - minio-0:icebergdata.minio-0 + expose: + - 8181 + + spark: + depends_on: + - minio-0 + - rest + image: ghcr.io/icelake-io/icelake-spark:0.1 + environment: + - AWS_ACCESS_KEY_ID=hummockadmin + - AWS_SECRET_ACCESS_KEY=hummockadmin + - AWS_REGION=us-east-1 + - SPARK_HOME=/opt/spark + - PYSPARK_PYTHON=/usr/bin/python3.9 + - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin + user: root + networks: + iceberg_net: + links: + - minio-0:icebergdata.minio-0 + expose: + - 15002 + healthcheck: + test: netstat -ltn | grep -c 15002 + interval: 1s + retries: 1200 + volumes: + - ./spark-script:/spark-script + entrypoint: ["/spark-script/spark-connect-server.sh"] + + risingwave-standalone: + extends: + file: ../../../../docker/docker-compose.yml + service: risingwave-standalone + healthcheck: + test: + - CMD-SHELL + - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/4566; exit $$?;' + interval: 1s + timeout: 30s + networks: + iceberg_net: + + minio-0: + extends: + file: ../../../../docker/docker-compose.yml + service: minio-0 + entrypoint: " + + /bin/sh -c ' + + set -e + + mkdir -p \"/data/icebergdata/demo\" + mkdir -p \"/data/hummock001\" + + /usr/bin/docker-entrypoint.sh \"$$0\" \"$$@\" + + '" + networks: + iceberg_net: + + etcd-0: + extends: + file: ../../../../docker/docker-compose.yml + service: etcd-0 + networks: + iceberg_net: + +volumes: + risingwave-standalone: + external: false + etcd-0: + external: false + minio-0: + external: false + +networks: + iceberg_net: \ No newline at end of file diff --git a/integration_tests/iceberg-source/docker/rest/spark-script/spark-connect-server.sh b/integration_tests/iceberg-source/docker/rest/spark-script/spark-connect-server.sh new file mode 100755 index 000000000000..7e79ae000a06 --- /dev/null +++ b/integration_tests/iceberg-source/docker/rest/spark-script/spark-connect-server.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +set -ex + +JARS=$(find /opt/spark/deps -type f -name "*.jar" | tr '\n' ':') + +/opt/spark/sbin/start-connect-server.sh \ + --master local[3] \ + --driver-class-path $JARS \ + --conf spark.driver.bindAddress=0.0.0.0 \ + --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ + --conf spark.sql.catalog.demo.catalog-impl=org.apache.iceberg.rest.RESTCatalog \ + --conf spark.sql.catalog.demo.uri=http://rest:8181 \ + --conf spark.sql.catalog.demo.s3.endpoint=http://minio-0:9301 \ + --conf spark.sql.catalog.demo.s3.path.style.access=true \ + --conf spark.sql.catalog.demo.s3.access.key=hummockadmin \ + --conf spark.sql.catalog.demo.s3.secret.key=hummockadmin \ + --conf spark.sql.defaultCatalog=demo + +tail -f /opt/spark/logs/spark*.out \ No newline at end of file diff --git a/integration_tests/iceberg-source/python/main.py b/integration_tests/iceberg-source/python/main.py index fd2ebcbe5f8c..f4cd77653908 100644 --- a/integration_tests/iceberg-source/python/main.py +++ b/integration_tests/iceberg-source/python/main.py @@ -113,7 +113,7 @@ def run_case(case): if __name__ == "__main__": - case_names = ["storage"] + case_names = ["rest", "storage"] for case_name in case_names: print(f"Running test case: {case_name}") run_case(case_name) diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 76c64f2f19fa..cee743a827c1 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -50,6 +50,8 @@ pub struct IcebergProperties { // must be set for other catalogs. #[serde(rename = "catalog.name")] pub catalog_name: Option, + #[serde(rename = "catalog.uri")] + pub catalog_uri: Option, // URI of iceberg catalog, only applicable in rest catalog. #[serde(rename = "database.name")] pub database_name: Option, #[serde(rename = "table.name")] @@ -66,6 +68,7 @@ impl IcebergProperties { database_name: self.database_name.clone(), table_name: self.table_name.clone(), catalog_type: self.catalog_type.clone(), + uri: self.catalog_uri.clone(), path: self.warehouse_path.clone(), endpoint: Some(self.endpoint.clone()), access_key: self.s3_access.clone(), diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index a36557c0d7ee..abce71deba56 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -59,6 +59,9 @@ IcebergProperties: - name: catalog.name field_type: String required: false + - name: catalog.uri + field_type: String + required: false - name: database.name field_type: String required: false