From cae0cc5a1e41d350866883fe282a0a90036839d9 Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Mon, 11 Sep 2023 07:33:10 +0100 Subject: [PATCH] Add initial implementation of local IPFS RPC server (#161) * Add initial implementation of local IPFS RPC server Add an initial implementation of the IPFS RPC server running locally that serves pin/ls and block/stat. * Update ipfs.go * added logging for 404 * added log/level endpoint to ipfs * added stats repo to ipfs I had to also save the storeDir in blox object to be able to return and use it in stats repo call * handled empty value for sizes * modified response of stats/repo for sugafunge expected format Added RepoSize to the root of response despite the standard * added files/stat * added stats/bitswap * add stats/bw * added bitswap/ledger --------- Co-authored-by: ehsan shariati --- blox/blox.go | 11 ++ blox/ipfs.go | 393 +++++++++++++++++++++++++++++++++++++ blox/options.go | 10 + cmd/blox/main.go | 1 + wap/pkg/wifi/properties.go | 23 ++- 5 files changed, 434 insertions(+), 4 deletions(-) create mode 100644 blox/ipfs.go diff --git a/blox/blox.go b/blox/blox.go index 6fec41fd..19a8234c 100644 --- a/blox/blox.go +++ b/blox/blox.go @@ -3,6 +3,7 @@ package blox import ( "context" "errors" + "net/http" "sync" "time" @@ -72,6 +73,16 @@ func (p *Blox) Start(ctx context.Context) error { if err := p.bl.Start(ctx); err != nil { return err } + go func() { + log.Infow("IPFS RPC server started on address http://localhost:5001") + switch err := http.ListenAndServe("localhost:5001", p.ServeIpfsRpc()); { + case errors.Is(err, http.ErrServerClosed): + log.Infow("IPFS RPC server stopped") + default: + log.Errorw("IPFS RPC server stopped erroneously", "err", err) + } + }() + gsub, err := pubsub.NewGossipSub(ctx, p.h, pubsub.WithPeerExchange(true), pubsub.WithFloodPublish(true), diff --git a/blox/ipfs.go b/blox/ipfs.go new file mode 100644 index 00000000..f23bb873 --- /dev/null +++ b/blox/ipfs.go @@ -0,0 +1,393 @@ +package blox + +import ( + "encoding/base64" + "encoding/json" + "errors" + "net/http" + + wifi "github.com/functionland/go-fula/wap/pkg/wifi" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" +) + +type ( + pinListResp struct { + PinLsList struct { + Keys map[string]pinListKeysType `json:"Keys,omitempty"` + } `json:"PinLsList"` + } + pinListKeysType struct { + Type string + } +) + +type SizeStat struct { + RepoSize uint64 `json:"RepoSize"` + StorageMax uint64 `json:"StorageMax"` +} + +type RepoInfo struct { + NumObjects uint64 `json:"NumObjects"` + RepoPath string `json:"RepoPath"` + SizeStat SizeStat `json:"SizeStat"` + Version string `json:"Version"` + RepoSize uint64 `json:"RepoSize"` +} + +type FilesStat struct { + Blocks int `json:"Blocks"` + CumulativeSize uint64 `json:"CumulativeSize"` + Hash string `json:"Hash"` + Local bool `json:"Local,omitempty"` // Optional field. 'omitempty' keyword is used to exclude the field from the output if it's default/zero value + Size uint64 `json:"Size"` + SizeLocal uint64 `json:"SizeLocal,omitempty"` // Optional field. + Type string `json:"Type"` + WithLocality bool `json:"WithLocality,omitempty"` // Optional field. +} + +type CidStruct struct { + Root string `json:"/"` +} + +type StatsBitswap struct { + BlocksReceived uint64 `json:"BlocksReceived"` + BlocksSent uint64 `json:"BlocksSent"` + DataReceived uint64 `json:"DataReceived"` + DataSent uint64 `json:"DataSent"` + DupBlksReceived uint64 `json:"DupBlksReceived"` + DupDataReceived uint64 `json:"DupDataReceived"` + MessagesReceived uint64 `json:"MessagesReceived"` + Peers []string `json:"Peers"` + ProvideBufLen int `json:"ProvideBufLen"` + Wantlist []CidStruct `json:"Wantlist"` +} + +type StatsBw struct { + RateIn float64 `json:"RateIn"` + RateOut float64 `json:"RateOut"` + TotalIn int64 `json:"TotalIn"` + TotalOut int64 `json:"TotalOut"` +} + +type PeerStats struct { + Exchanged uint64 `json:"Exchanged"` + Peer string `json:"Peer"` + Recv uint64 `json:"Recv"` + Sent uint64 `json:"Sent"` + Value float64 `json:"Value"` +} + +func notFoundHandler(w http.ResponseWriter, r *http.Request) { + params := r.URL.Query() + log.Errorw("404 Not Found", + "method", r.Method, + "url", r.URL.String(), + "params", params, + ) + http.NotFound(w, r) +} + +func (p *Blox) ServeIpfsRpc() http.Handler { + mux := http.NewServeMux() + // https://docs.ipfs.tech/reference/kubo/rpc/#api-v0-pin-ls + mux.HandleFunc("/api/v0/pin/ls", func(w http.ResponseWriter, r *http.Request) { + var resp pinListResp + resp.PinLsList.Keys = make(map[string]pinListKeysType) + results, err := p.ds.Query(r.Context(), query.Query{ + KeysOnly: true, + }) + if err != nil { + log.Errorw("failed to query datastore", "err", err) + http.Error(w, "internal error while querying datastore: "+err.Error(), http.StatusInternalServerError) + return + } + for result := range results.Next() { + if result.Error != nil { + log.Errorw("failed to traverse results", "err", err) + http.Error(w, "internal error while traversing datastore results: "+err.Error(), http.StatusInternalServerError) + return + } + c, err := cid.Cast([]byte(result.Key)) + if err != nil { + log.Debugw("failed to cast key to cid", "key", result.Key, "err", err) + continue + } + resp.PinLsList.Keys[c.String()] = pinListKeysType{Type: "fx"} //TODO: what should the type be? + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Errorw("failed to encode response to pin ls", "err", err) + } + }) + // https://docs.ipfs.tech/reference/kubo/rpc/#api-v0-block-stat + mux.HandleFunc("/api/v0/block/stat", func(w http.ResponseWriter, r *http.Request) { + c := r.URL.Query().Get("cid") + if c == "" { + http.Error(w, "no cid specified", http.StatusBadRequest) + return + } + cd, err := cid.Decode(c) + if err != nil { + http.Error(w, "invalid cid: "+err.Error(), http.StatusBadRequest) + return + } + key := toDatastoreKey(cidlink.Link{Cid: cd}) + switch value, err := p.ds.Get(r.Context(), key); { + case errors.Is(err, datastore.ErrNotFound): + http.NotFound(w, r) + return + case err != nil: + http.Error(w, "internal error: "+err.Error(), http.StatusInternalServerError) + return + default: + resp := struct { + Key string `json:"Key"` + Size int `json:"Size"` + }{ + Key: cd.String(), + Size: len(value), + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Errorw("failed to encode response to block stat", "err", err) + } + } + }) + // https://docs.ipfs.tech/reference/kubo/rpc/#api-v0-id + mux.HandleFunc("/api/v0/id", func(w http.ResponseWriter, r *http.Request) { + //Get string value of addresses + addresses := p.h.Addrs() + addressStrings := make([]string, len(addresses)) + for i, addr := range addresses { + addressStrings[i] = addr.String() + } + + //Get Public Key + pubKey, err := p.h.ID().ExtractPublicKey() + if err != nil { + log.Errorw("Public key is not available", err) + return + } + pubKeyBytes, err := pubKey.Raw() + if err != nil { + log.Errorw("Error getting raw public key:", err) + return + } + + pubKeyBase64 := base64.StdEncoding.EncodeToString(pubKeyBytes) + + resp := struct { + Addresses []string `json:"Addresses"` + AgentVersion string `json:"AgentVersion"` + ID string `json:"ID"` + ProtocolVersion string `json:"ProtocolVersion"` + Protocols []string `json:"Protocols"` + PublicKey string `json:"PublicKey"` + }{ + Addresses: addressStrings, + AgentVersion: Version0, + ID: p.h.ID().String(), + ProtocolVersion: "fx_exchange/" + Version0, + Protocols: []string{"fx_exchange"}, + PublicKey: pubKeyBase64, + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Errorw("failed to encode response to id", "err", err) + } + + }) + + // https://docs.ipfs.tech/reference/kubo/rpc/#api-log-level + mux.HandleFunc("/api/v0/log/level", func(w http.ResponseWriter, r *http.Request) { + resp := struct { + Message string `json:"Message"` + }{ + Message: "ignored", + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Errorw("failed to encode response to log level", "err", err) + } + + }) + + // https://docs.ipfs.tech/reference/kubo/rpc/#api-v0-stats-repo Modified for Sugarfunge by adding RepoSize in the root of response + mux.HandleFunc("/api/v0/stats/repo", func(w http.ResponseWriter, r *http.Request) { + //get StorageMax + storage, err := wifi.GetBloxFreeSpace() + if err != nil { + log.Errorw("failed to get storage stats", "err", err) + http.Error(w, "internal error while getting storage stats: "+err.Error(), http.StatusInternalServerError) + return + } + + //Get RepoSize and NumObjects + repoSize := 0 + numObjects := 0 + results, err := p.ds.Query(r.Context(), query.Query{ + KeysOnly: true, + }) + if err != nil { + log.Errorw("failed to query datastore", "err", err) + http.Error(w, "internal error while querying datastore: "+err.Error(), http.StatusInternalServerError) + return + } + for result := range results.Next() { + if result.Error != nil { + log.Errorw("failed to traverse results", "err", err) + http.Error(w, "internal error while traversing datastore results: "+err.Error(), http.StatusInternalServerError) + return + } + repoSize = repoSize + result.Size + numObjects = numObjects + 1 + } + + resp := RepoInfo{ + NumObjects: uint64(numObjects), + RepoPath: p.storeDir, + RepoSize: uint64(repoSize), + SizeStat: SizeStat{ + RepoSize: uint64(repoSize), + StorageMax: uint64(storage.Size), + }, + Version: "fx-repo@" + Version0, + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Errorw("failed to encode response to stats repo", "err", err) + } + }) + + // https://docs.ipfs.tech/reference/kubo/rpc/#api-v0-files-stat + mux.HandleFunc("/api/v0/files/stat", func(w http.ResponseWriter, r *http.Request) { + //Get RepoSize and NumObjects + repoSize := 0 + numObjects := 0 + results, err := p.ds.Query(r.Context(), query.Query{ + KeysOnly: true, + }) + if err != nil { + log.Errorw("failed to query datastore", "err", err) + http.Error(w, "internal error while querying datastore: "+err.Error(), http.StatusInternalServerError) + return + } + for result := range results.Next() { + if result.Error != nil { + log.Errorw("failed to traverse results", "err", err) + http.Error(w, "internal error while traversing datastore results: "+err.Error(), http.StatusInternalServerError) + return + } + repoSize = repoSize + result.Size + numObjects = numObjects + 1 + } + + resp := FilesStat{ + Hash: "", //TODO: Get hash of root directory + Size: uint64(repoSize), + CumulativeSize: uint64(repoSize), + Blocks: numObjects, + Type: "directory", + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Errorw("failed to encode response to files stat", "err", err) + } + }) + + // https://docs.ipfs.tech/reference/kubo/rpc/#api-v0-stats-bitswap + mux.HandleFunc("/api/v0/stats/bitswap", func(w http.ResponseWriter, r *http.Request) { + //Get RepoSize and NumObjects + repoSize := 0 + numObjects := 0 + results, err := p.ds.Query(r.Context(), query.Query{ + KeysOnly: true, + }) + if err != nil { + log.Errorw("failed to query datastore", "err", err) + http.Error(w, "internal error while querying datastore: "+err.Error(), http.StatusInternalServerError) + return + } + for result := range results.Next() { + if result.Error != nil { + log.Errorw("failed to traverse results", "err", err) + http.Error(w, "internal error while traversing datastore results: "+err.Error(), http.StatusInternalServerError) + return + } + repoSize = repoSize + result.Size + numObjects = numObjects + 1 + } + + // First, create a slice to store the string representations + peerStrings := make([]string, len(p.authorizedPeers)) + + // Then convert each peer.ID to a string + for i, peerID := range p.authorizedPeers { + peerStrings[i] = peerID.String() + } + + resp := StatsBitswap{ + Wantlist: []CidStruct{}, // empty slice + Peers: peerStrings, + BlocksReceived: uint64(numObjects), + DataReceived: 0, + DupBlksReceived: 0, + DupDataReceived: 0, + MessagesReceived: uint64(numObjects), + BlocksSent: 0, + DataSent: 0, + ProvideBufLen: 0, + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Errorw("failed to encode response to stats bitswap", "err", err) + } + }) + + // https://docs.ipfs.tech/reference/kubo/rpc/#api-v0-stats-bw + mux.HandleFunc("/api/v0/stats/bw", func(w http.ResponseWriter, r *http.Request) { + //Get NumObjects + numObjects := 0 + results, err := p.ds.Query(r.Context(), query.Query{ + KeysOnly: true, + }) + if err != nil { + log.Errorw("failed to query datastore", "err", err) + http.Error(w, "internal error while querying datastore: "+err.Error(), http.StatusInternalServerError) + return + } + for result := range results.Next() { + if result.Error != nil { + log.Errorw("failed to traverse results", "err", err) + http.Error(w, "internal error while traversing datastore results: "+err.Error(), http.StatusInternalServerError) + return + } + numObjects = numObjects + 1 + } + + resp := StatsBw{ + TotalIn: int64(numObjects), + TotalOut: 0, + RateIn: 0, + RateOut: 0, + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Errorw("failed to encode response to stats bw", "err", err) + } + }) + + // https://docs.ipfs.tech/reference/kubo/rpc/#api-v0-bitswap-ledger + mux.HandleFunc("/api/v0/bitswap/ledger", func(w http.ResponseWriter, r *http.Request) { + + resp := PeerStats{ + Exchanged: 0, + Peer: p.h.ID().String(), + Recv: 0, + Sent: 0, + Value: 0, + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Errorw("failed to encode response to bitswap ledger", "err", err) + } + }) + + mux.HandleFunc("/", notFoundHandler) + + return mux +} diff --git a/blox/options.go b/blox/options.go index 899a5b98..fc59750c 100644 --- a/blox/options.go +++ b/blox/options.go @@ -24,6 +24,7 @@ type ( h host.Host name string topicName string + storeDir string announceInterval time.Duration ds datastore.Batching ls *ipld.LinkSystem @@ -102,6 +103,15 @@ func WithPoolName(n string) Option { } } +// WithStoreDir sets a the store directory we are using for datastore +// Required. +func WithStoreDir(n string) Option { + return func(o *options) error { + o.storeDir = n + return nil + } +} + // WithTopicName sets the name of the topic onto which announcements are made. // Defaults to "/explore.fula/pools/" if unset. // See: WithPoolName. diff --git a/cmd/blox/main.go b/cmd/blox/main.go index 9077ade5..d3084f71 100644 --- a/cmd/blox/main.go +++ b/cmd/blox/main.go @@ -390,6 +390,7 @@ func action(ctx *cli.Context) error { blox.WithHost(h), blox.WithDatastore(ds), blox.WithPoolName(app.config.PoolName), + blox.WithStoreDir(app.config.StoreDir), blox.WithExchangeOpts( exchange.WithUpdateConfig(updateConfig), exchange.WithAuthorizer(authorizer), diff --git a/wap/pkg/wifi/properties.go b/wap/pkg/wifi/properties.go index 0fe5153a..e1ddfde1 100644 --- a/wap/pkg/wifi/properties.go +++ b/wap/pkg/wifi/properties.go @@ -107,10 +107,25 @@ func GetBloxFreeSpace() (BloxFreeSpaceResponse, error) { } deviceCount, errCount := strconv.Atoi(parts[0]) - size, errSize := strconv.ParseFloat(parts[1], 32) - used, errUsed := strconv.ParseFloat(parts[2], 32) - avail, errAvail := strconv.ParseFloat(parts[3], 32) - usedPercentage, errUsedPercentage := strconv.ParseFloat(parts[4], 32) + sizeStr, usedStr, availStr, usedPercentageStr := parts[1], parts[2], parts[3], parts[4] + + if sizeStr == "" { + sizeStr = "0" + } + if usedStr == "" { + usedStr = "0" + } + if availStr == "" { + availStr = "0" + } + if usedPercentageStr == "" { + usedPercentageStr = "0" + } + + size, errSize := strconv.ParseFloat(sizeStr, 32) + used, errUsed := strconv.ParseFloat(usedStr, 32) + avail, errAvail := strconv.ParseFloat(availStr, 32) + usedPercentage, errUsedPercentage := strconv.ParseFloat(usedPercentageStr, 32) var errors []string if errCount != nil {