Skip to content

Commit

Permalink
RBAC token to be passed from the plugin to the event mesh backend (#66)
Browse files Browse the repository at this point in the history
* refactored listers to kube client

* added token to config & test refactoring

* tests fixed

* added token to config

* added token place holder

* hardcoded namespace fixed to all namespaces & copy config provided to fetchers (#34)

* Bearer added to header (#34)

* (fixed go imports  #34)

* fixed go imports  (#34)

* removed permissions for eventtypes, brokers and triggers (#34)

* reverted test namespaces (#34)

* token check added (#34)

* permissions deleted (#34)

* go lint fix (#34)

* Update docs

Signed-off-by: Ali Ok <[email protected]>

* Get rid of listers and other injection

Signed-off-by: Ali Ok <[email protected]>

* Read the token in `Bearer: <token>` format

Signed-off-by: Ali Ok <[email protected]>

* Update backstage/plugins/knative-event-mesh-backend/README.md

* Update backstage/plugins/knative-event-mesh-backend/README.md

* Update backstage/plugins/knative-event-mesh-backend/README.md

* Update backstage/plugins/knative-event-mesh-backend/README.md

* Use kncloudevents.HTTPEventReceiver to start the HTTP server

Signed-off-by: Ali Ok <[email protected]>

* ./hack/update-deps.sh

Signed-off-by: Ali Ok <[email protected]>

* Remove some TODOs

Signed-off-by: Ali Ok <[email protected]>

* Goimports

Signed-off-by: Ali Ok <[email protected]>

* Address more comments

Signed-off-by: Ali Ok <[email protected]>

---------

Signed-off-by: Ali Ok <[email protected]>
Co-authored-by: Ali Ok <[email protected]>
  • Loading branch information
ahmetcihank and aliok authored Jun 17, 2024
1 parent 2f438d2 commit 8e25fee
Show file tree
Hide file tree
Showing 239 changed files with 17,505 additions and 15,508 deletions.
22 changes: 2 additions & 20 deletions backends/cmd/eventmesh/main.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,12 @@
package main

import (
"context"

"knative.dev/backstage-plugins/backends/pkg/reconciler/eventmesh"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"
)

const (
component = "eventmesh-backend"
)

func main() {

sharedmain.MainNamed(signals.NewContext(), component,

injection.NamedControllerConstructor{
Name: "backend",
ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
return eventmesh.NewController(ctx)
},
},
)
ctx := signals.NewContext()
eventmesh.NewController(ctx)
}
23 changes: 0 additions & 23 deletions backends/config/100-eventmesh/100-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,6 @@ rules:
- list
- watch

# permissions for eventtypes, brokers and triggers
- apiGroups:
- "eventing.knative.dev"
resources:
- brokers
- eventtypes
- triggers
verbs:
- get
- list
- watch

# permissions for leader election
- apiGroups:
- "coordination.k8s.io"
Expand All @@ -45,14 +33,3 @@ rules:
- delete
- patch
- watch


# permissions to get subscribers for triggers
# as subscribers can be any resource, we need to give access to all resources
# we fetch subscribers one by one, we only need `get` verb
- apiGroups:
- "*"
resources:
- "*"
verbs:
- get
64 changes: 16 additions & 48 deletions backends/pkg/reconciler/eventmesh/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,71 +3,39 @@ package eventmesh
import (
"context"
"log"
"net/http"

"github.com/gorilla/mux"
"k8s.io/client-go/rest"

"knative.dev/pkg/controller"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/logging"

eventtypereconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1beta2/eventtype"

brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1beta2/eventtype"

eventinglistersv1 "knative.dev/eventing/pkg/client/listers/eventing/v1"
eventinglistersv1beta2 "knative.dev/eventing/pkg/client/listers/eventing/v1beta2"
)

type Listers struct {
EventTypeLister eventinglistersv1beta2.EventTypeLister
BrokerLister eventinglistersv1.BrokerLister
TriggerLister eventinglistersv1.TriggerLister
}

func NewController(ctx context.Context) *controller.Impl {

reconciler := &Reconciler{}
func NewController(ctx context.Context) {

logger := logging.FromContext(ctx)

logger.Infow("Starting eventmesh-backend controller")

// shared main does all the injection and starts the controller
// thus, we want to use it.
// and, it wants a controller.Impl, so, we're just returning one that's not really used in reality.
impl := eventtypereconciler.NewImpl(ctx, reconciler)

listers := Listers{
EventTypeLister: eventtypeinformer.Get(ctx).Lister(),
BrokerLister: brokerinformer.Get(ctx).Lister(),
TriggerLister: triggerinformer.Get(ctx).Lister(),
}

go startWebServer(ctx, listers)

return impl
startWebServer(ctx)
}

func startWebServer(ctx context.Context, listers Listers) {
func startWebServer(ctx context.Context) {

logger := logging.FromContext(ctx)

logger.Infow("Starting eventmesh-backend webserver")

r := mux.NewRouter()
r.Use(commonMiddleware)

r.HandleFunc("/", HttpHandler(ctx, listers)).Methods("GET")
http.Handle("/", r)
noTokenConfig, err := rest.InClusterConfig()
if err != nil {
log.Fatalf("Error getting in-cluster config: %v", err)
}

log.Fatal(http.ListenAndServe(":8080", r))
}
noTokenConfig.BearerToken = ""
noTokenConfig.Username = ""
noTokenConfig.Password = ""
noTokenConfig.BearerTokenFile = ""

func commonMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
next.ServeHTTP(w, r)
})
r := kncloudevents.NewHTTPEventReceiver(8080)
err = r.StartListen(ctx, HttpHandler{ctx, noTokenConfig})
log.Fatal(err)
}
113 changes: 68 additions & 45 deletions backends/pkg/reconciler/eventmesh/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,26 @@ package eventmesh
import (
"context"
"fmt"
"log"
"net/http"
"sort"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/rest"
"knative.dev/eventing/pkg/client/clientset/versioned"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/logging"

"knative.dev/pkg/injection/clients/dynamicclient"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventinglistersv1beta2 "knative.dev/eventing/pkg/client/listers/eventing/v1beta2"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/labels"

"k8s.io/apimachinery/pkg/util/json"

"knative.dev/pkg/logging"

eventinglistersv1 "knative.dev/eventing/pkg/client/listers/eventing/v1"
)

// EventMesh is the top-level struct that holds the event mesh data.
Expand All @@ -47,26 +44,49 @@ type EventMesh struct {
const BackstageKubernetesIDLabel = "backstage.io/kubernetes-id"

// HttpHandler is the HTTP handler that's used to serve the event mesh data.
func HttpHandler(ctx context.Context, listers Listers) func(w http.ResponseWriter, req *http.Request) {
logger := logging.FromContext(ctx)
type HttpHandler struct {
ctx context.Context
inClusterConfig *rest.Config
}

// this handler simply calls the event mesh builder and returns the result as JSON
return func(w http.ResponseWriter, req *http.Request) {
logger.Debugw("Handling request", "method", req.Method, "url", req.URL)
// This handler simply calls the event mesh builder and returns the result as JSON
func (h HttpHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
logger := logging.FromContext(h.ctx)

eventMesh, err := BuildEventMesh(ctx, listers, logger)
if err != nil {
logger.Errorw("Error building event mesh", "error", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Add("Content-Type", "application/json")

err = json.NewEncoder(w).Encode(eventMesh)
if err != nil {
logger.Errorw("Error encoding event mesh", "error", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
logger.Debugw("Handling request", "method", req.Method, "url", req.URL)

config := rest.CopyConfig(h.inClusterConfig)
authHeader := req.Header.Get("Authorization")
if authHeader == "" {
http.Error(w, "Authorization header is missing", http.StatusUnauthorized)
return
}
// header value is in this format: "Bearer <token>"
// we only need the token part
if len(authHeader) < 8 || authHeader[:7] != "Bearer " {
http.Error(w, "Invalid Authorization header. Should start with `Bearer `", http.StatusUnauthorized)
return
}
config.BearerToken = authHeader[7:]
clientset, err := versioned.NewForConfig(config)
if err != nil {
log.Fatalf("Error creating clientset: %v", err)
}

eventMesh, err := BuildEventMesh(h.ctx, clientset, logger)
if err != nil {
logger.Errorw("Error building event mesh", "error", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

err = json.NewEncoder(w).Encode(eventMesh)
if err != nil {
logger.Errorw("Error encoding event mesh", "error", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

Expand All @@ -76,9 +96,9 @@ func HttpHandler(ctx context.Context, listers Listers) func(w http.ResponseWrite
// - Do the same for event types.
// - Fetch the triggers, find out what event types they're subscribed to and find out the resources that are receiving the events.
// - Make a connection between the event types and the subscribers. Store this connection in the eventType struct.
func BuildEventMesh(ctx context.Context, listers Listers, logger *zap.SugaredLogger) (EventMesh, error) {
func BuildEventMesh(ctx context.Context, clientset versioned.Interface, logger *zap.SugaredLogger) (EventMesh, error) {
// fetch the brokers and convert them to the representation that's consumed by the Backstage plugin.
convertedBrokers, err := fetchBrokers(listers.BrokerLister, logger)
convertedBrokers, err := fetchBrokers(clientset, logger)
if err != nil {
logger.Errorw("Error fetching and converting brokers", "error", err)
return EventMesh{}, err
Expand All @@ -93,7 +113,7 @@ func BuildEventMesh(ctx context.Context, listers Listers, logger *zap.SugaredLog
}

// fetch the event types and convert them to the representation that's consumed by the Backstage plugin.
convertedEventTypes, err := fetchEventTypes(listers.EventTypeLister, logger)
convertedEventTypes, err := fetchEventTypes(clientset, logger)
if err != nil {
logger.Errorw("Error fetching and converting event types", "error", err)
return EventMesh{}, err
Expand All @@ -109,7 +129,8 @@ func BuildEventMesh(ctx context.Context, listers Listers, logger *zap.SugaredLog
}

// fetch the triggers we will process them later
triggers, err := listers.TriggerLister.List(labels.Everything())
triggers, err := clientset.EventingV1().Triggers(metav1.NamespaceAll).List(context.Background(), metav1.ListOptions{})

if err != nil {
logger.Errorw("Error listing triggers", "error", err)
return EventMesh{}, err
Expand All @@ -124,8 +145,8 @@ func BuildEventMesh(ctx context.Context, listers Listers, logger *zap.SugaredLog
etByNamespacedName[et.NamespacedName()] = et
}

for _, trigger := range triggers {
err := processTrigger(ctx, trigger, brokerMap, etByNamespacedName, logger)
for _, trigger := range triggers.Items {
err := processTrigger(ctx, &trigger, brokerMap, etByNamespacedName, logger)
if err != nil {
logger.Errorw("Error processing trigger", "error", err)
// do not stop the Backstage plugin from rendering the rest of the data, e.g. because
Expand Down Expand Up @@ -234,39 +255,41 @@ func collectSubscribedEventTypes(trigger *eventingv1.Trigger, broker *Broker, et
}

// fetchBrokers fetches the brokers and converts them to the representation that's consumed by the Backstage plugin.
func fetchBrokers(brokerLister eventinglistersv1.BrokerLister, logger *zap.SugaredLogger) ([]*Broker, error) {
fetchedBrokers, err := brokerLister.List(labels.Everything())
func fetchBrokers(clientset versioned.Interface, logger *zap.SugaredLogger) ([]*Broker, error) {
brokers, err := clientset.EventingV1().Brokers(metav1.NamespaceAll).List(context.Background(), metav1.ListOptions{})

if err != nil {
logger.Errorw("Error listing brokers", "error", err)
return nil, err
}

convertedBrokers := make([]*Broker, 0, len(fetchedBrokers))
for _, br := range fetchedBrokers {
convertedBroker := convertBroker(br)
convertedBrokers := make([]*Broker, 0, len(brokers.Items))
for _, br := range brokers.Items {
convertedBroker := convertBroker(&br)
convertedBrokers = append(convertedBrokers, &convertedBroker)
}
return convertedBrokers, err
}

// fetchEventTypes fetches the event types and converts them to the representation that's consumed by the Backstage plugin.
func fetchEventTypes(eventTypeLister eventinglistersv1beta2.EventTypeLister, logger *zap.SugaredLogger) ([]*EventType, error) {
fetchedEventTypes, err := eventTypeLister.List(labels.Everything())
func fetchEventTypes(clientset versioned.Interface, logger *zap.SugaredLogger) ([]*EventType, error) {
eventTypeResponse, err := clientset.EventingV1beta2().EventTypes(metav1.NamespaceAll).List(context.Background(), metav1.ListOptions{})
if err != nil {
logger.Errorw("Error listing eventTypes", "error", err)
return nil, err
}
eventTypes := eventTypeResponse.Items

sort.Slice(fetchedEventTypes, func(i, j int) bool {
if fetchedEventTypes[i].Namespace != fetchedEventTypes[j].Namespace {
return fetchedEventTypes[i].Namespace < fetchedEventTypes[j].Namespace
sort.Slice(eventTypes, func(i, j int) bool {
if eventTypes[i].Namespace != eventTypes[j].Namespace {
return eventTypes[i].Namespace < eventTypes[j].Namespace
}
return fetchedEventTypes[i].Name < fetchedEventTypes[j].Name
return eventTypes[i].Name < eventTypes[j].Name
})

convertedEventTypes := make([]*EventType, 0, len(fetchedEventTypes))
for _, et := range fetchedEventTypes {
convertedEventType := convertEventType(et)
convertedEventTypes := make([]*EventType, 0, len(eventTypes))
for _, et := range eventTypes {
convertedEventType := convertEventType(&et)
convertedEventTypes = append(convertedEventTypes, &convertedEventType)
}

Expand Down
Loading

0 comments on commit 8e25fee

Please sign in to comment.