fix: Reduce memory usage when publishing prediction log to kafka (#525)
# Description
We've seen a gradual increase in memory usage when model observability is
enabled for a model.
 
Our first hypothesis was that asyncio caused this, because we pass the
prediction input and output to an async function. To test it, we reduced
the sampling rate to 0: the async function normally publishes to Kafka,
so a sampling rate of 0 isolates the asyncio overhead from the publishing
step. With the sampling rate at 0, memory usage was stable, with no
gradual increase.

Since the first hypothesis did not hold, our new hypothesis was that the
increase comes from publishing the data to Kafka, so we ran a memory
profiler against the model.

PS: We used memray as the profiler: https://github.com/bloomberg/memray
 
<img width="1135" alt="Screen Shot 2024-01-29 at 09 47 21"
src="https://github.com/caraml-dev/merlin/assets/2369255/2f4e6aaf-2279-4052-97d2-f5515d743f76">

<img width="1777" alt="Screen Shot 2024-01-29 at 09 49 11"
src="https://github.com/caraml-dev/merlin/assets/2369255/2ac87035-6f8a-4d4a-8c5f-f216f7904bf5">

<img width="1789" alt="Screen Shot 2024-01-29 at 09 48 52"
src="https://github.com/caraml-dev/merlin/assets/2369255/556bf0bf-d90e-4720-af53-34029d2c45ec">

The profiles show that memory usage keeps increasing and that producing
messages to Kafka contributes to it.

# Modifications
To solve this problem, the Kafka producer must call `poll` after
publishing a message. This drains the producer's `ack` buffer so that
memory usage no longer grows gradually. Refs:
[1](openedx/event-bus-kafka#10),
[2](https://github.com/confluentinc/librdkafka/wiki/FAQ#when-and-how-should-i-call-rd_kafka_poll)
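As a rough illustration of why the `poll` call matters, here is a minimal sketch with a toy producer. `FakeProducer`, `publish_all`, and the `prediction-log` topic name are hypothetical stand-ins written for this example; they are not part of merlin or of the Kafka client. The toy models only one behaviour described above: each `produce` leaves a report queued until `poll` serves it.

```python
from collections import deque
from typing import Callable, Deque, Iterable, Optional, Tuple


class FakeProducer:
    """Toy stand-in for a Kafka producer (assumption: NOT the real client).

    Every produce() queues one delivery report; only poll() removes them.
    """

    def __init__(self) -> None:
        self._pending: Deque[Tuple[Optional[Callable], bytes]] = deque()

    def produce(self, topic: str, value: bytes,
                on_delivery: Optional[Callable] = None) -> None:
        # The report stays in memory until the application polls.
        self._pending.append((on_delivery, value))

    def poll(self, timeout: float = 0) -> int:
        # Serve all queued reports and return how many were handled.
        served = 0
        while self._pending:
            callback, value = self._pending.popleft()
            if callback is not None:
                callback(None, value)
            served += 1
        return served

    def pending(self) -> int:
        return len(self._pending)


def publish_all(producer: FakeProducer, topic: str,
                payloads: Iterable[bytes], poll_after_produce: bool) -> int:
    """Publish every payload and return the number of reports left queued."""
    for payload in payloads:
        producer.produce(topic=topic, value=payload)
        if poll_after_produce:
            producer.poll(0)  # non-blocking: serve whatever is ready
    return producer.pending()


payloads = [b"x" * 100 for _ in range(1000)]
without_poll = publish_all(FakeProducer(), "prediction-log", payloads, False)
with_poll = publish_all(FakeProducer(), "prediction-log", payloads, True)
print(without_poll, with_poll)
```

In this toy model the queue grows by one entry per message when `poll` is never called and stays empty when `poll(0)` follows each `produce`. A timeout of 0 keeps the call non-blocking, so it should add little latency to the prediction path.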

After the changes:
<img width="1139" alt="Screen Shot 2024-01-29 at 09 49 02"
src="https://github.com/caraml-dev/merlin/assets/2369255/beab944d-fe05-445c-865a-524ef40630d0">

<img width="1786" alt="Screen Shot 2024-01-29 at 09 49 19"
src="https://github.com/caraml-dev/merlin/assets/2369255/0d7a79ad-36c7-4154-92af-e00c6a893b8e">

# Tests

# Checklist
- [x] Added PR label
- [ ] Added unit test, integration, and/or e2e tests
- [x] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduces API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes

# Release Notes

```release-note

```
tiopramayudi authored Jan 29, 2024
1 parent 9f3b161 commit e97330c
Showing 1 changed file with 1 addition and 0 deletions.
1 change: 1 addition & 0 deletions python/pyfunc-server/pyfuncserver/publisher/kafka.py
@@ -23,3 +23,4 @@ def produce(self, data: PyFuncOutput):
         prediction_log = build_prediction_log(pyfunc_output=data, model_manifest=self.model_manifest)
         serialized_data = prediction_log.SerializeToString()
         self.producer.produce(topic=self.topic, value=serialized_data)
+        self.producer.poll(0)
