diff --git a/src/main/java/io/supertokens/StorageAndUserIdMappingForBulkImport.java b/src/main/java/io/supertokens/StorageAndUserIdMappingForBulkImport.java new file mode 100644 index 000000000..0daeedf96 --- /dev/null +++ b/src/main/java/io/supertokens/StorageAndUserIdMappingForBulkImport.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved. + * + * This software is licensed under the Apache License, Version 2.0 (the + * "License") as published by the Apache Software Foundation. + * + * You may not use this file except in compliance with the License. You may + * obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.supertokens; + +import io.supertokens.pluginInterface.Storage; +import io.supertokens.pluginInterface.useridmapping.UserIdMapping; + +public class StorageAndUserIdMappingForBulkImport extends StorageAndUserIdMapping { + + public String userIdInQuestion; + + public StorageAndUserIdMappingForBulkImport(Storage storage, + UserIdMapping userIdMapping, String userIdInQuestion) { + super(storage, userIdMapping); + this.userIdInQuestion = userIdInQuestion; + } +} diff --git a/src/main/java/io/supertokens/authRecipe/AuthRecipe.java b/src/main/java/io/supertokens/authRecipe/AuthRecipe.java index 1820a32c9..d8eb407ed 100644 --- a/src/main/java/io/supertokens/authRecipe/AuthRecipe.java +++ b/src/main/java/io/supertokens/authRecipe/AuthRecipe.java @@ -979,8 +979,7 @@ public static List createPrimaryUsers(Main main, List allDistinctEmails, List allDistinctPhones, Map thirdpartyUserIdsToThirdpartyIds) - throws StorageQueryException, AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException, - RecipeUserIdAlreadyLinkedWithPrimaryUserIdException, UnknownUserIdException, TenantOrAppNotFoundException, + throws StorageQueryException, TenantOrAppNotFoundException, FeatureNotEnabledException { if (!Utils.isAccountLinkingEnabled(main, appIdentifier)) { throw new FeatureNotEnabledException( diff --git a/src/main/java/io/supertokens/bulkimport/BulkImport.java b/src/main/java/io/supertokens/bulkimport/BulkImport.java index d148f340b..abea35442 100644 --- a/src/main/java/io/supertokens/bulkimport/BulkImport.java +++ b/src/main/java/io/supertokens/bulkimport/BulkImport.java @@ -206,17 +206,15 @@ public static void processUsersImportSteps(Main main, TransactionConnection conn processUsersLoginMethods(main, appIdentifier, bulkImportProxyStorage, users); try { createPrimaryUsersAndLinkAccounts(main, appIdentifier, bulkImportProxyStorage, users); - } catch (AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException | - RecipeUserIdAlreadyLinkedWithPrimaryUserIdException | StorageQueryException | FeatureNotEnabledException | - TenantOrAppNotFoundException | UnknownUserIdException e) { - throw new RuntimeException(e); + createMultipleUserIdMapping(appIdentifier, users, allStoragesForApp); + verifyMultipleEmailForAllLoginMethods(appIdentifier, bulkImportProxyStorage, users); + createMultipleTotpDevices(main, appIdentifier, bulkImportProxyStorage, users); + createMultipleUserMetadata(appIdentifier, bulkImportProxyStorage, users); + createMultipleUserRoles(main, appIdentifier, bulkImportProxyStorage, users); + } catch ( StorageQueryException | FeatureNotEnabledException | + TenantOrAppNotFoundException e) { + throw new StorageTransactionLogicException(e); } - - createMultipleUserIdMapping(appIdentifier, users, allStoragesForApp); - verifyMultipleEmailForAllLoginMethods(appIdentifier, bulkImportProxyStorage, users); - createMultipleTotpDevices(main, appIdentifier, bulkImportProxyStorage, users); - createMultipleUserMetadata(appIdentifier, bulkImportProxyStorage, users); - createMultipleUserRoles(main, appIdentifier, bulkImportProxyStorage, users); } public static void processUsersLoginMethods(Main main, AppIdentifier appIdentifier, Storage storage, @@ -456,9 +454,8 @@ private static void associateUserToTenants(Main main, AppIdentifier appIdentifie private static void createPrimaryUsersAndLinkAccounts(Main main, AppIdentifier appIdentifier, Storage storage, List users) - throws StorageTransactionLogicException, AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException, - RecipeUserIdAlreadyLinkedWithPrimaryUserIdException, StorageQueryException, FeatureNotEnabledException, - TenantOrAppNotFoundException, UnknownUserIdException { + throws StorageTransactionLogicException, StorageQueryException, FeatureNotEnabledException, + TenantOrAppNotFoundException { List userIds = users.stream() .map(bulkImportUser -> getPrimaryLoginMethod(bulkImportUser).getSuperTokenOrExternalUserId()) @@ -591,6 +588,7 @@ public static void createMultipleUserIdMapping(AppIdentifier appIdentifier, if(user.externalUserId != null) { LoginMethod primaryLoginMethod = getPrimaryLoginMethod(user); superTokensUserIdToExternalUserId.put(primaryLoginMethod.superTokensUserId, user.externalUserId); + primaryLoginMethod.externalUserId = user.externalUserId; } } try { @@ -645,7 +643,7 @@ public static void createMultipleUserMetadata(AppIdentifier appIdentifier, Stora public static void createMultipleUserRoles(Main main, AppIdentifier appIdentifier, Storage storage, List users) throws StorageTransactionLogicException { - Map> rolesToUserByTenant = new HashMap<>(); + Map>> rolesToUserByTenant = new HashMap<>(); for (BulkImportUser user : users) { if (user.userRoles != null) { @@ -658,24 +656,34 @@ public static void createMultipleUserRoles(Main main, AppIdentifier appIdentifie rolesToUserByTenant.put(tenantIdentifier, new HashMap<>()); } - rolesToUserByTenant.get(tenantIdentifier).put(user.externalUserId, userRole.role); + if(!rolesToUserByTenant.get(tenantIdentifier).containsKey(user.externalUserId)){ + rolesToUserByTenant.get(tenantIdentifier).put(user.externalUserId, new ArrayList<>()); + } + rolesToUserByTenant.get(tenantIdentifier).get(user.externalUserId).add(userRole.role); } } } } try { - UserRoles.addMultipleRolesToMultipleUsers(main, storage, rolesToUserByTenant); + UserRoles.addMultipleRolesToMultipleUsers(main, appIdentifier, storage, rolesToUserByTenant); } catch (TenantOrAppNotFoundException e) { throw new StorageTransactionLogicException(new Exception("E033: " + e.getMessage())); } catch (StorageTransactionLogicException e) { - if(e.actualException instanceof UnknownRoleException){ - throw new StorageTransactionLogicException(new Exception("E034: Role " - + " does not exist! You need pre-create the role before assigning it to the user.")); + if(e.actualException instanceof BulkImportBatchInsertException){ + Map errorsByPosition = ((BulkImportBatchInsertException) e.getCause()).exceptionByUserId; + for (String userid : errorsByPosition.keySet()) { + Exception exception = errorsByPosition.get(userid); + if (exception instanceof UnknownRoleException) { + String message = "E034: Role does not exist! You need to pre-create the role before " + + "assigning it to the user."; + errorsByPosition.put(userid, new Exception(message)); + } + } + throw new StorageTransactionLogicException(new BulkImportBatchInsertException("roles errors translated", errorsByPosition)); } else { throw new StorageTransactionLogicException(e); } - } } @@ -686,7 +694,7 @@ public static void verifyMultipleEmailForAllLoginMethods(AppIdentifier appIdenti Map emailToUserId = new HashMap<>(); for (BulkImportUser user : users) { for (LoginMethod lm : user.loginMethods) { - emailToUserId.put(lm.email, lm.getSuperTokenOrExternalUserId()); + emailToUserId.put(lm.getSuperTokenOrExternalUserId(), lm.email); } } diff --git a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java index 32ce82077..74e73a2ae 100644 --- a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java +++ b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java @@ -35,7 +35,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -71,10 +74,8 @@ protected void doTaskPerApp(AppIdentifier app) String[] allUserRoles = StorageUtils.getUserRolesStorage(bulkImportSQLStorage).getRoles(app); BulkImportUserUtils bulkImportUserUtils = new BulkImportUserUtils(allUserRoles); - System.out.println(Thread.currentThread().getName() + " ProcessBulkImportUsers: " + " starting to load users " + this.batchSize); List users = bulkImportSQLStorage.getBulkImportUsersAndChangeStatusToProcessing(app, this.batchSize); - System.out.println(Thread.currentThread().getName() + " ProcessBulkImportUsers: " + " loaded users"); if(users == null || users.isEmpty()) { return; @@ -91,7 +92,7 @@ protected void doTaskPerApp(AppIdentifier app) try { List> tasks = new ArrayList<>(); - for (int i =0; i< NUMBER_OF_BATCHES; i++) { + for (int i =0; i< NUMBER_OF_BATCHES && i < loadedUsersChunks.size(); i++) { tasks.add(executorService.submit(new ProcessBulkUsersImportWorker(main, app, loadedUsersChunks.get(i), bulkImportSQLStorage, bulkImportUserUtils))); } @@ -101,14 +102,9 @@ protected void doTaskPerApp(AppIdentifier app) Thread.sleep(1000); } Void result = (Void) task.get(); //to know if there were any errors while executing and for waiting in this thread for all the other threads to finish up - System.out.println("Result: " + result); } - - executorService.shutdownNow(); - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { - System.out.println("Pool did not terminate"); - } + } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } diff --git a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java index 5b8da78e9..855f616a8 100644 --- a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java +++ b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java @@ -43,10 +43,7 @@ import io.supertokens.storageLayer.StorageLayer; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; public class ProcessBulkUsersImportWorker implements Runnable { @@ -83,23 +80,32 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List validUsers = new ArrayList<>(); + Map validationErrorsBeforeActualProcessing = new HashMap<>(); while(userIndexPointer < users.size()) { user = users.get(userIndexPointer); - if ((Main.isTesting && Main.isTesting_skipBulkImportUserValidationInCronJob) || - shouldRetryImmediately) { + if (Main.isTesting && Main.isTesting_skipBulkImportUserValidationInCronJob) { // Skip validation when the flag is enabled during testing // Skip validation if it's a retry run. This already passed validation. A revalidation triggers // an invalid external user id already exists validation error - which is not true! - //TODO set invalid users status to failed + validUsers.add(user); } else { // Validate the user - validUsers.add(bulkImportUserUtils.createBulkImportUserFromJSON(main, appIdentifier, user.toJsonObject(), user.id)); + try { + validUsers.add(bulkImportUserUtils.createBulkImportUserFromJSON(main, appIdentifier, + user.toJsonObject(), user.id)); + } catch (InvalidBulkImportDataException exception) { + validationErrorsBeforeActualProcessing.put(user.id, new Exception( + String.valueOf(exception.errors))); + } } userIndexPointer+=1; } + + if(!validationErrorsBeforeActualProcessing.isEmpty()) { + throw new BulkImportBatchInsertException("Invalid input data", validationErrorsBeforeActualProcessing); + } // Since all the tenants of a user must share the storage, we will just use the // storage of the first tenantId of the first loginMethod TenantIdentifier firstTenantIdentifier = new TenantIdentifier(appIdentifier.getConnectionUriDomain(), @@ -107,7 +113,7 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List { + bulkImportProxyStorage.startTransaction(con -> { try { BulkImport.processUsersImportSteps(main, con, appIdentifier, bulkImportProxyStorage, validUsers, allStoragesForApp); @@ -132,12 +138,10 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List userIndexToError = ((BulkImportBatchInsertException) exception.actualException).exceptionByUserId; - for(String userid : userIndexToError.keySet()){ - String id = usersBatch.stream() - .filter(bulkImportUser -> - bulkImportUser.loginMethods.stream() - .map(loginMethod -> loginMethod.superTokensUserId) - .anyMatch(s -> s.equals(userid))).findFirst().get().id; - bulkImportUserIdToErrorMessage.put(id, userIndexToError.get(userid).getMessage()); - } + handleBulkImportException(usersBatch, (BulkImportBatchInsertException) exception.actualException, bulkImportUserIdToErrorMessage); } else { //fail the whole batch errorMessage[0] = exception.actualException.getMessage(); @@ -190,6 +186,8 @@ private void handleProcessUserExceptions(AppIdentifier appIdentifier, List usersBatch, BulkImportBatchInsertException exception, + Map bulkImportUserIdToErrorMessage) { + Map userIndexToError = exception.exceptionByUserId; + for(String userid : userIndexToError.keySet()){ + Optional userWithId = usersBatch.stream() + .filter(bulkImportUser -> bulkImportUser.id.equals(userid) || bulkImportUser.externalUserId.equals(userid)).findFirst(); + String id = null; + if(userWithId.isPresent()){ + id = userWithId.get().id; + } + + if(id == null) { + userWithId = usersBatch.stream() + .filter(bulkImportUser -> + bulkImportUser.loginMethods.stream() + .map(loginMethod -> loginMethod.superTokensUserId) + .anyMatch(s -> s.equals(userid))).findFirst(); + if(userWithId.isPresent()){ + id = userWithId.get().id; + } + } + bulkImportUserIdToErrorMessage.put(id, userIndexToError.get(userid).getMessage()); + } + } + private synchronized Storage getBulkImportProxyStorage(TenantIdentifier tenantIdentifier) throws InvalidConfigException, IOException, TenantOrAppNotFoundException, DbInitException { String userPoolId = StorageLayer.getStorage(tenantIdentifier, main).getUserPoolId(); @@ -229,7 +252,7 @@ private synchronized Storage getBulkImportProxyStorage(TenantIdentifier tenantId throw new TenantOrAppNotFoundException(tenantIdentifier); } - private Storage[] getAllProxyStoragesForApp(Main main, AppIdentifier appIdentifier) + private synchronized Storage[] getAllProxyStoragesForApp(Main main, AppIdentifier appIdentifier) throws StorageTransactionLogicException { try { diff --git a/src/main/java/io/supertokens/inmemorydb/Start.java b/src/main/java/io/supertokens/inmemorydb/Start.java index 5ca0ecbb1..18f290816 100644 --- a/src/main/java/io/supertokens/inmemorydb/Start.java +++ b/src/main/java/io/supertokens/inmemorydb/Start.java @@ -658,7 +658,8 @@ public boolean isUserIdBeingUsedInNonAuthRecipe(AppIdentifier appIdentifier, Str public Map> findNonAuthRecipesWhereForUserIdsUsed(AppIdentifier appIdentifier, List userIds) throws StorageQueryException { - return Map.of(); + throw new UnsupportedOperationException("'findNonAuthRecipesWhereForUserIdsUsed' is not supported for in-memory db"); + } @TestOnly @@ -933,7 +934,7 @@ public void deleteEmailPasswordUser_Transaction(TransactionConnection con, AppId public void signUpMultipleViaBulkImport_Transaction(TransactionConnection connection, List users) throws StorageQueryException, StorageTransactionLogicException { - //TODO + throw new UnsupportedOperationException("'signUpMultipleViaBulkImport_Transaction' is not supported for in-memory db"); } @Override @@ -1156,7 +1157,7 @@ public void updateIsEmailVerifiedToExternalUserId(AppIdentifier appIdentifier, S public void updateMultipleIsEmailVerifiedToExternalUserIds(AppIdentifier appIdentifier, Map supertokensUserIdToExternalUserId) throws StorageQueryException { - + throw new UnsupportedOperationException("'updateMultipleIsEmailVerifiedToExternalUserIds' is not supported for in-memory db"); } @Override @@ -1197,14 +1198,14 @@ public void deleteThirdPartyUser_Transaction(TransactionConnection con, AppIdent public void importThirdPartyUsers_Transaction(TransactionConnection con, List usersToImport) throws StorageQueryException, StorageTransactionLogicException { - // TODO + throw new UnsupportedOperationException("'importThirdPartyUsers_Transaction' is not supported for in-memory db"); } @Override public void importPasswordlessUsers_Transaction(TransactionConnection con, List users) throws StorageQueryException { - // TODO + throw new UnsupportedOperationException("'importPasswordlessUsers_Transaction' is not supported for in-memory db"); } @Override @@ -1350,7 +1351,7 @@ public boolean doesUserIdExist(TenantIdentifier tenantIdentifier, String userId) @Override public List findExistingUserIds(AppIdentifier appIdentifier, List userIds) throws StorageQueryException { - return List.of(); // TODO + throw new UnsupportedOperationException("'findExistingUserIds' is not supported for in-memory db"); } @Override @@ -1917,7 +1918,7 @@ public Map getMultipleUsersMetadatas_Transaction(AppIdentifi TransactionConnection con, List userIds) throws StorageQueryException { - return Map.of(); // TODO + throw new UnsupportedOperationException("'getMultipleUsersMetadatas_Transaction' is not supported for in-memory db"); } @@ -1951,7 +1952,7 @@ public int setUserMetadata_Transaction(AppIdentifier appIdentifier, TransactionC public void setMultipleUsersMetadatas_Transaction(AppIdentifier appIdentifier, TransactionConnection con, Map metadataByUserId) throws StorageQueryException, TenantOrAppNotFoundException { - //TODO + throw new UnsupportedOperationException("'setMultipleUsersMetadatas_Transaction' is not supported for in-memory db"); } @Override @@ -2199,10 +2200,10 @@ public boolean doesRoleExist_Transaction(AppIdentifier appIdentifier, Transactio } @Override - public List doesMultipleRoleExist_Transaction(AppIdentifier appIdentifier, TransactionConnection con, + public List doesMultipleRoleExist_Transaction(AppIdentifier appIdentifier, TransactionConnection con, List roles) throws StorageQueryException { - // TODO - return List.of(); + throw new UnsupportedOperationException("'doesMultipleRoleExist_Transaction' is not supported for in-memory db"); + } @Override @@ -2218,9 +2219,9 @@ public void deleteAllRolesForUser_Transaction(TransactionConnection con, AppIden @Override public void addRolesToUsers_Transaction(TransactionConnection connection, - Map> rolesToUserByTenants) + Map>> rolesToUserByTenants) throws StorageQueryException { - // TODO + throw new UnsupportedOperationException("'addRolesToUsers_Transaction' is not supported for in-memory db"); } @Override @@ -2267,7 +2268,7 @@ public void createUserIdMapping(AppIdentifier appIdentifier, String superTokensU public void createBulkUserIdMapping(AppIdentifier appIdentifier, Map superTokensUserIdToExternalUserId) throws StorageQueryException { - + throw new UnsupportedOperationException("'createBulkUserIdMapping' is not supported for in-memory db"); } @Override @@ -2775,7 +2776,7 @@ public TOTPDevice createDevice_Transaction(TransactionConnection con, AppIdentif public void createDevices_Transaction(TransactionConnection con, AppIdentifier appIdentifier, List devices) throws StorageQueryException, TenantOrAppNotFoundException { - // TODO + throw new UnsupportedOperationException("'createDevices_Transaction' is not supported for in-memory db"); } @Override @@ -3014,7 +3015,7 @@ public AuthRecipeUserInfo[] listPrimaryUsersByEmail_Transaction(AppIdentifier ap public AuthRecipeUserInfo[] listPrimaryUsersByMultipleEmailsOrPhoneNumbersOrThirdparty_Transaction( AppIdentifier appIdentifier, TransactionConnection con, List emails, List phones, Map thirdpartyIdToThirdpartyUserId) throws StorageQueryException { - return new AuthRecipeUserInfo[0]; // TODO + throw new UnsupportedOperationException("'listPrimaryUsersByMultipleEmailsOrPhoneNumbersOrThirdparty_Transaction' is not supported for in-memory db"); } @Override @@ -3102,7 +3103,7 @@ public void linkAccounts_Transaction(AppIdentifier appIdentifier, TransactionCon public void linkMultipleAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con, Map recipeUserIdByPrimaryUserId) throws StorageQueryException { - // TODO + throw new UnsupportedOperationException("'linkMultipleAccounts_Transaction' is not supported for in-memory db"); } @Override @@ -3187,7 +3188,7 @@ public List getMultipleUserIdMapping_Transaction(TransactionConne AppIdentifier appIdentifier, List userIds, boolean isSupertokensIds) throws StorageQueryException { - return List.of(); // TODO + throw new UnsupportedOperationException("'getMultipleUserIdMapping_Transaction' is not supported for in-memory db"); } @Override diff --git a/src/main/java/io/supertokens/storageLayer/StorageLayer.java b/src/main/java/io/supertokens/storageLayer/StorageLayer.java index 630110a03..c054b7feb 100644 --- a/src/main/java/io/supertokens/storageLayer/StorageLayer.java +++ b/src/main/java/io/supertokens/storageLayer/StorageLayer.java @@ -17,10 +17,7 @@ package io.supertokens.storageLayer; import com.google.gson.JsonObject; -import io.supertokens.Main; -import io.supertokens.ProcessState; -import io.supertokens.ResourceDistributor; -import io.supertokens.StorageAndUserIdMapping; +import io.supertokens.*; import io.supertokens.cliOptions.CLIOptions; import io.supertokens.config.Config; import io.supertokens.exceptions.QuitProgramException; @@ -609,10 +606,7 @@ public static List findStorageAndUserIdMappingForBulkUs .filter(userIdMapping -> (userIdType == UserIdType.SUPERTOKENS && userIdMapping.superTokensUserId.equals(existingId)) || (userIdType == UserIdType.EXTERNAL && userIdMapping.externalUserId.equals(existingId)) ) .findFirst().orElse(null); - if(mappingForId == null && userIdType == UserIdType.SUPERTOKENS) { - mappingForId = new UserIdMapping(existingId, null, null); - } - allMappingsFromAllStorages.add(new StorageAndUserIdMapping(storage, mappingForId)); + allMappingsFromAllStorages.add(new StorageAndUserIdMappingForBulkImport(storage, mappingForId, existingId)); } } } else { diff --git a/src/main/java/io/supertokens/useridmapping/UserIdMapping.java b/src/main/java/io/supertokens/useridmapping/UserIdMapping.java index 1e9f2c117..453ec6e54 100644 --- a/src/main/java/io/supertokens/useridmapping/UserIdMapping.java +++ b/src/main/java/io/supertokens/useridmapping/UserIdMapping.java @@ -18,6 +18,7 @@ import io.supertokens.Main; import io.supertokens.StorageAndUserIdMapping; +import io.supertokens.StorageAndUserIdMappingForBulkImport; import io.supertokens.pluginInterface.Storage; import io.supertokens.pluginInterface.StorageUtils; import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo; @@ -60,6 +61,15 @@ public UserIdBulkMappingResult(String supertokensUserId, String externalUserId, this.error = error; this.externalUserId = externalUserId; } + + @Override + public String toString() { + return "UserIdBulkMappingResult{" + + "supertokensUserId='" + supertokensUserId + '\'' + + ", externalUserId='" + externalUserId + '\'' + + ", error=" + error + + '}'; + } } @TestOnly @@ -218,16 +228,20 @@ public static List createMultipleUserIdMappings(AppIden for(Map.Entry supertokensIdToExternalId : superTokensUserIdToExternalUserId.entrySet()) { String supertokensId = supertokensIdToExternalId.getKey(); String externalId = supertokensIdToExternalId.getValue(); - StorageAndUserIdMapping mappingByExternal = findStorageAndUserIdMappingForUser(externalId, mappingAndStorageWithExternal, false); - if (mappingByExternal != null && mappingByExternal.userIdMapping != null ){ + StorageAndUserIdMapping mappingByExternal = findStorageAndUserIdMappingForUser(externalId, + mappingAndStorageWithExternal, false); + if (mappingByExternal != null && mappingByExternal.userIdMapping != null) { mappingResults.add(new UserIdBulkMappingResult(supertokensId, externalId, - new UserIdMappingAlreadyExistsException(supertokensId.equals(mappingByExternal.userIdMapping.superTokensUserId), + new UserIdMappingAlreadyExistsException( + supertokensId.equals(mappingByExternal.userIdMapping.superTokensUserId), externalId.equals(mappingByExternal.userIdMapping.externalUserId)))); continue; } - StorageAndUserIdMapping mappingBySupertokens = findStorageAndUserIdMappingForUser(supertokensId, mappingAndStorageWithSupertokens, true); - if(mappingBySupertokens == null) { - mappingResults.add(new UserIdBulkMappingResult(supertokensId, externalId, new UnknownSuperTokensUserIdException())); + StorageAndUserIdMapping mappingBySupertokens = findStorageAndUserIdMappingForUser(supertokensId, + mappingAndStorageWithSupertokens, true); + if (mappingBySupertokens == null) { + mappingResults.add(new UserIdBulkMappingResult(supertokensId, externalId, + new UnknownSuperTokensUserIdException())); continue; } Storage userStorage = mappingBySupertokens.storage; @@ -242,14 +256,15 @@ public static List createMultipleUserIdMappings(AppIden { if (findStorageAndUserIdMappingForUser(externalId, mappingAndStoragesAsInvalid, true) != null) { - mappingResults.add(new UserIdBulkMappingResult(supertokensId, externalId, new ServletException(new WebserverAPI.BadRequestException( - "Cannot create a userId mapping where the externalId is also a SuperTokens userID")))); + mappingResults.add(new UserIdBulkMappingResult(supertokensId, externalId, + new ServletException(new WebserverAPI.BadRequestException( + "Cannot create a userId mapping where the externalId is also a SuperTokens userID")))); continue; } } List storageClasses; - if(userIdsUsedInNonAuthRecipes.containsKey(supertokensId)){ + if (userIdsUsedInNonAuthRecipes.containsKey(supertokensId)) { storageClasses = userIdsUsedInNonAuthRecipes.get(supertokensId); } else { storageClasses = new ArrayList<>(); @@ -280,35 +295,44 @@ public static List createMultipleUserIdMappings(AppIden } } else { //if we are not making any exceptions, then having the id used is an error! - if(!storageClasses.isEmpty()) { - createBulkIdMappingErrorForNonAuthRecipeUsage(storageClasses, mappingResults, supertokensId, externalId); + if (!storageClasses.isEmpty()) { + createBulkIdMappingErrorForNonAuthRecipeUsage(storageClasses, mappingResults, supertokensId, + externalId); continue; } } noErrorFound.add(mappingBySupertokens); } + } + //userstorage - group users by storage + Map> partitionedMappings = partitionUsersByStorage(noErrorFound); + for(Storage storage : partitionedMappings.keySet()){ - //userstorage - group users by storage - Map> partitionedMappings = partitionUsersByStorage(noErrorFound); - for(Storage storage : partitionedMappings.keySet()){ - List mappingsForCurrentStorage = partitionedMappings.get(storage); + List mappingsForCurrentStorage = partitionedMappings.get(storage); + Map mappingInCurrentStorageThatNeedsToBeDone = new HashMap<>(); + Map supertokensIdToExternalIdInCurrentStorageForEmailUpdate = new HashMap<>(); - Map supertokensIdToExternalIdInCurrentStorage = new HashMap<>(); - for(StorageAndUserIdMapping storageAndUserIdMapping: mappingsForCurrentStorage) { - supertokensIdToExternalIdInCurrentStorage.put(storageAndUserIdMapping.userIdMapping.superTokensUserId, - superTokensUserIdToExternalUserId.get(storageAndUserIdMapping.userIdMapping.superTokensUserId)); + for(StorageAndUserIdMapping storageAndUserIdMapping: mappingsForCurrentStorage) { + String userIdInQuestion = ((StorageAndUserIdMappingForBulkImport)storageAndUserIdMapping).userIdInQuestion; + + if(supertokensToExternalUserIdsToUpdateEmailVerified.keySet().contains(userIdInQuestion)){ + supertokensIdToExternalIdInCurrentStorageForEmailUpdate.put(userIdInQuestion, + superTokensUserIdToExternalUserId.get(userIdInQuestion)); } + mappingInCurrentStorageThatNeedsToBeDone.put(userIdInQuestion, superTokensUserIdToExternalUserId.get(userIdInQuestion)); + } - EmailVerificationStorage emailVerificationStorage = StorageUtils.getEmailVerificationStorage(storage); - emailVerificationStorage.updateMultipleIsEmailVerifiedToExternalUserIds(appIdentifier, supertokensIdToExternalIdInCurrentStorage); + StorageUtils.getUserIdMappingStorage(storage).createBulkUserIdMapping(appIdentifier, mappingInCurrentStorageThatNeedsToBeDone); - StorageUtils.getUserIdMappingStorage(storage).createBulkUserIdMapping(appIdentifier, supertokensIdToExternalIdInCurrentStorage); - for(String supertokensIdForResult : supertokensIdToExternalIdInCurrentStorage.keySet()) { - mappingResults.add(new UserIdBulkMappingResult(supertokensIdForResult, supertokensIdToExternalIdInCurrentStorage.get(supertokensIdForResult), null)); - } + EmailVerificationStorage emailVerificationStorage = StorageUtils.getEmailVerificationStorage(storage); + emailVerificationStorage.updateMultipleIsEmailVerifiedToExternalUserIds(appIdentifier, supertokensIdToExternalIdInCurrentStorageForEmailUpdate); + + for(String supertokensIdForResult : mappingInCurrentStorageThatNeedsToBeDone.keySet()) { + mappingResults.add(new UserIdBulkMappingResult(supertokensIdForResult, mappingInCurrentStorageThatNeedsToBeDone.get(supertokensIdForResult), null)); } } + Map errors = new HashMap<>(); for(UserIdBulkMappingResult result : mappingResults){ if(result.error != null) { @@ -345,16 +369,18 @@ private static Map> partitionUsersByStora private static StorageAndUserIdMapping findStorageAndUserIdMappingForUser(String userId, List findIn, boolean supertokensId) { List mappings = findIn.stream().filter(storageAndUserIdMapping -> { - if(storageAndUserIdMapping.userIdMapping != null) { + if(storageAndUserIdMapping instanceof StorageAndUserIdMappingForBulkImport && ((StorageAndUserIdMappingForBulkImport) storageAndUserIdMapping).userIdInQuestion != null) { + return ((StorageAndUserIdMappingForBulkImport)storageAndUserIdMapping).userIdInQuestion.equals(userId); + } else if(storageAndUserIdMapping.userIdMapping != null) { if(supertokensId) { - return storageAndUserIdMapping.userIdMapping.superTokensUserId.equals(userId); + return userId.equals(storageAndUserIdMapping.userIdMapping.superTokensUserId); } else { - return storageAndUserIdMapping.userIdMapping.externalUserId.equals(userId); + return userId.equals(storageAndUserIdMapping.userIdMapping.externalUserId); } } return false; }).collect(Collectors.toList()); // theoretically it shouldn't happen that there are more than one element in the list - if(mappings.size() > 1) { + if(mappings.size() > 1 && !(mappings.get(0) instanceof StorageAndUserIdMappingForBulkImport)) { throw new IllegalStateException("more than one mapping exists for Id."); } return mappings.isEmpty() ? null : mappings.get(0); diff --git a/src/main/java/io/supertokens/userroles/UserRoles.java b/src/main/java/io/supertokens/userroles/UserRoles.java index 478ca23c8..8d88b637a 100644 --- a/src/main/java/io/supertokens/userroles/UserRoles.java +++ b/src/main/java/io/supertokens/userroles/UserRoles.java @@ -19,6 +19,7 @@ import io.supertokens.Main; import io.supertokens.pluginInterface.Storage; import io.supertokens.pluginInterface.StorageUtils; +import io.supertokens.pluginInterface.bulkimport.exceptions.BulkImportBatchInsertException; import io.supertokens.pluginInterface.exceptions.StorageQueryException; import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; import io.supertokens.pluginInterface.multitenancy.AppIdentifier; @@ -31,9 +32,7 @@ import org.jetbrains.annotations.TestOnly; import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; public class UserRoles { // add a role to a user and return true, if the role is already mapped to the user return false, but if @@ -59,34 +58,52 @@ public static boolean addRoleToUser(Main main, TenantIdentifier tenantIdentifier } } - public static void addMultipleRolesToMultipleUsers(Main main, Storage storage, Map> rolesToUserByTenant) + public static void addMultipleRolesToMultipleUsers(Main main, AppIdentifier appIdentifier, Storage storage, + Map>> rolesToUserByTenant) throws StorageTransactionLogicException, TenantOrAppNotFoundException { // Roles are stored in public tenant storage and role to user mapping is stored in the tenant's storage // We do this because it's not straight forward to replicate roles to all storages of an app - for(TenantIdentifier tenantIdentifier : rolesToUserByTenant.keySet()){ - Storage appStorage = StorageLayer.getStorage( - tenantIdentifier.toAppIdentifier().getAsPublicTenantIdentifier(), main); - - try { - UserRolesSQLStorage userRolesStorage = StorageUtils.getUserRolesStorage(storage); - userRolesStorage.startTransaction(con -> { - - List rolesFound = ((UserRolesSQLStorage) appStorage).doesMultipleRoleExist_Transaction( - tenantIdentifier.toAppIdentifier().getAsPublicTenantIdentifier().toAppIdentifier(), - con, new ArrayList<>(rolesToUserByTenant.get(tenantIdentifier).values())); + Storage appStorage = StorageLayer.getStorage( + appIdentifier.getAsPublicTenantIdentifier(), main); - if(rolesFound.contains(Boolean.FALSE)){ - throw new StorageTransactionLogicException(new UnknownRoleException()); + try { + UserRolesSQLStorage userRolesStorage = StorageUtils.getUserRolesStorage(storage); + UserRolesSQLStorage publicRoleStorage = StorageUtils.getUserRolesStorage(appStorage); + Map errorsByUser = new HashMap<>(); + publicRoleStorage.startTransaction(con -> { + Set rolesToSearchFor = new HashSet<>(); + for (TenantIdentifier tenantIdentifier : rolesToUserByTenant.keySet()) { + for(String userId : rolesToUserByTenant.get(tenantIdentifier).keySet()){ + rolesToSearchFor.addAll(rolesToUserByTenant.get(tenantIdentifier).get(userId)); } - userRolesStorage.addRolesToUsers_Transaction(con, rolesToUserByTenant); - userRolesStorage.commitTransaction(con); - return null; - }); + } + List rolesFound = ((UserRolesSQLStorage) appStorage).doesMultipleRoleExist_Transaction( + appIdentifier, con, + new ArrayList<>(rolesToSearchFor)); + + for (Map> rolesToUsers : rolesToUserByTenant.values()) { + for (String userId : rolesToUsers.keySet()) { + List rolesOfUser = rolesToUsers.get(userId); + if (!new HashSet<>(rolesFound).containsAll(rolesOfUser)) { //wrapping in hashset for performance reasons + errorsByUser.put(userId, new UnknownRoleException()); + } + } + } + if (!errorsByUser.isEmpty()) { + throw new StorageTransactionLogicException( + new BulkImportBatchInsertException("Roles errors", errorsByUser)); + } + return null; + }); + userRolesStorage.startTransaction(con -> { + userRolesStorage.addRolesToUsers_Transaction(con, rolesToUserByTenant); + userRolesStorage.commitTransaction(con); + return null; + }); - } catch (StorageQueryException e) { - throw new StorageTransactionLogicException(e); - } + } catch (StorageQueryException e) { + throw new StorageTransactionLogicException(e); } } diff --git a/src/main/java/io/supertokens/webserver/api/bulkimport/ImportUserAPI.java b/src/main/java/io/supertokens/webserver/api/bulkimport/ImportUserAPI.java index f21dd809c..cc8bcd6f8 100644 --- a/src/main/java/io/supertokens/webserver/api/bulkimport/ImportUserAPI.java +++ b/src/main/java/io/supertokens/webserver/api/bulkimport/ImportUserAPI.java @@ -27,6 +27,7 @@ import io.supertokens.pluginInterface.StorageUtils; import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo; import io.supertokens.pluginInterface.bulkimport.BulkImportUser; +import io.supertokens.pluginInterface.bulkimport.exceptions.BulkImportBatchInsertException; import io.supertokens.pluginInterface.exceptions.DbInitException; import io.supertokens.pluginInterface.exceptions.InvalidConfigException; import io.supertokens.pluginInterface.exceptions.StorageQueryException; @@ -86,16 +87,30 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S result.addProperty("status", "OK"); result.add("user", importedUser.toJson()); super.sendJsonResponse(200, result, resp); + } catch (StorageQueryException e) { + JsonArray errors = new JsonArray(); + if(e.getCause() instanceof BulkImportBatchInsertException){ + BulkImportBatchInsertException insertException = (BulkImportBatchInsertException) e.getCause(); + errors.addAll( + insertException.exceptionByUserId.values().stream().map(exc -> exc.getMessage()).map(JsonPrimitive::new) + .collect(JsonArray::new, JsonArray::add, JsonArray::addAll) + ); + } else { + errors.add(new JsonPrimitive(e.getMessage())); + } + + JsonObject errorResponseJson = new JsonObject(); + errorResponseJson.add("errors", errors); + throw new ServletException(new WebserverAPI.BadRequestException(errorResponseJson.toString())); + } catch (TenantOrAppNotFoundException | InvalidConfigException | DbInitException e) { + throw new ServletException(e); } catch (io.supertokens.bulkimport.exceptions.InvalidBulkImportDataException e) { JsonArray errors = e.errors.stream() .map(JsonPrimitive::new) .collect(JsonArray::new, JsonArray::add, JsonArray::addAll); - JsonObject errorResponseJson = new JsonObject(); errorResponseJson.add("errors", errors); throw new ServletException(new WebserverAPI.BadRequestException(errorResponseJson.toString())); - } catch (StorageQueryException | TenantOrAppNotFoundException | InvalidConfigException | DbInitException e) { - throw new ServletException(e); } } } diff --git a/src/test/java/io/supertokens/test/bulkimport/BulkImportFlowTest.java b/src/test/java/io/supertokens/test/bulkimport/BulkImportFlowTest.java index 509fa5e23..bbf16e3b9 100644 --- a/src/test/java/io/supertokens/test/bulkimport/BulkImportFlowTest.java +++ b/src/test/java/io/supertokens/test/bulkimport/BulkImportFlowTest.java @@ -22,11 +22,12 @@ import com.google.gson.JsonParser; import io.supertokens.Main; import io.supertokens.ProcessState; -import io.supertokens.config.Config; import io.supertokens.featureflag.EE_FEATURES; import io.supertokens.featureflag.FeatureFlagTestContent; +import io.supertokens.multitenancy.Multitenancy; import io.supertokens.pluginInterface.STORAGE_TYPE; import io.supertokens.pluginInterface.bulkimport.BulkImportStorage; +import io.supertokens.pluginInterface.multitenancy.*; import io.supertokens.storageLayer.StorageLayer; import io.supertokens.test.TestingProcessManager; import io.supertokens.test.Utils; @@ -39,8 +40,6 @@ import org.junit.Test; import org.junit.rules.TestRule; -import java.io.File; -import java.io.FileWriter; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -78,11 +77,7 @@ public void testWithOneMillionUsers() throws Exception { setFeatureFlags(main, new EE_FEATURES[] { EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA }); - //int NUMBER_OF_USERS_TO_UPLOAD = 1000000; // million - int NUMBER_OF_USERS_TO_UPLOAD = 10000; - int parallelism_set_to = Config.getConfig(main).getBulkMigrationParallelism(); - System.out.println("Number of users to be imported with bulk import: " + NUMBER_OF_USERS_TO_UPLOAD); - System.out.println("Worker threads: " + parallelism_set_to); + int NUMBER_OF_USERS_TO_UPLOAD = 1000000; // million if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { return; @@ -100,16 +95,13 @@ public void testWithOneMillionUsers() throws Exception { JsonObject request = generateUsersJson(10000, i * 10000); // API allows 10k users upload at once JsonObject response = uploadBulkImportUsersJson(main, request); assertEquals("OK", response.get("status").getAsString()); - System.out.println(i + " Uploaded 10k users for bulk import"); } } - long processingStartedTime = System.currentTimeMillis(); - + long processingStarted = System.currentTimeMillis(); // Starting the processing cronjob here to be able to measure the runtime startBulkImportCronjob(main, 8000); - System.out.println("CronJob started"); // wait for the cron job to process them // periodically check the remaining unprocessed users @@ -121,36 +113,21 @@ public void testWithOneMillionUsers() throws Exception { JsonObject response = loadBulkImportUsersCountWithStatus(main, null); assertEquals("OK", response.get("status").getAsString()); count = response.get("count").getAsLong(); - System.out.println("Number of unprocessed users: " + count + "," + response); int newUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.NEW).get("count").getAsInt(); - int failedUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt(); int processingUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.PROCESSING).get("count").getAsInt(); - System.out.println("\t stats: "); - System.out.println("\t\tNEW: \t" + newUsersNumber); - System.out.println("\t\tFAILED: \t" + failedUsersNumber); - System.out.println("\t\tPROCESSING: \t" + processingUsersNumber); count = newUsersNumber + processingUsersNumber; - long elapsedSeconds = (System.currentTimeMillis() - processingStartedTime) / 1000; - System.out.println("Elapsed time: " + elapsedSeconds + " seconds, (" + elapsedSeconds / 3600 + " hours)"); if(count == 0 ){ break; } - Thread.sleep(60000); // one minute + Thread.sleep(5000); // one minute } } - long processingFinishedTime = System.currentTimeMillis(); - System.out.println("Processing took " + (processingFinishedTime - processingStartedTime) / 1000 + " seconds"); - - //print failed users - { - JsonObject failedUsersLs = loadBulkImportUsersWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED); - if(failedUsersLs.has("users") ){ - System.out.println(failedUsersLs.get("users")); - } - } + long processingFinished = System.currentTimeMillis(); + System.out.println("Processed " + NUMBER_OF_USERS_TO_UPLOAD + " users in " + (processingFinished - processingStarted) / 1000 + + " seconds ( or " + (processingFinished - processingStarted) / 60000 + " minutes)"); // after processing finished, make sure every user got processed correctly { @@ -163,7 +140,7 @@ public void testWithOneMillionUsers() throws Exception { } @Test - public void testBatchWithDuplicates() throws Exception { + public void testBatchWithDuplicate() throws Exception { String[] args = {"../"}; // set processing thread number @@ -226,20 +203,291 @@ public void testBatchWithDuplicates() throws Exception { //print failed users JsonObject failedUsersLs = loadBulkImportUsersWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED); - if (failedUsersLs.has("users")) { - System.out.println(failedUsersLs.get("users")); - } - System.out.println("Failed Users: " + failedUsersLs); - System.out.println("Failed Users Number: " + failedUsersNumber); // after processing finished, make sure every user got processed correctly int failedImportedUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt(); int usersInCore = loadUsersCount(main).get("count").getAsInt(); - System.out.println("Users in core: " + usersInCore); assertEquals(NUMBER_OF_USERS_TO_UPLOAD + 2, usersInCore + failedImportedUsersNumber); assertEquals(2, failedImportedUsersNumber); + + for(JsonElement userJson : failedUsersLs.get("users").getAsJsonArray()) { + String errorMessage = userJson.getAsJsonObject().get("errorMessage").getAsString(); + assertTrue(errorMessage.startsWith("E003:")); + } + + } + + @Test + public void testBatchWithDuplicateUserIdMappingWithInputValidation() throws Exception { + String[] args = {"../"}; + + // set processing thread number + Utils.setValueInConfig("bulk_migration_parallelism", "12"); + + TestingProcessManager.TestingProcess process = TestingProcessManager.start(args); + assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED)); + Main main = process.getProcess(); + + setFeatureFlags(main, new EE_FEATURES[]{ + EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA}); + + int NUMBER_OF_USERS_TO_UPLOAD = 20; + + if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { + return; + } + + // Create user roles before inserting bulk users + UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null); + UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null); + + // upload a bunch of users through the API + JsonObject usersJson = generateUsersJson(NUMBER_OF_USERS_TO_UPLOAD, 0); + + //set the first and last users' externalId to the same value + usersJson.get("users").getAsJsonArray().get(0).getAsJsonObject().addProperty("externalUserId", + "some-text-external-id"); + usersJson.get("users").getAsJsonArray().get(19).getAsJsonObject().addProperty("externalUserId", + "some-text-external-id"); + + try { + JsonObject response = uploadBulkImportUsersJson(main, usersJson); + } catch (HttpResponseException expected) { + assertEquals(400, expected.statusCode); + assertEquals("Http error. Status Code: 400. Message: {\"error\":\"Data has missing or invalid fields. Please check the users field for more details.\",\"users\":[{\"index\":19,\"errors\":[\"externalUserId some-text-external-id is not unique. It is already used by another user.\"]}]}", + expected.getMessage()); + } + } + + @Test + public void testBatchWithInvalidInput() throws Exception { + String[] args = {"../"}; + + // set processing thread number + Utils.setValueInConfig("bulk_migration_parallelism", "12"); + + TestingProcessManager.TestingProcess process = TestingProcessManager.start(args); + assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED)); + Main main = process.getProcess(); + + setFeatureFlags(main, new EE_FEATURES[]{ + EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA}); + + int NUMBER_OF_USERS_TO_UPLOAD = 2; + + if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { + return; + } + + // Create user roles before inserting bulk users + UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null); + UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null); + + // upload a bunch of users through the API + JsonObject usersJson = generateUsersJson(NUMBER_OF_USERS_TO_UPLOAD, 0); + + usersJson.get("users").getAsJsonArray().get(0).getAsJsonObject().addProperty("externalUserId", + Boolean.FALSE); // invalid, should be string + + try { + JsonObject response = uploadBulkImportUsersJson(main, usersJson); + } catch (HttpResponseException exception) { + assertEquals(400, exception.statusCode); + assertEquals("Http error. Status Code: 400. Message: {\"error\":\"Data has missing or invalid " + + "fields. Please check the users field for more details.\",\"users\":[{\"index\":0,\"errors\":" + + "[\"externalUserId should be of type string.\"]}]}", exception.getMessage()); + } + } + + @Test + public void testBatchWithMissingRole() throws Exception { + String[] args = {"../"}; + + // set processing thread number + Utils.setValueInConfig("bulk_migration_parallelism", "12"); + + TestingProcessManager.TestingProcess process = TestingProcessManager.start(args); + assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED)); + Main main = process.getProcess(); + + setFeatureFlags(main, new EE_FEATURES[]{ + EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA}); + + int NUMBER_OF_USERS_TO_UPLOAD = 2; + + if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { + return; + } + + // Creating only one user role before inserting bulk users + UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null); + + // upload a bunch of users through the API + JsonObject usersJson = generateUsersJson(NUMBER_OF_USERS_TO_UPLOAD, 0); + + try { + JsonObject response = uploadBulkImportUsersJson(main, usersJson); + } catch (HttpResponseException exception) { + assertEquals(400, exception.statusCode); + assertEquals(400, exception.statusCode); + assertEquals("Http error. Status Code: 400. Message: {\"error\":\"Data has missing or " + + "invalid fields. Please check the users field for more details.\",\"users\":[{\"index\":0,\"errors\"" + + ":[\"Role role2 does not exist.\"]},{\"index\":1,\"errors\":[\"Role role2 does not exist.\"]}]}", + exception.getMessage()); + } + } + + @Test + public void testBatchWithOnlyOneWithDuplicate() throws Exception { + String[] args = {"../"}; + + // set processing thread number + Utils.setValueInConfig("bulk_migration_parallelism", "2"); + + TestingProcessManager.TestingProcess process = TestingProcessManager.start(args); + assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED)); + Main main = process.getProcess(); + + setFeatureFlags(main, new EE_FEATURES[]{ + EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA}); + + int NUMBER_OF_USERS_TO_UPLOAD = 9; + + if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { + return; + } + + //create tenant t1 + TenantIdentifier tenantIdentifier = new TenantIdentifier(null, null, "t1"); + + Multitenancy.addNewOrUpdateAppOrTenant( + main, + new TenantIdentifier(null, null, null), + new TenantConfig( + tenantIdentifier, + new EmailPasswordConfig(true), + new ThirdPartyConfig(true, null), + new PasswordlessConfig(true), + null, null, new JsonObject())); + + // Create user roles before inserting bulk users + UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null); + UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null); + + // upload a bunch of users through the API + JsonObject usersJson = generateUsersJson(NUMBER_OF_USERS_TO_UPLOAD, 0); + + usersJson.get("users").getAsJsonArray().add(generateUsersJson(1, 0).get("users").getAsJsonArray().get(0).getAsJsonObject()); + + JsonObject response = uploadBulkImportUsersJson(main, usersJson); + assertEquals("OK", response.get("status").getAsString()); + + // Starting the processing cronjob here to be able to measure the runtime + startBulkImportCronjob(main, 10); + + // wait for the cron job to process them + // periodically check the remaining unprocessed users + // Note1: the cronjob starts the processing automatically + // Note2: the successfully processed users get deleted from the bulk_import_users table + + long count = NUMBER_OF_USERS_TO_UPLOAD; + int failedUsersNumber = 0; + while (true) { + response = loadBulkImportUsersCountWithStatus(main, null); + assertEquals("OK", response.get("status").getAsString()); + int newUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.NEW).get("count").getAsInt(); + int processingUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.PROCESSING).get("count").getAsInt(); + + count = newUsersNumber + processingUsersNumber; + if(count == 0) { + break; + } + Thread.sleep(5000); + } + + //print failed users + JsonObject failedUsersLs = loadBulkImportUsersWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED); + + // after processing finished, make sure every user got processed correctly + int failedImportedUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt(); + int usersInCore = loadUsersCount(main).get("count").getAsInt(); + assertEquals(NUMBER_OF_USERS_TO_UPLOAD + 1, usersInCore + failedImportedUsersNumber); + assertEquals(1, failedImportedUsersNumber); + + + for(JsonElement userJson : failedUsersLs.get("users").getAsJsonArray()) { + String errorMessage = userJson.getAsJsonObject().get("errorMessage").getAsString(); + assertTrue(errorMessage.startsWith("E003:")); + } + + } + + @Test + public void testBatchWithOneThreadWorks() throws Exception { + String[] args = {"../"}; + + // set processing thread number + Utils.setValueInConfig("bulk_migration_parallelism", "1"); + + TestingProcessManager.TestingProcess process = TestingProcessManager.start(args); + assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED)); + Main main = process.getProcess(); + + setFeatureFlags(main, new EE_FEATURES[]{ + EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA}); + + int NUMBER_OF_USERS_TO_UPLOAD = 5; + + if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { + return; + } + + // Create user roles before inserting bulk users + UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null); + UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null); + + // upload a bunch of users through the API + JsonObject usersJson = generateUsersJson(NUMBER_OF_USERS_TO_UPLOAD, 0); + + JsonObject response = uploadBulkImportUsersJson(main, usersJson); + assertEquals("OK", response.get("status").getAsString()); + + // Starting the processing cronjob here to be able to measure the runtime + startBulkImportCronjob(main, 8000); + + // wait for the cron job to process them + // periodically check the remaining unprocessed users + // Note1: the cronjob starts the processing automatically + // Note2: the successfully processed users get deleted from the bulk_import_users table + + long count = NUMBER_OF_USERS_TO_UPLOAD; + while (true) { + response = loadBulkImportUsersCountWithStatus(main, null); + assertEquals("OK", response.get("status").getAsString()); + int newUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.NEW).get("count").getAsInt(); + int processingUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.PROCESSING).get("count").getAsInt(); + + count = newUsersNumber + processingUsersNumber; + if(count == 0) { + break; + } + Thread.sleep(5000); // 5 seconds + } + + // after processing finished, make sure every user got processed correctly + int failedImportedUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt(); + int usersInCore = loadUsersCount(main).get("count").getAsInt(); + assertEquals(NUMBER_OF_USERS_TO_UPLOAD, usersInCore); + assertEquals(0, failedImportedUsersNumber); } @Test @@ -257,9 +505,6 @@ public void testFirstLazyImportAfterBulkImport() throws Exception { EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA }); int NUMBER_OF_USERS_TO_UPLOAD = 1000; - int parallelism_set_to = Config.getConfig(main).getBulkMigrationParallelism(); - System.out.println("Number of users to be imported with bulk import: " + NUMBER_OF_USERS_TO_UPLOAD); - System.out.println("Worker threads: " + parallelism_set_to); if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { return; @@ -282,24 +527,17 @@ public void testFirstLazyImportAfterBulkImport() throws Exception { assertEquals("OK", lazyImportResponse.get("status").getAsString()); assertNotNull(lazyImportResponse.get("user")); successfully_lazy_imported++; - System.out.println(i + "th lazy imported"); -// System.out.println("\tOriginal user: " + userToImportLazy); -// System.out.println("\tResponse user: " + lazyImportResponse.get("user")); } // bulk import all of the users { JsonObject bulkUploadResponse = uploadBulkImportUsersJson(main, allUsersJson); assertEquals("OK", bulkUploadResponse.get("status").getAsString()); - System.out.println("Bulk uploaded all of the users"); } - long processingStartedTime = System.currentTimeMillis(); - - // Starting the processing cronjob here to be able to measure the runtime startBulkImportCronjob(main, 10000); - System.out.println("CronJob started"); + // wait for the cron job to process them // periodically check the remaining unprocessed users @@ -310,15 +548,8 @@ public void testFirstLazyImportAfterBulkImport() throws Exception { while(count != 0) { JsonObject response = loadBulkImportUsersCountWithStatus(main, null); assertEquals("OK", response.get("status").getAsString()); - count = response.get("count").getAsLong(); - System.out.println("Number of unprocessed users: " + count + "," + response); int newUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.NEW).get("count").getAsInt(); - int failedUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt(); int processingUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.PROCESSING).get("count").getAsInt(); - System.out.println("\t stats: "); - System.out.println("\t\tNEW: \t" + newUsersNumber); - System.out.println("\t\tFAILED: \t" + failedUsersNumber); - System.out.println("\t\tPROCESSING: \t" + processingUsersNumber); count = newUsersNumber + processingUsersNumber; // + processingUsersNumber; @@ -326,8 +557,6 @@ public void testFirstLazyImportAfterBulkImport() throws Exception { } } - long processingFinishedTime = System.currentTimeMillis(); - System.out.println("Processing took " + (processingFinishedTime - processingStartedTime) / 1000 + " seconds"); // expect: lazy imported users are already there, duplicate.. errors // expect: not lazy imported users are imported successfully @@ -344,7 +573,6 @@ public void testFirstLazyImportAfterBulkImport() throws Exception { String errorMessage = failedUser.getAsJsonObject().get("errorMessage").getAsString(); assertTrue(errorMessage.startsWith("E003:") || errorMessage.startsWith("E005:") || errorMessage.startsWith("E006:") || errorMessage.startsWith("E007:")); // duplicate email, phone, etc errors - System.out.println(errorMessage); } stopBulkImportCronjob(main); @@ -362,9 +590,6 @@ public void testLazyImport() throws Exception { EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA }); int NUMBER_OF_USERS_TO_UPLOAD = 100; - int parallelism_set_to = Config.getConfig(main).getBulkMigrationParallelism(); - System.out.println("Number of users to be imported with bulk import: " + NUMBER_OF_USERS_TO_UPLOAD); - System.out.println("Worker threads: " + parallelism_set_to); if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { return; @@ -387,7 +612,6 @@ public void testLazyImport() throws Exception { assertEquals("OK", lazyImportResponse.get("status").getAsString()); assertNotNull(lazyImportResponse.get("user")); successfully_lazy_imported++; - System.out.println(i + "th lazy imported"); } // expect: lazy imported users are already there, duplicate.. errors @@ -470,10 +694,10 @@ public void testLazyImportDuplicatesFail() throws Exception { JsonObject userToImportLazyAgain = allUsersJson.get("users").getAsJsonArray().get(0).getAsJsonObject(); try { - JsonObject lazyImportResponseTwo = lazyImportUser(main, userToImportLazy); + JsonObject lazyImportResponseTwo = lazyImportUser(main, userToImportLazyAgain); } catch (HttpResponseException expected) { - assertEquals(400, expected.statusCode); System.out.println(expected.getMessage()); + assertEquals(400, expected.statusCode); } } @@ -481,7 +705,7 @@ private static JsonObject lazyImportUser(Main main, JsonObject user) throws HttpResponseException, IOException { return HttpRequestForTesting.sendJsonPOSTRequest(main, "", "http://localhost:3567/bulk-import/import", - user, 1000, 1000, null, Utils.getCdiVersionStringLatestForTests(), null); + user, 100000, 100000, null, Utils.getCdiVersionStringLatestForTests(), null); } private static JsonObject loadBulkImportUsersCountWithStatus(Main main, BulkImportStorage.BULK_IMPORT_USER_STATUS status) @@ -528,6 +752,7 @@ private static JsonObject generateUsersJson(int numberOfUsers, int startIndex) { "[{\"role\":\"role1\", \"tenantIds\": [\"public\"]},{\"role\":\"role2\", \"tenantIds\": [\"public\"]}]")); user.add("totpDevices", parser.parse("[{\"secretKey\":\"secretKey\",\"deviceName\":\"deviceName\"}]")); + //JsonArray tenanatIds = parser.parse("[\"public\", \"t1\"]").getAsJsonArray(); JsonArray tenanatIds = parser.parse("[\"public\"]").getAsJsonArray(); String email = " johndoe+" + (i + startIndex) + "@gmail.com "; @@ -635,53 +860,4 @@ private static JsonObject uploadBulkImportUsersJson(Main main, JsonObject reques "http://localhost:3567/bulk-import/users", request, 1000, 10000, null, Utils.getCdiVersionStringLatestForTests(), null); } - - @Test - public void writeUsersToFile() throws Exception { - String[] args = { "../" }; - - // set processing thread number - Utils.setValueInConfig("bulk_migration_parallelism", "14"); - - TestingProcessManager.TestingProcess process = TestingProcessManager.start(args); - assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED)); - Main main = process.getProcess(); - - setFeatureFlags(main, new EE_FEATURES[] { - EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA }); - - int NUMBER_OF_USERS_TO_UPLOAD = 1000000; - int parallelism_set_to = Config.getConfig(main).getBulkMigrationParallelism(); - System.out.println("Number of users to be imported with bulk import: " + NUMBER_OF_USERS_TO_UPLOAD); - System.out.println("Worker threads: " + parallelism_set_to); - - if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { - return; - } - - // Create user roles before inserting bulk users - { - UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null); - UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null); - } - - // upload a bunch of users through the API - { - for (int i = 0; i < (NUMBER_OF_USERS_TO_UPLOAD / 10000); i++) { - JsonObject request = generateUsersJson(10000, i * 10000); // API allows 10k users upload at once - FileWriter fileWriter = new FileWriter(new File("/home/prophet/Projects/bulkimport-users-" + i + ".json")); - fileWriter.write(String.valueOf(request)); - fileWriter.flush(); - fileWriter.close(); - } - - } - - System.out.println("setup done, waiting"); - while(true){ - Thread.sleep(10000); - } - } - - } diff --git a/src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java b/src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java index 3e3446535..0bf2647b2 100644 --- a/src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java +++ b/src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java @@ -19,13 +19,10 @@ import io.supertokens.Main; import io.supertokens.ProcessState; -import io.supertokens.ResourceDistributor; import io.supertokens.authRecipe.AuthRecipe; import io.supertokens.authRecipe.UserPaginationContainer; import io.supertokens.bulkimport.BulkImport; import io.supertokens.bulkimport.BulkImportBackgroundJobManager; -import io.supertokens.config.Config; -import io.supertokens.cronjobs.CronTask; import io.supertokens.cronjobs.CronTaskTest; import io.supertokens.cronjobs.Cronjobs; import io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers; @@ -33,35 +30,32 @@ import io.supertokens.featureflag.FeatureFlagTestContent; import io.supertokens.pluginInterface.STORAGE_TYPE; import io.supertokens.pluginInterface.Storage; -import io.supertokens.pluginInterface.bulkimport.BulkImportUser; import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS; +import io.supertokens.pluginInterface.bulkimport.BulkImportUser; import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage; import io.supertokens.pluginInterface.multitenancy.AppIdentifier; import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException; import io.supertokens.storageLayer.StorageLayer; -import io.supertokens.test.CronjobTest; import io.supertokens.test.TestingProcessManager; import io.supertokens.test.TestingProcessManager.TestingProcess; import io.supertokens.test.Utils; import io.supertokens.useridmapping.UserIdMapping; import io.supertokens.userroles.UserRoles; - import org.junit.AfterClass; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestRule; +import java.util.ArrayList; +import java.util.List; + import static io.supertokens.test.bulkimport.BulkImportTestUtils.generateBulkImportUser; import static io.supertokens.test.bulkimport.BulkImportTestUtils.generateBulkImportUserWithRoles; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - public class ProcessBulkImportUsersCronJobTest { @Rule public TestRule watchman = Utils.getOnFailure(); @@ -352,7 +346,7 @@ public void shouldDeleteEverythingFromTheDBIfAnythingFails() throws Exception { assertEquals(1, usersAfterProcessing.size()); assertEquals(BULK_IMPORT_USER_STATUS.FAILED, usersAfterProcessing.get(0).status); - assertEquals("E034: Role role1 does not exist! You need pre-create the role before assigning it to the user.", + assertEquals("E034: Role does not exist! You need to pre-create the role before assigning it to the user.", usersAfterProcessing.get(0).errorMessage); UserPaginationContainer container = AuthRecipe.getUsers(main, 100, "ASC", null, null, null); @@ -395,7 +389,7 @@ public void shouldDeleteEverythingFromTheDBIfAnythingFailsOnMultipleThreads() th for(BulkImportUser userAfterProcessing: usersAfterProcessing){ assertEquals(BULK_IMPORT_USER_STATUS.FAILED, userAfterProcessing.status); // should process every user and every one of them should fail because of the missing role - assertEquals("E034: Role role1 does not exist! You need pre-create the role before assigning it to the user.", + assertEquals("E034: Role does not exist! You need to pre-create the role before assigning it to the user.", userAfterProcessing.errorMessage); } @@ -419,7 +413,6 @@ public void shouldDeleteOnlyFailedFromTheDBIfAnythingFailsOnMultipleThreads() th return; } - BulkImportTestUtils.createTenants(main); BulkImportSQLStorage storage = (BulkImportSQLStorage) StorageLayer.getStorage(main); @@ -435,7 +428,7 @@ public void shouldDeleteOnlyFailedFromTheDBIfAnythingFailsOnMultipleThreads() th BulkImport.addUsers(appIdentifier, storage, users); - Thread.sleep(2 * 60000); + Thread.sleep(60000); // one minute List usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 100, null, null, null); @@ -446,7 +439,7 @@ public void shouldDeleteOnlyFailedFromTheDBIfAnythingFailsOnMultipleThreads() th for(int i = 0; i < usersAfterProcessing.size(); i++){ if(usersAfterProcessing.get(i).status == BULK_IMPORT_USER_STATUS.FAILED) { assertEquals( - "E034: Role notExistingRole does not exist! You need pre-create the role before assigning it to the user.", + "E034: Role does not exist! You need to pre-create the role before assigning it to the user.", usersAfterProcessing.get(i).errorMessage); numberOfFailed++; } @@ -538,13 +531,13 @@ private TestingProcess startCronProcess() throws InterruptedException, TenantOrA // We are setting a non-zero initial wait for tests to avoid race condition with the beforeTest process that deletes data in the storage layer CronTaskTest.getInstance(main).setInitialWaitTimeInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, 5); - CronTaskTest.getInstance(main).setIntervalInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, 100000); + CronTaskTest.getInstance(main).setIntervalInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, 1); process.startProcess(); assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED)); Cronjobs.addCronjob(main, (ProcessBulkImportUsers) main.getResourceDistributor().getResource(new TenantIdentifier(null, null, null), ProcessBulkImportUsers.RESOURCE_KEY)); - BulkImportBackgroundJobManager.startBackgroundJob(main, 1000); + BulkImportBackgroundJobManager.startBackgroundJob(main, 8000); if (StorageLayer.getStorage(main).getType() != STORAGE_TYPE.SQL) { return null; }