-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add support for exporting metrics via wasitelmetrics.Exporter
Signed-off-by: Joonas Bergius <[email protected]>
- Loading branch information
Showing
11 changed files
with
928 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package wasitelmetric | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"net/http" | ||
"net/url" | ||
|
||
"go.wasmcloud.dev/component/net/wasihttp" | ||
"go.wasmcloud.dev/component/x/wasitel/wasitelmetric/internal/types" | ||
) | ||
|
||
type clienti interface { | ||
UploadMetrics(context.Context, *types.ResourceMetrics) error | ||
Shutdown(context.Context) error | ||
} | ||
|
||
func newClient(opts ...Option) *client { | ||
cfg := newConfig(opts...) | ||
|
||
wasiTransport := &wasihttp.Transport{} | ||
httpClient := &http.Client{Transport: wasiTransport} | ||
|
||
return &client{ | ||
config: cfg, | ||
httpClient: httpClient, | ||
} | ||
} | ||
|
||
type client struct { | ||
config config | ||
httpClient *http.Client | ||
} | ||
|
||
func (c *client) UploadMetrics(ctx context.Context, rm *types.ResourceMetrics) error { | ||
export := &types.ExportMetricsServiceRequest{ | ||
ResourceMetrics: []*types.ResourceMetrics{rm}, | ||
} | ||
|
||
body, err := json.Marshal(export) | ||
if err != nil { | ||
return fmt.Errorf("failed to serialize export request to JSON: %w", err) | ||
} | ||
|
||
u := c.getUrl() | ||
req, err := http.NewRequest(http.MethodPost, u.String(), bytes.NewBuffer(body)) | ||
if err != nil { | ||
return fmt.Errorf("failed to create request to %q: %w", u.String(), err) | ||
} | ||
req.Header.Set("Content-Type", "application/json") | ||
|
||
_, err = c.httpClient.Do(req) | ||
if err != nil { | ||
return fmt.Errorf("failed to request %q: %w", u.String(), err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Shutdown shuts down the client, freeing all resources. | ||
func (c *client) Shutdown(ctx context.Context) error { | ||
c.httpClient = nil | ||
return ctx.Err() | ||
} | ||
|
||
func (c *client) getUrl() url.URL { | ||
scheme := "http" | ||
if !c.config.Insecure { | ||
scheme = "https" | ||
} | ||
u := url.URL{ | ||
Scheme: scheme, | ||
Host: c.config.Endpoint, | ||
Path: c.config.Path, | ||
} | ||
return u | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package wasitelmetric | ||
|
||
import ( | ||
"fmt" | ||
"net/url" | ||
) | ||
|
||
const ( | ||
// DefaultPort is the default HTTP port of the collector. | ||
DefaultPort uint16 = 4318 | ||
// DefaultHost is the host address the client will attempt | ||
// connect to if no collector address is provided. | ||
DefaultHost string = "localhost" | ||
// DefaultPath is a default URL path for endpoint that receives spans. | ||
DefaultPath string = "/v1/metrics" | ||
) | ||
|
||
type config struct { | ||
Endpoint string | ||
Insecure bool | ||
Path string | ||
} | ||
|
||
func newConfig(opts ...Option) config { | ||
cfg := config{ | ||
Insecure: true, | ||
Endpoint: fmt.Sprintf("%s:%d", DefaultHost, DefaultPort), | ||
Path: DefaultPath, | ||
} | ||
for _, opt := range opts { | ||
cfg = opt.apply(cfg) | ||
} | ||
return cfg | ||
} | ||
|
||
type Option interface { | ||
apply(config) config | ||
} | ||
|
||
func newWrappedOption(fn func(config) config) Option { | ||
return &wrappedOption{fn: fn} | ||
} | ||
|
||
type wrappedOption struct { | ||
fn func(config) config | ||
} | ||
|
||
func (o *wrappedOption) apply(cfg config) config { | ||
return o.fn(cfg) | ||
} | ||
|
||
func WithEndpoint(endpoint string) Option { | ||
return newWrappedOption(func(cfg config) config { | ||
cfg.Endpoint = endpoint | ||
return cfg | ||
}) | ||
} | ||
|
||
func WithEndpointURL(eu string) Option { | ||
return newWrappedOption(func(cfg config) config { | ||
u, err := url.Parse(eu) | ||
if err != nil { | ||
return cfg | ||
} | ||
|
||
cfg.Endpoint = u.Host | ||
cfg.Path = u.Path | ||
if u.Scheme != "https" { | ||
cfg.Insecure = true | ||
} | ||
|
||
return cfg | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package wasitelmetric | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
|
||
metric "go.opentelemetry.io/otel/sdk/metric" | ||
"go.opentelemetry.io/otel/sdk/metric/metricdata" | ||
"go.wasmcloud.dev/component/x/wasitel/wasitelmetric/internal/convert" | ||
) | ||
|
||
func New(opts ...Option) (*Exporter, error) { | ||
client := newClient(opts...) | ||
return &Exporter{ | ||
client: client, | ||
}, nil | ||
} | ||
|
||
type Exporter struct { | ||
client *client | ||
|
||
temporalitySelector metric.TemporalitySelector | ||
aggregationSelector metric.AggregationSelector | ||
|
||
stopped bool | ||
stoppedMu sync.RWMutex | ||
} | ||
|
||
var _ metric.Exporter = (*Exporter)(nil) | ||
|
||
// Temporality returns the Temporality to use for an instrument kind. | ||
func (e *Exporter) Temporality(k metric.InstrumentKind) metricdata.Temporality { | ||
return e.temporalitySelector(k) | ||
} | ||
|
||
// Aggregation returns the Aggregation to use for an instrument kind. | ||
func (e *Exporter) Aggregation(k metric.InstrumentKind) metric.Aggregation { | ||
return e.aggregationSelector(k) | ||
} | ||
|
||
func (e *Exporter) Export(ctx context.Context, data *metricdata.ResourceMetrics) error { | ||
err := ctx.Err() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Check whether the exporter has been told to Shutdown | ||
e.stoppedMu.RLock() | ||
stopped := e.stopped | ||
e.stoppedMu.RUnlock() | ||
if stopped { | ||
return nil | ||
} | ||
|
||
metrics, err := convert.ResourceMetrics(data) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = e.client.UploadMetrics(ctx, metrics) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ForceFlush flushes any metric data held by an exporter. | ||
// | ||
// This method returns an error if called after Shutdown. | ||
// This method returns an error if the method is canceled by the passed context. | ||
// | ||
// This method is safe to call concurrently. | ||
func (e *Exporter) ForceFlush(ctx context.Context) error { | ||
// Check whether the exporter has been told to Shutdown | ||
e.stoppedMu.RLock() | ||
stopped := e.stopped | ||
e.stoppedMu.RUnlock() | ||
if stopped { | ||
return errShutdown | ||
} | ||
return ctx.Err() | ||
} | ||
|
||
var errShutdown = fmt.Errorf("Exporter is shutdown") | ||
|
||
// Shutdown flushes all metric data held by an exporter and releases any held | ||
// computational resources. | ||
// | ||
// This method returns an error if called after Shutdown. | ||
// This method returns an error if the method is canceled by the passed context. | ||
// | ||
// This method is safe to call concurrently. | ||
func (e *Exporter) Shutdown(ctx context.Context) error { | ||
e.stoppedMu.Lock() | ||
e.stopped = true | ||
e.stoppedMu.Unlock() | ||
|
||
return nil | ||
} | ||
|
||
// MarshalLog is the marshaling function used by the logging system to represent this Exporter. | ||
func (e *Exporter) MarshalLog() interface{} { | ||
return struct { | ||
Type string | ||
}{ | ||
Type: "wasitelmetric", | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package convert | ||
|
||
import ( | ||
"go.opentelemetry.io/otel/sdk/metric/metricdata" | ||
"go.wasmcloud.dev/component/x/wasitel/wasitelmetric/internal/types" | ||
) | ||
|
||
func ResourceMetrics(data *metricdata.ResourceMetrics) (*types.ResourceMetrics, error) { | ||
return nil, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
// Original source: https://github.com/open-telemetry/opentelemetry-proto-go/blob/v1.3.1/slim/otlp/common/v1/common.pb.go | ||
package types | ||
|
||
// AnyValue is used to represent any type of attribute value. AnyValue may contain a | ||
// primitive value such as a string or integer or it may contain an arbitrary nested | ||
// object containing arrays, key-value lists and primitives. | ||
type AnyValue struct { | ||
// The value is one of the listed fields. It is valid for all values to be unspecified | ||
// in which case this AnyValue is considered to be "empty". | ||
// | ||
// Types that are assignable to Value: | ||
// *AnyValue_StringValue | ||
// *AnyValue_BoolValue | ||
// *AnyValue_IntValue | ||
// *AnyValue_DoubleValue | ||
// *AnyValue_ArrayValue | ||
// *AnyValue_KvlistValue | ||
// *AnyValue_BytesValue | ||
StringValue string `json:"stringValue,omitempty"` | ||
BoolValue bool `json:"boolValue,omitempty"` | ||
IntValue int64 `json:"intValue,omitempty"` | ||
DoubleValue float64 `json:"doubleValue,omitempty"` | ||
ArrayValue *ArrayValue `json:"arrayValue,omitempty"` | ||
KvlistValue *KeyValueList `json:"kvlistValue,omitempty"` | ||
BytesValue []byte `json:"bytesValue,omitempty"` | ||
} | ||
|
||
// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message | ||
// since oneof in AnyValue does not allow repeated fields. | ||
type ArrayValue struct { | ||
// Array of values. The array may be empty (contain 0 elements). | ||
Values []*AnyValue `json:"values,omitempty"` | ||
} | ||
|
||
// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message | ||
// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need | ||
// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to | ||
// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches | ||
// are semantically equivalent. | ||
type KeyValueList struct { | ||
// A collection of key/value pairs of key-value pairs. The list may be empty (may | ||
// contain 0 elements). | ||
// The keys MUST be unique (it is not allowed to have more than one | ||
// value with the same key). | ||
Values []*KeyValue `json:"values,omitempty"` | ||
} | ||
|
||
// KeyValue is a key-value pair that is used to store Span attributes, Link | ||
// attributes, etc. | ||
type KeyValue struct { | ||
Key string `json:"key,omitempty"` | ||
Value *AnyValue `json:"value,omitempty"` | ||
} | ||
|
||
// InstrumentationScope is a message representing the instrumentation scope information | ||
// such as the fully qualified name and version. | ||
type InstrumentationScope struct { | ||
// An empty instrumentation scope name means the name is unknown. | ||
Name string `json:"name,omitempty"` | ||
Version string `json:"version,omitempty"` | ||
// Additional attributes that describe the scope. [Optional]. | ||
// Attribute keys MUST be unique (it is not allowed to have more than one | ||
// attribute with the same key). | ||
Attributes []*KeyValue `json:"attributes,omitempty"` | ||
DroppedAttributesCount uint32 `json:"dropped_attributes_count,omitempty"` | ||
} |
Oops, something went wrong.