Skip to content

Commit

Permalink
support routing column for Elasticsearch and OpenSearch sinks
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Sep 19, 2024
1 parent 745fb16 commit 741534a
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import java.util.concurrent.TimeUnit;

public interface BulkProcessorAdapter {
public void addRow(String index, String key, String doc) throws InterruptedException;
public void addRow(String index, String key, String doc, String routing)
throws InterruptedException;

public void deleteRow(String index, String key) throws InterruptedException;
public void deleteRow(String index, String key, String routing) throws InterruptedException;

public void flush();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,28 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}

@Override
public void addRow(String index, String key, String doc) throws InterruptedException {
public void addRow(String index, String key, String doc, String routing)
throws InterruptedException {
UpdateRequest updateRequest;
updateRequest =
new UpdateRequest(index, "_doc", key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
if (routing != null) {
updateRequest.routing(routing);
}
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(updateRequest);
}

@Override
public void deleteRow(String index, String key) throws InterruptedException {
public void deleteRow(String index, String key, String routing) throws InterruptedException {
DeleteRequest deleteRequest;
deleteRequest = new DeleteRequest(index, "_doc", key);
if (routing != null) {
deleteRequest.routing(routing);
}
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(deleteRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
public class EsSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private static final String ERROR_REPORT_TEMPLATE = "Error message %s";
private static final int INDEX_INDEX = 0;
private static final int KEY_INDEX = 1;
private static final int DOC_INDEX = 2;
private static final int ROUTING_INDEX = 3;

private final EsSinkConfig config;
private BulkProcessorAdapter bulkProcessor;
Expand Down Expand Up @@ -165,22 +169,26 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {

private void writeRow(SinkRow row)
throws JsonMappingException, JsonProcessingException, InterruptedException {
final String key = (String) row.get(1);
String doc = (String) row.get(2);
final String key = (String) row.get(KEY_INDEX);
String doc = (String) row.get(DOC_INDEX);
final String index;
if (config.getIndex() == null) {
index = (String) row.get(0);
index = (String) row.get(INDEX_INDEX);
} else {
index = config.getIndex();
}
String routing = null;
if (config.getRoutingColumn() != null) {
routing = (String) row.get(ROUTING_INDEX);
}
switch (row.getOp()) {
case INSERT:
case UPDATE_INSERT:
this.bulkProcessor.addRow(index, key, doc);
this.bulkProcessor.addRow(index, key, doc, routing);
break;
case DELETE:
case UPDATE_DELETE:
this.bulkProcessor.deleteRow(index, key);
this.bulkProcessor.deleteRow(index, key, routing);
break;
default:
throw Status.INVALID_ARGUMENT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class EsSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "concurrent_requests")
private Integer concurrentRequests;

@JsonProperty(value = "routing_column")
private String routingColumn;

@JsonCreator
public EsSinkConfig(@JsonProperty(value = "url") String url) {
this.url = url;
Expand Down Expand Up @@ -142,4 +145,13 @@ public EsSinkConfig withConcurrentRequests(Integer concurrentRequests) {
this.concurrentRequests = concurrentRequests;
return this;
}

/** Returns the name of the column used as the routing key, or {@code null} when unset. */
public String getRoutingColumn() {
    return this.routingColumn;
}

/**
 * Fluent setter for the routing column.
 *
 * @param routingColumn name of the VARCHAR column to use as the routing key
 * @return this config, for chaining
 */
public EsSinkConfig withRoutingColumn(String routingColumn) {
    this.routingColumn = routingColumn;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,18 @@ public void validate(
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
EsSinkConfig config = mapper.convertValue(tableProperties, EsSinkConfig.class);

// 1. check url
HttpHost host;
try {
host = HttpHost.create(config.getUrl());
} catch (IllegalArgumentException e) {
throw Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException();
}
if (config.getRoutingColumn() != null) {
checkColumn(config.getRoutingColumn(), tableSchema, Data.DataType.TypeName.VARCHAR);
}
if (config.getIndexColumn() != null) {
Data.DataType.TypeName typeName = tableSchema.getColumnType(config.getIndexColumn());
if (typeName == null) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Index column " + config.getIndexColumn() + " not found in schema")
.asRuntimeException();
}
if (!typeName.equals(Data.DataType.TypeName.VARCHAR)) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Index column must be of type String, but found " + typeName)
.asRuntimeException();
}
checkColumn(config.getIndexColumn(), tableSchema, Data.DataType.TypeName.VARCHAR);
if (config.getIndex() != null) {
throw Status.INVALID_ARGUMENT
.withDescription("index and index_column cannot be set at the same time")
Expand Down Expand Up @@ -109,4 +99,25 @@ public void validate(
throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
}
}

/**
 * Validates that {@code column} exists in the sink's table schema and has the expected
 * type, throwing {@code INVALID_ARGUMENT} otherwise.
 *
 * @param column name of the column to look up
 * @param tableSchema schema of the table being sunk
 * @param typeName type the column is required to have
 */
private void checkColumn(
        String column, TableSchema tableSchema, Data.DataType.TypeName typeName) {
    final Data.DataType.TypeName actualType = tableSchema.getColumnType(column);
    if (actualType == null) {
        throw Status.INVALID_ARGUMENT
                .withDescription(String.format("Column %s not found in schema", column))
                .asRuntimeException();
    }
    if (!actualType.equals(typeName)) {
        throw Status.INVALID_ARGUMENT
                .withDescription(
                        String.format(
                                "Column %s must be of type %s, but found %s",
                                column, typeName, actualType))
                .asRuntimeException();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,28 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}

@Override
public void addRow(String index, String key, String doc) throws InterruptedException {
public void addRow(String index, String key, String doc, String routing)
throws InterruptedException {
UpdateRequest updateRequest;
updateRequest =
new UpdateRequest(index, key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
if (routing != null) {
updateRequest.routing(routing);
}
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.opensearchBulkProcessor.add(updateRequest);
}

@Override
public void deleteRow(String index, String key) throws InterruptedException {
public void deleteRow(String index, String key, String routing) throws InterruptedException {
DeleteRequest deleteRequest;
deleteRequest = new DeleteRequest(index, key);
if (routing != null) {
deleteRequest.routing(routing);
}
this.requestTracker.addWriteTask();
this.opensearchBulkProcessor.add(deleteRequest);
}
Expand Down
29 changes: 29 additions & 0 deletions src/connector/src/sink/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use super::remote::{ElasticSearchSink, OpenSearchSink};
use crate::sink::{Result, Sink};
pub const ES_OPTION_DELIMITER: &str = "delimiter";
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column";

pub enum StreamChunkConverter {
Es(EsStreamChunkConverter),
Expand All @@ -52,11 +53,23 @@ impl StreamChunkConverter {
.ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
})
.transpose()?;
let routing_column = properties
.get(ES_OPTION_ROUTING_COLUMN)
.cloned()
.map(|n| {
schema
.fields()
.iter()
.position(|s| s.name == n)
.ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN))
})
.transpose()?;
Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new(
schema,
pk_indices.clone(),
properties.get(ES_OPTION_DELIMITER).cloned(),
index_column,
routing_column,
)?))
} else {
Ok(StreamChunkConverter::Other)
Expand All @@ -74,13 +87,15 @@ pub struct EsStreamChunkConverter {
json_encoder: JsonEncoder,
fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send>,
index_column: Option<usize>,
routing_column: Option<usize>,
}
impl EsStreamChunkConverter {
fn new(
schema: Schema,
pk_indices: Vec<usize>,
delimiter: Option<String>,
index_column: Option<usize>,
routing_column: Option<usize>,
) -> Result<Self> {
let fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send> = if pk_indices.is_empty()
{
Expand Down Expand Up @@ -127,6 +142,7 @@ impl EsStreamChunkConverter {
json_encoder,
fn_build_id,
index_column,
routing_column,
})
}

Expand All @@ -138,6 +154,8 @@ impl EsStreamChunkConverter {
<JsonbArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
let mut index_builder =
<Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
let mut routing_builder =
<Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
for (op, row) in chunk.rows() {
ops.push(op);
id_string_builder.append(Some(&self.build_id(row)?));
Expand All @@ -150,18 +168,29 @@ impl EsStreamChunkConverter {
} else {
index_builder.append_null();
}
if let Some(index) = self.routing_column {
    routing_builder.append(Some(
        row.datum_at(index)
            .ok_or_else(|| anyhow!("No value found in row, index is {}", index))?
            .into_utf8(),
    ));
} else {
    // BUG FIX: the original appended the null to `index_builder` here, so
    // `index_builder` grew by two entries per row while `routing_builder`
    // stayed empty — the column arrays end up with mismatched lengths and
    // building the StreamChunk fails whenever no routing column is set
    // (which is the default). Append to `routing_builder` instead.
    routing_builder.append_null();
}
let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?));
json_builder.append(Some(json.as_scalar_ref()));
}
let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder);
let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder);
let index_string_array = risingwave_common::array::ArrayBuilder::finish(index_builder);
let routing_string_array = risingwave_common::array::ArrayBuilder::finish(routing_builder);
Ok(StreamChunk::new(
ops,
vec![
std::sync::Arc::new(ArrayImpl::Utf8(index_string_array)),
std::sync::Arc::new(ArrayImpl::Utf8(id_string_array)),
std::sync::Arc::new(ArrayImpl::Jsonb(json_array)),
std::sync::Arc::new(ArrayImpl::Utf8(routing_string_array)),
],
))
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ impl RemoteLogSinker {
ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
// BUG FIX: the routing column is the 4th column (after index, id, doc) and must
// get a unique column id; the original reused ColumnId::from(2), colliding with
// the Jsonb doc column declared on the previous line.
ColumnDesc::unnamed(ColumnId::from(3), DataType::Varchar).to_protobuf(),
];
Some(TableSchema {
columns,
Expand Down

0 comments on commit 741534a

Please sign in to comment.