diff --git a/src/main/java/io/supertokens/bulkimport/BulkImport.java b/src/main/java/io/supertokens/bulkimport/BulkImport.java index abea35442..3d35db46b 100644 --- a/src/main/java/io/supertokens/bulkimport/BulkImport.java +++ b/src/main/java/io/supertokens/bulkimport/BulkImport.java @@ -59,7 +59,6 @@ import io.supertokens.pluginInterface.passwordless.PasswordlessImportUser; import io.supertokens.pluginInterface.passwordless.exception.DuplicatePhoneNumberException; import io.supertokens.pluginInterface.sqlStorage.SQLStorage; -import io.supertokens.pluginInterface.sqlStorage.TransactionConnection; import io.supertokens.pluginInterface.thirdparty.ThirdPartyImportUser; import io.supertokens.pluginInterface.thirdparty.exception.DuplicateThirdPartyUserException; import io.supertokens.pluginInterface.totp.TOTPDevice; @@ -177,7 +176,7 @@ public static synchronized AuthRecipeUserInfo importUser(Main main, AppIdentifie try { Storage[] allStoragesForApp = getAllProxyStoragesForApp(main, appIdentifier); - processUsersImportSteps(main, con, appIdentifier, bulkImportProxyStorage, List.of(user), allStoragesForApp); + processUsersImportSteps(main, appIdentifier, bulkImportProxyStorage, List.of(user), allStoragesForApp); bulkImportProxyStorage.commitTransactionForBulkImportProxyStorage(); @@ -200,11 +199,11 @@ public static synchronized AuthRecipeUserInfo importUser(Main main, AppIdentifie } } - public static void processUsersImportSteps(Main main, TransactionConnection connection, AppIdentifier appIdentifier, + public static void processUsersImportSteps(Main main, AppIdentifier appIdentifier, Storage bulkImportProxyStorage, List users, Storage[] allStoragesForApp) throws StorageTransactionLogicException { - processUsersLoginMethods(main, appIdentifier, bulkImportProxyStorage, users); try { + processUsersLoginMethods(main, appIdentifier, bulkImportProxyStorage, users); createPrimaryUsersAndLinkAccounts(main, appIdentifier, bulkImportProxyStorage, users); createMultipleUserIdMapping(appIdentifier, users, allStoragesForApp); verifyMultipleEmailForAllLoginMethods(appIdentifier, bulkImportProxyStorage, users); @@ -242,7 +241,7 @@ public static void processUsersLoginMethods(Main main, AppIdentifier appIdentifi appIdentifier)); } if (sortedLoginMethods.containsKey("passwordless")) { - importedUsers.addAll(processPasswordlessLoginMethods(appIdentifier, storage, + importedUsers.addAll(processPasswordlessLoginMethods(main, appIdentifier, storage, sortedLoginMethods.get("passwordless"))); } Set actualKeys = new HashSet<>(sortedLoginMethods.keySet()); @@ -268,7 +267,7 @@ public static void processUsersLoginMethods(Main main, AppIdentifier appIdentifi } } - private static List processPasswordlessLoginMethods(AppIdentifier appIdentifier, Storage storage, + private static List processPasswordlessLoginMethods(Main main, AppIdentifier appIdentifier, Storage storage, List loginMethods) throws StorageTransactionLogicException { try { @@ -279,13 +278,13 @@ private static List processPasswordlessLoginMethods(Ap appIdentifier.getConnectionUriDomain(), appIdentifier.getAppId(), loginMethod.tenantIds.get( 0)); // the cron runs per app. The app stays the same, the tenant can change - - usersToImport.add(new PasswordlessImportUser(userId, loginMethod.phoneNumber, + usersToImport.add(new PasswordlessImportUser(userId, loginMethod.phoneNumber, loginMethod.email, tenantIdentifierForLoginMethod, loginMethod.timeJoinedInMSSinceEpoch)); loginMethod.superTokensUserId = userId; } Passwordless.createPasswordlessUsers(storage, usersToImport); + return usersToImport; } catch (StorageQueryException | StorageTransactionLogicException e) { if (e.getCause() instanceof BulkImportBatchInsertException) { @@ -330,6 +329,7 @@ private static List processThirdpartyLoginMethods(Main loginMethod.superTokensUserId = userId; } ThirdParty.createMultipleThirdPartyUsers(storage, usersToImport); + return usersToImport; } catch (StorageQueryException | StorageTransactionLogicException e) { if (e.getCause() instanceof BulkImportBatchInsertException) { @@ -359,6 +359,7 @@ private static List processEmailPasswordLoginMethods( AppIdentifier appIdentifier) throws StorageTransactionLogicException { try { + //prepare data for batch import List usersToImport = new ArrayList<>(); for(LoginMethod emailPasswordLoginMethod : loginMethods) { @@ -379,6 +380,7 @@ private static List processEmailPasswordLoginMethods( } EmailPassword.createMultipleUsersWithPasswordHash(storage, usersToImport); + return usersToImport; } catch (StorageQueryException | StorageTransactionLogicException e) { if(e.getCause() instanceof BulkImportBatchInsertException){ diff --git a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java index 74e73a2ae..3baa091d1 100644 --- a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java +++ b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java @@ -78,6 +78,7 @@ protected void doTaskPerApp(AppIdentifier app) this.batchSize); if(users == null || users.isEmpty()) { + // "No more users to process!" return; } diff --git a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java index 855f616a8..a19fa6723 100644 --- a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java +++ b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java @@ -26,7 +26,6 @@ import io.supertokens.multitenancy.Multitenancy; import io.supertokens.output.Logging; import io.supertokens.pluginInterface.Storage; -import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo; import io.supertokens.pluginInterface.bulkimport.BulkImportUser; import io.supertokens.pluginInterface.bulkimport.exceptions.BulkImportBatchInsertException; import io.supertokens.pluginInterface.bulkimport.exceptions.BulkImportTransactionRolledBackException; @@ -76,7 +75,6 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List { - try { - - BulkImport.processUsersImportSteps(main, con, appIdentifier, bulkImportProxyStorage, validUsers, allStoragesForApp); - - bulkImportProxyStorage.commitTransactionForBulkImportProxyStorage(); - - String[] toDelete = new String[validUsers.size()]; - for(int i = 0; i < validUsers.size(); i++) { - toDelete[i] = validUsers.get(i).id; - } + Map> partitionedUsers = partitionUsersByStorage(appIdentifier, validUsers); + for(SQLStorage bulkImportProxyStorage : partitionedUsers.keySet()) { + boolean shouldRetryImmediatley = true; + while (shouldRetryImmediatley) { + shouldRetryImmediatley = bulkImportProxyStorage.startTransaction(con -> { + try { + BulkImport.processUsersImportSteps(main, appIdentifier, bulkImportProxyStorage, partitionedUsers.get(bulkImportProxyStorage), + allStoragesForApp); + + bulkImportProxyStorage.commitTransactionForBulkImportProxyStorage(); + + String[] toDelete = new String[validUsers.size()]; + for (int i = 0; i < validUsers.size(); i++) { + toDelete[i] = validUsers.get(i).id; + } - baseTenantStorage.deleteBulkImportUsers(appIdentifier, toDelete); - } catch (StorageTransactionLogicException e) { - // We need to rollback the transaction manually because we have overridden that in the proxy - // storage - bulkImportProxyStorage.rollbackTransactionForBulkImportProxyStorage(); - if(isBulkImportTransactionRolledBackIsTheRealCause(e)){ - return true; - //@see BulkImportTransactionRolledBackException for explanation - } - handleProcessUserExceptions(app, validUsers, e, baseTenantStorage); + baseTenantStorage.deleteBulkImportUsers(appIdentifier, toDelete); + } catch (StorageTransactionLogicException e) { + // We need to rollback the transaction manually because we have overridden that in the proxy + // storage + bulkImportProxyStorage.rollbackTransactionForBulkImportProxyStorage(); + if (isBulkImportTransactionRolledBackIsTheRealCause(e)) { + return true; + //@see BulkImportTransactionRolledBackException for explanation + } + handleProcessUserExceptions(app, validUsers, e, baseTenantStorage); + } + return false; + }); } - return false; - }); + } } catch (StorageTransactionLogicException | InvalidConfigException e) { throw new RuntimeException(e); } catch (BulkImportBatchInsertException insertException) { @@ -217,7 +216,7 @@ private static void handleBulkImportException(List usersBatch, B .filter(bulkImportUser -> bulkImportUser.loginMethods.stream() .map(loginMethod -> loginMethod.superTokensUserId) - .anyMatch(s -> s.equals(userid))).findFirst(); + .anyMatch(s -> s!= null && s.equals(userid))).findFirst(); if(userWithId.isPresent()){ id = userWithId.get().id; } @@ -280,46 +279,19 @@ private void closeAllProxyStorages() throws StorageQueryException { userPoolToStorageMap.clear(); } - // Checks if the importedUser was processed from the same bulkImportUser entry. - private boolean isProcessedUserFromSameBulkImportUserEntry( - AuthRecipeUserInfo importedUser, BulkImportUser bulkImportEntry) { - if (bulkImportEntry == null || importedUser == null || bulkImportEntry.loginMethods == null || - importedUser.loginMethods == null) { - return false; - } - - for (BulkImportUser.LoginMethod lm1 : bulkImportEntry.loginMethods) { - for (io.supertokens.pluginInterface.authRecipe.LoginMethod lm2 : importedUser.loginMethods) { - if (lm2.recipeId.toString().equals(lm1.recipeId)) { - if (lm1.email != null && !lm1.email.equals(lm2.email)) { - return false; - } + private Map> partitionUsersByStorage(AppIdentifier appIdentifier, List users) + throws DbInitException, TenantOrAppNotFoundException, InvalidConfigException, IOException { + Map> result = new HashMap<>(); + for(BulkImportUser user: users) { + TenantIdentifier firstTenantIdentifier = new TenantIdentifier(appIdentifier.getConnectionUriDomain(), + appIdentifier.getAppId(), user.loginMethods.get(0).tenantIds.get(0)); - switch (lm1.recipeId) { - case "emailpassword": - if (lm1.passwordHash != null && !lm1.passwordHash.equals(lm2.passwordHash)) { - return false; - } - break; - case "thirdparty": - if ((lm1.thirdPartyId != null && !lm1.thirdPartyId.equals(lm2.thirdParty.id)) - || (lm1.thirdPartyUserId != null - && !lm1.thirdPartyUserId.equals(lm2.thirdParty.userId))) { - return false; - } - break; - case "passwordless": - if (lm1.phoneNumber != null && !lm1.phoneNumber.equals(lm2.phoneNumber)) { - return false; - } - break; - default: - return false; - } - } + SQLStorage bulkImportProxyStorage = (SQLStorage) getBulkImportProxyStorage(firstTenantIdentifier); + if(!result.containsKey(bulkImportProxyStorage)){ + result.put(bulkImportProxyStorage, new ArrayList<>()); } + result.get(bulkImportProxyStorage).add(user); } - - return true; - } + return result; + } } diff --git a/src/main/java/io/supertokens/emailpassword/EmailPassword.java b/src/main/java/io/supertokens/emailpassword/EmailPassword.java index 11b3ec431..61384e86a 100644 --- a/src/main/java/io/supertokens/emailpassword/EmailPassword.java +++ b/src/main/java/io/supertokens/emailpassword/EmailPassword.java @@ -261,11 +261,12 @@ public static ImportUserResponse createUserWithPasswordHash(TenantIdentifier ten public static void createMultipleUsersWithPasswordHash(Storage storage, List usersToImport) throws StorageQueryException, TenantOrAppNotFoundException, StorageTransactionLogicException { - EmailPasswordSQLStorage epStorage = StorageUtils.getEmailPasswordStorage(storage); - epStorage.startTransaction(con -> { - epStorage.signUpMultipleViaBulkImport_Transaction(con, usersToImport); - return null; - }); + + EmailPasswordSQLStorage epStorage = StorageUtils.getEmailPasswordStorage(storage); + epStorage.startTransaction(con -> { + epStorage.signUpMultipleViaBulkImport_Transaction(con, usersToImport); + return null; + }); } @TestOnly diff --git a/src/main/java/io/supertokens/passwordless/Passwordless.java b/src/main/java/io/supertokens/passwordless/Passwordless.java index a317c6716..2ff4e28ef 100644 --- a/src/main/java/io/supertokens/passwordless/Passwordless.java +++ b/src/main/java/io/supertokens/passwordless/Passwordless.java @@ -554,13 +554,13 @@ public static void createPasswordlessUsers(Storage storage, List importUsers) throws TenantOrAppNotFoundException, StorageQueryException, StorageTransactionLogicException { - PasswordlessSQLStorage passwordlessStorage = StorageUtils.getPasswordlessStorage(storage); + PasswordlessSQLStorage passwordlessStorage = StorageUtils.getPasswordlessStorage(storage); - passwordlessStorage.startTransaction(con -> { - passwordlessStorage.importPasswordlessUsers_Transaction(con, importUsers); - passwordlessStorage.commitTransaction(con); - return null; - }); + passwordlessStorage.startTransaction(con -> { + passwordlessStorage.importPasswordlessUsers_Transaction(con, importUsers); + passwordlessStorage.commitTransaction(con); + return null; + }); } @TestOnly diff --git a/src/main/java/io/supertokens/thirdparty/ThirdParty.java b/src/main/java/io/supertokens/thirdparty/ThirdParty.java index 57aa43819..a7f18bb54 100644 --- a/src/main/java/io/supertokens/thirdparty/ThirdParty.java +++ b/src/main/java/io/supertokens/thirdparty/ThirdParty.java @@ -359,13 +359,13 @@ public static SignInUpResponse createThirdPartyUser(TenantIdentifier tenantIdent public static void createMultipleThirdPartyUsers(Storage storage, List usersToImport) throws StorageQueryException, StorageTransactionLogicException, TenantOrAppNotFoundException { - ThirdPartySQLStorage tpStorage = StorageUtils.getThirdPartyStorage(storage); - tpStorage.startTransaction(con -> { - tpStorage.importThirdPartyUsers_Transaction(con, usersToImport); - tpStorage.commitTransaction(con); - return null; - }); + ThirdPartySQLStorage tpStorage = StorageUtils.getThirdPartyStorage(storage); + tpStorage.startTransaction(con -> { + tpStorage.importThirdPartyUsers_Transaction(con, usersToImport); + tpStorage.commitTransaction(con); + return null; + }); } @Deprecated diff --git a/src/test/java/io/supertokens/test/bulkimport/BulkImportFlowTest.java b/src/test/java/io/supertokens/test/bulkimport/BulkImportFlowTest.java index bbf16e3b9..b1ec345b0 100644 --- a/src/test/java/io/supertokens/test/bulkimport/BulkImportFlowTest.java +++ b/src/test/java/io/supertokens/test/bulkimport/BulkImportFlowTest.java @@ -41,6 +41,7 @@ import org.junit.rules.TestRule; import java.io.IOException; +import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.Map; import java.util.Random; @@ -110,18 +111,29 @@ public void testWithOneMillionUsers() throws Exception { { long count = NUMBER_OF_USERS_TO_UPLOAD; while(true) { - JsonObject response = loadBulkImportUsersCountWithStatus(main, null); - assertEquals("OK", response.get("status").getAsString()); - count = response.get("count").getAsLong(); - int newUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.NEW).get("count").getAsInt(); - int processingUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.PROCESSING).get("count").getAsInt(); - - count = newUsersNumber + processingUsersNumber; - - if(count == 0 ){ - break; + try { + JsonObject response = loadBulkImportUsersCountWithStatus(main, null); + assertEquals("OK", response.get("status").getAsString()); + count = response.get("count").getAsLong(); + int newUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.NEW).get("count").getAsInt(); + int processingUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.PROCESSING).get("count").getAsInt(); + int failedUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt(); + count = newUsersNumber + processingUsersNumber; + + if (count == 0) { + break; + } + } catch (Exception e) { + if(e instanceof SocketTimeoutException) { + //ignore + } else { + throw e; + } } - Thread.sleep(5000); // one minute + Thread.sleep(5000); } } @@ -139,6 +151,81 @@ public void testWithOneMillionUsers() throws Exception { } + @Test + public void testBatchWithOneUser() throws Exception { + String[] args = {"../"}; + + // set processing thread number + Utils.setValueInConfig("bulk_migration_parallelism", "12"); + + TestingProcessManager.TestingProcess process = TestingProcessManager.start(args); + assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED)); + Main main = process.getProcess(); + + setFeatureFlags(main, new EE_FEATURES[]{ + EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA}); + + int NUMBER_OF_USERS_TO_UPLOAD = 100; + + if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { + return; + } + + // Create user roles before inserting bulk users + UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null); + UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null); + + // upload a bunch of users through the API + JsonObject usersJson = generateUsersJson(NUMBER_OF_USERS_TO_UPLOAD, 0); + + JsonObject response = uploadBulkImportUsersJson(main, usersJson); + assertEquals("OK", response.get("status").getAsString()); + + // Starting the processing cronjob here to be able to measure the runtime + startBulkImportCronjob(main, 8000); + + // wait for the cron job to process them + // periodically check the remaining unprocessed users + // Note1: the cronjob starts the processing automatically + // Note2: the successfully processed users get deleted from the bulk_import_users table + + long count = NUMBER_OF_USERS_TO_UPLOAD; + int failedUsersNumber = 0; + while (true) { + response = loadBulkImportUsersCountWithStatus(main, null); + assertEquals("OK", response.get("status").getAsString()); + count = response.get("count").getAsLong(); + int newUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.NEW).get("count").getAsInt(); + failedUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt(); + int processingUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.PROCESSING).get("count").getAsInt(); + + count = newUsersNumber + processingUsersNumber; + if(count == 0) { + break; + } + System.out.println("new: " + newUsersNumber); + System.out.println("failed: " + failedUsersNumber); + System.out.println("processing: " + processingUsersNumber); + Thread.sleep(5000); // 5 seconds + } + + //print failed users + JsonObject failedUsersLs = loadBulkImportUsersWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED); + + // after processing finished, make sure every user got processed correctly + int failedImportedUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt(); + int usersInCore = loadUsersCount(main).get("count").getAsInt(); + assertEquals(NUMBER_OF_USERS_TO_UPLOAD , usersInCore + failedImportedUsersNumber); + assertEquals(0, failedImportedUsersNumber); + + + } + @Test public void testBatchWithDuplicate() throws Exception { String[] args = {"../"}; @@ -747,7 +834,7 @@ private static JsonObject generateUsersJson(int numberOfUsers, int startIndex) { JsonObject user = new JsonObject(); user.addProperty("externalUserId", UUID.randomUUID().toString()); - user.add("userMetadata", parser.parse("{\"key1\":\"value1\",\"key2\":{\"key3\":\"value3\"}}")); + user.add("userMetadata", parser.parse("{\"key1\":"+ UUID.randomUUID().toString() + ",\"key2\":{\"key3\":\"value3\"}}")); user.add("userRoles", parser.parse( "[{\"role\":\"role1\", \"tenantIds\": [\"public\"]},{\"role\":\"role2\", \"tenantIds\": [\"public\"]}]")); user.add("totpDevices", parser.parse("[{\"secretKey\":\"secretKey\",\"deviceName\":\"deviceName\"}]")); diff --git a/src/test/java/io/supertokens/test/bulkimport/BulkImportTestUtils.java b/src/test/java/io/supertokens/test/bulkimport/BulkImportTestUtils.java index 1f434fa48..3fac0ca38 100644 --- a/src/test/java/io/supertokens/test/bulkimport/BulkImportTestUtils.java +++ b/src/test/java/io/supertokens/test/bulkimport/BulkImportTestUtils.java @@ -16,20 +16,9 @@ package io.supertokens.test.bulkimport; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - import com.google.gson.JsonObject; import com.google.gson.JsonParser; - import io.supertokens.Main; -import io.supertokens.config.Config; -import io.supertokens.config.CoreConfig; import io.supertokens.emailpassword.PasswordHashing; import io.supertokens.featureflag.exceptions.FeatureNotEnabledException; import io.supertokens.multitenancy.Multitenancy; @@ -43,12 +32,7 @@ import io.supertokens.pluginInterface.bulkimport.BulkImportUser.UserRole; import io.supertokens.pluginInterface.exceptions.InvalidConfigException; import io.supertokens.pluginInterface.exceptions.StorageQueryException; -import io.supertokens.pluginInterface.multitenancy.AppIdentifier; -import io.supertokens.pluginInterface.multitenancy.EmailPasswordConfig; -import io.supertokens.pluginInterface.multitenancy.PasswordlessConfig; -import io.supertokens.pluginInterface.multitenancy.TenantConfig; -import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; -import io.supertokens.pluginInterface.multitenancy.ThirdPartyConfig; +import io.supertokens.pluginInterface.multitenancy.*; import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException; import io.supertokens.pluginInterface.totp.TOTPDevice; import io.supertokens.storageLayer.StorageLayer; @@ -57,6 +41,12 @@ import io.supertokens.usermetadata.UserMetadata; import io.supertokens.userroles.UserRoles; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.*; + public class BulkImportTestUtils { public static List generateBulkImportUser(int numberOfUsers) { @@ -76,7 +66,7 @@ public static List generateBulkImportUserWithRoles(int numberOfU String id = io.supertokens.utils.Utils.getUUID(); String externalId = io.supertokens.utils.Utils.getUUID(); - JsonObject userMetadata = parser.parse("{\"key1\":\"value1\",\"key2\":{\"key3\":\"value3\"}}") + JsonObject userMetadata = parser.parse("{\"key1\":\""+id+"\",\"key2\":{\"key3\":\"value3\"}}") .getAsJsonObject(); List userRoles = new ArrayList<>(); diff --git a/src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java b/src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java index 0bf2647b2..8b3035913 100644 --- a/src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java +++ b/src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java @@ -143,7 +143,7 @@ public void shouldProcessBulkImportUsersInNotSoLargeNumbersInTheSameTenant() thr List users = generateBulkImportUser(usersCount); BulkImport.addUsers(appIdentifier, storage, users); - Thread.sleep(60000); + Thread.sleep(6000); List usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 1000, null, null, null); @@ -159,8 +159,6 @@ public void shouldProcessBulkImportUsersInNotSoLargeNumbersInTheSameTenant() thr @Test public void shouldProcessBulkImportUsersInLargeNumbersInTheSameTenant() throws Exception { - Utils.setValueInConfig("bulk_migration_parallelism", "8"); - TestingProcess process = startCronProcess(); Main main = process.getProcess(); @@ -198,7 +196,9 @@ public void shouldProcessBulkImportUsersInLargeNumbersInTheSameTenant() throws E } @Test - public void shouldProcessBulkImportUsersInMultipleTenantsWithDifferentStorages() throws Exception { + public void shouldProcessBulkImportUsersInMultipleTenantsWithDifferentStoragesOnMultipleThreads() throws Exception { + Utils.setValueInConfig("bulk_migration_parallelism", "3"); + TestingProcess process = startCronProcess(); Main main = process.getProcess(); @@ -226,9 +226,73 @@ public void shouldProcessBulkImportUsersInMultipleTenantsWithDifferentStorages() BulkImportUser bulkImportUserT1 = usersT1.get(0); BulkImportUser bulkImportUserT2 = usersT2.get(0); - BulkImport.addUsers(appIdentifier, storage, List.of(bulkImportUserT1, bulkImportUserT2)); + BulkImport.addUsers(appIdentifier, storage, usersT1); + BulkImport.addUsers(appIdentifier, storage, usersT2); - Thread.sleep(6000); + Thread.sleep(12000); + + List usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 100, null, + null, null); + + assertEquals(0, usersAfterProcessing.size()); + + Storage storageT1 = StorageLayer.getStorage(t1, main); + Storage storageT2 = StorageLayer.getStorage(t2, main); + + UserPaginationContainer containerT1 = AuthRecipe.getUsers(t1, storageT1, 100, "ASC", null, null, null); + UserPaginationContainer containerT2 = AuthRecipe.getUsers(t2, storageT2, 100, "ASC", null, null, null); + + assertEquals(usersT1.size() + usersT2.size(), containerT1.users.length + containerT2.users.length); + + UserIdMapping.populateExternalUserIdForUsers(appIdentifier, storageT1, containerT1.users); + UserIdMapping.populateExternalUserIdForUsers(appIdentifier, storageT2, containerT2.users); + + BulkImportTestUtils.assertBulkImportUserAndAuthRecipeUserAreEqual(main, appIdentifier, t1, storageT1, + bulkImportUserT1, + containerT1.users[0]); + BulkImportTestUtils.assertBulkImportUserAndAuthRecipeUserAreEqual(main, appIdentifier, t2, storageT2, + bulkImportUserT2, + containerT2.users[0]); + + process.kill(); + assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STOPPED)); + } + + @Test + public void shouldProcessBulkImportUsersInMultipleTenantsWithDifferentStoragesOnOneThreads() throws Exception { + Utils.setValueInConfig("bulk_migration_parallelism", "1"); + + TestingProcess process = startCronProcess(); + Main main = process.getProcess(); + + if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { + return; + } + + // Create user roles before inserting bulk users + { + UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null); + UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null); + } + + BulkImportTestUtils.createTenants(main); + + TenantIdentifier t1 = new TenantIdentifier(null, null, "t1"); + TenantIdentifier t2 = new TenantIdentifier(null, null, "t2"); + + BulkImportSQLStorage storage = (BulkImportSQLStorage) StorageLayer.getStorage(main); + AppIdentifier appIdentifier = new AppIdentifier(null, null); + + List usersT1 = generateBulkImportUser(1, List.of(t1.getTenantId()), 0); + List usersT2 = generateBulkImportUser(1, List.of(t2.getTenantId()), 1); + + BulkImportUser bulkImportUserT1 = usersT1.get(0); + BulkImportUser bulkImportUserT2 = usersT2.get(0); + + BulkImport.addUsers(appIdentifier, storage, usersT1); + BulkImport.addUsers(appIdentifier, storage, usersT2); + + Thread.sleep(12000); List usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 100, null, null, null); @@ -313,6 +377,62 @@ public void shouldProcessBulkImportUsersInLargeNumberInMultipleTenantsWithDiffer assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STOPPED)); } + @Test + public void shouldProcessBulkImportUsersInLargeNumberInMultipleTenantsWithDifferentStoragesOnOneThread() throws Exception { + Utils.setValueInConfig("bulk_migration_parallelism", "1"); + + TestingProcess process = startCronProcess(); + Main main = process.getProcess(); + + if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { + return; + } + + // Create user roles before inserting bulk users + { + UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null); + UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null); + } + + BulkImportTestUtils.createTenants(main); + + TenantIdentifier t1 = new TenantIdentifier(null, null, "t1"); + TenantIdentifier t2 = new TenantIdentifier(null, null, "t2"); + + BulkImportSQLStorage storage = (BulkImportSQLStorage) StorageLayer.getStorage(main); + AppIdentifier appIdentifier = new AppIdentifier(null, null); + + List usersT1 = generateBulkImportUser(50, List.of(t1.getTenantId()), 0); + List usersT2 = generateBulkImportUser(50, List.of(t2.getTenantId()), 50); + + List allUsers = new ArrayList<>(); + allUsers.addAll(usersT1); + allUsers.addAll(usersT2); + + BulkImport.addUsers(appIdentifier, storage, allUsers); + + Thread.sleep(2 * 60000); + + List usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 1000, null, + null, null); + + assertEquals(0, usersAfterProcessing.size()); + + Storage storageT1 = StorageLayer.getStorage(t1, main); + Storage storageT2 = StorageLayer.getStorage(t2, main); + + UserPaginationContainer containerT1 = AuthRecipe.getUsers(t1, storageT1, 500, "ASC", null, null, null); + UserPaginationContainer containerT2 = AuthRecipe.getUsers(t2, storageT2, 500, "ASC", null, null, null); + + assertEquals(usersT1.size() + usersT2.size(), containerT1.users.length + containerT2.users.length); + + UserIdMapping.populateExternalUserIdForUsers(appIdentifier, storageT1, containerT1.users); + UserIdMapping.populateExternalUserIdForUsers(appIdentifier, storageT2, containerT2.users); + + process.kill(); + assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STOPPED)); + } + @Test public void shouldDeleteEverythingFromTheDBIfAnythingFails() throws Exception { // Creating a non-existing user role will result in an error. @@ -338,7 +458,7 @@ public void shouldDeleteEverythingFromTheDBIfAnythingFails() throws Exception { List users = generateBulkImportUser(1); BulkImport.addUsers(appIdentifier, storage, users); - Thread.sleep(6000); + Thread.sleep(12000); List usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 100, null, null, null); @@ -506,7 +626,7 @@ public void shouldThrowTenantHaveDifferentStoragesError() throws Exception { List users = generateBulkImportUser(1, List.of("t1", "t2"), 0); BulkImport.addUsers(appIdentifier, storage, users); - Thread.sleep(6000); + Thread.sleep(12000); List usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 100, null, null, null);