Skip to content

Commit

Permalink
Merge pull request #722 from isucon/revert-700-revert-697-revert-692-…
Browse files Browse the repository at this point in the history
…feat/payment.failure.ratio

Revert 2 "feat: 🎸 paymentサーバーをキューでなく確率で落とすように"
  • Loading branch information
sapphi-red authored Dec 7, 2024
2 parents d8025ca + e9fd740 commit 5ccd54a
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 74 deletions.
2 changes: 1 addition & 1 deletion bench/benchmarker/scenario/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewScenario(target, addr, paymentURL string, logger *slog.Logger, reporter
worldCtx := world.NewContext(w)

paymentErrChan := make(chan error, 1000)
paymentServer := payment.NewServer(w.PaymentDB, 30*time.Millisecond, paymentErrChan)
paymentServer := payment.NewServer(w.PaymentDB, 30*time.Millisecond, 2, paymentErrChan)
go func() {
http.ListenAndServe(":12345", paymentServer)
}()
Expand Down
80 changes: 43 additions & 37 deletions bench/payment/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,45 +68,54 @@ func (s *Server) PostPaymentsHandler(w http.ResponseWriter, r *http.Request) {
p.Amount = req.Amount
}

time.Sleep(s.processTime)
// (直近3秒で処理された payment の数) / 100 の確率で処理を失敗させる(最大50%)
var recentProcessedCount int
for _, processed := range s.processedPayments.BackwardIter() {
if time.Since(processed.processedAt) > 3*time.Second {
break
// 決済処理
// キューに入れて完了を待つ(ブロッキング)
if s.queue.tryProcess(p) {
<-p.processChan
p.locked.Store(false)

select {
case <-r.Context().Done():
// クライアントが既に切断している
w.WriteHeader(http.StatusGatewayTimeout)
default:
writeResponse(w, p.Status)
}
recentProcessedCount++
}
failurePercentage := recentProcessedCount
if failurePercentage > 50 {
failurePercentage = 50
return
}
failureCount, _ := s.failureCounts.GetOrSetDefault(token, func() int { return 0 })
if rand.IntN(100) > failurePercentage || failureCount >= 4 {
// lock はここでしか触らない。lock が true の場合は idempotency key が同じリクエストが処理中の場合のみ
if p.locked.CompareAndSwap(false, true) {
s.processedPayments.Append(&processedPayment{payment: p, processedAt: time.Now()})
p.Status = s.verifier.Verify(p)
if p.Status.Err != nil {
s.errChan <- p.Status.Err
}
s.failureCounts.Delete(token)
if rand.IntN(100) > failurePercentage || failureCount >= 4 {
writeResponse(w, p.Status)
} else {
writeRandomError(w)
}
return

// キューが詰まっていても確率で成功させる
if rand.IntN(5) == 0 {
slog.Debug("決済が詰まったが成功")

go s.queue.process(p)
<-p.processChan
p.locked.Store(false)

select {
case <-r.Context().Done():
// クライアントが既に切断している
w.WriteHeader(http.StatusGatewayTimeout)
default:
writeResponse(w, p.Status)
}
return
}

// エラーを返した場合でもキューに入る場合がある
if rand.IntN(5) < 4 {
go s.queue.process(p)
// 処理の終了を待たない
go func() {
<-p.processChan
p.locked.Store(false)
}()
slog.Debug("決済が詰まったが、キューに積んでエラー")
} else {
s.failureCounts.Set(token, failureCount+1)
slog.Debug("決済が詰まってエラー")
}

// 不安定なエラーを再現
writeRandomError(w)
}

func writeRandomError(w http.ResponseWriter) {
switch rand.IntN(3) {
case 0:
w.WriteHeader(http.StatusInternalServerError)
Expand Down Expand Up @@ -137,14 +146,11 @@ func (s *Server) GetPaymentsHandler(w http.ResponseWriter, r *http.Request) {
return
}

payments := s.processedPayments.ToSlice()
payments := s.queue.getAllAcceptedPayments(token)

res := []ResponsePayment{}
for _, p := range payments {
if p.payment.Token != token {
continue
}
res = append(res, NewResponsePayment(p.payment))
res = append(res, NewResponsePayment(p))
}
writeJSON(w, http.StatusOK, res)
}
Expand Down
14 changes: 10 additions & 4 deletions bench/payment/handler_test.go_ → bench/payment/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestServer_PaymentHandler(t *testing.T) {
prepare := func(t *testing.T) (*Server, *MockVerifier, *httpexpect.Expect) {
mockCtrl := gomock.NewController(t)
verifier := NewMockVerifier(mockCtrl)
server := NewServer(verifier, 1*time.Millisecond, make(chan error))
server := NewServer(verifier, 1*time.Millisecond, 1, make(chan error))
httpServer := httptest.NewServer(server)
t.Cleanup(httpServer.Close)
e := httpexpect.Default(t, httpServer.URL)
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestServer_GetPaymentsHandler(t *testing.T) {
prepare := func(t *testing.T) (*Server, *MockVerifier, *httpexpect.Expect) {
mockCtrl := gomock.NewController(t)
verifier := NewMockVerifier(mockCtrl)
server := NewServer(verifier, 1*time.Millisecond, make(chan error))
server := NewServer(verifier, 1*time.Millisecond, 1, make(chan error))
httpServer := httptest.NewServer(server)
t.Cleanup(httpServer.Close)
e := httpexpect.Default(t, httpServer.URL)
Expand Down Expand Up @@ -393,12 +393,17 @@ func TestServer_GetPaymentsHandler(t *testing.T) {
})

for _, p := range payments {
status := http.StatusNoContent
if p.Status.Type != StatusSuccess && p.Status.Type != StatusInitial {
status = http.StatusBadRequest
}
e.POST("/payments").
WithHeader(AuthorizationHeader, AuthorizationHeaderPrefix+p.Token).
WithJSON(map[string]any{
"amount": p.Amount,
}).
Expect()
Expect().
Status(status)
}

e.GET("/payments").
Expand Down Expand Up @@ -431,7 +436,8 @@ func TestServer_GetPaymentsHandler(t *testing.T) {
WithJSON(map[string]any{
"amount": p.Amount,
}).
Expect()
Expect().
Status(http.StatusNoContent)

e.GET("/payments").
WithHeader(AuthorizationHeader, AuthorizationHeaderPrefix+token).
Expand Down
4 changes: 3 additions & 1 deletion bench/payment/payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ type Payment struct {
Amount int
Status Status
locked atomic.Bool
processChan chan struct{}
}

func NewPayment(idk string) *Payment {
p := &Payment{
IdempotencyKey: idk,
Status: Status{Type: StatusInitial, Err: nil},
processChan: make(chan struct{}),
}
p.locked.Store(false)
p.locked.Store(true)
return p
}
4 changes: 3 additions & 1 deletion bench/payment/payment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ func TestNewPayment(t *testing.T) {
p := NewPayment("test")
assert.Equal(t, "test", p.IdempotencyKey)
assert.Equal(t, StatusInitial, p.Status.Type)
assert.False(t, p.locked.Load())
assert.True(t, p.locked.Load())
assert.NotNil(t, p.processChan)
assert.NotPanics(t, func() { close(p.processChan) })
}
78 changes: 78 additions & 0 deletions bench/payment/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package payment

import (
"context"
"time"

"github.com/isucon/isucon14/bench/internal/concurrent"
"golang.org/x/sync/semaphore"
)

type paymentQueue struct {
ctx context.Context
semaphore *semaphore.Weighted
verifier Verifier
processTime time.Duration
acceptedPayments *concurrent.SimpleMap[string, *concurrent.SimpleSlice[*Payment]]
errChan chan error
}

func newPaymentQueue(queueSize int, verifier Verifier, processTime time.Duration, errChan chan error) *paymentQueue {
return &paymentQueue{
ctx: context.Background(),
semaphore: semaphore.NewWeighted(int64(queueSize)),
verifier: verifier,
processTime: processTime,
acceptedPayments: concurrent.NewSimpleMap[string, *concurrent.SimpleSlice[*Payment]](),
errChan: errChan,
}
}

func (q *paymentQueue) execute(p *Payment) {
time.Sleep(q.processTime)
p.Status = q.verifier.Verify(p)
if p.Status.Err != nil {
q.errChan <- p.Status.Err
}
close(p.processChan)
}

func (q *paymentQueue) tryProcess(p *Payment) bool {
if !q.semaphore.TryAcquire(1) {
return false
}
q.appendAcceptedPayments(p)

go func() {
defer q.semaphore.Release(1)
q.execute(p)
}()
return true
}

func (q *paymentQueue) process(p *Payment) {
// 遅かれ早かれ処理するため、受け入れた決済として保持しておく
q.appendAcceptedPayments(p)

q.semaphore.Acquire(q.ctx, 1)
defer q.semaphore.Release(1)

q.execute(p)
}

func (q *paymentQueue) appendAcceptedPayments(p *Payment) {
payments, _ := q.acceptedPayments.GetOrSetDefault(p.Token, func() *concurrent.SimpleSlice[*Payment] { return concurrent.NewSimpleSlice[*Payment]() })
payments.Append(p)
}

func (q *paymentQueue) getAllAcceptedPayments(token string) []*Payment {
payments, ok := q.acceptedPayments.Get(token)
if !ok {
return []*Payment{}
}
return payments.ToSlice()
}

func (q *paymentQueue) close() {
q.ctx.Done()
}
30 changes: 9 additions & 21 deletions bench/payment/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,18 @@ const IdempotencyKeyHeader = "Idempotency-Key"
const AuthorizationHeader = "Authorization"
const AuthorizationHeaderPrefix = "Bearer "

type processedPayment struct {
payment *Payment
processedAt time.Time
}

type Server struct {
mux *http.ServeMux
knownKeys *concurrent.SimpleMap[string, *Payment]
failureCounts *concurrent.SimpleMap[string, int]
processedPayments *concurrent.SimpleSlice[*processedPayment]
processTime time.Duration
verifier Verifier
errChan chan error
closed bool
mux *http.ServeMux
knownKeys *concurrent.SimpleMap[string, *Payment]
queue *paymentQueue
closed bool
}

func NewServer(verifier Verifier, processTime time.Duration, errChan chan error) *Server {
func NewServer(verifier Verifier, processTime time.Duration, queueSize int, errChan chan error) *Server {
s := &Server{
mux: http.NewServeMux(),
knownKeys: concurrent.NewSimpleMap[string, *Payment](),
failureCounts: concurrent.NewSimpleMap[string, int](),
processedPayments: concurrent.NewSimpleSlice[*processedPayment](),
processTime: processTime,
verifier: verifier,
errChan: errChan,
mux: http.NewServeMux(),
knownKeys: concurrent.NewSimpleMap[string, *Payment](),
queue: newPaymentQueue(queueSize, verifier, processTime, errChan),
}
s.mux.HandleFunc("GET /payments", s.GetPaymentsHandler)
s.mux.HandleFunc("POST /payments", s.PostPaymentsHandler)
Expand All @@ -52,4 +39,5 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {

func (s *Server) Close() {
s.closed = true
s.queue.close()
}
9 changes: 0 additions & 9 deletions development/matching.js

This file was deleted.

0 comments on commit 5ccd54a

Please sign in to comment.