From e9fd7406b7797f3e8bacf47430bb722ce3641d1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=A0=20/=20green?= Date: Sat, 7 Dec 2024 12:42:10 +0900 Subject: [PATCH] =?UTF-8?q?Revert=20"Revert=20"Revert=20"feat:=20?= =?UTF-8?q?=F0=9F=8E=B8=20payment=E3=82=B5=E3=83=BC=E3=83=90=E3=83=BC?= =?UTF-8?q?=E3=82=92=E3=82=AD=E3=83=A5=E3=83=BC=E3=81=A7=E3=81=AA=E3=81=8F?= =?UTF-8?q?=E7=A2=BA=E7=8E=87=E3=81=A7=E8=90=BD=E3=81=A8=E3=81=99=E3=82=88?= =?UTF-8?q?=E3=81=86=E3=81=AB"""?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bench/benchmarker/scenario/scenario.go | 2 +- bench/payment/handler.go | 80 ++++++++++--------- .../{handler_test.go_ => handler_test.go} | 14 +++- bench/payment/payment.go | 4 +- bench/payment/payment_test.go | 4 +- bench/payment/queue.go | 78 ++++++++++++++++++ bench/payment/server.go | 30 +++---- development/matching.js | 9 --- 8 files changed, 147 insertions(+), 74 deletions(-) rename bench/payment/{handler_test.go_ => handler_test.go} (96%) create mode 100644 bench/payment/queue.go delete mode 100644 development/matching.js diff --git a/bench/benchmarker/scenario/scenario.go b/bench/benchmarker/scenario/scenario.go index 95a42375..66dfd5b2 100644 --- a/bench/benchmarker/scenario/scenario.go +++ b/bench/benchmarker/scenario/scenario.go @@ -73,7 +73,7 @@ func NewScenario(target, addr, paymentURL string, logger *slog.Logger, reporter worldCtx := world.NewContext(w) paymentErrChan := make(chan error, 1000) - paymentServer := payment.NewServer(w.PaymentDB, 30*time.Millisecond, paymentErrChan) + paymentServer := payment.NewServer(w.PaymentDB, 30*time.Millisecond, 2, paymentErrChan) go func() { http.ListenAndServe(":12345", paymentServer) }() diff --git a/bench/payment/handler.go b/bench/payment/handler.go index 19e8a695..63523c7d 100644 --- a/bench/payment/handler.go +++ b/bench/payment/handler.go @@ -68,45 +68,54 @@ func (s *Server) PostPaymentsHandler(w http.ResponseWriter, r *http.Request) { p.Amount = req.Amount } - time.Sleep(s.processTime) - // (直近3秒で処理された payment の数) / 100 の確率で処理を失敗させる(最大50%) - var recentProcessedCount int - for _, processed := range s.processedPayments.BackwardIter() { - if time.Since(processed.processedAt) > 3*time.Second { - break + // 決済処理 + // キューに入れて完了を待つ(ブロッキング) + if s.queue.tryProcess(p) { + <-p.processChan + p.locked.Store(false) + + select { + case <-r.Context().Done(): + // クライアントが既に切断している + w.WriteHeader(http.StatusGatewayTimeout) + default: + writeResponse(w, p.Status) } - recentProcessedCount++ - } - failurePercentage := recentProcessedCount - if failurePercentage > 50 { - failurePercentage = 50 + return } - failureCount, _ := s.failureCounts.GetOrSetDefault(token, func() int { return 0 }) - if rand.IntN(100) > failurePercentage || failureCount >= 4 { - // lock はここでしか触らない。lock が true の場合は idempotency key が同じリクエストが処理中の場合のみ - if p.locked.CompareAndSwap(false, true) { - s.processedPayments.Append(&processedPayment{payment: p, processedAt: time.Now()}) - p.Status = s.verifier.Verify(p) - if p.Status.Err != nil { - s.errChan <- p.Status.Err - } - s.failureCounts.Delete(token) - if rand.IntN(100) > failurePercentage || failureCount >= 4 { - writeResponse(w, p.Status) - } else { - writeRandomError(w) - } - return + + // キューが詰まっていても確率で成功させる + if rand.IntN(5) == 0 { + slog.Debug("決済が詰まったが成功") + + go s.queue.process(p) + <-p.processChan + p.locked.Store(false) + + select { + case <-r.Context().Done(): + // クライアントが既に切断している + w.WriteHeader(http.StatusGatewayTimeout) + default: + writeResponse(w, p.Status) } + return + } + + // エラーを返した場合でもキューに入る場合がある + if rand.IntN(5) < 4 { + go s.queue.process(p) + // 処理の終了を待たない + go func() { + <-p.processChan + p.locked.Store(false) + }() + slog.Debug("決済が詰まったが、キューに積んでエラー") } else { - s.failureCounts.Set(token, failureCount+1) + slog.Debug("決済が詰まってエラー") } // 不安定なエラーを再現 - writeRandomError(w) -} - -func writeRandomError(w http.ResponseWriter) { switch rand.IntN(3) { case 0: w.WriteHeader(http.StatusInternalServerError) @@ -137,14 +146,11 @@ func (s *Server) GetPaymentsHandler(w http.ResponseWriter, r *http.Request) { return } - payments := s.processedPayments.ToSlice() + payments := s.queue.getAllAcceptedPayments(token) res := []ResponsePayment{} for _, p := range payments { - if p.payment.Token != token { - continue - } - res = append(res, NewResponsePayment(p.payment)) + res = append(res, NewResponsePayment(p)) } writeJSON(w, http.StatusOK, res) } diff --git a/bench/payment/handler_test.go_ b/bench/payment/handler_test.go similarity index 96% rename from bench/payment/handler_test.go_ rename to bench/payment/handler_test.go index e12a5121..d75c77e6 100644 --- a/bench/payment/handler_test.go_ +++ b/bench/payment/handler_test.go @@ -49,7 +49,7 @@ func TestServer_PaymentHandler(t *testing.T) { prepare := func(t *testing.T) (*Server, *MockVerifier, *httpexpect.Expect) { mockCtrl := gomock.NewController(t) verifier := NewMockVerifier(mockCtrl) - server := NewServer(verifier, 1*time.Millisecond, make(chan error)) + server := NewServer(verifier, 1*time.Millisecond, 1, make(chan error)) httpServer := httptest.NewServer(server) t.Cleanup(httpServer.Close) e := httpexpect.Default(t, httpServer.URL) @@ -327,7 +327,7 @@ func TestServer_GetPaymentsHandler(t *testing.T) { prepare := func(t *testing.T) (*Server, *MockVerifier, *httpexpect.Expect) { mockCtrl := gomock.NewController(t) verifier := NewMockVerifier(mockCtrl) - server := NewServer(verifier, 1*time.Millisecond, make(chan error)) + server := NewServer(verifier, 1*time.Millisecond, 1, make(chan error)) httpServer := httptest.NewServer(server) t.Cleanup(httpServer.Close) e := httpexpect.Default(t, httpServer.URL) @@ -393,12 +393,17 @@ func TestServer_GetPaymentsHandler(t *testing.T) { }) for _, p := range payments { + status := http.StatusNoContent + if p.Status.Type != StatusSuccess && p.Status.Type != StatusInitial { + status = http.StatusBadRequest + } e.POST("/payments"). WithHeader(AuthorizationHeader, AuthorizationHeaderPrefix+p.Token). WithJSON(map[string]any{ "amount": p.Amount, }). - Expect() + Expect(). + Status(status) } e.GET("/payments"). @@ -431,7 +436,8 @@ func TestServer_GetPaymentsHandler(t *testing.T) { WithJSON(map[string]any{ "amount": p.Amount, }). - Expect() + Expect(). + Status(http.StatusNoContent) e.GET("/payments"). WithHeader(AuthorizationHeader, AuthorizationHeaderPrefix+token). diff --git a/bench/payment/payment.go b/bench/payment/payment.go index af7d4639..8394c0f7 100644 --- a/bench/payment/payment.go +++ b/bench/payment/payment.go @@ -39,13 +39,15 @@ type Payment struct { Amount int Status Status locked atomic.Bool + processChan chan struct{} } func NewPayment(idk string) *Payment { p := &Payment{ IdempotencyKey: idk, Status: Status{Type: StatusInitial, Err: nil}, + processChan: make(chan struct{}), } - p.locked.Store(false) + p.locked.Store(true) return p } diff --git a/bench/payment/payment_test.go b/bench/payment/payment_test.go index 54d203c2..619a3a78 100644 --- a/bench/payment/payment_test.go +++ b/bench/payment/payment_test.go @@ -10,5 +10,7 @@ func TestNewPayment(t *testing.T) { p := NewPayment("test") assert.Equal(t, "test", p.IdempotencyKey) assert.Equal(t, StatusInitial, p.Status.Type) - assert.False(t, p.locked.Load()) + assert.True(t, p.locked.Load()) + assert.NotNil(t, p.processChan) + assert.NotPanics(t, func() { close(p.processChan) }) } diff --git a/bench/payment/queue.go b/bench/payment/queue.go new file mode 100644 index 00000000..8f315e34 --- /dev/null +++ b/bench/payment/queue.go @@ -0,0 +1,78 @@ +package payment + +import ( + "context" + "time" + + "github.com/isucon/isucon14/bench/internal/concurrent" + "golang.org/x/sync/semaphore" +) + +type paymentQueue struct { + ctx context.Context + semaphore *semaphore.Weighted + verifier Verifier + processTime time.Duration + acceptedPayments *concurrent.SimpleMap[string, *concurrent.SimpleSlice[*Payment]] + errChan chan error +} + +func newPaymentQueue(queueSize int, verifier Verifier, processTime time.Duration, errChan chan error) *paymentQueue { + return &paymentQueue{ + ctx: context.Background(), + semaphore: semaphore.NewWeighted(int64(queueSize)), + verifier: verifier, + processTime: processTime, + acceptedPayments: concurrent.NewSimpleMap[string, *concurrent.SimpleSlice[*Payment]](), + errChan: errChan, + } +} + +func (q *paymentQueue) execute(p *Payment) { + time.Sleep(q.processTime) + p.Status = q.verifier.Verify(p) + if p.Status.Err != nil { + q.errChan <- p.Status.Err + } + close(p.processChan) +} + +func (q *paymentQueue) tryProcess(p *Payment) bool { + if !q.semaphore.TryAcquire(1) { + return false + } + q.appendAcceptedPayments(p) + + go func() { + defer q.semaphore.Release(1) + q.execute(p) + }() + return true +} + +func (q *paymentQueue) process(p *Payment) { + // 遅かれ早かれ処理するため、受け入れた決済として保持しておく + q.appendAcceptedPayments(p) + + q.semaphore.Acquire(q.ctx, 1) + defer q.semaphore.Release(1) + + q.execute(p) +} + +func (q *paymentQueue) appendAcceptedPayments(p *Payment) { + payments, _ := q.acceptedPayments.GetOrSetDefault(p.Token, func() *concurrent.SimpleSlice[*Payment] { return concurrent.NewSimpleSlice[*Payment]() }) + payments.Append(p) +} + +func (q *paymentQueue) getAllAcceptedPayments(token string) []*Payment { + payments, ok := q.acceptedPayments.Get(token) + if !ok { + return []*Payment{} + } + return payments.ToSlice() +} + +func (q *paymentQueue) close() { + q.ctx.Done() +} diff --git a/bench/payment/server.go b/bench/payment/server.go index c6a98e63..f4b0a36b 100644 --- a/bench/payment/server.go +++ b/bench/payment/server.go @@ -11,31 +11,18 @@ const IdempotencyKeyHeader = "Idempotency-Key" const AuthorizationHeader = "Authorization" const AuthorizationHeaderPrefix = "Bearer " -type processedPayment struct { - payment *Payment - processedAt time.Time -} - type Server struct { - mux *http.ServeMux - knownKeys *concurrent.SimpleMap[string, *Payment] - failureCounts *concurrent.SimpleMap[string, int] - processedPayments *concurrent.SimpleSlice[*processedPayment] - processTime time.Duration - verifier Verifier - errChan chan error - closed bool + mux *http.ServeMux + knownKeys *concurrent.SimpleMap[string, *Payment] + queue *paymentQueue + closed bool } -func NewServer(verifier Verifier, processTime time.Duration, errChan chan error) *Server { +func NewServer(verifier Verifier, processTime time.Duration, queueSize int, errChan chan error) *Server { s := &Server{ - mux: http.NewServeMux(), - knownKeys: concurrent.NewSimpleMap[string, *Payment](), - failureCounts: concurrent.NewSimpleMap[string, int](), - processedPayments: concurrent.NewSimpleSlice[*processedPayment](), - processTime: processTime, - verifier: verifier, - errChan: errChan, + mux: http.NewServeMux(), + knownKeys: concurrent.NewSimpleMap[string, *Payment](), + queue: newPaymentQueue(queueSize, verifier, processTime, errChan), } s.mux.HandleFunc("GET /payments", s.GetPaymentsHandler) s.mux.HandleFunc("POST /payments", s.PostPaymentsHandler) @@ -52,4 +39,5 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *Server) Close() { s.closed = true + s.queue.close() } diff --git a/development/matching.js b/development/matching.js deleted file mode 100644 index 8bf823a7..00000000 --- a/development/matching.js +++ /dev/null @@ -1,9 +0,0 @@ -// while true; do curl -s http://nginx/api/internal/matching; sleep 0.5; done と同じことをnodejs で実装 - -const f = () => { - try { - fetch("http://localhost:8080/api/internal/matching"); - } catch (e) {} -}; - -setInterval(f, 500);