Skip to content

Commit

Permalink
refactor(WIP): use file storage blob storage instead of blob file man…
Browse files Browse the repository at this point in the history
…ager
  • Loading branch information
PJColombo committed Apr 27, 2024
1 parent c4bf9c1 commit 5f68f96
Show file tree
Hide file tree
Showing 14 changed files with 205 additions and 124 deletions.
3 changes: 2 additions & 1 deletion packages/api/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type {
} from "@trpc/server/adapters/node-http";
import jwt from "jsonwebtoken";

import { blobPropagator } from "@blobscan/blob-propagator";
import { getBlobPropagator } from "@blobscan/blob-propagator";
import { getBlobStorageManager } from "@blobscan/blob-storage-manager";
import { prisma } from "@blobscan/db";

Expand Down Expand Up @@ -50,6 +50,7 @@ function getJWTFromRequest(

export async function createTRPCInnerContext(opts?: CreateInnerContextOptions) {
const blobStorageManager = await getBlobStorageManager();
const blobPropagator = await getBlobPropagator();

return {
prisma,
Expand Down
7 changes: 6 additions & 1 deletion packages/api/test/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ export async function createTestContext({
});

if (withBlobPropagator) {
ctx.blobPropagator = createBlobPropagator();
ctx.blobPropagator = await createBlobPropagator(
ctx.blobStorageManager,
ctx.prisma
);
} else {
ctx.blobPropagator = undefined;
}

return ctx;
Expand Down
141 changes: 91 additions & 50 deletions packages/blob-propagator/src/BlobPropagator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@
import { FlowProducer, Queue, Worker } from "bullmq";
import type { ConnectionOptions, FlowJob, WorkerOptions } from "bullmq";

import type { $Enums } from "@blobscan/db";
import type { BlobStorage } from "@blobscan/db";
import type {
BlobStorage,
BlobStorageManager,
} from "@blobscan/blob-storage-manager";
import type { $Enums, BlobscanPrismaClient } from "@blobscan/db";
import { logger } from "@blobscan/logger";

import { blobFileManager } from "./blob-file-manager";
import type { Blob, BlobPropagationJobData } from "./types";
import type {
Blob,
BlobPropagationJobData,
BlobPropagationWorkerParams,
} from "./types";
import {
FINALIZER_WORKER_NAME,
STORAGE_WORKER_NAMES,
Expand All @@ -28,9 +35,12 @@ const STORAGE_WORKER_PROCESSORS = {
FILE_SYSTEM: fileSystemProcessor,
};

export type BlobPropagatorOptions = Partial<{
workerOptions: WorkerOptions;
}>;
export type BlobPropagatorConfig = {
blobStorageManager: BlobStorageManager;
tmpBlobStorage: $Enums.BlobStorage;
prisma: BlobscanPrismaClient;
workerOptions: Partial<WorkerOptions>;
};

const DEFAULT_WORKER_OPTIONS: WorkerOptions = {
autorun: true,
Expand All @@ -39,25 +49,58 @@ const DEFAULT_WORKER_OPTIONS: WorkerOptions = {
};

export class BlobPropagator {
protected blobStorageManager: BlobStorageManager;
protected prisma: BlobscanPrismaClient;
protected temporaryBlobStorage: BlobStorage;

protected blobPropagationFlowProducer: FlowProducer;
protected finalizerWorker: Worker;
protected storageWorkers: Record<
$Enums.BlobStorage,
Worker<BlobPropagationJobData>
>;

constructor(
storages: $Enums.BlobStorage[],
{ workerOptions }: BlobPropagatorOptions = {}
) {
if (!storages.length) {
constructor({
blobStorageManager,
prisma,
tmpBlobStorage,
workerOptions,
}: BlobPropagatorConfig) {
const availableStorageNames = blobStorageManager
.getAllStorages()
.map((s) => s.name);

if (!availableStorageNames) {
throw new Error(
"Couldn't instantiate blob propagator: No storages available"
);
}

if (!availableStorageNames.find((s) => s === tmpBlobStorage)) {
throw new Error(
"Couldn't instantiate blob propagator: no storages given"
"Couldn't instantiate blob propagator: Temporary storage not found in blob storage manager"
);
}

this.blobStorageManager = blobStorageManager;
this.prisma = prisma;

const temporaryBlobStorage = blobStorageManager.getStorage(tmpBlobStorage);

if (!temporaryBlobStorage) {
throw new Error(
"Couldn't instantiate blob propagator: Temporary storage not found in blob storage manager"
);
}

this.temporaryBlobStorage = temporaryBlobStorage;

this.storageWorkers = this.#createBlobStorageWorkers(
storages,
availableStorageNames,
{
blobStorageManager,
prisma,
},
workerOptions
);
this.finalizerWorker = this.#createFinalizerWorker(workerOptions);
Expand Down Expand Up @@ -111,14 +154,12 @@ export class BlobPropagator {
});
}

async propagateBlob(blob: Blob) {
await blobFileManager.createFile(blob);
async propagateBlob({ versionedHash, data }: Blob) {
await this.temporaryBlobStorage.storeBlob(versionedHash, data);

const blobFlowJob = await this.#createBlobPropagationFlowJob({
versionedHash: blob.versionedHash,
});
const flowJob = await this.#createBlobPropagationFlowJob(versionedHash);

await this.blobPropagationFlowProducer.add(blobFlowJob);
await this.blobPropagationFlowProducer.add(flowJob);
}

async propagateBlobs(blobs: Blob[]) {
Expand All @@ -132,11 +173,13 @@ export class BlobPropagator {
});

await Promise.all(
uniqueBlobs.map((blob) => blobFileManager.createFile(blob))
uniqueBlobs.map(({ versionedHash, data }) =>
this.temporaryBlobStorage.storeBlob(versionedHash, data)
)
);

const blobPropagationFlowJobs = uniqueBlobs.map(({ versionedHash }) =>
this.#createBlobPropagationFlowJob({ versionedHash })
this.#createBlobPropagationFlowJob(versionedHash)
);

await this.blobPropagationFlowProducer.addBulk(blobPropagationFlowJobs);
Expand Down Expand Up @@ -189,39 +232,37 @@ export class BlobPropagator {

#createBlobStorageWorkers(
storages: $Enums.BlobStorage[],
params: BlobPropagationWorkerParams,
opts: WorkerOptions = {}
) {
return storages.reduce<Record<BlobStorage, Worker<BlobPropagationJobData>>>(
(workers, storageName) => {
const workerName = STORAGE_WORKER_NAMES[storageName];
const storageWorker = new Worker<BlobPropagationJobData>(
workerName,
STORAGE_WORKER_PROCESSORS[storageName],
{
...DEFAULT_WORKER_OPTIONS,
...opts,
}
);

storageWorker.on("completed", (job) => {
logger.debug(`Job ${job.id} completed by ${workerName}`);
});

storageWorker.on("failed", (job, err) => {
logger.error(`Job ${job?.id} failed: ${err} (worker: ${workerName})`);
});

workers[storageName] = storageWorker;

return workers;
},
{} as Record<BlobStorage, Worker<BlobPropagationJobData>>
);
}
return storages.reduce<
Record<$Enums.BlobStorage, Worker<BlobPropagationJobData>>
>((workers, storageName) => {
const workerName = STORAGE_WORKER_NAMES[storageName];
const storageWorker = new Worker<BlobPropagationJobData>(
workerName,
STORAGE_WORKER_PROCESSORS[storageName](params),
{
...DEFAULT_WORKER_OPTIONS,
...opts,
}
);

storageWorker.on("completed", (job) => {
logger.debug(`Job ${job.id} completed by ${workerName}`);
});

#createBlobPropagationFlowJob(data: BlobPropagationJobData): FlowJob {
const versionedHash = data.versionedHash;
storageWorker.on("failed", (job, err) => {
logger.error(`Job ${job?.id} failed: ${err} (worker: ${workerName})`);
});

workers[storageName] = storageWorker;

return workers;
}, {} as Record<$Enums.BlobStorage, Worker<BlobPropagationJobData>>);
}

#createBlobPropagationFlowJob(versionedHash: string): FlowJob {
const storageWorkerNames = Object.values(this.storageWorkers).map(
(storageWorker) => storageWorker.name
);
Expand Down
49 changes: 29 additions & 20 deletions packages/blob-propagator/src/blob-propagator.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,43 @@
import IORedis from "ioredis";

import { BlobStorage } from "@blobscan/db";
import { getBlobStorageManager } from "@blobscan/blob-storage-manager";
import type { BlobStorageManager } from "@blobscan/blob-storage-manager";
import { prisma } from "@blobscan/db";
import type { BlobscanPrismaClient } from "@blobscan/db";

import { BlobPropagator } from "./BlobPropagator";
import { env } from "./env";

function createBlobPropagator() {
const availableStorages: BlobStorage[] = [];

if (env.GOOGLE_STORAGE_ENABLED) {
availableStorages.push(BlobStorage.GOOGLE);
}

if (env.POSTGRES_STORAGE_ENABLED) {
availableStorages.push(BlobStorage.POSTGRES);
}

if (env.SWARM_STORAGE_ENABLED) {
availableStorages.push(BlobStorage.SWARM);
}

async function createBlobPropagator(
blobStorageManager: BlobStorageManager,
prisma: BlobscanPrismaClient
) {
const connection = new IORedis(env.REDIS_URI, { maxRetriesPerRequest: null });

return new BlobPropagator(availableStorages, {
return new BlobPropagator({
blobStorageManager,
prisma,
tmpBlobStorage: env.BLOB_PROPAGATOR_TMP_STORAGE,
workerOptions: {
connection,
},
});
}
const blobPropagator =
env.BLOB_PROPAGATOR_ENABLED === true ? createBlobPropagator() : undefined;

export { blobPropagator, createBlobPropagator };
let blobPropagator: BlobPropagator | undefined;

async function getBlobPropagator() {
if (!env.BLOB_PROPAGATOR_ENABLED) {
return;
}

if (!blobPropagator) {
const blobStorageManager = await getBlobStorageManager();

blobPropagator = await createBlobPropagator(blobStorageManager, prisma);
}

return blobPropagator;
}

export { getBlobPropagator, createBlobPropagator };
10 changes: 9 additions & 1 deletion packages/blob-propagator/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ export const env = createEnv({
SWARM_STORAGE_ENABLED: booleanSchema.default("false"),

BLOB_PROPAGATOR_ENABLED: booleanSchema.default("false"),
BLOB_PROPAGATOR_TMP_STORAGE: z.enum([
"FILE_SYSTEM",
"GOOGLE",
"POSTGRES",
"SWARM",
] as const),
TEST: booleanSchema.optional(),
},

Expand All @@ -25,7 +31,9 @@ export const env = createEnv({
console.log(
`Blob propagator configuration: enabled=${
env.BLOB_PROPAGATOR_ENABLED
} redisUri=${maskPassword(env.REDIS_URI)}`
} redisUri=${maskPassword(env.REDIS_URI)} tmpStorage=${
env.BLOB_PROPAGATOR_TMP_STORAGE
}`
);
},
});
Expand Down
2 changes: 1 addition & 1 deletion packages/blob-propagator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ export type { EnvVars as Environment } from "./env";

export { env } from "./env";
export { BlobPropagator } from "./BlobPropagator";
export { blobPropagator } from "./blob-propagator";
export { getBlobPropagator } from "./blob-propagator";
export * from "./types";
export {
createBlobPropagationFlowJob,
Expand Down
19 changes: 14 additions & 5 deletions packages/blob-propagator/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import type { Job, Processor, Queue, Worker } from "bullmq";

import type { BlobReference } from "@blobscan/blob-storage-manager";
import type {
BlobReference,
BlobStorageManager,
} from "@blobscan/blob-storage-manager";
import { BlobscanPrismaClient } from "@blobscan/db";

export type Blob = {
versionedHash: string;
Expand All @@ -9,14 +13,19 @@ export type Blob = {

export type BlobPropagationJobData = {
versionedHash: string;
tmpBlobStorageDataRef: BlobReference;
};

export type BlobPropagationJob = Job<BlobPropagationJobData>;

export type BlobPropagationWorkerProcessor = Processor<
BlobPropagationJobData,
BlobReference
>;
export type BlobPropagationWorkerParams = {
blobStorageManager: BlobStorageManager;
prisma: BlobscanPrismaClient;
};

export type BlobPropagationWorkerProcessor = (
params: BlobPropagationWorkerParams
) => Processor<BlobPropagationJobData, BlobReference>;

export type BlobPropagationWorker = Worker<BlobPropagationJobData>;

Expand Down
Loading

0 comments on commit 5f68f96

Please sign in to comment.