diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ServiceWorkflowHistoryIterator.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ServiceWorkflowHistoryIterator.java index 16dad7d58..b17e7d4f2 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ServiceWorkflowHistoryIterator.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ServiceWorkflowHistoryIterator.java @@ -72,24 +72,27 @@ class ServiceWorkflowHistoryIterator implements WorkflowHistoryIterator { } // Returns true if more history events are available. - // Server can return page tokens that point to empty pages. - // We need to verify that page is valid before returning true. - // Otherwise next() method would throw NoSuchElementException after hasNext() returning true. @Override public boolean hasNext() { if (current.hasNext()) { return true; } - if (nextPageToken.isEmpty()) { - return false; - } - - GetWorkflowExecutionHistoryResponse response = queryWorkflowExecutionHistory(); - - current = response.getHistory().getEventsList().iterator(); - nextPageToken = response.getNextPageToken(); + while (!nextPageToken.isEmpty()) { + // Server can return page tokens that point to empty pages. + // We need to verify that page is valid before returning true. + // Otherwise, next() method would throw NoSuchElementException after hasNext() returning + // true. + GetWorkflowExecutionHistoryResponse response = queryWorkflowExecutionHistory(); - return current.hasNext(); + current = response.getHistory().getEventsList().iterator(); + nextPageToken = response.getNextPageToken(); + // Server can return an empty page, but a valid nextPageToken that contains + // more events. + if (current.hasNext()) { + return true; + } + } + return false; } @Override diff --git a/temporal-sdk/src/test/java/io/temporal/internal/replay/ServiceWorkflowHistoryIteratorTest.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/ServiceWorkflowHistoryIteratorTest.java index f7d011e31..809a4be57 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/replay/ServiceWorkflowHistoryIteratorTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/ServiceWorkflowHistoryIteratorTest.java @@ -32,19 +32,23 @@ import org.junit.Test; public class ServiceWorkflowHistoryIteratorTest { - - public static final ByteString EMPTY_PAGE_TOKEN = - ByteString.copyFrom("empty page token", Charset.defaultCharset()); public static final ByteString NEXT_PAGE_TOKEN = ByteString.copyFrom("next token", Charset.defaultCharset()); + public static final ByteString EMPTY_HISTORY_PAGE = + ByteString.copyFrom("empty history page token", Charset.defaultCharset()); + public static final ByteString NEXT_NEXT_PAGE_TOKEN = + ByteString.copyFrom("next next token", Charset.defaultCharset()); + public static final ByteString EMPTY_PAGE_TOKEN = + ByteString.copyFrom("empty page token", Charset.defaultCharset()); /* This test Scenario verifies following things: 1. hasNext() method makes a call to the server to retrieve workflow history when current history is empty and history token is available and cached the result. 2. next() method reuses cached history when possible. - 3. hasNext() fetches an empty page and return false. - 4. next() throws NoSuchElementException when neither history no history token is available. + 3. hasNext() keeps fetching as long as the server returns a next page token. + 4. hasNext() fetches an empty page and return false. + 5. next() throws NoSuchElementException when neither history no history token is available. */ @Test public void verifyHasNextIsFalseWhenHistoryIsEmpty() { @@ -58,13 +62,22 @@ public void verifyHasNextIsFalseWhenHistoryIsEmpty() { GetWorkflowExecutionHistoryResponse queryWorkflowExecutionHistory() { timesCalledServer.incrementAndGet(); try { + History history = HistoryUtils.generateWorkflowTaskWithInitialHistory().getHistory(); if (EMPTY_PAGE_TOKEN.equals(nextPageToken)) { return GetWorkflowExecutionHistoryResponse.newBuilder().build(); + } else if (EMPTY_HISTORY_PAGE.equals(nextPageToken)) { + return GetWorkflowExecutionHistoryResponse.newBuilder() + .setNextPageToken(NEXT_NEXT_PAGE_TOKEN) + .build(); + } else if (NEXT_NEXT_PAGE_TOKEN.equals(nextPageToken)) { + return GetWorkflowExecutionHistoryResponse.newBuilder() + .setHistory(history) + .setNextPageToken(EMPTY_PAGE_TOKEN) + .build(); } - History history = HistoryUtils.generateWorkflowTaskWithInitialHistory().getHistory(); return GetWorkflowExecutionHistoryResponse.newBuilder() .setHistory(history) - .setNextPageToken(EMPTY_PAGE_TOKEN) + .setNextPageToken(EMPTY_HISTORY_PAGE) .build(); } catch (Exception e) { throw new RuntimeException(e); @@ -80,9 +93,15 @@ GetWorkflowExecutionHistoryResponse queryWorkflowExecutionHistory() { Assert.assertTrue(iterator.hasNext()); Assert.assertNotNull(iterator.next()); Assert.assertEquals(1, timesCalledServer.get()); + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(3, timesCalledServer.get()); + Assert.assertNotNull(iterator.next()); + Assert.assertTrue(iterator.hasNext()); + Assert.assertNotNull(iterator.next()); + Assert.assertTrue(iterator.hasNext()); + Assert.assertNotNull(iterator.next()); Assert.assertFalse(iterator.hasNext()); - Assert.assertEquals(2, timesCalledServer.get()); Assert.assertThrows(NoSuchElementException.class, iterator::next); - Assert.assertEquals(2, timesCalledServer.get()); + Assert.assertEquals(4, timesCalledServer.get()); } }