Skip to content

Commit

Permalink
worker process: prevent worker from getting SIGKILL'd on KeyboardInte…
Browse files Browse the repository at this point in the history
…rrupt

While running script, sending <kbd>Ctrl</kbd> + <kbd>C</kbd> would also send SIGINT to
the worker processes, but then [it would immediately follow up with a SIKILL][1] [after 0.25s][2].

This allowed very little time for workers to do cleanup, due to which, Python was raising `UserWarning`
for leaked semaphores due to this:

```py
.../lib/python3.12/site-packages/multiprocess/resource_tracker.py:257: UserWarning: resource_tracker: There appear to be 6 leaked semaphore objects to clean up at shutdown
```

Also see python/cpython#70130.

This patch changes worker processes to be run using `subprocess.Popen`
rather than `subprocess.run`, so that SIGKILL never gets sent.

[1]: https://github.com/python/cpython/blob/3c770e3f0978d825c5ebea98fcd654660e7e135f/Lib/subprocess.py#L571-L573
[2]: https://github.com/python/cpython/blob/3c770e3f0978d825c5ebea98fcd654660e7e135f/Lib/subprocess.py#L891
  • Loading branch information
skshetry committed Nov 23, 2024
1 parent c6c9b34 commit ae02ab3
Showing 1 changed file with 5 additions and 8 deletions.
13 changes: 5 additions & 8 deletions src/datachain/query/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,18 +457,15 @@ def populate_udf_table(self, udf_table: "Table", query: Select) -> None:
# Run the UDFDispatcher in another process to avoid needing
# if __name__ == '__main__': in user scripts
exec_cmd = get_datachain_executable()
cmd = [*exec_cmd, "internal-run-udf"]
envs = dict(os.environ)
envs.update({"PYTHONPATH": os.getcwd()})
process_data = filtered_cloudpickle_dumps(udf_info)
result = subprocess.run( # noqa: S603
[*exec_cmd, "internal-run-udf"],
input=process_data,
check=False,
env=envs,
)
if result.returncode != 0:
raise RuntimeError("UDF Execution Failed!")

with subprocess.Popen(cmd, env=envs, stdin=subprocess.PIPE) as process: # noqa: S603
process.communicate(process_data)
if process.poll():
raise RuntimeError("UDF Execution Failed!")
else:
# Otherwise process single-threaded (faster for smaller UDFs)
warehouse = self.catalog.warehouse
Expand Down

0 comments on commit ae02ab3

Please sign in to comment.