From 01c138594402fdca205d8bd0f52871dd043edcd7 Mon Sep 17 00:00:00 2001 From: pociej Date: Thu, 6 Jun 2024 08:16:32 +0200 Subject: [PATCH] feat: properly handle yagns events #62 --- backend/src/di.ts | 2 +- backend/src/errors/codes.ts | 8 + backend/src/services/user/model.ts | 2 +- backend/src/services/user/routes.ts | 8 +- backend/src/services/user/service.ts | 12 +- backend/src/services/user/types.ts | 4 +- backend/src/services/yagna/routes.ts | 54 +++++- backend/src/services/yagna/service.ts | 164 +++++++++++++----- backend/src/socket.io.ts | 1 + frontend/package.json | 2 + .../src/components/atoms/etherscanLink.tsx | 11 +- .../src/components/homePage/events/event.tsx | 48 +++++ .../src/components/homePage/events/events.tsx | 10 +- .../homePage/statusSections/allocation.tsx | 6 +- .../src/components/providers/userProvider.tsx | 8 +- .../{alloctionLink.tsx => shortLink.tsx} | 14 +- .../src/hooks/depositContract/useDeposit.ts | 1 - frontend/src/hooks/events/useEvents.ts | 88 +++------- frontend/src/hooks/events/useYagnaEvents.tsx | 73 ++++++++ frontend/src/types/events.ts | 13 +- frontend/src/types/golemResource.ts | 5 + pnpm-lock.yaml | 6 + ya-ts-client | 2 +- 23 files changed, 397 insertions(+), 145 deletions(-) rename frontend/src/components/{alloctionLink.tsx => shortLink.tsx} (68%) create mode 100644 frontend/src/hooks/events/useYagnaEvents.tsx create mode 100644 frontend/src/types/golemResource.ts diff --git a/backend/src/di.ts b/backend/src/di.ts index 0d4b341..8dd4ea7 100644 --- a/backend/src/di.ts +++ b/backend/src/di.ts @@ -85,4 +85,4 @@ container.cradle.db.then(() => { container.cradle.fileService.init(); -container.cradle.Yagna.observeDebitNoteEvents(); +container.cradle.Yagna.observeEvents(); diff --git a/backend/src/errors/codes.ts b/backend/src/errors/codes.ts index c29a31d..a73516d 100644 --- a/backend/src/errors/codes.ts +++ b/backend/src/errors/codes.ts @@ -5,6 +5,7 @@ export enum ErrorCode { NO_WORKER = "NO_WORKER", USER_NOT_FOUND = "USER_NOT_FOUND", ALLOCATION_NOT_FOUND = "ALLOCATION_NOT_FOUND", + AGREEMENT_NOT_FOUND = "AGREEMENT_NOT_FOUND", ALLOCATION_TOP_UP_FAILED = "ALLOCATION_TOP_UP_FAILED", } @@ -14,6 +15,7 @@ export const errorMessages = { [ErrorCode.NO_ALLOCATION]: () => "No allocation found", [ErrorCode.NO_WORKER]: () => "No worker", [ErrorCode.USER_NOT_FOUND]: () => "User not found", + [ErrorCode.AGREEMENT_NOT_FOUND]: () => "Agreement not found", [ErrorCode.ALLOCATION_NOT_FOUND]: ({ allocationId, }: { @@ -35,6 +37,12 @@ export type ErrorParams = { [ErrorCode.ALLOCATION_NOT_FOUND]: { allocationId: string; }; + [ErrorCode.ALLOCATION_TOP_UP_FAILED]: { + allocationId: string; + }; + [ErrorCode.AGREEMENT_NOT_FOUND]: { + agreementId: string; + }; }; type ErrorArgs = { diff --git a/backend/src/services/user/model.ts b/backend/src/services/user/model.ts index 3473653..99be000 100644 --- a/backend/src/services/user/model.ts +++ b/backend/src/services/user/model.ts @@ -19,7 +19,7 @@ const schema = new mongoose.Schema( currentAllocationAmount: { type: Number, }, - currentActivityId: { + currentAgreementId: { type: String, }, deposits: [ diff --git a/backend/src/services/user/routes.ts b/backend/src/services/user/routes.ts index 49a6910..a01e12d 100644 --- a/backend/src/services/user/routes.ts +++ b/backend/src/services/user/routes.ts @@ -1,9 +1,7 @@ -import fastify, { FastifyInstance, FastifyRequest } from "fastify"; +import { FastifyInstance } from "fastify"; import { container } from "../../di.js"; import fastifyPlugin from "fastify-plugin"; -import { userModel } from "./model.js"; import { jwtDecode } from "jwt-decode"; -import { set } from "mongoose"; export const userService = fastifyPlugin( (fastify: FastifyInstance, opts, done) => { fastify.io.of("/me").use((socket, next) => { @@ -15,8 +13,6 @@ export const userService = fastifyPlugin( }); fastify.io.of("/me").on("connection", async (socket) => { - console.log("socket", socket.handshake.auth.token); - const user = jwtDecode<{ _id: string; }>(socket.handshake.auth.token); @@ -38,6 +34,8 @@ export const userService = fastifyPlugin( socket.emit("user", userDTO); }, 500); + //TODO: watch for changes but this will need replica set + // userModel // .watch( // [ diff --git a/backend/src/services/user/service.ts b/backend/src/services/user/service.ts index 5d1554b..cdae251 100644 --- a/backend/src/services/user/service.ts +++ b/backend/src/services/user/service.ts @@ -78,8 +78,8 @@ export const userService: IUserService = { currentAllocation: { id: user.currentAllocationId, }, - currentActivity: { - id: user.currentActivityId, + currentAgreement: { + id: user.currentAgreementId, }, deposits: user.deposits.map((d) => { return { @@ -91,13 +91,17 @@ export const userService: IUserService = { }), }; }, - setCurrentActivityId: async (userId: string, activityId: string) => { + setCurrentAgreementId: async (userId: string, agreementId: string) => { await userModel.updateOne( { _id: userId }, - { currentActivityId: activityId } + { currentAgreementId: agreementId } ); }, + terminateCurrentAgreement: async (userId: string) => { + await userModel.updateOne({ _id: userId }, { currentAgreementId: null }); + }, + invalidateCurrentDeposit: async (userId: string) => { await userModel .updateOne( diff --git a/backend/src/services/user/types.ts b/backend/src/services/user/types.ts index 4b38214..460eba0 100644 --- a/backend/src/services/user/types.ts +++ b/backend/src/services/user/types.ts @@ -27,7 +27,7 @@ export interface IUser { deposits: Deposit[]; currentAllocationId: string; currentAllocationAmount: number; - currentActivityId: string; + currentAgreementId: string; } export interface IUserService { @@ -49,6 +49,6 @@ export interface IUserService { amount: number ): Promise; getUserById(userId: UserIdType): Promise; - setCurrentActivityId(userId: UserIdType, activityId: string): Promise; + setCurrentAgreementId(userId: UserIdType, agreementId: string): Promise; getUserDTO(userId: UserIdType): Promise; } diff --git a/backend/src/services/yagna/routes.ts b/backend/src/services/yagna/routes.ts index dcdaf92..52f8a85 100644 --- a/backend/src/services/yagna/routes.ts +++ b/backend/src/services/yagna/routes.ts @@ -1,6 +1,8 @@ import fastify, { FastifyInstance, FastifyRequest } from "fastify"; import { container } from "../../di.js"; import fastifyPlugin from "fastify-plugin"; +import { jwtDecode } from "jwt-decode"; +import { merge } from "rxjs"; export const Yagna = fastifyPlugin((fastify: FastifyInstance, opts, done) => { fastify.post("/allocation", { @@ -74,6 +76,21 @@ export const Yagna = fastifyPlugin((fastify: FastifyInstance, opts, done) => { } }, }); + fastify.get("/agreement", { + onRequest: [fastify.authenticate], + handler: async (request, reply) => { + const requestUser = request.user; + const Yagna = container.cradle.Yagna; + const agreement = await Yagna.getUserAgreement(requestUser._id).catch( + (e) => { + reply.code(500).send({ + message: e.message, + }); + } + ); + reply.code(200).send(agreement); + }, + }); fastify.post("/create-agreement", { onRequest: [fastify.authenticate], handler: async (request, reply) => { @@ -124,12 +141,12 @@ export const Yagna = fastifyPlugin((fastify: FastifyInstance, opts, done) => { ); const worker = await Yagna.getUserWorker(requestUser._id); await worker.context?.activity.stop(); - if (!user?.currentActivityId) { + if (!user?.currentAgreementId) { reply.code(500).send({ message: "No agreement found", }); } else { - container.cradle.userService.setCurrentActivityId(requestUser._id, ""); + container.cradle.userService.setCurrentAgreementId(requestUser._id, ""); reply .code(201) .send(container.cradle.userService.getUserDTO(requestUser._id)); @@ -186,5 +203,36 @@ export const Yagna = fastifyPlugin((fastify: FastifyInstance, opts, done) => { }, }); - done(); + (["debitNoteEvents", "invoiceEvents", "agreementEvents"] as const).forEach( + (eventType) => { + fastify.io.of(`/${eventType}`).use((socket, next) => { + const token = socket.handshake.auth.token; + if (!token) { + next(new Error("Authentication error")); + } + next(); + }); + fastify.io.of(`/${eventType}`).on("connection", async (socket) => { + console.log("--------------"); + console.log("connected to", eventType); + console.log("--------------"); + const user = jwtDecode<{ + _id: string; + }>(socket.handshake.auth.token); + if (!user._id) { + throw new Error(`Wrong token`); + } + if (!user) { + throw new Error( + `User not found with id ${socket.handshake.auth.token}` + ); + } + const eventStream = await container.cradle.Yagna[`${eventType}`]; + eventStream.subscribe((event) => { + socket.emit("event", event); + }); + done(); + }); + } + ); }); diff --git a/backend/src/services/yagna/service.ts b/backend/src/services/yagna/service.ts index 7dcb62d..a3bfe3e 100644 --- a/backend/src/services/yagna/service.ts +++ b/backend/src/services/yagna/service.ts @@ -7,14 +7,20 @@ import { ErrorCode } from "../../errors/codes.js"; import dayjs from "dayjs"; import bigDecimal from "js-big-decimal"; +import { v4 as uuidv4 } from "uuid"; + const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); import { Worker } from "./worker.js"; import { WorkContext } from "@golem-sdk/golem-js"; import { TaskExecutor } from "@golem-sdk/task-executor"; import { formatEther, parseEther } from "viem"; +import { UUID } from "mongodb"; export class Yagna { public debitNoteEvents: Subject; + public invoiceEvents: Subject; + public agreementEvents: Subject; + private paymentService: YaTsClient.PaymentApi.RequestorService; private activityService: YaTsClient.MarketApi.RequestorService; private identityService: YaTsClient.IdentityApi.DefaultService; @@ -22,6 +28,7 @@ export class Yagna { private YagnaConfig: { appKey: string; apiUrl: string }; private lastDebitNoteEventTimestamp: string = new Date().toISOString(); private lastInvoiceEventTimestamp: string = new Date().toISOString(); + private lastAgreementEventTimestamp: string = new Date().toISOString(); private userContext = { data: new Map< string, @@ -55,7 +62,11 @@ export class Yagna { constructor(YagnaConfig: { appKey: string; apiUrl: string }) { this.YagnaConfig = YagnaConfig; + this.debitNoteEvents = new Subject(); + this.invoiceEvents = new Subject(); + this.agreementEvents = new Subject(); + const paymentClient = new YaTsClient.PaymentApi.Client({ BASE: `${YagnaConfig.apiUrl}/payment-api/v1`, HEADERS: { @@ -81,25 +92,56 @@ export class Yagna { } async makeAgreement(userId: string) { + //creating executor by task executor makes agreement const executor = await this.userContext.getExecutor(userId); + //in order to make sure agreement wont be automatically closed after 90s + //which is HARDCODED in yagna we make worker which under the hood makes activity + // which prevents agreement from closing + const worker = await this.getUserWorker(userId); + console.log("Activity created", worker.context?.activity.id); + const agreement = await executor.getAgreement(); + container.cradle.userService.setCurrentAgreementId(userId, agreement.id); + agreement.events.on("terminated", (e: any) => { + console.log("agreement terminated", e); + this.agreementEvents.next({ + agreement, + event: "terminated", + }); + }); + } - const agreement = executor.getAgreement(); + async getUserAgreement(userId: string) { + debugLog("market", "getting user agreement", userId); + const user = await container.cradle.userService.getUserById(userId); + if (!user) { + throw new Error({ code: ErrorCode.USER_NOT_FOUND }); + } + const agreementId = user.currentAgreementId; + if (!agreementId) { + return null; + } + const agreement = await this.activityService.getAgreement(agreementId); + debugLog("market", "got user agreement", agreement); + if (!agreement) { + throw new Error({ + code: ErrorCode.AGREEMENT_NOT_FOUND, + payload: { + agreementId, + }, + }); + } + return { ...agreement, id: agreementId }; } async createUserAllocation(userId: string) { - debugLog("payments", "creating user allocation", userId); const userService = container.cradle.userService; const userDeposit = await userService.getCurrentDeposit(userId); if (!userDeposit) { throw new Error({ code: ErrorCode.NO_DEPOSIT }); } - console.log("userDeposit", userDeposit); try { // @ts-ignore - - console.log("userDeposit", userDeposit.id.toString(16)); - // @ts-ignore const allocation = await this.paymentService.createAllocation({ totalAmount: formatEther(userDeposit.amount), makeDeposit: false, @@ -115,8 +157,6 @@ export class Yagna { timeout: new Date(Number(userDeposit.validTo) * 1000).toISOString(), }); - console.log("allocation", allocation); - container.cradle.userService.setCurrentAllocationId( userId, allocation.allocationId @@ -128,7 +168,6 @@ export class Yagna { // @ts-ignore } async getUserAllocation(userId: string) { - debugLog("payments", "getting user allocation", userId); const user = await container.cradle.userService.getUserById(userId); if (!user) { throw new Error({ code: ErrorCode.USER_NOT_FOUND }); @@ -138,7 +177,6 @@ export class Yagna { return null; } const allocation = await this.paymentService.getAllocation(allocationId); - debugLog("payments", "got user allocation", allocation); if (!allocation) { throw new Error({ code: ErrorCode.ALLOCATION_NOT_FOUND, @@ -224,6 +262,7 @@ export class Yagna { userId, executor.allocation.id ); + executor.allocation; return executor; } @@ -272,15 +311,7 @@ export class Yagna { newWorker.context = ctx; newWorker.setState("free"); - debugLog( - "payments", - "worker connected, agreement done", - ctx.activity.agreement.id - ); - container.cradle.userService.setCurrentActivityId( - userId, - ctx.activity.agreement.id - ); + resolve(newWorker); }) .catch((e: any) => { @@ -291,51 +322,96 @@ export class Yagna { }); } + //TODO : extract common logic + async observeDebitNoteEvents() { debugLog("payments", "observing events"); while (this.isRunning) { - debugLog( - "payments", - "fetching debit note events", - this.lastDebitNoteEventTimestamp - ); const debitNoteEvents = await this.paymentService.getDebitNoteEvents( 5, this.lastDebitNoteEventTimestamp, - 10, - //@ts-ignore - null + 10 ); - const invoiceEvents = await this.paymentService.getInvoiceEvents( - 5, - this.lastDebitNoteEventTimestamp, - 10, - //@ts-ignore - null - ); + debitNoteEvents.forEach((event) => { + if ( + dayjs(event.eventDate).isAfter( + dayjs(this.lastDebitNoteEventTimestamp) + ) + ) { + this.lastDebitNoteEventTimestamp = event.eventDate; + } + }); - debitNoteEvents.forEach((event: any) => { + debitNoteEvents.forEach(async (event: any) => { const debitNoteId = event.debitNoteId; - const debitNote = this.paymentService.getDebitNote(debitNoteId); - this.debitNoteEvents.next(debitNote); - this.lastDebitNoteEventTimestamp = event.eventDate; + const debitNote = await this.paymentService.getDebitNote(debitNoteId); + this.debitNoteEvents.next({ + debitNote, + event, + id: uuidv4(), + }); }); } + } + async observeInvoiceEvents() { + debugLog("payments", "observing events"); while (this.isRunning) { const invoiceEvents = await this.paymentService.getInvoiceEvents( 5, this.lastInvoiceEventTimestamp, - 10, - //@ts-ignore - null + 10 ); - invoiceEvents.forEach((event: any) => { - this.debitNoteEvents.next(event); - this.lastInvoiceEventTimestamp = event.eventDate; + invoiceEvents.forEach((event) => { + if ( + dayjs(event.eventDate).isAfter(dayjs(this.lastInvoiceEventTimestamp)) + ) { + this.lastInvoiceEventTimestamp = event.eventDate; + } + }); + + invoiceEvents.forEach(async (event: any) => { + console.log("invoice event", event); + const invoiceId = event.invoiceId; + const invoice = await this.paymentService.getInvoice(invoiceId); + this.invoiceEvents.next({ invoice, event, id: uuidv4() }); }); } } + async observeAgreementEvents() { + while (this.isRunning) { + const events = await this.activityService.collectAgreementEvents( + 5, + this.lastAgreementEventTimestamp, + 10 + ); + + events.forEach((event) => { + if ( + dayjs(event.eventDate).isAfter( + dayjs(this.lastAgreementEventTimestamp) + ) + ) { + this.lastAgreementEventTimestamp = event.eventDate; + } + }); + + events.forEach(async (event: any) => { + const agreementId = event.agreementId; + const agreement = await this.activityService.getAgreement(agreementId); + this.agreementEvents.next({ + id: uuidv4(), + agreement, + event, + }); + }); + } + } + async observeEvents() { + this.observeDebitNoteEvents(); + this.observeInvoiceEvents(); + this.observeAgreementEvents(); + } } diff --git a/backend/src/socket.io.ts b/backend/src/socket.io.ts index 60fd4ee..bc5057e 100644 --- a/backend/src/socket.io.ts +++ b/backend/src/socket.io.ts @@ -34,6 +34,7 @@ declare module "fastify" { {}, { user: (data: IUser) => void; + event: (data: any) => void; } >; } diff --git a/frontend/package.json b/frontend/package.json index 074ae18..274a838 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -15,6 +15,7 @@ "@tanstack/react-query": "^5.25.0", "@types/debug": "^4.1.12", "@types/ramda": "^0.29.11", + "@types/uuid": "^9.0.8", "@uidotdev/usehooks": "^2.4.1", "@wagmi/core": "^2.5.7", "@web3modal/siwe": "^4.1.11", @@ -38,6 +39,7 @@ "ts-pattern": "^5.1.1", "use-local-storage-state": "^19.2.0", "usehooks-ts": "^3.1.0", + "uuid": "^9.0.1", "viem": "~2.7.19", "wagmi": "^2.5.7" }, diff --git a/frontend/src/components/atoms/etherscanLink.tsx b/frontend/src/components/atoms/etherscanLink.tsx index 9e1aa27..9b06159 100644 --- a/frontend/src/components/atoms/etherscanLink.tsx +++ b/frontend/src/components/atoms/etherscanLink.tsx @@ -1,10 +1,17 @@ import { Link } from "react-daisyui"; import { shortTransaction } from "utils/shortTransaction"; -export const EtherScanLink = ({ hash }: { hash: `0x${string}` }) => { +export const EtherScanLink = ({ + hash, + route, +}: { + route?: string; + hash: `0x${string}`; +}) => { + if (!route) route = "tx"; return ( { + return ( + + + Agreement Created +
+
+ Agreement ID: +
+
+ ProviderId :{" "} + +
+
+
+
+ ); +}; + +const AgreementTerminatedEvent = (event: { + kind: Event.AGREEMENT_TERMINATED; + payload: Payload[Event.AGREEMENT_TERMINATED]; +}) => { + return ( + + + Agreement Terminated +
+
+ Agreement ID: +
+
+
+
+ ); +}; + const DepositExtendedEvent = (event: { kind: Event.DEPOSIT_EXTENDED; payload: Payload[Event.DEPOSIT_EXTENDED]; @@ -136,6 +180,10 @@ export const EventCard = (event: EventType) => { return ; case Event.DEPOSIT_EXTENDED: return ; + case Event.AGREEMENT_SIGNED: + return ; + case Event.AGREEMENT_TERMINATED: + return ; } })()} diff --git a/frontend/src/components/homePage/events/events.tsx b/frontend/src/components/homePage/events/events.tsx index 2e1d1bd..82dfba1 100644 --- a/frontend/src/components/homePage/events/events.tsx +++ b/frontend/src/components/homePage/events/events.tsx @@ -4,6 +4,7 @@ import { useAllocationEvents } from "hooks/events/useAllocationEvents"; import { EventCard } from "./event"; import { uniqBy } from "ramda"; import { useDepositEvents } from "hooks/events/useDepositEvents"; +import { useYagnaEvents } from "hooks/events/useYagnaEvents"; import { merge } from "rxjs"; export const Events = () => { @@ -13,11 +14,17 @@ export const Events = () => { timestamp: number; })[] >([]); + const { events$: allocationEvents$ } = useAllocationEvents(); const { events$: depositEvents$ } = useDepositEvents(); + const { events$: yagnaEvents$ } = useYagnaEvents(); useEffect(() => { - const sub = merge(allocationEvents$, depositEvents$).subscribe((event) => { + const sub = merge( + allocationEvents$, + depositEvents$, + yagnaEvents$ + ).subscribe((event) => { setEvents((prevEvents) => { return uniqBy( (e) => { @@ -32,7 +39,6 @@ export const Events = () => { setTimeout(() => { sub.unsubscribe(); }, 0); - console.log("unsubscribing"); }; }, []); diff --git a/frontend/src/components/homePage/statusSections/allocation.tsx b/frontend/src/components/homePage/statusSections/allocation.tsx index 449922a..51f1c2d 100644 --- a/frontend/src/components/homePage/statusSections/allocation.tsx +++ b/frontend/src/components/homePage/statusSections/allocation.tsx @@ -1,4 +1,4 @@ -import { AllocationLink } from "components/alloctionLink"; +import { ShortLink } from "components/shortLink"; import { GLMAmountStat } from "components/atoms/GLMAmount"; import { useCreateAllocation } from "hooks/useCreateAllocation"; import { useCurrentAllocation } from "hooks/useCurrentAllocation"; @@ -24,9 +24,7 @@ export const Allocation = () => { >
Allocation
- +
{ + +export const ShortLink = ({ id = "" }: { id?: string }) => { const { enqueueSnackbar } = useSnackbar(); const [isCopied, setIsCopied] = useState(false); const handleClick = () => { - navigator.clipboard.writeText(allocationId); + navigator.clipboard.writeText(id); setIsCopied(true); - enqueueSnackbar("Allocation Id to clipboard", { variant: "success" }); + enqueueSnackbar("Id to clipboard", { variant: "success" }); }; - const shortenedId = `${allocationId.slice(0, 3)}...${allocationId.slice(-3)}`; + const shortenedId = + id.length > 9 ? `${id.slice(0, 3)}...${id.slice(-3)}` : `${id}`; return (
diff --git a/frontend/src/hooks/depositContract/useDeposit.ts b/frontend/src/hooks/depositContract/useDeposit.ts index 64af0cb..07699c8 100644 --- a/frontend/src/hooks/depositContract/useDeposit.ts +++ b/frontend/src/hooks/depositContract/useDeposit.ts @@ -141,7 +141,6 @@ export function useUserCurrentDeposit() { } }, [isSuccess, data]); - console.log("deposit data", data); return { ...(data?.[0] === ZERO_ADDRESS ? { diff --git a/frontend/src/hooks/events/useEvents.ts b/frontend/src/hooks/events/useEvents.ts index 8fdf37f..46864bd 100644 --- a/frontend/src/hooks/events/useEvents.ts +++ b/frontend/src/hooks/events/useEvents.ts @@ -2,56 +2,20 @@ import { useCallback, useEffect, useRef, useState } from "react"; import { Subject } from "rxjs"; import { Payload, Event, ExtractPayload } from "types/events"; import useLocalStorageState from "use-local-storage-state"; - +import { v4 as uuidv4 } from "uuid"; const getId = (e: any) => e.id; -// const subject = new Subject(); - -// const initialState: Task[] = []; - -// let state = initialState; - -// export const eventStore = { -// init: (state) => { -// subject.next(state); -// }, -// subscribe: (setState: any) => { -// subject.subscribe(setState); -// }, -// addTask: (content: string) => { -// const task = { -// content, -// id: uid(), -// isDone: false, -// }; -// state = [...state, task]; -// subject.next(state); -// }, -// removeTask: (id: string) => { -// const tasks = state.filter((task) => task.id !== id); -// state = tasks; -// subject.next(state); -// }, -// completeTask: (id: string) => { -// const tasks = state.map((task) => { -// if (task.id === id) { -// task.isDone = !task.isDone; -// } -// return task; -// }); -// state = tasks; -// subject.next(state); -// }, -// initialState, -// }; - export const useEvents = ({ key, eventKind, }: { key: string; - eventKind: K; + eventKind: K | ((s: string) => K); }) => { + if (typeof eventKind !== "function") { + const kindOfEvent = eventKind; + eventKind = (s: string) => kindOfEvent; + } const [currentEvents, setCurrentEvents] = useLocalStorageState(key, { defaultValue: [], }); @@ -62,7 +26,7 @@ export const useEvents = ({ if (!events$.current) { events$.current = new Subject<{ kind: K; - payload: Payload[typeof eventKind]; + payload: Payload[typeof eventKind extends (s: string) => infer R ? R : K]; id: number; timestamp: number; }>(); @@ -72,29 +36,31 @@ export const useEvents = ({ previousEvents.current = []; } - const emit = useCallback((payload: ExtractPayload) => { - const currentEvents = JSON.parse(localStorage.getItem(key) || "[]"); - const newEvents = [ - ...currentEvents, - { - ...{ - kind: eventKind, - payload, + const emit = useCallback( + (payload: ExtractPayload, eventType: string = "") => { + const currentEvents = JSON.parse(localStorage.getItem(key) || "[]"); + const newEvents = [ + ...currentEvents, + { + ...{ + kind: eventKind(eventType), + payload, + }, + //@ts-ignore + id: payload?.id || uuidv4(), + timestamp: Date.now(), }, - id: currentEvents.length + 1, - timestamp: Date.now(), - }, - ]; - setCurrentEvents(newEvents); - }, []); + ]; + setCurrentEvents(newEvents); + }, + [] + ); useEffect(() => { if (currentEvents) { currentEvents.forEach((e: any) => { - if (e.kind === eventKind) { - if (!previousEvents.current.find((p) => getId(p) === getId(e))) { - events$.current?.next(e); - } + if (!previousEvents.current.find((p) => getId(p) === getId(e))) { + events$.current?.next(e); } }); setIsFirstRun(false); diff --git a/frontend/src/hooks/events/useYagnaEvents.tsx b/frontend/src/hooks/events/useYagnaEvents.tsx new file mode 100644 index 0000000..a1ad8a0 --- /dev/null +++ b/frontend/src/hooks/events/useYagnaEvents.tsx @@ -0,0 +1,73 @@ +import useSWRSubscription from "swr/subscription"; +import { io } from "socket.io-client"; +import { useEffect, useRef, useState } from "react"; +import { useLocalStorage } from "hooks/useLocalStorage"; +import { Subject, merge } from "rxjs"; +import { string } from "ts-pattern/dist/patterns"; +import { Event } from "types/events"; +import { match } from "ts-pattern"; +import { useEvents } from "./useEvents"; + +enum yagnaEventTopic { + debitNote = "debitNote", + invoice = "invoice", + agreement = "agreement", +} + +//As ya-ts-client doet not provide the event type, we need to create a mapper + +export const getEventKind = (yagnaEventType: string): Event => { + console.log("yagnaEventType", yagnaEventType); + return match(yagnaEventType) + .with("AgreementTerminatedEvent", () => Event.AGREEMENT_TERMINATED) + .with("AgreementApprovedEvent", () => Event.AGREEMENT_SIGNED) + .with("InvoiceCreatedEvent", () => Event.NEW_INVOICE) + .otherwise(() => { + throw new Error(`Unknown event type: ${yagnaEventType}`); + }); +}; + +const socketFactory = (eventEndpoint: yagnaEventTopic) => + io(`http://localhost:5174/${eventEndpoint}Events`, { + autoConnect: false, + }); + +export const useYagnaEvent = (event: yagnaEventTopic) => { + const [accessToken] = useLocalStorage("accessToken"); + + const socketRef = useRef(socketFactory(event)); + + const { events$, emit } = useEvents({ + key: `yagna${event.toUpperCase()}Events`, + eventKind: getEventKind, + }); + + useEffect(() => { + if (accessToken) { + socketRef.current.auth = { token: accessToken }; + socketRef.current.connect(); + + socketRef.current.on("event", (data: any) => { + emit({ id: data.id, ...data[event] }, data.event.eventType); + }); + } + }, [accessToken]); + + return { + events$, + }; +}; + +export const useYagnaEvents = () => { + const { events$: debitNoteEvents$ } = useYagnaEvent( + yagnaEventTopic.debitNote + ); + const { events$: invoiceEvents$ } = useYagnaEvent(yagnaEventTopic.invoice); + const { events$: agreementEvents$ } = useYagnaEvent( + yagnaEventTopic.agreement + ); + + return { + events$: merge(debitNoteEvents$, invoiceEvents$, agreementEvents$), + }; +}; diff --git a/frontend/src/types/events.ts b/frontend/src/types/events.ts index 9f94ab7..02b7c1a 100644 --- a/frontend/src/types/events.ts +++ b/frontend/src/types/events.ts @@ -38,10 +38,7 @@ export type Payload = { amount: number; validityTimestamp: number; }; - [Event.AGREEMENT_SIGNED]: { - providerId: string; - agreementId: string; - }; + [Event.FILE_SCAN_OK]: { fileId: string; }; @@ -53,9 +50,17 @@ export type Payload = { invoiceId: string; transactionId?: string; }; + + [Event.AGREEMENT_SIGNED]: { + agreementId: string; + offer: { + providerId: `0x${string}`; + }; + }; [Event.AGREEMENT_TERMINATED]: { agreementId: string; }; + [Event.PROVIDER_PAID]: { agreementId: string; }; diff --git a/frontend/src/types/golemResource.ts b/frontend/src/types/golemResource.ts new file mode 100644 index 0000000..48a43c0 --- /dev/null +++ b/frontend/src/types/golemResource.ts @@ -0,0 +1,5 @@ +export enum GolemResource { + ALLOCATION = "allocation", + AGREEMENT = "agreement", + ACTIVITY = "activity", +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2f7c2fa..c3a1a5a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -145,6 +145,9 @@ importers: '@types/ramda': specifier: ^0.29.11 version: 0.29.12 + '@types/uuid': + specifier: ^9.0.8 + version: 9.0.8 '@uidotdev/usehooks': specifier: ^2.4.1 version: 2.4.1(react-dom@18.2.0)(react@18.2.0) @@ -214,6 +217,9 @@ importers: usehooks-ts: specifier: ^3.1.0 version: 3.1.0(react@18.2.0) + uuid: + specifier: ^9.0.1 + version: 9.0.1 viem: specifier: ~2.7.19 version: 2.7.22(typescript@5.4.3) diff --git a/ya-ts-client b/ya-ts-client index 582202d..6aae28b 160000 --- a/ya-ts-client +++ b/ya-ts-client @@ -1 +1 @@ -Subproject commit 582202db3c271666711f43e45202064414449278 +Subproject commit 6aae28bee2ffb55d43f7548b7bf91350770fb8c9