diff --git a/sdks/go.mod b/sdks/go.mod index 74556ee12a55..706be73f97f6 100644 --- a/sdks/go.mod +++ b/sdks/go.mod @@ -20,7 +20,7 @@ // directory. module github.com/apache/beam/sdks/v2 -go 1.21 +go 1.21.0 require ( cloud.google.com/go/bigquery v1.63.1 @@ -69,6 +69,8 @@ require ( require ( github.com/avast/retry-go/v4 v4.6.0 github.com/fsouza/fake-gcs-server v1.49.2 + github.com/golang-cz/devslog v0.0.11 + github.com/golang/protobuf v1.5.4 golang.org/x/exp v0.0.0-20231006140011-7918f672742d ) diff --git a/sdks/go.sum b/sdks/go.sum index af68a630addd..fa3c75bd3395 100644 --- a/sdks/go.sum +++ b/sdks/go.sum @@ -853,6 +853,8 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-cz/devslog v0.0.11 h1:v4Yb9o0ZpuZ/D8ZrtVw1f9q5XrjnkxwHF1XmWwO8IHg= +github.com/golang-cz/devslog v0.0.11/go.mod h1:bSe5bm0A7Nyfqtijf1OMNgVJHlWEuVSXnkuASiE1vV8= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= diff --git a/sdks/go/cmd/prism/prism.go b/sdks/go/cmd/prism/prism.go index 39c19df00dc3..070d2f023b74 100644 --- a/sdks/go/cmd/prism/prism.go +++ b/sdks/go/cmd/prism/prism.go @@ -22,9 +22,14 @@ import ( "flag" "fmt" "log" + "log/slog" + "os" + "strings" + "time" jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" + "github.com/golang-cz/devslog" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -37,10 +42,52 @@ var ( idleShutdownTimeout = flag.Duration("idle_shutdown_timeout", -1, "duration that prism will wait for a new job before shutting itself down. Negative durations disable auto shutdown. Defaults to never shutting down.") ) +// Logging flags +var ( + debug = flag.Bool("debug", false, + "Enables full verbosity debug logging from the runner by default. Used to build SDKs or debug Prism itself.") + logKind = flag.String("log_kind", "dev", + "Determines the format of prism's logging to std err: valid values are `dev', 'json', or 'text'. Default is `dev`.") +) + +var logLevel = new(slog.LevelVar) + func main() { flag.Parse() ctx, cancel := context.WithCancelCause(context.Background()) + var logHandler slog.Handler + loggerOutput := os.Stderr + handlerOpts := &slog.HandlerOptions{ + Level: logLevel, + AddSource: *debug, + } + if *debug { + logLevel.Set(slog.LevelDebug) + // Print the Prism source line for a log in debug mode. + handlerOpts.AddSource = true + } + switch strings.ToLower(*logKind) { + case "dev": + logHandler = + devslog.NewHandler(loggerOutput, &devslog.Options{ + TimeFormat: "[" + time.RFC3339Nano + "]", + StringerFormatter: true, + HandlerOptions: handlerOpts, + StringIndentation: false, + NewLineAfterLog: true, + MaxErrorStackTrace: 3, + }) + case "json": + logHandler = slog.NewJSONHandler(loggerOutput, handlerOpts) + case "text": + logHandler = slog.NewTextHandler(loggerOutput, handlerOpts) + default: + log.Fatalf("Invalid value for log_kind: %v, must be 'dev', 'json', or 'text'", *logKind) + } + + slog.SetDefault(slog.New(logHandler)) + cli, err := makeJobClient(ctx, prism.Options{ Port: *jobPort, diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go index c71ead208364..06bb727178fc 100644 --- a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go +++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go @@ -19,12 +19,12 @@ import ( "bytes" "fmt" "log" + "log/slog" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" - "golang.org/x/exp/slog" ) // FromMonitoringInfos extracts metrics from monitored states and diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index eb8abe16ecf8..ffea90e79065 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "log/slog" "strings" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" @@ -28,7 +29,6 @@ import ( pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" - "golang.org/x/exp/slog" "google.golang.org/protobuf/encoding/prototext" ) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/data.go b/sdks/go/pkg/beam/runners/prism/internal/engine/data.go index eaaf7f831712..7b8689f95112 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/data.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/data.go @@ -18,12 +18,12 @@ package engine import ( "bytes" "fmt" + "log/slog" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" - "golang.org/x/exp/slog" ) // StateData is a "union" between Bag state and MultiMap state to increase common code. diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index f7229853e4d3..3cfde4701a8f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -23,6 +23,7 @@ import ( "context" "fmt" "io" + "log/slog" "sort" "strings" "sync" @@ -36,7 +37,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" "golang.org/x/exp/maps" - "golang.org/x/exp/slog" ) type element struct { @@ -1607,7 +1607,7 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T inputW := ss.input _, upstreamW := ss.UpstreamWatermark() if inputW == upstreamW { - slog.Debug("bundleReady: insufficient upstream watermark", + slog.Debug("bundleReady: unchanged upstream watermark", slog.String("stage", ss.ID), slog.Group("watermark", slog.Any("upstream", upstreamW), diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index add7f769a702..2f960a04f0cb 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "log/slog" "os" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" @@ -27,7 +28,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" - "golang.org/x/exp/slog" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" @@ -42,7 +42,7 @@ import ( // TODO move environment handling to the worker package. func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *worker.W) error { - logger := slog.With(slog.String("envID", wk.Env)) + logger := j.Logger.With(slog.String("envID", wk.Env)) // TODO fix broken abstraction. // We're starting a worker pool here, because that's the loopback environment. // It's sort of a mess, largely because of loopback, which has @@ -56,7 +56,7 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor } go func() { externalEnvironment(ctx, ep, wk) - slog.Debug("environment stopped", slog.String("job", j.String())) + logger.Debug("environment stopped", slog.String("job", j.String())) }() return nil case urns.EnvDocker: @@ -129,6 +129,8 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock credEnv := fmt.Sprintf("%v=%v", gcloudCredsEnv, dockerGcloudCredsFile) envs = append(envs, credEnv) } + } else { + logger.Debug("local GCP credentials environment variable not found") } if _, _, err := cli.ImageInspectWithRaw(ctx, dp.GetContainerImage()); err != nil { // We don't have a local image, so we should pull it. @@ -140,6 +142,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock logger.Warn("unable to pull image and it's not local", "error", err) } } + logger.Debug("creating container", "envs", envs, "mounts", mounts) ccr, err := cli.ContainerCreate(ctx, &container.Config{ Image: dp.GetContainerImage(), @@ -169,17 +172,32 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock return fmt.Errorf("unable to start container image %v with docker for env %v, err: %w", dp.GetContainerImage(), wk.Env, err) } + logger.Debug("container started") + // Start goroutine to wait on container state. go func() { defer cli.Close() defer wk.Stop() + defer func() { + logger.Debug("container stopped") + }() - statusCh, errCh := cli.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) + bgctx := context.Background() + statusCh, errCh := cli.ContainerWait(bgctx, containerID, container.WaitConditionNotRunning) select { case <-ctx.Done(): - // Can't use command context, since it's already canceled here. - err := cli.ContainerKill(context.Background(), containerID, "") + rc, err := cli.ContainerLogs(bgctx, containerID, container.LogsOptions{Details: true, ShowStdout: true, ShowStderr: true}) if err != nil { + logger.Error("error fetching container logs error on context cancellation", "error", err) + } + if rc != nil { + defer rc.Close() + var buf bytes.Buffer + stdcopy.StdCopy(&buf, &buf, rc) + logger.Info("container being killed", slog.Any("cause", context.Cause(ctx)), slog.Any("containerLog", buf)) + } + // Can't use command context, since it's already canceled here. + if err := cli.ContainerKill(bgctx, containerID, ""); err != nil { logger.Error("docker container kill error", "error", err) } case err := <-errCh: @@ -189,7 +207,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock case resp := <-statusCh: logger.Info("docker container has self terminated", "status_code", resp.StatusCode) - rc, err := cli.ContainerLogs(ctx, containerID, container.LogsOptions{Details: true, ShowStdout: true, ShowStderr: true}) + rc, err := cli.ContainerLogs(bgctx, containerID, container.LogsOptions{Details: true, ShowStdout: true, ShowStderr: true}) if err != nil { logger.Error("docker container logs error", "error", err) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index d7605f34f5f2..614edee47721 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "io" + "log/slog" "sort" "sync/atomic" "time" @@ -34,7 +35,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" "golang.org/x/exp/maps" - "golang.org/x/exp/slog" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" ) @@ -311,7 +311,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic return fmt.Errorf("prism error building stage %v: \n%w", stage.ID, err) } stages[stage.ID] = stage - slog.Debug("pipelineBuild", slog.Group("stage", slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName()))) + j.Logger.Debug("pipelineBuild", slog.Group("stage", slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName()))) outputs := maps.Keys(stage.OutputsToCoders) sort.Strings(outputs) em.AddStage(stage.ID, []string{stage.primaryInput}, outputs, stage.sideInputs) @@ -322,9 +322,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic em.StageProcessingTimeTimers(stage.ID, stage.processingTimeTimers) } default: - err := fmt.Errorf("unknown environment[%v]", t.GetEnvironmentId()) - slog.Error("Execute", err) - return err + return fmt.Errorf("unknown environment[%v]", t.GetEnvironmentId()) } } @@ -344,11 +342,13 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic for { select { case <-ctx.Done(): - return context.Cause(ctx) + err := context.Cause(ctx) + j.Logger.Debug("context canceled", slog.Any("cause", err)) + return err case rb, ok := <-bundles: if !ok { err := eg.Wait() - slog.Debug("pipeline done!", slog.String("job", j.String()), slog.Any("error", err)) + j.Logger.Debug("pipeline done!", slog.String("job", j.String()), slog.Any("error", err), slog.Any("topo", topo)) return err } eg.Go(func() error { diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index 8590fd0d4ced..be9d39ad02b7 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -19,6 +19,7 @@ import ( "bytes" "fmt" "io" + "log/slog" "reflect" "sort" @@ -31,7 +32,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" - "golang.org/x/exp/slog" "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" ) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go index 99b786d45980..e42e3e7ca666 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go @@ -20,9 +20,9 @@ import ( "context" "fmt" "io" + "log/slog" jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" - "golang.org/x/exp/slog" "google.golang.org/protobuf/encoding/prototext" ) @@ -77,7 +77,7 @@ func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingSer case *jobpb.ArtifactResponseWrapper_ResolveArtifactResponse: err := fmt.Errorf("unexpected ResolveArtifactResponse to GetArtifact: %v", in.GetResponse()) - slog.Error("GetArtifact failure", err) + slog.Error("GetArtifact failure", slog.Any("error", err)) return err } } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index 1407feafe325..deef259a99d1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -27,6 +27,7 @@ package jobservices import ( "context" "fmt" + "log/slog" "sort" "strings" "sync" @@ -37,7 +38,6 @@ import ( jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" - "golang.org/x/exp/slog" "google.golang.org/protobuf/types/known/structpb" ) @@ -88,6 +88,8 @@ type Job struct { // Context used to terminate this job. RootCtx context.Context CancelFn context.CancelCauseFunc + // Logger for this job. + Logger *slog.Logger metrics metricsStore } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index b957b99ca63d..a2840760bf7a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -19,6 +19,7 @@ import ( "context" "errors" "fmt" + "log/slog" "sync" "sync/atomic" @@ -27,7 +28,6 @@ import ( pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" "golang.org/x/exp/maps" - "golang.org/x/exp/slog" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -92,6 +92,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ * cancelFn(err) terminalOnceWrap() }, + Logger: s.logger, // TODO substitute with a configured logger. artifactEndpoint: s.Endpoint(), } // Stop the idle timer when a new job appears. diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go index 03d5b0a98369..bbbdfd1eba4f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go @@ -19,6 +19,7 @@ import ( "bytes" "fmt" "hash/maphash" + "log/slog" "math" "sort" "sync" @@ -28,7 +29,6 @@ import ( fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "golang.org/x/exp/constraints" - "golang.org/x/exp/slog" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protoreflect" ) @@ -589,7 +589,7 @@ func (m *metricsStore) AddShortIDs(resp *fnpb.MonitoringInfosMetadataResponse) { urn := mi.GetUrn() ops, ok := mUrn2Ops[urn] if !ok { - slog.Debug("unknown metrics urn", slog.String("urn", urn)) + slog.Debug("unknown metrics urn", slog.String("shortID", short), slog.String("urn", urn), slog.String("type", mi.Type)) continue } key := ops.keyFn(urn, mi.GetLabels()) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go index 320159f54c06..bdfe2aff2dd4 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go @@ -18,6 +18,7 @@ package jobservices import ( "context" "fmt" + "log/slog" "math" "net" "os" @@ -27,7 +28,6 @@ import ( fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" - "golang.org/x/exp/slog" "google.golang.org/grpc" ) @@ -53,6 +53,7 @@ type Server struct { terminatedJobCount uint32 // Use with atomics. idleTimeout time.Duration cancelFn context.CancelCauseFunc + logger *slog.Logger // execute defines how a job is executed. execute func(*Job) @@ -71,8 +72,9 @@ func NewServer(port int, execute func(*Job)) *Server { lis: lis, jobs: make(map[string]*Job), execute: execute, + logger: slog.Default(), // TODO substitute with a configured logger. } - slog.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint())) + s.logger.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint())) opts := []grpc.ServerOption{ grpc.MaxRecvMsgSize(math.MaxInt32), } diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index 7de32f85b7ee..dceaa9ab8fcb 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -17,6 +17,7 @@ package internal import ( "fmt" + "log/slog" "sort" "strings" @@ -26,7 +27,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" "golang.org/x/exp/maps" - "golang.org/x/exp/slog" "google.golang.org/protobuf/proto" ) diff --git a/sdks/go/pkg/beam/runners/prism/internal/separate_test.go b/sdks/go/pkg/beam/runners/prism/internal/separate_test.go index 1be3d3e70841..650932f525c8 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/separate_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/separate_test.go @@ -18,6 +18,7 @@ package internal_test import ( "context" "fmt" + "log/slog" "net" "net/http" "net/rpc" @@ -34,7 +35,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" - "golang.org/x/exp/slog" ) // separate_test.go retains structures and tests to ensure the runner can @@ -286,7 +286,7 @@ func (ws *Watchers) Check(args *Args, unblocked *bool) error { w.mu.Lock() *unblocked = w.sentinelCount >= w.sentinelCap w.mu.Unlock() - slog.Debug("sentinel target for watcher%d is %d/%d. unblocked=%v", args.WatcherID, w.sentinelCount, w.sentinelCap, *unblocked) + slog.Debug("sentinel watcher status", slog.Int("watcher", args.WatcherID), slog.Int("sentinelCount", w.sentinelCount), slog.Int("sentinelCap", w.sentinelCap), slog.Bool("unblocked", *unblocked)) return nil } @@ -360,7 +360,7 @@ func (fn *sepHarnessBase) setup() error { sepClientOnce.Do(func() { client, err := rpc.DialHTTP("tcp", fn.LocalService) if err != nil { - slog.Error("failed to dial sentinels server", err, slog.String("endpoint", fn.LocalService)) + slog.Error("failed to dial sentinels server", slog.Any("error", err), slog.String("endpoint", fn.LocalService)) panic(fmt.Sprintf("dialing sentinels server %v: %v", fn.LocalService, err)) } sepClient = client @@ -385,7 +385,7 @@ func (fn *sepHarnessBase) setup() error { var unblock bool err := sepClient.Call("Watchers.Check", &Args{WatcherID: id}, &unblock) if err != nil { - slog.Error("Watchers.Check: sentinels server error", err, slog.String("endpoint", fn.LocalService)) + slog.Error("Watchers.Check: sentinels server error", slog.Any("error", err), slog.String("endpoint", fn.LocalService)) panic("sentinel server error") } if unblock { @@ -406,7 +406,7 @@ func (fn *sepHarnessBase) block() { var ignored bool err := sepClient.Call("Watchers.Block", &Args{WatcherID: fn.WatcherID}, &ignored) if err != nil { - slog.Error("Watchers.Block error", err, slog.String("endpoint", fn.LocalService)) + slog.Error("Watchers.Block error", slog.Any("error", err), slog.String("endpoint", fn.LocalService)) panic(err) } c := sepWaitMap[fn.WatcherID] @@ -423,7 +423,7 @@ func (fn *sepHarnessBase) delay() bool { var delay bool err := sepClient.Call("Watchers.Delay", &Args{WatcherID: fn.WatcherID}, &delay) if err != nil { - slog.Error("Watchers.Delay error", err) + slog.Error("Watchers.Delay error", slog.Any("error", err)) panic(err) } return delay diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index f33754b2ca0a..9f00c22789b6 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "log/slog" "runtime/debug" "sync/atomic" "time" @@ -33,7 +34,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" "golang.org/x/exp/maps" - "golang.org/x/exp/slog" "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" ) @@ -361,7 +361,7 @@ func portFor(wInCid string, wk *worker.W) []byte { } sourcePortBytes, err := proto.Marshal(sourcePort) if err != nil { - slog.Error("bad port", err, slog.String("endpoint", sourcePort.ApiServiceDescriptor.GetUrl())) + slog.Error("bad port", slog.Any("error", err), slog.String("endpoint", sourcePort.ApiServiceDescriptor.GetUrl())) } return sourcePortBytes } diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/web.go b/sdks/go/pkg/beam/runners/prism/internal/web/web.go index 9fabe22cee3a..b14778e4462c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/web/web.go +++ b/sdks/go/pkg/beam/runners/prism/internal/web/web.go @@ -26,6 +26,7 @@ import ( "fmt" "html/template" "io" + "log/slog" "net/http" "sort" "strings" @@ -40,7 +41,6 @@ import ( jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "golang.org/x/exp/maps" - "golang.org/x/exp/slog" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" ) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 3ccafdb81e9a..55cdb97f258c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -19,12 +19,12 @@ import ( "bytes" "context" "fmt" + "log/slog" "sync/atomic" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine" - "golang.org/x/exp/slog" ) // SideInputKey is for data lookups for a given bundle. diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index f9ec03793488..1f129595abef 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -22,10 +22,9 @@ import ( "context" "fmt" "io" + "log/slog" "math" "net" - "strconv" - "strings" "sync" "sync/atomic" "time" @@ -39,7 +38,6 @@ import ( pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" - "golang.org/x/exp/slog" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -203,30 +201,46 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error { case codes.Canceled: return nil default: - slog.Error("logging.Recv", err, "worker", wk) + slog.Error("logging.Recv", slog.Any("error", err), slog.Any("worker", wk)) return err } } for _, l := range in.GetLogEntries() { - if l.Severity >= minsev { - // TODO: Connect to the associated Job for this worker instead of - // logging locally for SDK side logging. - file := l.GetLogLocation() - i := strings.LastIndex(file, ":") - line, _ := strconv.Atoi(file[i+1:]) - if i > 0 { - file = file[:i] - } + // TODO base this on a per pipeline logging setting. + if l.Severity < minsev { + continue + } + + // Ideally we'd be writing these to per-pipeline files, but for now re-log them on the Prism process. + // We indicate they're from the SDK, and which worker, keeping the same log severity. + // SDK specific and worker specific fields are in separate groups for legibility. - slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), l.GetMessage(), - slog.Any(slog.SourceKey, &slog.Source{ - File: file, - Line: line, - }), - slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()), - slog.Any("worker", wk), - ) + attrs := []any{ + slog.String("transformID", l.GetTransformId()), // TODO: pull the unique name from the pipeline graph. + slog.String("location", l.GetLogLocation()), + slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()), + slog.String(slog.MessageKey, l.GetMessage()), } + if fs := l.GetCustomData().GetFields(); len(fs) > 0 { + var grp []any + for n, v := range l.GetCustomData().GetFields() { + var attr slog.Attr + switch v.Kind.(type) { + case *structpb.Value_BoolValue: + attr = slog.Bool(n, v.GetBoolValue()) + case *structpb.Value_NumberValue: + attr = slog.Float64(n, v.GetNumberValue()) + case *structpb.Value_StringValue: + attr = slog.String(n, v.GetStringValue()) + default: + attr = slog.Any(n, v.AsInterface()) + } + grp = append(grp, attr) + } + attrs = append(attrs, slog.Group("customData", grp...)) + } + + slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), "log from SDK worker", slog.Any("worker", wk), slog.Group("sdk", attrs...)) } } } @@ -298,7 +312,7 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { if b, ok := wk.activeInstructions[resp.GetInstructionId()]; ok { b.Respond(resp) } else { - slog.Debug("ctrl.Recv: %v", resp) + slog.Debug("ctrl.Recv", slog.Any("response", resp)) } wk.mu.Unlock() } @@ -355,7 +369,7 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error { case codes.Canceled: return default: - slog.Error("data.Recv failed", err, "worker", wk) + slog.Error("data.Recv failed", slog.Any("error", err), slog.Any("worker", wk)) panic(err) } } @@ -434,7 +448,7 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) error { case codes.Canceled: return default: - slog.Error("state.Recv failed", err, "worker", wk) + slog.Error("state.Recv failed", slog.Any("error", err), slog.Any("worker", wk)) panic(err) } } @@ -584,7 +598,7 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) error { }() for resp := range responses { if err := state.Send(resp); err != nil { - slog.Error("state.Send error", err) + slog.Error("state.Send", slog.Any("error", err)) } } return nil