Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Add auto-generated plugins to import data to Splitgraph from all supported data sources (Airbyte, Singer Taps, etc), and add export plugin to export from Splitgraph to Seafowl via Splitgraph GraphQL API #20

Open
wants to merge 33 commits into
base: main
Choose a base branch
from

Conversation

milesrichardson
Copy link
Contributor

WIP

Example (initialization part is pseudocode as it needs to be cleaned up)

// This part is pseudocode as it needs to be cleaned up (some junk needs to be passed as plugin args atm)
const { db } = makeSplitgraphHTTPContext({
    plugins: [
      new AirbyteGithubImportPlugin(),
    ],
    authenticatedCredential: {
      apiKey: credential.apiKey,
      apiSecret: credential.apiSecret,
      anonymous: false,
    },
  });

const res = await db.importData(
  "airbyte-github",
  {
    credentials: {
      credentials: {
        personal_access_token: GITHUB_PAT_SECRET,
      },
    },
    params: {
      repository: "splitgraph/seafowl",
      start_date: "2021-06-01T00:00:00Z",
    },
  },
  {
    namespace: namespace,
    repository: "madatdata-test-github-ingestion",
    tables: [
      {
        name: "stargazers",
        options: {
          airbyte_cursor_field: ["starred_at"],
          airbyte_primary_key_field: [],
        },
        schema: [],
      },
    ],
  }
);

type-checking/autocomplete works based on which plugin is specified in the first parameter:

image

Result: https://www.splitgraph.com/miles/madatdata-test-github-ingestion/latest/-/tables

…lugin`

Keep interface and behavior exactly the same; tests and typechecks
now pass, and CSV-specific behavior is contained only in `SplitgraphImportCSVPlugin`.
…this.DerivedClass`

In the `withOptions` builder method, which is defined in the abstract
base class but which returns an instance of the derived class (with
an obviously unknown name), return an instance of the derived class
by referencing its constructor with `Object.getPrototypeOf(this).constructor`,
instead of forcing the derived class to store a reference to itself in
a property variable. This is arguably a less hacky method, but still
depends on the assumption that the derived class does not override
the constructor (but if it does, it can always override the builder
method as well).
Some functions were missing any annotation, and they are used
only in derived classes in order for methods in the base class
to call them, so there is no need for them to be public (which
is the default when there is no annotation).
Instead of just `Csv`, use e.g. `CsvTableParamsSchema`, which makes
them globally unique and also unique within each plugin (which has
a generated interface for `TableParamsSchema`, `ParamsSchema`,
and `CredentialsSchema`).
I was hoping this would also upgrade Babel to latest so that a bug
where it throws a syntax error for TypeScript 5.0 feature of `const` type
parameters would allow using them in files with GQL queries, but alas
it does not fix it. Still, upgrade those packages.
This allows returning an interface from a factory function for
creating classes (so that autogenerated code can simply call that
factory function instead of implementing a class itself), where
the static property of PluginName is known and inferrable ahead of time.
…r auto-generated plugins

This doesn't actually work for auto-generated plugins yet, but
it's almost there. Basically the auto-generated code just needs
to call this function to create a class that can be instantiated
just like SplitgraphImportCsvPlugin.
…td plugin

This is currently manually created, but the idea is that a file like
this will be auto-generated for each plugin. WIP
…thub

If environment variable `VITE_TEST_GITHUB_PAT_SECRET` is defined, then
run an integration test which ingests data from the Seafowl GitHub
repository into a Splitgraph repo `madatdata-test-github-ingestion`
under the namespace of the username associated with the integration
test `API_KEY` and `API_SECRET`.

Technically, it's hardcoded to be disabled right now so hot reload
testing doesn't spam ingestion, but it works, e.g. see this repository
which was ingested with the new code in `db-splitgraph.test.ts`:

https://www.splitgraph.com/miles/madatdata-test-github-ingestion/latest/-/tables
…r its plugin directory

Actually auto-generate the class for the plugin (as a script that
calls a factory function to create the class) in the `plugin.ts`
file within each plugin directory.
Same as recent change that added it to `ImportPlugin<PluginName>`
…gin` base class

Adopt the same pattern used for import plugin inheritance, so that
we can create a plugin for exporting to Seafowl (as opposed to exporting
to a parquet/csv file, like the current plugin), while re-using most of
the code, particularly regarding waiting for jobs.
@milesrichardson milesrichardson force-pushed the generated-import-plugins branch from 1ec77f3 to fcdc15b Compare May 17, 2023 02:24
…o Seafowl

We already supported "manually" exporting from Splitgraph to Seafowl
by first exporting a `.parquet` file via the existing `ExportQueryPlugin`,
and then importing it into Seafowl with `SeafowlImportFilePlugin`.

However, the Splitgraph API supports the more robust use case of
exporting one or multiple queries, tables and VDBs from Splitgraph
to Seafowl, via the GraphQL mutation `exportToSeafowl` which is
available with an authenticated request to the GraphQL API.

This commit adds a plugin to call that API with one or multiple queries,
tables or VDBs (with the types being fairly raw and maybe a bit cumbersome,
albeit still equipped with full autocomplete and sufficient TypeDoc annotations),
and to wait for the tasks to resolve before returning an object containing
the result, including a list of passed jobs, and if applicable, a list of
any failed jobs (since each export target creates a separate job, we need
to wait for them separately, and each can fail or pass independently of the others).

The result type is a bit of a mess, but it doesn't actually matter much
since you don't really need to do anything with it, assuming error
is not `null`, because that means the export was successful and you can
proceed by simply querying the resultant data in Seafowl.
…f-hosted Seafowl instance

Keep this separate from the existing Seafowl integration tests, and
add new environment variables `VITE_TEST_SEAFOWL_EXPORT_DEST_{URL,DBNAME,SECRET}`
for the "export target" Seafowl instance to use in Seafowl integration tests. If
all of them are set, then the integration test will execute.

Write a simple test that exporting a single query works as expected. Note
this doesn't actually query the resulting data in the Seafowl instance; it's
just a start of a "good enough" integration test for this functionality. More
notably, it also doesn't test exporting tables, or vdbs, or exporting multiple
queries/vdbs/tables, or failure modes; future robust testing should include
mocked responses for unit testing that covers the "wait for all tasks" logic
which is most likely to have one (or a few) bugs lurking in it.
Use `splitgraph-` prefix where applicable, and be more accurate
about export destination, e.g. `SplitgraphExportQueryToFilePlugin`
instead of `ExportQueryPlugin`
@milesrichardson milesrichardson changed the title Draft: Import data from any of the data sources with a Splitgraph plugin, auto-generated from the GraphQL API Draft: Add auto-generated plugins to import data to Splitgraph from all supported data sources (Airbyte, Singer Taps, etc), and add export plugin to export from Splitgraph to Seafowl via Splitgraph GraphQL API May 17, 2023
Allow starting a task, getting a taskId, and not waiting for it,
so that later can check whether it's completed (point of this is
so a Vercel server function can do the checking and the polling
can be triggered from the client).
When `defer: true`, the returned `taskIds` value will include a
dictionary of taskIds as returned by the GraphQL API with `{ tables, queries, vdbs }`,
and the consumer can take each of these individual IDs and check them individually
(the idea being that a UI might render e.g. each table separately and then check
for their status independently). There's not really a need for a bulk check
since it would ultimately amount to the same requests anyway, just a question
of whether the vercel backend or the client is doing the batching.
@milesrichardson
Copy link
Contributor Author

Some stuff in this PR, mainly around refactoring and adding features to import and export plugins:

There is now an import plugin for all 100+ data sources supported by Splitgraph (Airbyte connectors, Singer taps, etc.)

Plugins can be "installed" by including them in the list of plugins passed to the constructor of the database (e.g. new SeafowlDb({plugins: [...]}) or new SplitgraphDb({plugins: [...]}), but usually called indirectly through helper functions like makeSplitgraphHTTPContext):

const { db } = makeSplitgraphHTTPContext({
    plugins: [
      new AirbyteGithubImportPlugin(),
    ],
    authenticatedCredential: {
      apiKey: credential.apiKey,
      apiSecret: credential.apiSecret,
      anonymous: false,
    },
  });

Each plugin has a name, in this case airbyte-github, which is then specified as the first parameter to db.exportData or db.importData or db.pollDeferredTask (see below). The rest of the parameters to that function are defined by the function implementation of the plugin and are typechecked/auto-completed accordingly.

For the auto-generated plugins, credentials and params are auto-completed and typechecked via auto-generated types which are created from the JSONSchema returned by the Splitgraph API.

For example, import a GitHub repository into a Splitgraph repository using the airbyte-github plugin:

// (helper function that adds `new AirbyteGithubImportPlugin()` to the list of plugins)
const db = createRealDb();

// You need to be authenticated to Splitgraph to use this plugin
// Get the token, to get your username, so that you can import to a new repository in your namespace
const { username: namespace } = await fetchToken(db);

await db.importData(
  "airbyte-github",
  {
    credentials: {
      credentials: {
        personal_access_token: GITHUB_PAT_SECRET,
      },
    },
    params: {
      repository: "splitgraph/seafowl",
      start_date: "2021-06-01T00:00:00Z",
    },
  },
  {
    namespace: namespace,
    repository: "madatdata-test-github-ingestion",
    tables: [
      {
        name: "stargazers",
        options: {
          airbyte_cursor_field: ["starred_at"],
          airbyte_primary_key_field: [],
        },
        schema: [],
      },
    ],
  }
);

If the repository does not exist yet, then it will be created. Otherwise, a new image will be created with the imported data. This uses the StartExternalRepositoryLoad mutation from the Splitgraph GraphQL API.

Splitgraph import and export plugins can be "deferred" with { defer: true } parameter

By default, importData an exportData will create the import/export jobs via the Splitgraph API, and then wait for them to complete by polling the job status. However, this might not always be desirable. For example, perhaps you are submitting the job from the server side because it requires a secret (like your Splitgraph API_SECRET, or your credential for whatever upstream data source you're importing, like a personal access token for airbyte-github). In this case, you don't want the server side request to block until the ingestion job is complete, because it would likely timeout or hit limits of your Edge Function ™ vendor.

Now you can pass { defer: true } to the importData and exportData functions, and they will return immediately with a serialized task description containing a taskId (or multiple taskIds in the case of an export of table(s)/quer(y)(ies)/vdb(s) to Seafowl). Then, you can call pollDeferredTask with a parameter for taskId to check whether the job has completed yet. The idea is that you would have one Edge Function ™ to submit the ingestion job, and another Edge Function ™ to check whether a taskId has completed yet (effectively acting as a relatively dumb proxy to the Splitgraph API). Then the client side doesn't need any secrets, and it can poll the edge function with reasonable granularity.

Note: For Seafowl, we still have a SeafowlImportFilePlugin which "imports" a file to Seafowl by uploading it to the multi-part upload endpoint, but this plugin is not deferrable because it actually does the work of uploading the file directly to Seafowl. If you want a deferred export to Seafowl, you probably want to go through Splitgraph first (by uploading the file to Splitgraph, and then using Splitgraph to export to Seafowl; since the actual export happens on the Splitgraph backend, it's deferrable).

Example: Defer export of query result from Splitgraph to Seafowl

Execute a query on Splitgraph and export its result to a table in a Seafowl DB. This uses the Splitgraph API and can also export multiple tables, vdbs, or queries at once. A separate job with its own taskId is created and returned for each.

(Notice the { defer: true } in the last parameter)

// Export the result of this query from Splitgraph to a Seafowl instance
const queryToExport = `SELECT a as int_val, string_agg(random()::text, '') as text_val
FROM generate_series(1, 5) a, generate_series(1, 50) b
GROUP BY a ORDER BY a;`;

const res = await db.exportData(
  "export-to-seafowl",
  {
    queries: [
      {
        source: {
          query: queryToExport,
        },
        destination: {
          schema: "madatdata_testing",
          table: destTable,
        },
      },
    ],
  },
  {
    seafowlInstance: {
      selfHosted: {
        url: SEAFOWL_DEST_URL,
        dbname: SEAFOWL_DEST_DBNAME,
        secret: SEAFOWL_DEST_SECRET,
      },
    },
  },
  { defer: true }
);

Then, check if it's completed yet (in this example it hasn't) (mind the messy types):

const startedTask = await db.pollDeferredTask("export-to-seafowl", {
  taskId: taskId as string,
});

expect(startedTask.completed).toBe(false);

Example: Defer export of Splitgraph query to Parquet file

Execute a query on Splitgraph and export its result to a Parquet file at a publicly accessible, signed temporary URL that will expire in 24 hours:

const {
  taskId,
  response,
} = await db.exportData(
  "export-query-to-file",
  {
    query: `SELECT a as int_val, string_agg(random()::text, '') as text_val
FROM generate_series(1, 5) a, generate_series(1, 50) b
GROUP BY a ORDER BY a;`,
    vdbId: "ddn",
  },
  {
    format: "parquet",
    filename: "random-series",
  },
  { defer: true }
);

Check whether the export is complete:

const startedTask = await db.pollDeferredTask("export-query-to-file", {
  taskId: taskId as string,
});

expect(startedTask.completed).toBe(false);

Once it's completed, the URL of the exported file will be available (just like it would be available in the return value of exportData when called without { defer: true }):

const shouldBeCompletedTask = await db.pollDeferredTask(
  "export-query-to-file",
  { taskId: response.taskId }
);

const parquetFileURL = shouldBeCompletedTask.response?.output.url;

Example: Defer import of uploaded CSV to table on Splitgraph DDN

Upload a CSV to object storage (using a pre-signed upload URL from Splitgraph), then provide that URL to the Splitgraph API to "import" the CSV as a table on the Splitgraph DDN

(Each new export creates an image. You can see the results of the integration testing in my public repo at https://splitgraph.com/miles/dunno )

const { response, info, taskId } = await db.importData(
  "csv",
  { data: Buffer.from(`name;candies\r\nBob;5\r\nAlice;10`) },
  {
    tableName: `irrelevant-${randSuffix()}`,
    namespace,
    repository: "dunno",
    tableParams: {
      delimiter: ";",
    },
  },
  { defer: true }
);

Check whether it's complete - in this case you also need to provide a namespace and repository, but those parameters are typechecked (again, ignore the as string 👀):

const startedTask = await db.pollDeferredTask("csv", {
  taskId: taskId as string,
  namespace,
  repository: "dunno",
});

expect(startedTask.completed).toBe(false); // we just started it, so it's not completed yet :)

… quickly

The test defers an export task, and then checks its status, which
it expects to still be pending. Previously, a query exporting 5 rows
to parquet was completing quickly enough that the test would sometimes
fail because it expected the task to still be pending but it was complete.
Export a query of 10,000 random rows instead, to make it take a bit longer...
…s create a single job

Instead of receiving a task ID for each exported table from the request,
we now receive a task ID representing the job of exporting all the tables.
If a function is passed as the query parameter, instead of a string,
then call it to create the query.

If an abort signal is passed, call it in the cleanup function of
the hook. If it's not passed, then create one by default and call that.

This solves an issue where first render could be empty if the query
wasn't ready yet (e.g. some interpolated value like table name came
from the URL which hasn't been parsed), and the abort signal implements
the correct behavior for unpredictable mounting sequences (you're not
supposed to rely on `useEffect` only executing once).
… that shouldn't be released in next version)
milesrichardson added a commit to splitgraph/demo-github-analytics that referenced this pull request Aug 2, 2023
Migrate pull request from: splitgraph/madatdata#21

into its own repo, using `git-filter-repo` to include only commits from
subdirectory `examples/nextjs-import-airbyte-github-export-seafowl/`

ref: https://github.com/newren/git-filter-repo

This commit adds the Yarn files necessary for running the example in
an isolated repo (as opposed to as one of multiple examples in a shared
multi-workspace `examples`), points the dependencies to `canary` versions
(which reflect versions in splitgraph/madatdata#20),
and also updates the readme with information for running in local development.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant