Skip to content

Commit

Permalink
feat: properly handle yagns events
Browse files Browse the repository at this point in the history
  • Loading branch information
pociej committed Jun 6, 2024
1 parent 2eee87b commit 01c1385
Show file tree
Hide file tree
Showing 23 changed files with 397 additions and 145 deletions.
2 changes: 1 addition & 1 deletion backend/src/di.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,4 @@ container.cradle.db.then(() => {

container.cradle.fileService.init();

container.cradle.Yagna.observeDebitNoteEvents();
container.cradle.Yagna.observeEvents();
8 changes: 8 additions & 0 deletions backend/src/errors/codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export enum ErrorCode {
NO_WORKER = "NO_WORKER",
USER_NOT_FOUND = "USER_NOT_FOUND",
ALLOCATION_NOT_FOUND = "ALLOCATION_NOT_FOUND",
AGREEMENT_NOT_FOUND = "AGREEMENT_NOT_FOUND",
ALLOCATION_TOP_UP_FAILED = "ALLOCATION_TOP_UP_FAILED",
}

Expand All @@ -14,6 +15,7 @@ export const errorMessages = {
[ErrorCode.NO_ALLOCATION]: () => "No allocation found",
[ErrorCode.NO_WORKER]: () => "No worker",
[ErrorCode.USER_NOT_FOUND]: () => "User not found",
[ErrorCode.AGREEMENT_NOT_FOUND]: () => "Agreement not found",
[ErrorCode.ALLOCATION_NOT_FOUND]: ({
allocationId,
}: {
Expand All @@ -35,6 +37,12 @@ export type ErrorParams = {
[ErrorCode.ALLOCATION_NOT_FOUND]: {
allocationId: string;
};
[ErrorCode.ALLOCATION_TOP_UP_FAILED]: {
allocationId: string;
};
[ErrorCode.AGREEMENT_NOT_FOUND]: {
agreementId: string;
};
};

type ErrorArgs<T> = {
Expand Down
2 changes: 1 addition & 1 deletion backend/src/services/user/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const schema = new mongoose.Schema<IUser>(
currentAllocationAmount: {
type: Number,
},
currentActivityId: {
currentAgreementId: {
type: String,
},
deposits: [
Expand Down
8 changes: 3 additions & 5 deletions backend/src/services/user/routes.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import fastify, { FastifyInstance, FastifyRequest } from "fastify";
import { FastifyInstance } from "fastify";
import { container } from "../../di.js";
import fastifyPlugin from "fastify-plugin";
import { userModel } from "./model.js";
import { jwtDecode } from "jwt-decode";
import { set } from "mongoose";
export const userService = fastifyPlugin(
(fastify: FastifyInstance, opts, done) => {
fastify.io.of("/me").use((socket, next) => {
Expand All @@ -15,8 +13,6 @@ export const userService = fastifyPlugin(
});

fastify.io.of("/me").on("connection", async (socket) => {
console.log("socket", socket.handshake.auth.token);

const user = jwtDecode<{
_id: string;
}>(socket.handshake.auth.token);
Expand All @@ -38,6 +34,8 @@ export const userService = fastifyPlugin(
socket.emit("user", userDTO);
}, 500);

//TODO: watch for changes but this will need replica set

// userModel
// .watch(
// [
Expand Down
12 changes: 8 additions & 4 deletions backend/src/services/user/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ export const userService: IUserService = {
currentAllocation: {
id: user.currentAllocationId,
},
currentActivity: {
id: user.currentActivityId,
currentAgreement: {
id: user.currentAgreementId,
},
deposits: user.deposits.map((d) => {
return {
Expand All @@ -91,13 +91,17 @@ export const userService: IUserService = {
}),
};
},
setCurrentActivityId: async (userId: string, activityId: string) => {
setCurrentAgreementId: async (userId: string, agreementId: string) => {
await userModel.updateOne(
{ _id: userId },
{ currentActivityId: activityId }
{ currentAgreementId: agreementId }
);
},

terminateCurrentAgreement: async (userId: string) => {
await userModel.updateOne({ _id: userId }, { currentAgreementId: null });
},

invalidateCurrentDeposit: async (userId: string) => {
await userModel
.updateOne(
Expand Down
4 changes: 2 additions & 2 deletions backend/src/services/user/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export interface IUser {
deposits: Deposit[];
currentAllocationId: string;
currentAllocationAmount: number;
currentActivityId: string;
currentAgreementId: string;
}

export interface IUserService {
Expand All @@ -49,6 +49,6 @@ export interface IUserService {
amount: number
): Promise<boolean>;
getUserById(userId: UserIdType): Promise<IUser | null>;
setCurrentActivityId(userId: UserIdType, activityId: string): Promise<void>;
setCurrentAgreementId(userId: UserIdType, agreementId: string): Promise<void>;
getUserDTO(userId: UserIdType): Promise<any>;
}
54 changes: 51 additions & 3 deletions backend/src/services/yagna/routes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import fastify, { FastifyInstance, FastifyRequest } from "fastify";
import { container } from "../../di.js";
import fastifyPlugin from "fastify-plugin";
import { jwtDecode } from "jwt-decode";
import { merge } from "rxjs";

export const Yagna = fastifyPlugin((fastify: FastifyInstance, opts, done) => {
fastify.post("/allocation", {
Expand Down Expand Up @@ -74,6 +76,21 @@ export const Yagna = fastifyPlugin((fastify: FastifyInstance, opts, done) => {
}
},
});
fastify.get("/agreement", {
onRequest: [fastify.authenticate],
handler: async (request, reply) => {
const requestUser = request.user;
const Yagna = container.cradle.Yagna;
const agreement = await Yagna.getUserAgreement(requestUser._id).catch(
(e) => {
reply.code(500).send({
message: e.message,
});
}
);
reply.code(200).send(agreement);
},
});
fastify.post("/create-agreement", {
onRequest: [fastify.authenticate],
handler: async (request, reply) => {
Expand Down Expand Up @@ -124,12 +141,12 @@ export const Yagna = fastifyPlugin((fastify: FastifyInstance, opts, done) => {
);
const worker = await Yagna.getUserWorker(requestUser._id);
await worker.context?.activity.stop();
if (!user?.currentActivityId) {
if (!user?.currentAgreementId) {
reply.code(500).send({
message: "No agreement found",
});
} else {
container.cradle.userService.setCurrentActivityId(requestUser._id, "");
container.cradle.userService.setCurrentAgreementId(requestUser._id, "");
reply
.code(201)
.send(container.cradle.userService.getUserDTO(requestUser._id));
Expand Down Expand Up @@ -186,5 +203,36 @@ export const Yagna = fastifyPlugin((fastify: FastifyInstance, opts, done) => {
},
});

done();
(["debitNoteEvents", "invoiceEvents", "agreementEvents"] as const).forEach(
(eventType) => {
fastify.io.of(`/${eventType}`).use((socket, next) => {
const token = socket.handshake.auth.token;
if (!token) {
next(new Error("Authentication error"));
}
next();
});
fastify.io.of(`/${eventType}`).on("connection", async (socket) => {
console.log("--------------");
console.log("connected to", eventType);
console.log("--------------");
const user = jwtDecode<{
_id: string;
}>(socket.handshake.auth.token);
if (!user._id) {
throw new Error(`Wrong token`);
}
if (!user) {
throw new Error(
`User not found with id ${socket.handshake.auth.token}`
);
}
const eventStream = await container.cradle.Yagna[`${eventType}`];
eventStream.subscribe((event) => {
socket.emit("event", event);
});
done();
});
}
);
});
Loading

0 comments on commit 01c1385

Please sign in to comment.