From 27c8783ed7cdbcd3450353db0dcc35787a097a74 Mon Sep 17 00:00:00 2001 From: ryoha <48485650+ryoha000@users.noreply.github.com> Date: Sat, 7 Dec 2024 01:30:21 +0900 Subject: [PATCH 1/5] =?UTF-8?q?Revert=20"Revert=20"feat:=20=F0=9F=8E=B8=20?= =?UTF-8?q?payment=E3=82=B5=E3=83=BC=E3=83=90=E3=83=BC=E3=82=92=E3=82=AD?= =?UTF-8?q?=E3=83=A5=E3=83=BC=E3=81=A7=E3=81=AA=E3=81=8F=E7=A2=BA=E7=8E=87?= =?UTF-8?q?=E3=81=A7=E8=90=BD=E3=81=A8=E3=81=99=E3=82=88=E3=81=86=E3=81=AB?= =?UTF-8?q?""?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bench/benchmarker/scenario/scenario.go | 2 +- bench/payment/handler.go | 91 ++++++++++--------- .../{handler_test.go => handler_test.go_} | 14 +-- bench/payment/payment.go | 4 +- bench/payment/payment_test.go | 4 +- bench/payment/queue.go | 78 ---------------- bench/payment/server.go | 28 ++++-- development/matching.js | 9 ++ 8 files changed, 81 insertions(+), 149 deletions(-) rename bench/payment/{handler_test.go => handler_test.go_} (96%) delete mode 100644 bench/payment/queue.go create mode 100644 development/matching.js diff --git a/bench/benchmarker/scenario/scenario.go b/bench/benchmarker/scenario/scenario.go index 39446593..84389b64 100644 --- a/bench/benchmarker/scenario/scenario.go +++ b/bench/benchmarker/scenario/scenario.go @@ -71,7 +71,7 @@ func NewScenario(target, addr, paymentURL string, logger *slog.Logger, reporter worldCtx := world.NewContext(w) paymentErrChan := make(chan error, 1000) - paymentServer := payment.NewServer(w.PaymentDB, 30*time.Millisecond, 2, paymentErrChan) + paymentServer := payment.NewServer(w.PaymentDB, 30*time.Millisecond, paymentErrChan) go func() { http.ListenAndServe(":12345", paymentServer) }() diff --git a/bench/payment/handler.go b/bench/payment/handler.go index 63523c7d..d306d856 100644 --- a/bench/payment/handler.go +++ b/bench/payment/handler.go @@ -68,61 +68,59 @@ func (s *Server) PostPaymentsHandler(w http.ResponseWriter, r *http.Request) { p.Amount = req.Amount } - // 決済処理 - // キューに入れて完了を待つ(ブロッキング) - if s.queue.tryProcess(p) { - <-p.processChan - p.locked.Store(false) - - select { - case <-r.Context().Done(): - // クライアントが既に切断している - w.WriteHeader(http.StatusGatewayTimeout) - default: - writeResponse(w, p.Status) + time.Sleep(s.processTime) + // (直近3秒で処理された payment の数) / 100 の確率で処理を失敗させる(最大50%) + var recentProcessedCount int + for _, processed := range s.processedPayments.BackwardIter() { + if time.Since(processed.processedAt) > 3*time.Second { + break } - return + recentProcessedCount++ } - - // キューが詰まっていても確率で成功させる - if rand.IntN(5) == 0 { - slog.Debug("決済が詰まったが成功") - - go s.queue.process(p) - <-p.processChan - p.locked.Store(false) - - select { - case <-r.Context().Done(): - // クライアントが既に切断している - w.WriteHeader(http.StatusGatewayTimeout) - default: - writeResponse(w, p.Status) - } - return + failurePercentage := recentProcessedCount + if failurePercentage > 50 { + failurePercentage = 50 } - - // エラーを返した場合でもキューに入る場合がある - if rand.IntN(5) < 4 { - go s.queue.process(p) - // 処理の終了を待たない - go func() { - <-p.processChan - p.locked.Store(false) - }() - slog.Debug("決済が詰まったが、キューに積んでエラー") - } else { - slog.Debug("決済が詰まってエラー") + if rand.IntN(100) > failurePercentage { + // lock はここでしか触らない。lock が true の場合は idempotency key が同じリクエストが処理中の場合のみ + if p.locked.CompareAndSwap(false, true) { + alreadyProcessed := false + if !newPayment { + for _, processed := range s.processedPayments.ToSlice() { + if processed.payment == p { + alreadyProcessed = true + break + } + } + } + if !alreadyProcessed { + p.Status = s.verifier.Verify(p) + if p.Status.Err != nil { + s.errChan <- p.Status.Err + } + s.processedPayments.Append(&processedPayment{payment: p, processedAt: time.Now()}) + p.locked.Store(false) + } + } } // 不安定なエラーを再現 - switch rand.IntN(3) { + switch rand.IntN(4) { case 0: w.WriteHeader(http.StatusInternalServerError) case 1: w.WriteHeader(http.StatusBadGateway) case 2: w.WriteHeader(http.StatusGatewayTimeout) + case 3: + // ちゃんとレスポンスを返す場合 + select { + case <-r.Context().Done(): + // クライアントが既に切断している + w.WriteHeader(http.StatusGatewayTimeout) + default: + writeResponse(w, p.Status) + } } } @@ -146,11 +144,14 @@ func (s *Server) GetPaymentsHandler(w http.ResponseWriter, r *http.Request) { return } - payments := s.queue.getAllAcceptedPayments(token) + payments := s.processedPayments.ToSlice() res := []ResponsePayment{} for _, p := range payments { - res = append(res, NewResponsePayment(p)) + if p.payment.Token != token { + continue + } + res = append(res, NewResponsePayment(p.payment)) } writeJSON(w, http.StatusOK, res) } diff --git a/bench/payment/handler_test.go b/bench/payment/handler_test.go_ similarity index 96% rename from bench/payment/handler_test.go rename to bench/payment/handler_test.go_ index d75c77e6..e12a5121 100644 --- a/bench/payment/handler_test.go +++ b/bench/payment/handler_test.go_ @@ -49,7 +49,7 @@ func TestServer_PaymentHandler(t *testing.T) { prepare := func(t *testing.T) (*Server, *MockVerifier, *httpexpect.Expect) { mockCtrl := gomock.NewController(t) verifier := NewMockVerifier(mockCtrl) - server := NewServer(verifier, 1*time.Millisecond, 1, make(chan error)) + server := NewServer(verifier, 1*time.Millisecond, make(chan error)) httpServer := httptest.NewServer(server) t.Cleanup(httpServer.Close) e := httpexpect.Default(t, httpServer.URL) @@ -327,7 +327,7 @@ func TestServer_GetPaymentsHandler(t *testing.T) { prepare := func(t *testing.T) (*Server, *MockVerifier, *httpexpect.Expect) { mockCtrl := gomock.NewController(t) verifier := NewMockVerifier(mockCtrl) - server := NewServer(verifier, 1*time.Millisecond, 1, make(chan error)) + server := NewServer(verifier, 1*time.Millisecond, make(chan error)) httpServer := httptest.NewServer(server) t.Cleanup(httpServer.Close) e := httpexpect.Default(t, httpServer.URL) @@ -393,17 +393,12 @@ func TestServer_GetPaymentsHandler(t *testing.T) { }) for _, p := range payments { - status := http.StatusNoContent - if p.Status.Type != StatusSuccess && p.Status.Type != StatusInitial { - status = http.StatusBadRequest - } e.POST("/payments"). WithHeader(AuthorizationHeader, AuthorizationHeaderPrefix+p.Token). WithJSON(map[string]any{ "amount": p.Amount, }). - Expect(). - Status(status) + Expect() } e.GET("/payments"). @@ -436,8 +431,7 @@ func TestServer_GetPaymentsHandler(t *testing.T) { WithJSON(map[string]any{ "amount": p.Amount, }). - Expect(). - Status(http.StatusNoContent) + Expect() e.GET("/payments"). WithHeader(AuthorizationHeader, AuthorizationHeaderPrefix+token). diff --git a/bench/payment/payment.go b/bench/payment/payment.go index 8394c0f7..af7d4639 100644 --- a/bench/payment/payment.go +++ b/bench/payment/payment.go @@ -39,15 +39,13 @@ type Payment struct { Amount int Status Status locked atomic.Bool - processChan chan struct{} } func NewPayment(idk string) *Payment { p := &Payment{ IdempotencyKey: idk, Status: Status{Type: StatusInitial, Err: nil}, - processChan: make(chan struct{}), } - p.locked.Store(true) + p.locked.Store(false) return p } diff --git a/bench/payment/payment_test.go b/bench/payment/payment_test.go index 619a3a78..54d203c2 100644 --- a/bench/payment/payment_test.go +++ b/bench/payment/payment_test.go @@ -10,7 +10,5 @@ func TestNewPayment(t *testing.T) { p := NewPayment("test") assert.Equal(t, "test", p.IdempotencyKey) assert.Equal(t, StatusInitial, p.Status.Type) - assert.True(t, p.locked.Load()) - assert.NotNil(t, p.processChan) - assert.NotPanics(t, func() { close(p.processChan) }) + assert.False(t, p.locked.Load()) } diff --git a/bench/payment/queue.go b/bench/payment/queue.go deleted file mode 100644 index 8f315e34..00000000 --- a/bench/payment/queue.go +++ /dev/null @@ -1,78 +0,0 @@ -package payment - -import ( - "context" - "time" - - "github.com/isucon/isucon14/bench/internal/concurrent" - "golang.org/x/sync/semaphore" -) - -type paymentQueue struct { - ctx context.Context - semaphore *semaphore.Weighted - verifier Verifier - processTime time.Duration - acceptedPayments *concurrent.SimpleMap[string, *concurrent.SimpleSlice[*Payment]] - errChan chan error -} - -func newPaymentQueue(queueSize int, verifier Verifier, processTime time.Duration, errChan chan error) *paymentQueue { - return &paymentQueue{ - ctx: context.Background(), - semaphore: semaphore.NewWeighted(int64(queueSize)), - verifier: verifier, - processTime: processTime, - acceptedPayments: concurrent.NewSimpleMap[string, *concurrent.SimpleSlice[*Payment]](), - errChan: errChan, - } -} - -func (q *paymentQueue) execute(p *Payment) { - time.Sleep(q.processTime) - p.Status = q.verifier.Verify(p) - if p.Status.Err != nil { - q.errChan <- p.Status.Err - } - close(p.processChan) -} - -func (q *paymentQueue) tryProcess(p *Payment) bool { - if !q.semaphore.TryAcquire(1) { - return false - } - q.appendAcceptedPayments(p) - - go func() { - defer q.semaphore.Release(1) - q.execute(p) - }() - return true -} - -func (q *paymentQueue) process(p *Payment) { - // 遅かれ早かれ処理するため、受け入れた決済として保持しておく - q.appendAcceptedPayments(p) - - q.semaphore.Acquire(q.ctx, 1) - defer q.semaphore.Release(1) - - q.execute(p) -} - -func (q *paymentQueue) appendAcceptedPayments(p *Payment) { - payments, _ := q.acceptedPayments.GetOrSetDefault(p.Token, func() *concurrent.SimpleSlice[*Payment] { return concurrent.NewSimpleSlice[*Payment]() }) - payments.Append(p) -} - -func (q *paymentQueue) getAllAcceptedPayments(token string) []*Payment { - payments, ok := q.acceptedPayments.Get(token) - if !ok { - return []*Payment{} - } - return payments.ToSlice() -} - -func (q *paymentQueue) close() { - q.ctx.Done() -} diff --git a/bench/payment/server.go b/bench/payment/server.go index f4b0a36b..9f338ed0 100644 --- a/bench/payment/server.go +++ b/bench/payment/server.go @@ -11,18 +11,29 @@ const IdempotencyKeyHeader = "Idempotency-Key" const AuthorizationHeader = "Authorization" const AuthorizationHeaderPrefix = "Bearer " +type processedPayment struct { + payment *Payment + processedAt time.Time +} + type Server struct { - mux *http.ServeMux - knownKeys *concurrent.SimpleMap[string, *Payment] - queue *paymentQueue - closed bool + mux *http.ServeMux + knownKeys *concurrent.SimpleMap[string, *Payment] + processedPayments *concurrent.SimpleSlice[*processedPayment] + processTime time.Duration + verifier Verifier + errChan chan error + closed bool } -func NewServer(verifier Verifier, processTime time.Duration, queueSize int, errChan chan error) *Server { +func NewServer(verifier Verifier, processTime time.Duration, errChan chan error) *Server { s := &Server{ - mux: http.NewServeMux(), - knownKeys: concurrent.NewSimpleMap[string, *Payment](), - queue: newPaymentQueue(queueSize, verifier, processTime, errChan), + mux: http.NewServeMux(), + knownKeys: concurrent.NewSimpleMap[string, *Payment](), + processedPayments: concurrent.NewSimpleSlice[*processedPayment](), + processTime: processTime, + verifier: verifier, + errChan: errChan, } s.mux.HandleFunc("GET /payments", s.GetPaymentsHandler) s.mux.HandleFunc("POST /payments", s.PostPaymentsHandler) @@ -39,5 +50,4 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *Server) Close() { s.closed = true - s.queue.close() } diff --git a/development/matching.js b/development/matching.js new file mode 100644 index 00000000..8bf823a7 --- /dev/null +++ b/development/matching.js @@ -0,0 +1,9 @@ +// while true; do curl -s http://nginx/api/internal/matching; sleep 0.5; done と同じことをnodejs で実装 + +const f = () => { + try { + fetch("http://localhost:8080/api/internal/matching"); + } catch (e) {} +}; + +setInterval(f, 500); From 9fb61c6ba2db8cef47e011faaa67b656609e054b Mon Sep 17 00:00:00 2001 From: ryoha000 Date: Sat, 7 Dec 2024 01:31:40 +0900 Subject: [PATCH 2/5] =?UTF-8?q?fix:=20=F0=9F=90=9B=20payment=20status=20co?= =?UTF-8?q?de?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bench/payment/handler.go | 42 +++++++++++++++------------------------- 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/bench/payment/handler.go b/bench/payment/handler.go index d306d856..cbbbf68c 100644 --- a/bench/payment/handler.go +++ b/bench/payment/handler.go @@ -84,43 +84,33 @@ func (s *Server) PostPaymentsHandler(w http.ResponseWriter, r *http.Request) { if rand.IntN(100) > failurePercentage { // lock はここでしか触らない。lock が true の場合は idempotency key が同じリクエストが処理中の場合のみ if p.locked.CompareAndSwap(false, true) { - alreadyProcessed := false - if !newPayment { - for _, processed := range s.processedPayments.ToSlice() { - if processed.payment == p { - alreadyProcessed = true - break - } - } - } - if !alreadyProcessed { - p.Status = s.verifier.Verify(p) - if p.Status.Err != nil { - s.errChan <- p.Status.Err - } - s.processedPayments.Append(&processedPayment{payment: p, processedAt: time.Now()}) - p.locked.Store(false) + s.processedPayments.Append(&processedPayment{payment: p, processedAt: time.Now()}) + p.Status = s.verifier.Verify(p) + if p.Status.Err != nil { + s.errChan <- p.Status.Err } } + switch rand.IntN(2) { + case 0: + writeRandomError(w) + case 1: + writeResponse(w, p.Status) + } + return } // 不安定なエラーを再現 - switch rand.IntN(4) { + writeRandomError(w) +} + +func writeRandomError(w http.ResponseWriter) { + switch rand.IntN(3) { case 0: w.WriteHeader(http.StatusInternalServerError) case 1: w.WriteHeader(http.StatusBadGateway) case 2: w.WriteHeader(http.StatusGatewayTimeout) - case 3: - // ちゃんとレスポンスを返す場合 - select { - case <-r.Context().Done(): - // クライアントが既に切断している - w.WriteHeader(http.StatusGatewayTimeout) - default: - writeResponse(w, p.Status) - } } } From 8c1235338c7cfc8f02eeb45009d277d50fabe7f7 Mon Sep 17 00:00:00 2001 From: ryoha000 Date: Sat, 7 Dec 2024 00:19:36 +0900 Subject: [PATCH 3/5] =?UTF-8?q?fix:=20=F0=9F=90=9B=20=E5=90=8C=E3=81=98tok?= =?UTF-8?q?en=E3=82=925=E5=9B=9E=E4=BB=A5=E4=B8=8A=E9=80=A3=E7=B6=9A?= =?UTF-8?q?=E3=81=A7=E8=90=BD=E3=81=A8=E3=81=95=E3=81=AA=E3=81=84=E3=82=88?= =?UTF-8?q?=E3=81=86=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bench/payment/handler.go | 20 ++++++++++++-------- bench/payment/server.go | 2 ++ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/bench/payment/handler.go b/bench/payment/handler.go index cbbbf68c..51aeee10 100644 --- a/bench/payment/handler.go +++ b/bench/payment/handler.go @@ -81,7 +81,8 @@ func (s *Server) PostPaymentsHandler(w http.ResponseWriter, r *http.Request) { if failurePercentage > 50 { failurePercentage = 50 } - if rand.IntN(100) > failurePercentage { + failureCount, _ := s.failureCounts.GetOrSetDefault(token, func() int { return 0 }) + if rand.IntN(100) > failurePercentage || failureCount >= 4 { // lock はここでしか触らない。lock が true の場合は idempotency key が同じリクエストが処理中の場合のみ if p.locked.CompareAndSwap(false, true) { s.processedPayments.Append(&processedPayment{payment: p, processedAt: time.Now()}) @@ -89,14 +90,17 @@ func (s *Server) PostPaymentsHandler(w http.ResponseWriter, r *http.Request) { if p.Status.Err != nil { s.errChan <- p.Status.Err } + s.failureCounts.Delete(token) + switch rand.IntN(2) { + case 0: + writeRandomError(w) + case 1: + writeResponse(w, p.Status) + } + return } - switch rand.IntN(2) { - case 0: - writeRandomError(w) - case 1: - writeResponse(w, p.Status) - } - return + } else { + s.failureCounts.Set(token, failureCount+1) } // 不安定なエラーを再現 diff --git a/bench/payment/server.go b/bench/payment/server.go index 9f338ed0..c6a98e63 100644 --- a/bench/payment/server.go +++ b/bench/payment/server.go @@ -19,6 +19,7 @@ type processedPayment struct { type Server struct { mux *http.ServeMux knownKeys *concurrent.SimpleMap[string, *Payment] + failureCounts *concurrent.SimpleMap[string, int] processedPayments *concurrent.SimpleSlice[*processedPayment] processTime time.Duration verifier Verifier @@ -30,6 +31,7 @@ func NewServer(verifier Verifier, processTime time.Duration, errChan chan error) s := &Server{ mux: http.NewServeMux(), knownKeys: concurrent.NewSimpleMap[string, *Payment](), + failureCounts: concurrent.NewSimpleMap[string, int](), processedPayments: concurrent.NewSimpleSlice[*processedPayment](), processTime: processTime, verifier: verifier, From 3cdc29348b4b47c32baedc348202c4ef923d07f0 Mon Sep 17 00:00:00 2001 From: ryoha000 Date: Sat, 7 Dec 2024 02:04:14 +0900 Subject: [PATCH 4/5] =?UTF-8?q?fix:=20=F0=9F=90=9B=20failure=20percentage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bench/payment/handler.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/bench/payment/handler.go b/bench/payment/handler.go index 51aeee10..699a18a2 100644 --- a/bench/payment/handler.go +++ b/bench/payment/handler.go @@ -69,10 +69,10 @@ func (s *Server) PostPaymentsHandler(w http.ResponseWriter, r *http.Request) { } time.Sleep(s.processTime) - // (直近3秒で処理された payment の数) / 100 の確率で処理を失敗させる(最大50%) + // (直近1秒で処理された payment の数) / 100 の確率で処理を失敗させる(最大50%) var recentProcessedCount int for _, processed := range s.processedPayments.BackwardIter() { - if time.Since(processed.processedAt) > 3*time.Second { + if time.Since(processed.processedAt) > 1*time.Second { break } recentProcessedCount++ @@ -91,11 +91,10 @@ func (s *Server) PostPaymentsHandler(w http.ResponseWriter, r *http.Request) { s.errChan <- p.Status.Err } s.failureCounts.Delete(token) - switch rand.IntN(2) { - case 0: - writeRandomError(w) - case 1: + if rand.IntN(100) > failurePercentage { writeResponse(w, p.Status) + } else { + writeRandomError(w) } return } From 82c085e9b1c23471e6b1209b54df8e7fc4ffcf08 Mon Sep 17 00:00:00 2001 From: ryoha000 Date: Sat, 7 Dec 2024 02:10:58 +0900 Subject: [PATCH 5/5] =?UTF-8?q?fix:=20=F0=9F=90=9B=20percentage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bench/payment/handler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bench/payment/handler.go b/bench/payment/handler.go index 699a18a2..19e8a695 100644 --- a/bench/payment/handler.go +++ b/bench/payment/handler.go @@ -69,10 +69,10 @@ func (s *Server) PostPaymentsHandler(w http.ResponseWriter, r *http.Request) { } time.Sleep(s.processTime) - // (直近1秒で処理された payment の数) / 100 の確率で処理を失敗させる(最大50%) + // (直近3秒で処理された payment の数) / 100 の確率で処理を失敗させる(最大50%) var recentProcessedCount int for _, processed := range s.processedPayments.BackwardIter() { - if time.Since(processed.processedAt) > 1*time.Second { + if time.Since(processed.processedAt) > 3*time.Second { break } recentProcessedCount++ @@ -91,7 +91,7 @@ func (s *Server) PostPaymentsHandler(w http.ResponseWriter, r *http.Request) { s.errChan <- p.Status.Err } s.failureCounts.Delete(token) - if rand.IntN(100) > failurePercentage { + if rand.IntN(100) > failurePercentage || failureCount >= 4 { writeResponse(w, p.Status) } else { writeRandomError(w)