Skip to content

Commit

Permalink
removed cache and using filter
Browse files Browse the repository at this point in the history
  • Loading branch information
patduin committed Nov 17, 2023
1 parent dc2b8b3 commit cacba6b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
### Fixed
* Added lombok
* Fixed test cases
* Fixed issue where the primare metastore was not applying the allow filter to validate database clashes from other metastores.

## [3.9.5] - TBD
### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

import javax.validation.constraints.NotNull;
Expand All @@ -42,13 +39,8 @@

import lombok.extern.log4j.Log4j2;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import com.hotels.bdp.waggledance.api.WaggleDanceException;
import com.hotels.bdp.waggledance.api.model.AbstractMetaStore;
Expand All @@ -69,9 +61,7 @@

@Log4j2
public class StaticDatabaseMappingService implements MappingEventListener {
private static final String PRIMARY_KEY = "";
private final MetaStoreMappingFactory metaStoreMappingFactory;
private final LoadingCache<String, List<String>> primaryDatabasesCache;
private final Map<String, DatabaseMapping> mappingsByMetaStoreName;
private final Map<String, DatabaseMapping> mappingsByDatabaseName;
private final Map<String, List<String>> databaseMappingToDatabaseList;
Expand All @@ -85,22 +75,6 @@ public StaticDatabaseMappingService(
QueryMapping queryMapping) {
this.metaStoreMappingFactory = metaStoreMappingFactory;
this.queryMapping = queryMapping;
primaryDatabasesCache = CacheBuilder
.newBuilder()
.expireAfterAccess(1, TimeUnit.MINUTES)
.maximumSize(1)
.build(new CacheLoader<String, List<String>>() {

@Override
public List<String> load(String key) throws Exception {
if (primaryDatabaseMapping != null) {
return primaryDatabaseMapping.getClient().get_all_databases();
} else {
return Lists.newArrayList();
}
}
});

mappingsByMetaStoreName = Collections.synchronizedMap(new LinkedHashMap<>());
mappingsByDatabaseName = Collections.synchronizedMap(new LinkedHashMap<>());
databaseMappingToDatabaseList = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -131,12 +105,9 @@ private void add(AbstractMetaStore metaStore) {
validateMappableDatabases(mappableDatabases, metaStore);

if (metaStore.getFederationType() == PRIMARY) {
validatePrimaryMetastoreDatabases(mappableDatabases);
primaryDatabaseMapping = databaseMapping;
primaryDatabasesCache.invalidateAll();
} else {
validateFederatedMetastoreDatabases(mappableDatabases, metaStoreMapping);
}
validateMetastoreDatabases(mappableDatabases, metaStoreMapping);

mappingsByMetaStoreName.put(metaStoreMapping.getMetastoreMappingName(), databaseMapping);
addDatabaseMappings(mappableDatabases, databaseMapping);
Expand All @@ -155,42 +126,16 @@ private void validateMappableDatabases(List<String> mappableDatabases, AbstractM
}
}

private void validatePrimaryMetastoreDatabases(List<String> databases) {
private void validateMetastoreDatabases(List<String> databases, MetaStoreMapping metaStoreMapping) {
for (String database : databases) {
if (mappingsByDatabaseName.containsKey(database)) {
if (mappingsByDatabaseName.containsKey(database.toLowerCase(Locale.ROOT))) {
throw new WaggleDanceException("Database clash, found '"
+ database
+ "' in primary that was already mapped to a federated metastore '"
+ mappingsByDatabaseName.get(database).getMetastoreMappingName()
+ "', please remove the database from the federated metastore list it can't be"
+ " accessed via Waggle Dance");
}
}
}

private void validateFederatedMetastoreDatabases(List<String> mappableDatabases, MetaStoreMapping metaStoreMapping) {
try {
Set<String> allPrimaryDatabases = Sets.newHashSet(primaryDatabasesCache.get(PRIMARY_KEY));
for (String database : mappableDatabases) {
if (allPrimaryDatabases.contains(database.toLowerCase(Locale.ROOT))) {
throw new WaggleDanceException("Database clash, found '"
+ database
+ "' to be mapped for the federated metastore '"
+ metaStoreMapping.getMetastoreMappingName()
+ "' already present in the primary database, please remove the database from the list it can't be"
+ " accessed via Waggle Dance");
}
if (mappingsByDatabaseName.containsKey(database.toLowerCase(Locale.ROOT))) {
throw new WaggleDanceException("Database clash, found '"
+ database
+ "' to be mapped for the federated metastore '"
+ metaStoreMapping.getMetastoreMappingName()
+ "' already present in another federated database, please remove the database from the list it can't"
+ " be accessed via Waggle Dance");
}
+ "' to be mapped for the federated metastore '"
+ metaStoreMapping.getMetastoreMappingName()
+ "' already present in another federated metastore, please remove the database from the list it can't"
+ " be accessed via Waggle Dance");
}
} catch (ExecutionException e) {
throw new WaggleDanceException("Can't validate database clashes", e.getCause());
}
}

Expand Down Expand Up @@ -228,7 +173,6 @@ private DatabaseMapping createDatabaseMapping(MetaStoreMapping metaStoreMapping)
private void remove(AbstractMetaStore metaStore) {
if (metaStore.getFederationType() == PRIMARY) {
primaryDatabaseMapping = null;
primaryDatabasesCache.invalidateAll();
}

DatabaseMapping removed = mappingsByMetaStoreName.remove(metaStore.getName());
Expand Down Expand Up @@ -306,8 +250,8 @@ public DatabaseMapping databaseMapping(@NotNull String databaseName) throws NoSu
}

@Override
public void checkTableAllowed(String databaseName, String tableName,
DatabaseMapping mapping) throws NoSuchObjectException {
public void checkTableAllowed(String databaseName, String tableName, DatabaseMapping mapping)
throws NoSuchObjectException {
databaseName = GrammarUtils.removeCatName(databaseName);
if (!isTableAllowed(databaseName, tableName)) {
throw new NoSuchObjectException(String.format("%s.%s table not found in any mappings", databaseName, tableName));
Expand All @@ -319,7 +263,7 @@ public List<String> filterTables(String databaseName, List<String> tableNames, D
List<String> allowedTables = new ArrayList<>();
databaseName = GrammarUtils.removeCatName(databaseName);
String db = databaseName.toLowerCase(Locale.ROOT);
for (String table: tableNames)
for (String table : tableNames)
if (isTableAllowed(db, table)) {
allowedTables.add(table);
}
Expand Down Expand Up @@ -368,8 +312,8 @@ public PanopticOperationHandler getPanopticOperationHandler() {
@Override
public List<TableMeta> getTableMeta(String db_patterns, String tbl_patterns, List<String> tbl_types) {

BiFunction<TableMeta, DatabaseMapping, Boolean> filter = (tableMeta, mapping) ->
databaseAndTableAllowed(tableMeta.getDbName(), tableMeta.getTableName(), mapping);
BiFunction<TableMeta, DatabaseMapping, Boolean> filter = (tableMeta, mapping) -> databaseAndTableAllowed(
tableMeta.getDbName(), tableMeta.getTableName(), mapping);

Map<DatabaseMapping, String> mappingsForPattern = new LinkedHashMap<>();
for (DatabaseMapping mapping : getAvailableDatabaseMappings()) {
Expand All @@ -384,7 +328,7 @@ public List<String> getAllDatabases(String pattern) {
.containsKey(database);

BiFunction<String, DatabaseMapping, Boolean> filter1 = (database, mapping) -> filter.apply(database, mapping)
&& databaseMappingToDatabaseList.get(mapping.getMetastoreMappingName()).contains(database);
&& databaseMappingToDatabaseList.get(mapping.getMetastoreMappingName()).contains(database);

Map<DatabaseMapping, String> mappingsForPattern = new LinkedHashMap<>();
for (DatabaseMapping mapping : getAllDatabaseMappings()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private FederatedMetaStore newFederatedInstanceWithClient(
when(federatedDatabaseClient.get_all_databases()).thenReturn(allLowerCased);
return newMetastore;
}

@Test
public void databaseMappingPrimary() throws NoSuchObjectException {
DatabaseMapping databaseMapping = service.databaseMapping(PRIMARY_DB);
Expand Down Expand Up @@ -206,6 +206,35 @@ public void validatePrimaryMetaStoreClashThrowsException() throws TException {
service = new StaticDatabaseMappingService(metaStoreMappingFactory,
Arrays.asList(federatedMetastore, primaryMetastore), queryMapping);
}


@Test
public void validateNoPrimaryMetaStoreClashWhenMapped() throws TException {
federatedMetastore = newFederatedInstanceWithClient(FEDERATED_NAME, URI, Lists.newArrayList("db"), true);

primaryMetastore.setMappedDatabases(Lists.newArrayList("mapped_db"));
metaStoreMappingPrimary = mockNewMapping(true, primaryMetastore);
when(metaStoreMappingPrimary.getClient()).thenReturn(primaryDatabaseClient);
when(primaryDatabaseClient.get_all_databases()).thenReturn(Lists.newArrayList("db", "mapped_db"));
when(metaStoreMappingFactory.newInstance(primaryMetastore)).thenReturn(metaStoreMappingPrimary);

service = new StaticDatabaseMappingService(metaStoreMappingFactory,
Arrays.asList(primaryMetastore, federatedMetastore), queryMapping);
}

@Test
public void validateNoPrimaryMetaStoreClashWhenMappedPrimarySpecifiedLast() throws TException {
federatedMetastore = newFederatedInstanceWithClient(FEDERATED_NAME, URI, Lists.newArrayList("db"), true);

primaryMetastore.setMappedDatabases(Lists.newArrayList("mapped_db"));
metaStoreMappingPrimary = mockNewMapping(true, primaryMetastore);
when(metaStoreMappingPrimary.getClient()).thenReturn(primaryDatabaseClient);
when(primaryDatabaseClient.get_all_databases()).thenReturn(Lists.newArrayList("db", "mapped_db"));
when(metaStoreMappingFactory.newInstance(primaryMetastore)).thenReturn(metaStoreMappingPrimary);

service = new StaticDatabaseMappingService(metaStoreMappingFactory,
Arrays.asList(federatedMetastore, primaryMetastore), queryMapping);
}

@Test(expected = WaggleDanceException.class)
public void onRegisterPrimaryThrowsExceptionDueToExistingPrimary() {
Expand Down

0 comments on commit cacba6b

Please sign in to comment.