forked from lAmeR1/kaspa-socket-server
-
Notifications
You must be signed in to change notification settings - Fork 3
/
server.py
99 lines (77 loc) · 2.54 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# encoding: utf-8
import os
import socketio
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi_utils.tasks import repeat_every
from pydantic import BaseModel
from starlette.requests import Request
from starlette.responses import JSONResponse
from karlsend.KarlsendMultiClient import KarlsendMultiClient
# Socket.IO server wrapped as an ASGI application so it can be mounted
# into the FastAPI app below.
# NOTE(review): the empty cors_allowed_origins list presumably leaves CORS
# handling to the CORSMiddleware registered further down - confirm against
# the python-socketio documentation.
sio = socketio.AsyncServer(cors_allowed_origins=[], async_mode="asgi")
socket_app = socketio.ASGIApp(sio)

# Metadata handed to FastAPI; the version is read from the VERSION
# environment variable and falls back to "tbd" when it is not set.
_api_metadata = {
    "title": "Karlsen REST-API server",
    "description": "This server is to communicate with karlsen network via REST-API",
    "version": os.getenv("VERSION", "tbd"),
    "contact": {"name": "lAmeR1"},
    "license_info": {"name": "MIT LICENSE"},
}
app = FastAPI(**_api_metadata)

# Compress responses of at least 500 bytes.
app.add_middleware(GZipMiddleware, minimum_size=500)

# Expose the Socket.IO application under the /ws path.
app.mount("/ws", socket_app)

# CORS: every origin, method and header is accepted, with credentials.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# very permissive - confirm this is intended for production deployments.
origins = ["*"]
_cors_options = {
    "allow_origins": origins,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
# Response schema of the /ping endpoint (passed as its ``response_model``).
# Every field carries a default, so a field that is absent from the data
# being validated is reported with the default shown here.
class PingResponse(BaseModel):
    # Version string reported for the node.
    serverVersion: str = "0.12.2"
    # Whether the node reports that its UTXO index is enabled.
    isUtxoIndexed: bool = True
    # Whether the node reports that it is synced.
    isSynced: bool = True
@app.get("/ping",
         include_in_schema=False,
         response_model=PingResponse)
async def ping_server():
    """
    Ping Pong

    Send a ``getInfoRequest`` to the first configured karlsend client and
    report the node's version, UTXO-index flag and sync flag.

    Returns:
        A dict whose keys match the ``PingResponse`` field names.

    Raises:
        HTTPException: with status 500 when the request fails, the response
            lacks an expected key, or the node does not report
            ``isSynced`` as ``True``.
    """
    try:
        info = await karlsend_client.karlsends[0].request("getInfoRequest")
        node_info = info["getInfoResponse"]

        # Explicit check instead of ``assert``: assertions are stripped when
        # Python runs with -O, which would let an unsynced node pass.
        if node_info["isSynced"] is not True:
            raise ValueError("karlsend reports that it is not synced")

        # The keys must match the PingResponse field names exactly. With
        # non-matching keys the response model silently falls back to its
        # defaults and the real node values are never returned.
        return {
            "serverVersion": node_info["serverVersion"],
            "isUtxoIndexed": node_info["isUtxoIndexed"],
            "isSynced": node_info["isSynced"],
        }
    except Exception as exc:
        # Any failure is reported to the caller as "not connected"; the
        # original error is kept as the cause for debugging.
        raise HTTPException(status_code=500, detail="Karlsend not connected.") from exc
# Collect node addresses from KARLSEND_HOST_1, KARLSEND_HOST_2, ... and stop
# at the first index that is not set. At most 100 variables are inspected.
karlsend_hosts = []
for host_number in range(1, 101):
    configured_host = os.environ.get(f"KARLSEND_HOST_{host_number}")
    if configured_host is None:
        break
    karlsend_hosts.append(configured_host.strip())

# Without at least one host there is nothing to connect to.
if not karlsend_hosts:
    raise Exception('Please set at least KARLSEND_HOST_1 environment variable.')

# Shared client used by the request handlers and the periodic task.
karlsend_client = KarlsendMultiClient(karlsend_hosts)
@app.exception_handler(Exception)
async def unicorn_exception_handler(request: Request, exc: Exception):
    """Handle an otherwise unhandled exception raised while serving a request.

    Re-initializes all karlsend clients and answers with a generic JSON
    500 body.

    Args:
        request: The request being processed when the exception occurred.
        exc: The exception that reached this handler.

    Returns:
        JSONResponse with status 500 and ``{"message": "Internal server error"}``.
    """
    # Local import so the file's top-level import block stays untouched.
    import logging

    try:
        await karlsend_client.initialize_all()
    except Exception:
        # A failing re-initialization must not replace the JSON error
        # response below or hide the error that triggered this handler.
        logging.getLogger(__name__).exception(
            "Re-initializing karlsend clients failed while handling %r", exc
        )

    return JSONResponse(
        status_code=500,
        content={"message": "Internal server error"},
    )
# Interval, in seconds, passed to ``repeat_every`` for the task below.
_REINITIALIZE_INTERVAL_SECONDS = 60


@app.on_event("startup")
@repeat_every(seconds=_REINITIALIZE_INTERVAL_SECONDS)
async def periodical_blockdag():
    """Re-initialize every karlsend client.

    Registered as an application startup hook and wrapped by
    ``repeat_every`` with a 60-second interval.
    """
    await karlsend_client.initialize_all()