Skip to content

Commit

Permalink
feat: management WebUI homepage (#36822)
Browse files Browse the repository at this point in the history
issue: #36784
1. Implement an embedded web server for WebUI access.  
2. Complete the homepage development.

Home page demo:
<img width="2177" alt="iShot_2024-10-10_17 57 34"
src="https://github.com/user-attachments/assets/38539917-ce09-4e54-a5b5-7f4f7eaac353">

Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 authored Oct 23, 2024
1 parent 30121a5 commit 4746f47
Show file tree
Hide file tree
Showing 66 changed files with 2,618 additions and 253 deletions.
2 changes: 1 addition & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pulsar:

# If you want to enable kafka, needs to comment the pulsar configs
# kafka:
# brokerList:
# brokerList: localhost:9092
# saslUsername:
# saslPassword:
# saslMechanisms:
Expand Down
4 changes: 3 additions & 1 deletion internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/tidwall/gjson"
"go.opentelemetry.io/otel"
"go.uber.org/zap"

Expand Down Expand Up @@ -1077,7 +1078,8 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
}, nil
}

metricType, err := metricsinfo.ParseMetricType(req.Request)
ret := gjson.Parse(req.GetRequest())
metricType, err := metricsinfo.ParseMetricRequestType(ret)
if err != nil {
log.Warn("DataCoord.GetMetrics failed to parse metric type",
zap.Int64("nodeID", paramtable.GetNodeID()),
Expand Down
4 changes: 3 additions & 1 deletion internal/datanode/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"

"github.com/samber/lo"
"github.com/tidwall/gjson"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand Down Expand Up @@ -165,7 +166,8 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
}, nil
}

metricType, err := metricsinfo.ParseMetricType(req.Request)
ret := gjson.Parse(req.GetRequest())
metricType, err := metricsinfo.ParseMetricRequestType(ret)
if err != nil {
log.Warn("DataNode.GetMetrics failed to parse metric type",
zap.Int64("nodeID", node.GetNodeID()),
Expand Down
5 changes: 3 additions & 2 deletions internal/distributed/proxy/httpserver/handler_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
mhttp "github.com/milvus-io/milvus/internal/http"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proxy"
Expand Down Expand Up @@ -358,7 +359,7 @@ func TestTimeout(t *testing.T) {
})
headerTestCases = append(headerTestCases, headerTestCase{
path: path, // timeout 3s
headers: map[string]string{HTTPHeaderRequestTimeout: "3"},
headers: map[string]string{mhttp.HTTPHeaderRequestTimeout: "3"},
status: http.StatusRequestTimeout,
})
path = "/middleware/timeout/31"
Expand All @@ -371,7 +372,7 @@ func TestTimeout(t *testing.T) {
})
headerTestCases = append(headerTestCases, headerTestCase{
path: path, // wait 32s
headers: map[string]string{HTTPHeaderRequestTimeout: "32"},
headers: map[string]string{mhttp.HTTPHeaderRequestTimeout: "32"},
})

for _, testcase := range headerTestCases {
Expand Down
8 changes: 5 additions & 3 deletions internal/distributed/proxy/httpserver/timeout_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"time"

"github.com/gin-gonic/gin"

mhttp "github.com/milvus-io/milvus/internal/http"
)

func defaultResponse(c *gin.Context) {
Expand Down Expand Up @@ -133,13 +135,13 @@ func checkWriteHeaderCode(code int) {

func timeoutMiddleware(handler gin.HandlerFunc) gin.HandlerFunc {
t := &Timeout{
timeout: HTTPDefaultTimeout,
timeout: mhttp.HTTPDefaultTimeout,
handler: handler,
response: defaultResponse,
}
bufPool := &BufferPool{}
return func(c *gin.Context) {
timeoutSecond, err := strconv.ParseInt(c.Request.Header.Get(HTTPHeaderRequestTimeout), 10, 64)
timeoutSecond, err := strconv.ParseInt(c.Request.Header.Get(mhttp.HTTPHeaderRequestTimeout), 10, 64)
if err == nil {
t.timeout = time.Duration(timeoutSecond) * time.Second
}
Expand All @@ -166,7 +168,7 @@ func timeoutMiddleware(handler gin.HandlerFunc) gin.HandlerFunc {
case p := <-panicChan:
tw.FreeBuffer()
c.Writer = w
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{HTTPReturnCode: http.StatusInternalServerError})
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{mhttp.HTTPReturnCode: http.StatusInternalServerError})
panic(p)

case <-finish:
Expand Down
79 changes: 79 additions & 0 deletions internal/distributed/proxy/httpserver/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"reflect"
"strconv"
"strings"
"time"

"github.com/gin-gonic/gin"
"github.com/spf13/cast"
Expand All @@ -19,8 +20,10 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
mhttp "github.com/milvus-io/milvus/internal/http"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/proxy"
"github.com/milvus-io/milvus/internal/proxy/accesslog"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -1501,3 +1504,79 @@ func convertToExtraParams(indexParam IndexParam) ([]*commonpb.KeyValuePair, erro
}
return params, nil
}

func MetricsHandlerFunc(c *gin.Context) {
path := c.Request.URL.Path
metrics.RestfulFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10), path,
).Inc()
if c.Request.ContentLength >= 0 {
metrics.RestfulReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10), path,
).Add(float64(c.Request.ContentLength))
}
start := time.Now()

// Process request
c.Next()

latency := time.Since(start)
metrics.RestfulReqLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10), path,
).Observe(float64(latency.Milliseconds()))

// see https://github.com/milvus-io/milvus/issues/35767, counter cannot add negative value
// when response is not written(say timeout/network broken), panicking may happen if not check
if size := c.Writer.Size(); size > 0 {
metrics.RestfulSendBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10), path,
).Add(float64(c.Writer.Size()))
}
}

func LoggerHandlerFunc() gin.HandlerFunc {
return gin.LoggerWithConfig(gin.LoggerConfig{
SkipPaths: proxy.Params.ProxyCfg.GinLogSkipPaths.GetAsStrings(),
Formatter: func(param gin.LogFormatterParams) string {
if param.Latency > time.Minute {
param.Latency = param.Latency.Truncate(time.Second)
}
traceID, ok := param.Keys["traceID"]
if !ok {
traceID = ""
}

accesslog.SetHTTPParams(&param)
return fmt.Sprintf("[%v] [GIN] [%s] [traceID=%s] [code=%3d] [latency=%v] [client=%s] [method=%s] [error=%s]\n",
param.TimeStamp.Format("2006/01/02 15:04:05.000 Z07:00"),
param.Path,
traceID,
param.StatusCode,
param.Latency,
param.ClientIP,
param.Method,
param.ErrorMessage,
)
},
})
}

func RequestHandlerFunc(c *gin.Context) {
_, err := strconv.ParseBool(c.Request.Header.Get(mhttp.HTTPHeaderAllowInt64))
if err != nil {
if paramtable.Get().HTTPCfg.AcceptTypeAllowInt64.GetAsBool() {
c.Request.Header.Set(mhttp.HTTPHeaderAllowInt64, "true")
} else {
c.Request.Header.Set(mhttp.HTTPHeaderAllowInt64, "false")
}
}
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
c.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With")
c.Writer.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, DELETE, OPTIONS, PATCH")
if c.Request.Method == "OPTIONS" {
c.AbortWithStatus(204)
return
}
c.Next()
}
2 changes: 1 addition & 1 deletion internal/distributed/proxy/httpserver/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestWrapHandler(t *testing.T) {
testEngine.GET("/test/:case", wrappedHandler)

t.Run("status ok", func(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "/test/0", nil)
req := httptest.NewRequest(http.MethodGet, "/test/0?verbose=false", nil)
w := httptest.NewRecorder()
testEngine.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
Expand Down
92 changes: 13 additions & 79 deletions internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"io"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -52,7 +51,7 @@ import (
qcc "github.com/milvus-io/milvus/internal/distributed/querycoord/client"
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
"github.com/milvus-io/milvus/internal/distributed/utils"
management "github.com/milvus-io/milvus/internal/http"
mhttp "github.com/milvus-io/milvus/internal/http"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proxy"
Expand Down Expand Up @@ -140,7 +139,7 @@ func authenticate(c *gin.Context) {
}
log.Warn("fail to verify apikey", zap.Error(err))
}
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{httpserver.HTTPReturnCode: merr.Code(merr.ErrNeedAuthenticate), httpserver.HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()})
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{mhttp.HTTPReturnCode: merr.Code(merr.ErrNeedAuthenticate), mhttp.HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()})
}

// registerHTTPServer register the http server, panic when failed
Expand All @@ -157,9 +156,14 @@ func (s *Server) registerHTTPServer() {
}
metricsGinHandler := gin.Default()
apiv1 := metricsGinHandler.Group(apiPathPrefix)
httpserver.NewHandlers(s.proxy).RegisterRoutesTo(apiv1)
management.Register(&management.Handler{
Path: management.RootPath,
apiv1.Use(httpserver.RequestHandlerFunc)
handlers := httpserver.NewHandlers(s.proxy)
handlers.RegisterRoutesTo(apiv1)
if p, ok := s.proxy.(*proxy.Proxy); ok {
p.RegisterRestRouter(apiv1)
}
mhttp.Register(&mhttp.Handler{
Path: mhttp.RootPath,
HandlerFunc: nil,
Handler: metricsGinHandler.Handler(),
})
Expand All @@ -168,80 +172,10 @@ func (s *Server) registerHTTPServer() {
func (s *Server) startHTTPServer(errChan chan error) {
defer s.wg.Done()
ginHandler := gin.New()
ginHandler.Use(func(c *gin.Context) {
path := c.Request.URL.Path
metrics.RestfulFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10), path,
).Inc()
if c.Request.ContentLength >= 0 {
metrics.RestfulReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10), path,
).Add(float64(c.Request.ContentLength))
}
start := time.Now()

// Process request
c.Next()

latency := time.Since(start)
metrics.RestfulReqLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10), path,
).Observe(float64(latency.Milliseconds()))

// see https://github.com/milvus-io/milvus/issues/35767, counter cannot add negative value
// when response is not written(say timeout/network broken), panicking may happen if not check
if size := c.Writer.Size(); size > 0 {
metrics.RestfulSendBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10), path,
).Add(float64(c.Writer.Size()))
}
})

ginHandler.Use(httpserver.MetricsHandlerFunc)
ginHandler.Use(accesslog.AccessLogMiddleware)
ginLogger := gin.LoggerWithConfig(gin.LoggerConfig{
SkipPaths: proxy.Params.ProxyCfg.GinLogSkipPaths.GetAsStrings(),
Formatter: func(param gin.LogFormatterParams) string {
if param.Latency > time.Minute {
param.Latency = param.Latency.Truncate(time.Second)
}
traceID, ok := param.Keys["traceID"]
if !ok {
traceID = ""
}

accesslog.SetHTTPParams(&param)
return fmt.Sprintf("[%v] [GIN] [%s] [traceID=%s] [code=%3d] [latency=%v] [client=%s] [method=%s] [error=%s]\n",
param.TimeStamp.Format("2006/01/02 15:04:05.000 Z07:00"),
param.Path,
traceID,
param.StatusCode,
param.Latency,
param.ClientIP,
param.Method,
param.ErrorMessage,
)
},
})
ginHandler.Use(ginLogger, gin.Recovery())
ginHandler.Use(func(c *gin.Context) {
_, err := strconv.ParseBool(c.Request.Header.Get(httpserver.HTTPHeaderAllowInt64))
if err != nil {
if paramtable.Get().HTTPCfg.AcceptTypeAllowInt64.GetAsBool() {
c.Request.Header.Set(httpserver.HTTPHeaderAllowInt64, "true")
} else {
c.Request.Header.Set(httpserver.HTTPHeaderAllowInt64, "false")
}
}
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
c.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With")
c.Writer.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, DELETE, OPTIONS, PATCH")
if c.Request.Method == "OPTIONS" {
c.AbortWithStatus(204)
return
}
c.Next()
})
ginHandler.Use(httpserver.LoggerHandlerFunc(), gin.Recovery())
ginHandler.Use(httpserver.RequestHandlerFunc)
ginHandler.Use(func(c *gin.Context) {
c.Set(httpserver.ContextUsername, "")
})
Expand Down
14 changes: 14 additions & 0 deletions internal/http/constant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package http

import (
"time"
)

const (
HTTPHeaderAllowInt64 = "Accept-Type-Allow-Int64"
HTTPHeaderRequestTimeout = "Request-Timeout"
HTTPDefaultTimeout = 30 * time.Second
HTTPReturnCode = "code"
HTTPReturnMessage = "message"
HTTPReturnData = "data"
)
13 changes: 13 additions & 0 deletions internal/http/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
const (
RouteTriggerStopPath = "/management/stop"
RouteCheckComponentReady = "/management/check/ready"
RouteWebUI = "/webui/"
)

// proxy management restful api root path
Expand All @@ -61,3 +62,15 @@ const (
RouteGetQueryNodeDistribution = "/management/querycoord/distribution/get"
RouteCheckQueryNodeDistribution = "/management/querycoord/distribution/check"
)

// for WebUI restful api root path
const (
ClusterInfoPath = "/_cluster/info"
ClusterConfigsPath = "/_cluster/configs"
ClusterClientsPath = "/_cluster/clients"
ClusterDependenciesPath = "/_cluster/dependencies"
HookConfigsPath = "/_hook/configs"
QcoordSegmentsPath = "/_qcoord/segments"
QcoordChannelsPath = "/_qcoord/channels"
QcoordTasksPath = "/_qcoord/tasks"
)
Loading

0 comments on commit 4746f47

Please sign in to comment.