Skip to content

Commit

Permalink
Fix re-appearing peer not getting rewards (#565)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeandemeusy authored Aug 29, 2024
1 parent f528931 commit b887640
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 64 deletions.
6 changes: 4 additions & 2 deletions ct-app/.configs/core_prod_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,17 @@ economicModel:
#
# =============================================================================
storage:
count: 5
count: 25
timeout: 3600 # seconds

# =============================================================================
#
# =============================================================================
peer:
minVersion: '2.1.0'

initialSleep:
mean: 30
std: 2
# =============================================================================
#
# =============================================================================
Expand Down
11 changes: 7 additions & 4 deletions ct-app/.configs/core_staging_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ economicModel:

legacy:
proportion: 1.0
apr: 0.0000001
apr: 0.000005

coefficients:
a: 1
b: 2
c: 3
l: 0.000001
b: 1.4
c: 15
l: 0.1

equations:
fx:
Expand Down Expand Up @@ -95,6 +95,9 @@ storage:
# =============================================================================
peer:
minVersion: '2.1.0'
initialSleep:
mean: 30
std: 2

# =============================================================================
#
Expand Down
22 changes: 19 additions & 3 deletions ct-app/core/components/lockedvar.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,27 @@ async def update(self, value: Any):
self.warning(
f"Trying to change value of type {type(value)} to {self.type}, ignoring"
)
if not isinstance(value, dict):
raise TypeError("Trying to call 'update' on non-dict value")
async with self.lock:
try:
self.value.update(value)
except AttributeError as e:
raise AttributeError("Trying to call 'update' on non-dict value") from e

async def replace_value(self, old: Any, new: Any):
"""
Asynchronously replace the old value with the new value in a locked manner. If the type of the value is different from the type of the variable, a TypeError will be raised.
:param old: The old value to replace.
:param new: The new value to replace with.
"""
if self.type and not isinstance(new, self.type):
self.warning(
f"Trying to change value of type {type(new)} to {self.type}, ignoring"
)

async with self.lock:
self.value.update(value)
if self.value == old:
self.value = new

@property
def log_prefix(self):
Expand Down
58 changes: 36 additions & 22 deletions ct-app/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# endregion

# region Metrics
UNIQUE_PEERS = Gauge("ct_unique_peers", "Unique peers")
UNIQUE_PEERS = Gauge("ct_unique_peers", "Unique peers", ["type"])
SUBGRAPH_IN_USE = Gauge("ct_subgraph_in_use", "Subgraph in use")
SUBGRAPH_CALLS = Gauge("ct_subgraph_calls", "# of subgraph calls", ["type"])
SUBGRAPH_SIZE = Gauge("ct_subgraph_size", "Size of the subgraph")
Expand Down Expand Up @@ -117,28 +117,42 @@ async def connected_peers(self):
Aggregates the peers from all nodes and sets the all_peers LockedVar.
"""

async with self.all_peers.lock:
visible_peers = set[Peer]()

known_peers: set[Peer] = self.all_peers.value

for node in self.nodes:
visible_peers.update(await node.peers.get())

# set yearly message count to None (-> not eligible for rewards) for peers that are not visible anymore
for peer in known_peers - visible_peers:
await peer.yearly_message_count.set(None)
counts = {"new": 0, "known": 0, "unreachable": 0}

# add new peers to the set
for peer in visible_peers - known_peers:
peer.params = self.params
peer.running = True
known_peers.add(peer)

self.all_peers.value = known_peers

self.debug(f"Aggregated peers ({len(known_peers)} entries).")
UNIQUE_PEERS.set(len(known_peers))
async with self.all_peers.lock:
visible_peers: set[Peer] = set()
visible_peers.update(*[await node.peers.get() for node in self.nodes])
current_peers: set[Peer] = self.all_peers.value

for peer in current_peers:
# if peer is still visible
if peer in visible_peers:
await peer.yearly_message_count.replace_value(None, 0)
peer.running = True
counts["known"] += 1

# if peer is not visible anymore
else:
await peer.yearly_message_count.set(None)
peer.running = False
counts["unreachable"] += 1

# if peer is new
for peer in visible_peers:
if peer not in current_peers:
peer.params = self.params
await peer.yearly_message_count.set(0)
peer.start_async_processes()
current_peers.add(peer)
counts["new"] += 1

self.all_peers.value = current_peers

self.debug(
f"Aggregated peers ({len(self.all_peers.value)} entries) ({', '.join([f'{value} {key}' for key, value in counts.items() ] )})."
)
for key, value in counts.items():
UNIQUE_PEERS.labels(key).set(value)

@flagguard
@formalin
Expand Down
31 changes: 16 additions & 15 deletions ct-app/core/model/peer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import random
from datetime import datetime
from typing import Union

Expand Down Expand Up @@ -50,17 +51,6 @@ def __init__(self, id: str, address: str, version: str):
self.params = None
self.running = False

@property
def running(self) -> bool:
return self._running

@running.setter
def running(self, value: bool):
self._running = value
if value is True:
AsyncLoop.add(self.message_relay_request)
AsyncLoop.add(self.sent_messages_to_db)

def is_old(self, min_version: Union[str, Version]):
"""
Check if the peer's version is older than the specified version.
Expand Down Expand Up @@ -170,7 +160,12 @@ async def message_relay_request(self):
await self.message_count.inc(1)
await asyncio.sleep(delay)
else:
await asyncio.sleep(60)
await asyncio.sleep(
random.normalvariate(
self.params.peer.initialSleep.mean,
self.params.peer.initialSleep.std,
)
)

@flagguard
@formalin
Expand All @@ -188,9 +183,6 @@ async def sent_messages_to_db(self):
):
return

self.info(
f"Storing sent messages in the database for {self.address.id} (count: {count})"
)
entry = SentMessages(
relayer=self.address.id,
count=count,
Expand All @@ -206,6 +198,15 @@ async def sent_messages_to_db(self):
await self.message_count.sub(count)
self.last_db_storage = now

def start_async_processes(self):
if self.running is False:
self.running = True
AsyncLoop.add(self.message_relay_request)
AsyncLoop.add(self.sent_messages_to_db)

def stop_async_processes(self):
self.running = False

def __repr__(self):
return f"Peer(address: {self.address})"

Expand Down
2 changes: 0 additions & 2 deletions ct-app/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,6 @@ async def relayed_messages_to_db(self):
)
)

self.info(f"Storing relayed messages entries for {len(entries)} peers")

try:
DatabaseConnection.session().add_all(entries)
DatabaseConnection.session().commit()
Expand Down
16 changes: 16 additions & 0 deletions ct-app/test/components/test_lockedvar.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,19 @@ async def test_locked_var_update_with_infer_type():

with pytest.raises(TypeError):
await locked_var.update(10)


@pytest.mark.asyncio
async def test_locked_var_replace_succeeds():
locked_var = LockedVar("test_var", None, infer_type=False)
await locked_var.replace_value(None, 0)

assert (await locked_var.get()) == 0


@pytest.mark.asyncio
async def test_locked_var_replace_fails():
locked_var = LockedVar("test_var", None, infer_type=False)
await locked_var.replace_value(0, 1)

assert (await locked_var.get()) is None
11 changes: 4 additions & 7 deletions ct-app/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def peers_raw() -> list[dict]:


@pytest.fixture
def peers(peers_raw: list[dict]) -> list[Peer]:
def peers(peers_raw: list[dict]) -> set[Peer]:
peers = [
Peer(peer["peer_id"], peer["peer_address"], peer["reported_version"])
for peer in peers_raw
Expand All @@ -115,7 +115,7 @@ def peers(peers_raw: list[dict]) -> list[Peer]:
peer.safe_balance = randint(100, 200)
peer.channel_balance = randint(10, 50)

return peers
return set(peers)


@pytest.fixture
Expand All @@ -132,7 +132,7 @@ def addresses() -> list[dict]:
@pytest.fixture
async def nodes(
mocker: MockerFixture,
peers: list[Peer],
peers: set[Peer],
addresses: list[dict],
peers_raw: list[dict],
channels: NodeChannelsResponse,
Expand All @@ -145,9 +145,6 @@ async def nodes(
Node("localhost:9004", "random_key"),
]
for idx, node in enumerate(nodes):
mocker.patch.object(
node.peers, "get", return_value=peers[:idx] + peers[idx + 1 :]
)
mocker.patch.object(node.api, "get_address", return_value=addresses[idx])
mocker.patch.object(node.api, "all_channels", return_value=channels)
mocker.patch.object(
Expand All @@ -173,7 +170,7 @@ async def nodes(


@pytest.fixture
def channels(peers: list[Peer]) -> NodeChannelsResponse:
def channels(peers: set[Peer]) -> NodeChannelsResponse:
channels = list[ChannelInfoResponse]()
index = 0

Expand Down
68 changes: 60 additions & 8 deletions ct-app/test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,72 @@ async def test_check_subgraph_urls(core: Core):


@pytest.mark.asyncio
async def test_aggregate_peers(core: Core, peers: list[Peer]):
async def test_connected_peers(core: Core, peers: list[Peer]):
assert len(await core.all_peers.get()) == 0

# drop manually some peers from nodes
await core.nodes[0].peers.set(set(peers[:3]))
await core.nodes[1].peers.set(set(peers[2:]))
await core.nodes[2].peers.set(set(peers[::2]))
# drop manually some peers (all in but #3)
peers_list = list(peers)
await core.nodes[0].peers.set(set(peers_list[:3]))
await core.nodes[1].peers.set(set(peers_list[4:]))
await core.nodes[2].peers.set(set(peers_list[::2]))

await core.connected_peers()
all_peers = await core.all_peers.get()

assert len(await core.all_peers.get()) == len(peers)
assert len(all_peers) == len(peers) - 1
assert (
sum([(await peer.yearly_message_count.get()) is not None for peer in all_peers])
== 4
)

for peer in await core.all_peers.get():
peer.running = False
# drop manually all but #0 and #1
for node in core.nodes:
await node.peers.set(set(peers_list[:2]))

await core.connected_peers()
all_peers = await core.all_peers.get()

assert len(all_peers) == len(peers) - 1
assert (
sum([(await peer.yearly_message_count.get()) is not None for peer in all_peers])
== 2
)
# last peer appear
await core.nodes[0].peers.update(set([peers_list[3]]))

await core.connected_peers()
all_peers = await core.all_peers.get()

assert len(all_peers) == len(peers)
assert (
sum([(await peer.yearly_message_count.get()) is not None for peer in all_peers])
== 3
)

# peer reappear
await core.nodes[0].peers.update(set([peers_list[-1]]))

await core.connected_peers()
all_peers = await core.all_peers.get()

assert len(all_peers) == len(peers)
assert (
sum([(await peer.yearly_message_count.get()) is not None for peer in all_peers])
== 4
)

# all disappear
for node in core.nodes:
await node.peers.set(set())

await core.connected_peers()
all_peers = await core.all_peers.get()

assert len(all_peers) == len(peers)
assert (
sum([(await peer.yearly_message_count.get()) is not None for peer in all_peers])
== 0
)


@pytest.mark.asyncio
Expand Down
2 changes: 1 addition & 1 deletion helm/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ backup:
ctdapp:
core:
replicas: 1
tag: v3.2.1
tag: v3.2.2
nodes:
NODE_ADDRESS_1: http://ctdapp-green-node-1:3001
NODE_ADDRESS_2: http://ctdapp-green-node-2:3001
Expand Down

0 comments on commit b887640

Please sign in to comment.