From 374f5c7fc2fb4a49c2a4c94535aaae0a0e6d1ff8 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Wed, 1 May 2024 21:58:59 +0330 Subject: [PATCH] addd pubsub --- exchange/fx_exchange.go | 2 +- exchange/noop_exchange.go | 5 ++ mobile/blockchain.go | 119 ++++++++++++++++++++++++++++++++++++++ mobile/client.go | 18 +++++- mobile/config.go | 13 ++++- mobile/example_test.go | 12 ++-- 6 files changed, 159 insertions(+), 10 deletions(-) diff --git a/exchange/fx_exchange.go b/exchange/fx_exchange.go index 8d13fa5..a701a8b 100644 --- a/exchange/fx_exchange.go +++ b/exchange/fx_exchange.go @@ -116,7 +116,7 @@ func NewFxExchange(h host.Host, ls ipld.LinkSystem, o ...Option) (*FxExchange, e return nil, err } tr := &http.Transport{ - DisableKeepAlives: true, // Ensure connections are not reused + DisableKeepAlives: false, // Ensure connections are not reused MaxIdleConns: 500, MaxConnsPerHost: 2000, IdleConnTimeout: 20 * time.Second, diff --git a/exchange/noop_exchange.go b/exchange/noop_exchange.go index 27bfc09..827a366 100644 --- a/exchange/noop_exchange.go +++ b/exchange/noop_exchange.go @@ -41,6 +41,11 @@ func (n NoopExchange) Shutdown(context.Context) error { return nil } +func (n NoopExchange) ShutdownIpfs(context.Context) error { + log.Debug("Shut down ipfs noop exchange.") + return nil +} + func (n NoopExchange) IpniNotifyLink(l ipld.Link) { log.Debugw("IpniNotifyLink noop exchange.", "link", l) } diff --git a/mobile/blockchain.go b/mobile/blockchain.go index 3826633..990e053 100644 --- a/mobile/blockchain.go +++ b/mobile/blockchain.go @@ -7,6 +7,7 @@ import ( "math/big" "strconv" "strings" + "time" "github.com/functionland/go-fula/blockchain" wifi "github.com/functionland/go-fula/wap/pkg/wifi" @@ -181,6 +182,124 @@ func (c *Client) ReplicateInPool(cidsBytes []byte, account string, poolID int) [ /////////////////////HARDWARE///////////////////// ////////////////////////////////////////////////// +// PreparePubSub initializes or reinitializes the pubsub system +func (c *Client) PreparePubSub(ctx context.Context) error { + topicName := "fula-global-channel" + // Check if already subscribed and topic is active + if c.sub == nil || c.topic == nil { + var err error + // Join the topic if not already joined or if topic was closed + if c.topic == nil { + c.topic, err = c.ps.Join(topicName) + if err != nil { + return fmt.Errorf("failed to join topic: %v", err) + } + } + + // Subscribe to the topic if not already subscribed + c.sub, err = c.topic.Subscribe() + if err != nil { + return fmt.Errorf("failed to subscribe to topic: %v", err) + } + } + return nil +} + +// ShutdownPubSub cleanly closes the pubsub topic and subscription +func (c *Client) ShutdownPubSub() error { + if c.sub != nil { + c.sub.Cancel() + c.sub = nil + } + if c.topic != nil { + if err := c.topic.Close(); err != nil { + return fmt.Errorf("failed to close topic: %v", err) + } + c.topic = nil + } + return nil +} + +// This function should be implemented to listen to the responses and match the correct response to the request +func (c *Client) waitForResponse(ctx context.Context, responseType string) ([]byte, error) { + // This channel might be part of the client struct or managed globally depending on your architecture + responseChan := make(chan []byte, 1) + go func() { + for { + select { + case <-ctx.Done(): + return + default: + msg, err := c.sub.Next(ctx) + if err != nil { + continue // Handle error or break as needed + } + if msg.ReceivedFrom != c.bloxPid { + continue + } + var response struct { + Type string `json:"type"` + Data []byte `json:"data"` + PeerID string `json:"peerID"` + } + if err := json.Unmarshal(msg.Data, &response); err != nil { + continue // Log or handle the error + } + if response.Type == responseType && response.PeerID == c.h.ID().String() { + responseChan <- response.Data + return + } + } + } + }() + + select { + case data := <-responseChan: + return data, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// BloxFreeSpace requests the blox avail/used free space information. +func (c *Client) BloxFreeSpaceIpfs() ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Ensure that the pubsub system is ready + if err := c.PreparePubSub(ctx); err != nil { + return nil, fmt.Errorf("failed to prepare pubsub: %w", err) + } + + // Create a message to request free space, targeted at the blox peer + request := struct { + TargetPeerID string `json:"targetPeerID"` + Command string `json:"command"` + }{ + TargetPeerID: c.bloxPid.String(), // Assuming c.bloxPid is the peer ID of the blox + Command: "requestFreeSpace", + } + + requestData, err := json.Marshal(request) + if err != nil { + return nil, fmt.Errorf("failed to marshal request data: %w", err) + } + + // Publish the request + if err := c.topic.Publish(ctx, requestData); err != nil { + return nil, fmt.Errorf("failed to publish request: %w", err) + } + + // Listen for a response + // This part assumes your ListenForMessages handles responses and can return them + response, err := c.waitForResponse(ctx, "freeSpaceResponse") + if err != nil { + return nil, err + } + + return response, nil +} + // BloxFreeSpace requests the blox avail/used free space information. func (c *Client) BloxFreeSpace() ([]byte, error) { ctx := context.TODO() diff --git a/mobile/client.go b/mobile/client.go index d941b93..fcfa4dd 100644 --- a/mobile/client.go +++ b/mobile/client.go @@ -38,6 +38,7 @@ import ( "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/libp2p/go-libp2p" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -70,6 +71,9 @@ type Client struct { relays []string ipfsAPI iface.CoreAPI ipfsNode *core.IpfsNode + ps *pubsub.PubSub + topic *pubsub.Topic + sub *pubsub.Subscription } type DatastoreConfigSpec struct { @@ -92,9 +96,9 @@ type Child struct { Compression string `json:"compression,omitempty"` } -func CustomHostOption(opts []libp2p.Option) kubolibp2p.HostOption { +func CustomHostOption(h host.Host) kubolibp2p.HostOption { return func(id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) { - return libp2p.New(opts...) + return h, nil } } @@ -242,9 +246,11 @@ func CreateCustomRepo(ctx context.Context, cfg *Config, basePath string, h host. "/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt", "/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", "/ip4/104.131.131.82/udp/4001/quic-v1/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", + "/dns4/1.pools.functionyard.fula.network/tcp/9096/p2p/12D3KooWS79EhkPU7ESUwgG4vyHHzW9FDNZLoWVth9b5N5NSrvaj", } conf.Swarm.RelayService.Enabled = 1 conf.Discovery.MDNS.Enabled = true + conf.Pubsub.Enabled = 1 // Initialize the repo with the configuration @@ -301,11 +307,14 @@ func (c *Client) ConnectToBlox() error { } func (c *Client) ConnectToBloxIpfs() error { + ctx := context.TODO() if _, ok := c.ex.(exchange.NoopExchange); ok { return nil } - return c.ipfsAPI.Swarm().Connect(context.TODO(), c.h.Peerstore().PeerInfo(c.bloxPid)) + err := c.ipfsAPI.Swarm().Connect(ctx, c.h.Peerstore().PeerInfo(c.bloxPid)) + return err + } // ID returns the libp2p peer ID of the client. @@ -636,6 +645,7 @@ func (c *Client) ShutdownIpfs() error { hErr := c.h.Close() fErr := c.Flush() dsErr := c.ds.Close() + psErr := c.ShutdownPubSub() switch { case hErr != nil: return hErr @@ -643,6 +653,8 @@ func (c *Client) ShutdownIpfs() error { return fErr case dsErr != nil: return dsErr + case psErr != nil: + return psErr default: return xErr } diff --git a/mobile/config.go b/mobile/config.go index 90d1e85..f654374 100644 --- a/mobile/config.go +++ b/mobile/config.go @@ -28,6 +28,8 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" + + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -361,7 +363,7 @@ func (cfg *Config) initIpfs(ctx context.Context, mc *Client) error { ipfsConfig := &core.BuildCfg{ Online: true, Permanent: false, - Host: CustomHostOption(hopts), + Host: CustomHostOption(mc.h), Routing: kubolibp2p.DHTOption, Repo: repo, } @@ -373,10 +375,12 @@ func (cfg *Config) initIpfs(ctx context.Context, mc *Client) error { log.Print("mc ipfsNode created") // ipfsHostId := ipfsNode.PeerHost.ID() // ipfsId := ipfsNode.Identity.String() + ipfsAPI, err := coreapi.NewCoreAPI(ipfsNode) if err != nil { panic(fmt.Errorf("failed to create IPFS API: %w", err)) } + mc.ipfsNode = ipfsNode mc.ipfsAPI = ipfsAPI log.Print("mc ipfsAPI created") @@ -439,6 +443,13 @@ func (cfg *Config) initIpfs(ctx context.Context, mc *Client) error { } } } + + ps, err := pubsub.NewGossipSub(context.Background(), mc.h) + if err != nil { + return fmt.Errorf("failed to create pubsub: %w", err) + } + mc.ps = ps + ctx3, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() // Ensure the context cancel function is called to free resources diff --git a/mobile/example_test.go b/mobile/example_test.go index f3690ad..6988fd8 100644 --- a/mobile/example_test.go +++ b/mobile/example_test.go @@ -304,9 +304,9 @@ func Example_poolExchangeDagBetweenClientBlox() { mcfg.StorePath = "C:/Users/ehsan/.tmp/datastore" mcfg.ConfigPath = "C:/Users/ehsan/.tmp" mcfg.AllowTransientConnection = true - bloxAddrString := "/ip4/70.34.208.109/udp/4001/quic-v1/p2p/12D3KooWQto3ReEkHtMNByVsSnMUWxRZyxk1Ni4yfMuizmYNmcJ9" + bloxAddrString := "/ip4/192.168.1.4/udp/4001/quic-v1/webtransport/certhash/uEiAkTJtNLmP1bkTg8_yWc7mHAM1i_zZ8RcVfxakkhkKjRg/certhash/uEiAyxCIPgYELZLjtX4JsWS7SlYV8XK78N9QqXhLBcAg0QQ" bloxId := "12D3KooWDaT8gS2zGMLGBKmW1mKhQSHxYeEX3Fr3VSjuPzmjyfZC" - mcfg.BloxAddr = bloxAddrString + "/p2p-circuit/p2p/" + bloxId + mcfg.BloxAddr = bloxAddrString + "/p2p/" + bloxId mcfg.PoolName = "1" mcfg.Exchange = bloxAddrString mcfg.BlockchainEndpoint = "127.0.0.1:4004" @@ -342,14 +342,16 @@ func Example_poolExchangeDagBetweenClientBlox() { panic(err) } fmt.Println("connected to blox") - /*_, err = cIpfs1.BloxFreeSpace() + + _, err = cIpfs1.BloxFreeSpaceIpfs() if err != nil { + log.Error("An Error occurred with panic") panic(err) - }*/ + } // Output: //Private Key Bytes: 08011240eefe2dafe94055ecd466687390f8ba331cce1f3b65432bd4a6fc2b8a304f0deae19005ae027ef4fdcd1eb327f8b1c79a365a90cbf6f07e7022ba5d3564eee1ec - // first client created with ID: 12D3KooWQzsGtYKX62PFvTeh67H7jdU7QqcMJgwiEzKFJCbqrKw112D3KooWQzsGtYKX62PFvTeh67H7jdU7QqcMJgwiEzKFJCbqrKw1 + // first client created with ID: 12D3KooWQzsGtYKX62PFvTeh67H7jdU7QqcMJgwiEzKFJCbqrKw1 // connected to blox // Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM // Original Val is: some raw data