Skip to content

Commit

Permalink
Implement UCS Communication
Browse files Browse the repository at this point in the history
  • Loading branch information
winstonsmith1897 committed Aug 25, 2023
1 parent e122be3 commit 1e02e0a
Show file tree
Hide file tree
Showing 12 changed files with 831 additions and 59 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ COPY requirements.txt /
RUN pip install --no-cache-dir -r /requirements.txt

ADD application_manager/*.py /
ADD application_manager/get-labels.sh /

ADD run_application_manager/run_manager.sh /
#ADD services/leader_file.txt / uncomment this to test the run_manager
Expand Down
Binary file modified application_manager/__pycache__/app_dht.cpython-310.pyc
Binary file not shown.
Binary file not shown.
Binary file modified application_manager/__pycache__/catch_topic.cpython-310.pyc
Binary file not shown.
Binary file not shown.
80 changes: 49 additions & 31 deletions application_manager/app_dht.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

client = docker.from_env()

api_url = "http://localhost:3000/"
api_url = "http://sifis-device4.iit.cnr.it:3000/"


def publish(ws, topic_uuid, requestor_id, request_id, containers):
Expand Down Expand Up @@ -36,18 +36,27 @@ def request_list(ws, message, image_name):
requestor_id = topic_value["requestor_id"]
request_id = topic_value["request_id"]
containers = topic_value["containers"]
containers.append(image_name)
publish(ws, topic_uuid, requestor_id, request_id, containers)
print("\n[!] Active Containers: " + str(containers) + "\n")
if image_name not in containers:
containers.append(image_name)
publish(ws, topic_uuid, requestor_id, request_id, containers)
print("\n[!] Active Containers: " + str(containers) + "\n")
result = "OK"
return result
else:
print("The App has been already installed")
print("\n[!] Active Containers: " + str(containers) + "\n")
result = "Already Installed"
return result


def update_dht_list(ws, image_name):
topic_name = "SIFIS:container_list"
try:
response = requests.get(api_url + "topic_name/" + topic_name)
message = str(response.json()[0])
message = str(response.json()[-1])
message = message.replace("'", '"')
request_list(ws, message, image_name)
result = request_list(ws, message, image_name)
return result
except IndexError:
print("\n[!] Update List, the list is empty")
topic_uuid = "Pippo"
Expand All @@ -58,32 +67,39 @@ def update_dht_list(ws, image_name):


def pull_image(ws, image_name, topic_uuid, requestor_id, request_id):
prefix = "ghcr.io/sifis-home/"
if image_name == "":
raise ValueError("Image name cannot be empty")
if prefix not in image_name:
image_name = prefix + image_name
if image_name:
try:
client.images.pull(image_name)
update_dht_list(ws, image_name)
topic_name = "SIFIS:application_manager_pull_image"
pulling_data = {
"requestor_id": requestor_id,
"request_id": request_id,
"pulled_image": image_name,
"operation": "pull image",
"result": "successfull",
}
requests.post(
api_url
+ "topic_name/"
+ topic_name
+ "/topic_uuid/"
+ topic_uuid,
json=pulling_data,
)
result = update_dht_list(ws, image_name)
if result != "Already Installed":
client.images.pull(image_name)
# update_dht_list(ws, image_name)
pulling_data = {
"requestor_id": requestor_id,
"request_id": request_id,
"pulled_image": image_name,
"operation": "pull image",
"result": "successfull",
}
requests.post(
api_url
+ "topic_name/"
+ topic_name
+ "/topic_uuid/"
+ topic_uuid,
json=pulling_data,
)

print(f"[!] Image {image_name} pulled successfully!")
start_container(image_name, topic_uuid, requestor_id, request_id)
return "\n Pulling and Starting operation completed ..."
print(f"[!] Image {image_name} pulled successfully!")
start_container(
image_name, topic_uuid, requestor_id, request_id
)
return "\n Pulling and Starting operation completed ..."
except docker.errors.APIError as e:
pulling_data = {
"requestor_id": requestor_id,
Expand Down Expand Up @@ -114,8 +130,8 @@ def start_container(image_name, topic_uuid, requestor_id, request_id):
image_name,
detach=True,
volumes={
"/var/run/docker.sock": {
"bind": "/var/run/docker.sock",
"/var/run/sifis.sock": {
"bind": "/var/run/sifis.sock",
"mode": "rw",
}
}, # volume
Expand Down Expand Up @@ -197,8 +213,9 @@ def remove_image(image_name, topic_uuid, request_id, requestor_id):
return "Missing 'image_name' parameter", 400

try:
client.images.remove(image_name, force=True)
print("[!] Removing Image : " + image_name)
topic_name = "SIFIS:application_manager_remove_image"
list_containers(topic_uuid, requestor_id, request_id, image_name)
removing_info = {
"requestor_id": requestor_id,
"request_id": request_id,
Expand All @@ -211,6 +228,7 @@ def remove_image(image_name, topic_uuid, request_id, requestor_id):
json=removing_info,
)
list_containers(topic_uuid, requestor_id, request_id, image_name)
client.images.remove(image_name, force=True)
return f"Image {image_name} removed successfully!"
except docker.errors.ImageNotFound as e:
return f"Image {image_name} not found: {e}", 404
Expand All @@ -221,13 +239,13 @@ def remove_image(image_name, topic_uuid, request_id, requestor_id):
def list_containers(topic_uuid, requestor_id, request_id, image_name):
topic_name = "SIFIS:container_list"
response = requests.get(api_url + "topic_name/" + topic_name)
message = str(response.json()[0])
message = str(response.json()[-1])
message = message.replace("'", '"')
if "value" in str(message):
json_message = json.loads(message)
topic_value = json_message["value"]
containers = topic_value["containers"]
if image_name != None:
if image_name in containers:
containers.remove(image_name)
_list = {
"requestor_id": requestor_id,
Expand Down
135 changes: 131 additions & 4 deletions application_manager/catch_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,32 @@
import security_by_contract
import websocket

REGISTERED = False
registration_id = 1
session_id = "None"
permit_installation = False

def handle_pull_image(ws, topic_name, topic_uuid):

def UCS_request(ws, topic_name, topic_uuid):
global REGISTERED, session_id
try:
image_name = topic_name["image_name"]
security_by_contract.get_labels(image_name)
print("[!] Recovering App LABELS\n")
_, app_id = security_by_contract.get_labels(image_name)
session_id = app_id
print("[!] APP_ID: " + str(session_id))
return
except Exception as e:
print(e)


"""
def handle_pull_image(ws, topic_name, topic_uuid):
global REGISTERED, session_id
print("Permit Installation")
print(permit_installation)
try:
image_name = topic_name["image_name"]
requestor_id = topic_name["requestor_id"]
request_id = topic_name["request_id"]
result = app_dht.pull_image(
Expand All @@ -18,6 +38,33 @@ def handle_pull_image(ws, topic_name, topic_uuid):
print(result)
except Exception as e:
print(e)
"""


def handle_pull_image():
global REGISTERED, session_id, global_pull_image_params

# Accedi ai parametri globali
params = global_pull_image_params
if params is not None:
ws = params["ws"]
topic_value = params["topic_value"]
topic_uuid = params["topic_uuid"]
try:
image_name = topic_value["image_name"]
requestor_id = topic_value["requestor_id"]
request_id = topic_value["request_id"]
result = app_dht.pull_image(
ws, image_name, topic_uuid, requestor_id, request_id
)
print(result)
print(
"\n----------------- PULLING OPERATION COMPLETED ---------------------------------\n"
)
except Exception as e:
print(e)
else:
print("No pull image parameters available.")


def handle_start_container(topic_name):
Expand All @@ -38,6 +85,9 @@ def handle_remove_image(topic_name, topic_uuid):
image_name, topic_uuid, request_id, requestor_id
)
print(result)
print(
"\n----------------- REMOVING OPERATION COMPLETED ---------------------------------\n"
)
except Exception as e:
print(e)

Expand Down Expand Up @@ -70,13 +120,67 @@ def handle_list_containers(topic_uuid, requestor_id, request_id):
topic_uuid, requestor_id, request_id, None
)
print(result)
print(
"\n----------------- LISTING OPERATION COMPLETED ---------------------------------\n"
)
except Exception as e:
print(e)


def wait_for_access(json_value):
global registration_id
print("[!] Wait for access")
_id = json_value["command"]["value"]["id"]
try:
if _id == "pep-application_manager":
purpose = json_value["command"]["value"]["message"]["purpose"]
code = json_value["command"]["value"]["message"]["code"]
response_id = json_value["command"]["value"]["message"][
"message_id"
]
if (
purpose == "REGISTER_RESPONSE"
and code == "OK"
and response_id == registration_id
):
return "OK"
else:
return "REGISTRATION DENIED"
except Exception as e:
print(e)


def on_message(ws, message):
global REGISTERED
global session_id
global permit_installation
json_message = json.loads(message)

try:
if "Volatile" in json_message:
json_message = json_message["Volatile"]
json_value = json_message["value"]
purpose = json_value["command"]["value"]["message"]["purpose"]
if purpose == "TRY_RESPONSE":
print("[!] Permit Installation")
handle_pull_image()

if (
json_value["command"]["value"]["topic_name"]
== "application_manager_registration"
and REGISTERED == False
):
access = wait_for_access(json_value)
if access == "OK":
REGISTERED = True
print(
"[!] REGISTRATION OK: Application Manager is registered to UC\n\n"
)
else:
print("[!] Application Manager is not registered to UC")
except Exception as e:
pass

if "Persistent" in json_message:
json_message = json_message["Persistent"]
# Handle messages
Expand All @@ -92,11 +196,29 @@ def on_message(ws, message):
)


global_pull_image_params = None


# Funzione per salvare i parametri di handle_pull_image
def save_pull_image_params(params):
global global_pull_image_params
global_pull_image_params = params


def handle_message(ws, topic_uuid, topic_value, request_id, requestor_id):
global permit_installation
if "operation" in topic_value:
operation = topic_value["operation"]
if operation == "pull_image":
handle_pull_image(ws, topic_value, topic_uuid)
print("[!] Forwarding UCS Request")
UCS_request(ws, topic_value, topic_uuid)
print("[!] Pulling Image Request")
params = {
"ws": ws,
"topic_value": topic_value,
"topic_uuid": topic_uuid,
}
save_pull_image_params(params)
elif operation == "remove_image":
handle_remove_image(topic_value, topic_uuid)
elif operation == "start_container":
Expand All @@ -118,12 +240,17 @@ def on_close(ws, close_status_code, close_msg):


def on_open(ws):
global REGISTERED
global registration_id
print("### Connection established ###")
if REGISTERED == False:
registration_id = security_by_contract.register()
print("[!] The registration ID is : " + str(registration_id))


if __name__ == "__main__":
ws = websocket.WebSocketApp(
"ws://localhost:3000/ws",
"ws://sifis-device4.iit.cnr.it:3000/ws",
on_open=on_open,
on_message=on_message,
on_error=on_error,
Expand Down
Loading

0 comments on commit 1e02e0a

Please sign in to comment.