Skip to content

Commit

Permalink
feat: multithreaded bulk import (#1077) (#1079)
Browse files Browse the repository at this point in the history
* feat: Add BulkImport APIs and cron

* chore: update pull request template

* fix: Use the correct tenant config to create the proxy storage

* fix: PR changes

* fix: PR changes

* fix: PR changes

* fix: PR changes

* fix: PR changes

* fix: PR changes

* fix: Update version

* fix: PR changes

* fix: PR changes

* fix: Rename DeleteBulkImportUser API path

* fix: disable bulk import for in-memory db

* fix: a bug with createTotpDevices

* fix: PR changes

* feat: Add an api to import user in sync

* feat: Add an api to get count of bulk import users

* fix: PR changes

* fix: Add error codes and plainTextPassword import

* fix: PR changes

* feat: multithreaded bulk import

* fix: changelog update

* fix: add new test

* fix: fixing unreliable mutithreaded bulk import with mysql

* fix: review fixes

* fix: fixing failing tests

* feat: bulkimport flow tests

* feat: bulk import cron starter api

* fix: tweaking params for faster import

* fix: tests

* checkpoint

* fix: remove vacuuming

* fix: minor tweaks

* feat: bulk inserting the bulk migration data

* fix: fast as a lightning

* fix: restoring lost method

* fix: reworked error handling to comform previous approach with messages

* fix: fixing tests

* fix: fixing failing tests, changing version

* chore: update changelog

* fix: fixing issues and failing tests

* fix: review changes

* fix: review fixes, reworking cron start/stop

---------

Co-authored-by: Tamas Soltesz <[email protected]>
Co-authored-by: Ankit Tiwari <[email protected]>
  • Loading branch information
3 people authored Dec 19, 2024
1 parent 038b5c8 commit b6ee5c5
Show file tree
Hide file tree
Showing 51 changed files with 7,747 additions and 104 deletions.
1 change: 1 addition & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ highlighting the necessary changes)
- If no such branch exists, then create one from the latest released branch.
- [ ] If added a foreign key constraint on `app_id_to_user_id` table, make sure to delete from this table when deleting
the user as well if `deleteUserIdMappingToo` is false.
- [ ] If added a new recipe, then make sure to update the bulk import API to include the new recipe.

## Remaining TODOs for this PR

Expand Down
40 changes: 40 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,46 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [9.4.0]

### Added
- Adds property `bulk_migration_parallelism` for fine-tuning the worker threads number
- Adds APIs to bulk import users
- GET `/bulk-import/users`
- POST `/bulk-import/users`
- GET `/bulk-import/users/count`
- POST `/bulk-import/users/remove`
- POST `/bulk-import/users/import`
- POST `/bulk-import/backgroundjob`
- GET `/bulk-import/backgroundjob`
- Adds `ProcessBulkImportUsers` cron job to process bulk import users
- Adds multithreaded worker support for the `ProcessBulkImportUsers` cron job for faster bulk imports
- Adds support for lazy importing users

### Migrations

```sql
"CREATE TABLE IF NOT EXISTS bulk_import_users (
id CHAR(36),
app_id VARCHAR(64) NOT NULL DEFAULT 'public',
primary_user_id VARCHAR(36),
raw_data TEXT NOT NULL,
status VARCHAR(128) DEFAULT 'NEW',
error_msg TEXT,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL,
CONSTRAINT bulk_import_users_pkey PRIMARY KEY(app_id, id),
CONSTRAINT bulk_import_users__app_id_fkey FOREIGN KEY(app_id) REFERENCES apps(app_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS bulk_import_users_status_updated_at_index ON bulk_import_users (app_id, status, updated_at);
CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index1 ON bulk_import_users (app_id, status, created_at DESC,
id DESC);
CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index2 ON bulk_import_users (app_id, created_at DESC, id DESC);
```
## [9.3.1]
- Includes exception class name in 500 error message
Expand Down
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ compileTestJava { options.encoding = "UTF-8" }
// }
//}

version = "9.3.1"

version = "9.4.0"

repositories {
mavenCentral()
Expand Down
4 changes: 4 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,7 @@ core_config_version: 0

# (Optional | Default: null) string value. The encryption key used for saving OAuth client secret on the database.
# oauth_client_secret_encryption_key:

# (DIFFERENT_ACROSS_APPS | OPTIONAL | Default: number of available processor cores) int value. If specified,
# the supertokens core will use the specified number of threads to complete the migration of users.
# bulk_migration_parallelism:
5 changes: 5 additions & 0 deletions devConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,8 @@ disable_telemetry: true

# (Optional | Default: null) string value. The encryption key used for saving OAuth client secret on the database.
# oauth_client_secret_encryption_key:

# (DIFFERENT_ACROSS_APPS | OPTIONAL | Default: number of available processor cores) int value. If specified,
# the supertokens core will use the specified number of threads to complete the migration of users.
# bulk_migration_parallelism:

6 changes: 6 additions & 0 deletions src/main/java/io/supertokens/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.supertokens.config.Config;
import io.supertokens.config.CoreConfig;
import io.supertokens.cronjobs.Cronjobs;
import io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers;
import io.supertokens.cronjobs.cleanupOAuthSessionsAndChallenges.CleanupOAuthSessionsAndChallenges;
import io.supertokens.cronjobs.deleteExpiredAccessTokenSigningKeys.DeleteExpiredAccessTokenSigningKeys;
import io.supertokens.cronjobs.deleteExpiredDashboardSessions.DeleteExpiredDashboardSessions;
Expand Down Expand Up @@ -61,6 +62,8 @@ public class Main {

// this is a special variable that will be set to true by TestingProcessManager
public static boolean isTesting = false;
// this flag is used in ProcessBulkImportUsersCronJobTest to skip the user validation
public static boolean isTesting_skipBulkImportUserValidationInCronJob = false;

// this is a special variable that will be set to true by TestingProcessManager
public static boolean makeConsolePrintSilent = false;
Expand Down Expand Up @@ -257,6 +260,9 @@ private void init() throws IOException, StorageQueryException {
// starts DeleteExpiredAccessTokenSigningKeys cronjob if the access token signing keys can change
Cronjobs.addCronjob(this, DeleteExpiredAccessTokenSigningKeys.init(this, uniqueUserPoolIdsTenants));

// initializes ProcessBulkImportUsers cronjob to process bulk import users
Cronjobs.addCronjob(this, ProcessBulkImportUsers.init(this, uniqueUserPoolIdsTenants));

Cronjobs.addCronjob(this, CleanupOAuthSessionsAndChallenges.init(this, uniqueUserPoolIdsTenants));

// this is to ensure tenantInfos are in sync for the new cron job as well
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens;

import io.supertokens.pluginInterface.Storage;
import io.supertokens.pluginInterface.useridmapping.UserIdMapping;

public class StorageAndUserIdMappingForBulkImport extends StorageAndUserIdMapping {

public String userIdInQuestion;

public StorageAndUserIdMappingForBulkImport(Storage storage,
UserIdMapping userIdMapping, String userIdInQuestion) {
super(storage, userIdMapping);
this.userIdInQuestion = userIdInQuestion;
}
}
Loading

0 comments on commit b6ee5c5

Please sign in to comment.