-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(api): add possibility to run study simulations #35
Conversation
salemsd
commented
Dec 12, 2024
- Studies now have a run service that can be called to both start a simulation and wait for its job
- Unit tests are working
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work :) Once the comments will be resolved you could add an integration test it would be great
src/antares/model/study.py
Outdated
|
||
Returns: A job representing the simulation task | ||
""" | ||
return self._run_service.run_antares_simulation() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should pass the parameters here
src/antares/model/study.py
Outdated
""" | ||
return self._run_service.run_antares_simulation() | ||
|
||
def wait_job_completion(self, job: Job, time_out: int) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
time_out should be optional IMO. With a really high default value if not given such as 172800 (default value in AntaresWeb)
src/antares/model/study.py
Outdated
""" | ||
Runs the Antares simulation. | ||
|
||
This method starts an antares simulation for the current study config and params |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be This method starts an antares simulation with the given parameters
src/antares/model/settings/solver.py
Outdated
from enum import Enum | ||
|
||
|
||
class Solver(Enum): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you should put this class at the same place as the job one and simulation_parameters, inside a simulation.py file
As settings is for the study generaldata.ini file
if self.presolve: | ||
options.append("presolve") | ||
if self.solver != Solver.SIRIUS: | ||
options.append(self.solver.name.lower()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.solver.name.lower() could be self.solver.value as it's already in lower case
job.status = updated_job.status | ||
job.unzip_output = updated_job.unzip_output | ||
|
||
if job.unzip_output: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once you go out of the loop your status could be success or failed.
If it's failed, you should raise an Exception to say the simulation failed.
Else you can continue your code
|
||
return None | ||
|
||
def _unzip_output(self, ref_id: str, type: list[str]) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps my spec wasn't really clear as it's a complex subject but what you need to do in this method is to wait for the output to unzip itself. So you can rename this method.
Also, your payload should simply be
{
"type": type,
"ref_id": ref_id
}
otherwise you won't have any result (example: to_completion_date: 0 means you want all jobs that ended before date 0 which will return no jobs)
"from_completion_date_utc": 0, | ||
"to_completion_date_utc": 0, | ||
} | ||
self._wrapper.post(url, json=payload) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Finally here you should repeat this call every 2 seconds for example, and check the output to see if the task ended successfully. I can reexplain this to you if you need
src/antares/exceptions/exceptions.py
Outdated
|
||
|
||
class SimulationTimeOutError(Exception): | ||
def __init__(self, job_id: str, time_out: int, message: str = "Error") -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't really see the point of the 'message' argument here
mocker.post(run_url, json={"id": job_id}, status_code=200) | ||
|
||
job_url = f"https://antares.com/api/v1/launcher/jobs/{job_id}" | ||
response_list = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure to understand this. The output shouldn't be a list when calling this endpoint. We can discuss it later.
options.append(self.solver.name.lower()) | ||
return " ".join(options) | ||
|
||
def model_dump(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should rename this method to_api()
instead of model_dump because when we'll implement the local method we'll have to parse the object differently and it would be clearer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments but we're in the right direction :)
As we discussed you can introduce a test inside test_web_client that just launches a simulation and asserts the job succeeded
response = self._wrapper.get(url) | ||
job_info = response.json() | ||
status = JobStatus.from_str(job_info["status"]) | ||
output_id = job_info["output_id"] if status == JobStatus.SUCCESS else None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can just use job_info["output_id"] as it will be None if the job succeeded
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
job_info["output_id"]
doesn't even exist before the job succeeds, so it will throw a KeyError
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. Could be output_id = job_info.get("output_id")
then ?
if job.status == JobStatus.FAILED: | ||
raise SimulationFailedError(self.study_id) | ||
|
||
if job.unzip_output and job.output_id: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree here you should assert that we have an output_id. But if we haven't it seems to me we should raise a SimulationFailedError. So i think you should put this as a or not job.output_id
line 71 and remove it from this line
|
||
return None | ||
|
||
def _wait_unzip_output(self, ref_id: str, type: list[str], job_output_id: str) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually we shouldn't give ["UNARCHIVE"] to this method, we should use it internally as we cannot call this method with another task_type
"json": { | ||
"id": job_id, | ||
"status": "running", | ||
"launcher_params": dumps(parameters.to_api()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can store dumps(parameters.to_api())
inside a variable instead of calling it 4 times
except APIError as e: | ||
raise AntaresSimulationUnzipError(self.study_id, e.message) from e | ||
|
||
def _get_task_id(self, job_output_id: str, tasks: list[dict[str, Any]]) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could be more specific, like _get_unarchiving_task_id as this code will only work for this type of task and we might one day introduce a get_task_id somewhere in the code that will be more generic
raise AntaresSimulationUnzipError(self.study_id, "Could not find task for unarchiving job") | ||
|
||
def _get_task_until_success(self, url: str, repeat_interval: int) -> None: | ||
task_success = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This task can fail. And if it does you will never go out of this while. You can introduce a more generic method that has a task_id as an argument and builds its url internally. it could be called wait_task_completion
and will have the same behavior as wait_job_completion: a timeout and raise a Timeout error etc.. (For now you can make it private but we'll need this method at some point).
Also task["result"] can return None so you'll have to handle this as a sign it's still running to avoid KeyErrors
def test_run_and_wait_antares_simulation(self): | ||
parameters = AntaresSimulationParameters(solver=Solver.COIN, nb_cpu=2, unzip_output=True, presolve=False) | ||
|
||
# patch simulates the repeating intervals so that we don't have to wait X seconds during the tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Smart
9f88c9d
to
5ccf8c7
Compare
9d4bcf3
to
33131b3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor changes, mainly a spec change I just came up with :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost done :)
response = self._wrapper.get(url) | ||
job_info = response.json() | ||
status = JobStatus.from_str(job_info["status"]) | ||
output_id = job_info["output_id"] if status == JobStatus.SUCCESS else None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. Could be output_id = job_info.get("output_id")
then ?
self._update_job(job) | ||
|
||
if job.status == JobStatus.FAILED or not job.output_id: | ||
raise SimulationFailedError(self.study_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also log the job_id inside this Exception for the user and debugging purpose
tasks = response.json() | ||
task_id = self._get_unarchiving_task_id(job, tasks) | ||
self._wait_task_completion(task_id, repeat_interval, time_out) | ||
except APIError as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can simply do a
except Exception as e:
raise AntaresSimulationUnzipError(self.study_id, job.job_id, e.message) from e
as I think we want to raise this issue in both cases