Skip to content

Commit

Permalink
feat: update to deno v2
Browse files Browse the repository at this point in the history
  • Loading branch information
ohroy committed Oct 29, 2024
1 parent 63cbcb4 commit 0e0570b
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 74 deletions.
4 changes: 4 additions & 0 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@
"codegen:codec": "deno run codegen/generate_codec.ts | deno fmt - > src/amqp_codec.ts",
"codegen:types": "deno run codegen/generate_types.ts | deno fmt - > src/amqp_types.ts",
"codegen:constants": "deno run codegen/generate_constants.ts | deno fmt - > src/amqp_constants.ts"
},
"imports": {
"@std/io": "jsr:@std/io@^0.225.0",
"@std/streams": "jsr:@std/streams@^1.0.7"
}
}
80 changes: 51 additions & 29 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion deps.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export { Buffer, BufReader, writeAll } from "jsr:@std/io";
export { writeAll, Buffer } from "jsr:@std/io";
17 changes: 8 additions & 9 deletions src/amqp_frame_reader.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
import { FrameError } from "./frame_error.ts";
import { decodeHeader, decodeMethod } from "./amqp_codec.ts";
import type { IncomingFrame } from "./amqp_frame.ts";
import { BufReader } from "../deps.ts";
import type { Reader } from "jsr:@std/io/types";

export class AmqpFrameReader {
#timer: null | number = null;
#reader: BufReader;
#reader: Reader;

constructor(r: Deno.Reader) {
this.#reader = BufReader.create(r);
constructor(r: Reader) {
this.#reader = r;
}

#readBytes = async (length: number): Promise<Uint8Array> => {
const n = await this.#reader.readFull(new Uint8Array(length));
const buf = new Uint8Array(length);
const n = await this.#reader.read(buf);

if (n === null) {
throw new FrameError("EOF");
}

return n;
return buf;
};

#readFrame = async (): Promise<IncomingFrame> => {
Expand Down Expand Up @@ -80,9 +81,7 @@ export class AmqpFrameReader {
}
}

read(
timeout: number,
): Promise<IncomingFrame> {
read(timeout: number): Promise<IncomingFrame> {
this.abort();

if (timeout <= 0) {
Expand Down
4 changes: 2 additions & 2 deletions src/amqp_multiplexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ function createSocketDemux(
emit(frame);
}
} catch (error) {
handleError(error);
handleError(error as Error);
}
}

Expand All @@ -134,7 +134,7 @@ function createSocketDemux(
removeSubscriber(subscriber);
}
} catch (error) {
subscriber.error(error);
subscriber.error(error as Error);
removeSubscriber(subscriber);
}
}
Expand Down
60 changes: 30 additions & 30 deletions src/amqp_socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { encodeHeader, encodeMethod } from "./amqp_codec.ts";
import { AmqpFrameReader } from "./amqp_frame_reader.ts";
import type { IncomingFrame, OutgoingFrame } from "./amqp_frame.ts";
import { writeAll } from "../deps.ts";

import type { Reader,Writer, Closer } from "jsr:@std/io/types";
export interface AmqpSocketWriter {
write(frames: Array<OutgoingFrame>): Promise<void>;
}
Expand Down Expand Up @@ -49,16 +49,7 @@ function encodeFrame(frame: OutgoingFrame): Uint8Array {
return data;
}

const HEARTBEAT_FRAME = new Uint8Array([
8,
0,
0,
0,
0,
0,
0,
206,
]);
const HEARTBEAT_FRAME = new Uint8Array([8, 0, 0, 0, 0, 0, 0, 206]);

function splitArray(arr: Uint8Array, size: number): Uint8Array[] {
const chunks: Uint8Array[] = [];
Expand All @@ -78,16 +69,18 @@ interface AmqpSocketOptions {
frameMax?: number;
}

export class AmqpSocket implements AmqpSocketWriter, AmqpSocketReader, AmqpSocketCloser {
#conn: Deno.Reader & Deno.Writer & Deno.Closer;
export class AmqpSocket
implements AmqpSocketWriter, AmqpSocketReader, AmqpSocketCloser
{
#conn: Reader & Writer & Closer;
#reader: AmqpFrameReader;
#sendTimer: number | null = null;
#sendTimeout = 0;
#readTimeout = 0;
#frameMax = -1;
#guard: Promise<void>;

constructor(conn: Deno.Reader & Deno.Writer & Deno.Closer) {
constructor(conn: Reader & Writer & Closer) {
this.#conn = conn;
this.#reader = new AmqpFrameReader(conn);
this.#guard = Promise.resolve();
Expand All @@ -108,9 +101,16 @@ export class AmqpSocket implements AmqpSocketWriter, AmqpSocketReader, AmqpSocke
};

tune(options: AmqpSocketOptions) {
this.#readTimeout = options.readTimeout !== undefined ? options.readTimeout : this.#readTimeout;
this.#sendTimeout = options.sendTimeout !== undefined ? options.sendTimeout : this.#sendTimeout;
this.#frameMax = options.frameMax !== undefined ? options.frameMax : this.#frameMax;
this.#readTimeout =
options.readTimeout !== undefined
? options.readTimeout
: this.#readTimeout;
this.#sendTimeout =
options.sendTimeout !== undefined
? options.sendTimeout
: this.#sendTimeout;
this.#frameMax =
options.frameMax !== undefined ? options.frameMax : this.#frameMax;
this.#resetSendTimer();
}

Expand All @@ -124,23 +124,23 @@ export class AmqpSocket implements AmqpSocketWriter, AmqpSocketReader, AmqpSocke
this.#resetSendTimer();
for (const frame of frames) {
if (frame.type === "content") {
const chunks = this.#frameMax > 8 && frame.payload.length > this.#frameMax - 8
? splitArray(
frame.payload,
this.#frameMax - 8,
).map((chunk) =>
encodeFrame({
type: "content",
channel: frame.channel,
payload: chunk,
})
)
: [encodeFrame(frame)];
const chunks =
this.#frameMax > 8 && frame.payload.length > this.#frameMax - 8
? splitArray(frame.payload, this.#frameMax - 8).map((chunk) =>
encodeFrame({
type: "content",
channel: frame.channel,
payload: chunk,
}),
)
: [encodeFrame(frame)];
for (const chunk of chunks) {
this.#guard = this.#guard.then(() => writeAll(this.#conn, chunk));
}
} else {
this.#guard = this.#guard.then(() => writeAll(this.#conn, encodeFrame(frame)));
this.#guard = this.#guard.then(() =>
writeAll(this.#conn, encodeFrame(frame)),
);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/amqp_socket_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ import { arrayOf, assertEquals, assertRejects, test } from "./testing.ts";
import { mock } from "./mock.ts";
import { FrameError } from "./frame_error.ts";
import { createResolvable } from "./resolvable.ts";
import type { Reader,Writer, Closer } from "jsr:@std/io/types";

function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

function createConn() {
return mock.obj<Deno.Reader & Deno.Writer & Deno.Closer>({
return mock.obj<Reader & Writer & Closer>({
read: mock.fn(),
write: mock.fn(async () => {}),
close: mock.fn(() => {}),
Expand Down
6 changes: 4 additions & 2 deletions src/encoding/utils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { Reader, ReaderSync } from "@std/io/types";

export function splitArray<T>(arr: T[], size: number): T[][] {
const chunks: T[][] = [];
let index = 0;
Expand All @@ -22,7 +24,7 @@ export function assertLength(arr: Uint8Array, length: number) {
}

export async function readBytes(
r: Deno.Reader,
r: Reader,
length: number,
): Promise<Uint8Array | null> {
const data = new Uint8Array(length);
Expand All @@ -39,7 +41,7 @@ export async function readBytes(
return data;
}

export function readBytesSync(r: Deno.ReaderSync, length: number): Uint8Array {
export function readBytesSync(r: ReaderSync, length: number): Uint8Array {
const data = new Uint8Array(length);
const result = r.readSync(data);

Expand Down

0 comments on commit 0e0570b

Please sign in to comment.