From 11becdec707ee22c879a40320d26149ac73cc90c Mon Sep 17 00:00:00 2001 From: Beria Date: Tue, 23 Aug 2016 13:19:44 +0530 Subject: [PATCH] Buffer append output results + fix extra incorrect results ### What is this PR for? There are 2 issues and their proposed fixes: 1. On a paragraph run, for every line of output, there is a broadcast of the new line from zeppelin. In case of thousands of lines of output, the browser/s would hang because of the volume of these append-output events. 2. In the above case, besides the browser-hang, another bug observed is that result data is will repeated twice (coming from append-output calls + finish-event calls). The proposed solution for #1 is: - Buffer the append-output event into a queue instead of sending the event immediately. - In a separate thread, read from the queue periodically and send the append-output event. Solution for #2 is: - Donot append output to result if the paragraph is not runnig. ### What type of PR is it? Improvement + Bug Fix ### Todos ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-1292 ### How should this be tested? The test could be to run a simple paragraph with large result. Eg: ``` %sh for i in {1..10000} do echo $i done ``` PS: One will need to clear browser cache between running with and without this code patch since there are javascript changes as well. ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? It could need for the design. Otherwise I have added code comments explaining behaviour. Author: Beria Closes #1283 from beriaanirudh/ZEPPELIN-1292 and squashes the following commits: 17f0524 [Beria] Use diamond operator 7852368 [Beria] nit 4b68c86 [Beria] fix checkstyle d168614 [Beria] Remove un-necessary class CheckAppendOutputRunner 2eae38e [Beria] Make AppendOutputRunner non-static 72c316d [Beria] Scheduler service to replace while loop in AppendOutputRunner 599281f [Beria] fix unit tests that run after dd24816 [Beria] Add license in test file 3984ef8 [Beria] fix tests when ran with other tests 1c893c0 [Beria] Add licensing 1bdd669 [Beria] fix javadoc comment 27790e4 [Beria] Avoid infinite loop in tests 5057bb3 [Beria] Incorporate feedback 1. Synchronize on AppendOutputRunner creation 2. Use ScheduledExecutorService instead of while loop 3. Remove Thread.sleep() from tests 82e9c4a [Beria] Fix comment 7020f0c [Beria] Buffer append output results + fix extra incorrect results --- .../remote/AppendOutputBuffer.java | 48 ++++ .../remote/AppendOutputRunner.java | 118 +++++++++ .../remote/RemoteInterpreterEventPoller.java | 14 +- .../remote/AppendOutputRunnerTest.java | 235 ++++++++++++++++++ .../paragraph/paragraph.controller.js | 11 +- 5 files changed, 424 insertions(+), 2 deletions(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java create mode 100644 zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java new file mode 100644 index 00000000000..e1484dabaaa --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +/** + * This element stores the buffered + * append-data of paragraph's output. + */ +public class AppendOutputBuffer { + + private String noteId; + private String paragraphId; + private String data; + + public AppendOutputBuffer(String noteId, String paragraphId, String data) { + this.noteId = noteId; + this.paragraphId = paragraphId; + this.data = data; + } + + public String getNoteId() { + return noteId; + } + + public String getParagraphId() { + return paragraphId; + } + + public String getData() { + return data; + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java new file mode 100644 index 00000000000..86ea11a8bda --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This thread sends paragraph's append-data + * periodically, rather than continously, with + * a period of BUFFER_TIME_MS. It handles append-data + * for all paragraphs across all notebooks. + */ +public class AppendOutputRunner implements Runnable { + + private static final Logger logger = + LoggerFactory.getLogger(AppendOutputRunner.class); + public static final Long BUFFER_TIME_MS = new Long(100); + private static final Long SAFE_PROCESSING_TIME = new Long(10); + private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000); + + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private final RemoteInterpreterProcessListener listener; + + public AppendOutputRunner(RemoteInterpreterProcessListener listener) { + this.listener = listener; + } + + @Override + public void run() { + + Map > noteMap = new HashMap<>(); + List list = new LinkedList<>(); + + /* "drainTo" method does not wait for any element + * to be present in the queue, and thus this loop would + * continuosly run (with period of BUFFER_TIME_MS). "take()" method + * waits for the queue to become non-empty and then removes + * one element from it. Rest elements from queue (if present) are + * removed using "drainTo" method. Thus we save on some un-necessary + * cpu-cycles. + */ + try { + list.add(queue.take()); + } catch (InterruptedException e) { + logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage()); + } + Long processingStartTime = System.currentTimeMillis(); + queue.drainTo(list); + + for (AppendOutputBuffer buffer: list) { + String noteId = buffer.getNoteId(); + String paragraphId = buffer.getParagraphId(); + + Map paragraphMap = (noteMap.containsKey(noteId)) ? + noteMap.get(noteId) : new HashMap(); + StringBuilder builder = paragraphMap.containsKey(paragraphId) ? + paragraphMap.get(paragraphId) : new StringBuilder(); + + builder.append(buffer.getData()); + paragraphMap.put(paragraphId, builder); + noteMap.put(noteId, paragraphMap); + } + Long processingTime = System.currentTimeMillis() - processingStartTime; + + if (processingTime > SAFE_PROCESSING_TIME) { + logger.warn("Processing time for buffered append-output is high: " + + processingTime + " milliseconds."); + } else { + logger.debug("Processing time for append-output took " + + processingTime + " milliseconds"); + } + + Long sizeProcessed = new Long(0); + for (String noteId: noteMap.keySet()) { + for (String paragraphId: noteMap.get(noteId).keySet()) { + String data = noteMap.get(noteId).get(paragraphId).toString(); + sizeProcessed += data.length(); + listener.onOutputAppend(noteId, paragraphId, data); + } + } + + if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) { + logger.warn("Processing size for buffered append-output is high: " + + sizeProcessed + " characters."); + } else { + logger.debug("Processing size for append-output is " + + sizeProcessed + " characters"); + } + } + + public void appendBuffer(String noteId, String paragraphId, String outputToAppend) { + queue.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend)); + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 48c14d50bde..090aeeaaee3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -39,12 +39,18 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * Processes message from RemoteInterpreter process */ public class RemoteInterpreterEventPoller extends Thread { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class); + private static final ScheduledExecutorService appendService = + Executors.newSingleThreadScheduledExecutor(); private final RemoteInterpreterProcessListener listener; private final ApplicationEventListener appListener; @@ -72,6 +78,9 @@ public void setInterpreterGroup(InterpreterGroup interpreterGroup) { @Override public void run() { Client client = null; + AppendOutputRunner runner = new AppendOutputRunner(listener); + ScheduledFuture appendFuture = appendService.scheduleWithFixedDelay( + runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); while (!shutdown) { // wait and retry @@ -157,7 +166,7 @@ public void run() { String appId = outputAppend.get("appId"); if (appId == null) { - listener.onOutputAppend(noteId, paragraphId, outputToAppend); + runner.appendBuffer(noteId, paragraphId, outputToAppend); } else { appListener.onOutputAppend(noteId, paragraphId, appId, outputToAppend); } @@ -192,6 +201,9 @@ public void run() { logger.error("Can't handle event " + event, e); } } + if (appendFuture != null) { + appendFuture.cancel(true); + } } private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) { diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java new file mode 100644 index 00000000000..8e9f5b361e6 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class AppendOutputRunnerTest { + + private static final int NUM_EVENTS = 10000; + private static final int NUM_CLUBBED_EVENTS = 100; + private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + private static ScheduledFuture future = null; + /* It is being accessed by multiple threads. + * While loop for 'loopForBufferCompletion' could + * run for-ever. + */ + private volatile static int numInvocations = 0; + + @After + public void afterEach() { + if (future != null) { + future.cancel(true); + } + } + + @Test + public void testSingleEvent() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + String[][] buffer = {{"note", "para", "data\n"}}; + + loopForCompletingEvents(listener, 1, buffer); + verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class)); + verify(listener, times(1)).onOutputAppend("note", "para", "data\n"); + } + + @Test + public void testMultipleEventsOfSameParagraph() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + String note1 = "note1"; + String para1 = "para1"; + String[][] buffer = { + {note1, para1, "data1\n"}, + {note1, para1, "data2\n"}, + {note1, para1, "data3\n"} + }; + + loopForCompletingEvents(listener, 1, buffer); + verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class)); + verify(listener, times(1)).onOutputAppend(note1, para1, "data1\ndata2\ndata3\n"); + } + + @Test + public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + String note1 = "note1"; + String note2 = "note2"; + String para1 = "para1"; + String para2 = "para2"; + String[][] buffer = { + {note1, para1, "data1\n"}, + {note1, para2, "data2\n"}, + {note2, para1, "data3\n"}, + {note2, para2, "data4\n"} + }; + loopForCompletingEvents(listener, 4, buffer); + + verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), any(String.class)); + verify(listener, times(1)).onOutputAppend(note1, para1, "data1\n"); + verify(listener, times(1)).onOutputAppend(note1, para2, "data2\n"); + verify(listener, times(1)).onOutputAppend(note2, para1, "data3\n"); + verify(listener, times(1)).onOutputAppend(note2, para2, "data4\n"); + } + + @Test + public void testClubbedData() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + AppendOutputRunner runner = new AppendOutputRunner(listener); + future = service.scheduleWithFixedDelay(runner, 0, + AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); + Thread thread = new Thread(new BombardEvents(runner)); + thread.start(); + thread.join(); + Thread.sleep(1000); + + /* NUM_CLUBBED_EVENTS is a heuristic number. + * It has been observed that for 10,000 continuos event + * calls, 30-40 Web-socket calls are made. Keeping + * the unit-test to a pessimistic 100 web-socket calls. + */ + verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), any(String.class)); + } + + @Test + public void testWarnLoggerForLargeData() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + AppendOutputRunner runner = new AppendOutputRunner(listener); + String data = "data\n"; + int numEvents = 100000; + + for (int i=0; i log; + + int warnLogCounter; + LoggingEvent sizeWarnLogEntry = null; + do { + warnLogCounter = 0; + log = appender.getLog(); + for (LoggingEvent logEntry: log) { + if (Level.WARN.equals(logEntry.getLevel())) { + sizeWarnLogEntry = logEntry; + warnLogCounter += 1; + } + } + } while(warnLogCounter != 2); + + String loggerString = "Processing size for buffered append-output is high: " + + (data.length() * numEvents) + " characters."; + assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage())); + } + + private class BombardEvents implements Runnable { + + private final AppendOutputRunner runner; + + private BombardEvents(AppendOutputRunner runner) { + this.runner = runner; + } + + @Override + public void run() { + String noteId = "noteId"; + String paraId = "paraId"; + for (int i=0; i log = new ArrayList<>(); + + @Override + public boolean requiresLayout() { + return false; + } + + @Override + protected void append(final LoggingEvent loggingEvent) { + log.add(loggingEvent); + } + + @Override + public void close() { + } + + public List getLog() { + return new ArrayList<>(log); + } + } + + private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) { + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + numInvocations += 1; + return null; + } + }).when(listener).onOutputAppend(any(String.class), any(String.class), any(String.class)); + } + + private void loopForCompletingEvents(RemoteInterpreterProcessListener listener, + int numTimes, String[][] buffer) { + numInvocations = 0; + prepareInvocationCounts(listener); + AppendOutputRunner runner = new AppendOutputRunner(listener); + for (String[] bufferElement: buffer) { + runner.appendBuffer(bufferElement[0], bufferElement[1], bufferElement[2]); + } + future = service.scheduleWithFixedDelay(runner, 0, + AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); + long startTimeMs = System.currentTimeMillis(); + while(numInvocations != numTimes) { + if (System.currentTimeMillis() - startTimeMs > 2000) { + fail("Buffered events were not sent for 2 seconds"); + } + } + } +} \ No newline at end of file diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index 78892704bc2..302d107843f 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -2575,7 +2575,16 @@ angular.module('zeppelinWebApp').controller('ParagraphCtrl', function($scope, $r }); $scope.$on('appendParagraphOutput', function(event, data) { - if ($scope.paragraph.id === data.paragraphId) { + /* It has been observed that append events + * can be errorneously called even if paragraph + * execution has ended, and in that case, no append + * should be made. Also, it was observed that between PENDING + * and RUNNING states, append-events can be called and we can't + * miss those, else during the length of paragraph run, few + * initial output line/s will be missing. + */ + if ($scope.paragraph.id === data.paragraphId && + ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING')) { if ($scope.flushStreamingOutput) { $scope.clearTextOutput(); $scope.flushStreamingOutput = false;