Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): add es dynamic route #18698

Merged
merged 4 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ci/scripts/e2e-elasticsearch-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ sleep 5
echo "--- checking elasticsearch sink result"
curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_sink.tmp.result
curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test1/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.tmp.result
curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test_route/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_with_route.tmp.result
# Fail fast on each comparison: a plain sequence followed by a single
# `$?` check only catches a failure of the *last* python3 invocation,
# so failures of the first two comparisons were silently ignored.
python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_sink.result e2e_test/sink/elasticsearch/elasticsearch_sink.tmp.result || { echo "The output is not as expected."; exit 1; }
python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.tmp.result || { echo "The output is not as expected."; exit 1; }
# The last comparison is left bare so the existing `if [ $? -ne 0 ]` check
# immediately below continues to guard it unchanged.
python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_with_route.result e2e_test/sink/elasticsearch/elasticsearch_with_route.tmp.result
if [ $? -ne 0 ]; then
echo "The output is not as expected."
exit 1
Expand Down
29 changes: 29 additions & 0 deletions e2e_test/sink/elasticsearch/elasticsearch_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@ CREATE TABLE t7 (
st struct<st1 int, st2 int>,
);

# Table exercising the new `routing_column` sink option: v3 (varchar) is the
# column whose value becomes the Elasticsearch `_routing` for each document.
statement ok
CREATE TABLE test_route (
v1 int primary key,
v2 int,
v3 varchar,
);

# Sink into index `test_route`, routing each document by column v3.
statement ok
CREATE SINK test_route_sink from test_route WITH (
connector = 'elasticsearch',
index = 'test_route',
url = 'http://elasticsearch:9200',
username = 'elastic',
password = 'risingwave',
routing_column = 'v3'
);

statement ok
CREATE SINK s7 from t7 WITH (
connector = 'elasticsearch',
Expand All @@ -33,6 +50,12 @@ CREATE SINK s8 from t7 WITH (
delimiter = '_'
);

# Three rows with distinct v3 values so each indexed document carries a
# distinct `_routing` in the expected-result file.
statement ok
INSERT INTO test_route VALUES
(1,1,'test1'),
(2,2,'test2'),
(3,3,'test3');

statement ok
INSERT INTO t7 VALUES
(1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(1,1)),
Expand All @@ -57,5 +80,11 @@ DROP SINK s7;
statement ok
DROP SINK s8;

# Clean up the routing-test objects: drop the sink first, then its table.
statement ok
DROP SINK test_route_sink;

statement ok
DROP TABLE test_route;

statement ok
DROP TABLE t7;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"took":27,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test_route","_type":"_doc","_id":"2","_score":1.0,"_routing":"test2","_source":{"v1":2,"v2":2,"v3":"test2"}},{"_index":"test_route","_type":"_doc","_id":"3","_score":1.0,"_routing":"test3","_source":{"v1":3,"v2":3,"v3":"test3"}},{"_index":"test_route","_type":"_doc","_id":"1","_score":1.0,"_routing":"test1","_source":{"v1":1,"v2":1,"v3":"test1"}}]}}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import java.util.concurrent.TimeUnit;

public interface BulkProcessorAdapter {
// NOTE(review): this span is a diff fragment — the 3-argument signatures are
// the pre-change declarations and the routing-aware ones replace them.
public void addRow(String index, String key, String doc) throws InterruptedException;
// Upserts `doc` into `index` under id `key`. Implementations apply `routing`
// only when it is non-null, so callers may pass null when no routing column
// is configured.
public void addRow(String index, String key, String doc, String routing)
throws InterruptedException;

public void deleteRow(String index, String key) throws InterruptedException;
// Deletes document `key` from `index`; `routing` semantics match addRow.
public void deleteRow(String index, String key, String routing) throws InterruptedException;

public void flush();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,28 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}

@Override
public void addRow(String index, String key, String doc) throws InterruptedException {
public void addRow(String index, String key, String doc, String routing)
throws InterruptedException {
UpdateRequest updateRequest;
updateRequest =
new UpdateRequest(index, "_doc", key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
if (routing != null) {
updateRequest.routing(routing);
}
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(updateRequest);
}

@Override
public void deleteRow(String index, String key) throws InterruptedException {
public void deleteRow(String index, String key, String routing) throws InterruptedException {
DeleteRequest deleteRequest;
deleteRequest = new DeleteRequest(index, "_doc", key);
if (routing != null) {
deleteRequest.routing(routing);
}
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(deleteRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
public class EsSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private static final String ERROR_REPORT_TEMPLATE = "Error message %s";
// Positions of the synthetic columns the Rust connector emits for ES sinks:
// [index, key, json doc, routing] — must stay in sync with the column order
// built in src/connector/src/sink/elasticsearch.rs / remote.rs.
private static final int INDEX_INDEX = 0;
private static final int KEY_INDEX = 1;
private static final int DOC_INDEX = 2;
private static final int ROUTING_INDEX = 3;

private final EsSinkConfig config;
private BulkProcessorAdapter bulkProcessor;
Expand Down Expand Up @@ -165,22 +169,26 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {

private void writeRow(SinkRow row)
throws JsonMappingException, JsonProcessingException, InterruptedException {
final String key = (String) row.get(1);
String doc = (String) row.get(2);
final String key = (String) row.get(KEY_INDEX);
String doc = (String) row.get(DOC_INDEX);
final String index;
if (config.getIndex() == null) {
index = (String) row.get(0);
index = (String) row.get(INDEX_INDEX);
} else {
index = config.getIndex();
}
String routing = null;
if (config.getRoutingColumn() != null) {
routing = (String) row.get(ROUTING_INDEX);
}
switch (row.getOp()) {
case INSERT:
case UPDATE_INSERT:
this.bulkProcessor.addRow(index, key, doc);
this.bulkProcessor.addRow(index, key, doc, routing);
break;
case DELETE:
case UPDATE_DELETE:
this.bulkProcessor.deleteRow(index, key);
this.bulkProcessor.deleteRow(index, key, routing);
break;
default:
throw Status.INVALID_ARGUMENT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class EsSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "concurrent_requests")
private Integer concurrentRequests;

// Optional name of a varchar column whose value is used as the document's
// Elasticsearch `_routing` (validated as VARCHAR by the sink validator).
@JsonProperty(value = "routing_column")
private String routingColumn;

@JsonCreator
public EsSinkConfig(@JsonProperty(value = "url") String url) {
this.url = url;
Expand Down Expand Up @@ -142,4 +145,13 @@ public EsSinkConfig withConcurrentRequests(Integer concurrentRequests) {
this.concurrentRequests = concurrentRequests;
return this;
}

/** Returns the configured routing column name, or {@code null} when unset. */
public String getRoutingColumn() {
    return this.routingColumn;
}

/** Builder-style setter for {@code routing_column}; returns {@code this} for chaining. */
public EsSinkConfig withRoutingColumn(String routingColumn) {
    this.routingColumn = routingColumn;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,18 @@ public void validate(
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
EsSinkConfig config = mapper.convertValue(tableProperties, EsSinkConfig.class);

// 1. check url
HttpHost host;
try {
host = HttpHost.create(config.getUrl());
} catch (IllegalArgumentException e) {
throw Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException();
}
if (config.getRoutingColumn() != null) {
checkColumn(config.getRoutingColumn(), tableSchema, Data.DataType.TypeName.VARCHAR);
}
if (config.getIndexColumn() != null) {
Data.DataType.TypeName typeName = tableSchema.getColumnType(config.getIndexColumn());
if (typeName == null) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Index column " + config.getIndexColumn() + " not found in schema")
.asRuntimeException();
}
if (!typeName.equals(Data.DataType.TypeName.VARCHAR)) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Index column must be of type String, but found " + typeName)
.asRuntimeException();
}
checkColumn(config.getIndexColumn(), tableSchema, Data.DataType.TypeName.VARCHAR);
if (config.getIndex() != null) {
throw Status.INVALID_ARGUMENT
.withDescription("index and index_column cannot be set at the same time")
Expand Down Expand Up @@ -109,4 +99,25 @@ public void validate(
throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
}
}

/**
 * Validates that {@code column} exists in {@code tableSchema} and has the
 * expected type, raising INVALID_ARGUMENT otherwise.
 */
private void checkColumn(
        String column, TableSchema tableSchema, Data.DataType.TypeName typeName) {
    Data.DataType.TypeName actual = tableSchema.getColumnType(column);
    if (actual == null) {
        String msg = "Column " + column + " not found in schema";
        throw Status.INVALID_ARGUMENT.withDescription(msg).asRuntimeException();
    }
    if (!actual.equals(typeName)) {
        String msg =
                "Column " + column + " must be of type " + typeName + ", but found " + actual;
        throw Status.INVALID_ARGUMENT.withDescription(msg).asRuntimeException();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,28 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}

@Override
public void addRow(String index, String key, String doc) throws InterruptedException {
public void addRow(String index, String key, String doc, String routing)
throws InterruptedException {
UpdateRequest updateRequest;
updateRequest =
new UpdateRequest(index, key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
if (routing != null) {
updateRequest.routing(routing);
}
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.opensearchBulkProcessor.add(updateRequest);
}

@Override
public void deleteRow(String index, String key) throws InterruptedException {
public void deleteRow(String index, String key, String routing) throws InterruptedException {
DeleteRequest deleteRequest;
deleteRequest = new DeleteRequest(index, key);
if (routing != null) {
deleteRequest.routing(routing);
}
this.requestTracker.addWriteTask();
this.opensearchBulkProcessor.add(deleteRequest);
}
Expand Down
29 changes: 29 additions & 0 deletions src/connector/src/sink/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use super::remote::{ElasticSearchSink, OpenSearchSink};
use crate::sink::{Result, Sink};
pub const ES_OPTION_DELIMITER: &str = "delimiter";
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column";

pub enum StreamChunkConverter {
Es(EsStreamChunkConverter),
Expand All @@ -52,11 +53,23 @@ impl StreamChunkConverter {
.ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
})
.transpose()?;
// Resolve the optional `routing_column` property to the index of the matching
// schema field; a column that is named but absent from the schema becomes an
// error via `transpose`.
let routing_column = properties
.get(ES_OPTION_ROUTING_COLUMN)
.cloned()
.map(|n| {
schema
.fields()
.iter()
.position(|s| s.name == n)
.ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN))
})
.transpose()?;
Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new(
schema,
pk_indices.clone(),
properties.get(ES_OPTION_DELIMITER).cloned(),
index_column,
routing_column,
)?))
} else {
Ok(StreamChunkConverter::Other)
Expand All @@ -74,13 +87,15 @@ pub struct EsStreamChunkConverter {
json_encoder: JsonEncoder,
fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send>,
index_column: Option<usize>,
routing_column: Option<usize>,
}
impl EsStreamChunkConverter {
fn new(
schema: Schema,
pk_indices: Vec<usize>,
delimiter: Option<String>,
index_column: Option<usize>,
routing_column: Option<usize>,
) -> Result<Self> {
let fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send> = if pk_indices.is_empty()
{
Expand Down Expand Up @@ -127,6 +142,7 @@ impl EsStreamChunkConverter {
json_encoder,
fn_build_id,
index_column,
routing_column,
})
}

Expand All @@ -138,6 +154,8 @@ impl EsStreamChunkConverter {
<JsonbArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
let mut index_builder =
<Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
let mut routing_builder =
<Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
for (op, row) in chunk.rows() {
ops.push(op);
id_string_builder.append(Some(&self.build_id(row)?));
Expand All @@ -150,18 +168,29 @@ impl EsStreamChunkConverter {
} else {
index_builder.append_null();
}
// NOTE(review): the binding is named `index` but holds the position of the
// *routing* column — renaming (e.g. `routing_idx`) would avoid confusion with
// the `index_column` branch handled just above.
// NOTE(review): a NULL value in the routing column becomes a hard error here
// (datum_at → None → anyhow) rather than null routing — confirm intended.
if let Some(index) = self.routing_column {
routing_builder.append(Some(
row.datum_at(index)
.ok_or_else(|| anyhow!("No value found in row, index is {}", index))?
.into_utf8(),
));
} else {
routing_builder.append_null();
}
let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?));
json_builder.append(Some(json.as_scalar_ref()));
}
let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder);
let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder);
let index_string_array = risingwave_common::array::ArrayBuilder::finish(index_builder);
let routing_string_array = risingwave_common::array::ArrayBuilder::finish(routing_builder);
Ok(StreamChunk::new(
ops,
vec![
std::sync::Arc::new(ArrayImpl::Utf8(index_string_array)),
std::sync::Arc::new(ArrayImpl::Utf8(id_string_array)),
std::sync::Arc::new(ArrayImpl::Jsonb(json_array)),
std::sync::Arc::new(ArrayImpl::Utf8(routing_string_array)),
],
))
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ impl RemoteLogSinker {
// Synthetic schema sent to the Java sink: [index, key, json doc, routing].
ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
// Routing column added for ES dynamic routing; use the next free column id —
// the original diff reused id 2, duplicating the JSONB doc column's id.
ColumnDesc::unnamed(ColumnId::from(3), DataType::Varchar).to_protobuf(),
];
Some(TableSchema {
columns,
Expand Down
Loading