Skip to content

Commit

Permalink
Merge pull request #140 from ueckoken/connectionclosederror
Browse files Browse the repository at this point in the history
fix to handle ConnectionClosedError
  • Loading branch information
otariidae authored Oct 16, 2022
2 parents af04e13 + 1109fc4 commit 4aa2878
Showing 1 changed file with 88 additions and 81 deletions.
169 changes: 88 additions & 81 deletions frontend/videoCast/one_to_multiple_cast_skyway.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import asyncio
import websockets
from websockets.exceptions import ConnectionClosedError
import json
import ssl
import os
Expand Down Expand Up @@ -39,46 +40,64 @@ async def handler(websocket, path):
async with lock:
connections.append(websocket)

async for message in websocket: # 受信
dictionary = json.loads(message)
# msg_type, room_idで構成
# connect_senderの時のみ+peer_id
promises = []
msg_type = dictionary["msg_type"]
room_id = dictionary["room_id"]

if room_id not in rooms:
room = {
"sender_socket": None,
"peer_id": None,
"skyway_room_id": None,
"connections": [websocket],
"connect_num": 0,
# ルームの累積接続数が1000行くと通信が弾かれるのでその前にルームを切り替え
}
async with lock:
rooms[room_id] = room
else:
room = rooms[room_id]
if websocket not in room["connections"]:
try:
async for message in websocket: # 受信
dictionary = json.loads(message)
# msg_type, room_idで構成
# connect_senderの時のみ+peer_id
promises = []
msg_type = dictionary["msg_type"]
room_id = dictionary["room_id"]

if room_id not in rooms:
room = {
"sender_socket": None,
"peer_id": None,
"skyway_room_id": None,
"connections": [websocket],
"connect_num": 0,
# ルームの累積接続数が1000行くと通信が弾かれるのでその前にルームを切り替え
}
async with lock:
room["connections"].append(websocket)
# 現在の通信のwebsocketが入ったroom_idのroomが存在することを保証

if msg_type == "connect_sender":
if SENDER_TOKEN is None or SENDER_TOKEN == dictionary["sender_token"]:
async with lock: # if room["sender_socket"] is None:
print("sender_connect")
room["sender_socket"] = websocket
room["skyway_room_id"] = dictionary["skyway_room_id"]
room["connect_num"] = 0
# senderは上書き
room["peer_id"] = dictionary["peer_id"]
for connection in room["connections"]:
if connection is websocket:
continue
print("send")
promise = connection.send(
rooms[room_id] = room
else:
room = rooms[room_id]
if websocket not in room["connections"]:
async with lock:
room["connections"].append(websocket)
# 現在の通信のwebsocketが入ったroom_idのroomが存在することを保証

if msg_type == "connect_sender":
if SENDER_TOKEN is None or SENDER_TOKEN == dictionary["sender_token"]:
async with lock: # if room["sender_socket"] is None:
print("sender_connect")
room["sender_socket"] = websocket
room["skyway_room_id"] = dictionary["skyway_room_id"]
room["connect_num"] = 0
# senderは上書き
room["peer_id"] = dictionary["peer_id"]
for connection in room["connections"]:
if connection is websocket:
continue
print("send")
promise = connection.send(
json.dumps(
{
"msg_type": "connect_receiver",
"room_id": room_id,
"skyway_room_id": room["skyway_room_id"],
"peer_id": room["peer_id"],
}
)
)
room["connect_num"] += 1
promises.append(promise)
elif msg_type == "connect_receiver":
print("connect_receiver")
if room["sender_socket"] is not None:
print("send")
if room["connect_num"] < MAX_CONNECT_NUM:
promise = websocket.send(
json.dumps(
{
"msg_type": "connect_receiver",
Expand All @@ -88,53 +107,41 @@ async def handler(websocket, path):
}
)
)
room["connect_num"] += 1

async with lock:
room["connect_num"] += 1
promises.append(promise)
elif msg_type == "connect_receiver":
print("connect_receiver")
if room["sender_socket"] is not None:
print("send")
if room["connect_num"] < MAX_CONNECT_NUM:
promise = websocket.send(
json.dumps(
{
"msg_type": "connect_receiver",
"room_id": room_id,
"skyway_room_id": room["skyway_room_id"],
"peer_id": room["peer_id"],
}
else:
# ルームの累積接続数が溢れそうだったら新しい部屋をsenderに作ってもらう
promise = room["sender_socket"].send(
json.dumps(
{
"msg_type": "request_reconnect_sender",
"room_id": room_id,
}
)
)
)

promises.append(promise)
async with lock:
room["connect_num"] = 0
elif msg_type == "exit_room":
if websocket in room["connections"]:
async with lock:
room["connect_num"] += 1
promises.append(promise)
else:
# ルームの累積接続数が溢れそうだったら新しい部屋をsenderに作ってもらう
promise = room["sender_socket"].send(
json.dumps(
{"msg_type": "request_reconnect_sender", "room_id": room_id}
)
)
promises.append(promise)
room["connections"].remove(websocket)
if room["sender_socket"] is websocket:
async with lock:
room["sender_socket"] = None
room["peer_id"] = None
room["skyway_room_id"] = None
room["connect_num"] = 0
elif msg_type == "exit_room":
if websocket in room["connections"]:
async with lock:
room["connections"].remove(websocket)
if room["sender_socket"] is websocket:
async with lock:
room["sender_socket"] = None
room["peer_id"] = None
room["skyway_room_id"] = None
room["connect_num"] = 0
print("{}: {}".format(path, dictionary))
for p in promises:
try:
a = await p
except:
pass
print("{}: {}".format(path, dictionary))
for p in promises:
try:
a = await p
except:
pass
except ConnectionClosedError:
pass

# 接続が切れたらその接続を削除
for room_id, room in rooms.items():
Expand Down

0 comments on commit 4aa2878

Please sign in to comment.