Skip to content

Commit

Permalink
Добавит потоки
Browse files Browse the repository at this point in the history
  • Loading branch information
RnizereB committed Oct 13, 2024
1 parent 3173111 commit 061a2a1
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 23 deletions.
15 changes: 13 additions & 2 deletions src/cli/commands/import.command.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
import { TSVFileReader } from '../../shared/libs/file-reader/tsv-file-reader.js';
import { Offer } from '../../shared/types/offer-type.js';
import { Command } from './command.interface.js';

export class ImportCommand implements Command {
public getName(): string {
return '--import';
}

public execute(...parameters: string[]): void {
private onImportedOffer(offer: Offer): void {
console.info(offer);
}

private onCompleteImport(count: number) {
console.info(`${count} rows imported`);
}

public async execute(...parameters: string[]): Promise<void> {
const [filename] = parameters;
const fileReader = new TSVFileReader(filename.trim());

fileReader.on('line', this.onImportedOffer);
fileReader.once('end', this.onCompleteImport);

try {
fileReader.read();
console.log(fileReader.toArray());
} catch (err) {
if (!(err instanceof Error)) {
throw err;
Expand Down
51 changes: 30 additions & 21 deletions src/shared/libs/file-reader/tsv-file-reader.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import { readFileSync } from 'node:fs';
import EventEmitter from 'node:events';
import { createReadStream } from 'node:fs';

import { FileReader } from './index.js';
import { Offer, TypeUser } from '../../types/index.js';
export class TSVFileReader implements FileReader {
private rawData = '';
export class TSVFileReader extends EventEmitter implements FileReader {
private CHUNK_SIZE = 16384;

constructor(
private readonly filename: string
) {}

private parseRawDataToOffers(): Offer[] {
return this.rawData
.split('\n')
.filter((row) => row.trim().length > 0)
.map((line) => this.parseLineToOffer(line));
) {
super();
}

private parseLineToOffer(line: string): Offer {
Expand Down Expand Up @@ -80,18 +77,30 @@ export class TSVFileReader implements FileReader {
return {name, email, avatarUser, password, typeUser:TypeUser[typeUser as 'Pro' | 'Usual']};
}

private validateRawData(): void {
if (! this.rawData) {
throw new Error('File was not read');
}
}
public async read(): Promise<void> {
const readStream = createReadStream(this.filename, {
highWaterMark: this.CHUNK_SIZE,
encoding: 'utf-8'
});

public read(): void {
this.rawData = readFileSync(this.filename, 'utf-8');
}
let remainingData = '';
let nextLinePosititon = -1;
let importedRowCouint = 0;

public toArray(): Offer[] {
this.validateRawData();
return this.parseRawDataToOffers();
for await (const chunk of readStream) {
remainingData += chunk.toString();

while((nextLinePosititon = remainingData.indexOf('\n')) >= 0) {
const completeRow = remainingData.slice(0, nextLinePosititon + 1);
remainingData = remainingData.slice(++nextLinePosititon);
importedRowCouint++;

const parsedOffer = this.parseLineToOffer(completeRow);
this.emit('line', parsedOffer);
}
}

this.emit('end', importedRowCouint);
}
}

3 changes: 3 additions & 0 deletions src/shared/libs/file-writer/file-writer.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export interface FileWriter {
write(row: string): void;
}
1 change: 1 addition & 0 deletions src/shared/libs/file-writer/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { FileWriter } from './file-writer.interface.js';
28 changes: 28 additions & 0 deletions src/shared/libs/file-writer/tsv-file-wtiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { FileWriter } from './index.js';
import { WriteStream } from 'node:fs';
import { createWriteStream } from 'node:fs';

export class TSVFileWriter implements FileWriter {

private stream: WriteStream;

constructor(filename: string) {
this.stream = createWriteStream(filename, {
flags: 'w',
encoding: 'utf-8',
autoClose: true
});
}

public async write(row: string): Promise<unknown> {
const writeSuccess = this.stream.write(`${row}\n`);

if (! writeSuccess) {
return new Promise((resolve) => {
this.stream.once('drain', () => resolve(true));
});
}

return Promise.resolve();
}
}

0 comments on commit 061a2a1

Please sign in to comment.