Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protect against errors raised when adding a request to the engine #230

Merged
merged 5 commits into from
Mar 15, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions serve/mlc_serve/engine/staging_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ def __init__(
self.next_generation_output = None
self.requests_lock = Lock()
self.requests = dict[RequestId, RequestState]()
self.requests_to_be_cancelled_lock = Lock()
# Error message for each request that fails to be added to the engine
self.requests_to_be_cancelled = dict[RequestId, str]()

# TODO(@team): This is a temporary solution to expose model config to higher API layer.
# Follow-up with the proper solution
Expand Down Expand Up @@ -119,13 +122,18 @@ def add(self, requests: list[Request]):
assert isinstance(req.stopping_criteria.stop_sequences, list)

# If the request violates the tokenization, this returns None, so skip.
state = get_new_request_state(
req,
self.conversation_template,
self.tokenizer,
self.model_artifact_config.vocab_size,
)
new_request_states.append(state)
try:
state = get_new_request_state(
req,
self.conversation_template,
self.tokenizer,
self.model_artifact_config.vocab_size,
)
new_request_states.append(state)
except Exception as e:
LOG.warn("Failed to add a request", request_id=req.request_id)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it be better to just throw the error here, and catch it at https://github.com/octoml/ollm/blob/d29be36231e666f761a2fb08dbf0e4ce758618f4/mlc-serve/mlc_serve/engine/async_connector.py#L153? I think initially we just assumed engine.add cannot fail. Now it might be a good time to revisit this assumption. One caveat of the approach of deferring error reporting to the engine.step is that, streaming API can no longer returns error status code once streaming begins (because of how server-sent event works)

Just sharing some thoughts. No need to block this PR if it's related to production issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that for its simplicity, and it also removes the need for an additional lock. I reworked this PR according to your suggestion, but I'm not sure what to do after catching the error in async_connector.py. The following seems to work, but is this the right way?

try:
    await asyncio.to_thread(self.engine.add, [request])
except TextGenerationError as e:
    raise asyncio.CancelledError(e)

Copy link

@yelite yelite Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think any change is needed in async_connector, but I am not fully sure. The TextGenerationError will just propagate to the http handler and the regular error handling can happen there. Because it's the engine.add that fails, we don't need to call engine.cancel either (as in https://github.com/octoml/ollm/blob/de6378ee6a1391276530e94b7b4374f01792c8ae/mlc-serve/mlc_serve/engine/async_connector.py#L99)

Copy link

@yelite yelite Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One benefit of throwing error in engine.add is that the http handler will be able to respond with failure http status code for streaming requests. Throwing a CancelledError will confuse the handler

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed that not catching the error in async_connector.py still keeps the server from dying.

with self.requests_to_be_cancelled_lock:
self.requests_to_be_cancelled[req.request_id] = str(e)

self.command_queue.put(AddRequestsCommand(request_states=new_request_states))

Expand Down Expand Up @@ -171,11 +179,25 @@ def step(self) -> InferenceStepResult:
has_pending_requests=self.has_pending_requests(),
)

outputs = list[RequestOutput]()

with self.requests_to_be_cancelled_lock:
if len(self.requests_to_be_cancelled) > 0:
for req_id, err_msg in self.requests_to_be_cancelled.items():
outputs.append(
RequestOutput(
req_id,
sequences=[],
error=err_msg,
Copy link
Member Author

@masahi masahi Mar 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By returning a non-None error here, the exception is now raised at https://github.com/octoml/mlc-llm/blob/batch-serving/serve/mlc_serve/engine/async_connector.py#L88-L89 like

  File "/home/masahi/projects/dev/mlc-llm/serve/mlc_serve/api/handler.py", line 159, in request_completion
    return await collect_result_stream(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/masahi/projects/dev/mlc-llm/serve/mlc_serve/api/handler.py", line 233, in collect_result_stream                                                                              
    async for res in result_generator:                                                       
  File "/home/masahi/projects/dev/mlc-llm/serve/mlc_serve/engine/async_connector.py", line 89, in generate
    raise TextGenerationError(output.error)
mlc_serve.engine.error.TextGenerationError: Conversation roles must alternate user/assistant/user/assistant/...

Using the standalone MLC server, a client still gets openai.InternalServerError: Internal Server Error as a response but the server doesn't die.

Would that be ok? I assume ollm has a proper response handling logic in such case @jroesch

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes as long as we get an exception back we should catch and convert it properly!

)
)
self.requests_to_be_cancelled.clear()

if not self._is_ready_to_serve():
raise RuntimeError("GenerationLoopWorker process is not running")

if not self.has_pending_requests():
return InferenceStepResult([])
return InferenceStepResult(outputs)

if self.next_generation_output is None:
generation_output = self.result_queue.get()
Expand All @@ -188,8 +210,6 @@ def step(self) -> InferenceStepResult:
f"Error from GenerationLoopWorker process: {generation_output.error}"
) from generation_output.error

outputs = list[RequestOutput]()

with self.requests_lock:
LOG.debug(
"StagingInferenceEngine.step obtained requests_lock",
Expand Down
Loading