Skip to content

Commit

Permalink
feat(tcp): add methods to pause/resume reading from tcp stream
Browse files Browse the repository at this point in the history
  • Loading branch information
michaldziuba03 committed Aug 8, 2024
1 parent 90849d2 commit 40d0054
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 10 deletions.
2 changes: 1 addition & 1 deletion core/js_internals.hh

Large diffs are not rendered by default.

52 changes: 44 additions & 8 deletions core/tcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class TcpStream {
public:
lx_connection_t *conn;
lx_timer_t timeout;
bool paused = false;
v8::Persistent<v8::Object> obj;

TcpStream(v8::Isolate *isolate, v8::Local<v8::Object> obj) {
Expand Down Expand Up @@ -41,6 +42,12 @@ class TcpStream {

v8::Local<v8::FunctionTemplate> setTimeoutTemplate = v8::FunctionTemplate::New(isolate, TcpStream::set_timeout);
t->PrototypeTemplate()->Set(isolate, "setTimeout", setTimeoutTemplate);

v8::Local<v8::FunctionTemplate> pauseTemplate = v8::FunctionTemplate::New(isolate, TcpStream::pause);
t->PrototypeTemplate()->Set(isolate, "pause", pauseTemplate);

v8::Local<v8::FunctionTemplate> resumeTemplate = v8::FunctionTemplate::New(isolate, TcpStream::resume);
t->PrototypeTemplate()->Set(isolate, "resume", resumeTemplate);

v8::Local<v8::Function> func = t->GetFunction(context).ToLocalChecked();
TcpStream::streamConstructor.Reset(isolate, func);
Expand All @@ -67,14 +74,10 @@ class TcpStream {
char *buf = strdup(*str);

lx_write_t *write_op = lx_write_alloc(buf, str.length());
write_op->data = stream;
lx_write(write_op, stream->conn, TcpStream::handle_write);
}

static void handle_write(lx_write_t *write_op, int status) {
free((void*)write_op->buf);
free(write_op);
}

static void close(const v8::FunctionCallbackInfo<v8::Value> &args) {
assert(args.Length() == 0);
TcpStream *stream = static_cast<TcpStream*>(args.This()->GetAlignedPointerFromInternalField(0));
Expand All @@ -84,20 +87,53 @@ class TcpStream {

static void set_timeout(const v8::FunctionCallbackInfo<v8::Value> &args) {
v8::Isolate *isolate = args.GetIsolate();

v8::HandleScope handle_scope(isolate);

if (args.Length() != 1 && !args[0]->IsNumber()) {
isolate->ThrowException(v8::Exception::Error(v8_str(isolate, "Expected timeout as number")));
return;
}

int64_t timeout = args[0].As<v8::Number>()->Value();
TcpStream *stream = static_cast<TcpStream*>(args.This()->GetAlignedPointerFromInternalField(0));


if (timeout == 0) {
lx_timer_stop(&stream->timeout);
return;
}

lx_timer_stop(&stream->timeout);
lx_timer_start(&stream->timeout, TcpStream::handle_timeout, timeout);
}

static void pause(const v8::FunctionCallbackInfo<v8::Value> &args) {
v8::Isolate *isolate = args.GetIsolate();
v8::HandleScope handle_scope(isolate);

TcpStream *stream = static_cast<TcpStream*>(args.This()->GetAlignedPointerFromInternalField(0));
if (!stream->paused) {
lx_stop_reading(&stream->conn->event, stream->conn->fd);
stream->paused = true;
}
}

static void resume(const v8::FunctionCallbackInfo<v8::Value> &args) {
v8::Isolate *isolate = args.GetIsolate();
v8::HandleScope handle_scope(isolate);

TcpStream *stream = static_cast<TcpStream*>(args.This()->GetAlignedPointerFromInternalField(0));
if (stream->paused) {
lx_set_read_event(&stream->conn->event, stream->conn->fd);
stream->paused = false;
}
}

/* handlers for event loop */
static void handle_write(lx_write_t *write_op, int status) {
free((void*)write_op->buf);
free(write_op);
}

static void handle_data(lx_connection_t *conn) {
TcpStream *stream = static_cast<TcpStream*>(conn->data);

Expand Down Expand Up @@ -147,7 +183,7 @@ class TcpServer {
v8::Global<v8::Function> callback;
int port;

static const int64_t kDefaultTimeout = 5 * 1000;
static const int64_t kDefaultTimeout = 120 * 1000;
public:
static void initialize(v8::Local<v8::Object> exports, v8::Isolate *isolate, v8::Local<v8::Context> context) {
v8::Local<v8::Function> func = v8::FunctionTemplate::New(isolate, listen)->GetFunction(context).ToLocalChecked();
Expand Down
25 changes: 25 additions & 0 deletions js/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,29 @@ class Socket {
// otherwise - ignore and make it safe to call multiple times
}

pause() {
if (this._handle) {
this._handle.pause();
}
}

resume() {
if (this._handle) {
this._handle.resume();
}
}

setTimeout(delay) {
if (!Number.isInteger(delay)) {
throw new Error('Invalid socket timeout.');
}

if (this._handle) {
this._handle.setTimeout(delay);
return true;
}

return false;
}

write(data) {
Expand All @@ -45,6 +64,12 @@ function toPort(value) {
return port;
}

class Server {
constructor() {
this.connections = 0;
}
}

export const tcpListen = (callback, port) => {
net.tcpListen((handle) => {
const socket = new Socket(handle);
Expand Down
6 changes: 5 additions & 1 deletion sample.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ console.log(`Url: ${import.meta.url}`);

tcpListen((socket) => {
console.log("Client connected");
socket.pause();
setTimeout(() => {
socket.resume();
}, 10 * 1000)

socket.setTimeout(60 * 1000);
socket.read((chunk) => {
console.log(`Received data`);
const response = `<h1>${chunk}</h1>`; // echo HTTP request
socket.write(`HTTP/1.1 200 OK\r\nContent-Length: ${response.length}\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n`);
socket.write(response);
socket.close();
socket.setTimeout(60 * 1000);
});
}, 8000);

0 comments on commit 40d0054

Please sign in to comment.