Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bulk import fixes #238

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public interface QueryExecutorTemplate {

Expand All @@ -44,6 +45,25 @@ static <T> T execute(Connection con, String QUERY, PreparedStatementValueSetter
}
}

static void executeBatch(Connection connection, String QUERY, List<PreparedStatementValueSetter> setters)
throws SQLException, StorageQueryException {
assert setters != null;
assert !setters.isEmpty();
try (PreparedStatement pst = connection.prepareStatement(QUERY)) {
int counter = 0;
for(PreparedStatementValueSetter setter: setters) {
setter.setValues(pst);
pst.addBatch();
counter++;

if(counter % 100 == 0) {
pst.executeBatch();
}
}
pst.executeBatch(); //for the possible remaining ones
}
}

static int update(Start start, String QUERY, PreparedStatementValueSetter setter)
throws SQLException, StorageQueryException {
try (Connection con = ConnectionPool.getConnection(start)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,21 @@
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.storage.postgresql.PreparedStatementValueSetter;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.utils.Utils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.*;

public class BulkImportQueries {
static String getQueryToCreateBulkImportUsersTable(Start start) {
Expand Down Expand Up @@ -125,28 +124,23 @@ public static void updateBulkImportUserStatus_Transaction(Start start, Connectio

public static void updateMultipleBulkImportUsersStatusToError_Transaction(Start start, Connection con, AppIdentifier appIdentifier,
@Nonnull Map<String,String> bulkImportUserIdToErrorMessage)
throws SQLException {
throws SQLException, StorageQueryException {
BULK_IMPORT_USER_STATUS errorStatus = BULK_IMPORT_USER_STATUS.FAILED;
String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
+ " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?";
List<PreparedStatementValueSetter> setters = new ArrayList<>();

PreparedStatement setErrorStatement = con.prepareStatement(query);

int counter = 0;
for(String bulkImportUserId : bulkImportUserIdToErrorMessage.keySet()){
setErrorStatement.setString(1, errorStatus.toString());
setErrorStatement.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
setErrorStatement.setLong(3, System.currentTimeMillis());
setErrorStatement.setString(4, appIdentifier.getAppId());
setErrorStatement.setString(5, bulkImportUserId);
setErrorStatement.addBatch();

if(counter % 100 == 0) {
setErrorStatement.executeBatch();
}
setters.add(pst -> {
pst.setString(1, errorStatus.toString());
pst.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
pst.setLong(3, System.currentTimeMillis());
pst.setString(4, appIdentifier.getAppId());
pst.setString(5, bulkImportUserId);
});
}

setErrorStatement.executeBatch();
executeBatch(con, query, setters);
}

public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing(Start start,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.storage.postgresql.PreparedStatementValueSetter;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.utils.Utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;

import static io.supertokens.pluginInterface.RECIPE_ID.EMAIL_PASSWORD;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.*;
import static io.supertokens.storage.postgresql.config.Config.getConfig;
import static java.lang.System.currentTimeMillis;

Expand Down Expand Up @@ -354,59 +353,52 @@ public static void signUpMultipleForBulkImport_Transaction(Start start, Connecti
"INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";

PreparedStatement appIdToUserId = sqlCon.prepareStatement(app_id_to_user_id_QUERY);
PreparedStatement allAuthRecipeUsers = sqlCon.prepareStatement(all_auth_recipe_users_QUERY);
PreparedStatement emailPasswordUsers = sqlCon.prepareStatement(emailpassword_users_QUERY);
PreparedStatement emailPasswordUsersToTenant = sqlCon.prepareStatement(emailpassword_users_to_tenant_QUERY);
List<PreparedStatementValueSetter> appIdToUserIdSetters = new ArrayList<>();
List<PreparedStatementValueSetter> allAuthRecipeUsersSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailPasswordUsersSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailPasswordUsersToTenantSetters = new ArrayList<>();

int counter = 0;
for (EmailPasswordImportUser user : usersToSignUp) {
String userId = user.userId;
TenantIdentifier tenantIdentifier = user.tenantIdentifier;

appIdToUserId.setString(1, tenantIdentifier.getAppId());
appIdToUserId.setString(2, userId);
appIdToUserId.setString(3, userId);
appIdToUserId.setString(4, EMAIL_PASSWORD.toString());
appIdToUserId.addBatch();


allAuthRecipeUsers.setString(1, tenantIdentifier.getAppId());
allAuthRecipeUsers.setString(2, tenantIdentifier.getTenantId());
allAuthRecipeUsers.setString(3, userId);
allAuthRecipeUsers.setString(4, userId);
allAuthRecipeUsers.setString(5, EMAIL_PASSWORD.toString());
allAuthRecipeUsers.setLong(6, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.setLong(7, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.addBatch();

emailPasswordUsers.setString(1, tenantIdentifier.getAppId());
emailPasswordUsers.setString(2, userId);
emailPasswordUsers.setString(3, user.email);
emailPasswordUsers.setString(4, user.passwordHash);
emailPasswordUsers.setLong(5, user.timeJoinedMSSinceEpoch);
emailPasswordUsers.addBatch();

emailPasswordUsersToTenant.setString(1, tenantIdentifier.getAppId());
emailPasswordUsersToTenant.setString(2, tenantIdentifier.getTenantId());
emailPasswordUsersToTenant.setString(3, userId);
emailPasswordUsersToTenant.setString(4, user.email);
emailPasswordUsersToTenant.addBatch();
counter++;
if (counter % 100 == 0) {
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
}
}
appIdToUserIdSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, userId);
pst.setString(4, EMAIL_PASSWORD.toString());
});

allAuthRecipeUsersSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, userId);
pst.setString(5, EMAIL_PASSWORD.toString());
pst.setLong(6, user.timeJoinedMSSinceEpoch);
pst.setLong(7, user.timeJoinedMSSinceEpoch);
});

//execute the remaining ones
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
emailPasswordUsersSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, user.email);
pst.setString(4, user.passwordHash);
pst.setLong(5, user.timeJoinedMSSinceEpoch);
});

emailPasswordUsersToTenantSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, user.email);
});
}

executeBatch(sqlCon, app_id_to_user_id_QUERY, appIdToUserIdSetters);
executeBatch(sqlCon, all_auth_recipe_users_QUERY, allAuthRecipeUsersSetters);
executeBatch(sqlCon, emailpassword_users_QUERY, emailPasswordUsersSetters);
executeBatch(sqlCon, emailpassword_users_to_tenant_QUERY, emailPasswordUsersToTenantSetters);
sqlCon.commit();
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;
import io.supertokens.storage.postgresql.PreparedStatementValueSetter;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.utils.Utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;

import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.*;
import static io.supertokens.storage.postgresql.config.Config.getConfig;
import static java.lang.System.currentTimeMillis;

Expand Down Expand Up @@ -130,41 +129,25 @@ public static void updateMultipleUsersIsEmailVerified_Transaction(Start start, C
boolean isEmailVerified)
throws SQLException, StorageQueryException {

String QUERY;
if (isEmailVerified) {
String QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
+ "(app_id, user_id, email) VALUES(?, ?, ?)";
PreparedStatement insertQuery = con.prepareStatement(QUERY);
int counter = 0;
for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
insertQuery.setString(1, appIdentifier.getAppId());
insertQuery.setString(2, emailToUser.getKey());
insertQuery.setString(3, emailToUser.getValue());
insertQuery.addBatch();

counter++;
if (counter % 100 == 0) {
insertQuery.executeBatch();
}
}
insertQuery.executeBatch();
} else {
String QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
+ " WHERE app_id = ? AND user_id = ? AND email = ?";
PreparedStatement deleteQuery = con.prepareStatement(QUERY);
int counter = 0;
for (Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()) {
deleteQuery.setString(1, appIdentifier.getAppId());
deleteQuery.setString(2, emailToUser.getValue());
deleteQuery.setString(3, emailToUser.getKey());
deleteQuery.addBatch();

counter++;
if (counter % 100 == 0) {
deleteQuery.executeBatch();
}
}
deleteQuery.executeBatch();
}

List<PreparedStatementValueSetter> setters = new ArrayList<>();

for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
setters.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, emailToUser.getKey());
pst.setString(3, emailToUser.getValue());
});
}
executeBatch(con, QUERY, setters);
}

public static void deleteAllEmailVerificationTokensForUser_Transaction(Start start, Connection con,
Expand Down Expand Up @@ -610,30 +593,30 @@ public static void updateMultipleIsEmailVerifiedToExternalUserIds(Start start, A
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
String update_email_verification_tokens_table_query = "UPDATE " + getConfig(start).getEmailVerificationTokensTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
PreparedStatement updateEmailVerificationQuery = sqlCon.prepareStatement(update_email_verification_table_query);
PreparedStatement updateEmailVerificationTokensQuery = sqlCon.prepareStatement(update_email_verification_tokens_table_query);

int counter = 0;
List<PreparedStatementValueSetter> emailVerificationSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emalVerificationTokensSetters = new ArrayList<>();

for (String supertokensUserId : supertokensUserIdToExternalUserId.keySet()){
updateEmailVerificationQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
updateEmailVerificationQuery.setString(2, appIdentifier.getAppId());
updateEmailVerificationQuery.setString(3, supertokensUserId);
updateEmailVerificationQuery.addBatch();

updateEmailVerificationTokensQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
updateEmailVerificationTokensQuery.setString(2, appIdentifier.getAppId());
updateEmailVerificationTokensQuery.setString(3, supertokensUserId);
updateEmailVerificationTokensQuery.addBatch();

counter++;
if(counter % 100 == 0) {
updateEmailVerificationQuery.executeBatch();
updateEmailVerificationTokensQuery.executeBatch();
}
emailVerificationSetters.add(pst -> {
pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});

emalVerificationTokensSetters.add(pst -> {
pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}

if(emailVerificationSetters.isEmpty()){
return null;
}
updateEmailVerificationQuery.executeBatch();
updateEmailVerificationTokensQuery.executeBatch();

executeBatch(sqlCon, update_email_verification_table_query, emailVerificationSetters);
executeBatch(sqlCon, update_email_verification_tokens_table_query, emalVerificationTokensSetters);
} catch (SQLException e) {
throw new StorageTransactionLogicException(e);
}
Expand Down
Loading
Loading