From 6c6b459c5cccdea7cc5f47bc4c69c994cf93808e Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 13 Sep 2024 17:32:25 -0400 Subject: [PATCH 1/5] network: allow multi-role phonebook entries --- network/p2p/peerstore/peerstore.go | 18 +++++- network/p2p/peerstore/peerstore_test.go | 83 +++++++++++++++++-------- network/p2pNetwork.go | 52 +++++++++------- network/p2pNetwork_test.go | 11 ++-- network/phonebook/phonebook.go | 57 ++++++++++++++--- network/phonebook/phonebook_test.go | 4 +- 6 files changed, 157 insertions(+), 68 deletions(-) diff --git a/network/p2p/peerstore/peerstore.go b/network/p2p/peerstore/peerstore.go index 5ae9c6aa04..c5fc6d3ade 100644 --- a/network/p2p/peerstore/peerstore.go +++ b/network/p2p/peerstore/peerstore.go @@ -213,11 +213,21 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName data, _ := ps.Get(pid, addressDataKey) if data != nil { ad := data.(addressData) + updated := false ad.mu.RLock() - if ad.networkNames[networkName] && ad.role == role && !ad.persistent { - removeItems[pid] = true + if ad.networkNames[networkName] && !ad.persistent { + if ad.role.Is(role) { + removeItems[pid] = true + } else if ad.role.Has(role) { + ad.role.Remove(role) + updated = true + } } ad.mu.RUnlock() + + if updated { + _ = ps.Put(pid, addressDataKey, ad) + } } } @@ -229,7 +239,9 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName ad := data.(addressData) ad.mu.Lock() ad.networkNames[networkName] = true + ad.role.Assign(role) ad.mu.Unlock() + _ = ps.Put(info.ID, addressDataKey, ad) // do not remove this entry delete(removeItems, info.ID) @@ -326,7 +338,7 @@ func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryR data, _ := ps.Get(peerID, addressDataKey) if data != nil { ad := data.(addressData) - if t.After(ad.retryAfter) && role == ad.role { + if t.After(ad.retryAfter) && ad.role.Has(role) { mas := ps.Addrs(peerID) info := peer.AddrInfo{ID: peerID, Addrs: mas} o = append(o, &info) diff --git a/network/p2p/peerstore/peerstore_test.go b/network/p2p/peerstore/peerstore_test.go index d82b34595d..4ab99ae292 100644 --- a/network/p2p/peerstore/peerstore_test.go +++ b/network/p2p/peerstore/peerstore_test.go @@ -32,13 +32,6 @@ import ( "github.com/algorand/go-algorand/test/partitiontest" ) -// PhoneBookEntryRelayRole used for all the relays that are provided either via the algobootstrap SRV record -// or via a configuration file. -const PhoneBookEntryRelayRole = 1 - -// PhoneBookEntryArchiverRole used for all the archivers that are provided via the archive SRV record. -const PhoneBookEntryArchiverRole = 2 - func TestPeerstore(t *testing.T) { partitiontest.PartitionTest(t) t.Parallel() @@ -90,7 +83,7 @@ func TestPeerstore(t *testing.T) { } func testPhonebookAll(t *testing.T, set []*peer.AddrInfo, ph *PeerStore) { - actual := ph.GetAddresses(len(set), PhoneBookEntryRelayRole) + actual := ph.GetAddresses(len(set), phonebook.PhoneBookEntryRelayRole) for _, info := range actual { ok := false for _, known := range set { @@ -125,7 +118,7 @@ func testPhonebookUniform(t *testing.T, set []*peer.AddrInfo, ph *PeerStore, get counts[set[i].ID.String()] = 0 } for i := 0; i < uniformityTestLength; i++ { - actual := ph.GetAddresses(getsize, PhoneBookEntryRelayRole) + actual := ph.GetAddresses(getsize, phonebook.PhoneBookEntryRelayRole) for _, info := range actual { if _, ok := counts[info.ID.String()]; ok { counts[info.ID.String()]++ @@ -161,7 +154,7 @@ func TestArrayPhonebookAll(t *testing.T) { ph, err := MakePhonebook(1, 1*time.Millisecond) require.NoError(t, err) for _, addr := range set { - entry := makePhonebookEntryData("", PhoneBookEntryRelayRole, false) + entry := makePhonebookEntryData("", phonebook.PhoneBookEntryRelayRole, false) info, _ := peerInfoFromDomainPort(addr) ph.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL) ph.Put(info.ID, addressDataKey, entry) @@ -183,7 +176,7 @@ func TestArrayPhonebookUniform1(t *testing.T) { ph, err := MakePhonebook(1, 1*time.Millisecond) require.NoError(t, err) for _, addr := range set { - entry := makePhonebookEntryData("", PhoneBookEntryRelayRole, false) + entry := makePhonebookEntryData("", phonebook.PhoneBookEntryRelayRole, false) info, _ := peerInfoFromDomainPort(addr) ph.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL) ph.Put(info.ID, addressDataKey, entry) @@ -205,7 +198,7 @@ func TestArrayPhonebookUniform3(t *testing.T) { ph, err := MakePhonebook(1, 1*time.Millisecond) require.NoError(t, err) for _, addr := range set { - entry := makePhonebookEntryData("", PhoneBookEntryRelayRole, false) + entry := makePhonebookEntryData("", phonebook.PhoneBookEntryRelayRole, false) info, _ := peerInfoFromDomainPort(addr) ph.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL) ph.Put(info.ID, addressDataKey, entry) @@ -234,8 +227,8 @@ func TestMultiPhonebook(t *testing.T) { ph, err := MakePhonebook(1, 1*time.Millisecond) require.NoError(t, err) - ph.ReplacePeerList(pha, "pha", PhoneBookEntryRelayRole) - ph.ReplacePeerList(phb, "phb", PhoneBookEntryRelayRole) + ph.ReplacePeerList(pha, "pha", phonebook.PhoneBookEntryRelayRole) + ph.ReplacePeerList(phb, "phb", phonebook.PhoneBookEntryRelayRole) testPhonebookAll(t, infoSet, ph) testPhonebookUniform(t, infoSet, ph, 1) @@ -268,13 +261,13 @@ func TestMultiPhonebookPersistentPeers(t *testing.T) { } ph, err := MakePhonebook(1, 1*time.Millisecond) require.NoError(t, err) - ph.AddPersistentPeers(persistentPeers, "pha", PhoneBookEntryRelayRole) - ph.AddPersistentPeers(persistentPeers, "phb", PhoneBookEntryRelayRole) - ph.ReplacePeerList(pha, "pha", PhoneBookEntryRelayRole) - ph.ReplacePeerList(phb, "phb", PhoneBookEntryRelayRole) + ph.AddPersistentPeers(persistentPeers, "pha", phonebook.PhoneBookEntryRelayRole) + ph.AddPersistentPeers(persistentPeers, "phb", phonebook.PhoneBookEntryRelayRole) + ph.ReplacePeerList(pha, "pha", phonebook.PhoneBookEntryRelayRole) + ph.ReplacePeerList(phb, "phb", phonebook.PhoneBookEntryRelayRole) testPhonebookAll(t, append(infoSet, info), ph) - allAddresses := ph.GetAddresses(len(set)+len(persistentPeers), PhoneBookEntryRelayRole) + allAddresses := ph.GetAddresses(len(set)+len(persistentPeers), phonebook.PhoneBookEntryRelayRole) for _, pp := range persistentPeers { found := false for _, addr := range allAddresses { @@ -308,8 +301,8 @@ func TestMultiPhonebookDuplicateFiltering(t *testing.T) { } ph, err := MakePhonebook(1, 1*time.Millisecond) require.NoError(t, err) - ph.ReplacePeerList(pha, "pha", PhoneBookEntryRelayRole) - ph.ReplacePeerList(phb, "phb", PhoneBookEntryRelayRole) + ph.ReplacePeerList(pha, "pha", phonebook.PhoneBookEntryRelayRole) + ph.ReplacePeerList(phb, "phb", phonebook.PhoneBookEntryRelayRole) testPhonebookAll(t, infoSet, ph) testPhonebookUniform(t, infoSet, ph, 1) @@ -338,7 +331,7 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { // Test the addresses are populated in the phonebook and a // time can be added to one of them - entries.ReplacePeerList([]*peer.AddrInfo{info1, info2}, "default", PhoneBookEntryRelayRole) + entries.ReplacePeerList([]*peer.AddrInfo{info1, info2}, "default", phonebook.PhoneBookEntryRelayRole) addrInPhonebook, waitTime, provisionalTime := entries.GetConnectionWaitTime(string(info1.ID)) require.Equal(t, true, addrInPhonebook) require.Equal(t, time.Duration(0), waitTime) @@ -469,20 +462,20 @@ func TestPhonebookRoles(t *testing.T) { ph, err := MakePhonebook(1, 1) require.NoError(t, err) - ph.ReplacePeerList(infoRelaySet, "default", PhoneBookEntryRelayRole) - ph.ReplacePeerList(infoArchiverSet, "default", PhoneBookEntryArchiverRole) + ph.ReplacePeerList(infoRelaySet, "default", phonebook.PhoneBookEntryRelayRole) + ph.ReplacePeerList(infoArchiverSet, "default", phonebook.PhoneBookEntryArchivalRole) require.Equal(t, len(relaysSet)+len(archiverSet), len(ph.Peers())) require.Equal(t, len(relaysSet)+len(archiverSet), ph.Length()) - for _, role := range []phonebook.PhoneBookEntryRoles{PhoneBookEntryRelayRole, PhoneBookEntryArchiverRole} { + for _, role := range []phonebook.PhoneBookEntryRoles{phonebook.PhoneBookEntryRelayRole, phonebook.PhoneBookEntryArchivalRole} { for k := 0; k < 100; k++ { for l := 0; l < 3; l++ { entries := ph.GetAddresses(l, role) - if role == PhoneBookEntryRelayRole { + if role.Has(phonebook.PhoneBookEntryRelayRole) { for _, entry := range entries { require.Contains(t, string(entry.ID), "relay") } - } else if role == PhoneBookEntryArchiverRole { + } else if role.Has(phonebook.PhoneBookEntryArchivalRole) { for _, entry := range entries { require.Contains(t, string(entry.ID), "archiver") } @@ -491,3 +484,39 @@ func TestPhonebookRoles(t *testing.T) { } } } + +// TestPhonebookRolesMulti makes sure the same host might have multiple roles +func TestPhonebookRolesMulti(t *testing.T) { + partitiontest.PartitionTest(t) + + relaysSet := []string{"relay1:4040", "relay2:4041"} + archiverSet := []string{"relay1:4040", "archiver1:1111"} + const numUnique = 3 + + infoRelaySet := make([]*peer.AddrInfo, 0) + for _, addr := range relaysSet { + info, err := peerInfoFromDomainPort(addr) + require.NoError(t, err) + infoRelaySet = append(infoRelaySet, info) + } + + infoArchiverSet := make([]*peer.AddrInfo, 0) + for _, addr := range archiverSet { + info, err := peerInfoFromDomainPort(addr) + require.NoError(t, err) + infoArchiverSet = append(infoArchiverSet, info) + } + + ph, err := MakePhonebook(1, 1) + require.NoError(t, err) + ph.ReplacePeerList(infoRelaySet, "default", phonebook.PhoneBookEntryRelayRole) + ph.ReplacePeerList(infoArchiverSet, "default", phonebook.PhoneBookEntryArchivalRole) + require.Equal(t, numUnique, len(ph.Peers())) + require.Equal(t, numUnique, ph.Length()) + + const maxPeers = 5 + entries := ph.GetAddresses(maxPeers, phonebook.PhoneBookEntryRelayRole) + require.Equal(t, len(relaysSet), len(entries)) + entries = ph.GetAddresses(maxPeers, phonebook.PhoneBookEntryArchivalRole) + require.Equal(t, len(archiverSet), len(entries)) +} diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index aa991d0429..7c2ebf2e0c 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -434,12 +434,29 @@ func (n *P2PNetwork) meshThreadInner() int { n.log.Warnf("Error getting relay nodes from capabilities discovery: %v", err) } n.log.Debugf("Discovered %d gossip peers from DHT", len(dhtPeers)) + + // also discover archival nodes + var dhtArchivalPeers []peer.AddrInfo + const numPeersToDiscover = 5 // some arbitrary number TODO: figure out a better value based on peerSelector/fetcher algorithm + dhtArchivalPeers, err = n.capabilitiesDiscovery.PeersForCapability(p2p.Archival, numPeersToDiscover) + if err != nil { + n.log.Warnf("Error getting archival nodes from capabilities discovery: %v", err) + } + n.log.Debugf("Discovered %d archival peers from DHT", len(dhtArchivalPeers)) + + if len(dhtArchivalPeers) > 0 { + replace := make([]*peer.AddrInfo, len(dhtArchivalPeers)) + for i := range dhtArchivalPeers { + replace[i] = &dhtArchivalPeers[i] + } + n.pstore.ReplacePeerList(replace, string(n.networkID), phonebook.PhoneBookEntryArchivalRole) + } } peers := mergeP2PAddrInfoResolvedAddresses(dnsPeers, dhtPeers) - replace := make([]*peer.AddrInfo, 0, len(peers)) + replace := make([]*peer.AddrInfo, len(peers)) for i := range peers { - replace = append(replace, &peers[i]) + replace[i] = &peers[i] } if len(peers) > 0 { n.pstore.ReplacePeerList(replace, string(n.networkID), phonebook.PhoneBookEntryRelayRole) @@ -655,28 +672,19 @@ func (n *P2PNetwork) GetPeers(options ...PeerOption) []Peer { } case PeersPhonebookArchivalNodes: // query known archival nodes from DHT if enabled - if n.config.EnableDHTProviders { - const nodesToFind = 5 - infos, err := n.capabilitiesDiscovery.PeersForCapability(p2p.Archival, nodesToFind) - if err != nil { - n.log.Warnf("Error getting archival nodes from capabilities discovery: %v", err) - return peers - } - n.log.Debugf("Got %d archival node(s) from DHT", len(infos)) - for _, addrInfo := range infos { - // TODO: remove after go1.22 - info := addrInfo - if peerCore, ok := addrInfoToWsPeerCore(n, &info); ok { - peers = append(peers, &peerCore) - } + const maxNodes = 5 // some arbitrary number + addrInfos := n.pstore.GetAddresses(maxNodes, phonebook.PhoneBookEntryArchivalRole) + for _, peerInfo := range addrInfos { + if peerCore, ok := addrInfoToWsPeerCore(n, peerInfo); ok { + peers = append(peers, &peerCore) } - if n.log.GetLevel() >= logging.Debug && len(peers) > 0 { - addrs := make([]string, 0, len(peers)) - for _, peer := range peers { - addrs = append(addrs, peer.(*wsPeerCore).GetAddress()) - } - n.log.Debugf("Archival node(s) from DHT: %v", addrs) + } + if n.log.GetLevel() >= logging.Debug && len(peers) > 0 { + addrs := make([]string, 0, len(peers)) + for _, peer := range peers { + addrs = append(addrs, peer.(*wsPeerCore).GetAddress()) } + n.log.Debugf("Archival node(s) from peerstore: %v", addrs) } case PeersConnectedIn: n.wsPeersLock.RLock() diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index d18196c4c4..a991c42abe 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -592,7 +592,7 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, test.nis[0], nil) + netA, err := NewP2PNetwork(log.With("name", "netA"), cfg, "", nil, genesisID, config.Devtestnet, test.nis[0], nil) require.NoError(t, err) err = netA.Start() @@ -606,13 +606,13 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { multiAddrStr := addrsA[0].String() phoneBookAddresses := []string{multiAddrStr} - netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[1], nil) + netB, err := NewP2PNetwork(log.With("name", "netB"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[1], nil) require.NoError(t, err) err = netB.Start() require.NoError(t, err) defer netB.Stop() - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[2], nil) + netC, err := NewP2PNetwork(log.With("name", "netC"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[2], nil) require.NoError(t, err) err = netC.Start() require.NoError(t, err) @@ -665,6 +665,7 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { func() bool { peers, err := disc.PeersForCapability(cap, test.numCapPeers) if err == nil && len(peers) == test.numCapPeers { + fmt.Printf("%s peers for cap %s: %s\n", disc.Host().ID().String(), cap, peers) return true } return false @@ -674,14 +675,16 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { fmt.Sprintf("Not all expected %s cap peers were found", cap), ) // ensure GetPeers gets PeersPhonebookArchivalNodes peers - // it appears there are artifical peers because of listening on localhost and on a real network interface + // it appears there are artificial peers because of listening on localhost and on a real network interface // so filter out and save only unique peers by their IDs net := nets[idx] + net.meshThreadInner() // update peerstore with DHT peers peers := net.GetPeers(PeersPhonebookArchivalNodes) uniquePeerIDs := make(map[peer.ID]struct{}) for _, p := range peers { wsPeer := p.(*wsPeerCore) pi, err := peer.AddrInfoFromString(wsPeer.GetAddress()) + fmt.Println("pi.ID", pi.ID) require.NoError(t, err) uniquePeerIDs[pi.ID] = struct{}{} } diff --git a/network/phonebook/phonebook.go b/network/phonebook/phonebook.go index b3aeafb0fa..dcfb111785 100644 --- a/network/phonebook/phonebook.go +++ b/network/phonebook/phonebook.go @@ -33,14 +33,45 @@ const getAllAddresses = math.MaxInt32 // currently, we have two roles : relay role and archival role, which are mutually exclusive. // //msgp:ignore PhoneBookEntryRoles -type PhoneBookEntryRoles int +type PhoneBookEntryRoles struct { + role phonebookRole + _ func() // func is not comparable so that PhoneBookEntryRoles. This is to prevent roles misuse and direct comparison. +} + +var ( + // PhoneBookEntryRelayRole used for all the relays that are provided either via the algobootstrap SRV record + // or via a configuration file. + PhoneBookEntryRelayRole = PhoneBookEntryRoles{role: relayRole} + // PhoneBookEntryArchivalRole used for all the archival nodes that are provided via the archive SRV record. + PhoneBookEntryArchivalRole = PhoneBookEntryRoles{role: archivalRole} +) -// PhoneBookEntryRelayRole used for all the relays that are provided either via the algobootstrap SRV record -// or via a configuration file. -const PhoneBookEntryRelayRole = 1 +type phonebookRole uint8 -// PhoneBookEntryArchivalRole used for all the archival nodes that are provided via the archive SRV record. -const PhoneBookEntryArchivalRole = 2 +const ( + relayRole phonebookRole = 1 << iota + archivalRole +) + +// Has checks if the role also has the other role +func (r PhoneBookEntryRoles) Has(other PhoneBookEntryRoles) bool { + return r.role&other.role != 0 +} + +// Is checks if the role is exactly the other role +func (r PhoneBookEntryRoles) Is(other PhoneBookEntryRoles) bool { + return r.role == other.role +} + +// Assign adds the other role to the role +func (r *PhoneBookEntryRoles) Assign(other PhoneBookEntryRoles) { + r.role |= other.role +} + +// Remove removes the other role from the role +func (r *PhoneBookEntryRoles) Remove(other PhoneBookEntryRoles) { + r.role &= ^other.role +} // Phonebook stores or looks up addresses of nodes we might contact type Phonebook interface { @@ -127,7 +158,7 @@ func MakePhonebook(connectionsRateLimitingCount uint, func (e *phonebookImpl) deletePhonebookEntry(entryName, networkName string) { pbEntry := e.data[entryName] delete(pbEntry.networkNames, networkName) - if 0 == len(pbEntry.networkNames) { + if len(pbEntry.networkNames) == 0 { delete(e.data, entryName) } } @@ -152,7 +183,7 @@ func (e *phonebookImpl) appendTime(addr string, t time.Time) { func (e *phonebookImpl) filterRetryTime(t time.Time, role PhoneBookEntryRoles) []string { o := make([]string, 0, len(e.data)) for addr, entry := range e.data { - if t.After(entry.retryAfter) && role == entry.role { + if t.After(entry.retryAfter) && entry.role.Has(role) { o = append(o, addr) } } @@ -170,8 +201,13 @@ func (e *phonebookImpl) ReplacePeerList(addressesThey []string, networkName stri // prepare a map of items we'd like to remove. removeItems := make(map[string]bool, 0) for k, pbd := range e.data { - if pbd.networkNames[networkName] && pbd.role == role && !pbd.persistent { - removeItems[k] = true + if pbd.networkNames[networkName] && !pbd.persistent { + if pbd.role.Is(role) { + removeItems[k] = true + } else if pbd.role.Has(role) { + pbd.role.Remove(role) + e.data[k] = pbd + } } } @@ -180,6 +216,7 @@ func (e *phonebookImpl) ReplacePeerList(addressesThey []string, networkName stri // we already have this. // Update the networkName pbData.networkNames[networkName] = true + pbData.role.Assign(role) // do not remove this entry delete(removeItems, addr) diff --git a/network/phonebook/phonebook_test.go b/network/phonebook/phonebook_test.go index d603a51a1a..863131ff66 100644 --- a/network/phonebook/phonebook_test.go +++ b/network/phonebook/phonebook_test.go @@ -354,11 +354,11 @@ func TestPhonebookRoles(t *testing.T) { for k := 0; k < 100; k++ { for l := 0; l < 3; l++ { entries := ph.GetAddresses(l, role) - if role == PhoneBookEntryRelayRole { + if role.Has(PhoneBookEntryRelayRole) { for _, entry := range entries { require.Contains(t, entry, "relay") } - } else if role == PhoneBookEntryArchivalRole { + } else if role.Has(PhoneBookEntryArchivalRole) { for _, entry := range entries { require.Contains(t, entry, "archiver") } From c5d70c3ec413e05f30c358a8ee84d10c4f6c0032 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 16 Sep 2024 09:41:08 -0400 Subject: [PATCH 2/5] fix linter --- config/config_test.go | 2 +- network/p2p/peerstore/peerstore.go | 12 +++--- network/p2p/peerstore/peerstore_test.go | 2 +- network/phonebook/phonebook.go | 52 ++++++++++++------------- network/phonebook/phonebook_test.go | 2 +- 5 files changed, 35 insertions(+), 35 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index 936622b06c..cc8b2dc581 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -782,7 +782,7 @@ func TestLocal_ValidateP2PHybridConfig(t *testing.T) { NetAddress: test.netAddress, } err := c.ValidateP2PHybridConfig() - require.Equal(t, test.err, err != nil, "test=%d", i) + require.Equal(t, test.err, err != nil) }) } } diff --git a/network/p2p/peerstore/peerstore.go b/network/p2p/peerstore/peerstore.go index c5fc6d3ade..ba128cb582 100644 --- a/network/p2p/peerstore/peerstore.go +++ b/network/p2p/peerstore/peerstore.go @@ -58,7 +58,7 @@ type addressData struct { mu *deadlock.RWMutex // role is the role that this address serves. - role phonebook.PhoneBookEntryRoles + role phonebook.Roles // persistent is set true for peers whose record should not be removed for the peer list persistent bool @@ -97,7 +97,7 @@ func MakePhonebook(connectionsRateLimitingCount uint, } // GetAddresses returns up to N addresses, but may return fewer -func (ps *PeerStore) GetAddresses(n int, role phonebook.PhoneBookEntryRoles) []*peer.AddrInfo { +func (ps *PeerStore) GetAddresses(n int, role phonebook.Roles) []*peer.AddrInfo { return shuffleSelect(ps.filterRetryTime(time.Now(), role), n) } @@ -205,7 +205,7 @@ func (ps *PeerStore) UpdateConnectionTime(addrOrPeerID string, provisionalTime t } // ReplacePeerList replaces the peer list for the given networkName and role. -func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName string, role phonebook.PhoneBookEntryRoles) { +func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName string, role phonebook.Roles) { // prepare a map of items we'd like to remove. removeItems := make(map[peer.ID]bool, 0) peerIDs := ps.Peers() @@ -261,7 +261,7 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName // AddPersistentPeers stores addresses of peers which are persistent. // i.e. they won't be replaced by ReplacePeerList calls -func (ps *PeerStore) AddPersistentPeers(addrInfo []*peer.AddrInfo, networkName string, role phonebook.PhoneBookEntryRoles) { +func (ps *PeerStore) AddPersistentPeers(addrInfo []*peer.AddrInfo, networkName string, role phonebook.Roles) { for _, info := range addrInfo { data, _ := ps.Get(info.ID, addressDataKey) if data != nil { @@ -285,7 +285,7 @@ func (ps *PeerStore) Length() int { } // makePhonebookEntryData creates a new address entry for provided network name and role. -func makePhonebookEntryData(networkName string, role phonebook.PhoneBookEntryRoles, persistent bool) addressData { +func makePhonebookEntryData(networkName string, role phonebook.Roles, persistent bool) addressData { pbData := addressData{ networkNames: make(map[string]bool), mu: &deadlock.RWMutex{}, @@ -332,7 +332,7 @@ func (ps *PeerStore) popNElements(n int, peerID peer.ID) { _ = ps.Put(peerID, addressDataKey, ad) } -func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryRoles) []*peer.AddrInfo { +func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.Roles) []*peer.AddrInfo { o := make([]*peer.AddrInfo, 0, len(ps.Peers())) for _, peerID := range ps.Peers() { data, _ := ps.Get(peerID, addressDataKey) diff --git a/network/p2p/peerstore/peerstore_test.go b/network/p2p/peerstore/peerstore_test.go index 4ab99ae292..bdce4308d2 100644 --- a/network/p2p/peerstore/peerstore_test.go +++ b/network/p2p/peerstore/peerstore_test.go @@ -467,7 +467,7 @@ func TestPhonebookRoles(t *testing.T) { require.Equal(t, len(relaysSet)+len(archiverSet), len(ph.Peers())) require.Equal(t, len(relaysSet)+len(archiverSet), ph.Length()) - for _, role := range []phonebook.PhoneBookEntryRoles{phonebook.PhoneBookEntryRelayRole, phonebook.PhoneBookEntryArchivalRole} { + for _, role := range []phonebook.Roles{phonebook.PhoneBookEntryRelayRole, phonebook.PhoneBookEntryArchivalRole} { for k := 0; k < 100; k++ { for l := 0; l < 3; l++ { entries := ph.GetAddresses(l, role) diff --git a/network/phonebook/phonebook.go b/network/phonebook/phonebook.go index dcfb111785..a35049a8de 100644 --- a/network/phonebook/phonebook.go +++ b/network/phonebook/phonebook.go @@ -29,54 +29,54 @@ import ( // of how many addresses the phonebook actually has. ( with the retry-after logic applied ) const getAllAddresses = math.MaxInt32 -// PhoneBookEntryRoles defines the roles that a single entry on the phonebook can take. +// Roles defines the roles that a single entry on the phonebook can take. // currently, we have two roles : relay role and archival role, which are mutually exclusive. // -//msgp:ignore PhoneBookEntryRoles -type PhoneBookEntryRoles struct { - role phonebookRole - _ func() // func is not comparable so that PhoneBookEntryRoles. This is to prevent roles misuse and direct comparison. +//msgp:ignore Roles +type Roles struct { + roles role + _ func() // func is not comparable so that Roles. This is to prevent roles misuse and direct comparison. } var ( // PhoneBookEntryRelayRole used for all the relays that are provided either via the algobootstrap SRV record // or via a configuration file. - PhoneBookEntryRelayRole = PhoneBookEntryRoles{role: relayRole} + PhoneBookEntryRelayRole = Roles{roles: relayRole} // PhoneBookEntryArchivalRole used for all the archival nodes that are provided via the archive SRV record. - PhoneBookEntryArchivalRole = PhoneBookEntryRoles{role: archivalRole} + PhoneBookEntryArchivalRole = Roles{roles: archivalRole} ) -type phonebookRole uint8 +type role uint8 const ( - relayRole phonebookRole = 1 << iota + relayRole role = 1 << iota archivalRole ) // Has checks if the role also has the other role -func (r PhoneBookEntryRoles) Has(other PhoneBookEntryRoles) bool { - return r.role&other.role != 0 +func (r Roles) Has(other Roles) bool { + return r.roles&other.roles != 0 } // Is checks if the role is exactly the other role -func (r PhoneBookEntryRoles) Is(other PhoneBookEntryRoles) bool { - return r.role == other.role +func (r Roles) Is(other Roles) bool { + return r.roles == other.roles } // Assign adds the other role to the role -func (r *PhoneBookEntryRoles) Assign(other PhoneBookEntryRoles) { - r.role |= other.role +func (r *Roles) Assign(other Roles) { + r.roles |= other.roles } // Remove removes the other role from the role -func (r *PhoneBookEntryRoles) Remove(other PhoneBookEntryRoles) { - r.role &= ^other.role +func (r *Roles) Remove(other Roles) { + r.roles &= ^other.roles } // Phonebook stores or looks up addresses of nodes we might contact type Phonebook interface { // GetAddresses(N) returns up to N addresses, but may return fewer - GetAddresses(n int, role PhoneBookEntryRoles) []string + GetAddresses(n int, role Roles) []string // UpdateRetryAfter updates the retry-after field for the entries matching the given address UpdateRetryAfter(addr string, retryAfter time.Time) @@ -97,11 +97,11 @@ type Phonebook interface { // new entries in dnsAddresses are being added // existing items that aren't included in dnsAddresses are being removed // matching entries don't change - ReplacePeerList(dnsAddresses []string, networkName string, role PhoneBookEntryRoles) + ReplacePeerList(dnsAddresses []string, networkName string, role Roles) // AddPersistentPeers stores addresses of peers which are persistent. // i.e. they won't be replaced by ReplacePeerList calls - AddPersistentPeers(dnsAddresses []string, networkName string, role PhoneBookEntryRoles) + AddPersistentPeers(dnsAddresses []string, networkName string, role Roles) } // addressData: holds the information associated with each phonebook address. @@ -117,14 +117,14 @@ type addressData struct { networkNames map[string]bool // role is the role that this address serves. - role PhoneBookEntryRoles + role Roles // persistent is set true for peers whose record should not be removed for the peer list persistent bool } // makePhonebookEntryData creates a new addressData entry for provided network name and role. -func makePhonebookEntryData(networkName string, role PhoneBookEntryRoles, persistent bool) addressData { +func makePhonebookEntryData(networkName string, role Roles, persistent bool) addressData { pbData := addressData{ networkNames: make(map[string]bool), recentConnectionTimes: make([]time.Time, 0), @@ -180,7 +180,7 @@ func (e *phonebookImpl) appendTime(addr string, t time.Time) { e.data[addr] = entry } -func (e *phonebookImpl) filterRetryTime(t time.Time, role PhoneBookEntryRoles) []string { +func (e *phonebookImpl) filterRetryTime(t time.Time, role Roles) []string { o := make([]string, 0, len(e.data)) for addr, entry := range e.data { if t.After(entry.retryAfter) && entry.role.Has(role) { @@ -194,7 +194,7 @@ func (e *phonebookImpl) filterRetryTime(t time.Time, role PhoneBookEntryRoles) [ // new entries in addressesThey are being added // existing items that aren't included in addressesThey are being removed // matching entries don't change -func (e *phonebookImpl) ReplacePeerList(addressesThey []string, networkName string, role PhoneBookEntryRoles) { +func (e *phonebookImpl) ReplacePeerList(addressesThey []string, networkName string, role Roles) { e.lock.Lock() defer e.lock.Unlock() @@ -232,7 +232,7 @@ func (e *phonebookImpl) ReplacePeerList(addressesThey []string, networkName stri } } -func (e *phonebookImpl) AddPersistentPeers(dnsAddresses []string, networkName string, role PhoneBookEntryRoles) { +func (e *phonebookImpl) AddPersistentPeers(dnsAddresses []string, networkName string, role Roles) { e.lock.Lock() defer e.lock.Unlock() @@ -372,7 +372,7 @@ func shuffleSelect(set []string, n int) []string { } // GetAddresses returns up to N shuffled address -func (e *phonebookImpl) GetAddresses(n int, role PhoneBookEntryRoles) []string { +func (e *phonebookImpl) GetAddresses(n int, role Roles) []string { e.lock.RLock() defer e.lock.RUnlock() return shuffleSelect(e.filterRetryTime(time.Now(), role), n) diff --git a/network/phonebook/phonebook_test.go b/network/phonebook/phonebook_test.go index 863131ff66..1ebf048ea3 100644 --- a/network/phonebook/phonebook_test.go +++ b/network/phonebook/phonebook_test.go @@ -350,7 +350,7 @@ func TestPhonebookRoles(t *testing.T) { require.Equal(t, len(relaysSet)+len(archiverSet), len(ph.data)) require.Equal(t, len(relaysSet)+len(archiverSet), ph.Length()) - for _, role := range []PhoneBookEntryRoles{PhoneBookEntryRelayRole, PhoneBookEntryArchivalRole} { + for _, role := range []Roles{PhoneBookEntryRelayRole, PhoneBookEntryArchivalRole} { for k := 0; k < 100; k++ { for l := 0; l < 3; l++ { entries := ph.GetAddresses(l, role) From 8dd3f10a0555d515bc7fd957391d1cf1803e7bc8 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 16 Sep 2024 09:59:22 -0400 Subject: [PATCH 3/5] add roles test --- network/phonebook/phonebook_test.go | 34 +++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/network/phonebook/phonebook_test.go b/network/phonebook/phonebook_test.go index 1ebf048ea3..8bd2aeb224 100644 --- a/network/phonebook/phonebook_test.go +++ b/network/phonebook/phonebook_test.go @@ -367,3 +367,37 @@ func TestPhonebookRoles(t *testing.T) { } } } + +func TestRolesOperations(t *testing.T) { + partitiontest.PartitionTest(t) + + var tests = []struct { + role Roles + otherRoles Roles + }{ + {PhoneBookEntryRelayRole, PhoneBookEntryArchivalRole}, + {PhoneBookEntryArchivalRole, PhoneBookEntryRelayRole}, + } + + for _, test := range tests { + require.True(t, test.role.Has(test.role)) + require.True(t, test.role.Is(test.role)) + require.False(t, test.role.Has(test.otherRoles)) + require.False(t, test.role.Is(test.otherRoles)) + + combo := Roles{roles: test.role.roles} + combo.Assign(test.otherRoles) + require.Equal(t, test.role.roles|test.otherRoles.roles, combo.roles) + require.True(t, combo.Has(test.role)) + require.False(t, combo.Is(test.role)) + require.True(t, combo.Has(test.otherRoles)) + require.False(t, combo.Is(test.otherRoles)) + + combo.Remove(test.otherRoles) + require.Equal(t, test.role.roles, combo.roles) + require.True(t, combo.Has(test.role)) + require.True(t, combo.Is(test.role)) + require.False(t, combo.Has(test.otherRoles)) + require.False(t, combo.Is(test.otherRoles)) + } +} From d604984f15300c25b4f4e2aad377dd086233fac4 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 16 Sep 2024 10:38:31 -0400 Subject: [PATCH 4/5] add phonebook/peerstore multi role tests --- network/p2p/peerstore/peerstore_test.go | 86 +++++++++++++++++++++++++ network/p2pNetwork_test.go | 2 - network/phonebook/phonebook.go | 1 + network/phonebook/phonebook_test.go | 36 +++++++++++ 4 files changed, 123 insertions(+), 2 deletions(-) diff --git a/network/p2p/peerstore/peerstore_test.go b/network/p2p/peerstore/peerstore_test.go index bdce4308d2..8bf53dbe93 100644 --- a/network/p2p/peerstore/peerstore_test.go +++ b/network/p2p/peerstore/peerstore_test.go @@ -520,3 +520,89 @@ func TestPhonebookRolesMulti(t *testing.T) { entries = ph.GetAddresses(maxPeers, phonebook.PhoneBookEntryArchivalRole) require.Equal(t, len(archiverSet), len(entries)) } + +func TestReplacePeerList(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + relaysSet := []string{"a:1", "b:2"} + archiverSet := []string{"c:3"} + comboSet := []string{"b:2", "c:3"} // b is in both sets + + infoRelaySet := make([]*peer.AddrInfo, 0) + for _, addr := range relaysSet { + info, err := peerInfoFromDomainPort(addr) + require.NoError(t, err) + infoRelaySet = append(infoRelaySet, info) + } + + infoArchiverSet := make([]*peer.AddrInfo, 0) + for _, addr := range archiverSet { + info, err := peerInfoFromDomainPort(addr) + require.NoError(t, err) + infoArchiverSet = append(infoArchiverSet, info) + } + + infoComboArchiverSet := make([]*peer.AddrInfo, 0) + for _, addr := range comboSet { + info, err := peerInfoFromDomainPort(addr) + require.NoError(t, err) + infoComboArchiverSet = append(infoComboArchiverSet, info) + } + + ph, err := MakePhonebook(1, 1) + require.NoError(t, err) + + ph.ReplacePeerList(infoRelaySet, "default", phonebook.PhoneBookEntryRelayRole) + res := ph.GetAddresses(4, phonebook.PhoneBookEntryRelayRole) + require.Equal(t, 2, len(res)) + for _, info := range infoRelaySet { + require.Contains(t, res, info) + } + + ph.ReplacePeerList(infoArchiverSet, "default", phonebook.PhoneBookEntryArchivalRole) + res = ph.GetAddresses(4, phonebook.PhoneBookEntryArchivalRole) + require.Equal(t, 1, len(res)) + for _, info := range infoArchiverSet { + require.Contains(t, res, info) + } + + // make b archival in addition to relay + ph.ReplacePeerList(infoComboArchiverSet, "default", phonebook.PhoneBookEntryArchivalRole) + res = ph.GetAddresses(4, phonebook.PhoneBookEntryRelayRole) + require.Equal(t, 2, len(res)) + for _, info := range infoRelaySet { + require.Contains(t, res, info) + } + res = ph.GetAddresses(4, phonebook.PhoneBookEntryArchivalRole) + require.Equal(t, 2, len(res)) + for _, info := range infoComboArchiverSet { + require.Contains(t, res, info) + } + + // update relays + ph.ReplacePeerList(infoRelaySet, "default", phonebook.PhoneBookEntryRelayRole) + res = ph.GetAddresses(4, phonebook.PhoneBookEntryRelayRole) + require.Equal(t, 2, len(res)) + for _, info := range infoRelaySet { + require.Contains(t, res, info) + } + res = ph.GetAddresses(4, phonebook.PhoneBookEntryArchivalRole) + require.Equal(t, 2, len(res)) + for _, info := range infoComboArchiverSet { + require.Contains(t, res, info) + } + + // exclude b from archival + ph.ReplacePeerList(infoArchiverSet, "default", phonebook.PhoneBookEntryArchivalRole) + res = ph.GetAddresses(4, phonebook.PhoneBookEntryRelayRole) + require.Equal(t, 2, len(res)) + for _, info := range infoRelaySet { + require.Contains(t, res, info) + } + res = ph.GetAddresses(4, phonebook.PhoneBookEntryArchivalRole) + require.Equal(t, 1, len(res)) + for _, info := range infoArchiverSet { + require.Contains(t, res, info) + } +} diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index a991c42abe..e3d18fa492 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -665,7 +665,6 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { func() bool { peers, err := disc.PeersForCapability(cap, test.numCapPeers) if err == nil && len(peers) == test.numCapPeers { - fmt.Printf("%s peers for cap %s: %s\n", disc.Host().ID().String(), cap, peers) return true } return false @@ -684,7 +683,6 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { for _, p := range peers { wsPeer := p.(*wsPeerCore) pi, err := peer.AddrInfoFromString(wsPeer.GetAddress()) - fmt.Println("pi.ID", pi.ID) require.NoError(t, err) uniquePeerIDs[pi.ID] = struct{}{} } diff --git a/network/phonebook/phonebook.go b/network/phonebook/phonebook.go index a35049a8de..7739d440d8 100644 --- a/network/phonebook/phonebook.go +++ b/network/phonebook/phonebook.go @@ -217,6 +217,7 @@ func (e *phonebookImpl) ReplacePeerList(addressesThey []string, networkName stri // Update the networkName pbData.networkNames[networkName] = true pbData.role.Assign(role) + e.data[addr] = pbData // do not remove this entry delete(removeItems, addr) diff --git a/network/phonebook/phonebook_test.go b/network/phonebook/phonebook_test.go index 8bd2aeb224..f0ee06711a 100644 --- a/network/phonebook/phonebook_test.go +++ b/network/phonebook/phonebook_test.go @@ -370,6 +370,7 @@ func TestPhonebookRoles(t *testing.T) { func TestRolesOperations(t *testing.T) { partitiontest.PartitionTest(t) + t.Parallel() var tests = []struct { role Roles @@ -401,3 +402,38 @@ func TestRolesOperations(t *testing.T) { require.False(t, combo.Is(test.otherRoles)) } } + +func TestReplacePeerList(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + pb := MakePhonebook(1, 1) + pb.ReplacePeerList([]string{"a", "b"}, "default", PhoneBookEntryRelayRole) + res := pb.GetAddresses(4, PhoneBookEntryRelayRole) + require.ElementsMatch(t, []string{"a", "b"}, res) + + pb.ReplacePeerList([]string{"c"}, "default", PhoneBookEntryArchivalRole) + res = pb.GetAddresses(4, PhoneBookEntryArchivalRole) + require.ElementsMatch(t, []string{"c"}, res) + + // make b archival in addition to relay + pb.ReplacePeerList([]string{"b", "c"}, "default", PhoneBookEntryArchivalRole) + res = pb.GetAddresses(4, PhoneBookEntryRelayRole) + require.ElementsMatch(t, []string{"a", "b"}, res) + res = pb.GetAddresses(4, PhoneBookEntryArchivalRole) + require.ElementsMatch(t, []string{"b", "c"}, res) + + // update relays + pb.ReplacePeerList([]string{"a", "b"}, "default", PhoneBookEntryRelayRole) + res = pb.GetAddresses(4, PhoneBookEntryRelayRole) + require.ElementsMatch(t, []string{"a", "b"}, res) + res = pb.GetAddresses(4, PhoneBookEntryArchivalRole) + require.ElementsMatch(t, []string{"b", "c"}, res) + + // exclude b from archival + pb.ReplacePeerList([]string{"c"}, "default", PhoneBookEntryArchivalRole) + res = pb.GetAddresses(4, PhoneBookEntryRelayRole) + require.ElementsMatch(t, []string{"a", "b"}, res) + res = pb.GetAddresses(4, PhoneBookEntryArchivalRole) + require.ElementsMatch(t, []string{"c"}, res) +} From ee6b2d08430d0e2c71fc750ce4598af658bbd7b9 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 29 Oct 2024 15:44:59 -0400 Subject: [PATCH 5/5] CR fixes --- network/p2pNetwork.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 4b286a3453..2c9512b81b 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -45,6 +45,9 @@ import ( manet "github.com/multiformats/go-multiaddr/net" ) +// some arbitrary number TODO: figure out a better value based on peerSelector/fetcher algorithm +const numArchivalPeersToFind = 4 + // P2PNetwork implements the GossipNode interface type P2PNetwork struct { service p2p.Service @@ -437,8 +440,7 @@ func (n *P2PNetwork) meshThreadInner() int { // also discover archival nodes var dhtArchivalPeers []peer.AddrInfo - const numPeersToDiscover = 5 // some arbitrary number TODO: figure out a better value based on peerSelector/fetcher algorithm - dhtArchivalPeers, err = n.capabilitiesDiscovery.PeersForCapability(p2p.Archival, numPeersToDiscover) + dhtArchivalPeers, err = n.capabilitiesDiscovery.PeersForCapability(p2p.Archival, numArchivalPeersToFind) if err != nil { n.log.Warnf("Error getting archival nodes from capabilities discovery: %v", err) } @@ -671,9 +673,8 @@ func (n *P2PNetwork) GetPeers(options ...PeerOption) []Peer { n.log.Debugf("Relay node(s) from peerstore: %v", addrs) } case PeersPhonebookArchivalNodes: - // query known archival nodes from DHT if enabled - const maxNodes = 5 // some arbitrary number - addrInfos := n.pstore.GetAddresses(maxNodes, phonebook.PhoneBookEntryArchivalRole) + // query known archival nodes that came from from DHT if enabled (or DNS if configured) + addrInfos := n.pstore.GetAddresses(numArchivalPeersToFind, phonebook.PhoneBookEntryArchivalRole) for _, peerInfo := range addrInfos { if peerCore, ok := addrInfoToWsPeerCore(n, peerInfo); ok { peers = append(peers, &peerCore)