Skip to content

Commit

Permalink
feat(batch): support rest catalog for iceberg source (#15535)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Mar 8, 2024
1 parent 3f102e1 commit 6f9244d
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 1 deletion.
18 changes: 18 additions & 0 deletions integration_tests/iceberg-source/docker/rest/config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Connection settings for the RisingWave frontend used by the iceberg-source
# integration test harness (psycopg-style client config).
[risingwave]
db=dev
user=root
host=127.0.0.1
port=4566

# Iceberg source definition under test: reads table s1.t1 through a REST
# catalog ("demo") whose data lives in MinIO at s3://icebergdata/demo.
# NOTE(review): key=value spacing normalized — the original mixed
# "k=v" and "k = v" styles; configparser accepts both, but keep one style.
[source]
connector=iceberg
s3.endpoint=http://minio-0:9301
s3.access.key=hummockadmin
s3.secret.key=hummockadmin
s3.region=ap-southeast-1
catalog.type=rest
catalog.name=demo
catalog.uri=http://rest:8181
warehouse.path=s3://icebergdata/demo
database.name=s1
table.name=t1
99 changes: 99 additions & 0 deletions integration_tests/iceberg-source/docker/rest/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
version: '3.8'

services:
  # Iceberg REST catalog. Catalog metadata is kept in an in-memory SQLite JDBC
  # catalog; table data is written to MinIO through S3FileIO.
  # The image maps env vars prefixed CATALOG_ onto catalog properties
  # ('_' -> '.', '__' -> '-'), e.g. CATALOG_IO__IMPL -> io-impl.
  rest:
    image: tabulario/iceberg-rest:0.6.0
    environment:
      - AWS_ACCESS_KEY_ID=hummockadmin
      - AWS_SECRET_ACCESS_KEY=hummockadmin
      - AWS_REGION=us-east-1
      # NOTE(review): fixed typo CATALOG_CATOLOG__IMPL -> CATALOG_CATALOG__IMPL.
      # The misspelled key produced an ignored "catolog-impl" property and only
      # worked because JdbcCatalog happens to be the image's default.
      - CATALOG_CATALOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
      - CATALOG_WAREHOUSE=s3://icebergdata/demo
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio-0:9301
    depends_on:
      - minio-0
    # Mapping form used for per-service networks file-wide (the original mixed
    # list form here with mapping form in the other services).
    networks:
      iceberg_net:
    links:
      # Alias so virtual-host-style S3 requests (bucket.host) resolve to MinIO.
      - minio-0:icebergdata.minio-0
    expose:
      - "8181"

  # Spark Connect server with Iceberg extensions; the test harness uses it to
  # create and populate the Iceberg table through the same REST catalog.
  spark:
    depends_on:
      - minio-0
      - rest
    image: ghcr.io/icelake-io/icelake-spark:0.1
    environment:
      - AWS_ACCESS_KEY_ID=hummockadmin
      - AWS_SECRET_ACCESS_KEY=hummockadmin
      - AWS_REGION=us-east-1
      - SPARK_HOME=/opt/spark
      - PYSPARK_PYTHON=/usr/bin/python3.9
      - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin
    user: root
    networks:
      iceberg_net:
    links:
      - minio-0:icebergdata.minio-0
    expose:
      - "15002"
    healthcheck:
      # Healthy once the Spark Connect port is listening.
      test: netstat -ltn | grep -c 15002
      interval: 1s
      retries: 1200
    volumes:
      - ./spark-script:/spark-script
    entrypoint: ["/spark-script/spark-connect-server.sh"]

  # RisingWave cluster under test, reused from the repo-level compose file.
  risingwave-standalone:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: risingwave-standalone
    healthcheck:
      test:
        - CMD-SHELL
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/4566; exit $$?;'
      interval: 1s
      timeout: 30s
    networks:
      iceberg_net:

  # S3-compatible object store; pre-creates the iceberg warehouse bucket and
  # RisingWave's hummock bucket before handing off to the stock entrypoint.
  minio-0:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: minio-0
    entrypoint: "
    /bin/sh -c '
    set -e
    mkdir -p \"/data/icebergdata/demo\"
    mkdir -p \"/data/hummock001\"
    /usr/bin/docker-entrypoint.sh \"$$0\" \"$$@\"
    '"
    networks:
      iceberg_net:

  # Metadata store required by the RisingWave standalone deployment.
  etcd-0:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: etcd-0
    networks:
      iceberg_net:

volumes:
  risingwave-standalone:
    external: false
  etcd-0:
    external: false
  minio-0:
    external: false

networks:
  iceberg_net:
21 changes: 21 additions & 0 deletions integration_tests/iceberg-source/docker/rest/spark-script/spark-connect-server.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash

# Launch a Spark Connect server wired to the Iceberg REST catalog ("demo")
# running at http://rest:8181, with table data in MinIO, then tail the Spark
# log so the container stays in the foreground.

set -ex

# Build a ':'-separated driver classpath from every jar under /opt/spark/deps.
# (tr leaves a trailing ':', which the JVM classpath parser tolerates.)
JARS=$(find /opt/spark/deps -type f -name "*.jar" | tr '\n' ':')

# NOTE(review): "$JARS" is now quoted — unquoted it was subject to word
# splitting and glob expansion if any jar path ever contained spaces or '*'.
/opt/spark/sbin/start-connect-server.sh \
  --master local[3] \
  --driver-class-path "$JARS" \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.demo.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
  --conf spark.sql.catalog.demo.uri=http://rest:8181 \
  --conf spark.sql.catalog.demo.s3.endpoint=http://minio-0:9301 \
  --conf spark.sql.catalog.demo.s3.path.style.access=true \
  --conf spark.sql.catalog.demo.s3.access.key=hummockadmin \
  --conf spark.sql.catalog.demo.s3.secret.key=hummockadmin \
  --conf spark.sql.defaultCatalog=demo

# Keep the container alive and stream the server log.
tail -f /opt/spark/logs/spark*.out
2 changes: 1 addition & 1 deletion integration_tests/iceberg-source/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def run_case(case):


if __name__ == "__main__":
case_names = ["storage"]
case_names = ["rest", "storage"]
for case_name in case_names:
print(f"Running test case: {case_name}")
run_case(case_name)
3 changes: 3 additions & 0 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub struct IcebergProperties {
// must be set for other catalogs.
#[serde(rename = "catalog.name")]
pub catalog_name: Option<String>,
#[serde(rename = "catalog.uri")]
pub catalog_uri: Option<String>, // URI of iceberg catalog, only applicable in rest catalog.
#[serde(rename = "database.name")]
pub database_name: Option<String>,
#[serde(rename = "table.name")]
Expand All @@ -66,6 +68,7 @@ impl IcebergProperties {
database_name: self.database_name.clone(),
table_name: self.table_name.clone(),
catalog_type: self.catalog_type.clone(),
uri: self.catalog_uri.clone(),
path: self.warehouse_path.clone(),
endpoint: Some(self.endpoint.clone()),
access_key: self.s3_access.clone(),
Expand Down
3 changes: 3 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ IcebergProperties:
- name: catalog.name
field_type: String
required: false
- name: catalog.uri
field_type: String
required: false
- name: database.name
field_type: String
required: false
Expand Down

0 comments on commit 6f9244d

Please sign in to comment.