Skip to content

Commit

Permalink
Python WebServer example
Browse files Browse the repository at this point in the history
  • Loading branch information
Depetrol committed Oct 2, 2024
1 parent 7383059 commit 72f2528
Show file tree
Hide file tree
Showing 6 changed files with 451 additions and 0 deletions.
76 changes: 76 additions & 0 deletions examples/Python/src/WebServer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Web Server

This example shows how to create an HTTP web server backend in Lingua Franca python target.

## Application

In this example, we will build a distributed logging service with two replicated databases, each database with an HTTP web server that handles add log and get log requests from frontend. The HTTP web server backend is `logging.lf`, and the frontend is `logging.html`. Valid requests are the following three kinds:

- Add log: adds a log to the distributed database. The add log request is broadcast to all database replicas.
- Get log: get all historical logs from a single database. This returns without waiting for maximum availability. However, the logs could be out of order and inconsistent with each other.
- Get log consistent: get consistent historical logs from a single database. This request will respond slower but with consistency, meaning requests to different replicas will return identical logs if the requests have the same timestamp.

## HTTP Server

Building an HTTP server in Lingua Franca python target is a nontrivial task for several reasons:

- The HTTP server in python is blocking operation that prevents a reaction from finishing
- Typical python web frameworks use decorator style and require you to return the response in the handler function, but to utilize the full potential of Lingua Franca, we often need to implement logic in different reactions.

To tackle the issues above, we can:

- Start the HTTP server in a separate thread, so it doesn't block the execution of reactions. Handlers act as external triggers to the Lingua Franca program.
- The `WebServer` reactor has a state `events` that is a dictionary of `event_id`->[asyncio event](https://docs.python.org/3/library/asyncio.html)
- The handler will add an event to the state `events` when a request comes in, trigger an action in Lingua Franca, and complete the request when the event is unblocked.

## Minimal +1 Example

First, let's build a minimal web server that adds one to the number in the request. The backend is in `minimal.lf`, frontend is `minimal.html`.

The handler is as follows:

```python
@self.app.post("/addone")
async def addone(request: Request):
event = asyncio.Event()
request_id = str(uuid.uuid4())
self.events[request_id] = event
num = int((await request.json())["data"])
addone_action.schedule(0, [request_id, num])
await event.wait()
num = self.events[request_id]
del self.events[request_id]
return {"status": "success", "num": num}
```

And the reaction to the action is

```python
reaction(addone_action){=
request_id, num = addone_action.value
event = self.events[request_id]
self.events[request_id] = num + 1
event.set()
=}
```

When a request is processed by a handler, a response is generated in the following steps:

1. Create a python async io event and add it to `self.events`
2. Trigger a physical action in Lingua Franca to process the request
3. Block the handler until the event is unblocked by another reaction
4. When the action has been processed, another reaction unblocks the asyncio event
5. The handler can now continue to execute and respond to the web request

## Distributed Logging

![logging](logging.svg)

Now we can implement a distributed logging system by instantiating several `WebServer` on different network ports adding two `Database` for each `WebServer`.

* One `Database` has the STA offset of 0, connected by physical connections. This will prioritize availability and fast response times.
* Another `Database` has the STA offset of 3s(This can be changed), connected by logical connections. This will guarantee that the logs in this `Database` will be consistent as long as out-of-order messages arrive within 3s.

Note that this is implemented with banks and multiports. When sending logs, we want the `WebServer` to send logs to all `Database`, so the `newlog` connection is a multiport; but when getting logs, we want to know the log state of the single corresponding `Database`, which means that there is no multiport for the get log operation.

The `sendlogs` connections from `Database` to `WebServer` are implemented with physical connections. This is because these connections carry no timing semantics -- they simply carry the data to be sent back to the frontend as a response and need to be executed as soon as possible.
139 changes: 139 additions & 0 deletions examples/Python/src/WebServer/logging.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Send and Get Logs</title>
</head>

<body>
<h1>Send and Get Logs</h1>

<label for="portInput">Backend Port:</label>
<input
type="number"
id="portInput"
value="5000"
placeholder="Enter backend port here"
/><br /><br />

<textarea id="logInput" rows="4" cols="50"></textarea><br />

<button onclick="sendLog()">Send Log</button>
<br /><br />

<button onclick="getLogs()">Get Logs</button>
<button onclick="getConsistentLogs()">Get Consistent Logs</button>

<h2>Stored Logs:</h2>
<div id="logOutput"></div>

<script>
let logCounter = 1;

function sendLog() {
const logMessage = document.getElementById("logInput").value;
const port = document.getElementById("portInput").value || 5000;
const fullLogMessage = `Log from ${port}: ${logMessage}`;
const logData = {
log: fullLogMessage,
};

fetch(`http://127.0.0.1:${port}/log`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(logData),
})
.then((response) => response.json())
.then((data) => {
console.log("Response from server:", data);
incrementLogValue();
})
.catch((error) => console.error("Error:", error));
}

function incrementLogValue() {
document.getElementById("logInput").value = logCounter;
logCounter++;
}

function getLogs() {
const port = document.getElementById("portInput").value || 5000;
const startTime = performance.now();

fetch(`http://127.0.0.1:${port}/getlogs`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
})
.then((response) => response.json())
.then((data) => {
const logOutput = document.getElementById("logOutput");
logOutput.innerHTML = "";

if (data.logs.length === 0) {
logOutput.innerHTML = "No logs available";
} else {
data.logs.forEach((log, index) => {
const logElement = document.createElement("p");
logElement.textContent = `${index + 1}: ${log}`;
logOutput.appendChild(logElement);
});
}

const endTime = performance.now();
const duration = endTime - startTime;
const timeElement = document.createElement("p");
timeElement.textContent = `Time taken to fetch logs: ${duration.toFixed(
2
)} ms`;
logOutput.appendChild(timeElement);
})
.catch((error) => console.error("Error:", error));
}

function getConsistentLogs() {
const port = document.getElementById("portInput").value || 5000;
const startTime = performance.now();

fetch(`http://127.0.0.1:${port}/getlogs_consistent`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
})
.then((response) => response.json())
.then((data) => {
const logOutput = document.getElementById("logOutput");
logOutput.innerHTML = "";

if (data.logs.length === 0) {
logOutput.innerHTML = "No logs available";
} else {
data.logs.forEach((log, index) => {
const logElement = document.createElement("p");
logElement.textContent = `${index + 1}: ${log}`;
logOutput.appendChild(logElement);
});
}

const endTime = performance.now();
const duration = endTime - startTime;
const timeElement = document.createElement("p");
timeElement.textContent = `Time taken to fetch consistent logs: ${duration.toFixed(
2
)} ms`;
logOutput.appendChild(timeElement);
})
.catch((error) => console.error("Error:", error));
}

window.onload = function () {
incrementLogValue();
};
</script>
</body>
</html>
134 changes: 134 additions & 0 deletions examples/Python/src/WebServer/logging.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
target Python{
coordination: decentralized
}
preamble{=
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import threading
import uvicorn
import asyncio
import uuid
import time
import random
=}

reactor WebServer(bank_index = 0, STA=0){
state app
state events
input sendlogs
input sendlogs_consistent
output newlog
output getlog
output getlog_consistent
physical action newlog_action
physical action getlog_action
logical action getlog_consistent_action
reaction(startup) -> newlog_action, getlog_action, getlog_consistent_action{=
self.events = {}
self.app = FastAPI()
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@self.app.post("/log")
async def receive_log(request: Request):
data = await request.json()
if "log" in data:
log_message = data["log"]
newlog_action.schedule(0, log_message)
return {"status": "success", "message": "Log stored successfully"}
else:
return {"status": "error", "message": "No log provided"}, 400
@self.app.post("/getlogs")
async def get_logs(request: Request):
event = asyncio.Event()
request_id = str(uuid.uuid4())
self.events[request_id] = event
getlog_action.schedule(0, request_id)
await event.wait()
logs = self.events[request_id]
del self.events[request_id]
return {"status": "success", "logs": logs}

@self.app.post("/getlogs_consistent")
async def getlogs_consistent(request: Request):
event = asyncio.Event()
request_id = str(uuid.uuid4())
self.events[request_id] = event
getlog_consistent_action.schedule(0, request_id)
await event.wait()
logs = self.events[request_id]
del self.events[request_id]
return {"status": "success", "logs": logs}

def run_fastapi_app():
print(f"[WebServer{self.bank_index}] FastAPI server starting")
uvicorn.run(self.app, host="127.0.0.1", port=5000+self.bank_index, log_level="warning")
fastapi_thread = threading.Thread(target=run_fastapi_app)
fastapi_thread.start()
=}

reaction(newlog_action) -> newlog{=
newlog.set(newlog_action.value)
=}

reaction(getlog_action) -> getlog{=
getlog.set(getlog_action.value)
=}

reaction(getlog_consistent_action) -> getlog_consistent{=
getlog_consistent.set(getlog_consistent_action.value)
=}

reaction(sendlogs){=
[logs, event_id] = sendlogs.value
event = self.events[event_id]
self.events[event_id] = logs
event.set()
=}

reaction(sendlogs_consistent){=
[logs, event_id] = sendlogs_consistent.value
event = self.events[event_id]
self.events[event_id] = logs
event.set()
=}
}

reactor Database(bank_index = 0, portwidth=2, STA = 0s){
state logs = []
input[portwidth] addlog
input getlog
output sendlogs

reaction(startup){=
self.logs = []
=}

reaction(addlog){=
for i, port in enumerate(addlog):
if port.is_present:
log_message = port.value
self.logs.append(log_message)
=}

reaction(getlog) -> sendlogs{=
sendlogs.set([self.logs, getlog.value])
=}
}

federated reactor(ReplicaCount=2){
server = new[ReplicaCount] WebServer()
db = new[ReplicaCount] Database(portwidth=ReplicaCount)
(server.newlog)+ ~> db.addlog
server.getlog ~> db.getlog
db.sendlogs ~> server.sendlogs

dbc = new[ReplicaCount] Database(portwidth=ReplicaCount, STA = 3s)
(server.newlog)+ -> dbc.addlog
server.getlog_consistent -> dbc.getlog
dbc.sendlogs ~> server.sendlogs_consistent
}
1 change: 1 addition & 0 deletions examples/Python/src/WebServer/logging.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
44 changes: 44 additions & 0 deletions examples/Python/src/WebServer/minimal.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>API Request</title>
<script>
async function sendRequest() {
const input = document.getElementById("inputBox").value;

if (isNaN(input) || input === "") {
alert("Please enter a valid number.");
return;
}

const response = await fetch("http://127.0.0.1:5000/addone", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ data: input }),
});

const result = await response.json();
if (result.status === "success") {
document.getElementById("result").innerText = `Number: ${result.num}`;
} else {
document.getElementById("result").innerText = "Error occurred";
}
}
</script>
</head>
<body>
<h1>Send Request to API</h1>
<div>
<label for="inputBox">Input Number:</label>
<input type="number" id="inputBox" placeholder="Enter a number" />
</div>
<button onclick="sendRequest()">Send Request</button>

<h2>Result:</h2>
<pre id="result"></pre>
</body>
</html>
Loading

0 comments on commit 72f2528

Please sign in to comment.