Skip to content

Commit

Permalink
File: subprocess seek
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Sep 20, 2023
1 parent e3169ea commit 13b7fe0
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 18 deletions.
98 changes: 80 additions & 18 deletions sea-streamer-file/sea-streamer-file-reader/src/subprocess.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import { FileErr } from "./error";
import { MessageSource, isEndOfStream } from "./message";
import { StreamMode } from "./types";
import { Message } from "./format";
import { PULSE_MESSAGE, SEA_STREAMER_INTERNAL, SeqNo, SeqPos, ShardId, StreamKey, StreamMode } from "./types";
import { Message, MessageHeader } from "./format";
import { Buffer } from "./buffer";
import { Buffer as SystemBuffer } from "node:buffer";

export interface CtrlMsg {
cmd: "open" | "more" | "exit";
cmd: "open" | "more" | "seek" | "exit";
/** file path */
path?: string;
/** n-th beacon */
nth?: number;
}

export interface IpcMessage {
Expand All @@ -19,21 +24,47 @@ export interface StatusUpdate {
readUpTo: bigint;
}

enum State {
Init,
Running,
PreSeek,
Seeking,
}

let sleepFor = 1;
let quota = 10000;
let global: {
error: boolean;
state: State;
source: MessageSource | undefined;
} = {
error: false,
state: State.Init,
source: undefined,
};

process.on("message", (msg) => onMessage(msg as CtrlMsg));

function onMessage(ctrl: CtrlMsg) {
if (ctrl.cmd === "open") {
open(ctrl.path!);
open(ctrl.path!).then(run);
} else if (ctrl.cmd === "more") {
sleepFor = 1;
quota = 10000;
} else if (ctrl.cmd === "seek") {
if (global.state === State.Running) {
seek(ctrl.nth!).then(run);
} else {
process.send!({ error: "Not seekable" }); global.error = true; return;
}
} else if (ctrl.cmd === "exit") {
process.exit(0);
if (global.error) {
process.exit(1);
} else {
process.exit(0);
}
} else {
process.send!({ error: "Unknown cmd." }); process.exit(1);
process.send!({ error: "Unknown cmd." }); global.error = true; return;
}
}

Expand All @@ -45,51 +76,82 @@ async function open(path: string) {
let source;
try {
source = await MessageSource.new(path, StreamMode.LiveReplay);
if (source instanceof FileErr) { process.send!({ error: "Failed to read file header" }); process.exit(1); }
if (source instanceof FileErr) { process.send!({ error: "Failed to read file header" }); global.error = true; return; }
global.source = source;
} catch (e) {
process.send!({ error: `Failed to open file: ${e}` }); process.exit(1);
process.send!({ error: `Failed to open file: ${e}` }); global.error = true; return;
}
run(source);
}

async function run(source: MessageSource) {
async function run() {
if (global.error) {
return;
}
global.state = State.Running as State;
const source = global.source!;
const batchSize = 100;
const buffer = [];
let ended = false;

while (!ended) {
if (quota === 0) {
if (quota <= 0) {
await sleep(sleepFor);
if (sleepFor < 1024) {
sleepFor <<= 1;
}
continue;
}
if (global.state as State === State.PreSeek) { global.state = State.Seeking as State; return; }
for (let i = 0; i < batchSize; i++) {
const message = await source.next();
if (message instanceof FileErr) { process.send!({ error: message.toString() }); process.exit(1); }
if (message instanceof FileErr) { process.send!({ error: message.toString() }); global.error = true; return; }
buffer.push(message);
if (isEndOfStream(message)) {
ended = true;
break;
}
}
if (global.state as State === State.PreSeek) { global.state = State.Seeking as State; return; }
process.send!({ messages: buffer, status: getStatus() });
quota -= buffer.length;
buffer.length = 0;
}

process.send!({ messages: buffer, status: getStatus() });
await source.close();
}

function getStatus(): StatusUpdate {
return {
fileSize: source.knownSize(),
readFrom: source.getReadFrom(),
readUpTo: source.getOffset(),
};
async function seek(nth: number) {
if (global.error) {
return;
}
global.state = State.PreSeek as State;
while (global.state === State.PreSeek) { await sleep(1); }
if (global.state === State.Seeking) {
const source = global.source!;
await source.rewind(new SeqPos.At(BigInt(nth)));
const payload = new Buffer();
payload.append(SystemBuffer.from(PULSE_MESSAGE));
const pulse = new Message(new MessageHeader(
new StreamKey(SEA_STREAMER_INTERNAL),
new ShardId(0n),
new SeqNo(0n),
new Date(),
), payload);
process.send!({ messages: [pulse], status: getStatus() });
} else {
process.send!({ error: "Not seeking?" }); global.error = true; return;
}
}

function getStatus(): StatusUpdate {
return {
fileSize: global.source!.knownSize(),
readFrom: global.source!.getReadFrom(),
readUpTo: global.source!.getOffset(),
};
}

function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
1 change: 1 addition & 0 deletions sea-streamer-file/sea-streamer-file-reader/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export const SEA_STREAMER_INTERNAL: string = "SEA_STREAMER_INTERNAL";
export const PULSE_MESSAGE: string = "PULSE";
export const EOS_MESSAGE_SIZE: bigint = 56n;

export type Timestamp = Date;
Expand Down

0 comments on commit 13b7fe0

Please sign in to comment.