Skip to content

Commit

Permalink
opensearch dual write
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Aug 21, 2023
1 parent e8014ff commit 26bf276
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 8 deletions.
95 changes: 87 additions & 8 deletions shotover-proxy/tests/opensearch_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,17 @@ async fn opensearch_test_suite(client: &OpenSearch) {
// test_authentication(&client).await;
}

fn create_client(addr: &str) -> OpenSearch {
let url = Url::parse(addr).unwrap();
let credentials = Credentials::Basic("admin".into(), "admin".into());
let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url))
.cert_validation(CertificateValidation::None)
.auth(credentials)
.build()
.unwrap();
OpenSearch::new(transport)
}

#[tokio::test(flavor = "multi_thread")]
async fn passthrough_standard() {
let _compose = docker_compose("tests/test-configs/opensearch-passthrough/docker-compose.yaml");
Expand All @@ -235,17 +246,85 @@ async fn passthrough_standard() {
.await;

let addr = "http://localhost:9201";
let client = create_client(addr);

let url = Url::parse(addr).unwrap();
let credentials = Credentials::Basic("admin".into(), "admin".into());
let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url))
.cert_validation(CertificateValidation::None)
.auth(credentials)
.build()
opensearch_test_suite(&client).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn dual_write() {
let _compose = docker_compose("tests/test-configs/opensearch-dual-write/docker-compose.yaml");

let addr1 = "http://localhost:9201";
let client1 = create_client(addr1);
let addr2 = "http://localhost:9202";
let client2 = create_client(addr2);

opensearch_test_suite(&client1).await;
opensearch_test_suite(&client2).await;

let shotover = shotover_process("tests/test-configs/opensearch-dual-write/topology.yaml")
.start()
.await;

let shotover_client = create_client("http://localhost:9200");
opensearch_test_suite(&shotover_client).await;

shotover_client
.indices()
.create(IndicesCreateParts::Index("test-index"))
.send()
.await
.unwrap();
let client = OpenSearch::new(transport);

opensearch_test_suite(&client).await;
let exists_response = shotover_client
.indices()
.exists(IndicesExistsParts::Index(&["test-index"]))
.send()
.await
.unwrap();

assert_eq!(exists_response.status_code(), StatusCode::OK);

shotover_client
.index(IndexParts::Index("test-index"))
.body(json!({
"name": "John",
"age": 30
}))
.refresh(Refresh::WaitFor)
.send()
.await
.unwrap();

for client in &[shotover_client, client1, client2] {
let response = client
.search(SearchParts::Index(&["test-index"]))
.from(0)
.size(10)
.body(json!({
"query": {
"match": {
"name": "John",
}
}
}))
.send()
.await
.unwrap();

let results = response.json::<Value>().await.unwrap();
assert!(results["took"].as_i64().is_some());
assert_eq!(
results["hits"].as_object().unwrap()["hits"]
.as_array()
.unwrap()
.len(),
1
);
}

shotover.shutdown_and_then_consume_events(&[]).await;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
version: '3'
services:
opensearch-node1:
image: opensearchproject/opensearch:2.9.0
container_name: opensearch-node1
environment:
- cluster.name=opensearch-cluster-1
- node.name=opensearch-node1
- bootstrap.memory_lock=true
- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
- plugins.security.disabled=true
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- type: volume
target: /usr/share/opensearch/data
ports:
- 9201:9200
- 9601:9600

opensearch-node2:
image: opensearchproject/opensearch:2.9.0
container_name: opensearch-node2
environment:
- cluster.name=opensearch-cluster-2
- node.name=opensearch-node2
- bootstrap.memory_lock=true
- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
- plugins.security.disabled=true
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- type: volume
target: /usr/share/opensearch/data
ports:
- 9202:9200
- 9602:9600

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
sources:
opensearch_prod:
OpenSearch:
listen_addr: "127.0.0.1:9200"
chain_config:
main_chain:
- Tee:
behavior: Ignore
buffer_size: 10000
chain:
- OpenSearchSink:
remote_address: "127.0.0.1:9202"
connect_timeout_ms: 3000
- OpenSearchSink:
remote_address: "127.0.0.1:9201"
connect_timeout_ms: 3000
source_to_chain_mapping:
opensearch_prod: main_chain

0 comments on commit 26bf276

Please sign in to comment.