diff --git a/CHANGELOG.md b/CHANGELOG.md index 0364a3c0ed..c2ae254e52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7009](https://github.com/thanos-io/thanos/pull/7009) Rule: Fix spacing error in URL. ### Added + +- [#6756](https://github.com/thanos-io/thanos/pull/6756) Query: Add `query.enforce-tenancy` & `query.tenant-label-name` options to allow enforcement of tenancy on the query path, by injecting labels into queries (uses prom-label-proxy internally). - [#6944](https://github.com/thanos-io/thanos/pull/6944) Receive: Added a new flag for maximum retention bytes. - [#6891](https://github.com/thanos-io/thanos/pull/6891) Objstore: Bump `objstore` which adds support for Azure Workload Identity. - [#6453](https://github.com/thanos-io/thanos/pull/6453) Sidecar: Added `--reloader.method` to support configuration reloads via SIHUP signal. @@ -25,12 +27,13 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6943](https://github.com/thanos-io/thanos/pull/6943) Ruler: Added `keep_firing_for` field in alerting rule. - [#6972](https://github.com/thanos-io/thanos/pull/6972) Store Gateway: Apply series limit when streaming series for series actually matched if lazy postings is enabled. - [#6984](https://github.com/thanos-io/thanos/pull/6984) Store Gateway: Added `--store.index-header-lazy-download-strategy` to specify how to lazily download index headers when lazy mmap is enabled. - - [#6887](https://github.com/thanos-io/thanos/pull/6887) Query Frontend: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label. - [#7028](https://github.com/thanos-io/thanos/pull/7028) Query|Query Frontend: Add new `--query-frontend.enable-x-functions` flag to enable experimental extended functions. 
### Changed +- [#6539](https://github.com/thanos-io/thanos/pull/6539) Store: *breaking :warning:* Changed `--sync-block-duration` default 3m to 15m. + ### Removed ## [v0.33.0](https://github.com/thanos-io/thanos/tree/release-0.33) - 18.12.2023 diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index ce6adde354..4d831ab6d1 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -220,6 +220,8 @@ func registerQuery(app *extkingpin.App) { tenantHeader := cmd.Flag("query.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).String() defaultTenant := cmd.Flag("query.default-tenant-id", "Default tenant ID to use if tenant header is not present").Default(tenancy.DefaultTenant).String() tenantCertField := cmd.Flag("query.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the query.tenant-header flag value to be ignored.").Default("").Enum("", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName) + enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. 
Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool() + tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String() var storeRateLimits store.SeriesSelectLimits storeRateLimits.RegisterFlags(cmd) @@ -347,6 +349,8 @@ func registerQuery(app *extkingpin.App) { *tenantHeader, *defaultTenant, *tenantCertField, + *enforceTenancy, + *tenantLabel, ) }) } @@ -427,6 +431,8 @@ func runQuery( tenantHeader string, defaultTenant string, tenantCertField string, + enforceTenancy bool, + tenantLabel string, ) error { if alertQueryURL == "" { lastColon := strings.LastIndex(httpBindAddr, ":") @@ -724,6 +730,8 @@ func runQuery( tenantHeader, defaultTenant, tenantCertField, + enforceTenancy, + tenantLabel, ) api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 7d80687ec3..9ad3960ff5 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -135,7 +135,7 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { sc.objStoreConfig = *extkingpin.RegisterCommonObjStoreFlags(cmd, "", true) cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view."). - Default("3m").DurationVar(&sc.syncInterval) + Default("15m").DurationVar(&sc.syncInterval) cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1."). Default("20").IntVar(&sc.blockSyncConcurrency) diff --git a/docs/components/query.md b/docs/components/query.md index 677f0b8a0a..4584363ba3 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -182,7 +182,7 @@ Available options: ### Partial Response Strategy -// TODO(bwplotka): Update. 
This will change to "strategy" soon as [PartialResponseStrategy enum here](../../pkg/store/storepb/rpc.proto) + | HTTP URL/FORM parameter | Type | Default | Example | |-------------------------|-----------|-----------------------------------------------|----------------------------------------| @@ -260,6 +260,20 @@ Example file SD file in YAML: `--query.active-query-path` is an option which allows the user to specify a directory which will contain a `queries.active` file to track active queries. To enable this feature, the user has to specify a directory other than "", since that is skipped being the default. +## Tenancy + +### Tenant Metrics + +Tenant information is captured in relevant Thanos exported metrics in the Querier, Query Frontend and Store. In order to make use of this functionality requests to the Query/Query Frontend component should include the tenant-id in the appropriate HTTP request header as configured with `--query.tenant-header`. The tenant information is passed through components (including Query Frontend), down to the Thanos Store, enabling per-tenant metrics in these components also. If no tenant header is set to requests to the query component, the default tenant as defined by `--query.default-tenant-id` will be used. + +### Tenant Enforcement + +Enforcement of tenancy can be enabled using `--query.enforce-tenancy`. If enabled, queries will only fetch series containing a specific matcher, while evaluating PromQL expressions. The matcher label name is `--query.tenant-label-name` and the matcher value matches the tenant, as sent to the querier in the HTTP header configured with `--query.tenant-header`. This functionality requires that metrics are injected with a tenant label when ingested into Thanos. This can be done for example by enabling tenancy in the Thanos Receive component. 
+ +In case of nested Thanos Query components, it's important to note that tenancy enforcement will only occur in the querier which the initial request is sent to; the layered queriers will not perform any enforcement. + +Further, note that there are no authentication mechanisms in Thanos, so anyone can set an arbitrary tenant in the HTTP header. It is recommended to use a proxy in front of the querier in case an authentication mechanism is needed. The Query UI also includes an option to set an arbitrary tenant, and should therefore not be exposed to end-users if users should not be able to see each other's data. + ## Flags ```$ mdox-exec="thanos query --help" @@ -367,6 +381,10 @@ Flags: Whether to enable extended rate functions (xrate, xincrease and xdelta). Only has effect when used with Thanos engine. + --query.enforce-tenancy Enforce tenancy on Query APIs. Responses + are returned only if the label value of the + configured tenant-label-name and the value of + the tenant header matches. --query.lookback-delta=QUERY.LOOKBACK-DELTA The maximum lookback duration for retrieving metrics during expression evaluations. @@ -419,6 +437,9 @@ Flags: flag value to be ignored. --query.tenant-header="THANOS-TENANT" HTTP header to determine tenant. + --query.tenant-label-name="tenant_id" + Label name to use when enforcing tenancy (if + --query.enforce-tenancy is enabled). --query.timeout=2m Maximum time to process query by query node. --request.logging-config= Alternative to 'request.logging-config-file' diff --git a/docs/components/store.md b/docs/components/store.md index 85fd4ce688..8ecc53d68f 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -210,7 +210,7 @@ Flags: The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit. 
- --sync-block-duration=3m Repeat interval for syncing the blocks between + --sync-block-duration=15m Repeat interval for syncing the blocks between local and remote view. --tracing.config= Alternative to 'tracing.config-file' flag diff --git a/docs/operating/multi-tenancy.md b/docs/operating/multi-tenancy.md index a52741505d..22c26f8396 100644 --- a/docs/operating/multi-tenancy.md +++ b/docs/operating/multi-tenancy.md @@ -2,4 +2,4 @@ Thanos supports multi-tenancy by using external labels. For such use cases, the [Thanos Sidecar](../components/sidecar.md) based approach with layered [Thanos Queriers](../components/query.md) is recommended. -You can also use the [Thanos Receiver](../components/receive.md) however, we don't recommend it to achieve a global view of data of a single-tenant. Also note that, multi-tenancy may also be achievable if ingestion is not user-controlled, as then enforcing of labels, for example using the [prom-label-proxy](https://github.com/openshift/prom-label-proxy) (please thoroughly understand the mechanism if intending to employ this mechanism, as the wrong configuration could leak data). +You can also use the [Thanos Receiver](../components/receive.md) however, we don't recommend it to achieve a global view of data of a single-tenant. Also note that, multi-tenancy may also be achievable if ingestion is not user-controlled, as then enforcing of labels, for example using the [prom-label-proxy](https://github.com/prometheus-community/prom-label-proxy) (please thoroughly understand the mechanism if intending to employ this mechanism, as the wrong configuration could leak data). 
diff --git a/go.mod b/go.mod index 11d60030e5..73ae55a2b7 100644 --- a/go.mod +++ b/go.mod @@ -119,6 +119,7 @@ require ( require ( github.com/mitchellh/go-ps v1.0.0 github.com/onsi/gomega v1.27.10 + github.com/prometheus-community/prom-label-proxy v0.7.0 go.opentelemetry.io/contrib/propagators/autoprop v0.38.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb @@ -126,11 +127,13 @@ require ( require ( github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect + github.com/go-openapi/runtime v0.26.0 // indirect github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect + github.com/metalmatze/signal v0.0.0-20210307161603-1c9aa721a97a // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/zhangyunhao116/umap v0.0.0-20221211160557-cb7705fafa39 // indirect diff --git a/go.sum b/go.sum index 15177158b4..9f8d31831b 100644 --- a/go.sum +++ b/go.sum @@ -84,6 +84,7 @@ github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1 h1:WpB/QDNLpMw github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/Code-Hex/go-generics-cache v1.3.1 h1:i8rLwyhoyhaerr7JpjtYjJZUcCbWOdiYO3fZXLiEC4g= github.com/Code-Hex/go-generics-cache v1.3.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4= 
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.8.3 h1:i84ZOPT35YCJROyuf97VP/VEdYhQce/8NTLOWq5tqJw= @@ -339,6 +340,8 @@ github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En github.com/go-openapi/loads v0.21.1/go.mod h1:/DtAMXXneXFjbQMGEtbamCZb+4x7eGwkvZCvBmwUG+g= github.com/go-openapi/loads v0.21.2 h1:r2a/xFIYeZ4Qd2TnGpWDIQNcP80dIaZgf704za8enro= github.com/go-openapi/loads v0.21.2/go.mod h1:Jq58Os6SSGz0rzh62ptiu8Z31I+OTHqmULx5e/gJbNw= +github.com/go-openapi/runtime v0.26.0 h1:HYOFtG00FM1UvqrcxbEJg/SwvDRvYLQKGhw2zaQjTcc= +github.com/go-openapi/runtime v0.26.0/go.mod h1:QgRGeZwrUcSHdeh4Ka9Glvo0ug1LC5WyE+EV88plZrQ= github.com/go-openapi/spec v0.20.4/go.mod h1:faYFR1CvsJZ0mNsmsphTMSoRrNV3TEDoAM7FOEWeq8I= github.com/go-openapi/spec v0.20.6/go.mod h1:2OpW+JddWPrpXSCIX8eOx7lZ5iyuWj3RYR6VaaBKcWA= github.com/go-openapi/spec v0.20.9 h1:xnlYNQAwKd2VQRRfwTEI0DcK+2cbuvI/0c7jx3gA8/8= @@ -638,6 +641,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -713,6 +717,8 @@ github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= 
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= +github.com/metalmatze/signal v0.0.0-20210307161603-1c9aa721a97a h1:0usWxe5SGXKQovz3p+BiQ81Jy845xSMu2CWKuXsXuUM= +github.com/metalmatze/signal v0.0.0-20210307161603-1c9aa721a97a/go.mod h1:3OETvrxfELvGsU2RoGGWercfeZ4bCL3+SOwzIWtJH/Q= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.1.57 h1:Jzi7ApEIzwEPLHWRcafCN9LZSBbqQpxjt/wpgvg7wcM= @@ -840,10 +846,13 @@ github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4 github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/alertmanager v0.26.0 h1:uOMJWfIwJguc3NaM3appWNbbrh6G/OjvaHMk22aBBYc= github.com/prometheus/alertmanager v0.26.0/go.mod h1:rVcnARltVjavgVaNnmevxK7kOn7IZavyf0KNgHkbEpU= +github.com/prometheus-community/prom-label-proxy v0.7.0 h1:1iNHXF7V8z2iOCinEyxKDUHu2jppPAAd6PmBCi3naok= +github.com/prometheus-community/prom-label-proxy v0.7.0/go.mod h1:wR9C/Mwp5aBbiqM6gQ+FZdFRwL8pCzzhsje8lTAx/aA= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= +github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.12.1/go.mod 
h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= @@ -861,6 +870,7 @@ github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= +github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 3939aeb0ba..d6767d9a94 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -174,6 +174,8 @@ type QueryAPI struct { tenantHeader string defaultTenant string tenantCertField string + enforceTenancy bool + tenantLabel string } // NewQueryAPI returns an initialized QueryAPI type. 
@@ -207,6 +209,8 @@ func NewQueryAPI( tenantHeader string, defaultTenant string, tenantCertField string, + enforceTenancy bool, + tenantLabel string, ) *QueryAPI { if statsAggregatorFactory == nil { statsAggregatorFactory = &store.NoopSeriesStatsAggregatorFactory{} @@ -240,6 +244,8 @@ func NewQueryAPI( tenantHeader: tenantHeader, defaultTenant: defaultTenant, tenantCertField: tenantCertField, + enforceTenancy: enforceTenancy, + tenantLabel: tenantLabel, queryRangeHist: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "thanos_query_range_requested_timespan_duration_seconds", @@ -644,12 +650,10 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro lookbackDelta = lookbackDeltaFromReq } - tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField) + queryStr, tenant, ctx, err := tenancy.RewritePromQL(ctx, r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField, qapi.enforceTenancy, qapi.tenantLabel, r.FormValue("query")) if err != nil { - apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} - return nil, nil, apiErr, func() {} + return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {} } - ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) // We are starting promQL tracing span here, because we have no control over promQL code. 
span, ctx := tracing.StartSpan(ctx, "promql_instant_query") @@ -670,7 +674,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro query.NewAggregateStatsReporter(&seriesStats), ), promql.NewPrometheusQueryOpts(false, lookbackDelta), - r.FormValue("query"), + queryStr, ts, ) @@ -943,12 +947,10 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap lookbackDelta = lookbackDeltaFromReq } - tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField) + queryStr, tenant, ctx, err := tenancy.RewritePromQL(ctx, r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField, qapi.enforceTenancy, qapi.tenantLabel, r.FormValue("query")) if err != nil { - apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} - return nil, nil, apiErr, func() {} + return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {} } - ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) // Record the query range requested. 
qapi.queryRangeHist.Observe(end.Sub(start).Seconds()) @@ -972,7 +974,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap query.NewAggregateStatsReporter(&seriesStats), ), promql.NewPrometheusQueryOpts(false, lookbackDelta), - r.FormValue("query"), + queryStr, start, end, step, @@ -1048,21 +1050,11 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A return nil, nil, apiErr, func() {} } - var matcherSets [][]*labels.Matcher - for _, s := range r.Form[MatcherParam] { - matchers, err := parser.ParseMetricSelector(s) - if err != nil { - return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {} - } - matcherSets = append(matcherSets, matchers) - } - - tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField) + matcherSets, ctx, err := tenancy.RewriteLabelMatchers(ctx, r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField, qapi.enforceTenancy, qapi.tenantLabel, r.Form[MatcherParam]) if err != nil { apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} return nil, nil, apiErr, func() {} } - ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) q, err := qapi.queryableCreate( true, @@ -1131,13 +1123,10 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {} } - var matcherSets [][]*labels.Matcher - for _, s := range r.Form[MatcherParam] { - matchers, err := parser.ParseMetricSelector(s) - if err != nil { - return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {} - } - matcherSets = append(matcherSets, matchers) + matcherSets, ctx, err := tenancy.RewriteLabelMatchers(r.Context(), r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField, qapi.enforceTenancy, qapi.tenantLabel, r.Form[MatcherParam]) + if err != nil { + apiErr := &api.ApiError{Typ: api.ErrorBadData, Err: err} + return nil, nil, apiErr, func() {} } 
enableDedup, apiErr := qapi.parseEnableDedupParam(r) @@ -1160,13 +1149,6 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr return nil, nil, apiErr, func() {} } - tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, "") - if err != nil { - apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} - return nil, nil, apiErr, func() {} - } - ctx := context.WithValue(r.Context(), tenancy.TenantKey, tenant) - q, err := qapi.queryableCreate( enableDedup, replicaLabels, @@ -1218,21 +1200,11 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap return nil, nil, apiErr, func() {} } - var matcherSets [][]*labels.Matcher - for _, s := range r.Form[MatcherParam] { - matchers, err := parser.ParseMetricSelector(s) - if err != nil { - return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {} - } - matcherSets = append(matcherSets, matchers) - } - - tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, "") + matcherSets, ctx, err := tenancy.RewriteLabelMatchers(r.Context(), r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField, qapi.enforceTenancy, qapi.tenantLabel, r.Form[MatcherParam]) if err != nil { - apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} + apiErr := &api.ApiError{Typ: api.ErrorBadData, Err: err} return nil, nil, apiErr, func() {} } - ctx := context.WithValue(r.Context(), tenancy.TenantKey, tenant) q, err := qapi.queryableCreate( true, diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index e632c9788f..c0c0fea21f 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -106,7 +106,7 @@ type Handler struct { mtx sync.RWMutex hashring Hashring - peers *peerGroup + peers peersContainer expBackoff backoff.Backoff peerStates map[string]*retryState receiverMode ReceiverMode @@ -253,11 +253,49 @@ func (h *Handler) Hashring(hashring Hashring) { h.mtx.Lock() defer h.mtx.Unlock() + if h.hashring != nil { 
+ previousNodes := h.hashring.Nodes() + newNodes := hashring.Nodes() + + disappearedNodes := getSortedStringSliceDiff(previousNodes, newNodes) + for _, node := range disappearedNodes { + if err := h.peers.close(node); err != nil { + level.Error(h.logger).Log("msg", "closing gRPC connection failed, we might have leaked a file descriptor", "addr", node, "err", err.Error()) + } + } + } + h.hashring = hashring h.expBackoff.Reset() h.peerStates = make(map[string]*retryState) } +// getSortedStringSliceDiff returns items which are in slice1 but not in slice2. +// The returned slice also only contains unique items i.e. it is a set. +func getSortedStringSliceDiff(slice1, slice2 []string) []string { + slice1Items := make(map[string]struct{}, len(slice1)) + slice2Items := make(map[string]struct{}, len(slice2)) + + for _, s1 := range slice1 { + slice1Items[s1] = struct{}{} + } + for _, s2 := range slice2 { + slice2Items[s2] = struct{}{} + } + + var difference = make([]string, 0) + for s1 := range slice1Items { + _, s2Contains := slice2Items[s1] + if s2Contains { + continue + } + difference = append(difference, s1) + } + sort.Strings(difference) + + return difference +} + // Verifies whether the server is ready or not. 
func (h *Handler) isReady() bool { h.mtx.RLock() @@ -1123,31 +1161,55 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors { return errs } -func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup { +func newPeerGroup(dialOpts ...grpc.DialOption) peersContainer { return &peerGroup{ dialOpts: dialOpts, - cache: map[string]storepb.WriteableStoreClient{}, + cache: map[string]*grpc.ClientConn{}, m: sync.RWMutex{}, dialer: grpc.DialContext, } } +type peersContainer interface { + close(string) error + get(context.Context, string) (storepb.WriteableStoreClient, error) +} + type peerGroup struct { dialOpts []grpc.DialOption - cache map[string]storepb.WriteableStoreClient + cache map[string]*grpc.ClientConn m sync.RWMutex // dialer is used for testing. dialer func(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) } +func (p *peerGroup) close(addr string) error { + p.m.Lock() + defer p.m.Unlock() + + c, ok := p.cache[addr] + if !ok { + // NOTE(GiedriusS): this could be valid case when the connection + // was never established. + return nil + } + + delete(p.cache, addr) + if err := c.Close(); err != nil { + return fmt.Errorf("closing connection for %s", addr) + } + + return nil +} + func (p *peerGroup) get(ctx context.Context, addr string) (storepb.WriteableStoreClient, error) { // use a RLock first to prevent blocking if we don't need to. p.m.RLock() c, ok := p.cache[addr] p.m.RUnlock() if ok { - return c, nil + return storepb.NewWriteableStoreClient(c), nil } p.m.Lock() @@ -1155,14 +1217,13 @@ func (p *peerGroup) get(ctx context.Context, addr string) (storepb.WriteableStor // Make sure that another caller hasn't created the connection since obtaining the write lock. c, ok = p.cache[addr] if ok { - return c, nil + return storepb.NewWriteableStoreClient(c), nil } conn, err := p.dialer(ctx, addr, p.dialOpts...) 
if err != nil { return nil, errors.Wrap(err, "failed to dial peer") } - client := storepb.NewWriteableStoreClient(conn) - p.cache[addr] = client - return client, nil + p.cache[addr] = conn + return storepb.NewWriteableStoreClient(conn), nil } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index b71e438edf..de511dc8b6 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -166,24 +166,38 @@ func (f *fakeAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels panic("not implemented") } +type fakePeersGroup struct { + clients map[string]storepb.WriteableStoreClient + + closeCalled map[string]bool +} + +func (g *fakePeersGroup) close(addr string) error { + if g.closeCalled == nil { + g.closeCalled = map[string]bool{} + } + g.closeCalled[addr] = true + return nil +} + +func (g *fakePeersGroup) get(_ context.Context, addr string) (storepb.WriteableStoreClient, error) { + c, ok := g.clients[addr] + if !ok { + return nil, fmt.Errorf("client %s not found", addr) + } + return c, nil +} + +var _ = (peersContainer)(&fakePeersGroup{}) + func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgo HashringAlgorithm) ([]*Handler, Hashring, error) { var ( cfg = []HashringConfig{{Hashring: "test"}} handlers []*Handler wOpts = &WriterOptions{} ) - // create a fake peer group where we manually fill the cache with fake addresses pointed to our handlers - // This removes the network from the tests and creates a more consistent testing harness. - peers := &peerGroup{ - dialOpts: nil, - m: sync.RWMutex{}, - cache: map[string]storepb.WriteableStoreClient{}, - dialer: func(context.Context, string, ...grpc.DialOption) (*grpc.ClientConn, error) { - // dialer should never be called since we are creating fake clients with fake addresses - // this protects against some leaking test that may attempt to dial random IP addresses - // which may pose a security risk. 
- return nil, errors.New("unexpected dial called in testing") - }, + fakePeers := &fakePeersGroup{ + clients: map[string]storepb.WriteableStoreClient{}, } ag := addrGen{} @@ -198,11 +212,11 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin Limiter: limiter, }) handlers = append(handlers, h) - h.peers = peers addr := ag.newAddr() + h.peers = fakePeers + fakePeers.clients[addr] = &fakeRemoteWriteGRPCServer{h: h} h.options.Endpoint = addr cfg[0].Endpoints = append(cfg[0].Endpoints, Endpoint{Address: h.options.Endpoint}) - peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h} } // Use hashmod as default. if hashringAlgo == "" { @@ -1573,3 +1587,35 @@ func TestGetStatsLimitParameter(t *testing.T) { testutil.Equals(t, limit, givenLimit) }) } + +func TestSortedSliceDiff(t *testing.T) { + testutil.Equals(t, []string{"a"}, getSortedStringSliceDiff([]string{"a", "a", "foo"}, []string{"b", "b", "foo"})) + testutil.Equals(t, []string{}, getSortedStringSliceDiff([]string{}, []string{"b", "b", "foo"})) + testutil.Equals(t, []string{}, getSortedStringSliceDiff([]string{}, []string{})) +} + +func TestHashringChangeCallsClose(t *testing.T) { + appendables := []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + } + allHandlers, _, err := newTestHandlerHashring(appendables, 3, AlgorithmHashmod) + testutil.Ok(t, err) + + appendables = appendables[1:] + + _, smallHashring, err := newTestHandlerHashring(appendables, 2, AlgorithmHashmod) + testutil.Ok(t, err) + + allHandlers[0].Hashring(smallHashring) + + pg := allHandlers[0].peers.(*fakePeersGroup) + testutil.Assert(t, len(pg.closeCalled) > 0) +} diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index 18925cc4cc..0d7c2dc10c 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -55,6 +55,9 @@ type Hashring interface { Get(tenant string, timeSeries 
*prompb.TimeSeries) (string, error) // GetN returns the nth node that should handle the given tenant and time series. GetN(tenant string, timeSeries *prompb.TimeSeries, n uint64) (string, error) + // Nodes returns a sorted slice of nodes that are in this hashring. Addresses could be duplicated + // if, for example, the same address is used for multiple tenants in the multi-hashring. + Nodes() []string } // SingleNodeHashring always returns the same node. @@ -65,6 +68,10 @@ func (s SingleNodeHashring) Get(tenant string, ts *prompb.TimeSeries) (string, e return s.GetN(tenant, ts, 0) } +func (s SingleNodeHashring) Nodes() []string { + return []string{string(s)} +} + // GetN implements the Hashring interface. func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (string, error) { if n > 0 { @@ -84,9 +91,15 @@ func newSimpleHashring(endpoints []Endpoint) (Hashring, error) { } addresses[i] = endpoints[i].Address } + sort.Strings(addresses) + return simpleHashring(addresses), nil } +func (s simpleHashring) Nodes() []string { + return s +} + // Get returns a target to handle the given tenant and time series. 
func (s simpleHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { return s.GetN(tenant, ts, 0) @@ -120,6 +133,7 @@ type ketamaHashring struct { endpoints []Endpoint sections sections numEndpoints uint64 + nodes []string } func newKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) { @@ -132,8 +146,11 @@ func newKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFac hash := xxhash.New() availabilityZones := make(map[string]struct{}) ringSections := make(sections, 0, numSections) + + nodes := []string{} for endpointIndex, endpoint := range endpoints { availabilityZones[endpoint.AZ] = struct{}{} + nodes = append(nodes, endpoint.Address) for i := 1; i <= sectionsPerNode; i++ { _, _ = hash.Write([]byte(endpoint.Address + ":" + strconv.Itoa(i))) n := §ion{ @@ -148,15 +165,21 @@ func newKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFac } } sort.Sort(ringSections) + sort.Strings(nodes) calculateSectionReplicas(ringSections, replicationFactor, availabilityZones) return &ketamaHashring{ endpoints: endpoints, sections: ringSections, numEndpoints: uint64(len(endpoints)), + nodes: nodes, }, nil } +func (k *ketamaHashring) Nodes() []string { + return k.nodes +} + func sizeOfLeastOccupiedAZ(azSpread map[string]int64) int64 { minValue := int64(math.MaxInt64) for _, value := range azSpread { @@ -232,6 +255,8 @@ type multiHashring struct { // to the cache map, as this is both written to // and read from. mu sync.RWMutex + + nodes []string } // Get returns a target to handle the given tenant and time series. @@ -269,6 +294,10 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st return "", errors.New("no matching hashring to handle tenant") } +func (m *multiHashring) Nodes() []string { + return m.nodes +} + // newMultiHashring creates a multi-tenant hashring for a given slice of // groups. 
// Which hashring to use for a tenant is determined @@ -289,6 +318,7 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg if err != nil { return nil, err } + m.nodes = append(m.nodes, hashring.Nodes()...) m.hashrings = append(m.hashrings, hashring) var t map[string]struct{} if len(h.Tenants) != 0 { @@ -299,6 +329,7 @@ } m.tenantSets = append(m.tenantSets, t) } + sort.Strings(m.nodes) return m, nil } diff --git a/pkg/tenancy/tenancy.go b/pkg/tenancy/tenancy.go index f8b54bcc48..aec0bad86a 100644 --- a/pkg/tenancy/tenancy.go +++ b/pkg/tenancy/tenancy.go @@ -8,9 +8,11 @@ import ( "net/http" "path" - "google.golang.org/grpc/metadata" - "github.com/pkg/errors" + "github.com/prometheus-community/prom-label-proxy/injectproxy" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" + "google.golang.org/grpc/metadata" ) type contextKey int @@ -136,3 +138,97 @@ func GetTenantFromGRPCMetadata(ctx context.Context) (string, bool) { } return md.Get(DefaultTenantHeader)[0], true } + +func EnforceQueryTenancy(tenantLabel string, tenant string, query string) (string, error) { + labelMatcher := &labels.Matcher{ + Name: tenantLabel, + Type: labels.MatchEqual, + Value: tenant, + } + + e := injectproxy.NewEnforcer(false, labelMatcher) + + expr, err := parser.ParseExpr(query) + if err != nil { + return "", errors.Wrap(err, "error parsing query string, when enforcing tenancy") + } + + if err := e.EnforceNode(expr); err != nil { + return "", errors.Wrap(err, "error enforcing label") + } + + return expr.String(), nil +} + +func getLabelMatchers(formMatchers []string, tenant string, enforceTenancy bool, tenantLabel string) ([][]*labels.Matcher, error) { + tenantLabelMatcher := &labels.Matcher{ + Name: tenantLabel, + Type: labels.MatchEqual, + Value: tenant, + } + + matcherSets := make([][]*labels.Matcher, 0, len(formMatchers)) + + // 
If tenancy is enforced, but there are no matchers at all, add the tenant matcher + if len(formMatchers) == 0 && enforceTenancy { + var matcher []*labels.Matcher + matcher = append(matcher, tenantLabelMatcher) + matcherSets = append(matcherSets, matcher) + return matcherSets, nil + } + + for _, s := range formMatchers { + matchers, err := parser.ParseMetricSelector(s) + if err != nil { + return nil, err + } + + if enforceTenancy { + e := injectproxy.NewEnforcer(false, tenantLabelMatcher) + matchers, err = e.EnforceMatchers(matchers) + if err != nil { + return nil, err + } + } + + matcherSets = append(matcherSets, matchers) + } + + return matcherSets, nil +} + +// This function will: +// - Get tenant from HTTP header and add it to context. +// - if tenancy is enforced, add a tenant matcher to the promQL expression. +func RewritePromQL(ctx context.Context, r *http.Request, tenantHeader string, defaultTenantID string, certTenantField string, enforceTenancy bool, tenantLabel string, queryStr string) (string, string, context.Context, error) { + tenant, err := GetTenantFromHTTP(r, tenantHeader, defaultTenantID, certTenantField) + if err != nil { + return "", "", ctx, err + } + ctx = context.WithValue(ctx, TenantKey, tenant) + + if enforceTenancy { + queryStr, err = EnforceQueryTenancy(tenantLabel, tenant, queryStr) + return queryStr, tenant, ctx, err + } + return queryStr, tenant, ctx, nil +} + +// This function will: +// - Get tenant from HTTP header and add it to context. +// - Parse all labels matchers provided. +// - If tenancy is enforced, make sure a tenant matcher is present. 
+func RewriteLabelMatchers(ctx context.Context, r *http.Request, tenantHeader string, defaultTenantID string, certTenantField string, enforceTenancy bool, tenantLabel string, formMatchers []string) ([][]*labels.Matcher, context.Context, error) { + tenant, err := GetTenantFromHTTP(r, tenantHeader, defaultTenantID, certTenantField) + if err != nil { + return nil, ctx, err + } + ctx = context.WithValue(ctx, TenantKey, tenant) + + matcherSets, err := getLabelMatchers(formMatchers, tenant, enforceTenancy, tenantLabel) + if err != nil { + return nil, ctx, err + } + + return matcherSets, ctx, nil +} diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 7eaff47b40..07141a4ac7 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -265,6 +265,8 @@ type QuerierBuilder struct { telemetrySamplesQuantiles []float64 telemetrySeriesQuantiles []float64 + enforceTenancy bool + e2e.Linkable f e2e.FutureRunnable } @@ -391,6 +393,11 @@ func (q *QuerierBuilder) WithTelemetryQuantiles(duration []float64, samples []fl return q } +func (q *QuerierBuilder) WithTenancy(enforceTenancy bool) *QuerierBuilder { + q.enforceTenancy = enforceTenancy + return q +} + func (q *QuerierBuilder) Init() *e2eobs.Observable { args, err := q.collectArgs() if err != nil { @@ -491,6 +498,9 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { for _, bucket := range q.telemetrySeriesQuantiles { args = append(args, "--query.telemetry.request-series-seconds-quantiles="+strconv.FormatFloat(bucket, 'f', -1, 64)) } + if q.enforceTenancy { + args = append(args, "--query.enforce-tenancy") + } if q.enableXFunctions { args = append(args, "--query.enable-x-functions") } diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index e14832d8b9..b5341fd8de 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -1034,12 +1034,12 @@ func (u tenantRoundTripper) RoundTrip(r *http.Request) (*http.Response, 
error) { func TestTenantQFEHTTPMetrics(t *testing.T) { t.Parallel() - e, err := e2e.NewDockerEnvironment("tenant-metrics") + e, err := e2e.NewDockerEnvironment("qfetenantmetrics") testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) // scrape the local prometheus, and our querier metrics - prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "alone", e2ethanos.DefaultPromConfig("prom-alone", 0, "", "", e2ethanos.LocalPrometheusTarget, "tenant-metrics-querier-1:8080"), "", e2ethanos.DefaultPrometheusImage(), "") + prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "alone", e2ethanos.DefaultPromConfig("prom-alone", 0, "", "", e2ethanos.LocalPrometheusTarget, "qfetenantmetrics-querier-1:8080"), "", e2ethanos.DefaultPrometheusImage(), "") q := e2ethanos.NewQuerierBuilder(e, "1", sidecar1.InternalEndpoint("grpc")).Init() testutil.Ok(t, e2e.StartAndWaitReady(q)) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 81eda46f99..1844bb7a23 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -60,10 +60,13 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" prompb_copy "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/targets/targetspb" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" ) +const testQueryA = "{a=\"1\"}" + func defaultWebConfig() string { // username: test, secret: test(bcrypt hash) return ` @@ -2417,12 +2420,12 @@ func TestSidecarPrefersExtLabels(t *testing.T) { func TestTenantHTTPMetrics(t *testing.T) { t.Parallel() - e, err := e2e.NewDockerEnvironment("tenant-metrics") + e, err := e2e.NewDockerEnvironment("q-tenant-metrics") testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) // scrape the local prometheus, and our querier metrics - prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "alone", e2ethanos.DefaultPromConfig("prom-alone", 0, "", "", 
e2ethanos.LocalPrometheusTarget, "tenant-metrics-querier-1:8080"), "", e2ethanos.DefaultPrometheusImage(), "") + prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "alone", e2ethanos.DefaultPromConfig("prom-alone", 0, "", "", e2ethanos.LocalPrometheusTarget, "q-tenant-metrics-querier-1:8080"), "", e2ethanos.DefaultPrometheusImage(), "") q := e2ethanos.NewQuerierBuilder(e, "1", sidecar1.InternalEndpoint("grpc")).Init() testutil.Ok(t, e2e.StartAndWaitReady(q)) @@ -2497,3 +2500,265 @@ func TestTenantHTTPMetrics(t *testing.T) { e2emon.WaitMissingMetrics(), )) } + +func TestQueryTenancyEnforcement(t *testing.T) { + t.Parallel() + + // Build up. + e, err := e2e.New(e2e.WithName("tenancyEnforce")) + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + t.Cleanup(cancel) + + bucket := "store-gw-test" + minio := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS()) + testutil.Ok(t, e2e.StartAndWaitReady(minio)) + + l := log.NewLogfmtLogger(os.Stdout) + bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, minio.Endpoint("http"), minio.Dir()), "test") + testutil.Ok(t, err) + + // Add series from different tenants + now := time.Now() + tenantLabel01 := labels.FromStrings(tenancy.DefaultTenantLabel, "tenant-01") + tenantLabel02 := labels.FromStrings(tenancy.DefaultTenantLabel, "tenant-02") + tenantLabel03 := labels.FromStrings(tenancy.DefaultTenantLabel, "default-tenant") + dir := filepath.Join(e.SharedDir(), "tmp") + testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir), os.ModePerm)) + + series1 := []labels.Labels{labels.FromStrings("a", "1")} + series2 := []labels.Labels{labels.FromStrings("b", "2")} + series3 := []labels.Labels{labels.FromStrings("c", "3")} + + blockID1, err := e2eutil.CreateBlockWithBlockDelay(ctx, + dir, + series1, + 10, + timestamp.FromTime(now), + timestamp.FromTime(now.Add(2*time.Hour)), + 30*time.Minute, + tenantLabel01, + 0, + 
metadata.NoneFunc, + ) + testutil.Ok(t, err) + + blockID2, err := e2eutil.CreateBlockWithBlockDelay(ctx, + dir, + series2, + 10, + timestamp.FromTime(now), + timestamp.FromTime(now.Add(2*time.Hour)), + 30*time.Minute, + tenantLabel02, + 0, + metadata.NoneFunc, + ) + testutil.Ok(t, err) + + blockID3, err := e2eutil.CreateBlockWithBlockDelay(ctx, + dir, + series3, + 10, + timestamp.FromTime(now), + timestamp.FromTime(now.Add(2*time.Hour)), + 30*time.Minute, + tenantLabel03, + 0, + metadata.NoneFunc, + ) + testutil.Ok(t, err) + + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, blockID1.String()), blockID1.String())) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, blockID2.String()), blockID2.String())) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, blockID3.String()), blockID3.String())) + + storeGW := e2ethanos.NewStoreGW( + e, + "s1", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, minio.InternalEndpoint("http"), minio.InternalDir()), + }, + "", + "", + nil, + ) + + querierEnforce := e2ethanos.NewQuerierBuilder(e, "1", storeGW.InternalEndpoint("grpc")).WithTenancy(true).Init() + querierNoEnforce := e2ethanos.NewQuerierBuilder(e, "2", storeGW.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(storeGW, querierEnforce, querierNoEnforce)) + testutil.Ok(t, storeGW.WaitSumMetrics(e2emon.Equals(3), "thanos_blocks_meta_synced")) + + tenant1Header := make(http.Header) + tenant1Header.Add("thanos-tenant", "tenant-01") + + tenant2Header := make(http.Header) + tenant2Header.Add("thanos-tenant", "tenant-02") + + // default-tenant should only see part of the results + queryAndAssertSeries(t, ctx, querierEnforce.Endpoint("http"), func() string { return "{c=\"3\"}" }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + []model.Metric{ + { + "c": "3", + "tenant_id": "default-tenant", + }, + }, + ) + + // tenant-01 should only see part of the results + 
queryAndAssertSeries(t, ctx, querierEnforce.Endpoint("http"), func() string { return testQueryA }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + HTTPHeaders: tenant1Header, + }, + []model.Metric{ + { + "a": "1", + "tenant_id": "tenant-01", + }, + }, + ) + + // With no enforcement enabled, default tenant can see everything + queryAndAssertSeries(t, ctx, querierNoEnforce.Endpoint("http"), func() string { return testQueryA }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + []model.Metric{ + { + "a": "1", + "tenant_id": "tenant-01", + }, + }, + ) + + // Default tenant doesn't see "a" when tenancy is enforced + queryAndAssertSeries(t, ctx, querierEnforce.Endpoint("http"), func() string { return testQueryA }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + nil, + ) + + // tenant-02 doesn't see "a" when tenancy is enforced + queryAndAssertSeries(t, ctx, querierEnforce.Endpoint("http"), func() string { return testQueryA }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + HTTPHeaders: tenant2Header, + }, + nil, + ) + + // default-tenant cannot attempt to view other tenants' data, by setting the tenant id + queryAndAssertSeries(t, ctx, querierEnforce.Endpoint("http"), func() string { return "{tenant_id=\"tenant-01\"}" }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + nil, + ) + + rangeQuery(t, ctx, querierEnforce.Endpoint("http"), func() string { return testQueryA }, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), 3600, + promclient.QueryOptions{ + Deduplicate: true, + }, func(res model.Matrix) error { + if res.Len() == 0 { + return nil + } else { + return errors.New("default-tenant shouldn't be able to see results with label a") + } + }) + + rangeQuery(t, ctx, querierNoEnforce.Endpoint("http"), func() string { return testQueryA }, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), 3600, + promclient.QueryOptions{ + Deduplicate: true, + 
}, func(res model.Matrix) error { + if res[0].Metric["a"] == "1" { + return nil + } else { + return errors.New("default-tenant should be able to see results with label a when enforcement is off") + } + }) + + rangeQuery(t, ctx, querierEnforce.Endpoint("http"), func() string { return "{c=\"3\"}" }, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), 3600, + promclient.QueryOptions{ + Deduplicate: true, + }, func(res model.Matrix) error { + if res[0].Metric["c"] == "3" { + return nil + } else { + return errors.New("default-tenant should be able to see its own data when enforcement is enabled") + } + }) + + // default-tenant should only see two labels when enforcing is on (c,tenant_id) + labelNames(t, ctx, querierEnforce.Endpoint("http"), nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + return len(res) == 2 + }) + + // default-tenant should see all labels when enforcing is not on (a,b,c,tenant_id) + labelNames(t, ctx, querierNoEnforce.Endpoint("http"), nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + return len(res) == 4 + }) + + // default tenant can see just the value of the c label + labelValues(t, ctx, querierEnforce.Endpoint("http"), "c", nil, + timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + return len(res) == 1 + }, + ) + labelValues(t, ctx, querierEnforce.Endpoint("http"), "a", nil, + timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + return len(res) == 0 + }, + ) + + // Series endpoint tests + var matcherSetC []*labels.Matcher + labelMatcherC := &labels.Matcher{ + Name: "c", + Type: labels.MatchEqual, + Value: "3", + } + matcherSetC = append(matcherSetC, labelMatcherC) + + var matcherSetB []*labels.Matcher + labelMatcher := &labels.Matcher{ + Name: "b", + Type: labels.MatchEqual, + 
Value: "2", + } + matcherSetB = append(matcherSetB, labelMatcher) + + // default-tenant can see series with matcher C + series(t, ctx, querierEnforce.Endpoint("http"), matcherSetC, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []map[string]string) bool { + var expected = []map[string]string{ + { + "c": "3", + "tenant_id": "default-tenant", + }, + } + return reflect.DeepEqual(res, expected) + }) + + // default-tenant cannot see series with matcher B when tenancy is enabled + series(t, ctx, querierEnforce.Endpoint("http"), matcherSetB, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []map[string]string) bool { + return len(res) == 0 + }) + + // default-tenant can see series with matcher B when tenancy is not enabled + series(t, ctx, querierNoEnforce.Endpoint("http"), matcherSetB, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []map[string]string) bool { + var expected = []map[string]string{ + { + "b": "2", + "tenant_id": "tenant-02", + }, + } + return reflect.DeepEqual(res, expected) + }) +}