Skip to content

Commit

Permalink
feat: dual write to clickhouse
Browse files Browse the repository at this point in the history
  • Loading branch information
chronark committed Sep 5, 2024
1 parent 5748780 commit 972f46b
Show file tree
Hide file tree
Showing 12 changed files with 281 additions and 268 deletions.
1 change: 1 addition & 0 deletions apps/agent/cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ func run(c *cli.Context) error {
BufferSize: cfg.Services.EventRouter.Tinybird.BufferSize,
FlushInterval: time.Duration(cfg.Services.EventRouter.Tinybird.FlushInterval) * time.Second,
Tinybird: tinybird.New("https://api.tinybird.co", cfg.Services.EventRouter.Tinybird.Token),
Clickhouse: ch,
AuthToken: cfg.AuthToken,
})
if err != nil {
Expand Down
14 changes: 10 additions & 4 deletions apps/agent/pkg/api/testutil/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/unkeyed/unkey/apps/agent/pkg/api/routes"
"github.com/unkeyed/unkey/apps/agent/pkg/api/validation"
"github.com/unkeyed/unkey/apps/agent/pkg/cluster"
"github.com/unkeyed/unkey/apps/agent/pkg/logging"
"github.com/unkeyed/unkey/apps/agent/pkg/membership"
Expand Down Expand Up @@ -79,11 +80,16 @@ func (h *Harness) Register(route *routes.Route) {
}

func (h *Harness) SetupRoute(constructor func(svc routes.Services) *routes.Route) *routes.Route {

validator, err := validation.New("./pkg/openapi/openapi.json")
require.NoError(h.t, err)
route := constructor(routes.Services{
Logger: h.logger,
Metrics: h.metrics,
Ratelimit: h.ratelimit,
Vault: nil,
Logger: h.logger,
Metrics: h.metrics,
Ratelimit: h.ratelimit,
Vault: nil,
OpenApiValidator: validator,
Sender: routes.NewJsonSender(h.logger),
})
h.Register(route)
return route
Expand Down
23 changes: 20 additions & 3 deletions apps/agent/pkg/clickhouse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type Clickhouse struct {
conn ch.Conn
logger logging.Logger

requests *batch.BatchProcessor[schema.ApiRequestV1]
requests *batch.BatchProcessor[schema.ApiRequestV1]
keyVerifications *batch.BatchProcessor[schema.KeyVerificationRequestV1]
}

type Config struct {
Expand Down Expand Up @@ -61,7 +62,20 @@ func New(config Config) (*Clickhouse, error) {
FlushInterval: time.Second,
Consumers: 4,
Flush: func(ctx context.Context, rows []schema.ApiRequestV1) {
table := "api_requests__v1"
table := "raw_api_requests_v1"
err := flush(ctx, conn, table, rows)
if err != nil {
config.Logger.Error().Err(err).Str("table", table).Msg("failed to flush batch")
}
},
}),
keyVerifications: batch.New[schema.KeyVerificationRequestV1](batch.Config[schema.KeyVerificationRequestV1]{
BatchSize: 1000,
BufferSize: 100000,
FlushInterval: time.Second,
Consumers: 4,
Flush: func(ctx context.Context, rows []schema.KeyVerificationRequestV1) {
table := "raw_key_verifications_v1"
err := flush(ctx, conn, table, rows)
if err != nil {
config.Logger.Error().Err(err).Str("table", table).Msg("failed to flush batch")
Expand All @@ -83,6 +97,9 @@ func (c *Clickhouse) Shutdown(ctx context.Context) error {
}

func (c *Clickhouse) BufferApiRequest(req schema.ApiRequestV1) {
c.logger.Info().Msg("buffering api request")
c.requests.Buffer(req)
}

func (c *Clickhouse) BufferKeyVerification(req schema.KeyVerificationRequestV1) {
c.keyVerifications.Buffer(req)
}
1 change: 1 addition & 0 deletions apps/agent/pkg/clickhouse/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ import (

type Bufferer interface {
BufferApiRequest(schema.ApiRequestV1)
BufferKeyVerification(schema.KeyVerificationRequestV1)
}
3 changes: 3 additions & 0 deletions apps/agent/pkg/clickhouse/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ var _ Bufferer = &noop{}
func (n *noop) BufferApiRequest(schema.ApiRequestV1) {
return
}
func (n *noop) BufferKeyVerification(schema.KeyVerificationRequestV1) {
return
}

func NewNoop() *noop {
return &noop{}
Expand Down
72 changes: 72 additions & 0 deletions apps/agent/pkg/clickhouse/schema/000_README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@

# ClickHouse Table Naming Conventions

This document outlines the naming conventions for tables and materialized views in our ClickHouse setup. Adhering to these conventions ensures consistency, clarity, and ease of management across our data infrastructure.

## General Rules

1. Use lowercase letters and separate words with underscores.
2. Avoid ClickHouse reserved words and special characters in names.
3. Be descriptive but concise.

## Table Naming Convention

Format: `[prefix]_[domain]_[description]_[version]`

### Prefixes

- `raw_`: Input data tables
- `mv_`: Materialized views
- `tmp_{yourname}_`: Temporary tables for experiments, add your name, so it's easy to identify ownership.

### Domain/Category

Include the domain or category of the data when applicable.

Examples:
- `keys`
- `audit`
- `user`
- `gateway`

### Versioning

- Version numbers: `_v1`, `_v2`, etc.

### Aggregation Suffixes

For aggregated or summary tables, use suffixes like:
- `_daily`
- `_monthly`
- `_summary`

## Materialized View Naming Convention

Format: `mv_[description]_[aggregation]`

- Always prefix with `mv_`
- Include a description of the view's purpose
- Add aggregation level if applicable

## Examples

1. Raw Data Table:
`raw_sales_transactions_v1`

2. Materialized View:
`mv_active_users_daily_v2`

3. Temporary Table:
`tmp_andreas_user_analysis_v1`

4. Aggregated Table:
`mv_sales_summary_daily_v1`

## Consistency Across Related Objects

Maintain consistent naming across related tables, views, and other objects:

- `raw_user_activity_v1`
- `mv_user_activity_daily_v1`

By following these conventions, we ensure a clear, consistent, and scalable naming structure for our ClickHouse setup.
11 changes: 9 additions & 2 deletions apps/agent/pkg/clickhouse/schema/001_create_requests_table.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
-- +goose up
CREATE TABLE default.api_requests__v1(
CREATE TABLE default.raw_api_requests_v1(
request_id String,
-- unix milli
time Int64,

workspace_id String,

host String,

-- Upper case HTTP method
-- Examples: "GET", "POST", "PUT", "DELETE"
method LowCardinality(String),
path String,
-- "Key: Value" pairs
Expand All @@ -19,4 +25,5 @@ CREATE TABLE default.api_requests__v1(

)
ENGINE = MergeTree()
PRIMARY KEY (request_id);
ORDER BY (workspace_id, time, request_id)
;
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- +goose up
CREATE TABLE default.raw_key_verifications_v1(
-- the api request id, so we can correlate the verification with traces and logs
request_id String,

-- unix milli
time Int64,

workspace_id String,
key_space_id String,
key_id String,

-- Right now this is a 3 character airport code, but when we move to aws,
-- this will be the region code such as `us-east-1`
region String,

-- Examples:
-- - "VALID"
-- - "RATE_LIMITED"
-- - "EXPIRED"
-- - "DISABLED
outcome LowCardinality(String),

-- Empty string if the key has no identity
identity_id String,

)
ENGINE = MergeTree()
ORDER BY (workspace_id, key_space_id, key_id, time)
;
11 changes: 11 additions & 0 deletions apps/agent/pkg/clickhouse/schema/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,14 @@ type ApiRequestV1 struct {
ResponseBody string `ch:"response_body"`
Error string `ch:"error"`
}

type KeyVerificationRequestV1 struct {
RequestID string `ch:"request_id"`
Time int64 `ch:"time"`
WorkspaceID string `ch:"workspace_id"`
KeySpaceID string `ch:"key_space_id"`
KeyID string `ch:"key_id"`
Region string `ch:"region"`
Outcome string `ch:"outcome"`
IdentityID string `ch:"identity_id"`
}
Loading

0 comments on commit 972f46b

Please sign in to comment.