Flood publish #15

Draft: wants to merge 11 commits into main
4,382 changes: 4,382 additions & 0 deletions flood_publishing/Cargo.lock

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions flood_publishing/Cargo.toml
@@ -0,0 +1,24 @@
[package]
name = "flood_publishing"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# Staggered message sending.
# https://github.com/libp2p/rust-libp2p/pull/3666
# The revision specified here adds some log output needed to measure latency.
# See: https://github.com/ackintosh/rust-libp2p/commit/e49b2b27a21b26e1c90a08ff8fa2d38bc265539d
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", rev = "6c303a5b19bae7949f435958087f443ac60d41e3", default-features = false, features = ["gossipsub", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "serde"] }

futures = "0.3"
rand = { version = "0.8", features = ["small_rng"] }
serde = "1.0"
serde_json = "1.0"
sha2 = "0.10"
# TODO: Update testground once the next version (v0.5.0) is released.
testground = { git = "https://github.com/testground/sdk-rust.git", rev = "1fd032ec29361a00b25c0c8a6bac5f19a43019eb" }
tokio = { version = "1.21", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
46 changes: 46 additions & 0 deletions flood_publishing/Dockerfile
@@ -0,0 +1,46 @@
# This Dockerfile is for the `docker:generic` builder.
# See https://docs.testground.ai/builder-library/docker-generic for details about the builder.
FROM rust:1.67-bullseye as builder
WORKDIR /usr/src/test-plan

# * `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake.
# There is a discussion about removing cmake from its dependencies.
# https://github.com/tokio-rs/prost/pull/620
# * Since `prost-build` v0.11, `protoc` is required.
# https://github.com/tokio-rs/prost/releases/tag/v0.11.0
RUN apt-get update && apt-get install -y cmake && apt-get install -y protobuf-compiler

# Cache dependencies between test runs.
# See https://blog.mgattozzi.dev/caching-rust-docker-builds/
# and https://github.com/rust-lang/cargo/issues/2644
RUN mkdir -p ./plan/src/
# This is a placeholder main function to build only the dependencies.
RUN echo "fn main() { println!(\"If you see this message, you may want to clean up the target directory or the Docker build cache.\") }" > ./plan/src/main.rs
COPY ./plan/Cargo.toml ./plan/
RUN cd ./plan/ && cargo build --release

# Copy only the files that are needed, instead of all of them (`COPY . .`), to avoid
# unnecessary recompilation, e.g. a rebuild triggered by modifying `manifest.toml`.
# NOTE:
# `.dockerignore` is not effective because the build context is not this directory
# during the test run.
# See https://docs.testground.ai/builder-library/docker-generic#usage
COPY ./plan/src ./plan/src
COPY ./plan/Cargo.lock ./plan/

# Touch `main.rs` so that its mtime is updated; otherwise the dummy `main` above
# could remain in the release binary.
# https://github.com/rust-lang/cargo/issues/9598
RUN touch ./plan/src/main.rs

# Note: In `docker:generic` builder, the root of the docker build context is one directory higher
# than this test plan.
# See https://docs.testground.ai/builder-library/docker-generic#usage
RUN cd ./plan/ && cargo build --release

FROM debian:bullseye-slim
COPY --from=builder /usr/src/test-plan/plan/target/release/flood_publishing /usr/local/bin/flood_publishing

# Configure Logging
ENV RUST_LOG=libp2p_gossipsub=info,flood_publishing=info

ENTRYPOINT ["flood_publishing"]
48 changes: 48 additions & 0 deletions flood_publishing/README.md
@@ -0,0 +1,48 @@
# Flood Publishing Simulation

This simulation creates a number of nodes with flood publishing enabled and helps
users measure message latency.

In this simulation, each node logs the time when `send_message()` and `handle_received_message()`
are called. (These functions are defined on the rust-libp2p side, so this simulation uses
a forked rust-libp2p that includes the logging.)

```mermaid
sequenceDiagram
participant Node1
participant Node2
participant Node3

loop Simulation
Note over Node1: send_message()
Node1->>Node2: message
Note over Node2: handle_received_message()
Note over Node1: send_message()
Node1->>Node3: message
Note over Node3: handle_received_message()
end
```

Using `measure_latency.py`, we can measure the time between `send_message()` on the publisher side
and `handle_received_message()` on the receiver side.
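
For example, a matched pair of `send` and `receive` events, taken from the sample logs embedded
in `measure_latency.py` (payloads abridged to the JSON part, peer IDs truncated), looks like:

```
{"event":"send","to":"12D3KooWAHNz...","message_id":"c189ad462fb1...","time":1680299856156}
{"event":"receive","propagation_source":"12D3KooWPf6Z...","message_id":"c189ad462fb1...","time":1680299856862}
```

The latency for this pair is the difference between the two `time` fields (Unix milliseconds):
1680299856862 - 1680299856156 = 706 ms.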

## Running the Simulation

The type of flood publishing can be switched via `--test-param flood_publish=heartbeat`. See
`flood_publishing/manifest.toml` for the available test parameters.

This simulation can be run with the following command (from within the repository's
root directory):

```sh
testground run single \
--plan gossipsub-testground/flood_publishing \
--testcase flood_publishing \
--builder docker:generic \
--runner local:docker \
--instances 50 \
--wait \
--test-param flood_publish=heartbeat \
| grep flood_publishing_test \
| python3 flood_publishing/measure_latency.py
```
27 changes: 27 additions & 0 deletions flood_publishing/manifest.toml
@@ -0,0 +1,27 @@
name = "flood_publishing"

[defaults]
builder = "docker:generic"
runner = "local:docker"
disable_metrics = false

[builders."docker:generic"]
enabled = true

[runners."local:docker"]
enabled = true

[[testcases]]
name = "flood_publishing"
instances = { min = 3, max = 100, default = 3 }
[testcases.params]
warm_up = { type = "int", desc = "Time to wait for nodes to form overlay mesh before beginning publishing", unit = "sec", default = 10 }
run = { type = "int", desc = "Time to run simulation", unit = "sec", default = 20 }
cool_down = { type = "int", desc = "Time to wait after simulation", unit = "sec", default = 10 }
publish_interval = { type = "int", desc = "Interval to publish a message", unit = "sec", default = 5 }
message_size = { type = "int", desc = "Size of messages to be published", unit = "bytes", default = 50000 }
# Network config
bandwidth = { type = "int", desc = "Bandwidth in MiB", unit = "MiB", default = 30 }
# Gossipsub config
flood_publish = { type = "string", desc = "The type of flood publishing: rapid / heartbeat" }
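
As an illustrative sketch (the parameter values below are arbitrary; defaults come from
`manifest.toml`), several of these parameters can be overridden in a single run by repeating
`--test-param`. Note that `flood_publish` has no default, so it must always be supplied:

```sh
testground run single \
  --plan gossipsub-testground/flood_publishing \
  --testcase flood_publishing \
  --builder docker:generic \
  --runner local:docker \
  --instances 10 \
  --wait \
  --test-param flood_publish=rapid \
  --test-param publish_interval=3 \
  --test-param message_size=100000
```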

123 changes: 123 additions & 0 deletions flood_publishing/measure_latency.py
@@ -0,0 +1,123 @@
import json
import sys


def parse_log(line):
    label_index = line.index("[flood_publishing_test]")
    log_str = line[label_index + len("[flood_publishing_test]"):]
    log = json.loads(log_str)
    log['node_id'] = extract_node_id(line)
    return log


# Extract the `node_id` issued by Testground from a log line.
# For example, this function returns `a04ed9` when passed a line like:
# Mar 31 21:52:47.918675 INFO 0.8177s OTHER << single[002] (a04ed9) >> [flood_publishing_test]...
def extract_node_id(line):
    label_sep = line.index(") >>")
    return line[label_sep - 6:label_sep]


def node_id_to_peer_id(nodes, node_id):
    node = nodes.get(node_id)
    if node is None:
        print("node_id not found in nodes: " + node_id)
        sys.exit(1)
    return node['peer_id']


def mean(latencies):
    # Guard against an empty list, consistent with `median`.
    if len(latencies) == 0:
        return None
    return sum(latencies) / len(latencies)


def median(latencies):
    sorted_latencies = sorted(latencies)
    length = len(sorted_latencies)
    if length == 0:
        return None
    elif length % 2 == 0:
        return (sorted_latencies[length // 2 - 1] + sorted_latencies[length // 2]) / 2
    else:
        return sorted_latencies[length // 2]


# Example logs for debugging.
debug = '''Mar 31 21:57:30.155618 INFO 0.8585s OTHER << single[000] (3b20a6) >> [flood_publishing_test]{"event":"peer_id","peer_id":"12D3KooWN5f2YP3sKRykhAYEz48cfpGseW9dRfJxWDKXmKbb22c1","is_publisher":false}
Mar 31 21:57:30.157546 INFO 0.8605s OTHER << single[001] (a250a4) >> [flood_publishing_test]{"event":"peer_id","peer_id":"12D3KooWPf6ZJrYn1ojsp9iazfxJFRgYoHRC79yNDUviqSeS894x","is_publisher":true}
Mar 31 21:57:30.159084 INFO 0.8620s OTHER << single[002] (591350) >> [flood_publishing_test]{"event":"peer_id","peer_id":"12D3KooWAHNzQSmfpCF4r7WT8G7tWQFc2Yr6wES3Poz99r3RbkAy","is_publisher":false}
Mar 31 21:57:36.156824 INFO 6.8598s OTHER << single[001] (a250a4) >> [flood_publishing_test]{"event":"send","to":"12D3KooWN5f2YP3sKRykhAYEz48cfpGseW9dRfJxWDKXmKbb22c1","message_id":"c189ad462fb14701d0b1040139e119c60f3172b2","time":1680299856156}
Mar 31 21:57:36.156965 INFO 6.8599s OTHER << single[001] (a250a4) >> [flood_publishing_test]{"event":"send","to":"12D3KooWAHNzQSmfpCF4r7WT8G7tWQFc2Yr6wES3Poz99r3RbkAy","message_id":"c189ad462fb14701d0b1040139e119c60f3172b2","time":1680299856156}
Mar 31 21:57:36.862880 INFO 7.5658s OTHER << single[002] (591350) >> [flood_publishing_test]{"event":"receive","propagation_source":"12D3KooWPf6ZJrYn1ojsp9iazfxJFRgYoHRC79yNDUviqSeS894x","message_id":"c189ad462fb14701d0b1040139e119c60f3172b2","time":1680299856862}
Mar 31 21:57:36.928091 INFO 7.6310s OTHER << single[000] (3b20a6) >> [flood_publishing_test]{"event":"receive","propagation_source":"12D3KooWPf6ZJrYn1ojsp9iazfxJFRgYoHRC79yNDUviqSeS894x","message_id":"c189ad462fb14701d0b1040139e119c60f3172b2","time":1680299856927}
Mar 31 21:57:39.166998 INFO 9.8700s OTHER << single[001] (a250a4) >> [flood_publishing_test]{"event":"send","to":"12D3KooWN5f2YP3sKRykhAYEz48cfpGseW9dRfJxWDKXmKbb22c1","message_id":"25b377b39fd58a9b57f3a7c8fea759063fcfcc1a","time":1680299859158}
Mar 31 21:57:39.167076 INFO 9.8700s OTHER << single[001] (a250a4) >> [flood_publishing_test]{"event":"send","to":"12D3KooWAHNzQSmfpCF4r7WT8G7tWQFc2Yr6wES3Poz99r3RbkAy","message_id":"25b377b39fd58a9b57f3a7c8fea759063fcfcc1a","time":1680299859158}
Mar 31 21:57:39.899528 INFO 10.6025s OTHER << single[000] (3b20a6) >> [flood_publishing_test]{"event":"receive","propagation_source":"12D3KooWPf6ZJrYn1ojsp9iazfxJFRgYoHRC79yNDUviqSeS894x","message_id":"25b377b39fd58a9b57f3a7c8fea759063fcfcc1a","time":1680299859899}
Mar 31 21:57:39.941150 INFO 10.6441s OTHER << single[002] (591350) >> [flood_publishing_test]{"event":"receive","propagation_source":"12D3KooWPf6ZJrYn1ojsp9iazfxJFRgYoHRC79yNDUviqSeS894x","message_id":"25b377b39fd58a9b57f3a7c8fea759063fcfcc1a","time":1680299859940}
Mar 31 21:57:42.156844 INFO 12.8598s OTHER << single[001] (a250a4) >> [flood_publishing_test]{"event":"send","to":"12D3KooWAHNzQSmfpCF4r7WT8G7tWQFc2Yr6wES3Poz99r3RbkAy","message_id":"472b99c805f1d58a2155d4bf654cbcb58d12b75b","time":1680299862155}
Mar 31 21:57:42.157266 INFO 12.8602s OTHER << single[001] (a250a4) >> [flood_publishing_test]{"event":"send","to":"12D3KooWN5f2YP3sKRykhAYEz48cfpGseW9dRfJxWDKXmKbb22c1","message_id":"472b99c805f1d58a2155d4bf654cbcb58d12b75b","time":1680299862156}
Mar 31 21:57:42.873985 INFO 13.5770s OTHER << single[002] (591350) >> [flood_publishing_test]{"event":"receive","propagation_source":"12D3KooWPf6ZJrYn1ojsp9iazfxJFRgYoHRC79yNDUviqSeS894x","message_id":"472b99c805f1d58a2155d4bf654cbcb58d12b75b","time":1680299862873}
Mar 31 21:57:42.927309 INFO 13.6303s OTHER << single[000] (3b20a6) >> [flood_publishing_test]{"event":"receive","propagation_source":"12D3KooWPf6ZJrYn1ojsp9iazfxJFRgYoHRC79yNDUviqSeS894x","message_id":"472b99c805f1d58a2155d4bf654cbcb58d12b75b","time":1680299862926}
Mar 31 21:57:45.156938 INFO 15.8599s OTHER << single[001] (a250a4) >> [flood_publishing_test]{"event":"send","to":"12D3KooWAHNzQSmfpCF4r7WT8G7tWQFc2Yr6wES3Poz99r3RbkAy","message_id":"4ddbd9ff71a5ceebb8b1f3d4353a217594e11439","time":1680299865156}
Mar 31 21:57:45.157038 INFO 15.8600s OTHER << single[001] (a250a4) >> [flood_publishing_test]{"event":"send","to":"12D3KooWN5f2YP3sKRykhAYEz48cfpGseW9dRfJxWDKXmKbb22c1","message_id":"4ddbd9ff71a5ceebb8b1f3d4353a217594e11439","time":1680299865156}
Mar 31 21:57:45.858130 INFO 16.5611s OTHER << single[002] (591350) >> [flood_publishing_test]{"event":"receive","propagation_source":"12D3KooWPf6ZJrYn1ojsp9iazfxJFRgYoHRC79yNDUviqSeS894x","message_id":"4ddbd9ff71a5ceebb8b1f3d4353a217594e11439","time":1680299865851}
Mar 31 21:57:45.952577 INFO 16.6556s OTHER << single[000] (3b20a6) >> [flood_publishing_test]{"event":"receive","propagation_source":"12D3KooWPf6ZJrYn1ojsp9iazfxJFRgYoHRC79yNDUviqSeS894x","message_id":"4ddbd9ff71a5ceebb8b1f3d4353a217594e11439","time":1680299865928}
Mar 31 21:57:48.157564 INFO 18.8606s OTHER << single[001] (a250a4) >> [flood_publishing_test]{"event":"send","to":"12D3KooWAHNzQSmfpCF4r7WT8G7tWQFc2Yr6wES3Poz99r3RbkAy","message_id":"864403bdfd935b72df14701adf7fc44829c47357","time":1680299868156}
Mar 31 21:57:48.157612 INFO 18.8606s OTHER << single[001] (a250a4) >> [flood_publishing_test]{"event":"send","to":"12D3KooWN5f2YP3sKRykhAYEz48cfpGseW9dRfJxWDKXmKbb22c1","message_id":"864403bdfd935b72df14701adf7fc44829c47357","time":1680299868156}
Mar 31 21:57:48.863088 INFO 19.5661s OTHER << single[002] (591350) >> [flood_publishing_test]{"event":"receive","propagation_source":"12D3KooWPf6ZJrYn1ojsp9iazfxJFRgYoHRC79yNDUviqSeS894x","message_id":"864403bdfd935b72df14701adf7fc44829c47357","time":1680299868862}
Mar 31 21:57:48.927562 INFO 19.6306s OTHER << single[000] (3b20a6) >> [flood_publishing_test]{"event":"receive","propagation_source":"12D3KooWPf6ZJrYn1ojsp9iazfxJFRgYoHRC79yNDUviqSeS894x","message_id":"864403bdfd935b72df14701adf7fc44829c47357","time":1680299868927}
'''


def run():
    publisher = None
    nodes = {}

    # Logs of `send` events emitted by the publisher.
    send_logs = []
    # Logs of `receive` events from the publisher, keyed by `<receiver peer_id>_<message_id>`.
    receive_logs = {}

    # for line in debug.split('\n'):  # debugging
    for line in sys.stdin:
        if len(line.strip()) == 0:
            continue

        log = parse_log(line.rstrip())
        if log['event'] == 'peer_id':
            if log['is_publisher']:
                publisher = log
            else:
                nodes[log['node_id']] = log
        elif log['event'] == 'send':
            if log['node_id'] == publisher['node_id']:
                send_logs.append(log)
        elif log['event'] == 'receive':
            if log['node_id'] != publisher['node_id'] and log['propagation_source'] == publisher['peer_id']:
                key = node_id_to_peer_id(nodes, log['node_id']) + '_' + log['message_id']
                receive_logs[key] = log
        else:
            print("Unknown log: ", log)
            sys.exit(1)

    print("\n\n*** measure_latency.py ***")
    print('[publisher] node_id:', publisher['node_id'], ", peer_id:", publisher['peer_id'])
    print('[nodes]', len(nodes) + 1)  # nodes + publisher
    print('[send_logs]', len(send_logs))

    latencies = []

    for send in send_logs:
        receive = receive_logs.get(send['to'] + '_' + send['message_id'])
        # A matching receive log may be missing even in normal cases, e.g. when a
        # message is still in flight at the end of the run.
        if receive is not None:
            latencies.append(receive['time'] - send['time'])
    print('[receive_logs]', len(latencies))

    print('\n* Results (in milliseconds) *')
    print('[mean]', mean(latencies))
    print('[median]', median(latencies))


if __name__ == '__main__':
    run()
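
To re-analyze a finished run without re-running it, the Testground output can be saved first and
then piped through the script. A hypothetical variant of the README pipeline (`run.log` is an
arbitrary file name):

```sh
# Save the run output while still printing it to the terminal.
testground run single --plan gossipsub-testground/flood_publishing \
  --testcase flood_publishing --builder docker:generic --runner local:docker \
  --instances 50 --wait --test-param flood_publish=heartbeat | tee run.log

# Re-run the latency analysis later from the saved log.
grep flood_publishing_test run.log | python3 flood_publishing/measure_latency.py
```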