Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

removed cache and using filter #302

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
### Fixed
* Added lombok
* Fixed test cases
* Fixed issue where the primare metastore was not applying the allow filter to validate database clashes from other metastores.
abhimanyugupta07 marked this conversation as resolved.
Show resolved Hide resolved

## [3.9.5] - TBD
### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

import javax.validation.constraints.NotNull;
Expand All @@ -42,13 +39,8 @@

import lombok.extern.log4j.Log4j2;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import com.hotels.bdp.waggledance.api.WaggleDanceException;
import com.hotels.bdp.waggledance.api.model.AbstractMetaStore;
Expand All @@ -69,9 +61,7 @@

@Log4j2
public class StaticDatabaseMappingService implements MappingEventListener {
private static final String PRIMARY_KEY = "";
private final MetaStoreMappingFactory metaStoreMappingFactory;
private final LoadingCache<String, List<String>> primaryDatabasesCache;
private final Map<String, DatabaseMapping> mappingsByMetaStoreName;
private final Map<String, DatabaseMapping> mappingsByDatabaseName;
private final Map<String, List<String>> databaseMappingToDatabaseList;
Expand All @@ -85,22 +75,6 @@ public StaticDatabaseMappingService(
QueryMapping queryMapping) {
this.metaStoreMappingFactory = metaStoreMappingFactory;
this.queryMapping = queryMapping;
primaryDatabasesCache = CacheBuilder
.newBuilder()
.expireAfterAccess(1, TimeUnit.MINUTES)
.maximumSize(1)
.build(new CacheLoader<String, List<String>>() {

@Override
public List<String> load(String key) throws Exception {
if (primaryDatabaseMapping != null) {
return primaryDatabaseMapping.getClient().get_all_databases();
} else {
return Lists.newArrayList();
}
}
});

mappingsByMetaStoreName = Collections.synchronizedMap(new LinkedHashMap<>());
mappingsByDatabaseName = Collections.synchronizedMap(new LinkedHashMap<>());
databaseMappingToDatabaseList = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -131,12 +105,9 @@ private void add(AbstractMetaStore metaStore) {
validateMappableDatabases(mappableDatabases, metaStore);

if (metaStore.getFederationType() == PRIMARY) {
validatePrimaryMetastoreDatabases(mappableDatabases);
primaryDatabaseMapping = databaseMapping;
primaryDatabasesCache.invalidateAll();
} else {
validateFederatedMetastoreDatabases(mappableDatabases, metaStoreMapping);
}
validateMetastoreDatabases(mappableDatabases, metaStoreMapping);

mappingsByMetaStoreName.put(metaStoreMapping.getMetastoreMappingName(), databaseMapping);
addDatabaseMappings(mappableDatabases, databaseMapping);
Expand All @@ -155,42 +126,16 @@ private void validateMappableDatabases(List<String> mappableDatabases, AbstractM
}
}

private void validatePrimaryMetastoreDatabases(List<String> databases) {
private void validateMetastoreDatabases(List<String> databases, MetaStoreMapping metaStoreMapping) {
for (String database : databases) {
if (mappingsByDatabaseName.containsKey(database)) {
if (mappingsByDatabaseName.containsKey(database.toLowerCase(Locale.ROOT))) {
throw new WaggleDanceException("Database clash, found '"
+ database
+ "' in primary that was already mapped to a federated metastore '"
+ mappingsByDatabaseName.get(database).getMetastoreMappingName()
+ "', please remove the database from the federated metastore list it can't be"
+ " accessed via Waggle Dance");
}
}
}

private void validateFederatedMetastoreDatabases(List<String> mappableDatabases, MetaStoreMapping metaStoreMapping) {
try {
Set<String> allPrimaryDatabases = Sets.newHashSet(primaryDatabasesCache.get(PRIMARY_KEY));
for (String database : mappableDatabases) {
if (allPrimaryDatabases.contains(database.toLowerCase(Locale.ROOT))) {
throw new WaggleDanceException("Database clash, found '"
+ database
+ "' to be mapped for the federated metastore '"
+ metaStoreMapping.getMetastoreMappingName()
+ "' already present in the primary database, please remove the database from the list it can't be"
+ " accessed via Waggle Dance");
}
if (mappingsByDatabaseName.containsKey(database.toLowerCase(Locale.ROOT))) {
throw new WaggleDanceException("Database clash, found '"
+ database
+ "' to be mapped for the federated metastore '"
+ metaStoreMapping.getMetastoreMappingName()
+ "' already present in another federated database, please remove the database from the list it can't"
+ " be accessed via Waggle Dance");
}
+ "' to be mapped for the federated metastore '"
+ metaStoreMapping.getMetastoreMappingName()
+ "' already present in another federated metastore, please remove the database from the list it can't"
+ " be accessed via Waggle Dance");
}
} catch (ExecutionException e) {
throw new WaggleDanceException("Can't validate database clashes", e.getCause());
}
}

Expand Down Expand Up @@ -228,7 +173,6 @@ private DatabaseMapping createDatabaseMapping(MetaStoreMapping metaStoreMapping)
private void remove(AbstractMetaStore metaStore) {
if (metaStore.getFederationType() == PRIMARY) {
primaryDatabaseMapping = null;
primaryDatabasesCache.invalidateAll();
}

DatabaseMapping removed = mappingsByMetaStoreName.remove(metaStore.getName());
Expand Down Expand Up @@ -306,8 +250,8 @@ public DatabaseMapping databaseMapping(@NotNull String databaseName) throws NoSu
}

@Override
public void checkTableAllowed(String databaseName, String tableName,
DatabaseMapping mapping) throws NoSuchObjectException {
public void checkTableAllowed(String databaseName, String tableName, DatabaseMapping mapping)
throws NoSuchObjectException {
databaseName = GrammarUtils.removeCatName(databaseName);
if (!isTableAllowed(databaseName, tableName)) {
throw new NoSuchObjectException(String.format("%s.%s table not found in any mappings", databaseName, tableName));
Expand All @@ -319,7 +263,7 @@ public List<String> filterTables(String databaseName, List<String> tableNames, D
List<String> allowedTables = new ArrayList<>();
databaseName = GrammarUtils.removeCatName(databaseName);
String db = databaseName.toLowerCase(Locale.ROOT);
for (String table: tableNames)
for (String table : tableNames)
if (isTableAllowed(db, table)) {
allowedTables.add(table);
}
Expand Down Expand Up @@ -368,8 +312,8 @@ public PanopticOperationHandler getPanopticOperationHandler() {
@Override
public List<TableMeta> getTableMeta(String db_patterns, String tbl_patterns, List<String> tbl_types) {

BiFunction<TableMeta, DatabaseMapping, Boolean> filter = (tableMeta, mapping) ->
databaseAndTableAllowed(tableMeta.getDbName(), tableMeta.getTableName(), mapping);
BiFunction<TableMeta, DatabaseMapping, Boolean> filter = (tableMeta, mapping) -> databaseAndTableAllowed(
tableMeta.getDbName(), tableMeta.getTableName(), mapping);

Map<DatabaseMapping, String> mappingsForPattern = new LinkedHashMap<>();
for (DatabaseMapping mapping : getAvailableDatabaseMappings()) {
Expand All @@ -384,7 +328,7 @@ public List<String> getAllDatabases(String pattern) {
.containsKey(database);

BiFunction<String, DatabaseMapping, Boolean> filter1 = (database, mapping) -> filter.apply(database, mapping)
&& databaseMappingToDatabaseList.get(mapping.getMetastoreMappingName()).contains(database);
&& databaseMappingToDatabaseList.get(mapping.getMetastoreMappingName()).contains(database);

Map<DatabaseMapping, String> mappingsForPattern = new LinkedHashMap<>();
for (DatabaseMapping mapping : getAllDatabaseMappings()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private FederatedMetaStore newFederatedInstanceWithClient(
when(federatedDatabaseClient.get_all_databases()).thenReturn(allLowerCased);
return newMetastore;
}

@Test
public void databaseMappingPrimary() throws NoSuchObjectException {
DatabaseMapping databaseMapping = service.databaseMapping(PRIMARY_DB);
Expand Down Expand Up @@ -206,6 +206,35 @@ public void validatePrimaryMetaStoreClashThrowsException() throws TException {
service = new StaticDatabaseMappingService(metaStoreMappingFactory,
Arrays.asList(federatedMetastore, primaryMetastore), queryMapping);
}


@Test
public void validateNoPrimaryMetaStoreClashWhenMapped() throws TException {
federatedMetastore = newFederatedInstanceWithClient(FEDERATED_NAME, URI, Lists.newArrayList("db"), true);

primaryMetastore.setMappedDatabases(Lists.newArrayList("mapped_db"));
metaStoreMappingPrimary = mockNewMapping(true, primaryMetastore);
when(metaStoreMappingPrimary.getClient()).thenReturn(primaryDatabaseClient);
when(primaryDatabaseClient.get_all_databases()).thenReturn(Lists.newArrayList("db", "mapped_db"));
when(metaStoreMappingFactory.newInstance(primaryMetastore)).thenReturn(metaStoreMappingPrimary);

service = new StaticDatabaseMappingService(metaStoreMappingFactory,
Arrays.asList(primaryMetastore, federatedMetastore), queryMapping);
}

@Test
public void validateNoPrimaryMetaStoreClashWhenMappedPrimarySpecifiedLast() throws TException {
federatedMetastore = newFederatedInstanceWithClient(FEDERATED_NAME, URI, Lists.newArrayList("db"), true);

primaryMetastore.setMappedDatabases(Lists.newArrayList("mapped_db"));
metaStoreMappingPrimary = mockNewMapping(true, primaryMetastore);
when(metaStoreMappingPrimary.getClient()).thenReturn(primaryDatabaseClient);
when(primaryDatabaseClient.get_all_databases()).thenReturn(Lists.newArrayList("db", "mapped_db"));
when(metaStoreMappingFactory.newInstance(primaryMetastore)).thenReturn(metaStoreMappingPrimary);

service = new StaticDatabaseMappingService(metaStoreMappingFactory,
Arrays.asList(federatedMetastore, primaryMetastore), queryMapping);
}

@Test(expected = WaggleDanceException.class)
public void onRegisterPrimaryThrowsExceptionDueToExistingPrimary() {
Expand Down
Loading