Skip to content

Commit

Permalink
added mechanism for host-pool
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Feb 29, 2024
1 parent 0a547da commit 975070f
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 60 deletions.
109 changes: 103 additions & 6 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/functionland/go-fula/common"
"github.com/functionland/go-fula/ping"
wifi "github.com/functionland/go-fula/wap/pkg/wifi"
ipfsClusterClientApi "github.com/ipfs-cluster/ipfs-cluster/api"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
gostream "github.com/libp2p/go-libp2p-gostream"
"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -408,6 +410,9 @@ func (bl *FxBlockchain) serve(w http.ResponseWriter, r *http.Request) {
actionManifestRemoveStored: func(from peer.ID, w http.ResponseWriter, r *http.Request) {
bl.handleAction(http.MethodPost, actionManifestRemoveStored, from, w, r)
},
actionReplicateInPool: func(from peer.ID, w http.ResponseWriter, r *http.Request) {
bl.handleReplicateInPool(http.MethodPost, actionReplicateInPool, from, w, r)
},
actionAuth: func(from peer.ID, w http.ResponseWriter, r *http.Request) {
bl.handleAuthorization(from, w, r)
},
Expand Down Expand Up @@ -518,6 +523,99 @@ func (bl *FxBlockchain) handleAction(method string, action string, from peer.ID,
}
}

func (bl *FxBlockchain) handleReplicateInPool(method string, action string, from peer.ID, w http.ResponseWriter, r *http.Request) {
log := log.With("action", action, "from", from)
var req ReplicateRequest
var res ReplicateResponse
var poolRes []string

defer r.Body.Close()

if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
log.Debug("cannot parse request body: %v", err)
http.Error(w, "", http.StatusBadRequest)
return
}

//TODO: Ensure it is optimized for long-running calls
ctx, cancel := context.WithTimeout(r.Context(), time.Second*time.Duration(bl.timeout))
defer cancel()
response, statusCode, err := bl.callBlockchain(ctx, method, actionManifestAvailableBatch, req)
if err != nil {
log.Error("failed to call blockchain: %v", err)
w.WriteHeader(statusCode)
// Try to parse the error and format it as JSON
var errMsg map[string]interface{}
if jsonErr := json.Unmarshal(response, &errMsg); jsonErr != nil {
// If the response isn't JSON or can't be parsed, use a generic message
errMsg = map[string]interface{}{
"message": "An error occurred",
"description": err.Error(),
}
}
json.NewEncoder(w).Encode(errMsg)
return
}
// If status code is not 200, attempt to format the response as JSON
if statusCode != http.StatusOK {
w.WriteHeader(statusCode)
var errMsg map[string]interface{}
if jsonErr := json.Unmarshal(response, &errMsg); jsonErr == nil {
// If it's already a JSON, write it as is
w.Write(response)
} else {
// If it's not JSON, wrap the response in the expected format
errMsg = map[string]interface{}{
"message": "Error",
"description": string(response),
}
json.NewEncoder(w).Encode(errMsg)
}
return
}

if jsonErr := json.Unmarshal(response, &res); jsonErr != nil {
// If the response isn't JSON or can't be parsed, use a generic message
w.WriteHeader(http.StatusFailedDependency)
errMsg := map[string]interface{}{
"message": "An error occurred",
"description": jsonErr.Error(),
}
json.NewEncoder(w).Encode(errMsg)
return
}

if bl.ipfsClusterApi == nil {
w.WriteHeader(http.StatusFailedDependency)
errMsg := map[string]interface{}{
"message": "An error occurred",
"description": "iipfs cluster API is nil",
}
json.NewEncoder(w).Encode(errMsg)
return
}
pCtx, pCancel := context.WithTimeout(r.Context(), time.Second*time.Duration(bl.timeout))
defer pCancel()
for i := 0; i < len(res.Manifests); i++ {
c, err := cid.Decode(res.Manifests[i].Cid)
if err != nil {
log.Errorw("Error decoding CID:", "err", err)
continue // Or handle the error appropriately
}
replicationRes, err := bl.ipfsClusterApi.Pin(pCtx, ipfsClusterClientApi.NewCid(c), ipfsClusterClientApi.PinOptions{})
if err != nil {
log.Errorw("Error pinning CID:", "err", err)
continue
}
poolRes = append(poolRes, replicationRes.Cid.Cid.String())
}

w.WriteHeader(http.StatusAccepted)
if err := json.NewEncoder(w).Encode(poolRes); err != nil {
log.Error("failed to write response: %v", err)
}
}

func (bl *FxBlockchain) handleAuthorization(from peer.ID, w http.ResponseWriter, r *http.Request) {
log := log.With("action", actionAuth, "from", from)
defer r.Body.Close()
Expand Down Expand Up @@ -1244,13 +1342,13 @@ func (bl *FxBlockchain) getClusterEndpoint(ctx context.Context, poolID int) (str
creatorClusterPeerID := poolDetails.Creator

// 4. Fetch user details
userDetails, err := bl.fetchUserDetails(ctx, poolID, creatorClusterPeerID)
userDetails, err := bl.fetchUserDetails(ctx, poolID)
if err != nil {
return "", err
}

// 5. Find the peer_id
peerID := findPeerID(creatorClusterPeerID, userDetails)
peerID := bl.findPeerID(creatorClusterPeerID, userDetails)

// 6. Save creator peer ID
err = saveCreatorPeerID(poolID, peerID)
Expand Down Expand Up @@ -1310,10 +1408,9 @@ func fetchPoolDetails(ctx context.Context, bl *FxBlockchain, poolID int) (*Pool,
return nil, fmt.Errorf("pool with ID %d not found", poolID)
}

func (bl *FxBlockchain) fetchUserDetails(ctx context.Context, poolID int, creatorClusterPeerID string) (*PoolUserListResponse, error) {
func (bl *FxBlockchain) fetchUserDetails(ctx context.Context, poolID int) (*PoolUserListResponse, error) {
req := PoolUserListRequest{
PoolID: poolID,
RequestPoolID: poolID,
PoolID: poolID,
}
action := "actionPoolUserList"

Expand All @@ -1340,7 +1437,7 @@ func (bl *FxBlockchain) fetchUserDetails(ctx context.Context, poolID int, creato
return &response, nil
}

func findPeerID(creatorClusterPeerID string, userDetails *PoolUserListResponse) string {
func (bl *FxBlockchain) findPeerID(creatorClusterPeerID string, userDetails *PoolUserListResponse) string {
for _, user := range userDetails.Users {
if user.Account == creatorClusterPeerID {
return user.PeerID
Expand Down
67 changes: 43 additions & 24 deletions blockchain/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,31 @@ import (
)

const (
actionSeeded = "account-seeded"
actionAccountExists = "account-exists"
actionAccountCreate = "account-create"
actionAccountFund = "account-fund"
actionAccountBalance = "account-balance"
actionAssetsBalance = "asset-balance"
actionTransferToMumbai = "fula-mumbai-convert_tokens"
actionTransferToGoerli = "fula-goerli-convert_tokens"
actionPoolCreate = "fula-pool-create"
actionPoolJoin = "fula-pool-join"
actionPoolCancelJoin = "fula-pool-cancel_join"
actionPoolRequests = "fula-pool-poolrequests"
actionPoolList = "fula-pool"
actionPoolUserList = "fula-pool-users"
actionPoolVote = "fula-pool-vote"
actionPoolLeave = "fula-pool-leave"
actionManifestUpload = "fula-manifest-upload"
actionManifestStore = "fula-manifest-storage"
actionManifestBatchStore = "fula-manifest-batch_storage"
actionManifestBatchUpload = "fula-manifest-batch_upload"
actionManifestAvailable = "fula-manifest-available"
actionManifestRemove = "fula-manifest-remove"
actionManifestRemoveStorer = "fula-manifest-remove_storer"
actionManifestRemoveStored = "fula-manifest-remove_storing_manifest"
actionSeeded = "account-seeded"
actionAccountExists = "account-exists"
actionAccountCreate = "account-create"
actionAccountFund = "account-fund"
actionAccountBalance = "account-balance"
actionAssetsBalance = "asset-balance"
actionTransferToMumbai = "fula-mumbai-convert_tokens"
actionTransferToGoerli = "fula-goerli-convert_tokens"
actionPoolCreate = "fula-pool-create"
actionPoolJoin = "fula-pool-join"
actionPoolCancelJoin = "fula-pool-cancel_join"
actionPoolRequests = "fula-pool-poolrequests"
actionPoolList = "fula-pool"
actionPoolUserList = "fula-pool-users"
actionPoolVote = "fula-pool-vote"
actionPoolLeave = "fula-pool-leave"
actionManifestUpload = "fula-manifest-upload"
actionManifestStore = "fula-manifest-storage"
actionManifestBatchStore = "fula-manifest-batch_storage"
actionManifestBatchUpload = "fula-manifest-batch_upload"
actionManifestAvailable = "fula-manifest-available"
actionManifestRemove = "fula-manifest-remove"
actionManifestRemoveStorer = "fula-manifest-remove_storer"
actionManifestRemoveStored = "fula-manifest-remove_storing_manifest"
actionManifestAvailableBatch = "fula-manifest-available_batch"

//Hardware
actionBloxFreeSpace = "blox-free-space"
Expand All @@ -48,8 +49,26 @@ const (
actionFetchContainerLogs = "fetch-container-logs"
actionGetFolderSize = "get-folder-size"
actionGetDatastoreSize = "get-datastore-size"

// Cluster
actionReplicateInPool = "replicate"
)

type ReplicateRequest struct {
Cids []string `json:"cids"`
Account string `json:"uploader"`
PoolID int `json:"pool_id"`
}

type ReplicateResponse struct {
Manifests []BatchManifest `json:"manifests"`
}

type BatchManifest struct {
Cid string `json:"cid"`
ReplicationAvailable int `json:"replication_available"`
}

type LinkWithLimit struct {
Link ipld.Link
Limit int
Expand Down
10 changes: 10 additions & 0 deletions blockchain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"
"time"

ipfsCluster "github.com/ipfs-cluster/ipfs-cluster/api/rest/client"
"github.com/ipfs/kubo/client/rpc"
"github.com/libp2p/go-libp2p/core/peer"
)
Expand All @@ -25,6 +26,7 @@ type (
updatePoolName func(string) error
fetchFrequency time.Duration //Hours that it should update the list of pool users and pool requests if not called through pubsub
rpc *rpc.HttpApi
ipfsClusterApi ipfsCluster.Client
}
)

Expand All @@ -47,6 +49,7 @@ func newOptions(o ...Option) (*options, error) {
updatePoolName: defaultUpdatePoolName, // set a default function or leave nil
fetchFrequency: time.Hour * 1, // default frequency, e.g., 1 hour
rpc: nil,
ipfsClusterApi: nil,
}
for _, apply := range o {
if err := apply(&opts); err != nil {
Expand Down Expand Up @@ -115,6 +118,13 @@ func WithMinSuccessPingCount(sr int) Option {
}
}

func WithIpfsClusterAPI(n ipfsCluster.Client) Option {
return func(o *options) error {
o.ipfsClusterApi = n
return nil
}
}

func WithMaxPingTime(t int) Option {
return func(o *options) error {
o.maxPingTime = t
Expand Down
1 change: 1 addition & 0 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func New(o ...Option) (*Blox, error) {
blockchain.WithMaxPingTime(p.maxPingTime),
blockchain.WithIpfsClient(p.rpc),
blockchain.WithMinSuccessPingCount(p.minSuccessRate*p.pingCount/100),
blockchain.WithIpfsClusterAPI(p.ipfsClusterApi),
)
if err != nil {
return nil, err
Expand Down
17 changes: 17 additions & 0 deletions blox/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/functionland/go-fula/exchange"
ipfsCluster "github.com/ipfs-cluster/ipfs-cluster/api/rest/client"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/kubo/client/rpc"
Expand Down Expand Up @@ -41,10 +42,12 @@ type (
minSuccessRate int
blockchainEndpoint string
secretsPath string
poolHostMode bool
IPFShttpServer *http.Server
DefaultIPFShttpServer string
wg *sync.WaitGroup
rpc *rpc.HttpApi
ipfsClusterApi ipfsCluster.Client
}
)

Expand Down Expand Up @@ -173,6 +176,20 @@ func WithIpfsClient(n *rpc.HttpApi) Option {
}
}

func WithPoolHostMode(n bool) Option {
return func(o *options) error {
o.poolHostMode = n
return nil
}
}

func WithIpfsClusterAPI(n ipfsCluster.Client) Option {
return func(o *options) error {
o.ipfsClusterApi = n
return nil
}
}

func WithLinkSystem(ls *ipld.LinkSystem) Option {
return func(o *options) error {
o.ls = ls
Expand Down
Loading

0 comments on commit 975070f

Please sign in to comment.