Skip to content

Commit

Permalink
Update main.go
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Feb 20, 2024
1 parent d57a86d commit 061652a
Showing 1 changed file with 145 additions and 86 deletions.
231 changes: 145 additions & 86 deletions cmd/blox/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"io"
"log"
"mime/multipart"
"net"
"net/http"
"os"
Expand All @@ -24,41 +25,34 @@ import (

"github.com/functionland/go-fula/blox"
"github.com/functionland/go-fula/exchange"
ipfsPath "github.com/ipfs/boxo/path"
blockformat "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
badger "github.com/ipfs/go-ds-badger"
logging "github.com/ipfs/go-log/v2"
oldcmds "github.com/ipfs/kubo/commands"
config "github.com/ipfs/kubo/config"
core "github.com/ipfs/kubo/core"
coreapi "github.com/ipfs/kubo/core/coreapi"
"github.com/ipfs/kubo/core/corehttp"
iface "github.com/ipfs/kubo/core/coreiface"
kubolibp2p "github.com/ipfs/kubo/core/node/libp2p"
"github.com/ipfs/kubo/repo"
"github.com/ipfs/kubo/repo/fsrepo"
ipld "github.com/ipld/go-ipld-prime"
linking "github.com/ipld/go-ipld-prime/linking"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipni/index-provider/engine"
goprocess "github.com/jbenet/goprocess"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
sockets "github.com/libp2p/go-socket-activation"
"github.com/mdp/qrterminal"
"github.com/multiformats/go-multiaddr"

Check failure on line 52 in cmd/blox/main.go

View workflow job for this annotation

GitHub Actions / All

package "github.com/multiformats/go-multiaddr" is being imported more than once (ST1019)
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/multiformats/go-multicodec"
bip39 "github.com/tyler-smith/go-bip39"
"github.com/urfave/cli/v2"
"github.com/urfave/cli/v2/altsrc"
Expand Down Expand Up @@ -86,8 +80,9 @@ type Child struct {
}

var (
logger = logging.Logger("fula/cmd/blox")
app struct {
logger = logging.Logger("fula/cmd/blox")
baseURL = "http://localhost:5002"
app struct {
cli.App
initOnly bool
blockchainEndpoint string
Expand Down Expand Up @@ -624,56 +619,121 @@ func CustomHostOption(h host.Host) kubolibp2p.HostOption {
}
}

// CustomStorageWriteOpener creates a StorageWriteOpener using the IPFS blockstore
func CustomStorageWriteOpener(ipfsNode *core.IpfsNode) linking.BlockWriteOpener {
return func(lnkCtx linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) {
// The returned io.Writer is where the data will be written.
// The BlockWriteCommitter function will be called to "commit" the data once writing is done.
var buffer bytes.Buffer
committer := func(lnk ipld.Link) error {
// Convert the IPLD link to a CID.
c, err := cid.Parse(lnk.String())
if err != nil {
return err
}
func CustomStorageReadOpener(ctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
cidStr := lnk.String()
url := fmt.Sprintf("%s/api/v0/block/get?arg=%s", baseURL, cidStr)

// Create an IPFS block with the data from the buffer and the CID.
block, err := blockformat.NewBlockWithCid(buffer.Bytes(), c)
if err != nil {
return err
}
logger.Debugw("block was created with cid", "block", block, "cid", c)
// Create a new request with POST method
req, err := http.NewRequest("POST", url, nil)
if err != nil {
return nil, err
}

// Store the block using the IPFS node's blockstore.
err = ipfsNode.Blockstore.Put(context.Background(), block)
if err != nil {
logger.Errorw("Error in ipfs store", "block", block, "cid", c)
}
return err
}
// Execute the request
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}

return &buffer, committer, nil
// Check for HTTP error status codes
if resp.StatusCode != http.StatusOK {
resp.Body.Close() // Ensure to close the body to prevent resource leaks
return nil, fmt.Errorf("failed to get block: CID %s, HTTP status %d", cidStr, resp.StatusCode)
}

// The response body contains the block data
return resp.Body, nil
}

// CustomWriter captures written data and allows it to be retrieved.
type CustomWriter struct {
Buffer *bytes.Buffer
}

// NewCustomWriter creates a new CustomWriter instance.
func NewCustomWriter() *CustomWriter {
return &CustomWriter{
Buffer: new(bytes.Buffer),
}
}

// Write captures data written to the writer.
func (cw *CustomWriter) Write(p []byte) (n int, err error) {
return cw.Buffer.Write(p)
}

func CustomStorageReadOpener(ipfsApi iface.CoreAPI) ipld.BlockReadOpener {
return func(lnkCtx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
// CustomStorageWriteOpener opens a block for writing to an external IPFS node via HTTP API.
func CustomStorageWriteOpener(lctx linking.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) {
cw := NewCustomWriter()

committer := func(lnk ipld.Link) error {
cidLink, ok := lnk.(cidlink.Link)
if !ok {
return nil, fmt.Errorf("link is not a cid link")
logger.Errorf("link is not a CID link")
return fmt.Errorf("link is not a CID link")
}
// Convert the CID link to a path
p, err := ipfsPath.NewPath("/ipfs/" + cidLink.Cid.String())

c := cidLink.Cid
cidCodec := c.Type()
mhType := c.Prefix().MhType

// Dynamically construct the URL with the cid-codec and mhtype
url := fmt.Sprintf("%s/api/v0/block/put?cid-codec=%s&mhtype=%s&mhlen=-1&pin=false&allow-big-block=false", baseURL, multicodec.Code(cidCodec), multicodec.Code(mhType))

formData := &bytes.Buffer{}
writer := multipart.NewWriter(formData)
part, err := writer.CreateFormFile("file", "block.data")
if err != nil {
return nil, err
return err
}

if _, err := part.Write(cw.Buffer.Bytes()); err != nil {
return err
}
// Use the Block API's Get method to obtain a reader for the block's data
reader, err := ipfsApi.Block().Get(context.Background(), p)

if err := writer.Close(); err != nil {
return err
}

req, reqErr := http.NewRequestWithContext(lctx.Ctx, "POST", url, formData)
if reqErr != nil {
logger.Errorf("Failed to NewRequestWithContext: %s", reqErr)
return reqErr
}

// Set the Content-Type header to multipart/form-data with the boundary parameter
req.Header.Set("Content-Type", writer.FormDataContentType())

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("HTTP request failed: status %d", resp.StatusCode)
}

// Optionally read response to get the CID from the response if necessary
var respData struct {
Key string `json:"Key"`
Size int `json:"Size"`
}
if err := json.NewDecoder(resp.Body).Decode(&respData); err != nil {
logger.Errorf("Failed to NewDecoder: %s", err)
return err
}

// Verify the CID matches if necessary
if respData.Key != c.String() {
return fmt.Errorf("mismatched CIDs: expected %s, got %s", c.String(), respData.Key)
}
return reader, nil
logger.Debugw("Write cid finished", "cid", c)
return nil
}

return cw, committer, nil
}

// defaultMux tells mux to serve path using the default muxer. This is
Expand Down Expand Up @@ -960,39 +1020,38 @@ func action(ctx *cli.Context) error {
}*/

dirPath := filepath.Dir(app.configPath)
repo, err := CreateCustomRepo(ctx2, dirPath, h, &badger.DefaultOptions, app.config.StoreDir, "90%")
if err != nil {
logger.Fatal(err)
return err
}

ipfsConfig := &core.BuildCfg{
Online: true,
Permanent: true,
Host: CustomHostOption(h),
Routing: kubolibp2p.DHTOption,
Repo: repo,
}
const useDefaultIPFSServer = true
if !useDefaultIPFSServer {
repo, err := CreateCustomRepo(ctx2, dirPath, h, &badger.DefaultOptions, app.config.StoreDir, "90%")
if err != nil {
logger.Fatal(err)
return err
}

ipfsNode, err := core.NewNode(ctx2, ipfsConfig)
if err != nil {
logger.Fatal(err)
return err
}
ipfsHostId := ipfsNode.PeerHost.ID()
ipfsId := ipfsNode.Identity.String()
logger.Infow("ipfscore successfully instantiated", "host", ipfsHostId, "peer", ipfsId)
ipfsAPI, err := coreapi.NewCoreAPI(ipfsNode)
if err != nil {
panic(fmt.Errorf("failed to create IPFS API: %w", err))
}
if ipfsAPI != nil {
logger.Info("ipfscoreapi successfully instantiated")
}
ipfsConfig := &core.BuildCfg{
Online: true,
Permanent: true,
Host: CustomHostOption(h),
Routing: kubolibp2p.DHTOption,
Repo: repo,
}

const useDefaultIPFSServer = false
ipfsNode, err := core.NewNode(ctx2, ipfsConfig)
if err != nil {
logger.Fatal(err)
return err
}
ipfsHostId := ipfsNode.PeerHost.ID()
ipfsId := ipfsNode.Identity.String()
logger.Infow("ipfscore successfully instantiated", "host", ipfsHostId, "peer", ipfsId)
ipfsAPI, err := coreapi.NewCoreAPI(ipfsNode)
if err != nil {
panic(fmt.Errorf("failed to create IPFS API: %w", err))
}
if ipfsAPI != nil {
logger.Info("ipfscoreapi successfully instantiated")
}

if !useDefaultIPFSServer {
ipfsNode.IsDaemon = true
logger.Debug("called wg.Add in blox start")
logger.Debugw("ipfs started", "condifg path", dirPath)
Expand Down Expand Up @@ -1047,15 +1106,15 @@ func action(ctx *cli.Context) error {
}()
}

ds := ipfsNode.Repo.Datastore()
//ds := ipfsNode.Repo.Datastore()
linkSystem := cidlink.DefaultLinkSystem()
linkSystem.StorageReadOpener = CustomStorageReadOpener(ipfsAPI)
linkSystem.StorageWriteOpener = CustomStorageWriteOpener(ipfsNode)
linkSystem.StorageReadOpener = CustomStorageReadOpener
linkSystem.StorageWriteOpener = CustomStorageWriteOpener

bb, err := blox.New(
blox.WithHost(h),
blox.WithWg(&wg),
blox.WithDatastore(ds),
//blox.WithDatastore(ds),
blox.WithLinkSystem(&linkSystem),
blox.WithPoolName(app.config.PoolName),
blox.WithTopicName(app.config.PoolName),
Expand All @@ -1069,7 +1128,7 @@ func action(ctx *cli.Context) error {
blox.WithExchangeOpts(
exchange.WithUpdateConfig(updateConfig),
exchange.WithWg(&wg),
exchange.WithIPFSApi(ipfsAPI),
//exchange.WithIPFSApi(ipfsAPI),
exchange.WithAuthorizer(authorizer),
exchange.WithAuthorizedPeers(authorizedPeers),
exchange.WithAllowTransientConnection(app.config.AllowTransientConnection),
Expand All @@ -1079,17 +1138,17 @@ func action(ctx *cli.Context) error {
exchange.WithIpniGetEndPoint("https://cid.contact/cid/"),
exchange.WithIpniProviderEngineOptions(
engine.WithHost(ipnih),
engine.WithDatastore(namespace.Wrap(ds, datastore.NewKey("ipni/ads"))),
//engine.WithDatastore(namespace.Wrap(ds, datastore.NewKey("ipni/ads"))),
engine.WithPublisherKind(engine.DataTransferPublisher),
engine.WithDirectAnnounce(app.config.IpniPublishDirectAnnounce...),
),
exchange.WithDhtProviderOptions(
dht.Datastore(namespace.Wrap(ds, datastore.NewKey("dht"))),
/*exchange.WithDhtProviderOptions(
//dht.Datastore(namespace.Wrap(ds, datastore.NewKey("dht"))),
dht.ProtocolExtension(protocol.ID("/"+app.config.PoolName)),
dht.ProtocolPrefix("/fula"),
dht.Resiliency(1),
dht.Mode(dht.ModeAutoServer),
),
),*/
),
)
if err != nil {
Expand Down

0 comments on commit 061652a

Please sign in to comment.