From 2d2483c9bfcb455920129f80e415976a12331652 Mon Sep 17 00:00:00 2001 From: ThaaoBlues Date: Sat, 26 Nov 2022 18:51:48 +0100 Subject: [PATCH] first step of using socketio --- copypasta.py | 375 ++++++++++++++++++------------------ requirements.txt | 1 + templates/img_preview.html | 4 +- templates/index.html | 157 ++++++--------- templates/scan_preview.html | 108 ++++++----- util.py | 74 ++----- 6 files changed, 328 insertions(+), 391 deletions(-) diff --git a/copypasta.py b/copypasta.py index 20b97f8..ca539fd 100644 --- a/copypasta.py +++ b/copypasta.py @@ -23,10 +23,11 @@ from flask_cors import CORS, cross_origin from re import findall -#waitress wsgi server -from waitress import serve -from webtest.http import StopableWSGIServer +# socket io for real time speeeeeed +from flask_socketio import SocketIO + + # to generate app secret key from random import choice @@ -42,10 +43,8 @@ app.secret_key = "".join([choice(printable) for _ in range(256)]) - -# init waitress wsgi server -webserv = StopableWSGIServer(app) - +# init socketio +socketio = SocketIO(app) if getattr(sys, 'frozen', False): @@ -63,8 +62,8 @@ if not path.exists("static/"): emergency_redownload() - - + + def check_exe_name(): if path.basename(__file__).replace(".py",".exe") != "copypasta.exe": rename(path.basename(__file__).replace(".py",".exe"),"copypasta.exe") @@ -76,14 +75,7 @@ def check_exe_name(): # copypasta url - -if is_hosts_file_modified(): - - COPYPASTA_URL = "http://copypasta.me" if system() == "Windows" else "http://copypasta.me:21987" - -else: - - COPYPASTA_URL = "http://127.0.0.1:21987" +COPYPASTA_URL = "http://127.0.0.1:21987" #necessary to update images (stack overflow) @app.after_request @@ -96,6 +88,7 @@ def add_header(response): + #home @app.route("/") @cross_origin() @@ -103,7 +96,6 @@ def home(): if request.remote_addr == "127.0.0.1": - if not path.exists("static/history.xml"): init_history_file() @@ -111,6 +103,7 @@ def home(): #render the html with the history return render_template("index.html",copypasta_url=COPYPASTA_URL,server_version=get_server_version(),hist = get_history(),ip=get_private_ip(),hostname=socket.gethostname(),tab=path.exists("static/tab"),upload_code=get_upload_code(APP_PATH)) + else: return abort(403) @@ -192,77 +185,197 @@ def scan_preview(): else: return abort(403) + + + +# real time processes (socketio) +@socketio.on("[DELETE_FILE_FROM_HIST]") +def delete_file_from_hist(json): + + file_info = get_history_file_by_id(json["file_id"]) + + # this id doest not belongs to any file + if not file_info: + return + + # check if file haven't been deleted by another process (we never know ^^) + if path.exists(file_info["path"]): + remove(file_info["path"]) + delete_history_file_by_id(json["file_id"]) + + # now tell the page to refresh its history content + socketio.emit("fill_history_tab",get_history_json()) + + + +#copy scan from history page +@socketio.on("[COPY_SCAN_FROM_HIST]") +def copy_scan_from_hist(json_data:dict): -#processes -@app.route("/process/") -def process(process_id): - if request.remote_addr == "127.0.0.1": + text = get_history_file_by_id(json_data["scan_id"]) + + if not text: + return jsonify({"Error":"this id does not belongs to any sacn"}) + + copy(text["text"]) + + socketio.emit("[NOTIFY_USER]",{"msg":"Scan copied !"}) - #delete a particular image from history table - if "[DELETE_FILE_FROM_HIST]" in process_id: +@socketio.on("[DELETE_SCAN_FROM_HIST]") +def delete_scan_from_hist(json_data:dict): + + + delete_history_file_by_id(json_data["scan_id"]) + + # now tell the page to refresh its history content + socketio.emit("fill_history_tab",get_history_json()) + + # and notify user ;) + socketio.emit("[NOTIFY_USER]",{"msg":"File deleted from history"}) - try: - file_id = request.args.get("file_id",type=int) - except ValueError: - return jsonify({"Error":"wrong file_id argument type/no argument passed"}) - - file_path = get_history_file_by_id(file_id) - - if not file_path: - - return jsonify({"Error":"this id does not belongs to any file"}) - - # check if file haven't been deleted by another process (we never know ^^) - if path.exists(file_path["path"]): - remove(file_path['path']) +#empty the history files +@socketio.on("[DEL_HISTORY]") +def del_history(json_data:dict): + + + init_history_file(force=True) + + # now tell the page to refresh its history content + socketio.emit("fill_history_tab",get_history_json()) + + + socketio.emit(["NOTIFY_USER"] ,{"msg" : "History deleted !"}) + + +#open new tab on scan received or not +@socketio.on("[CHANGE_TAB_SETTINGS]") +def change_tab_settings(json_data:dict): + + if path.exists("static/tab"): + remove("static/tab") + else: + open("static/tab","w") - delete_history_file_by_id(file_id) + socketio.emit("[NOTIFY_USER]",{"msg":"Settings changed !"}) - return redirect("/") - #open an image preview from image history table - if "[OPEN_IMAGE_SCAN_FROM_HIST]" in process_id: +# open files explorer into the the copypasta directory +socketio.on("[OPEN_FILES_EXPLORER]") +def open_files_explorer(json_data:dict): + + Process(target=startfile,args=(f"{APP_PATH}/static/files_hist",)).start() + + socketio.emit("[NOTIFY_USER]",{"msg":"Opening your files explorer..."}) + + +# open a file with default app +socketio.on("[OPEN_FILE]") +def open_file(json_data:dict): + - try: - image_id = request.args.get("image_id",type=int) - except ValueError: - return jsonify({"Error":"wrong image_id argument type/no argument passed"}) - return redirect(f"/image_preview?image_id={image_id}") + json_dict = get_history_file_by_id(json_data["file_id"]) + + if not json_dict: # id does not exists + socketio.emit("[NOTIFY_USER]",{"msg":"This file does not exists."}) + return + + Process(target=startfile,args=("{}/{}".format(APP_PATH,json_dict["path"]),)).start() + + socketio.emit("[NOTIFY_USER]",{"msg":"Opening your file in the default app..."}) - #copy scan from history page - if "[COPY_SCAN_FROM_HIST]" in process_id: + + +socketio.on("[COPY_WIFI_PW]") +def copy_wifi_pw(json_data:dict): - try: - scan_id = request.args.get("scan_id",type=int) - except ValueError: - return jsonify({"Error":"wrong scan_id argument type/no argument passed"}) - text = get_history_file_by_id(scan_id) + json_dict = get_history_file_by_id(json_data["scan_id"]) + + if not "content" in json_dict.keys(): + socketio.emit("[NOTIFY_USER]",{"msg":"this kind of scan cannot be copied to clipboard"}) + + return + + copy(json_dict['password']) + + socketio.emit("[NOTIFY_USER]",{"msg":"Wifi password copied to clipboard !"}) + + + +# copy text scan content +socketio.on("[COPY_CONTENT]") +def copy_text_scan_content(json_data:dict): - if not text: - return jsonify({"Error":"this id does not belongs to any sacn"}) - copy(text["text"]) + json_dict = get_history_file_by_id(json_data["scan_id"]) + + if not "content" in json_dict.keys(): + + socketio.emit("[NOTIFY_USER]",{"msg":"this kind of scan cannot be copied to clipboard"}) + return + + # finally, if nothing is wrong, copy the scan content + copy(json_dict["content"]) + socketio.emit("[NOTIFY_USER]",{"msg":"Scan copied to clipboard !"}) + + +@socketio.on("[SHUTDOWN_SERVER]") +def shutdown_server(json_data:dict): + + socketio.emit("[NOTIFY_USER]",{"msg":"CopyPasta server is now off. You may close this tab."}) + + socketio.stop() + - return redirect("/") - if "[DELETE_SCAN_FROM_HIST]" in process_id: +#empty the scan temporary file +@socketio.on("[CLEAR_LAST_SCAN]") +def clear_last_scan(json_data:dict): + + # overwrite the temp scan file + with open("static/scan.Blue","w") as f: + f.close() + + socketio.emit("[CLEAR_LAST_SCAN]") + +#copy the scan temporary file to clipboard +@socketio.on("[COPY_LAST_SCAN]") +def copy_last_scan(json_data:dict): + + with open("static/scan.Blue","r") as f: + copy(f.read()) + f.close() + + notify_desktop("CopyPasta","scan copied to clipboard :D") + + + + + +#processes +@app.route("/process/") +def process(process_id): + + if request.remote_addr == "127.0.0.1": + + #open an image preview from image history table + if "[OPEN_IMAGE_SCAN_FROM_HIST]" in process_id: try: - scan_id = request.args.get("scan_id",type=int) + image_id = request.args.get("image_id",type=int) except ValueError: - return jsonify({"Error":"wrong scan_id argument type/no argument passed"}) - - delete_history_file_by_id(scan_id) + return jsonify({"Error":"wrong image_id argument type/no argument passed"}) - return redirect("/") + return redirect(f"/image_preview?image_id={image_id}") + + #download the image received - if "[DOWNLOAD IMG]" in process_id: + if "[DOWNLOAD_IMG]" in process_id: try: @@ -282,26 +395,12 @@ def process(process_id): download_name=secure_filename(image_path.replace("static/files_hist/","")), as_attachment=True) - #empty the scan temporary file - if process_id == "[CLEAR SCAN]": - open("static/scan.Blue","w") - - #redirect to the usual scan preview - return redirect("/scan_preview") - - #copy the scan temporary file to clipboard - if process_id == "[COPY SCAN]": - - with open("static/scan.Blue","r") as f: - copy(f.read()) - f.close() - notify_desktop("CopyPasta","scan copied to clipboard :D") - #redirect to the usual scan preview - return redirect("/scan_preview") + + #copy an image to the clipboard with a win32 api - if "[COPY IMG]" in process_id: + if "[COPY_IMG]" in process_id: try: image_id = request.args.get("image_id",type=int) @@ -339,96 +438,12 @@ def process(process_id): except ImportError: return redirect(f"/image_preview?image_id={image_id}") - - - #empty the history files - if process_id == "[DEL HISTORY]": - init_history_file(force=True) - return redirect("/") - - if process_id == "[HOME]": #redirect to homepage return redirect("/") - - - if process_id == "[CHANGE TAB SETTINGS]": - if path.exists("static/tab"): - remove("static/tab") - else: - open("static/tab","w") - - return "OK" - - if process_id == "[OPEN FILES EXPLORER]": - - Process(target=startfile,args=(f"{APP_PATH}/static/files_hist",)).start() - return redirect("/") - - if process_id == "[OPEN FILE]": - - try: - file_id = request.args.get("file_id",type=int) - except ValueError: - return jsonify({"Error":"invalid url argument"}) - - - json_dict = get_history_file_by_id(file_id) - - if not json_dict: # id does not exists - return jsonify({"Error":"invalid url argument"}) - - Process(target=startfile,args=("{}/{}".format(APP_PATH,json_dict["path"]),)).start() - - return redirect("/") - - - if process_id == "[COPY WIFI PW]": - - try: - scan_id = request.args.get("scan_id",type=int) - except ValueError: - return jsonify({"Error":"invalid url argument"}) - - - json_dict = get_history_file_by_id(scan_id) - - if not json_dict: # id does not exists - return jsonify({"Error":"invalid url argument"}) - - - if not "content" in json_dict.keys(): - return jsonify({"this kind of scan cannot be copied to clipboard"}) - - - copy(json_dict['password']) - - return redirect("/") - - - if process_id == "[COPY CONTENT]": - - try: - scan_id = request.args.get("scan_id",type=int) - except ValueError: - return jsonify({"Error":"invalid url argument"}) - - - json_dict = get_history_file_by_id(scan_id) - - if not json_dict: # id does not exists - return jsonify({"Error":"invalid url argument"}) - - - if not "content" in json_dict.keys(): - return jsonify({"this kind of scan cannot be copied to clipboard"}) - - # finally, if nothing is wrong, copy the scan content - copy(json_dict["content"]) - return redirect("/") - if process_id == "[OPEN VIDEO]": + if process_id == "[OPEN_VIDEO]": try: video_id = request.args.get('video_id',type=int) @@ -449,9 +464,6 @@ def process(process_id): - - - #api url(s) @app.route("/api/") @@ -460,13 +472,7 @@ def api(api_req): if request.remote_addr == "127.0.0.1": - - - if api_req == "get_history": - - return get_history() - - elif api_req == "ping": + if api_req == "ping": return "pong" @@ -494,14 +500,9 @@ def api(api_req): f.close() notify_desktop("Network change detected !","Updating you qr code, you need to rescan it ;)") + return jsonify({"new_ip" : "updating qr code and private ip"}) - elif api_req == "shutdown_server": - webserv.shutdown() - - remove_copypasta_port_redirect() - - return jsonify({"success":"shutting down CopyPasta server..."}) elif api_req == "gen_otdl_url": @@ -588,7 +589,6 @@ def upload(): notify_desktop("New scan Incoming !", "Click to open CopyPasta") - if r != None: try: @@ -832,6 +832,8 @@ def client(): r = get("https://chart.googleapis.com/chart?cht=qr&chs=300x300&chl="+make_qr_url(),allow_redirects=True) + + #write it with open("static/qr.jpeg","wb") as f: f.write(r.content) @@ -846,15 +848,12 @@ def client(): upload_code = gen_upload_code() store_upload_code(APP_PATH,upload_code) - #open tab in web browser Process(target=open_link_process, args=(COPYPASTA_URL,)).start() if not is_server_already_running(): - #run waitress web server - webserv.create(app,host="0.0.0.0",port=21987) - webserv.run() + socketio.run(app,host="0.0.0.0",port=21987) diff --git a/requirements.txt b/requirements.txt index afb1c79..c385414 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ pillow waitress webtest beautifulsoup4 +flask-socketio diff --git a/templates/img_preview.html b/templates/img_preview.html index 3cbaf4e..3cd0392 100644 --- a/templates/img_preview.html +++ b/templates/img_preview.html @@ -22,10 +22,10 @@ @@ -84,7 +85,7 @@

Last files/data sent :


- + @@ -104,17 +105,7 @@

Last files/data sent :

- - {% with messages = get_flashed_messages() %} {% if messages %} - - {% endif %} {% endwith %} + - + + + + \ No newline at end of file diff --git a/util.py b/util.py index 72bb03c..8971678 100644 --- a/util.py +++ b/util.py @@ -14,7 +14,6 @@ from webbrowser import open as open_tab from ast import literal_eval import requests -from subprocess import Popen, run from functools import partial from win10toast_click import ToastNotifier from win32com.client import Dispatch @@ -79,7 +78,8 @@ def check_templates_update(): with open("static/update.Blue","r") as f: n = int(f.read()) if n == 10: - download_templates() + #download_templates() + pass else: with open("static/update.Blue","w") as f: f.write(str(n+1)) @@ -113,7 +113,7 @@ def emergency_redownload(): - download_templates() + #download_templates() def is_server_already_running(): @@ -194,14 +194,25 @@ def get_history(): history.append("]}") else: history.append("]}") - + return "".join(history) + +def get_history_json()->dict: + + # using lists and join() to speed up + history = {"history":[]} + + for element in ElementTree.parse("static/history.xml").getroot(): + history["history"].append(loads(element.text)) + + return history + def get_history_file_last_id(): return len(ElementTree.parse("static/history.xml").getroot()) -1 -def get_history_file_by_id(file_id): +def get_history_file_by_id(file_id) -> dict: if file_id < len(ElementTree.parse("static/history.xml").getroot()): @@ -271,33 +282,6 @@ def is_online(): return True except OSError: return False - - -def is_hosts_file_modified(): - - hosts_file_path = "C:\Windows\System32\Drivers\etc\hosts" if system() == "Windows" else "/etc/hosts" - - with open(hosts_file_path,"r") as f: - - return True if "copypasta.me" in f.read() else False - - -def add_copypasta_to_hosts_file(): - - hosts_file_path = "C:\Windows\System32\Drivers\etc\hosts" if system() == "Windows" else "/etc/hosts" - - with open(hosts_file_path,"a") as f: - - f.write("\n127.0.0.1:21987\tcopypasta") - - f.close() - - if system() == "Windows": - - # flush dns cache - run("ipconfig /flushdns",shell=True) - - add_copypasta_port_redirect() def get_server_version(): @@ -308,32 +292,6 @@ def get_server_version(): with open("version","r") as f: return f.read() - - - -def add_copypasta_port_redirect(): - - if system() == "Windows": - - # put port redirect from 127.0.0.1:21987 to 127.0.0.1:80 - try: - run("netsh interface portproxy add v4tov4 listenport=80 listenaddress=127.0.0.1 connectport=21987 connectaddress=127.0.0.1") - except: - # feature that may crash sometimes, not essential - pass - - -def remove_copypasta_port_redirect(): - - if system() == "Windows": - - # re-put port redirect from 127.0.0.1:80 to 127.0.0.1:80 - try: - run("netsh interface portproxy add v4tov4 listenport=80 listenaddress=127.0.0.1 connectport=80 connectaddress=127.0.0.1") - except: - # feature that may crash sometimes, not essential - pass - def is_image(file_type:str):