Task Executors in Other Languages #483

Open
5 of 6 tasks
jcgsville opened this issue Jul 18, 2024 · 6 comments

jcgsville commented Jul 18, 2024

Intro

It is not practical to use Node.js for all asynchronous work. In greenfield development, machine learning and other data-intensive or compute-intensive applications often mandate the use of other languages. Some teams, like mine, may also need to interact with C# APIs and other aspects of the Microsoft ecosystem. We would like to use Node.js for most task executors related to business logic and web application development, but selectively implement some task executors in other languages.

Another use case would be using Graphile Worker without any usage of Node.js. In my opinion, the most powerful aspects of Graphile Worker are (1) the ability to enqueue a job within the same transaction as writing to your application data store and (2) Postgres's LISTEN/NOTIFY starting jobs in milliseconds. My understanding is that neither of these superpowers requires Node.js, and expanding Graphile Worker to other language ecosystems would allow many more engineers to leverage Postgres as a job queue.

Additionally, I can imagine many companies and teams may have legacy code written in other languages that would be impractical to rewrite in Node.js. Supporting task execution in other languages may provide those teams with an on-ramp to move to Graphile Worker more gradually.

Today, Graphile Worker supports task executors in other languages via loading executable files. This works well if there is little startup cost. However, some languages and some specific use cases may have a large startup cost. Examples would be loading a large Python project, or loading a large parameters file from disk for a deep learning task.

In an ideal world with infinite developer capacity, it would be cool to see versions of Graphile Worker in many popular languages that are identical at the Postgres layer so that people could easily use one language or several languages interchangeably with the same queue store. Regardless of the existence of versions of Graphile Worker in other languages, I think it makes sense for today's Graphile Worker to add support for task executors in arbitrary languages by communicating over Unix sockets with local servers containing task executors.

Technical Proposal

I propose we add an opt-in feature to Graphile Worker that allows local servers to host task executors to process jobs. For the purposes of this proposal, I will call these local servers "sidecar servers" to distinguish them from Graphile Worker's primary Node.js process. Feel free to suggest alternative names 🙂

I have spent some time trying to understand how Graphile Worker's internals work to propose a set of changes. Feel free to suggest many changes to my proposal or an entirely different approach if I missed or misunderstood some aspects of Graphile Worker 🙂.

LoadTaskFromSidecarServerPlugin.ts

Most of the changes to Graphile Worker will be in the form of this new plugin which will not be included in the default preset.

This plugin will add a hook called loadTasksFromSidecarServer. This hook will send an initial message to the sidecar server requesting a list of task identifiers that it supports. The sidecar server will respond with a message with a list of task identifiers that it supports. Similar to LoadTaskFromExecutableFilePlugin, loadTasksFromSidecarServer will make a task function for each identifier. This task function will send the full job information over the socket to the sidecar server. When the sidecar server responds with a message indicating it has completed the job, the function will resolve. This plugin will use a single shared connection to the sidecar server.
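The handshake described above could be sketched roughly as follows. This is only an illustration under stated assumptions: the message shapes (`listTasks`, `runJob`, `taskList`, `jobResult`), the `Send` function type, and the simplified `SidecarTask` type are all hypothetical names invented for this sketch, not part of Graphile Worker.

```typescript
// Hypothetical message shapes; the proposal does not fix a wire format.
type SidecarRequest =
  | { type: "listTasks" }
  | { type: "runJob"; jobId: string; taskIdentifier: string; payload: unknown };

type SidecarResponse =
  | { type: "taskList"; taskIdentifiers: string[] }
  | { type: "jobResult"; jobId: string; error?: string };

// Stand-in for "send one message over the shared socket and await the reply".
type Send = (request: SidecarRequest) => Promise<SidecarResponse>;

// Simplified stand-in for Graphile Worker's Task type.
type SidecarTask = (payload: unknown, job: { id: string }) => Promise<void>;

async function loadTasksFromSidecar(
  send: Send,
): Promise<Record<string, SidecarTask>> {
  // Step 1: ask the sidecar which task identifiers it supports.
  const reply = await send({ type: "listTasks" });
  if (reply.type !== "taskList") {
    throw new Error(`Expected taskList, got ${reply.type}`);
  }

  // Step 2: build one task function per identifier.
  const tasks: Record<string, SidecarTask> = Object.create(null);
  for (const taskIdentifier of reply.taskIdentifiers) {
    tasks[taskIdentifier] = async (payload, job) => {
      const result = await send({
        type: "runJob",
        jobId: job.id,
        taskIdentifier,
        payload,
      });
      if (result.type !== "jobResult") {
        throw new Error(`Expected jobResult, got ${result.type}`);
      }
      if (result.error) {
        // A rejected promise signals failure to the caller.
        throw new Error(result.error);
      }
    };
  }
  return tasks;
}
```

Keeping the transport behind a single `send` function means the task functions do not need to know whether the bytes travel over a Unix socket, a TCP port, or a test double.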

The plugin will need to be configured with a file path at which it will open a socket for communication to and from the sidecar server. Both the Graphile Worker Node.js process and the sidecar server must have permission to read from and write to that file. I am not exactly sure how the file path will be configured in Graphile Worker, as I'm not yet deeply familiar with Graphile Config. I think when the plugin is in use, it can pull from a property like worker.sidecarServerSocketPath or something like that.

The mechanism by which the runner and CLI get tasks will need to be modified to support the new option, as the current mechanisms are pretty tightly coupled to the notion of a task directory. We could either add a condition for this new mechanism or we could create a bit more of a decoupled interface so that future task loaders can be added with fewer changes to CLI and runner. I imagine that decoupled interface would look like changing task loading to a single loadTasks hook that each plugin can implement.

The Sidecar Server

The sidecar server will be very simple. It needs to listen to the socket at the same path as configured in the Graphile Worker Node.js process. Its primary function will be to route jobs to calls to task executors according to the task identifier of the job. The sidecar will respond on the same socket with a success message when a job is complete.
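The routing step might look roughly like the sketch below. A real sidecar would be written in whichever language hosts the executors (Python, C#, and so on); TypeScript is used here only to show the shape. The `JobMessage`, `JobReply`, and `Executor` types and the `routeJob` function are hypothetical names for this illustration.

```typescript
// Hypothetical shapes for one job request and its reply.
type JobMessage = { jobId: string; taskIdentifier: string; payload: unknown };
type JobReply = { jobId: string; error?: string };
type Executor = (payload: unknown) => Promise<void> | void;

// Look up the executor for the job's task identifier, run it, and build
// the reply that would be written back on the same socket.
async function routeJob(
  executors: Map<string, Executor>,
  message: JobMessage,
): Promise<JobReply> {
  const executor = executors.get(message.taskIdentifier);
  if (!executor) {
    return {
      jobId: message.jobId,
      error: `Unsupported task: ${message.taskIdentifier}`,
    };
  }
  try {
    await executor(message.payload);
    return { jobId: message.jobId }; // success: no error field
  } catch (err) {
    // Report the failure instead of crashing the sidecar process.
    const reason = err instanceof Error ? err.message : String(err);
    return { jobId: message.jobId, error: reason };
  }
}
```

Echoing the `jobId` in every reply is a design choice that lets many jobs be in flight on one connection at once.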

The messages sent between the Graphile Worker Node.js process and the sidecar server will be serialized as JSON.
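One detail the proposal leaves open is how message boundaries are marked, since a stream socket can split or merge writes arbitrarily. A minimal sketch, assuming newline-delimited JSON as the framing (an assumption of this example, not something the proposal specifies), with hypothetical names `encodeMessage` and `MessageDecoder`:

```typescript
// Encode one message as a single line. JSON.stringify (without an indent
// argument) escapes newlines inside strings, so "\n" only ends a message.
function encodeMessage(message: Record<string, unknown>): string {
  return JSON.stringify(message) + "\n";
}

// Accumulates stream chunks and returns every complete message seen so far.
class MessageDecoder {
  private buffer = "";

  push(chunk: string): unknown[] {
    this.buffer += chunk;
    const lines = this.buffer.split("\n");
    // The last element is an incomplete line (possibly empty); keep it buffered.
    this.buffer = lines.pop() ?? "";
    return lines
      .filter((line) => line.length > 0)
      .map((line) => JSON.parse(line) as unknown);
  }
}
```

With a Node.js socket, calling `socket.setEncoding("utf8")` before feeding `data` events into `push` avoids splitting multi-byte characters across chunks.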

There may need to be some heartbeat to ensure that the connection between the two processes has not broken. I don't think this is necessary, though.

Breaking changes

By being opt-in, this feature will introduce no breaking changes to Graphile Worker. There will need to be some changes to cli.ts and runner.ts which will change code on the critical execution path for all Graphile Worker users. These changes should not introduce any incompatibility for existing users of Graphile Worker. If we desire to minimize or eliminate changes to CLI or runner, we could pursue a different implementation that relies on a task directory, or at least a simulated task directory.

Supporting development

I:

  • am interested in building this feature myself
  • am interested in collaborating on building this feature
  • am willing to help testing this feature before it's released
  • am willing to write a test-driven test suite for this feature (before it exists)
  • am a Graphile sponsor ❤️
  • have an active support or consultancy contract with Graphile

benjie commented Aug 6, 2024

The interesting thing about this proposal is that it doesn't use the filesystem as a source of truth. Previously I was anticipating that tasks, even in other languages, would involve a file in the tasks directory that would get recognized. This could still be done with a slightly modified version of this proposal (you'd create a tasks/my-task-identifier.sidecar file or similar for each task, and inside that would be the configuration of what to use - which service to connect to, which endpoint to call, how to do so, etc). I still prefer this approach, because it gives each Worker control over which tasks it executes, but I also like the idea of Worker being used as a queue only, and the task specifics coming from another server.
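To make the per-task file idea concrete, here is a rough sketch of reading such a file. The JSON format and the field names `socketPath` and `timeoutMs` are purely hypothetical; nothing in this thread settles what a .sidecar file would contain.

```typescript
// Hypothetical contents of tasks/my-task-identifier.sidecar:
//   { "socketPath": "/tmp/sidecar.sock", "timeoutMs": 5000 }
interface SidecarFileConfig {
  socketPath: string; // where the sidecar server listens
  timeoutMs: number; // how long to wait for a job reply
}

const DEFAULT_TIMEOUT_MS = 30_000; // arbitrary default for this sketch

function parseSidecarFile(contents: string): SidecarFileConfig {
  const raw: unknown = JSON.parse(contents);
  if (typeof raw !== "object" || raw === null) {
    throw new Error("Sidecar file must contain a JSON object");
  }
  const { socketPath, timeoutMs } = raw as Record<string, unknown>;
  if (typeof socketPath !== "string" || socketPath.length === 0) {
    throw new Error("Sidecar file must set a non-empty 'socketPath'");
  }
  let timeout = DEFAULT_TIMEOUT_MS;
  if (timeoutMs !== undefined) {
    if (typeof timeoutMs !== "number" || timeoutMs <= 0) {
      throw new Error("'timeoutMs' must be a positive number");
    }
    timeout = timeoutMs;
  }
  return { socketPath, timeoutMs: timeout };
}
```

Validating the file at load time means a misconfigured task fails when the worker starts rather than when the first job arrives.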

To accomplish this goal, you will need to add another hook. We have this hook currently:

await compiledSharedOptions.hooks.process("loadTaskFromFiles", event);

This is used for each task identifier that we scan from the tasks/ folder. But you want to add more things that don't exist in there, so you will need to add another hook that has access to the tasks object:

const tasks: TaskList = Object.create(null);

I would do it after all the tasks have been loaded from files, that would make the hook most useful since it gives people a chance to tweak the tasks that are already loaded, as well as adding their own:

await compiledSharedOptions.hooks.process("loadTasks", event);

Though now I come to think of it... Perhaps it would be better to instead wrap the internals of getTasks with a middleware:

worker/src/getTasks.ts

Lines 97 to 99 in 010feac

const result = await getTasksInternal(compiledSharedOptions, taskPath);
// This assign is used in `__tests__/getTasks.test.ts`
return Object.assign(result, { compiledSharedOptions });

Specifically I'm thinking:

export async function getTasks(
  options: SharedOptions,
  taskPath: string,
): Promise<WatchedTaskList> {
  const compiledSharedOptions = processSharedOptions(options);
  return compiledSharedOptions.middleware.run(
    "getTasks",
    { compiledSharedOptions, taskPath },
    async ({ compiledSharedOptions, taskPath }) => {
      const result = await getTasksInternal(compiledSharedOptions, taskPath);
      // This assign is used in `__tests__/getTasks.test.ts`
      return Object.assign(result, { compiledSharedOptions });
    }
  );
}

This would enable maximum flexibility, including the ability to prevent reading tasks from the filesystem entirely. This would require adding middleware support to Graphile Worker, but I recently added support to graphile-config so it should be relatively straightforward to do so.
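For readers unfamiliar with the pattern, the sketch below shows how wrapping a core function in middleware allows a plugin either to extend the result or to bypass the core step entirely. It is a simplified illustration, not graphile-config's implementation; `runMiddleware`, `Middleware`, and `Next` are hypothetical names.

```typescript
type Next<R> = () => Promise<R>;
type Middleware<E, R> = (next: Next<R>, event: E) => Promise<R>;

function runMiddleware<E, R>(
  middlewares: Middleware<E, R>[],
  event: E,
  core: (event: E) => Promise<R>,
): Promise<R> {
  // Wrap from the inside out so the first middleware is the outermost layer.
  const chain = middlewares.reduceRight<Next<R>>(
    (next, middleware) => () => middleware(next, event),
    () => core(event),
  );
  return chain();
}
```

A middleware that awaits `next()` and appends its own tasks extends the list loaded from the filesystem, while one that returns without calling `next()` prevents the filesystem from being read at all.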

If you want to go this route, the first step would be adding middleware support to Worker.

Here's how grafserv creates middleware:

https://github.com/graphile/crystal/blob/502f13db09c101c34a05aeb168abf10aa6ebc587/grafast/grafserv/src/hooks.ts#L17-L25

And here's where Worker creates hooks:

worker/src/lib.ts

Lines 237 to 260 in 010feac

const hooks = new AsyncHooks<GraphileConfig.WorkerHooks>();
compiled = {
  version,
  maxMigrationNumber: MAX_MIGRATION_NUMBER,
  breakingMigrationNumbers: BREAKING_MIGRATIONS,
  events,
  logger,
  workerSchema,
  escapedWorkerSchema,
  _rawOptions: options,
  hooks,
  resolvedPreset,
};
applyHooks(
  resolvedPreset.plugins,
  (p) => p.worker?.hooks,
  (name, fn, plugin) => {
    const context: WorkerPluginContext = compiled!;
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const cb = ((...args: any[]) => fn(context, ...args)) as any;
    cb.displayName = `${plugin.name}_hook_${name}`;
    hooks.hook(name, cb);
  },
);

Hopefully you can see the parallels. Note that applyHooks has been renamed to orderedApply in the latest graphile-config.

@jcgsville

Thanks for the suggestions and pointers. I read through the Graphile Config docs, and didn't see any explanation of how Graphile Config defines events / hooks / middleware and how one ought to use them. Can you point me towards anywhere they are discussed in writing? Happy also to add to the docs once I understand how they work.


benjie commented Aug 7, 2024

There aren't really any docs for it yet; I'm afraid you'll have to rely on the TypeScript types and finding examples like the ones I have linked. The parts you're asking about are solely intended for use as library authors right now, rather than end consumers, so I've not invested the time in writing documentation for them yet.

@jcgsville

I was originally thinking of the sidecar server as conceptually "part" of the worker. In this framing, letting the sidecar tell the worker what task identifiers it supports still conforms to the idea of "[giving] each Worker control over which tasks it executes". I was optimizing for allowing a task to be defined in one place (the sidecar) rather than having to be defined in both the sidecar and the Node.js worker.

That said, upon further reflection, I think defining the sidecar as part of the worker is a bit of a stretch, and it seems pretty reasonable to have a .sidecar file to keep the filesystem as the source of truth for now. It's definitely a simpler lift.

I'm moving forward with that direction for now. In the future, I may revisit the idea of adding the middleware for more dynamic task loading.

@jcgsville

I got a little proof of concept of a plugin working with lots of things that I'd still improve and add.

Plugin POC
import { GraphileConfig } from "graphile-config";
import { createConnection } from "net";

import { Task } from "../index.js";
import { version } from "../version.js";

const SIDECAR_EXTENSION = ".sidecar";

declare global {
  namespace GraphileConfig {
    interface WorkerOptions {
      /**
       * Port used to communicate with the sidecar server, if enabled.
       */
      taskSidecarPort?: number;
    }
  }
}

export const LoadTaskFromSidecarFilePlugin: GraphileConfig.Plugin = {
  name: "LoadTaskFromSidecarFilePlugin",
  version,

  worker: {
    hooks: {
      init(ctx) {
        ctx.logger.debug("Initializing LoadTaskFromSidecarFilePlugin");
        if (!ctx.resolvedPreset.worker.taskSidecarPort) {
          ctx.logger.warn("Sidecar server port not set, so sidecar plugin will be disabled");
        }
      },
      async loadTaskFromFiles(ctx, details) {
        // Check it hasn't already been handled
        if (details.handler) {
          return;
        }

        if (!ctx.resolvedPreset.worker.taskSidecarPort) {
          return;
        }

        const { fileDetailsList, taskIdentifier } = details;

        let sidecarFileFound = false;

        for (const fileDetails of fileDetailsList) {
          if (fileDetails.extension === SIDECAR_EXTENSION) {
            sidecarFileFound = true;
            break;
          }
        }

        if (!sidecarFileFound) {
          // Don't know how to handle; skip
          return;
        }

        ctx.logger.debug(`Making sidecar task '${taskIdentifier}'`, {
          taskIdentifier,
        });
        details.handler = makeTaskForSidecar(taskIdentifier, ctx.resolvedPreset.worker.taskSidecarPort);
      },
    },
  },
};

function makeTaskForSidecar(taskIdentifier: string, taskSidecarPort: number): Task {
  return (payload, helpers) => {
    return new Promise((resolve, reject) => {
      helpers.logger.debug(`Running sidecar task '${taskIdentifier}'`, {
        taskIdentifier,
      });

      // TODO: How do I share the connection between all tasks?
      // TODO: How can I get the path from the graphile config?
      const sidecarConnection = createConnection({ port: taskSidecarPort });

      sidecarConnection.on('error', (err: Error) => {
        helpers.logger.error(`Error connecting to sidecar server: ${err.message}`);
        reject(err);
      });

      sidecarConnection.on('connect', () => {
        sidecarConnection.write(JSON.stringify({
          payload,
          taskIdentifier,
          jobId: helpers.job.id,
        }));
      });

      sidecarConnection.on('data', (data) => {
        try {
          const response = JSON.parse(data.toString());
          if (response.error) {
            reject(new Error(response.error));
          } else {
            resolve(response.result);
          }
        } catch (err) {
          reject(err);
        }
        sidecarConnection.end();
      });
    });
  };
}

Before I continue, I have a couple open questions:

1st Party v 3rd Party

I am unsure whether this feature deserves to be in the first-party code base, or if it should be a third-party plugin. Does anyone have any thoughts on this?

Worker Preset

If it should be in the first party code base, should it be included in the Worker Preset such that all you have to do is provide the necessary configuration to enable it? Or should we export the plugin and allow the user to extend the default preset with this plugin?

Sharing the Connection

With the current set of plugin hooks supported in Graphile Worker, I am not sure how I could initiate the connection and share it across all task executions. If I understand correctly, we either need an addition to the supported hooks, or I would have to add logic to getUtilsAndReleasersFromOptions() that initiates the connection similar to how the Postgres pool is created. I think we probably don't want to do that, as that leaks the sidecar concept into somewhat unrelated parts of the code.

Is there a reason that the init hook's type is synchronous?

init(): void;

Here is where the init hook is called, so maybe this would have to be updated to be asynchronous. That would be a pretty invasive change, maybe even a breaking change. It looks like processSharedOptions() is used in some synchronous functions that are exported as synchronous.

Promise.resolve(hooks.process("init")).catch((error) => {

I don't think having a new, async hook would solve the problem, as the function where it's used would still need to be updated to be async.


jcgsville commented Dec 3, 2024

It just occurred to me that I could initiate the connection in loadTaskFromFiles(), but that feels a bit icky, as that is somewhat outside the scope of "loading tasks". Maybe I'm being too precious, though
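One possible way around the synchronous init hook is to connect lazily on the first job and have every task share the same client object. The sketch below is an assumption-laden illustration, not a tested integration with Graphile Worker: `Transport` and `SharedSidecarClient` are hypothetical names, and replies are matched to jobs by the `jobId` field that the proof of concept already sends.

```typescript
// Minimal transport abstraction; in Node.js this could wrap a net.Socket.
interface Transport {
  send(message: string): void;
  onMessage(listener: (message: string) => void): void;
}

type Waiter = { resolve: () => void; reject: (error: Error) => void };

class SharedSidecarClient {
  private readonly connect: () => Promise<Transport>;
  private transport: Promise<Transport> | undefined;
  private readonly pending = new Map<string, Waiter>();

  constructor(connect: () => Promise<Transport>) {
    this.connect = connect;
  }

  // Connects on first use; concurrent and later callers reuse the same promise.
  private getTransport(): Promise<Transport> {
    if (!this.transport) {
      const attempt = this.connect().then((transport) => {
        transport.onMessage((message) => this.handleReply(message));
        return transport;
      });
      // Forget a failed attempt so that the next job retries the connection.
      attempt.catch(() => {
        if (this.transport === attempt) this.transport = undefined;
      });
      this.transport = attempt;
    }
    return this.transport;
  }

  // Settle the promise of whichever job this reply belongs to.
  private handleReply(message: string): void {
    const reply = JSON.parse(message) as { jobId: string; error?: string };
    const waiter = this.pending.get(reply.jobId);
    if (!waiter) return; // unknown or already-settled job
    this.pending.delete(reply.jobId);
    if (reply.error) waiter.reject(new Error(reply.error));
    else waiter.resolve();
  }

  async runJob(jobId: string, taskIdentifier: string, payload: unknown): Promise<void> {
    const transport = await this.getTransport();
    return new Promise<void>((resolve, reject) => {
      this.pending.set(jobId, { resolve, reject });
      transport.send(JSON.stringify({ jobId, taskIdentifier, payload }));
    });
  }
}
```

A plugin module could hold a single instance and have each generated task call `runJob`, so no hook needs to become asynchronous. The sketch does not reject pending jobs when the connection drops, which a fuller version would need to handle.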
