From d721cd5ce63d2426ce619cd7fa75536032c2d3fa Mon Sep 17 00:00:00 2001 From: Ankit Tiwari Date: Fri, 29 Mar 2024 11:49:11 +0530 Subject: [PATCH] fix: PR changes --- .../bulkimport/ProcessBulkImportUsers.java | 116 ++++++++++++++---- 1 file changed, 92 insertions(+), 24 deletions(-) diff --git a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java index 0be95228a..918a9cefa 100644 --- a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java +++ b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java @@ -48,6 +48,7 @@ import io.supertokens.pluginInterface.Storage; import io.supertokens.pluginInterface.StorageUtils; import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo; +import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage; import io.supertokens.pluginInterface.bulkimport.BulkImportUser; import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS; import io.supertokens.pluginInterface.bulkimport.BulkImportUser.LoginMethod; @@ -205,7 +206,23 @@ private void processUser(AppIdentifier appIdentifier, BulkImportUser user, BulkI LoginMethod primaryLM = getPrimaryLoginMethod(user); + AuthRecipeSQLStorage authRecipeSQLStorage = (AuthRecipeSQLStorage) getProxyStorage(firstTenantIdentifier); try { + // If primaryUserId is not null, it means we may have already processed this user but failed to delete the entry + // If the primaryUserId exists in the database, we'll delete the corresponding entry from the bulkImportUser table and proceed to skip this user. + if (user.primaryUserId != null) { + AuthRecipeUserInfo processedUser = authRecipeSQLStorage.getPrimaryUserById(appIdentifier, + user.primaryUserId); + + if (processedUser != null && isProcessedUserFromSameBulkImportUserEntry(processedUser, user)) { + baseTenantStorage.startTransaction(con2 -> { + baseTenantStorage.deleteBulkImportUser_Transaction(appIdentifier, con2, user.id); + return null; + }); + return; + } + } + bulkImportProxyStorage.startTransaction(con -> { try { for (LoginMethod lm : user.loginMethods) { @@ -219,15 +236,24 @@ private void processUser(AppIdentifier appIdentifier, BulkImportUser user, BulkI createUserMetadata(appIdentifier, bulkImportProxyStorage, user, primaryLM); createUserRoles(main, appIdentifier, bulkImportProxyStorage, user); + // We are updating the primaryUserId in the bulkImportUser entry. This will help us handle the inconsistent transaction commit. + // If this update statement fails then the outer transaction will fail as well and the user will simpl be processed again. No inconsistency will happen in this + // case. + baseTenantStorage.updateBulkImportUserPrimaryUserId(appIdentifier, user.id, + primaryLM.superTokensUserId); + + // We need to commit the transaction manually because we have overridden that in the proxy storage + // If this fails, the primaryUserId will be updated in the bulkImportUser but it wouldn’t actually exist. + // When processing the user again, we'll check if primaryUserId exists with the same email. In this case the user won't exist, and we'll simply re-process it. + bulkImportProxyStorage.commitTransactionForBulkImportProxyStorage(); + // NOTE: We need to use the baseTenantStorage as bulkImportProxyStorage could have a different storage than the baseTenantStorage + // If this fails, the primaryUserId will be updated in the bulkImportUser and it would exist in the database. + // When processing the user again, we'll check if primaryUserId exists with the same email. In this case the user will exist, and we'll simply delete the entry. baseTenantStorage.startTransaction(con2 -> { - baseTenantStorage.deleteBulkImportUser_Transaction(appIdentifier, con2, - user.id); + baseTenantStorage.deleteBulkImportUser_Transaction(appIdentifier, con2, user.id); return null; }); - - // We need to commit the transaction manually because we have overridden that in the proxy storage - bulkImportProxyStorage.commitTransactionForBulkImportProxyStorage(); return null; } catch (StorageTransactionLogicException e) { // We need to rollback the transaction manually because we have overridden that in the proxy storage @@ -291,7 +317,7 @@ private void processEmailPasswordLoginMethod(TenantIdentifier tenantIdentifier, ImportUserResponse userInfo = EmailPassword.createUserWithPasswordHash(tenantIdentifier, storage, lm.email, lm.passwordHash, lm.timeJoinedInMSSinceEpoch); - lm.superTokensOrExternalUserId = userInfo.user.getSupertokensUserId(); + lm.superTokensUserId = userInfo.user.getSupertokensUserId(); } catch (StorageQueryException | TenantOrAppNotFoundException e) { throw new StorageTransactionLogicException(e); } catch (DuplicateEmailException e) { @@ -307,7 +333,7 @@ private void processThirdPartyLoginMethod(TenantIdentifier tenantIdentifier, Sto tenantIdentifier, storage, lm.thirdPartyId, lm.thirdPartyUserId, lm.email, lm.timeJoinedInMSSinceEpoch); - lm.superTokensOrExternalUserId = userInfo.user.getSupertokensUserId(); + lm.superTokensUserId = userInfo.user.getSupertokensUserId(); } catch (StorageQueryException | TenantOrAppNotFoundException e) { throw new StorageTransactionLogicException(e); } catch (DuplicateThirdPartyUserException e) { @@ -322,7 +348,7 @@ private void processPasswordlessLoginMethod(TenantIdentifier tenantIdentifier, S AuthRecipeUserInfo userInfo = Passwordless.createPasswordlessUser(tenantIdentifier, storage, lm.email, lm.phoneNumber, lm.timeJoinedInMSSinceEpoch); - lm.superTokensOrExternalUserId = userInfo.getSupertokensUserId(); + lm.superTokensUserId = userInfo.getSupertokensUserId(); } catch (StorageQueryException | TenantOrAppNotFoundException | RestartFlowException e) { throw new StorageTransactionLogicException(e); } @@ -338,7 +364,7 @@ private void associateUserToTenants(Main main, AppIdentifier appIdentifier, Stor TenantIdentifier tenantIdentifier = new TenantIdentifier(appIdentifier.getConnectionUriDomain(), appIdentifier.getAppId(), tenantId); - Multitenancy.addUserIdToTenant(main, tenantIdentifier, storage, lm.superTokensOrExternalUserId); + Multitenancy.addUserIdToTenant(main, tenantIdentifier, storage, lm.getSuperTokenOrExternalUserId()); } catch (TenantOrAppNotFoundException | UnknownUserIdException | StorageQueryException | FeatureNotEnabledException | DuplicateEmailException | DuplicatePhoneNumberException | DuplicateThirdPartyUserException | AnotherPrimaryUserWithPhoneNumberAlreadyExistsException @@ -357,12 +383,12 @@ private void createPrimaryUserAndLinkAccounts(Main main, } try { - AuthRecipe.createPrimaryUser(main, appIdentifier, storage, primaryLM.superTokensOrExternalUserId); + AuthRecipe.createPrimaryUser(main, appIdentifier, storage, primaryLM.getSuperTokenOrExternalUserId()); } catch (TenantOrAppNotFoundException | FeatureNotEnabledException | StorageQueryException e) { throw new StorageTransactionLogicException(e); } catch (UnknownUserIdException e) { throw new StorageTransactionLogicException(new Exception( - "We tried to create the primary user for the userId " + primaryLM.superTokensOrExternalUserId + "We tried to create the primary user for the userId " + primaryLM.getSuperTokenOrExternalUserId() + " but it doesn't exist. This should not happen. Please contact support.")); } catch (RecipeUserIdAlreadyLinkedWithPrimaryUserIdException | AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException e) { @@ -372,24 +398,24 @@ private void createPrimaryUserAndLinkAccounts(Main main, for (LoginMethod lm : user.loginMethods) { try { - if (lm.superTokensOrExternalUserId.equals(primaryLM.superTokensOrExternalUserId)) { + if (lm.getSuperTokenOrExternalUserId().equals(primaryLM.getSuperTokenOrExternalUserId())) { continue; } - AuthRecipe.linkAccounts(main, appIdentifier, storage, lm.superTokensOrExternalUserId, - primaryLM.superTokensOrExternalUserId); + AuthRecipe.linkAccounts(main, appIdentifier, storage, lm.getSuperTokenOrExternalUserId(), + primaryLM.getSuperTokenOrExternalUserId()); } catch (TenantOrAppNotFoundException | FeatureNotEnabledException | StorageQueryException e) { throw new StorageTransactionLogicException(e); } catch (UnknownUserIdException e) { throw new StorageTransactionLogicException( - new Exception("We tried to link the userId " + lm.superTokensOrExternalUserId - + " to the primary userId " + primaryLM.superTokensOrExternalUserId + new Exception("We tried to link the userId " + lm.getSuperTokenOrExternalUserId() + + " to the primary userId " + primaryLM.getSuperTokenOrExternalUserId() + " but it doesn't exist. This should not happen. Please contact support.")); } catch (InputUserIdIsNotAPrimaryUserException e) { throw new StorageTransactionLogicException( - new Exception("We tried to link the userId " + lm.superTokensOrExternalUserId - + " to the primary userId " + primaryLM.superTokensOrExternalUserId + new Exception("We tried to link the userId " + lm.getSuperTokenOrExternalUserId() + + " to the primary userId " + primaryLM.getSuperTokenOrExternalUserId() + " but it is not a primary user. This should not happen. Please contact support.")); } catch (AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException | RecipeUserIdAlreadyLinkedWithAnotherPrimaryUserIdException e) { @@ -405,10 +431,10 @@ private void createUserIdMapping(Main main, AppIdentifier appIdentifier, try { UserIdMapping.createUserIdMapping( appIdentifier, getAllProxyStoragesForApp(main, appIdentifier), - primaryLM.superTokensOrExternalUserId, user.externalUserId, + primaryLM.superTokensUserId, user.externalUserId, null, false, true); - primaryLM.superTokensOrExternalUserId = user.externalUserId; + primaryLM.externalUserId = user.externalUserId; } catch (StorageQueryException | ServletException | TenantOrAppNotFoundException | InvalidConfigException | IOException | DbInitException e) { throw new StorageTransactionLogicException(e); @@ -418,7 +444,7 @@ appIdentifier, getAllProxyStoragesForApp(main, appIdentifier), } catch (UnknownSuperTokensUserIdException e) { throw new StorageTransactionLogicException( new Exception("We tried to create the externalUserId mapping for the superTokenUserId " - + primaryLM.superTokensOrExternalUserId + + primaryLM.superTokensUserId + " but it doesn't exist. This should not happen. Please contact support.")); } } @@ -428,7 +454,7 @@ private void createUserMetadata(AppIdentifier appIdentifier, Storage storage, Bu LoginMethod primaryLM) throws StorageTransactionLogicException { if (user.userMetadata != null) { try { - UserMetadata.updateUserMetadata(appIdentifier, storage, primaryLM.superTokensOrExternalUserId, + UserMetadata.updateUserMetadata(appIdentifier, storage, primaryLM.getSuperTokenOrExternalUserId(), user.userMetadata); } catch (StorageQueryException | TenantOrAppNotFoundException e) { throw new StorageTransactionLogicException(e); @@ -471,7 +497,7 @@ private void verifyEmailForAllLoginMethods(AppIdentifier appIdentifier, Transact .getEmailVerificationStorage(storage); emailVerificationSQLStorage .updateIsEmailVerified_Transaction(tenantIdentifier.toAppIdentifier(), con, - lm.superTokensOrExternalUserId, lm.email, true); + lm.getSuperTokenOrExternalUserId(), lm.email, true); } catch (TenantOrAppNotFoundException | StorageQueryException e) { throw new StorageTransactionLogicException(e); } @@ -482,7 +508,7 @@ private void createTotpDevices(Main main, AppIdentifier appIdentifier, Storage s List totpDevices, LoginMethod primaryLM) throws StorageTransactionLogicException { for (TotpDevice totpDevice : totpDevices) { try { - Totp.createDevice(main, appIdentifier, storage, primaryLM.superTokensOrExternalUserId, + Totp.createDevice(main, appIdentifier, storage, primaryLM.getSuperTokenOrExternalUserId(), totpDevice.deviceName, totpDevice.skew, totpDevice.period, totpDevice.secretKey, true, System.currentTimeMillis()); } catch (TenantOrAppNotFoundException | StorageQueryException | FeatureNotEnabledException e) { @@ -509,4 +535,46 @@ private BulkImportUser.LoginMethod getPrimaryLoginMethod(BulkImportUser user) { } return oldestLM; } + + private boolean isProcessedUserFromSameBulkImportUserEntry( + AuthRecipeUserInfo processedUser, BulkImportUser bulkImportUser) { + if (bulkImportUser == null || processedUser == null || bulkImportUser.loginMethods == null || + processedUser.loginMethods == null) { + return false; + } + + for (LoginMethod lm1 : bulkImportUser.loginMethods) { + for (io.supertokens.pluginInterface.authRecipe.LoginMethod lm2 : processedUser.loginMethods) { + if (lm2.recipeId.toString().equals(lm1.recipeId)) { + if (lm1.email != null && !lm1.email.equals(lm2.email)) { + return false; + } + + switch (lm1.recipeId) { + case "emailpassword": + if (lm1.passwordHash != null && !lm1.passwordHash.equals(lm2.passwordHash)) { + return false; + } + break; + case "thirdparty": + if ((lm1.thirdPartyId != null && !lm1.thirdPartyId.equals(lm2.thirdParty.id)) + || (lm1.thirdPartyUserId != null + && !lm1.thirdPartyUserId.equals(lm2.thirdParty.userId))) { + return false; + } + break; + case "passwordless": + if (lm1.phoneNumber != null && !lm1.phoneNumber.equals(lm2.phoneNumber)) { + return false; + } + break; + default: + return false; + } + } + } + } + + return true; + } }