Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Switch to k8s client-go #15486

Merged
merged 13 commits into from
Feb 12, 2021
121 changes: 57 additions & 64 deletions cmd/frontend/internal/app/debugproxies/scanner.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package debugproxies

import (
"context"
"errors"
"fmt"
"io/ioutil"
"strconv"
"strings"
"time"

"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"
"github.com/inconshreveable/log15"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
)

// Represents an endpoint
Expand All @@ -26,47 +30,21 @@ type Endpoint struct {
// ScanConsumer is the callback to consume scan results.
type ScanConsumer func([]Endpoint)

// Declares methods we use with k8s.Client. Useful to plug testing replacements or even logging middleware.
type kubernetesClient interface {
Watch(ctx context.Context, namespace string, r k8s.Resource, options ...k8s.Option) (*k8s.Watcher, error)
List(ctx context.Context, namespace string, resp k8s.ResourceList, options ...k8s.Option) error
Get(ctx context.Context, namespace, name string, resp k8s.Resource, options ...k8s.Option) error
Namespace() string
}

// "real" implementation that sends calls to the k8s.Client
type k8sClientImpl struct {
client *k8s.Client
}

func (kci *k8sClientImpl) Watch(ctx context.Context, namespace string, r k8s.Resource, options ...k8s.Option) (*k8s.Watcher, error) {
return kci.client.Watch(ctx, namespace, r, options...)
}

func (kci *k8sClientImpl) List(ctx context.Context, namespace string, resp k8s.ResourceList, options ...k8s.Option) error {
return kci.client.List(ctx, namespace, resp, options...)
}

func (kci *k8sClientImpl) Get(ctx context.Context, namespace, name string, resp k8s.Resource, options ...k8s.Option) error {
return kci.client.Get(ctx, namespace, name, resp, options...)
}

func (kci *k8sClientImpl) Namespace() string {
return kci.client.Namespace
}

// clusterScanner scans the cluster for endpoints belonging to services that have annotation sourcegraph.prometheus/scrape=true.
// It runs an event loop that reacts to changes to the endpoints set. Everytime there is a change it calls the ScanConsumer.
type clusterScanner struct {
client kubernetesClient
consume ScanConsumer
client v1.CoreV1Interface
namespace string
consume ScanConsumer
}

// Starts a cluster scanner with the specified client and consumer. Does not block.
func startClusterScannerWithClient(client kubernetesClient, consumer ScanConsumer) error {
func startClusterScannerWithClient(client *kubernetes.Clientset, ns string, consumer ScanConsumer) error {

cs := &clusterScanner{
client: client,
consume: consumer,
client: client.CoreV1(),
namespace: ns,
consume: consumer,
}

go cs.runEventLoop()
Expand All @@ -75,13 +53,18 @@ func startClusterScannerWithClient(client kubernetesClient, consumer ScanConsume

// Starts a cluster scanner with the specified consumer. Does not block.
func StartClusterScanner(consumer ScanConsumer) error {
client, err := k8s.NewInClusterClient()
config, err := rest.InClusterConfig()
if err != nil {
return err
}
ns := namespace()
// access to K8s clients
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return err
}

kci := &k8sClientImpl{client: client}
return startClusterScannerWithClient(kci, consumer)
return startClusterScannerWithClient(clientset, ns, consumer)
}

// Runs the k8s.Watch endpoints event loop, and triggers a rescan of cluster when something changes with endpoints.
Expand All @@ -102,21 +85,23 @@ func (cs *clusterScanner) runEventLoop() {
// watchEndpointEvents uses the k8s watch API operation to watch for endpoint events. Spins forever unless an error
// occurs that would necessitate creating a new watcher. The caller will then call again creating the new watcher.
func (cs *clusterScanner) watchEndpointEvents() (bool, error) {
watcher, err := cs.client.Watch(context.Background(), cs.client.Namespace(), new(corev1.Endpoints))

// TODO(Dax): Rewrite this to used NewSharedInformerFactory from k8s/client-go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you may have to look at older versions of this repo, but I believe we used to use informer back when this package used client-go.


watcher, err := cs.client.Endpoints(metav1.NamespaceAll).Watch(metav1.ListOptions{})
if err != nil {
return false, fmt.Errorf("k8s client.Watch error: %w", err)
}
defer watcher.Close()
defer watcher.Stop()

for {
var eps corev1.Endpoints
eventType, err := watcher.Next(&eps)
event := <-watcher.ResultChan()
Comment on lines 97 to +98
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should be able to do for event := range watcher.ResultChan()

if err != nil {
// we need a new watcher
return true, fmt.Errorf("k8s watcher.Next error: %w", err)
}

if eventType == k8s.EventError {
if event.Type == watch.Error {
// we need a new watcher
return true, errors.New("error event")
}
Expand All @@ -128,66 +113,63 @@ func (cs *clusterScanner) watchEndpointEvents() (bool, error) {
// scanCluster looks for endpoints belonging to services that have annotation sourcegraph.prometheus/scrape=true.
// It derives the appropriate port from the prometheus.io/port annotation.
func (cs *clusterScanner) scanCluster() {
var services corev1.ServiceList

err := cs.client.List(context.Background(), cs.client.Namespace(), &services)
// Get services from all namespaces
services, err := cs.client.Services(cs.namespace).List(metav1.ListOptions{})
if err != nil {
log15.Error("k8s failed to list services", "error", err)
return
}

var scanResults []Endpoint

for _, svc := range services.Items {
svcName := *svc.Metadata.Name
svcName := svc.Name

// TODO(uwedeportivo): pgsql doesn't work, figure out why
if svcName == "pgsql" {
continue
}

if svc.Metadata.Annotations["sourcegraph.prometheus/scrape"] != "true" {
if svc.Annotations["sourcegraph.prometheus/scrape"] != "true" {
continue
}

var port int
if portStr := svc.Metadata.Annotations["prometheus.io/port"]; portStr != "" {
if portStr := svc.Annotations["prometheus.io/port"]; portStr != "" {
port, err = strconv.Atoi(portStr)
if err != nil {
log15.Debug("k8s prometheus.io/port annotation for service is not an integer", "service", svcName, "port", portStr)
continue
}
}

var endpoints corev1.Endpoints
err = cs.client.Get(context.Background(), cs.client.Namespace(), svcName, &endpoints)
endpoints, err := cs.client.Endpoints(cs.namespace).Get(svcName, metav1.GetOptions{})
if err != nil {
log15.Error("k8s failed to get endpoints", "error", err)
return
}

for _, subset := range endpoints.Subsets {
var ports []int
if port != 0 {
ports = []int{port}
} else {
for _, port := range subset.GetPorts() {
ports = append(ports, int(port.GetPort()))
for _, port := range subset.Ports {
ports = append(ports, int(port.Port))
}
}

for _, addr := range subset.Addresses {
for _, port := range ports {
addrStr := fromStrPtr(addr.Ip)
addrStr := addr.IP
if addrStr == "" {
addrStr = fromStrPtr(addr.Hostname)
addrStr = addr.Hostname
}

if addrStr != "" {
scanResults = append(scanResults, Endpoint{
Service: svcName,
Addr: fmt.Sprintf("%s:%d", addrStr, port),
Hostname: fromStrPtr(addr.Hostname),
Hostname: addr.Hostname,
})
}
}
Expand All @@ -198,10 +180,21 @@ func (cs *clusterScanner) scanCluster() {
cs.consume(scanResults)
}

// fromStrPtr returns *s. If s is nil the empty string is returned.
func fromStrPtr(s *string) string {
if s == nil {
return ""
// namespace returns the namespace the pod is currently running in
// this is done because the k8s client we previously used set the namespace
// when the client was created, the official k8s client does not
func namespace() string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please rewrite this a little to log the error if it happens.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rewritten

const filename = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
data, err := ioutil.ReadFile(filename)
if err != nil {
log15.Warn("scanner: falling back to kubernetes default namespace", "filename", filename, "error", err)
return "default"
}

ns := strings.TrimSpace(string(data))
if ns == "" {
log15.Warn("file: ", filename, " empty using \"default\" ns")
daxmc99 marked this conversation as resolved.
Show resolved Hide resolved
return "default"
}
return *s
return ns
}
Loading