Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Oct 3, 2023
1 parent f534300 commit 632f251
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 126 deletions.
200 changes: 76 additions & 124 deletions shotover-proxy/tests/opensearch_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,36 @@ async fn assert_ok_and_get_json(response: Result<Response, Error>) -> Value {
}
}

async fn assert_ok_and_same_data(
response_a: Result<Response, Error>,
response_b: Result<Response, Error>,
) {
let mut response_a = assert_ok_and_get_json(response_a).await["hits"]["hits"]
.as_array()
.unwrap()
.clone();
let mut response_b = assert_ok_and_get_json(response_b).await["hits"]["hits"]
.as_array()
.unwrap()
.clone();

assert_eq!(response_a.len(), response_b.len());

response_a.sort_by(|a, b| {
let a_age = a["_source"]["age"].as_i64().unwrap();
let b_age = b["_source"]["age"].as_i64().unwrap();
a_age.cmp(&b_age)
});

response_b.sort_by(|a, b| {
let a_age = a["_source"]["age"].as_i64().unwrap();
let b_age = b["_source"]["age"].as_i64().unwrap();
a_age.cmp(&b_age)
});

assert_eq!(response_a, response_b,);
}

pub async fn test_bulk(client: &OpenSearch) {
assert_ok_and_get_json(
client
Expand Down Expand Up @@ -335,7 +365,7 @@ async fn dual_write_basic() {
shotover.shutdown_and_then_consume_events(&[]).await;
}

async fn index_1000_documents(client: &OpenSearch) {
async fn index_100_documents(client: &OpenSearch) {
let mut body: Vec<BulkOperation<_>> = vec![];
for i in 0..100 {
let op = BulkOperation::index(json!({
Expand Down Expand Up @@ -394,51 +424,49 @@ async fn dual_write_reindex() {
)
.await;

index_1000_documents(&source_client).await;
index_100_documents(&source_client).await;

let shotover_client_c = shotover_client.clone();
let dual_write_jh = tokio::spawn(async move {
for _ in 0..20 {
// get a random number in between 0 and 2000
let i = rand::random::<u32>() % 100;

let response = shotover_client_c
.search(SearchParts::Index(&["test-index"]))
.from(0)
.size(200)
.body(json!({
"query": {
"match": {
"age": i,
let response = assert_ok_and_get_json(
shotover_client_c
.search(SearchParts::Index(&["test-index"]))
.from(0)
.size(200)
.body(json!({
"query": {
"match": {
"age": i,
}
}
}
}))
.send()
.await
.unwrap();

let json_res = response.json::<Value>().await;

let document = match &json_res {
Ok(json) => &json["hits"]["hits"][0],
Err(e) => {
println!("Error: {:?}", e);
continue;
}
};

shotover_client_c
.update(opensearch::UpdateParts::IndexId(
"test-index",
document["_id"].as_str().unwrap(),
))
.body(json!({
"name": Value::String("Smith".into()),
}))
.refresh(Refresh::WaitFor)
.send()
.await
.unwrap();
}))
.send()
.await,
)
.await;

assert_eq!(response["hits"]["hits"].as_array().unwrap().len(), 1);

let document = &response["hits"]["hits"][0];

assert_ok_and_get_json(
shotover_client_c
.update(opensearch::UpdateParts::IndexId(
"test-index",
document["_id"].as_str().unwrap(),
))
.body(json!({
"doc": { "name" : Value::String("Smith".into())}
}))
.refresh(Refresh::WaitFor)
.send()
.await,
)
.await;

tokio::time::sleep(Duration::from_millis(500)).await;
}
Expand Down Expand Up @@ -474,7 +502,7 @@ async fn dual_write_reindex() {
let _ = tokio::join!(reindex_jh, dual_write_jh);

// verify both clusters end up in the same state
let target_response = target_client
let target = target_client
.search(SearchParts::Index(&["test-index"]))
.from(0)
.size(200)
Expand All @@ -485,14 +513,9 @@ async fn dual_write_reindex() {
}
}
}))
.send()
.await
.unwrap()
.json::<Value>()
.await
.unwrap();
.send();

let source_response = source_client
let source = source_client
.search(SearchParts::Index(&["test-index"]))
.from(0)
.size(200)
Expand All @@ -503,45 +526,12 @@ async fn dual_write_reindex() {
}
}
}))
.send()
.await
.unwrap()
.json::<Value>()
.await
.unwrap();

assert_eq!(
target_response["hits"]["hits"].as_array().unwrap().len(),
source_response["hits"]["hits"].as_array().unwrap().len()
);
.send();

target_response["hits"]["hits"]
.as_array()
.unwrap()
.clone()
.sort_by(|a, b| {
let a_age = a["_source"]["age"].as_i64().unwrap();
let b_age = b["_source"]["age"].as_i64().unwrap();
a_age.cmp(&b_age)
});

source_response["hits"]["hits"]
.as_array()
.unwrap()
.clone()
.sort_by(|a, b| {
let a_age = a["_source"]["age"].as_i64().unwrap();
let b_age = b["_source"]["age"].as_i64().unwrap();
a_age.cmp(&b_age)
});

assert_eq!(
target_response["hits"]["hits"].as_array().unwrap(),
source_response["hits"]["hits"].as_array().unwrap()
);
assert_ok_and_same_data(target.await, source.await).await;

// verify both clusters end up in the same state
let target_response = target_client
let target = target_client
.search(SearchParts::Index(&["test-index"]))
.from(0)
.size(200)
Expand All @@ -552,14 +542,9 @@ async fn dual_write_reindex() {
}
}
}))
.send()
.await
.unwrap()
.json::<Value>()
.await
.unwrap();
.send();

let source_response = source_client
let source = source_client
.search(SearchParts::Index(&["test-index"]))
.from(0)
.size(200)
Expand All @@ -570,42 +555,9 @@ async fn dual_write_reindex() {
}
}
}))
.send()
.await
.unwrap()
.json::<Value>()
.await
.unwrap();
.send();

assert_eq!(
target_response["hits"]["hits"].as_array().unwrap().len(),
source_response["hits"]["hits"].as_array().unwrap().len()
);

target_response["hits"]["hits"]
.as_array()
.unwrap()
.clone()
.sort_by(|a, b| {
let a_age = a["_source"]["age"].as_i64().unwrap();
let b_age = b["_source"]["age"].as_i64().unwrap();
a_age.cmp(&b_age)
});

source_response["hits"]["hits"]
.as_array()
.unwrap()
.clone()
.sort_by(|a, b| {
let a_age = a["_source"]["age"].as_i64().unwrap();
let b_age = b["_source"]["age"].as_i64().unwrap();
a_age.cmp(&b_age)
});

assert_eq!(
target_response["hits"]["hits"].as_array().unwrap(),
source_response["hits"]["hits"].as_array().unwrap()
);
assert_ok_and_same_data(target.await, source.await).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ sources:
listen_addr: "127.0.0.1:9200"
chain:
- Tee:
# behavior: LogWarningOnMismatch
behavior: Ignore
behavior: LogWarningOnMismatch
buffer_size: 10000
chain:
- OpenSearchSinkSingle:
Expand Down

0 comments on commit 632f251

Please sign in to comment.