Skip to content

Commit

Permalink
fix: fixing unreliable multithreaded bulk import with mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
tamassoltesz committed Sep 27, 2024
1 parent 0c0dea8 commit 67c42e9
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ private String validateAndNormaliseExternalUserId(String externalUserId, List<St
}

if (externalUserId.length() > 255) {
errors.add("externalUserId " + externalUserId + " is too long. Max length is 128.");
errors.add("externalUserId " + externalUserId + " is too long. Max length is 255.");
}

if (!allExternalUserIds.add(externalUserId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage;
import io.supertokens.pluginInterface.bulkimport.BulkImportStorage;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.bulkimport.exceptions.BulkImportTransactionRolledBackException;
import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage;
import io.supertokens.pluginInterface.exceptions.DbInitException;
import io.supertokens.pluginInterface.exceptions.InvalidConfigException;
Expand Down Expand Up @@ -80,12 +81,16 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List<BulkImportUs
throws TenantOrAppNotFoundException, StorageQueryException, IOException,
DbInitException {

BulkImportUser lastStartedUser = null;
BulkImportUser user = null;
try {
for (BulkImportUser user : users) {
lastStartedUser = user;
if (Main.isTesting && Main.isTesting_skipBulkImportUserValidationInCronJob) {
boolean shouldRetryImmediately = false;
int userIndexPointer = 0;
while(userIndexPointer < users.size()){
user = users.get(userIndexPointer);
if ((Main.isTesting && Main.isTesting_skipBulkImportUserValidationInCronJob) || shouldRetryImmediately) {
// Skip validation when the flag is enabled during testing
// Skip validation if it's a retry run. This already passed validation. A revalidation triggers
// an invalid external user id already exists validation error - which is not true!
} else {
// Validate the user
bulkImportUserUtils.createBulkImportUserFromJSON(main, appIdentifier, user.toJsonObject(), user.id);
Expand Down Expand Up @@ -143,18 +148,19 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List<BulkImportUs
}
}

bulkImportProxyStorage.startTransaction(con -> {
BulkImportUser finalUser = user;
shouldRetryImmediately = bulkImportProxyStorage.startTransaction(con -> {
try {
Storage[] allStoragesForApp = getAllProxyStoragesForApp(main, appIdentifier);
BulkImport.processUserImportSteps(main, con, appIdentifier, bulkImportProxyStorage, user,
BulkImport.processUserImportSteps(main, con, appIdentifier, bulkImportProxyStorage, finalUser,
primaryLM, allStoragesForApp);

// We are updating the primaryUserId in the bulkImportUser entry. This will help us handle
// the inconsistent transaction commit.
// If this update statement fails then the outer transaction will fail as well and the user
// will simply be processed again. No inconsistency will happen in this
// case.
baseTenantStorage.updateBulkImportUserPrimaryUserId(appIdentifier, user.id,
baseTenantStorage.updateBulkImportUserPrimaryUserId(appIdentifier, finalUser.id,
primaryLM.superTokensUserId);

// We need to commit the transaction manually because we have overridden that in the proxy
Expand All @@ -171,24 +177,40 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List<BulkImportUs
// in the database.
// When processing the user again, we'll check if primaryUserId exists with the same email.
// In this case the user will exist, and we'll simply delete the entry.
baseTenantStorage.deleteBulkImportUsers(appIdentifier, new String[]{user.id});
baseTenantStorage.deleteBulkImportUsers(appIdentifier, new String[]{finalUser.id});
} catch (StorageTransactionLogicException e) {
// We need to rollback the transaction manually because we have overridden that in the proxy
// storage
bulkImportProxyStorage.rollbackTransactionForBulkImportProxyStorage();
handleProcessUserExceptions(app, user, e, baseTenantStorage);
if(isBulkImportTransactionRolledBackIsTheRealCause(e)){
return true;
//@see BulkImportTransactionRolledBackException for explanation
}
handleProcessUserExceptions(app, finalUser, e, baseTenantStorage);
}
return null;
return false;
});

if(!shouldRetryImmediately){
userIndexPointer++;
}
}
} catch (StorageTransactionLogicException | InvalidBulkImportDataException | InvalidConfigException e) {
handleProcessUserExceptions(appIdentifier, lastStartedUser, e, baseTenantStorage);
handleProcessUserExceptions(appIdentifier, user, e, baseTenantStorage);
} finally {
closeAllProxyStorages(); //closing it here to reuse the existing connection with all the users
}
}

// Returns true when a BulkImportTransactionRolledBackException is the exception itself
// or appears anywhere along its cause chain; such failures indicate the transaction was
// rolled back (e.g. by a MySQL deadlock retry) and the user should be retried immediately
// rather than recorded as a processing failure.
private boolean isBulkImportTransactionRolledBackIsTheRealCause(Throwable exception) {
    // Walk the cause chain iteratively instead of recursing.
    for (Throwable current = exception; ; current = current.getCause()) {
        if (current instanceof BulkImportTransactionRolledBackException) {
            return true;
        }
        if (current.getCause() == null) {
            // Reached the root cause without finding a rollback marker.
            return false;
        }
    }
}

private void handleProcessUserExceptions(AppIdentifier appIdentifier, BulkImportUser user, Exception e,
BulkImportSQLStorage baseTenantStorage)
throws StorageQueryException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void shouldProcessBulkImportUsersInTheSameTenant() throws Exception {

@Test
public void shouldProcessBulkImportUsersInNotSoLargeNumbersInTheSameTenant() throws Exception {
Utils.setValueInConfig("bulk_migration_parallelism", "2");
Utils.setValueInConfig("bulk_migration_parallelism", "8");
TestingProcess process = startCronProcess();
Main main = process.getProcess();

Expand All @@ -143,7 +143,7 @@ public void shouldProcessBulkImportUsersInNotSoLargeNumbersInTheSameTenant() thr
List<BulkImportUser> users = generateBulkImportUser(usersCount);
BulkImport.addUsers(appIdentifier, storage, users);

Thread.sleep(6000);
Thread.sleep(60000);

List<BulkImportUser> usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 1000, null,
null, null);
Expand All @@ -159,7 +159,7 @@ public void shouldProcessBulkImportUsersInNotSoLargeNumbersInTheSameTenant() thr

@Test
public void shouldProcessBulkImportUsersInLargeNumbersInTheSameTenant() throws Exception {
Utils.setValueInConfig("bulk_migration_parallelism", "12");
Utils.setValueInConfig("bulk_migration_parallelism", "8");

TestingProcess process = startCronProcess();
Main main = process.getProcess();
Expand Down Expand Up @@ -259,7 +259,7 @@ public void shouldProcessBulkImportUsersInMultipleTenantsWithDifferentStorages()

@Test
public void shouldProcessBulkImportUsersInLargeNumberInMultipleTenantsWithDifferentStorages() throws Exception {
Utils.setValueInConfig("bulk_migration_parallelism", "4");
Utils.setValueInConfig("bulk_migration_parallelism", "8");

TestingProcess process = startCronProcess();
Main main = process.getProcess();
Expand Down Expand Up @@ -356,7 +356,7 @@ public void shouldDeleteEverythingFromTheDBIfAnythingFails() throws Exception {

@Test
public void shouldDeleteEverythingFromTheDBIfAnythingFailsOnMultipleThreads() throws Exception {
Utils.setValueInConfig("bulk_migration_parallelism", "12");
Utils.setValueInConfig("bulk_migration_parallelism", "8");
// Creating a non-existing user role will result in an error.
// Since, user role creation happens at the last step of the bulk import process, everything should be deleted from the DB.

Expand Down Expand Up @@ -399,7 +399,7 @@ public void shouldDeleteEverythingFromTheDBIfAnythingFailsOnMultipleThreads() th

@Test
public void shouldDeleteOnlyFailedFromTheDBIfAnythingFailsOnMultipleThreads() throws Exception {
Utils.setValueInConfig("bulk_migration_parallelism", "12");
Utils.setValueInConfig("bulk_migration_parallelism", "8");
// Creating a non-existing user role will result in an error.
// Since, user role creation happens at the last step of the bulk import process, everything should be deleted from the DB.

Expand Down

0 comments on commit 67c42e9

Please sign in to comment.