Skip to content

Commit

Permalink
feat(sink): add es dynamic route (#18698)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Sep 25, 2024
1 parent d1baacf commit 45ef6c4
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 25 deletions.
2 changes: 2 additions & 0 deletions ci/scripts/e2e-elasticsearch-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ sleep 5
echo "--- checking elasticsearch sink result"
curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_sink.tmp.result
curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test1/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.tmp.result
curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test_route/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_with_route.tmp.result
# Chain the three comparisons with `&&` so that the `$?` check right below
# reflects a failure in ANY of them. Run as separate commands, `$?` would only
# carry the exit status of the last comparison.
python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_sink.result e2e_test/sink/elasticsearch/elasticsearch_sink.tmp.result \
  && python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.tmp.result \
  && python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_with_route.result e2e_test/sink/elasticsearch/elasticsearch_with_route.tmp.result
if [ $? -ne 0 ]; then
echo "The output is not as expected."
exit 1
Expand Down
29 changes: 29 additions & 0 deletions e2e_test/sink/elasticsearch/elasticsearch_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@ CREATE TABLE t7 (
st struct<st1 int, st2 int>,
);

statement ok
CREATE TABLE test_route (
v1 int primary key,
v2 int,
v3 varchar,
);

statement ok
CREATE SINK test_route_sink from test_route WITH (
connector = 'elasticsearch',
index = 'test_route',
url = 'http://elasticsearch:9200',
username = 'elastic',
password = 'risingwave',
routing_column = 'v3'
);

statement ok
CREATE SINK s7 from t7 WITH (
connector = 'elasticsearch',
Expand All @@ -33,6 +50,12 @@ CREATE SINK s8 from t7 WITH (
delimiter = '_'
);

statement ok
INSERT INTO test_route VALUES
(1,1,'test1'),
(2,2,'test2'),
(3,3,'test3');

statement ok
INSERT INTO t7 VALUES
(1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(1,1)),
Expand All @@ -57,5 +80,11 @@ DROP SINK s7;
statement ok
DROP SINK s8;

statement ok
DROP SINK test_route_sink;

statement ok
DROP TABLE test_route;

statement ok
DROP TABLE t7;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"took":27,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test_route","_type":"_doc","_id":"2","_score":1.0,"_routing":"test2","_source":{"v1":2,"v2":2,"v3":"test2"}},{"_index":"test_route","_type":"_doc","_id":"3","_score":1.0,"_routing":"test3","_source":{"v1":3,"v2":3,"v3":"test3"}},{"_index":"test_route","_type":"_doc","_id":"1","_score":1.0,"_routing":"test1","_source":{"v1":1,"v2":1,"v3":"test1"}}]}}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import java.util.concurrent.TimeUnit;

public interface BulkProcessorAdapter {
public void addRow(String index, String key, String doc) throws InterruptedException;
public void addRow(String index, String key, String doc, String routing)
throws InterruptedException;

public void deleteRow(String index, String key) throws InterruptedException;
public void deleteRow(String index, String key, String routing) throws InterruptedException;

public void flush();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,28 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}

@Override
public void addRow(String index, String key, String doc) throws InterruptedException {
public void addRow(String index, String key, String doc, String routing)
throws InterruptedException {
UpdateRequest updateRequest;
updateRequest =
new UpdateRequest(index, "_doc", key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
if (routing != null) {
updateRequest.routing(routing);
}
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(updateRequest);
}

@Override
public void deleteRow(String index, String key) throws InterruptedException {
public void deleteRow(String index, String key, String routing) throws InterruptedException {
DeleteRequest deleteRequest;
deleteRequest = new DeleteRequest(index, "_doc", key);
if (routing != null) {
deleteRequest.routing(routing);
}
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(deleteRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
public class EsSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private static final String ERROR_REPORT_TEMPLATE = "Error message %s";
private static final int INDEX_INDEX = 0;
private static final int KEY_INDEX = 1;
private static final int DOC_INDEX = 2;
private static final int ROUTING_INDEX = 3;

private final EsSinkConfig config;
private BulkProcessorAdapter bulkProcessor;
Expand Down Expand Up @@ -165,22 +169,26 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {

private void writeRow(SinkRow row)
throws JsonMappingException, JsonProcessingException, InterruptedException {
final String key = (String) row.get(1);
String doc = (String) row.get(2);
final String key = (String) row.get(KEY_INDEX);
String doc = (String) row.get(DOC_INDEX);
final String index;
if (config.getIndex() == null) {
index = (String) row.get(0);
index = (String) row.get(INDEX_INDEX);
} else {
index = config.getIndex();
}
String routing = null;
if (config.getRoutingColumn() != null) {
routing = (String) row.get(ROUTING_INDEX);
}
switch (row.getOp()) {
case INSERT:
case UPDATE_INSERT:
this.bulkProcessor.addRow(index, key, doc);
this.bulkProcessor.addRow(index, key, doc, routing);
break;
case DELETE:
case UPDATE_DELETE:
this.bulkProcessor.deleteRow(index, key);
this.bulkProcessor.deleteRow(index, key, routing);
break;
default:
throw Status.INVALID_ARGUMENT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class EsSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "concurrent_requests")
private Integer concurrentRequests;

@JsonProperty(value = "routing_column")
private String routingColumn;

@JsonCreator
public EsSinkConfig(@JsonProperty(value = "url") String url) {
this.url = url;
Expand Down Expand Up @@ -142,4 +145,13 @@ public EsSinkConfig withConcurrentRequests(Integer concurrentRequests) {
this.concurrentRequests = concurrentRequests;
return this;
}

/**
 * Returns the configured routing column name (bound to the {@code routing_column} property).
 *
 * @return the routing column name as stored in this config; may be {@code null} if it was
 *     never set
 */
public String getRoutingColumn() {
return routingColumn;
}

/**
 * Sets the routing column name and returns this config so calls can be chained.
 *
 * @param routingColumn the column name to store as the routing column
 * @return this {@code EsSinkConfig} instance
 */
public EsSinkConfig withRoutingColumn(String routingColumn) {
this.routingColumn = routingColumn;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,18 @@ public void validate(
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
EsSinkConfig config = mapper.convertValue(tableProperties, EsSinkConfig.class);

// 1. check url
HttpHost host;
try {
host = HttpHost.create(config.getUrl());
} catch (IllegalArgumentException e) {
throw Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException();
}
if (config.getRoutingColumn() != null) {
checkColumn(config.getRoutingColumn(), tableSchema, Data.DataType.TypeName.VARCHAR);
}
if (config.getIndexColumn() != null) {
Data.DataType.TypeName typeName = tableSchema.getColumnType(config.getIndexColumn());
if (typeName == null) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Index column " + config.getIndexColumn() + " not found in schema")
.asRuntimeException();
}
if (!typeName.equals(Data.DataType.TypeName.VARCHAR)) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Index column must be of type String, but found " + typeName)
.asRuntimeException();
}
checkColumn(config.getIndexColumn(), tableSchema, Data.DataType.TypeName.VARCHAR);
if (config.getIndex() != null) {
throw Status.INVALID_ARGUMENT
.withDescription("index and index_column cannot be set at the same time")
Expand Down Expand Up @@ -109,4 +99,25 @@ public void validate(
throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
}
}

/**
 * Verifies that {@code column} is present in {@code tableSchema} and has the expected type.
 *
 * <p>Fails with an {@code INVALID_ARGUMENT} status runtime exception when the schema lookup
 * returns {@code null} for the column, or when the column's type differs from {@code typeName}.
 *
 * @param column name of the column to validate
 * @param tableSchema schema the column is looked up in
 * @param typeName type the column is required to have
 */
private void checkColumn(
        String column, TableSchema tableSchema, Data.DataType.TypeName typeName) {
    final Data.DataType.TypeName actualType = tableSchema.getColumnType(column);

    // A null lookup result is treated as "column does not exist".
    if (actualType == null) {
        final String notFound = "Column " + column + " not found in schema";
        throw Status.INVALID_ARGUMENT.withDescription(notFound).asRuntimeException();
    }

    // Matching type: nothing more to validate.
    if (actualType.equals(typeName)) {
        return;
    }

    final String mismatch =
            "Column " + column + " must be of type " + typeName + ", but found " + actualType;
    throw Status.INVALID_ARGUMENT.withDescription(mismatch).asRuntimeException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,28 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}

@Override
public void addRow(String index, String key, String doc) throws InterruptedException {
public void addRow(String index, String key, String doc, String routing)
throws InterruptedException {
UpdateRequest updateRequest;
updateRequest =
new UpdateRequest(index, key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
if (routing != null) {
updateRequest.routing(routing);
}
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.opensearchBulkProcessor.add(updateRequest);
}

@Override
public void deleteRow(String index, String key) throws InterruptedException {
public void deleteRow(String index, String key, String routing) throws InterruptedException {
DeleteRequest deleteRequest;
deleteRequest = new DeleteRequest(index, key);
if (routing != null) {
deleteRequest.routing(routing);
}
this.requestTracker.addWriteTask();
this.opensearchBulkProcessor.add(deleteRequest);
}
Expand Down
29 changes: 29 additions & 0 deletions src/connector/src/sink/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use super::remote::{ElasticSearchSink, OpenSearchSink};
use crate::sink::{Result, Sink};
pub const ES_OPTION_DELIMITER: &str = "delimiter";
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column";

pub enum StreamChunkConverter {
Es(EsStreamChunkConverter),
Expand All @@ -52,11 +53,23 @@ impl StreamChunkConverter {
.ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
})
.transpose()?;
let routing_column = properties
.get(ES_OPTION_ROUTING_COLUMN)
.cloned()
.map(|n| {
schema
.fields()
.iter()
.position(|s| s.name == n)
.ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN))
})
.transpose()?;
Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new(
schema,
pk_indices.clone(),
properties.get(ES_OPTION_DELIMITER).cloned(),
index_column,
routing_column,
)?))
} else {
Ok(StreamChunkConverter::Other)
Expand All @@ -74,13 +87,15 @@ pub struct EsStreamChunkConverter {
json_encoder: JsonEncoder,
fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send>,
index_column: Option<usize>,
routing_column: Option<usize>,
}
impl EsStreamChunkConverter {
fn new(
schema: Schema,
pk_indices: Vec<usize>,
delimiter: Option<String>,
index_column: Option<usize>,
routing_column: Option<usize>,
) -> Result<Self> {
let fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send> = if pk_indices.is_empty()
{
Expand Down Expand Up @@ -127,6 +142,7 @@ impl EsStreamChunkConverter {
json_encoder,
fn_build_id,
index_column,
routing_column,
})
}

Expand All @@ -138,6 +154,8 @@ impl EsStreamChunkConverter {
<JsonbArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
let mut index_builder =
<Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
let mut routing_builder =
<Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
for (op, row) in chunk.rows() {
ops.push(op);
id_string_builder.append(Some(&self.build_id(row)?));
Expand All @@ -150,18 +168,29 @@ impl EsStreamChunkConverter {
} else {
index_builder.append_null();
}
if let Some(index) = self.routing_column {
routing_builder.append(Some(
row.datum_at(index)
.ok_or_else(|| anyhow!("No value found in row, index is {}", index))?
.into_utf8(),
));
} else {
routing_builder.append_null();
}
let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?));
json_builder.append(Some(json.as_scalar_ref()));
}
let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder);
let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder);
let index_string_array = risingwave_common::array::ArrayBuilder::finish(index_builder);
let routing_string_array = risingwave_common::array::ArrayBuilder::finish(routing_builder);
Ok(StreamChunk::new(
ops,
vec![
std::sync::Arc::new(ArrayImpl::Utf8(index_string_array)),
std::sync::Arc::new(ArrayImpl::Utf8(id_string_array)),
std::sync::Arc::new(ArrayImpl::Jsonb(json_array)),
std::sync::Arc::new(ArrayImpl::Utf8(routing_string_array)),
],
))
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ impl RemoteLogSinker {
// Positional payload columns; presumably these mirror the chunk layout produced by the
// Elasticsearch converter: 0 = index, 1 = document id, 2 = JSON document, 3 = routing.
// Each column gets its own id: the routing column previously reused id 2, colliding with
// the JSON document column.
ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(3), DataType::Varchar).to_protobuf(),
];
Some(TableSchema {
columns,
Expand Down

0 comments on commit 45ef6c4

Please sign in to comment.