From 8c56c92d3eeccc2844e5bb9c3f812858faa0c04d Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Sun, 6 Oct 2024 21:55:00 -0400 Subject: [PATCH] Added plugins control --- Fula.podspec | 2 +- blockchain/bl_plugins.go | 238 +++++++++++++++++++++++++++++++++++++++ blockchain/blockchain.go | 14 +++ blockchain/interface.go | 65 +++++++++++ mobile/blockchain.go | 24 ++++ mobile/example_test.go | 101 +++++++++++++++++ 6 files changed, 443 insertions(+), 1 deletion(-) create mode 100644 blockchain/bl_plugins.go diff --git a/Fula.podspec b/Fula.podspec index dd5723f..045d0fa 100644 --- a/Fula.podspec +++ b/Fula.podspec @@ -1,6 +1,6 @@ Pod::Spec.new do |s| s.name = 'Fula' # Name for your pod - s.version = '1.54.10' + s.version = '1.54.12' s.summary = 'Go-fula for iOS' s.homepage = 'https://github.com/functionland/go-fula' diff --git a/blockchain/bl_plugins.go b/blockchain/bl_plugins.go new file mode 100644 index 0000000..99905ea --- /dev/null +++ b/blockchain/bl_plugins.go @@ -0,0 +1,238 @@ +package blockchain + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "os/exec" + "strings" + + "github.com/libp2p/go-libp2p/core/peer" +) + +const activePluginsFile = "/internal/active-plugins.txt" + +type PluginInfo struct { + Name string `json:"name"` + Description string `json:"description"` + Version string `json:"version"` + Usage struct { + Storage string `json:"storage"` + Compute string `json:"compute"` + Bandwidth string `json:"bandwidth"` + RAM string `json:"ram"` + GPU string `json:"gpu"` + } `json:"usage"` + Rewards []map[string]string `json:"rewards"` + Socials []map[string]string `json:"socials"` + Approved bool `json:"approved"` + Installed bool `json:"installed"` +} + +func (bl *FxBlockchain) ListPlugins(ctx context.Context) ([]byte, error) { + // Fetch the list of plugins + resp, err := http.Get("https://raw.githubusercontent.com/functionland/fula-ota/refs/heads/main/docker/fxsupport/linux/plugins/info.json") + if err != nil { + return nil, fmt.Errorf("failed to fetch plugin list: %w", err) + } + defer resp.Body.Close() + + var pluginList []struct { + Name string `json:"name"` + } + if err := json.NewDecoder(resp.Body).Decode(&pluginList); err != nil { + return nil, fmt.Errorf("failed to decode plugin list: %w", err) + } + + // Read active plugins + activePlugins, err := bl.readActivePlugins() + if err != nil { + return nil, fmt.Errorf("failed to read active plugins: %w", err) + } + + var detailedPlugins []PluginInfo + for _, plugin := range pluginList { + // Fetch detailed info for each plugin + detailResp, err := http.Get(fmt.Sprintf("https://raw.githubusercontent.com/functionland/fula-ota/refs/heads/main/docker/fxsupport/linux/plugins/%s/info.json", plugin.Name)) + if err != nil { + return nil, fmt.Errorf("failed to fetch details for plugin %s: %w", plugin.Name, err) + } + defer detailResp.Body.Close() + + var pluginInfo PluginInfo + if err := json.NewDecoder(detailResp.Body).Decode(&pluginInfo); err != nil { + return nil, fmt.Errorf("failed to decode details for plugin %s: %w", plugin.Name, err) + } + + // Check if the plugin is installed + pluginInfo.Installed = contains(activePlugins, plugin.Name) + + detailedPlugins = append(detailedPlugins, pluginInfo) + } + + return json.Marshal(detailedPlugins) +} + +func (bl *FxBlockchain) InstallPlugin(ctx context.Context, pluginName string) ([]byte, error) { + // Read existing plugins + plugins, err := bl.readActivePlugins() + if err != nil { + return nil, err + } + + // Check if plugin already exists + for _, p := range plugins { + if p == pluginName { + return []byte("Plugin already installed"), nil + } + } + + // Append new plugin + plugins = append(plugins, pluginName) + + // Write updated list back to file + if err := bl.writeActivePlugins(plugins); err != nil { + return nil, err + } + + return []byte("Plugin installed successfully"), nil +} + +func (bl *FxBlockchain) UninstallPlugin(ctx context.Context, pluginName string) ([]byte, error) { + // Read existing plugins + plugins, err := bl.readActivePlugins() + if err != nil { + return nil, err + } + + // Remove the plugin if it exists + var newPlugins []string + found := false + for _, p := range plugins { + if p != pluginName { + newPlugins = append(newPlugins, p) + } else { + found = true + } + } + + if !found { + return []byte("Plugin not found"), nil + } + + // Write updated list back to file + if err := bl.writeActivePlugins(newPlugins); err != nil { + return nil, err + } + + return []byte("Plugin uninstalled successfully"), nil +} + +func (bl *FxBlockchain) readActivePlugins() ([]string, error) { + content, err := os.ReadFile(activePluginsFile) + if err != nil { + if os.IsNotExist(err) { + return []string{}, nil + } + return nil, fmt.Errorf("failed to read active plugins file: %w", err) + } + return strings.Split(strings.TrimSpace(string(content)), "\n"), nil +} + +func (bl *FxBlockchain) writeActivePlugins(plugins []string) error { + content := strings.Join(plugins, "\n") + if err := os.WriteFile(activePluginsFile, []byte(content), 0644); err != nil { + return fmt.Errorf("failed to write active plugins file: %w", err) + } + return nil +} + +func (bl *FxBlockchain) ShowPluginStatus(ctx context.Context, pluginName string, lines int) ([]byte, error) { + status, err := bl.showPluginStatusImpl(ctx, pluginName, lines) + if err != nil { + return nil, err + } + return json.Marshal(status) +} + +func (bl *FxBlockchain) showPluginStatusImpl(ctx context.Context, pluginName string, lines int) ([]string, error) { + args := []string{"docker", "logs"} + + if lines > 0 { + args = append(args, "--tail", fmt.Sprintf("%d", lines)) + } + + args = append(args, pluginName) + + cmd := exec.CommandContext(ctx, "sudo", args...) + output, err := cmd.Output() + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + return nil, fmt.Errorf("failed to get logs for plugin %s: %w\nStderr: %s", pluginName, err, string(exitErr.Stderr)) + } + return nil, fmt.Errorf("failed to execute docker logs for plugin %s: %w", pluginName, err) + } + + rawLines := strings.Split(strings.TrimSpace(string(output)), "\n") + var formattedLines []string + for _, line := range rawLines { + select { + case <-ctx.Done(): + return formattedLines, ctx.Err() + default: + formattedLines = append(formattedLines, line) + } + } + + if len(formattedLines) == 0 { + return nil, fmt.Errorf("no log output for plugin %s", pluginName) + } + + return formattedLines, nil +} + +func (bl *FxBlockchain) handlePluginAction(ctx context.Context, from peer.ID, w http.ResponseWriter, r *http.Request, action string) { + var req struct { + PluginName string `json:"plugin_name"` + Lines int `json:"lines,omitempty"` + Follow bool `json:"follow,omitempty"` + } + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, fmt.Sprintf("Invalid request body: %v", err), http.StatusBadRequest) + return + } + + if req.PluginName == "" && action != "list" { + http.Error(w, "Plugin name is required", http.StatusBadRequest) + return + } + + var result []byte + var err error + + switch action { + case "list": + result, err = bl.ListPlugins(ctx) + case "install": + result, err = bl.InstallPlugin(ctx, req.PluginName) + case "uninstall": + result, err = bl.UninstallPlugin(ctx, req.PluginName) + case "status": + result, err = bl.ShowPluginStatus(ctx, req.PluginName, req.Lines) + default: + http.Error(w, "Invalid action", http.StatusBadRequest) + return + } + + if err != nil { + log.Errorf("Error in plugin action %s for %s: %v", action, req.PluginName, err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.Write(result) +} diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index dd4ef94..47bd237 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -540,6 +540,20 @@ func (bl *FxBlockchain) serve(w http.ResponseWriter, r *http.Request) { actionGetDatastoreSize: func(from peer.ID, w http.ResponseWriter, r *http.Request) { bl.handleGetDatastoreSize(r.Context(), from, w, r) }, + + // Plugin actions + actionListPlugins: func(from peer.ID, w http.ResponseWriter, r *http.Request) { + bl.handlePluginAction(r.Context(), from, w, r, actionListPlugins) + }, + actionInstallPlugin: func(from peer.ID, w http.ResponseWriter, r *http.Request) { + bl.handlePluginAction(r.Context(), from, w, r, actionInstallPlugin) + }, + actionUninstallPlugin: func(from peer.ID, w http.ResponseWriter, r *http.Request) { + bl.handlePluginAction(r.Context(), from, w, r, actionUninstallPlugin) + }, + actionShowPluginStatus: func(from peer.ID, w http.ResponseWriter, r *http.Request) { + bl.handlePluginAction(r.Context(), from, w, r, actionShowPluginStatus) + }, } // Look up the function in the map and call it diff --git a/blockchain/interface.go b/blockchain/interface.go index cccbca4..43129fe 100644 --- a/blockchain/interface.go +++ b/blockchain/interface.go @@ -54,6 +54,12 @@ const ( // Cluster actionReplicateInPool = "replicate" + + // Plugins + actionListPlugins = "list-plugins" + actionInstallPlugin = "install-plugin" + actionUninstallPlugin = "uninstall-plugin" + actionShowPluginStatus = "show-plugin-status" ) type ReplicateRequest struct { @@ -385,6 +391,47 @@ type ManifestRemoveStoredResponse struct { PoolID int `json:"pool_id"` } +// Plugins +// Plugin structures + +// ListPlugins +type ListPluginsRequest struct{} + +type ListPluginsResponse struct { + Plugins []PluginInfo `json:"plugins"` +} + +// InstallPlugin +type InstallPluginRequest struct { + PluginName string `json:"plugin_name"` +} + +type InstallPluginResponse struct { + Success bool `json:"success"` + Message string `json:"message"` +} + +// UninstallPlugin +type UninstallPluginRequest struct { + PluginName string `json:"plugin_name"` +} + +type UninstallPluginResponse struct { + Success bool `json:"success"` + Message string `json:"message"` +} + +// ShowPluginStatus +type ShowPluginStatusRequest struct { + PluginName string `json:"plugin_name"` + Lines int `json:"lines"` + Follow bool `json:"follow"` +} + +type ShowPluginStatusResponse struct { + Status []string `json:"status"` +} + type Blockchain interface { Seeded(context.Context, peer.ID, SeededRequest) ([]byte, error) AccountExists(context.Context, peer.ID, AccountExistsRequest) ([]byte, error) @@ -426,6 +473,12 @@ type Blockchain interface { FindBestAndTargetInLogs(context.Context, peer.ID, wifi.FindBestAndTargetInLogsRequest) ([]byte, error) GetFolderSize(context.Context, peer.ID, wifi.GetFolderSizeRequest) ([]byte, error) GetDatastoreSize(context.Context, peer.ID, wifi.GetDatastoreSizeRequest) ([]byte, error) + + //Plugins + ListPlugins(context.Context) ([]byte, error) + InstallPlugin(context.Context, string) ([]byte, error) + UninstallPlugin(context.Context, string) ([]byte, error) + ShowPluginStatus(context.Context, string, int) ([]byte, error) } var requestTypes = map[string]reflect.Type{ @@ -470,6 +523,12 @@ var requestTypes = map[string]reflect.Type{ actionFindBestAndTargetInLogs: reflect.TypeOf(wifi.FindBestAndTargetInLogsRequest{}), actionGetFolderSize: reflect.TypeOf(wifi.GetFolderSizeRequest{}), actionGetDatastoreSize: reflect.TypeOf(wifi.GetDatastoreSizeRequest{}), + + // Plugins + actionListPlugins: reflect.TypeOf(ListPluginsRequest{}), + actionInstallPlugin: reflect.TypeOf(InstallPluginRequest{}), + actionUninstallPlugin: reflect.TypeOf(UninstallPluginRequest{}), + actionShowPluginStatus: reflect.TypeOf(ShowPluginStatusRequest{}), } var responseTypes = map[string]reflect.Type{ @@ -514,4 +573,10 @@ var responseTypes = map[string]reflect.Type{ actionFindBestAndTargetInLogs: reflect.TypeOf(wifi.FindBestAndTargetInLogsResponse{}), actionGetFolderSize: reflect.TypeOf(wifi.GetFolderSizeResponse{}), actionGetDatastoreSize: reflect.TypeOf(wifi.GetDatastoreSizeResponse{}), + + // Plugins + actionListPlugins: reflect.TypeOf(ListPluginsResponse{}), + actionInstallPlugin: reflect.TypeOf(InstallPluginResponse{}), + actionUninstallPlugin: reflect.TypeOf(UninstallPluginResponse{}), + actionShowPluginStatus: reflect.TypeOf(ShowPluginStatusResponse{}), } diff --git a/mobile/blockchain.go b/mobile/blockchain.go index 3826633..a2aff46 100644 --- a/mobile/blockchain.go +++ b/mobile/blockchain.go @@ -269,3 +269,27 @@ func (c *Client) GetDatastoreSize() ([]byte, error) { ctx := context.TODO() return c.bl.GetDatastoreSize(ctx, c.bloxPid, wifi.GetDatastoreSizeRequest{}) } + +// ListPlugins requests the blox to list all available plugins +func (c *Client) ListPlugins() ([]byte, error) { + ctx := context.TODO() + return c.bl.ListPlugins(ctx) +} + +// InstallPlugin requests the blox to install a specific plugin +func (c *Client) InstallPlugin(pluginName string) ([]byte, error) { + ctx := context.TODO() + return c.bl.InstallPlugin(ctx, pluginName) +} + +// UninstallPlugin requests the blox to uninstall a specific plugin +func (c *Client) UninstallPlugin(pluginName string) ([]byte, error) { + ctx := context.TODO() + return c.bl.UninstallPlugin(ctx, pluginName) +} + +// ShowPluginStatus requests the status of a specific plugin +func (c *Client) ShowPluginStatus(pluginName string, lines int) ([]byte, error) { + ctx := context.TODO() + return c.bl.ShowPluginStatus(ctx, pluginName, lines) +} diff --git a/mobile/example_test.go b/mobile/example_test.go index 539e99b..5fe7979 100644 --- a/mobile/example_test.go +++ b/mobile/example_test.go @@ -852,3 +852,104 @@ func Example_blockchainCalls() { // Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM // account is {"account":"5GEottBB4kGzpN6imRwpnhyVDtKMTSYHYZvT4Rq93dchjN45"} } + +func Example_listPlugins() { + server := startMockServer("127.0.0.1:4004") + defer func() { + // Shutdown the server after test + if err := server.Shutdown(context.Background()); err != nil { + log.Error("Error happened in server.Shutdown") + panic(err) // Handle the error as you see fit + } + }() + + const poolName = "1" + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // Elevate log level to show internal communications. + if err := logging.SetLogLevel("*", "info"); err != nil { + log.Error("Error happened in logging.SetLogLevel") + panic(err) + } + + // Use a deterministic random generator to generate deterministic + // output for the example. + + // Instantiate the first node in the pool + h1, err := libp2p.New(libp2p.Identity(generateIdentity(1))) + if err != nil { + log.Errorw("Error happened in libp2p.New", "err", err) + panic(err) + } + n1, err := blox.New( + blox.WithHost(h1), + blox.WithPoolName("1"), + blox.WithUpdatePoolName(updatePoolName), + blox.WithBlockchainEndPoint("127.0.0.1:4004"), + blox.WithPingCount(5), + blox.WithExchangeOpts( + exchange.WithIpniGetEndPoint("http://127.0.0.1:4004/cid/"), + ), + ) + if err != nil { + log.Errorw("Error happened in blox.New", "err", err) + panic(err) + } + if err := n1.Start(ctx); err != nil { + log.Errorw("Error happened in n1.Start", "err", err) + panic(err) + } + defer n1.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) + + mcfg := fulamobile.NewConfig() + mcfg.AllowTransientConnection = true + bloxAddrString := "" + if len(h1.Addrs()) > 0 { + // Convert the first multiaddr to a string + bloxAddrString = h1.Addrs()[0].String() + log.Infow("blox multiadddr is", "addr", bloxAddrString, "peerID", h1.ID().String()) + } else { + log.Errorw("Error happened in h1.Addrs", "err", "No addresses in slice") + panic("No addresses in slice") + } + mcfg.BloxAddr = bloxAddrString + "/p2p/" + h1.ID().String() + mcfg.PoolName = "1" + mcfg.Exchange = bloxAddrString + mcfg.BlockchainEndpoint = "127.0.0.1:4004" + log.Infow("bloxAdd string created", "addr", bloxAddrString+"/p2p/"+h1.ID().String()) + + c1, err := fulamobile.NewClient(mcfg) + if err != nil { + log.Errorw("Error happened in fulamobile.NewClient", "err", err) + panic(err) + } + // Authorize exchange between the two nodes + mobilePeerIDString := c1.ID() + log.Infof("first client created with ID: %s", mobilePeerIDString) + mpid, err := peer.Decode(mobilePeerIDString) + if err != nil { + log.Errorw("Error happened in peer.Decode", "err", err) + panic(err) + } + if err := n1.SetAuth(ctx, h1.ID(), mpid, true); err != nil { + log.Error("Error happened in n1.SetAuth") + panic(err) + } + + err = c1.ConnectToBlox() + if err != nil { + log.Errorw("Error happened in c1.ConnectToBlox", "err", err) + panic(err) + } + plugins, err := c1.ListPlugins() + if err != nil { + panic(err) + } + fmt.Printf("account is %v", plugins) + + // Output: + // Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM + // account is {"account":"5GEottBB4kGzpN6imRwpnhyVDtKMTSYHYZvT4Rq93dchjN45"} +}