diff --git a/.github/workflows/build-test-publish.yaml b/.github/workflows/build-test-publish.yaml new file mode 100644 index 000000000000..a03e97d1d709 --- /dev/null +++ b/.github/workflows/build-test-publish.yaml @@ -0,0 +1,480 @@ +name: build-test-publish +on: + push: + branches: [main] + tags: + - "v[0-9]+.[0-9]+.[0-9]+*" + pull_request: +env: + TEST_RESULTS: testbed/tests/results/junit/results.xml + # See: https://github.com/actions/cache/issues/810#issuecomment-1222550359 + # Cache downloads for this workflow consistently run in under 1 minute + SEGMENT_DOWNLOAD_TIMEOUT_MINS: 5 + +# Do not cancel this workflow on main. See https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/16616 +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + +jobs: + setup-environment: + timeout-minutes: 30 + runs-on: ubuntu-latest + if: ${{ github.actor != 'dependabot[bot]' }} + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + check-collector-module-version: + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Check Collector Module Version + run: ./.github/workflows/scripts/check-collector-module-version.sh + lint-matrix: + strategy: + matrix: + group: + - receiver-0 + - receiver-1 + - processor + - exporter + - extension + - connector + - internal + - other + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: Cache Lint Build + uses: actions/cache@v3 + with: + path: ~/.cache/go-build + key: go-lint-build-${{ matrix.group }}-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Lint + run: make -j2 golint GROUP=${{ matrix.group }} + lint: + if: ${{ github.actor != 'dependabot[bot]' && always() }} + runs-on: ubuntu-latest + needs: [setup-environment, lint-matrix] + steps: + - name: Print result + run: echo ${{ needs.lint-matrix.result }} + - name: Interpret result + run: | + if [[ success == ${{ needs.lint-matrix.result }} ]] + then + echo "All matrix jobs passed!" + else + echo "One or more matrix jobs failed." + false + fi + checks: + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: CheckDoc + run: make checkdoc + - name: Porto + run: | + make -j2 goporto + git diff --exit-code || (echo 'Porto links are out of date, please run "make goporto" and commit the changes in this PR.' && exit 1) + - name: crosslink + run: | + make crosslink + git diff --exit-code || (echo 'Replace statements are out of date, please run "make crosslink" and commit the changes in this PR.' && exit 1) + - name: Check for go mod dependency changes + run: | + make gotidy + git diff --exit-code || (echo 'go.mod/go.sum deps changes detected, please run "make gotidy" and commit the changes in this PR.' && exit 1) + - name: Gen genotelcontribcol + run: | + make genotelcontribcol + git diff -s --exit-code || (echo 'Generated code is out of date, please run "make genotelcontribcol" and commit the changes in this PR.' && exit 1) + - name: Gen genoteltestbedcol + run: | + make genoteltestbedcol + git diff -s --exit-code || (echo 'Generated code is out of date, please run "make genoteltestbedcol" and commit the changes in this PR.' && exit 1) + - name: CodeGen + run: | + make -j2 generate + git diff --exit-code ':!*go.sum' || (echo 'Generated code is out of date, please run "make generate" and commit the changes in this PR.' && exit 1) + - name: Check gendependabot + run: | + make -j2 gendependabot + git diff --exit-code ':!*go.sum' || (echo 'dependabot.yml is out of date, please run "make gendependabot" and commit the changes in this PR.' && exit 1) + - name: MultimodVerify + run: make multimod-verify + - name: Components dropdown in issue templates + run: | + make generate-gh-issue-templates + git diff --exit-code '.github/ISSUE_TEMPLATE' || (echo 'Dropdowns in issue templates are out of date, please run "make generate-gh-issue-templates" and commit the changes in this PR.' && exit 1) + unittest-matrix: + strategy: + matrix: + go-version: ["1.21", "1.20"] # 1.20 is interpreted as 1.2 without quotes + group: + - receiver-0 + - receiver-1 + - processor + - exporter + - extension + - connector + - internal + - other + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Collect Workflow Telemetry + if: always() + uses: runforesight/foresight-workflow-kit-action@v1 + with: + api_key: ${{ secrets.FORESIGHT_API_KEY }} + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go-version }} + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: Cache Test Build + uses: actions/cache@v3 + with: + path: ~/.cache/go-build + key: go-test-build-${{ runner.os }}-${{ matrix.go-version }}-${{ hashFiles('**/go.sum') }} + - name: Run Unit Tests + run: make gotest GROUP=${{ matrix.group }} + unittest: + if: ${{ github.actor != 'dependabot[bot]' && always() }} + strategy: + matrix: + go-version: ["1.21", "1.20"] # 1.20 is interpreted as 1.2 without quotes + runs-on: ubuntu-latest + needs: [setup-environment, unittest-matrix] + steps: + - name: Print result + run: echo ${{ needs.unittest-matrix.result }} + - name: Interpret result + run: | + if [[ success == ${{ needs.unittest-matrix.result }} ]] + then + echo "All matrix jobs passed!" + else + echo "One or more matrix jobs failed." + false + fi + + integration-tests: + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Run Integration Tests + run: make integration-tests-with-cover + + correctness-traces: + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: Correctness + run: make -C testbed run-correctness-traces-tests + + correctness-metrics: + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: Correctness + run: make -C testbed run-correctness-metrics-tests + + cross-compile: + runs-on: ubuntu-latest + needs: [unittest, integration-tests, lint] + strategy: + matrix: + os: + #- darwin + - linux + #- windows + arch: + #- 386 + - amd64 + #- arm + #- arm64 + #- ppc64le + exclude: + - os: darwin + arch: 386 + - os: darwin + arch: arm + - os: darwin + arch: ppc64le + - os: windows + arch: arm + - os: windows + arch: arm64 + - os: windows + arch: ppc64le + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: Build Collector ${{ matrix.binary }} + run: make GOOS=${{ matrix.os }} GOARCH=${{ matrix.arch }} otelcontribcol + - name: Upload Collector Binaries + uses: actions/upload-artifact@v3 + with: + name: collector-binaries + path: ./bin/* + + publish-dev: + permissions: + contents: 'read' + id-token: 'write' + runs-on: ubuntu-latest + needs: [lint, unittest, integration-tests, cross-compile] + if: (github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/v')) + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Mkdir bin and dist + run: | + mkdir bin/ dist/ + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: Download Binaries + uses: actions/download-artifact@v3 + with: + name: collector-binaries + path: ./bin/ + - run: chmod +x bin/* + - name: Sanitize branch name and create version + id: create-version + env: + BRANCH: ${{github.ref_name}} + RUN_NUMBER: ${{github.run_number}} + BASE_VERSION: "0.0.0" + run: | + # let's simply use the k8s namespace rules (even stricter) and have the same version(-suffix) for everything + # lowercase everything and replace all invalid characters with '-' and trim to 60 characters + SANITIZED_BRANCH=$(echo -n "${BRANCH}" | tr '[:upper:]' '[:lower:]' | tr -C 'a-z0-9' '-') + SANITIZED_BRANCH="${SANITIZED_BRANCH:0:60}" + + BUILD_VERSION="${BASE_VERSION}-${SANITIZED_BRANCH}-${RUN_NUMBER}" + echo "BUILD_VERSION=${BUILD_VERSION}" | tee -a $GITHUB_ENV $GITHUB_OUTPUT + - name: Build Docker Image + run: | + make docker-otelcontribcol + docker tag otelcontribcol:latest 609927696493.dkr.ecr.us-west-2.amazonaws.com/opentelemetry-collector-contrib:${{ steps.create-version.outputs.BUILD_VERSION }} + - name: Validate Docker Image + run: | + docker run 609927696493.dkr.ecr.us-west-2.amazonaws.com/opentelemetry-collector-contrib:${{ steps.create-version.outputs.BUILD_VERSION }} --version + - id: login-gcp + name: Authenticate with Google Cloud + uses: google-github-actions/auth@v1 + with: + token_format: access_token + workload_identity_provider: ${{secrets.GCR_WORKLOAD_IDENTITY_PROVIDER}} + service_account: ${{secrets.GCR_SERVICE_ACCOUNT}} + access_token_lifetime: 1800s + - name: Login to us Artifact Registry + uses: docker/login-action@v2 + with: + registry: us-docker.pkg.dev + username: oauth2accesstoken + password: ${{ steps.login-gcp.outputs.access_token }} + - name: Login to eu Artifact Registry + uses: docker/login-action@v2 + with: + registry: europe-docker.pkg.dev + username: oauth2accesstoken + password: ${{ steps.login-gcp.outputs.access_token }} + - name: Login to asia Artifact Registry + uses: docker/login-action@v2 + with: + registry: asia-docker.pkg.dev + username: oauth2accesstoken + password: ${{ steps.login-gcp.outputs.access_token }} + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + aws-access-key-id: ${{secrets.ECR_AWS_ACCESS_KEY_ID}} + aws-secret-access-key: ${{secrets.ECR_AWS_SECRET_ACCESS_KEY}} + aws-region: us-west-2 + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + - name: Push Docker Image + run: | + docker push 609927696493.dkr.ecr.us-west-2.amazonaws.com/opentelemetry-collector-contrib:${{ steps.create-version.outputs.BUILD_VERSION }} + - name: Push image to GCP + env: + DOCKER_IMAGE: otelcontribcol + BUILD_VERSION: ${{steps.create-version.outputs.BUILD_VERSION}} + GCR_ASIA_IMAGE: ${{secrets.GCR_ASIA_IMAGE}} + GCR_EUROPE_IMAGE: ${{secrets.GCR_EUROPE_IMAGE}} + GCR_US_IMAGE: ${{secrets.GCR_US_IMAGE}} + run: | + docker tag $DOCKER_IMAGE:latest ${GCR_ASIA_IMAGE}:${BUILD_VERSION} + docker tag $DOCKER_IMAGE:latest ${GCR_EUROPE_IMAGE}:${BUILD_VERSION} + docker tag $DOCKER_IMAGE:latest ${GCR_US_IMAGE}:${BUILD_VERSION} + docker push -a ${GCR_ASIA_IMAGE} + docker push -a ${GCR_EUROPE_IMAGE} + docker push -a ${GCR_US_IMAGE} diff --git a/cmd/otelcontribcol/processors_test.go b/cmd/otelcontribcol/processors_test.go index 868ddc2a74f4..8e9329fc84c0 100644 --- a/cmd/otelcontribcol/processors_test.go +++ b/cmd/otelcontribcol/processors_test.go @@ -74,6 +74,10 @@ func TestDefaultProcessors(t *testing.T) { processor: "k8sattributes", skipLifecycle: true, // Requires a k8s API to communicate with }, + { + processor: "logstransform", + skipLifecycle: true, + }, { processor: "memory_limiter", getConfigFn: func() component.Config { diff --git a/exporter/clickhouseexporter/exporter_logs.go b/exporter/clickhouseexporter/exporter_logs.go index a970bdb26c96..f60d21ee57f2 100644 --- a/exporter/clickhouseexporter/exporter_logs.go +++ b/exporter/clickhouseexporter/exporter_logs.go @@ -7,9 +7,11 @@ import ( "context" "database/sql" "fmt" + "net/url" + "strings" "time" - _ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver. + "github.com/ClickHouse/clickhouse-go/v2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -28,7 +30,7 @@ type logsExporter struct { } func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) { - client, err := newClickhouseClient(cfg) + client, err := newClickHouseConn(cfg) if err != nil { return nil, err } @@ -57,35 +59,72 @@ func (e *logsExporter) shutdown(_ context.Context) error { return nil } -func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { +func (e *logsExporter) pushLogsData(_ context.Context, ld plog.Logs) error { start := time.Now() - err := doWithTx(ctx, e.client, func(tx *sql.Tx) error { - statement, err := tx.PrepareContext(ctx, e.insertSQL) + err := func() error { + scope, err := e.client.Begin() if err != nil { - return fmt.Errorf("PrepareContext:%w", err) + return fmt.Errorf("Begin:%w", err) } - defer func() { - _ = statement.Close() - }() + + batch, err := scope.Prepare(e.insertSQL) + if err != nil { + return fmt.Errorf("Prepare:%w", err) + } + var serviceName string - for i := 0; i < ld.ResourceLogs().Len(); i++ { - logs := ld.ResourceLogs().At(i) + var podName string + var containerName string + var region string + var cloudProvider string + var cell string + + resAttr := make(map[string]string) + + resourceLogs := ld.ResourceLogs() + for i := 0; i < resourceLogs.Len(); i++ { + logs := resourceLogs.At(i) res := logs.Resource() resURL := logs.SchemaUrl() - resAttr := attributesToMap(res.Attributes()) - if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { - serviceName = v.Str() - } + + attrs := res.Attributes() + attributesToMap(attrs, resAttr) + + attrs.Range(func(key string, value pcommon.Value) bool { + switch key { + case conventions.AttributeServiceName: + serviceName = value.Str() + case conventions.AttributeK8SPodName: + podName = value.AsString() + case conventions.AttributeK8SContainerName: + containerName = value.AsString() + // TODO use AttributeCloudRegion 'cloud.region' + // https://github.com/ClickHouse/data-plane-application/issues/4155 + case "region": + fallthrough + case conventions.AttributeCloudRegion: + region = value.AsString() + case conventions.AttributeCloudProvider: + cloudProvider = value.AsString() + case "cell": + cell = value.AsString() + } + return true + }) for j := 0; j < logs.ScopeLogs().Len(); j++ { rs := logs.ScopeLogs().At(j).LogRecords() scopeURL := logs.ScopeLogs().At(j).SchemaUrl() scopeName := logs.ScopeLogs().At(j).Scope().Name() scopeVersion := logs.ScopeLogs().At(j).Scope().Version() - scopeAttr := attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes()) + scopeAttr := make(map[string]string, attrs.Len()) + attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes(), scopeAttr) for k := 0; k < rs.Len(); k++ { r := rs.At(k) - logAttr := attributesToMap(r.Attributes()) - _, err = statement.ExecContext(ctx, + + logAttr := make(map[string]string, attrs.Len()) + attributesToMap(r.Attributes(), logAttr) + + _, err = batch.Exec( r.Timestamp().AsTime(), traceutil.TraceIDToHexOrEmptyString(r.TraceID()), traceutil.SpanIDToHexOrEmptyString(r.SpanID()), @@ -95,6 +134,11 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { serviceName, r.Body().AsString(), resURL, + podName, + containerName, + region, + cloudProvider, + cell, resAttr, scopeURL, scopeName, @@ -103,26 +147,31 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { logAttr, ) if err != nil { - return fmt.Errorf("ExecContext:%w", err) + return fmt.Errorf("Append:%w", err) } } } + + // clear map for reuse + for k := range resAttr { + delete(resAttr, k) + } } - return nil - }) + + return scope.Commit() + }() + duration := time.Since(start) e.logger.Debug("insert logs", zap.Int("records", ld.LogRecordCount()), zap.String("cost", duration.String())) return err } -func attributesToMap(attributes pcommon.Map) map[string]string { - m := make(map[string]string, attributes.Len()) +func attributesToMap(attributes pcommon.Map, dest map[string]string) { attributes.Range(func(k string, v pcommon.Value) bool { - m[k] = v.AsString() + dest[k] = v.AsString() return true }) - return m } const ( @@ -136,7 +185,12 @@ CREATE TABLE IF NOT EXISTS %s ( SeverityText LowCardinality(String) CODEC(ZSTD(1)), SeverityNumber Int32 CODEC(ZSTD(1)), ServiceName LowCardinality(String) CODEC(ZSTD(1)), - Body String CODEC(ZSTD(1)), + Body LowCardinality(String) CODEC(ZSTD(1)), + PodName LowCardinality(String), + ContainerName LowCardinality(String), + Region LowCardinality(String), + CloudProvider LowCardinality(String), + Cell LowCardinality(String), ResourceSchemaUrl String CODEC(ZSTD(1)), ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)), ScopeSchemaUrl String CODEC(ZSTD(1)), @@ -154,10 +208,11 @@ CREATE TABLE IF NOT EXISTS %s ( INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1 ) ENGINE MergeTree() %s -PARTITION BY toDate(Timestamp) -ORDER BY (ServiceName, SeverityText, toUnixTimestamp(Timestamp), TraceId) +PARTITION BY toYYYYMM(Timestamp) +ORDER BY (PodName, ContainerName, SeverityText, Timestamp) SETTINGS index_granularity=8192, ttl_only_drop_parts = 1; ` + // language=ClickHouse SQL insertLogsSQLTemplate = `INSERT INTO %s ( Timestamp, @@ -169,35 +224,47 @@ SETTINGS index_granularity=8192, ttl_only_drop_parts = 1; ServiceName, Body, ResourceSchemaUrl, + PodName, + ContainerName, + Region, + CloudProvider, + Cell, + ResourceAttributes, + ScopeSchemaUrl, + ScopeName, + ScopeVersion, + ScopeAttributes, + LogAttributes + )` + inlineinsertLogsSQLTemplate = `INSERT INTO %s SETTINGS async_insert=1, wait_for_async_insert=0 ( + Timestamp, + TraceId, + SpanId, + TraceFlags, + SeverityText, + SeverityNumber, + ServiceName, + Body, + ResourceSchemaUrl, + PodName, + ContainerName, + Region, + CloudProvider, + Cell, ResourceAttributes, ScopeSchemaUrl, ScopeName, ScopeVersion, ScopeAttributes, LogAttributes - ) VALUES ( - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ? - )` + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` ) var driverName = "clickhouse" // for testing -// newClickhouseClient create a clickhouse client. -func newClickhouseClient(cfg *Config) (*sql.DB, error) { +// newClickHouseClient create a clickhouse client. +// used by metrics and traces: +func newClickHouseClient(cfg *Config) (*sql.DB, error) { db, err := cfg.buildDB(cfg.Database) if err != nil { return nil, err @@ -205,6 +272,41 @@ func newClickhouseClient(cfg *Config) (*sql.DB, error) { return db, nil } +// used by logs: +func newClickHouseConn(cfg *Config) (*sql.DB, error) { + endpoint := cfg.Endpoint + + if len(cfg.ConnectionParams) > 0 { + values := make(url.Values, len(cfg.ConnectionParams)) + for k, v := range cfg.ConnectionParams { + values.Add(k, v) + } + + if !strings.Contains(endpoint, "?") { + endpoint += "?" + } else if !strings.HasSuffix(endpoint, "&") { + endpoint += "&" + } + + endpoint += values.Encode() + } + + opts, err := clickhouse.ParseDSN(endpoint) + if err != nil { + return nil, fmt.Errorf("unable to parse endpoint: %w", err) + } + + opts.Auth = clickhouse.Auth{ + Database: cfg.Database, + Username: cfg.Username, + Password: string(cfg.Password), + } + + // can return a "bad" connection if misconfigured, we won't know + // until a Ping, Exec, etc.. is done + return clickhouse.OpenDB(opts), nil +} + func createDatabase(ctx context.Context, cfg *Config) error { // use default database to create new database if cfg.Database == defaultDatabase { @@ -242,19 +344,8 @@ func renderCreateLogsTableSQL(cfg *Config) string { } func renderInsertLogsSQL(cfg *Config) string { - return fmt.Sprintf(insertLogsSQLTemplate, cfg.LogsTableName) -} - -func doWithTx(_ context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error { - tx, err := db.Begin() - if err != nil { - return fmt.Errorf("db.Begin: %w", err) + if strings.HasPrefix(cfg.Endpoint, "tcp") && cfg.ConnectionParams["async_insert"] == "1" { + return fmt.Sprintf(inlineinsertLogsSQLTemplate, cfg.LogsTableName) } - defer func() { - _ = tx.Rollback() - }() - if err := fn(tx); err != nil { - return err - } - return tx.Commit() + return fmt.Sprintf(insertLogsSQLTemplate, cfg.LogsTableName) } diff --git a/exporter/clickhouseexporter/exporter_logs_test.go b/exporter/clickhouseexporter/exporter_logs_test.go index c2d2bc44f2d7..92297a101dcf 100644 --- a/exporter/clickhouseexporter/exporter_logs_test.go +++ b/exporter/clickhouseexporter/exporter_logs_test.go @@ -51,7 +51,7 @@ func TestLogsExporter_New(t *testing.T) { }{ "no dsn": { config: withDefaultConfig(), - want: failWithMsg("exec create logs table sql: parse dsn address failed"), + want: failWithMsg("parse dsn address failed"), }, } @@ -78,6 +78,7 @@ func TestExporter_pushLogsData(t *testing.T) { t.Run("push success", func(t *testing.T) { var items int initClickhouseTestServer(t, func(query string, values []driver.Value) error { + t.Logf(query) t.Logf("%d, values:%+v", items, values) if strings.HasPrefix(query, "INSERT") { items++ @@ -97,7 +98,7 @@ func TestExporter_pushLogsData(t *testing.T) { require.Equal(t, "https://opentelemetry.io/schemas/1.4.0", values[8]) require.Equal(t, map[string]string{ "service.name": "test-service", - }, values[9]) + }, values[14]) } return nil }) @@ -107,12 +108,12 @@ func TestExporter_pushLogsData(t *testing.T) { t.Run("test check scope metadata", func(t *testing.T) { initClickhouseTestServer(t, func(query string, values []driver.Value) error { if strings.HasPrefix(query, "INSERT") { - require.Equal(t, "https://opentelemetry.io/schemas/1.7.0", values[10]) - require.Equal(t, "io.opentelemetry.contrib.clickhouse", values[11]) - require.Equal(t, "1.0.0", values[12]) + require.Equal(t, "https://opentelemetry.io/schemas/1.7.0", values[15]) + require.Equal(t, "io.opentelemetry.contrib.clickhouse", values[16]) + require.Equal(t, "1.0.0", values[17]) require.Equal(t, map[string]string{ "lib": "clickhouse", - }, values[13]) + }, values[18]) } return nil }) @@ -122,7 +123,12 @@ func TestExporter_pushLogsData(t *testing.T) { } func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsExporter { - exporter, err := newLogsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn)) + cfg := withTestExporterConfig(fns...)(dsn) + exporter, err := newLogsExporter(zaptest.NewLogger(t), cfg) + require.NoError(t, err) + + // need to use the dummy driver driver for testing + exporter.client, err = newClickHouseClient(cfg) require.NoError(t, err) require.NoError(t, exporter.start(context.TODO(), nil)) @@ -216,7 +222,18 @@ func (*testClickhouseDriverStmt) Close() error { } func (t *testClickhouseDriverStmt) NumInput() int { - return strings.Count(t.query, "?") + if !strings.HasPrefix(t.query, `INSERT`) { + return 0 + } + + n := strings.Count(t.query, "?") + if n > 0 { + return n + } + + // no ? in batched queries but column are separated with "," + // except for the last one + return strings.Count(t.query, ",") + 1 } func (t *testClickhouseDriverStmt) Exec(args []driver.Value) (driver.Result, error) { diff --git a/exporter/clickhouseexporter/exporter_metrics.go b/exporter/clickhouseexporter/exporter_metrics.go index 916b9381e201..ccc978032bd0 100644 --- a/exporter/clickhouseexporter/exporter_metrics.go +++ b/exporter/clickhouseexporter/exporter_metrics.go @@ -24,7 +24,7 @@ type metricsExporter struct { } func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, error) { - client, err := newClickhouseClient(cfg) + client, err := newClickHouseClient(cfg) if err != nil { return nil, err } @@ -57,7 +57,11 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric metricsMap := internal.NewMetricsModel(e.cfg.MetricsTableName) for i := 0; i < md.ResourceMetrics().Len(); i++ { metrics := md.ResourceMetrics().At(i) - resAttr := attributesToMap(metrics.Resource().Attributes()) + res := metrics.Resource() + + resAttr := make(map[string]string, res.Attributes().Len()) + attributesToMap(metrics.Resource().Attributes(), resAttr) + for j := 0; j < metrics.ScopeMetrics().Len(); j++ { rs := metrics.ScopeMetrics().At(j).Metrics() scopeInstr := metrics.ScopeMetrics().At(j).Scope() diff --git a/exporter/clickhouseexporter/exporter_traces.go b/exporter/clickhouseexporter/exporter_traces.go index 6f03ee295833..0cb4cecb3c6e 100644 --- a/exporter/clickhouseexporter/exporter_traces.go +++ b/exporter/clickhouseexporter/exporter_traces.go @@ -28,7 +28,7 @@ type tracesExporter struct { } func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error) { - client, err := newClickhouseClient(cfg) + client, err := newClickHouseClient(cfg) if err != nil { return nil, err } @@ -59,8 +59,8 @@ func (e *tracesExporter) shutdown(_ context.Context) error { func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { start := time.Now() - err := doWithTx(ctx, e.client, func(tx *sql.Tx) error { - statement, err := tx.PrepareContext(ctx, e.insertSQL) + err := func() error { + statement, err := e.client.PrepareContext(ctx, e.insertSQL) if err != nil { return fmt.Errorf("PrepareContext:%w", err) } @@ -70,7 +70,10 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er for i := 0; i < td.ResourceSpans().Len(); i++ { spans := td.ResourceSpans().At(i) res := spans.Resource() - resAttr := attributesToMap(res.Attributes()) + attr := res.Attributes() + resAttr := make(map[string]string, attr.Len()) + attributesToMap(attr, resAttr) + var serviceName string if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { serviceName = v.Str() @@ -81,7 +84,9 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er scopeVersion := spans.ScopeSpans().At(j).Scope().Version() for k := 0; k < rs.Len(); k++ { r := rs.At(k) - spanAttr := attributesToMap(r.Attributes()) + + spanAttr := make(map[string]string, res.Attributes().Len()) + attributesToMap(r.Attributes(), spanAttr) status := r.Status() eventTimes, eventNames, eventAttrs := convertEvents(r.Events()) linksTraceIDs, linksSpanIDs, linksTraceStates, linksAttrs := convertLinks(r.Links()) @@ -116,7 +121,7 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er } } return nil - }) + }() duration := time.Since(start) e.logger.Debug("insert traces", zap.Int("records", td.SpanCount()), zap.String("cost", duration.String())) @@ -124,33 +129,38 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er } func convertEvents(events ptrace.SpanEventSlice) ([]time.Time, []string, []map[string]string) { - var ( - times []time.Time - names []string - attrs []map[string]string - ) + times := make([]time.Time, events.Len()) + names := make([]string, events.Len()) + attrs := make([]map[string]string, events.Len()) + for i := 0; i < events.Len(); i++ { event := events.At(i) times = append(times, event.Timestamp().AsTime()) names = append(names, event.Name()) - attrs = append(attrs, attributesToMap(event.Attributes())) + + eventAttrs := event.Attributes() + dest := make(map[string]string, eventAttrs.Len()) + attributesToMap(eventAttrs, dest) + attrs = append(attrs, dest) } return times, names, attrs } func convertLinks(links ptrace.SpanLinkSlice) ([]string, []string, []string, []map[string]string) { - var ( - traceIDs []string - spanIDs []string - states []string - attrs []map[string]string - ) + traceIDs := make([]string, links.Len()) + spanIDs := make([]string, links.Len()) + states := make([]string, links.Len()) + attrs := make([]map[string]string, links.Len()) + for i := 0; i < links.Len(); i++ { link := links.At(i) traceIDs = append(traceIDs, traceutil.TraceIDToHexOrEmptyString(link.TraceID())) spanIDs = append(spanIDs, traceutil.SpanIDToHexOrEmptyString(link.SpanID())) states = append(states, link.TraceState().AsRaw()) - attrs = append(attrs, attributesToMap(link.Attributes())) + + linkAttrs := link.Attributes() + dest := make(map[string]string, linkAttrs.Len()) + attrs = append(attrs, dest) } return traceIDs, spanIDs, states, attrs } diff --git a/exporter/logicmonitorexporter/go.mod b/exporter/logicmonitorexporter/go.mod index 23bea7c7eca2..1fcc8a567a6d 100644 --- a/exporter/logicmonitorexporter/go.mod +++ b/exporter/logicmonitorexporter/go.mod @@ -57,9 +57,9 @@ require ( go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.14.0 // indirect - golang.org/x/sys v0.11.0 // indirect - golang.org/x/text v0.12.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect google.golang.org/grpc v1.57.0 // indirect google.golang.org/protobuf v1.31.0 // indirect diff --git a/exporter/logicmonitorexporter/go.sum b/exporter/logicmonitorexporter/go.sum index 3e9e8166b4d4..dabd7312e174 100644 --- a/exporter/logicmonitorexporter/go.sum +++ b/exporter/logicmonitorexporter/go.sum @@ -375,8 +375,8 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= -golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= -golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -420,8 +420,8 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -429,8 +429,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= -golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=