Skip to content

Commit

Permalink
Merge pull request #527 from nats-io/js-updates
Browse files Browse the repository at this point in the history
v2.10 related updates
  • Loading branch information
wallyqs authored Feb 5, 2024
2 parents 280b9ec + 84cae33 commit aa46c60
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 26 deletions.
14 changes: 1 addition & 13 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,6 @@ jobs:
- bash ./scripts/install_nats.sh
install:
- pip install -e .[fast-mail-parser]
- name: "Python: 3.11 (nats-server@dev)"
python: "3.11"
env:
- NATS_SERVER_VERSION=dev
before_install:
- sudo apt update && sudo apt install gcc build-essential -y
- sudo apt-get install python3-pip
- sudo apt-get install python3-pytest
- pip install --upgrade pip
- bash ./scripts/install_nats.sh
install:
- pip install -e .[fast-mail-parser]
- name: "Python: 3.7"
python: "3.7"
before_install:
Expand All @@ -101,5 +89,5 @@ jobs:
- name: "Python: 3.8"
- name: "Python: 3.11"
- name: "Python: 3.11/uvloop"
- name: "Python: 3.11 (nats-server@dev)"
- name: "Python: 3.11 (nats-server@main)"
- name: "Python: 3.12"
2 changes: 2 additions & 0 deletions nats/js/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class StreamState(Base):
deleted: Optional[List[int]] = None
num_deleted: Optional[int] = None
lost: Optional[LostStreamData] = None
subjects: Optional[Dict[str, int]] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
Expand Down Expand Up @@ -384,6 +385,7 @@ class ConsumerConfig(Base):
ack_wait: Optional[float] = None # in seconds
max_deliver: Optional[int] = None
filter_subject: Optional[str] = None
filter_subjects: Optional[List[str]] = None
replay_policy: Optional[ReplayPolicy] = ReplayPolicy.INSTANT
rate_limit_bps: Optional[int] = None
sample_freq: Optional[str] = None
Expand Down
23 changes: 17 additions & 6 deletions nats/js/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ async def subscribe(
pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
deliver_policy: Optional[api.DeliverPolicy] = None,
headers_only: Optional[bool] = None,
inactive_threshold: Optional[float] = None,
) -> PushSubscription:
"""Create consumer if needed and push-subscribe to it.
Expand Down Expand Up @@ -272,6 +273,8 @@ async def cb(msg):
if deliver_policy:
# NOTE: deliver_policy is defaulting to ALL so check is different for this one.
config.deliver_policy = deliver_policy
if inactive_threshold:
config.inactive_threshold = inactive_threshold

# Create inbox for push consumer.
deliver = self._nc.new_inbox()
Expand Down Expand Up @@ -461,12 +464,13 @@ async def main():

async def pull_subscribe_bind(
self,
durable: str,
stream: str,
consumer: Optional[str] = None,
stream: Optional[str] = None,
inbox_prefix: bytes = api.INBOX_PREFIX,
pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
name: Optional[str] = None,
durable: Optional[str] = None,
) -> JetStreamContext.PullSubscription:
"""
pull_subscribe returns a `PullSubscription` that can be delivered messages
Expand Down Expand Up @@ -494,22 +498,29 @@ async def main():
asyncio.run(main())
"""
if not stream:
raise ValueError("nats: stream name is required")
deliver = inbox_prefix + self._nc._nuid.next()
sub = await self._nc.subscribe(
deliver.decode(),
pending_msgs_limit=pending_msgs_limit,
pending_bytes_limit=pending_bytes_limit
)
consumer = None
consumer_name = None
# In nats.py v2.7.0 changing the first arg to be 'consumer' instead of 'durable',
# but continue to support for backwards compatibility.
if durable:
consumer = durable
consumer_name = durable
elif name:
# This should not be common and 'consumer' arg preferred instead but support anyway.
consumer_name = name
else:
consumer = name
consumer_name = consumer
return JetStreamContext.PullSubscription(
js=self,
sub=sub,
stream=stream,
consumer=consumer,
consumer=consumer_name,
deliver=deliver,
)

Expand Down
6 changes: 6 additions & 0 deletions nats/js/kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ async def watch(
include_history=False,
ignore_deletes=False,
meta_only=False,
inactive_threshold=None,
) -> KeyWatcher:
"""
watch will fire a callback when a key that matches the keys
Expand Down Expand Up @@ -436,12 +437,17 @@ async def watch_updates(msg):
if not include_history:
deliver_policy = api.DeliverPolicy.LAST_PER_SUBJECT

# Cleanup watchers after 5 minutes of inactivity by default.
if not inactive_threshold:
inactive_threshold = 5 * 60

watcher._sub = await self._js.subscribe(
subject,
cb=watch_updates,
ordered_consumer=True,
deliver_policy=deliver_policy,
headers_only=meta_only,
inactive_threshold=inactive_threshold,
)
await asyncio.sleep(0)

Expand Down
7 changes: 5 additions & 2 deletions nats/js/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,15 @@ async def find_stream_name_by_subject(self, subject: str) -> str:
raise NotFoundError
return info['streams'][0]

async def stream_info(self, name: str) -> api.StreamInfo:
async def stream_info(self, name: str, subjects_filter: Optional[str] = None) -> api.StreamInfo:
"""
Get the latest StreamInfo by stream name.
"""
req_data = ''
if subjects_filter:
req_data = json.dumps({"subjects_filter": subjects_filter})
resp = await self._api_request(
f"{self._prefix}.STREAM.INFO.{name}", timeout=self._timeout
f"{self._prefix}.STREAM.INFO.{name}", req_data.encode(), timeout=self._timeout
)
return api.StreamInfo.from_response(resp)

Expand Down
2 changes: 1 addition & 1 deletion scripts/install_nats.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

set -e

export DEFAULT_NATS_SERVER_VERSION=latest
export DEFAULT_NATS_SERVER_VERSION=v2.10.10

export NATS_SERVER_VERSION="${NATS_SERVER_VERSION:=$DEFAULT_NATS_SERVER_VERSION}"

Expand Down
156 changes: 152 additions & 4 deletions tests/test_js.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,114 @@ async def test_ephemeral_pull_subscribe(self):
self.assertTrue(cinfo.config.durable_name == None)
await nc.close()


@async_test
async def test_consumer_with_multiple_filters(self):
nc = NATS()
await nc.connect()
js = nc.jetstream()
jsm = nc.jsm()

# Create stream.
await jsm.add_stream(name="ctests", subjects=["a", "b", "c.>"])
await js.publish("a", b'A')
await js.publish("b", b'B')
await js.publish("c.d", b'CD')
await js.publish("c.d.e", b'CDE')

# Create ephemeral pull consumer with a name.
stream_name = "ctests"
consumer_name = "multi"
cinfo = await jsm.add_consumer(
stream_name,
name=consumer_name,
ack_policy="explicit",
filter_subjects=["a", "b", "c.d.e"],
durable_name=consumer_name, # must be the same as name
)
assert cinfo.config.name == consumer_name

sub = await js.pull_subscribe_bind(consumer_name, stream_name)
msgs = await sub.fetch(1)
assert msgs[0].data == b'A'
ok = await msgs[0].ack_sync()
assert ok

msgs = await sub.fetch(1)
assert msgs[0].data == b'B'
ok = await msgs[0].ack_sync()
assert ok

msgs = await sub.fetch(1)
assert msgs[0].data == b'CDE'
ok = await msgs[0].ack_sync()
assert ok

@async_test
async def test_fetch_pull_subscribe_bind(self):
nc = NATS()
await nc.connect()

js = nc.jetstream()

stream_name = "TESTFETCH"
await js.add_stream(name=stream_name, subjects=["foo", "bar"])

for i in range(0, 5):
await js.publish("foo", b'A')

# Fetch with multiple filters on an ephemeral consumer.
cinfo = await js.add_consumer(
stream_name,
filter_subjects=["foo", "bar"],
inactive_threshold=300.0,
)

# Using named arguments.
psub = await js.pull_subscribe_bind(stream=stream_name, consumer=cinfo.name)
msgs = await psub.fetch(1)
msg = msgs[0]
await msg.ack()

# Backwards compatible way.
psub = await js.pull_subscribe_bind(cinfo.name, stream_name)
msgs = await psub.fetch(1)
msg = msgs[0]
await msg.ack()

# Using durable argument to refer to ephemeral is ok for backwards compatibility.
psub = await js.pull_subscribe_bind(durable=cinfo.name, stream=stream_name)
msgs = await psub.fetch(1)
msg = msgs[0]
await msg.ack()

# stream, consumer name order
psub = await js.pull_subscribe_bind(stream=stream_name, durable=cinfo.name)
msgs = await psub.fetch(1)
msg = msgs[0]
await msg.ack()

assert msg.metadata.num_pending == 1

# name can also be used to refer to the consumer name
psub = await js.pull_subscribe_bind(stream=stream_name, name=cinfo.name)
msgs = await psub.fetch(1)
msg = msgs[0]
await msg.ack()

# no pending messages
assert msg.metadata.num_pending == 0

with pytest.raises(ValueError) as err:
await js.pull_subscribe_bind(durable=cinfo.name)
assert str(err.value) == 'nats: stream name is required'

with pytest.raises(ValueError) as err:
await js.pull_subscribe_bind(cinfo.name)
assert str(err.value) == 'nats: stream name is required'

await nc.close()

class JSMTest(SingleJetStreamServerTestCase):

@async_test
Expand Down Expand Up @@ -1190,6 +1298,32 @@ async def test_consumer_with_name(self):

await nc.close()

@async_test
async def test_jsm_stream_info_options(self):
nc = NATS()
await nc.connect()
js = nc.jetstream()
jsm = nc.jsm()

# Create stream
stream = await jsm.add_stream(name="foo", subjects=["foo.>"])

for i in range(0, 5):
await js.publish("foo.%d" % i, b'A')

si = await jsm.stream_info("foo", subjects_filter=">")
assert si.state.messages == 5
assert si.state.subjects == {'foo.0': 1, 'foo.1': 1, 'foo.2': 1, 'foo.3': 1, 'foo.4': 1}

# When nothing matches streams subject will be empty.
si = await jsm.stream_info("foo", subjects_filter="asdf")
assert si.state.messages == 5
assert si.state.subjects == None

# By default do not report the number of subjects either.
si = await jsm.stream_info("foo")
assert si.state.messages == 5
assert si.state.subjects == None

class SubscribeTest(SingleJetStreamServerTestCase):

Expand Down Expand Up @@ -2072,6 +2206,9 @@ async def error_handler(e):

await kv.delete("hello.1")

status = await kv.status()
assert status.values == 102

# Get after delete is again a not found error.
with pytest.raises(KeyNotFoundError) as err:
await kv.get("hello.1")
Expand All @@ -2080,7 +2217,6 @@ async def error_handler(e):
assert err.value.entry.revision == 102
assert err.value.entry.value == None
assert err.value.op == 'DEL'

await kv.purge("hello.5")

with pytest.raises(KeyNotFoundError) as err:
Expand All @@ -2094,8 +2230,11 @@ async def error_handler(e):
with pytest.raises(NotFoundError):
await kv.get("hello.5")

# Check remaining messages in the stream state.
status = await kv.status()
assert status.values == 2
# NOTE: Behavior changed here from v2.10.9 => v2.10.10
# assert status.values == 2
assert status.values == 1

entry = await kv.get("hello")
assert "hello" == entry.key
Expand All @@ -2110,13 +2249,13 @@ async def error_handler(e):
assert 1 == entry.revision

status = await kv.status()
assert status.values == 2
assert status.values == 1

for i in range(100, 200):
await kv.put(f"hello.{i}", b'Hello JS KV!')

status = await kv.status()
assert status.values == 102
assert status.values == 101

with pytest.raises(NotFoundError):
await kv.get("hello.5")
Expand Down Expand Up @@ -2659,6 +2798,15 @@ async def error_handler(e):
assert e.key == 't.hello'
assert e.revision == 15

# Default watch timeout should 5 minutes
ci = await js.consumer_info("KV_WATCH", w._sub._consumer)
assert ci.config.inactive_threshold == 300.0

# Setup new watch with a custom inactive_threshold.
w = await kv.watchall(inactive_threshold=10.0)
ci = await js.consumer_info("KV_WATCH", w._sub._consumer)
assert ci.config.inactive_threshold == 10.0

await nc.close()

@async_test
Expand Down
11 changes: 11 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,3 +605,14 @@ def wrapper(test_case, *args, **kw):
)

return wrapper

def async_debug_test(test_case_fun, timeout=3600):

@wraps(test_case_fun)
def wrapper(test_case, *args, **kw):
asyncio.set_event_loop(test_case.loop)
return asyncio.run(
asyncio.wait_for(test_case_fun(test_case, *args, **kw), timeout)
)

return wrapper

0 comments on commit aa46c60

Please sign in to comment.