From 67c42e938ec18aeed101ac354d3c342b0bdabbb3 Mon Sep 17 00:00:00 2001 From: tamassoltesz Date: Fri, 27 Sep 2024 12:13:38 +0200 Subject: [PATCH] fix: fixing unreliable mutithreaded bulk import with mysql --- .../bulkimport/BulkImportUserUtils.java | 2 +- .../ProcessBulkUsersImportWorker.java | 44 ++++++++++++++----- .../ProcessBulkImportUsersCronJobTest.java | 12 ++--- 3 files changed, 40 insertions(+), 18 deletions(-) diff --git a/src/main/java/io/supertokens/bulkimport/BulkImportUserUtils.java b/src/main/java/io/supertokens/bulkimport/BulkImportUserUtils.java index 7c6aac760..f2df6892e 100644 --- a/src/main/java/io/supertokens/bulkimport/BulkImportUserUtils.java +++ b/src/main/java/io/supertokens/bulkimport/BulkImportUserUtils.java @@ -265,7 +265,7 @@ private String validateAndNormaliseExternalUserId(String externalUserId, List 255) { - errors.add("externalUserId " + externalUserId + " is too long. Max length is 128."); + errors.add("externalUserId " + externalUserId + " is too long. Max length is 255."); } if (!allExternalUserIds.add(externalUserId)) { diff --git a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java index 8122553d8..ef7dc9a6e 100644 --- a/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java +++ b/src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java @@ -30,6 +30,7 @@ import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage; import io.supertokens.pluginInterface.bulkimport.BulkImportStorage; import io.supertokens.pluginInterface.bulkimport.BulkImportUser; +import io.supertokens.pluginInterface.bulkimport.exceptions.BulkImportTransactionRolledBackException; import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage; import io.supertokens.pluginInterface.exceptions.DbInitException; import io.supertokens.pluginInterface.exceptions.InvalidConfigException; @@ -80,12 +81,16 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List { + BulkImportUser finalUser = user; + shouldRetryImmediately = bulkImportProxyStorage.startTransaction(con -> { try { Storage[] allStoragesForApp = getAllProxyStoragesForApp(main, appIdentifier); - BulkImport.processUserImportSteps(main, con, appIdentifier, bulkImportProxyStorage, user, + BulkImport.processUserImportSteps(main, con, appIdentifier, bulkImportProxyStorage, finalUser, primaryLM, allStoragesForApp); // We are updating the primaryUserId in the bulkImportUser entry. This will help us handle @@ -154,7 +160,7 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List users = generateBulkImportUser(usersCount); BulkImport.addUsers(appIdentifier, storage, users); - Thread.sleep(6000); + Thread.sleep(60000); List usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 1000, null, null, null); @@ -159,7 +159,7 @@ public void shouldProcessBulkImportUsersInNotSoLargeNumbersInTheSameTenant() thr @Test public void shouldProcessBulkImportUsersInLargeNumbersInTheSameTenant() throws Exception { - Utils.setValueInConfig("bulk_migration_parallelism", "12"); + Utils.setValueInConfig("bulk_migration_parallelism", "8"); TestingProcess process = startCronProcess(); Main main = process.getProcess(); @@ -259,7 +259,7 @@ public void shouldProcessBulkImportUsersInMultipleTenantsWithDifferentStorages() @Test public void shouldProcessBulkImportUsersInLargeNumberInMultipleTenantsWithDifferentStorages() throws Exception { - Utils.setValueInConfig("bulk_migration_parallelism", "4"); + Utils.setValueInConfig("bulk_migration_parallelism", "8"); TestingProcess process = startCronProcess(); Main main = process.getProcess(); @@ -356,7 +356,7 @@ public void shouldDeleteEverythingFromTheDBIfAnythingFails() throws Exception { @Test public void shouldDeleteEverythingFromTheDBIfAnythingFailsOnMultipleThreads() throws Exception { - Utils.setValueInConfig("bulk_migration_parallelism", "12"); + Utils.setValueInConfig("bulk_migration_parallelism", "8"); // Creating a non-existing user role will result in an error. // Since, user role creation happens at the last step of the bulk import process, everything should be deleted from the DB. @@ -399,7 +399,7 @@ public void shouldDeleteEverythingFromTheDBIfAnythingFailsOnMultipleThreads() th @Test public void shouldDeleteOnlyFailedFromTheDBIfAnythingFailsOnMultipleThreads() throws Exception { - Utils.setValueInConfig("bulk_migration_parallelism", "12"); + Utils.setValueInConfig("bulk_migration_parallelism", "8"); // Creating a non-existing user role will result in an error. // Since, user role creation happens at the last step of the bulk import process, everything should be deleted from the DB.