Skip to content

Commit

Permalink
feat: notify about successful tcp writes
Browse files Browse the repository at this point in the history
  • Loading branch information
michaldziuba03 committed Oct 29, 2024
1 parent ea2f65f commit 2230609
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 32 deletions.
4 changes: 2 additions & 2 deletions core/js_internals.h

Large diffs are not rendered by default.

25 changes: 21 additions & 4 deletions core/tcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <buffer.h>
#include <cstring>
#include <errors.h>
#include <iostream>
#include <memory>
#include <writer.h>

Expand Down Expand Up @@ -167,14 +168,30 @@ void TcpStream::write(const v8::FunctionCallbackInfo<v8::Value> &args) {
}

auto ui = args[0].As<v8::Uint8Array>();
char *data = Buffer::getBytes(ui);
size_t size = Buffer::getSize(ui);
Writer *writer = new Writer(data, size);
writer->setBufferView(isolate, ui);
auto *writer = new Writer<TcpStream>(stream, ui, TcpStream::onWrite);

pd_tcp_write(&stream->handle, &writer->op);
}

// callback handlers:
void TcpStream::onWrite(TcpStream *stream, int status, size_t written) {
Pand *pand = Pand::get();
v8::Isolate *isolate = pand->isolate;
v8::HandleScope handle_scope(isolate);

v8::Local<v8::Object> obj = stream->obj.Get(isolate);

if (status < 0) {
auto err = Pand::makeSystemError(isolate, status);
v8::Local<v8::Value> argv[1] = {err};
Pand::makeCallback(obj, isolate, "onError", argv, 1);
return;
}

v8::Local<v8::Value> argv[1] = {Pand::integer(isolate, written)};
Pand::makeCallback(obj, isolate, "onWrite", argv, 1);
}

void TcpStream::onConnect(pd_tcp_t *handle, int status) {
Pand *pand = Pand::get();
TcpStream *stream = static_cast<TcpStream *>(handle->data);
Expand Down
2 changes: 1 addition & 1 deletion core/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TcpStream {

static void onData(pd_tcp_t *, char *, size_t);

static void onWrite(pd_write_t *, int);
static void onWrite(TcpStream *, int, size_t);

static void onClose(pd_tcp_t *);
};
Expand Down
33 changes: 20 additions & 13 deletions core/writer.h
Original file line number Diff line number Diff line change
@@ -1,31 +1,38 @@
#pragma once
#include <buffer.h>
#include <iostream>
#include <pandio.h>
#include <v8.h>

namespace pand::core {
/* Writer is mechanism to make async writes with buffers managed by v8 GC without
being garbage collected before write */
struct Writer {
/* Writer is mechanism to make async writes with buffers managed by v8 GC
without being garbage collected before write */
template <typename TStream> struct Writer {
TStream *stream;
void (*onWrite)(TStream *, int, size_t);
pd_write_t op;
v8::Persistent<v8::Value> value;

Writer(char *buf, size_t len) {
Writer(TStream *stream, v8::Local<v8::Uint8Array> data,
void (*onWrite)(TStream *, int, size_t)) {
value.Reset(data->GetIsolate(), data);
char *buf = Buffer::getBytes(data);
size_t len = Buffer::getSize(data);
pd_write_init(&op, buf, len, Writer::afterWrite);
this->stream = stream;
this->onWrite = onWrite;
op.udata = this;
}

~Writer() { value.Reset(); }

void setArrayBuffer(v8::Isolate *isolate, v8::Local<v8::ArrayBuffer> ab) {
value.Reset(isolate, ab);
}

void setBufferView(v8::Isolate *isolate, v8::Local<v8::Uint8Array> ui) {
value.Reset(isolate, ui);
}

static void afterWrite(pd_write_t *op, int written) {
static void afterWrite(pd_write_t *op, int status) {
Writer *writer = static_cast<Writer *>(op->udata);
if (writer->onWrite) {
// pandio notifies only when full buffer is written (or when we get error)
writer->onWrite(writer->stream, status, op->data.len);
}

delete writer;
}
};
Expand Down
18 changes: 12 additions & 6 deletions js/net.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { TcpStream, TcpServer } = Runtime.bind('tcp');
const { TcpStream, TcpServer } = Runtime.bind("tcp");

const kEmpty = 0;
const kConnecting = 1;
Expand All @@ -21,6 +21,7 @@ export class Socket {
this.#handle.onError = this.#onError.bind(this);
this.#handle.onData = this.#onData.bind(this);
this.#handle.onClose = this.#onClose.bind(this);
this.#handle.onWrite = this.#onWrite.bind(this);
}

get connecting() {
Expand All @@ -45,10 +46,14 @@ export class Socket {
}
}

#onWrite(size) {
// will be usefull when we will implement backpressure
}

#onClose() {
if (this.#queue.length > 0) {
const promise = this.#queue.shift();
promise.reject(new Error('Connection closed by peer'));
promise.reject(new Error("Connection closed by peer"));
}
this.#handle = null;
this.#state = kDestroyed;
Expand Down Expand Up @@ -114,15 +119,16 @@ export class Socket {
}

let buf;
if (typeof chunk === 'string') {
if (typeof chunk === "string") {
buf = Buffer.from(chunk, encoding);
}
else if (chunk instanceof Uint8Array) {
} else if (chunk instanceof Uint8Array) {
buf = chunk;
}

if (!buf) {
throw new TypeError('Write buffer must be a string, <Uint8Array> or <Buffer>');
throw new TypeError(
"Write buffer must be a string, <Uint8Array> or <Buffer>"
);
}

this.#handle.write(buf);
Expand Down
20 changes: 14 additions & 6 deletions js/uuid.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
"use strict";

const chars = "0123456789abcdef"

/** @param {number} byte */
function hex(byte) {
const upper = (byte & 0xf0) >> 4;
const lower = byte & 0xf;

return chars[upper] + chars[lower];
}

export function uuidv4() {
const bytes = Buffer.random(16);

bytes[6] = (bytes[6] & 0x0f) | 0x40;
bytes[8] = (bytes[8] & 0x3f) | 0x80;

/** @param {Uint8Array} bytes */
function toUUID(bytes) {
return (
hex(bytes[0]) +
hex(bytes[1]) +
Expand All @@ -36,3 +35,12 @@ export function uuidv4() {
hex(bytes[15])
);
}

export function uuidv4() {
const bytes = Buffer.random(16);

bytes[6] = (bytes[6] & 0x0f) | 0x40;
bytes[8] = (bytes[8] & 0x3f) | 0x80;

return toUUID(bytes);
}

0 comments on commit 2230609

Please sign in to comment.