From 72f25284a25310cf7af6d3063d6caf497362df4a Mon Sep 17 00:00:00 2001 From: depetrol Date: Wed, 2 Oct 2024 15:51:53 -0700 Subject: [PATCH 01/13] Python WebServer example --- examples/Python/src/WebServer/README.md | 76 +++++++++++ examples/Python/src/WebServer/logging.html | 139 +++++++++++++++++++++ examples/Python/src/WebServer/logging.lf | 134 ++++++++++++++++++++ examples/Python/src/WebServer/logging.svg | 1 + examples/Python/src/WebServer/minimal.html | 44 +++++++ examples/Python/src/WebServer/minimal.lf | 57 +++++++++ 6 files changed, 451 insertions(+) create mode 100644 examples/Python/src/WebServer/README.md create mode 100644 examples/Python/src/WebServer/logging.html create mode 100644 examples/Python/src/WebServer/logging.lf create mode 100644 examples/Python/src/WebServer/logging.svg create mode 100644 examples/Python/src/WebServer/minimal.html create mode 100644 examples/Python/src/WebServer/minimal.lf diff --git a/examples/Python/src/WebServer/README.md b/examples/Python/src/WebServer/README.md new file mode 100644 index 00000000..8dd57743 --- /dev/null +++ b/examples/Python/src/WebServer/README.md @@ -0,0 +1,76 @@ +# Web Server + +This example shows how to create an HTTP web server backend in Lingua Franca python target. + +## Application + +In this example, we will build a distributed logging service with two replicated databases, each database with an HTTP web server that handles add log and get log requests from frontend. The HTTP web server backend is `logging.lf`, and the frontend is `logging.html`. Valid requests are the following three kinds: + +- Add log: adds a log to the distributed database. The add log request is broadcast to all database replicas. +- Get log: get all historical logs from a single database. This returns without waiting for maximum availability. However, the logs could be out of order and inconsistent with each other. +- Get log consistent: get consistent historical logs from a single database. This request will respond slower but with consistency, meaning requests to different replicas will return identical logs if the requests have the same timestamp. + +## HTTP Server + +Building an HTTP server in Lingua Franca python target is a nontrivial task for several reasons: + +- The HTTP server in python is blocking operation that prevents a reaction from finishing +- Typical python web frameworks use decorator style and require you to return the response in the handler function, but to utilize the full potential of Lingua Franca, we often need to implement logic in different reactions. + +To tackle the issues above, we can: + +- Start the HTTP server in a separate thread, so it doesn't block the execution of reactions. Handlers act as external triggers to the Lingua Franca program. +- The `WebServer` reactor has a state `events` that is a dictionary of `event_id`->[asyncio event](https://docs.python.org/3/library/asyncio.html) +- The handler will add an event to the state `events` when a request comes in, trigger an action in Lingua Franca, and complete the request when the event is unblocked. + +## Minimal +1 Example + +First, let's build a minimal web server that adds one to the number in the request. The backend is in `minimal.lf`, frontend is `minimal.html`. + +The handler is as follows: + +```python +@self.app.post("/addone") +async def addone(request: Request): + event = asyncio.Event() + request_id = str(uuid.uuid4()) + self.events[request_id] = event + num = int((await request.json())["data"]) + addone_action.schedule(0, [request_id, num]) + await event.wait() + num = self.events[request_id] + del self.events[request_id] + return {"status": "success", "num": num} +``` + +And the reaction to the action is + +```python +reaction(addone_action){= + request_id, num = addone_action.value + event = self.events[request_id] + self.events[request_id] = num + 1 + event.set() +=} +``` + +When a request is processed by a handler, a response is generated in the following steps: + +1. Create a python async io event and add it to `self.events` +2. Trigger a physical action in Lingua Franca to process the request +3. Block the handler until the event is unblocked by another reaction +4. When the action has been processed, another reaction unblocks the asyncio event +5. The handler can now continue to execute and respond to the web request + +## Distributed Logging + +![logging](logging.svg) + +Now we can implement a distributed logging system by instantiating several `WebServer` on different network ports adding two `Database` for each `WebServer`. + +* One `Database` has the STA offset of 0, connected by physical connections. This will prioritize availability and fast response times. +* Another `Database` has the STA offset of 3s(This can be changed), connected by logical connections. This will guarantee that the logs in this `Database` will be consistent as long as out-of-order messages arrive within 3s. + +Note that this is implemented with banks and multiports. When sending logs, we want the `WebServer` to send logs to all `Database`, so the `newlog` connection is a multiport; but when getting logs, we want to know the log state of the single corresponding `Database`, which means that there is no multiport for the get log operation. + +The `sendlogs` connections from `Database` to `WebServer` are implemented with physical connections. This is because these connections carry no timing semantics -- they simply carry the data to be sent back to the frontend as a response and need to be executed as soon as possible. diff --git a/examples/Python/src/WebServer/logging.html b/examples/Python/src/WebServer/logging.html new file mode 100644 index 00000000..a7d671e0 --- /dev/null +++ b/examples/Python/src/WebServer/logging.html @@ -0,0 +1,139 @@ + + + + + + Send and Get Logs + + + +

Send and Get Logs

+ + +

+ +
+ + +

+ + + + +

Stored Logs:

+
+ + + + diff --git a/examples/Python/src/WebServer/logging.lf b/examples/Python/src/WebServer/logging.lf new file mode 100644 index 00000000..67d22c8f --- /dev/null +++ b/examples/Python/src/WebServer/logging.lf @@ -0,0 +1,134 @@ +target Python{ + coordination: decentralized +} +preamble{= + from fastapi import FastAPI, Request, HTTPException + from fastapi.middleware.cors import CORSMiddleware + import threading + import uvicorn + import asyncio + import uuid + import time + import random +=} + +reactor WebServer(bank_index = 0, STA=0){ + state app + state events + input sendlogs + input sendlogs_consistent + output newlog + output getlog + output getlog_consistent + physical action newlog_action + physical action getlog_action + logical action getlog_consistent_action + reaction(startup) -> newlog_action, getlog_action, getlog_consistent_action{= + self.events = {} + self.app = FastAPI() + self.app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + @self.app.post("/log") + async def receive_log(request: Request): + data = await request.json() + if "log" in data: + log_message = data["log"] + newlog_action.schedule(0, log_message) + return {"status": "success", "message": "Log stored successfully"} + else: + return {"status": "error", "message": "No log provided"}, 400 + @self.app.post("/getlogs") + async def get_logs(request: Request): + event = asyncio.Event() + request_id = str(uuid.uuid4()) + self.events[request_id] = event + getlog_action.schedule(0, request_id) + await event.wait() + logs = self.events[request_id] + del self.events[request_id] + return {"status": "success", "logs": logs} + + @self.app.post("/getlogs_consistent") + async def getlogs_consistent(request: Request): + event = asyncio.Event() + request_id = str(uuid.uuid4()) + self.events[request_id] = event + getlog_consistent_action.schedule(0, request_id) + await event.wait() + logs = self.events[request_id] + del self.events[request_id] + return {"status": "success", "logs": logs} + + def run_fastapi_app(): + print(f"[WebServer{self.bank_index}] FastAPI server starting") + uvicorn.run(self.app, host="127.0.0.1", port=5000+self.bank_index, log_level="warning") + fastapi_thread = threading.Thread(target=run_fastapi_app) + fastapi_thread.start() + =} + + reaction(newlog_action) -> newlog{= + newlog.set(newlog_action.value) + =} + + reaction(getlog_action) -> getlog{= + getlog.set(getlog_action.value) + =} + + reaction(getlog_consistent_action) -> getlog_consistent{= + getlog_consistent.set(getlog_consistent_action.value) + =} + + reaction(sendlogs){= + [logs, event_id] = sendlogs.value + event = self.events[event_id] + self.events[event_id] = logs + event.set() + =} + + reaction(sendlogs_consistent){= + [logs, event_id] = sendlogs_consistent.value + event = self.events[event_id] + self.events[event_id] = logs + event.set() + =} +} + +reactor Database(bank_index = 0, portwidth=2, STA = 0s){ + state logs = [] + input[portwidth] addlog + input getlog + output sendlogs + + reaction(startup){= + self.logs = [] + =} + + reaction(addlog){= + for i, port in enumerate(addlog): + if port.is_present: + log_message = port.value + self.logs.append(log_message) + =} + + reaction(getlog) -> sendlogs{= + sendlogs.set([self.logs, getlog.value]) + =} +} + +federated reactor(ReplicaCount=2){ + server = new[ReplicaCount] WebServer() + db = new[ReplicaCount] Database(portwidth=ReplicaCount) + (server.newlog)+ ~> db.addlog + server.getlog ~> db.getlog + db.sendlogs ~> server.sendlogs + + dbc = new[ReplicaCount] Database(portwidth=ReplicaCount, STA = 3s) + (server.newlog)+ -> dbc.addlog + server.getlog_consistent -> dbc.getlog + dbc.sendlogs ~> server.sendlogs_consistent +} \ No newline at end of file diff --git a/examples/Python/src/WebServer/logging.svg b/examples/Python/src/WebServer/logging.svg new file mode 100644 index 00000000..62752b83 --- /dev/null +++ b/examples/Python/src/WebServer/logging.svg @@ -0,0 +1 @@ +loggingWebServer2sendlogssendlogs_consistentnewloggetloggetlog_consistentDatabase2addloggetlogsendlogsDatabase2addloggetlogsendlogs$$ \ No newline at end of file diff --git a/examples/Python/src/WebServer/minimal.html b/examples/Python/src/WebServer/minimal.html new file mode 100644 index 00000000..908bad09 --- /dev/null +++ b/examples/Python/src/WebServer/minimal.html @@ -0,0 +1,44 @@ + + + + + + API Request + + + +

Send Request to API

+
+ + +
+ + +

Result:

+

+  
+
diff --git a/examples/Python/src/WebServer/minimal.lf b/examples/Python/src/WebServer/minimal.lf
new file mode 100644
index 00000000..04f5f9d0
--- /dev/null
+++ b/examples/Python/src/WebServer/minimal.lf
@@ -0,0 +1,57 @@
+target Python{
+    coordination: decentralized
+}
+preamble{=
+    from fastapi import FastAPI, Request, HTTPException
+    from fastapi.middleware.cors import CORSMiddleware
+    import threading
+    import uvicorn
+    import asyncio
+    import uuid
+=}
+
+reactor WebServer(bank_index = 0, STA=0){
+    state app
+    state events
+    physical action addone_action
+
+    reaction(startup) -> addone_action{=
+        self.events = {}
+        self.app = FastAPI()
+        self.app.add_middleware(
+            CORSMiddleware,
+            allow_origins=["*"],
+            allow_credentials=True,
+            allow_methods=["*"],
+            allow_headers=["*"],
+        )
+        @self.app.post("/addone")
+        async def addone(request: Request):
+            event = asyncio.Event()
+            request_id = str(uuid.uuid4())
+            self.events[request_id] = event
+            num = int((await request.json())["data"])
+            addone_action.schedule(0, [request_id, num])
+            await event.wait()
+            num = self.events[request_id]
+            del self.events[request_id]
+            return {"status": "success", "num": num}
+
+        def run_fastapi_app():
+            print(f"[WebServer{self.bank_index}] FastAPI server starting")
+            uvicorn.run(self.app, host="127.0.0.1", port=5000+self.bank_index, log_level="warning")
+        fastapi_thread = threading.Thread(target=run_fastapi_app)
+        fastapi_thread.start()
+    =}
+
+    reaction(addone_action){=
+        request_id, num = addone_action.value
+        event = self.events[request_id]
+        self.events[request_id] = num + 1
+        event.set()
+    =}
+}
+
+federated reactor{
+    server = new WebServer()
+}
\ No newline at end of file

From ba492d5c5349a9ea3e6e88bddd6d0912ddbdfa90 Mon Sep 17 00:00:00 2001
From: depetrol 
Date: Tue, 8 Oct 2024 15:59:28 -0700
Subject: [PATCH 02/13] format

---
 examples/Python/src/WebServer/logging.lf | 256 ++++++++++++-----------
 examples/Python/src/WebServer/minimal.lf |  99 ++++-----
 2 files changed, 179 insertions(+), 176 deletions(-)

diff --git a/examples/Python/src/WebServer/logging.lf b/examples/Python/src/WebServer/logging.lf
index 67d22c8f..0eca3e97 100644
--- a/examples/Python/src/WebServer/logging.lf
+++ b/examples/Python/src/WebServer/logging.lf
@@ -1,134 +1,136 @@
-target Python{
-    coordination: decentralized
+target Python {
+  coordination: decentralized
 }
-preamble{=
-    from fastapi import FastAPI, Request, HTTPException
-    from fastapi.middleware.cors import CORSMiddleware
-    import threading
-    import uvicorn
-    import asyncio
-    import uuid
-    import time
-    import random
+
+preamble {=
+  from fastapi import FastAPI, Request, HTTPException
+  from fastapi.middleware.cors import CORSMiddleware
+  import threading
+  import uvicorn
+  import asyncio
+  import uuid
+  import time
+  import random
 =}
 
-reactor WebServer(bank_index = 0, STA=0){
-    state app
-    state events
-    input sendlogs
-    input sendlogs_consistent
-    output newlog
-    output getlog
-    output getlog_consistent
-    physical action newlog_action
-    physical action getlog_action
-    logical action getlog_consistent_action
-    reaction(startup) -> newlog_action, getlog_action, getlog_consistent_action{=
-        self.events = {}
-        self.app = FastAPI()
-        self.app.add_middleware(
-            CORSMiddleware,
-            allow_origins=["*"],
-            allow_credentials=True,
-            allow_methods=["*"],
-            allow_headers=["*"],
-        )
-        @self.app.post("/log")
-        async def receive_log(request: Request):
-            data = await request.json()
-            if "log" in data:
-                log_message = data["log"]
-                newlog_action.schedule(0, log_message)
-                return {"status": "success", "message": "Log stored successfully"}
-            else:
-                return {"status": "error", "message": "No log provided"}, 400
-        @self.app.post("/getlogs")
-        async def get_logs(request: Request):
-            event = asyncio.Event()
-            request_id = str(uuid.uuid4())
-            self.events[request_id] = event
-            getlog_action.schedule(0, request_id)
-            await event.wait()
-            logs = self.events[request_id]
-            del self.events[request_id]
-            return {"status": "success", "logs": logs}
-
-        @self.app.post("/getlogs_consistent")
-        async def getlogs_consistent(request: Request):
-            event = asyncio.Event()
-            request_id = str(uuid.uuid4())
-            self.events[request_id] = event
-            getlog_consistent_action.schedule(0, request_id)
-            await event.wait()
-            logs = self.events[request_id]
-            del self.events[request_id]
-            return {"status": "success", "logs": logs}
-
-        def run_fastapi_app():
-            print(f"[WebServer{self.bank_index}] FastAPI server starting")
-            uvicorn.run(self.app, host="127.0.0.1", port=5000+self.bank_index, log_level="warning")
-        fastapi_thread = threading.Thread(target=run_fastapi_app)
-        fastapi_thread.start()
-    =}
-
-    reaction(newlog_action) -> newlog{=
-        newlog.set(newlog_action.value)
-    =}
-
-    reaction(getlog_action) -> getlog{=
-        getlog.set(getlog_action.value)
-    =}
-
-    reaction(getlog_consistent_action) -> getlog_consistent{=
-        getlog_consistent.set(getlog_consistent_action.value)
-    =}
-    
-    reaction(sendlogs){=
-        [logs, event_id] = sendlogs.value
-        event = self.events[event_id]
-        self.events[event_id] = logs
-        event.set()
-    =}
-
-    reaction(sendlogs_consistent){=
-        [logs, event_id] = sendlogs_consistent.value
-        event = self.events[event_id]
-        self.events[event_id] = logs
-        event.set()
-    =}
+reactor WebServer(bank_index=0, STA=0) {
+  state app
+  state events
+  input sendlogs
+  input sendlogs_consistent
+  output newlog
+  output getlog
+  output getlog_consistent
+  physical action newlog_action
+  physical action getlog_action
+  logical action getlog_consistent_action
+
+  reaction(startup) -> newlog_action, getlog_action, getlog_consistent_action {=
+    self.events = {}
+    self.app = FastAPI()
+    self.app.add_middleware(
+        CORSMiddleware,
+        allow_origins=["*"],
+        allow_credentials=True,
+        allow_methods=["*"],
+        allow_headers=["*"],
+    )
+    @self.app.post("/log")
+    async def receive_log(request: Request):
+        data = await request.json()
+        if "log" in data:
+            log_message = data["log"]
+            newlog_action.schedule(0, log_message)
+            return {"status": "success", "message": "Log stored successfully"}
+        else:
+            return {"status": "error", "message": "No log provided"}, 400
+    @self.app.post("/getlogs")
+    async def get_logs(request: Request):
+        event = asyncio.Event()
+        request_id = str(uuid.uuid4())
+        self.events[request_id] = event
+        getlog_action.schedule(0, request_id)
+        await event.wait()
+        logs = self.events[request_id]
+        del self.events[request_id]
+        return {"status": "success", "logs": logs}
+
+    @self.app.post("/getlogs_consistent")
+    async def getlogs_consistent(request: Request):
+        event = asyncio.Event()
+        request_id = str(uuid.uuid4())
+        self.events[request_id] = event
+        getlog_consistent_action.schedule(0, request_id)
+        await event.wait()
+        logs = self.events[request_id]
+        del self.events[request_id]
+        return {"status": "success", "logs": logs}
+
+    def run_fastapi_app():
+        print(f"[WebServer{self.bank_index}] FastAPI server starting")
+        uvicorn.run(self.app, host="127.0.0.1", port=5000+self.bank_index, log_level="warning")
+    fastapi_thread = threading.Thread(target=run_fastapi_app)
+    fastapi_thread.start()
+  =}
+
+  reaction(newlog_action) -> newlog {=
+    newlog.set(newlog_action.value)
+  =}
+
+  reaction(getlog_action) -> getlog {=
+    getlog.set(getlog_action.value)
+  =}
+
+  reaction(getlog_consistent_action) -> getlog_consistent {=
+    getlog_consistent.set(getlog_consistent_action.value)
+  =}
+
+  reaction(sendlogs) {=
+    [logs, event_id] = sendlogs.value
+    event = self.events[event_id]
+    self.events[event_id] = logs
+    event.set()
+  =}
+
+  reaction(sendlogs_consistent) {=
+    [logs, event_id] = sendlogs_consistent.value
+    event = self.events[event_id]
+    self.events[event_id] = logs
+    event.set()
+  =}
 }
 
-reactor Database(bank_index = 0, portwidth=2, STA = 0s){
-    state logs = []
-    input[portwidth] addlog
-    input getlog
-    output sendlogs
-
-    reaction(startup){=
-        self.logs = []
-    =}
-
-    reaction(addlog){=
-        for i, port in enumerate(addlog):
-            if port.is_present:
-                log_message = port.value
-                self.logs.append(log_message)
-    =}
-
-    reaction(getlog) -> sendlogs{=
-        sendlogs.set([self.logs, getlog.value])
-    =}
+reactor Database(bank_index=0, portwidth=2, STA = 0 s) {
+  state logs = []
+  input[portwidth] addlog
+  input getlog
+  output sendlogs
+
+  reaction(startup) {=
+    self.logs = []
+  =}
+
+  reaction(addlog) {=
+    for i, port in enumerate(addlog):
+        if port.is_present:
+            log_message = port.value
+            self.logs.append(log_message)
+  =}
+
+  reaction(getlog) -> sendlogs {=
+    sendlogs.set([self.logs, getlog.value])
+  =}
 }
 
-federated reactor(ReplicaCount=2){
-    server = new[ReplicaCount] WebServer()
-    db = new[ReplicaCount] Database(portwidth=ReplicaCount)
-    (server.newlog)+ ~> db.addlog
-    server.getlog ~> db.getlog
-    db.sendlogs ~> server.sendlogs
-
-    dbc = new[ReplicaCount] Database(portwidth=ReplicaCount, STA = 3s)
-    (server.newlog)+ -> dbc.addlog
-    server.getlog_consistent -> dbc.getlog
-    dbc.sendlogs ~> server.sendlogs_consistent
-}
\ No newline at end of file
+federated reactor(ReplicaCount=2) {
+  server = new[ReplicaCount] WebServer()
+  db = new[ReplicaCount] Database(portwidth=ReplicaCount)
+  (server.newlog)+ ~> db.addlog
+  server.getlog ~> db.getlog
+  db.sendlogs ~> server.sendlogs
+
+  dbc = new[ReplicaCount] Database(portwidth=ReplicaCount, STA = 3 s)
+  (server.newlog)+ -> dbc.addlog
+  server.getlog_consistent -> dbc.getlog
+  dbc.sendlogs ~> server.sendlogs_consistent
+}
diff --git a/examples/Python/src/WebServer/minimal.lf b/examples/Python/src/WebServer/minimal.lf
index 04f5f9d0..2b727d74 100644
--- a/examples/Python/src/WebServer/minimal.lf
+++ b/examples/Python/src/WebServer/minimal.lf
@@ -1,57 +1,58 @@
-target Python{
-    coordination: decentralized
+target Python {
+  coordination: decentralized
 }
-preamble{=
-    from fastapi import FastAPI, Request, HTTPException
-    from fastapi.middleware.cors import CORSMiddleware
-    import threading
-    import uvicorn
-    import asyncio
-    import uuid
+
+preamble {=
+  from fastapi import FastAPI, Request, HTTPException
+  from fastapi.middleware.cors import CORSMiddleware
+  import threading
+  import uvicorn
+  import asyncio
+  import uuid
 =}
 
-reactor WebServer(bank_index = 0, STA=0){
-    state app
-    state events
-    physical action addone_action
+reactor WebServer(bank_index=0, STA=0) {
+  state app
+  state events
+  physical action addone_action
 
-    reaction(startup) -> addone_action{=
-        self.events = {}
-        self.app = FastAPI()
-        self.app.add_middleware(
-            CORSMiddleware,
-            allow_origins=["*"],
-            allow_credentials=True,
-            allow_methods=["*"],
-            allow_headers=["*"],
-        )
-        @self.app.post("/addone")
-        async def addone(request: Request):
-            event = asyncio.Event()
-            request_id = str(uuid.uuid4())
-            self.events[request_id] = event
-            num = int((await request.json())["data"])
-            addone_action.schedule(0, [request_id, num])
-            await event.wait()
-            num = self.events[request_id]
-            del self.events[request_id]
-            return {"status": "success", "num": num}
+  reaction(startup) -> addone_action {=
+    self.events = {}
+    self.app = FastAPI()
+    self.app.add_middleware(
+        CORSMiddleware,
+        allow_origins=["*"],
+        allow_credentials=True,
+        allow_methods=["*"],
+        allow_headers=["*"],
+    )
+    @self.app.post("/addone")
+    async def addone(request: Request):
+        event = asyncio.Event()
+        request_id = str(uuid.uuid4())
+        self.events[request_id] = event
+        num = int((await request.json())["data"])
+        addone_action.schedule(0, [request_id, num])
+        await event.wait()
+        num = self.events[request_id]
+        del self.events[request_id]
+        return {"status": "success", "num": num}
 
-        def run_fastapi_app():
-            print(f"[WebServer{self.bank_index}] FastAPI server starting")
-            uvicorn.run(self.app, host="127.0.0.1", port=5000+self.bank_index, log_level="warning")
-        fastapi_thread = threading.Thread(target=run_fastapi_app)
-        fastapi_thread.start()
-    =}
+    def run_fastapi_app():
+        print(f"[WebServer{self.bank_index}] FastAPI server starting")
+        uvicorn.run(self.app, host="127.0.0.1", port=5000+self.bank_index, log_level="warning")
+    fastapi_thread = threading.Thread(target=run_fastapi_app)
+    fastapi_thread.start()
+  =}
 
-    reaction(addone_action){=
-        request_id, num = addone_action.value
-        event = self.events[request_id]
-        self.events[request_id] = num + 1
-        event.set()
-    =}
+  reaction(addone_action) {=
+    request_id, num = addone_action.value
+    event = self.events[request_id]
+    self.events[request_id] = num + 1
+    event.set()
+  =}
 }
 
-federated reactor{
-    server = new WebServer()
-}
\ No newline at end of file
+federated reactor {
+  server = new WebServer()
+}

From faf5ae27474f173027c4b7915dd2669a6cbb30ab Mon Sep 17 00:00:00 2001
From: Shulu Li <65802727+Depetrol@users.noreply.github.com>
Date: Wed, 9 Oct 2024 17:41:33 -0700
Subject: [PATCH 03/13] Document suggestions from code review

Co-authored-by: Edward A. Lee 
---
 examples/Python/src/WebServer/README.md | 26 ++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/examples/Python/src/WebServer/README.md b/examples/Python/src/WebServer/README.md
index 8dd57743..f4e1fc43 100644
--- a/examples/Python/src/WebServer/README.md
+++ b/examples/Python/src/WebServer/README.md
@@ -7,15 +7,15 @@ This example shows how to create an HTTP web server backend in Lingua Franca pyt
 In this example, we will build a distributed logging service with two replicated databases, each database with an HTTP web server that handles add log and get log requests from frontend. The HTTP web server backend is `logging.lf`, and the frontend is `logging.html`. Valid requests are the following three kinds:
 
 - Add log: adds a log to the distributed database. The add log request is broadcast to all database replicas.
-- Get log: get all historical logs from a single database. This returns without waiting for maximum availability. However, the logs could be out of order and inconsistent with each other.
+- Get log: get all historical logs from a single database. This returns without waiting for consistency, so the logs could be out of order and inconsistent with each other.
 - Get log consistent: get consistent historical logs from a single database. This request will respond slower but with consistency, meaning requests to different replicas will return identical logs if the requests have the same timestamp.
 
 ## HTTP Server
 
 Building an HTTP server in Lingua Franca python target is a nontrivial task for several reasons:
 
-- The HTTP server in python is blocking operation that prevents a reaction from finishing
-- Typical python web frameworks use decorator style and require you to return the response in the handler function, but to utilize the full potential of Lingua Franca, we often need to implement logic in different reactions.
+- The HTTP server in python is a blocking operation that prevents a reaction from finishing.
+- Typical python web frameworks use a decorator style and require you to return the response in the handler function, but to utilize the full potential of Lingua Franca, we often need to implement logic in different reactions.
 
 To tackle the issues above, we can:
 
@@ -25,7 +25,7 @@ To tackle the issues above, we can:
 
 ## Minimal +1 Example
 
-First, let's build a minimal web server that adds one to the number in the request. The backend is in `minimal.lf`, frontend is `minimal.html`.
+First, let's build a minimal web server that adds one to the number in the request. The backend is in `minimal.lf`, and frontend is `minimal.html`.
 
 The handler is as follows:
 
@@ -56,21 +56,21 @@ reaction(addone_action){=
 
 When a request is processed by a handler, a response is generated in the following steps:
 
-1. Create a python async io event and add it to `self.events`
-2. Trigger a physical action in Lingua Franca to process the request
-3. Block the handler until the event is unblocked by another reaction
-4. When the action has been processed, another reaction unblocks the asyncio event
-5. The handler can now continue to execute and respond to the web request
+1. Create a python async io event and add it to `self.events`.
+2. Trigger a physical action in Lingua Franca to process the request.
+3. Block the handler until the event is unblocked by another reaction.
+4. When the action has been processed, another reaction unblocks the asyncio event.
+5. The handler can now continue to execute and respond to the web request.
 
 ## Distributed Logging
 
 ![logging](logging.svg)
 
-Now we can implement a distributed logging system by instantiating several `WebServer` on different network ports adding two `Database` for each `WebServer`.
+Now we can implement a distributed logging system by instantiating several `WebServer` reactors on different network ports and adding two `Database` reactors for each `WebServer`.
 
-* One `Database` has the STA offset of 0, connected by physical connections. This will prioritize availability and fast response times.
-* Another `Database` has the STA offset of 3s(This can be changed), connected by logical connections. This will guarantee that the logs in this `Database` will be consistent as long as out-of-order messages arrive within 3s.
+* One `Database` reactor has an STA offset of 0 and is connected by physical connections. This will prioritize availability, generating a quick response that is not (necessarily) consistent.
+* Another `Database` reactor has an STA offset of 3s (this can be changed) and is connected by logical connections. This will guarantee that the logs in this `Database` reactor will be consistent as long as out-of-order messages arrive within 3s.
 
-Note that this is implemented with banks and multiports. When sending logs, we want the `WebServer` to send logs to all `Database`, so the `newlog` connection is a multiport; but when getting logs, we want to know the log state of the single corresponding `Database`, which means that there is no multiport for the get log operation.
+Note that this is implemented with banks and multiports. When sending logs, we want the `WebServer` to send logs to all `Database` reactors, so the `newlog` connection is a multiport; but when getting logs, we want to know the log state of the single corresponding `Database` reactor, which means that there is no multiport for the get log operation.
 
 The `sendlogs` connections from `Database` to `WebServer` are implemented with physical connections. This is because these connections carry no timing semantics -- they simply carry the data to be sent back to the frontend as a response and need to be executed as soon as possible.

From 6593008d8ecfd406780827d50f613a3af30973e4 Mon Sep 17 00:00:00 2001
From: depetrol 
Date: Wed, 9 Oct 2024 17:45:28 -0700
Subject: [PATCH 04/13] remove unused imports in preamble

---
 examples/Python/src/WebServer/logging.lf | 2 --
 1 file changed, 2 deletions(-)

diff --git a/examples/Python/src/WebServer/logging.lf b/examples/Python/src/WebServer/logging.lf
index 0eca3e97..0388818c 100644
--- a/examples/Python/src/WebServer/logging.lf
+++ b/examples/Python/src/WebServer/logging.lf
@@ -9,8 +9,6 @@ preamble {=
   import uvicorn
   import asyncio
   import uuid
-  import time
-  import random
 =}
 
 reactor WebServer(bank_index=0, STA=0) {

From 6ba838a304d0e27fcc89f95cede133af207d9e9e Mon Sep 17 00:00:00 2001
From: depetrol 
Date: Wed, 9 Oct 2024 18:04:38 -0700
Subject: [PATCH 05/13] update docs

---
 examples/Python/src/WebServer/README.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/examples/Python/src/WebServer/README.md b/examples/Python/src/WebServer/README.md
index f4e1fc43..465f7c53 100644
--- a/examples/Python/src/WebServer/README.md
+++ b/examples/Python/src/WebServer/README.md
@@ -43,6 +43,8 @@ async def addone(request: Request):
     return {"status": "success", "num": num}
 ```
 
+`self` here refers to the `WebServer` reactor in which the handler is defined. The `self.app` is an instance of `FastAPI` application instance, defined as a state of the `WebServer` reactor. Import statements are in the `preamble` and not shown here for simplicity. This handler function will be triggered to generate a response to an HTTP `POST` request at the `/addone` endpoint.
+
 And the reaction to the action is
 
 ```python

From 42763e7e3e85aa04833433b2d55875536e9bc9e5 Mon Sep 17 00:00:00 2001
From: depetrol 
Date: Wed, 9 Oct 2024 18:54:22 -0700
Subject: [PATCH 06/13] feat: reusable WebServer reactor

---
 examples/Python/src/WebServer/README.md       | 39 ++++++++++++-
 .../Python/src/WebServer/minimal_with_lib.lf  | 25 ++++++++
 examples/Python/src/lib/WebServer.lf          | 58 +++++++++++++++++++
 3 files changed, 119 insertions(+), 3 deletions(-)
 create mode 100644 examples/Python/src/WebServer/minimal_with_lib.lf
 create mode 100644 examples/Python/src/lib/WebServer.lf

diff --git a/examples/Python/src/WebServer/README.md b/examples/Python/src/WebServer/README.md
index 465f7c53..3eb1fce7 100644
--- a/examples/Python/src/WebServer/README.md
+++ b/examples/Python/src/WebServer/README.md
@@ -64,6 +64,41 @@ When a request is processed by a handler, a response is generated in the followi
 4. When the action has been processed, another reaction unblocks the asyncio event.
 5. The handler can now continue to execute and respond to the web request.
 
+## Minimal +1 Example with WebServer Library
+
+We can also build the +1 example with the prebuilt `WebServer` library at `../lib/WebServer.lf` that modularizes the web server. You only have to implement the following code to accomplish the same functionality, as demonstrated in `minimal_with_lib.lf`:
+
+```python
+target Python {
+  coordination: decentralized
+}
+
+import WebServer from "../lib/WebServer.lf"
+
+reactor Handler {
+  input request
+  output response
+
+  reaction(request) -> response {=
+    request_id, req_data = request.value
+    num = int(req_data["data"])
+    num += 1
+    resp = {"status": "success", "num": num}
+    response.set([request_id, resp])
+  =}
+}
+
+federated reactor {
+  server = new WebServer(endpoint="/addone")
+  handler = new Handler()
+  server.request -> handler.request
+  handler.response ~> server.response
+}
+
+```
+
+Note that the `request_id` has to be sent to and from the `Handler` reactor so that the `WebServer` knows which request to respond to. Also, notice that the response is connected with a physical connection `~>`, this is because these connections carry no timing semantics -- they simply carry the data to be sent back to the frontend as a response and need to be executed as soon as possible. This also prevents an STP violation from being triggered.
+
 ## Distributed Logging
 
 ![logging](logging.svg)
@@ -73,6 +108,4 @@ Now we can implement a distributed logging system by instantiating several `WebS
 * One `Database` reactor has an STA offset of 0 and is connected by physical connections. This will prioritize availability, generating a quick response that is not (necessarily) consistent.
 * Another `Database` reactor has an STA offset of 3s (this can be changed) and is connected by logical connections. This will guarantee that the logs in this `Database` reactor will be consistent as long as out-of-order messages arrive within 3s.
 
-Note that this is implemented with banks and multiports. When sending logs, we want the `WebServer` to send logs to all `Database` reactors, so the `newlog` connection is a multiport; but when getting logs, we want to know the log state of the single corresponding `Database` reactor, which means that there is no multiport for the get log operation.
-
-The `sendlogs` connections from `Database` to `WebServer` are implemented with physical connections. This is because these connections carry no timing semantics -- they simply carry the data to be sent back to the frontend as a response and need to be executed as soon as possible.
+Note that this is implemented with banks and multiports. When sending logs, we want the `WebServer` to send logs to all `Database` reactors, so the `newlog` connection is a multiport; but when getting logs, we want to know the log state of the single corresponding `Database` reactor, which means that there is no multiport for the get log operation.
\ No newline at end of file
diff --git a/examples/Python/src/WebServer/minimal_with_lib.lf b/examples/Python/src/WebServer/minimal_with_lib.lf
new file mode 100644
index 00000000..57e744d8
--- /dev/null
+++ b/examples/Python/src/WebServer/minimal_with_lib.lf
@@ -0,0 +1,25 @@
+target Python {
+  coordination: decentralized
+}
+
+import WebServer from "../lib/WebServer.lf"
+
+reactor Handler {
+  input request
+  output response
+
+  reaction(request) -> response {=
+    request_id, req_data = request.value
+    num = int(req_data["data"])
+    num += 1
+    resp = {"status": "success", "num": num}
+    response.set([request_id, resp])
+  =}
+}
+
+federated reactor {
+  server = new WebServer(endpoint="/addone")
+  handler = new Handler()
+  server.request -> handler.request
+  handler.response ~> server.response
+}
diff --git a/examples/Python/src/lib/WebServer.lf b/examples/Python/src/lib/WebServer.lf
new file mode 100644
index 00000000..1f9cdc74
--- /dev/null
+++ b/examples/Python/src/lib/WebServer.lf
@@ -0,0 +1,58 @@
+target Python
+
+reactor WebServer(port=5000, endpoint="/") {
+  state app
+  state events
+  physical action phy_action
+  output request
+  input response
+
+  reaction(startup) -> phy_action {=
+    from fastapi import FastAPI, Request, HTTPException
+    from fastapi.middleware.cors import CORSMiddleware
+    import threading
+    import uvicorn
+    import asyncio
+    import uuid
+    self.events = {}
+    assert isinstance(self.endpoint, str), "The endpoint is not a string"
+    assert self.endpoint.startswith("/"), "The endpoint must start with a /"
+    self.app = FastAPI()
+    self.app.add_middleware(
+        CORSMiddleware,
+        allow_origins=["*"],
+        allow_credentials=True,
+        allow_methods=["*"],
+        allow_headers=["*"],
+    )
+    @self.app.post(self.endpoint)
+    async def addone(request: Request):
+        event = asyncio.Event()
+        request_id = str(uuid.uuid4())
+        self.events[request_id] = event
+        req_data = await request.json()
+
+        phy_action.schedule(0, [request_id, req_data])
+        await event.wait()
+        resp_data = self.events[request_id]
+        del self.events[request_id]
+        return resp_data
+
+    def run_fastapi_app():
+        print(f"FastAPI server starting...")
+        uvicorn.run(self.app, host="127.0.0.1", port=self.port, log_level="warning")
+    fastapi_thread = threading.Thread(target=run_fastapi_app)
+    fastapi_thread.start()
+  =}
+
+  reaction(phy_action) -> request {=
+    request.set(phy_action.value)
+  =}
+
+  reaction(response) {=
+    request_id, resp_data = response.value
+    event = self.events[request_id]
+    self.events[request_id] = resp_data
+    event.set()
+  =}
+}

From 5588c73172f03d0128fba8e39a4060780028470c Mon Sep 17 00:00:00 2001
From: Depetrol 
Date: Wed, 9 Oct 2024 23:03:04 -0700
Subject: [PATCH 07/13] rewrite logging with reusable WebServer

---
 examples/Python/src/WebServer/logging.html |  18 ++-
 examples/Python/src/WebServer/logging.lf   | 129 ++++++---------------
 examples/Python/src/lib/WebServer.lf       |   3 +
 3 files changed, 48 insertions(+), 102 deletions(-)

diff --git a/examples/Python/src/WebServer/logging.html b/examples/Python/src/WebServer/logging.html
index a7d671e0..2617992e 100644
--- a/examples/Python/src/WebServer/logging.html
+++ b/examples/Python/src/WebServer/logging.html
@@ -36,22 +36,18 @@ 

Stored Logs:

const port = document.getElementById("portInput").value || 5000; const fullLogMessage = `Log from ${port}: ${logMessage}`; const logData = { + operation: "newlog", log: fullLogMessage, }; - fetch(`http://127.0.0.1:${port}/log`, { + fetch(`http://127.0.0.1:${port}/`, { method: "POST", headers: { "Content-Type": "application/json", }, body: JSON.stringify(logData), - }) - .then((response) => response.json()) - .then((data) => { - console.log("Response from server:", data); - incrementLogValue(); - }) - .catch((error) => console.error("Error:", error)); + }).catch((error) => console.error("Error:", error)); + incrementLogValue(); } function incrementLogValue() { @@ -63,11 +59,12 @@

Stored Logs:

const port = document.getElementById("portInput").value || 5000; const startTime = performance.now(); - fetch(`http://127.0.0.1:${port}/getlogs`, { + fetch(`http://127.0.0.1:${port}/`, { method: "POST", headers: { "Content-Type": "application/json", }, + body: JSON.stringify({ operation: "getlog" }), }) .then((response) => response.json()) .then((data) => { @@ -99,11 +96,12 @@

Stored Logs:

const port = document.getElementById("portInput").value || 5000; const startTime = performance.now(); - fetch(`http://127.0.0.1:${port}/getlogs_consistent`, { + fetch(`http://127.0.0.1:${port}/`, { method: "POST", headers: { "Content-Type": "application/json", }, + body: JSON.stringify({ operation: "getlog_consistent" }), }) .then((response) => response.json()) .then((data) => { diff --git a/examples/Python/src/WebServer/logging.lf b/examples/Python/src/WebServer/logging.lf index 0388818c..81c64054 100644 --- a/examples/Python/src/WebServer/logging.lf +++ b/examples/Python/src/WebServer/logging.lf @@ -2,99 +2,45 @@ target Python { coordination: decentralized } -preamble {= - from fastapi import FastAPI, Request, HTTPException - from fastapi.middleware.cors import CORSMiddleware - import threading - import uvicorn - import asyncio - import uuid -=} +import WebServer from "../lib/WebServer.lf" -reactor WebServer(bank_index=0, STA=0) { - state app - state events - input sendlogs - input sendlogs_consistent +reactor Router { + input request output newlog output getlog output getlog_consistent - physical action newlog_action - physical action getlog_action - logical action getlog_consistent_action - reaction(startup) -> newlog_action, getlog_action, getlog_consistent_action {= - self.events = {} - self.app = FastAPI() - self.app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) - @self.app.post("/log") - async def receive_log(request: Request): - data = await request.json() - if "log" in data: - log_message = data["log"] - newlog_action.schedule(0, log_message) - return {"status": "success", "message": "Log stored successfully"} - else: - return {"status": "error", "message": "No log provided"}, 400 - @self.app.post("/getlogs") - async def get_logs(request: Request): - event = asyncio.Event() - request_id = str(uuid.uuid4()) - self.events[request_id] = event - getlog_action.schedule(0, request_id) - await event.wait() - logs = self.events[request_id] - del self.events[request_id] - return {"status": "success", "logs": logs} - - @self.app.post("/getlogs_consistent") - async def getlogs_consistent(request: Request): - event = asyncio.Event() - request_id = str(uuid.uuid4()) - self.events[request_id] = event - getlog_consistent_action.schedule(0, request_id) - await event.wait() - logs = self.events[request_id] - del self.events[request_id] - return {"status": "success", "logs": logs} - - def run_fastapi_app(): - print(f"[WebServer{self.bank_index}] FastAPI server starting") - uvicorn.run(self.app, host="127.0.0.1", port=5000+self.bank_index, log_level="warning") - fastapi_thread = threading.Thread(target=run_fastapi_app) - fastapi_thread.start() - =} - - reaction(newlog_action) -> newlog {= - newlog.set(newlog_action.value) - =} - - reaction(getlog_action) -> getlog {= - getlog.set(getlog_action.value) - =} - - reaction(getlog_consistent_action) -> getlog_consistent {= - getlog_consistent.set(getlog_consistent_action.value) - =} - - reaction(sendlogs) {= - [logs, event_id] = sendlogs.value - event = self.events[event_id] - self.events[event_id] = logs - event.set() + reaction(request) -> newlog, getlog, getlog_consistent {= + # print(f"Router received request: {request.value}") + request_id, req_data = request.value + if req_data["operation"] == "newlog" and "log" in req_data.keys(): + newlog.set([request_id, req_data["log"]]) + elif req_data["operation"] == "getlog": + getlog.set(request_id) + elif req_data["operation"] == "getlog_consistent": + getlog_consistent.set(request_id) + else: + print("Invalid Request") + return =} +} - reaction(sendlogs_consistent) {= - [logs, event_id] = sendlogs_consistent.value - event = self.events[event_id] - self.events[event_id] = logs - event.set() +reactor WebServerRouter(bank_index=0, STA=0) { + output newlog + output getlog + output getlog_consistent + webserver = new WebServer(port = {= 5000+self.bank_index =}, endpoint="/") + router = new Router() + webserver.request -> router.request + router.newlog -> newlog + router.getlog -> getlog + router.getlog_consistent -> getlog_consistent + input[2] response + + reaction(response) -> webserver.response {= + for port in response: + if port.is_present: + webserver.response.set(port.value) =} } @@ -108,27 +54,26 @@ reactor Database(bank_index=0, portwidth=2, STA = 0 s) { self.logs = [] =} - reaction(addlog) {= + reaction(addlog) -> sendlogs {= for i, port in enumerate(addlog): if port.is_present: - log_message = port.value + request_id, log_message = port.value self.logs.append(log_message) =} reaction(getlog) -> sendlogs {= - sendlogs.set([self.logs, getlog.value]) + sendlogs.set([getlog.value, {"status": "success", "logs": self.logs}]) =} } federated reactor(ReplicaCount=2) { - server = new[ReplicaCount] WebServer() + server = new[ReplicaCount] WebServerRouter() db = new[ReplicaCount] Database(portwidth=ReplicaCount) (server.newlog)+ ~> db.addlog server.getlog ~> db.getlog - db.sendlogs ~> server.sendlogs dbc = new[ReplicaCount] Database(portwidth=ReplicaCount, STA = 3 s) (server.newlog)+ -> dbc.addlog server.getlog_consistent -> dbc.getlog - dbc.sendlogs ~> server.sendlogs_consistent + db.sendlogs, dbc.sendlogs ~> interleaved(server.response) } diff --git a/examples/Python/src/lib/WebServer.lf b/examples/Python/src/lib/WebServer.lf index 1f9cdc74..7332d229 100644 --- a/examples/Python/src/lib/WebServer.lf +++ b/examples/Python/src/lib/WebServer.lf @@ -51,6 +51,9 @@ reactor WebServer(port=5000, endpoint="/") { reaction(response) {= request_id, resp_data = response.value + if request_id not in self.events: + print("Invalid Request ID") + return event = self.events[request_id] self.events[request_id] = resp_data event.set() From 1b3963c99bae9d851e2a1c9266127da03dd4abf3 Mon Sep 17 00:00:00 2001 From: Depetrol Date: Wed, 9 Oct 2024 23:28:56 -0700 Subject: [PATCH 08/13] update docs --- examples/Python/src/WebServer/README.md | 4 +++- examples/Python/src/WebServer/logging.svg | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/Python/src/WebServer/README.md b/examples/Python/src/WebServer/README.md index 3eb1fce7..481d246f 100644 --- a/examples/Python/src/WebServer/README.md +++ b/examples/Python/src/WebServer/README.md @@ -103,9 +103,11 @@ Note that the `request_id` has to be sent to and from the `Handler` reactor so t ![logging](logging.svg) +To implement our distributed logging application, we need to respond to three distinct operations, but the reusable `WebServer` reactor has only one API endpoint. We can solve this by introducing a new `Router` reactor and [composing reactors](https://www.lf-lang.org/docs/writing-reactors/composing-reactors), as shown in the diagram above. Each HTTP request body now carries an additional `operation` field that allows the router to route the request to different reactions through connections. + Now we can implement a distributed logging system by instantiating several `WebServer` reactors on different network ports and adding two `Database` reactors for each `WebServer`. * One `Database` reactor has an STA offset of 0 and is connected by physical connections. This will prioritize availability, generating a quick response that is not (necessarily) consistent. * Another `Database` reactor has an STA offset of 3s (this can be changed) and is connected by logical connections. This will guarantee that the logs in this `Database` reactor will be consistent as long as out-of-order messages arrive within 3s. -Note that this is implemented with banks and multiports. When sending logs, we want the `WebServer` to send logs to all `Database` reactors, so the `newlog` connection is a multiport; but when getting logs, we want to know the log state of the single corresponding `Database` reactor, which means that there is no multiport for the get log operation. \ No newline at end of file +Note that this is implemented with banks and multiports. When sending logs, we want the `WebServer` to send logs to all `Database` reactors, so the `newlog` connection is implemented with broadcasts; but when getting logs, we want to know the log state of the single corresponding `Database` reactor, hence there is no broadcast here. In the last line, `db.sendlogs, dbc.sendlogs ~> interleaved(server.response)` uses interleaved connections because each `WebServer` corresponds to two `Database`, one consistent and one not, and we need to avoid having a `WebServer` connecting to two inconsistent databases and another connecting to two consistent databases. \ No newline at end of file diff --git a/examples/Python/src/WebServer/logging.svg b/examples/Python/src/WebServer/logging.svg index 62752b83..bb661b6b 100644 --- a/examples/Python/src/WebServer/logging.svg +++ b/examples/Python/src/WebServer/logging.svg @@ -1 +1 @@ -loggingWebServer2sendlogssendlogs_consistentnewloggetloggetlog_consistentDatabase2addloggetlogsendlogsDatabase2addloggetlogsendlogs$$ \ No newline at end of file +loggingWebServerRouterWebServerresponserequestRouterrequestnewloggetloggetlog_consistent2responsenewloggetloggetlog_consistentDatabase2addloggetlogsendlogsDatabase2addloggetlogsendlogs$$$$ \ No newline at end of file From 473d32f082bf93be37daaccd7b6a1d967985f213 Mon Sep 17 00:00:00 2001 From: depetrol Date: Thu, 10 Oct 2024 10:15:35 -0700 Subject: [PATCH 09/13] docs: WebServer reusable reactor --- examples/Python/src/lib/WebServer.lf | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/examples/Python/src/lib/WebServer.lf b/examples/Python/src/lib/WebServer.lf index 7332d229..b65f2ea1 100644 --- a/examples/Python/src/lib/WebServer.lf +++ b/examples/Python/src/lib/WebServer.lf @@ -1,5 +1,28 @@ -target Python +/** + * @file + * @author Shulu Li + * @brief Reactor for handling HTTP requests in Python. + */ +target Python { + keepalive: true +} +/** + * @brief A reactor that starts a FastAPI server to handle HTTP requests. + * + * The `port` parameter is the port number on which the server listens for requests. + * + * The `endpoint` parameter is the endpoint at which the server listens for requests. + * + * The `request` output is a tuple of two values: the request ID and the request data. + * + * The `response` input is a tuple of two values: the request ID and the response data. Request ID + * is required to respond to the correct request. Use a physical connection to connect the + * `response` input to avoid STP violations. + * + * To use this reactor, you must install the fastapi and uvicorn libraries for Python. You can do + * this with `pip install fastapi uvicorn`. + */ reactor WebServer(port=5000, endpoint="/") { state app state events From b95ce1af1c91a4696a553fd98053e644aaf6203e Mon Sep 17 00:00:00 2001 From: depetrol Date: Thu, 10 Oct 2024 11:33:19 -0700 Subject: [PATCH 10/13] rename endpoint to path --- examples/Python/src/WebServer/README.md | 6 +++--- examples/Python/src/WebServer/logging.lf | 2 +- examples/Python/src/WebServer/minimal_with_lib.lf | 2 +- examples/Python/src/lib/WebServer.lf | 10 +++++----- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/examples/Python/src/WebServer/README.md b/examples/Python/src/WebServer/README.md index 481d246f..446545ba 100644 --- a/examples/Python/src/WebServer/README.md +++ b/examples/Python/src/WebServer/README.md @@ -43,7 +43,7 @@ async def addone(request: Request): return {"status": "success", "num": num} ``` -`self` here refers to the `WebServer` reactor in which the handler is defined. The `self.app` is an instance of `FastAPI` application instance, defined as a state of the `WebServer` reactor. Import statements are in the `preamble` and not shown here for simplicity. This handler function will be triggered to generate a response to an HTTP `POST` request at the `/addone` endpoint. +`self` here refers to the `WebServer` reactor in which the handler is defined. The `self.app` is an instance of `FastAPI` application instance, defined as a state of the `WebServer` reactor. Import statements are in the `preamble` and not shown here for simplicity. This handler function will be triggered to generate a response to an HTTP `POST` request at the `/addone` path. And the reaction to the action is @@ -89,7 +89,7 @@ reactor Handler { } federated reactor { - server = new WebServer(endpoint="/addone") + server = new WebServer(path="/addone") handler = new Handler() server.request -> handler.request handler.response ~> server.response @@ -103,7 +103,7 @@ Note that the `request_id` has to be sent to and from the `Handler` reactor so t ![logging](logging.svg) -To implement our distributed logging application, we need to respond to three distinct operations, but the reusable `WebServer` reactor has only one API endpoint. We can solve this by introducing a new `Router` reactor and [composing reactors](https://www.lf-lang.org/docs/writing-reactors/composing-reactors), as shown in the diagram above. Each HTTP request body now carries an additional `operation` field that allows the router to route the request to different reactions through connections. +To implement our distributed logging application, we need to respond to three distinct operations, but the reusable `WebServer` reactor has only one API path. We can solve this by introducing a new `Router` reactor and [composing reactors](https://www.lf-lang.org/docs/writing-reactors/composing-reactors), as shown in the diagram above. Each HTTP request body now carries an additional `operation` field that allows the router to route the request to different reactions through connections. Now we can implement a distributed logging system by instantiating several `WebServer` reactors on different network ports and adding two `Database` reactors for each `WebServer`. diff --git a/examples/Python/src/WebServer/logging.lf b/examples/Python/src/WebServer/logging.lf index 81c64054..638fec7f 100644 --- a/examples/Python/src/WebServer/logging.lf +++ b/examples/Python/src/WebServer/logging.lf @@ -29,7 +29,7 @@ reactor WebServerRouter(bank_index=0, STA=0) { output newlog output getlog output getlog_consistent - webserver = new WebServer(port = {= 5000+self.bank_index =}, endpoint="/") + webserver = new WebServer(port = {= 5000+self.bank_index =}, path="/") router = new Router() webserver.request -> router.request router.newlog -> newlog diff --git a/examples/Python/src/WebServer/minimal_with_lib.lf b/examples/Python/src/WebServer/minimal_with_lib.lf index 57e744d8..d8e281f2 100644 --- a/examples/Python/src/WebServer/minimal_with_lib.lf +++ b/examples/Python/src/WebServer/minimal_with_lib.lf @@ -18,7 +18,7 @@ reactor Handler { } federated reactor { - server = new WebServer(endpoint="/addone") + server = new WebServer(path="/addone") handler = new Handler() server.request -> handler.request handler.response ~> server.response diff --git a/examples/Python/src/lib/WebServer.lf b/examples/Python/src/lib/WebServer.lf index b65f2ea1..85e5a9e6 100644 --- a/examples/Python/src/lib/WebServer.lf +++ b/examples/Python/src/lib/WebServer.lf @@ -12,7 +12,7 @@ target Python { * * The `port` parameter is the port number on which the server listens for requests. * - * The `endpoint` parameter is the endpoint at which the server listens for requests. + * The `path` parameter is the path at which the server listens for requests. * * The `request` output is a tuple of two values: the request ID and the request data. * @@ -23,7 +23,7 @@ target Python { * To use this reactor, you must install the fastapi and uvicorn libraries for Python. You can do * this with `pip install fastapi uvicorn`. */ -reactor WebServer(port=5000, endpoint="/") { +reactor WebServer(port=5000, path="/") { state app state events physical action phy_action @@ -38,8 +38,8 @@ reactor WebServer(port=5000, endpoint="/") { import asyncio import uuid self.events = {} - assert isinstance(self.endpoint, str), "The endpoint is not a string" - assert self.endpoint.startswith("/"), "The endpoint must start with a /" + assert isinstance(self.path, str), "The path is not a string" + assert self.path.startswith("/"), "The path must start with a /" self.app = FastAPI() self.app.add_middleware( CORSMiddleware, @@ -48,7 +48,7 @@ reactor WebServer(port=5000, endpoint="/") { allow_methods=["*"], allow_headers=["*"], ) - @self.app.post(self.endpoint) + @self.app.post(self.path) async def addone(request: Request): event = asyncio.Event() request_id = str(uuid.uuid4()) From 485a1ee90a018599c6174e22261815f5627e403e Mon Sep 17 00:00:00 2001 From: depetrol Date: Thu, 10 Oct 2024 11:44:15 -0700 Subject: [PATCH 11/13] fix: WebServer allow any HTTP request body --- examples/Python/src/lib/WebServer.lf | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/Python/src/lib/WebServer.lf b/examples/Python/src/lib/WebServer.lf index 85e5a9e6..91d88d06 100644 --- a/examples/Python/src/lib/WebServer.lf +++ b/examples/Python/src/lib/WebServer.lf @@ -12,7 +12,7 @@ target Python { * * The `port` parameter is the port number on which the server listens for requests. * - * The `path` parameter is the path at which the server listens for requests. + * The `path` parameter is the path at which the server listens for HTTP POST requests. * * The `request` output is a tuple of two values: the request ID and the request data. * @@ -53,8 +53,10 @@ reactor WebServer(port=5000, path="/") { event = asyncio.Event() request_id = str(uuid.uuid4()) self.events[request_id] = event - req_data = await request.json() - + if request.headers.get("content-type") == "application/json": + req_data = await request.json() + else: + req_data = await request.body() phy_action.schedule(0, [request_id, req_data]) await event.wait() resp_data = self.events[request_id] From c7ef7ee588689be5de9f9b7bf4240445727a5a15 Mon Sep 17 00:00:00 2001 From: depetrol Date: Thu, 10 Oct 2024 11:53:09 -0700 Subject: [PATCH 12/13] update docs --- examples/Python/src/lib/WebServer.lf | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/Python/src/lib/WebServer.lf b/examples/Python/src/lib/WebServer.lf index 91d88d06..5a8df7a4 100644 --- a/examples/Python/src/lib/WebServer.lf +++ b/examples/Python/src/lib/WebServer.lf @@ -14,9 +14,11 @@ target Python { * * The `path` parameter is the path at which the server listens for HTTP POST requests. * - * The `request` output is a tuple of two values: the request ID and the request data. + * The `request` output is a list of two values: the request ID and the request data. If the + * request header indicates that the body is a JSON object, the request data is parsed into a python + * dictionary. Otherwise, the request body is forwarded as-is. * - * The `response` input is a tuple of two values: the request ID and the response data. Request ID + * The `response` input is a list of two values: the request ID and the response data. Request ID * is required to respond to the correct request. Use a physical connection to connect the * `response` input to avoid STP violations. * From 0f9aa8c6d3303bdb51d8ae017a94aaa747c8155b Mon Sep 17 00:00:00 2001 From: depetrol Date: Thu, 10 Oct 2024 12:10:02 -0700 Subject: [PATCH 13/13] unfederated execution for +1 examples --- .gitignore | 1 + examples/Python/src/WebServer/minimal.lf | 4 ++-- examples/Python/src/WebServer/minimal_with_lib.lf | 4 ++-- examples/Python/src/lib/WebServer.lf | 10 +++++----- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 4d594143..a42b10a6 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ C/include Cpp/**/include Cpp/**/share Cpp/**/lib +examples/Python/include # Created by https://www.toptal.com/developers/gitignore/api/intellij,gradle,eclipse,maven,visualstudiocode # Edit at https://www.toptal.com/developers/gitignore?templates=intellij,gradle,eclipse,maven,visualstudiocode diff --git a/examples/Python/src/WebServer/minimal.lf b/examples/Python/src/WebServer/minimal.lf index 2b727d74..9b838e3c 100644 --- a/examples/Python/src/WebServer/minimal.lf +++ b/examples/Python/src/WebServer/minimal.lf @@ -1,5 +1,5 @@ target Python { - coordination: decentralized + keepalive: true } preamble {= @@ -53,6 +53,6 @@ reactor WebServer(bank_index=0, STA=0) { =} } -federated reactor { +main reactor { server = new WebServer() } diff --git a/examples/Python/src/WebServer/minimal_with_lib.lf b/examples/Python/src/WebServer/minimal_with_lib.lf index d8e281f2..f5da0f75 100644 --- a/examples/Python/src/WebServer/minimal_with_lib.lf +++ b/examples/Python/src/WebServer/minimal_with_lib.lf @@ -1,5 +1,5 @@ target Python { - coordination: decentralized + keepalive: true } import WebServer from "../lib/WebServer.lf" @@ -17,7 +17,7 @@ reactor Handler { =} } -federated reactor { +main reactor { server = new WebServer(path="/addone") handler = new Handler() server.request -> handler.request diff --git a/examples/Python/src/lib/WebServer.lf b/examples/Python/src/lib/WebServer.lf index 5a8df7a4..79e962be 100644 --- a/examples/Python/src/lib/WebServer.lf +++ b/examples/Python/src/lib/WebServer.lf @@ -14,13 +14,13 @@ target Python { * * The `path` parameter is the path at which the server listens for HTTP POST requests. * - * The `request` output is a list of two values: the request ID and the request data. If the - * request header indicates that the body is a JSON object, the request data is parsed into a python + * The `request` output is a list of two values: the request ID and the request data. If the request + * header indicates that the body is a JSON object, the request data is parsed into a python * dictionary. Otherwise, the request body is forwarded as-is. * - * The `response` input is a list of two values: the request ID and the response data. Request ID - * is required to respond to the correct request. Use a physical connection to connect the - * `response` input to avoid STP violations. + * The `response` input is a list of two values: the request ID and the response data. Request ID is + * required to respond to the correct request. Use a physical connection to connect the `response` + * input to avoid STP violations. * * To use this reactor, you must install the fastapi and uvicorn libraries for Python. You can do * this with `pip install fastapi uvicorn`.