From 45ef6c45bac250e351a1dda1ad366e02ffe98343 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Wed, 25 Sep 2024 21:18:16 +0800 Subject: [PATCH] feat(sink): add es dynamic route (#18698) --- ci/scripts/e2e-elasticsearch-sink-test.sh | 2 + .../sink/elasticsearch/elasticsearch_sink.slt | 29 ++++++++++++++ .../elasticsearch_with_route.result | 1 + .../connector/BulkProcessorAdapter.java | 5 ++- .../ElasticBulkProcessorAdapter.java | 11 +++++- .../java/com/risingwave/connector/EsSink.java | 18 ++++++--- .../risingwave/connector/EsSinkConfig.java | 12 ++++++ .../risingwave/connector/EsSinkFactory.java | 39 ++++++++++++------- .../OpensearchBulkProcessorAdapter.java | 11 +++++- src/connector/src/sink/elasticsearch.rs | 29 ++++++++++++++ src/connector/src/sink/remote.rs | 1 + 11 files changed, 133 insertions(+), 25 deletions(-) create mode 100644 e2e_test/sink/elasticsearch/elasticsearch_with_route.result diff --git a/ci/scripts/e2e-elasticsearch-sink-test.sh b/ci/scripts/e2e-elasticsearch-sink-test.sh index 029cddbf91a90..00a1cb564efba 100755 --- a/ci/scripts/e2e-elasticsearch-sink-test.sh +++ b/ci/scripts/e2e-elasticsearch-sink-test.sh @@ -14,8 +14,10 @@ sleep 5 echo "--- checking elasticsearch sink result" curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_sink.tmp.result curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test1/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.tmp.result +curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test_route/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_with_route.tmp.result python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_sink.result e2e_test/sink/elasticsearch/elasticsearch_sink.tmp.result python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.tmp.result +python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_with_route.result e2e_test/sink/elasticsearch/elasticsearch_with_route.tmp.result if [ $? -ne 0 ]; then echo "The output is not as expected." exit 1 diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt index 29e600ea450f2..842e3f3303438 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt +++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt @@ -13,6 +13,23 @@ CREATE TABLE t7 ( st struct, ); +statement ok +CREATE TABLE test_route ( + v1 int primary key, + v2 int, + v3 varchar, +); + +statement ok +CREATE SINK test_route_sink from test_route WITH ( + connector = 'elasticsearch', + index = 'test_route', + url = 'http://elasticsearch:9200', + username = 'elastic', + password = 'risingwave', + routing_column = 'v3' +); + statement ok CREATE SINK s7 from t7 WITH ( connector = 'elasticsearch', @@ -33,6 +50,12 @@ CREATE SINK s8 from t7 WITH ( delimiter = '_' ); +statement ok +INSERT INTO test_route VALUES + (1,1,'test1'), + (2,2,'test2'), + (3,3,'test3'); + statement ok INSERT INTO t7 VALUES (1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(1,1)), @@ -57,5 +80,11 @@ DROP SINK s7; statement ok DROP SINK s8; +statement ok +DROP SINK test_route_sink; + +statement ok +DROP TABLE test_route; + statement ok DROP TABLE t7; diff --git a/e2e_test/sink/elasticsearch/elasticsearch_with_route.result b/e2e_test/sink/elasticsearch/elasticsearch_with_route.result new file mode 100644 index 0000000000000..88ac7ef7233f3 --- /dev/null +++ b/e2e_test/sink/elasticsearch/elasticsearch_with_route.result @@ -0,0 +1 @@ +{"took":27,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test_route","_type":"_doc","_id":"2","_score":1.0,"_routing":"test2","_source":{"v1":2,"v2":2,"v3":"test2"}},{"_index":"test_route","_type":"_doc","_id":"3","_score":1.0,"_routing":"test3","_source":{"v1":3,"v2":3,"v3":"test3"}},{"_index":"test_route","_type":"_doc","_id":"1","_score":1.0,"_routing":"test1","_source":{"v1":1,"v2":1,"v3":"test1"}}]}} \ No newline at end of file diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java index 11fe576d76ebd..1e15b20a4288e 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java @@ -19,9 +19,10 @@ import java.util.concurrent.TimeUnit; public interface BulkProcessorAdapter { - public void addRow(String index, String key, String doc) throws InterruptedException; + public void addRow(String index, String key, String doc, String routing) + throws InterruptedException; - public void deleteRow(String index, String key) throws InterruptedException; + public void deleteRow(String index, String key, String routing) throws InterruptedException; public void flush(); diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java index 77caae8bff8e8..6054e5b9242de 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java @@ -76,21 +76,28 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException } @Override - public void addRow(String index, String key, String doc) throws InterruptedException { + public void addRow(String index, String key, String doc, String routing) + throws InterruptedException { UpdateRequest updateRequest; updateRequest = new UpdateRequest(index, "_doc", key) .doc(doc, XContentType.JSON) .retryOnConflict(this.retryOnConflict); + if (routing != null) { + updateRequest.routing(routing); + } updateRequest.docAsUpsert(true); this.requestTracker.addWriteTask(); this.esBulkProcessor.add(updateRequest); } @Override - public void deleteRow(String index, String key) throws InterruptedException { + public void deleteRow(String index, String key, String routing) throws InterruptedException { DeleteRequest deleteRequest; deleteRequest = new DeleteRequest(index, "_doc", key); + if (routing != null) { + deleteRequest.routing(routing); + } this.requestTracker.addWriteTask(); this.esBulkProcessor.add(deleteRequest); } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 985c27639926d..5f9c574cde6a2 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -46,6 +46,10 @@ public class EsSink extends SinkWriterBase { private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); private static final String ERROR_REPORT_TEMPLATE = "Error message %s"; + private static final int INDEX_INDEX = 0; + private static final int KEY_INDEX = 1; + private static final int DOC_INDEX = 2; + private static final int ROUTING_INDEX = 3; private final EsSinkConfig config; private BulkProcessorAdapter bulkProcessor; @@ -165,22 +169,26 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException, InterruptedException { - final String key = (String) row.get(1); - String doc = (String) row.get(2); + final String key = (String) row.get(KEY_INDEX); + String doc = (String) row.get(DOC_INDEX); final String index; if (config.getIndex() == null) { - index = (String) row.get(0); + index = (String) row.get(INDEX_INDEX); } else { index = config.getIndex(); } + String routing = null; + if (config.getRoutingColumn() != null) { + routing = (String) row.get(ROUTING_INDEX); + } switch (row.getOp()) { case INSERT: case UPDATE_INSERT: - this.bulkProcessor.addRow(index, key, doc); + this.bulkProcessor.addRow(index, key, doc, routing); break; case DELETE: case UPDATE_DELETE: - this.bulkProcessor.deleteRow(index, key); + this.bulkProcessor.deleteRow(index, key, routing); break; default: throw Status.INVALID_ARGUMENT diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java index 36dfed37dbfd2..76a0b07472153 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java @@ -53,6 +53,9 @@ public class EsSinkConfig extends CommonSinkConfig { @JsonProperty(value = "concurrent_requests") private Integer concurrentRequests; + @JsonProperty(value = "routing_column") + private String routingColumn; + @JsonCreator public EsSinkConfig(@JsonProperty(value = "url") String url) { this.url = url; @@ -142,4 +145,13 @@ public EsSinkConfig withConcurrentRequests(Integer concurrentRequests) { this.concurrentRequests = concurrentRequests; return this; } + + public String getRoutingColumn() { + return routingColumn; + } + + public EsSinkConfig withRoutingColumn(String routingColumn) { + this.routingColumn = routingColumn; + return this; + } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java index 03e888a892df3..5108f4eab4787 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java @@ -47,7 +47,6 @@ public void validate( ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true); EsSinkConfig config = mapper.convertValue(tableProperties, EsSinkConfig.class); - // 1. check url HttpHost host; try { @@ -55,20 +54,11 @@ public void validate( } catch (IllegalArgumentException e) { throw Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException(); } + if (config.getRoutingColumn() != null) { + checkColumn(config.getRoutingColumn(), tableSchema, Data.DataType.TypeName.VARCHAR); + } if (config.getIndexColumn() != null) { - Data.DataType.TypeName typeName = tableSchema.getColumnType(config.getIndexColumn()); - if (typeName == null) { - throw Status.INVALID_ARGUMENT - .withDescription( - "Index column " + config.getIndexColumn() + " not found in schema") - .asRuntimeException(); - } - if (!typeName.equals(Data.DataType.TypeName.VARCHAR)) { - throw Status.INVALID_ARGUMENT - .withDescription( - "Index column must be of type String, but found " + typeName) - .asRuntimeException(); - } + checkColumn(config.getIndexColumn(), tableSchema, Data.DataType.TypeName.VARCHAR); if (config.getIndex() != null) { throw Status.INVALID_ARGUMENT .withDescription("index and index_column cannot be set at the same time") @@ -109,4 +99,25 @@ public void validate( throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(); } } + + private void checkColumn( + String column, TableSchema tableSchema, Data.DataType.TypeName typeName) { + Data.DataType.TypeName columnType = tableSchema.getColumnType(column); + if (columnType == null) { + throw Status.INVALID_ARGUMENT + .withDescription("Column " + column + " not found in schema") + .asRuntimeException(); + } + if (!columnType.equals(typeName)) { + throw Status.INVALID_ARGUMENT + .withDescription( + "Column " + + column + + " must be of type " + + typeName + + ", but found " + + columnType) + .asRuntimeException(); + } + } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java index 40375b9324601..38295bb974ad3 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java @@ -76,21 +76,28 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException } @Override - public void addRow(String index, String key, String doc) throws InterruptedException { + public void addRow(String index, String key, String doc, String routing) + throws InterruptedException { UpdateRequest updateRequest; updateRequest = new UpdateRequest(index, key) .doc(doc, XContentType.JSON) .retryOnConflict(this.retryOnConflict); + if (routing != null) { + updateRequest.routing(routing); + } updateRequest.docAsUpsert(true); this.requestTracker.addWriteTask(); this.opensearchBulkProcessor.add(updateRequest); } @Override - public void deleteRow(String index, String key) throws InterruptedException { + public void deleteRow(String index, String key, String routing) throws InterruptedException { DeleteRequest deleteRequest; deleteRequest = new DeleteRequest(index, key); + if (routing != null) { + deleteRequest.routing(routing); + } this.requestTracker.addWriteTask(); this.opensearchBulkProcessor.add(deleteRequest); } diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs index 5e45c2b8c74aa..bdcd2e515bbb2 100644 --- a/src/connector/src/sink/elasticsearch.rs +++ b/src/connector/src/sink/elasticsearch.rs @@ -28,6 +28,7 @@ use super::remote::{ElasticSearchSink, OpenSearchSink}; use crate::sink::{Result, Sink}; pub const ES_OPTION_DELIMITER: &str = "delimiter"; pub const ES_OPTION_INDEX_COLUMN: &str = "index_column"; +pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column"; pub enum StreamChunkConverter { Es(EsStreamChunkConverter), @@ -52,11 +53,23 @@ impl StreamChunkConverter { .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN)) }) .transpose()?; + let routing_column = properties + .get(ES_OPTION_ROUTING_COLUMN) + .cloned() + .map(|n| { + schema + .fields() + .iter() + .position(|s| s.name == n) + .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN)) + }) + .transpose()?; Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new( schema, pk_indices.clone(), properties.get(ES_OPTION_DELIMITER).cloned(), index_column, + routing_column, )?)) } else { Ok(StreamChunkConverter::Other) @@ -74,6 +87,7 @@ pub struct EsStreamChunkConverter { json_encoder: JsonEncoder, fn_build_id: Box) -> Result + Send>, index_column: Option, + routing_column: Option, } impl EsStreamChunkConverter { fn new( @@ -81,6 +95,7 @@ impl EsStreamChunkConverter { pk_indices: Vec, delimiter: Option, index_column: Option, + routing_column: Option, ) -> Result { let fn_build_id: Box) -> Result + Send> = if pk_indices.is_empty() { @@ -127,6 +142,7 @@ impl EsStreamChunkConverter { json_encoder, fn_build_id, index_column, + routing_column, }) } @@ -138,6 +154,8 @@ impl EsStreamChunkConverter { ::new(chunk.capacity()); let mut index_builder = ::new(chunk.capacity()); + let mut routing_builder = + ::new(chunk.capacity()); for (op, row) in chunk.rows() { ops.push(op); id_string_builder.append(Some(&self.build_id(row)?)); @@ -150,18 +168,29 @@ impl EsStreamChunkConverter { } else { index_builder.append_null(); } + if let Some(index) = self.routing_column { + routing_builder.append(Some( + row.datum_at(index) + .ok_or_else(|| anyhow!("No value found in row, index is {}", index))? + .into_utf8(), + )); + } else { + routing_builder.append_null(); + } let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?)); json_builder.append(Some(json.as_scalar_ref())); } let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder); let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder); let index_string_array = risingwave_common::array::ArrayBuilder::finish(index_builder); + let routing_string_array = risingwave_common::array::ArrayBuilder::finish(routing_builder); Ok(StreamChunk::new( ops, vec![ std::sync::Arc::new(ArrayImpl::Utf8(index_string_array)), std::sync::Arc::new(ArrayImpl::Utf8(id_string_array)), std::sync::Arc::new(ArrayImpl::Jsonb(json_array)), + std::sync::Arc::new(ArrayImpl::Utf8(routing_string_array)), ], )) } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index aa8ca0625d05f..40d9601a2b491 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -271,6 +271,7 @@ impl RemoteLogSinker { ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(), ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(), ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(), + ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(), ]; Some(TableSchema { columns,