From abc4aedcad76f3b3b3b98d7feec8f66605ffb505 Mon Sep 17 00:00:00 2001 From: Ryan Slominski Date: Mon, 12 Feb 2024 17:29:57 -0500 Subject: [PATCH] Acquire locks with timeout (#13) --- .../jlab/epics2web/epics/ChannelManager.java | 163 +++++++++--------- .../websocket/WebSocketSessionManager.java | 30 +++- .../util/LockAcquisitionTimeoutException.java | 7 + 3 files changed, 111 insertions(+), 89 deletions(-) create mode 100644 src/main/java/org/jlab/util/LockAcquisitionTimeoutException.java diff --git a/src/main/java/org/jlab/epics2web/epics/ChannelManager.java b/src/main/java/org/jlab/epics2web/epics/ChannelManager.java index 3738559..140b978 100644 --- a/src/main/java/org/jlab/epics2web/epics/ChannelManager.java +++ b/src/main/java/org/jlab/epics2web/epics/ChannelManager.java @@ -6,6 +6,8 @@ import gov.aps.jca.TimeoutException; import gov.aps.jca.dbr.DBR; import gov.aps.jca.dbr.DBRType; +import org.jlab.util.LockAcquisitionTimeoutException; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -17,9 +19,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; -import java.util.stream.Collectors; import javax.json.JsonObjectBuilder; public class ChannelManager { @@ -39,6 +42,12 @@ public class ChannelManager { private final ScheduledExecutorService timeoutExecutor; private final ExecutorService callbackExecutor; + private final ReentrantLock clientLock = new ReentrantLock(); + private final ReentrantLock monitorLock = new ReentrantLock(); + + private final long ACQUIRE_RESOURCE_TIMEOUT_SECONDS = 5; + private final long CLEANUP_RESOURCE_TIMEOUT_SECONDS = 60; + /** * Create a new ChannelMonitorManager. * @@ -52,35 +61,6 @@ public ChannelManager(CAJContext context, ScheduledExecutorService timeoutExecut this.callbackExecutor = callbackExecutor; } - public void reset(CAJContext context) { - try { - this.context.destroy(); // Destroy old context - } catch (Exception e) { // IllegalStateException or CAException or whatever - LOGGER.log(Level.SEVERE, "Unable to destroy context with unresponsive virtual circuit", e); - } - - synchronized (monitorMap) { - monitorMap.clear(); - this.context = context; // Assign new context - } - - Map> old; - - synchronized (clientMap) { - old = new HashMap<>(clientMap); - for (PvListener listener : old.keySet()) { - clientMap.put(listener, Collections.emptySet()); - } - } - - for (PvListener listener : old.keySet()) { - Set pvs = old.get(listener); - String listOPvs = pvs.stream().collect(Collectors.joining(" ")); - //addPvs(listener, pvs); - LOGGER.log(Level.INFO, "Client: {0}, PVs: {1}", new Object[]{listener, listOPvs}); - } - } - public void addValueToJSON(JsonObjectBuilder builder, DBR dbr) { try { if (dbr.isDOUBLE()) { @@ -197,19 +177,6 @@ private DBR doGet(CAJChannel channel, boolean enumLabel) throws CAException { return dbr; } - /** - * Registers a PV monitor on the supplied PV for the given listener. - * Equivalent to calling addPvs with a set of one PV. - * - * @param listener The PvListener - * @param pv The EPICS PV name - */ - public void addPv(PvListener listener, String pv) { - HashSet pvSet = new HashSet<>(); - pvSet.add(pv); - addPvs(listener, pvSet); - } - /** * Registers PV monitors on the supplied PVs for the given listener. Note * that internally only a single monitor is used for any given PV. PVs for @@ -220,7 +187,7 @@ public void addPv(PvListener listener, String pv) { * @param listener The PvListener to receive notifications * @param addPvSet The set of PVs to monitor */ - public void addPvs(PvListener listener, Set addPvSet) { + public void addPvs(PvListener listener, Set addPvSet) throws InterruptedException, CAException, LockAcquisitionTimeoutException { Set newPvSet = new HashSet<>(); if (addPvSet != null) { @@ -236,21 +203,24 @@ public void addPvs(PvListener listener, Set addPvSet) { for (String pv : addPvSet) { //LOGGER.log(Level.FINEST, "addListener pv: {0}; pv: {1}", new Object[]{session, pv}); - ChannelMonitor monitor; - synchronized (monitorMap) { - monitor = monitorMap.get(pv); + ChannelMonitor monitor = null; + + if(monitorLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + try { + monitor = monitorMap.get(pv); - if (monitor == null) { - //LOGGER.log(Level.FINEST, "Opening ChannelMonitor: {0}", pv); - try { + if (monitor == null) { + //LOGGER.log(Level.FINEST, "Opening ChannelMonitor: {0}", pv); monitor = new ChannelMonitor(pv, context, timeoutExecutor, callbackExecutor); monitorMap.put(pv, monitor); - } catch (CAException e) { - LOGGER.log(Level.WARNING, "Unable to create channel monitor; skipping", e); + } else { + //LOGGER.log(Level.FINEST, "Joining ChannelMonitor: {0}", pv); } - } else { - //LOGGER.log(Level.FINEST, "Joining ChannelMonitor: {0}", pv); + } finally { + monitorLock.unlock(); } + } else { + throw new LockAcquisitionTimeoutException("Timeout while acquiring monitorLock in addPvs"); } if (monitor != null) { @@ -259,14 +229,20 @@ public void addPvs(PvListener listener, Set addPvSet) { } } - synchronized (clientMap) { - Set oldPvSet = clientMap.get(listener); + if(clientLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + try { + Set oldPvSet = clientMap.get(listener); + + if (oldPvSet != null) { + newPvSet.addAll(oldPvSet); + } - if (oldPvSet != null) { - newPvSet.addAll(oldPvSet); + clientMap.put(listener, newPvSet); + } finally { + clientLock.unlock(); } - - clientMap.put(listener, newPvSet); + } else { + throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addPvs"); } } @@ -276,19 +252,25 @@ public void addPvs(PvListener listener, Set addPvSet) { * @param listener The PvListener * @param clearPvSet The PV set to clear */ - public void clearPvs(PvListener listener, Set clearPvSet) { + public void clearPvs(PvListener listener, Set clearPvSet) throws InterruptedException, LockAcquisitionTimeoutException { Set newPvSet; - synchronized (clientMap) { - Set oldPvSet = clientMap.get(listener); + if(clientLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + try { + Set oldPvSet = clientMap.get(listener); - if (oldPvSet != null) { - newPvSet = new HashSet<>(oldPvSet); - newPvSet.removeAll(clearPvSet); - } else { - newPvSet = new HashSet<>(); + if (oldPvSet != null) { + newPvSet = new HashSet<>(oldPvSet); + newPvSet.removeAll(clearPvSet); + } else { + newPvSet = new HashSet<>(); + } + clientMap.put(listener, newPvSet); + } finally { + clientLock.unlock(); } - clientMap.put(listener, newPvSet); + } else { + throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addPvs"); } removeFromChannels(listener, clearPvSet); @@ -304,16 +286,21 @@ public void clearPvs(PvListener listener, Set clearPvSet) { * * @param listener The PvListener */ - public void addListener(PvListener listener) { + public void addListener(PvListener listener) throws InterruptedException, LockAcquisitionTimeoutException { + if(clientLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + try { + Set pvSet = clientMap.get(listener); - synchronized (clientMap) { - Set pvSet = clientMap.get(listener); + if (pvSet == null) { + pvSet = new HashSet<>(); + } - if (pvSet == null) { - pvSet = new HashSet<>(); + clientMap.put(listener, pvSet); + } finally { + clientLock.unlock(); } - - clientMap.put(listener, pvSet); + } else { + throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addListener"); } } @@ -322,9 +309,9 @@ public void addListener(PvListener listener) { * channel then closes the channel. * * @param listener The PvListener - * @param pvList The PV list (and indirectly the channel list) + * @param pvSet The PV list (and indirectly the channel list) */ - private void removeFromChannels(PvListener listener, Set pvSet) { + private void removeFromChannels(PvListener listener, Set pvSet) throws InterruptedException, LockAcquisitionTimeoutException { if (pvSet != null) { // Some clients don't immediately connect to a pv so have an empty pv list for (String pv : pvSet) { int listenerCount = 0; @@ -335,15 +322,21 @@ private void removeFromChannels(PvListener listener, Set pvSet) { monitor.removeListener(listener); } - synchronized (monitorMap) { - monitor = monitorMap.get(pv); + if(monitorLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + try { + monitor = monitorMap.get(pv); - if (monitor != null) { - listenerCount = monitor.getListenerCount(); - if (listenerCount == 0) { - monitorMap.remove(pv); + if (monitor != null) { + listenerCount = monitor.getListenerCount(); + if (listenerCount == 0) { + monitorMap.remove(pv); + } } + } finally { + monitorLock.unlock(); } + } else { + throw new LockAcquisitionTimeoutException("Timeout while acquiring monitorLock in removeFromChannels"); } // We call close without holding a lock @@ -364,7 +357,7 @@ private void removeFromChannels(PvListener listener, Set pvSet) { * * @param listener The PvListener */ - public void removeListener(PvListener listener) { + public void removeListener(PvListener listener) throws LockAcquisitionTimeoutException, InterruptedException { //LOGGER.log(Level.FINEST, "removeListener: {0}", session); Set pvSet = clientMap.remove(listener); diff --git a/src/main/java/org/jlab/epics2web/websocket/WebSocketSessionManager.java b/src/main/java/org/jlab/epics2web/websocket/WebSocketSessionManager.java index 8c75c53..5516a1f 100644 --- a/src/main/java/org/jlab/epics2web/websocket/WebSocketSessionManager.java +++ b/src/main/java/org/jlab/epics2web/websocket/WebSocketSessionManager.java @@ -1,5 +1,6 @@ package org.jlab.epics2web.websocket; +import gov.aps.jca.CAException; import gov.aps.jca.dbr.DBR; import gov.aps.jca.dbr.DBRType; import java.io.IOException; @@ -20,6 +21,7 @@ import javax.websocket.Session; import org.jlab.epics2web.Application; import org.jlab.epics2web.epics.PvListener; +import org.jlab.util.LockAcquisitionTimeoutException; /** * Manages web socket sessions and ties them to channel access monitors. @@ -148,7 +150,12 @@ public void recordInteractionDate(Session session) { public void addClient(Session session) { WebSocketSessionMonitor listener = getListener(session); - Application.channelManager.addListener(listener); + try { + Application.channelManager.addListener(listener); + } catch (InterruptedException | LockAcquisitionTimeoutException e) { + LOGGER.log(Level.WARNING, "Unable to addClient: " + session.getId(), e); + // TODO: Retry? + } } /** @@ -162,7 +169,12 @@ public void removeClient(Session session) { if (listener != null) { listenerMap.remove(session); - Application.channelManager.removeListener(listener); + try { + Application.channelManager.removeListener(listener); + } catch (LockAcquisitionTimeoutException | InterruptedException e) { + LOGGER.log(Level.WARNING, "Unable to removeClient: " + session.getId(), e); + // TODO: Retry? + } } } @@ -175,7 +187,12 @@ public void removeClient(Session session) { public void addPvs(Session session, Set pvSet) { WebSocketSessionMonitor listener = getListener(session); - Application.channelManager.addPvs(listener, pvSet); + try { + Application.channelManager.addPvs(listener, pvSet); + } catch (InterruptedException | CAException | LockAcquisitionTimeoutException e) { + LOGGER.log(Level.WARNING, "Unable to addPvs: " + String.join(",", pvSet == null ? new HashSet<>() : pvSet), e); + // TODO: Retry? + } } /** @@ -187,7 +204,12 @@ public void addPvs(Session session, Set pvSet) { public void clearPvs(Session session, Set pvSet) { WebSocketSessionMonitor listener = getListener(session); - Application.channelManager.clearPvs(listener, pvSet); + try { + Application.channelManager.clearPvs(listener, pvSet); + } catch (InterruptedException | LockAcquisitionTimeoutException e) { + LOGGER.log(Level.WARNING, "Unable to clearPvs: " + String.join(",", pvSet == null ? new HashSet<>() : pvSet), e); + // TODO: Retry? + } } /** diff --git a/src/main/java/org/jlab/util/LockAcquisitionTimeoutException.java b/src/main/java/org/jlab/util/LockAcquisitionTimeoutException.java new file mode 100644 index 0000000..febf0e6 --- /dev/null +++ b/src/main/java/org/jlab/util/LockAcquisitionTimeoutException.java @@ -0,0 +1,7 @@ +package org.jlab.util; + +public class LockAcquisitionTimeoutException extends Exception { + public LockAcquisitionTimeoutException(String message) { + super(message); + } +}