diff --git a/src/handlers/bulletin.py b/src/handlers/bulletin.py index a353559a..16afe9f2 100644 --- a/src/handlers/bulletin.py +++ b/src/handlers/bulletin.py @@ -11,7 +11,7 @@ class BulletinHandler(RequestHandler): @reqenv async def get(self, bulletin_id=None): if bulletin_id is None: - can_submit = await JudgeServerClusterService.inst.is_server_online() + can_submit = JudgeServerClusterService.inst.is_server_online() _, bulletin_list = await BulletinService.inst.list_bulletin() bulletin_list.sort(key=lambda b: (b['pinned'], b['timestamp']), reverse=True) diff --git a/src/handlers/contests/manage/pro.py b/src/handlers/contests/manage/pro.py index c3e5191f..969e02fa 100644 --- a/src/handlers/contests/manage/pro.py +++ b/src/handlers/contests/manage/pro.py @@ -85,7 +85,7 @@ async def post(self): elif reqtype == "rechal": pro_id = int(pro_id) - can_submit = await JudgeServerClusterService.inst.is_server_online() + can_submit = JudgeServerClusterService.inst.is_server_online() if not can_submit: self.error('Ejudge') return @@ -121,6 +121,7 @@ async def _rechal(rechals): pro_id, pro['testm_conf'], comp_type, + ChalConst.CONTEST_REJUDGE_PRI, ) await asyncio.create_task(_rechal(rechals=result)) diff --git a/src/handlers/manage/judge.py b/src/handlers/manage/judge.py index 43ff22df..4bc17427 100644 --- a/src/handlers/manage/judge.py +++ b/src/handlers/manage/judge.py @@ -19,7 +19,7 @@ class ManageJudgeHandler(RequestHandler): @reqenv @require_permission(UserConst.ACCTTYPE_KERNEL) async def get(self): - judge_status_list = await JudgeServerClusterService.inst.get_servers_status() + judge_status_list = JudgeServerClusterService.inst.get_servers_status() await self.render('manage/judge', page='judge', judge_status_list=judge_status_list) @reqenv @@ -30,7 +30,7 @@ async def post(self): if reqtype == 'connect': index = int(self.get_argument('index')) - err, server_inform = await JudgeServerClusterService.inst.get_server_status(index) + err, server_inform = JudgeServerClusterService.inst.get_server_status(index) if (server_name := server_inform['name']) == '': server_name = f"server-{index}" @@ -52,7 +52,7 @@ async def post(self): index = int(self.get_argument('index')) pwd = str(self.get_argument('pwd')) - err, server_inform = await JudgeServerClusterService.inst.get_server_status(index) + err, server_inform = JudgeServerClusterService.inst.get_server_status(index) if (server_name := server_inform['name']) == '': server_name = f"server-{index}" diff --git a/src/handlers/manage/pro.py b/src/handlers/manage/pro.py index fc15081b..b1ed6b1b 100644 --- a/src/handlers/manage/pro.py +++ b/src/handlers/manage/pro.py @@ -300,7 +300,7 @@ async def post(self, page=None): return pro_id = int(self.get_argument('pro_id')) - can_submit = await JudgeServerClusterService.inst.is_server_online() + can_submit = JudgeServerClusterService.inst.is_server_online() if not can_submit: self.error('Ejudge') return @@ -339,6 +339,7 @@ async def _rechal(rechals): pro_id, pro['testm_conf'], comp_type, + ChalConst.NORMAL_REJUDGE_PRI, ) await asyncio.create_task(_rechal(rechals=result)) diff --git a/src/handlers/pro.py b/src/handlers/pro.py index 94eace90..ffd5f5b2 100644 --- a/src/handlers/pro.py +++ b/src/handlers/pro.py @@ -270,7 +270,7 @@ async def get(self, pro_id): if result['state'] is None or result['state'] != ChalConst.STATE_AC: pro['tags'] = '' - can_submit = await JudgeServerClusterService.inst.is_server_online() + can_submit = JudgeServerClusterService.inst.is_server_online() await self.render( 'pro', diff --git a/src/handlers/submit.py b/src/handlers/submit.py index 0aab007d..40343257 100644 --- a/src/handlers/submit.py +++ b/src/handlers/submit.py @@ -21,6 +21,7 @@ async def get(self, pro_id=None): pro_id = int(pro_id) + # TODO: if problem is makefile type, we should restrict compiler type allow_compilers = ChalConst.ALLOW_COMPILERS if self.contest: if not self.contest.is_running() and not self.contest.is_admin(self.acct): @@ -33,7 +34,7 @@ async def get(self, pro_id=None): allow_compilers = self.contest.allow_compilers - can_submit = await JudgeServerClusterService.inst.is_server_online() + can_submit = JudgeServerClusterService.inst.is_server_online() if not can_submit: self.finish('

All Judge Server Offline

') @@ -56,7 +57,7 @@ async def get(self, pro_id=None): @require_permission([UserConst.ACCTTYPE_USER, UserConst.ACCTTYPE_KERNEL]) @contest_require_permission('all') async def post(self): - can_submit = await JudgeServerClusterService.inst.is_server_online() + can_submit = JudgeServerClusterService.inst.is_server_online() if not can_submit: self.error('Ejudge') @@ -73,6 +74,7 @@ async def post(self): comp_type = str(self.get_argument('comp_type')) if self.contest: + pri = ChalConst.CONTEST_PRI if not self.contest.is_running() and not self.contest.is_admin(self.acct): self.error('Eacces') return @@ -80,6 +82,8 @@ async def post(self): if pro_id not in self.contest.pro_list: self.error('Enoext') return + else: + pri = ChalConst.NORMAL_PRI err = await self.is_allow_submit(code, comp_type, pro_id) if err: @@ -107,6 +111,10 @@ async def post(self): elif reqtype == 'rechal': if ((self.contest is None and self.acct.is_kernel()) # not in contest or (self.contest and self.contest.is_admin(self.acct))): # in contest + if self.contest: + pri = ChalConst.CONTEST_REJUDGE_PRI + else: + pri = ChalConst.NORMAL_REJUDGE_PRI chal_id = int(self.get_argument('chal_id')) @@ -129,6 +137,7 @@ async def post(self): pro_id, pro['testm_conf'], comp_type, + pri=pri ) if err: self.error(err) @@ -139,6 +148,7 @@ async def post(self): self.finish(json.dumps(chal_id)) return + async def is_allow_submit(self, code: str, comp_type: str, pro_id: int): # limits variable config allow_compilers = ChalConst.ALLOW_COMPILERS @@ -153,6 +163,7 @@ async def is_allow_submit(self, code: str, comp_type: str, pro_id: int): if len(code) > ProService.CODE_MAX: return 'Ecodemax' + # TODO: if problem is makefile type, we should restrict compiler type if comp_type not in allow_compilers: return 'Ecomp' @@ -185,5 +196,4 @@ async def is_allow_submit(self, code: str, comp_type: str, pro_id: int): else: await self.rs.set(last_submit_name, int(time.time())) - return None diff --git a/src/services/chal.py b/src/services/chal.py index b5f266d8..ff1fc73a 100644 --- a/src/services/chal.py +++ b/src/services/chal.py @@ -75,6 +75,11 @@ class ChalConst: 'java': 'OpenJDK 17.0.8', } + NORMAL_PRI = 0 + CONTEST_PRI = 1 + CONTEST_REJUDGE_PRI = 2 + NORMAL_REJUDGE_PRI = 3 + @dataclass class ChalSearchingParam: @@ -287,7 +292,7 @@ async def get_chal(self, chal_id): }, ) - async def emit_chal(self, chal_id, pro_id, testm_conf, comp_type): + async def emit_chal(self, chal_id, pro_id, testm_conf, comp_type, pri: int): chal_id = int(chal_id) pro_id = int(pro_id) @@ -335,11 +340,7 @@ async def emit_chal(self, chal_id, pro_id, testm_conf, comp_type): file_ext = ChalConst.FILE_EXTENSION[comp_type] - try: - code_f = open(f"code/{chal_id}/main.{file_ext}", 'rb') - code_f.close() - - except FileNotFoundError: + if not os.path.isfile(f"code/{chal_id}/main.{file_ext}"): for test in testl: await self.update_test(chal_id, test['test_idx'], ChalConst.STATE_ERR, 0, 0, '', refresh_db=False) await self.rs.publish('materialized_view_req', (await self.rs.get('materialized_view_counter'))) @@ -352,7 +353,7 @@ async def emit_chal(self, chal_id, pro_id, testm_conf, comp_type): await JudgeServerClusterService.inst.send( { - 'pri': 1, + 'pri': pri, 'chal_id': chal_id, 'test': testl, 'code_path': f'{chal_id}/main.{file_ext}', @@ -361,7 +362,6 @@ async def emit_chal(self, chal_id, pro_id, testm_conf, comp_type): 'comp_type': comp_type, 'check_type': test_conf['check_type'], }, - 1, pro_id, contest_id, ) diff --git a/src/services/judge.py b/src/services/judge.py index 3171104f..7ebded61 100644 --- a/src/services/judge.py +++ b/src/services/judge.py @@ -100,6 +100,7 @@ async def disconnect_server(self) -> Union[str, None]: try: self.status = False + self.running_chal_cnt = 0 self.ws.close() self.main_task.cancel() self.main_task = None @@ -108,7 +109,7 @@ async def disconnect_server(self) -> Union[str, None]: return None - async def get_server_status(self): + def get_server_status(self): return ( None, { @@ -203,56 +204,51 @@ async def disconnect_all_server(self) -> None: self.queue.get() await server.disconnect_server() - async def get_server_status(self, idx): + def get_server_status(self, idx): if idx < 0 or idx >= len(self.servers): return 'Eparam' - _, status = await self.servers[idx].get_server_status() + _, status = self.servers[idx].get_server_status() return None, status - async def get_servers_status(self) -> List[Dict]: + def get_servers_status(self) -> List[Dict]: status_list: List[Dict] = [] for server in self.servers: - _, status = await server.get_server_status() + _, status = server.get_server_status() status_list.append(status) return status_list - async def is_server_online(self) -> bool: + def is_server_online(self) -> bool: for server in self.servers: - _, status = await server.get_server_status() + _, status = server.get_server_status() if status['status']: return True return False - async def send(self, data, pri, pro_id, contest_id) -> None: - # simple round-robin impl + async def send(self, data, pro_id, contest_id) -> None: + # priority impl - for i in range(self.idx + 1, len(self.servers)): - if self.servers[i].ws is None: - continue + if not self.is_server_online(): + return - _, status = await self.servers[i].get_server_status() + while not self.queue.empty(): + running_cnt, idx = self.queue.get() + _, status = self.get_server_status(idx) if not status['status']: continue - await self.servers[i].send(json.dumps(data)) - self.servers[i].chal_map[data['chal_id']] = {"pro_id": pro_id, "contest_id": contest_id} + judge_id = status['judge_id'] - self.idx = i - return - - for i in range(0, len(self.servers)): - if self.servers[i].ws is None: - continue + if data['chal_id'] in self.servers[judge_id].chal_map: + self.queue.put([running_cnt, idx]) + break - _, status = await self.servers[i].get_server_status() - if not status['status']: - continue + await self.servers[judge_id].send(data) + _, status = self.get_server_status(idx) - await self.servers[i].send(data) - self.servers[i].chal_map[data['chal_id']] = {"pro_id": pro_id, "contest_id": contest_id} + self.queue.put([status['running_chal_cnt'], judge_id]) + self.servers[idx].chal_map[data['chal_id']] = {"pro_id": pro_id, "contest_id": contest_id} - self.idx = i - return + break