Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
shmsr committed Nov 28, 2024
1 parent 492eca1 commit 413010f
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 56 deletions.
11 changes: 8 additions & 3 deletions x-pack/metricbeat/module/openai/usage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package usage

import (
"fmt"
"net/http"
"time"

Expand All @@ -13,8 +14,7 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

// RLHTTPClient implements a rate-limited HTTP client that wraps the standard http.Client
// with a rate limiter to control API request frequency.
// RLHTTPClient wraps the standard http.Client with a rate limiter to control API request frequency.
type RLHTTPClient struct {
client *http.Client
logger *logp.Logger
Expand Down Expand Up @@ -42,7 +42,12 @@ func (c *RLHTTPClient) Do(req *http.Request) (*http.Response, error) {
c.logger.Infof("Rate limit wait exceeded threshold: %v", waitDuration)
}

return c.client.Do(req)
resp, err := c.client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to execute request: %w", err)
}

return resp, nil
}

// newClient creates a new rate-limited HTTP client with specified rate limiter and timeout.
Expand Down
6 changes: 6 additions & 0 deletions x-pack/metricbeat/module/openai/usage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package usage
import (
"errors"
"fmt"
"net/url"
"time"
)

Expand Down Expand Up @@ -56,6 +57,11 @@ func (c *Config) Validate() error {
}
if c.APIURL == "" {
errs = append(errs, errors.New("api_url cannot be empty"))
} else {
_, err := url.ParseRequestURI(c.APIURL)
if err != nil {
errs = append(errs, fmt.Errorf("invalid api_url format: %w", err))
}
}
if c.RateLimit == nil {
errs = append(errs, errors.New("rate_limit must be configured"))
Expand Down
11 changes: 9 additions & 2 deletions x-pack/metricbeat/module/openai/usage/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@ package usage

import "strings"

// dateFormatForStateStore is used to parse and format dates in the YYYY-MM-DD format
const dateFormatForStateStore = "2006-01-02"

func ptr[T any](value T) *T {
return &value
}

func processHeaders(headers []string) map[string]string {
headersMap := make(map[string]string, len(headers))
for _, header := range headers {
parts := strings.Split(header, ":")
parts := strings.SplitN(header, ":", 2)
if len(parts) != 2 {
continue
}
headersMap[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
k, v := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
if k == "" || v == "" {
continue
}
headersMap[k] = v
}
return headersMap
}
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/openai/usage/persistcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *stateManager) GetLastProcessedDate(apiKey string) (time.Time, error) {
return time.Time{}, fmt.Errorf("get state: %w", err)
}

return time.Parse("2006-01-02", dateStr)
return time.Parse(dateFormatForStateStore, dateStr)
}

// hashKey generates and caches a SHA-256 hash of the provided API key
Expand Down
79 changes: 30 additions & 49 deletions x-pack/metricbeat/module/openai/usage/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@

package usage

type BaseData struct {
OrganizationID string `json:"organization_id"`
OrganizationName string `json:"organization_name"`
UserID *string `json:"user_id"`
ApiKeyID *string `json:"api_key_id"`
ApiKeyName *string `json:"api_key_name"`
ApiKeyRedacted *string `json:"api_key_redacted"`
ApiKeyType *string `json:"api_key_type"`
ProjectID *string `json:"project_id"`
ProjectName *string `json:"project_name"`
}

type UsageResponse struct {
Object string `json:"object"`
Data []UsageData `json:"data"`
Expand All @@ -16,71 +28,40 @@ type UsageResponse struct {
}

type UsageData struct {
OrganizationID string `json:"organization_id"`
OrganizationName string `json:"organization_name"`
BaseData
AggregationTimestamp int64 `json:"aggregation_timestamp"`
NRequests int `json:"n_requests"`
Operation string `json:"operation"`
SnapshotID string `json:"snapshot_id"`
NContextTokensTotal int `json:"n_context_tokens_total"`
NGeneratedTokensTotal int `json:"n_generated_tokens_total"`
Email *string `json:"email"`
ApiKeyID *string `json:"api_key_id"`
ApiKeyName *string `json:"api_key_name"`
ApiKeyRedacted *string `json:"api_key_redacted"`
ApiKeyType *string `json:"api_key_type"`
ProjectID *string `json:"project_id"`
ProjectName *string `json:"project_name"`
RequestType string `json:"request_type"`
NCachedContextTokensTotal int `json:"n_cached_context_tokens_total"`
}

type DalleData struct {
Timestamp int64 `json:"timestamp"`
NumImages int `json:"num_images"`
NumRequests int `json:"num_requests"`
ImageSize string `json:"image_size"`
Operation string `json:"operation"`
UserID *string `json:"user_id"`
OrganizationID string `json:"organization_id"`
ApiKeyID *string `json:"api_key_id"`
ApiKeyName *string `json:"api_key_name"`
ApiKeyRedacted *string `json:"api_key_redacted"`
ApiKeyType *string `json:"api_key_type"`
OrganizationName string `json:"organization_name"`
ModelID string `json:"model_id"`
ProjectID *string `json:"project_id"`
ProjectName *string `json:"project_name"`
BaseData
Timestamp int64 `json:"timestamp"`
NumImages int `json:"num_images"`
NumRequests int `json:"num_requests"`
ImageSize string `json:"image_size"`
Operation string `json:"operation"`
ModelID string `json:"model_id"`
}

type WhisperData struct {
Timestamp int64 `json:"timestamp"`
ModelID string `json:"model_id"`
NumSeconds int `json:"num_seconds"`
NumRequests int `json:"num_requests"`
UserID *string `json:"user_id"`
OrganizationID string `json:"organization_id"`
ApiKeyID *string `json:"api_key_id"`
ApiKeyName *string `json:"api_key_name"`
ApiKeyRedacted *string `json:"api_key_redacted"`
ApiKeyType *string `json:"api_key_type"`
OrganizationName string `json:"organization_name"`
ProjectID *string `json:"project_id"`
ProjectName *string `json:"project_name"`
BaseData
Timestamp int64 `json:"timestamp"`
ModelID string `json:"model_id"`
NumSeconds int `json:"num_seconds"`
NumRequests int `json:"num_requests"`
}

type TtsData struct {
Timestamp int64 `json:"timestamp"`
ModelID string `json:"model_id"`
NumCharacters int `json:"num_characters"`
NumRequests int `json:"num_requests"`
UserID *string `json:"user_id"`
OrganizationID string `json:"organization_id"`
ApiKeyID *string `json:"api_key_id"`
ApiKeyName *string `json:"api_key_name"`
ApiKeyRedacted *string `json:"api_key_redacted"`
ApiKeyType *string `json:"api_key_type"`
OrganizationName string `json:"organization_name"`
ProjectID *string `json:"project_id"`
ProjectName *string `json:"project_name"`
BaseData
Timestamp int64 `json:"timestamp"`
ModelID string `json:"model_id"`
NumCharacters int `json:"num_characters"`
NumRequests int `json:"num_requests"`
}
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/openai/usage/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (m *MetricSet) processResponse(resp *http.Response, dateStr string) error {
return fmt.Errorf("error decoding response: %w", err)
}

m.logger.Info("Fetched usage metrics for date:", dateStr)
m.logger.Infof("Fetched usage metrics for date: %s", dateStr)

events := make([]mb.Event, 0, len(usageResponse.Data))

Expand Down

0 comments on commit 413010f

Please sign in to comment.