Skip to content

Commit

Permalink
发送部分消息
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuanpei-Wu committed Sep 2, 2024
1 parent 89ee873 commit df476bc
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,12 @@ async def send_json(self, event, data, sid=None):
await send_socket_catch_exception(self.sockets[sid].send_json, message)

def send_sync(self, event, data, sid=None):
if event in ["execution_start", "execution_interrupted", "execution_error"]:
self.send_msg2redis(event, data)
self.loop.call_soon_threadsafe(
self.messages.put_nowait, (event, data, sid))

def send_msg2redis(self, event, data):
if hasattr(self, "last_prompt_id") is not True:
logging.warning("server has not attr last_prompt_id")
return
Expand All @@ -664,11 +670,11 @@ def send_sync(self, event, data, sid=None):
"type": event,
"data": data,
}
begin = time.time()
res = self.redis_op.zadd_with_expire(key, json.dumps(message), int(time.time()), 120)
logging.info("send_message time: {} seconds. event: {}".format(time.time() - begin, event))
if not res:
logging.warning(f"Failed to send {event}")
self.loop.call_soon_threadsafe(
self.messages.put_nowait, (event, data, sid))
logging.error("failed to send_message. key: {}. event: {}".format(key, event))

def queue_updated(self):
self.send_sync("status", { "status": self.get_queue_info() })
Expand Down

0 comments on commit df476bc

Please sign in to comment.