From 13573061f7b98c759209d60a165162c5966521b1 Mon Sep 17 00:00:00 2001 From: conorbros Date: Mon, 4 Sep 2023 19:29:03 +1000 Subject: [PATCH 1/7] WIP --- Cargo.lock | 54 +++++- shotover-proxy/config/config.yaml | 2 +- .../tests/opensearch_int_tests/mod.rs | 176 +++++++++++++++++- .../opensearch-dual-write/docker-compose.yaml | 50 +++++ .../opensearch-dual-write/topology.yaml | 19 ++ shotover/Cargo.toml | 1 + shotover/src/codec/opensearch.rs | 168 ++++++++++++----- shotover/src/frame/opensearch.rs | 76 +++++++- shotover/src/message/mod.rs | 7 +- 9 files changed, 493 insertions(+), 60 deletions(-) create mode 100644 shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml create mode 100644 shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml diff --git a/Cargo.lock b/Cargo.lock index a04354f45..c793f8cc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + [[package]] name = "aead" version = "0.5.2" @@ -1241,6 +1247,15 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpufeatures" version = "0.2.9" @@ -1568,6 +1583,12 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "dary_heap" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7762d17f1241643615821a8455a0b2c3e803784b058693d990b11f2dce25a0ca" + [[package]] name = "dashmap" version = "5.5.3" @@ -2573,6 +2594,30 @@ version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +[[package]] +name = "libflate" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7d5654ae1795afc7ff76f4365c2c8791b0feb18e8996a96adad8ffd7c3b2bf" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be5f52fb8c451576ec6b79d3f4deb327398bc05bbdbd99021a6e77a4c855d524" +dependencies = [ + "core2", + "hashbrown 0.13.2", + "rle-decode-fast", +] + [[package]] name = "libm" version = "0.2.8" @@ -3824,6 +3869,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "rsa" version = "0.9.2" @@ -4403,7 +4454,8 @@ dependencies = [ "hyper", "itertools 0.11.0", "kafka-protocol", - "lz4_flex", + "libflate", + "lz4_flex 0.11.1", "metrics", "metrics-exporter-prometheus", "nonzero_ext", diff --git a/shotover-proxy/config/config.yaml b/shotover-proxy/config/config.yaml index 42ea740ca..79879aee2 100644 --- a/shotover-proxy/config/config.yaml +++ b/shotover-proxy/config/config.yaml @@ -1,3 +1,3 @@ --- -main_log_level: "info,shotover_proxy=info" +main_log_level: "debug,shotover_proxy=debug" observability_interface: "0.0.0.0:9001" diff --git a/shotover-proxy/tests/opensearch_int_tests/mod.rs b/shotover-proxy/tests/opensearch_int_tests/mod.rs index 2fcd3814f..c161c86c2 100644 --- a/shotover-proxy/tests/opensearch_int_tests/mod.rs +++ b/shotover-proxy/tests/opensearch_int_tests/mod.rs @@ -2,19 +2,28 @@ use crate::shotover_process; use opensearch::{ auth::Credentials, cert::CertificateValidation, + cluster::ClusterHealthParts, http::{ headers::{HeaderName, HeaderValue}, response::Response, + response::Response, transport::{SingleNodeConnectionPool, TransportBuilder}, Method, StatusCode, Url, }, indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts}, nodes::NodesInfoParts, + indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts, IndicesGetParts}, params::Refresh, + params::WaitForStatus, BulkOperation, BulkParts, DeleteParts, Error, IndexParts, OpenSearch, SearchParts, }; use serde_json::{json, Value}; use test_helpers::docker_compose::docker_compose; +// use tokio::{ +// sync::oneshot, +// task::JoinHandle, +// time::{interval, Duration}, +// }; async fn assert_ok_and_get_json(response: Result) -> Value { let response = response.unwrap(); @@ -242,6 +251,24 @@ async fn opensearch_test_suite(client: &OpenSearch) { .await; } +fn create_client(addr: &str) -> OpenSearch { + let url = Url::parse(addr).unwrap(); + let credentials = Credentials::Basic("admin".into(), "admin".into()); + let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url)) + .cert_validation(CertificateValidation::None) + .auth(credentials) + .build() + .unwrap(); + let client = OpenSearch::new(transport); + + client + .cluster() + .health(ClusterHealthParts::None) + .wait_for_status(WaitForStatus::Green); + + client +} + #[tokio::test(flavor = "multi_thread")] async fn passthrough_standard() { let _compose = docker_compose("tests/test-configs/opensearch-passthrough/docker-compose.yaml"); @@ -251,17 +278,150 @@ async fn passthrough_standard() { .await; let addr = "http://localhost:9201"; + let client = create_client(addr); - let url = Url::parse(addr).unwrap(); - let credentials = Credentials::Basic("admin".into(), "admin".into()); - let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url)) - .cert_validation(CertificateValidation::None) - .auth(credentials) - .build() + opensearch_test_suite(&client).await; + + shotover.shutdown_and_then_consume_events(&[]).await; +} + +#[tokio::test(flavor = "multi_thread")] +async fn dual_write_basic() { + let _compose = docker_compose("tests/test-configs/opensearch-dual-write/docker-compose.yaml"); + + let addr1 = "http://localhost:9201"; + let client1 = create_client(addr1); + let addr2 = "http://localhost:9202"; + let client2 = create_client(addr2); + + let shotover = shotover_process("tests/test-configs/opensearch-dual-write/topology.yaml") + .start() + .await; + + let shotover_client = create_client("http://localhost:9200"); + + shotover_client + .indices() + .create(IndicesCreateParts::Index("test-index")) + .send() + .await .unwrap(); - let client = OpenSearch::new(transport); - opensearch_test_suite(&client).await; + let exists_response = shotover_client + .indices() + .exists(IndicesExistsParts::Index(&["test-index"])) + .send() + .await + .unwrap(); + + assert_eq!(exists_response.status_code(), StatusCode::OK); + + shotover_client + .index(IndexParts::Index("test-index")) + .body(json!({ + "name": "John", + "age": 30 + })) + .refresh(Refresh::WaitFor) + .send() + .await + .unwrap(); + + for client in &[shotover_client, client1, client2] { + let response = client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(10) + .body(json!({ + "query": { + "match": { + "name": "John", + } + } + })) + .send() + .await + .unwrap(); + + let results = response.json::().await.unwrap(); + assert!(results["took"].as_i64().is_some()); + assert_eq!( + results["hits"].as_object().unwrap()["hits"] + .as_array() + .unwrap() + .len(), + 1 + ); + } + + shotover.shutdown_and_then_consume_events(&[]).await; +} + +// async fn start_writer_thread( +// client: OpenSearch, +// mut shutdown_notification_rx: oneshot::Receiver<()>, +// ) -> tokio::task::JoinHandle<()> { +// tokio::spawn(async move { +// let mut i = 0; +// let mut interval = interval(Duration::from_millis(100)); +// +// loop { +// tokio::select! { +// _ = interval.tick() => { +// // TODO send the message to opensearch +// }, +// _ = &mut shutdown_notification_rx => { +// println!("shutting down writer thread"); +// break; +// } +// } +// } +// }) +// } + +#[tokio::test(flavor = "multi_thread")] +async fn dual_write() { + // let shotover_addr = "http://localhost:9200"; + let source_addr = "http://localhost:9201"; + // let target_addr = "http://localhost:9202"; + + let _compose = docker_compose("tests/test-configs/opensearch-dual-write/docker-compose.yaml"); + + let shotover = shotover_process("tests/test-configs/opensearch-dual-write/topology.yaml") + .start() + .await; + + // let shotover_client = create_client(shotover_addr); + let source_client = create_client(source_addr); + // let target_client = create_client(target_addr); + + // Create indexes in source cluster + assert_ok_and_get_json( + source_client + .indices() + .create(IndicesCreateParts::Index("test-index")) + .send() + .await, + ) + .await; + + // Get index info from source cluster and create in target cluster + + let index_info = assert_ok_and_get_json( + source_client + .indices() + .get(IndicesGetParts::Index(&["test-index"])) + .send() + .await, + ) + .await; + + println!("{:?}", index_info); + + // Begin dual writes and verify data ends up in both clusters + // Begin reindex operations + // Continue dual writing until reindex operation complete + // verify both clusters end up in the same state shotover.shutdown_and_then_consume_events(&[]).await; } diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml new file mode 100644 index 000000000..766327442 --- /dev/null +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml @@ -0,0 +1,50 @@ +version: '3' +services: + opensearch-node1: + image: opensearchproject/opensearch:2.9.0 + container_name: opensearch-node1 + environment: + - cluster.name=opensearch-cluster-1 + - node.name=opensearch-node1 + - bootstrap.memory_lock=true + - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" + - discovery.type=single-node + - plugins.security.disabled=true + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 + hard: 65536 + volumes: + - type: volume + target: /usr/share/opensearch/data + ports: + - 9201:9200 + - 9601:9600 + + opensearch-node2: + image: opensearchproject/opensearch:2.9.0 + container_name: opensearch-node2 + environment: + - cluster.name=opensearch-cluster-2 + - node.name=opensearch-node2 + - bootstrap.memory_lock=true + - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" + - discovery.type=single-node + - plugins.security.disabled=true + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 + hard: 65536 + volumes: + - type: volume + target: /usr/share/opensearch/data + ports: + - 9202:9200 + - 9602:9600 + diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml new file mode 100644 index 000000000..382a66e8d --- /dev/null +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml @@ -0,0 +1,19 @@ +--- +sources: + opensearch_prod: + OpenSearch: + listen_addr: "127.0.0.1:9200" +chain_config: + main_chain: + - Tee: + behavior: FailOnMismatch + buffer_size: 10000 + chain: + - OpenSearchSinkSingle: + remote_address: "127.0.0.1:9202" + connect_timeout_ms: 3000 + - OpenSearchSinkSingle: + remote_address: "127.0.0.1:9201" + connect_timeout_ms: 3000 +source_to_chain_mapping: + opensearch_prod: main_chain diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index d86e66396..2232faf0e 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -93,6 +93,7 @@ string = "0.3.0" xxhash-rust = { version = "0.8.6", features = ["xxh3"] } dashmap = "5.4.0" atoi = "2.0.0" +libflate = "2.0.0" [dev-dependencies] criterion = { version = "0.5.0", features = ["async_tokio"] } diff --git a/shotover/src/codec/opensearch.rs b/shotover/src/codec/opensearch.rs index 6a45beb98..84ec09277 100644 --- a/shotover/src/codec/opensearch.rs +++ b/shotover/src/codec/opensearch.rs @@ -258,7 +258,7 @@ impl Encoder for OpenSearchEncoder { Encodable::Frame(frame) => { let opensearch_frame = frame.into_opensearch().unwrap(); - match opensearch_frame.headers { + match &opensearch_frame.headers { HttpHead::Request(request_parts) => { self.last_outgoing_method .lock() @@ -313,46 +313,126 @@ impl Encoder for OpenSearchEncoder { } } -#[cfg(test)] -mod opensearch_tests { - use crate::codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}; - use bytes::BytesMut; - use tokio_util::codec::{Decoder, Encoder}; - - fn test_frame(raw_frame: &[u8], direction: Direction) { - let (mut decoder, mut encoder) = OpenSearchCodecBuilder::new(direction).build(); - let message = decoder - .decode(&mut BytesMut::from(raw_frame)) - .unwrap() - .unwrap(); - - let mut dest = BytesMut::new(); - encoder.encode(message, &mut dest).unwrap(); - assert_eq!(raw_frame, &dest); - } - - const RESPONSE: [u8; 186] = *b"\ - HTTP/1.1 200 OK\r\n\ - date: Mon, 27 Jul 2009 12:28:53 GMT\r\n\ - server: Apache/2.2.14 (Win32)\r\n\ - last-modified: Wed, 22 Jul 2009 19:15:56 GMT\r\n\ - content-length: 9\r\n\ - content-type: text/html\r\n\r\n\ - something"; - - const REQUEST: [u8; 90] = *b"\ - POST /cgi-bin/process.cgi HTTP/1.1\r\n\ - connection: Keep-Alive\r\n\ - content-length: 9\r\n\r\n\ - something"; - - #[test] - fn test_request() { - test_frame(&REQUEST, Direction::Source); - } - - #[test] - fn test_response() { - test_frame(&RESPONSE, Direction::Sink); - } -} +// #[cfg(test)] +// mod opensearch_tests { +// use crate::codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}; +// use bytes::BytesMut; +// use hex_literal::hex; +// use serde_json::Value; +// use tokio_util::codec::{Decoder, Encoder}; +// +// fn test_frame(raw_frame: &[u8], direction: Direction) { +// let (mut decoder, mut encoder) = OpenSearchCodecBuilder::new(direction).build(); +// let message = decoder +// .decode(&mut BytesMut::from(raw_frame)) +// .unwrap() +// .unwrap(); +// +// println!("message: {:?}", message); +// +// let mut dest = BytesMut::new(); +// encoder.encode(message, &mut dest).unwrap(); +// +// println!("dest: {:x?}", dest); +// println!("raw_frame: {:?}", raw_frame); +// +// assert_eq!(raw_frame, &dest); +// } +// +// const RESPONSE: [u8; 186] = *b"\ +// HTTP/1.1 200 OK\r\n\ +// date: Mon, 27 Jul 2009 12:28:53 GMT\r\n\ +// server: Apache/2.2.14 (Win32)\r\n\ +// last-modified: Wed, 22 Jul 2009 19:15:56 GMT\r\n\ +// content-length: 9\r\n\ +// content-type: text/html\r\n\r\n\ +// something"; +// +// const REQUEST: [u8; 90] = *b"\ +// POST /cgi-bin/process.cgi HTTP/1.1\r\n\ +// connection: Keep-Alive\r\n\ +// content-length: 9\r\n\r\n\ +// something"; +// +// const GZIP_RESPONSE: [u8; 594] = hex!("48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d 0a 63 6f 6e 74 65 6e 74 2d 74 79 70 65 3a 20 61 70 70 6c 69 63 61 74 69 6f 6e 2f 6a 73 6f 6e 3b 20 63 68 61 72 73 65 74 3d 55 54 46 2d 38 0d 0a 63 6f 6e 74 65 6e 74 2d 65 6e 63 6f 64 69 6e 67 3a 20 67 7a 69 70 0d 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a 20 34 38 33 0d 0a 0d 0a 1f 8b 08 00 00 00 00 00 00 00 cc 94 c9 6e db 30 10 86 5f 45 98 b3 52 78 5f 74 33 d2 16 c8 d6 c0 4e da 06 0d 0c 82 16 a9 98 30 25 2a 5c 0c 0b 82 df 3d 23 1a 41 63 a0 25 7b ec 45 20 31 ff 7c 9c e1 fc 54 0b 56 a9 1d 64 a3 71 0a 56 94 9c 11 e5 2c 64 05 95 86 a7 40 cc 96 6a 66 20 6b 51 66 a9 84 6c 98 82 71 79 ce 8d 29 1c 6e fb b8 dd 89 ba e6 0c b2 5e 0a 05 15 b2 5b 0e 4e 4b a7 39 e6 3e b7 e0 31 5e 21 2a c6 0f 90 c1 27 55 f3 ca 70 aa f3 ed 85 da 18 ae f7 74 23 a4 b0 0d a4 50 29 c6 51 b2 b9 5d 15 8b d5 ed f4 a1 bc bc d9 7d bd 53 e6 d1 bc 5c 2d 31 ae 39 35 aa f2 45 35 75 a7 7c 75 5c 37 a7 5a 09 3f e4 bc b6 02 e3 bf 85 f0 4d 25 25 ad 6b 51 bd 24 85 72 15 c3 af 4e 9e c5 3a 11 55 a2 34 e3 3a b1 2a 31 4a db c4 e7 fd 4b 91 5e 43 9c 13 d8 17 b0 5f 0b db 3c 95 bd c7 dc fd dc 7c a7 07 42 d8 e7 e5 02 8e c7 f4 4f bd 5b 6e ec c5 e9 8c ff a8 d9 b3 aa ce ba a3 d7 fb d5 6a 71 65 96 4d 79 7d f9 63 76 77 73 ff 85 b0 25 76 b7 3e a6 b0 15 f6 a3 3f 5a d8 53 e9 70 28 7d 1c b6 e6 92 fa 51 64 c0 5f 01 c5 25 5e 8d c9 95 c6 78 e5 a4 7c cf 46 87 90 f7 3b af 95 41 20 5a cf 5f 6c af 5b 9d 65 10 a3 9c ce 11 d0 82 f0 8e b2 c2 ca ce 04 f7 68 a8 07 6f a8 ee a4 6e 98 68 bd 1e 96 f8 57 78 3f 02 47 73 07 e1 fd 10 7c 10 81 e3 13 09 c2 07 21 f8 30 02 c7 57 1a 84 0f 43 f0 51 04 3e 8a c0 47 21 f8 38 02 f7 bf a1 c0 40 c7 21 f8 24 02 9f 44 e0 93 10 7c 1a 81 4f 23 f0 69 08 3e 8b c0 67 11 f8 2c 04 9f 47 e0 f3 08 7c be c6 97 7e 7c 03 00 00 ff ff 03 00 c2 4e 64 8e 2a 06 00 00"); +// +// // #[test] +// // fn test_request() { +// // test_frame(&REQUEST, Direction::Source); +// // } +// // +// // #[test] +// // fn test_response() { +// // test_frame(&RESPONSE, Direction::Sink); +// // } +// +// #[test] +// fn test_encoding() { +// test_frame(&GZIP_RESPONSE, Direction::Sink); +// } +// +// fn decode_gzip(bytes: Vec) -> Value { +// use libflate::gzip::Decoder; +// use libflate::gzip::Encoder; +// use std::io::{Read, Write}; +// +// let mut decoder = Decoder::new(&bytes[..]).unwrap(); +// +// let mut decoded_data = Vec::new(); +// decoder.read_to_end(&mut decoded_data).unwrap(); +// +// let json = serde_json::from_slice::(&decoded_data).unwrap(); +// +// println!("decoded json: {:?}", json); +// +// json +// } +// +// fn encode_gzip(value: Value) -> Vec { +// use libflate::gzip::Decoder; +// use libflate::gzip::Encoder; +// use std::io::{Read, Write}; +// +// let json_bytes = serde_json::to_vec(&value).unwrap(); +// +// let mut encoder = Encoder::new(Vec::new()).unwrap(); +// encoder.write_all(&json_bytes).unwrap(); +// let encoded_data = encoder.finish().into_result().unwrap(); +// +// encoded_data +// } +// +// #[test] +// fn test_gzip_codec() { +// use libflate::gzip::Decoder; +// use libflate::gzip::Encoder; +// use std::io::{Read, Write}; +// +// /* let bytes = hex!("1f 8b 08 00 00 00 00 00 00 00 cc 94 c9 6e db 30 10 86 5f 45 98 b3 52 78 5f 74 33 d2 16 c8 d6 c0 4e da 06 0d 0c 82 16 a9 98 30 25 2a 5c 0c 0b 82 df 3d 23 1a 41 63 a0 25 7b ec 45 20 31 ff 7c 9c e1 fc 54 0b 56 a9 1d 64 a3 71 0a 56 94 9c 11 e5 2c 64 05 95 86 a7 40 cc 96 6a 66 20 6b 51 66 a9 84 6c 98 82 71 79 ce 8d 29 1c 6e fb b8 dd 89 ba e6 0c b2 5e 0a 05 15 b2 5b 0e 4e 4b a7 39 e6 3e b7 e0 31 5e 21 2a c6 0f 90 c1 27 55 f3 ca 70 aa f3 ed 85 da 18 ae f7 74 23 a4 b0 0d a4 50 29 c6 51 b2 b9 5d 15 8b d5 ed f4 a1 bc bc d9 7d bd 53 e6 d1 bc 5c 2d 31 ae 39 35 aa f2 45 35 75 a7 7c 75 5c 37 a7 5a 09 3f e4 bc b6 02 e3 bf 85 f0 4d 25 25 ad 6b 51 bd 24 85 72 15 c3 af 4e 9e c5 3a 11 55 a2 34 e3 3a b1 2a 31 4a db c4 e7 fd 4b 91 5e 43 9c 13 d8 17 b0 5f 0b db 3c 95 bd c7 dc fd dc 7c a7 07 42 d8 e7 e5 02 8e c7 f4 4f bd 5b 6e ec c5 e9 8c ff a8 d9 b3 aa ce ba a3 d7 fb d5 6a 71 65 96 4d 79 7d f9 63 76 77 73 ff 85 b0 25 76 b7 3e a6 b0 15 f6 a3 3f 5a d8 53 e9 70 28 7d 1c b6 e6 92 fa 51 64 c0 5f 01 c5 25 5e 8d c9 95 c6 78 e5 a4 7c cf 46 87 90 f7 3b af 95 41 20 5a cf 5f 6c af 5b 9d 65 10 a3 9c ce 11 d0 82 f0 8e b2 c2 ca ce 04 f7 68 a8 07 6f a8 ee a4 6e 98 68 bd 1e 96 f8 57 78 3f 02 47 73 07 e1 fd 10 7c 10 81 e3 13 09 c2 07 21 f8 30 02 c7 57 1a 84 0f 43 f0 51 04 3e 8a c0 47 21 f8 38 02 f7 bf a1 c0 40 c7 21 f8 24 02 9f 44 e0 93 10 7c 1a 81 4f 23 f0 69 08 3e 8b c0 67 11 f8 2c 04 9f 47 e0 f3 08 7c be c6 97 7e 7c 03 00 00 ff ff 03 00 c2 4e 64 8e 2a 06 00 00"); */ +// +// let bytes = +// hex!("1f8b0800000000000003ab56ca48cdc9c957b252502acf2fca4951aa050022aea38612000000"); +// +// let json = decode_gzip(bytes.to_vec()); +// let encoded_data = encode_gzip(json.clone()); +// +// // assert_eq!(bytes.to_vec(), encoded_data); +// // +// let json = decode_gzip(encoded_data.clone()); +// let encoded_data2 = encode_gzip(json.clone()); +// assert_eq!(encoded_data, encoded_data2); +// +// // +// // let json_bytes = serde_json::to_vec(&json).unwrap(); +// // let new_json = serde_json::from_slice::(&json_bytes).unwrap(); +// // +// // assert_eq!(json,new_json); +// // assert_eq!(decoded_data, json_bytes); +// // +// /* let mut encoder = Encoder::new(Vec::new()).unwrap(); +// encoder.write_all(&decoded_data).unwrap(); +// let encoded_data = encoder.finish().into_result().unwrap(); +// +// assert_eq!(encoded_data, bytes); */ +// } +// } diff --git a/shotover/src/frame/opensearch.rs b/shotover/src/frame/opensearch.rs index fe76f16c5..11f5e2733 100644 --- a/shotover/src/frame/opensearch.rs +++ b/shotover/src/frame/opensearch.rs @@ -1,19 +1,26 @@ use anyhow::Result; use bytes::Bytes; +use derivative::Derivative; use http::{HeaderMap, Method, StatusCode, Uri, Version}; +use serde_json::Value; +use std::fmt; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, Derivative)] +#[derivative(PartialEq)] pub struct ResponseParts { pub status: StatusCode, pub version: Version, + #[derivative(PartialEq = "ignore")] pub headers: HeaderMap, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, Derivative)] +#[derivative(PartialEq)] pub struct RequestParts { pub method: Method, pub uri: Uri, pub version: Version, + #[derivative(PartialEq = "ignore")] pub headers: HeaderMap, } @@ -23,12 +30,33 @@ pub enum HttpHead { Request(RequestParts), } -#[derive(Debug, Clone, PartialEq)] +impl HttpHead { + pub fn headers(&self) -> &HeaderMap { + match &self { + HttpHead::Response(response) => &response.headers, + HttpHead::Request(request) => &request.headers, + } + } +} + +#[derive(Clone, Derivative)] +#[derivative(PartialEq)] pub struct OpenSearchFrame { pub headers: HttpHead, + + #[derivative(PartialEq = "ignore")] pub body: Bytes, } +impl fmt::Debug for OpenSearchFrame { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OpenSearchFrame") + .field("headers", &self.headers) + .field("body", &self.json_str()) + .finish() + } +} + impl OpenSearchFrame { pub fn new(headers: HttpHead, body: Bytes) -> Self { Self { headers, body } @@ -37,4 +65,46 @@ impl OpenSearchFrame { pub fn from_bytes(_bytes: &Bytes) -> Result { todo!(); } + + pub fn new_server_error_response() -> Self { + let headers = HttpHead::Response(ResponseParts { + status: StatusCode::INTERNAL_SERVER_ERROR, + version: Version::HTTP_11, + headers: HeaderMap::new(), + }); + let body = Bytes::new(); + Self::new(headers, body) + } + + pub fn json_str(&self) -> String { + use http::header; + + if self.body.is_empty() { + return "".to_string(); + }; + + let body = if self + .headers + .headers() + .get(header::CONTENT_ENCODING) + .is_some() + { + use libflate::gzip::Decoder; + use std::io::Read; + + let mut decoder = Decoder::new(&self.body[..]).unwrap(); + + let mut decoded_data = Vec::new(); + decoder.read_to_end(&mut decoded_data).unwrap(); + + decoded_data + } else { + self.body.to_vec() + }; + + match serde_json::from_slice::(&body) { + Ok(json) => serde_json::to_string_pretty(&json).unwrap(), + Err(_) => format!("failed to parse json: {:?}", pretty_hex::pretty_hex(&body)), + } + } } diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 31d1d3363..fcdfe4755 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -4,6 +4,7 @@ use crate::codec::kafka::RequestHeader; use crate::codec::CodecState; use crate::frame::cassandra::Tracing; use crate::frame::redis::redis_query_type; +use crate::frame::OpenSearchFrame; use crate::frame::{ cassandra, cassandra::{CassandraMetadata, CassandraOperation}, @@ -278,7 +279,7 @@ impl Message { Metadata::Kafka => return Err(anyhow!(error).context( "A generic error cannot be formed because the kafka protocol does not support it", )), - Metadata::OpenSearch => unimplemented!() + Metadata::OpenSearch => Frame::OpenSearch(OpenSearchFrame::new_server_error_response()), })) } @@ -295,14 +296,14 @@ impl Message { MessageType::Redis => Ok(Metadata::Redis), MessageType::Kafka => Ok(Metadata::Kafka), MessageType::Dummy => Err(anyhow!("Dummy has no metadata")), - MessageType::OpenSearch => Err(anyhow!("OpenSearch has no metadata")), + MessageType::OpenSearch => Ok(Metadata::OpenSearch), }, MessageInner::Parsed { frame, .. } | MessageInner::Modified { frame } => match frame { Frame::Cassandra(frame) => Ok(Metadata::Cassandra(frame.metadata())), Frame::Kafka(_) => Ok(Metadata::Kafka), Frame::Redis(_) => Ok(Metadata::Redis), Frame::Dummy => Err(anyhow!("dummy has no metadata")), - Frame::OpenSearch(_) => Err(anyhow!("OpenSearch has no metadata")), + Frame::OpenSearch(_) => Ok(Metadata::OpenSearch), }, } } From a125ca9ffc1e88647564d5b496f7f2d8eaaa7c13 Mon Sep 17 00:00:00 2001 From: conorbros Date: Thu, 21 Sep 2023 17:49:09 +1000 Subject: [PATCH 2/7] WIP --- shotover-proxy/config/config.yaml | 2 +- .../tests/opensearch_int_tests/mod.rs | 155 +++++++++++++----- .../opensearch-dual-write/docker-compose.yaml | 74 +++++++-- .../opensearch-dual-write/topology.yaml | 4 +- shotover/src/codec/opensearch.rs | 7 +- shotover/src/frame/opensearch.rs | 17 ++ shotover/src/message/mod.rs | 2 +- shotover/src/transforms/mod.rs | 2 +- shotover/src/transforms/opensearch/filter.rs | 0 shotover/src/transforms/opensearch/mod.rs | 120 +------------- .../src/transforms/opensearch/sink_single.rs | 119 ++++++++++++++ test-helpers/src/docker_compose.rs | 5 + 12 files changed, 322 insertions(+), 185 deletions(-) create mode 100644 shotover/src/transforms/opensearch/filter.rs create mode 100644 shotover/src/transforms/opensearch/sink_single.rs diff --git a/shotover-proxy/config/config.yaml b/shotover-proxy/config/config.yaml index 79879aee2..42ea740ca 100644 --- a/shotover-proxy/config/config.yaml +++ b/shotover-proxy/config/config.yaml @@ -1,3 +1,3 @@ --- -main_log_level: "debug,shotover_proxy=debug" +main_log_level: "info,shotover_proxy=info" observability_interface: "0.0.0.0:9001" diff --git a/shotover-proxy/tests/opensearch_int_tests/mod.rs b/shotover-proxy/tests/opensearch_int_tests/mod.rs index c161c86c2..290656e4e 100644 --- a/shotover-proxy/tests/opensearch_int_tests/mod.rs +++ b/shotover-proxy/tests/opensearch_int_tests/mod.rs @@ -11,6 +11,7 @@ use opensearch::{ Method, StatusCode, Url, }, indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts}, + indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts}, nodes::NodesInfoParts, indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts, IndicesGetParts}, params::Refresh, @@ -19,11 +20,11 @@ use opensearch::{ }; use serde_json::{json, Value}; use test_helpers::docker_compose::docker_compose; -// use tokio::{ -// sync::oneshot, -// task::JoinHandle, -// time::{interval, Duration}, -// }; +use tokio::{ + sync::oneshot, + task::JoinHandle, + time::{interval, Duration}, +}; async fn assert_ok_and_get_json(response: Result) -> Value { let response = response.unwrap(); @@ -289,9 +290,9 @@ async fn passthrough_standard() { async fn dual_write_basic() { let _compose = docker_compose("tests/test-configs/opensearch-dual-write/docker-compose.yaml"); - let addr1 = "http://localhost:9201"; + let addr1 = "http://172.16.1.2:9200"; let client1 = create_client(addr1); - let addr2 = "http://localhost:9202"; + let addr2 = "http://172.16.1.3:9200"; let client2 = create_client(addr2); let shotover = shotover_process("tests/test-configs/opensearch-dual-write/topology.yaml") @@ -357,33 +358,34 @@ async fn dual_write_basic() { shotover.shutdown_and_then_consume_events(&[]).await; } -// async fn start_writer_thread( -// client: OpenSearch, -// mut shutdown_notification_rx: oneshot::Receiver<()>, -// ) -> tokio::task::JoinHandle<()> { -// tokio::spawn(async move { -// let mut i = 0; -// let mut interval = interval(Duration::from_millis(100)); -// -// loop { -// tokio::select! { -// _ = interval.tick() => { -// // TODO send the message to opensearch -// }, -// _ = &mut shutdown_notification_rx => { -// println!("shutting down writer thread"); -// break; -// } -// } -// } -// }) -// } +async fn index_1000_documents(client: &OpenSearch) { + let mut body: Vec> = vec![]; + for i in 0..100 { + let op = BulkOperation::index(json!({ + "name": "John", + "age": i + })) + .id(i.to_string()) + .into(); + body.push(op); + } + + assert_ok_and_get_json( + client + .bulk(BulkParts::Index("test-index")) + .body(body) + .refresh(Refresh::WaitFor) + .send() + .await, + ) + .await; +} #[tokio::test(flavor = "multi_thread")] -async fn dual_write() { - // let shotover_addr = "http://localhost:9200"; - let source_addr = "http://localhost:9201"; - // let target_addr = "http://localhost:9202"; +async fn dual_write_reindex() { + let shotover_addr = "http://localhost:9200"; + let source_addr = "http://172.16.1.2:9200"; + let target_addr = "http://172.16.1.3:9200"; let _compose = docker_compose("tests/test-configs/opensearch-dual-write/docker-compose.yaml"); @@ -391,9 +393,9 @@ async fn dual_write() { .start() .await; - // let shotover_client = create_client(shotover_addr); + let shotover_client = create_client(shotover_addr); let source_client = create_client(source_addr); - // let target_client = create_client(target_addr); + let target_client = create_client(target_addr); // Create indexes in source cluster assert_ok_and_get_json( @@ -405,18 +407,91 @@ async fn dual_write() { ) .await; - // Get index info from source cluster and create in target cluster - - let index_info = assert_ok_and_get_json( - source_client + // Create in target cluster + assert_ok_and_get_json( + target_client .indices() - .get(IndicesGetParts::Index(&["test-index"])) + .create(IndicesCreateParts::Index("test-index")) .send() .await, ) .await; - println!("{:?}", index_info); + index_1000_documents(&source_client).await; + + let shotover_client_c = shotover_client.clone(); + let dual_write_jh = tokio::spawn(async move { + for _ in 0..20 { + // get a random number in between 0 and 2000 + let i = rand::random::() % 100; + + let response = shotover_client_c + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(200) + .body(json!({ + "query": { + "match": { + "age": i, + } + } + })) + .allow_no_indices(true) + .send() + .await + .unwrap(); + + let json_res = response.json::().await; + + let document = match &json_res { + Ok(json) => &json["hits"]["hits"][0], + Err(e) => { + println!("Error: {:?}", e); + continue; + } + }; + + // shotover_client_c + // .index(IndexParts::Index("test-index")) + // .body(json!({ + // "name": Value::String(format!("{} Smith", document["_source"]["name"].as_str().unwrap())), + // "age": document["_source"]["age"] + // })) + // .refresh(Refresh::WaitFor) + // .send() + // .await + // .unwrap(); + + tokio::time::sleep(Duration::from_millis(500)).await; + } + }); + + let target_client_c = target_client.clone(); + let reindex_jh = tokio::spawn(async move { + target_client_c + .reindex() + .body(json!( + { + "source":{ + "remote":{ + "host": source_addr, + "username":"admin", + "password":"admin" + }, + "index": "test-index" + }, + "dest":{ + "index": "test-index", + } + } + )) + .requests_per_second(1) + .send() + .await + .unwrap(); + }); + + let _ = tokio::join!(reindex_jh, dual_write_jh); // Begin dual writes and verify data ends up in both clusters // Begin reindex operations diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml index 766327442..10643fea9 100644 --- a/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml @@ -1,15 +1,30 @@ version: '3' +networks: + cluster_subnet: + name: cluster_subnet + driver: bridge + ipam: + driver: default + config: + - subnet: 172.16.1.0/24 + gateway: 172.16.1.1 + services: opensearch-node1: image: opensearchproject/opensearch:2.9.0 container_name: opensearch-node1 + networks: + cluster_subnet: + ipv4_address: 172.16.1.2 environment: - - cluster.name=opensearch-cluster-1 - - node.name=opensearch-node1 - - bootstrap.memory_lock=true - - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" - - discovery.type=single-node - - plugins.security.disabled=true + &environment + cluster.name: opensearch-cluster-1 + node.name: opensearch-node1 + bootstrap.memory_lock: true + OPENSEARCH_JAVA_OPTS: "-Xms512m -Xmx512m" + discovery.type: single-node + plugins.security.disabled: true + reindex.remote.allowlist: 172.16.1.2:9200 ulimits: memlock: soft: -1 @@ -20,20 +35,20 @@ services: volumes: - type: volume target: /usr/share/opensearch/data - ports: - - 9201:9200 - - 9601:9600 + # ports: + # - 9201:9200 + # - 9601:9600 opensearch-node2: image: opensearchproject/opensearch:2.9.0 container_name: opensearch-node2 + networks: + cluster_subnet: + ipv4_address: 172.16.1.3 environment: - - cluster.name=opensearch-cluster-2 - - node.name=opensearch-node2 - - bootstrap.memory_lock=true - - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" - - discovery.type=single-node - - plugins.security.disabled=true + <<: *environment + cluster.name: opensearch-cluster-2 + node.name: opensearch-node2 ulimits: memlock: soft: -1 @@ -44,7 +59,30 @@ services: volumes: - type: volume target: /usr/share/opensearch/data - ports: - - 9202:9200 - - 9602:9600 + # ports: + # - 9202:9200 + # - 9602:9600 + # + # + + opensearch-dashboards: + image: opensearchproject/opensearch-dashboards:2.9.0 + container_name: opensearch-dashboards + environment: + OPENSEARCH_HOSTS: '["http://172.16.1.2:9200"]' + DISABLE_SECURITY_DASHBOARDS_PLUGIN: true + networks: + cluster_subnet: + ipv4_address: 172.16.1.4 + + opensearch-dashboard-2: + image: opensearchproject/opensearch-dashboards:2.9.0 + container_name: opensearch-dashboard-2 + environment: + OPENSEARCH_HOSTS: '["http://172.16.1.3:9200"]' + DISABLE_SECURITY_DASHBOARDS_PLUGIN: true + networks: + cluster_subnet: + ipv4_address: 172.16.1.5 + diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml index 382a66e8d..d78506ee6 100644 --- a/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml @@ -10,10 +10,10 @@ chain_config: buffer_size: 10000 chain: - OpenSearchSinkSingle: - remote_address: "127.0.0.1:9202" + remote_address: "172.16.1.3:9200" connect_timeout_ms: 3000 - OpenSearchSinkSingle: - remote_address: "127.0.0.1:9201" + remote_address: "172.16.1.2:9200" connect_timeout_ms: 3000 source_to_chain_mapping: opensearch_prod: main_chain diff --git a/shotover/src/codec/opensearch.rs b/shotover/src/codec/opensearch.rs index 84ec09277..f21f0ea3d 100644 --- a/shotover/src/codec/opensearch.rs +++ b/shotover/src/codec/opensearch.rs @@ -117,9 +117,10 @@ impl OpenSearchDecoder { let mut headers = [httparse::EMPTY_HEADER; 16]; let mut response = httparse::Response::new(&mut headers); - let body_start = match response.parse(src).unwrap() { - httparse::Status::Complete(body_start) => body_start, - httparse::Status::Partial => return Ok(None), + let body_start = match response.parse(src) { + Ok(httparse::Status::Complete(body_start)) => body_start, + Ok(httparse::Status::Partial) => return Ok(None), + Err(err) => return Err(anyhow!("error parsing response: {}", err)), }; match response.version.unwrap() { 1 => (), diff --git a/shotover/src/frame/opensearch.rs b/shotover/src/frame/opensearch.rs index 11f5e2733..2f3c280e0 100644 --- a/shotover/src/frame/opensearch.rs +++ b/shotover/src/frame/opensearch.rs @@ -5,6 +5,8 @@ use http::{HeaderMap, Method, StatusCode, Uri, Version}; use serde_json::Value; use std::fmt; +use crate::message::QueryType; + #[derive(Debug, Clone, Derivative)] #[derivative(PartialEq)] pub struct ResponseParts { @@ -76,6 +78,21 @@ impl OpenSearchFrame { Self::new(headers, body) } + pub fn get_query_type(&self) -> QueryType { + if let HttpHead::Request(request) = &self.headers { + match &request.method { + &Method::GET | &Method::HEAD => QueryType::Read, + &Method::POST | &Method::PUT | &Method::DELETE | &Method::PATCH => QueryType::Write, + m => { + tracing::warn!("handled method: {:?}", m); + QueryType::Read + } + } + } else { + QueryType::Read + } + } + pub fn json_str(&self) -> String { use http::header; diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index fcdfe4755..aea46fc4b 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -245,7 +245,7 @@ impl Message { Some(Frame::Redis(redis)) => redis_query_type(redis), // free-standing function as we cant define methods on RedisFrame Some(Frame::Kafka(_)) => todo!(), Some(Frame::Dummy) => todo!(), - Some(Frame::OpenSearch(_)) => todo!(), + Some(Frame::OpenSearch(os)) => os.get_query_type(), None => QueryType::ReadWrite, } } diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs index a6f99877a..b613ef28b 100644 --- a/shotover/src/transforms/mod.rs +++ b/shotover/src/transforms/mod.rs @@ -18,7 +18,7 @@ use crate::transforms::load_balance::ConnectionBalanceAndPool; use crate::transforms::loopback::Loopback; use crate::transforms::null::NullSink; #[cfg(feature = "alpha-transforms")] -use crate::transforms::opensearch::OpenSearchSinkSingle; +use crate::transforms::opensearch::sink_single::OpenSearchSinkSingle; use crate::transforms::parallel_map::ParallelMap; use crate::transforms::protect::Protect; use crate::transforms::query_counter::QueryCounter; diff --git a/shotover/src/transforms/opensearch/filter.rs b/shotover/src/transforms/opensearch/filter.rs new file mode 100644 index 000000000..e69de29bb diff --git a/shotover/src/transforms/opensearch/mod.rs b/shotover/src/transforms/opensearch/mod.rs index 1a84d10c2..c6e4fd3f5 100644 --- a/shotover/src/transforms/opensearch/mod.rs +++ b/shotover/src/transforms/opensearch/mod.rs @@ -1,119 +1 @@ -use crate::tcp; -use crate::transforms::{ - Messages, Transform, TransformBuilder, TransformConfig, Transforms, Wrapper, -}; -use crate::{ - codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}, - transforms::util::{ - cluster_connection_pool::{spawn_read_write_tasks, Connection}, - Request, - }, -}; -use anyhow::{anyhow, Result}; -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; -use std::time::Duration; -use tokio::sync::oneshot; -use tracing::trace; - -#[derive(Serialize, Deserialize, Debug)] -pub struct OpenSearchSinkSingleConfig { - #[serde(rename = "remote_address")] - address: String, - connect_timeout_ms: u64, -} - -#[typetag::serde(name = "OpenSearchSinkSingle")] -#[async_trait(?Send)] -impl TransformConfig for OpenSearchSinkSingleConfig { - async fn get_builder(&self, chain_name: String) -> Result> { - Ok(Box::new(OpenSearchSinkSingleBuilder::new( - self.address.clone(), - chain_name, - self.connect_timeout_ms, - ))) - } -} - -#[derive(Clone)] -pub struct OpenSearchSinkSingleBuilder { - address: String, - connect_timeout: Duration, -} - -impl OpenSearchSinkSingleBuilder { - pub fn new(address: String, _chain_name: String, connect_timeout_ms: u64) -> Self { - let connect_timeout = Duration::from_millis(connect_timeout_ms); - - Self { - address, - connect_timeout, - } - } -} - -impl TransformBuilder for OpenSearchSinkSingleBuilder { - fn build(&self) -> Transforms { - Transforms::OpenSearchSinkSingle(OpenSearchSinkSingle { - address: self.address.clone(), - connect_timeout: self.connect_timeout, - codec_builder: OpenSearchCodecBuilder::new(Direction::Sink), - connection: None, - }) - } - - fn get_name(&self) -> &'static str { - "OpenSearchSinkSingle" - } - - fn is_terminating(&self) -> bool { - true - } -} - -pub struct OpenSearchSinkSingle { - address: String, - connection: Option, - connect_timeout: Duration, - codec_builder: OpenSearchCodecBuilder, -} - -#[async_trait] -impl Transform for OpenSearchSinkSingle { - async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { - // Return immediately if we have no messages. - // If we tried to send no messages we would block forever waiting for a reply that will never come. - if requests_wrapper.requests.is_empty() { - return Ok(requests_wrapper.requests); - } - - if self.connection.is_none() { - trace!("creating outbound connection {:?}", self.address); - - let tcp_stream = tcp::tcp_stream(self.connect_timeout, self.address.clone()).await?; - let (rx, tx) = tcp_stream.into_split(); - self.connection = Some(spawn_read_write_tasks(&self.codec_builder, rx, tx)); - } - - let connection = self.connection.as_mut().unwrap(); - - let messages_len = requests_wrapper.requests.len(); - - let mut result = Vec::with_capacity(messages_len); - for message in requests_wrapper.requests { - let (tx, rx) = oneshot::channel(); - - connection - .send(Request { - message, - return_chan: Some(tx), - }) - .map_err(|_| anyhow!("Failed to send"))?; - - let message = rx.await?.response?; - result.push(message); - } - - Ok(result) - } -} +pub mod sink_single; diff --git a/shotover/src/transforms/opensearch/sink_single.rs b/shotover/src/transforms/opensearch/sink_single.rs new file mode 100644 index 000000000..1a84d10c2 --- /dev/null +++ b/shotover/src/transforms/opensearch/sink_single.rs @@ -0,0 +1,119 @@ +use crate::tcp; +use crate::transforms::{ + Messages, Transform, TransformBuilder, TransformConfig, Transforms, Wrapper, +}; +use crate::{ + codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}, + transforms::util::{ + cluster_connection_pool::{spawn_read_write_tasks, Connection}, + Request, + }, +}; +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use tokio::sync::oneshot; +use tracing::trace; + +#[derive(Serialize, Deserialize, Debug)] +pub struct OpenSearchSinkSingleConfig { + #[serde(rename = "remote_address")] + address: String, + connect_timeout_ms: u64, +} + +#[typetag::serde(name = "OpenSearchSinkSingle")] +#[async_trait(?Send)] +impl TransformConfig for OpenSearchSinkSingleConfig { + async fn get_builder(&self, chain_name: String) -> Result> { + Ok(Box::new(OpenSearchSinkSingleBuilder::new( + self.address.clone(), + chain_name, + self.connect_timeout_ms, + ))) + } +} + +#[derive(Clone)] +pub struct OpenSearchSinkSingleBuilder { + address: String, + connect_timeout: Duration, +} + +impl OpenSearchSinkSingleBuilder { + pub fn new(address: String, _chain_name: String, connect_timeout_ms: u64) -> Self { + let connect_timeout = Duration::from_millis(connect_timeout_ms); + + Self { + address, + connect_timeout, + } + } +} + +impl TransformBuilder for OpenSearchSinkSingleBuilder { + fn build(&self) -> Transforms { + Transforms::OpenSearchSinkSingle(OpenSearchSinkSingle { + address: self.address.clone(), + connect_timeout: self.connect_timeout, + codec_builder: OpenSearchCodecBuilder::new(Direction::Sink), + connection: None, + }) + } + + fn get_name(&self) -> &'static str { + "OpenSearchSinkSingle" + } + + fn is_terminating(&self) -> bool { + true + } +} + +pub struct OpenSearchSinkSingle { + address: String, + connection: Option, + connect_timeout: Duration, + codec_builder: OpenSearchCodecBuilder, +} + +#[async_trait] +impl Transform for OpenSearchSinkSingle { + async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { + // Return immediately if we have no messages. + // If we tried to send no messages we would block forever waiting for a reply that will never come. + if requests_wrapper.requests.is_empty() { + return Ok(requests_wrapper.requests); + } + + if self.connection.is_none() { + trace!("creating outbound connection {:?}", self.address); + + let tcp_stream = tcp::tcp_stream(self.connect_timeout, self.address.clone()).await?; + let (rx, tx) = tcp_stream.into_split(); + self.connection = Some(spawn_read_write_tasks(&self.codec_builder, rx, tx)); + } + + let connection = self.connection.as_mut().unwrap(); + + let messages_len = requests_wrapper.requests.len(); + + let mut result = Vec::with_capacity(messages_len); + for message in requests_wrapper.requests { + let (tx, rx) = oneshot::channel(); + + connection + .send(Request { + message, + return_chan: Some(tx), + }) + .map_err(|_| anyhow!("Failed to send"))?; + + let message = rx.await?.response?; + result.push(message); + } + + Ok(result) + } +} diff --git a/test-helpers/src/docker_compose.rs b/test-helpers/src/docker_compose.rs index baf43bffa..dd761483d 100644 --- a/test-helpers/src/docker_compose.rs +++ b/test-helpers/src/docker_compose.rs @@ -81,5 +81,10 @@ pub fn get_image_waiters() -> &'static [Image] { log_regex_to_wait_for: r"Node started", timeout: 120, }, + Image { + name: "opensearchproject/opensearch-dashboards:2.9.0", + log_regex_to_wait_for: r"Server running", + timeout: 120, + }, ] } From 8004807e1182c1b5d709cd338f3d22986f317ac2 Mon Sep 17 00:00:00 2001 From: conorbros Date: Thu, 28 Sep 2023 22:06:23 +1000 Subject: [PATCH 3/7] tests passing --- .../tests/opensearch_int_tests/mod.rs | 166 ++++++++++++++++-- .../opensearch-dual-write/topology.yaml | 30 ++-- shotover/src/codec/opensearch.rs | 14 +- shotover/src/server.rs | 1 + shotover/src/transforms/chain.rs | 4 +- 5 files changed, 178 insertions(+), 37 deletions(-) diff --git a/shotover-proxy/tests/opensearch_int_tests/mod.rs b/shotover-proxy/tests/opensearch_int_tests/mod.rs index 290656e4e..77bdc30a9 100644 --- a/shotover-proxy/tests/opensearch_int_tests/mod.rs +++ b/shotover-proxy/tests/opensearch_int_tests/mod.rs @@ -20,11 +20,7 @@ use opensearch::{ }; use serde_json::{json, Value}; use test_helpers::docker_compose::docker_compose; -use tokio::{ - sync::oneshot, - task::JoinHandle, - time::{interval, Duration}, -}; +use tokio::time::Duration; async fn assert_ok_and_get_json(response: Result) -> Value { let response = response.unwrap(); @@ -436,7 +432,6 @@ async fn dual_write_reindex() { } } })) - .allow_no_indices(true) .send() .await .unwrap(); @@ -451,16 +446,18 @@ async fn dual_write_reindex() { } }; - // shotover_client_c - // .index(IndexParts::Index("test-index")) - // .body(json!({ - // "name": Value::String(format!("{} Smith", document["_source"]["name"].as_str().unwrap())), - // "age": document["_source"]["age"] - // })) - // .refresh(Refresh::WaitFor) - // .send() - // .await - // .unwrap(); + shotover_client_c + .update(opensearch::UpdateParts::IndexId( + "test-index", + document["_id"].as_str().unwrap(), + )) + .body(json!({ + "name": Value::String("Smith".into()), + })) + .refresh(Refresh::WaitFor) + .send() + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; } @@ -491,12 +488,143 @@ async fn dual_write_reindex() { .unwrap(); }); + // Begin dual writes + // Begin reindex operations let _ = tokio::join!(reindex_jh, dual_write_jh); - // Begin dual writes and verify data ends up in both clusters - // Begin reindex operations - // Continue dual writing until reindex operation complete // verify both clusters end up in the same state + let target_response = target_client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(200) + .body(json!({ + "query": { + "match": { + "name": "Smith", + } + } + })) + .send() + .await + .unwrap() + .json::() + .await + .unwrap(); + + let source_response = source_client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(200) + .body(json!({ + "query": { + "match": { + "name": "Smith", + } + } + })) + .send() + .await + .unwrap() + .json::() + .await + .unwrap(); + + assert_eq!( + target_response["hits"]["hits"].as_array().unwrap().len(), + source_response["hits"]["hits"].as_array().unwrap().len() + ); + + target_response["hits"]["hits"] + .as_array() + .unwrap() + .clone() + .sort_by(|a, b| { + let a_age = a["_source"]["age"].as_i64().unwrap(); + let b_age = b["_source"]["age"].as_i64().unwrap(); + a_age.cmp(&b_age) + }); + + source_response["hits"]["hits"] + .as_array() + .unwrap() + .clone() + .sort_by(|a, b| { + let a_age = a["_source"]["age"].as_i64().unwrap(); + let b_age = b["_source"]["age"].as_i64().unwrap(); + a_age.cmp(&b_age) + }); + + assert_eq!( + target_response["hits"]["hits"].as_array().unwrap(), + source_response["hits"]["hits"].as_array().unwrap() + ); + + // verify both clusters end up in the same state + let target_response = target_client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(200) + .body(json!({ + "query": { + "match": { + "name": "John", + } + } + })) + .send() + .await + .unwrap() + .json::() + .await + .unwrap(); + + let source_response = source_client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(200) + .body(json!({ + "query": { + "match": { + "name": "John", + } + } + })) + .send() + .await + .unwrap() + .json::() + .await + .unwrap(); + + assert_eq!( + target_response["hits"]["hits"].as_array().unwrap().len(), + source_response["hits"]["hits"].as_array().unwrap().len() + ); + + target_response["hits"]["hits"] + .as_array() + .unwrap() + .clone() + .sort_by(|a, b| { + let a_age = a["_source"]["age"].as_i64().unwrap(); + let b_age = b["_source"]["age"].as_i64().unwrap(); + a_age.cmp(&b_age) + }); + + source_response["hits"]["hits"] + .as_array() + .unwrap() + .clone() + .sort_by(|a, b| { + let a_age = a["_source"]["age"].as_i64().unwrap(); + let b_age = b["_source"]["age"].as_i64().unwrap(); + a_age.cmp(&b_age) + }); + + assert_eq!( + target_response["hits"]["hits"].as_array().unwrap(), + source_response["hits"]["hits"].as_array().unwrap() + ); shotover.shutdown_and_then_consume_events(&[]).await; } diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml index d78506ee6..633a5c0d0 100644 --- a/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml @@ -1,19 +1,17 @@ --- sources: - opensearch_prod: - OpenSearch: + - OpenSearch: + name: "OpenSearch" listen_addr: "127.0.0.1:9200" -chain_config: - main_chain: - - Tee: - behavior: FailOnMismatch - buffer_size: 10000 - chain: - - OpenSearchSinkSingle: - remote_address: "172.16.1.3:9200" - connect_timeout_ms: 3000 - - OpenSearchSinkSingle: - remote_address: "172.16.1.2:9200" - connect_timeout_ms: 3000 -source_to_chain_mapping: - opensearch_prod: main_chain + chain: + - Tee: + # behavior: LogWarningOnMismatch + behavior: Ignore + buffer_size: 10000 + chain: + - OpenSearchSinkSingle: + remote_address: "172.16.1.3:9200" + connect_timeout_ms: 3000 + - OpenSearchSinkSingle: + remote_address: "172.16.1.2:9200" + connect_timeout_ms: 3000 diff --git a/shotover/src/codec/opensearch.rs b/shotover/src/codec/opensearch.rs index f21f0ea3d..3e9399954 100644 --- a/shotover/src/codec/opensearch.rs +++ b/shotover/src/codec/opensearch.rs @@ -120,7 +120,13 @@ impl OpenSearchDecoder { let body_start = match response.parse(src) { Ok(httparse::Status::Complete(body_start)) => body_start, Ok(httparse::Status::Partial) => return Ok(None), - Err(err) => return Err(anyhow!("error parsing response: {}", err)), + Err(err) => { + return Err(anyhow!( + "error: {} parsing response: {}", + err, + pretty_hex::pretty_hex(&src) + )) + } }; match response.version.unwrap() { 1 => (), @@ -197,6 +203,12 @@ impl Decoder for OpenSearchDecoder { content_length, }) = decode_result { + tracing::debug!( + "{}: incoming OpenSearch message:\n{}", + self.direction, + pretty_hex::pretty_hex(&src) + ); + self.state = State::ReadingBody(http_headers, content_length); src.advance(body_start); } else { diff --git a/shotover/src/server.rs b/shotover/src/server.rs index 8ba76af67..04718b53e 100644 --- a/shotover/src/server.rs +++ b/shotover/src/server.rs @@ -175,6 +175,7 @@ impl TcpCodecListener { "connection", id = self.connection_count, source = self.source_name.as_str(), + chain = self.chain_builder.name.as_str(), ); let transport = self.transport; async { diff --git a/shotover/src/transforms/chain.rs b/shotover/src/transforms/chain.rs index 66bb95028..fd5c98fc7 100644 --- a/shotover/src/transforms/chain.rs +++ b/shotover/src/transforms/chain.rs @@ -278,6 +278,8 @@ impl TransformChainBuilder { #[cfg(test)] let count_clone = count.clone(); + let span = tracing::error_span!("subchain"); + // Even though we don't keep the join handle, this thread will wrap up once all corresponding senders have been dropped. let mut chain = self.build(); @@ -329,7 +331,7 @@ impl TransformChainBuilder { ), } } - .in_current_span(), + .instrument(span), ); BufferedChain { From 673d269c752bb88647f1118d0dacd3bc0cdb6647 Mon Sep 17 00:00:00 2001 From: conorbros Date: Mon, 2 Oct 2023 21:35:54 +1100 Subject: [PATCH 4/7] update docker --- .../opensearch-dual-write/docker-compose.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml index 10643fea9..0d32cbc0a 100644 --- a/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml @@ -20,10 +20,10 @@ services: &environment cluster.name: opensearch-cluster-1 node.name: opensearch-node1 - bootstrap.memory_lock: true + bootstrap.memory_lock: "true" OPENSEARCH_JAVA_OPTS: "-Xms512m -Xmx512m" discovery.type: single-node - plugins.security.disabled: true + plugins.security.disabled: "true" reindex.remote.allowlist: 172.16.1.2:9200 ulimits: memlock: @@ -70,7 +70,7 @@ services: container_name: opensearch-dashboards environment: OPENSEARCH_HOSTS: '["http://172.16.1.2:9200"]' - DISABLE_SECURITY_DASHBOARDS_PLUGIN: true + DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true" networks: cluster_subnet: ipv4_address: 172.16.1.4 @@ -80,7 +80,7 @@ services: container_name: opensearch-dashboard-2 environment: OPENSEARCH_HOSTS: '["http://172.16.1.3:9200"]' - DISABLE_SECURITY_DASHBOARDS_PLUGIN: true + DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true" networks: cluster_subnet: ipv4_address: 172.16.1.5 From f0e3caa8f811c41b53bc7ec9e215946df6f01d9f Mon Sep 17 00:00:00 2001 From: conorbros Date: Tue, 3 Oct 2023 11:35:14 +1100 Subject: [PATCH 5/7] cleanup --- .../tests/opensearch_int_tests/mod.rs | 200 +++++++----------- .../opensearch-dual-write/topology.yaml | 3 +- 2 files changed, 77 insertions(+), 126 deletions(-) diff --git a/shotover-proxy/tests/opensearch_int_tests/mod.rs b/shotover-proxy/tests/opensearch_int_tests/mod.rs index 77bdc30a9..e12c8cf48 100644 --- a/shotover-proxy/tests/opensearch_int_tests/mod.rs +++ b/shotover-proxy/tests/opensearch_int_tests/mod.rs @@ -40,6 +40,36 @@ async fn assert_ok_and_get_json(response: Result) -> Value { } } +async fn assert_ok_and_same_data( + response_a: Result, + response_b: Result, +) { + let mut response_a = assert_ok_and_get_json(response_a).await["hits"]["hits"] + .as_array() + .unwrap() + .clone(); + let mut response_b = assert_ok_and_get_json(response_b).await["hits"]["hits"] + .as_array() + .unwrap() + .clone(); + + assert_eq!(response_a.len(), response_b.len()); + + response_a.sort_by(|a, b| { + let a_age = a["_source"]["age"].as_i64().unwrap(); + let b_age = b["_source"]["age"].as_i64().unwrap(); + a_age.cmp(&b_age) + }); + + response_b.sort_by(|a, b| { + let a_age = a["_source"]["age"].as_i64().unwrap(); + let b_age = b["_source"]["age"].as_i64().unwrap(); + a_age.cmp(&b_age) + }); + + assert_eq!(response_a, response_b,); +} + pub async fn test_bulk(client: &OpenSearch) { assert_ok_and_get_json( client @@ -354,7 +384,7 @@ async fn dual_write_basic() { shotover.shutdown_and_then_consume_events(&[]).await; } -async fn index_1000_documents(client: &OpenSearch) { +async fn index_100_documents(client: &OpenSearch) { let mut body: Vec> = vec![]; for i in 0..100 { let op = BulkOperation::index(json!({ @@ -413,7 +443,7 @@ async fn dual_write_reindex() { ) .await; - index_1000_documents(&source_client).await; + index_100_documents(&source_client).await; let shotover_client_c = shotover_client.clone(); let dual_write_jh = tokio::spawn(async move { @@ -421,43 +451,41 @@ async fn dual_write_reindex() { // get a random number in between 0 and 2000 let i = rand::random::() % 100; - let response = shotover_client_c - .search(SearchParts::Index(&["test-index"])) - .from(0) - .size(200) - .body(json!({ - "query": { - "match": { - "age": i, + let response = assert_ok_and_get_json( + shotover_client_c + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(200) + .body(json!({ + "query": { + "match": { + "age": i, + } } - } - })) - .send() - .await - .unwrap(); - - let json_res = response.json::().await; - - let document = match &json_res { - Ok(json) => &json["hits"]["hits"][0], - Err(e) => { - println!("Error: {:?}", e); - continue; - } - }; - - shotover_client_c - .update(opensearch::UpdateParts::IndexId( - "test-index", - document["_id"].as_str().unwrap(), - )) - .body(json!({ - "name": Value::String("Smith".into()), - })) - .refresh(Refresh::WaitFor) - .send() - .await - .unwrap(); + })) + .send() + .await, + ) + .await; + + assert_eq!(response["hits"]["hits"].as_array().unwrap().len(), 1); + + let document = &response["hits"]["hits"][0]; + + assert_ok_and_get_json( + shotover_client_c + .update(opensearch::UpdateParts::IndexId( + "test-index", + document["_id"].as_str().unwrap(), + )) + .body(json!({ + "doc": { "name" : Value::String("Smith".into())} + })) + .refresh(Refresh::WaitFor) + .send() + .await, + ) + .await; tokio::time::sleep(Duration::from_millis(500)).await; } @@ -493,7 +521,7 @@ async fn dual_write_reindex() { let _ = tokio::join!(reindex_jh, dual_write_jh); // verify both clusters end up in the same state - let target_response = target_client + let target = target_client .search(SearchParts::Index(&["test-index"])) .from(0) .size(200) @@ -504,14 +532,9 @@ async fn dual_write_reindex() { } } })) - .send() - .await - .unwrap() - .json::() - .await - .unwrap(); + .send(); - let source_response = source_client + let source = source_client .search(SearchParts::Index(&["test-index"])) .from(0) .size(200) @@ -522,45 +545,12 @@ async fn dual_write_reindex() { } } })) - .send() - .await - .unwrap() - .json::() - .await - .unwrap(); - - assert_eq!( - target_response["hits"]["hits"].as_array().unwrap().len(), - source_response["hits"]["hits"].as_array().unwrap().len() - ); + .send(); - target_response["hits"]["hits"] - .as_array() - .unwrap() - .clone() - .sort_by(|a, b| { - let a_age = a["_source"]["age"].as_i64().unwrap(); - let b_age = b["_source"]["age"].as_i64().unwrap(); - a_age.cmp(&b_age) - }); - - source_response["hits"]["hits"] - .as_array() - .unwrap() - .clone() - .sort_by(|a, b| { - let a_age = a["_source"]["age"].as_i64().unwrap(); - let b_age = b["_source"]["age"].as_i64().unwrap(); - a_age.cmp(&b_age) - }); - - assert_eq!( - target_response["hits"]["hits"].as_array().unwrap(), - source_response["hits"]["hits"].as_array().unwrap() - ); + assert_ok_and_same_data(target.await, source.await).await; // verify both clusters end up in the same state - let target_response = target_client + let target = target_client .search(SearchParts::Index(&["test-index"])) .from(0) .size(200) @@ -571,14 +561,9 @@ async fn dual_write_reindex() { } } })) - .send() - .await - .unwrap() - .json::() - .await - .unwrap(); + .send(); - let source_response = source_client + let source = source_client .search(SearchParts::Index(&["test-index"])) .from(0) .size(200) @@ -589,42 +574,9 @@ async fn dual_write_reindex() { } } })) - .send() - .await - .unwrap() - .json::() - .await - .unwrap(); - - assert_eq!( - target_response["hits"]["hits"].as_array().unwrap().len(), - source_response["hits"]["hits"].as_array().unwrap().len() - ); + .send(); - target_response["hits"]["hits"] - .as_array() - .unwrap() - .clone() - .sort_by(|a, b| { - let a_age = a["_source"]["age"].as_i64().unwrap(); - let b_age = b["_source"]["age"].as_i64().unwrap(); - a_age.cmp(&b_age) - }); - - source_response["hits"]["hits"] - .as_array() - .unwrap() - .clone() - .sort_by(|a, b| { - let a_age = a["_source"]["age"].as_i64().unwrap(); - let b_age = b["_source"]["age"].as_i64().unwrap(); - a_age.cmp(&b_age) - }); - - assert_eq!( - target_response["hits"]["hits"].as_array().unwrap(), - source_response["hits"]["hits"].as_array().unwrap() - ); + assert_ok_and_same_data(target.await, source.await).await; shotover.shutdown_and_then_consume_events(&[]).await; } diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml index 633a5c0d0..26e7574ab 100644 --- a/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml @@ -5,8 +5,7 @@ sources: listen_addr: "127.0.0.1:9200" chain: - Tee: - # behavior: LogWarningOnMismatch - behavior: Ignore + behavior: LogWarningOnMismatch buffer_size: 10000 chain: - OpenSearchSinkSingle: From a625d212d801e5e8834239f6cbf035769160e3f9 Mon Sep 17 00:00:00 2001 From: conorbros Date: Fri, 20 Oct 2023 10:45:12 +1100 Subject: [PATCH 6/7] fix and update --- Cargo.lock | 127 ++++++++++-------- .../tests/opensearch_int_tests/mod.rs | 7 +- 2 files changed, 80 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c793f8cc4..b9c9b9c8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,14 +60,15 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d" dependencies = [ "cfg-if", "getrandom 0.2.10", "once_cell", "version_check", + "zerocopy", ] [[package]] @@ -644,7 +645,7 @@ dependencies = [ "aws-config", "aws-sdk-ec2", "aws-sdk-iam", - "base64 0.21.4", + "base64 0.21.5", "russh", "russh-keys", "ssh-key", @@ -707,9 +708,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.4" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" +checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" [[package]] name = "base64-simd" @@ -877,7 +878,7 @@ dependencies = [ "cached_proc_macro", "cached_proc_macro_types", "futures", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "instant", "once_cell", "thiserror", @@ -1258,9 +1259,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1" +checksum = "3fbc60abd742b35f2492f808e1abbb83d45f72db402e14c55057edc9c7b1e9e4" dependencies = [ "libc", ] @@ -1596,7 +1597,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "lock_api", "once_cell", "parking_lot_core", @@ -2008,7 +2009,7 @@ dependencies = [ "rustls-native-certs", "rustls-webpki", "semver", - "socket2 0.5.4", + "socket2 0.5.5", "tokio", "tokio-rustls", "tokio-stream", @@ -2235,7 +2236,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5681137554ddff44396e5f149892c769d45301dd9aa19c51602a89ee214cb0ec" dependencies = [ - "hashbrown 0.13.1", + "hashbrown 0.13.2", ] [[package]] @@ -2246,18 +2247,18 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ "ahash", ] [[package]] name = "hashbrown" -version = "0.14.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" +checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156" dependencies = [ "ahash", "allocator-api2", @@ -2362,7 +2363,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.9", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -2455,7 +2456,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.1", + "hashbrown 0.14.2", ] [[package]] @@ -2512,9 +2513,9 @@ checksum = "e1be380c410bf0595e94992a648ea89db4dd3f3354ba54af206fd2a68cf5ac8e" [[package]] name = "ipnet" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "is-terminal" @@ -2723,7 +2724,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a4964177ddfdab1e3a2b37aec7cf320e14169abb0ed73999f558136409178d5" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "hyper", "indexmap 1.9.3", "ipnet", @@ -2748,13 +2749,13 @@ dependencies = [ [[package]] name = "metrics-util" -version = "0.15.1" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" +checksum = "111cb375987443c3de8d503580b536f77dc8416d32db62d9456db5d93bd7ac47" dependencies = [ "crossbeam-epoch", "crossbeam-utils", - "hashbrown 0.13.1", + "hashbrown 0.13.2", "metrics", "num_cpus", "quanta", @@ -2784,9 +2785,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "wasi 0.11.0+wasi-snapshot-preview1", @@ -3087,7 +3088,7 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7899e6ad63d5c1dc6394785d625cadf560aaa5e606d6b709fd5cc6bbf1727f1" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "bytes", "dyn-clone", "lazy_static", @@ -3275,7 +3276,7 @@ version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3163d2912b7c3b52d651a055f2c7eec9ba5cd22d26ef75b8dd3a59980b185923" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "serde", ] @@ -3412,9 +3413,9 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "1.4.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31114a898e107c51bb1609ffaf55a0e011cf6a4d7f1170d0015a165082c0338b" +checksum = "b559898e0b4931ed2d3b959ab0c2da4d99cc644c4b0b1a35b4d344027f474023" [[package]] name = "powerfmt" @@ -3687,7 +3688,7 @@ dependencies = [ "rand 0.8.5", "ryu", "sha1_smol", - "socket2 0.4.9", + "socket2 0.4.10", "tokio", "tokio-util", "url", @@ -3802,7 +3803,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ "async-compression", - "base64 0.21.4", + "base64 0.21.5", "bytes", "encoding_rs", "futures-core", @@ -4037,9 +4038,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.19" +version = "0.38.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed" +checksum = "67ce50cb2e16c2903e30d1cbccfd8387a74b9d4c938b6a4c5ec6cc7556f7a8a0" dependencies = [ "bitflags 2.4.1", "errno", @@ -4078,7 +4079,7 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", ] [[package]] @@ -4186,7 +4187,7 @@ dependencies = [ "scylla-macros", "smallvec", "snap", - "socket2 0.5.4", + "socket2 0.5.5", "strum 0.23.0", "strum_macros 0.23.1", "thiserror", @@ -4428,7 +4429,7 @@ dependencies = [ "aws-sdk-kms", "backtrace", "backtrace-ext", - "base64 0.21.4", + "base64 0.21.5", "bigdecimal 0.4.2", "bincode", "bytes", @@ -4455,7 +4456,7 @@ dependencies = [ "itertools 0.11.0", "kafka-protocol", "libflate", - "lz4_flex 0.11.1", + "lz4_flex", "metrics", "metrics-exporter-prometheus", "nonzero_ext", @@ -4586,9 +4587,9 @@ checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" [[package]] name = "socket2" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" dependencies = [ "libc", "winapi", @@ -4596,9 +4597,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", "windows-sys 0.48.0", @@ -4838,18 +4839,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" +checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" +checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", @@ -4934,7 +4935,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.4", + "socket2 0.5.5", "tokio-macros", "windows-sys 0.48.0", ] @@ -5066,9 +5067,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.3" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" +checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" [[package]] name = "toml_edit" @@ -5111,9 +5112,9 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.39" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee2ef2af84856a50c1d430afce2fdded0a4ec7eda868db86409b4543df0797f9" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", "pin-project-lite", @@ -5155,12 +5156,12 @@ dependencies = [ [[package]] name = "tracing-log" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" dependencies = [ - "lazy_static", "log", + "once_cell", "tracing-core", ] @@ -5784,6 +5785,26 @@ dependencies = [ "time", ] +[[package]] +name = "zerocopy" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c19fae0c8a9efc6a8281f2e623db8af1db9e57852e04cde3e754dd2dc29340f" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc56589e9ddd1f1c28d4b4b5c773ce232910a6bb67a70133d61c9e347585efe9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "zeroize" version = "1.6.0" diff --git a/shotover-proxy/tests/opensearch_int_tests/mod.rs b/shotover-proxy/tests/opensearch_int_tests/mod.rs index e12c8cf48..1feba62c5 100644 --- a/shotover-proxy/tests/opensearch_int_tests/mod.rs +++ b/shotover-proxy/tests/opensearch_int_tests/mod.rs @@ -20,6 +20,7 @@ use opensearch::{ }; use serde_json::{json, Value}; use test_helpers::docker_compose::docker_compose; +use test_helpers::shotover_process::{EventMatcher, Level}; use tokio::time::Duration; async fn assert_ok_and_get_json(response: Result) -> Value { @@ -578,5 +579,9 @@ async fn dual_write_reindex() { assert_ok_and_same_data(target.await, source.await).await; - shotover.shutdown_and_then_consume_events(&[]).await; + shotover + .shutdown_and_then_consume_events(&[EventMatcher::new() + .with_level(Level::Warn) + .with_target("shotover::transforms::tee")]) + .await; } From b3730f143ea9172ff2069fcae0a60aa83747029f Mon Sep 17 00:00:00 2001 From: conorbros Date: Tue, 31 Oct 2023 10:07:58 +1100 Subject: [PATCH 7/7] more tests --- .../tests/opensearch_int_tests/mod.rs | 86 ++++++++++++++++--- .../opensearch-dual-write/docker-compose.yaml | 4 +- .../opensearch-dual-write/topology.yaml | 1 + 3 files changed, 79 insertions(+), 12 deletions(-) diff --git a/shotover-proxy/tests/opensearch_int_tests/mod.rs b/shotover-proxy/tests/opensearch_int_tests/mod.rs index 1feba62c5..adb5074c9 100644 --- a/shotover-proxy/tests/opensearch_int_tests/mod.rs +++ b/shotover-proxy/tests/opensearch_int_tests/mod.rs @@ -1,4 +1,5 @@ use crate::shotover_process; +use hyper::{Body as HyperBody, Client, Method as HyperMethod, Request, Response as HyperResponse}; use opensearch::{ auth::Credentials, cert::CertificateValidation, @@ -6,16 +7,12 @@ use opensearch::{ http::{ headers::{HeaderName, HeaderValue}, response::Response, - response::Response, transport::{SingleNodeConnectionPool, TransportBuilder}, Method, StatusCode, Url, }, indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts}, - indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts}, nodes::NodesInfoParts, - indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts, IndicesGetParts}, - params::Refresh, - params::WaitForStatus, + params::{Refresh, WaitForStatus}, BulkOperation, BulkParts, DeleteParts, Error, IndexParts, OpenSearch, SearchParts, }; use serde_json::{json, Value}; @@ -23,6 +20,22 @@ use test_helpers::docker_compose::docker_compose; use test_helpers::shotover_process::{EventMatcher, Level}; use tokio::time::Duration; +async fn hyper_request( + uri: String, + method: HyperMethod, + body: HyperBody, +) -> HyperResponse { + let client = Client::new(); + + let req = Request::builder() + .method(method) + .uri(uri) + .body(body) + .expect("request builder"); + + client.request(req).await.unwrap() +} + async fn assert_ok_and_get_json(response: Result) -> Value { let response = response.unwrap(); let status = response.status_code(); @@ -41,9 +54,23 @@ async fn assert_ok_and_get_json(response: Result) -> Value { } } +async fn get_cluster_name(client: &OpenSearch) -> String { + let response = assert_ok_and_get_json( + client + .cluster() + .stats(opensearch::cluster::ClusterStatsParts::None) + .send() + .await, + ) + .await; + + response["cluster_name"].as_str().unwrap().to_owned() +} + async fn assert_ok_and_same_data( response_a: Result, response_b: Result, + warn: bool, ) { let mut response_a = assert_ok_and_get_json(response_a).await["hits"]["hits"] .as_array() @@ -54,7 +81,12 @@ async fn assert_ok_and_same_data( .unwrap() .clone(); - assert_eq!(response_a.len(), response_b.len()); + if !warn { + assert_eq!(response_a.len(), response_b.len()); + } else if response_a.len() != response_b.len() { + println!("Response A len: {:#?}", response_a.len()); + println!("Response B len: {:#?}", response_b.len()); + } response_a.sort_by(|a, b| { let a_age = a["_source"]["age"].as_i64().unwrap(); @@ -68,7 +100,12 @@ async fn assert_ok_and_same_data( a_age.cmp(&b_age) }); - assert_eq!(response_a, response_b,); + if !warn { + assert_eq!(response_a, response_b); + } else if response_a != response_b { + println!("Response A: {:#?}", response_a); + println!("Response B: {:#?}", response_b); + } } pub async fn test_bulk(client: &OpenSearch) { @@ -308,6 +345,19 @@ async fn passthrough_standard() { let addr = "http://localhost:9201"; let client = create_client(addr); + let res = client + .nodes() + .info(NodesInfoParts::None) + .header( + HeaderName::from_lowercase(b"accept-encoding").unwrap(), + HeaderValue::from_str("").unwrap(), + ) + .send() + .await + .unwrap(); + + println!("{:#?}", res); + opensearch_test_suite(&client).await; shotover.shutdown_and_then_consume_events(&[]).await; @@ -420,9 +470,14 @@ async fn dual_write_reindex() { .start() .await; - let shotover_client = create_client(shotover_addr); let source_client = create_client(source_addr); let target_client = create_client(target_addr); + let shotover_client = create_client(shotover_addr); + + // verify that shotover is returning responses from the source clusters + let cluster_name = get_cluster_name(&shotover_client).await; + println!("Shotover is returning responses from {:?}", cluster_name); + assert_eq!(cluster_name, "source-cluster"); // Create indexes in source cluster assert_ok_and_get_json( @@ -548,7 +603,7 @@ async fn dual_write_reindex() { })) .send(); - assert_ok_and_same_data(target.await, source.await).await; + assert_ok_and_same_data(target.await, source.await, true).await; // verify both clusters end up in the same state let target = target_client @@ -577,7 +632,18 @@ async fn dual_write_reindex() { })) .send(); - assert_ok_and_same_data(target.await, source.await).await; + assert_ok_and_same_data(target.await, source.await, true).await; + + // switch shotover to the target cluster + let _ = hyper_request( + format!("http://localhost:{}/transform/tee/result-source", 1234), + HyperMethod::PUT, + HyperBody::from("tee-chain"), + ) + .await; + let cluster_name = get_cluster_name(&shotover_client).await; + println!("Shotover is returning responses from {:?}", cluster_name); + assert_eq!(cluster_name, "target-cluster"); shotover .shutdown_and_then_consume_events(&[EventMatcher::new() diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml index 0d32cbc0a..07f1f0bde 100644 --- a/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml @@ -18,7 +18,7 @@ services: ipv4_address: 172.16.1.2 environment: &environment - cluster.name: opensearch-cluster-1 + cluster.name: source-cluster node.name: opensearch-node1 bootstrap.memory_lock: "true" OPENSEARCH_JAVA_OPTS: "-Xms512m -Xmx512m" @@ -47,7 +47,7 @@ services: ipv4_address: 172.16.1.3 environment: <<: *environment - cluster.name: opensearch-cluster-2 + cluster.name: target-cluster node.name: opensearch-node2 ulimits: memlock: diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml index 26e7574ab..2cc41a6ca 100644 --- a/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml @@ -5,6 +5,7 @@ sources: listen_addr: "127.0.0.1:9200" chain: - Tee: + switch_port: 1234 behavior: LogWarningOnMismatch buffer_size: 10000 chain: