Merge branch 'universal-dependencies' into development
# Conflicts:
#	buildall.sh
#	pie/TDN-ALL/Dockerfile
PrinsINT committed Nov 15, 2024
2 parents ec7e7e2 + 8826af7 commit 9f0eb3b
Showing 30 changed files with 2,449 additions and 71 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1 +1,2 @@
.env.dev
webservice
7 changes: 4 additions & 3 deletions base/Dockerfile
@@ -1,10 +1,11 @@
FROM python:3.10-slim-bookworm
ENV LC_ALL C.UTF-8
ENV LANG C.UTF-8
FROM python:3.12.7-slim-bookworm

# Install requirements
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt \
&& mkdir input status output process

# copy python source
COPY --link . ./

# Optionally set a callback server
13 changes: 6 additions & 7 deletions base/requirements.txt
@@ -1,7 +1,6 @@
bottle==0.12.25
bottle-log==1.0.0
certifi==2023.7.22
charset-normalizer==3.3.0
idna==3.7
requests==2.32.2
urllib3==2.2.2
bottle==0.13.2
certifi==2024.8.30
charset-normalizer==3.4.0
idna==3.10
requests==2.32.3
urllib3==2.2.3
134 changes: 106 additions & 28 deletions base/statuslogger.py
@@ -25,6 +25,9 @@
import sys
import logging
from typing import Any, Optional
import pathlib
import time
import fcntl

STATUS_FOLDER = "status"
PROCESS_FOLDER = "process"
@@ -33,6 +36,62 @@
logging.basicConfig(stream=sys.stdout, format=log_format, level=logging.INFO)


class FileMutex:
"""
A mutex for file access. First acquires a lock on a lock file named original_file.lock, creating it if needed.
Then opens the original file. When the mutex is released, the lock file is removed in an attempt to clean up.
"""

def __init__(self, file_path: str, timeout: int = 5):
"""
Create a new mutex for the file_path. timeout is the maximum time to wait for the lock.
"""
self.timeout = timeout
# Paths
self.file_path = file_path
self._lock_path = file_path + ".lock"
# Files
self._lock = None
self.file = None

def acquire(self, file_mode: str) -> None:
"""
Acquire the lock and open the file in file_mode, once the lock is acquired. Sets self.file.
"""
start_time = time.time()
# Try to acquire the lock, if it fails, wait for a bit and try again.
while True:
try:
self._lock = open(self._lock_path, "a+", encoding="utf-8")
fcntl.flock(self._lock, fcntl.LOCK_EX)
break # Acquired!
except (IOError, OSError):
if time.time() - start_time > self.timeout:
raise TimeoutError(
"Timeout occurred while trying to acquire the lock."
)
time.sleep(0.1)
# Open the file after acquiring the lock.
self.file = open(self.file_path, file_mode, encoding="utf-8")

def release(self) -> None:
"""
Release the lock and close the file. Try to remove the lock file.
"""
if self.file:
self.file.close()
self.file = None

if self._lock:
fcntl.flock(self._lock, fcntl.LOCK_UN)
self._lock.close()
self._lock = None
try:
pathlib.Path(self._lock_path).unlink(missing_ok=True)
except:
pass # Well, we tried.

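For orientation, the lock-then-open protocol reads most clearly in use; a minimal sketch of the acquire/try/finally pattern that get_status and _dump_status below follow (the status path is hypothetical):

import json

mutex = FileMutex("status/example.json")  # hypothetical status file
try:
    # Blocks (up to `timeout` seconds) on status/example.json.lock,
    # then opens the file itself and exposes it as mutex.file.
    mutex.acquire("w")
    json.dump({"message": "ok"}, mutex.file)
finally:
    # Closes the file, drops the flock, and removes the .lock file.
    mutex.release()
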

class StatusLogger:
"""
A status object for files at the tagger. Keeps a json status that can be sent to the server.
@@ -91,40 +150,55 @@ def get_status(self) -> dict[str, Any]:
"message": "File not on server",
"pending": False,
"busy": False,
"error": False,
"error": True,
"finished": False,
}
with open(self.status_path, encoding="utf-8") as f:
try:
return json.load(f)
except:
logging.error(f"Error decoding status file { self.status_path }")
return {
"message": "Error decoding status file",
"pending": False,
"busy": False,
"error": True,
"finished": False,
}
try:
mutex = FileMutex(self.status_path)
mutex.acquire("r")
return json.load(mutex.file)
except Exception as e:
return {
"message": f"Could not read status file. {e}",
"pending": False,
"busy": False,
"error": True,
"finished": False,
}
finally:
mutex.release()

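Every branch of get_status returns the same five-key payload. An illustrative example for a file that is still being tagged (the message and values are hypothetical, patterned on the busy message set in tagger_worker.py):

status = {
    "message": "Will process with a timeout after 350 seconds",
    "pending": False,
    "busy": True,
    "error": False,
    "finished": False,
}
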
def delete_status(self) -> None:
"""
Deletes the file storage associated with this status, as well as the process status if present.
"""
if self.exists():
os.remove(self.status_path)
self.delete_status_file()
# We might have to remove its process status as well.
process_status = ProcessStatus(self.filename)
if process_status.exists():
process_status.kill()

def delete_status_file(self) -> None:
"""
Delete only the status file. Used by ProcessStatus to avoid recursion.
"""
try:
pathlib.Path(self.status_path).unlink(missing_ok=True)
except:
raise

def _dump_status(self, status: dict[str, Any]) -> None:
"""
Logs the current status, replacing the previous one.
"""
f = open(self.status_path, "w", encoding="utf-8")
json.dump(status, f)
f.close()
try:
mutex = FileMutex(self.status_path)
mutex.acquire("w")
json.dump(status, mutex.file)
except:
raise
finally:
mutex.release()

# Logging functions

@@ -186,6 +260,9 @@ def get_all_statusloggers() -> list[ProcessStatus]:
)

def __init__(self, filename: str, pid: Optional[int] = None) -> None:
"""
When no pid is given, we try to find the pid from the file. Otherwise, we create a new status file.
"""
self.filename = filename
self.status_path = os.path.join(PROCESS_FOLDER, filename)
if pid is not None:
@@ -194,29 +271,31 @@ def __init__(self, filename: str, pid: Optional[int] = None) -> None:
pid = self.get_pid()
if pid is not None:
try:
os.kill(pid, 0)
os.kill(pid, 0) # Check if alive.
except:
# No process with this pid exists.
# delete ourselves, otherwise the tagger thinks we are busy.
self.delete_status()
StatusLogger(self.filename).init("File processing ended. Retry later.")
StatusLogger(self.filename).init(
"File processing ended. Retry later."
)

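The two construction modes described in the docstring, sketched with a hypothetical filename:

import os

# Register a fresh process status for a file currently being tagged:
ps = ProcessStatus("example.txt", pid=os.getpid())

# Recover the pid from the existing status file; os.kill(pid, 0) is used
# above as a liveness probe, and a dead process clears the stale status:
ps = ProcessStatus("example.txt")
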
def get_pid(self) -> Optional[int]:
"""
Process ID of the current thread (i.e. mp.pool).
Process ID of the current thread.
"""
if not self.exists():
try:
return self.get_status()["pid"]
except:
return None
with open(self.status_path, encoding="utf-8") as f:
status = json.load(f)
return status["pid"]

def kill(self) -> None:
"""
Kill the thread (i.e. mp.pool) that is currently tagging the file.
Kill the thread that is currently tagging the file.
"""
pid = self.get_pid()
if pid is not None:
print(f"Killing process {pid}")
os.kill(pid, signal.SIGKILL)
self.delete_status()

@@ -225,6 +304,5 @@ def delete_status(self) -> None:
Called when a process is killed, or naturally ends.
Removes ourselves, signifying the tagger is no longer busy.
"""
if self.exists():
os.remove(self.status_path)
self.delete_status_file()
# Note that calling the super would cause recursion.
68 changes: 40 additions & 28 deletions base/tagger_worker.py
@@ -20,6 +20,7 @@
import subprocess
import requests
import traceback
import pathlib

# Local
import process
@@ -29,6 +30,7 @@
from process import PROCESSING_SPEED

CALLBACK_SERVER: str = os.getenv("CALLBACK_SERVER") or ""
NUM_WORKERS = int(os.getenv("NUM_WORKERS") or 1)


def run_pending_tasks() -> None:
@@ -39,7 +41,8 @@ def run_pending_tasks() -> None:
global pool

# One task at a time.
if StatusLogger.busy_task_exists():
tasks_in_queue = pool._taskqueue.qsize()
if tasks_in_queue > 0:
return

# Start new task when not busy
@@ -53,11 +56,9 @@
# Extra None check for typing
if (not is_pool_running(pool)) or pool is None:
# Spawn pool if not running
pool = mp.Pool(processes=1, initializer=process.init)
pool = mp.Pool(processes=NUM_WORKERS, initializer=process.init)
# Perform task at running pool
pool.apply_async(process_file, args=(sl.filename,))
# Only start one task at a time, so return.
return


def process_file(filename: str):
@@ -86,8 +87,7 @@ def process_file(filename: str):
sl.error(f"An exception occurred: {e}")
print(traceback.format_exc())
# copy input file to error folder if it exists
if os.path.exists(in_path):
sl.error("Moving input file to error folder")
if os.path.isfile(in_path):
os.rename(in_path, error_path)
if CALLBACK_SERVER != "":
sl.error("Sending error to callback server")
@@ -99,20 +99,32 @@ def tag(
) -> None:
"""
Attempt to tag the file by the tagger with a timeout.
Send the result to the server, whether sucessful or not.
Also appropiately logs the status.
Send the result to the server, whether successful or not.
Also appropriately logs the status.
"""
# 300s = 5min fixed time
# plus
# bytes * speed variable time
in_bytes_size = int(
subprocess.check_output(["du", "-sb", in_path]).split()[0].decode("utf-8")
)
TIMEOUT = 300 + in_bytes_size + PROCESSING_SPEED
sl.busy("Will process with a timeout after " + str(TIMEOUT) + " seconds")

# Runs the respective tagger software.
@timeout(TIMEOUT, os.strerror(errno.ETIME))
in_bytes_size = None
while in_bytes_size is None:
if os.path.isfile(in_path):
try:
in_bytes_size = int(
subprocess.check_output(["du", "-sb", in_path])
.split()[0]
.decode("utf-8")
)
except Exception as e:
print(f"Error getting file size: {e}")
time.sleep(1)
else:
raise FileNotFoundError(f"File {in_path} not found")

time_out = 300 + in_bytes_size + PROCESSING_SPEED
sl.busy("Will process with a timeout after " + str(time_out) + " seconds")

# Runs the respective tagger software synchronously.
@timeout(time_out, os.strerror(errno.ETIME))
def doTagging():
process.process(in_path, out_path)

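The timeout decorator applied to doTagging is not defined in this diff; a common signal-based shape such a decorator takes, offered as an assumption rather than the repository's actual implementation:

import errno
import functools
import os
import signal

def timeout(seconds, message=os.strerror(errno.ETIME)):
    # Hypothetical sketch (Unix-only): raise if func runs past `seconds`.
    def decorator(func):
        def handler(signum, frame):
            raise TimeoutError(message)
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_handler = signal.signal(signal.SIGALRM, handler)
            signal.alarm(int(seconds))
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)  # cancel the pending alarm
                signal.signal(signal.SIGALRM, old_handler)
        return wrapper
    return decorator
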
Expand All @@ -121,11 +133,7 @@ def doTagging():
# Done processing
ps.delete_status() # Frees up the tagger
sl.finished("Removing input file")
# "try", because the task might have been cancelled and deleted in the meantime.
try:
os.remove(in_path)
except:
pass
pathlib.Path(in_path).unlink(missing_ok=True)

sl.finished(
"Finished processing %s, result has size %d"
@@ -169,8 +177,9 @@ def send_error_to_callback_server(filename: str, out_path: str, message: str) ->
Send the error to the callback server and keep or delete the file based on the server response.
"""
url = CALLBACK_SERVER + "/error"
payload = {"file_id": filename, "message": message}
r = requests.post(url, data=payload)
payload = {"file_id": filename}
json_data = {"file_id": filename, "message": message}
r = requests.post(url, json=json_data, params=payload)
keep_or_delete_file(r, out_path)


@@ -190,12 +199,15 @@ def is_pool_running(pool: Optional[Pool]) -> bool:

# Pool needs to be defined after the functions it will execute.
# https://stackoverflow.com/questions/41385708/multiprocessing-example-giving-attributeerror#comment101561695_42383397
pool: Optional[Pool] = mp.Pool(processes=1, initializer=process.init)

pool = None

# It is ugly, but it is also used here:
# https://pypi.org/project/schedule/
if __name__ == "__main__":
# Can't use fork with the gpu.
mp.set_start_method("spawn", force=True)
pool = mp.Pool(processes=NUM_WORKERS, initializer=process.init)

# It is ugly, but it is also used here:
# https://pypi.org/project/schedule/
while True:
run_pending_tasks()
time.sleep(1)
time.sleep(0.05)
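
Because spawn re-imports the module in each worker rather than forking, the pool has to be created under the __main__ guard, as done above. A minimal standalone sketch of the same pattern (the worker function is a placeholder):

import multiprocessing as mp

def work(x):
    return x * 2  # placeholder for process.process-style work

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)
    with mp.Pool(processes=1) as pool:
        print(pool.apply_async(work, (21,)).get())  # prints 42
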
7 changes: 5 additions & 2 deletions base/webservice.py
@@ -86,7 +86,11 @@ def post_input():
return HTTPResponse("No selected file", 400)
if file:
id = str(uuid.uuid4())
file.save(os.path.join(UPLOAD_FOLDER, id))
file_dest = os.path.join(UPLOAD_FOLDER, id)
file.save(file_dest)
file_exists = os.path.isfile(file_dest)
if not file_exists:
return HTTPResponse("File could not be saved. Please try again.", 500)
# register the file
sl = StatusLogger(id)
sl.init("File arrived")
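
From the client side, this upload handler can be exercised with a multipart POST; a hedged sketch, where the host, port, /input route, and the form field name "file" are all assumptions, since the route decorator is not shown in this diff:

import requests

with open("example.txt", "rb") as f:
    r = requests.post("http://localhost:8080/input", files={"file": f})
print(r.status_code, r.text)  # on success, presumably the new file id
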
@@ -156,7 +160,6 @@ def delete_file(id: str):
# remove the file
if os.path.isfile(path):
os.remove(path)
print(path)
return HTTPResponse("File " + id + " deleted", 200)

