From 0220df5cf7c18d0c3ba80d1c2e8e18d6acee63de Mon Sep 17 00:00:00 2001 From: haorenfsa Date: Fri, 18 Oct 2024 16:13:01 +0800 Subject: [PATCH] support pulsar enable authn Signed-off-by: haorenfsa --- go.mod | 23 ++-- go.sum | 54 ++++----- pkg/controllers/conditions.go | 36 +----- pkg/controllers/conditions_test.go | 56 ---------- pkg/controllers/external_interfaces.go | 26 ----- pkg/controllers/status_cluster.go | 2 +- pkg/controllers/status_cluster_test.go | 2 - pkg/external/pulsar.go | 96 ++++++++++++++++ pkg/external/pulsar_client.go | 17 +++ .../log.go => external/pulsar_log.go} | 16 ++- pkg/external/pulsar_test.go | 104 ++++++++++++++++++ 11 files changed, 275 insertions(+), 157 deletions(-) create mode 100644 pkg/external/pulsar.go create mode 100644 pkg/external/pulsar_client.go rename pkg/{controllers/log.go => external/pulsar_log.go} (78%) create mode 100644 pkg/external/pulsar_test.go diff --git a/go.mod b/go.mod index c76240d..20f5933 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 github.com/Masterminds/sprig v2.22.0+incompatible github.com/aliyun/credentials-go v1.2.7 - github.com/apache/pulsar-client-go v0.6.0 + github.com/apache/pulsar-client-go v0.9.0 github.com/coreos/go-semver v0.3.0 github.com/davecgh/go-spew v1.1.1 github.com/go-logr/logr v1.2.3 @@ -42,14 +42,15 @@ require ( require ( cloud.google.com/go v0.99.0 // indirect - github.com/99designs/keyring v1.1.5 // indirect - github.com/AthenZ/athenz v1.10.15 // indirect + github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect + github.com/99designs/keyring v1.2.1 // indirect + github.com/AthenZ/athenz v1.10.39 // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect github.com/BurntSushi/toml v1.1.0 // indirect - github.com/DataDog/zstd v1.4.6-0.20210211175136-c6db21d202f4 // indirect + github.com/DataDog/zstd v1.5.0 // indirect github.com/MakeNowJust/heredoc v1.0.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver v1.5.0 // indirect @@ -59,7 +60,6 @@ require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/alibabacloud-go/debug v0.0.0-20190504072949-9472017b5c68 // indirect github.com/alibabacloud-go/tea v1.1.8 // indirect - github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd // indirect github.com/ardielle/ardielle-go v1.5.2 // indirect github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -68,8 +68,7 @@ require ( github.com/containerd/containerd v1.6.6 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/cyphar/filepath-securejoin v0.2.4 // indirect - github.com/danieljoos/wincred v1.1.0 // indirect - github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect + github.com/danieljoos/wincred v1.1.2 // indirect github.com/docker/cli v20.10.17+incompatible // indirect github.com/docker/distribution v2.8.1+incompatible // indirect github.com/docker/docker v20.10.17+incompatible // indirect @@ -78,7 +77,7 @@ require ( github.com/docker/go-metrics v0.0.1 // indirect github.com/docker/go-units v0.4.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect - github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a // indirect + github.com/dvsekhvalnov/jose2go v1.5.0 // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect @@ -96,6 +95,7 @@ require ( github.com/gobwas/glob v0.2.3 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt v3.2.1+incompatible // indirect github.com/golang-jwt/jwt/v5 v5.2.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect @@ -117,7 +117,6 @@ require ( github.com/jmoiron/sqlx v1.3.5 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d // indirect github.com/klauspost/compress v1.16.5 // indirect github.com/klauspost/cpuid/v2 v2.2.4 // indirect github.com/kylelemons/godebug v1.1.0 // indirect @@ -134,7 +133,6 @@ require ( github.com/minio/md5-simd v1.1.2 // indirect github.com/minio/sha256-simd v1.0.1 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect - github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-wordwrap v1.0.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/moby/locker v1.0.1 // indirect @@ -146,7 +144,6 @@ require ( github.com/morikuni/aec v1.0.0 // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/onsi/ginkgo v1.16.4 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect @@ -211,3 +208,7 @@ require ( sigs.k8s.io/kustomize/kyaml v0.13.9 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) + +replace github.com/apache/pulsar-client-go => github.com/milvus-io/pulsar-client-go v0.6.10 + +exclude github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd diff --git a/go.sum b/go.sum index 48f5b8f..d601cbe 100644 --- a/go.sum +++ b/go.sum @@ -47,10 +47,12 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0 h1:STgFzyU5/8miMl0//zKh2aQeTyeaUH3WN9bSUiJ09bA= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/99designs/keyring v1.1.5 h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4= -github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0= -github.com/AthenZ/athenz v1.10.15 h1:8Bc2W313k/ev/SGokuthNbzpwfg9W3frg3PKq1r943I= -github.com/AthenZ/athenz v1.10.15/go.mod h1:7KMpEuJ9E4+vMCMI3UQJxwWs0RZtQq7YXZ1IteUjdsc= +github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs= +github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4= +github.com/99designs/keyring v1.2.1 h1:tYLp1ULvO7i3fI5vE21ReQuj99QFSs7lGm0xWyJo87o= +github.com/99designs/keyring v1.2.1/go.mod h1:fc+wB5KTk9wQ9sDx0kFXB3A0MaeGHM9AwRStKOQ5vOA= +github.com/AthenZ/athenz v1.10.39 h1:mtwHTF/v62ewY2Z5KWhuZgVXftBej1/Tn80zx4DcawY= +github.com/AthenZ/athenz v1.10.39/go.mod h1:3Tg8HLsiQZp81BJY58JBeU2BR6B/H4/0MQGfCwhHNEA= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 h1:c4k2FIYIh4xtwqrQwV0Ct1v5+ehlNXj5NI/MWVsiTkQ= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2/go.mod h1:5FDJtLEO/GxwNgUxbwrY3LP0pEoThTQJtk2oysdXHxM= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 h1:vcYCAze6p19qBW7MhZybIsqD8sMV8js0NyQM8JDnVtg= @@ -79,8 +81,8 @@ github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbi github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= -github.com/DataDog/zstd v1.4.6-0.20210211175136-c6db21d202f4 h1:++HGU87uq9UsSTlFeiOV9uZR3NpYkndUXeYyLv2DTc8= -github.com/DataDog/zstd v1.4.6-0.20210211175136-c6db21d202f4/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo= +github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/MakeNowJust/heredoc v1.0.0 h1:cXCdzVdstXyiTqTvfqk9SDHpKNjxuom+DOlyEeQ4pzQ= github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6lCpQ4fNJ8LE= github.com/Masterminds/goutils v1.1.0/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= @@ -125,10 +127,6 @@ github.com/aliyun/credentials-go v1.2.7 h1:gLtFylxLZ1TWi1pStIt1O6a53GFU1zkNwjtJi github.com/aliyun/credentials-go v1.2.7/go.mod h1:/KowD1cfGSLrLsH28Jr8W+xwoId0ywIy5lNzDz6O1vw= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/apache/pulsar-client-go v0.6.0 h1:yKX7NsmJxR5mL6uIUxTTatNhMFlhurTASSZRJ9IULDg= -github.com/apache/pulsar-client-go v0.6.0/go.mod h1:A1P5VjjljsFKAD13w7/jmU3Dly2gcRvcobiULqQXhz4= -github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd h1:P5kM7jcXJ7TaftX0/EMKiSJgvQc/ct+Fw0KMvcH3WuY= -github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY= github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4= github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI= github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk= @@ -143,7 +141,6 @@ github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:l github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 h1:4daAzAu0S6Vi7/lbWECcX0j45yZReDZ56BQsrVBOEEY= github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535/go.mod h1:oGkLhpf+kjZl6xBf758TQhh5XrAeiJv/7FRz/2spLIg= github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= -github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -208,14 +205,13 @@ github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg= github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= -github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U= -github.com/danieljoos/wincred v1.1.0 h1:3RNcEpBg4IhIChZdFRSdlQt1QjCp1sMAPIrOnm7Yf8g= github.com/danieljoos/wincred v1.1.0/go.mod h1:XYlo+eRTsVA9aHGp7NGjFkPla4m+DCL7hqDjlFjiygg= +github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0= +github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/denisenkom/go-mssqldb v0.9.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= -github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= @@ -249,8 +245,8 @@ github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:Htrtb github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU= -github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM= +github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM= +github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= @@ -386,6 +382,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= +github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= @@ -474,6 +472,7 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= @@ -570,13 +569,11 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw= github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk= -github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d h1:Z+RDyXzjKE0i2sTjZ/b1uxiGtPhFy34Ou/Tk0qwN0kM= -github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.10.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= @@ -648,6 +645,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/matttproud/golang_protobuf_extensions v1.0.2 h1:hAHbPm5IJGijwng3PWk09JkG9WeqChjprR5s9bBZ+OM= github.com/matttproud/golang_protobuf_extensions v1.0.2/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= +github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w= github.com/minio/madmin-go v1.3.14 h1:9f9ZylP5Yn/TcplE/wowsBjb+Czt2+/NRCa2IqpNLcI= github.com/minio/madmin-go v1.3.14/go.mod h1:ez87VmMtsxP7DRxjKJKD4RDNW+nhO2QF9KSzwxBDQ98= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= @@ -662,7 +661,6 @@ github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFW github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-wordwrap v1.0.0 h1:6GlHJ/LTGMrIJbwgdqdl2eEH8o+Exx/0m8ir9Gns0u4= @@ -712,15 +710,18 @@ github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= -github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= github.com/onsi/ginkgo/v2 v2.6.0 h1:9t9b9vRUbFq3C4qKFCGkVuq/fIHji802N1nrtkh1mNc= github.com/onsi/ginkgo/v2 v2.6.0/go.mod h1:63DOGlLAH8+REH8jUGdL3YpCpu7JODesutUjdENfUAc= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E= github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmvt1jM= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -859,6 +860,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -1050,7 +1052,6 @@ golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -1062,8 +1063,10 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -1123,7 +1126,6 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1149,7 +1151,6 @@ golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1179,12 +1180,14 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1440,6 +1443,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= diff --git a/pkg/controllers/conditions.go b/pkg/controllers/conditions.go index dcf9bd4..7663d6b 100644 --- a/pkg/controllers/conditions.go +++ b/pkg/controllers/conditions.go @@ -49,9 +49,6 @@ var ( wrapKafkaConditonGetter = func(ctx context.Context, logger logr.Logger, p v1beta1.MilvusKafka, cfg external.CheckKafkaConfig) func() v1beta1.MilvusCondition { return func() v1beta1.MilvusCondition { return GetKafkaCondition(ctx, logger, p, cfg) } } - wrapPulsarConditonGetter = func(ctx context.Context, logger logr.Logger, p v1beta1.MilvusPulsar) func() v1beta1.MilvusCondition { - return func() v1beta1.MilvusCondition { return GetPulsarCondition(ctx, logger, p) } - } wrapEtcdConditionGetter = func(ctx context.Context, endpoints []string) func() v1beta1.MilvusCondition { return func() v1beta1.MilvusCondition { return GetEtcdCondition(ctx, endpoints) } } @@ -60,38 +57,7 @@ var ( } ) -func GetPulsarCondition(ctx context.Context, logger logr.Logger, p v1beta1.MilvusPulsar) v1beta1.MilvusCondition { - - client, err := pulsarNewClient(pulsar.ClientOptions{ - URL: "pulsar://" + p.Endpoint, - ConnectionTimeout: 2 * time.Second, - OperationTimeout: 3 * time.Second, - Logger: newPulsarLog(logger), - }) - - if err != nil { - return newErrMsgStreamCondResult(v1beta1.ReasonMsgStreamNotReady, err.Error()) - } - defer client.Close() - - reader, err := client.CreateReader(pulsar.ReaderOptions{ - Topic: "milvus-operator-topic", - StartMessageID: pulsar.EarliestMessageID(), - }) - if err != nil { - return newErrMsgStreamCondResult(v1beta1.ReasonMsgStreamNotReady, err.Error()) - } - defer reader.Close() - - return msgStreamReadyCondition -} - -var msgStreamReadyCondition = v1beta1.MilvusCondition{ - Type: v1beta1.MsgStreamReady, - Status: GetConditionStatus(true), - Reason: v1beta1.ReasonMsgStreamReady, - Message: MessageMsgStreamReady, -} +var msgStreamReadyCondition = external.MQReadyCondition var checkKafka = external.CheckKafka diff --git a/pkg/controllers/conditions_test.go b/pkg/controllers/conditions_test.go index 3537f59..b456bb0 100644 --- a/pkg/controllers/conditions_test.go +++ b/pkg/controllers/conditions_test.go @@ -5,7 +5,6 @@ import ( "errors" "testing" - "github.com/apache/pulsar-client-go/pulsar" "github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1" "github.com/milvus-io/milvus-operator/pkg/external" "github.com/prashantv/gostub" @@ -72,10 +71,6 @@ func TestWrapGetters(t *testing.T) { fn := wrapKafkaConditonGetter(ctx, logger, v1beta1.MilvusKafka{}, external.CheckKafkaConfig{}) fn() }) - t.Run("pulsar", func(t *testing.T) { - fn := wrapPulsarConditonGetter(ctx, logger, v1beta1.MilvusPulsar{}) - fn() - }) t.Run("etcd", func(t *testing.T) { fn := wrapEtcdConditionGetter(ctx, []string{}) fn() @@ -89,12 +84,6 @@ func TestWrapGetters(t *testing.T) { }) } -func getMockPulsarNewClient(cli pulsar.Client, err error) func(options pulsar.ClientOptions) (pulsar.Client, error) { - return func(options pulsar.ClientOptions) (pulsar.Client, error) { - return cli, err - } -} - func TestGetKafkaCondition(t *testing.T) { checkKafka = func(external.CheckKafkaConfig) error { return nil } ret := GetKafkaCondition(context.TODO(), logf.Log.WithName("test"), v1beta1.MilvusKafka{}, external.CheckKafkaConfig{}) @@ -105,51 +94,6 @@ func TestGetKafkaCondition(t *testing.T) { assert.Equal(t, corev1.ConditionFalse, ret.Status) } -func TestGetPulsarCondition(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - ctx := context.TODO() - logger := logf.Log.WithName("test") - mockPulsarNewClient := NewMockPulsarClient(ctrl) - errTest := errors.New("test") - - t.Run("new client failed, no err", func(t *testing.T) { - stubs := gostub.Stub(&pulsarNewClient, getMockPulsarNewClient(mockPulsarNewClient, errTest)) - defer stubs.Reset() - ret := GetPulsarCondition(ctx, logger, v1beta1.MilvusPulsar{}) - assert.Equal(t, corev1.ConditionFalse, ret.Status) - assert.Equal(t, v1beta1.ReasonMsgStreamNotReady, ret.Reason) - }) - - t.Run("new client ok, create read failed, no err", func(t *testing.T) { - stubs := gostub.Stub(&pulsarNewClient, getMockPulsarNewClient(mockPulsarNewClient, nil)) - defer stubs.Reset() - gomock.InOrder( - mockPulsarNewClient.EXPECT().CreateReader(gomock.Any()).Return(nil, errTest), - mockPulsarNewClient.EXPECT().Close(), - ) - ret := GetPulsarCondition(ctx, logger, v1beta1.MilvusPulsar{}) - assert.Equal(t, corev1.ConditionFalse, ret.Status) - assert.Equal(t, v1beta1.ReasonMsgStreamNotReady, ret.Reason) - }) - - t.Run("new client ok, create read ok, no err", func(t *testing.T) { - stubs := gostub.Stub(&pulsarNewClient, getMockPulsarNewClient(mockPulsarNewClient, nil)) - defer stubs.Reset() - mockReader := NewMockPulsarReader(ctrl) - gomock.InOrder( - mockPulsarNewClient.EXPECT().CreateReader(gomock.Any()).Return(mockReader, nil), - mockReader.EXPECT().Close(), - mockPulsarNewClient.EXPECT().Close(), - ) - ret := GetPulsarCondition(ctx, logger, v1beta1.MilvusPulsar{}) - assert.Equal(t, corev1.ConditionTrue, ret.Status) - assert.Equal(t, v1beta1.ReasonMsgStreamReady, ret.Reason) - }) - -} - func getMockCheckMinIOFunc(err error) checkMinIOFunc { return func(external.CheckMinIOArgs) error { return err diff --git a/pkg/controllers/external_interfaces.go b/pkg/controllers/external_interfaces.go index 008406c..7fa0c4c 100644 --- a/pkg/controllers/external_interfaces.go +++ b/pkg/controllers/external_interfaces.go @@ -3,8 +3,6 @@ package controllers import ( "context" - "github.com/apache/pulsar-client-go/pulsar" - "github.com/go-logr/logr" clientv3 "go.etcd.io/etcd/client/v3" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -21,30 +19,6 @@ type K8sStatusClient interface { client.SubResourceWriter } -// Logger for mock -type Logger interface { - Enabled() bool - Error(err error, msg string, keysAndValues ...interface{}) - GetSink() logr.LogSink - Info(msg string, keysAndValues ...interface{}) - V(level int) logr.Logger - WithCallDepth(depth int) logr.Logger - WithCallStackHelper() (func(), logr.Logger) - WithName(name string) logr.Logger - WithSink(sink logr.LogSink) logr.Logger - WithValues(keysAndValues ...interface{}) logr.Logger -} - -// PulsarClient for mock -type PulsarClient interface { - pulsar.Client -} - -// PulsarReader for mock -type PulsarReader interface { - pulsar.Reader -} - // EtcdClient for mock type EtcdClient interface { Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) diff --git a/pkg/controllers/status_cluster.go b/pkg/controllers/status_cluster.go index 4864ea9..8765e3c 100644 --- a/pkg/controllers/status_cluster.go +++ b/pkg/controllers/status_cluster.go @@ -461,7 +461,7 @@ func (r *MilvusStatusSyncer) GetMsgStreamCondition( eps = mc.Spec.Dep.Kafka.BrokerList default: // default pulsar - getter = wrapPulsarConditonGetter(ctx, r.logger, mc.Spec.Dep.Pulsar) + getter = external.NewPulsarConditionGetter(&mc).GetCondition eps = []string{mc.Spec.Dep.Pulsar.Endpoint} } return GetCondition(getter, eps), nil diff --git a/pkg/controllers/status_cluster_test.go b/pkg/controllers/status_cluster_test.go index 8e55e7d..bf18d2c 100644 --- a/pkg/controllers/status_cluster_test.go +++ b/pkg/controllers/status_cluster_test.go @@ -440,8 +440,6 @@ func mockConditionGetter() v1beta1.MilvusCondition { func TestWrapGetter(t *testing.T) { var getter func() v1beta1.MilvusCondition - getter = wrapPulsarConditonGetter(nil, logr.Logger{}, v1beta1.MilvusPulsar{}) - assert.NotNil(t, getter) getter = wrapEtcdConditionGetter(nil, []string{}) assert.NotNil(t, getter) getter = wrapMinioConditionGetter(nil, logr.Logger{}, nil, StorageConditionInfo{}) diff --git a/pkg/external/pulsar.go b/pkg/external/pulsar.go new file mode 100644 index 0000000..34c3cf4 --- /dev/null +++ b/pkg/external/pulsar.go @@ -0,0 +1,96 @@ +package external + +import ( + "encoding/json" + "strings" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1" + "github.com/milvus-io/milvus-operator/pkg/util" + corev1 "k8s.io/api/core/v1" +) + +type ConditionGetter interface { + GetCondition() v1beta1.MilvusCondition +} + +type PulsarConditionGetter struct { + m *v1beta1.Milvus +} + +var _ ConditionGetter = &PulsarConditionGetter{} + +func NewPulsarConditionGetter(m *v1beta1.Milvus) *PulsarConditionGetter { + + return &PulsarConditionGetter{ + m: m, + } +} + +func newErrMsgStreamCondResult(reason, message string) v1beta1.MilvusCondition { + return v1beta1.MilvusCondition{ + Type: v1beta1.MsgStreamReady, + Status: corev1.ConditionFalse, + Reason: reason, + Message: message, + } +} + +var MQReadyCondition = v1beta1.MilvusCondition{ + Type: v1beta1.MsgStreamReady, + Status: corev1.ConditionTrue, + Reason: v1beta1.ReasonMsgStreamReady, +} + +func (p PulsarConditionGetter) GetCondition() v1beta1.MilvusCondition { + conf := p.m.Spec.Conf + authPlugin, _ := util.GetStringValue(conf.Data, "pulsar", "authPlugin") + authParams, _ := util.GetStringValue(conf.Data, "pulsar", "authParams") + endpoint := p.m.Spec.Dep.Pulsar.Endpoint + + // fomatter copied from: milvus code + // https://github.com/milvus-io/milvus/blob/3252d7a64ce47309feeffcb3caa8e201f083b770/pkg/util/paramtable/service_param.go#L739-L750 + formatter := func(authParams string) string { + jsonMap := make(map[string]string) + params := strings.Split(authParams, ",") + for _, param := range params { + kv := strings.Split(param, ":") + if len(kv) == 2 { + jsonMap[kv[0]] = kv[1] + } + } + + jsonData, _ := json.Marshal(&jsonMap) + return string(jsonData) + } + + authn, err := pulsar.NewAuthentication(authPlugin, formatter(authParams)) + if err != nil { + return newErrMsgStreamCondResult("InitClientAuth", err.Error()) + } + + client, err := pulsarNewClient(pulsar.ClientOptions{ + URL: "pulsar://" + endpoint, + ConnectionTimeout: 2 * time.Second, + OperationTimeout: 3 * time.Second, + Logger: log.DefaultNopLogger(), + Authentication: authn, + }) + + if err != nil { + return newErrMsgStreamCondResult("CreateClientFailed", err.Error()) + } + defer client.Close() + + reader, err := client.CreateReader(pulsar.ReaderOptions{ + Topic: "milvus-operator-topic", + StartMessageID: pulsar.EarliestMessageID(), + }) + if err != nil { + return newErrMsgStreamCondResult("ConnectionFailed", err.Error()) + } + defer reader.Close() + return MQReadyCondition +} diff --git a/pkg/external/pulsar_client.go b/pkg/external/pulsar_client.go new file mode 100644 index 0000000..08ceea3 --- /dev/null +++ b/pkg/external/pulsar_client.go @@ -0,0 +1,17 @@ +package external + +import "github.com/apache/pulsar-client-go/pulsar" + +//go:generate mockgen -package=external -source=pulsar_client.go -destination=pulsar_client_mock.go + +var pulsarNewClient = pulsar.NewClient + +// PulsarClient for mock +type PulsarClient interface { + pulsar.Client +} + +// PulsarReader for mock +type PulsarReader interface { + pulsar.Reader +} diff --git a/pkg/controllers/log.go b/pkg/external/pulsar_log.go similarity index 78% rename from pkg/controllers/log.go rename to pkg/external/pulsar_log.go index 2ac5a30..e364d40 100644 --- a/pkg/controllers/log.go +++ b/pkg/external/pulsar_log.go @@ -1,4 +1,4 @@ -package controllers +package external import ( "fmt" @@ -7,6 +7,20 @@ import ( "github.com/go-logr/logr" ) +// Logger for mock +type Logger interface { + Enabled() bool + Error(err error, msg string, keysAndValues ...interface{}) + GetSink() logr.LogSink + Info(msg string, keysAndValues ...interface{}) + V(level int) logr.Logger + WithCallDepth(depth int) logr.Logger + WithCallStackHelper() (func(), logr.Logger) + WithName(name string) logr.Logger + WithSink(sink logr.LogSink) logr.Logger + WithValues(keysAndValues ...interface{}) logr.Logger +} + type pulsarLog struct { logger logr.Logger } diff --git a/pkg/external/pulsar_test.go b/pkg/external/pulsar_test.go new file mode 100644 index 0000000..cfa3606 --- /dev/null +++ b/pkg/external/pulsar_test.go @@ -0,0 +1,104 @@ +package external + +import ( + "testing" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1" + "github.com/pkg/errors" + "github.com/prashantv/gostub" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + corev1 "k8s.io/api/core/v1" +) + +func getMockPulsarNewClient(cli pulsar.Client, err error) func(options pulsar.ClientOptions) (pulsar.Client, error) { + return func(options pulsar.ClientOptions) (pulsar.Client, error) { + return cli, err + } +} + +func TestGetPulsarCondition(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockPulsarNewClient := NewMockPulsarClient(ctrl) + errTest := errors.New("test") + + m := &v1beta1.Milvus{} + getter := NewPulsarConditionGetter(m) + + t.Run("new client failed, no err", func(t *testing.T) { + stubs := gostub.Stub(&pulsarNewClient, getMockPulsarNewClient(mockPulsarNewClient, errTest)) + defer stubs.Reset() + ret := getter.GetCondition() + assert.Equal(t, corev1.ConditionFalse, ret.Status) + assert.Equal(t, "CreateClientFailed", ret.Reason) + }) + + t.Run("new client ok, create read failed, no err", func(t *testing.T) { + stubs := gostub.Stub(&pulsarNewClient, getMockPulsarNewClient(mockPulsarNewClient, nil)) + defer stubs.Reset() + gomock.InOrder( + mockPulsarNewClient.EXPECT().CreateReader(gomock.Any()).Return(nil, errTest), + mockPulsarNewClient.EXPECT().Close(), + ) + ret := getter.GetCondition() + assert.Equal(t, corev1.ConditionFalse, ret.Status) + assert.Equal(t, "ConnectionFailed", ret.Reason) + }) + + t.Run("new client ok, create read ok, no err", func(t *testing.T) { + stubs := gostub.Stub(&pulsarNewClient, getMockPulsarNewClient(mockPulsarNewClient, nil)) + defer stubs.Reset() + mockReader := NewMockPulsarReader(ctrl) + gomock.InOrder( + mockPulsarNewClient.EXPECT().CreateReader(gomock.Any()).Return(mockReader, nil), + mockReader.EXPECT().Close(), + mockPulsarNewClient.EXPECT().Close(), + ) + ret := getter.GetCondition() + assert.Equal(t, corev1.ConditionTrue, ret.Status) + assert.Equal(t, v1beta1.ReasonMsgStreamReady, ret.Reason) + }) + + t.Run("with auth failed", func(t *testing.T) { + m.Spec.Conf.FromObject(map[string]interface{}{ + "pulsar": map[string]interface{}{ + "authPlugin": "org.apache.pulsar.client.impl.auth.AuthenticationBasic", + "authParams": "bad", + }, + }) + stubs := gostub.Stub(&pulsarNewClient, getMockPulsarNewClient(mockPulsarNewClient, nil)) + defer stubs.Reset() + mockReader := NewMockPulsarReader(ctrl) + gomock.InOrder( + mockPulsarNewClient.EXPECT().CreateReader(gomock.Any()).Return(mockReader, nil), + mockReader.EXPECT().Close(), + mockPulsarNewClient.EXPECT().Close(), + ) + ret := getter.GetCondition() + assert.Equal(t, corev1.ConditionTrue, ret.Status) + assert.Equal(t, v1beta1.ReasonMsgStreamReady, ret.Reason) + }) + + t.Run("with auth ok", func(t *testing.T) { + m.Spec.Conf.FromObject(map[string]interface{}{ + "pulsar": map[string]interface{}{ + "authPlugin": "token", + "authParams": "file:/path/to/token", + }, + }) + stubs := gostub.Stub(&pulsarNewClient, getMockPulsarNewClient(mockPulsarNewClient, nil)) + defer stubs.Reset() + mockReader := NewMockPulsarReader(ctrl) + gomock.InOrder( + mockPulsarNewClient.EXPECT().CreateReader(gomock.Any()).Return(mockReader, nil), + mockReader.EXPECT().Close(), + mockPulsarNewClient.EXPECT().Close(), + ) + ret := getter.GetCondition() + assert.Equal(t, corev1.ConditionTrue, ret.Status) + assert.Equal(t, v1beta1.ReasonMsgStreamReady, ret.Reason, "err", ret.Message) + }) +}