diff --git a/.gitignore b/.gitignore index 09bd3d6..b452a70 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,4 @@ torch .DS_Store .idea -*otel-agent-celestia.yaml +*otel-agent-*.yaml diff --git a/Dockerfile b/Dockerfile index 22d2714..bf6fff2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,12 +1,38 @@ -FROM golang:1.21.0-bullseye AS builder +# stage 1 Generate torch Binary +FROM --platform=$BUILDPLATFORM docker.io/golang:1.21.3-alpine3.18 as builder + +ARG TARGETOS +ARG TARGETARCH +ENV CGO_ENABLED=0 +ENV GO111MODULE=on + WORKDIR / + COPY go.mod go.sum ./ # Download dependencies RUN go mod download COPY . . -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /go/bin/torch ./cmd/main.go +RUN CGO_ENABLED=${CGO_ENABLED} GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -o /go/bin/torch ./cmd/main.go -FROM alpine:latest +# stage 2 +FROM docker.io/alpine:3.18.4 WORKDIR / +# Read here why UID 10001: https://github.com/hexops/dockerfile/blob/main/README.md#do-not-use-a-uid-below-10000 +ARG UID=10001 +ARG USER_NAME=torch + +ENV USR_HOME=/home/${USER_NAME} + +# hadolint ignore=DL3018 +RUN adduser ${USER_NAME} \ + -D \ + -g ${USER_NAME} \ + -h ${USR_HOME} \ + -s /sbin/nologin \ + -u ${UID} + COPY --from=builder /go/bin/torch . + +EXPOSE 8080 + ENTRYPOINT ["./torch"] diff --git a/Dockerfile_local b/Dockerfile_local index f568cd4..dce0b05 100644 --- a/Dockerfile_local +++ b/Dockerfile_local @@ -1,11 +1,37 @@ -FROM golang:1.21.0-bullseye AS builder +# stage 1 Generate torch Binary +FROM --platform=$BUILDPLATFORM docker.io/golang:1.21.3-alpine3.18 as builder + +ARG TARGETOS +ARG TARGETARCH +ENV CGO_ENABLED=0 +ENV GO111MODULE=on + WORKDIR / + COPY go.mod go.sum ./ # Download dependencies RUN go mod download COPY torch /go/bin/torch -FROM alpine:latest +# stage 2 +FROM docker.io/alpine:3.18.4 WORKDIR / +# Read here why UID 10001: https://github.com/hexops/dockerfile/blob/main/README.md#do-not-use-a-uid-below-10000 +ARG UID=10001 +ARG USER_NAME=torch + +ENV USR_HOME=/home/${USER_NAME} + +# hadolint ignore=DL3018 +RUN adduser ${USER_NAME} \ + -D \ + -g ${USER_NAME} \ + -h ${USR_HOME} \ + -s /sbin/nologin \ + -u ${UID} + COPY --from=builder /go/bin/torch . + +EXPOSE 8080 + ENTRYPOINT ["./torch"] diff --git a/Makefile b/Makefile index bd2c39a..4235033 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ PROJECT_NAME := $(shell basename `pwd`) REPOSITORY_NAME := $(shell basename `pwd`) -REGISTRY_NAME=ghcr.io/jrmanes +REGISTRY_NAME=ghcr.io/celestiaorg LOCAL_DEV=local # Go @@ -69,4 +69,4 @@ kubectl_deploy: docker_build_local_push kubectl_apply .PHYONY: kubectl_deploy kubectl_remote_kustomize_deploy: docker_build_local_push_gh kubectl_kustomize -.PHYONY: kubectl_remote_kustomize_deploys \ No newline at end of file +.PHYONY: kubectl_remote_kustomize_deploy diff --git a/README.md b/README.md index 99646ad..28d72c7 100644 --- a/README.md +++ b/README.md @@ -2,56 +2,91 @@ ## Description -Torch is the **Trusted Peers Orchestrator**. +**Torch** is the ***Trusted Peers Orchestrator***. This service was created with the idea to manage [Celestia Nodes](https://github.com/celestiaorg/celestia-node/) automatically. -By default, when you run some Bridge Nodes and Full Nodes, you have to specify in the Full Node the Bridge's multiaddress, this service does it automatically for you. +You can use Torch to manage the nodes connections from a config file and Torch will manage those nodes for you. -Torch access to the nodes defined in the config file and get's their multiaddress, then, it writes it to the specified path and shares the info with all the other peers defined. +Torch uses the Kubernetes API to manage the nodes, it gets their multi addresses information and stores them in a Redis instance, also, it provides some metrics to expose the node's IDs through the `/metrics` endpoint. --- -## Flow +## Workflow -Nodes side: -- Nodes check their `ENV` var during the start-up process -- If they don't have the value yet, they ask to Torch for it. - - They send a request to the service asking for the value -> phase-2 - - If the service already has the addresses, return them, otherwise, check the nodes. -- We store the value in the config PVC in a file, to keep it there even if we restart the pod or update it, and we -will source the value with the `start.sh` - - -1) Torch checks the peers based on the config file, the scope is in its namespace. - - How does it work? - - Torch receives a request with the nodeName in the body, then, checks the config (to validate it) and - opens a connection to them. - - checks the multiaddr, and stores it in memory - - once it has the addresses, it creates a file in the config PVC with the TRUSTED_PEERS value (the path can be defined in the config) -2) Then, it restarts the nodes until all of the peers have the env var available. +![Torch Flow](./docs/assets/torch.png) +When Torch receives a new request to the path `/api/v1/gen` with the node name in the body, it will verify if the node received is in the config file, if so, it will start the process, otherwise, it will reject it. + +There are two types of connections: + +- Using `ENV Vars`: Torch gets the data from the config file and write the connection to the node, using the `containerSetupName` to access to the node and write to a file. + - If the value of the key `nodeType` is `da`. Torch will try to generate the node ID once the node it will be ready to accept connections (*`containerName` will be up & running*). +- Connection via `Multi Address`: The user can specify the `connectsTo` list of a node, that means the node will have one or more connections. + - You can either use the node name like: + + ```yaml + connectsTo: + - "da-bridge-1-0" + - "da-bridge-2-0" + ``` + + - or you can specify the full multi address: + + ```yaml + connectsTo: + - "/dns/da-bridge-1/tcp/2121/p2p/12D3KooWNFpkX9fuo3GQ38FaVKdAZcTQsLr1BNE5DTHGjv2fjEHG" + - "/dns/da-bridge-1/tcp/2121/p2p/12D3KooWL8cqu7dFyodQNLWgJLuCzsQiv617SN9WDVX2GiZnjmeE" + ``` + + - If you want to generate the Multi address, you can either use the DNS or IP, to use dns, you will have to add the key `dnsConnections` and Torch will try to connect to this node, in the other hand, if you want to use IPs, just remove this key. + - Example: + + ```yaml + # This will use IP to connect to da-bridge-1-0 + - peers: + - nodeName: "da-full-1-0" + nodeType: "da" + connectsTo: + - "da-bridge-1-0" + # This will use DNS to connect to da-bridge-1-0 & da-bridge-2-0 + - peers: + - nodeName: "da-full-2-0" + nodeType: "da" + dnsConnections: + - "da-bridge-1" + - "da-bridge-2" + connectsTo: + - "da-bridge-1-0" + - "da-bridge-2-0" + ``` --- ## API Paths -- `/config` - - **Method**: `GET` - - **Description**: returns the config added by the user, can be used to debug -- `/list` +- `/api/v1/config` + - **Method**: `GET` + - **Description**: Returns the config added by the user, can be used to debug +- `/api/v1/list` - **Method**: `GET` - - **Description**: returns the list of the pods available in it's namespace based on the config file -- `/gen` + - **Description**: Returns the list of the pods available in it's namespace based on the config file +- `/api/v1/noId/` + - **Method**: `GET` + - **Description**: Returns the multi address of the node requested. +- `/api/v1/gen` - **Method**: `POST` - - **Description**: starts the process to generate the trusted peers on the nodes based on the config - - **Body Example**: + - **Description**: Starts the process to generate the trusted peers on the nodes based on the config + - **Body Example**: + ```json { "podName": "da-bridge-1" } ``` + - **Response Example**: + ```json { "status": 200, @@ -60,51 +95,154 @@ will source the value with the `start.sh` } } ``` -- `/genAll` - - **Method**: `POST` - - **Description**: generate the config for all the peers in the config file - - **Body Example**: - ```json - { - "podName": - [ - "da-bridge-1", - "da-full-1" - ] - } - ``` - - **Response Example**: - ```json - { - "status": 200, - "body": { - "da-bridge-0": "/dns/da-bridge-0/tcp/2121/p2p/12D3KooWDMuPiHgnB6xwnpaR4cgyAdbB5aN9zwoZCATgGxnrpk1M", - "da-full-0": "/dns/da-full-0/tcp/2121/p2p/12D3KooWDCUaPA5ZQveFfsuAHHBNiAhEERo5J1YfbqwSZKtn9RrD" - } - } - ``` + +- `/metrics` + - **Method**: `GET` + - **Description**: Prometheus metrics endpoint. + --- -## How does it work? +## Config Example Here is an example of the flow, using the config: ```yaml +--- mutualPeers: + - consensusNode: "consensus-validator-1" - peers: - - nodeName: "da-bridge-1" - containerName: "da" - - nodeName: "da-full-1" - containerName: "da" - trustedPeersPath: "/tmp" + - nodeName: "consensus-full-1-0" + containerName: "consensus" # optional - default: consensus + containerSetupName: "consensus-setup" # optional - default: consensus-setup + connectsAsEnvVar: true + nodeType: "consensus" + connectsTo: + - "consensus-validator-1" + - peers: + - nodeName: "consensus-full-2-0" + connectsAsEnvVar: true + nodeType: "consensus" + connectsTo: + - "consensus-validator-1" + - peers: + - nodeName: "da-bridge-1-0" + connectsAsEnvVar: true + nodeType: "da" + connectsTo: + - "consensus-full-1" + - peers: + - nodeName: "da-bridge-2-0" + containerName: "da" # optional - default: da + containerSetupName: "da-setup" # optional - default: da-setup + connectsAsEnvVar: true + nodeType: "da" + connectsTo: + - "consensus-full-2" + - peers: + - nodeName: "da-bridge-3-0" + containerName: "da" + nodeType: "da" + connectsTo: + - "da-bridge-1-0" + - "da-bridge-2-0" + - peers: + - nodeName: "da-full-1-0" + containerName: "da" + containerSetupName: "da-setup" + nodeType: "da" + dnsConnections: + - "da-bridge-1" + - "da-bridge-2" + connectsTo: + - "da-bridge-1-0" + - "da-bridge-2-0" - peers: - - nodeName: "da-bridge-2" - containerName: "da" - - nodeName: "da-full-2" - containerName: "da" + - nodeName: "da-full-2-0" + containerName: "da" + containerSetupName: "da-setup" + nodeType: "da" + connectsTo: + - "da-bridge-1-0" + - "da-bridge-2-0" + - peers: + - nodeName: "da-full-3-0" + nodeType: "da" + connectsTo: + # all the nodes in line using IP + - "/ip4/100.64.5.103/tcp/2121/p2p/12D3KooWNFpkX9fuo3GQ38FaVKdAZcTQsLr1BNE5DTHGjv2fjEHG,/ip4/100.64.5.15/tcp/2121/p2p/12D3KooWL8cqu7dFyodQNLWgJLuCzsQiv617SN9WDVX2GiZnjmeE" + # all the nodes in line using DNS + - "/dns/da-bridge-1/tcp/2121/p2p/12D3KooWKsHCeUVJqJwymyi3bGt1Gwbn5uUUFi2N9WQ7G6rUSXig,/dns/da-bridge-2/tcp/2121/p2p/12D3KooWA26WDUmejZzU6XHc4C7KQNSWaEApe5BEyXFNchAqrxhA" + # one node per line, either IP or DNS + - "/dns/da-bridge-1/tcp/2121/p2p/12D3KooWKsHCeUVJqJwymyi3bGt1Gwbn5uUUFi2N9WQ7G6rUSXig" + - "/dns/da-bridge-2/tcp/2121/p2p/12D3KooWA26WDUmejZzU6XHc4C7KQNSWaEApe5BEyXFNchAqrxhA" trustedPeersPath: "/tmp" ``` -![Torch Flow](./docs/assets/torch.png) +### Another example + +The architecture will contain: + +- 1 Consensus - Validator +- 2 Consensus - non-validating mode - connected to the validator +- 1 DA-Bridge-1 - connected to the CONS-NON-VALIDATOR +- 1 DA-Bridge-2 - connected to the CONS-NON-VALIDATOR +- 1 DA-Full-Node-1 - connected to DA-BN-1 +- 1 DA-Full-Node-2 - connected to DA-BN-1 & DA-BN-2 using DNS + +```yaml +--- +mutualPeers: + - consensusNode: "consensus-validator-1" + - peers: + - nodeName: "consensus-full-1-0" + connectsAsEnvVar: true + nodeType: "consensus" + connectsTo: + - "consensus-validator-1" + - peers: + - nodeName: "consensus-full-2-0" + connectsAsEnvVar: true + nodeType: "consensus" + connectsTo: + - "consensus-validator-1" + - peers: + - nodeName: "da-bridge-1-0" + connectsAsEnvVar: true + nodeType: "da" + connectsTo: + - "consensus-full-1" + - peers: + - nodeName: "da-bridge-2-0" + connectsAsEnvVar: true + nodeType: "da" + connectsTo: + - "consensus-full-2" + - peers: + - nodeName: "da-full-1-0" + nodeType: "da" + dnsConnections: + - "da-bridge-1" + connectsTo: + - "da-bridge-1-0" + - peers: + - nodeName: "da-full-2-0" + nodeType: "da" + dnsConnections: + - "da-bridge-1" + - "da-bridge-2" + connectsTo: + - "da-bridge-1-0" + - "da-bridge-2-0" +``` + +## Requirements + +### Redis + +Torch uses [Redis](https://redis.io/) as a DB, so to use Torch, you need to have a Redis instance available to connect. + +We are using Redis in two different ways: +- Store the Nodes IDs and reuse them. +- As a message broker, where Torch uses Producer & Consumer approach to process data async. --- diff --git a/cmd/main.go b/cmd/main.go index f8e4e9e..7c13d06 100755 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,20 +3,14 @@ package main import ( "flag" "fmt" - "io/ioutil" + "os" - "github.com/jrmanes/torch/config" - handlers "github.com/jrmanes/torch/pkg/http" - - "github.com/jrmanes/torch/pkg/k8s" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" -) -// Configuration variables -var ( - // cfg stores the mutual peers configuration. - cfg config.MutualPeersConfig + "github.com/celestiaorg/torch/config" + handlers "github.com/celestiaorg/torch/pkg/http" + "github.com/celestiaorg/torch/pkg/k8s" ) // ParseFlags parses the command-line flags and reads the configuration file. @@ -28,11 +22,13 @@ func ParseFlags() config.MutualPeersConfig { flag.Parse() // Read the configuration file - file, err := ioutil.ReadFile(*configFile) + file, err := os.ReadFile(*configFile) if err != nil { log.Error("Config file doesn't exist...", err) } + cfg := config.MutualPeersConfig{} + // Unmarshal the YAML into a struct err = yaml.Unmarshal(file, &cfg) if err != nil { @@ -59,7 +55,7 @@ func main() { PrintName() // Parse the command-line flags and read the configuration file log.Info("Running on namespace: ", k8s.GetCurrentNamespace()) - cfg = ParseFlags() + cfg := ParseFlags() handlers.Run(cfg) } diff --git a/config-test.yaml b/config-test.yaml index c1a8c26..db310bc 100644 --- a/config-test.yaml +++ b/config-test.yaml @@ -1,11 +1,69 @@ +--- mutualPeers: + - consensusNode: "consensus-validator-1" - peers: - - nodeName: "da-bridge-0" - containerName: "da" - - nodeName: "da-full-0" - containerName: "da" + - nodeName: "consensus-full-1-0" + containerName: "consensus" # optional - default: consensus + containerSetupName: "consensus-setup" # optional - default: consensus-setup + connectsAsEnvVar: true + nodeType: "consensus" + connectsTo: + - "consensus-validator-1" - peers: - - nodeName: "test-pod" - containerName: "pod" - - nodeName: "testing-pod-0l" - containerName: "test" + - nodeName: "consensus-full-2-0" + connectsAsEnvVar: true + nodeType: "consensus" + connectsTo: + - "consensus-validator-1" + - peers: + - nodeName: "da-bridge-1-0" + connectsAsEnvVar: true + nodeType: "da" + connectsTo: + - "consensus-full-1" + - peers: + - nodeName: "da-bridge-2-0" + containerName: "da" # optional - default: da + containerSetupName: "da-setup" # optional - default: da-setup + connectsAsEnvVar: true + nodeType: "da" + connectsTo: + - "consensus-full-2" + - peers: + - nodeName: "da-bridge-3-0" + containerName: "da" + nodeType: "da" + connectsTo: + - "da-bridge-1-0" + - "da-bridge-2-0" + - peers: + - nodeName: "da-full-1-0" + containerName: "da" + containerSetupName: "da-setup" + nodeType: "da" + dnsConnections: + - "da-bridge-1" + - "da-bridge-2" + connectsTo: + - "da-bridge-1-0" + - "da-bridge-2-0" + - peers: + - nodeName: "da-full-2-0" + containerName: "da" + containerSetupName: "da-setup" + nodeType: "da" + connectsTo: + - "da-bridge-1-0" + - "da-bridge-2-0" + - peers: + - nodeName: "da-full-3-0" + nodeType: "da" + connectsTo: + # all the nodes in line using IP + - "/ip4/100.64.5.103/tcp/2121/p2p/12D3KooWNFpkX9fuo3GQ38FaVKdAZcTQsLr1BNE5DTHGjv2fjEHG,/ip4/100.64.5.15/tcp/2121/p2p/12D3KooWL8cqu7dFyodQNLWgJLuCzsQiv617SN9WDVX2GiZnjmeE" + # all the nodes in line using DNS + - "/dns/da-bridge-1/tcp/2121/p2p/12D3KooWKsHCeUVJqJwymyi3bGt1Gwbn5uUUFi2N9WQ7G6rUSXig,/dns/da-bridge-2/tcp/2121/p2p/12D3KooWA26WDUmejZzU6XHc4C7KQNSWaEApe5BEyXFNchAqrxhA" + # one node per line, either IP or DNS + - "/dns/da-bridge-1/tcp/2121/p2p/12D3KooWKsHCeUVJqJwymyi3bGt1Gwbn5uUUFi2N9WQ7G6rUSXig" + - "/dns/da-bridge-2/tcp/2121/p2p/12D3KooWA26WDUmejZzU6XHc4C7KQNSWaEApe5BEyXFNchAqrxhA" + trustedPeersPath: "/tmp" diff --git a/config/config.go b/config/config.go index cbf1352..c29ed03 100644 --- a/config/config.go +++ b/config/config.go @@ -2,21 +2,25 @@ package config // MutualPeersConfig represents the configuration structure. type MutualPeersConfig struct { - // List of mutual peers. - MutualPeers []*MutualPeer `yaml:"mutualPeers"` + MutualPeers []*MutualPeer `yaml:"mutualPeers"` // MutualPeers list of mutual peers. } // MutualPeer represents a mutual peer structure. type MutualPeer struct { - ConsensusNode string `yaml:"consensusNode,omitempty"` - // List of peers. - Peers []Peer `yaml:"peers"` - TrustedPeersPath string `yaml:"trustedPeersPath,omitempty"` + ConsensusNode string `yaml:"consensusNode,omitempty"` // ConsensusNode name + Peers []Peer `yaml:"peers"` // Peer list of peers. + TrustedPeersPath string `yaml:"trustedPeersPath,omitempty"` // TrustedPeersPath specify the path to keep the files } // Peer represents a peer structure. type Peer struct { - // NodeName of the peer node. - NodeName string `yaml:"nodeName"` - ContainerName string `yaml:"containerName"` + NodeName string `yaml:"nodeName"` // NodeName name of the sts/deployment + NodeType string `yaml:"nodeType"` // NodeType specify the type of node + Namespace string `yaml:"namespace,omitempty"` // Namespace of the node + ContainerName string `yaml:"containerName,omitempty"` // ContainerName name of the main container + ContainerSetupName string `yaml:"containerSetupName,omitempty"` // ContainerSetupName initContainer name + ConnectsAsEnvVar bool `yaml:"connectsAsEnvVar,omitempty"` // ConnectsAsEnvVar use the value as env var + ConnectsTo []string `yaml:"connectsTo,omitempty"` // ConnectsTo list of nodes that it will connect to + DnsConnections []string `yaml:"dnsConnections,omitempty"` // DnsConnections list of DNS records + RetryCount int `yaml:"retryCount,omitempty"` // RetryCount number of retries } diff --git a/docs/assets/torch.png b/docs/assets/torch.png index 51a01bf..8e2cf07 100644 Binary files a/docs/assets/torch.png and b/docs/assets/torch.png differ diff --git a/go.mod b/go.mod index 6f8af8e..79d8abb 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,17 @@ -module github.com/jrmanes/torch +module github.com/celestiaorg/torch -go 1.20 +go 1.21 require ( + github.com/adjust/rmq/v5 v5.2.0 github.com/gorilla/mux v1.8.0 - github.com/prometheus/client_golang v1.15.1 + github.com/prometheus/client_golang v1.17.0 + github.com/redis/go-redis/v9 v9.2.1 github.com/sirupsen/logrus v1.9.3 - go.opentelemetry.io/otel v1.16.0 - go.opentelemetry.io/otel/exporters/prometheus v0.39.0 - go.opentelemetry.io/otel/metric v1.16.0 - go.opentelemetry.io/otel/sdk/metric v0.39.0 + go.opentelemetry.io/otel v1.19.0 + go.opentelemetry.io/otel/exporters/prometheus v0.41.0 + go.opentelemetry.io/otel/metric v1.19.0 + go.opentelemetry.io/otel/sdk/metric v0.41.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.27.4 k8s.io/apimachinery v0.27.4 @@ -17,9 +19,12 @@ require ( ) require ( + github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect + github.com/alicebob/miniredis/v2 v2.30.4 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -40,19 +45,22 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/prometheus/client_model v0.4.0 // indirect - github.com/prometheus/common v0.42.0 // indirect - github.com/prometheus/procfs v0.9.0 // indirect - go.opentelemetry.io/otel/sdk v1.16.0 // indirect - go.opentelemetry.io/otel/trace v1.16.0 // indirect - golang.org/x/net v0.8.0 // indirect - golang.org/x/oauth2 v0.5.0 // indirect - golang.org/x/sys v0.8.0 // indirect - golang.org/x/term v0.6.0 // indirect - golang.org/x/text v0.8.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect + github.com/prometheus/common v0.44.0 // indirect + github.com/prometheus/procfs v0.11.1 // indirect + github.com/stretchr/testify v1.8.4 // indirect + github.com/yuin/gopher-lua v1.1.0 // indirect + go.opentelemetry.io/otel/sdk v1.18.0 // indirect + go.opentelemetry.io/otel/trace v1.19.0 // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/oauth2 v0.8.0 // indirect + golang.org/x/sys v0.12.0 // indirect + golang.org/x/term v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/protobuf v1.30.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.90.1 // indirect diff --git a/go.sum b/go.sum index 52a2cbd..8a3b205 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,33 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/adjust/rmq/v5 v5.2.0 h1:ENPC+3i8N/LAvAfHpEpTMVl7q8zmwh4nl+hhxkao6KE= +github.com/adjust/rmq/v5 v5.2.0/go.mod h1:FfA6MzYJHeLbuATsNYaZYZaISyxxADDXQLN9QBroFCw= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE= +github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOSchFS7Eo= +github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= @@ -29,6 +46,7 @@ github.com/go-openapi/jsonreference v0.20.1/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -59,6 +77,7 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= +github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= @@ -73,6 +92,7 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -91,22 +111,28 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/onsi/ginkgo/v2 v2.9.1 h1:zie5Ly042PD3bsCvsSOPvRnFwyo3rKe64TJlD6nu0mk= +github.com/onsi/ginkgo/v2 v2.9.1/go.mod h1:FEcmzVcCHl+4o9bQZVab+4dC9+j+91t2FHSzmGAPfuo= github.com/onsi/gomega v1.27.4 h1:Z2AnStgsdSayCMDiCU42qIz+HLqEPcgiOCXjAU/w+8E= +github.com/onsi/gomega v1.27.4/go.mod h1:riYq/GJKh8hhoM01HN6Vmuy93AarCXCBGpvFDK3q3fQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI= -github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= +github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= +github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= -github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= -github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= -github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= -github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= -github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= +github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM= +github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= +github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= +github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= +github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= +github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg= +github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -117,21 +143,24 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= -go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= -go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1KUxj3KoPINZdtDaDj3IA= -go.opentelemetry.io/otel/exporters/prometheus v0.39.0/go.mod h1:4jo5Q4CROlCpSPsXLhymi+LYrDXd2ObU5wbKayfZs7Y= -go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= -go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= -go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= -go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4= -go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI= -go.opentelemetry.io/otel/sdk/metric v0.39.0/go.mod h1:piDIRgjcK7u0HCL5pCA4e74qpK/jk3NiUoAHATVAmiI= -go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= -go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= +github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= +github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= +go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= +go.opentelemetry.io/otel/exporters/prometheus v0.41.0 h1:A3/bhjP5SmELy8dcpK+uttHeh9Qrh+YnS16/VzrztRQ= +go.opentelemetry.io/otel/exporters/prometheus v0.41.0/go.mod h1:mKuXEMi9suyyNJQ99SZCO0mpWGFe0MIALtjd3r6uo7Q= +go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= +go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= +go.opentelemetry.io/otel/sdk v1.18.0 h1:e3bAB0wB3MljH38sHzpV/qWrOTCFrdZF2ct9F8rBkcY= +go.opentelemetry.io/otel/sdk v1.18.0/go.mod h1:1RCygWV7plY2KmdskZEDDBs4tJeHG92MdHZIluiYs/M= +go.opentelemetry.io/otel/sdk/metric v0.41.0 h1:c3sAt9/pQ5fSIUfl0gPtClV3HhE18DCVzByD33R/zsk= +go.opentelemetry.io/otel/sdk/metric v0.41.0/go.mod h1:PmOmSt+iOklKtIg5O4Vz9H/ttcRFSNTgii+E1KGyn1w= +go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= +go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -150,11 +179,11 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/oauth2 v0.5.0 h1:HuArIo48skDwlrvM3sEdHXElYslAMsf3KwRkkW4MC4s= -golang.org/x/oauth2 v0.5.0/go.mod h1:9/XBHVqLaWO3/BRHs5jbpYCnOZVjj5V0ndyaAM7KB4I= +golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8= +golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -162,19 +191,20 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= -golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -186,6 +216,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= +golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -211,8 +242,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/pkg/db/redis/config.go b/pkg/db/redis/config.go new file mode 100644 index 0000000..c0d6445 --- /dev/null +++ b/pkg/db/redis/config.go @@ -0,0 +1,59 @@ +package redis + +import ( + "os" + + log "github.com/sirupsen/logrus" +) + +var ( + redisHost = "" + redisPort = "" + redisPass = "" + redisFullUrl = "" +) + +// InitRedisConfig checks env vars and add default values in case we need +func InitRedisConfig() *RedisClient { + // redis config + redisHost = GetRedisHost() + redisPort = GetRedisPort() + redisPass = GetRedisPass() + redisFullUrl = GetRedisFullURL() + + log.Info("Redis host to connect: ", redisFullUrl) + + return NewRedisClient(redisFullUrl, redisPass, 0) +} + +// GetRedisHost returns the redis host to connect +func GetRedisHost() string { + redisHost = os.Getenv("REDIS_HOST") + if redisHost == "" { + redisHost = "localhost" + } + return redisHost +} + +// GetRedisPort returns the redis port +func GetRedisPort() string { + redisPort = os.Getenv("REDIS_PORT") + if redisPort == "" { + redisPort = "6379" + } + return redisPort +} + +// GetRedisFullURL returns the full url +func GetRedisFullURL() string { + return redisHost + ":" + redisPort +} + +// GetRedisPass returns the redis pass +func GetRedisPass() string { + redisPass = os.Getenv("REDIS_PASS") + if redisPass == "" { + redisPass = "" + } + return redisPass +} diff --git a/pkg/db/redis/manager.go b/pkg/db/redis/manager.go new file mode 100644 index 0000000..b517f00 --- /dev/null +++ b/pkg/db/redis/manager.go @@ -0,0 +1,52 @@ +package redis + +import ( + "context" + "time" + + log "github.com/sirupsen/logrus" +) + +// SetNodeId stores the values in redis. +func SetNodeId( + podName string, + r *RedisClient, + ctx context.Context, + output string, +) error { + // try to get the value from redis + // if the value is empty, then we add it + nodeName, err := CheckIfNodeExistsInDB(r, ctx, podName) + if err != nil { + return err + } + + // if the node is not in the db, then we add it + if nodeName == "" { + log.Info("Node ", "["+podName+"]"+" not found in Redis, let's add it") + err := r.SetKey(ctx, podName, output, 1000*time.Hour) + if err != nil { + log.Error("Error adding the node to redis: ", err) + return err + } + } else { + log.Info("Node ", "["+podName+"]"+" found in Redis") + } + + return nil +} + +// CheckIfNodeExistsInDB checks if node is in the DB and return it. +func CheckIfNodeExistsInDB( + r *RedisClient, + ctx context.Context, + nodeName string, +) (string, error) { + nodeName, err := r.GetKey(ctx, nodeName) + if err != nil { + log.Error("Error: ", err) + return "", err + } + + return nodeName, err +} diff --git a/pkg/db/redis/producer.go b/pkg/db/redis/producer.go new file mode 100644 index 0000000..1350399 --- /dev/null +++ b/pkg/db/redis/producer.go @@ -0,0 +1,38 @@ +package redis + +import ( + "github.com/adjust/rmq/v5" + log "github.com/sirupsen/logrus" +) + +// Producer add data into the queue. +func Producer(data, queueName string) error { + log.Info("Adding STS [", data, "] node to the queue: [", queueName, "]") + data += "-0" // we add the suffix as the pods have it in their name when we use a StatefulSet. + log.Info("Getting the pod from the STS [", data, "]") + + connection, err := rmq.OpenConnection( + "producer", + "tcp", + GetRedisFullURL(), + 2, + nil, + ) + if err != nil { + log.Error("Error: ", err) + return err + } + + queue, err := connection.OpenQueue(queueName) + if err != nil { + log.Error("Error: ", err) + return err + } + + if err := queue.Publish(data); err != nil { + log.Error("Error, failed to publish: ", err) + return err + } + + return nil +} diff --git a/pkg/db/redis/redis.go b/pkg/db/redis/redis.go new file mode 100644 index 0000000..ba04967 --- /dev/null +++ b/pkg/db/redis/redis.go @@ -0,0 +1,64 @@ +package redis + +import ( + "context" + "time" + + "github.com/redis/go-redis/v9" + log "github.com/sirupsen/logrus" +) + +type RedisClient struct { + client *redis.Client +} + +// NewRedisClient returns a Redis client connection +func NewRedisClient(addr string, password string, db int) *RedisClient { + client := redis.NewClient(&redis.Options{ + Addr: addr, + Password: password, // use password set. + DB: db, // use DB. + Protocol: 3, // specify 2 for RESP 2 or 3 for RESP 3. + }) + return &RedisClient{client} +} + +// SetKey receives a key - value and stores it into the DB. +func (r *RedisClient) SetKey(ctx context.Context, key, value string, expiration time.Duration) error { + return r.client.Set(ctx, key, value, expiration).Err() +} + +// GetKey receives a key and tries to return it from the DB. +func (r *RedisClient) GetKey(ctx context.Context, key string) (string, error) { + result, err := r.client.Get(ctx, key).Result() + if err == redis.Nil { + return "", nil + } else if err != nil { + return "", err + } + return result, nil +} + +// GetAllKeys returns all the keys from the DB. +func (r *RedisClient) GetAllKeys(ctx context.Context) (map[string]string, error) { + result := make(map[string]string) + iter, err := r.client.Keys(ctx, "*").Result() + if err != nil { + log.Error("Error getting the key ", err) + } + for _, s := range iter { + value, err := r.GetKey(ctx, s) + if err != nil { + log.Error("Error getting the key ", s, ": ", err) + } else { + result[s] = value + } + } + + return result, nil +} + +// SetKeyExpiration receive a key and exp. time and set it. +func (r *RedisClient) SetKeyExpiration(ctx context.Context, key string, expiration time.Duration) error { + return r.client.Expire(ctx, key, expiration).Err() +} diff --git a/pkg/http/handlers.go b/pkg/http/handlers.go index ec98f2b..a138866 100644 --- a/pkg/http/handlers.go +++ b/pkg/http/handlers.go @@ -1,23 +1,33 @@ package handlers import ( + "context" "encoding/json" + "errors" "net/http" + "time" - "github.com/jrmanes/torch/config" - "github.com/jrmanes/torch/pkg/k8s" - + "github.com/gorilla/mux" log "github.com/sirupsen/logrus" + + "github.com/celestiaorg/torch/config" + "github.com/celestiaorg/torch/pkg/db/redis" + "github.com/celestiaorg/torch/pkg/nodes" +) + +const ( + errorMsg = "Error: " // errorMsg common error message. + timeoutDuration = 30 * time.Second // timeoutDuration we specify the max time to run the func. ) type RequestBody struct { // Body response response body. - Body string `json:"podName"` + Body string `json:"pod_name"` } type RequestMultipleNodesBody struct { // Body response response body. - Body []string `json:"podName"` + Body []string `json:"pod_name"` } // Response represents the response structure. @@ -30,27 +40,8 @@ type Response struct { Errors interface{} `json:"errors,omitempty"` } -// MutualPeersConfig represents the configuration structure. -type MutualPeersConfig struct { - // List of mutual peers. - MutualPeers []*MutualPeer `yaml:"mutualPeers"` -} - -// MutualPeer represents a mutual peer structure. -type MutualPeer struct { - // List of peers. - Peers []Peer `yaml:"peers"` -} - -// Peer represents a peer structure. -type Peer struct { - // NodeName of the peer node. - NodeName string `yaml:"nodeName"` - ContainerName string `yaml:"containerName"` -} - // GetConfig handles the HTTP GET request for retrieving the config as JSON. -func GetConfig(w http.ResponseWriter, r *http.Request, cfg config.MutualPeersConfig) { +func GetConfig(w http.ResponseWriter, cfg config.MutualPeersConfig) { // Generate the response, including the configuration resp := Response{ Status: http.StatusOK, @@ -58,95 +49,97 @@ func GetConfig(w http.ResponseWriter, r *http.Request, cfg config.MutualPeersCon Errors: nil, } - jsonData, err := json.Marshal(resp) - if err != nil { - log.Error("Error marshaling to JSON:", err) - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err = w.Write(jsonData) - if err != nil { - log.Error("Error writing response:", err) - } + ReturnResponse(resp, w) } // List handles the HTTP GET request for retrieving the list of matching pods as JSON. -func List(w http.ResponseWriter, r *http.Request, cfg config.MutualPeersConfig) { - //listOfPods := k8s.GenerateList(cfg) - nodeIDs := k8s.GetAllIDs() +func List(w http.ResponseWriter) { + red := redis.InitRedisConfig() + // Create a new context with a timeout + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) + + // Make sure to call the cancel function to release resources when you're done + defer cancel() + + // get all values from redis + nodeIDs, err := red.GetAllKeys(ctx) + if err != nil { + log.Error("Error getting the keys and values: ", err) + } - // Generate the response, adding the matching pod names + // Generate the response, including the configuration resp := Response{ Status: http.StatusOK, Body: nodeIDs, Errors: nil, } - jsonData, err := json.Marshal(resp) - if err != nil { - log.Error("Error marshaling to JSON:", err) - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, err = w.Write(jsonData) - if err != nil { - log.Error("Error writing response:", err) - } + ReturnResponse(resp, w) } -// Gen handles the HTTP POST request to create the files with their ids -func Gen(w http.ResponseWriter, r *http.Request, cfg config.MutualPeersConfig) { - var body RequestBody - var resp Response +// GetNoId handles the HTTP GET request for retrieving the list of matching pods as JSON. +func GetNoId(w http.ResponseWriter, r *http.Request, cfg config.MutualPeersConfig) { + nodeName := mux.Vars(r)["nodeName"] + if nodeName == "" { + log.Error("User param nodeName is empty", http.StatusNotFound) + return + } - err := json.NewDecoder(r.Body).Decode(&body) - if err != nil { - log.Error("Error decoding the request body into the struct:", err) + // verify that the node is in the config + ok, peer := nodes.ValidateNode(nodeName, cfg) + if !ok { + log.Error(errorMsg, "Pod doesn't exists in the config") resp := Response{ - Status: http.StatusInternalServerError, - Body: body.Body, - Errors: err, + Status: http.StatusNotFound, + Body: peer.NodeName, + Errors: errors.New("error: Pod doesn't exists in the config"), } - ReturnResponse(resp, w, r) + ReturnResponse(resp, w) } - pod := body.Body - log.Info(pod) + red := redis.InitRedisConfig() + // Create a new context with a timeout + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) + + // Make sure to call the cancel function to release resources when you're done + defer cancel() - output, err := k8s.GenerateTrustedPeersAddr(cfg, pod) + // initialize the response struct + resp := Response{} + + nodeIDs, err := red.GetKey(ctx, nodeName) if err != nil { - log.Error("Error: ", err) - resp := Response{ - Status: http.StatusInternalServerError, - Body: pod, - Errors: err, - } - ReturnResponse(resp, w, r) + log.Error("Error getting the keys and values: ", err) } - // print the output -> should be the nodeId - log.Info(output) - - nodeId := make(map[string]string) - nodeId[pod] = output + if nodeIDs == "" { + resp = Response{ + Status: http.StatusNotFound, + Body: "", + Errors: "[ERROR] Node [" + nodeName + "] not found", + } + } else { + // Generate the response, adding the matching pod names + resp = Response{ + Status: http.StatusOK, + Body: nodeIDs, + Errors: nil, + } + } + // Generate the response, including the configuration resp = Response{ Status: http.StatusOK, - Body: nodeId, + Body: nodeIDs, Errors: nil, } - ReturnResponse(resp, w, r) + + ReturnResponse(resp, w) } -// GenAll generate the list of ids for all the nodes availabe in the config -func GenAll(w http.ResponseWriter, r *http.Request, cfg config.MutualPeersConfig) { - var body RequestMultipleNodesBody +// Gen handles the HTTP POST request to create the files with their ids. +func Gen(w http.ResponseWriter, r *http.Request, cfg config.MutualPeersConfig) { + var body RequestBody var resp Response err := json.NewDecoder(r.Body).Decode(&body) @@ -157,43 +150,79 @@ func GenAll(w http.ResponseWriter, r *http.Request, cfg config.MutualPeersConfig Body: body.Body, Errors: err, } - ReturnResponse(resp, w, r) + ReturnResponse(resp, w) } - pod := body.Body - log.Info(pod) - - nodeIDs, err := k8s.GenerateAllTrustedPeersAddr(cfg, pod) - if err != nil { - log.Error("Error: ", err) - // resp -> generate the response with the error + // verify that the node is in the config + ok, peer := nodes.ValidateNode(body.Body, cfg) + if !ok { + log.Error(errorMsg, "Pod doesn't exists in the config") resp := Response{ - Status: http.StatusInternalServerError, - Body: pod, - Errors: err, + Status: http.StatusNotFound, + Body: body.Body, + Errors: errors.New("error: Pod doesn't exists in the config"), } - ReturnResponse(resp, w, r) + ReturnResponse(resp, w) } - // remove if the ids is empty - for nodeName, id := range nodeIDs { - if id == "" { - // if the id is empty, we remove it from the map - delete(nodeIDs, nodeName) + log.Info("Pod to setup: ", "[", peer.NodeName, "]") + + resp = ConfigureNode(cfg, peer, err) + + ReturnResponse(resp, w) +} + +func ConfigureNode( + cfg config.MutualPeersConfig, + peer config.Peer, + err error, +) Response { + // Get the default values in case we need + switch peer.NodeType { + case "da": + peer = nodes.SetDaNodeDefault(peer) + case "consensus": + peer = nodes.SetConsNodeDefault(peer) + } + + // check if the node uses env var + if peer.ConnectsAsEnvVar { + log.Info("Pod: [", peer.NodeName, "] ", "uses env var to connect.") + // configure the env vars for the node + err = nodes.SetupNodesEnvVarAndConnections(peer, cfg) + if err != nil { + log.Error(errorMsg, err) + return Response{ + Status: http.StatusInternalServerError, + Body: peer.NodeName, + Errors: err, + } } } - // resp -> generate the response - resp = Response{ + // Configure DA Nodes with which are not using env var + if peer.NodeType == "da" && !peer.ConnectsAsEnvVar { + err := nodes.SetupDANodeWithConnections(peer) + if err != nil { + log.Error(errorMsg, err) + return Response{ + Status: http.StatusInternalServerError, + Body: peer.NodeName, + Errors: err, + } + } + } + + // return the resp with status 200 and the node name. + return Response{ Status: http.StatusOK, - Body: nodeIDs, - Errors: nil, + Body: peer.NodeName, + Errors: "", } - ReturnResponse(resp, w, r) } -// ReturnResponse assert function to write the reponse -func ReturnResponse(resp Response, w http.ResponseWriter, r *http.Request) { +// ReturnResponse assert function to write the response. +func ReturnResponse(resp Response, w http.ResponseWriter) { jsonData, err := json.Marshal(resp) if err != nil { log.Error("Error marshaling to JSON:", err) @@ -210,7 +239,7 @@ func ReturnResponse(resp Response, w http.ResponseWriter, r *http.Request) { } } -// logRequest is a middleware function that logs the incoming request. +// LogRequest is a middleware function that logs the incoming request. func LogRequest(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { log.Info(r.Method, " ", r.URL.Path) diff --git a/pkg/http/router.go b/pkg/http/router.go index 2fa4243..7787777 100644 --- a/pkg/http/router.go +++ b/pkg/http/router.go @@ -3,26 +3,38 @@ package handlers import ( "net/http" - "github.com/jrmanes/torch/config" - "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/celestiaorg/torch/config" ) func Router(r *mux.Router, cfg config.MutualPeersConfig) *mux.Router { r.Use(LogRequest) - r.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) { - GetConfig(w, r, cfg) + + // group the current version to /api/v1 + s := r.PathPrefix("/api/v1").Subrouter() + + // get config + s.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) { + GetConfig(w, cfg) }).Methods("GET") - r.HandleFunc("/list", func(w http.ResponseWriter, r *http.Request) { - List(w, r, cfg) + + // get nodes + s.HandleFunc("/list", func(w http.ResponseWriter, r *http.Request) { + List(w) }).Methods("GET") - r.HandleFunc("/gen", func(w http.ResponseWriter, r *http.Request) { + // get node details by node name + s.HandleFunc("/noId/{nodeName}", func(w http.ResponseWriter, r *http.Request) { + GetNoId(w, r, cfg) + }).Methods("GET") + + // generate + s.HandleFunc("/gen", func(w http.ResponseWriter, r *http.Request) { Gen(w, r, cfg) }).Methods("POST") - r.HandleFunc("/genAll", func(w http.ResponseWriter, r *http.Request) { - GenAll(w, r, cfg) - }).Methods("POST") + + // metrics r.Handle("/metrics", promhttp.Handler()) return r diff --git a/pkg/http/server.go b/pkg/http/server.go index 80f58d9..1c5682a 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -9,12 +9,14 @@ import ( "syscall" "time" - "github.com/jrmanes/torch/config" - "github.com/jrmanes/torch/pkg/k8s" - "github.com/jrmanes/torch/pkg/metrics" - "github.com/gorilla/mux" log "github.com/sirupsen/logrus" + + "github.com/celestiaorg/torch/config" + "github.com/celestiaorg/torch/pkg/db/redis" + "github.com/celestiaorg/torch/pkg/k8s" + "github.com/celestiaorg/torch/pkg/metrics" + "github.com/celestiaorg/torch/pkg/nodes" ) // GetHttpPort GetPort retrieves the namespace where the service will be deployed @@ -28,7 +30,7 @@ func GetHttpPort() string { // Ensure that the provided port is a valid numeric value _, err := strconv.Atoi(port) if err != nil { - log.Error("Invalid HTTP_PORT value: %v. Using default port 8080") + log.Error("Invalid HTTP_PORT [", os.Getenv("HTTP_PORT"), "] ,using default port 8080") return "8080" } @@ -55,29 +57,13 @@ func Run(cfg config.MutualPeersConfig) { return } - // Register Metrics - Initialize them - err = RegisterMetrics(cfg) - if err != nil { - log.Errorf("Error registering metrics: %v", err) + // generate the metric from the Genesis Hash data + notOk := GenerateHashMetrics(cfg, err) + if notOk { + log.Error("Error registering metric block_height_1") return } - // Get the genesisHash - // check if the config has the consensusNode field defined - if cfg.MutualPeers[0].ConsensusNode != "" { - blockHash, earliestBlockTime := k8s.GenesisHash(cfg) - err = metrics.WithMetricsBlockHeight( - blockHash, - earliestBlockTime, - cfg.MutualPeers[0].ConsensusNode, - os.Getenv("POD_NAMESPACE"), - ) - if err != nil { - log.Errorf("Error registering metric block_height_1: %v", err) - return - } - } - // Create the server server := &http.Server{ Addr: ":" + httpPort, @@ -95,6 +81,38 @@ func Run(cfg config.MutualPeersConfig) { log.Info("Server Started...") log.Info("Listening on port: " + httpPort) + // Initialize the goroutine to check the nodes in the queue. + log.Info("Initializing queues to process the nodes...") + // Create a new context without timeout as we want to keep this goroutine running forever, if we specify a timeout, + // it will be canceled at some point.c + go func() { + go nodes.ProcessTaskQueue() + }() + + log.Info("Initializing goroutine to watch over the StatefulSets...") + // Initialize a goroutine to watch for changes in StatefulSets in the namespace. + go func() { + // Call the WatchStatefulSets function and capture any potential error. + err := k8s.WatchStatefulSets() + if err != nil { + // Log an error message if WatchStatefulSets encounters an error. + log.Error("Error in WatchStatefulSets: ", err) + } + }() + + // Initialize the goroutine to add a watcher to the StatefulSets in the namespace. + log.Info("Initializing Redis consumer") + go func() { + nodes.ConsumerInit("k8s") + }() + + // Check if we already have some multi addresses in the DB and expose them, there might be a situation where Torch + // get restarted, and we already have the nodes IDs, so we can expose them. + err = RegisterMetrics(cfg) + if err != nil { + log.Error("Couldn't generate the metrics...", err) + } + <-done log.Info("Server Stopped") @@ -109,25 +127,62 @@ func Run(cfg config.MutualPeersConfig) { log.Info("Server Exited Properly") } -// RegisterMetrics generates and registers the metrics for all nodes in the configuration. +func GenerateHashMetrics(cfg config.MutualPeersConfig, err error) bool { + // Get the genesisHash + // check if the config has the consensusNode field defined + if cfg.MutualPeers[0].ConsensusNode != "" { + blockHash, earliestBlockTime := nodes.GenesisHash(cfg) + err = metrics.WithMetricsBlockHeight( + blockHash, + earliestBlockTime, + cfg.MutualPeers[0].ConsensusNode, + os.Getenv("POD_NAMESPACE"), + ) + if err != nil { + log.Errorf("Error registering metric block_height_1: %v", err) + return true + } + } + return false +} + +// RegisterMetrics generates and registers the metrics for all nodes in case they already exist in the DB. func RegisterMetrics(cfg config.MutualPeersConfig) error { - log.Info("Generating initial metrics for all the nodes...") + red := redis.InitRedisConfig() + // Create a new context with a timeout + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) + + // Make sure to call the cancel function to release resources when you're done + defer cancel() - var nodeNames []string + log.Info("Generating metrics from existing nodes...") // Adding nodes from config to register the initial metrics for _, n := range cfg.MutualPeers { for _, no := range n.Peers { - nodeNames = append(nodeNames, no.NodeName) + // checking the node in the DB first + ma, err := redis.CheckIfNodeExistsInDB(red, ctx, no.NodeName) + if err != nil { + log.Error("Error CheckIfNodeExistsInDB : [", no.NodeName, "]", err) + return err + } + + // check if the multi address is not empty + if ma != "" { + log.Info("Node: [", no.NodeName, "], found in the DB generating metric: ", " [", ma, "]") + + // Register a multi-address metric + m := metrics.MultiAddrs{ + ServiceName: "torch", + NodeName: no.NodeName, + MultiAddr: ma, + Namespace: no.Namespace, + Value: 1, + } + metrics.RegisterMetric(m) + } } } - // Generate the metrics for all nodes - _, err := k8s.GenerateAllTrustedPeersAddr(cfg, nodeNames) - if err != nil { - log.Errorf("Error GenerateAllTrustedPeersAddr: %v", err) - return err - } - return nil } diff --git a/pkg/k8s/commands.go b/pkg/k8s/commands.go index ef8f8a4..351b21d 100644 --- a/pkg/k8s/commands.go +++ b/pkg/k8s/commands.go @@ -1,84 +1,71 @@ package k8s import ( - "fmt" + "bytes" - "github.com/jrmanes/torch/config" + log "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" ) -var ( - trustedPeerFile = "/tmp/TP-ADDR" - trustedPeers = "/tmp/" - cmd = `$(ifconfig | grep -oE 'inet addr:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)' | grep -v '127.0.0.1' | awk '{print substr($2, 6)}')` - trustedPeerPrefix = "/ip4/" + cmd + "/tcp/2121/p2p/" -) - -// GetTrustedPeersPath get the peers path from config or return the default value -func GetTrustedPeersPath(cfg config.MutualPeer) string { - // if not defined in the config, return the default value - if cfg.TrustedPeersPath == "" { - return trustedPeers +// RunRemoteCommand executes a remote command on the specified node. +func RunRemoteCommand(nodeName, container, namespace string, command []string) (string, error) { + clusterConfig, err := rest.InClusterConfig() + if err != nil { + log.Error("Error: ", err.Error()) + } + // creates the client + client, err := kubernetes.NewForConfig(clusterConfig) + if err != nil { + log.Fatalf("Error: %v", err.Error()) } - return cfg.TrustedPeersPath -} + // Create a request to execute the command on the specified node. + req := client.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(nodeName). + Namespace(namespace). + SubResource("exec"). + VersionedParams(&v1.PodExecOptions{ + Command: command, + Container: container, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: false, + }, scheme.ParameterCodec) -// GetTrustedPeerCommand generates the command for retrieving trusted peer information. -func GetTrustedPeerCommand() []string { - script := fmt.Sprintf(`#!/bin/sh -# add the prefix to the addr -if [ -f "%[1]s" ];then - cat "%[1]s" -fi`, trustedPeerFile) + // Execute the remote command. + output, err := executeCommand(clusterConfig, req) + if err != nil { + log.Error("failed to execute remote command: ", err) + } - return []string{"sh", "-c", script} + return output, nil } -// CreateTrustedPeerCommand generates the command for creating trusted peers. -// we have to use the shell script because we can only get the token and the -// nodeID from the node itself -func CreateTrustedPeerCommand() []string { - script := fmt.Sprintf(`#!/bin/sh -if [ -f "%[1]s" ];then - cat "%[1]s" -else - # add the prefix to the addr - echo -n "%[2]s" > "%[1]s" - - # generate the token - export AUTHTOKEN=$(celestia bridge auth admin --node.store /home/celestia) - - # remove the first warning line... - export AUTHTOKEN=$(echo $AUTHTOKEN|rev|cut -d' ' -f1|rev) - - # make the request and parse the response - TP_ADDR=$(wget --header="Authorization: Bearer $AUTHTOKEN" \ - --header="Content-Type: application/json" \ - --post-data='{"jsonrpc":"2.0","id":0,"method":"p2p.Info","params":[]}' \ - --output-document - \ - http://localhost:26658 | grep -o '"ID":"[^"]*"' | sed 's/"ID":"\([^"]*\)"/\1/') - - echo -n "${TP_ADDR}" >> "%[1]s" - cat "%[1]s" -fi`, trustedPeerFile, trustedPeerPrefix) - - return []string{"sh", "-c", script} -} +// executeCommand executes the remote command using the provided configuration, request, and output writer. +func executeCommand(config *rest.Config, req *rest.Request) (string, error) { + executor, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + log.Error("failed to create SPDY executor: ", err) + } -// BulkTrustedPeerCommand generates the peers content in the files -func BulkTrustedPeerCommand(tp string, cfg config.MutualPeer) []string { - // Get the path to write - trustedPeers = GetTrustedPeersPath(cfg) + // Prepare the standard I/O streams. + var stdout, stderr bytes.Buffer - script := fmt.Sprintf(`#!/bin/sh -# create the folder if doesnt exists -mkdir -p "%[3]s" + // Execute the remote command and capture the output. + err = executor.Stream(remotecommand.StreamOptions{ + Stdout: &stdout, + Stderr: &stderr, + Tty: false, + }) + if err != nil { + log.Error("failed to execute command stream: ", err) + } -if [ ! -f "%[3]s" ];then - cp "%[2]s" "%[3]s/TRUSTED_PEERS" -fi -# Generate Trusteed Peers only if they are not in the file -grep -qF "%[1]s" "%[3]s/TRUSTED_PEERS" || echo ",%[1]s" >> "%[3]s/TRUSTED_PEERS" -`, tp, trustedPeerFile, trustedPeers) - return []string{"sh", "-c", script} + return stdout.String(), nil } diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go deleted file mode 100644 index a32d716..0000000 --- a/pkg/k8s/k8s.go +++ /dev/null @@ -1,393 +0,0 @@ -package k8s - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "net/http" - "os" - "sync" - - "github.com/jrmanes/torch/config" - "github.com/jrmanes/torch/pkg/metrics" - - log "github.com/sirupsen/logrus" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/remotecommand" -) - -type NodeAddress struct { - ID string - NodeName string -} - -var nodeIDsMap map[string]string - -// GetCurrentNamespace gets the current namespace from the environment variable. -// If the variable is not defined, the default value "default" is used. -func GetCurrentNamespace() string { - // currentNamespace Stores the current namespace. - currentNamespace := os.Getenv("POD_NAMESPACE") - if currentNamespace == "" { - log.Warn("Current Namespace variable is not defined, using the default value") - return "default" - } - return currentNamespace -} - -// StoreNodeIDs stores the nodeName-address pair in the map -func StoreNodeIDs(nodeName, id string) { - // check if the nodeIDsMap has been initialized - if nodeIDsMap == nil { - nodeIDsMap = make(map[string]string) - } - nodeIDsMap[nodeName] = id -} - -// GetAllIDs returns the nodeIDsMap -func GetAllIDs() map[string]string { - return nodeIDsMap -} - -// validateNode checks if an input node is available in the config. -func validateNode(n string, cfg config.MutualPeersConfig) (bool, string, string) { - // check if the node received by the request is on the list, if so, we - // continue the process - for _, mutualPeer := range cfg.MutualPeers { - for _, peer := range mutualPeer.Peers { - if peer.NodeName == n { - log.Info("Pod found in the config, executing remote command...") - return true, peer.NodeName, peer.ContainerName - } - } - } - - return false, "", "" -} - -// GenerateList generates a list of matching pods based on the configured NodeName values. -func GenerateList(cfg config.MutualPeersConfig) []string { - // matchingPods Stores the matching pods. - var matchingPods []string - - clusterConfig, err := rest.InClusterConfig() - if err != nil { - log.Fatalf("Error %v", err) - } - // creates the clientset - clientset, err := kubernetes.NewForConfig(clusterConfig) - if err != nil { - log.Fatalf("Error %v", err) - } - - log.Info("Namespace: ", GetCurrentNamespace()) - - // get pods in the current namespace - pods, err := clientset.CoreV1().Pods(GetCurrentNamespace()).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - log.Error("Failed to get pods:", err) - } - - log.Info("There are ", len(pods.Items), " pods in the namespace") - - // Check if the pod names match the configured NodeName values - for _, pod := range pods.Items { - podName := pod.Name - for _, mutualPeer := range cfg.MutualPeers { - for _, peer := range mutualPeer.Peers { - if podName == peer.NodeName { - log.Info("Pod matches the name: ", pod.Name, " ", peer.NodeName) - matchingPods = append(matchingPods, podName) - } - } - } - } - - // matchingPods Stores the matching pods. - return matchingPods -} - -// GenerateTrustedPeersAddr handles the HTTP request to generate trusted peers' addresses. -func GenerateTrustedPeersAddr(cfg config.MutualPeersConfig, pod string) (string, error) { - // get the command - command := CreateTrustedPeerCommand() - - // validate if the node received is ok - ok, pod, cont := validateNode(pod, cfg) - if !ok { - log.Error("Pod name not valid", pod) - return "", errors.New("Pod name not valid...") - } - - log.Info("Pod found: ", pod, " ", cont, " ", GetCurrentNamespace()) - - output, err := RunRemoteCommand( - pod, - cont, - GetCurrentNamespace(), - command) - if err != nil { - log.Error("Error executing remote command: ", err) - return "", err - } - - // Registering metric - m := metrics.MultiAddrs{ - ServiceName: "torch", - NodeName: pod, - MultiAddr: output, - Namespace: GetCurrentNamespace(), - Value: 1, - } - RegisterMetric(m) - - return output, nil -} - -// GenerateAllTrustedPeersAddr handles the HTTP request to generate trusted peers' addresses. -func GenerateAllTrustedPeersAddr(cfg config.MutualPeersConfig, pod []string) (map[string]string, error) { - // get the command - command := CreateTrustedPeerCommand() - - // Create a map to store the pod names - podMap := make(map[string]bool) - - // Add the pod names to the map - for _, p := range pod { - podMap[p] = true - } - - var wg sync.WaitGroup - - for _, mutualPeer := range cfg.MutualPeers { - for _, peer := range mutualPeer.Peers { - if _, exists := podMap[peer.NodeName]; exists { - wg.Add(1) - go func(peer config.Peer) { - defer wg.Done() - - output, err := RunRemoteCommand( - peer.NodeName, - peer.ContainerName, - GetCurrentNamespace(), - command) - if err != nil { - log.Error("Error executing remote command: ", err) - // Handle the error or add it to a shared error channel - return - } - - StoreNodeIDs(peer.NodeName, output) - - m := metrics.MultiAddrs{ - ServiceName: "torch", - NodeName: peer.NodeName, - MultiAddr: output, - Namespace: GetCurrentNamespace(), - Value: 1, - } - RegisterMetric(m) - }(peer) - } - } - } - - wg.Wait() - - // generate the data on the nodes by calling BulkTrusteedPeers - for _, mutualPeer := range cfg.MutualPeers { - for _, peer := range mutualPeer.Peers { - // Check if the peer's NodeName is present in the podMap - if _, exists := podMap[peer.NodeName]; exists { - log.Info("Generating config for node:", peer.NodeName) - BulkTrustedPeers(*mutualPeer) - break // Skip to the next mutualPeer - } - } - } - - return nodeIDsMap, nil -} - -func BulkTrustedPeers(pods config.MutualPeer) { - // Get the data from the map containing trusted peers' addresses - data := GetAllIDs() - - // Create a channel to collect errors from goroutines - errCh := make(chan error) - // Use a WaitGroup to wait for all goroutines to finish - var wg sync.WaitGroup - - // Loop through the trusted peers' addresses in the data map - for key := range data { - for _, pod := range pods.Peers { - // Skip if the current trusted peer's address matches the current pod's NodeName - if key != pod.NodeName { - wg.Add(1) - // Launch a goroutine to execute the remote command for the current pod - go func(peer config.Peer) { - defer wg.Done() - - // Generate the command to get trusted peers' addresses for the current pod - command := BulkTrustedPeerCommand(data[key], pods) - - // Execute the remote command to get trusted peers' addresses - output, err := RunRemoteCommand( - peer.NodeName, - peer.ContainerName, - GetCurrentNamespace(), - command) - if err != nil { - // If an error occurs, send it to the error channel - errCh <- err - return - } - log.Info("OUTPUT: ", output) - - // Generate the metrics with the MultiAddrs - m := metrics.MultiAddrs{ - ServiceName: "torch", - NodeName: peer.NodeName, - MultiAddr: output, - Namespace: GetCurrentNamespace(), - Value: 1, - } - RegisterMetric(m) - }(pod) - } - } - } - - // Close the error channel after all goroutines finish - go func() { - wg.Wait() - close(errCh) - }() - - // Collect errors from the error channel and log them - for err := range errCh { - log.Error("Error executing remote command: ", err) - } -} - -// GenesisHash -func GenesisHash(pods config.MutualPeersConfig) (string, string) { - consensusNode := pods.MutualPeers[0].ConsensusNode - url := fmt.Sprintf("http://%s:26657/block?height=1", consensusNode) - - response, err := http.Get(url) - if err != nil { - log.Error("Error making GET request:", err) - return "", "" - } - defer response.Body.Close() - - if response.StatusCode != http.StatusOK { - log.Error("Non-OK response:", response.Status) - return "", "" - } - - bodyBytes, err := ioutil.ReadAll(response.Body) - if err != nil { - log.Error("Error reading response body:", err) - return "", "" - } - - bodyString := string(bodyBytes) - log.Info("Response Body:", bodyString) - - // Parse the JSON response into a generic map - var jsonResponse map[string]interface{} - err = json.Unmarshal([]byte(bodyString), &jsonResponse) - if err != nil { - log.Error("Error parsing JSON:", err) - return "", "" - } - - // Access and print the .block_id.hash field - blockIDHash, ok := jsonResponse["result"].(map[string]interface{})["block_id"].(map[string]interface{})["hash"].(string) - if !ok { - log.Error("Unable to access .block_id.hash") - return "", "" - } - - // Access and print the .block.header.time field - blockTime, ok := jsonResponse["result"].(map[string]interface{})["block"].(map[string]interface{})["header"].(map[string]interface{})["time"].(string) - if !ok { - log.Error("Unable to access .block.header.time") - return "", "" - } - - log.Info("Block ID Hash: ", blockIDHash) - log.Info("Block Time: ", blockTime) - log.Info("Full output: ", bodyString) - - return blockIDHash, blockTime -} - -// RunRemoteCommand executes a remote command on the specified node. -func RunRemoteCommand(nodeName, container, namespace string, command []string) (string, error) { - clusterConfig, err := rest.InClusterConfig() - if err != nil { - log.Error("Error: ", err.Error()) - } - // creates the clientset - clientset, err := kubernetes.NewForConfig(clusterConfig) - if err != nil { - log.Fatalf("Error: %v", err.Error()) - } - - // Create a request to execute the command on the specified node. - req := clientset.CoreV1().RESTClient().Post(). - Resource("pods"). - Name(nodeName). - Namespace(namespace). - SubResource("exec"). - VersionedParams(&v1.PodExecOptions{ - Command: command, - Container: container, - Stdin: false, - Stdout: true, - Stderr: true, - TTY: false, - }, scheme.ParameterCodec) - - // Execute the remote command. - output, err := executeCommand(clusterConfig, req) - if err != nil { - log.Error("failed to execute remote command: ", err) - } - - return output, nil -} - -// executeCommand executes the remote command using the provided configuration, request, and output writer. -func executeCommand(config *rest.Config, req *rest.Request) (string, error) { - executor, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) - if err != nil { - log.Error("failed to create SPDY executor: ", err) - } - - // Prepare the standard I/O streams. - var stdout, stderr bytes.Buffer - - // Execute the remote command and capture the output. - err = executor.Stream(remotecommand.StreamOptions{ - Stdout: &stdout, - Stderr: &stderr, - Tty: false, - }) - if err != nil { - log.Error("failed to execute command stream: ", err) - } - - return stdout.String(), nil -} diff --git a/pkg/k8s/scripts.go b/pkg/k8s/scripts.go new file mode 100644 index 0000000..65b4a24 --- /dev/null +++ b/pkg/k8s/scripts.go @@ -0,0 +1,77 @@ +package k8s + +import ( + "fmt" +) + +var ( + trustedPeerFile = "/tmp/TP-ADDR" + trustedPeerFileConsensus = "/home/celestia/config/TP-ADDR" + trustedPeerFileDA = "/tmp/CONSENSUS_NODE_SERVICE" + nodeIpFile = "/tmp/NODE_IP" + cmd = `$(ifconfig | grep -oE 'inet addr:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)' | grep -v '127.0.0.1' | awk '{print substr($2, 6)}')` + trustedPeerPrefix = "/ip4/" + cmd + "/tcp/2121/p2p/" +) + +// CreateFileWithEnvVar creates the file in the FS with the node to connect. +func CreateFileWithEnvVar(nodeToFile, nodeType string) []string { + f := "" + if nodeType == "consensus" { + f = trustedPeerFileConsensus + } + if nodeType == "da" { + f = trustedPeerFileDA + } + + script := fmt.Sprintf(` +#!/bin/sh +echo -n "%[2]s" > "%[1]s"`, f, nodeToFile) + + return []string{"sh", "-c", script} +} + +// CreateTrustedPeerCommand generates the command for creating trusted peers. +// we have to use the shell script because we can only get the token and the +// nodeID from the node itself. +func CreateTrustedPeerCommand() []string { + script := fmt.Sprintf(` +#!/bin/sh +# generate the token +export AUTHTOKEN=$(celestia bridge auth admin --node.store /home/celestia) + +# remove the first warning line... +export AUTHTOKEN=$(echo $AUTHTOKEN|rev|cut -d' ' -f1|rev) + +# make the request and parse the response +TP_ADDR=$(wget --header="Authorization: Bearer $AUTHTOKEN" \ + --header="Content-Type: application/json" \ + --post-data='{"jsonrpc":"2.0","id":0,"method":"p2p.Info","params":[]}' \ + --output-document - \ + http://localhost:26658 | grep -o '"ID":"[^"]*"' | sed 's/"ID":"\([^"]*\)"/\1/') + +echo -n "${TP_ADDR}" >> "%[1]s" +cat "%[1]s" +`, trustedPeerFile, trustedPeerPrefix) + + return []string{"sh", "-c", script} +} + +// GetNodeIP adds the node IP to a file. +func GetNodeIP() []string { + script := fmt.Sprintf(` +#!/bin/sh +echo -n "%[2]s" > "%[1]s" +cat "%[1]s"`, nodeIpFile, trustedPeerPrefix) + + return []string{"sh", "-c", script} +} + +// WriteToFile writes content into a file. +func WriteToFile(content, file string) []string { + script := fmt.Sprintf(` +#!/bin/sh +echo -n "%[1]s" > "%[2]s" +cat "%[2]s"`, content, file) + + return []string{"sh", "-c", script} +} diff --git a/pkg/k8s/scripts_test.go b/pkg/k8s/scripts_test.go new file mode 100644 index 0000000..c465af8 --- /dev/null +++ b/pkg/k8s/scripts_test.go @@ -0,0 +1,154 @@ +package k8s + +import ( + "reflect" + "testing" +) + +// case1 common message. +const case1 = "Case 1: Successfully script generated." + +// TestCreateFileWithEnvVar validates the node types and their path +func TestCreateFileWithEnvVar(t *testing.T) { + type args struct { + nodeToFile string + nodeType string + } + tests := []struct { + name string + args args + want []string + }{ + { + name: "Case 1: Check [consensus] nodes", + args: args{ + nodeToFile: "/home/celestia/config/TP-ADDR", + nodeType: "consensus", + }, + want: []string{"sh", "-c", ` +#!/bin/sh +echo -n "/home/celestia/config/TP-ADDR" > "/home/celestia/config/TP-ADDR"`}, + }, + { + name: "Case 2: Check [da] nodes", + args: args{ + nodeToFile: "/tmp/CONSENSUS_NODE_SERVICE", + nodeType: "da", + }, + want: []string{"sh", "-c", ` +#!/bin/sh +echo -n "/tmp/CONSENSUS_NODE_SERVICE" > "/tmp/CONSENSUS_NODE_SERVICE"`}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := CreateFileWithEnvVar(tt.args.nodeToFile, tt.args.nodeType); !reflect.DeepEqual(got, tt.want) { + t.Errorf("CreateFileWithEnvVar() = got: \n%v, \nwant \n%v", got, tt.want) + } + }) + } +} + +// TestCreateTrustedPeerCommand checks the script to generate the multi address. +func TestCreateTrustedPeerCommand(t *testing.T) { + tests := []struct { + name string + want []string + }{ + { + name: case1, + want: []string{"sh", "-c", ` +#!/bin/sh +# generate the token +export AUTHTOKEN=$(celestia bridge auth admin --node.store /home/celestia) + +# remove the first warning line... +export AUTHTOKEN=$(echo $AUTHTOKEN|rev|cut -d' ' -f1|rev) + +# make the request and parse the response +TP_ADDR=$(wget --header="Authorization: Bearer $AUTHTOKEN" \ + --header="Content-Type: application/json" \ + --post-data='{"jsonrpc":"2.0","id":0,"method":"p2p.Info","params":[]}' \ + --output-document - \ + http://localhost:26658 | grep -o '"ID":"[^"]*"' | sed 's/"ID":"\([^"]*\)"/\1/') + +echo -n "${TP_ADDR}" >> "/tmp/TP-ADDR" +cat "/tmp/TP-ADDR" +`}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := CreateTrustedPeerCommand(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("CreateTrustedPeerCommand() = %v, want %v", got, tt.want) + } + }) + } +} + +// TestGetNodeIP gets the IP of the node and add it to a file +func TestGetNodeIP(t *testing.T) { + tests := []struct { + name string + want []string + }{ + { + name: case1, + want: []string{"sh", "-c", ` +#!/bin/sh +echo -n "/ip4/$(ifconfig | grep -oE 'inet addr:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)' | grep -v '127.0.0.1' | awk '{print substr($2, 6)}')/tcp/2121/p2p/" > "/tmp/NODE_IP" +cat "/tmp/NODE_IP"`}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := GetNodeIP(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetNodeIP() = got: \n%v, \nwant: \n%v", got, tt.want) + } + }) + } +} + +// TestWriteToFile writes content to a file +func TestWriteToFile(t *testing.T) { + type args struct { + content string + file string + } + tests := []struct { + name string + args args + want []string + }{ + { + name: case1, + args: args{ + content: "THIS IS A TEST", + file: "/tmp/test_file", + }, + want: []string{"sh", "-c", ` +#!/bin/sh +echo -n "THIS IS A TEST" > "/tmp/test_file" +cat "/tmp/test_file"`}, + }, + { + name: "Case 2: Successfully script generated.", + args: args{ + content: "content in file", + file: "/tmp/file_with_content", + }, + want: []string{"sh", "-c", ` +#!/bin/sh +echo -n "content in file" > "/tmp/file_with_content" +cat "/tmp/file_with_content"`}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := WriteToFile(tt.args.content, tt.args.file); !reflect.DeepEqual(got, tt.want) { + t.Errorf("WriteToFile() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/k8s/statefulsets.go b/pkg/k8s/statefulsets.go new file mode 100644 index 0000000..b556847 --- /dev/null +++ b/pkg/k8s/statefulsets.go @@ -0,0 +1,60 @@ +package k8s + +import ( + "context" + "strings" + + log "github.com/sirupsen/logrus" + v1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "github.com/celestiaorg/torch/pkg/db/redis" +) + +const queueK8SNodes = "k8s" + +// WatchStatefulSets watches for changes to the StatefulSets in the specified namespace and updates the metrics accordingly +func WatchStatefulSets() error { + // namespace get the current namespace where torch is running + namespace := GetCurrentNamespace() + // Authentication in cluster - using Service Account, Role, RoleBinding + cfg, err := rest.InClusterConfig() + if err != nil { + log.Error("Error: ", err) + return err + } + + // Create the Kubernetes clientSet + clientSet, err := kubernetes.NewForConfig(cfg) + if err != nil { + log.Error("Error: ", err) + return err + } + + // Create a StatefulSet watcher + watcher, err := clientSet.AppsV1().StatefulSets(namespace).Watch(context.Background(), metav1.ListOptions{}) + if err != nil { + log.Error("Error: ", err) + return err + } + + // Watch for events on the watcher channel + for event := range watcher.ResultChan() { + if statefulSet, ok := event.Object.(*v1.StatefulSet); ok { + //log.Info("StatefulSet containers: ", statefulSet.Spec.Template.Spec.Containers) + + // check if the node is DA, if so, send it to the queue to generate the multi address + if strings.HasPrefix(statefulSet.Name, "da") { + err := redis.Producer(statefulSet.Name, queueK8SNodes) + if err != nil { + log.Error("ERROR adding the node to the queue: ", err) + return err + } + } + } + } + + return nil +} diff --git a/pkg/k8s/utils.go b/pkg/k8s/utils.go new file mode 100644 index 0000000..6c15ef0 --- /dev/null +++ b/pkg/k8s/utils.go @@ -0,0 +1,19 @@ +package k8s + +import ( + "os" + + log "github.com/sirupsen/logrus" +) + +// GetCurrentNamespace gets the current namespace from the environment variable. +// If the variable is not defined, the default value "default" is used. +func GetCurrentNamespace() string { + // currentNamespace Stores the current namespace. + currentNamespace := os.Getenv("POD_NAMESPACE") + if currentNamespace == "" { + log.Warn("Current Namespace variable is not defined, using the default value") + return "default" + } + return currentNamespace +} diff --git a/pkg/metrics/config.go b/pkg/metrics/config.go index 2d4f2a0..369c35e 100644 --- a/pkg/metrics/config.go +++ b/pkg/metrics/config.go @@ -10,6 +10,7 @@ import ( // InitConfig initializes the configs Prometheus - OTEL func InitConfig() error { // Initialize the Prometheus exporter + log.Info("Initializing Prometheus client...") exporter, err := prometheus.New() if err != nil { log.Fatal(err) @@ -17,6 +18,7 @@ func InitConfig() error { } provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter)) + log.Info("Initializing OTEL Provider...") otel.SetMeterProvider(provider) return nil diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index e82ea51..ba712ce 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -2,11 +2,9 @@ package metrics import ( "context" - "fmt" "time" log "github.com/sirupsen/logrus" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -15,19 +13,19 @@ import ( // Get the meter from the global meter provider with the name "torch". var meter = otel.GetMeterProvider().Meter("torch") -// MultiAddrs represents the information for a multiaddress. +// MultiAddrs represents the information for a Multi Addresses. type MultiAddrs struct { - ServiceName string // ServiceName Name of the service associated with the multiaddress. + ServiceName string // ServiceName Name of the service associated with the Multi Addresses. NodeName string // NodeName Name of the node. - MultiAddr string // MultiAddr Multiaddress value. + MultiAddr string // MultiAddr Multi Addresses value. Namespace string // Namespace where the service is deployed. - Value float64 // Value to be observed for the multiaddress. + Value float64 // Value to be observed for the Multi Addresses. } -// WithMetricsMultiAddress creates a callback function to observe metrics for multiple multiaddresses. +// WithMetricsMultiAddress creates a callback function to observe metrics for multiple Multi Addresses. func WithMetricsMultiAddress(multiAddrs []MultiAddrs) error { log.Info("registering metric: ", multiAddrs) - // Create a Float64ObservableGauge named "multiaddress" with a description for the metric. + // Create a Float64ObservableGauge named "Multi Addresses" with a description for the metric. multiAddressesGauge, err := meter.Float64ObservableGauge( "multiaddr", metric.WithDescription("Torch - MultiAddresses"), @@ -40,14 +38,14 @@ func WithMetricsMultiAddress(multiAddrs []MultiAddrs) error { // Define the callback function that will be called periodically to observe metrics. callback := func(ctx context.Context, observer metric.Observer) error { for _, ma := range multiAddrs { - // Create labels with attributes for each multiaddress. + // Create labels with attributes for each Multi Addresses. labels := metric.WithAttributes( attribute.String("service_name", ma.ServiceName), attribute.String("node_name", ma.NodeName), attribute.String("multiaddress", ma.MultiAddr), attribute.String("namespace", ma.Namespace), ) - // Observe the float64 value for the current multiaddress with the associated labels. + // Observe the float64 value for the current Multi Addresses with the associated labels. observer.ObserveFloat64(multiAddressesGauge, ma.Value, labels) } @@ -61,9 +59,9 @@ func WithMetricsMultiAddress(multiAddrs []MultiAddrs) error { // BlockHeight represents the information for the block height 1. type BlockHeight struct { - ServiceName string // ServiceName Name of the service associated with the multiaddress. - BlockHeight string // Namespace where the service is deployed. - Value float64 // Value to be observed for the multiaddress. + ServiceName string // ServiceName Name of the service associated with the multi-address. + BlockHeight string // BlockHeight height of the block. + Value float64 // Value to be observed for the multi-address. } // WithMetricsBlockHeight creates a callback function to observe metrics for block_height_1. @@ -100,11 +98,12 @@ func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespa return err } +// CalculateDaysDifference based on the date received, returns the number of days since this day. func CalculateDaysDifference(inputTimeString string) int { layout := "2006-01-02T15:04:05.999999999Z" inputTime, err := time.Parse(layout, inputTimeString) if err != nil { - fmt.Println("Error parsing time:", err) + log.Error("Error parsing time: [", inputTimeString, "]", err) return -1 } diff --git a/pkg/k8s/metrics.go b/pkg/metrics/register.go similarity index 70% rename from pkg/k8s/metrics.go rename to pkg/metrics/register.go index dcf57cd..149ec3e 100644 --- a/pkg/k8s/metrics.go +++ b/pkg/metrics/register.go @@ -1,12 +1,11 @@ -package k8s +package metrics import ( - "github.com/jrmanes/torch/pkg/metrics" log "github.com/sirupsen/logrus" ) -// Declare a slice to hold multiple MultiAddrs metrics. -var multiAddresses []metrics.MultiAddrs +// Declare a slice to hold multiple Multi Addresses metrics. +var multiAddresses []MultiAddrs // MultiAddrExists checks if a given MultiAddr already exists in the multiAddresses slice. // It returns true if the MultiAddr already exists, and false otherwise. @@ -20,11 +19,11 @@ func MultiAddrExists(multiAddr string) bool { return false } -// RegisterMetric adds a new MultiAddrs metric to the multiAddresses slice. +// RegisterMetric adds a new Multi Addresses metric to the multiAddresses slice. // Before adding, it checks if the MultiAddr already exists in the slice using MultiAddrExists function. // If the MultiAddr already exists, it logs a message and skips the addition. -// Otherwise, it appends the new MultiAddrs to the slice and registers the updated metrics. -func RegisterMetric(m metrics.MultiAddrs) { +// Otherwise, it appends the new Multi Addresses to the slice and registers the updated metrics. +func RegisterMetric(m MultiAddrs) { // Check if the MultiAddr already exists in the array if MultiAddrExists(m.MultiAddr) { log.Info("MultiAddr already exists in the metrics array: ", m.NodeName, " ", m.MultiAddr) @@ -35,7 +34,7 @@ func RegisterMetric(m metrics.MultiAddrs) { multiAddresses = append(multiAddresses, m) // Register the metric - err := metrics.WithMetricsMultiAddress(multiAddresses) + err := WithMetricsMultiAddress(multiAddresses) if err != nil { log.Printf("Failed to update metrics: %v", err) } diff --git a/pkg/nodes/consensus.go b/pkg/nodes/consensus.go new file mode 100644 index 0000000..03d3a18 --- /dev/null +++ b/pkg/nodes/consensus.go @@ -0,0 +1,89 @@ +package nodes + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + log "github.com/sirupsen/logrus" + + "github.com/celestiaorg/torch/config" + "github.com/celestiaorg/torch/pkg/k8s" +) + +var ( + consContainerSetupName = "consensus-setup" // consContainerSetupName initContainer that we use to configure the nodes. + consContainerName = "consensus" // consContainerName container name which the pod runs. + namespace = k8s.GetCurrentNamespace() // ns namespace of the node. +) + +// SetConsNodeDefault sets all the default values in case they are empty +func SetConsNodeDefault(peer config.Peer) config.Peer { + if peer.ContainerSetupName == "" { + peer.ContainerSetupName = consContainerSetupName + } + if peer.ContainerName == "" { + peer.ContainerName = consContainerName + } + if peer.Namespace == "" { + peer.Namespace = namespace + } + return peer +} + +// GenesisHash connects to the node specified in: config.MutualPeersConfig.ConsensusNode +// makes a request to the API and gets the info about the genesis and return it +func GenesisHash(pods config.MutualPeersConfig) (string, string) { + consensusNode := pods.MutualPeers[0].ConsensusNode + url := fmt.Sprintf("http://%s:26657/block?height=1", consensusNode) + + response, err := http.Get(url) + if err != nil { + log.Error("Error making GET request:", err) + return "", "" + } + defer response.Body.Close() + + if response.StatusCode != http.StatusOK { + log.Error("Non-OK response:", response.Status) + return "", "" + } + + bodyBytes, err := ioutil.ReadAll(response.Body) + if err != nil { + log.Error("Error reading response body:", err) + return "", "" + } + + bodyString := string(bodyBytes) + log.Info("Response Body: ", bodyString) + + // Parse the JSON response into a generic map + var jsonResponse map[string]interface{} + err = json.Unmarshal([]byte(bodyString), &jsonResponse) + if err != nil { + log.Error("Error parsing JSON:", err) + return "", "" + } + + // Access and print the .block_id.hash field + blockIDHash, ok := jsonResponse["result"].(map[string]interface{})["block_id"].(map[string]interface{})["hash"].(string) + if !ok { + log.Error("Unable to access .block_id.hash") + return "", "" + } + + // Access and print the .block.header.time field + blockTime, ok := jsonResponse["result"].(map[string]interface{})["block"].(map[string]interface{})["header"].(map[string]interface{})["time"].(string) + if !ok { + log.Error("Unable to access .block.header.time") + return "", "" + } + + log.Info("Block ID Hash: ", blockIDHash) + log.Info("Block Time: ", blockTime) + log.Info("Full output: ", bodyString) + + return blockIDHash, blockTime +} diff --git a/pkg/nodes/consensus_test.go b/pkg/nodes/consensus_test.go new file mode 100644 index 0000000..56a1bf8 --- /dev/null +++ b/pkg/nodes/consensus_test.go @@ -0,0 +1,65 @@ +package nodes + +import ( + "reflect" + "testing" + + "github.com/celestiaorg/torch/config" +) + +func TestSetConsNodeDefault(t *testing.T) { + type args struct { + peer config.Peer + } + tests := []struct { + name string + args args + want config.Peer + }{ + { + name: "Case 1: Tests default values", + args: args{ + peer: config.Peer{ + NodeName: "consensus-full-1", + NodeType: "consensus", + }, + }, + want: config.Peer{ + NodeName: "consensus-full-1", + NodeType: "consensus", + ContainerName: "consensus", + ContainerSetupName: "consensus-setup", + ConnectsAsEnvVar: false, + ConnectsTo: nil, + DnsConnections: nil, + }, + }, + { + name: "Case 2: Tests default values already specified", + args: args{ + peer: config.Peer{ + NodeName: "consensus-full-1", + NodeType: "consensus", + ContainerName: "consensus", + ContainerSetupName: "consensus-setup", + }, + }, + want: config.Peer{ + NodeName: "consensus-full-1", + NodeType: "consensus", + ContainerName: "consensus", + ContainerSetupName: "consensus-setup", + ConnectsAsEnvVar: false, + ConnectsTo: nil, + DnsConnections: nil, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := SetConsNodeDefault(tt.args.peer); !reflect.DeepEqual(got, tt.want) { + t.Errorf("SetConsNodeDefault() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/nodes/da.go b/pkg/nodes/da.go new file mode 100644 index 0000000..bb730a9 --- /dev/null +++ b/pkg/nodes/da.go @@ -0,0 +1,230 @@ +package nodes + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/celestiaorg/torch/config" + "github.com/celestiaorg/torch/pkg/db/redis" + "github.com/celestiaorg/torch/pkg/k8s" + "github.com/celestiaorg/torch/pkg/metrics" +) + +const ( + errRemoteCommand = "Error executing remote command: " + timeoutDuration = 60 * time.Second // timeoutDuration we specify the max time to run the func. + nodeIdMaxLength = 52 // nodeIdMaxLength Specify the max length for the nodes ids. +) + +var ( + daContainerSetupName = "da-setup" // daContainerSetupName initContainer that we use to configure the nodes. + daContainerName = "da" // daContainerName container name which the pod runs. + fPathDA = "/tmp/celestia-config/TP-ADDR" // fPathDA path to the file where Torch will write. + ns = k8s.GetCurrentNamespace() // ns namespace of the node. +) + +// SetDaNodeDefault sets all the default values in case they are empty +func SetDaNodeDefault(peer config.Peer) config.Peer { + if peer.ContainerSetupName == "" { + peer.ContainerSetupName = daContainerSetupName + } + if peer.ContainerName == "" { + peer.ContainerName = daContainerName + } + if peer.Namespace == "" { + peer.Namespace = ns + } + return peer +} + +// SetupDANodeWithConnections configure a DA node with connections +func SetupDANodeWithConnections(peer config.Peer) error { + red := redis.InitRedisConfig() + // Create a new context with a timeout + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) + connString := "" + addPrefix := true + + // Make sure to call the cancel function to release resources when you're done + defer cancel() + + // read the connection list + for index, nodeName := range peer.ConnectsTo { + log.Info(peer.NodeName, " , connection: [", index, "] to node: [", nodeName, "]") + + // checking the node in the DB first + ma, err := redis.CheckIfNodeExistsInDB(red, ctx, nodeName) + if err != nil { + log.Error("Error CheckIfNodeExistsInDB for full-node: [", peer.NodeName, "]", err) + return err + } + + // check if the MA is already in the config + ma, addPrefix = VerifyAndUpdateMultiAddress(peer, index, ma, addPrefix) + + // if the node is not in the db, then we generate it + if ma == "" { + log.Info("Node ", "["+nodeName+"]"+" NOT found in DB, let'nodeName generate it") + ma, err = GenerateNodeIdAndSaveIt(peer, peer.ConnectsTo[index], red, ctx) + if err != nil { + log.Error("Error GenerateNodeIdAndSaveIt for full-node: [", peer.NodeName, "]", err) + return err + } + } + + // if we have the address already, lets continue the process, otherwise, means we couldn't get the node id + if ma != "" && addPrefix { + // adding the node prefix + ma, err = SetIdPrefix(peer, ma, index) + if err != nil { + log.Error("Error SetIdPrefix for full-node: [", peer.NodeName, "]", err) + return err + } + log.Info("Peer connection prefix: ", ma) + } + + // check the connection index and concatenate it in case we have more than one node + if index > 0 { + connString = connString + "," + ma + } else { + connString = ma + } + + // validate the MA, must start with /ip4/ || /dns/ + if !strings.HasPrefix(ma, "/ip4/") && !strings.HasPrefix(ma, "/dns/") { + errorMessage := fmt.Sprintf("Error generating the MultiAddress, must begin with /ip4/ || /dns/: [%s]", ma) + log.Error(errorMessage) + return errors.New(errorMessage) + } + + log.Info("Registering metric for node: [", nodeName, "]") + + // Register a multi-address metric + m := metrics.MultiAddrs{ + ServiceName: "torch", + NodeName: nodeName, + MultiAddr: ma, + Namespace: peer.Namespace, + Value: 1, + } + metrics.RegisterMetric(m) + + // get the command to write in a file and execute the command against the node + command := k8s.WriteToFile(connString, fPathDA) + output, err := k8s.RunRemoteCommand( + peer.NodeName, + peer.ContainerSetupName, + k8s.GetCurrentNamespace(), + command) + if err != nil { + log.Error(errRemoteCommand, err) + return err + } + + log.Info("MultiAddr for node ", peer.NodeName, " is: [", output, "]") + + log.Info("Adding node to the queue: [", peer.NodeName, "]") + go AddToQueue(peer) + } + + return nil +} + +// VerifyAndUpdateMultiAddress checks if the configuration contains a Multi Address at the specified index +// and updates it if found. It returns the verified Multi Address and a boolean indicating if an update was performed. +func VerifyAndUpdateMultiAddress(peer config.Peer, index int, currentAddr string, addPrefix bool) (string, bool) { + // verify that we have the multi addr already specify in the config + if strings.Contains(peer.ConnectsTo[index], "dns") || strings.Contains(peer.ConnectsTo[index], "ip4") { + // Use the address from the configuration + currentAddr = peer.ConnectsTo[index] + addPrefix = false + } + return currentAddr, addPrefix +} + +// SetIdPrefix generates the prefix depending on dns or ip +func SetIdPrefix(peer config.Peer, c string, i int) (string, error) { + // check if we are using DNS or IP + if len(peer.DnsConnections) > 0 { + c = "/dns/" + peer.DnsConnections[i] + "/tcp/2121/p2p/" + c + } else { + comm := k8s.GetNodeIP() + output, err := k8s.RunRemoteCommand( + peer.ConnectsTo[i], + peer.ContainerName, + k8s.GetCurrentNamespace(), + comm) + if err != nil { + log.Error(errRemoteCommand, err) + return "", err + } + log.Info("command - ip is: ", output) + c = output + c + } + return c, nil +} + +// GenerateNodeIdAndSaveIt generates the node id and store it +func GenerateNodeIdAndSaveIt( + pod config.Peer, + connNode string, + red *redis.RedisClient, + ctx context.Context, +) (string, error) { + // Generate the command and run it against the connection node + it's running container + command := k8s.CreateTrustedPeerCommand() + output, err := k8s.RunRemoteCommand( + connNode, + pod.ContainerName, + k8s.GetCurrentNamespace(), + command) + if err != nil { + log.Error(errRemoteCommand, err) + return "", err + } + + // if the output of the generation is not empty, that means that we could generate the node id successfully, so let's + // store it into the DB. + if output != "" { + log.Info("Adding pod id to Redis: ", connNode, " [", output, "] ") + + // check that the node id generate has the right length + output, err = TruncateString(output, nodeIdMaxLength) + if err != nil { + log.Error("Error TruncateString: ", err) + return "", err + } + + // save node in redis + err = redis.SetNodeId(connNode, red, ctx, output) + if err != nil { + log.Error("Error SetNodeId: ", err) + return "", err + } + } else { + log.Error("Output is empty for pod: ", " [", pod.NodeName, "] ") + return "", err + } + + return output, nil +} + +// TruncateString receives and input and a maxLength and returns a string with the size specified. +func TruncateString(input string, maxLength int) (string, error) { + if len(input) == maxLength { + return input, nil + } + if len(input) < maxLength { + log.Error("Error: The node id received is not valid, too short: , ", " - [", len(input), "]", " - [", input, "]") + return input, errors.New("error: The node id received is not valid") + } + + log.Info("Node ID with bigger size found: ", input) + + return input[:maxLength], nil +} diff --git a/pkg/nodes/da_test.go b/pkg/nodes/da_test.go new file mode 100644 index 0000000..5c8ba32 --- /dev/null +++ b/pkg/nodes/da_test.go @@ -0,0 +1,210 @@ +package nodes + +import ( + "reflect" + "testing" + + "github.com/celestiaorg/torch/config" +) + +func TestHasAddrAlready(t *testing.T) { + t.Parallel() + type args struct { + peer config.Peer + i int + c string + addPrefix bool + } + tests := []struct { + name string + args args + want string + want1 bool + }{ + { + name: "Case 1.0: DNS - Multi address specified", + args: args{ + peer: config.Peer{ + NodeName: "da-full-1", + NodeType: "da", + ConnectsTo: []string{"/dns/da-bridge-1/tcp/2121/p2p/12D3KooWH1pTTJR5NXPYs2huVcJ9srmmiyGU4txHm2qgdaUVPYAw"}, + }, + i: 0, + c: "", + addPrefix: false, + }, + want: "/dns/da-bridge-1/tcp/2121/p2p/12D3KooWH1pTTJR5NXPYs2huVcJ9srmmiyGU4txHm2qgdaUVPYAw", + want1: false, + }, + { + name: "Case 1.1: IP - Multi address specified", + args: args{ + peer: config.Peer{ + NodeName: "da-full-1", + NodeType: "da", + ConnectsTo: []string{"/ip4/192.168.1.100/tcp/2121/p2p/12D3KooWH1pTTJR5NXPYs2huVcJ9srmmiyGU4txHm2qgdaUVPYAw"}, + }, + i: 0, + c: "", + addPrefix: false, + }, + want: "/ip4/192.168.1.100/tcp/2121/p2p/12D3KooWH1pTTJR5NXPYs2huVcJ9srmmiyGU4txHm2qgdaUVPYAw", + want1: false, + }, + { + name: "Case 2: No multi address specified - one node", + args: args{ + peer: config.Peer{ + NodeName: "da-full-1", + NodeType: "da", + ConnectsTo: []string{"da-bridge-1"}, + }, + i: 0, + c: "da-bridge-1", + addPrefix: false, + }, + want: "da-bridge-1", + want1: false, + }, + { + name: "Case 3: No multi address specified - more than one node", + args: args{ + peer: config.Peer{ + NodeName: "da-full-1", + NodeType: "da", + ConnectsTo: []string{"da-bridge-1", "da-bridge-2"}, + }, + i: 1, + c: "da-bridge-1,da-bridge-2", + addPrefix: false, + }, + want: "da-bridge-1,da-bridge-2", + want1: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1 := VerifyAndUpdateMultiAddress(tt.args.peer, tt.args.i, tt.args.c, tt.args.addPrefix) + if got != tt.want { + t.Errorf("VerifyAndUpdateMultiAddress() got = %v, want %v", got, tt.want) + } + if got1 != tt.want1 { + t.Errorf("VerifyAndUpdateMultiAddress() got1 = %v, want %v", got1, tt.want1) + } + }) + } +} + +func TestSetDaNodeDefault(t *testing.T) { + type args struct { + peer config.Peer + } + tests := []struct { + name string + args args + want config.Peer + }{ + { + name: "Case 1: Tests default values", + args: args{ + peer: config.Peer{ + NodeName: "da-full-1", + NodeType: "da", + }, + }, + want: config.Peer{ + NodeName: "da-full-1", + NodeType: "da", + ContainerName: "da", + ContainerSetupName: "da-setup", + ConnectsAsEnvVar: false, + ConnectsTo: nil, + DnsConnections: nil, + }, + }, + { + name: "Case 2: Tests default values already specified", + args: args{ + peer: config.Peer{ + NodeName: "da-bridge-1", + NodeType: "da", + ContainerName: "da", + ContainerSetupName: "da-setup", + }, + }, + want: config.Peer{ + NodeName: "da-bridge-1", + NodeType: "da", + ContainerName: "da", + ContainerSetupName: "da-setup", + ConnectsAsEnvVar: false, + ConnectsTo: nil, + DnsConnections: nil, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := SetDaNodeDefault(tt.args.peer); !reflect.DeepEqual(got, tt.want) { + t.Errorf("SetDaNodeDefault() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestTruncateString(t *testing.T) { + type args struct { + input string + maxLength int + } + tests := []struct { + name string + args args + want string + }{ + { + name: "Case 1: Tests valid node ID: 12D3KooWPB3thXCYyr6Jid49d5DDaRL63inzVagaQswCcgUARg5W - 52", + args: args{ + input: "12D3KooWPB3thXCYyr6Jid49d5DDaRL63inzVagaQswCcgUARg5W", + maxLength: 52, + }, + want: "12D3KooWPB3thXCYyr6Jid49d5DDaRL63inzVagaQswCcgUARg5W", + }, + { + name: "Case 2: Tests not valid node ID.", + args: args{ + input: "12D3KooWPB3thXCYyr6Jid49d5DDaRL63inzVagaQswCcgUARg5W12D3KooWPB3thXCYyr6Jid49d5DDaRL63inzVagaQswCcgUARg5W", + maxLength: 52, + }, + want: "12D3KooWPB3thXCYyr6Jid49d5DDaRL63inzVagaQswCcgUARg5W", + }, + { + name: "Case 3: Tests valid node ID: 12D3KooWMGSh8pLvQYn5zYcdRhVfNAcMZrDt71iyq6eSVtrgjKb8 - 52", + args: args{ + input: "12D3KooWMGSh8pLvQYn5zYcdRhVfNAcMZrDt71iyq6eSVtrgjKb8", + maxLength: 52, + }, + want: "12D3KooWMGSh8pLvQYn5zYcdRhVfNAcMZrDt71iyq6eSVtrgjKb8", + }, + { + name: "Case 4: Tests not valid node ID.", + args: args{ + input: "12D3KooWMGSh8pLvQYn5zYcdRhVfNAcMZrDt71iyq6eSVtrgjKb812D3KooWMGSh8pLvQYn5zYcdRhVfNAcMZrDt71iyq6eSVtrgjKb8", + maxLength: 52, + }, + want: "12D3KooWMGSh8pLvQYn5zYcdRhVfNAcMZrDt71iyq6eSVtrgjKb8", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got, err := TruncateString(tt.args.input, tt.args.maxLength); got != tt.want { + if err != nil { + t.Errorf("ERROR: TruncateString() = %v, want %v", got, tt.want) + } + t.Errorf("TruncateString() = %v, want %v", got, tt.want) + + } + }) + } +} diff --git a/pkg/nodes/nodes.go b/pkg/nodes/nodes.go new file mode 100644 index 0000000..6bf6f38 --- /dev/null +++ b/pkg/nodes/nodes.go @@ -0,0 +1,51 @@ +package nodes + +import ( + log "github.com/sirupsen/logrus" + + "github.com/celestiaorg/torch/config" + "github.com/celestiaorg/torch/pkg/k8s" +) + +type NodeAddress struct { + ID string + NodeName string +} + +// ValidateNode checks if a node received is available in the config, meaning that we can proceed to generate it is id. +// if not, we return an error and an empty node struct. +func ValidateNode(n string, cfg config.MutualPeersConfig) (bool, config.Peer) { + for _, mutualPeer := range cfg.MutualPeers { + for _, peer := range mutualPeer.Peers { + if peer.NodeName == n { + log.Info("Pod found in the config, executing remote command...") + return true, peer + } + } + } + + return false, config.Peer{} +} + +// SetupNodesEnvVarAndConnections configure the ENV vars for those nodes that needs to connect via ENV var +func SetupNodesEnvVarAndConnections(peer config.Peer, cfg config.MutualPeersConfig) error { + // Configure Consensus & DA - connecting using env var + _, err := k8s.RunRemoteCommand( + peer.NodeName, + peer.ContainerSetupName, + k8s.GetCurrentNamespace(), + k8s.CreateFileWithEnvVar(peer.ConnectsTo[0], peer.NodeType), + ) + if err != nil { + log.Error("Error executing remote command: ", err) + return err + } + + // check if the node is type DA, if so, add the node to the queue to generate the Multi Address later. + if peer.NodeType == "da" { + // we use the goroutine for that, otherwise, Torch tries to keep the connection opened. + go AddToQueue(peer) + } + + return nil +} diff --git a/pkg/nodes/nodes_consumer.go b/pkg/nodes/nodes_consumer.go new file mode 100644 index 0000000..24af937 --- /dev/null +++ b/pkg/nodes/nodes_consumer.go @@ -0,0 +1,108 @@ +package nodes + +import ( + "context" + "os" + "os/signal" + "syscall" + "time" + + "github.com/adjust/rmq/v5" + log "github.com/sirupsen/logrus" + + "github.com/celestiaorg/torch/config" + "github.com/celestiaorg/torch/pkg/db/redis" +) + +const ( + consumerName = "torch-consumer" // consumerName name used in the tag to identify the consumer. + prefetchLimit = 10 // prefetchLimit + pollDuration = 10 * time.Second // pollDuration how often is Torch going to pull data from the queue. + timeoutDurationConsumer = 60 * time.Second // timeoutDurationConsumer timeout for the consumer. +) + +// ConsumerInit initialize the process to check the queues in Redis. +func ConsumerInit(queueName string) { + errChan := make(chan error, 10) + go logErrors(errChan) + + red := redis.InitRedisConfig() + // Create a new context with a timeout + ctx, cancel := context.WithTimeout(context.Background(), timeoutDurationConsumer) + + // Make sure to call the cancel function to release resources when you're done + defer cancel() + + connection, err := rmq.OpenConnection( + "consumer", + "tcp", + redis.GetRedisFullURL(), + 2, + errChan, + ) + if err != nil { + log.Error("Error: ", err) + } + + queue, err := connection.OpenQueue(queueName) + if err != nil { + log.Error("Error: ", err) + } + + if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { + log.Error("Error: ", err) + } + + _, err = queue.AddConsumerFunc(consumerName, func(delivery rmq.Delivery) { + log.Info("Performing task: ", delivery.Payload()) + peer := config.Peer{ + NodeName: delivery.Payload(), + NodeType: "da", + ContainerName: "da", + } + + // here we wil send the node to generate the id + err := CheckNodesInDBOrCreateThem(peer, red, ctx) + if err != nil { + log.Error("Error checking the nodes: CheckNodesInDBOrCreateThem - ", err) + } + + if err := delivery.Ack(); err != nil { + log.Error("Error: ", err) + } + }) + if err != nil { + log.Error("Error: ", err) + } + + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT) + defer signal.Stop(signals) + + <-signals // wait for signal + go func() { + <-signals // hard exit on second signal (in case shutdown gets stuck) + os.Exit(1) + }() + + <-connection.StopAllConsuming() // wait for all Consume() calls to finish +} + +func logErrors(errChan <-chan error) { + for err := range errChan { + switch err := err.(type) { + case *rmq.HeartbeatError: + if err.Count == rmq.HeartbeatErrorLimit { + log.Print("heartbeat error (limit): ", err) + } else { + log.Print("heartbeat error: ", err) + } + case *rmq.ConsumeError: + log.Print("consume error: ", err) + case *rmq.DeliveryError: + log.Print("delivery error: ", err.Delivery, err) + default: + log.Print("other error: ", err) + } + } +} diff --git a/pkg/nodes/nodes_test.go b/pkg/nodes/nodes_test.go new file mode 100644 index 0000000..44cba62 --- /dev/null +++ b/pkg/nodes/nodes_test.go @@ -0,0 +1,78 @@ +package nodes + +import ( + "reflect" + "testing" + + "github.com/celestiaorg/torch/config" +) + +func TestValidateNode(t *testing.T) { + type args struct { + n string + cfg config.MutualPeersConfig + } + + cfg := config.MutualPeersConfig{ + MutualPeers: []*config.MutualPeer{ + { + Peers: []config.Peer{ + {NodeName: "da-bridge-1"}, + {NodeName: "da-bridge-2"}, + }, + }, + { + Peers: []config.Peer{ + {NodeName: "da-bridge-3"}, + }, + }, + }, + } + + tests := []struct { + name string + args args + want bool + want1 config.Peer + }{ + { + name: "Case 1: Node exists in config", + args: args{ + n: "da-bridge-1", + cfg: cfg, + }, + want: true, + want1: config.Peer{NodeName: "da-bridge-1"}, + }, + { + name: "Case 2: Node exists in config", + args: args{ + n: "da-bridge-2", + cfg: cfg, + }, + want: true, + want1: config.Peer{NodeName: "da-bridge-2"}, + }, + { + name: "Case 1: Node DOES NOT exists in config", + args: args{ + n: "nonexistent_node", + cfg: cfg, + }, + want: false, + want1: config.Peer{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1 := ValidateNode(tt.args.n, tt.args.cfg) + if got != tt.want { + t.Errorf("ValidateNode() got = %v, want %v", got, tt.want) + } + if !reflect.DeepEqual(got1, tt.want1) { + t.Errorf("ValidateNode() got1 = %v, want %v", got1, tt.want1) + } + }) + } +} diff --git a/pkg/nodes/queue.go b/pkg/nodes/queue.go new file mode 100644 index 0000000..118365c --- /dev/null +++ b/pkg/nodes/queue.go @@ -0,0 +1,112 @@ +package nodes + +import ( + "context" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/celestiaorg/torch/config" + "github.com/celestiaorg/torch/pkg/db/redis" + "github.com/celestiaorg/torch/pkg/metrics" +) + +var ( + taskQueue = make(chan config.Peer) // taskQueue channel for pending tasks (peers to process later). + MaxRetryCount = 5 // MaxRetryCount number of retries per node. + TickerTime = 5 * time.Second // TickerTime time specified to make a signal. + timeoutDurationProcessQueue = 60 * time.Second // timeoutDurationProcessQueue time specified to make a signal. +) + +// ProcessTaskQueue processes the pending tasks in the queue the time specified in the const TickerTime. +func ProcessTaskQueue() { + ticker := time.NewTicker(TickerTime) + + for { + select { + case <-ticker.C: + processQueue() + } + } +} + +// processQueue process the nodes in the queue and tries to generate the Multi Address +func processQueue() { + red := redis.InitRedisConfig() + // Create a new context with a timeout + ctx, cancel := context.WithTimeout(context.Background(), timeoutDurationProcessQueue) + + // Make sure to call the cancel function to release resources when you're done + defer cancel() + + for { + select { + case <-ctx.Done(): + log.Error("processQueue - The context has been canceled, exit the loop.") + return + case peer := <-taskQueue: + // TODO: + // errors should be returned back and go routines needs to be in errGroup instead of pure go + err := CheckNodesInDBOrCreateThem(peer, red, ctx) + if err != nil { + log.Error("Error checking the nodes: CheckNodesInDBOrCreateThem - ", err) + } + + default: + return + } + } +} + +// CheckNodesInDBOrCreateThem try to find the node in the DB, if the node is not in the DB, it tries to create it. +func CheckNodesInDBOrCreateThem(peer config.Peer, red *redis.RedisClient, ctx context.Context) error { + log.Info("Processing Node in the queue: ", "[", peer.NodeName, "]") + // check if the node is in the DB + ma, err := redis.CheckIfNodeExistsInDB(red, ctx, peer.NodeName) + if err != nil { + log.Error("Error CheckIfNodeExistsInDB for node: [", peer.NodeName, "]: ", err) + return err + } + + // if the node doesn't exist in the DB, let's try to create it + if ma == "" { + log.Info("Node ", "["+peer.NodeName+"]"+" NOT found in DB, let's try to generate it") + ma, err = GenerateNodeIdAndSaveIt(peer, peer.NodeName, red, ctx) + if err != nil { + log.Error("Error GenerateNodeIdAndSaveIt for full-node: [", peer.NodeName, "]", err) + } + return err + } + + // check if the multi address is empty after trying to generate it + if ma == "" { + // check if the node is still under the maximum number of retries + if peer.RetryCount < MaxRetryCount { + log.Info("Node ", "["+peer.NodeName+"]"+" NOT found in DB, adding it to the queue, attempt: ", "[", peer.RetryCount, "]") + peer.RetryCount++ // increment the counter + AddToQueue(peer) + } else { + log.Info("Max retry count reached for node: ", "[", peer.NodeName, "]", "it might have some issues...") + } + } else { + log.Info("Node ", "[", peer.NodeName, "]", " found in DB, ID: ", "[", ma, "]") + // Register a multi-address metric + m := metrics.MultiAddrs{ + ServiceName: "torch", + NodeName: peer.NodeName, + MultiAddr: ma, + Namespace: peer.Namespace, + Value: 1, + } + metrics.RegisterMetric(m) + } + + return nil +} + +// AddToQueue adds a function to add peers to the queue if necessary. +func AddToQueue(peer config.Peer) { + peer.RetryCount = 0 // set the first attempt + log.Info("Node added to the queue: ", peer) + taskQueue <- peer +}