diff --git a/shotover-proxy/tests/opensearch_int_tests/mod.rs b/shotover-proxy/tests/opensearch_int_tests/mod.rs index aa30b7559..76fc28676 100644 --- a/shotover-proxy/tests/opensearch_int_tests/mod.rs +++ b/shotover-proxy/tests/opensearch_int_tests/mod.rs @@ -226,6 +226,17 @@ async fn opensearch_test_suite(client: &OpenSearch) { // test_authentication(&client).await; } +fn create_client(addr: &str) -> OpenSearch { + let url = Url::parse(addr).unwrap(); + let credentials = Credentials::Basic("admin".into(), "admin".into()); + let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url)) + .cert_validation(CertificateValidation::None) + .auth(credentials) + .build() + .unwrap(); + OpenSearch::new(transport) +} + #[tokio::test(flavor = "multi_thread")] async fn passthrough_standard() { let _compose = docker_compose("tests/test-configs/opensearch-passthrough/docker-compose.yaml"); @@ -235,17 +246,85 @@ async fn passthrough_standard() { .await; let addr = "http://localhost:9201"; + let client = create_client(addr); - let url = Url::parse(addr).unwrap(); - let credentials = Credentials::Basic("admin".into(), "admin".into()); - let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url)) - .cert_validation(CertificateValidation::None) - .auth(credentials) - .build() + opensearch_test_suite(&client).await; + + shotover.shutdown_and_then_consume_events(&[]).await; +} + +#[tokio::test(flavor = "multi_thread")] +async fn dual_write() { + let _compose = docker_compose("tests/test-configs/opensearch-dual-write/docker-compose.yaml"); + + let addr1 = "http://localhost:9201"; + let client1 = create_client(addr1); + let addr2 = "http://localhost:9202"; + let client2 = create_client(addr2); + + opensearch_test_suite(&client1).await; + opensearch_test_suite(&client2).await; + + let shotover = shotover_process("tests/test-configs/opensearch-dual-write/topology.yaml") + .start() + .await; + + let shotover_client = create_client("http://localhost:9200"); + opensearch_test_suite(&shotover_client).await; + + shotover_client + .indices() + .create(IndicesCreateParts::Index("test-index")) + .send() + .await .unwrap(); - let client = OpenSearch::new(transport); - opensearch_test_suite(&client).await; + let exists_response = shotover_client + .indices() + .exists(IndicesExistsParts::Index(&["test-index"])) + .send() + .await + .unwrap(); + + assert_eq!(exists_response.status_code(), StatusCode::OK); + + shotover_client + .index(IndexParts::Index("test-index")) + .body(json!({ + "name": "John", + "age": 30 + })) + .refresh(Refresh::WaitFor) + .send() + .await + .unwrap(); + + for client in &[shotover_client, client1, client2] { + let response = client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(10) + .body(json!({ + "query": { + "match": { + "name": "John", + } + } + })) + .send() + .await + .unwrap(); + + let results = response.json::().await.unwrap(); + assert!(results["took"].as_i64().is_some()); + assert_eq!( + results["hits"].as_object().unwrap()["hits"] + .as_array() + .unwrap() + .len(), + 1 + ); + } shotover.shutdown_and_then_consume_events(&[]).await; } diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml new file mode 100644 index 000000000..766327442 --- /dev/null +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml @@ -0,0 +1,50 @@ +version: '3' +services: + opensearch-node1: + image: opensearchproject/opensearch:2.9.0 + container_name: opensearch-node1 + environment: + - cluster.name=opensearch-cluster-1 + - node.name=opensearch-node1 + - bootstrap.memory_lock=true + - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" + - discovery.type=single-node + - plugins.security.disabled=true + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 + hard: 65536 + volumes: + - type: volume + target: /usr/share/opensearch/data + ports: + - 9201:9200 + - 9601:9600 + + opensearch-node2: + image: opensearchproject/opensearch:2.9.0 + container_name: opensearch-node2 + environment: + - cluster.name=opensearch-cluster-2 + - node.name=opensearch-node2 + - bootstrap.memory_lock=true + - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" + - discovery.type=single-node + - plugins.security.disabled=true + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 + hard: 65536 + volumes: + - type: volume + target: /usr/share/opensearch/data + ports: + - 9202:9200 + - 9602:9600 + diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml new file mode 100644 index 000000000..119e6673f --- /dev/null +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml @@ -0,0 +1,19 @@ +--- +sources: + opensearch_prod: + OpenSearch: + listen_addr: "127.0.0.1:9200" +chain_config: + main_chain: + - Tee: + behavior: Ignore + buffer_size: 10000 + chain: + - OpenSearchSink: + remote_address: "127.0.0.1:9202" + connect_timeout_ms: 3000 + - OpenSearchSink: + remote_address: "127.0.0.1:9201" + connect_timeout_ms: 3000 +source_to_chain_mapping: + opensearch_prod: main_chain