Skip to content

Commit

Permalink
fix: fixing issues and failing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tamassoltesz committed Dec 4, 2024
1 parent 535d00b commit 22b7726
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 133 deletions.
18 changes: 10 additions & 8 deletions src/main/java/io/supertokens/bulkimport/BulkImport.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import io.supertokens.pluginInterface.passwordless.PasswordlessImportUser;
import io.supertokens.pluginInterface.passwordless.exception.DuplicatePhoneNumberException;
import io.supertokens.pluginInterface.sqlStorage.SQLStorage;
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;
import io.supertokens.pluginInterface.thirdparty.ThirdPartyImportUser;
import io.supertokens.pluginInterface.thirdparty.exception.DuplicateThirdPartyUserException;
import io.supertokens.pluginInterface.totp.TOTPDevice;
Expand Down Expand Up @@ -177,7 +176,7 @@ public static synchronized AuthRecipeUserInfo importUser(Main main, AppIdentifie
try {
Storage[] allStoragesForApp = getAllProxyStoragesForApp(main, appIdentifier);

processUsersImportSteps(main, con, appIdentifier, bulkImportProxyStorage, List.of(user), allStoragesForApp);
processUsersImportSteps(main, appIdentifier, bulkImportProxyStorage, List.of(user), allStoragesForApp);

bulkImportProxyStorage.commitTransactionForBulkImportProxyStorage();

Expand All @@ -200,11 +199,11 @@ public static synchronized AuthRecipeUserInfo importUser(Main main, AppIdentifie
}
}

public static void processUsersImportSteps(Main main, TransactionConnection connection, AppIdentifier appIdentifier,
public static void processUsersImportSteps(Main main, AppIdentifier appIdentifier,
Storage bulkImportProxyStorage, List<BulkImportUser> users, Storage[] allStoragesForApp)
throws StorageTransactionLogicException {
processUsersLoginMethods(main, appIdentifier, bulkImportProxyStorage, users);
try {
processUsersLoginMethods(main, appIdentifier, bulkImportProxyStorage, users);
createPrimaryUsersAndLinkAccounts(main, appIdentifier, bulkImportProxyStorage, users);
createMultipleUserIdMapping(appIdentifier, users, allStoragesForApp);
verifyMultipleEmailForAllLoginMethods(appIdentifier, bulkImportProxyStorage, users);
Expand Down Expand Up @@ -242,7 +241,7 @@ public static void processUsersLoginMethods(Main main, AppIdentifier appIdentifi
appIdentifier));
}
if (sortedLoginMethods.containsKey("passwordless")) {
importedUsers.addAll(processPasswordlessLoginMethods(appIdentifier, storage,
importedUsers.addAll(processPasswordlessLoginMethods(main, appIdentifier, storage,
sortedLoginMethods.get("passwordless")));
}
Set<String> actualKeys = new HashSet<>(sortedLoginMethods.keySet());
Expand All @@ -268,7 +267,7 @@ public static void processUsersLoginMethods(Main main, AppIdentifier appIdentifi
}
}

private static List<? extends ImportUserBase> processPasswordlessLoginMethods(AppIdentifier appIdentifier, Storage storage,
private static List<? extends ImportUserBase> processPasswordlessLoginMethods(Main main, AppIdentifier appIdentifier, Storage storage,
List<LoginMethod> loginMethods)
throws StorageTransactionLogicException {
try {
Expand All @@ -279,13 +278,13 @@ private static List<? extends ImportUserBase> processPasswordlessLoginMethods(Ap
appIdentifier.getConnectionUriDomain(),
appIdentifier.getAppId(), loginMethod.tenantIds.get(
0)); // the cron runs per app. The app stays the same, the tenant can change

usersToImport.add(new PasswordlessImportUser(userId, loginMethod.phoneNumber,
usersToImport.add(new PasswordlessImportUser(userId, loginMethod.phoneNumber,
loginMethod.email, tenantIdentifierForLoginMethod, loginMethod.timeJoinedInMSSinceEpoch));
loginMethod.superTokensUserId = userId;
}

Passwordless.createPasswordlessUsers(storage, usersToImport);

return usersToImport;
} catch (StorageQueryException | StorageTransactionLogicException e) {
if (e.getCause() instanceof BulkImportBatchInsertException) {
Expand Down Expand Up @@ -330,6 +329,7 @@ private static List<? extends ImportUserBase> processThirdpartyLoginMethods(Main
loginMethod.superTokensUserId = userId;
}
ThirdParty.createMultipleThirdPartyUsers(storage, usersToImport);

return usersToImport;
} catch (StorageQueryException | StorageTransactionLogicException e) {
if (e.getCause() instanceof BulkImportBatchInsertException) {
Expand Down Expand Up @@ -359,6 +359,7 @@ private static List<? extends ImportUserBase> processEmailPasswordLoginMethods(
AppIdentifier appIdentifier)
throws StorageTransactionLogicException {
try {

//prepare data for batch import
List<EmailPasswordImportUser> usersToImport = new ArrayList<>();
for(LoginMethod emailPasswordLoginMethod : loginMethods) {
Expand All @@ -379,6 +380,7 @@ private static List<? extends ImportUserBase> processEmailPasswordLoginMethods(
}

EmailPassword.createMultipleUsersWithPasswordHash(storage, usersToImport);

return usersToImport;
} catch (StorageQueryException | StorageTransactionLogicException e) {
if(e.getCause() instanceof BulkImportBatchInsertException){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ protected void doTaskPerApp(AppIdentifier app)
this.batchSize);

if(users == null || users.isEmpty()) {
// "No more users to process!"
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.supertokens.multitenancy.Multitenancy;
import io.supertokens.output.Logging;
import io.supertokens.pluginInterface.Storage;
import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.bulkimport.exceptions.BulkImportBatchInsertException;
import io.supertokens.pluginInterface.bulkimport.exceptions.BulkImportTransactionRolledBackException;
Expand Down Expand Up @@ -76,7 +75,6 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List<BulkImportUs
BulkImportSQLStorage baseTenantStorage)
throws TenantOrAppNotFoundException, StorageQueryException, IOException,
DbInitException {

BulkImportUser user = null;
try {
final Storage[] allStoragesForApp = getAllProxyStoragesForApp(main, appIdentifier);
Expand Down Expand Up @@ -108,36 +106,37 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List<BulkImportUs
}
// Since all the tenants of a user must share the storage, we will just use the
// storage of the first tenantId of the first loginMethod
TenantIdentifier firstTenantIdentifier = new TenantIdentifier(appIdentifier.getConnectionUriDomain(),
appIdentifier.getAppId(), validUsers.get(0).loginMethods.get(0).tenantIds.get(0));

SQLStorage bulkImportProxyStorage = (SQLStorage) getBulkImportProxyStorage(firstTenantIdentifier);

bulkImportProxyStorage.startTransaction(con -> {
try {

BulkImport.processUsersImportSteps(main, con, appIdentifier, bulkImportProxyStorage, validUsers, allStoragesForApp);

bulkImportProxyStorage.commitTransactionForBulkImportProxyStorage();

String[] toDelete = new String[validUsers.size()];
for(int i = 0; i < validUsers.size(); i++) {
toDelete[i] = validUsers.get(i).id;
}
Map<SQLStorage, List<BulkImportUser>> partitionedUsers = partitionUsersByStorage(appIdentifier, validUsers);
for(SQLStorage bulkImportProxyStorage : partitionedUsers.keySet()) {
boolean shouldRetryImmediatley = true;
while (shouldRetryImmediatley) {
shouldRetryImmediatley = bulkImportProxyStorage.startTransaction(con -> {
try {
BulkImport.processUsersImportSteps(main, appIdentifier, bulkImportProxyStorage, partitionedUsers.get(bulkImportProxyStorage),
allStoragesForApp);

bulkImportProxyStorage.commitTransactionForBulkImportProxyStorage();

String[] toDelete = new String[validUsers.size()];
for (int i = 0; i < validUsers.size(); i++) {
toDelete[i] = validUsers.get(i).id;
}

baseTenantStorage.deleteBulkImportUsers(appIdentifier, toDelete);
} catch (StorageTransactionLogicException e) {
// We need to rollback the transaction manually because we have overridden that in the proxy
// storage
bulkImportProxyStorage.rollbackTransactionForBulkImportProxyStorage();
if(isBulkImportTransactionRolledBackIsTheRealCause(e)){
return true;
//@see BulkImportTransactionRolledBackException for explanation
}
handleProcessUserExceptions(app, validUsers, e, baseTenantStorage);
baseTenantStorage.deleteBulkImportUsers(appIdentifier, toDelete);
} catch (StorageTransactionLogicException e) {
// We need to rollback the transaction manually because we have overridden that in the proxy
// storage
bulkImportProxyStorage.rollbackTransactionForBulkImportProxyStorage();
if (isBulkImportTransactionRolledBackIsTheRealCause(e)) {
return true;
//@see BulkImportTransactionRolledBackException for explanation
}
handleProcessUserExceptions(app, validUsers, e, baseTenantStorage);
}
return false;
});
}
return false;
});
}
} catch (StorageTransactionLogicException | InvalidConfigException e) {
throw new RuntimeException(e);
} catch (BulkImportBatchInsertException insertException) {
Expand Down Expand Up @@ -217,7 +216,7 @@ private static void handleBulkImportException(List<BulkImportUser> usersBatch, B
.filter(bulkImportUser ->
bulkImportUser.loginMethods.stream()
.map(loginMethod -> loginMethod.superTokensUserId)
.anyMatch(s -> s.equals(userid))).findFirst();
.anyMatch(s -> s!= null && s.equals(userid))).findFirst();
if(userWithId.isPresent()){
id = userWithId.get().id;
}
Expand Down Expand Up @@ -280,46 +279,19 @@ private void closeAllProxyStorages() throws StorageQueryException {
userPoolToStorageMap.clear();
}

// Checks if the importedUser was processed from the same bulkImportUser entry.
private boolean isProcessedUserFromSameBulkImportUserEntry(
AuthRecipeUserInfo importedUser, BulkImportUser bulkImportEntry) {
if (bulkImportEntry == null || importedUser == null || bulkImportEntry.loginMethods == null ||
importedUser.loginMethods == null) {
return false;
}

for (BulkImportUser.LoginMethod lm1 : bulkImportEntry.loginMethods) {
for (io.supertokens.pluginInterface.authRecipe.LoginMethod lm2 : importedUser.loginMethods) {
if (lm2.recipeId.toString().equals(lm1.recipeId)) {
if (lm1.email != null && !lm1.email.equals(lm2.email)) {
return false;
}
private Map<SQLStorage, List<BulkImportUser>> partitionUsersByStorage(AppIdentifier appIdentifier, List<BulkImportUser> users)
throws DbInitException, TenantOrAppNotFoundException, InvalidConfigException, IOException {
Map<SQLStorage, List<BulkImportUser>> result = new HashMap<>();
for(BulkImportUser user: users) {
TenantIdentifier firstTenantIdentifier = new TenantIdentifier(appIdentifier.getConnectionUriDomain(),
appIdentifier.getAppId(), user.loginMethods.get(0).tenantIds.get(0));

switch (lm1.recipeId) {
case "emailpassword":
if (lm1.passwordHash != null && !lm1.passwordHash.equals(lm2.passwordHash)) {
return false;
}
break;
case "thirdparty":
if ((lm1.thirdPartyId != null && !lm1.thirdPartyId.equals(lm2.thirdParty.id))
|| (lm1.thirdPartyUserId != null
&& !lm1.thirdPartyUserId.equals(lm2.thirdParty.userId))) {
return false;
}
break;
case "passwordless":
if (lm1.phoneNumber != null && !lm1.phoneNumber.equals(lm2.phoneNumber)) {
return false;
}
break;
default:
return false;
}
}
SQLStorage bulkImportProxyStorage = (SQLStorage) getBulkImportProxyStorage(firstTenantIdentifier);
if(!result.containsKey(bulkImportProxyStorage)){
result.put(bulkImportProxyStorage, new ArrayList<>());
}
result.get(bulkImportProxyStorage).add(user);
}

return true;
}
return result;
}
}
11 changes: 6 additions & 5 deletions src/main/java/io/supertokens/emailpassword/EmailPassword.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,12 @@ public static ImportUserResponse createUserWithPasswordHash(TenantIdentifier ten
public static void createMultipleUsersWithPasswordHash(Storage storage,
List<EmailPasswordImportUser> usersToImport)
throws StorageQueryException, TenantOrAppNotFoundException, StorageTransactionLogicException {
EmailPasswordSQLStorage epStorage = StorageUtils.getEmailPasswordStorage(storage);
epStorage.startTransaction(con -> {
epStorage.signUpMultipleViaBulkImport_Transaction(con, usersToImport);
return null;
});

EmailPasswordSQLStorage epStorage = StorageUtils.getEmailPasswordStorage(storage);
epStorage.startTransaction(con -> {
epStorage.signUpMultipleViaBulkImport_Transaction(con, usersToImport);
return null;
});
}

@TestOnly
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/supertokens/passwordless/Passwordless.java
Original file line number Diff line number Diff line change
Expand Up @@ -554,13 +554,13 @@ public static void createPasswordlessUsers(Storage storage,
List<PasswordlessImportUser> importUsers)
throws TenantOrAppNotFoundException, StorageQueryException,
StorageTransactionLogicException {
PasswordlessSQLStorage passwordlessStorage = StorageUtils.getPasswordlessStorage(storage);
PasswordlessSQLStorage passwordlessStorage = StorageUtils.getPasswordlessStorage(storage);

passwordlessStorage.startTransaction(con -> {
passwordlessStorage.importPasswordlessUsers_Transaction(con, importUsers);
passwordlessStorage.commitTransaction(con);
return null;
});
passwordlessStorage.startTransaction(con -> {
passwordlessStorage.importPasswordlessUsers_Transaction(con, importUsers);
passwordlessStorage.commitTransaction(con);
return null;
});
}

@TestOnly
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/supertokens/thirdparty/ThirdParty.java
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,13 @@ public static SignInUpResponse createThirdPartyUser(TenantIdentifier tenantIdent
public static void createMultipleThirdPartyUsers(Storage storage,
List<ThirdPartyImportUser> usersToImport)
throws StorageQueryException, StorageTransactionLogicException, TenantOrAppNotFoundException {
ThirdPartySQLStorage tpStorage = StorageUtils.getThirdPartyStorage(storage);

tpStorage.startTransaction(con -> {
tpStorage.importThirdPartyUsers_Transaction(con, usersToImport);
tpStorage.commitTransaction(con);
return null;
});
ThirdPartySQLStorage tpStorage = StorageUtils.getThirdPartyStorage(storage);
tpStorage.startTransaction(con -> {
tpStorage.importThirdPartyUsers_Transaction(con, usersToImport);
tpStorage.commitTransaction(con);
return null;
});
}

@Deprecated
Expand Down
Loading

0 comments on commit 22b7726

Please sign in to comment.