-
Notifications
You must be signed in to change notification settings - Fork 11
/
am_docker_ability.py
140 lines (126 loc) · 5.19 KB
/
am_docker_ability.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""Archivematica Docker Ability
This module contains the ``ArchivematicaDockerAbility`` class, which encodes the
ability of an Archivematica user to use Docker to interact configure and deploy
Archivematica.
"""
import logging
import os
import shlex
import subprocess
from . import base
logger = logging.getLogger("amuser.docker")
class ArchivematicaDockerAbility(base.Base):
"""Archivematica Docker Ability: the ability of an Archivematica user to use
Docker configure and deploy Archivematica.
"""
@property
def docker_compose_file_path(self):
return os.path.join(self.docker_compose_path, "docker-compose.yml")
def recreate_archivematica(self, capture_output=False):
"""Recreate the docker-compose deploy of Archivematica by calling
docker-compose's ``up`` subcommand.
"""
dc_recreate_cmd = f"docker-compose -f {self.docker_compose_file_path} up -d"
capture_output = {True: "true"}.get(capture_output, "false")
env = dict(os.environ, AM_CAPTURE_CLIENT_SCRIPT_OUTPUT=capture_output)
subprocess.check_output(shlex.split(dc_recreate_cmd), env=env)
def get_tasks_from_sip_uuid(
self, sip_uuid, mysql_user="root", mysql_password="12345"
):
"""Use ``docker-compose exec mysql`` to get all tasks used to create a
given SIP as a list of dicts.
"""
tasks = []
sql_query = (
"SELECT t.fileUUID as file_uuid,"
" f.fileSize as file_size,"
" LENGTH(t.stdOut) as len_std_out,"
" LENGTH(t.stdError) as len_std_err,"
" t.exec,"
" t.exitCode,"
" t.endTime,"
" t.startTime,"
" TIMEDIFF(t.endTime,t.startTime) as duration"
" FROM Tasks t"
" INNER JOIN Files f ON f.fileUUID=t.fileUUID"
f" WHERE f.sipUUID='{sip_uuid}'"
" ORDER by endTime-startTime, exec;"
)
cmd_str = f'docker-compose exec mysql mysql -u {mysql_user} -p{mysql_password} MCP -e "{sql_query}"'
res = subprocess.check_output(
shlex.split(cmd_str), cwd=self.docker_compose_path
).decode("utf8")
lines = [line for line in res.splitlines() if line.startswith("|")]
keys = [k.strip() for k in lines[0].strip(" |").split("|")]
for line in lines[1:]:
vals = [v.strip() for v in line.strip(" |").split("|")]
tasks.append(dict(zip(keys, vals)))
return tasks
def get_processes(self):
return (
subprocess.check_output(
["docker-compose", "-f", self.docker_compose_file_path, "ps"]
)
.decode("utf8")
.splitlines()
)
def get_process_names_states(self):
NAME = slice(0, 42)
STATE = slice(75, 86)
names_and_states = []
for line in self.get_processes():
name = line[NAME].strip().lower()
state = line[STATE].strip().lower()
names_and_states.append({"name": name, "state": state})
return names_and_states
def _get_container_id(self, docker_container_name):
try:
docker_compose_path = self.docker_compose_path
except AttributeError:
logger.error("No docker compose path provided.")
raise
return (
subprocess.check_output(
shlex.split(f"docker-compose ps -q {docker_container_name}"),
cwd=docker_compose_path,
)
.decode("utf8")
.strip()
)
def cp_server_file_to_local(self, server_file_path):
"""Use ``docker cp`` to copy a file from the docker container to our
local tmp directory.
"""
docker_container_name = "archivematica-storage-service"
docker_container_id = self._get_container_id(docker_container_name)
filename = os.path.basename(server_file_path)
local_path = os.path.join(self.tmp_path, filename)
subprocess.check_output(
shlex.split(
f"docker cp {docker_container_id}:{server_file_path} {local_path}"
),
cwd=self.docker_compose_path,
).decode("utf8").strip()
if os.path.isfile(local_path):
return local_path
logger.info("Failed to `docker cp` %s to %s", server_file_path, local_path)
return False
def cp_server_dir_to_local(self, server_dir_path):
"""Use ``docker cp`` to copy a directory from the docker container to
our local tmp directory.
"""
docker_container_name = "archivematica-storage-service"
docker_container_id = self._get_container_id(docker_container_name)
server_dir_path = server_dir_path.rstrip("/")
dirname = os.path.basename(server_dir_path)
local_path = os.path.join(self.tmp_path, dirname)
subprocess.check_output(
shlex.split(
f"docker cp {docker_container_id}:{server_dir_path} {local_path}"
),
cwd=self.docker_compose_path,
).decode("utf8").strip()
if os.path.isdir(local_path):
return local_path
logger.info("Failed to `docker cp` %s to %s", server_dir_path, local_path)
return False