From 0d707359b97c6004c7572b8a9bbb9c9c81533291 Mon Sep 17 00:00:00 2001 From: tamassoltesz Date: Fri, 20 Sep 2024 16:28:31 +0200 Subject: [PATCH] feat: multithreaded bulk import --- config.yaml | 4 + devConfig.yaml | 4 + .../io/supertokens/config/CoreConfig.java | 12 + .../bulkimport/ProcessBulkImportUsers.java | 297 +++------------- .../ProcessBulkUsersImportWorker.java | 321 ++++++++++++++++++ .../multitenancy/Multitenancy.java | 2 +- .../test/bulkimport/BulkImportTestUtils.java | 15 +- .../ProcessBulkImportUsersCronJobTest.java | 185 +++++++++- 8 files changed, 585 insertions(+), 255 deletions(-) create mode 100644 src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java diff --git a/config.yaml b/config.yaml index fdb96d4ba..1207c544e 100644 --- a/config.yaml +++ b/config.yaml @@ -151,3 +151,7 @@ core_config_version: 0 # (OPTIONAL | Default: null) string value. If specified, the supertokens service will only load the specified CUD even # if there are more CUDs in the database and block all other CUDs from being used from this instance. # supertokens_saas_load_only_cud: + +# (DIFFERENT_ACROSS_TENANTS | OPTIONAL | Default: 1) int value. If specified, the supertokens core will use the +# specified number of threads to complete the migration of users. +# bulk_migration_parallelism: diff --git a/devConfig.yaml b/devConfig.yaml index 276b35d42..ee901b068 100644 --- a/devConfig.yaml +++ b/devConfig.yaml @@ -151,3 +151,7 @@ disable_telemetry: true # (OPTIONAL | Default: null) string value. If specified, the supertokens service will only load the specified CUD even # if there are more CUDs in the database and block all other CUDs from being used from this instance. # supertokens_saas_load_only_cud: + +# (DIFFERENT_ACROSS_TENANTS | OPTIONAL | Default: 1) int value. If specified, the supertokens core will use the +# specified number of threads to complete the migration of users. +# bulk_migration_parallelism: diff --git a/src/main/java/io/supertokens/config/CoreConfig.java b/src/main/java/io/supertokens/config/CoreConfig.java index 3de06caa7..ce38880a5 100644 --- a/src/main/java/io/supertokens/config/CoreConfig.java +++ b/src/main/java/io/supertokens/config/CoreConfig.java @@ -209,6 +209,10 @@ public class CoreConfig { @IgnoreForAnnotationCheck private boolean isNormalizedAndValid = false; + @NotConflictingInApp + @JsonProperty + private int bulk_migration_parallelism = 1; + public static Set getValidFields() { CoreConfig coreConfig = new CoreConfig(); JsonObject coreConfigObj = new GsonBuilder().serializeNulls().create().toJsonTree(coreConfig).getAsJsonObject(); @@ -398,6 +402,10 @@ public boolean getHttpsEnabled() { return webserver_https_enabled; } + public int getBulkMigrationParallelism() { + return bulk_migration_parallelism; + } + private String getConfigFileLocation(Main main) { return new File(CLIOptions.get(main).getConfigFilePath() == null ? 
CLIOptions.get(main).getInstallationPath() + "config.yaml" @@ -590,6 +598,10 @@ void normalizeAndValidate(Main main, boolean includeConfigFilePath) throws Inval } } + if (bulk_migration_parallelism < 1) { + throw new InvalidConfigException("Provided bulk_migration_parallelism must be >= 1"); + } + // Normalize if (ip_allow_regex != null) { ip_allow_regex = ip_allow_regex.trim(); diff --git a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java index a4b235da0..177ad3da2 100644 --- a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java +++ b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java @@ -16,48 +16,37 @@ package io.supertokens.cronjobs.bulkimport; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.google.gson.JsonObject; - import io.supertokens.Main; -import io.supertokens.ResourceDistributor; import io.supertokens.bulkimport.BulkImport; import io.supertokens.bulkimport.BulkImportUserUtils; -import io.supertokens.bulkimport.exceptions.InvalidBulkImportDataException; import io.supertokens.config.Config; import io.supertokens.cronjobs.CronTask; import io.supertokens.cronjobs.CronTaskTest; -import io.supertokens.multitenancy.Multitenancy; -import io.supertokens.output.Logging; import io.supertokens.pluginInterface.STORAGE_TYPE; -import io.supertokens.pluginInterface.Storage; import io.supertokens.pluginInterface.StorageUtils; -import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo; -import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage; import io.supertokens.pluginInterface.bulkimport.BulkImportUser; -import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS; -import io.supertokens.pluginInterface.bulkimport.BulkImportUser.LoginMethod; import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage; -import io.supertokens.pluginInterface.exceptions.DbInitException; -import io.supertokens.pluginInterface.exceptions.InvalidConfigException; import io.supertokens.pluginInterface.exceptions.StorageQueryException; -import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; import io.supertokens.pluginInterface.multitenancy.AppIdentifier; -import io.supertokens.pluginInterface.multitenancy.TenantConfig; import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException; -import io.supertokens.pluginInterface.sqlStorage.SQLStorage; import io.supertokens.storageLayer.StorageLayer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + public class ProcessBulkImportUsers extends CronTask { public static final String RESOURCE_KEY = "io.supertokens.ee.cronjobs.ProcessBulkImportUsers"; - private Map userPoolToStorageMap = new HashMap<>(); + private ProcessBulkImportUsers(Main main, List> tenantsInfo) { super("ProcessBulkImportUsers", main, tenantsInfo, true); @@ -71,7 +60,7 @@ public static ProcessBulkImportUsers init(Main main, List @Override 
 protected void doTaskPerApp(AppIdentifier app)
-            throws TenantOrAppNotFoundException, StorageQueryException, IOException, DbInitException {
+            throws TenantOrAppNotFoundException, StorageQueryException {
 
         if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) {
             return;
@@ -86,8 +75,29 @@ protected void doTaskPerApp(AppIdentifier app)
         String[] allUserRoles = StorageUtils.getUserRolesStorage(bulkImportSQLStorage).getRoles(app);
         BulkImportUserUtils bulkImportUserUtils = new BulkImportUserUtils(allUserRoles);
 
-        for (BulkImportUser user : users) {
-            processUser(app, user, bulkImportUserUtils, bulkImportSQLStorage);
+        // split the loaded users list into smaller chunks. Note that chunkSize is
+        // size / NUMBER_OF_BATCHES + 1, so fewer than NUMBER_OF_BATCHES chunks may be
+        // produced (e.g. 15 users with parallelism 12 yields 8 chunks)
+        int NUMBER_OF_BATCHES = Config.getConfig(app.getAsPublicTenantIdentifier(), main).getBulkMigrationParallelism();
+        List<List<BulkImportUser>> loadedUsersChunks = makeChunksOf(users, NUMBER_OF_BATCHES);
+
+        // pass the chunks to the workers for processing
+        ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_BATCHES);
+        try {
+            List<Future<?>> tasks = new ArrayList<>();
+            for (List<BulkImportUser> userListChunk : loadedUsersChunks) {
+                tasks.add(executorService.submit(
+                        new ProcessBulkUsersImportWorker(main, app, userListChunk, bulkImportSQLStorage,
+                                bulkImportUserUtils)));
+            }
+
+            for (Future<?> task : tasks) {
+                task.get(); // blocks this thread until the worker finishes and surfaces any error it threw
+            }
+
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            executorService.shutdown();
         }
     }
 
@@ -113,230 +123,19 @@ public int getInitialWaitTimeSeconds() {
         return 0;
     }
 
-    private synchronized Storage getBulkImportProxyStorage(TenantIdentifier tenantIdentifier)
-            throws InvalidConfigException, IOException, TenantOrAppNotFoundException, DbInitException {
-        String userPoolId = StorageLayer.getStorage(tenantIdentifier, main).getUserPoolId();
-        if (userPoolToStorageMap.containsKey(userPoolId)) {
-            return userPoolToStorageMap.get(userPoolId);
-        }
-
-        TenantConfig[] allTenants = Multitenancy.getAllTenants(main);
-
-        Map<ResourceDistributor.KeyClass, JsonObject> normalisedConfigs = Config.getNormalisedConfigsForAllTenants(
-                allTenants,
-                Config.getBaseConfigAsJsonObject(main));
-
-        for (ResourceDistributor.KeyClass key : normalisedConfigs.keySet()) {
-            if (key.getTenantIdentifier().equals(tenantIdentifier)) {
-                SQLStorage bulkImportProxyStorage = (SQLStorage) StorageLayer.getNewBulkImportProxyStorageInstance(main,
-                        normalisedConfigs.get(key), tenantIdentifier, true);
-
-                userPoolToStorageMap.put(userPoolId, bulkImportProxyStorage);
-                bulkImportProxyStorage.initStorage(false, new ArrayList<>());
-                return bulkImportProxyStorage;
-            }
-        }
-        throw new TenantOrAppNotFoundException(tenantIdentifier);
-    }
-
-    private Storage[] getAllProxyStoragesForApp(Main main, AppIdentifier appIdentifier)
-            throws StorageTransactionLogicException {
-
-        try {
-            List<Storage> allProxyStorages = new ArrayList<>();
-            TenantConfig[] tenantConfigs = Multitenancy.getAllTenantsForApp(appIdentifier, main);
-            for (TenantConfig tenantConfig : tenantConfigs) {
-                allProxyStorages.add(getBulkImportProxyStorage(tenantConfig.tenantIdentifier));
-            }
-            return allProxyStorages.toArray(new Storage[0]);
-        } catch (TenantOrAppNotFoundException e) {
-            throw new StorageTransactionLogicException(new Exception("E043: " + e.getMessage()));
-        } catch (InvalidConfigException e) {
-            throw new StorageTransactionLogicException(new InvalidConfigException("E044: " + e.getMessage()));
-        } catch (DbInitException e) {
-            throw new
StorageTransactionLogicException(new DbInitException("E045: " + e.getMessage())); - } catch (IOException e) { - throw new StorageTransactionLogicException(new IOException("E046: " + e.getMessage())); + private List> makeChunksOf(List users, int numberOfChunks) { + List> chunks = new ArrayList<>(); + if (users != null && !users.isEmpty() && numberOfChunks > 0) { + AtomicInteger index = new AtomicInteger(0); + int chunkSize = users.size() / numberOfChunks + 1; + Stream> listStream = users.stream() + .collect(Collectors.groupingBy(x -> index.getAndIncrement() / chunkSize)) + .entrySet().stream() + .sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue); + + listStream.forEach(chunks::add); } + return chunks; } - private void closeAllProxyStorages() throws StorageQueryException { - for (SQLStorage storage : userPoolToStorageMap.values()) { - storage.closeConnectionForBulkImportProxyStorage(); - } - userPoolToStorageMap.clear(); - } - - private void processUser(AppIdentifier appIdentifier, BulkImportUser user, BulkImportUserUtils bulkImportUserUtils, - BulkImportSQLStorage baseTenantStorage) - throws TenantOrAppNotFoundException, StorageQueryException, IOException, - DbInitException { - - try { - if (Main.isTesting && Main.isTesting_skipBulkImportUserValidationInCronJob) { - // Skip validation when the flag is enabled during testing - } else { - // Validate the user - bulkImportUserUtils.createBulkImportUserFromJSON(main, appIdentifier, user.toJsonObject(), user.id); - } - - // Since all the tenants of a user must share the storage, we will just use the - // storage of the first tenantId of the first loginMethod - - TenantIdentifier firstTenantIdentifier = new TenantIdentifier(appIdentifier.getConnectionUriDomain(), - appIdentifier.getAppId(), user.loginMethods.get(0).tenantIds.get(0)); - - SQLStorage bulkImportProxyStorage = (SQLStorage) getBulkImportProxyStorage(firstTenantIdentifier); - - LoginMethod primaryLM = BulkImport.getPrimaryLoginMethod(user); - - AuthRecipeSQLStorage authRecipeSQLStorage = (AuthRecipeSQLStorage) getBulkImportProxyStorage( - firstTenantIdentifier); - - /* - * We use two separate storage instances: one for importing the user and another for managing bulk_import_users entries. - * This is necessary because the bulk_import_users entries are always in the public tenant storage, - * but the actual user data could be in a different storage. - * - * If transactions are committed individually, in this order: - * 1. Commit the transaction that imports the user. - * 2. Commit the transaction that deletes the corresponding bulk import entry. - * - * There's a risk where the first commit succeeds, but the second fails. This creates a situation where - * the bulk import entry is re-processed, even though the user has already been imported into the database. - * - * To resolve this, we added a `primaryUserId` field to the `bulk_import_users` table. - * The processing logic now follows these steps: - * - * 1. Import the user and get the `primaryUserId` (transaction uncommitted). - * 2. Update the `primaryUserId` in the corresponding bulk import entry. - * 3. Commit the import transaction from step 1. - * 4. Delete the bulk import entry. - * - * If step 2 or any earlier step fails, nothing is committed, preventing partial state. - * If step 3 fails, the `primaryUserId` in the bulk import entry is updated, but the user doesn't exist in the database—this results in re-processing on the - * next run. 
- * If step 4 fails, the user exists but the bulk import entry remains; this will be handled by deleting it in the next run. - * - * The following code implements this logic. - */ - if (user.primaryUserId != null) { - AuthRecipeUserInfo importedUser = authRecipeSQLStorage.getPrimaryUserById(appIdentifier, - user.primaryUserId); - - if (importedUser != null && isProcessedUserFromSameBulkImportUserEntry(importedUser, user)) { - baseTenantStorage.deleteBulkImportUsers(appIdentifier, new String[] { user.id }); - return; - } - } - - bulkImportProxyStorage.startTransaction(con -> { - try { - Storage[] allStoragesForApp = getAllProxyStoragesForApp(main, appIdentifier); - BulkImport.processUserImportSteps(main, con, appIdentifier, bulkImportProxyStorage, user, primaryLM, allStoragesForApp); - - // We are updating the primaryUserId in the bulkImportUser entry. This will help us handle the inconsistent transaction commit. - // If this update statement fails then the outer transaction will fail as well and the user will simpl be processed again. No inconsistency will happen in this - // case. - baseTenantStorage.updateBulkImportUserPrimaryUserId(appIdentifier, user.id, - primaryLM.superTokensUserId); - - // We need to commit the transaction manually because we have overridden that in the proxy storage - // If this fails, the primaryUserId will be updated in the bulkImportUser but it wouldn’t actually exist. - // When processing the user again, we'll check if primaryUserId exists with the same email. In this case the user won't exist, and we'll simply re-process it. - bulkImportProxyStorage.commitTransactionForBulkImportProxyStorage(); - - // NOTE: We need to use the baseTenantStorage as bulkImportProxyStorage could have a different storage than the baseTenantStorage - // If this fails, the primaryUserId will be updated in the bulkImportUser and it would exist in the database. - // When processing the user again, we'll check if primaryUserId exists with the same email. In this case the user will exist, and we'll simply delete the entry. - baseTenantStorage.deleteBulkImportUsers(appIdentifier, new String[] { user.id }); - return null; - } catch (StorageTransactionLogicException e) { - // We need to rollback the transaction manually because we have overridden that in the proxy storage - bulkImportProxyStorage.rollbackTransactionForBulkImportProxyStorage(); - throw e; - } finally { - closeAllProxyStorages(); - } - }); - } catch (StorageTransactionLogicException | InvalidBulkImportDataException | InvalidConfigException e) { - handleProcessUserExceptions(appIdentifier, user, e, baseTenantStorage); - } - } - - private void handleProcessUserExceptions(AppIdentifier appIdentifier, BulkImportUser user, Exception e, - BulkImportSQLStorage baseTenantStorage) - throws StorageQueryException { - // Java doesn't allow us to reassign local variables inside a lambda expression - // so we have to use an array. - String[] errorMessage = { e.getMessage() }; - - if (e instanceof StorageTransactionLogicException) { - StorageTransactionLogicException exception = (StorageTransactionLogicException) e; - // If the exception is due to a StorageQueryException, we want to retry the entry after sometime instead - // of marking it as FAILED. We will return early in that case. - if (exception.actualException instanceof StorageQueryException) { - Logging.error(main, null, "We got an StorageQueryException while processing a bulk import user entry. It will be retried again. 
Error Message: " + e.getMessage(), true); - return; - } - errorMessage[0] = exception.actualException.getMessage(); - } else if (e instanceof InvalidBulkImportDataException) { - errorMessage[0] = ((InvalidBulkImportDataException) e).errors.toString(); - } else if (e instanceof InvalidConfigException) { - errorMessage[0] = e.getMessage(); - } - - try { - baseTenantStorage.startTransaction(con -> { - baseTenantStorage.updateBulkImportUserStatus_Transaction(appIdentifier, con, user.id, - BULK_IMPORT_USER_STATUS.FAILED, errorMessage[0]); - return null; - }); - } catch (StorageTransactionLogicException e1) { - throw new StorageQueryException(e1.actualException); - } - } - - // Checks if the importedUser was processed from the same bulkImportUser entry. - private boolean isProcessedUserFromSameBulkImportUserEntry( - AuthRecipeUserInfo importedUser, BulkImportUser bulkImportEntry) { - if (bulkImportEntry == null || importedUser == null || bulkImportEntry.loginMethods == null || - importedUser.loginMethods == null) { - return false; - } - - for (LoginMethod lm1 : bulkImportEntry.loginMethods) { - for (io.supertokens.pluginInterface.authRecipe.LoginMethod lm2 : importedUser.loginMethods) { - if (lm2.recipeId.toString().equals(lm1.recipeId)) { - if (lm1.email != null && !lm1.email.equals(lm2.email)) { - return false; - } - - switch (lm1.recipeId) { - case "emailpassword": - if (lm1.passwordHash != null && !lm1.passwordHash.equals(lm2.passwordHash)) { - return false; - } - break; - case "thirdparty": - if ((lm1.thirdPartyId != null && !lm1.thirdPartyId.equals(lm2.thirdParty.id)) - || (lm1.thirdPartyUserId != null - && !lm1.thirdPartyUserId.equals(lm2.thirdParty.userId))) { - return false; - } - break; - case "passwordless": - if (lm1.phoneNumber != null && !lm1.phoneNumber.equals(lm2.phoneNumber)) { - return false; - } - break; - default: - return false; - } - } - } - } - - return true; - } } diff --git a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java new file mode 100644 index 000000000..8122553d8 --- /dev/null +++ b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java @@ -0,0 +1,321 @@ +/* + * Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved. + * + * This software is licensed under the Apache License, Version 2.0 (the + * "License") as published by the Apache Software Foundation. + * + * You may not use this file except in compliance with the License. You may + * obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */
+
+package io.supertokens.cronjobs.bulkimport;
+
+import com.google.gson.JsonObject;
+import io.supertokens.Main;
+import io.supertokens.ResourceDistributor;
+import io.supertokens.bulkimport.BulkImport;
+import io.supertokens.bulkimport.BulkImportUserUtils;
+import io.supertokens.bulkimport.exceptions.InvalidBulkImportDataException;
+import io.supertokens.config.Config;
+import io.supertokens.multitenancy.Multitenancy;
+import io.supertokens.output.Logging;
+import io.supertokens.pluginInterface.Storage;
+import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo;
+import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage;
+import io.supertokens.pluginInterface.bulkimport.BulkImportStorage;
+import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
+import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage;
+import io.supertokens.pluginInterface.exceptions.DbInitException;
+import io.supertokens.pluginInterface.exceptions.InvalidConfigException;
+import io.supertokens.pluginInterface.exceptions.StorageQueryException;
+import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
+import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
+import io.supertokens.pluginInterface.multitenancy.TenantConfig;
+import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
+import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException;
+import io.supertokens.pluginInterface.sqlStorage.SQLStorage;
+import io.supertokens.storageLayer.StorageLayer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ProcessBulkUsersImportWorker implements Runnable {
+
+    private final Map<String, SQLStorage> userPoolToStorageMap = new HashMap<>();
+    private final Main main;
+    private final AppIdentifier app;
+    private final List<BulkImportUser> usersToImport;
+    private final BulkImportSQLStorage bulkImportSQLStorage;
+    private final BulkImportUserUtils bulkImportUserUtils;
+
+    ProcessBulkUsersImportWorker(Main main, AppIdentifier app, List<BulkImportUser> userListToImport,
+                                 BulkImportSQLStorage bulkImportSQLStorage, BulkImportUserUtils bulkImportUserUtils) {
+        this.main = main;
+        this.app = app;
+        this.usersToImport = userListToImport;
+        this.bulkImportSQLStorage = bulkImportSQLStorage;
+        this.bulkImportUserUtils = bulkImportUserUtils;
+    }
+
+    @Override
+    public void run() {
+        try {
+            processMultipleUsers(app, usersToImport, bulkImportUserUtils, bulkImportSQLStorage);
+        } catch (TenantOrAppNotFoundException | DbInitException | IOException | StorageQueryException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void processMultipleUsers(AppIdentifier appIdentifier, List<BulkImportUser> users,
+                                      BulkImportUserUtils bulkImportUserUtils,
+                                      BulkImportSQLStorage baseTenantStorage)
+            throws TenantOrAppNotFoundException, StorageQueryException, IOException,
+            DbInitException {
+
+        BulkImportUser lastStartedUser = null;
+        try {
+            for (BulkImportUser user : users) {
+                lastStartedUser = user;
+                if (Main.isTesting && Main.isTesting_skipBulkImportUserValidationInCronJob) {
+                    // Skip validation when the flag is enabled during testing
+                } else {
+                    // Validate the user
+                    bulkImportUserUtils.createBulkImportUserFromJSON(main, appIdentifier, user.toJsonObject(), user.id);
+                }
+
+                // Since all the tenants of a user must share the storage, we will just use the
+                // storage of the first tenantId of the first loginMethod
+                TenantIdentifier firstTenantIdentifier = new
TenantIdentifier(appIdentifier.getConnectionUriDomain(),
+                        appIdentifier.getAppId(), user.loginMethods.get(0).tenantIds.get(0));
+
+                SQLStorage bulkImportProxyStorage = (SQLStorage) getBulkImportProxyStorage(firstTenantIdentifier);
+                BulkImportUser.LoginMethod primaryLM = BulkImport.getPrimaryLoginMethod(user);
+
+                AuthRecipeSQLStorage authRecipeSQLStorage = (AuthRecipeSQLStorage) getBulkImportProxyStorage(
+                        firstTenantIdentifier);
+
+                /*
+                 * We use two separate storage instances: one for importing the user and another for managing
+                 * bulk_import_users entries.
+                 * This is necessary because the bulk_import_users entries are always in the public tenant storage,
+                 * but the actual user data could be in a different storage.
+                 *
+                 * If transactions are committed individually, in this order:
+                 * 1. Commit the transaction that imports the user.
+                 * 2. Commit the transaction that deletes the corresponding bulk import entry.
+                 *
+                 * There's a risk where the first commit succeeds, but the second fails. This creates a situation where
+                 * the bulk import entry is re-processed, even though the user has already been imported into the
+                 * database.
+                 *
+                 * To resolve this, we added a `primaryUserId` field to the `bulk_import_users` table.
+                 * The processing logic now follows these steps:
+                 *
+                 * 1. Import the user and get the `primaryUserId` (transaction uncommitted).
+                 * 2. Update the `primaryUserId` in the corresponding bulk import entry.
+                 * 3. Commit the import transaction from step 1.
+                 * 4. Delete the bulk import entry.
+                 *
+                 * If step 2 or any earlier step fails, nothing is committed, preventing partial state.
+                 * If step 3 fails, the `primaryUserId` in the bulk import entry is updated, but the user doesn't
+                 * exist in the database—this results in re-processing on the next run.
+                 * If step 4 fails, the user exists but the bulk import entry remains; this will be handled by
+                 * deleting it in the next run.
+                 *
+                 * The following code implements this logic.
+                 */
+                if (user.primaryUserId != null) {
+                    AuthRecipeUserInfo importedUser = authRecipeSQLStorage.getPrimaryUserById(appIdentifier,
+                            user.primaryUserId);
+
+                    if (importedUser != null && isProcessedUserFromSameBulkImportUserEntry(importedUser, user)) {
+                        baseTenantStorage.deleteBulkImportUsers(appIdentifier, new String[]{user.id});
+                        // move on to the next user in the chunk; a `return` here would silently
+                        // skip the rest of the batch
+                        continue;
+                    }
+                }
+
+                bulkImportProxyStorage.startTransaction(con -> {
+                    try {
+                        Storage[] allStoragesForApp = getAllProxyStoragesForApp(main, appIdentifier);
+                        BulkImport.processUserImportSteps(main, con, appIdentifier, bulkImportProxyStorage, user,
+                                primaryLM, allStoragesForApp);
+
+                        // We are updating the primaryUserId in the bulkImportUser entry. This will help us handle
+                        // the inconsistent transaction commit.
+                        // If this update statement fails then the outer transaction will fail as well and the user
+                        // will simply be processed again. No inconsistency will happen in this
+                        // case.
+                        baseTenantStorage.updateBulkImportUserPrimaryUserId(appIdentifier, user.id,
+                                primaryLM.superTokensUserId);
+
+                        // We need to commit the transaction manually because we have overridden that in the proxy
+                        // storage
+                        // If this fails, the primaryUserId will be updated in the bulkImportUser but it wouldn’t
+                        // actually exist.
+                        // When processing the user again, we'll check if primaryUserId exists with the same email.
+                        // In this case the user won't exist, and we'll simply re-process it.
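+                        // Summary of the recovery behaviour described above: a crash before the
+                        // commit below leaves no imported user, so the entry is simply picked up
+                        // and re-processed on the next run; a crash after the commit but before
+                        // the delete leaves the user plus a stale entry, which the primaryUserId
+                        // check at the top of the loop cleans up on the next run.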
+                        bulkImportProxyStorage.commitTransactionForBulkImportProxyStorage();
+
+                        // NOTE: We need to use the baseTenantStorage as bulkImportProxyStorage could have a
+                        // different storage than the baseTenantStorage
+                        // If this fails, the primaryUserId will be updated in the bulkImportUser and it would exist
+                        // in the database.
+                        // When processing the user again, we'll check if primaryUserId exists with the same email.
+                        // In this case the user will exist, and we'll simply delete the entry.
+                        baseTenantStorage.deleteBulkImportUsers(appIdentifier, new String[]{user.id});
+                    } catch (StorageTransactionLogicException e) {
+                        // We need to rollback the transaction manually because we have overridden that in the proxy
+                        // storage
+                        bulkImportProxyStorage.rollbackTransactionForBulkImportProxyStorage();
+                        handleProcessUserExceptions(app, user, e, baseTenantStorage);
+                    }
+                    return null;
+                });
+
+            }
+        } catch (StorageTransactionLogicException | InvalidBulkImportDataException | InvalidConfigException e) {
+            handleProcessUserExceptions(appIdentifier, lastStartedUser, e, baseTenantStorage);
+        } finally {
+            closeAllProxyStorages(); // closing here, rather than per user, lets every user in the chunk reuse the same connections
+        }
+    }
+
+    private void handleProcessUserExceptions(AppIdentifier appIdentifier, BulkImportUser user, Exception e,
+                                             BulkImportSQLStorage baseTenantStorage)
+            throws StorageQueryException {
+        // Java doesn't allow us to reassign local variables inside a lambda expression
+        // so we have to use an array.
+        String[] errorMessage = { e.getMessage() };
+
+        if (e instanceof StorageTransactionLogicException) {
+            StorageTransactionLogicException exception = (StorageTransactionLogicException) e;
+            // If the exception is due to a StorageQueryException, we want to retry the entry after some time instead
+            // of marking it as FAILED. We will return early in that case.
+            if (exception.actualException instanceof StorageQueryException) {
+                Logging.error(main, null, "We got a StorageQueryException while processing a bulk import user entry. It will be retried again. Error Message: " + e.getMessage(), true);
+                return;
+            }
+            errorMessage[0] = exception.actualException.getMessage();
+        } else if (e instanceof InvalidBulkImportDataException) {
+            errorMessage[0] = ((InvalidBulkImportDataException) e).errors.toString();
+        } else if (e instanceof InvalidConfigException) {
+            errorMessage[0] = e.getMessage();
+        }
+
+        try {
+            baseTenantStorage.startTransaction(con -> {
+                baseTenantStorage.updateBulkImportUserStatus_Transaction(appIdentifier, con, user.id,
+                        BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED, errorMessage[0]);
+                return null;
+            });
+        } catch (StorageTransactionLogicException e1) {
+            throw new StorageQueryException(e1.actualException);
+        }
+    }
+
+    private synchronized Storage getBulkImportProxyStorage(TenantIdentifier tenantIdentifier)
+            throws InvalidConfigException, IOException, TenantOrAppNotFoundException, DbInitException {
+        String userPoolId = StorageLayer.getStorage(tenantIdentifier, main).getUserPoolId();
+        if (userPoolToStorageMap.containsKey(userPoolId)) {
+            return userPoolToStorageMap.get(userPoolId);
+        }
+
+        TenantConfig[] allTenants = Multitenancy.getAllTenants(main);
+
+        Map<ResourceDistributor.KeyClass, JsonObject> normalisedConfigs = Config.getNormalisedConfigsForAllTenants(
+                allTenants,
+                Config.getBaseConfigAsJsonObject(main));
+
+        for (ResourceDistributor.KeyClass key : normalisedConfigs.keySet()) {
+            if (key.getTenantIdentifier().equals(tenantIdentifier)) {
+                SQLStorage bulkImportProxyStorage = (SQLStorage) StorageLayer.getNewBulkImportProxyStorageInstance(main,
+                        normalisedConfigs.get(key), tenantIdentifier, true);
+
+                userPoolToStorageMap.put(userPoolId, bulkImportProxyStorage);
+                bulkImportProxyStorage.initStorage(false, new ArrayList<>());
+                return bulkImportProxyStorage;
+            }
+        }
+        throw new TenantOrAppNotFoundException(tenantIdentifier);
+    }
+
+    private Storage[] getAllProxyStoragesForApp(Main main, AppIdentifier appIdentifier)
+            throws StorageTransactionLogicException {
+
+        try {
+            List<Storage> allProxyStorages = new ArrayList<>();
+            TenantConfig[] tenantConfigs = Multitenancy.getAllTenantsForApp(appIdentifier, main);
+            for (TenantConfig tenantConfig : tenantConfigs) {
+                allProxyStorages.add(getBulkImportProxyStorage(tenantConfig.tenantIdentifier));
+            }
+            return allProxyStorages.toArray(new Storage[0]);
+        } catch (TenantOrAppNotFoundException e) {
+            throw new StorageTransactionLogicException(new Exception("E043: " + e.getMessage()));
+        } catch (InvalidConfigException e) {
+            throw new StorageTransactionLogicException(new InvalidConfigException("E044: " + e.getMessage()));
+        } catch (DbInitException e) {
+            throw new StorageTransactionLogicException(new DbInitException("E045: " + e.getMessage()));
+        } catch (IOException e) {
+            throw new StorageTransactionLogicException(new IOException("E046: " + e.getMessage()));
+        }
+    }
+
+    private void closeAllProxyStorages() throws StorageQueryException {
+        for (SQLStorage storage : userPoolToStorageMap.values()) {
+            storage.closeConnectionForBulkImportProxyStorage();
+        }
+        userPoolToStorageMap.clear();
+    }
+
+    // Checks if the importedUser was processed from the same bulkImportUser entry.
+ private boolean isProcessedUserFromSameBulkImportUserEntry( + AuthRecipeUserInfo importedUser, BulkImportUser bulkImportEntry) { + if (bulkImportEntry == null || importedUser == null || bulkImportEntry.loginMethods == null || + importedUser.loginMethods == null) { + return false; + } + + for (BulkImportUser.LoginMethod lm1 : bulkImportEntry.loginMethods) { + for (io.supertokens.pluginInterface.authRecipe.LoginMethod lm2 : importedUser.loginMethods) { + if (lm2.recipeId.toString().equals(lm1.recipeId)) { + if (lm1.email != null && !lm1.email.equals(lm2.email)) { + return false; + } + + switch (lm1.recipeId) { + case "emailpassword": + if (lm1.passwordHash != null && !lm1.passwordHash.equals(lm2.passwordHash)) { + return false; + } + break; + case "thirdparty": + if ((lm1.thirdPartyId != null && !lm1.thirdPartyId.equals(lm2.thirdParty.id)) + || (lm1.thirdPartyUserId != null + && !lm1.thirdPartyUserId.equals(lm2.thirdParty.userId))) { + return false; + } + break; + case "passwordless": + if (lm1.phoneNumber != null && !lm1.phoneNumber.equals(lm2.phoneNumber)) { + return false; + } + break; + default: + return false; + } + } + } + } + + return true; + } +} diff --git a/src/main/java/io/supertokens/multitenancy/Multitenancy.java b/src/main/java/io/supertokens/multitenancy/Multitenancy.java index 2342599dc..88ce910da 100644 --- a/src/main/java/io/supertokens/multitenancy/Multitenancy.java +++ b/src/main/java/io/supertokens/multitenancy/Multitenancy.java @@ -269,7 +269,7 @@ public static boolean addNewOrUpdateAppOrTenant(Main main, TenantConfig newTenan } - public static boolean addNewOrUpdateAppOrTenant(Main main, TenantConfig newTenant, + public synchronized static boolean addNewOrUpdateAppOrTenant(Main main, TenantConfig newTenant, boolean shouldPreventProtectedConfigUpdate, boolean skipThirdPartyConfigValidation, boolean forceReloadResources) diff --git a/src/test/java/io/supertokens/test/bulkimport/BulkImportTestUtils.java b/src/test/java/io/supertokens/test/bulkimport/BulkImportTestUtils.java index 61740021b..1788316d0 100644 --- a/src/test/java/io/supertokens/test/bulkimport/BulkImportTestUtils.java +++ b/src/test/java/io/supertokens/test/bulkimport/BulkImportTestUtils.java @@ -28,6 +28,8 @@ import com.google.gson.JsonParser; import io.supertokens.Main; +import io.supertokens.config.Config; +import io.supertokens.config.CoreConfig; import io.supertokens.emailpassword.PasswordHashing; import io.supertokens.featureflag.exceptions.FeatureNotEnabledException; import io.supertokens.multitenancy.Multitenancy; @@ -62,6 +64,10 @@ public static List generateBulkImportUser(int numberOfUsers) { } public static List generateBulkImportUser(int numberOfUsers, List tenants, int startIndex) { + return generateBulkImportUserWithRoles(numberOfUsers, tenants, startIndex, List.of("role1", "role2")); + } + + public static List generateBulkImportUserWithRoles(int numberOfUsers, List tenants, int startIndex, List roles) { List users = new ArrayList<>(); JsonParser parser = new JsonParser(); @@ -74,8 +80,9 @@ public static List generateBulkImportUser(int numberOfUsers, Lis .getAsJsonObject(); List userRoles = new ArrayList<>(); - userRoles.add(new UserRole("role1", tenants)); - userRoles.add(new UserRole("role2", tenants)); + for(String roleName : roles) { + userRoles.add(new UserRole(roleName, tenants)); + } List totpDevices = new ArrayList<>(); totpDevices.add(new TotpDevice("secretKey", 30, 1, "deviceName")); @@ -113,10 +120,10 @@ public static void createTenants(Main main) new EmailPasswordConfig(true), new 
ThirdPartyConfig(true, null), new PasswordlessConfig(true), - null, null, new JsonObject())); + null, null, Config.getBaseConfigAsJsonObject(main))); } { // tenant 2 - JsonObject config = new JsonObject(); + JsonObject config = Config.getBaseConfigAsJsonObject(main); TenantIdentifier tenantIdentifier = new TenantIdentifier(null, null, "t2"); StorageLayer.getStorage(new TenantIdentifier(null, null, null), main) diff --git a/src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java b/src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java index 667287f5a..063362031 100644 --- a/src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java +++ b/src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java @@ -22,6 +22,7 @@ import io.supertokens.authRecipe.AuthRecipe; import io.supertokens.authRecipe.UserPaginationContainer; import io.supertokens.bulkimport.BulkImport; +import io.supertokens.config.Config; import io.supertokens.cronjobs.CronTaskTest; import io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers; import io.supertokens.featureflag.EE_FEATURES; @@ -47,9 +48,11 @@ import org.junit.rules.TestRule; import static io.supertokens.test.bulkimport.BulkImportTestUtils.generateBulkImportUser; +import static io.supertokens.test.bulkimport.BulkImportTestUtils.generateBulkImportUserWithRoles; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import java.io.IOException; import java.util.List; public class ProcessBulkImportUsersCronJobTest { @@ -114,6 +117,85 @@ public void shouldProcessBulkImportUsersInTheSameTenant() throws Exception { assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STOPPED)); } + @Test + public void shouldProcessBulkImportUsersInNotSoLargeNumbersInTheSameTenant() throws Exception { + Utils.setValueInConfig("bulk_migration_parallelism", "12"); + TestingProcess process = startCronProcess(); + Main main = process.getProcess(); + + if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { + return; + } + + // Create user roles before inserting bulk users + { + UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null); + UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null); + } + + BulkImportTestUtils.createTenants(main); + + BulkImportSQLStorage storage = (BulkImportSQLStorage) StorageLayer.getStorage(main); + AppIdentifier appIdentifier = new AppIdentifier(null, null); + + int usersCount = 15; + List users = generateBulkImportUser(usersCount); + BulkImport.addUsers(appIdentifier, storage, users); + + Thread.sleep(6000); + + List usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 1000, null, + null, null); + + assertEquals(0, usersAfterProcessing.size()); + + UserPaginationContainer container = AuthRecipe.getUsers(main, 1000, "ASC", null, null, null); + assertEquals(usersCount, container.users.length); + + process.kill(); + assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STOPPED)); + } + + @Test + public void shouldProcessBulkImportUsersInLargeNumbersInTheSameTenant() throws Exception { + Utils.setValueInConfig("bulk_migration_parallelism", "12"); + + TestingProcess process = startCronProcess(); + Main main = process.getProcess(); + + if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { + return; + } + + // Create user roles before inserting bulk users + { + 
UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null); + UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null); + } + + BulkImportTestUtils.createTenants(main); + + BulkImportSQLStorage storage = (BulkImportSQLStorage) StorageLayer.getStorage(main); + AppIdentifier appIdentifier = new AppIdentifier(null, null); + + int usersCount = 1000; + List users = generateBulkImportUser(usersCount); + BulkImport.addUsers(appIdentifier, storage, users); + + Thread.sleep(60000); // 1 minute + + List usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 1000, null, + null, null); + + assertEquals(0, usersAfterProcessing.size()); + + UserPaginationContainer container = AuthRecipe.getUsers(main, 1000, "ASC", null, null, null); + assertEquals(usersCount, container.users.length); + + process.kill(); + assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STOPPED)); + } + @Test public void shouldProcessBulkImportUsersInMultipleTenantsWithDifferentStorages() throws Exception { TestingProcess process = startCronProcess(); @@ -175,7 +257,7 @@ public void shouldProcessBulkImportUsersInMultipleTenantsWithDifferentStorages() } @Test - public void shouldDeleteEverythingFromtheDBIfAnythingFails() throws Exception { + public void shouldDeleteEverythingFromTheDBIfAnythingFails() throws Exception { // Creating a non-existing user role will result in an error. // Since, user role creation happens at the last step of the bulk import process, everything should be deleted from the DB. @@ -194,6 +276,8 @@ public void shouldDeleteEverythingFromtheDBIfAnythingFails() throws Exception { BulkImportSQLStorage storage = (BulkImportSQLStorage) StorageLayer.getStorage(main); AppIdentifier appIdentifier = new AppIdentifier(null, null); + // note the missing role creation here! + List users = generateBulkImportUser(1); BulkImport.addUsers(appIdentifier, storage, users); @@ -212,6 +296,105 @@ public void shouldDeleteEverythingFromtheDBIfAnythingFails() throws Exception { assertEquals(0, container.users.length); } + + @Test + public void shouldDeleteEverythingFromTheDBIfAnythingFailsOnMultipleThreads() throws Exception { + Utils.setValueInConfig("bulk_migration_parallelism", "12"); + // Creating a non-existing user role will result in an error. + // Since, user role creation happens at the last step of the bulk import process, everything should be deleted from the DB. + + // NOTE: We will also need to disable the bulk import user validation in the cron job for this test to work. + Main.isTesting_skipBulkImportUserValidationInCronJob = true; + + TestingProcess process = startCronProcess(); + Main main = process.getProcess(); + + if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { + return; + } + + BulkImportTestUtils.createTenants(main); + + BulkImportSQLStorage storage = (BulkImportSQLStorage) StorageLayer.getStorage(main); + AppIdentifier appIdentifier = new AppIdentifier(null, null); + + // note the missing role creation here! 
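+        // generateBulkImportUser() assigns role1 and role2 to every user, so with no roles
+        // created, each of the 100 imports is expected to fail at the role-assignment step
+        // (the last step) and leave no partially imported user behind.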
+
+        List<BulkImportUser> users = generateBulkImportUser(100);
+        BulkImport.addUsers(appIdentifier, storage, users);
+
+        Thread.sleep(60000);
+
+        List<BulkImportUser> usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 100, null,
+                null, null);
+
+        assertEquals(100, usersAfterProcessing.size());
+
+        for (BulkImportUser userAfterProcessing : usersAfterProcessing) {
+            // every user should be processed and every one of them should fail because of the missing role
+            assertEquals(BULK_IMPORT_USER_STATUS.FAILED, userAfterProcessing.status);
+            assertEquals("E034: Role role1 does not exist! You need pre-create the role before assigning it to the user.",
+                    userAfterProcessing.errorMessage);
+        }
+
+        UserPaginationContainer container = AuthRecipe.getUsers(main, 100, "ASC", null, null, null);
+        assertEquals(0, container.users.length);
+    }
+
+    @Test
+    public void shouldDeleteOnlyFailedFromTheDBIfAnythingFailsOnMultipleThreads() throws Exception {
+        Utils.setValueInConfig("bulk_migration_parallelism", "12");
+        // Assigning a non-existing user role to one of the users will result in an error for that user only.
+        // Since user role creation happens at the last step of the bulk import process, that user's import
+        // should be rolled back and left in the table as FAILED, while the other users are imported normally.
+
+        // NOTE: We will also need to disable the bulk import user validation in the cron job for this test to work.
+        Main.isTesting_skipBulkImportUserValidationInCronJob = true;
+
+        TestingProcess process = startCronProcess();
+        Main main = process.getProcess();
+
+        if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) {
+            return;
+        }
+
+        BulkImportTestUtils.createTenants(main);
+
+        BulkImportSQLStorage storage = (BulkImportSQLStorage) StorageLayer.getStorage(main);
+        AppIdentifier appIdentifier = new AppIdentifier(null, null);
+
+        // Create one user role before inserting bulk users
+        {
+            UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null);
+        }
+
+        List<BulkImportUser> users = generateBulkImportUserWithRoles(99, List.of("public", "t1"), 0, List.of("role1"));
+        users.addAll(generateBulkImportUserWithRoles(1, List.of("public", "t1"), 99, List.of("notExistingRole")));
+
+        BulkImport.addUsers(appIdentifier, storage, users);
+
+        Thread.sleep(60000);
+
+        List<BulkImportUser> usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 100, null,
+                null, null);
+
+        assertEquals(1, usersAfterProcessing.size());
+
+        int numberOfFailed = 0;
+        for (int i = 0; i < usersAfterProcessing.size(); i++) {
+            if (usersAfterProcessing.get(i).status == BULK_IMPORT_USER_STATUS.FAILED) {
+                assertEquals(
+                        "E034: Role notExistingRole does not exist! You need pre-create the role before assigning it to the user.",
+                        usersAfterProcessing.get(i).errorMessage);
+                numberOfFailed++;
+            }
+        }
+
+        UserPaginationContainer container = AuthRecipe.getUsers(main, 100, "ASC", null, null, null);
+        assertEquals(99, container.users.length);
+        assertEquals(1, numberOfFailed);
+    }
+
+
     @Test
     public void shouldThrowTenantDoesNotExistError() throws Exception {
         TestingProcess process = startCronProcess();