Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multithreaded bulk import #162

Merged
merged 17 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [6.4.0]

- Adds support for Bulk Import
- Adds `BulkImportUser` class to represent a bulk import user
- Adds `BulkImportStorage` interface
- Adds `DuplicateUserIdException` class
- Adds `createBulkImportProxyStorageInstance` method in `Storage` class
- Adds `closeConnectionForBulkImportProxyStorage`, `commitTransactionForBulkImportProxyStorage`, and `rollbackTransactionForBulkImportProxyStorage` method in `SQLStorage` class
- Adds `BulkImportTransactionRolledBackException` for signaling if the transaction was rolled back by the DBMS

## [6.3.0] - 2024-10-02

- Adds `OAuthStorage` interface for OAuth Provider support
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ plugins {
id 'java-library'
}

version = "6.3.0"
version = "6.4.0"

repositories {
mavenCentral()
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/supertokens/pluginInterface/Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException;

import java.util.List;
import java.util.Map;
import java.util.Set;

public interface Storage {

// if silent is true, do not log anything out on the console
void constructor(String processId, boolean silent, boolean isTesting);

Storage createBulkImportProxyStorageInstance();

void loadConfig(JsonObject jsonConfig, Set<LOG_LEVEL> logLevels, TenantIdentifier tenantIdentifier)
throws InvalidConfigException;

Expand Down Expand Up @@ -78,6 +81,9 @@ void setKeyValue(TenantIdentifier tenantIdentifier, String key, KeyValueInfo inf
boolean isUserIdBeingUsedInNonAuthRecipe(AppIdentifier appIdentifier, String className, String userId)
throws StorageQueryException;

Map<String, List<String>> findNonAuthRecipesWhereForUserIdsUsed(AppIdentifier appIdentifier, List<String> userIds)
throws StorageQueryException;

// to be used for testing purposes only. This function will add dummy data to non-auth tables.
void addInfoToNonAuthRecipesBasedOnUserId(TenantIdentifier tenantIdentifier, String className, String userId)
throws StorageQueryException;
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/supertokens/pluginInterface/StorageUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.supertokens.pluginInterface;

import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage;
import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage;
import io.supertokens.pluginInterface.dashboard.sqlStorage.DashboardSQLStorage;
import io.supertokens.pluginInterface.emailpassword.sqlStorage.EmailPasswordSQLStorage;
import io.supertokens.pluginInterface.emailverification.sqlStorage.EmailVerificationSQLStorage;
Expand Down Expand Up @@ -134,6 +135,15 @@ public static MultitenancyStorage getMultitenancyStorage(Storage storage) {
return (MultitenancyStorage) storage;
}

public static BulkImportSQLStorage getBulkImportStorage(Storage storage) {
if (storage.getType() != STORAGE_TYPE.SQL) {
// we only support SQL for now
throw new UnsupportedOperationException("");
}

return (BulkImportSQLStorage) storage;
}

public static OAuthStorage getOAuthStorage(Storage storage) {
if (storage.getType() != STORAGE_TYPE.SQL) {
// we only support SQL for now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;

public interface AuthRecipeStorage extends Storage {

Expand All @@ -45,6 +46,8 @@ AuthRecipeUserInfo[] getUsers(TenantIdentifier tenantIdentifier, @Nonnull Intege

boolean doesUserIdExist(TenantIdentifier tenantIdentifierIdentifier, String userId) throws StorageQueryException;

List<String> findExistingUserIds(AppIdentifier appIdentifier, List<String> userIds) throws StorageQueryException;

AuthRecipeUserInfo getPrimaryUserById(AppIdentifier appIdentifier, String userId) throws StorageQueryException;

String getPrimaryUserIdStrForUserId(AppIdentifier appIdentifier, String userId) throws StorageQueryException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@
import io.supertokens.pluginInterface.sqlStorage.SQLStorage;
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;

import java.util.List;
import java.util.Map;

public interface AuthRecipeSQLStorage extends AuthRecipeStorage, SQLStorage {

AuthRecipeUserInfo getPrimaryUserById_Transaction(AppIdentifier appIdentifier, TransactionConnection con,
String userId)
throws StorageQueryException;

List<AuthRecipeUserInfo> getPrimaryUsersByIds_Transaction(AppIdentifier appIdentifier, TransactionConnection con,
List<String> userIds)
throws StorageQueryException;

// lock order:
// - emailpassword table
// - thirdparty table
Expand All @@ -38,6 +45,13 @@ AuthRecipeUserInfo[] listPrimaryUsersByEmail_Transaction(AppIdentifier appIdenti
String email)
throws StorageQueryException;

//helper method for bulk import
AuthRecipeUserInfo[] listPrimaryUsersByMultipleEmailsOrPhoneNumbersOrThirdparty_Transaction(AppIdentifier appIdentifier,
TransactionConnection con,
List<String> emails, List<String> phones,
Map<String, String> thirdpartyIdToThirdpartyUserId)
throws StorageQueryException;

// locks only passwordless table
AuthRecipeUserInfo[] listPrimaryUsersByPhoneNumber_Transaction(AppIdentifier appIdentifier,
TransactionConnection con, String phoneNumber)
Expand All @@ -52,9 +66,15 @@ AuthRecipeUserInfo[] listPrimaryUsersByThirdPartyInfo_Transaction(AppIdentifier
void makePrimaryUser_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String userId)
throws StorageQueryException;

void makePrimaryUsers_Transaction(AppIdentifier appIdentifier, TransactionConnection con, List<String> userIds)
throws StorageQueryException;

void linkAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String recipeUserId,
String primaryUserId) throws StorageQueryException;

void linkMultipleAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con,
Map<String, String> recipeUserIdByPrimaryUserId) throws StorageQueryException;

void unlinkAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String primaryUserId,
String recipeUserId)
throws StorageQueryException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.pluginInterface.bulkimport;

import java.util.List;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException;
import io.supertokens.pluginInterface.nonAuthRecipe.NonAuthRecipeStorage;

public interface BulkImportStorage extends NonAuthRecipeStorage {
/**
* Add users to the bulk_import_users table
*/
void addBulkImportUsers(AppIdentifier appIdentifier, List<BulkImportUser> users)
throws StorageQueryException,
TenantOrAppNotFoundException,
io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException;

/**
* Get users from the bulk_import_users table
*/
List<BulkImportUser> getBulkImportUsers(AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status,
@Nullable String bulkImportUserId, @Nullable Long createdAt) throws StorageQueryException;

/**
* Delete users by id from the bulk_import_users table
*/
List<String> deleteBulkImportUsers(AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds) throws StorageQueryException;

/**
* Returns the users from the bulk_import_users table for processing
*/
List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing(AppIdentifier appIdentifier, @Nonnull Integer limit) throws StorageQueryException;


/**
* Update the bulk_import_user's primary_user_id by bulk_import_user_id
*/
void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException;

/**
* Returns the count of users from the bulk_import_users table
*/
long getBulkImportUsersCount(AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws StorageQueryException;

public enum BULK_IMPORT_USER_STATUS {
NEW, PROCESSING, FAILED
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.pluginInterface.bulkimport;

import java.util.List;

import com.google.gson.Gson;
import com.google.gson.JsonObject;

import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS;

public class BulkImportUser {
public String id;
public String externalUserId;
public JsonObject userMetadata;
public List<UserRole> userRoles;
public List<TotpDevice> totpDevices;
public List<LoginMethod> loginMethods;

// Following fields come from the DB Record.
public BULK_IMPORT_USER_STATUS status;
public String primaryUserId;
public String errorMessage;
public Long createdAt;
public Long updatedAt;

private static final Gson gson = new Gson();

public BulkImportUser(String id, String externalUserId, JsonObject userMetadata, List<UserRole> userRoles,
List<TotpDevice> totpDevices, List<LoginMethod> loginMethods) {
this.id = id;
this.externalUserId = externalUserId;
this.userMetadata = userMetadata;
this.userRoles = userRoles;
this.totpDevices = totpDevices;
this.loginMethods = loginMethods;
}

public static BulkImportUser forTesting_fromJson(JsonObject jsonObject) {
return gson.fromJson(jsonObject, BulkImportUser.class);
}

// The bulk_import_users table stores users to be imported via a Cron Job.
// It has a `raw_data` column containing user data in JSON format.

// The BulkImportUser class represents this `raw_data`, including additional fields like `status`, `createdAt`, and `updatedAt`.
// First, we validate all fields of `raw_data` using the BulkImportUser class, then store this data in the bulk_import_users table.
// This function retrieves the `raw_data` after removing the additional fields.
public String toRawDataForDbStorage() {
JsonObject jsonObject = gson.fromJson(new Gson().toJson(this), JsonObject.class);
jsonObject.remove("status");
jsonObject.remove("createdAt");
jsonObject.remove("updatedAt");
return jsonObject.toString();
}

// The bulk_import_users table contains a `raw_data` column with user data in JSON format, along with other columns such as `id`, `status`, `primary_user_id`, and `error_msg` etc.

// When creating an instance of the BulkImportUser class, the extra fields must be passed separately as they are not part of the `raw_data`.
// This function creates a BulkImportUser instance from a stored bulk_import_user entry.
public static BulkImportUser fromRawDataFromDbStorage(String id, String rawData, BULK_IMPORT_USER_STATUS status, String primaryUserId, String errorMessage, long createdAt, long updatedAt) {
BulkImportUser user = gson.fromJson(rawData, BulkImportUser.class);
user.id = id;
user.status = status;
user.primaryUserId = primaryUserId;
user.errorMessage = errorMessage;
user.createdAt = createdAt;
user.updatedAt = updatedAt;
return user;
}

public JsonObject toJsonObject() {
return gson.fromJson(gson.toJson(this), JsonObject.class);
}

public static class UserRole {
public String role;
public List<String> tenantIds;

public UserRole(String role, List<String> tenantIds) {
this.role = role;
this.tenantIds = tenantIds;
}
}

public static class TotpDevice {
public String secretKey;
public int period;
public int skew;
public String deviceName;

public TotpDevice(String secretKey, int period, int skew, String deviceName) {
this.secretKey = secretKey;
this.period = period;
this.skew = skew;
this.deviceName = deviceName;
}
}

public static class LoginMethod {
public List<String> tenantIds;
public boolean isVerified;
public boolean isPrimary;
public long timeJoinedInMSSinceEpoch;
public String recipeId;
public String email;
public String passwordHash;
public String hashingAlgorithm;
public String plainTextPassword;
public String thirdPartyId;
public String thirdPartyUserId;
public String phoneNumber;
public String superTokensUserId;
public String externalUserId;

public String getSuperTokenOrExternalUserId() {
return this.externalUserId != null ? this.externalUserId : this.superTokensUserId;
}

public LoginMethod(List<String> tenantIds, String recipeId, boolean isVerified, boolean isPrimary,
long timeJoinedInMSSinceEpoch, String email, String passwordHash, String hashingAlgorithm, String plainTextPassword,
String thirdPartyId, String thirdPartyUserId, String phoneNumber) {
this.tenantIds = tenantIds;
this.recipeId = recipeId;
this.isVerified = isVerified;
this.isPrimary = isPrimary;
this.timeJoinedInMSSinceEpoch = timeJoinedInMSSinceEpoch;
this.email = email;
this.passwordHash = passwordHash;
this.hashingAlgorithm = hashingAlgorithm;
this.plainTextPassword = plainTextPassword;
this.thirdPartyId = thirdPartyId;
this.thirdPartyUserId = thirdPartyUserId;
this.phoneNumber = phoneNumber;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.pluginInterface.bulkimport;

import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;

public class ImportUserBase {

public String userId;
public String email;
public TenantIdentifier tenantIdentifier;
public long timeJoinedMSSinceEpoch;

public ImportUserBase(String userId, String email, TenantIdentifier tenantIdentifier, long timeJoinedMSSinceEpoch) {
this.userId = userId; //this will be the supertokens userId.
this.email = email;
this.tenantIdentifier = tenantIdentifier;
this.timeJoinedMSSinceEpoch = timeJoinedMSSinceEpoch;
}
}
Loading
Loading