Skip to content

Commit

Permalink
code review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
andmat900 committed Sep 27, 2024
1 parent 8a034fc commit a68434e
Show file tree
Hide file tree
Showing 19 changed files with 106 additions and 242 deletions.
15 changes: 8 additions & 7 deletions cmd/executionspace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ import (
"syscall"

config "github.com/eiffel-community/etos-api/internal/configs/executionspace"
"github.com/eiffel-community/etos-api/internal/executionspace/etcd"
"github.com/eiffel-community/etos-api/internal/database/etcd"
"github.com/eiffel-community/etos-api/internal/executionspace/provider"
"github.com/eiffel-community/etos-api/internal/logging"
"github.com/eiffel-community/etos-api/internal/logging/rabbitmqhook"
"github.com/eiffel-community/etos-api/internal/rabbitmq"
"github.com/eiffel-community/etos-api/internal/server"
"github.com/eiffel-community/etos-api/pkg/application"
providerservice "github.com/eiffel-community/etos-api/pkg/executionspace/v1alpha"
"github.com/sirupsen/logrus"
"github.com/snowzach/rotatefilehook"
"github.com/eiffel-community/etos-api/internal/executionspace/logging/rabbitmqhook"
"github.com/eiffel-community/etos-api/internal/executionspace/rabbitmq"
providerservice "github.com/eiffel-community/etos-api/pkg/executionspace/v1alpha"
"go.elastic.co/ecslogrus"
)

Expand Down Expand Up @@ -94,7 +94,7 @@ func main() {
if err := srv.Close(ctx); err != nil {
log.Errorf("WebService shutdown failed: %+v", err)
}
log.Info("Wait for checkout, flash and checkin jobs to complete")
log.Info("Wait for checkout and checkin jobs to complete")
}

// fileLogging adds a hook into a slice of hooks, if the filepath configuration is set
Expand All @@ -120,19 +120,20 @@ func fileLogging(cfg config.Config) logrus.Hook {
// remoteLogging starts a new rabbitmq publisher if the rabbitmq parameters are set
// Warning: Must call publisher.Close() on the publisher returned from this function
func remoteLogging(cfg config.Config) *rabbitmq.Publisher {
if cfg.RabbitMQHookUrl() != "" {
if cfg.RabbitMQHookURL() != "" {
if cfg.RabbitMQHookExchangeName() == "" {
panic("-rabbitmq_hook_exchange (env:ETOS_RABBITMQ_EXCHANGE) must be set when using -rabbitmq_hook_url (env:ETOS_RABBITMQ_URL)")
}
publisher := rabbitmq.NewPublisher(rabbitmq.PublisherConfig{
URL: cfg.RabbitMQHookUrl(),
URL: cfg.RabbitMQHookURL(),
ExchangeName: cfg.RabbitMQHookExchangeName(),
})
return publisher
}
return nil
}

// vcsRevision returns the current source code revision
func vcsRevision() string {
buildInfo, ok := debug.ReadBuildInfo()
if !ok {
Expand Down
1 change: 0 additions & 1 deletion deploy/etos-executionspace/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: "3.7"
services:
etos-executionspace:
build:
Expand Down
12 changes: 10 additions & 2 deletions internal/configs/executionspace/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ type Config interface {
Timeout() time.Duration
KubernetesNamespace() string
ExecutionSpaceWaitTimeout() time.Duration
RabbitMQHookUrl() string
RabbitMQHookURL() string
RabbitMQHookExchangeName() string
DatabaseURI() string
ETOSNamespace() string
EiffelGoerURL() string
}

// cfg implements the Config interface.
Expand All @@ -56,6 +57,7 @@ type cfg struct {
executionSpaceWaitTimeout time.Duration
rabbitmqHookURL string
rabbitmqHookExchange string
eiffelGoerURL string
etosNamespace string
}

Expand Down Expand Up @@ -86,6 +88,7 @@ func Get() Config {
flag.DurationVar(&conf.executionSpaceWaitTimeout, "execution space wait timeout", executionSpaceWaitTimeout, "Timeout duration to wait when trying to checkout execution space(s)")
flag.StringVar(&conf.rabbitmqHookURL, "rabbitmq_hook_url", os.Getenv("ETOS_RABBITMQ_URL"), "URL to the ETOS rabbitmq for logs")
flag.StringVar(&conf.rabbitmqHookExchange, "rabbitmq_hook_exchange", os.Getenv("ETOS_RABBITMQ_EXCHANGE"), "Exchange to use for the ETOS rabbitmq for logs")
flag.StringVar(&conf.eiffelGoerURL, "event_repository_host", os.Getenv("EIFFEL_GOER_URL"), "Event repository URL used for Eiffel event lookup")
flag.Parse()
return &conf
}
Expand Down Expand Up @@ -136,10 +139,15 @@ func (c *cfg) ExecutionSpaceWaitTimeout() time.Duration {
}

// RabbitMQHookURL returns the rabbitmq url for ETOS logs
func (c *cfg) RabbitMQHookUrl() string {
func (c *cfg) RabbitMQHookURL() string {
return c.rabbitmqHookURL
}

// EventRepositoryURL returns the Eiffel event repository used for event lookups
func (c *cfg) EiffelGoerURL() string {
return c.eiffelGoerURL
}

// RabbitMQHookExchangeName returns the rabbitmq exchange name used for ETOS logs
func (c *cfg) RabbitMQHookExchangeName() string {
return c.rabbitmqHookExchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/google/uuid"
)

// Opener is the common interface for database clients
type Opener interface {
Open(context.Context, uuid.UUID) io.ReadWriter
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"time"

config "github.com/eiffel-community/etos-api/internal/configs/executionspace"
"github.com/eiffel-community/etos-api/internal/executionspace/database"
"github.com/eiffel-community/etos-api/internal/database"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"errors"
"io"
"net/http"
"os"

"github.com/eiffel-community/eiffelevents-sdk-go"
)
Expand All @@ -38,15 +37,10 @@ type activityResponse struct {
Items []eiffelevents.ActivityTriggeredV4 `json:"items"`
}

// eventRepository returns the event repository URL to use.
func eventRepository() string {
return os.Getenv("EVENT_REPOSITORY_HOST")
}

// ActivityTriggered returns an activity triggered event from the event repository
func ActivityTriggered(ctx context.Context, id string) (*eiffelevents.ActivityTriggeredV4, error) {
func ActivityTriggered(ctx context.Context, eventRepositoryURL string, id string) (*eiffelevents.ActivityTriggeredV4, error) {
query := map[string]string{"meta.id": id, "meta.type": "EiffelActivityTriggeredEvent"}
body, err := getEvents(ctx, query)
body, err := getEvents(ctx, eventRepositoryURL, query)
if err != nil {
return nil, err
}
Expand All @@ -61,15 +55,15 @@ func ActivityTriggered(ctx context.Context, id string) (*eiffelevents.ActivityTr
}

// MainSuiteStarted returns a test suite started event from the event repository
func MainSuiteStarted(ctx context.Context, id string) (*eiffelevents.TestSuiteStartedV3, error) {
activity, err := ActivityTriggered(ctx, id)
func MainSuiteStarted(ctx context.Context, eventRepositoryURL string, id string) (*eiffelevents.TestSuiteStartedV3, error) {
activity, err := ActivityTriggered(ctx, eventRepositoryURL, id)
if err != nil {
return nil, err
}
testSuiteID := activity.Links.FindFirst("CONTEXT")

query := map[string]string{"meta.id": testSuiteID, "meta.type": "EiffelTestSuiteStartedEvent"}
body, err := getEvents(ctx, query)
body, err := getEvents(ctx, eventRepositoryURL, query)
if err != nil {
return nil, err
}
Expand All @@ -84,9 +78,9 @@ func MainSuiteStarted(ctx context.Context, id string) (*eiffelevents.TestSuiteSt
}

// TestSuiteStarted returns a test suite started event from the event repository
func TestSuiteStarted(ctx context.Context, id string, name string) (*eiffelevents.TestSuiteStartedV3, error) {
func TestSuiteStarted(ctx context.Context, eventRepositoryURL string, id string, name string) (*eiffelevents.TestSuiteStartedV3, error) {
query := map[string]string{"links.target": id, "meta.type": "EiffelTestSuiteStartedEvent", "data.name": name}
body, err := getEvents(ctx, query)
body, err := getEvents(ctx, eventRepositoryURL, query)
if err != nil {
return nil, err
}
Expand All @@ -101,9 +95,9 @@ func TestSuiteStarted(ctx context.Context, id string, name string) (*eiffelevent
}

// EnvironmentDefined returns an environment defined event from the event repository
func EnvironmentDefined(ctx context.Context, id string) (*eiffelevents.EnvironmentDefinedV3, error) {
func EnvironmentDefined(ctx context.Context, eventRepositoryURL string, id string) (*eiffelevents.EnvironmentDefinedV3, error) {
query := map[string]string{"meta.id": id, "meta.type": "EiffelEnvironmentDefinedEvent"}
body, err := getEvents(ctx, query)
body, err := getEvents(ctx, eventRepositoryURL, query)
if err != nil {
return nil, err
}
Expand All @@ -118,8 +112,8 @@ func EnvironmentDefined(ctx context.Context, id string) (*eiffelevents.Environme
}

// getEvents queries the event repository and returns the response for others to parse
func getEvents(ctx context.Context, query map[string]string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, "GET", eventRepository(), nil)
func getEvents(ctx context.Context, eventRepositoryURL string, query map[string]string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, "GET", eventRepositoryURL, nil)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions internal/executionspace/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/sirupsen/logrus"
)

// Executor is the common interface for test executor instances
type Executor interface {
Name() string
Start(context.Context, *logrus.Entry, *executionspace.ExecutorSpec) (string, error)
Expand Down
40 changes: 9 additions & 31 deletions internal/executionspace/executor/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (

var (
BACKOFFLIMIT int32 = 0
PARALLELL int32 = 1
PARALLEL int32 = 1
COMPLETIONS int32 = 1
SECRETMODE int32 = 0600
)
Expand Down Expand Up @@ -105,7 +105,7 @@ func (k KubernetesExecutor) Start(ctx context.Context, logger *logrus.Entry, exe
Spec: batchv1.JobSpec{
BackoffLimit: &BACKOFFLIMIT,
Completions: &COMPLETIONS,
Parallelism: &PARALLELL,
Parallelism: &PARALLEL,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
Expand All @@ -114,22 +114,6 @@ func (k KubernetesExecutor) Start(ctx context.Context, logger *logrus.Entry, exe
Image: executorSpec.Instructions.Image,
Args: args,
Env: envs,
EnvFrom: []corev1.EnvFromSource{
{
SecretRef: &corev1.SecretEnvSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: "etos-encryption-key",
},
},
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "ssh-key-and-config",
ReadOnly: true,
MountPath: "/home/etos/keys",
},
},
},
},
RestartPolicy: corev1.RestartPolicyNever,
Expand All @@ -150,13 +134,7 @@ func (k KubernetesExecutor) Start(ctx context.Context, logger *logrus.Entry, exe
}
job, err := jobs.Create(ctx, job, metav1.CreateOptions{})
if err != nil {
logger.WithField("user_log", true).Infof("Create job error: %s", err)
logger.WithField("user_log", true).Infof("Create job error: %s", err.Error())

unwrappedErr := errors.Unwrap(err)
if unwrappedErr != nil {
logger.WithField("user_log", true).Infof("Unwrapped Error: %s", unwrappedErr)
}
logger.WithField("user_log", true).Errorf("Create job error: %s", err)
return "", err
}
return job.ObjectMeta.Name, nil
Expand Down Expand Up @@ -198,7 +176,7 @@ func (k KubernetesExecutor) Wait(ctx context.Context, logger *logrus.Entry, name
for {
select {
case <-ctx.Done():
return "", "", fmt.Errorf("timed out waiting for kubernets job %s to start", name)
return "", "", fmt.Errorf("timed out waiting for Kubernetes job %s to start", name)
case event := <-watcher.ResultChan():
pod := event.Object.(*corev1.Pod)
if isReady(pod) {
Expand All @@ -209,20 +187,20 @@ func (k KubernetesExecutor) Wait(ctx context.Context, logger *logrus.Entry, name
}

// Stop stops a test runner Kubernetes pod
func (k KubernetesExecutor) Stop(ctx context.Context, logger *logrus.Entry, id string) error {
func (k KubernetesExecutor) Stop(ctx context.Context, logger *logrus.Entry, name string) error {
logger.WithField("user_log", true).Info("Stopping test runner Kubernetes pod")
jobs := k.client.BatchV1().Jobs(k.namespace)
propagation := metav1.DeletePropagationForeground
err := jobs.Delete(ctx, id, metav1.DeleteOptions{PropagationPolicy: &propagation})
err := jobs.Delete(ctx, name, metav1.DeleteOptions{PropagationPolicy: &propagation})
if err != nil {
logger.Error(err.Error())
return err
}
watcher, err := k.client.CoreV1().Pods(k.namespace).Watch(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s", id)})
watcher, err := k.client.CoreV1().Pods(k.namespace).Watch(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s", name)})
if err != nil {
if net.IsProbableEOF(err) {
// Assume that there are no more active jobs.
logger.Warningf("Did not find any pods for 'job-name=%s', reason=EOF. Assuming that there are no more active jobs", id)
logger.Warningf("Did not find any pods for 'job-name=%s', reason=EOF. Assuming that there are no more active jobs", name)
return nil
}
return err
Expand All @@ -231,7 +209,7 @@ func (k KubernetesExecutor) Stop(ctx context.Context, logger *logrus.Entry, id s
for {
select {
case <-ctx.Done():
return fmt.Errorf("timed out waiting for kubernets job %s to stop", id)
return fmt.Errorf("timed out waiting for Kubernetes job %s to stop", name)
case event := <-watcher.ResultChan():
if event.Type == watch.Deleted {
return nil
Expand Down
38 changes: 0 additions & 38 deletions internal/executionspace/logging/logging.go

This file was deleted.

34 changes: 0 additions & 34 deletions internal/executionspace/logging/logging_test.go

This file was deleted.

Loading

0 comments on commit a68434e

Please sign in to comment.