Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: better reporting of airbyte chart errors #112

Merged
merged 6 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions internal/cmd/local/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"
"time"

v1 "k8s.io/api/apps/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -28,7 +28,7 @@ var DefaultPersistentVolumeSize = resource.MustParse("500Mi")
// Client primarily for testing purposes
type Client interface {
// DeploymentList returns a list of all the services within the namespace
DeploymentList(ctx context.Context, namespace string) (*v1.DeploymentList, error)
DeploymentList(ctx context.Context, namespace string) (*appsv1.DeploymentList, error)
// DeploymentRestart will force a restart of the deployment name in the provided namespace.
// This is a blocking call, it should only return once the deployment has completed.
DeploymentRestart(ctx context.Context, namespace, name string) error
Expand Down Expand Up @@ -76,6 +76,7 @@ type Client interface {

LogsGet(ctx context.Context, namespace string, name string) (string, error)
StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error)
PodList(ctx context.Context, namespace string) (*corev1.PodList, error)
}

var _ Client = (*DefaultK8sClient)(nil)
Expand All @@ -85,7 +86,7 @@ type DefaultK8sClient struct {
ClientSet kubernetes.Interface
}

func (d *DefaultK8sClient) DeploymentList(ctx context.Context, namespace string) (*v1.DeploymentList, error) {
func (d *DefaultK8sClient) DeploymentList(ctx context.Context, namespace string) (*appsv1.DeploymentList, error) {
return d.ClientSet.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{})
}

Expand Down Expand Up @@ -329,8 +330,12 @@ func (d *DefaultK8sClient) LogsGet(ctx context.Context, namespace string, name s

func (d *DefaultK8sClient) StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error) {
req := d.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
Follow: true,
Follow: true,
SinceTime: &metav1.Time{Time: since},
})
return req.Stream(ctx)
}

// PodList lists the pods in the given namespace, passing empty (default) list options to the
// underlying clientset.
func (d *DefaultK8sClient) PodList(ctx context.Context, namespace string) (*corev1.PodList, error) {
	pods := d.ClientSet.CoreV1().Pods(namespace)
	return pods.List(ctx, metav1.ListOptions{})
}
8 changes: 8 additions & 0 deletions internal/cmd/local/k8s/k8stest/k8stest.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type MockClient struct {
FnEventsWatch func(ctx context.Context, namespace string) (watch.Interface, error)
FnLogsGet func(ctx context.Context, namespace string, name string) (string, error)
FnStreamPodLogs func(ctx context.Context, namespace, podName string, since time.Time) (io.ReadCloser, error)
FnPodList func(ctx context.Context, namespace string) (*corev1.PodList, error)
}

func (m *MockClient) DeploymentList(ctx context.Context, namespace string) (*v1.DeploymentList, error) {
Expand Down Expand Up @@ -176,3 +177,10 @@ func (m *MockClient) StreamPodLogs(ctx context.Context, namespace string, podNam
}
return m.FnStreamPodLogs(ctx, namespace, podName, since)
}

// PodList delegates to FnPodList when it has been set. With no FnPodList configured it returns
// a nil list and a nil error.
func (m *MockClient) PodList(ctx context.Context, namespace string) (*corev1.PodList, error) {
	if fn := m.FnPodList; fn != nil {
		return fn(ctx, namespace)
	}
	return nil, nil
}
68 changes: 37 additions & 31 deletions internal/cmd/local/local/install.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package local

import (
"bufio"
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -162,7 +160,6 @@ func (c *Command) Install(ctx context.Context, opts InstallOpts) error {

if opts.Migrate {
c.spinner.UpdateText("Migrating airbyte data")
//if err := c.tel.Wrap(ctx, telemetry.Migrate, func() error { return opts.Docker.MigrateComposeDB(ctx, "airbyte_db") }); err != nil {
if err := c.tel.Wrap(ctx, telemetry.Migrate, func() error { return migrate.FromDockerVolume(ctx, opts.Docker.Client, "airbyte_db") }); err != nil {
pterm.Error.Println("Failed to migrate data from previous Airbyte installation")
return fmt.Errorf("unable to migrate data from previous airbyte installation: %w", err)
Expand Down Expand Up @@ -275,7 +272,7 @@ func (c *Command) Install(ctx context.Context, opts InstallOpts) error {
namespace: airbyteNamespace,
valuesYAML: valuesYAML,
}); err != nil {
return fmt.Errorf("unable to install airbyte chart: %w", err)
return c.diagnoseAirbyteChartFailure(ctx, err)
}

if err := c.handleChart(ctx, chartRequest{
Expand Down Expand Up @@ -331,6 +328,35 @@ func (c *Command) Install(ctx context.Context, opts InstallOpts) error {
return nil
}

// diagnoseAirbyteChartFailure turns a failed airbyte chart install into a more descriptive error.
//
// It lists the pods in airbyteNamespace and, for every pod in the Failed phase, reports the last
// ERROR-level message found in that pod's logs (or an "unknown" placeholder when the logs cannot
// be fetched or parsed). The returned error lists one line per failed pod.
//
// If the pods cannot be listed, the list is nil, or no pod is in the Failed phase, there is
// nothing pod-specific to report, so the original chartErr is wrapped and returned instead of
// being discarded.
func (c *Command) diagnoseAirbyteChartFailure(ctx context.Context, chartErr error) error {
	podList, err := c.k8s.PodList(ctx, airbyteNamespace)
	if err != nil || podList == nil {
		return fmt.Errorf("unable to install airbyte chart: %w", chartErr)
	}

	var failures []string
	for i := range podList.Items {
		// Index instead of ranging by value: corev1.Pod is a large struct.
		pod := &podList.Items[i]
		if pod.Status.Phase != corev1.PodFailed {
			continue
		}

		msg := "unknown"
		if logs, err := c.k8s.LogsGet(ctx, airbyteNamespace, pod.Name); err != nil {
			// No logs to inspect, so don't attempt to parse them.
			msg = "unknown: failed to get pod logs."
		} else if m, err := getLastJavaLogError(strings.NewReader(logs)); err != nil {
			msg = "unknown: failed to find error log."
		} else if m != "" {
			msg = m
		}

		failures = append(failures, fmt.Sprintf("pod %s: %s", pod.Name, msg))
	}

	if len(failures) == 0 {
		// No failed pods were found; the chart error is the only information available.
		return fmt.Errorf("unable to install airbyte chart: %w", chartErr)
	}
	return fmt.Errorf("unable to install airbyte chart:\n%s", strings.Join(failures, "\n"))
}

func (c *Command) handleIngress(ctx context.Context, hosts []string) error {
c.spinner.UpdateText("Checking for existing Ingress")

Expand Down Expand Up @@ -380,43 +406,23 @@ func (c *Command) watchEvents(ctx context.Context) {
}
}

// 2024-09-10 20:16:24 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [273....
var javaLogRx = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \x1b\[(?:1;)?\d+m(?P<level>[A-Z]+)\x1b\[m (?P<msg>\S+ - .*)`)

func (c *Command) streamPodLogs(ctx context.Context, namespace, podName, prefix string, since time.Time) error {
r, err := c.k8s.StreamPodLogs(ctx, namespace, podName, since)
if err != nil {
return err
}
defer r.Close()

level := pterm.Debug
scanner := bufio.NewScanner(r)

for scanner.Scan() {

// skip java stacktrace noise
if strings.HasPrefix(scanner.Text(), "\tat ") || strings.HasPrefix(scanner.Text(), "\t... ") {
continue
}

m := javaLogRx.FindSubmatch(scanner.Bytes())
var msg string

if m != nil {
msg = string(m[2])
if string(m[1]) == "ERROR" {
level = pterm.Error
} else {
level = pterm.Debug
}
s := newJavaLogScanner(r)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is true that most of the pods are running Java services, but not all. Just wanna make note of that in case we ever try to use this to pull errors from temporal or something.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya, the log parsing stuff is fragile right now. And maybe I should remove the java part of the name. It's really just a log scanner for a certain format.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added tests and cleaned up the naming to not mention java

for s.Scan() {
if s.line.level == "ERROR" {
pterm.Error.Printfln("%s: %s", prefix, s.line.msg)
} else {
msg = scanner.Text()
pterm.Debug.Printfln("%s: %s", prefix, s.line.msg)
}

level.Printfln("%s: %s", prefix, msg)
}
return scanner.Err()

return s.Err()
}

func (c *Command) watchBootloaderLogs(ctx context.Context) {
Expand Down
80 changes: 80 additions & 0 deletions internal/cmd/local/local/java_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package local

import (
"bufio"
"io"
"regexp"
"strings"
)

// javaLogRx matches log lines of the form
//
//	2024-09-10 20:16:24 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [273....
//
// Note that the pattern requires the level to be wrapped in ANSI escape sequences
// (ESC "[" <optional "1;"> <digits> "m" before it and ESC "[m" after it); the example above
// does not show those bytes.
//
// Capture group 1 ("level") is the upper-case level name. Capture group 2 ("msg") is the rest
// of the line after the level: a non-space token, " - ", and whatever follows.
var javaLogRx = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \x1b\[(?:1;)?\d+m(?P<level>[A-Z]+)\x1b\[m (?P<msg>\S+ - .*)`)

type (
	// javaLogLine is a single scanned log line.
	javaLogLine struct {
		// msg is the message text of the line.
		msg string
		// level is the log level associated with the line, e.g. "ERROR" or "DEBUG".
		level string
	}

	// javaLogScanner reads log lines from an underlying bufio.Scanner and exposes the most
	// recently scanned one through line.
	javaLogScanner struct {
		scanner *bufio.Scanner
		line    javaLogLine
	}
)

// newJavaLogScanner returns a javaLogScanner reading lines from r. Before the first call to
// Scan, the current line has an empty message and the level "DEBUG".
func newJavaLogScanner(r io.Reader) *javaLogScanner {
	initial := javaLogLine{level: "DEBUG"}

	s := new(javaLogScanner)
	s.scanner = bufio.NewScanner(r)
	s.line = initial
	return s
}

// Scan advances to the next line that should be reported and stores it in j.line.
// It returns false once the underlying scanner has no more lines (or has failed; see Err).
//
// Lines starting with "\tat " or "\t... " are skipped. For a line matching javaLogRx, both
// msg and level are taken from the regex capture groups. For any other line, msg is the raw
// line text and level is left at whatever value it had from the previous line.
func (j *javaLogScanner) Scan() bool {
for {
if ok := j.scanner.Scan(); !ok {
return false
}

// skip java stacktrace noise
if strings.HasPrefix(j.scanner.Text(), "\tat ") || strings.HasPrefix(j.scanner.Text(), "\t... ") {
continue
}

// m[0] is the whole match; m[1] and m[2] are the "level" and "msg" capture groups.
m := javaLogRx.FindSubmatch(j.scanner.Bytes())

if m != nil {
j.line.msg = string(m[2])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing [0] os the input string or the bytes leading up to the first capture group?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the text of the leftmost match of the regular expression in b

according to https://pkg.go.dev/regexp#Regexp.FindSubmatch

j.line.level = string(m[1])
} else {
// unrecognized format: report the raw text; j.line.level is not modified here
j.line.msg = j.scanner.Text()
}
return true
}
}

// Err reports the error, if any, recorded by the underlying bufio.Scanner.
func (j *javaLogScanner) Err() error {
	err := j.scanner.Err()
	return err
}

// getAllJavaLogLines reads r to the end with a javaLogScanner and returns every line it
// yields, in order, together with any error reported by the scanner. The returned slice is
// non-nil even when no lines were read.
func getAllJavaLogLines(r io.Reader) ([]javaLogLine, error) {
	logScanner := newJavaLogScanner(r)
	collected := make([]javaLogLine, 0)
	for logScanner.Scan() {
		collected = append(collected, logScanner.line)
	}
	return collected, logScanner.Err()
}

// getLastJavaLogError returns the message of the last line in r whose level is "ERROR".
// It returns an empty string (and a nil error) when there is no such line, and an empty
// string plus the error when reading the lines fails.
func getLastJavaLogError(r io.Reader) (string, error) {
	parsed, err := getAllJavaLogLines(r)
	if err != nil {
		return "", err
	}

	// Walk backwards so the first hit is the most recent error.
	for remaining := len(parsed); remaining > 0; remaining-- {
		if entry := parsed[remaining-1]; entry.level == "ERROR" {
			return entry.msg, nil
		}
	}
	return "", nil
}