Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflows #1

Merged
merged 21 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions .github/workflows/compute.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: "the name of the plugin"
name: "Workflow Functions"

on:
workflow_dispatch:
Expand All @@ -18,12 +18,24 @@ on:

jobs:
compute:
name: "plugin name"
name: "Telegram-Bot"
runs-on: ubuntu-latest
permissions: write-all
env:
SUPABASE_URL: ${{ secrets.SUPABASE_URL }}
SUPABASE_KEY: ${{ secrets.SUPABASE_KEY }}

BOT_TOKEN: ${{ secrets.BOT_TOKEN }}
BOT_MODE: ${{ secrets.BOT_MODE }}
BOT_ADMINS: ${{ secrets.BOT_ADMINS }}
BOT_WEBHOOK: ${{ secrets.BOT_WEBHOOK }}
BOT_WEBHOOK_SECRET: ${{ secrets.BOT_WEBHOOK_SECRET }}

TELEGRAM_APP_ID: ${{ secrets.TELEGRAM_APP_ID }}
TELEGRAM_API_HASH: ${{ secrets.TELEGRAM_API_HASH }}

LOG_LEVEL: ${{ secrets.LOG_LEVEL }}
DEBUG: ${{ secrets.DEBUG }}

steps:
- uses: actions/checkout@v4
Expand All @@ -37,8 +49,20 @@ jobs:
run: yarn

- name: execute directive
run: npx tsx ./src/main.ts
id: plugin-name
run: npx tsx ./src/workflow-entry.ts
id: telegram-bot
env:
SUPABASE_URL: ${{ secrets.SUPABASE_URL }}
SUPABASE_KEY: ${{ secrets.SUPABASE_KEY }}

BOT_TOKEN: ${{ secrets.BOT_TOKEN }}
BOT_MODE: ${{ secrets.BOT_MODE }}
BOT_ADMINS: ${{ secrets.BOT_ADMINS }}
BOT_WEBHOOK: ${{ secrets.BOT_WEBHOOK }}
BOT_WEBHOOK_SECRET: ${{ secrets.BOT_WEBHOOK_SECRET }}

TELEGRAM_APP_ID: ${{ secrets.TELEGRAM_APP_ID }}
TELEGRAM_API_HASH: ${{ secrets.TELEGRAM_API_HASH }}

LOG_LEVEL: ${{ secrets.LOG_LEVEL }}
DEBUG: ${{ secrets.DEBUG }}
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
"worker": "wrangler dev --env dev --port 3000",
"dev": "tsc-watch --onSuccess \"tsx ./src/main.ts\"",
"start": "tsx ./src/main.ts",
"deploy": "wrangler deploy --minify src/main.ts",
"build": "npx tsx build/esbuild-build.ts",
"serve": "npx tsx build/esbuild-server.ts"
"deploy": "wrangler deploy --minify src/main.ts"
},
"keywords": [
"typescript",
Expand Down Expand Up @@ -55,6 +53,7 @@
"grammy-guard": "0.5.0",
"hono": "^4.5.9",
"iso-639-1": "3.1.2",
"octokit": "^4.0.2",
"pino": "9.3.2",
"pino-pretty": "11.2.2",
"telegram": "^2.24.11",
Expand Down
69 changes: 43 additions & 26 deletions src/handlers/callbacks-proxy.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,8 @@
import { Context, SupportedEvents, SupportedEventsU } from "../types";
import { ProxyCallbacks } from "#root/types/proxy.js";
import { Context, SupportedEventsU } from "../types";
import { createChat } from "../workflow-functions/create-chat";
import { closeWorkroom, createWorkroom, reOpenWorkroom } from "./github/workrooms";

export type CallbackResult = { status: 200 | 201 | 204 | 404 | 500, reason: string; content?: string | Record<string, any> };

/**
* The `Context` type is a generic type defined as `Context<TEvent, TPayload>`,
* where `TEvent` is a string representing the event name (e.g., "issues.labeled")
* and `TPayload` is the webhook payload type for that event, derived from
* the `SupportedEvents` type map.
*
* The `ProxyCallbacks` type is defined using `Partial<ProxyTypeHelper>` to allow
* optional callbacks for each event type. This is useful because not all events
* may have associated callbacks.
*
* The expected function signature for callbacks looks like this:
*
* ```typescript
* fn(context: Context<"issues.labeled", SupportedEvents["issues.labeled"]>): Promise<Result>
* ```
*/

type ProxyCallbacks = ProxyTypeHelper;
type ProxyTypeHelper = {
[K in SupportedEventsU]: Array<(context: Context<K, SupportedEvents[K]>) => Promise<CallbackResult>>;
};

/**
* Why do we need this wrapper function?
*
Expand All @@ -39,7 +17,6 @@ type ProxyTypeHelper = {
function handleCallback(callback: Function, context: Context) {
return callback(context);
}

/**
* The `callbacks` object defines an array of callback functions for each supported event type.
*
Expand Down Expand Up @@ -93,3 +70,43 @@ export function proxyCallbacks(context: Context): ProxyCallbacks {
},
});
}

/**
* These are function which get dispatched by this worker to fire off workflows
* in the repository. We enter through the main `compute.yml` just like a typical
* action plugin would, we forward the same payload that the worker received to
* the workflow the same way that the kernel does.
*
* - First event fires, `issues.labeled` and the worker catches it.
* - The worker then dispatches a workflow to `compute.yml` with the event name as the input.
* - The workflow receives a `issues.labeled` payload but eventName is now WorkflowFunction (`create-telegram-chat`).
* - The workflow then runs the `createChat` function which needs a node env to run.
*
* I.e we're essentially running the first dual action/worker plugin which is
* ideal for telegram-bot as it's a bot that needs to be able to be super flexible.
*/
const workflowCallbacks = {
"issues.labeled": [
createChat
]
} as ProxyCallbacks;


export function proxyWorkflowCallbacks(context: Context): ProxyCallbacks {
return new Proxy(workflowCallbacks, {
get(target, prop: SupportedEventsU) {
if (!target[prop]) {
context.logger.info(`No callbacks found for event ${prop}`);
return { status: 204, reason: "skipped" };
}
return (async () => {
try {
return await Promise.all(target[prop].map((callback) => handleCallback(callback, context)));
} catch (er) {
context.logger.error(`Failed to handle event ${prop}`, { er });
return { status: 500, reason: "failed" };
}
})();
},
});
}
31 changes: 4 additions & 27 deletions src/handlers/github/workrooms.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Chat } from "#root/adapters/supabase/helpers/chats.js";
import { CallbackResult } from "#root/types/proxy.js";
import { TelegramBotSingleton } from "#root/utils/telegram-bot-single.js";
import { Context, SupportedEvents } from "../../types";
import { CallbackResult } from "../callbacks-proxy";
import { repositoryDispatch } from "../repository-dispatch";
import { addCommentToIssue } from "./utils/add-comment-to-issues";

/**
Expand All @@ -20,32 +21,8 @@ import { addCommentToIssue } from "./utils/add-comment-to-issues";
*/

export async function createWorkroom(context: Context<"issues.labeled", SupportedEvents["issues.labeled"]>): Promise<CallbackResult> {
const { logger, config, adapters: { supabase: { chats } } } = context;
const bot = TelegramBotSingleton.getInstance().getBot();
const title = context.payload.issue.title
const { issue, repository } = context.payload;
const { full_name } = repository;
const [owner, repo] = full_name.split("/");

const workroom = await chats.getChatByTaskNodeId(issue.node_id);

if (workroom) {
logger.debug("Workroom already exists for issue", { title });
return { status: 404, reason: "workroom_already_exists" };
}

logger.info(`Creating workroom for issue ${title}`);

try {
const forum = await bot.api?.createForumTopic(config.supergroupChatId, title);
await addCommentToIssue(context, `Workroom created: https://t.me/${config.supergroupChatName}/${forum?.message_thread_id}`, owner, repo, issue.number);
await chats.saveChat(forum?.message_thread_id, title, issue.node_id);
return { status: 201, reason: "workroom_created" };
} catch (er) {
await addCommentToIssue(context, logger.error(`Failed to create workroom for issue ${title}`, { er }).logMessage.diff, owner, repo, issue.number);
return { status: 500, reason: "workroom_creation_failed" };
}

await repositoryDispatch(context, "create-telegram-chat").catch(console.error);
return { status: 200, reason: "workflow_dispatched" };
}

export async function closeWorkroom(context: Context<"issues.closed", SupportedEvents["issues.closed"]>): Promise<CallbackResult> {
Expand Down
52 changes: 52 additions & 0 deletions src/handlers/repository-dispatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { getAppOctokit } from "#root/helpers/authenticated-octokit.js";
import { PluginContext } from "#root/utils/plugin-context-single.js";
import { Context } from "../types";

/**
* Used by the worker instance to kick off workflows within it's own repository.
*
* These workflows are extensions of the worker allowing for more complex operations
* to be performed outside of Cloudflare Workers' limitations.
*
* @param env The environment variables for the worker instance. These
* will be taken from the repository's secrets.
* @param args The arguments passed to the workflow.
*
*/

export async function repositoryDispatch(context: Context, workflow: string) {
const inputs = PluginContext.getInstance().getInputs();
const { logger } = context;
const repository = "telegram--bot";
const owner = "ubq-testing";
const branch = "workflows";
const app = await getAppOctokit(context);
const installation = await app.octokit.rest.apps.getRepoInstallation({ owner, repo: repository });

// Set the installation id for the octokit instance

const octokit = await app.getInstallationOctokit(installation.data.id);

logger.info(`Dispatching workflow function: ${workflow}`);


/**
* We'll hit the main workflow entry and pass in the same inputs so
* that it essentially runs on the same context as the worker.
*/

Reflect.deleteProperty(inputs, "signature");

return await octokit.rest.actions.createWorkflowDispatch({
owner,
repo: repository,
workflow_id: "compute.yml",
ref: branch,
inputs: {
...inputs,
eventPayload: JSON.stringify(context.payload),
settings: JSON.stringify(context.config),
}
});
}

7 changes: 7 additions & 0 deletions src/helpers/authenticated-octokit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { App } from "octokit";
import { Context } from "../types";

export async function getAppOctokit(context: Context) {
const { env: { APP_ID, APP_PRIVATE_KEY } } = context;
return new App({ appId: APP_ID, privateKey: APP_PRIVATE_KEY });
}
22 changes: 3 additions & 19 deletions src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,16 @@ import { Env, PluginInputs } from "./types";
import { Context } from "./types";
import { PluginContext } from "./utils/plugin-context-single";
import { proxyCallbacks } from "./handlers/callbacks-proxy";
import { LogReturn } from "@ubiquity-dao/ubiquibot-logger";
import { addCommentToIssue } from "./handlers/github/utils/add-comment-to-issues";
import { bubbleUpErrorComment, sanitizeMetadata } from "./utils/errors";

export async function runPlugin(context: Context) {
const { logger, eventName } = context;
const { eventName } = context;

try {
return proxyCallbacks(context)[eventName]
} catch (err) {
let errorMessage;
if (err instanceof LogReturn) {
errorMessage = err;
} else if (err instanceof Error) {
errorMessage = context.logger.error(err.message, { error: err });
} else {
errorMessage = context.logger.error("An error occurred", { err });
}
await addCommentToIssue(context, `${errorMessage?.logMessage.diff}\n<!--\n${sanitizeMetadata(errorMessage?.metadata)}\n-->`);
return bubbleUpErrorComment(context, err)
}

logger.error(`Unsupported event: ${eventName}`);
}


function sanitizeMetadata(obj: LogReturn["metadata"]): string {
return JSON.stringify(obj, null, 2).replace(/</g, "&lt;").replace(/>/g, "&gt;").replace(/--/g, "&#45;&#45;");
}

/**
Expand Down
9 changes: 4 additions & 5 deletions src/types/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ const allowedUpdates = T.Object({

export const env = T.Object({
BOT_TOKEN: T.String(),
BOT_MODE: T.String(),
LOG_LEVEL: T.String(),
DEBUG: T.Transform(T.Union([T.String(), T.Boolean()])).Decode((str) => str === "true" || str === "false" ? str === "true" : str).Encode((bool) => bool.toString()),
BOT_WEBHOOK: T.String(),
BOT_WEBHOOK_SECRET: T.String(),
SERVER_HOST: T.String(),
SERVER_PORT: T.Transform(T.Unknown()).Decode((str) => Number(str)).Encode((num) => num.toString()),
BOT_ADMINS: T.Transform(T.Unknown()).Decode((str) => Array.isArray(str) ? str.map(Number) : [Number(str)]).Encode((arr) => arr.toString()),
ALLOWED_UPDATES: T.Optional(T.Array(T.KeyOf(allowedUpdates))),
SUPABASE_URL: T.String(),
SUPABASE_KEY: T.String(),
TELEGRAM_APP_ID: T.Transform(T.Unknown()).Decode((str) => Number(str)).Encode((num) => num.toString()),
TELEGRAM_API_HASH: T.String(),
APP_ID: T.Transform(T.Unknown()).Decode((str) => Number(str)).Encode((num) => num.toString()),
APP_PRIVATE_KEY: T.Transform(T.Unknown()).Decode((str) => String(str)).Encode((str) => str),
});

/**
Expand Down
25 changes: 25 additions & 0 deletions src/types/proxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Context, SupportedEvents, SupportedEventsU } from "./context";

export type CallbackResult = { status: 200 | 201 | 204 | 404 | 500, reason: string; content?: string | Record<string, any> };

/**
* The `Context` type is a generic type defined as `Context<TEvent, TPayload>`,
* where `TEvent` is a string representing the event name (e.g., "issues.labeled")
* and `TPayload` is the webhook payload type for that event, derived from
* the `SupportedEvents` type map.
*
* The `ProxyCallbacks` object is cast to allow optional callbacks
* for each event type. This is useful because not all events may have associated callbacks.
* As opposed to Partial<ProxyCallbacks> which could mean an undefined object.
*
* The expected function signature for callbacks looks like this:
*
* ```typescript
* fn(context: Context<"issues.labeled", SupportedEvents["issues.labeled"]>): Promise<Result>
* ```
*/

type ProxyTypeHelper = {
[K in SupportedEventsU]: Array<(context: Context<K, SupportedEvents[K]>) => Promise<CallbackResult>>;
};
export type ProxyCallbacks = ProxyTypeHelper;
5 changes: 5 additions & 0 deletions src/types/typeguards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,8 @@ export function isGithubPayload(inputs: any): inputs is PluginInputs {
return false;
}
}


export function isIssueLabeledEvent(context: Context): context is Context<"issues.labeled"> {
return context.eventName === "issues.labeled";
}
18 changes: 18 additions & 0 deletions src/utils/errors.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
import { LogReturn } from "@ubiquity-dao/ubiquibot-logger";
import { Context } from "../types";
import { addCommentToIssue } from "#root/handlers/github/utils/add-comment-to-issues.js";

export function handleUncaughtError(error: unknown) {
console.error(error);
const status = 500;
return new Response(JSON.stringify({ error }), { status: status, headers: { "content-type": "application/json" } });
}
export function sanitizeMetadata(obj: LogReturn["metadata"]): string {
return JSON.stringify(obj, null, 2).replace(/</g, "&lt;").replace(/>/g, "&gt;").replace(/--/g, "&#45;&#45;");
}

export async function bubbleUpErrorComment(context: Context, err: unknown) {
let errorMessage;
if (err instanceof LogReturn) {
errorMessage = err;
} else if (err instanceof Error) {
errorMessage = context.logger.error(err.message, { error: err });
} else {
errorMessage = context.logger.error("An error occurred", { err });
}
await addCommentToIssue(context, `${errorMessage?.logMessage.diff}\n<!--\n${sanitizeMetadata(errorMessage?.metadata)}\n-->`);
}
Loading
Loading