diff --git a/.travis.yml b/.travis.yml index d93e3f16..cff0afc5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -72,18 +72,6 @@ jobs: - bash ./scripts/install_nats.sh install: - pip install -e .[fast-mail-parser] - - name: "Python: 3.11 (nats-server@dev)" - python: "3.11" - env: - - NATS_SERVER_VERSION=dev - before_install: - - sudo apt update && sudo apt install gcc build-essential -y - - sudo apt-get install python3-pip - - sudo apt-get install python3-pytest - - pip install --upgrade pip - - bash ./scripts/install_nats.sh - install: - - pip install -e .[fast-mail-parser] - name: "Python: 3.7" python: "3.7" before_install: @@ -101,5 +89,5 @@ jobs: - name: "Python: 3.8" - name: "Python: 3.11" - name: "Python: 3.11/uvloop" - - name: "Python: 3.11 (nats-server@dev)" + - name: "Python: 3.11 (nats-server@main)" - name: "Python: 3.12" diff --git a/nats/js/api.py b/nats/js/api.py index c543d0f3..725a63c3 100644 --- a/nats/js/api.py +++ b/nats/js/api.py @@ -181,6 +181,7 @@ class StreamState(Base): deleted: Optional[List[int]] = None num_deleted: Optional[int] = None lost: Optional[LostStreamData] = None + subjects: Optional[Dict[str, int]] = None @classmethod def from_response(cls, resp: Dict[str, Any]): @@ -384,6 +385,7 @@ class ConsumerConfig(Base): ack_wait: Optional[float] = None # in seconds max_deliver: Optional[int] = None filter_subject: Optional[str] = None + filter_subjects: Optional[List[str]] = None replay_policy: Optional[ReplayPolicy] = ReplayPolicy.INSTANT rate_limit_bps: Optional[int] = None sample_freq: Optional[str] = None diff --git a/nats/js/client.py b/nats/js/client.py index 1cf5eb9a..2c54eca7 100644 --- a/nats/js/client.py +++ b/nats/js/client.py @@ -152,6 +152,7 @@ async def subscribe( pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, deliver_policy: Optional[api.DeliverPolicy] = None, headers_only: Optional[bool] = None, + inactive_threshold: Optional[float] = None, ) -> PushSubscription: """Create consumer if needed and push-subscribe to it. @@ -272,6 +273,8 @@ async def cb(msg): if deliver_policy: # NOTE: deliver_policy is defaulting to ALL so check is different for this one. config.deliver_policy = deliver_policy + if inactive_threshold: + config.inactive_threshold = inactive_threshold # Create inbox for push consumer. deliver = self._nc.new_inbox() @@ -461,12 +464,13 @@ async def main(): async def pull_subscribe_bind( self, - durable: str, - stream: str, + consumer: Optional[str] = None, + stream: Optional[str] = None, inbox_prefix: bytes = api.INBOX_PREFIX, pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, name: Optional[str] = None, + durable: Optional[str] = None, ) -> JetStreamContext.PullSubscription: """ pull_subscribe returns a `PullSubscription` that can be delivered messages @@ -494,22 +498,29 @@ async def main(): asyncio.run(main()) """ + if not stream: + raise ValueError("nats: stream name is required") deliver = inbox_prefix + self._nc._nuid.next() sub = await self._nc.subscribe( deliver.decode(), pending_msgs_limit=pending_msgs_limit, pending_bytes_limit=pending_bytes_limit ) - consumer = None + consumer_name = None + # In nats.py v2.7.0 changing the first arg to be 'consumer' instead of 'durable', + # but continue to support for backwards compatibility. if durable: - consumer = durable + consumer_name = durable + elif name: + # This should not be common and 'consumer' arg preferred instead but support anyway. + consumer_name = name else: - consumer = name + consumer_name = consumer return JetStreamContext.PullSubscription( js=self, sub=sub, stream=stream, - consumer=consumer, + consumer=consumer_name, deliver=deliver, ) diff --git a/nats/js/kv.py b/nats/js/kv.py index a8f0dcfd..40b92cd6 100644 --- a/nats/js/kv.py +++ b/nats/js/kv.py @@ -387,6 +387,7 @@ async def watch( include_history=False, ignore_deletes=False, meta_only=False, + inactive_threshold=None, ) -> KeyWatcher: """ watch will fire a callback when a key that matches the keys @@ -436,12 +437,17 @@ async def watch_updates(msg): if not include_history: deliver_policy = api.DeliverPolicy.LAST_PER_SUBJECT + # Cleanup watchers after 5 minutes of inactivity by default. + if not inactive_threshold: + inactive_threshold = 5 * 60 + watcher._sub = await self._js.subscribe( subject, cb=watch_updates, ordered_consumer=True, deliver_policy=deliver_policy, headers_only=meta_only, + inactive_threshold=inactive_threshold, ) await asyncio.sleep(0) diff --git a/nats/js/manager.py b/nats/js/manager.py index 5d11805a..7bce1816 100644 --- a/nats/js/manager.py +++ b/nats/js/manager.py @@ -68,12 +68,15 @@ async def find_stream_name_by_subject(self, subject: str) -> str: raise NotFoundError return info['streams'][0] - async def stream_info(self, name: str) -> api.StreamInfo: + async def stream_info(self, name: str, subjects_filter: Optional[str] = None) -> api.StreamInfo: """ Get the latest StreamInfo by stream name. """ + req_data = '' + if subjects_filter: + req_data = json.dumps({"subjects_filter": subjects_filter}) resp = await self._api_request( - f"{self._prefix}.STREAM.INFO.{name}", timeout=self._timeout + f"{self._prefix}.STREAM.INFO.{name}", req_data.encode(), timeout=self._timeout ) return api.StreamInfo.from_response(resp) diff --git a/scripts/install_nats.sh b/scripts/install_nats.sh index 1dfde185..a963a408 100755 --- a/scripts/install_nats.sh +++ b/scripts/install_nats.sh @@ -2,7 +2,7 @@ set -e -export DEFAULT_NATS_SERVER_VERSION=latest +export DEFAULT_NATS_SERVER_VERSION=v2.10.10 export NATS_SERVER_VERSION="${NATS_SERVER_VERSION:=$DEFAULT_NATS_SERVER_VERSION}" diff --git a/tests/test_js.py b/tests/test_js.py index a15742f4..25f79a2b 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -801,6 +801,114 @@ async def test_ephemeral_pull_subscribe(self): self.assertTrue(cinfo.config.durable_name == None) await nc.close() + + @async_test + async def test_consumer_with_multiple_filters(self): + nc = NATS() + await nc.connect() + js = nc.jetstream() + jsm = nc.jsm() + + # Create stream. + await jsm.add_stream(name="ctests", subjects=["a", "b", "c.>"]) + await js.publish("a", b'A') + await js.publish("b", b'B') + await js.publish("c.d", b'CD') + await js.publish("c.d.e", b'CDE') + + # Create ephemeral pull consumer with a name. + stream_name = "ctests" + consumer_name = "multi" + cinfo = await jsm.add_consumer( + stream_name, + name=consumer_name, + ack_policy="explicit", + filter_subjects=["a", "b", "c.d.e"], + durable_name=consumer_name, # must be the same as name + ) + assert cinfo.config.name == consumer_name + + sub = await js.pull_subscribe_bind(consumer_name, stream_name) + msgs = await sub.fetch(1) + assert msgs[0].data == b'A' + ok = await msgs[0].ack_sync() + assert ok + + msgs = await sub.fetch(1) + assert msgs[0].data == b'B' + ok = await msgs[0].ack_sync() + assert ok + + msgs = await sub.fetch(1) + assert msgs[0].data == b'CDE' + ok = await msgs[0].ack_sync() + assert ok + + @async_test + async def test_fetch_pull_subscribe_bind(self): + nc = NATS() + await nc.connect() + + js = nc.jetstream() + + stream_name = "TESTFETCH" + await js.add_stream(name=stream_name, subjects=["foo", "bar"]) + + for i in range(0, 5): + await js.publish("foo", b'A') + + # Fetch with multiple filters on an ephemeral consumer. + cinfo = await js.add_consumer( + stream_name, + filter_subjects=["foo", "bar"], + inactive_threshold=300.0, + ) + + # Using named arguments. + psub = await js.pull_subscribe_bind(stream=stream_name, consumer=cinfo.name) + msgs = await psub.fetch(1) + msg = msgs[0] + await msg.ack() + + # Backwards compatible way. + psub = await js.pull_subscribe_bind(cinfo.name, stream_name) + msgs = await psub.fetch(1) + msg = msgs[0] + await msg.ack() + + # Using durable argument to refer to ephemeral is ok for backwards compatibility. + psub = await js.pull_subscribe_bind(durable=cinfo.name, stream=stream_name) + msgs = await psub.fetch(1) + msg = msgs[0] + await msg.ack() + + # stream, consumer name order + psub = await js.pull_subscribe_bind(stream=stream_name, durable=cinfo.name) + msgs = await psub.fetch(1) + msg = msgs[0] + await msg.ack() + + assert msg.metadata.num_pending == 1 + + # name can also be used to refer to the consumer name + psub = await js.pull_subscribe_bind(stream=stream_name, name=cinfo.name) + msgs = await psub.fetch(1) + msg = msgs[0] + await msg.ack() + + # no pending messages + assert msg.metadata.num_pending == 0 + + with pytest.raises(ValueError) as err: + await js.pull_subscribe_bind(durable=cinfo.name) + assert str(err.value) == 'nats: stream name is required' + + with pytest.raises(ValueError) as err: + await js.pull_subscribe_bind(cinfo.name) + assert str(err.value) == 'nats: stream name is required' + + await nc.close() + class JSMTest(SingleJetStreamServerTestCase): @async_test @@ -1190,6 +1298,32 @@ async def test_consumer_with_name(self): await nc.close() + @async_test + async def test_jsm_stream_info_options(self): + nc = NATS() + await nc.connect() + js = nc.jetstream() + jsm = nc.jsm() + + # Create stream + stream = await jsm.add_stream(name="foo", subjects=["foo.>"]) + + for i in range(0, 5): + await js.publish("foo.%d" % i, b'A') + + si = await jsm.stream_info("foo", subjects_filter=">") + assert si.state.messages == 5 + assert si.state.subjects == {'foo.0': 1, 'foo.1': 1, 'foo.2': 1, 'foo.3': 1, 'foo.4': 1} + + # When nothing matches streams subject will be empty. + si = await jsm.stream_info("foo", subjects_filter="asdf") + assert si.state.messages == 5 + assert si.state.subjects == None + + # By default do not report the number of subjects either. + si = await jsm.stream_info("foo") + assert si.state.messages == 5 + assert si.state.subjects == None class SubscribeTest(SingleJetStreamServerTestCase): @@ -2072,6 +2206,9 @@ async def error_handler(e): await kv.delete("hello.1") + status = await kv.status() + assert status.values == 102 + # Get after delete is again a not found error. with pytest.raises(KeyNotFoundError) as err: await kv.get("hello.1") @@ -2080,7 +2217,6 @@ async def error_handler(e): assert err.value.entry.revision == 102 assert err.value.entry.value == None assert err.value.op == 'DEL' - await kv.purge("hello.5") with pytest.raises(KeyNotFoundError) as err: @@ -2094,8 +2230,11 @@ async def error_handler(e): with pytest.raises(NotFoundError): await kv.get("hello.5") + # Check remaining messages in the stream state. status = await kv.status() - assert status.values == 2 + # NOTE: Behavior changed here from v2.10.9 => v2.10.10 + # assert status.values == 2 + assert status.values == 1 entry = await kv.get("hello") assert "hello" == entry.key @@ -2110,13 +2249,13 @@ async def error_handler(e): assert 1 == entry.revision status = await kv.status() - assert status.values == 2 + assert status.values == 1 for i in range(100, 200): await kv.put(f"hello.{i}", b'Hello JS KV!') status = await kv.status() - assert status.values == 102 + assert status.values == 101 with pytest.raises(NotFoundError): await kv.get("hello.5") @@ -2659,6 +2798,15 @@ async def error_handler(e): assert e.key == 't.hello' assert e.revision == 15 + # Default watch timeout should 5 minutes + ci = await js.consumer_info("KV_WATCH", w._sub._consumer) + assert ci.config.inactive_threshold == 300.0 + + # Setup new watch with a custom inactive_threshold. + w = await kv.watchall(inactive_threshold=10.0) + ci = await js.consumer_info("KV_WATCH", w._sub._consumer) + assert ci.config.inactive_threshold == 10.0 + await nc.close() @async_test diff --git a/tests/utils.py b/tests/utils.py index 95a589fb..43dd27fa 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -605,3 +605,14 @@ def wrapper(test_case, *args, **kw): ) return wrapper + +def async_debug_test(test_case_fun, timeout=3600): + + @wraps(test_case_fun) + def wrapper(test_case, *args, **kw): + asyncio.set_event_loop(test_case.loop) + return asyncio.run( + asyncio.wait_for(test_case_fun(test_case, *args, **kw), timeout) + ) + + return wrapper