Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance improvement #107

Merged
merged 16 commits into from
Feb 13, 2024
Merged
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.20.4
go-version: 1.22

- name: Build
run: go build -v ./...
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.20.4
go-version: 1.22
- name: Install Protoc
uses: arduino/setup-protoc@v1
-
Expand Down
18 changes: 8 additions & 10 deletions client/worker_event_processing.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
let height;
let width;
let wsURL;
let eventURL;
let portrait;
let draw;
let latestX;
let latestY;
// Constants for the maximum values from the WebSocket messages
const MAX_X_VALUE = 15725;
const MAX_Y_VALUE = 20966;

Expand All @@ -16,7 +15,7 @@ onmessage = (event) => {
case 'init':
height = event.data.height;
width = event.data.width;
wsURL = event.data.wsURL;
eventURL = event.data.eventURL;
portrait = event.data.portrait;
initiateEventsListener();
break;
Expand All @@ -33,10 +32,9 @@ onmessage = (event) => {


async function initiateEventsListener() {
const RETRY_DELAY_MS = 3000; // Delay before retrying the connection (in milliseconds)
ws = new WebSocket(wsURL);
const eventSource = new EventSource(eventURL);
draw = true;
ws.onmessage = (event) => {
eventSource.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.Type === 3) {
if (message.Code === 24) {
Expand Down Expand Up @@ -70,21 +68,21 @@ async function initiateEventsListener() {
}
}

ws.onerror = () => {
eventSource.onerror = () => {
postMessage({
type: 'error',
message: "websocket error",
});
console.error('WebSocket error occurred. Attempting to reconnect...');
console.error('EventStrean error occurred. Attempting to reconnect...');
//setTimeout(connectWebSocket, 3000); // Reconnect after 3 seconds
};

ws.onclose = () => {
eventSource.onclose = () => {
postMessage({
type: 'error',
message: 'closed connection'
});
console.log('WebSocket connection closed. Attempting to reconnect...');
console.log('EventStream connection closed. Attempting to reconnect...');
//setTimeout(connectWebSocket, 3000); // Reconnect after 3 seconds
};
}
Expand Down
2 changes: 0 additions & 2 deletions client/worker_gesture_processing.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
let wsURL;
// Constants for the maximum values from the WebSocket messages
const SWIPE_DISTANCE = 200;

Expand All @@ -7,7 +6,6 @@ onmessage = (event) => {

switch (data.type) {
case 'init':
wsURL = event.data.wsURL;
fetchStream();
break;
case 'terminate':
Expand Down
10 changes: 8 additions & 2 deletions client/worker_stream_processing.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,19 @@ async function initiateStream() {
case 30: // red
imageData[offset+3] = 0;
break;
case 10: // red
case 6: // red
imageData[offset] = 255;
imageData[offset+1] = 0;
imageData[offset+2] = 0;
imageData[offset+3] = 255;
break;
case 18: // blue
case 8: // red
imageData[offset] = 255;
imageData[offset+1] = 0;
imageData[offset+2] = 0;
imageData[offset+3] = 255;
break;
case 12: // blue
imageData[offset] = 0;
imageData[offset+1] = 0;
imageData[offset+2] = 255;
Expand Down
7 changes: 2 additions & 5 deletions client/workersHandling.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,17 @@ streamWorker.onmessage = (event) => {


// Determine the WebSocket protocol based on the current window protocol
const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsURL = `${wsProtocol}//${window.location.host}/events`;
const wsGestureURL = `${wsProtocol}//${window.location.host}/gestures`;
const eventURL = `/events`;
// Send the OffscreenCanvas to the worker for initialization
eventWorker.postMessage({
type: 'init',
width: width,
height: height,
portrait: portrait,
wsURL: wsURL
eventURL: eventURL
});
gestureWorker.postMessage({
type: 'init',
wsURL: wsGestureURL
});

gestureWorker.onmessage = (event) => {
Expand Down
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
module github.com/owulveryck/goMarkableStream

go 1.20
go 1.22

require (
github.com/gobwas/ws v1.3.1
github.com/kelseyhightower/envconfig v1.4.0
golang.ngrok.com/ngrok v1.4.1
)

require (
github.com/go-stack/stack v1.8.1 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/inconshreveable/log15 v3.0.0-testing.3+incompatible // indirect
github.com/inconshreveable/log15/v3 v3.0.0-testing.5 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
Expand Down
14 changes: 7 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw=
github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.3.1 h1:Qi34dfLMWJbiKaNbDVzM9x27nZBjmkaW6i4+Ku+pGVU=
github.com/gobwas/ws v1.3.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE=
github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
github.com/inconshreveable/log15 v3.0.0-testing.3+incompatible h1:zaX5fYT98jX5j4UhO/WbfY8T1HkgVrydiDMC9PWqGCo=
github.com/inconshreveable/log15 v3.0.0-testing.3+incompatible/go.mod h1:cOaXtrgN4ScfRrD9Bre7U1thNq5RtJ8ZoP4iXVGRj6o=
github.com/inconshreveable/log15/v3 v3.0.0-testing.5 h1:h4e0f3kjgg+RJBlKOabrohjHe47D3bbAB9BgMrc3DYA=
Expand All @@ -24,18 +21,20 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.ngrok.com/muxado/v2 v2.0.0 h1:bu9eIDhRdYNtIXNnqat/HyMeHYOAbUH55ebD7gTvW6c=
golang.ngrok.com/muxado/v2 v2.0.0/go.mod h1:wzxJYX4xiAtmwumzL+QsukVwFRXmPNv86vB8RPpOxyM=
golang.ngrok.com/ngrok v1.4.1 h1:z53H/hAqSJf+K5wL3v4m01Dp4rU0wcf323iMPBQ27QA=
golang.ngrok.com/ngrok v1.4.1/go.mod h1:8a8GVoqR305t0O51ld211Xq2UeKgm32o8px24ddvXZI=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols=
Expand All @@ -45,3 +44,4 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
24 changes: 9 additions & 15 deletions internal/eventhttphandler/pen_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package eventhttphandler

import (
"encoding/json"
"log"
"fmt"
"net/http"

"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"

"github.com/owulveryck/goMarkableStream/internal/events"
"github.com/owulveryck/goMarkableStream/internal/pubsub"
)
Expand All @@ -26,15 +23,14 @@ type EventHandler struct {

// ServeHTTP implements http.Handler
func (h *EventHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
conn, _, _, err := ws.UpgradeHTTP(r, w)
if err != nil {
http.Error(w, "cannot upgrade connection "+err.Error(), http.StatusInternalServerError)
return
}
eventC := h.inputEventBus.Subscribe("eventListener")
defer func() {
h.inputEventBus.Unsubscribe(eventC)
}()
// Set necessary headers to indicate a stream
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

for {
select {
Expand All @@ -53,12 +49,10 @@ func (h *EventHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "cannot send json encode the message "+err.Error(), http.StatusInternalServerError)
return
}
// Send the JSON message to the WebSocket client
err = wsutil.WriteServerText(conn, jsonMessage)
if err != nil {
log.Println(err)
return
}
// Send the event
fmt.Fprintf(w, "data: %s\n\n", jsonMessage)
w.(http.Flusher).Flush() // Ensure client receives the message immediately

}
}
}
42 changes: 30 additions & 12 deletions internal/rle/rle.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ import (

var encodedPool = sync.Pool{
New: func() interface{} {
return make([]uint8, 0, remarkable.ScreenHeight*remarkable.ScreenWidth)
return new(bytes.Buffer)
},
}

var bufferPool = sync.Pool{
New: func() any {
return make([]byte, 0, remarkable.ScreenHeight*remarkable.ScreenWidth*2)
},
}

Expand Down Expand Up @@ -38,26 +44,38 @@ func (rlewriter *RLE) Write(data []byte) (int, error) {
if length == 0 {
return 0, nil
}
encoded := encodedPool.Get().([]uint8) // Borrow a slice from the pool
defer encodedPool.Put(encoded)
buf := bufferPool.Get().([]uint8)
defer bufferPool.Put(buf)

current := data[0]
count := 0
count := uint8(0)

for _, datum := range data {
for i := 0; i < remarkable.ScreenHeight*remarkable.ScreenWidth*2; i += 2 {
datum := data[i]
if count < 254 && datum == current {
count++
} else {
encoded = append(encoded, uint8(count))
encoded = append(encoded, uint8(current))
buf = append(buf, count)
buf = append(buf, current)
current = datum
count = 1
}
}
/*
for i := 0; i < remarkable.ScreenWidth*remarkable.ScreenHeight; i++ {
datum := data[i*2]
if count < 254 && datum == current {
count++
} else {
buf = append(buf, count)
buf = append(buf, current)
current = datum
count = 1
}
}
*/
buf = append(buf, count)
buf = append(buf, current)

encoded = append(encoded, uint8(count))
encoded = append(encoded, uint8(current))

n, err := io.Copy(rlewriter.sub, bytes.NewBuffer(encoded))
return int(n), err
return rlewriter.sub.Write(buf)
}
29 changes: 29 additions & 0 deletions internal/stream/benchfetchandsend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package stream

import (
"testing"

"github.com/owulveryck/goMarkableStream/internal/rle"
)

func BenchmarkFetchAndSend(b *testing.B) {
// Setup: Create a large enough mockReaderAt to test performance
width, height := 2872, 2404 // Example size; adjust based on your needs
mockReader := NewMockReaderAt(width, height) // Using the mock from the previous example

handler := StreamHandler{
file: mockReader,
pointerAddr: 0,
}

mockWriter := NewMockResponseWriter()

rleWriter := rle.NewRLE(mockWriter)

data := make([]byte, width*height) // Adjust based on your payload size

b.ResetTimer() // Start timing here
for i := 0; i < b.N; i++ {
handler.fetchAndSend(rleWriter, data)
}
}
Loading
Loading