diff --git a/leshan-server-redis/src/main/java/org/eclipse/leshan/server/redis/RedisRegistrationStore.java b/leshan-server-redis/src/main/java/org/eclipse/leshan/server/redis/RedisRegistrationStore.java index 540b77f07d..b844a6e3f6 100644 --- a/leshan-server-redis/src/main/java/org/eclipse/leshan/server/redis/RedisRegistrationStore.java +++ b/leshan-server-redis/src/main/java/org/eclipse/leshan/server/redis/RedisRegistrationStore.java @@ -27,9 +27,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -67,29 +69,22 @@ * A RegistrationStore which stores registrations and observations in Redis. */ public class RedisRegistrationStore implements RegistrationStore, Startable, Stoppable, Destroyable { - - /** Default time in seconds between 2 cleaning tasks (used to remove expired registration). */ - public static final long DEFAULT_CLEAN_PERIOD = 60; - public static final int DEFAULT_CLEAN_LIMIT = 500; - /** Defaut Extra time for registration lifetime in seconds */ - public static final long DEFAULT_GRACE_PERIOD = 0; - private static final Logger LOG = LoggerFactory.getLogger(RedisRegistrationStore.class); // Redis key prefixes - private static final String REG_EP = "REG:EP:"; // (Endpoint => Registration) - private static final String REG_EP_REGID_IDX = "EP:REGID:"; // secondary index key (Registration ID => Endpoint) - private static final String REG_EP_ADDR_IDX = "EP:ADDR:"; // secondary index key (Socket Address => Endpoint) - private static final String REG_EP_IDENTITY = "EP:IDENTITY:"; // secondary index key (Identity => Endpoint) - private static final String LOCK_EP = "LOCK:EP:"; - private static final byte[] OBS_TKN = "OBS:TKN:".getBytes(UTF_8); - private static final String OBS_TKNS_REGID_IDX = "TKNS:REGID:"; // secondary index (token list by registration) - private static final byte[] EXP_EP = "EXP:EP".getBytes(UTF_8); // a sorted set used for registration expiration - // (expiration date, Endpoint) + private final String registrationByEndpointPrefix; // (Endpoint => Registration) + private final String endpointByRegistrationIdPrefix; // secondary index key (Registration ID => Endpoint) + private final String endpointBySocketAddressPrefix; // secondary index key (Socket Address => Endpoint) + private final String endpointByIdentityPrefix; // secondary index key (Identity => Endpoint) + private final String endpointLockPrefix; + private final byte[] observationTokenPrefix; + private final String observationTokensByRegistrationIdPrefix; // secondary index (Registration => Token list) + private final byte[] endpointExpirationKey; // a sorted set used for registration expiration (expiration date, + // Endpoint) private final Pool pool; - // Listener use to notify when a registration expires + // Listener used to notify about a registration expiration private ExpirationListener expirationListener; private final ScheduledExecutorService schedExecutor; @@ -104,35 +99,25 @@ public class RedisRegistrationStore implements RegistrationStore, Startable, Sto private final RegistrationSerDes registrationSerDes; public RedisRegistrationStore(Pool p) { - this(p, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD, DEFAULT_CLEAN_LIMIT); // default clean period 60s - } - - public RedisRegistrationStore(Pool p, long cleanPeriodInSec, long lifetimeGracePeriodInSec, int cleanLimit) { - this(p, Executors.newScheduledThreadPool(1, - new NamedThreadFactory(String.format("RedisRegistrationStore Cleaner (%ds)", cleanPeriodInSec))), - cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit); - } - - public RedisRegistrationStore(Pool p, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, - long lifetimeGracePeriodInSec, int cleanLimit) { - this(p, schedExecutor, cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit, new SingleInstanceJedisLock()); - } - - public RedisRegistrationStore(Pool p, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, - long lifetimeGracePeriodInSec, int cleanLimit, JedisLock redisLock) { - this(p, schedExecutor, cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit, redisLock, - new RegistrationSerDes()); + this(new Builder(p)); } - public RedisRegistrationStore(Pool p, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, - long lifetimeGracePeriodInSec, int cleanLimit, JedisLock redisLock, RegistrationSerDes registrationSerDes) { - this.pool = p; - this.schedExecutor = schedExecutor; - this.cleanPeriod = cleanPeriodInSec; - this.cleanLimit = cleanLimit; - this.gracePeriod = lifetimeGracePeriodInSec; - this.lock = redisLock; - this.registrationSerDes = registrationSerDes; + public RedisRegistrationStore(Builder builder) { + this.pool = builder.pool; + this.registrationByEndpointPrefix = builder.registrationByEndpointPrefix; + this.endpointByRegistrationIdPrefix = builder.endpointByRegistrationIdPrefix; + this.endpointBySocketAddressPrefix = builder.endpointBySocketAddressPrefix; + this.endpointByIdentityPrefix = builder.endpointByIdentityPrefix; + this.endpointLockPrefix = builder.endpointLockPrefix; + this.observationTokenPrefix = builder.observationTokenPrefix.getBytes(UTF_8); + this.observationTokensByRegistrationIdPrefix = builder.observationTokensByRegistrationIdPrefix; + this.endpointExpirationKey = builder.endpointExpirationKey.getBytes(UTF_8); + this.cleanPeriod = builder.cleanPeriod; + this.cleanLimit = builder.cleanLimit; + this.gracePeriod = builder.gracePeriod; + this.schedExecutor = builder.schedExecutor; + this.lock = builder.lock; + this.registrationSerDes = builder.registrationSerDes; } /* *************** Redis Key utility function **************** */ @@ -149,11 +134,11 @@ private byte[] toKey(String prefix, String registrationID) { } private byte[] toLockKey(String endpoint) { - return toKey(LOCK_EP, endpoint); + return toKey(endpointLockPrefix, endpoint); } private byte[] toLockKey(byte[] endpoint) { - return toKey(LOCK_EP.getBytes(UTF_8), endpoint); + return toKey(endpointLockPrefix.getBytes(UTF_8), endpoint); } /* *************** Leshan Registration API **************** */ @@ -313,7 +298,7 @@ public Registration getRegistrationByIdentity(Identity identity) { @Override public Iterator getAllRegistrations() { - return new RedisIterator(pool, new ScanParams().match(REG_EP + "*").count(100)); + return new RedisIterator(pool, new ScanParams().match(registrationByEndpointPrefix + "*").count(100)); } protected class RedisIterator implements Iterator { @@ -450,31 +435,32 @@ private void removeSecondaryIndex(Jedis j, byte[] indexKey, String endpointName) } private void addOrUpdateExpiration(Jedis j, Registration registration) { - j.zadd(EXP_EP, registration.getExpirationTimeStamp(gracePeriod), registration.getEndpoint().getBytes(UTF_8)); + j.zadd(endpointExpirationKey, registration.getExpirationTimeStamp(gracePeriod), + registration.getEndpoint().getBytes(UTF_8)); } private void removeExpiration(Jedis j, Registration registration) { - j.zrem(EXP_EP, registration.getEndpoint().getBytes(UTF_8)); + j.zrem(endpointExpirationKey, registration.getEndpoint().getBytes(UTF_8)); } private byte[] toRegIdKey(String registrationId) { - return toKey(REG_EP_REGID_IDX, registrationId); + return toKey(endpointByRegistrationIdPrefix, registrationId); } private byte[] toRegAddrKey(InetSocketAddress addr) { - return toKey(REG_EP_ADDR_IDX, addr.getAddress().toString() + ":" + addr.getPort()); + return toKey(endpointBySocketAddressPrefix, addr.getAddress().toString() + ":" + addr.getPort()); } private byte[] toRegIdentityKey(Identity identity) { - return toKey(REG_EP_IDENTITY, IdentitySerDes.serialize(identity).toString()); + return toKey(endpointByIdentityPrefix, IdentitySerDes.serialize(identity).toString()); } private byte[] toEndpointKey(String endpoint) { - return toKey(REG_EP, endpoint); + return toKey(registrationByEndpointPrefix, endpoint); } private byte[] toEndpointKey(byte[] endpoint) { - return toKey(REG_EP.getBytes(UTF_8), endpoint); + return toKey(registrationByEndpointPrefix.getBytes(UTF_8), endpoint); } private byte[] serializeReg(Registration registration) { @@ -507,7 +493,7 @@ public Collection addObservation(String registrationId, Observation // Add and Get previous observation byte[] previousValue; - byte[] key = toKey(OBS_TKN, observation.getId().getBytes()); + byte[] key = toKey(observationTokenPrefix, observation.getId().getBytes()); byte[] serializeObs = serializeObs(observation); if (addIfAbsent) { previousValue = j.get(key); @@ -519,7 +505,7 @@ public Collection addObservation(String registrationId, Observation } // secondary index to get the list by registrationId - j.lpush(toKey(OBS_TKNS_REGID_IDX, registrationId), observation.getId().getBytes()); + j.lpush(toKey(observationTokensByRegistrationIdPrefix, registrationId), observation.getId().getBytes()); // log any collisions Observation previousObservation; @@ -619,7 +605,7 @@ public Collection removeObservations(String registrationId) { // get endpoint and create lock String endpoint = registration.getEndpoint(); byte[] lockValue = null; - byte[] lockKey = toKey(LOCK_EP, endpoint); + byte[] lockKey = toKey(endpointLockPrefix, endpoint); try { lockValue = lock.acquire(j, lockKey); @@ -647,8 +633,8 @@ private Registration getRegistration(Jedis j, String registrationId) { private Collection unsafeGetObservations(Jedis j, String registrationId) { Collection result = new ArrayList<>(); - for (byte[] token : j.lrange(toKey(OBS_TKNS_REGID_IDX, registrationId), 0, -1)) { - byte[] obs = j.get(toKey(OBS_TKN, token)); + for (byte[] token : j.lrange(toKey(observationTokensByRegistrationIdPrefix, registrationId), 0, -1)) { + byte[] obs = j.get(toKey(observationTokenPrefix, token)); if (obs != null) { result.add(deserializeObs(obs)); } @@ -657,7 +643,7 @@ private Collection unsafeGetObservations(Jedis j, String registrati } private Observation unsafeGetObservation(Jedis j, ObservationIdentifier observationId) { - byte[] obs = j.get(toKey(OBS_TKN, observationId.getBytes())); + byte[] obs = j.get(toKey(observationTokenPrefix, observationId.getBytes())); if (obs == null) { return null; } else { @@ -666,22 +652,22 @@ private Observation unsafeGetObservation(Jedis j, ObservationIdentifier observat } private void unsafeRemoveObservation(Jedis j, String registrationId, ObservationIdentifier observationId) { - if (j.del(toKey(OBS_TKN, observationId.getBytes())) > 0L) { - j.lrem(toKey(OBS_TKNS_REGID_IDX, registrationId), 0, observationId.getBytes()); + if (j.del(toKey(observationTokenPrefix, observationId.getBytes())) > 0L) { + j.lrem(toKey(observationTokensByRegistrationIdPrefix, registrationId), 0, observationId.getBytes()); } } private Collection unsafeRemoveAllObservations(Jedis j, String registrationId) { Collection removed = new ArrayList<>(); - byte[] regIdKey = toKey(OBS_TKNS_REGID_IDX, registrationId); + byte[] regIdKey = toKey(observationTokensByRegistrationIdPrefix, registrationId); // fetch all observations by token for (byte[] token : j.lrange(regIdKey, 0, -1)) { - byte[] obs = j.get(toKey(OBS_TKN, token)); + byte[] obs = j.get(toKey(observationTokenPrefix, token)); if (obs != null) { removed.add(deserializeObs(obs)); } - j.del(toKey(OBS_TKN, token)); + j.del(toKey(observationTokenPrefix, token)); } j.del(regIdKey); @@ -743,7 +729,7 @@ private class Cleaner implements Runnable { public void run() { try (Jedis j = pool.getResource()) { - List endpointsExpired = j.zrangeByScore(EXP_EP, Double.NEGATIVE_INFINITY, + List endpointsExpired = j.zrangeByScore(endpointExpirationKey, Double.NEGATIVE_INFINITY, System.currentTimeMillis(), 0, cleanLimit); for (byte[] endpoint : endpointsExpired) { @@ -768,4 +754,273 @@ public void run() { public void setExpirationListener(ExpirationListener listener) { expirationListener = listener; } + + /** + * Class helping to build and configure a {@link RedisRegistrationStore}. + */ + public static class Builder { + /** Default time in seconds between 2 cleaning tasks (used to remove expired registration) */ + public static final long DEFAULT_CLEAN_PERIOD = 60; + public static final int DEFAULT_CLEAN_LIMIT = 500; + /** Default extra time for registration lifetime in seconds */ + public static final long DEFAULT_GRACE_PERIOD = 0; + + private final Pool pool; + private String prefix; + private String registrationByEndpointPrefix; + private String endpointByRegistrationIdPrefix; + private String endpointBySocketAddressPrefix; + private String endpointByIdentityPrefix; + private String endpointLockPrefix; + private String observationTokenPrefix; + private String observationTokensByRegistrationIdPrefix; + private String endpointExpirationKey; + + private long cleanPeriod; + private int cleanLimit; + private long gracePeriod; + + private ScheduledExecutorService schedExecutor; + private JedisLock lock; + private RegistrationSerDes registrationSerDes; + + /** + * Set the prefix for all keys and prefixes. + *

+ * Default value is {@literal REGSTORE#}. + */ + public Builder setPrefix(String prefix) { + this.prefix = prefix; + return this; + } + + /** + * Set the key prefix for registration info lookup by endpoint. + *

+ * Default value is {@literal REG#EP#}. Should not be {@code null} or empty. Leshan v1.x used + * {@literal REG:EP:}. + */ + public Builder setRegistrationByEndpointPrefix(String registrationByEndpointPrefix) { + this.registrationByEndpointPrefix = registrationByEndpointPrefix; + return this; + } + + /** + * Set the key prefix for endpoint lookup by registration ID. + *

+ * Default value is {@literal EP#REGID#}. Should not be {@code null} or empty. Leshan v1.x used + * {@literal EP:REGID:}. + */ + public Builder setEndpointByRegistrationIdPrefix(String endpointByRegistrationIdPrefix) { + this.endpointByRegistrationIdPrefix = endpointByRegistrationIdPrefix; + return this; + } + + /** + * Set the key prefix for endpoint lookup by socket address. + *

+ * Default value is {@literal EP#ADDR#}. Should not be {@code null} or empty. Leshan v1.x used + * {@literal EP:ADDR:}. + */ + public Builder setEndpointBySocketAddressPrefix(String endpointBySocketAddressPrefix) { + this.endpointBySocketAddressPrefix = endpointBySocketAddressPrefix; + return this; + } + + /** + * Set the key prefix for endpoint lookup by registration identity. + *

+ * Default value is {@literal EP#IDENTITY#}. Should not be {@code null} or empty. Leshan v1.x used + * {@literal EP:IDENTITY:}. + */ + public Builder setEndpointByIdentityPrefix(String endpointByIdentityPrefix) { + this.endpointByIdentityPrefix = endpointByIdentityPrefix; + return this; + } + + /** + * Set the key prefix for endpoint locks lookup. + *

+ * Default value is {@literal LOCK#EP#}. Should not be {@code null} or empty. Leshan v1.x used + * {@literal LOCK:EP:}. + */ + public Builder setEndpointLockPrefix(String endpointLockPrefix) { + this.endpointLockPrefix = endpointLockPrefix; + return this; + } + + /** + * Set the key prefix for observation token lookup. + *

+ * Default value is {@literal OBS#TKN#}. Should not be {@code null} or empty. Leshan v1.x used + * {@literal OBS:TKN:}. + */ + public Builder setObservationTokenPrefix(String observationTokenPrefix) { + this.observationTokenPrefix = observationTokenPrefix; + return this; + } + + /** + * Set the key prefix for observation tokens list lookup by registration ID. + *

+ * Default value is {@literal TKNS#REGID#}. Should not be {@code null} or empty. Leshan v1.x used + * {@literal TKNS:REGID:}. + */ + public Builder setObservationTokensByRegistrationIdPrefix(String observationTokensByRegistrationIdPrefix) { + this.observationTokensByRegistrationIdPrefix = observationTokensByRegistrationIdPrefix; + return this; + } + + /** + * Set the key for expiration key lookup. It is a sorted set used for registration expiration (expiration date, + * endpoint). + *

+ * Default value is {@literal EXP#EP}. Should not be {@code null} or empty. Leshan v1.x used {@literal EXP:EP}. + */ + public Builder setEndpointExpirationKey(String endpointExpirationKey) { + this.endpointExpirationKey = endpointExpirationKey; + return this; + } + + /** + * Set the clean period in seconds. + *

+ * Default value is {@literal 60 seconds}. + */ + public Builder setCleanPeriod(long cleanPeriod) { + this.cleanPeriod = cleanPeriod; + return this; + } + + /** + * Set the maximum number to clean in a clean period. + *

+ * Default value is {@literal 500}. + */ + public Builder setCleanLimit(int cleanLimit) { + this.cleanLimit = cleanLimit; + return this; + } + + /** + * Set the grace period in seconds. + *

+ * Default value is {@literal 0 seconds}. + */ + public Builder setGracePeriod(long gracePeriod) { + this.gracePeriod = gracePeriod; + return this; + } + + public Builder setSchedExecutor(ScheduledExecutorService schedExecutor) { + this.schedExecutor = schedExecutor; + return this; + } + + public Builder setLock(JedisLock lock) { + this.lock = lock; + return this; + } + + public Builder setRegistrationSerDes(RegistrationSerDes registrationSerDes) { + this.registrationSerDes = registrationSerDes; + return this; + } + + public Builder(Pool pool) { + this.pool = pool; + this.prefix = "REGSTORE#"; + this.registrationByEndpointPrefix = "REG#EP#"; + this.endpointByRegistrationIdPrefix = "EP#REGID#"; + this.endpointBySocketAddressPrefix = "EP#ADDR#"; + this.endpointByIdentityPrefix = "EP#IDENTITY#"; + this.endpointLockPrefix = "LOCK#EP#"; + this.observationTokenPrefix = "OBS#TKN#"; + this.observationTokensByRegistrationIdPrefix = "TKNS#REGID#"; + this.endpointExpirationKey = "EXP#EP"; + this.cleanPeriod = DEFAULT_CLEAN_PERIOD; + this.cleanLimit = DEFAULT_CLEAN_LIMIT; + this.gracePeriod = DEFAULT_GRACE_PERIOD; + } + + /** + * Create the {@link RedisRegistrationStore}. + *

+ * Throws {@link IllegalArgumentException} when any of prefixes is not set or is equal to some other. + */ + public RedisRegistrationStore build() throws IllegalArgumentException { + if (this.registrationByEndpointPrefix == null || this.registrationByEndpointPrefix.isEmpty()) { + throw new IllegalArgumentException("registrationByEndpointPrefix should not be empty"); + } + + if (this.endpointByRegistrationIdPrefix == null || this.endpointByRegistrationIdPrefix.isEmpty()) { + throw new IllegalArgumentException("endpointByRegistrationIdPrefix should not be empty"); + } + + if (this.endpointBySocketAddressPrefix == null || this.endpointBySocketAddressPrefix.isEmpty()) { + throw new IllegalArgumentException("endpointBySocketAddressPrefix should not be empty"); + } + + if (this.endpointByIdentityPrefix == null || this.endpointByIdentityPrefix.isEmpty()) { + throw new IllegalArgumentException("endpointByIdentityPrefix should not be empty"); + } + + if (this.endpointLockPrefix == null || this.endpointLockPrefix.isEmpty()) { + throw new IllegalArgumentException("endpointLockPrefix should not be empty"); + } + + if (this.observationTokenPrefix == null || this.observationTokenPrefix.isEmpty()) { + throw new IllegalArgumentException("observationTokenPrefix should not be empty"); + } + + if (this.observationTokensByRegistrationIdPrefix == null + || this.observationTokensByRegistrationIdPrefix.isEmpty()) { + throw new IllegalArgumentException("observationTokensByRegistrationIdPrefix should not be empty"); + } + + if (this.endpointExpirationKey == null || this.endpointExpirationKey.isEmpty()) { + throw new IllegalArgumentException("endpointExpirationKey should not be empty"); + } + + // Make sure same prefix is not used more than once + String[] prefixes = new String[] { this.registrationByEndpointPrefix, this.endpointByRegistrationIdPrefix, + this.endpointBySocketAddressPrefix, this.endpointByIdentityPrefix, this.endpointLockPrefix, + this.observationTokenPrefix, this.observationTokensByRegistrationIdPrefix, + this.endpointExpirationKey }; + Set uniquePrefixes = new HashSet<>(); + + for (String prefix : prefixes) { + if (!uniquePrefixes.add(prefix)) { + throw new IllegalArgumentException(String.format("prefix name %s is taken already", prefix)); + } + } + + if (this.prefix != null) { + this.registrationByEndpointPrefix = this.prefix + this.registrationByEndpointPrefix; + this.endpointByRegistrationIdPrefix = this.prefix + this.endpointByRegistrationIdPrefix; + this.endpointBySocketAddressPrefix = this.prefix + this.endpointBySocketAddressPrefix; + this.endpointByIdentityPrefix = this.prefix + this.endpointByIdentityPrefix; + this.endpointLockPrefix = this.prefix + this.endpointLockPrefix; + this.observationTokenPrefix = this.prefix + this.observationTokenPrefix; + this.observationTokensByRegistrationIdPrefix = this.prefix + + this.observationTokensByRegistrationIdPrefix; + this.endpointExpirationKey = this.prefix + this.endpointExpirationKey; + } + + if (this.schedExecutor == null) { + this.schedExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory( + String.format("RedisRegistrationStore Cleaner (%ds)", this.cleanPeriod))); + } + + if (this.lock == null) { + this.lock = new SingleInstanceJedisLock(); + } + + if (this.registrationSerDes == null) { + this.registrationSerDes = new RegistrationSerDes(); + } + + return new RedisRegistrationStore(this); + } + } }