-
Notifications
You must be signed in to change notification settings - Fork 3
/
database.py
70 lines (58 loc) · 2.65 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import oracledb
import json
import threading
from time import sleep
# Load database credentials
try:
with open("config.json", "r") as config_file:
config = json.load(config_file)
DB_USERNAME = config.get("DB_USERNAME")
DB_PASSWORD = config.get("DB_PASSWORD")
except FileNotFoundError:
print("Error: Database credentials not found in config file.")
exit(1)
def database_execute(commandSQL, parameters = (), fetch="one"):
with oracledb.connect(user=DB_USERNAME, password=DB_PASSWORD, host="adb.eu-marseille-1.oraclecloud.com", dsn="(description= (retry_count=20)(retry_delay=3)(address=(protocol=tcps)(port=1521)(host=adb.eu-marseille-1.oraclecloud.com))(connect_data=(service_name=gff35b25ac52c7a_ggcbkdqvcse0dvpd_low.adb.oraclecloud.com))(security=(ssl_server_dn_match=yes)))") as connection:
with connection.cursor() as cursor:
try:
cur = cursor.execute(commandSQL, parameters)
if fetch == "one":
rows = cur.fetchone()
elif fetch == "all":
rows = cur.fetchall()
elif fetch == "none":
rows = None
else:
print("database_execute only take 'one' and 'all' as paramater for the 'fetch' argument")
connection.commit()
if rows != None:
print(rows)
return rows
return True
except Exception as e :
print("error when running: " + commandSQL + ". Exception : \n" + str(e))
return False
def add_user(name, password, id):
r = database_execute("""insert into USERS (USER_NAME, USER_PASSWORD, USER_ID) values (:username, :password, :id)""", [name, password, id], "none")
if (r == False):
print("Tried to add already existing user.")
return False
return True
def check_user(username, password):
count = database_execute("SELECT count(*) from users WHERE user_name=:username AND user_password=:password", [username,password])[0]
return count == 1
def get_user_id(username):
try :
id = database_execute("SELECT user_id FROM users WHERE user_name=:username",[username])[0]
return id
except:
return None
# Ping database every 12 hours to keep it awake
def ping_database():
while True:
get_user_id("123")
sleep(12*3600)
pinger = threading.Thread(target=ping_database)
pinger.start()
# database_execute("CREATE TABLE users (user_name VARCHAR(64) NOT NULL UNIQUE, user_password VARCHAR(64) NOT NULL, user_id VARCHAR(64) PRIMARY KEY )", [])
# database_execute("DROP TABLE Users", [])