From a68434e96fccaaffece73d9def68f4393c72de75 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Fri, 27 Sep 2024 09:59:51 +0200 Subject: [PATCH] code review changes --- cmd/executionspace/main.go | 15 ++--- deploy/etos-executionspace/docker-compose.yml | 1 - internal/configs/executionspace/config.go | 12 +++- .../{executionspace => }/database/database.go | 1 + .../{executionspace => database}/etcd/etcd.go | 2 +- .../eventrepository/eventrepository.go | 28 ++++------ internal/executionspace/executor/executor.go | 1 + .../executionspace/executor/kubernetes.go | 40 +++----------- internal/executionspace/logging/logging.go | 38 ------------- .../executionspace/logging/logging_test.go | 34 ------------ .../executionspace/provider/kubernetes.go | 6 +- internal/executionspace/provider/provider.go | 26 ++++----- .../responses/responses_test.go | 42 -------------- .../logging/rabbitmqhook/rabbitmqhook.go | 2 +- .../{executionspace => }/rabbitmq/rabbitmq.go | 0 manifests/base/executionspace/deployment.yaml | 1 - pkg/executionspace/v1alpha/executor.go | 55 +++++++++---------- pkg/executionspace/v1alpha/provider.go | 42 +++++++------- .../executionspace/v1alpha}/responses.go | 2 +- 19 files changed, 106 insertions(+), 242 deletions(-) rename internal/{executionspace => }/database/database.go (93%) rename internal/{executionspace => database}/etcd/etcd.go (97%) rename internal/{executionspace => }/eventrepository/eventrepository.go (79%) delete mode 100644 internal/executionspace/logging/logging.go delete mode 100644 internal/executionspace/logging/logging_test.go delete mode 100644 internal/executionspace/responses/responses_test.go rename internal/{executionspace => }/logging/rabbitmqhook/rabbitmqhook.go (97%) rename internal/{executionspace => }/rabbitmq/rabbitmq.go (100%) rename {internal/executionspace/responses => pkg/executionspace/v1alpha}/responses.go (98%) diff --git a/cmd/executionspace/main.go b/cmd/executionspace/main.go index f1ecafd..2c72854 100644 --- a/cmd/executionspace/main.go +++ b/cmd/executionspace/main.go @@ -24,16 +24,16 @@ import ( "syscall" config "github.com/eiffel-community/etos-api/internal/configs/executionspace" - "github.com/eiffel-community/etos-api/internal/executionspace/etcd" + "github.com/eiffel-community/etos-api/internal/database/etcd" "github.com/eiffel-community/etos-api/internal/executionspace/provider" "github.com/eiffel-community/etos-api/internal/logging" + "github.com/eiffel-community/etos-api/internal/logging/rabbitmqhook" + "github.com/eiffel-community/etos-api/internal/rabbitmq" "github.com/eiffel-community/etos-api/internal/server" "github.com/eiffel-community/etos-api/pkg/application" + providerservice "github.com/eiffel-community/etos-api/pkg/executionspace/v1alpha" "github.com/sirupsen/logrus" "github.com/snowzach/rotatefilehook" - "github.com/eiffel-community/etos-api/internal/executionspace/logging/rabbitmqhook" - "github.com/eiffel-community/etos-api/internal/executionspace/rabbitmq" - providerservice "github.com/eiffel-community/etos-api/pkg/executionspace/v1alpha" "go.elastic.co/ecslogrus" ) @@ -94,7 +94,7 @@ func main() { if err := srv.Close(ctx); err != nil { log.Errorf("WebService shutdown failed: %+v", err) } - log.Info("Wait for checkout, flash and checkin jobs to complete") + log.Info("Wait for checkout and checkin jobs to complete") } // fileLogging adds a hook into a slice of hooks, if the filepath configuration is set @@ -120,12 +120,12 @@ func fileLogging(cfg config.Config) logrus.Hook { // remoteLogging starts a new rabbitmq publisher if the rabbitmq parameters are set // Warning: Must call publisher.Close() on the publisher returned from this function func remoteLogging(cfg config.Config) *rabbitmq.Publisher { - if cfg.RabbitMQHookUrl() != "" { + if cfg.RabbitMQHookURL() != "" { if cfg.RabbitMQHookExchangeName() == "" { panic("-rabbitmq_hook_exchange (env:ETOS_RABBITMQ_EXCHANGE) must be set when using -rabbitmq_hook_url (env:ETOS_RABBITMQ_URL)") } publisher := rabbitmq.NewPublisher(rabbitmq.PublisherConfig{ - URL: cfg.RabbitMQHookUrl(), + URL: cfg.RabbitMQHookURL(), ExchangeName: cfg.RabbitMQHookExchangeName(), }) return publisher @@ -133,6 +133,7 @@ func remoteLogging(cfg config.Config) *rabbitmq.Publisher { return nil } +// vcsRevision returns the current source code revision func vcsRevision() string { buildInfo, ok := debug.ReadBuildInfo() if !ok { diff --git a/deploy/etos-executionspace/docker-compose.yml b/deploy/etos-executionspace/docker-compose.yml index 8872d3c..65f096e 100644 --- a/deploy/etos-executionspace/docker-compose.yml +++ b/deploy/etos-executionspace/docker-compose.yml @@ -1,4 +1,3 @@ -version: "3.7" services: etos-executionspace: build: diff --git a/internal/configs/executionspace/config.go b/internal/configs/executionspace/config.go index 28bd34a..04f6f8b 100644 --- a/internal/configs/executionspace/config.go +++ b/internal/configs/executionspace/config.go @@ -35,10 +35,11 @@ type Config interface { Timeout() time.Duration KubernetesNamespace() string ExecutionSpaceWaitTimeout() time.Duration - RabbitMQHookUrl() string + RabbitMQHookURL() string RabbitMQHookExchangeName() string DatabaseURI() string ETOSNamespace() string + EiffelGoerURL() string } // cfg implements the Config interface. @@ -56,6 +57,7 @@ type cfg struct { executionSpaceWaitTimeout time.Duration rabbitmqHookURL string rabbitmqHookExchange string + eiffelGoerURL string etosNamespace string } @@ -86,6 +88,7 @@ func Get() Config { flag.DurationVar(&conf.executionSpaceWaitTimeout, "execution space wait timeout", executionSpaceWaitTimeout, "Timeout duration to wait when trying to checkout execution space(s)") flag.StringVar(&conf.rabbitmqHookURL, "rabbitmq_hook_url", os.Getenv("ETOS_RABBITMQ_URL"), "URL to the ETOS rabbitmq for logs") flag.StringVar(&conf.rabbitmqHookExchange, "rabbitmq_hook_exchange", os.Getenv("ETOS_RABBITMQ_EXCHANGE"), "Exchange to use for the ETOS rabbitmq for logs") + flag.StringVar(&conf.eiffelGoerURL, "event_repository_host", os.Getenv("EIFFEL_GOER_URL"), "Event repository URL used for Eiffel event lookup") flag.Parse() return &conf } @@ -136,10 +139,15 @@ func (c *cfg) ExecutionSpaceWaitTimeout() time.Duration { } // RabbitMQHookURL returns the rabbitmq url for ETOS logs -func (c *cfg) RabbitMQHookUrl() string { +func (c *cfg) RabbitMQHookURL() string { return c.rabbitmqHookURL } +// EventRepositoryURL returns the Eiffel event repository used for event lookups +func (c *cfg) EiffelGoerURL() string { + return c.eiffelGoerURL +} + // RabbitMQHookExchangeName returns the rabbitmq exchange name used for ETOS logs func (c *cfg) RabbitMQHookExchangeName() string { return c.rabbitmqHookExchange diff --git a/internal/executionspace/database/database.go b/internal/database/database.go similarity index 93% rename from internal/executionspace/database/database.go rename to internal/database/database.go index 50d428b..b34e652 100644 --- a/internal/executionspace/database/database.go +++ b/internal/database/database.go @@ -22,6 +22,7 @@ import ( "github.com/google/uuid" ) +// Opener is the common interface for database clients type Opener interface { Open(context.Context, uuid.UUID) io.ReadWriter } diff --git a/internal/executionspace/etcd/etcd.go b/internal/database/etcd/etcd.go similarity index 97% rename from internal/executionspace/etcd/etcd.go rename to internal/database/etcd/etcd.go index fd23fd6..c97b585 100644 --- a/internal/executionspace/etcd/etcd.go +++ b/internal/database/etcd/etcd.go @@ -23,7 +23,7 @@ import ( "time" config "github.com/eiffel-community/etos-api/internal/configs/executionspace" - "github.com/eiffel-community/etos-api/internal/executionspace/database" + "github.com/eiffel-community/etos-api/internal/database" "github.com/google/uuid" "github.com/sirupsen/logrus" clientv3 "go.etcd.io/etcd/client/v3" diff --git a/internal/executionspace/eventrepository/eventrepository.go b/internal/eventrepository/eventrepository.go similarity index 79% rename from internal/executionspace/eventrepository/eventrepository.go rename to internal/eventrepository/eventrepository.go index a1e1b45..23f0fa2 100644 --- a/internal/executionspace/eventrepository/eventrepository.go +++ b/internal/eventrepository/eventrepository.go @@ -21,7 +21,6 @@ import ( "errors" "io" "net/http" - "os" "github.com/eiffel-community/eiffelevents-sdk-go" ) @@ -38,15 +37,10 @@ type activityResponse struct { Items []eiffelevents.ActivityTriggeredV4 `json:"items"` } -// eventRepository returns the event repository URL to use. -func eventRepository() string { - return os.Getenv("EVENT_REPOSITORY_HOST") -} - // ActivityTriggered returns an activity triggered event from the event repository -func ActivityTriggered(ctx context.Context, id string) (*eiffelevents.ActivityTriggeredV4, error) { +func ActivityTriggered(ctx context.Context, eventRepositoryURL string, id string) (*eiffelevents.ActivityTriggeredV4, error) { query := map[string]string{"meta.id": id, "meta.type": "EiffelActivityTriggeredEvent"} - body, err := getEvents(ctx, query) + body, err := getEvents(ctx, eventRepositoryURL, query) if err != nil { return nil, err } @@ -61,15 +55,15 @@ func ActivityTriggered(ctx context.Context, id string) (*eiffelevents.ActivityTr } // MainSuiteStarted returns a test suite started event from the event repository -func MainSuiteStarted(ctx context.Context, id string) (*eiffelevents.TestSuiteStartedV3, error) { - activity, err := ActivityTriggered(ctx, id) +func MainSuiteStarted(ctx context.Context, eventRepositoryURL string, id string) (*eiffelevents.TestSuiteStartedV3, error) { + activity, err := ActivityTriggered(ctx, eventRepositoryURL, id) if err != nil { return nil, err } testSuiteID := activity.Links.FindFirst("CONTEXT") query := map[string]string{"meta.id": testSuiteID, "meta.type": "EiffelTestSuiteStartedEvent"} - body, err := getEvents(ctx, query) + body, err := getEvents(ctx, eventRepositoryURL, query) if err != nil { return nil, err } @@ -84,9 +78,9 @@ func MainSuiteStarted(ctx context.Context, id string) (*eiffelevents.TestSuiteSt } // TestSuiteStarted returns a test suite started event from the event repository -func TestSuiteStarted(ctx context.Context, id string, name string) (*eiffelevents.TestSuiteStartedV3, error) { +func TestSuiteStarted(ctx context.Context, eventRepositoryURL string, id string, name string) (*eiffelevents.TestSuiteStartedV3, error) { query := map[string]string{"links.target": id, "meta.type": "EiffelTestSuiteStartedEvent", "data.name": name} - body, err := getEvents(ctx, query) + body, err := getEvents(ctx, eventRepositoryURL, query) if err != nil { return nil, err } @@ -101,9 +95,9 @@ func TestSuiteStarted(ctx context.Context, id string, name string) (*eiffelevent } // EnvironmentDefined returns an environment defined event from the event repository -func EnvironmentDefined(ctx context.Context, id string) (*eiffelevents.EnvironmentDefinedV3, error) { +func EnvironmentDefined(ctx context.Context, eventRepositoryURL string, id string) (*eiffelevents.EnvironmentDefinedV3, error) { query := map[string]string{"meta.id": id, "meta.type": "EiffelEnvironmentDefinedEvent"} - body, err := getEvents(ctx, query) + body, err := getEvents(ctx, eventRepositoryURL, query) if err != nil { return nil, err } @@ -118,8 +112,8 @@ func EnvironmentDefined(ctx context.Context, id string) (*eiffelevents.Environme } // getEvents queries the event repository and returns the response for others to parse -func getEvents(ctx context.Context, query map[string]string) ([]byte, error) { - req, err := http.NewRequestWithContext(ctx, "GET", eventRepository(), nil) +func getEvents(ctx context.Context, eventRepositoryURL string, query map[string]string) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, "GET", eventRepositoryURL, nil) if err != nil { return nil, err } diff --git a/internal/executionspace/executor/executor.go b/internal/executionspace/executor/executor.go index 201e75f..1b2b21b 100644 --- a/internal/executionspace/executor/executor.go +++ b/internal/executionspace/executor/executor.go @@ -22,6 +22,7 @@ import ( "github.com/sirupsen/logrus" ) +// Executor is the common interface for test executor instances type Executor interface { Name() string Start(context.Context, *logrus.Entry, *executionspace.ExecutorSpec) (string, error) diff --git a/internal/executionspace/executor/kubernetes.go b/internal/executionspace/executor/kubernetes.go index fff6999..625177b 100644 --- a/internal/executionspace/executor/kubernetes.go +++ b/internal/executionspace/executor/kubernetes.go @@ -36,7 +36,7 @@ import ( var ( BACKOFFLIMIT int32 = 0 - PARALLELL int32 = 1 + PARALLEL int32 = 1 COMPLETIONS int32 = 1 SECRETMODE int32 = 0600 ) @@ -105,7 +105,7 @@ func (k KubernetesExecutor) Start(ctx context.Context, logger *logrus.Entry, exe Spec: batchv1.JobSpec{ BackoffLimit: &BACKOFFLIMIT, Completions: &COMPLETIONS, - Parallelism: &PARALLELL, + Parallelism: &PARALLEL, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{ @@ -114,22 +114,6 @@ func (k KubernetesExecutor) Start(ctx context.Context, logger *logrus.Entry, exe Image: executorSpec.Instructions.Image, Args: args, Env: envs, - EnvFrom: []corev1.EnvFromSource{ - { - SecretRef: &corev1.SecretEnvSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: "etos-encryption-key", - }, - }, - }, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "ssh-key-and-config", - ReadOnly: true, - MountPath: "/home/etos/keys", - }, - }, }, }, RestartPolicy: corev1.RestartPolicyNever, @@ -150,13 +134,7 @@ func (k KubernetesExecutor) Start(ctx context.Context, logger *logrus.Entry, exe } job, err := jobs.Create(ctx, job, metav1.CreateOptions{}) if err != nil { - logger.WithField("user_log", true).Infof("Create job error: %s", err) - logger.WithField("user_log", true).Infof("Create job error: %s", err.Error()) - - unwrappedErr := errors.Unwrap(err) - if unwrappedErr != nil { - logger.WithField("user_log", true).Infof("Unwrapped Error: %s", unwrappedErr) - } + logger.WithField("user_log", true).Errorf("Create job error: %s", err) return "", err } return job.ObjectMeta.Name, nil @@ -198,7 +176,7 @@ func (k KubernetesExecutor) Wait(ctx context.Context, logger *logrus.Entry, name for { select { case <-ctx.Done(): - return "", "", fmt.Errorf("timed out waiting for kubernets job %s to start", name) + return "", "", fmt.Errorf("timed out waiting for Kubernetes job %s to start", name) case event := <-watcher.ResultChan(): pod := event.Object.(*corev1.Pod) if isReady(pod) { @@ -209,20 +187,20 @@ func (k KubernetesExecutor) Wait(ctx context.Context, logger *logrus.Entry, name } // Stop stops a test runner Kubernetes pod -func (k KubernetesExecutor) Stop(ctx context.Context, logger *logrus.Entry, id string) error { +func (k KubernetesExecutor) Stop(ctx context.Context, logger *logrus.Entry, name string) error { logger.WithField("user_log", true).Info("Stopping test runner Kubernetes pod") jobs := k.client.BatchV1().Jobs(k.namespace) propagation := metav1.DeletePropagationForeground - err := jobs.Delete(ctx, id, metav1.DeleteOptions{PropagationPolicy: &propagation}) + err := jobs.Delete(ctx, name, metav1.DeleteOptions{PropagationPolicy: &propagation}) if err != nil { logger.Error(err.Error()) return err } - watcher, err := k.client.CoreV1().Pods(k.namespace).Watch(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s", id)}) + watcher, err := k.client.CoreV1().Pods(k.namespace).Watch(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s", name)}) if err != nil { if net.IsProbableEOF(err) { // Assume that there are no more active jobs. - logger.Warningf("Did not find any pods for 'job-name=%s', reason=EOF. Assuming that there are no more active jobs", id) + logger.Warningf("Did not find any pods for 'job-name=%s', reason=EOF. Assuming that there are no more active jobs", name) return nil } return err @@ -231,7 +209,7 @@ func (k KubernetesExecutor) Stop(ctx context.Context, logger *logrus.Entry, id s for { select { case <-ctx.Done(): - return fmt.Errorf("timed out waiting for kubernets job %s to stop", id) + return fmt.Errorf("timed out waiting for Kubernetes job %s to stop", name) case event := <-watcher.ResultChan(): if event.Type == watch.Deleted { return nil diff --git a/internal/executionspace/logging/logging.go b/internal/executionspace/logging/logging.go deleted file mode 100644 index 07bc8d6..0000000 --- a/internal/executionspace/logging/logging.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright Axis Communications AB. -// -// For a full list of individual contributors, please see the commit history. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package logging - -import ( - "github.com/sirupsen/logrus" -) - -// Setup sets up logging to file with a JSON format and to stdout in text format. -func Setup(loglevel string, hooks []logrus.Hook) (*logrus.Logger, error) { - log := logrus.New() - - logLevel, err := logrus.ParseLevel(loglevel) - if err != nil { - return log, err - } - for _, hook := range hooks { - log.AddHook(hook) - } - - log.SetLevel(logLevel) - log.SetFormatter(&logrus.TextFormatter{FullTimestamp: true}) - log.SetReportCaller(true) - return log, nil -} diff --git a/internal/executionspace/logging/logging_test.go b/internal/executionspace/logging/logging_test.go deleted file mode 100644 index ead65c6..0000000 --- a/internal/executionspace/logging/logging_test.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright Axis Communications AB. -// -// For a full list of individual contributors, please see the commit history. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package logging - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -// TestLoggingSetup tests that it is possible to setup logging without a file hook. -func TestLoggingSetup(t *testing.T) { - _, err := Setup("INFO", nil) - assert.Nil(t, err) -} - -// TestLoggingSetupBadLogLevel shall return an error if log level is not parsable. -func TestLoggingSetupBadLogLevel(t *testing.T) { - _, err := Setup("NOTALOGLEVEL", nil) - assert.Error(t, err) -} diff --git a/internal/executionspace/provider/kubernetes.go b/internal/executionspace/provider/kubernetes.go index a1b1412..8df0e09 100644 --- a/internal/executionspace/provider/kubernetes.go +++ b/internal/executionspace/provider/kubernetes.go @@ -20,18 +20,18 @@ import ( "sync" config "github.com/eiffel-community/etos-api/internal/configs/executionspace" - "github.com/eiffel-community/etos-api/internal/executionspace/database" + "github.com/eiffel-community/etos-api/internal/database" "github.com/eiffel-community/etos-api/internal/executionspace/executor" ) type Kubernetes struct { - provider + providerCore } // New creates a copy of a Kubernetes provider func (k Kubernetes) New(db database.Opener, cfg config.Config) Provider { return &Kubernetes{ - provider{ + providerCore{ db: db, cfg: cfg, url: fmt.Sprintf("%s/v1alpha/executor/kubernetes", cfg.Hostname()), diff --git a/internal/executionspace/provider/provider.go b/internal/executionspace/provider/provider.go index b35cc0b..a73a4ff 100644 --- a/internal/executionspace/provider/provider.go +++ b/internal/executionspace/provider/provider.go @@ -20,7 +20,7 @@ import ( "sync" config "github.com/eiffel-community/etos-api/internal/configs/executionspace" - "github.com/eiffel-community/etos-api/internal/executionspace/database" + "github.com/eiffel-community/etos-api/internal/database" "github.com/eiffel-community/etos-api/internal/executionspace/executor" "github.com/eiffel-community/etos-api/pkg/executionspace/executionspace" "github.com/google/uuid" @@ -48,9 +48,9 @@ type ExecutorConfig struct { Environment map[string]string } -// provider partially implements the Provider interface. To use it it should +// providerCore partially implements the Provider interface. To use it it should // be included into another struct that implements the rest of the interface. -type provider struct { +type providerCore struct { db database.Opener cfg config.Config url string @@ -58,8 +58,8 @@ type provider struct { executor executor.Executor } -// Get fetches execution space status from a database -func (e provider) Status(logger *logrus.Entry, ctx context.Context, id uuid.UUID) (*executionspace.ExecutionSpace, error) { +// Status fetches execution space status from a database +func (e providerCore) Status(logger *logrus.Entry, ctx context.Context, id uuid.UUID) (*executionspace.ExecutionSpace, error) { e.active.Add(1) defer e.active.Done() @@ -96,7 +96,7 @@ func (e provider) Status(logger *logrus.Entry, ctx context.Context, id uuid.UUID } // Checkout checks out an execution space and stores it in a database -func (e provider) Checkout(logger *logrus.Entry, ctx context.Context, cfg ExecutorConfig) { +func (e providerCore) Checkout(logger *logrus.Entry, ctx context.Context, cfg ExecutorConfig) { e.active.Add(1) defer e.active.Done() @@ -130,7 +130,7 @@ func (e provider) Checkout(logger *logrus.Entry, ctx context.Context, cfg Execut } // Checkin checks in an execution space by removing it from database -func (e provider) Checkin(logger *logrus.Entry, ctx context.Context, executors []executionspace.ExecutorSpec) error { +func (e providerCore) Checkin(logger *logrus.Entry, ctx context.Context, executors []executionspace.ExecutorSpec) error { e.active.Add(1) defer e.active.Done() for _, executor := range executors { @@ -143,18 +143,18 @@ func (e provider) Checkin(logger *logrus.Entry, ctx context.Context, executors [ } // Executor returns the executor of this provider -func (e provider) Executor() executor.Executor { +func (e providerCore) Executor() executor.Executor { return e.executor } // SaveExecutor saves an executor specification into a database -func (e provider) SaveExecutor(ctx context.Context, executorSpec executionspace.ExecutorSpec) error { +func (e providerCore) SaveExecutor(ctx context.Context, executorSpec executionspace.ExecutorSpec) error { client := e.db.Open(ctx, executorSpec.ID) return executorSpec.Save(client) } // Job gets the Build ID of a test runner execution. -func (e provider) Job(ctx context.Context, id uuid.UUID) (string, error) { +func (e providerCore) Job(ctx context.Context, id uuid.UUID) (string, error) { executorSpec, err := e.ExecutorSpec(ctx, id) if err != nil { return "", err @@ -166,18 +166,18 @@ func (e provider) Job(ctx context.Context, id uuid.UUID) (string, error) { } // ExecutorSpec returns the specification of an executor stored in database -func (e provider) ExecutorSpec(ctx context.Context, id uuid.UUID) (*executionspace.ExecutorSpec, error) { +func (e providerCore) ExecutorSpec(ctx context.Context, id uuid.UUID) (*executionspace.ExecutorSpec, error) { client := e.db.Open(ctx, id) return executionspace.LoadExecutorSpec(client) } // ExecutionSPace returns the execution space stored in database -func (e provider) ExecutionSpace(ctx context.Context, id uuid.UUID) (*executionspace.ExecutionSpace, error) { +func (e providerCore) ExecutionSpace(ctx context.Context, id uuid.UUID) (*executionspace.ExecutionSpace, error) { client := e.db.Open(ctx, id) return executionspace.Load(client) } // Done waits for all jobs to be done -func (e provider) Done() { +func (e providerCore) Done() { e.active.Wait() } diff --git a/internal/executionspace/responses/responses_test.go b/internal/executionspace/responses/responses_test.go deleted file mode 100644 index a819deb..0000000 --- a/internal/executionspace/responses/responses_test.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright Axis Communications AB. -// -// For a full list of individual contributors, please see the commit history. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package responses - -import ( - "net/http/httptest" - "testing" - - "github.com/stretchr/testify/assert" -) - -// Test that RespondWithJSON writes the correct HTTP code, message and adds a content type header. -func TestRespondWithJSON(t *testing.T) { - responseRecorder := httptest.NewRecorder() - RespondWithJSON(responseRecorder, 200, map[string]string{"hello": "world"}) - assert.Equal(t, "application/json", responseRecorder.Header().Get("Content-Type")) - assert.Equal(t, 200, responseRecorder.Result().StatusCode) - assert.JSONEq(t, `{"hello": "world"}`, responseRecorder.Body.String()) -} - -// Test that RespondWithError writes the correct HTTP code, message and adds a content type header. -func TestRespondWithError(t *testing.T) { - responseRecorder := httptest.NewRecorder() - RespondWithError(responseRecorder, 400, "failure") - assert.Equal(t, "application/json", responseRecorder.Header().Get("Content-Type")) - assert.Equal(t, 400, responseRecorder.Result().StatusCode) - assert.JSONEq(t, `{"error": "failure"}`, responseRecorder.Body.String()) -} diff --git a/internal/executionspace/logging/rabbitmqhook/rabbitmqhook.go b/internal/logging/rabbitmqhook/rabbitmqhook.go similarity index 97% rename from internal/executionspace/logging/rabbitmqhook/rabbitmqhook.go rename to internal/logging/rabbitmqhook/rabbitmqhook.go index 547789d..bb11f72 100644 --- a/internal/executionspace/logging/rabbitmqhook/rabbitmqhook.go +++ b/internal/logging/rabbitmqhook/rabbitmqhook.go @@ -19,9 +19,9 @@ import ( "errors" "fmt" + "github.com/eiffel-community/etos-api/internal/rabbitmq" amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" - "github.com/eiffel-community/etos-api/internal/executionspace/rabbitmq" ) var fieldMap = logrus.FieldMap{ diff --git a/internal/executionspace/rabbitmq/rabbitmq.go b/internal/rabbitmq/rabbitmq.go similarity index 100% rename from internal/executionspace/rabbitmq/rabbitmq.go rename to internal/rabbitmq/rabbitmq.go diff --git a/manifests/base/executionspace/deployment.yaml b/manifests/base/executionspace/deployment.yaml index 4e7a84e..beb68ee 100644 --- a/manifests/base/executionspace/deployment.yaml +++ b/manifests/base/executionspace/deployment.yaml @@ -3,7 +3,6 @@ kind: ConfigMap metadata: name: etos-executionspace data: - TZ: "Europe/Stockholm" PROVIDER_HOSTNAME: http://etos-executionspace --- apiVersion: apps/v1 diff --git a/pkg/executionspace/v1alpha/executor.go b/pkg/executionspace/v1alpha/executor.go index 027bc2f..e98f4e1 100644 --- a/pkg/executionspace/v1alpha/executor.go +++ b/pkg/executionspace/v1alpha/executor.go @@ -24,9 +24,9 @@ import ( "time" "github.com/eiffel-community/eiffelevents-sdk-go" - "github.com/eiffel-community/etos-api/internal/executionspace/eventrepository" + config "github.com/eiffel-community/etos-api/internal/configs/executionspace" + "github.com/eiffel-community/etos-api/internal/eventrepository" "github.com/eiffel-community/etos-api/internal/executionspace/executor" - "github.com/eiffel-community/etos-api/internal/executionspace/responses" "github.com/eiffel-community/etos-api/pkg/executionspace/executionspace" "github.com/google/uuid" "github.com/julienschmidt/httprouter" @@ -61,7 +61,7 @@ func (h ProviderServiceHandler) ExecutorStart(w http.ResponseWriter, r *http.Req msg := fmt.Errorf("There was an error when preparing the %s execution space", executorName) logger.WithField("user_log", true).Error(msg) h.recordOtelException(span, msg) - responses.RespondWithError(w, http.StatusBadRequest, "could not read ID from post body") + RespondWithError(w, http.StatusBadRequest, "could not read ID from post body") return } @@ -71,10 +71,10 @@ func (h ProviderServiceHandler) ExecutorStart(w http.ResponseWriter, r *http.Req if ctx.Err() != nil { logger.WithField("user_log", true).Error(msg) h.recordOtelException(span, msg) - responses.RespondWithError(w, http.StatusRequestTimeout, msg.Error()) + RespondWithError(w, http.StatusRequestTimeout, msg.Error()) return } - responses.RespondWithError(w, http.StatusBadRequest, msg.Error()) + RespondWithError(w, http.StatusBadRequest, msg.Error()) logger.WithField("user_log", true).Error(msg) return } @@ -85,13 +85,13 @@ func (h ProviderServiceHandler) ExecutorStart(w http.ResponseWriter, r *http.Req if err != nil { if ctx.Err() != nil { msg := fmt.Errorf("Timed out when trying to start the test execution job") - responses.RespondWithError(w, http.StatusRequestTimeout, msg.Error()) + RespondWithError(w, http.StatusRequestTimeout, msg.Error()) logger.WithField("user_log", true).Error(msg) h.recordOtelException(span, msg) return } msg := fmt.Errorf("Error trying to start the test execution job: %s", err.Error()) - responses.RespondWithError(w, http.StatusInternalServerError, msg.Error()) + RespondWithError(w, http.StatusInternalServerError, msg.Error()) logger.WithField("user_log", true).Error(msg) h.recordOtelException(span, msg) return @@ -106,13 +106,13 @@ func (h ProviderServiceHandler) ExecutorStart(w http.ResponseWriter, r *http.Req } if ctx.Err() != nil { msg := fmt.Errorf("Timed out when waiting for the test execution job to start - Error: %s", err.Error()) - responses.RespondWithError(w, http.StatusRequestTimeout, msg.Error()) + RespondWithError(w, http.StatusRequestTimeout, msg.Error()) logger.WithField("user_log", true).Error(msg) h.recordOtelException(span, msg) return } msg := fmt.Errorf("Error when waiting for the test execution job to start - Error: %s", err.Error()) - responses.RespondWithError(w, http.StatusInternalServerError, msg.Error()) + RespondWithError(w, http.StatusInternalServerError, msg.Error()) logger.WithField("user_log", true).Error(msg) h.recordOtelException(span, msg) return @@ -128,33 +128,33 @@ func (h ProviderServiceHandler) ExecutorStart(w http.ResponseWriter, r *http.Req } if ctx.Err() != nil { msg := fmt.Errorf("Timed out when saving the test execution configuration") - responses.RespondWithError(w, http.StatusRequestTimeout, msg.Error()) + RespondWithError(w, http.StatusRequestTimeout, msg.Error()) logger.WithField("user_log", true).Error(msg) h.recordOtelException(span, msg) return } msg := fmt.Errorf("Error when saving the test execution configuration") - responses.RespondWithError(w, http.StatusInternalServerError, msg.Error()) + RespondWithError(w, http.StatusInternalServerError, msg.Error()) logger.WithField("user_log", true).Error(msg) h.recordOtelException(span, msg) return } subSuiteState := state{ExecutorSpec: executor} - if err = subSuiteState.waitStart(ctx, logger, h.provider.Executor()); err != nil { + if err = subSuiteState.waitStart(ctx, h.cfg, logger, h.provider.Executor()); err != nil { if cancelErr := h.provider.Executor().Stop(context.Background(), logger, buildID); cancelErr != nil { msg := fmt.Errorf("cancel failed: %s", cancelErr.Error()) logger.Error(msg) } if ctx.Err() != nil { msg := fmt.Errorf("Timed out when waiting for the test execution job to initialize - Error: %s", err.Error()) - responses.RespondWithError(w, http.StatusRequestTimeout, msg.Error()) + RespondWithError(w, http.StatusRequestTimeout, msg.Error()) logger.WithField("user_log", true).Error(msg) h.recordOtelException(span, msg) return } msg := fmt.Errorf("Error when waiting for the test execution job to initialize - Error: %s", err.Error()) - responses.RespondWithError(w, http.StatusBadRequest, msg.Error()) + RespondWithError(w, http.StatusBadRequest, msg.Error()) logger.WithField("user_log", true).Error(msg) h.recordOtelException(span, msg) return @@ -165,7 +165,8 @@ func (h ProviderServiceHandler) ExecutorStart(w http.ResponseWriter, r *http.Req if buildURL != "" { logger.WithField("user_log", true).Info("Executor build URL: ", buildURL) } - responses.RespondWithError(w, http.StatusNoContent, "") + w.WriteHeader(http.StatusNoContent) + _, _ = w.Write([]byte("")) } type state struct { @@ -175,23 +176,23 @@ type state struct { } // getSubSuite gets a sub suite from event repository -func (s *state) getSubSuite(logger *logrus.Entry, ctx context.Context) (*eiffelevents.TestSuiteStartedV3, error) { +func (s *state) getSubSuite(ctx context.Context, cfg config.Config) (*eiffelevents.TestSuiteStartedV3, error) { if s.environment == nil { - event, err := eventrepository.EnvironmentDefined(ctx, s.ExecutorSpec.Instructions.Environment["ENVIRONMENT_ID"]) + event, err := eventrepository.EnvironmentDefined(ctx, cfg.EiffelGoerURL(), s.ExecutorSpec.Instructions.Environment["ENVIRONMENT_ID"]) if err != nil { return nil, err } s.environment = event } if s.environment != nil && s.mainSuite == nil { - event, err := eventrepository.MainSuiteStarted(ctx, s.environment.Links.FindFirst("CONTEXT")) + event, err := eventrepository.MainSuiteStarted(ctx, cfg.EiffelGoerURL(), s.environment.Links.FindFirst("CONTEXT")) if err != nil { return nil, err } s.mainSuite = event } if s.mainSuite != nil && s.environment != nil { - event, err := eventrepository.TestSuiteStarted(ctx, s.mainSuite.Meta.ID, s.environment.Data.Name) + event, err := eventrepository.TestSuiteStarted(ctx, cfg.EiffelGoerURL(), s.mainSuite.Meta.ID, s.environment.Data.Name) if err != nil { return nil, err } @@ -201,10 +202,10 @@ func (s *state) getSubSuite(logger *logrus.Entry, ctx context.Context) (*eiffele } // waitStart waits for a job to start completely -func (s *state) waitStart(ctx context.Context, logger *logrus.Entry, executor executor.Executor) error { +func (s *state) waitStart(ctx context.Context, cfg config.Config, logger *logrus.Entry, executor executor.Executor) error { var event *eiffelevents.TestSuiteStartedV3 var err error - if err = retry.Fibonacci(ctx, 5*time.Second, func(ctx context.Context) error { + if err = retry.Constant(ctx, 5*time.Second, func(ctx context.Context) error { alive, err := executor.Alive(ctx, logger, s.ExecutorSpec.BuildID) if err != nil { logger.Errorf("Retrying - %s", err.Error()) @@ -214,13 +215,11 @@ func (s *state) waitStart(ctx context.Context, logger *logrus.Entry, executor ex if !alive { return errors.New("test runner did not start properly") } - if event == nil { - event, err = s.getSubSuite(logger, ctx) - if err != nil { - logger.Errorf("Retrying - %s", err.Error()) - // TODO: Verify that this is always retryable - return retry.RetryableError(err) - } + event, err = s.getSubSuite(ctx, cfg) + if err != nil { + logger.Errorf("Retrying - %s", err.Error()) + // TODO: Verify that this is always retryable + return retry.RetryableError(err) } if event == nil { return retry.RetryableError(errors.New("not yet started")) diff --git a/pkg/executionspace/v1alpha/provider.go b/pkg/executionspace/v1alpha/provider.go index 33c1e7d..0bbfd69 100644 --- a/pkg/executionspace/v1alpha/provider.go +++ b/pkg/executionspace/v1alpha/provider.go @@ -30,7 +30,6 @@ import ( "github.com/eiffel-community/eiffelevents-sdk-go" config "github.com/eiffel-community/etos-api/internal/configs/executionspace" "github.com/eiffel-community/etos-api/internal/executionspace/provider" - "github.com/eiffel-community/etos-api/internal/executionspace/responses" "github.com/eiffel-community/etos-api/pkg/application" httperrors "github.com/eiffel-community/etos-api/pkg/executionspace/errors" "github.com/eiffel-community/etos-api/pkg/executionspace/executionspace" @@ -190,7 +189,7 @@ func (h ProviderServiceHandler) recordOtelException(span trace.Span, err error) // Selftest is a handler to just return 204. func (h ProviderServiceHandler) Selftest(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - responses.RespondWithError(w, http.StatusNoContent, "") + RespondWithError(w, http.StatusNoContent, "") } // Start handles the start request and checks out execution spaces @@ -204,7 +203,7 @@ func (h ProviderServiceHandler) Start(w http.ResponseWriter, r *http.Request, ps _, span := h.getOtelTracer().Start(ctx, "start", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - startReq, err := h.verifyStartInput(logger, r) + startReq, err := h.verifyStartInput(r) if err != nil { msg := fmt.Errorf("start input could not be verified: %s", err.Error()) logger.Error(msg) @@ -234,7 +233,7 @@ func (h ProviderServiceHandler) Start(w http.ResponseWriter, r *http.Request, ps span.SetAttributes(attribute.String("etos.execution_space_provider.checkout.environment", fmt.Sprintf("%v", startReq.Environment))) span.SetAttributes(attribute.String("etos.execution_space_provider.checkout.id", checkoutId.String())) - responses.RespondWithJSON(w, http.StatusOK, StartResponse{ID: checkoutId}) + RespondWithJSON(w, http.StatusOK, StartResponse{ID: checkoutId}) } // Status handles the status request, gets and returns the execution space checkout status @@ -259,7 +258,7 @@ func (h ProviderServiceHandler) Status(w http.ResponseWriter, r *http.Request, p msg := fmt.Errorf("Failed to retrieve execution space status (id=%s) - Reason: %s", id, err.Error()) logger.Error(msg.Error()) h.recordOtelException(span, msg) - responses.RespondWithJSON(w, http.StatusInternalServerError, executionSpace) + RespondWithJSON(w, http.StatusInternalServerError, executionSpace) return } @@ -268,7 +267,7 @@ func (h ProviderServiceHandler) Status(w http.ResponseWriter, r *http.Request, p attribute.String("etos.execution_space_provider.status.executorspec", fmt.Sprintf("%v", executorSpec)), ) } - responses.RespondWithJSON(w, http.StatusOK, executionSpace) + RespondWithJSON(w, http.StatusOK, executionSpace) } // Stop handles the stop request, stops the execution space executors and checks in all the provided execution spaces @@ -293,14 +292,15 @@ func (h ProviderServiceHandler) Stop(w http.ResponseWriter, r *http.Request, ps defer r.Body.Close() err = nil + for _, executorSpec := range executors { - id, stopErr := h.provider.Job(r.Context(), executorSpec.ID) - if stopErr != nil { - if errors.Is(stopErr, io.EOF) { + id, jobInitErr := h.provider.Job(r.Context(), executorSpec.ID) + if jobInitErr != nil { + if errors.Is(jobInitErr, io.EOF) { // Already been checked in continue } - err = errors.Join(err, stopErr) + err = errors.Join(err, jobInitErr) continue } // If the executorSpec does not exist in the database, we should not @@ -308,13 +308,11 @@ func (h ProviderServiceHandler) Stop(w http.ResponseWriter, r *http.Request, ps if id == "" { continue } - if stopErr := h.provider.Executor().Stop(r.Context(), logger, id); stopErr != nil { - err = errors.Join(err, stopErr) - } success := true - if stopErr != nil { + if stopErr := h.provider.Executor().Stop(r.Context(), logger, id); stopErr != nil { success = false - msg := fmt.Errorf("Failed to stop executor %v - Reason: %s", id, stopErr.Error()) + err = errors.Join(err, stopErr) + msg := fmt.Errorf("Failed to stop executor %v - Reason: %s", id, err.Error()) logger.Error(msg) h.recordOtelException(span, msg) } @@ -324,7 +322,7 @@ func (h ProviderServiceHandler) Stop(w http.ResponseWriter, r *http.Request, ps msg := fmt.Errorf("Some of the executors could not be stopped - Reason: %s", err.Error()) logger.Error(msg) h.recordOtelException(span, msg) - responses.RespondWithJSON(w, http.StatusInternalServerError, err.Error()) + RespondWithJSON(w, http.StatusInternalServerError, err.Error()) return } @@ -332,24 +330,24 @@ func (h ProviderServiceHandler) Stop(w http.ResponseWriter, r *http.Request, ps msg := fmt.Errorf("Failed to check in executors: %v - Reason: %s", executors, err) logger.Error(msg) h.recordOtelException(span, msg) - responses.RespondWithJSON(w, http.StatusInternalServerError, msg) + RespondWithJSON(w, http.StatusInternalServerError, msg) return } - responses.RespondWithJSON(w, http.StatusNoContent, "") + RespondWithJSON(w, http.StatusNoContent, "") } // sendError sends an error HTTP response depending on which error has been returned. func sendError(w http.ResponseWriter, err error) { httpError, ok := err.(*httperrors.HTTPError) if !ok { - responses.RespondWithError(w, http.StatusInternalServerError, fmt.Sprintf("unknown error %+v", err)) + RespondWithError(w, http.StatusInternalServerError, fmt.Sprintf("unknown error %+v", err)) } else { - responses.RespondWithError(w, httpError.Code, httpError.Message) + RespondWithError(w, httpError.Code, httpError.Message) } } // verifyStartInput verify input (json body) from a start request -func (h ProviderServiceHandler) verifyStartInput(logger *logrus.Entry, r *http.Request) (StartRequest, error) { +func (h ProviderServiceHandler) verifyStartInput(r *http.Request) (StartRequest, error) { request := StartRequest{} defer r.Body.Close() if err := json.NewDecoder(r.Body).Decode(&request); err != nil { @@ -394,7 +392,7 @@ func (h ProviderServiceHandler) panicRecovery( r.Context(), ).Errorf("recovering from err %+v\n %s", err, buf) identifier := ps.ByName("identifier") - responses.RespondWithError( + RespondWithError( w, http.StatusInternalServerError, fmt.Sprintf("unknown error: contact server admin with id '%s'", identifier), diff --git a/internal/executionspace/responses/responses.go b/pkg/executionspace/v1alpha/responses.go similarity index 98% rename from internal/executionspace/responses/responses.go rename to pkg/executionspace/v1alpha/responses.go index 5f40b4b..964043e 100644 --- a/internal/executionspace/responses/responses.go +++ b/pkg/executionspace/v1alpha/responses.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package responses +package providerservice import ( "encoding/json"