Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance improvement #107

Merged
merged 16 commits into from
Feb 13, 2024
Merged
18 changes: 8 additions & 10 deletions client/worker_event_processing.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
let height;
let width;
let wsURL;
let eventURL;
let portrait;
let draw;
let latestX;
let latestY;
// Constants for the maximum values from the WebSocket messages
const MAX_X_VALUE = 15725;
const MAX_Y_VALUE = 20966;

Expand All @@ -16,7 +15,7 @@ onmessage = (event) => {
case 'init':
height = event.data.height;
width = event.data.width;
wsURL = event.data.wsURL;
eventURL = event.data.eventURL;
portrait = event.data.portrait;
initiateEventsListener();
break;
Expand All @@ -33,10 +32,9 @@ onmessage = (event) => {


async function initiateEventsListener() {
const RETRY_DELAY_MS = 3000; // Delay before retrying the connection (in milliseconds)
ws = new WebSocket(wsURL);
const eventSource = new EventSource(eventURL);
draw = true;
ws.onmessage = (event) => {
eventSource.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.Type === 3) {
if (message.Code === 24) {
Expand Down Expand Up @@ -70,21 +68,21 @@ async function initiateEventsListener() {
}
}

ws.onerror = () => {
eventSource.onerror = () => {
postMessage({
type: 'error',
message: "websocket error",
});
console.error('WebSocket error occurred. Attempting to reconnect...');
console.error('EventStrean error occurred. Attempting to reconnect...');
//setTimeout(connectWebSocket, 3000); // Reconnect after 3 seconds
};

ws.onclose = () => {
eventSource.onclose = () => {
postMessage({
type: 'error',
message: 'closed connection'
});
console.log('WebSocket connection closed. Attempting to reconnect...');
console.log('EventStream connection closed. Attempting to reconnect...');
//setTimeout(connectWebSocket, 3000); // Reconnect after 3 seconds
};
}
Expand Down
2 changes: 0 additions & 2 deletions client/worker_gesture_processing.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
let wsURL;
// Constants for the maximum values from the WebSocket messages
const SWIPE_DISTANCE = 200;

Expand All @@ -7,7 +6,6 @@ onmessage = (event) => {

switch (data.type) {
case 'init':
wsURL = event.data.wsURL;
fetchStream();
break;
case 'terminate':
Expand Down
10 changes: 8 additions & 2 deletions client/worker_stream_processing.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,19 @@ async function initiateStream() {
case 30: // red
imageData[offset+3] = 0;
break;
case 10: // red
case 6: // red
imageData[offset] = 255;
imageData[offset+1] = 0;
imageData[offset+2] = 0;
imageData[offset+3] = 255;
break;
case 18: // blue
case 8: // red
imageData[offset] = 255;
imageData[offset+1] = 0;
imageData[offset+2] = 0;
imageData[offset+3] = 255;
break;
case 12: // blue
imageData[offset] = 0;
imageData[offset+1] = 0;
imageData[offset+2] = 255;
Expand Down
7 changes: 2 additions & 5 deletions client/workersHandling.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,17 @@ streamWorker.onmessage = (event) => {


// Determine the WebSocket protocol based on the current window protocol
const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsURL = `${wsProtocol}//${window.location.host}/events`;
const wsGestureURL = `${wsProtocol}//${window.location.host}/gestures`;
const eventURL = `/events`;
// Send the OffscreenCanvas to the worker for initialization
eventWorker.postMessage({
type: 'init',
width: width,
height: height,
portrait: portrait,
wsURL: wsURL
eventURL: eventURL
});
gestureWorker.postMessage({
type: 'init',
wsURL: wsGestureURL
});

gestureWorker.onmessage = (event) => {
Expand Down
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ module github.com/owulveryck/goMarkableStream
go 1.20

require (
github.com/gobwas/ws v1.3.1
github.com/kelseyhightower/envconfig v1.4.0
golang.ngrok.com/ngrok v1.4.1
)

require (
github.com/go-stack/stack v1.8.1 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/inconshreveable/log15 v3.0.0-testing.3+incompatible // indirect
github.com/inconshreveable/log15/v3 v3.0.0-testing.5 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
Expand Down
7 changes: 0 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw=
github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.3.1 h1:Qi34dfLMWJbiKaNbDVzM9x27nZBjmkaW6i4+Ku+pGVU=
github.com/gobwas/ws v1.3.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
Expand Down Expand Up @@ -35,7 +29,6 @@ golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols=
Expand Down
24 changes: 9 additions & 15 deletions internal/eventhttphandler/pen_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package eventhttphandler

import (
"encoding/json"
"log"
"fmt"
"net/http"

"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"

"github.com/owulveryck/goMarkableStream/internal/events"
"github.com/owulveryck/goMarkableStream/internal/pubsub"
)
Expand All @@ -26,15 +23,14 @@ type EventHandler struct {

// ServeHTTP implements http.Handler
func (h *EventHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
conn, _, _, err := ws.UpgradeHTTP(r, w)
if err != nil {
http.Error(w, "cannot upgrade connection "+err.Error(), http.StatusInternalServerError)
return
}
eventC := h.inputEventBus.Subscribe("eventListener")
defer func() {
h.inputEventBus.Unsubscribe(eventC)
}()
// Set necessary headers to indicate a stream
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

for {
select {
Expand All @@ -53,12 +49,10 @@ func (h *EventHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "cannot send json encode the message "+err.Error(), http.StatusInternalServerError)
return
}
// Send the JSON message to the WebSocket client
err = wsutil.WriteServerText(conn, jsonMessage)
if err != nil {
log.Println(err)
return
}
// Send the event
fmt.Fprintf(w, "data: %s\n\n", jsonMessage)
w.(http.Flusher).Flush() // Ensure client receives the message immediately

}
}
}
42 changes: 30 additions & 12 deletions internal/rle/rle.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ import (

var encodedPool = sync.Pool{
New: func() interface{} {
return make([]uint8, 0, remarkable.ScreenHeight*remarkable.ScreenWidth)
return new(bytes.Buffer)
},
}

var bufferPool = sync.Pool{
New: func() any {
return make([]byte, 0, remarkable.ScreenHeight*remarkable.ScreenWidth*2)
},
}

Expand Down Expand Up @@ -38,26 +44,38 @@ func (rlewriter *RLE) Write(data []byte) (int, error) {
if length == 0 {
return 0, nil
}
encoded := encodedPool.Get().([]uint8) // Borrow a slice from the pool
defer encodedPool.Put(encoded)
buf := bufferPool.Get().([]uint8)
defer bufferPool.Put(buf)

current := data[0]
count := 0
count := uint8(0)

for _, datum := range data {
for i := 0; i < remarkable.ScreenHeight*remarkable.ScreenWidth*2; i += 2 {
datum := data[i]
if count < 254 && datum == current {
count++
} else {
encoded = append(encoded, uint8(count))
encoded = append(encoded, uint8(current))
buf = append(buf, count)
buf = append(buf, current)
current = datum
count = 1
}
}
/*
for i := 0; i < remarkable.ScreenWidth*remarkable.ScreenHeight; i++ {
datum := data[i*2]
if count < 254 && datum == current {
count++
} else {
buf = append(buf, count)
buf = append(buf, current)
current = datum
count = 1
}
}
*/
buf = append(buf, count)
buf = append(buf, current)

encoded = append(encoded, uint8(count))
encoded = append(encoded, uint8(current))

n, err := io.Copy(rlewriter.sub, bytes.NewBuffer(encoded))
return int(n), err
return rlewriter.sub.Write(buf)
}
29 changes: 29 additions & 0 deletions internal/stream/benchfetchandsend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package stream

import (
"testing"

"github.com/owulveryck/goMarkableStream/internal/rle"
)

func BenchmarkFetchAndSend(b *testing.B) {
// Setup: Create a large enough mockReaderAt to test performance
width, height := 2872, 2404 // Example size; adjust based on your needs
mockReader := NewMockReaderAt(width, height) // Using the mock from the previous example

handler := StreamHandler{
file: mockReader,
pointerAddr: 0,
}

mockWriter := NewMockResponseWriter()

rleWriter := rle.NewRLE(mockWriter)

data := make([]byte, width*height) // Adjust based on your payload size

b.ResetTimer() // Start timing here
for i := 0; i < b.N; i++ {
handler.fetchAndSend(rleWriter, data)
}
}
51 changes: 34 additions & 17 deletions internal/stream/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/owulveryck/goMarkableStream/internal/events"
"github.com/owulveryck/goMarkableStream/internal/pubsub"
"github.com/owulveryck/goMarkableStream/internal/remarkable"
"github.com/owulveryck/goMarkableStream/internal/rle"
Expand Down Expand Up @@ -81,7 +82,6 @@ func (h *StreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer rawFrameBuffer.Put(rawData) // Return the slice to the pool when done
// the informations are int4, therefore store it in a uint8array to reduce data transfer
rleWriter := rle.NewRLE(w)
extractor := &oneOutOfTwo{rleWriter}
writing := true
stopWriting := time.NewTicker(2 * time.Second)
defer stopWriting.Stop()
Expand All @@ -95,27 +95,44 @@ func (h *StreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
select {
case <-r.Context().Done():
return
case <-eventC:
writing = true
stopWriting.Reset(2 * time.Second)
case event := <-eventC:
if event.Code == 24 || event.Source == events.Touch {
writing = true
stopWriting.Reset(2000 * time.Millisecond)
}
case <-stopWriting.C:
writing = false
case <-ticker.C:
if writing {
_, err := h.file.ReadAt(rawData, h.pointerAddr)
if err != nil {
log.Println(err)
return
}
_, err = extractor.Write(rawData)
if err != nil {
log.Println("Error in writing", err)
return
}
if w, ok := w.(http.Flusher); ok {
w.Flush()
}
h.fetchAndSend(rleWriter, rawData)
}
}
}
}

func (h *StreamHandler) fetchAndSend(w io.Writer, rawData []uint8) {
_, err := h.file.ReadAt(rawData, h.pointerAddr)
if err != nil {
log.Println(err)
return
}
_, err = w.Write(rawData)
if err != nil {
log.Println("Error in writing", err)
return
}
if w, ok := w.(http.Flusher); ok {
w.Flush()
}
}

func sum(d []uint8) int {
val := 0 // Assuming `int` is large enough to avoid overflow
// Manual loop unrolling could be done here, but it's typically not recommended
// for readability and maintenance reasons unless profiling identifies this loop
// as a significant bottleneck.
for _, v := range d {
val += int(v)
}
return val
}
Loading
Loading