Skip to content

Commit

Permalink
fix: libp2p host object initialization and export in package namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
anomit committed Oct 23, 2024
1 parent d7311a1 commit 8dc63f6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 55 deletions.
58 changes: 15 additions & 43 deletions pkgs/service/collector.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package service

import (
"Listen/pkgs/redis"
"context"
"encoding/json"
"io"

"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/network"
log "github.com/sirupsen/logrus"
"io"

"Listen/pkgs/redis"
)

func handleStream(stream network.Stream) {
Expand All @@ -23,56 +26,22 @@ func handleStream(stream network.Stream) {
log.Debugln("Error reading:", err)
return
}
func handleStream(stream network.Stream) {
defer stream.Close()
for {
buf := make([]byte, 1024)
length, err := stream.Read(buf)
if err != nil {
if err == io.EOF {
log.Debugln("End of stream reached")
break
}
log.Debugln("Error reading:", err)
return
}
submissionId := uuid.New()
err = submissionId.UnmarshalText(buf[:36])

submissionID, err := uuid.ParseBytes(buf[:36])
if err != nil {
log.Debugln("Unable to unmarshal uuid for submission: ", string(buf[:36]))
log.Debugln("Unable to parse UUID for submission: ", string(buf[:36]))
return
}

// Extract data market address
// EVM address is a fixed length of 42 characters (0x followed by 40 hex digits)
dataMarketAddress := string(buf[36:78])
// convert to checksum address
dataMarketAddress = ethutil.HexToAddress(dataMarketAddress).Hex()
dataMarketAddress := common.HexToAddress(string(buf[36:78])).Hex()

// Add submission to Redis queue
queueData := map[string]interface{}{
"submission_id": submissionId.String(),
"submission_id": submissionID.String(),
"data_market_address": dataMarketAddress,
"data": string(buf[78:length]),
}
queueDataJSON, err := json.Marshal(queueData)
if err != nil {
log.Debugln("Error marshalling queue data:", err)
continue
}

err = redis.RedisClient.LPush(context.Background(), "submissionQueue", queueDataJSON).Err()
submissionId := uuid.New()
err = submissionId.UnmarshalText(buf[:36])
if err != nil {
log.Debugln("Unable to unmarshal uuid for submission: ", string(buf[:36]))
return
}

// Add submission to Redis queue
queueData := map[string]interface{}{
"submission_id": string(buf[:36]),
"data": string(buf[36:length]),
"data": string(buf[78:length]),
}
queueDataJSON, err := json.Marshal(queueData)
if err != nil {
Expand All @@ -90,5 +59,8 @@ func handleStream(stream network.Stream) {
}

func StartCollectorServer() {
collectorHost.SetStreamHandler("/collect", handleStream)
if RelayerHost == nil {
log.Fatal("RelayerHost is not initialized. Make sure ConfigureRelayer() is called before StartCollectorServer()")
}
RelayerHost.SetStreamHandler("/collect", handleStream)
}
25 changes: 13 additions & 12 deletions pkgs/service/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"Listen/config"
"context"
"encoding/base64"
"os"
"syscall"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -16,12 +20,9 @@ import (
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
"os"
"syscall"
"time"
)

var collectorHost host.Host
var RelayerHost host.Host
var activeConnections int

func handleConnectionEstablished(network network.Network, conn network.Conn) {
Expand Down Expand Up @@ -127,7 +128,7 @@ func ConfigureRelayer() {
}
}

collectorHost, err = libp2p.New(
RelayerHost, err = libp2p.New(
libp2p.EnableRelay(),
libp2p.Identity(pk),
libp2p.ConnectionManager(connManager),
Expand All @@ -141,24 +142,24 @@ func ConfigureRelayer() {
libp2p.EnableHolePunching(),
libp2p.Muxer(yamux.ID, yamux.DefaultTransport),
)
collectorHost.Network().Notify(&network.NotifyBundle{
RelayerHost.Network().Notify(&network.NotifyBundle{
ConnectedF: handleConnectionEstablished,
DisconnectedF: handleConnectionClosed,
})

kademliaDHT := ConfigureDHT(context.Background(), collectorHost)
kademliaDHT := ConfigureDHT(context.Background(), RelayerHost)

// Create a discovery service using the DHT
routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)

log.Debugln("Listener ID:", collectorHost.ID().String())
log.Debugln("Peerable addresses: ", collectorHost.Addrs())
log.Debugln("Listener ID:", RelayerHost.ID().String())
log.Debugln("Peerable addresses: ", RelayerHost.Addrs())

// Form initial connections to all peers available
go ConnectToPeers(context.Background(), routingDiscovery, config.SettingsObj.RelayerRendezvousPoint, collectorHost)
go ConnectToPeers(context.Background(), routingDiscovery, config.SettingsObj.RelayerRendezvousPoint, RelayerHost)

// Keep checking for new peers at a set interval
go DiscoverAtInterval(context.Background(), routingDiscovery, config.SettingsObj.RelayerRendezvousPoint, collectorHost, time.NewTicker(5*time.Minute))
go DiscoverAtInterval(context.Background(), routingDiscovery, config.SettingsObj.RelayerRendezvousPoint, RelayerHost, time.NewTicker(5*time.Minute))

log.Debugf("Collector relayer info: %s", collectorHost.ID().String())
log.Debugf("Listener host info: %s", RelayerHost.ID().String())
}

0 comments on commit 8dc63f6

Please sign in to comment.