Skip to content
This repository has been archived by the owner on Oct 3, 2022. It is now read-only.

Fixed #41 Race Condition in JCacheManager #42

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 105 additions & 40 deletions ehcache-jcache/src/main/java/org/ehcache/jcache/JCacheManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import javax.cache.Cache;
import javax.cache.CacheException;
Expand Down Expand Up @@ -60,7 +64,7 @@ public class JCacheManager implements javax.cache.CacheManager {
private final CacheManager cacheManager;
private final URI uri;
private final Properties props;
private final ConcurrentHashMap<String, JCache> allCaches = new ConcurrentHashMap<String, JCache>();
private final ConcurrentHashMap<String, Future<JCache>> allCaches = new ConcurrentHashMap<String, Future<JCache>>();
private volatile boolean closed = false;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ConcurrentMap<JCache, JCacheManagementMXBean> cfgMXBeans = new ConcurrentHashMap<JCache, JCacheManagementMXBean>();
Expand Down Expand Up @@ -100,25 +104,47 @@ public <K, V, C extends Configuration<K, V>> Cache<K, V> createCache(final Strin
if(configuration == null) {
throw new NullPointerException();
}

return getOrPutCacheAtomically(cacheName, new Callable<JCache>() {
@Override
public JCache call() throws Exception {
return createCache0(cacheName, configuration);
}
});
}

private <K, V> JCache<K, V> getOrPutCacheAtomically(String cacheName, Callable<JCache> creator) {
FutureTask<JCache> myFuture = new FutureTask<JCache>(creator);

// Only configure the cache if it is the one that has been added to the map
Future<JCache> previousFuture = allCaches.putIfAbsent(cacheName, myFuture);
if (previousFuture != null) {
throw new CacheException("A cache called " + cacheName + " already exists in this CacheManager");
}

myFuture.run();

JCache<K, V> jCache = allCaches.get(cacheName);
if (jCache != null) {
throw new CacheException();
try {
return myFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CacheException("This thread has been interrupted while the cache has been configured", e);
} catch (ExecutionException e) {
throw new CacheException("An exception has occurred while configuring the cache", e);
}
}

private <K, V, C extends Configuration<K, V>> JCache<K, V> createCache0(String cacheName, C configuration) {
cacheManager.addCacheIfAbsent(new net.sf.ehcache.Cache(toEhcacheConfig(cacheName, configuration)));
Ehcache ehcache = cacheManager.getEhcache(cacheName);
final JCacheConfiguration<K, V> cfg = new JCacheConfiguration<K, V>(configuration);
jCache = new JCache<K, V>(this, cfg, ehcache);
JCache<K, V> previous = allCaches.putIfAbsent(cacheName, jCache);
if(previous != null) {
// todo validate config
return previous;
}

JCache jCache = new JCache<K, V>(this, cfg, ehcache);
if(cfg.isStatisticsEnabled()) {
enableStatistics(cacheName, true);
enableStatistics(true, jCache);
}
if(cfg.isManagementEnabled()) {
enableManagement(cacheName, true);
enableManagement(true, jCache);
}
return jCache;
}
Expand All @@ -129,25 +155,14 @@ public <K, V> Cache<K, V> getCache(final String cacheName, final Class<K> keyTyp
if(valueType == null) {
throw new NullPointerException();
}
JCache<K, V> jCache = allCaches.get(cacheName);
if(jCache != null) {
if(!keyType.isAssignableFrom(jCache.getConfiguration(CompleteConfiguration.class).getKeyType())) {
throw new ClassCastException();

JCache jCache = getOrPutCacheAtomically(cacheName, new Callable<JCache>() {
@Override
public JCache call() throws Exception {
return getCache0(cacheName, keyType, valueType);
}
if(!valueType.isAssignableFrom(jCache.getConfiguration(CompleteConfiguration.class).getValueType())) {
throw new ClassCastException();
}
return jCache;
}
final net.sf.ehcache.Cache cache = cacheManager.getCache(cacheName);
if (cache == null) {
return null;
}
jCache = new JCache<K, V>(this, new JCacheConfiguration<K, V>(null, null, keyType, valueType), cache);
final JCache<K, V> previous = allCaches.putIfAbsent(cacheName, jCache);
if(previous != null) {
jCache = previous;
}
});

if(!keyType.isAssignableFrom(jCache.getConfiguration(CompleteConfiguration.class).getKeyType())) {
throw new ClassCastException();
}
Expand All @@ -156,13 +171,24 @@ public <K, V> Cache<K, V> getCache(final String cacheName, final Class<K> keyTyp
}
return jCache;
}

private <K, V> JCache<K, V> getCache0(final String cacheName, final Class<K> keyType, final Class<V> valueType) {
final net.sf.ehcache.Cache cache = cacheManager.getCache(cacheName);
if (cache == null) {
// Can't create the, so retract the promise of creating it.
allCaches.remove(cacheName);
return null;
}

return new JCache<K, V>(this, new JCacheConfiguration<K, V>(null, null, keyType, valueType), cache);
}

@Override
public <K, V> Cache<K, V> getCache(final String cacheName) {
final JCache<K, V> jCache = allCaches.get(cacheName);
final JCache<K, V> jCache = getCacheIfExists(cacheName);
if(jCache == null) {
refreshAllCaches();
return allCaches.get(cacheName);
return getCacheIfExists(cacheName);
}
if(jCache.getConfiguration(CompleteConfiguration.class).getKeyType() != Object.class ||
jCache.getConfiguration(CompleteConfiguration.class).getValueType() != Object.class) {
Expand All @@ -171,6 +197,23 @@ public <K, V> Cache<K, V> getCache(final String cacheName) {
return jCache;
}

private JCache getCacheIfExists(final String cacheName) {
Future<JCache> future = allCaches.get(cacheName);
if (future == null) {
return null;
}

try {
return future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CacheException("This thread has been interrupted while another thread configured the requested cache", e);
} catch (ExecutionException e) {
throw new CacheException("An exception has occurred while another thread was configuring the requested cache", e);
}
}


@Override
public Iterable<String> getCacheNames() {
return Collections.unmodifiableSet(new HashSet<String>(allCaches.keySet()));
Expand All @@ -179,7 +222,7 @@ public Iterable<String> getCacheNames() {
@Override
public void destroyCache(final String cacheName) {
checkNotClosed();
final JCache jCache = allCaches.get(cacheName);
final JCache jCache = getCacheIfExists(cacheName);
if (jCache != null) {
jCache.close();
}
Expand All @@ -189,7 +232,7 @@ public void destroyCache(final String cacheName) {
public void enableManagement(final String cacheName, final boolean enabled) {
checkNotClosed();
if(cacheName == null) throw new NullPointerException();
final JCache jCache = allCaches.get(cacheName);
final JCache jCache = getCacheIfExists(cacheName);
if(jCache == null) {
throw new NullPointerException();
}
Expand Down Expand Up @@ -221,7 +264,7 @@ private void enableManagement(final boolean enabled, final JCache jCache) {
public void enableStatistics(final String cacheName, final boolean enabled) {
checkNotClosed();
if(cacheName == null) throw new NullPointerException();
final JCache jCache = allCaches.get(cacheName);
final JCache jCache = getCacheIfExists(cacheName);
if(jCache == null) {
throw new NullPointerException();
}
Expand Down Expand Up @@ -298,8 +341,14 @@ public void close() {

void shutdown() {
closed = true;
for (JCache jCache : allCaches.values()) {
jCache.close();
for (Future<JCache> future : allCaches.values()) {
// This will wait until caches being added are configured, then remove them
// closed is true, so no new caches can be added when we are here.
try {
future.get().close();
} catch (Exception e) {
// ignore according to the spec of CacheManager#close()
}
}
cacheManager.shutdown();
allCaches.clear();
Expand All @@ -325,7 +374,12 @@ private void refreshAllCaches() {
for (String s : cacheManager.getCacheNames()) {
final net.sf.ehcache.Cache cache = cacheManager.getCache(s);
if(cache != null) {
allCaches.put(s, new JCache(this, new JCacheConfiguration(cache.getCacheConfiguration()), cache));
allCaches.putIfAbsent(s, new FutureTask<JCache>(new Callable<JCache>() {
@Override
public JCache call() throws Exception {
return new JCache(JCacheManager.this, new JCacheConfiguration(cache.getCacheConfiguration()), cache);
}
}));
}
}
}
Expand Down Expand Up @@ -353,8 +407,19 @@ private void checkNotClosed() {
}

void shutdown(final JCache jCache) {
final JCache r = allCaches.remove(jCache.getName());
if (r == jCache) {
final Future<JCache> removedFuture = allCaches.remove(jCache.getName());
JCache removedCache;
try {
removedCache = removedFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (ExecutionException e) {
// the cache failed to configure, but it should be shut down anyway, so just ignore it
// The thread that configured the cache will get the exception of course, so it is not lost
return;
}
if (removedCache == jCache) {
enableStatistics(false, jCache);
enableManagement(false, jCache);
cacheManager.removeCache(jCache.getName());
Expand Down