-
Notifications
You must be signed in to change notification settings - Fork 202
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
7 changed files
with
340 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
This sample shows how to use Temporal's Datadog interceptor to add tracing to your worker and clients. | ||
|
||
### Setup | ||
|
||
To run this sample make sure you have an active datadog agent reachable. This sample assume you have the agent running at `localhost:8126` if not adjust the sample accordingly. | ||
|
||
https://docs.datadoghq.com/getting_started/agent/ | ||
|
||
#### Metrics | ||
|
||
Starting with version 6.5.0 the Datadog agent is capable of scraping prometheus endpoints. | ||
|
||
See more details here: | ||
https://docs.datadoghq.com/integrations/guide/prometheus-host-collection/ | ||
|
||
Example `openmetrics.d/conf.yaml` to collect metrics from this sample and emit them to datadog under "myapp" namespace. | ||
|
||
``` | ||
instances: | ||
- prometheus_url: http://localhost:9090/metrics | ||
namespace: "myapp" | ||
metrics: | ||
- temporal_datadog* | ||
``` | ||
|
||
#### Logging | ||
|
||
When using the Datadog interceptor all user created loggers will also include the trace and span ID. This makes | ||
it possible to correlate logs with traces. | ||
|
||
For documentation on how to configure your Datadog agent to upload logs see: | ||
https://docs.datadoghq.com/logs/log_collection/go/ | ||
|
||
### Steps to run this sample: | ||
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). | ||
2) Run the following command to start the worker | ||
``` | ||
go run datadog/worker/main.go | ||
``` | ||
3) Run the following command to start the example | ||
``` | ||
go run datadog/starter/main.go |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"log" | ||
|
||
"github.com/temporalio/samples-go/datadog" | ||
"go.temporal.io/sdk/client" | ||
"go.temporal.io/sdk/contrib/datadog/tracing" | ||
"go.temporal.io/sdk/interceptor" | ||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" | ||
) | ||
|
||
func main() { | ||
// Start the tracer and defer the Stop method. | ||
tracer.Start(tracer.WithAgentAddr("localhost:8126")) | ||
defer tracer.Stop() | ||
|
||
// The client is a heavyweight object that should be created once per process. | ||
c, err := client.Dial(client.Options{ | ||
Interceptors: []interceptor.ClientInterceptor{tracing.NewTracingInterceptor(tracing.TracerOptions{})}, | ||
}) | ||
if err != nil { | ||
log.Fatalln("Unable to create client", err) | ||
} | ||
defer c.Close() | ||
|
||
workflowOptions := client.StartWorkflowOptions{ | ||
ID: "datadog_workflow_id", | ||
TaskQueue: "datadog", | ||
} | ||
|
||
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, datadog.Workflow, "<param to log>") | ||
if err != nil { | ||
log.Fatalln("Unable to execute workflow", err) | ||
} | ||
|
||
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) | ||
|
||
// Synchronously wait for the workflow completion. | ||
err = we.Get(context.Background(), nil) | ||
if err != nil { | ||
log.Fatalln("Unable get workflow result", err) | ||
} | ||
log.Println("Workflow completed. Check worker logs.") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
package main | ||
|
||
import ( | ||
"io" | ||
"log" | ||
"log/slog" | ||
"os" | ||
"time" | ||
|
||
prom "github.com/prometheus/client_golang/prometheus" | ||
"github.com/temporalio/samples-go/datadog" | ||
"github.com/uber-go/tally/v4" | ||
"github.com/uber-go/tally/v4/prometheus" | ||
"go.temporal.io/sdk/client" | ||
"go.temporal.io/sdk/contrib/datadog/tracing" | ||
sdktally "go.temporal.io/sdk/contrib/tally" | ||
"go.temporal.io/sdk/interceptor" | ||
tlog "go.temporal.io/sdk/log" | ||
"go.temporal.io/sdk/worker" | ||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" | ||
) | ||
|
||
func main() { | ||
// Start the tracer and defer the Stop method. | ||
tracer.Start(tracer.WithAgentAddr("localhost:8126")) | ||
defer tracer.Stop() | ||
|
||
// Setup logging | ||
f, err := os.OpenFile("worker.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) | ||
if err != nil { | ||
log.Fatalf("error opening file: %v", err) | ||
} | ||
defer func() { | ||
err = f.Close() | ||
if err != nil { | ||
log.Fatalf("error closing file: %v", err) | ||
} | ||
}() | ||
wrt := io.MultiWriter(os.Stdout, f) | ||
|
||
logger := tlog.NewStructuredLogger( | ||
slog.New(slog.NewJSONHandler(wrt, &slog.HandlerOptions{ | ||
Level: slog.LevelInfo, | ||
}))) | ||
|
||
c, err := client.Dial(client.Options{ | ||
Logger: logger, | ||
Interceptors: []interceptor.ClientInterceptor{tracing.NewTracingInterceptor(tracing.TracerOptions{})}, | ||
MetricsHandler: sdktally.NewMetricsHandler(newPrometheusScope(prometheus.Configuration{ | ||
ListenAddress: "localhost:9090", | ||
TimerType: "histogram", | ||
})), | ||
}) | ||
if err != nil { | ||
log.Fatalln("Unable to create client", err) | ||
} | ||
defer c.Close() | ||
|
||
w := worker.New(c, "datadog", worker.Options{}) | ||
|
||
w.RegisterWorkflow(datadog.Workflow) | ||
w.RegisterWorkflow(datadog.ChildWorkflow) | ||
w.RegisterActivity(datadog.Activity) | ||
|
||
err = w.Run(worker.InterruptCh()) | ||
if err != nil { | ||
log.Fatalln("Unable to start worker", err) | ||
} | ||
} | ||
|
||
func newPrometheusScope(c prometheus.Configuration) tally.Scope { | ||
reporter, err := c.NewReporter( | ||
prometheus.ConfigurationOptions{ | ||
Registry: prom.NewRegistry(), | ||
OnError: func(err error) { | ||
log.Println("error in prometheus reporter", err) | ||
}, | ||
}, | ||
) | ||
if err != nil { | ||
log.Fatalln("error creating prometheus reporter", err) | ||
} | ||
scopeOpts := tally.ScopeOptions{ | ||
CachedReporter: reporter, | ||
Separator: prometheus.DefaultSeparator, | ||
SanitizeOptions: &sdktally.PrometheusSanitizeOptions, | ||
Prefix: "temporal_datadog", | ||
} | ||
scope, _ := tally.NewRootScope(scopeOpts, time.Second) | ||
scope = sdktally.NewPrometheusNamingScope(scope) | ||
|
||
log.Println("prometheus metrics scope created") | ||
return scope | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package datadog | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"go.temporal.io/sdk/activity" | ||
"go.temporal.io/sdk/workflow" | ||
) | ||
|
||
func Workflow(ctx workflow.Context, name string) error { | ||
workflow.GetLogger(ctx).Info("Executing Workflow.", "name", name) | ||
cwo := workflow.ChildWorkflowOptions{ | ||
WorkflowID: "DATADOG-CHILD-WORKFLOW-ID", | ||
} | ||
ctx = workflow.WithChildOptions(ctx, cwo) | ||
return workflow.ExecuteChildWorkflow(ctx, ChildWorkflow, name).Get(ctx, nil) | ||
} | ||
|
||
func ChildWorkflow(ctx workflow.Context, name string) error { | ||
workflow.GetLogger(ctx).Info("Executing ChildWorkflow.", "name", name) | ||
ao := workflow.ActivityOptions{ | ||
StartToCloseTimeout: 10 * time.Second, | ||
} | ||
ctx = workflow.WithActivityOptions(ctx, ao) | ||
return workflow.ExecuteActivity(ctx, Activity, name).Get(ctx, nil) | ||
} | ||
|
||
func Activity(ctx context.Context, name string) error { | ||
activity.GetLogger(ctx).Info("Executing Activity.", "name", name) | ||
time.Sleep(time.Second) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.