From b5f124cfd812e525f14722a175b27849ada9b3f2 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 14 Aug 2023 11:26:59 +0200 Subject: [PATCH] Terminate stream with error on `null` values returned by `RedisElementReader` for top-level elements. We now emit InvalidDataAccessApiUsageException when a RedisElementReader returns null in the context of a top-level stream to indicate invalid API usage although RedisElementReader.read can generally return null values if these are being collected in a container or value wrapper or parent complex object. Apply consistent wording to operations documentation. --- .../redis/connection/convert/Converters.java | 2 +- .../core/DefaultReactiveGeoOperations.java | 2 + .../core/DefaultReactiveHashOperations.java | 44 ++++++++++++--- .../core/DefaultReactiveListOperations.java | 52 +++++++++++------- .../core/DefaultReactiveSetOperations.java | 34 ++++++++---- .../core/DefaultReactiveValueOperations.java | 28 +++++++--- .../core/DefaultReactiveZSetOperations.java | 53 ++++++++++++------- .../redis/core/ReactiveGeoOperations.java | 16 +++--- .../redis/core/ReactiveHashOperations.java | 8 ++- .../core/ReactiveHyperLogLogOperations.java | 2 +- .../redis/core/ReactiveListOperations.java | 8 ++- .../redis/core/ReactiveRedisOperations.java | 6 +++ .../redis/core/ReactiveRedisTemplate.java | 25 +++++++-- .../redis/core/ReactiveSetOperations.java | 8 ++- .../redis/core/ReactiveStreamOperations.java | 16 +----- .../redis/core/ReactiveValueOperations.java | 6 +++ .../redis/core/ReactiveZSetOperations.java | 8 ++- .../script/DefaultReactiveScriptExecutor.java | 13 ++++- .../core/script/ReactiveScriptExecutor.java | 6 +++ .../data/redis/core/script/ScriptUtils.java | 2 + .../serializer/RedisSerializationContext.java | 4 +- ...veStringRedisTemplateIntegrationTests.java | 32 ++++++++++- 22 files changed, 276 insertions(+), 99 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/convert/Converters.java b/src/main/java/org/springframework/data/redis/connection/convert/Converters.java index fd6ae29d5b..7956f03bb7 100644 --- a/src/main/java/org/springframework/data/redis/connection/convert/Converters.java +++ b/src/main/java/org/springframework/data/redis/connection/convert/Converters.java @@ -469,7 +469,7 @@ public static Object parse(Object source, String sourcePath, Map Map.Entry entryOf(K key, V value) { + public static Map.Entry entryOf(@Nullable K key, @Nullable V value) { return new AbstractMap.SimpleImmutableEntry<>(key, value); } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java index d7a9fe1181..4fa364f68d 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java @@ -39,6 +39,7 @@ import org.springframework.data.redis.domain.geo.GeoReference.GeoMemberReference; import org.springframework.data.redis.domain.geo.GeoShape; import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -320,6 +321,7 @@ private ByteBuffer rawValue(V value) { return serializationContext.getValueSerializationPair().write(value); } + @Nullable private V readValue(ByteBuffer buffer) { return serializationContext.getValueSerializationPair().read(buffer); } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java index 3c258354a7..da1050b487 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java @@ -26,9 +26,11 @@ import java.util.function.Function; import org.reactivestreams.Publisher; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.ReactiveHashCommands; import org.springframework.data.redis.connection.convert.Converters; import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -126,7 +128,8 @@ public Mono randomKey(H key) { Assert.notNull(key, "Key must not be null"); - return createMono(hashCommands -> hashCommands.hRandField(rawKey(key))).map(this::readHashKey); + return template.doCreateMono(connection -> connection // + .hashCommands().hRandField(rawKey(key))).map(this::readRequiredHashKey); } @Override @@ -142,7 +145,8 @@ public Flux randomKeys(H key, long count) { Assert.notNull(key, "Key must not be null"); - return createFlux(hashCommands -> hashCommands.hRandField(rawKey(key), count)).map(this::readHashKey); + return template.doCreateFlux(connection -> connection // + .hashCommands().hRandField(rawKey(key), count)).map(this::readRequiredHashKey); } @Override @@ -159,8 +163,8 @@ public Flux keys(H key) { Assert.notNull(key, "Key must not be null"); - return createFlux(hashCommands -> hashCommands.hKeys(rawKey(key)) // - .map(this::readHashKey)); + return createFlux(connection -> connection.hKeys(rawKey(key)) // + .map(this::readRequiredHashKey)); } @Override @@ -207,8 +211,8 @@ public Flux values(H key) { Assert.notNull(key, "Key must not be null"); - return createFlux(hashCommands -> hashCommands.hVals(rawKey(key)) // - .map(this::readHashValue)); + return createFlux(connection -> connection.hVals(rawKey(key)) // + .map(this::readRequiredHashValue)); } @Override @@ -265,13 +269,37 @@ private ByteBuffer rawHashValue(HV key) { } @SuppressWarnings("unchecked") + @Nullable private HK readHashKey(ByteBuffer value) { return (HK) serializationContext.getHashKeySerializationPair().read(value); } + private HK readRequiredHashKey(ByteBuffer buffer) { + + HK hashKey = readHashKey(buffer); + + if (hashKey == null) { + throw new InvalidDataAccessApiUsageException("Deserialized hash key is null"); + } + + return hashKey; + } + @SuppressWarnings("unchecked") - private HV readHashValue(ByteBuffer value) { - return (HV) (value == null ? value : serializationContext.getHashValueSerializationPair().read(value)); + @Nullable + private HV readHashValue(@Nullable ByteBuffer value) { + return (HV) (value == null ? null : serializationContext.getHashValueSerializationPair().read(value)); + } + + private HV readRequiredHashValue(ByteBuffer buffer) { + + HV hashValue = readHashValue(buffer); + + if (hashValue == null) { + throw new InvalidDataAccessApiUsageException("Deserialized hash value is null"); + } + + return hashValue; } private Map.Entry deserializeHashEntry(Map.Entry source) { diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java index e3e9ee8c40..032b47e350 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java @@ -27,11 +27,13 @@ import java.util.function.Function; import org.reactivestreams.Publisher; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.ReactiveListCommands; import org.springframework.data.redis.connection.ReactiveListCommands.Direction; import org.springframework.data.redis.connection.ReactiveListCommands.LPosCommand; import org.springframework.data.redis.connection.RedisListCommands.Position; import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -59,7 +61,7 @@ public Flux range(K key, long start, long end) { Assert.notNull(key, "Key must not be null"); - return createFlux(listCommands -> listCommands.lRange(rawKey(key), start, end).map(this::readValue)); + return createFlux(connection -> connection.lRange(rawKey(key), start, end).map(this::readRequiredValue)); } @Override @@ -172,8 +174,8 @@ public Mono move(K sourceKey, Direction from, K destinationKey, Direction to) Assert.notNull(from, "From direction must not be null"); Assert.notNull(to, "To direction must not be null"); - return createMono(listCommands -> - listCommands.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to).map(this::readValue)); + return createMono(connection -> connection.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to) + .map(this::readRequiredValue)); } @Override @@ -185,8 +187,8 @@ public Mono move(K sourceKey, Direction from, K destinationKey, Direction to, Assert.notNull(to, "To direction must not be null"); Assert.notNull(timeout, "Timeout must not be null"); - return createMono(listCommands -> - listCommands.bLMove(rawKey(sourceKey), rawKey(destinationKey), from, to, timeout).map(this::readValue)); + return createMono(connection -> connection.bLMove(rawKey(sourceKey), rawKey(destinationKey), from, to, timeout) + .map(this::readRequiredValue)); } @Override @@ -211,7 +213,7 @@ public Mono index(K key, long index) { Assert.notNull(key, "Key must not be null"); - return createMono(listCommands -> listCommands.lIndex(rawKey(key), index).map(this::readValue)); + return createMono(connection -> connection.lIndex(rawKey(key), index).map(this::readRequiredValue)); } @Override @@ -236,7 +238,7 @@ public Mono leftPop(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(listCommands -> listCommands.lPop(rawKey(key)).map(this::readValue)); + return createMono(connection -> connection.lPop(rawKey(key)).map(this::readRequiredValue)); } @@ -245,7 +247,7 @@ public Flux leftPop(K key, long count) { Assert.notNull(key, "Key must not be null"); - return createFlux(listCommands -> listCommands.lPop(rawKey(key), count).map(this::readValue)); + return createFlux(listCommands -> listCommands.lPop(rawKey(key), count).map(this::readRequiredValue)); } @Override @@ -255,9 +257,8 @@ public Mono leftPop(K key, Duration timeout) { Assert.notNull(timeout, "Duration must not be null"); Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second"); - return createMono(listCommands -> - listCommands.blPop(Collections.singletonList(rawKey(key)), timeout) - .map(popResult -> readValue(popResult.getValue()))); + return createMono(connection -> connection.blPop(Collections.singletonList(rawKey(key)), timeout) + .mapNotNull(popResult -> readValue(popResult.getValue()))); } @Override @@ -265,7 +266,7 @@ public Mono rightPop(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(listCommands -> listCommands.rPop(rawKey(key)).map(this::readValue)); + return createMono(listCommands -> listCommands.rPop(rawKey(key)).map(this::readRequiredValue)); } @Override @@ -273,7 +274,7 @@ public Flux rightPop(K key, long count) { Assert.notNull(key, "Key must not be null"); - return createFlux(listCommands -> listCommands.rPop(rawKey(key), count).map(this::readValue)); + return createFlux(listCommands -> listCommands.rPop(rawKey(key), count).map(this::readRequiredValue)); } @Override @@ -283,9 +284,8 @@ public Mono rightPop(K key, Duration timeout) { Assert.notNull(timeout, "Duration must not be null"); Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second"); - return createMono(listCommands -> - listCommands.brPop(Collections.singletonList(rawKey(key)), timeout) - .map(popResult -> readValue(popResult.getValue()))); + return createMono(connection -> connection.brPop(Collections.singletonList(rawKey(key)), timeout) + .mapNotNull(popResult -> readValue(popResult.getValue()))); } @Override @@ -294,8 +294,8 @@ public Mono rightPopAndLeftPush(K sourceKey, K destinationKey) { Assert.notNull(sourceKey, "Source key must not be null"); Assert.notNull(destinationKey, "Destination key must not be null"); - return createMono(listCommands -> - listCommands.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)).map(this::readValue)); + return createMono(connection -> connection.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)) + .map(this::readRequiredValue)); } @Override @@ -306,8 +306,8 @@ public Mono rightPopAndLeftPush(K sourceKey, K destinationKey, Duration timeo Assert.notNull(timeout, "Duration must not be null"); Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second"); - return createMono(listCommands -> - listCommands.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue)); + return createMono(connection -> connection.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout) + .map(this::readRequiredValue)); } @Override @@ -344,7 +344,19 @@ private ByteBuffer rawValue(V value) { return serializationContext.getValueSerializationPair().write(value); } + @Nullable private V readValue(ByteBuffer buffer) { return serializationContext.getValueSerializationPair().read(buffer); } + + private V readRequiredValue(ByteBuffer buffer) { + + V v = readValue(buffer); + + if (v == null) { + throw new InvalidDataAccessApiUsageException("Deserialized list value is null"); + } + + return v; + } } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java index 218b93eb97..07bde24301 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java @@ -28,8 +28,10 @@ import java.util.function.Function; import org.reactivestreams.Publisher; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.ReactiveSetCommands; import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -89,7 +91,7 @@ public Mono pop(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(setCommands -> setCommands.sPop(rawKey(key)).map(this::readValue)); + return createMono(setCommands -> setCommands.sPop(rawKey(key)).map(this::readRequiredValue)); } @Override @@ -97,7 +99,7 @@ public Flux pop(K key, long count) { Assert.notNull(key, "Key must not be null"); - return createFlux(setCommands -> setCommands.sPop(rawKey(key), count).map(this::readValue)); + return createFlux(setCommands -> setCommands.sPop(rawKey(key), count).map(this::readRequiredValue)); } @Override @@ -175,7 +177,7 @@ public Flux intersect(Collection keys) { .map(this::rawKey) // .collectList() // .flatMapMany(setCommands::sInter) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -237,7 +239,7 @@ public Flux union(Collection keys) { .map(this::rawKey) // .collectList() // .flatMapMany(setCommands::sUnion) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -299,7 +301,7 @@ public Flux difference(Collection keys) { .map(this::rawKey) // .collectList() // .flatMapMany(setCommands::sDiff) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -339,7 +341,7 @@ public Flux members(K key) { Assert.notNull(key, "Key must not be null"); - return createFlux(setCommands -> setCommands.sMembers(rawKey(key)).map(this::readValue)); + return createFlux(setCommands -> setCommands.sMembers(rawKey(key)).map(this::readRequiredValue)); } @Override @@ -348,7 +350,7 @@ public Flux scan(K key, ScanOptions options) { Assert.notNull(key, "Key must not be null"); Assert.notNull(options, "ScanOptions must not be null"); - return createFlux(setCommands -> setCommands.sScan(rawKey(key), options).map(this::readValue)); + return createFlux(setCommands -> setCommands.sScan(rawKey(key), options).map(this::readRequiredValue)); } @Override @@ -356,7 +358,7 @@ public Mono randomMember(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(setCommands -> setCommands.sRandMember(rawKey(key)).map(this::readValue)); + return createMono(setCommands -> setCommands.sRandMember(rawKey(key)).map(this::readRequiredValue)); } @Override @@ -364,7 +366,7 @@ public Flux distinctRandomMembers(K key, long count) { Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements"); - return createFlux(setCommands -> setCommands.sRandMember(rawKey(key), count).map(this::readValue)); + return createFlux(setCommands -> setCommands.sRandMember(rawKey(key), count).map(this::readRequiredValue)); } @Override @@ -372,7 +374,7 @@ public Flux randomMembers(K key, long count) { Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements"); - return createFlux(setCommands -> setCommands.sRandMember(rawKey(key), -count).map(this::readValue)); + return createFlux(setCommands -> setCommands.sRandMember(rawKey(key), -count).map(this::readRequiredValue)); } @Override @@ -415,7 +417,19 @@ private ByteBuffer rawValue(V value) { return serializationContext.getValueSerializationPair().write(value); } + @Nullable private V readValue(ByteBuffer buffer) { return serializationContext.getValueSerializationPair().read(buffer); } + + private V readRequiredValue(ByteBuffer buffer) { + + V v = readValue(buffer); + + if (v == null) { + throw new InvalidDataAccessApiUsageException("Deserialized set value is null"); + } + + return v; + } } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java index 8bb8327999..f08b74800c 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java @@ -27,6 +27,7 @@ import java.util.function.Function; import org.reactivestreams.Publisher; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.BitFieldSubCommands; import org.springframework.data.redis.connection.ReactiveNumberCommands; import org.springframework.data.redis.connection.ReactiveStringCommands; @@ -34,6 +35,7 @@ import org.springframework.data.redis.core.types.Expiration; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.serializer.RedisSerializationContext.SerializationPair; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -147,7 +149,7 @@ public Mono get(Object key) { Assert.notNull(key, "Key must not be null"); return createMono(stringCommands -> stringCommands.get(rawKey((K) key)) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -156,7 +158,7 @@ public Mono getAndDelete(K key) { Assert.notNull(key, "Key must not be null"); return createMono(stringCommands -> stringCommands.getDel(rawKey(key)) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -166,7 +168,7 @@ public Mono getAndExpire(K key, Duration timeout) { Assert.notNull(timeout, "Timeout must not be null"); return createMono(stringCommands -> stringCommands.getEx(rawKey(key), Expiration.from(timeout)) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -175,7 +177,7 @@ public Mono getAndPersist(K key) { Assert.notNull(key, "Key must not be null"); return createMono(stringCommands -> stringCommands.getEx(rawKey(key), Expiration.persistent()) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -183,7 +185,8 @@ public Mono getAndSet(K key, V value) { Assert.notNull(key, "Key must not be null"); - return createMono(stringCommands -> stringCommands.getSet(rawKey(key), rawValue(value)).map(value()::read)); + return createMono(stringCommands -> stringCommands.getSet(rawKey(key), rawValue(value)) + .mapNotNull(value()::read)); } @Override @@ -252,7 +255,7 @@ public Mono get(K key, long start, long end) { Assert.notNull(key, "Key must not be null"); return createMono(stringCommands -> stringCommands.getRange(rawKey(key), start, end) // - .map(stringSerializationPair()::read)); + .mapNotNull(stringSerializationPair()::read)); } @Override @@ -326,10 +329,22 @@ private ByteBuffer rawValue(V value) { return serializationContext.getValueSerializationPair().write(value); } + @Nullable private V readValue(ByteBuffer buffer) { return serializationContext.getValueSerializationPair().read(buffer); } + private V readRequiredValue(ByteBuffer buffer) { + + V v = readValue(buffer); + + if (v == null) { + throw new InvalidDataAccessApiUsageException("Deserialized value is null"); + } + + return v; + } + private SerializationPair stringSerializationPair() { return serializationContext.getStringSerializationPair(); } @@ -357,4 +372,5 @@ private List deserializeValues(List source) { return result; } + } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java index 451f5e6810..696a80992f 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java @@ -27,7 +27,7 @@ import java.util.function.Function; import org.reactivestreams.Publisher; - +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.ReactiveZSetCommands; @@ -38,6 +38,7 @@ import org.springframework.data.redis.core.ZSetOperations.TypedTuple; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.util.ByteUtils; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -110,7 +111,7 @@ public Mono randomMember(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(zSetCommands -> zSetCommands.zRandMember(rawKey(key))).map(this::readValue); + return createMono(zSetCommands -> zSetCommands.zRandMember(rawKey(key))).map(this::readRequiredValue); } @Override @@ -119,7 +120,7 @@ public Flux distinctRandomMembers(K key, long count) { Assert.notNull(key, "Key must not be null"); Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements"); - return createFlux(zSetCommands -> zSetCommands.zRandMember(rawKey(key), count)).map(this::readValue); + return createFlux(zSetCommands -> zSetCommands.zRandMember(rawKey(key), count)).map(this::readRequiredValue); } @Override @@ -128,7 +129,7 @@ public Flux randomMembers(K key, long count) { Assert.notNull(key, "Key must not be null"); Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements"); - return createFlux(zSetCommands -> zSetCommands.zRandMember(rawKey(key), -count)).map(this::readValue); + return createFlux(zSetCommands -> zSetCommands.zRandMember(rawKey(key), -count)).map(this::readRequiredValue); } @Override @@ -181,7 +182,7 @@ public Flux range(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(zSetCommands -> zSetCommands.zRange(rawKey(key), range).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRange(rawKey(key), range).map(this::readRequiredValue)); } @Override @@ -199,7 +200,7 @@ public Flux rangeByScore(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(zSetCommands -> zSetCommands.zRangeByScore(rawKey(key), range).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRangeByScore(rawKey(key), range).map(this::readRequiredValue)); } @Override @@ -218,7 +219,8 @@ public Flux rangeByScore(K key, Range range, Limit limit) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(zSetCommands -> zSetCommands.zRangeByScore(rawKey(key), range, limit).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRangeByScore(rawKey(key), range, limit) + .map(this::readRequiredValue)); } @Override @@ -238,7 +240,7 @@ public Flux reverseRange(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(zSetCommands -> zSetCommands.zRevRange(rawKey(key), range).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRevRange(rawKey(key), range).map(this::readRequiredValue)); } @Override @@ -257,7 +259,8 @@ public Flux reverseRangeByScore(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(zSetCommands -> zSetCommands.zRevRangeByScore(rawKey(key), range).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRevRangeByScore(rawKey(key), range) + .map(this::readRequiredValue)); } @Override @@ -276,8 +279,8 @@ public Flux reverseRangeByScore(K key, Range range, Limit limit) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(zSetCommands -> - zSetCommands.zRevRangeByScore(rawKey(key), range, limit).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRevRangeByScore(rawKey(key), range, limit) + .map(this::readRequiredValue)); } @Override @@ -481,7 +484,7 @@ public Flux difference(K key, Collection otherKeys) { return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(zSetCommands::zDiff).map(this::readValue)); + .flatMapMany(zSetCommands::zDiff).map(this::readRequiredValue)); } @Override @@ -519,7 +522,7 @@ public Flux intersect(K key, Collection otherKeys) { return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(zSetCommands::zInter).map(this::readValue)); + .flatMapMany(zSetCommands::zInter).map(this::readRequiredValue)); } @Override @@ -587,7 +590,8 @@ public Flux union(K key, Collection otherKeys) { return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(zSetCommands::zUnion).map(this::readValue)); + .flatMapMany(zSetCommands::zUnion) // + .map(this::readRequiredValue)); } @Override @@ -660,7 +664,7 @@ public Flux rangeByLex(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(zSetCommands -> zSetCommands.zRangeByLex(rawKey(key), range).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRangeByLex(rawKey(key), range).map(this::readRequiredValue)); } @Override @@ -670,7 +674,7 @@ public Flux rangeByLex(K key, Range range, Limit limit) { Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createFlux(zSetCommands -> zSetCommands.zRangeByLex(rawKey(key), range, limit).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRangeByLex(rawKey(key), range, limit).map(this::readRequiredValue)); } @Override @@ -679,7 +683,7 @@ public Flux reverseRangeByLex(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(zSetCommands -> zSetCommands.zRevRangeByLex(rawKey(key), range).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRevRangeByLex(rawKey(key), range).map(this::readRequiredValue)); } @Override @@ -689,7 +693,8 @@ public Flux reverseRangeByLex(K key, Range range, Limit limit) { Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createFlux(zSetCommands -> zSetCommands.zRevRangeByLex(rawKey(key), range, limit).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRevRangeByLex(rawKey(key), range, limit) + .map(this::readRequiredValue)); } @Override @@ -732,10 +737,22 @@ private ByteBuffer rawValue(V value) { return serializationContext.getValueSerializationPair().write(value); } + @Nullable private V readValue(ByteBuffer buffer) { return serializationContext.getValueSerializationPair().read(buffer); } + private V readRequiredValue(ByteBuffer buffer) { + + V v = readValue(buffer); + + if (v == null) { + throw new InvalidDataAccessApiUsageException("Deserialized sorted set value is null"); + } + + return v; + } + private TypedTuple readTypedTuple(Tuple raw) { return new DefaultTypedTuple<>(readValue(ByteBuffer.wrap(raw.getValue())), raw.getScore()); } diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveGeoOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveGeoOperations.java index c5eb41b970..99974cac70 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveGeoOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveGeoOperations.java @@ -35,7 +35,7 @@ import org.springframework.data.redis.domain.geo.GeoShape; /** - * Reactive Redis operations for geo commands. + * Reactive Redis operations for Geo Commands. * * @author Mark Paluch * @author Christoph Strobl @@ -289,8 +289,7 @@ default Flux>> search(K key, GeoReference reference, * @since 2.6 * @see Redis Documentation: GEOSEARCH */ - default Flux>> search(K key, GeoReference reference, - BoundingBox boundingBox) { + default Flux>> search(K key, GeoReference reference, BoundingBox boundingBox) { return search(key, reference, boundingBox, GeoSearchCommandArgs.newGeoSearchArgs()); } @@ -306,8 +305,8 @@ default Flux>> search(K key, GeoReference reference, * @since 2.6 * @see Redis Documentation: GEOSEARCH */ - default Flux>> search(K key, GeoReference reference, - BoundingBox boundingBox, GeoSearchCommandArgs args) { + default Flux>> search(K key, GeoReference reference, BoundingBox boundingBox, + GeoSearchCommandArgs args) { return search(key, reference, GeoShape.byBox(boundingBox), args); } @@ -383,8 +382,7 @@ default Mono searchAndStore(K key, K destKey, GeoReference reference, D * @since 2.6 * @see Redis Documentation: GEOSEARCHSTORE */ - default Mono searchAndStore(K key, K destKey, GeoReference reference, - BoundingBox boundingBox) { + default Mono searchAndStore(K key, K destKey, GeoReference reference, BoundingBox boundingBox) { return searchAndStore(key, destKey, reference, boundingBox, GeoSearchStoreCommandArgs.newGeoSearchStoreArgs()); } @@ -400,8 +398,8 @@ default Mono searchAndStore(K key, K destKey, GeoReference reference, * @since 2.6 * @see Redis Documentation: GEOSEARCHSTORE */ - default Mono searchAndStore(K key, K destKey, GeoReference reference, - BoundingBox boundingBox, GeoSearchStoreCommandArgs args) { + default Mono searchAndStore(K key, K destKey, GeoReference reference, BoundingBox boundingBox, + GeoSearchStoreCommandArgs args) { return searchAndStore(key, destKey, reference, GeoShape.byBox(boundingBox), args); } diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java index b0d8efddfc..9516bdc508 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java @@ -18,12 +18,18 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; /** - * Redis map specific operations working on a hash. + * Reactive Redis operations for Hash Commands. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveHyperLogLogOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveHyperLogLogOperations.java index 8dbe30ed92..d19da8fd66 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveHyperLogLogOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveHyperLogLogOperations.java @@ -18,7 +18,7 @@ import reactor.core.publisher.Mono; /** - * Redis cardinality specific operations working on a HyperLogLog multiset. + * Reactive Redis operations for working on a HyperLogLog multiset. * * @author Mark Paluch * @since 2.0 diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java index fd153ef02e..0273ab292e 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java @@ -20,6 +20,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collection; @@ -28,7 +29,12 @@ import org.springframework.util.Assert; /** - * Redis list specific operations. + * Reactive Redis operations for List Commands. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java index 19f02af13c..8424daf4aa 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java @@ -18,6 +18,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; import java.util.Arrays; @@ -41,6 +42,11 @@ /** * Interface that specified a basic set of Redis operations, implemented by {@link ReactiveRedisTemplate}. Not often * used but a useful option for extensibility and testability (as it can be easily mocked or stubbed). + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java index da4c76825a..6745a47a5c 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java @@ -27,7 +27,7 @@ import java.util.stream.Collectors; import org.reactivestreams.Publisher; - +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveRedisConnection; import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; @@ -55,6 +55,11 @@ *

* Note that while the template is generified, it is up to the serializers/deserializers to properly convert the given * Objects to and from binary data. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl @@ -331,7 +336,7 @@ public Flux keys(K pattern) { return doCreateFlux(connection -> connection.keyCommands().keys(rawKey(pattern))) // .flatMap(Flux::fromIterable) // - .map(this::readKey); + .map(this::readRequiredKey); } @Override @@ -340,12 +345,12 @@ public Flux scan(ScanOptions options) { Assert.notNull(options, "ScanOptions must not be null"); return doCreateFlux(connection -> connection.keyCommands().scan(options)) // - .map(this::readKey); + .map(this::readRequiredKey); } @Override public Mono randomKey() { - return doCreateMono(connection -> connection.keyCommands().randomKey()).map(this::readKey); + return doCreateMono(connection -> connection.keyCommands().randomKey()).map(this::readRequiredKey); } @Override @@ -666,7 +671,19 @@ private ByteBuffer rawKey(K key) { return getSerializationContext().getKeySerializationPair().getWriter().write(key); } + @Nullable private K readKey(ByteBuffer buffer) { return getSerializationContext().getKeySerializationPair().getReader().read(buffer); } + + private K readRequiredKey(ByteBuffer buffer) { + + K key = readKey(buffer); + + if (key == null) { + throw new InvalidDataAccessApiUsageException("Deserialized key is null"); + } + + return key; + } } diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveSetOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveSetOperations.java index c8c34dd82d..f8d0feff61 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveSetOperations.java @@ -18,11 +18,17 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; /** - * Redis set specific operations. + * Reactive Redis operations for Set Commands. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java index 1002a660b5..9314bc2383 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java @@ -23,32 +23,20 @@ import java.util.Map; import org.reactivestreams.Publisher; - import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; -import org.springframework.data.redis.connection.stream.ByteBufferRecord; -import org.springframework.data.redis.connection.stream.Consumer; -import org.springframework.data.redis.connection.stream.MapRecord; -import org.springframework.data.redis.connection.stream.ObjectRecord; -import org.springframework.data.redis.connection.stream.PendingMessage; -import org.springframework.data.redis.connection.stream.PendingMessages; -import org.springframework.data.redis.connection.stream.PendingMessagesSummary; -import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.connection.stream.Record; -import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer; import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup; import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream; -import org.springframework.data.redis.connection.stream.StreamOffset; -import org.springframework.data.redis.connection.stream.StreamReadOptions; -import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.hash.HashMapper; import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** - * Redis stream specific operations. + * Reactive Redis operations for Stream Commands. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveValueOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveValueOperations.java index 639c121f42..be8a0c0fbe 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveValueOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveValueOperations.java @@ -17,6 +17,7 @@ import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -26,6 +27,11 @@ /** * Reactive Redis operations for simple (or in Redis terminology 'string') values. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Jiahe Cai diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java index ecbffc9eab..1416b92c7e 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java @@ -18,6 +18,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collection; import java.util.Collections; @@ -32,7 +33,12 @@ import org.springframework.lang.Nullable; /** - * Redis ZSet/sorted set specific operations. + * Reactive Redis operations for Sorted (ZSet) Commands. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/script/DefaultReactiveScriptExecutor.java b/src/main/java/org/springframework/data/redis/core/script/DefaultReactiveScriptExecutor.java index c3eabf4ac2..b85c13047a 100644 --- a/src/main/java/org/springframework/data/redis/core/script/DefaultReactiveScriptExecutor.java +++ b/src/main/java/org/springframework/data/redis/core/script/DefaultReactiveScriptExecutor.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.stream.Stream; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.ReactiveRedisConnection; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; @@ -79,7 +80,6 @@ public Flux execute(RedisScript script, List keys, List args) { } @Override - @SuppressWarnings("unchecked") public Flux execute(RedisScript script, List keys, List args, RedisElementWriter argsWriter, RedisElementReader resultReader) { @@ -134,7 +134,16 @@ protected ByteBuffer scriptBytes(RedisScript script) { } protected Flux deserializeResult(RedisElementReader reader, Flux result) { - return result.map(it -> ScriptUtils.deserializeResult(reader, it)); + return result.map(it -> { + + T value = ScriptUtils.deserializeResult(reader, it); + + if (value == null) { + throw new InvalidDataAccessApiUsageException("Deserialized script result is null"); + } + + return value; + }); } protected SerializationPair keySerializer() { diff --git a/src/main/java/org/springframework/data/redis/core/script/ReactiveScriptExecutor.java b/src/main/java/org/springframework/data/redis/core/script/ReactiveScriptExecutor.java index f61a6cd735..20f8503fbc 100644 --- a/src/main/java/org/springframework/data/redis/core/script/ReactiveScriptExecutor.java +++ b/src/main/java/org/springframework/data/redis/core/script/ReactiveScriptExecutor.java @@ -17,6 +17,7 @@ import reactor.core.publisher.Flux; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; @@ -26,6 +27,11 @@ /** * Executes {@link RedisScript}s using reactive infrastructure. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/script/ScriptUtils.java b/src/main/java/org/springframework/data/redis/core/script/ScriptUtils.java index f7fead21a8..8e89cc9190 100644 --- a/src/main/java/org/springframework/data/redis/core/script/ScriptUtils.java +++ b/src/main/java/org/springframework/data/redis/core/script/ScriptUtils.java @@ -22,6 +22,7 @@ import org.springframework.dao.NonTransientDataAccessException; import org.springframework.data.redis.serializer.RedisElementReader; import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.lang.Nullable; /** * Utilities for Lua script execution and result deserialization. @@ -71,6 +72,7 @@ static T deserializeResult(RedisSerializer resultSerializer, Object resul * @param result must not be {@literal null}. * @return the deserialized result. */ + @Nullable @SuppressWarnings({ "unchecked" }) static T deserializeResult(RedisElementReader reader, Object result) { diff --git a/src/main/java/org/springframework/data/redis/serializer/RedisSerializationContext.java b/src/main/java/org/springframework/data/redis/serializer/RedisSerializationContext.java index b0b75dfc37..23f54b5c35 100644 --- a/src/main/java/org/springframework/data/redis/serializer/RedisSerializationContext.java +++ b/src/main/java/org/springframework/data/redis/serializer/RedisSerializationContext.java @@ -17,6 +17,7 @@ import java.nio.ByteBuffer; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -269,8 +270,9 @@ static SerializationPair byteBuffer() { * Deserialize a {@link ByteBuffer} into the according type. * * @param buffer must not be {@literal null}. - * @return the deserialized value. + * @return the deserialized value. Can be {@literal null}. */ + @Nullable default T read(ByteBuffer buffer) { return getReader().read(buffer); } diff --git a/src/test/java/org/springframework/data/redis/core/ReactiveStringRedisTemplateIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/ReactiveStringRedisTemplateIntegrationTests.java index 3257300576..0b3c8db611 100644 --- a/src/test/java/org/springframework/data/redis/core/ReactiveStringRedisTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/ReactiveStringRedisTemplateIntegrationTests.java @@ -20,9 +20,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; - +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension; +import org.springframework.data.redis.serializer.RedisElementReader; +import org.springframework.data.redis.serializer.RedisElementWriter; +import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.data.redis.serializer.StringRedisSerializer; /** * Integration tests for {@link ReactiveStringRedisTemplate}. @@ -55,4 +59,30 @@ void shouldSetAndGetKeys() { template.opsForValue().set("key", "value").as(StepVerifier::create).expectNext(true).verifyComplete(); template.opsForValue().get("key").as(StepVerifier::create).expectNext("value").verifyComplete(); } + + @Test // GH-2655 + void keysFailsOnNullElements() { + + template.opsForValue().set("a", "1").as(StepVerifier::create).expectNext(true).verifyComplete(); + template.opsForValue().set("b", "1").as(StepVerifier::create).expectNext(true).verifyComplete(); + + RedisElementWriter writer = RedisElementWriter.from(StringRedisSerializer.UTF_8); + RedisElementReader reader = RedisElementReader.from(StringRedisSerializer.UTF_8); + RedisSerializationContext nullReadingContext = RedisSerializationContext + . newSerializationContext(StringRedisSerializer.UTF_8).key(buffer -> { + + String read = reader.read(buffer); + if ("a".equals(read)) { + return null; + } + + return read; + }, writer).build(); + + ReactiveRedisTemplate customTemplate = new ReactiveRedisTemplate<>(template.getConnectionFactory(), + nullReadingContext); + + customTemplate.keys("b").as(StepVerifier::create).expectNext("b").verifyComplete(); + customTemplate.keys("a").as(StepVerifier::create).verifyError(InvalidDataAccessApiUsageException.class); + } }