From 5fc4f1ccc92369a8a516c6c39ddd562edc45c295 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Wed, 17 Apr 2024 13:16:37 -0400 Subject: [PATCH] Use UUID instead of ULID Implements spec change https://github.com/open-telemetry/opamp-spec/pull/186 The example server still accepts old-style ULID agent instances to demonstrate how this change can be handled in a backward compatible way. --- client/clientimpl_test.go | 36 ++++++----- client/internal/receivedprocessor.go | 8 +-- client/internal/sender.go | 15 ++--- client/internal/wsreceiver_test.go | 2 +- client/types/instanceid.go | 3 + client/types/startsettings.go | 2 +- go.mod | 2 +- go.sum | 5 +- internal/examples/agent/agent/agent.go | 30 +++++---- .../examples/agent/agent/metricreporter.go | 4 +- internal/examples/go.mod | 1 + internal/examples/go.sum | 2 + internal/examples/server/data/agent.go | 7 +- internal/examples/server/data/instanceid.go | 4 +- internal/examples/server/opampsrv/opampsrv.go | 24 +++++-- .../examples/server/uisrv/html/agent.html | 6 +- internal/examples/server/uisrv/html/root.html | 2 +- internal/examples/server/uisrv/ui.go | 30 +++++++-- .../supervisor/supervisor/supervisor.go | 22 ++++--- protobufs/anyvalue.pb.go | 5 +- protobufs/opamp.pb.go | 64 +++++++++---------- server/serverimpl.go | 4 +- server/serverimpl_test.go | 20 +++--- 23 files changed, 176 insertions(+), 122 deletions(-) create mode 100644 client/types/instanceid.go diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index 994ad4bf..288c754b 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "crypto/x509" "fmt" - "math/rand" "net/http" "net/http/httptest" "net/url" @@ -13,7 +12,7 @@ import ( "testing" "time" - ulid "github.com/oklog/ulid/v2" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -94,10 +93,17 @@ func eventually(t *testing.T, f func() bool) { assert.Eventually(t, f, 5*time.Second, 10*time.Millisecond) } +func newInstanceUid(t *testing.T) types.InstanceUid { + uid, err := uuid.NewV7() + require.NoError(t, err) + b, err := uid.MarshalBinary() + require.NoError(t, err) + return types.InstanceUid(b) +} + func prepareSettings(t *testing.T, settings *types.StartSettings, c OpAMPClient) { // Autogenerate instance id. - entropy := ulid.Monotonic(rand.New(rand.NewSource(99)), 0) - settings.InstanceUid = ulid.MustNew(ulid.Timestamp(time.Now()), entropy).String() + settings.InstanceUid = newInstanceUid(t) // Make sure correct URL scheme is used, based on the type of the OpAMP client. u, err := url.Parse(settings.OpAMPServerURL) @@ -150,7 +156,7 @@ func TestInvalidInstanceId(t *testing.T) { testClients(t, func(t *testing.T, client OpAMPClient) { settings := createNoServerSettings() prepareClient(t, &settings, client) - settings.InstanceUid = "invalidid" + settings.InstanceUid = types.InstanceUid{} err := client.Start(context.Background(), settings) assert.Error(t, err) @@ -624,9 +630,7 @@ func TestAgentIdentification(t *testing.T) { testClients(t, func(t *testing.T, client OpAMPClient) { // Start a server. srv := internal.StartMockServer(t) - newInstanceUid := ulid.MustNew( - ulid.Timestamp(time.Now()), ulid.Monotonic(rand.New(rand.NewSource(0)), 0), - ) + newInstanceUid := newInstanceUid(t) var rcvAgentInstanceUid atomic.Value var sentInvalidId atomic.Bool srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { @@ -636,7 +640,7 @@ func TestAgentIdentification(t *testing.T) { InstanceUid: msg.InstanceUid, AgentIdentification: &protobufs.AgentIdentification{ // If we sent the invalid one first, send a valid one now - NewInstanceUid: newInstanceUid.String(), + NewInstanceUid: newInstanceUid[:], }, } } @@ -645,7 +649,7 @@ func TestAgentIdentification(t *testing.T) { InstanceUid: msg.InstanceUid, AgentIdentification: &protobufs.AgentIdentification{ // Start by sending an invalid id forcing an error. - NewInstanceUid: "", + NewInstanceUid: nil, }, } } @@ -662,11 +666,11 @@ func TestAgentIdentification(t *testing.T) { eventually( t, func() bool { - instanceUid, ok := rcvAgentInstanceUid.Load().(string) + instanceUid, ok := rcvAgentInstanceUid.Load().([]byte) if !ok { return false } - return instanceUid == oldInstanceUid + return types.InstanceUid(instanceUid) == oldInstanceUid }, ) @@ -677,11 +681,11 @@ func TestAgentIdentification(t *testing.T) { eventually( t, func() bool { - instanceUid, ok := rcvAgentInstanceUid.Load().(string) + instanceUid, ok := rcvAgentInstanceUid.Load().([]byte) if !ok { return false } - return instanceUid == oldInstanceUid + return types.InstanceUid(instanceUid) == oldInstanceUid }, ) @@ -693,11 +697,11 @@ func TestAgentIdentification(t *testing.T) { eventually( t, func() bool { - instanceUid, ok := rcvAgentInstanceUid.Load().(string) + instanceUid, ok := rcvAgentInstanceUid.Load().([]byte) if !ok { return false } - return instanceUid == newInstanceUid.String() + return types.InstanceUid(instanceUid) == newInstanceUid }, ) diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index af879205..736073d5 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -2,7 +2,7 @@ package internal import ( "context" - "errors" + "fmt" "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" @@ -225,13 +225,13 @@ func (r *receivedProcessor) processErrorResponse(ctx context.Context, body *prot } func (r *receivedProcessor) rcvAgentIdentification(ctx context.Context, agentId *protobufs.AgentIdentification) error { - if agentId.NewInstanceUid == "" { - err := errors.New("empty instance uid is not allowed") + if len(agentId.NewInstanceUid) != 16 { + err := fmt.Errorf("instance uid must be 16 bytes but is %d bytes long", len(agentId.NewInstanceUid)) r.logger.Debugf(ctx, err.Error()) return err } - err := r.sender.SetInstanceUid(agentId.NewInstanceUid) + err := r.sender.SetInstanceUid(types.InstanceUid(agentId.NewInstanceUid)) if err != nil { r.logger.Errorf(ctx, "Error while setting instance uid: %v", err) return err diff --git a/client/internal/sender.go b/client/internal/sender.go index f418b4c2..b6338b16 100644 --- a/client/internal/sender.go +++ b/client/internal/sender.go @@ -3,7 +3,7 @@ package internal import ( "errors" - "github.com/oklog/ulid/v2" + "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" ) @@ -21,7 +21,7 @@ type Sender interface { ScheduleSend() // SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent. - SetInstanceUid(instanceUid string) error + SetInstanceUid(instanceUid types.InstanceUid) error } // SenderCommon is partial Sender implementation that is common between WebSocket and plain @@ -65,18 +65,15 @@ func (h *SenderCommon) NextMessage() *NextMessage { // SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent. // Can be called concurrently, normally is called when a message is received from the // Server that instructs us to change our instance UID. -func (h *SenderCommon) SetInstanceUid(instanceUid string) error { - if instanceUid == "" { +func (h *SenderCommon) SetInstanceUid(instanceUid types.InstanceUid) error { + var emptyUid types.InstanceUid + if instanceUid == emptyUid { return errors.New("cannot set instance uid to empty value") } - if _, err := ulid.ParseStrict(instanceUid); err != nil { - return err - } - h.nextMessage.Update( func(msg *protobufs.AgentToServer) { - msg.InstanceUid = instanceUid + msg.InstanceUid = instanceUid[:] }) return nil diff --git a/client/internal/wsreceiver_test.go b/client/internal/wsreceiver_test.go index c929f8fa..6b2cca7d 100644 --- a/client/internal/wsreceiver_test.go +++ b/client/internal/wsreceiver_test.go @@ -157,7 +157,7 @@ func TestDecodeMessage(t *testing.T) { msgsToTest := []*protobufs.ServerToAgent{ {}, // Empty message { - InstanceUid: "abcd", + InstanceUid: []byte("0123456789123456"), }, } diff --git a/client/types/instanceid.go b/client/types/instanceid.go new file mode 100644 index 00000000..47901ad2 --- /dev/null +++ b/client/types/instanceid.go @@ -0,0 +1,3 @@ +package types + +type InstanceUid [16]byte diff --git a/client/types/startsettings.go b/client/types/startsettings.go index b395afdb..857fd2a4 100644 --- a/client/types/startsettings.go +++ b/client/types/startsettings.go @@ -21,7 +21,7 @@ type StartSettings struct { TLSConfig *tls.Config // Agent information. - InstanceUid string + InstanceUid InstanceUid // Callbacks that the client will call after Start() returns nil. Callbacks Callbacks diff --git a/go.mod b/go.mod index 125f76b5..01c3eaca 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,8 @@ go 1.20 require ( github.com/cenkalti/backoff/v4 v4.2.1 + github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.1 - github.com/oklog/ulid/v2 v2.1.0 github.com/stretchr/testify v1.9.0 google.golang.org/protobuf v1.33.0 ) diff --git a/go.sum b/go.sum index c53b66af..04927923 100644 --- a/go.sum +++ b/go.sum @@ -4,11 +4,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= -github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= -github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= -github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= diff --git a/internal/examples/agent/agent/agent.go b/internal/examples/agent/agent/agent.go index bc89c3b8..71e9dd4b 100644 --- a/internal/examples/agent/agent/agent.go +++ b/internal/examples/agent/agent/agent.go @@ -10,16 +10,14 @@ import ( "crypto/x509/pkix" "encoding/pem" "fmt" - "math/rand" "os" "runtime" "sort" - "time" + "github.com/google/uuid" "github.com/knadh/koanf" "github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/providers/rawbytes" - "github.com/oklog/ulid/v2" "github.com/open-telemetry/opamp-go/client" "github.com/open-telemetry/opamp-go/client/types" @@ -54,7 +52,7 @@ type Agent struct { effectiveConfig string - instanceId ulid.ULID + instanceId uuid.UUID agentDescription *protobufs.AgentDescription @@ -82,7 +80,7 @@ func NewAgent(logger types.Logger, agentType string, agentVersion string) *Agent agent.createAgentIdentity() agent.logger.Debugf(context.Background(), "Agent starting, id=%v, type=%s, version=%s.", - agent.instanceId.String(), agentType, agentVersion) + agent.instanceId, agentType, agentVersion) agent.loadLocalConfig() if err := agent.connect(); err != nil { @@ -107,7 +105,7 @@ func (agent *Agent) connect() error { settings := types.StartSettings{ OpAMPServerURL: "wss://127.0.0.1:4320/v1/opamp", TLSConfig: tlsConfig, - InstanceUid: agent.instanceId.String(), + InstanceUid: types.InstanceUid(agent.instanceId), Callbacks: types.CallbacksStruct{ OnConnectFunc: func(ctx context.Context) { agent.logger.Debugf(ctx, "Connected to the server.") @@ -167,8 +165,11 @@ func (agent *Agent) disconnect(ctx context.Context) { func (agent *Agent) createAgentIdentity() { // Generate instance id. - entropy := ulid.Monotonic(rand.New(rand.NewSource(0)), 0) - agent.instanceId = ulid.MustNew(ulid.Timestamp(time.Now()), entropy) + uid, err := uuid.NewV7() + if err != nil { + panic(err) + } + agent.instanceId = uid hostname, _ := os.Hostname() @@ -209,10 +210,10 @@ func (agent *Agent) createAgentIdentity() { } } -func (agent *Agent) updateAgentIdentity(ctx context.Context, instanceId ulid.ULID) { +func (agent *Agent) updateAgentIdentity(ctx context.Context, instanceId uuid.UUID) { agent.logger.Debugf(ctx, "Agent identify is being changed from id=%v to id=%v", - agent.instanceId.String(), - instanceId.String()) + agent.instanceId, + instanceId) agent.instanceId = instanceId if agent.metricReporter != nil { @@ -459,11 +460,12 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) { } if msg.AgentIdentification != nil { - newInstanceId, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid) + uid, err := uuid.FromBytes(msg.AgentIdentification.NewInstanceUid) if err != nil { - agent.logger.Errorf(ctx, err.Error()) + agent.logger.Errorf(ctx, "invalid NewInstanceUid: %v", err) + return } - agent.updateAgentIdentity(ctx, newInstanceId) + agent.updateAgentIdentity(ctx, uid) } if configChanged { diff --git a/internal/examples/agent/agent/metricreporter.go b/internal/examples/agent/agent/metricreporter.go index 754f0630..09639856 100644 --- a/internal/examples/agent/agent/metricreporter.go +++ b/internal/examples/agent/agent/metricreporter.go @@ -9,7 +9,7 @@ import ( "os" "time" - "github.com/oklog/ulid/v2" + "github.com/google/uuid" "github.com/shirou/gopsutil/process" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -46,7 +46,7 @@ func NewMetricReporter( dest *protobufs.TelemetryConnectionSettings, agentType string, agentVersion string, - instanceId ulid.ULID, + instanceId uuid.UUID, ) (*MetricReporter, error) { // Check the destination credentials to make sure they look like a valid OTLP/HTTP diff --git a/internal/examples/go.mod b/internal/examples/go.mod index 875097cc..8734b6b7 100644 --- a/internal/examples/go.mod +++ b/internal/examples/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( github.com/cenkalti/backoff/v4 v4.2.1 + github.com/google/uuid v1.6.0 github.com/knadh/koanf v1.3.3 github.com/oklog/ulid/v2 v2.1.0 github.com/open-telemetry/opamp-go v0.1.0 diff --git a/internal/examples/go.sum b/internal/examples/go.sum index 3fdd46a9..d38b555a 100644 --- a/internal/examples/go.sum +++ b/internal/examples/go.sum @@ -47,6 +47,8 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is= diff --git a/internal/examples/server/data/agent.go b/internal/examples/server/data/agent.go index a28295c1..bbefaac5 100644 --- a/internal/examples/server/data/agent.go +++ b/internal/examples/server/data/agent.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/google/uuid" "google.golang.org/protobuf/proto" "github.com/open-telemetry/opamp-go/internal/examples/server/certman" @@ -23,7 +24,8 @@ type Agent struct { // Some fields in this struct are exported so that we can render them in the UI. // Agent's instance id. This is an immutable field. - InstanceId InstanceId + InstanceId InstanceId + InstanceIdStr string // Connection to the Agent. conn types.Connection @@ -60,7 +62,7 @@ func NewAgent( instanceId InstanceId, conn types.Connection, ) *Agent { - agent := &Agent{InstanceId: instanceId, conn: conn} + agent := &Agent{InstanceId: instanceId, InstanceIdStr: uuid.UUID(instanceId).String(), conn: conn} tslConn, ok := conn.Connection().(*tls.Conn) if ok { // Client is using TLS connection. @@ -84,6 +86,7 @@ func (agent *Agent) CloneReadonly() *Agent { defer agent.mux.RUnlock() return &Agent{ InstanceId: agent.InstanceId, + InstanceIdStr: uuid.UUID(agent.InstanceId).String(), Status: proto.Clone(agent.Status).(*protobufs.AgentToServer), EffectiveConfig: agent.EffectiveConfig, CustomInstanceConfig: agent.CustomInstanceConfig, diff --git a/internal/examples/server/data/instanceid.go b/internal/examples/server/data/instanceid.go index debb0efa..b40ae23a 100644 --- a/internal/examples/server/data/instanceid.go +++ b/internal/examples/server/data/instanceid.go @@ -1,3 +1,5 @@ package data -type InstanceId string +import "github.com/google/uuid" + +type InstanceId uuid.UUID diff --git a/internal/examples/server/opampsrv/opampsrv.go b/internal/examples/server/opampsrv/opampsrv.go index 84bb2018..ea19d912 100644 --- a/internal/examples/server/opampsrv/opampsrv.go +++ b/internal/examples/server/opampsrv/opampsrv.go @@ -6,6 +6,7 @@ import ( "net/http" "os" + "github.com/oklog/ulid/v2" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "github.com/open-telemetry/opamp-go/internal" @@ -83,13 +84,28 @@ func (srv *Server) onDisconnect(conn types.Connection) { } func (srv *Server) onMessage(ctx context.Context, conn types.Connection, msg *protobufs.AgentToServer) *protobufs.ServerToAgent { - instanceId := data.InstanceId(msg.InstanceUid) - - agent := srv.agents.FindOrCreateAgent(instanceId, conn) - // Start building the response. response := &protobufs.ServerToAgent{} + var instanceId data.InstanceId + if len(msg.InstanceUid) == 26 { + // This is an old-style ULID. + u, err := ulid.Parse(string(msg.InstanceUid)) + if err != nil { + srv.logger.Errorf(ctx, "Cannot parse ULID %s: %v", string(msg.InstanceUid), err) + return response + } + instanceId = data.InstanceId(u.Bytes()) + } else if len(msg.InstanceUid) == 16 { + // This is a 16 byte, new style UID. + instanceId = data.InstanceId(msg.InstanceUid) + } else { + srv.logger.Errorf(ctx, "Invalid length of msg.InstanceUid") + return response + } + + agent := srv.agents.FindOrCreateAgent(instanceId, conn) + // Process the status report and continue building the response. agent.UpdateStatus(msg, response) diff --git a/internal/examples/server/uisrv/html/agent.html b/internal/examples/server/uisrv/html/agent.html index af8f42da..3a6a9d8d 100644 --- a/internal/examples/server/uisrv/html/agent.html +++ b/internal/examples/server/uisrv/html/agent.html @@ -21,7 +21,7 @@

Agent

- + {{if .Status.Health }} @@ -70,7 +70,7 @@

Configuration

{{ range . }} - + {{ end }}
Instance ID:{{ .InstanceId }}Instance ID:{{ .InstanceIdStr }}
Additional Configuration:
- +
{{if .Status.RemoteConfigStatus }} {{if .Status.RemoteConfigStatus.ErrorMessage }} @@ -116,7 +116,7 @@

Client Certificate


- + {{if .ClientCert}} {{else}} diff --git a/internal/examples/server/uisrv/html/root.html b/internal/examples/server/uisrv/html/root.html index 1121edf6..ec7bb689 100644 --- a/internal/examples/server/uisrv/html/root.html +++ b/internal/examples/server/uisrv/html/root.html @@ -14,7 +14,7 @@

Agents

{{ .InstanceId }}{{ .InstanceIdStr }}
diff --git a/internal/examples/server/uisrv/ui.go b/internal/examples/server/uisrv/ui.go index 4e11b7f5..f766b037 100644 --- a/internal/examples/server/uisrv/ui.go +++ b/internal/examples/server/uisrv/ui.go @@ -8,6 +8,8 @@ import ( "text/template" "time" + "github.com/google/uuid" + "github.com/open-telemetry/opamp-go/internal" "github.com/open-telemetry/opamp-go/internal/examples/server/data" "github.com/open-telemetry/opamp-go/protobufs" @@ -63,7 +65,13 @@ func renderRoot(w http.ResponseWriter, r *http.Request) { } func renderAgent(w http.ResponseWriter, r *http.Request) { - agent := data.AllAgents.GetAgentReadonlyClone(data.InstanceId(r.URL.Query().Get("instanceid"))) + uid, err := uuid.Parse(r.URL.Query().Get("instanceid")) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + + agent := data.AllAgents.GetAgentReadonlyClone(data.InstanceId(uid)) if agent == nil { w.WriteHeader(http.StatusNotFound) return @@ -77,7 +85,13 @@ func saveCustomConfigForInstance(w http.ResponseWriter, r *http.Request) { return } - instanceId := data.InstanceId(r.Form.Get("instanceid")) + uid, err := uuid.Parse(r.Form.Get("instanceid")) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + + instanceId := data.InstanceId(uid) agent := data.AllAgents.GetAgentReadonlyClone(instanceId) if agent == nil { w.WriteHeader(http.StatusNotFound) @@ -103,7 +117,7 @@ func saveCustomConfigForInstance(w http.ResponseWriter, r *http.Request) { case <-timer.C: } - http.Redirect(w, r, "/agent?instanceid="+string(instanceId), http.StatusSeeOther) + http.Redirect(w, r, "/agent?instanceid="+uid.String(), http.StatusSeeOther) } func rotateInstanceClientCert(w http.ResponseWriter, r *http.Request) { @@ -113,7 +127,13 @@ func rotateInstanceClientCert(w http.ResponseWriter, r *http.Request) { } // Find the agent instance. - instanceId := data.InstanceId(r.Form.Get("instanceid")) + uid, err := uuid.Parse(r.Form.Get("instanceid")) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + + instanceId := data.InstanceId(uid) agent := data.AllAgents.GetAgentReadonlyClone(instanceId) if agent == nil { w.WriteHeader(http.StatusNotFound) @@ -151,5 +171,5 @@ func rotateInstanceClientCert(w http.ResponseWriter, r *http.Request) { logger.Printf("Time out waiting for agent %s to reconnect\n", instanceId) } - http.Redirect(w, r, "/agent?instanceid="+string(instanceId), http.StatusSeeOther) + http.Redirect(w, r, "/agent?instanceid="+uid.String(), http.StatusSeeOther) } diff --git a/internal/examples/supervisor/supervisor/supervisor.go b/internal/examples/supervisor/supervisor/supervisor.go index c47d9122..1d794b18 100644 --- a/internal/examples/supervisor/supervisor/supervisor.go +++ b/internal/examples/supervisor/supervisor/supervisor.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "errors" "fmt" - "math/rand" "os" "runtime" "sort" @@ -13,11 +12,11 @@ import ( "time" "github.com/cenkalti/backoff/v4" + "github.com/google/uuid" "github.com/knadh/koanf" "github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/providers/file" "github.com/knadh/koanf/providers/rawbytes" - "github.com/oklog/ulid/v2" "github.com/open-telemetry/opamp-go/client" "github.com/open-telemetry/opamp-go/client/types" @@ -54,7 +53,7 @@ type Supervisor struct { agentVersion string // Agent's instance id. - instanceId ulid.ULID + instanceId uuid.UUID // A config section to be added to the Collector's config to fetch its own metrics. // TODO: store this persistently so that when starting we can compose the effective @@ -91,7 +90,7 @@ func NewSupervisor(logger types.Logger) (*Supervisor, error) { s.createInstanceId() logger.Debugf(context.Background(), "Supervisor starting, id=%v, type=%s, version=%s.", - s.instanceId.String(), agentType, agentVersion) + s.instanceId, agentType, agentVersion) s.loadAgentEffectiveConfig() @@ -138,7 +137,7 @@ func (s *Supervisor) startOpAMP() error { TLSConfig: &tls.Config{ InsecureSkipVerify: true, }, - InstanceUid: s.instanceId.String(), + InstanceUid: types.InstanceUid(s.instanceId), Callbacks: types.CallbacksStruct{ OnConnectFunc: func(ctx context.Context) { s.logger.Debugf(ctx, "Connected to the server.") @@ -184,8 +183,12 @@ func (s *Supervisor) startOpAMP() error { func (s *Supervisor) createInstanceId() { // Generate instance id. - entropy := ulid.Monotonic(rand.New(rand.NewSource(0)), 0) - s.instanceId = ulid.MustNew(ulid.Timestamp(time.Now()), entropy) + + uid, err := uuid.NewV7() + if err != nil { + panic(err) + } + s.instanceId = uid // TODO: set instanceId in the Collector config. } @@ -573,9 +576,10 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { } if msg.AgentIdentification != nil { - newInstanceId, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid) + newInstanceId, err := uuid.FromBytes(msg.AgentIdentification.NewInstanceUid) if err != nil { - s.logger.Errorf(ctx, err.Error()) + s.logger.Errorf(ctx, "invalid NewInstanceUid: %v", err) + return } s.logger.Debugf(ctx, "Agent identify is being changed from id=%v to id=%v", diff --git a/protobufs/anyvalue.pb.go b/protobufs/anyvalue.pb.go index 3f33ba54..697b805b 100644 --- a/protobufs/anyvalue.pb.go +++ b/protobufs/anyvalue.pb.go @@ -20,8 +20,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v4.24.3 +// protoc-gen-go v1.26.0 +// protoc v3.17.3 // source: anyvalue.proto package protobufs @@ -52,7 +52,6 @@ type AnyValue struct { // in which case this AnyValue is considered to be "null". // // Types that are assignable to Value: - // // *AnyValue_StringValue // *AnyValue_BoolValue // *AnyValue_IntValue diff --git a/protobufs/opamp.pb.go b/protobufs/opamp.pb.go index 5c908c10..a19f8f67 100644 --- a/protobufs/opamp.pb.go +++ b/protobufs/opamp.pb.go @@ -16,8 +16,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v4.24.3 +// protoc-gen-go v1.26.0 +// protoc v3.17.3 // source: opamp.proto package protobufs @@ -602,8 +602,8 @@ type AgentToServer struct { // Globally unique identifier of the running instance of the Agent. SHOULD remain // unchanged for the lifetime of the Agent process. - // Recommended format: https://github.com/ulid/spec - InstanceUid string `protobuf:"bytes,1,opt,name=instance_uid,json=instanceUid,proto3" json:"instance_uid,omitempty"` + // MUST be 16 bytes long and SHOULD be generated using UUID v7 spec. + InstanceUid []byte `protobuf:"bytes,1,opt,name=instance_uid,json=instanceUid,proto3" json:"instance_uid,omitempty"` // The sequence number is incremented by 1 for every AgentToServer sent // by the Agent. This allows the Server to detect that it missed a message when // it notices that the sequence_num is not exactly by 1 greater than the previously @@ -690,11 +690,11 @@ func (*AgentToServer) Descriptor() ([]byte, []int) { return file_opamp_proto_rawDescGZIP(), []int{0} } -func (x *AgentToServer) GetInstanceUid() string { +func (x *AgentToServer) GetInstanceUid() []byte { if x != nil { return x.InstanceUid } - return "" + return nil } func (x *AgentToServer) GetSequenceNum() uint64 { @@ -991,7 +991,7 @@ type ServerToAgent struct { // Agent instance uid. MUST match the instance_uid field in AgentToServer message. // Used for multiplexing messages from/to multiple agents using one message stream. - InstanceUid string `protobuf:"bytes,1,opt,name=instance_uid,json=instanceUid,proto3" json:"instance_uid,omitempty"` + InstanceUid []byte `protobuf:"bytes,1,opt,name=instance_uid,json=instanceUid,proto3" json:"instance_uid,omitempty"` // error_response is set if the Server wants to indicate that something went wrong // during processing of an AgentToServer message. If error_response is set then // all other fields below must be unset and vice versa, if any of the fields below is @@ -1065,11 +1065,11 @@ func (*ServerToAgent) Descriptor() ([]byte, []int) { return file_opamp_proto_rawDescGZIP(), []int{5} } -func (x *ServerToAgent) GetInstanceUid() string { +func (x *ServerToAgent) GetInstanceUid() []byte { if x != nil { return x.InstanceUid } - return "" + return nil } func (x *ServerToAgent) GetErrorResponse() *ServerErrorResponse { @@ -1951,7 +1951,6 @@ type ServerErrorResponse struct { // Error message in the string form, typically human readable. ErrorMessage string `protobuf:"bytes,2,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` // Types that are assignable to Details: - // // *ServerErrorResponse_RetryInfo Details isServerErrorResponse_Details `protobuf_oneof:"Details"` } @@ -2135,15 +2134,15 @@ type AgentDescription struct { // // For standalone running Agents (such as OpenTelemetry Collector) the following // attributes SHOULD be specified: - // - service.name should be set to a reverse FQDN that uniquely identifies the - // Agent type, e.g. "io.opentelemetry.collector" - // - service.namespace if it is used in the environment where the Agent runs. - // - service.version should be set to version number of the Agent build. - // - service.instance.id should be set. It may be be set equal to the Agent's - // instance uid (equal to ServerToAgent.instance_uid field) or any other value - // that uniquely identifies the Agent in combination with other attributes. - // - any other attributes that are necessary for uniquely identifying the Agent's - // own telemetry. + // - service.name should be set to a reverse FQDN that uniquely identifies the + // Agent type, e.g. "io.opentelemetry.collector" + // - service.namespace if it is used in the environment where the Agent runs. + // - service.version should be set to version number of the Agent build. + // - service.instance.id should be set. It may be be set equal to the Agent's + // instance uid (equal to ServerToAgent.instance_uid field) or any other value + // that uniquely identifies the Agent in combination with other attributes. + // - any other attributes that are necessary for uniquely identifying the Agent's + // own telemetry. // // The Agent SHOULD also include these attributes in the Resource of its own // telemetry. The combination of identifying attributes SHOULD be sufficient to @@ -2153,13 +2152,13 @@ type AgentDescription struct { // Attributes that do not necessarily identify the Agent but help describe // where it runs. // The following attributes SHOULD be included: - // - os.type, os.version - to describe where the Agent runs. - // - host.* to describe the host the Agent runs on. - // - cloud.* to describe the cloud where the host is located. - // - any other relevant Resource attributes that describe this Agent and the - // environment it runs in. - // - any user-defined attributes that the end user would like to associate - // with this Agent. + // - os.type, os.version - to describe where the Agent runs. + // - host.* to describe the host the Agent runs on. + // - cloud.* to describe the cloud where the host is located. + // - any other relevant Resource attributes that describe this Agent and the + // environment it runs in. + // - any user-defined attributes that the end user would like to associate + // with this Agent. NonIdentifyingAttributes []*KeyValue `protobuf:"bytes,2,rep,name=non_identifying_attributes,json=nonIdentifyingAttributes,proto3" json:"non_identifying_attributes,omitempty"` } @@ -2646,7 +2645,8 @@ type AgentIdentification struct { // When new_instance_uid is set, Agent MUST update instance_uid // to the value provided and use it for all further communication. - NewInstanceUid string `protobuf:"bytes,1,opt,name=new_instance_uid,json=newInstanceUid,proto3" json:"new_instance_uid,omitempty"` + // MUST be 16 bytes long and SHOULD be generated using UUID v7 spec. + NewInstanceUid []byte `protobuf:"bytes,1,opt,name=new_instance_uid,json=newInstanceUid,proto3" json:"new_instance_uid,omitempty"` } func (x *AgentIdentification) Reset() { @@ -2681,11 +2681,11 @@ func (*AgentIdentification) Descriptor() ([]byte, []int) { return file_opamp_proto_rawDescGZIP(), []int{25} } -func (x *AgentIdentification) GetNewInstanceUid() string { +func (x *AgentIdentification) GetNewInstanceUid() []byte { if x != nil { return x.NewInstanceUid } - return "" + return nil } type AgentRemoteConfig struct { @@ -2999,7 +2999,7 @@ var file_opamp_proto_rawDesc = []byte{ 0x61, 0x6c, 0x75, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xbc, 0x06, 0x0a, 0x0d, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0b, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x55, 0x69, 0x64, 0x12, + 0x28, 0x0c, 0x52, 0x0b, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x55, 0x69, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, 0x6d, 0x12, 0x4a, 0x0a, 0x11, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x64, 0x65, 0x73, 0x63, @@ -3067,7 +3067,7 @@ var file_opamp_proto_rawDesc = []byte{ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x63, 0x73, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x63, 0x73, 0x72, 0x22, 0xc8, 0x05, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x54, 0x6f, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x6e, - 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x55, 0x69, 0x64, 0x12, 0x47, 0x0a, 0x0e, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x6f, 0x70, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, @@ -3350,7 +3350,7 @@ var file_opamp_proto_rawDesc = []byte{ 0x73, 0x61, 0x67, 0x65, 0x22, 0x3f, 0x0a, 0x13, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x6e, 0x65, 0x77, 0x5f, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x6e, 0x65, 0x77, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, + 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x6e, 0x65, 0x77, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x55, 0x69, 0x64, 0x22, 0x69, 0x0a, 0x11, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x33, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x6f, 0x70, 0x61, diff --git a/server/serverimpl.go b/server/serverimpl.go index 2f5965a0..815c0528 100644 --- a/server/serverimpl.go +++ b/server/serverimpl.go @@ -256,7 +256,7 @@ func (s *server) handleWSConnection(reqCtx context.Context, wsConn *websocket.Co if connectionCallbacks != nil { response := connectionCallbacks.OnMessage(msgContext, agentConn, &request) - if response.InstanceUid == "" { + if len(response.InstanceUid) == 0 { response.InstanceUid = request.InstanceUid } if !sentCustomCapabilities { @@ -349,7 +349,7 @@ func (s *server) handlePlainHTTPRequest(req *http.Request, w http.ResponseWriter response := connectionCallbacks.OnMessage(req.Context(), agentConn, &request) // Set the InstanceUid if it is not set by the callback. - if response.InstanceUid == "" { + if len(response.InstanceUid) == 0 { response.InstanceUid = request.InstanceUid } diff --git a/server/serverimpl_test.go b/server/serverimpl_test.go index f17263f7..55ad5ba9 100644 --- a/server/serverimpl_test.go +++ b/server/serverimpl_test.go @@ -247,6 +247,8 @@ func TestDisconnectWSConnection(t *testing.T) { }) } +var testInstanceUid = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6} + func TestServerReceiveSendMessage(t *testing.T) { var rcvMsg atomic.Value callbacks := CallbacksStruct{ @@ -282,7 +284,7 @@ func TestServerReceiveSendMessage(t *testing.T) { // Send a message to the Server. sendMsg := protobufs.AgentToServer{ - InstanceUid: "12345678", + InstanceUid: testInstanceUid, } bytes, err := proto.Marshal(&sendMsg) require.NoError(t, err) @@ -364,7 +366,7 @@ func TestServerReceiveSendMessageWithCompression(t *testing.T) { // Send a message to the Server. sendMsg := protobufs.AgentToServer{ - InstanceUid: "10000000", + InstanceUid: testInstanceUid, EffectiveConfig: &protobufs.EffectiveConfig{ ConfigMap: &protobufs.AgentConfigMap{ ConfigMap: map[string]*protobufs.AgentConfigFile{ @@ -452,7 +454,7 @@ func TestServerReceiveSendMessagePlainHTTP(t *testing.T) { // Send a message to the Server. sendMsg := protobufs.AgentToServer{ - InstanceUid: "12345678", + InstanceUid: testInstanceUid, } b, err := proto.Marshal(&sendMsg) require.NoError(t, err) @@ -584,7 +586,7 @@ func TestServerAttachSendMessagePlainHTTP(t *testing.T) { // Send a message to the Server. sendMsg := protobufs.AgentToServer{ - InstanceUid: "12345678", + InstanceUid: testInstanceUid, } b, err := proto.Marshal(&sendMsg) require.NoError(t, err) @@ -653,7 +655,7 @@ func TestServerHonoursClientRequestContentEncoding(t *testing.T) { // Send a message to the Server. sendMsg := protobufs.AgentToServer{ - InstanceUid: "12345678", + InstanceUid: testInstanceUid, } b, err := proto.Marshal(&sendMsg) require.NoError(t, err) @@ -731,7 +733,7 @@ func TestServerHonoursAcceptEncoding(t *testing.T) { // Send a message to the Server. sendMsg := protobufs.AgentToServer{ - InstanceUid: "12345678", + InstanceUid: testInstanceUid, } b, err := proto.Marshal(&sendMsg) require.NoError(t, err) @@ -775,7 +777,7 @@ func TestDecodeMessage(t *testing.T) { msgsToTest := []*protobufs.AgentToServer{ {}, // Empty message { - InstanceUid: "abcd", + InstanceUid: testInstanceUid, SequenceNum: 123, }, } @@ -934,7 +936,7 @@ func TestServerCallsHTTPMiddlewareOverHTTP(t *testing.T) { }() // Send an AgentToServer message to the Server - sendMsg1 := protobufs.AgentToServer{InstanceUid: "01BX5ZZKBKACTAV9WEVGEMMVS1"} + sendMsg1 := protobufs.AgentToServer{InstanceUid: []byte("0123456789123456")} serializedProtoBytes1, err := proto.Marshal(&sendMsg1) require.NoError(t, err) _, err = http.Post( @@ -945,7 +947,7 @@ func TestServerCallsHTTPMiddlewareOverHTTP(t *testing.T) { require.NoError(t, err) // Send another AgentToServer message to the Server - sendMsg2 := protobufs.AgentToServer{InstanceUid: "01BX5ZZKBKACTAV9WEVGEMMVRZ"} + sendMsg2 := protobufs.AgentToServer{InstanceUid: []byte("0123456789000000")} serializedProtoBytes2, err := proto.Marshal(&sendMsg2) require.NoError(t, err) _, err = http.Post(