Characterize nodes (not yet ready)
nikosT committed Aug 29, 2024
1 parent e5900a5 commit a1a31cc
Showing 2 changed files with 86 additions and 3 deletions.
2 changes: 1 addition & 1 deletion etc/oar/admission_rules.d/15_check_types.py
@@ -9,7 +9,7 @@
r8 = "^allowed=\\w+$"
r9 = "^inner=\\w+$"
r10 = "^timesharing=(?:(?:\\*|user),(?:\\*|name)|(?:\\*|name),(?:\\*|user))$"
r11 = "^(?:compact|spread|f_spread|co_loc|f_co_loc|no_pref|exclusive)$"
r11 = "^(?:compact|spread|f_spread|co_loc|f_co_loc|no_pref|exclusive|friendly|unfriendly)$"
all_re = re.compile(
    "(%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s)"
    % (r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11)
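
As a quick sanity check (editorial note, not part of the commit), the widened r11 pattern accepts the two new node-characterization types and still rejects anything that is not an exact match:

import re

r11 = "^(?:compact|spread|f_spread|co_loc|f_co_loc|no_pref|exclusive|friendly|unfriendly)$"

assert re.match(r11, "friendly")                # newly admitted type
assert re.match(r11, "unfriendly")              # newly admitted type
assert re.match(r11, "compact")                 # existing types still pass
assert re.match(r11, "friendly,extra") is None  # pattern is anchored end to end
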
87 changes: 85 additions & 2 deletions oar/kao/custom_scheduling.py
@@ -2,12 +2,63 @@

from procset import ProcSet

from oar.lib.globals import get_logger
from oar.lib.globals import get_logger, init_oar
from oar.lib.hierarchy import find_resource_hierarchies_scattered
from sqlalchemy import func, case, desc, asc
from sqlalchemy.orm import Session
from oar.lib.models import (
    AssignedResource,
    EventLog,
    FragJob,
    Job,
    JobType,
    MoldableJobDescription,
    Resource,
    ResourceLog,
)
from oar.lib.resource import ResourceSet

logger = get_logger("oar.custom_scheduling")


def get_nodes_characterization(session: Session):
    # Subquery to calculate the appearance count and assign a row number per network_address
    subquery = (
        session.query(
            Resource.network_address,
            JobType.type,
            func.count(AssignedResource.resource_id).label('appearance_count'),
            func.row_number().over(
                partition_by=Resource.network_address,
                order_by=[
                    desc(func.count(AssignedResource.resource_id)),
                    case(
                        (JobType.type == 'unfriendly', 1),
                        (JobType.type == 'friendly', 2),
                        else_=3,
                    ),
                ],
            ).label('rn'),
        )
        .join(Job, Job.id == JobType.job_id)
        .join(AssignedResource, Job.assigned_moldable_job == AssignedResource.moldable_id)
        .join(Resource, AssignedResource.resource_id == Resource.id)
        .filter(Job.state.in_(("toLaunch", "Running", "Resuming", "Terminated")))
        .group_by(Resource.network_address, JobType.type)
        .subquery()
    )

    # Main query to get only the rows with rn = 1 (max appearance count per network_address)
    results = (
        session.query(subquery.c.network_address, subquery.c.type, subquery.c.appearance_count)
        .filter(subquery.c.rn == 1)
        .order_by(asc(subquery.c.network_address))
        .all()
    )

    return results
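
# Editorial note, not in the commit: the query above is the classic
# "greatest-n-per-group" pattern. In rough SQL (sketch only; table and column
# names are approximations of OAR's schema, not generated output):
#
#   SELECT network_address, type, appearance_count
#   FROM (
#       SELECT r.network_address,
#              jt.type,
#              COUNT(ar.resource_id) AS appearance_count,
#              ROW_NUMBER() OVER (
#                  PARTITION BY r.network_address
#                  ORDER BY COUNT(ar.resource_id) DESC,
#                           CASE WHEN jt.type = 'unfriendly' THEN 1
#                                WHEN jt.type = 'friendly'   THEN 2
#                                ELSE 3 END
#              ) AS rn
#       FROM job_types jt
#       JOIN jobs j                ON j.job_id = jt.job_id
#       JOIN assigned_resources ar ON j.assigned_moldable_job = ar.moldable_job_id
#       JOIN resources r           ON ar.resource_id = r.resource_id
#       WHERE j.state IN ('toLaunch', 'Running', 'Resuming', 'Terminated')
#       GROUP BY r.network_address, jt.type
#   ) ranked
#   WHERE rn = 1
#   ORDER BY network_address
#
# so each network_address keeps exactly one row: its most frequent job type,
# with ties broken in favour of 'unfriendly', then 'friendly'.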


def compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
@@ -23,7 +74,39 @@ def compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=Tru
    :return [ProcSet]: \
        The allocation if found, otherwise an empty :class:`procset.ProcSet`
    """
    # logger.info(session)
    import time
    start_time = time.time()

    logger.info(__file__)
    config, db = init_oar(no_db=False)
    chars = get_nodes_characterization(session)

    rs = ResourceSet(session, config)

    # Group resource ids by node (network_address), then turn each group into a ProcSet
    nodes = {}
    list(map(lambda x: nodes.setdefault(x[1], []).append(x[0]), rs.roid_2_network_address.items()))

    nodes = {k: ProcSet(*v) for k, v in nodes.items()}

    logger.info(nodes)
    logger.info(chars)

    # Work in progress: combine the per-node characterization with the ProcSets above
    agg = []
    # for node in nodes:
    #     agg.append((node, if n ))

    # agg = {}
    # for node in nodes:

    # result = {node: [k for k, v in data.items() if v == node] for node in set(data.values())}

    # swapped_dict = {value: key for key, value in resource_set.items()}

    end_time = time.time()
    elapsed_time = end_time - start_time
    logger.info(f"Function execution time: {elapsed_time:.4f} seconds")

    # queryCollection = BaseQueryCollection(session)
    # jobs = get_jobs_in_state(session, 'Running')
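
The aggregation that compact() is meant to perform with these results is still commented out (hence "not yet ready"). Assuming the goal is to tag each node's ProcSet with its dominant job type, one possible shape for that step, a hypothetical helper that is not part of the commit, is:

def characterize_nodes(chars, nodes):
    """Hypothetical sketch: pair each node's ProcSet with its dominant job type.

    ``chars`` holds the (network_address, type, appearance_count) rows returned
    by get_nodes_characterization(); ``nodes`` maps network_address -> ProcSet
    as built in compact(). Nodes with no recorded jobs fall back to "no_pref".
    """
    dominant = {address: job_type for address, job_type, _count in chars}
    return {
        address: (dominant.get(address, "no_pref"), procset)
        for address, procset in nodes.items()
    }

Whether "friendly" nodes should then be preferred and "unfriendly" ones avoided when filling the allocation is exactly what the commented-out block in compact() leaves open.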
