Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Reproduce flaky OutboxProcessorIntegrationTest.java #162

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ allprojects {

testLogging {
events(SKIPPED, PASSED, FAILED)
showStandardStreams = false // change to true to get log output from tests
showStandardStreams = true // change to true to get log output from tests
exceptionFormat = FULL
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.Toxic;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import lombok.SneakyThrows;
import org.testcontainers.containers.ToxiproxyContainer;

import java.io.IOException;
Expand All @@ -38,6 +40,7 @@ public interface ProxiedContainerSupport {
* Cuts the connection by setting bandwidth in both directions to zero.
* @param shouldCutConnection true if the connection should be cut, or false if it should be re-enabled
*/
@SneakyThrows
default void setConnectionCut(boolean shouldCutConnection) {
synchronized (isCurrentlyCut) {
if (shouldCutConnection != isCurrentlyCut.get()) {
Expand All @@ -47,8 +50,8 @@ default void setConnectionCut(boolean shouldCutConnection) {
getProxy().toxics().bandwidth(CUT_CONNECTION_UPSTREAM, ToxicDirection.UPSTREAM, 0);
isCurrentlyCut.set(true);
} else {
getProxy().toxics().get(CUT_CONNECTION_DOWNSTREAM).remove();
getProxy().toxics().get(CUT_CONNECTION_UPSTREAM).remove();
removeToxicIfPresent(CUT_CONNECTION_DOWNSTREAM);
removeToxicIfPresent(CUT_CONNECTION_UPSTREAM);
isCurrentlyCut.set(false);
}
} catch (IOException e) {
Expand All @@ -58,6 +61,24 @@ default void setConnectionCut(boolean shouldCutConnection) {
}
}

private void removeToxicIfPresent(String name) throws IOException {
Toxic toxic = getToxic(name);
if (toxic != null) {
toxic.remove();
}
}

private Toxic getToxic(String name) {
try {
return getProxy().toxics().get(name);
} catch (IOException e) {
if (e.getMessage().contains("[404]")) {
return null;
}
throw new RuntimeException("Could not get toxic", e);
}
}

static Proxy createProxy(String service, ToxiproxyContainer toxiproxy, int exposedPort) {
final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort());
Proxy proxy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

@FlywayTest
@SuppressWarnings({"unused", "ConstantConditions"})
Expand Down Expand Up @@ -167,8 +168,13 @@ void should_processRecordsInOrder_whenKafkaIsTemporarilyNotAvailable() {

// then
List<OutboxRecord> outboxRecords = getSortedById(outboxRecordMonos);
Iterator<ConsumerRecord<String, byte[]>> kafkaRecordsIter = consumeAndDeduplicateRecords(outboxRecords.size(), Duration.ofSeconds(30))
.iterator();
Collection<ConsumerRecord<String, byte[]>> kafkaRecords = consumeAndDeduplicateRecords(outboxRecords.size(), Duration.ofSeconds(30));

List<Long> outboxRecordIds = outboxRecords.stream().map(OutboxRecord::getId).toList();
List<Long> kafkaRecordIds = kafkaRecords.stream().map(rec -> toLong(rec.headers().lastHeader(HEADERS_SEQUENCE_NAME).value())).toList();
assertEquals(outboxRecordIds, kafkaRecordIds, "OutboxRecord ids and Kafka record ids do not match");

Iterator<ConsumerRecord<String, byte[]>> kafkaRecordsIter = kafkaRecords.iterator();
for (OutboxRecord outboxRecord : outboxRecords) {
assertConsumedRecord(outboxRecord, eventSource, kafkaRecordsIter.next());
}
Expand Down
Loading