Skip to content

Commit

Permalink
Add support for client-initiated connection settings request
Browse files Browse the repository at this point in the history
- Implemented spec change open-telemetry/opamp-spec#162
- Added ability for the client to request connection settings.
- Added CSR flow example to demonstrate the new capability.
  • Loading branch information
tigrannajaryan committed Oct 18, 2023
1 parent c1931d7 commit bd6f23e
Show file tree
Hide file tree
Showing 9 changed files with 449 additions and 17 deletions.
9 changes: 9 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,13 @@ type OpAMPClient interface {
// May be called anytime after Start(), including from OnMessage handler.
// nil values are not allowed and will return an error.
SetPackageStatuses(statuses *protobufs.PackageStatuses) error

// RequestConnectionSettings sets a ConnectionSettingsRequest. The ConnectionSettingsRequest
// will be included in the next AgentToServer message sent to the Server.
// Used for client-initiated connection setting acquisition flows.
// It is the responsibility of the caller to ensure that the Server supports
// AcceptsConnectionSettingsRequest capability.
// May be called before or after Start().
// May be also called from OnMessage handler.
RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error
}
61 changes: 59 additions & 2 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ func TestAgentIdentification(t *testing.T) {
})
}

func TestConnectionSettings(t *testing.T) {
func TestServerOfferConnectionSettings(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
hash := []byte{1, 2, 3}
opampSettings := &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"}
Expand Down Expand Up @@ -765,9 +765,66 @@ func TestConnectionSettings(t *testing.T) {
})
}

func TestClientRequestConnectionSettings(t *testing.T) {
testClients(
t, func(t *testing.T, client OpAMPClient) {
opampSettings := &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"}

var srvReceivedRequest int64
// Start a Server.
srv := internal.StartMockServer(t)
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg != nil && msg.ConnectionSettingsRequest != nil {
atomic.AddInt64(&srvReceivedRequest, 1)
return &protobufs.ServerToAgent{
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
Opamp: opampSettings,
},
}
}
return nil
}

var clientGotOpampSettings int64

// Start a client.
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnOpampConnectionSettingsFunc: func(
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
assert.True(t, proto.Equal(opampSettings, settings))
atomic.AddInt64(&clientGotOpampSettings, 1)
return nil
},
},
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings,
}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

client.RequestConnectionSettings(&protobufs.ConnectionSettingsRequest{})

// Wait until server receives the request.
eventually(t, func() bool { return atomic.LoadInt64(&srvReceivedRequest) == 1 })

// Wait until client receives the server's response.
eventually(t, func() bool { return atomic.LoadInt64(&clientGotOpampSettings) == 1 })

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
},
)
}

func TestReportAgentDescription(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {

// Start a Server.
srv := internal.StartMockServer(t)
srv.EnableExpectMode()
Expand Down
4 changes: 4 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (c *httpClient) SetAgentDescription(descr *protobufs.AgentDescription) erro
return c.common.SetAgentDescription(descr)
}

func (c *httpClient) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error {
return c.common.RequestConnectionSettings(request)
}

// SetHealth implements OpAMPClient.SetHealth.
func (c *httpClient) SetHealth(health *protobufs.ComponentHealth) error {
return c.common.SetHealth(health)
Expand Down
10 changes: 10 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) er
return nil
}

func (c *ClientCommon) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error {
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.ConnectionSettingsRequest = request
},
)
c.sender.ScheduleSend()
return nil
}

// SetHealth sends a status update to the Server with the new agent health
// and remembers the health in the client state so that it can be sent
// to the Server when the Server asks for it.
Expand Down
4 changes: 4 additions & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (c *wsClient) SetAgentDescription(descr *protobufs.AgentDescription) error
return c.common.SetAgentDescription(descr)
}

func (c *wsClient) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error {
return c.common.RequestConnectionSettings(request)
}

func (c *wsClient) SetHealth(health *protobufs.ComponentHealth) error {
return c.common.SetHealth(health)
}
Expand Down
145 changes: 134 additions & 11 deletions internal/examples/agent/agent/agent.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package agent

import (
"bytes"
"context"
cryptorand "crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math/rand"
Expand Down Expand Up @@ -63,6 +67,9 @@ type Agent struct {
// The TLS certificate used for the OpAMP connection. Can be nil, meaning no client-side
// certificate is used.
opampClientCert *tls.Certificate

certRequested bool
clientPrivateKeyPEM []byte
}

func NewAgent(logger types.Logger, agentType string, agentVersion string) *Agent {
Expand Down Expand Up @@ -133,6 +140,14 @@ func (agent *Agent) connect() error {
return err
}

// This sets the request to create a client certificate before the OpAMP client
// is started, before the connection is established. However, this assumes the
// server supports "AcceptsConnectionRequest" capability.
// Alternatively the agent can perform this request after receiving the first
// message from the server (in onMessage), i.e. after the server capabilities
// become known and can be checked.
agent.requestClientCertificate()

agent.logger.Debugf("Starting OpAMP client...")

err = agent.opampClient.Start(context.Background(), settings)
Expand Down Expand Up @@ -341,17 +356,96 @@ func (agent *Agent) Shutdown() {
}
}

// requestClientCertificate sets a request to be sent to the Server to create
// a client certificate that the Agent can use in subsequent OpAMP connections.
// This is the initiating step of the Client Signing Request (CSR) flow.
func (agent *Agent) requestClientCertificate() {
if agent.certRequested {
// Request only once, for bootstrapping.
// TODO: the Agent may also for example check that the current certificate
// is approaching expiration date and re-requests a new certificate.
return
}

// Generate a keypair for new client cert.
clientCertKeyPair, err := rsa.GenerateKey(cryptorand.Reader, 4096)
if err != nil {
agent.logger.Errorf("Cannot generate keypair: %v", err)
return
}

// Encode the private key of the keypair as DER.
privateKeyDER := x509.MarshalPKCS1PrivateKey(clientCertKeyPair)

// Convert private key from DER to PEM.
privateKeyPEM := new(bytes.Buffer)
pem.Encode(
privateKeyPEM, &pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: privateKeyDER,
},
)
// Keep it. We will need it in later steps of the flow.
agent.clientPrivateKeyPEM = privateKeyPEM.Bytes()

// Create the CSR.
template := x509.CertificateRequest{
Subject: pkix.Name{
CommonName: "OpAMP Example Client",
Organization: []string{"OpenTelemetry OpAMP Workgroup"},
Locality: []string{"Agent-initiated"},
// Where do we put instance_uid?
},
SignatureAlgorithm: x509.SHA256WithRSA,
}

derBytes, err := x509.CreateCertificateRequest(cryptorand.Reader, &template, clientCertKeyPair)
if err != nil {
agent.logger.Errorf("Failed to create certificate request: %s", err)
return
}

// Convert CSR from DER to PEM format.
csrPEM := new(bytes.Buffer)
pem.Encode(
csrPEM, &pem.Block{
Type: "CERTIFICATE REQUEST",
Bytes: derBytes,
},
)

// Send the request to the Server (immediately if already connected
// or upon next successful connection).
err = agent.opampClient.RequestConnectionSettings(
&protobufs.ConnectionSettingsRequest{
Opamp: &protobufs.OpAMPConnectionSettingsRequest{
CertificateRequest: &protobufs.CertificateRequest{
Csr: csrPEM.Bytes(),
},
},
},
)
if err != nil {
agent.logger.Errorf("Failed to send CSR to server: %s", err)
return
}

agent.certRequested = true
}

func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) {
configChanged := false
if msg.RemoteConfig != nil {
var err error
configChanged, err = agent.applyRemoteConfig(msg.RemoteConfig)
if err != nil {
agent.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{
LastRemoteConfigHash: msg.RemoteConfig.ConfigHash,
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
ErrorMessage: err.Error(),
})
agent.opampClient.SetRemoteConfigStatus(
&protobufs.RemoteConfigStatus{
LastRemoteConfigHash: msg.RemoteConfig.ConfigHash,
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
ErrorMessage: err.Error(),
},
)
} else {
agent.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{
LastRemoteConfigHash: msg.RemoteConfig.ConfigHash,
Expand All @@ -378,6 +472,15 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) {
agent.logger.Errorf(err.Error())
}
}

// TODO: check that the Server has AcceptsConnectionSettingsRequest capability before
// requesting a certificate.
// This is actually a no-op since we already made the request when connecting
// (see connect()). However we keep this call here to demonstrate that requesting it
// in onMessage callback is also an option. This approach should be used if it is
// necessary to check for AcceptsConnectionSettingsRequest (if the Agent is
// not certain that the Server has this capability).
agent.requestClientCertificate()
}

func (agent *Agent) tryChangeOpAMPCert(cert *tls.Certificate) {
Expand Down Expand Up @@ -419,13 +522,33 @@ func (agent *Agent) onOpampConnectionSettings(ctx context.Context, settings *pro
}

func (agent *Agent) getCertFromSettings(certificate *protobufs.TLSCertificate) (*tls.Certificate, error) {
// Parse the key pair to a certificate that can be used for network connections.
cert, err := tls.X509KeyPair(
certificate.PublicKey,
certificate.PrivateKey,
)
// Parse the key pair to a TLS certificate that can be used for network connections.

// There are 2 types of certificate creation flows in OpAMP: client-initiated CSR
// and server-initiated. In this example we demonstrate both flows.
// Real-world Agent implementations will probably choose and use only one of these flows.

var cert tls.Certificate
var err error
if certificate.PrivateKey == nil && agent.clientPrivateKeyPEM != nil {
// Client-initiated CSR flow. This is currently initiated when connecting
// to the Server for the first time (see requestClientCertificate()).
cert, err = tls.X509KeyPair(
certificate.PublicKey, // We received the certificate from the Server.
agent.clientPrivateKeyPEM, // Private key was earlier locally generated.
)
} else {
// Server-initiated flow. This is currently initiated by user clicking a button in
// the Server UI.
// Both certificate and private key are from the Server.
cert, err = tls.X509KeyPair(
certificate.PublicKey,
certificate.PrivateKey,
)
}

if err != nil {
agent.logger.Errorf("Received invalid certificate offer: %s\n", err)
agent.logger.Errorf("Received invalid certificate offer: %s\n", err.Error())
return nil, err
}

Expand Down
Loading

0 comments on commit bd6f23e

Please sign in to comment.