Skip to content

Commit

Permalink
Buffer append output results + fix extra incorrect results
Browse files Browse the repository at this point in the history
### What is this PR for?
There are 2 issues and their proposed fixes:
1. On a paragraph run, for every line of output, there is a broadcast of the new line from zeppelin. In case of thousands of lines of output, the browser/s would hang because of the volume of these append-output events.
2. In the above case, besides the browser-hang, another bug observed is that result data is will repeated twice (coming from append-output calls + finish-event calls).

The proposed solution for #1 is:
- Buffer the append-output event into a queue instead of sending the event immediately.
- In a separate thread, read from the queue periodically and send the append-output event.

Solution for #2 is:
- Donot append output to result if the paragraph is not runnig.

### What type of PR is it?
Improvement + Bug Fix

### Todos

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1292

### How should this be tested?
The test could be to run a simple paragraph with large result. Eg:
```
%sh
for i in {1..10000}
do
echo $i
done
```
PS: One will need to clear browser cache between running with and without this code patch since there are javascript changes as well.

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update?
No
* Is there breaking changes for older versions?
No
* Does this needs documentation?
It could need for the design. Otherwise I have added code comments explaining behaviour.

Author: Beria <[email protected]>

Closes apache#1283 from beriaanirudh/ZEPPELIN-1292 and squashes the following commits:

17f0524 [Beria] Use diamond operator
7852368 [Beria] nit
4b68c86 [Beria] fix checkstyle
d168614 [Beria] Remove un-necessary class CheckAppendOutputRunner
2eae38e [Beria] Make AppendOutputRunner non-static
72c316d [Beria] Scheduler service to replace while loop in AppendOutputRunner
599281f [Beria] fix unit tests that run after
dd24816 [Beria] Add license in test file
3984ef8 [Beria] fix tests when ran with other tests
1c893c0 [Beria] Add licensing
1bdd669 [Beria] fix javadoc comment
27790e4 [Beria] Avoid infinite loop in tests
5057bb3 [Beria] Incorporate feedback 1. Synchronize on AppendOutputRunner creation 2. Use ScheduledExecutorService instead of while loop 3. Remove Thread.sleep() from tests
82e9c4a [Beria] Fix comment
7020f0c [Beria] Buffer append output results + fix extra incorrect results
  • Loading branch information
Beria authored and Leemoonsoo committed Sep 3, 2016
1 parent cee58aa commit 11becde
Show file tree
Hide file tree
Showing 5 changed files with 424 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.interpreter.remote;

/**
* This element stores the buffered
* append-data of paragraph's output.
*/
public class AppendOutputBuffer {

private String noteId;
private String paragraphId;
private String data;

public AppendOutputBuffer(String noteId, String paragraphId, String data) {
this.noteId = noteId;
this.paragraphId = paragraphId;
this.data = data;
}

public String getNoteId() {
return noteId;
}

public String getParagraphId() {
return paragraphId;
}

public String getData() {
return data;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.interpreter.remote;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This thread sends paragraph's append-data
* periodically, rather than continously, with
* a period of BUFFER_TIME_MS. It handles append-data
* for all paragraphs across all notebooks.
*/
public class AppendOutputRunner implements Runnable {

private static final Logger logger =
LoggerFactory.getLogger(AppendOutputRunner.class);
public static final Long BUFFER_TIME_MS = new Long(100);
private static final Long SAFE_PROCESSING_TIME = new Long(10);
private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000);

private final BlockingQueue<AppendOutputBuffer> queue = new LinkedBlockingQueue<>();
private final RemoteInterpreterProcessListener listener;

public AppendOutputRunner(RemoteInterpreterProcessListener listener) {
this.listener = listener;
}

@Override
public void run() {

Map<String, Map<String, StringBuilder> > noteMap = new HashMap<>();
List<AppendOutputBuffer> list = new LinkedList<>();

/* "drainTo" method does not wait for any element
* to be present in the queue, and thus this loop would
* continuosly run (with period of BUFFER_TIME_MS). "take()" method
* waits for the queue to become non-empty and then removes
* one element from it. Rest elements from queue (if present) are
* removed using "drainTo" method. Thus we save on some un-necessary
* cpu-cycles.
*/
try {
list.add(queue.take());
} catch (InterruptedException e) {
logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage());
}
Long processingStartTime = System.currentTimeMillis();
queue.drainTo(list);

for (AppendOutputBuffer buffer: list) {
String noteId = buffer.getNoteId();
String paragraphId = buffer.getParagraphId();

Map<String, StringBuilder> paragraphMap = (noteMap.containsKey(noteId)) ?
noteMap.get(noteId) : new HashMap<String, StringBuilder>();
StringBuilder builder = paragraphMap.containsKey(paragraphId) ?
paragraphMap.get(paragraphId) : new StringBuilder();

builder.append(buffer.getData());
paragraphMap.put(paragraphId, builder);
noteMap.put(noteId, paragraphMap);
}
Long processingTime = System.currentTimeMillis() - processingStartTime;

if (processingTime > SAFE_PROCESSING_TIME) {
logger.warn("Processing time for buffered append-output is high: " +
processingTime + " milliseconds.");
} else {
logger.debug("Processing time for append-output took "
+ processingTime + " milliseconds");
}

Long sizeProcessed = new Long(0);
for (String noteId: noteMap.keySet()) {
for (String paragraphId: noteMap.get(noteId).keySet()) {
String data = noteMap.get(noteId).get(paragraphId).toString();
sizeProcessed += data.length();
listener.onOutputAppend(noteId, paragraphId, data);
}
}

if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) {
logger.warn("Processing size for buffered append-output is high: " +
sizeProcessed + " characters.");
} else {
logger.debug("Processing size for append-output is " +
sizeProcessed + " characters");
}
}

public void appendBuffer(String noteId, String paragraphId, String outputToAppend) {
queue.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,18 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Processes message from RemoteInterpreter process
*/
public class RemoteInterpreterEventPoller extends Thread {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
private static final ScheduledExecutorService appendService =
Executors.newSingleThreadScheduledExecutor();
private final RemoteInterpreterProcessListener listener;
private final ApplicationEventListener appListener;

Expand Down Expand Up @@ -72,6 +78,9 @@ public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
@Override
public void run() {
Client client = null;
AppendOutputRunner runner = new AppendOutputRunner(listener);
ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay(
runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);

while (!shutdown) {
// wait and retry
Expand Down Expand Up @@ -157,7 +166,7 @@ public void run() {
String appId = outputAppend.get("appId");

if (appId == null) {
listener.onOutputAppend(noteId, paragraphId, outputToAppend);
runner.appendBuffer(noteId, paragraphId, outputToAppend);
} else {
appListener.onOutputAppend(noteId, paragraphId, appId, outputToAppend);
}
Expand Down Expand Up @@ -192,6 +201,9 @@ public void run() {
logger.error("Can't handle event " + event, e);
}
}
if (appendFuture != null) {
appendFuture.cancel(true);
}
}

private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) {
Expand Down
Loading

0 comments on commit 11becde

Please sign in to comment.