Skip to content

Commit

Permalink
Merge pull request #700 from isucon/revert-697-revert-692-feat/paymen…
Browse files Browse the repository at this point in the history
…t.failure.ratio

Revert "Revert "feat: 🎸 paymentサーバーをキューでなく確率で落とすように""
  • Loading branch information
ryoha000 authored Dec 6, 2024
2 parents 1e04c24 + 82c085e commit b98490d
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 147 deletions.
2 changes: 1 addition & 1 deletion bench/benchmarker/scenario/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewScenario(target, addr, paymentURL string, logger *slog.Logger, reporter
worldCtx := world.NewContext(w)

paymentErrChan := make(chan error, 1000)
paymentServer := payment.NewServer(w.PaymentDB, 30*time.Millisecond, 2, paymentErrChan)
paymentServer := payment.NewServer(w.PaymentDB, 30*time.Millisecond, paymentErrChan)
go func() {
http.ListenAndServe(":12345", paymentServer)
}()
Expand Down
80 changes: 37 additions & 43 deletions bench/payment/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,54 +68,45 @@ func (s *Server) PostPaymentsHandler(w http.ResponseWriter, r *http.Request) {
p.Amount = req.Amount
}

// 決済処理
// キューに入れて完了を待つ(ブロッキング)
if s.queue.tryProcess(p) {
<-p.processChan
p.locked.Store(false)

select {
case <-r.Context().Done():
// クライアントが既に切断している
w.WriteHeader(http.StatusGatewayTimeout)
default:
writeResponse(w, p.Status)
time.Sleep(s.processTime)
// (直近3秒で処理された payment の数) / 100 の確率で処理を失敗させる(最大50%)
var recentProcessedCount int
for _, processed := range s.processedPayments.BackwardIter() {
if time.Since(processed.processedAt) > 3*time.Second {
break
}
return
recentProcessedCount++
}

// キューが詰まっていても確率で成功させる
if rand.IntN(5) == 0 {
slog.Debug("決済が詰まったが成功")

go s.queue.process(p)
<-p.processChan
p.locked.Store(false)

select {
case <-r.Context().Done():
// クライアントが既に切断している
w.WriteHeader(http.StatusGatewayTimeout)
default:
writeResponse(w, p.Status)
}
return
failurePercentage := recentProcessedCount
if failurePercentage > 50 {
failurePercentage = 50
}

// エラーを返した場合でもキューに入る場合がある
if rand.IntN(5) < 4 {
go s.queue.process(p)
// 処理の終了を待たない
go func() {
<-p.processChan
p.locked.Store(false)
}()
slog.Debug("決済が詰まったが、キューに積んでエラー")
failureCount, _ := s.failureCounts.GetOrSetDefault(token, func() int { return 0 })
if rand.IntN(100) > failurePercentage || failureCount >= 4 {
// lock はここでしか触らない。lock が true の場合は idempotency key が同じリクエストが処理中の場合のみ
if p.locked.CompareAndSwap(false, true) {
s.processedPayments.Append(&processedPayment{payment: p, processedAt: time.Now()})
p.Status = s.verifier.Verify(p)
if p.Status.Err != nil {
s.errChan <- p.Status.Err
}
s.failureCounts.Delete(token)
if rand.IntN(100) > failurePercentage || failureCount >= 4 {
writeResponse(w, p.Status)
} else {
writeRandomError(w)
}
return
}
} else {
slog.Debug("決済が詰まってエラー")
s.failureCounts.Set(token, failureCount+1)
}

// 不安定なエラーを再現
writeRandomError(w)
}

func writeRandomError(w http.ResponseWriter) {
switch rand.IntN(3) {
case 0:
w.WriteHeader(http.StatusInternalServerError)
Expand Down Expand Up @@ -146,11 +137,14 @@ func (s *Server) GetPaymentsHandler(w http.ResponseWriter, r *http.Request) {
return
}

payments := s.queue.getAllAcceptedPayments(token)
payments := s.processedPayments.ToSlice()

res := []ResponsePayment{}
for _, p := range payments {
res = append(res, NewResponsePayment(p))
if p.payment.Token != token {
continue
}
res = append(res, NewResponsePayment(p.payment))
}
writeJSON(w, http.StatusOK, res)
}
Expand Down
14 changes: 4 additions & 10 deletions bench/payment/handler_test.go → bench/payment/handler_test.go_
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestServer_PaymentHandler(t *testing.T) {
prepare := func(t *testing.T) (*Server, *MockVerifier, *httpexpect.Expect) {
mockCtrl := gomock.NewController(t)
verifier := NewMockVerifier(mockCtrl)
server := NewServer(verifier, 1*time.Millisecond, 1, make(chan error))
server := NewServer(verifier, 1*time.Millisecond, make(chan error))
httpServer := httptest.NewServer(server)
t.Cleanup(httpServer.Close)
e := httpexpect.Default(t, httpServer.URL)
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestServer_GetPaymentsHandler(t *testing.T) {
prepare := func(t *testing.T) (*Server, *MockVerifier, *httpexpect.Expect) {
mockCtrl := gomock.NewController(t)
verifier := NewMockVerifier(mockCtrl)
server := NewServer(verifier, 1*time.Millisecond, 1, make(chan error))
server := NewServer(verifier, 1*time.Millisecond, make(chan error))
httpServer := httptest.NewServer(server)
t.Cleanup(httpServer.Close)
e := httpexpect.Default(t, httpServer.URL)
Expand Down Expand Up @@ -393,17 +393,12 @@ func TestServer_GetPaymentsHandler(t *testing.T) {
})

for _, p := range payments {
status := http.StatusNoContent
if p.Status.Type != StatusSuccess && p.Status.Type != StatusInitial {
status = http.StatusBadRequest
}
e.POST("/payments").
WithHeader(AuthorizationHeader, AuthorizationHeaderPrefix+p.Token).
WithJSON(map[string]any{
"amount": p.Amount,
}).
Expect().
Status(status)
Expect()
}

e.GET("/payments").
Expand Down Expand Up @@ -436,8 +431,7 @@ func TestServer_GetPaymentsHandler(t *testing.T) {
WithJSON(map[string]any{
"amount": p.Amount,
}).
Expect().
Status(http.StatusNoContent)
Expect()

e.GET("/payments").
WithHeader(AuthorizationHeader, AuthorizationHeaderPrefix+token).
Expand Down
4 changes: 1 addition & 3 deletions bench/payment/payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@ type Payment struct {
Amount int
Status Status
locked atomic.Bool
processChan chan struct{}
}

func NewPayment(idk string) *Payment {
p := &Payment{
IdempotencyKey: idk,
Status: Status{Type: StatusInitial, Err: nil},
processChan: make(chan struct{}),
}
p.locked.Store(true)
p.locked.Store(false)
return p
}
4 changes: 1 addition & 3 deletions bench/payment/payment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,5 @@ func TestNewPayment(t *testing.T) {
p := NewPayment("test")
assert.Equal(t, "test", p.IdempotencyKey)
assert.Equal(t, StatusInitial, p.Status.Type)
assert.True(t, p.locked.Load())
assert.NotNil(t, p.processChan)
assert.NotPanics(t, func() { close(p.processChan) })
assert.False(t, p.locked.Load())
}
78 changes: 0 additions & 78 deletions bench/payment/queue.go

This file was deleted.

30 changes: 21 additions & 9 deletions bench/payment/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,31 @@ const IdempotencyKeyHeader = "Idempotency-Key"
const AuthorizationHeader = "Authorization"
const AuthorizationHeaderPrefix = "Bearer "

type processedPayment struct {
payment *Payment
processedAt time.Time
}

type Server struct {
mux *http.ServeMux
knownKeys *concurrent.SimpleMap[string, *Payment]
queue *paymentQueue
closed bool
mux *http.ServeMux
knownKeys *concurrent.SimpleMap[string, *Payment]
failureCounts *concurrent.SimpleMap[string, int]
processedPayments *concurrent.SimpleSlice[*processedPayment]
processTime time.Duration
verifier Verifier
errChan chan error
closed bool
}

func NewServer(verifier Verifier, processTime time.Duration, queueSize int, errChan chan error) *Server {
func NewServer(verifier Verifier, processTime time.Duration, errChan chan error) *Server {
s := &Server{
mux: http.NewServeMux(),
knownKeys: concurrent.NewSimpleMap[string, *Payment](),
queue: newPaymentQueue(queueSize, verifier, processTime, errChan),
mux: http.NewServeMux(),
knownKeys: concurrent.NewSimpleMap[string, *Payment](),
failureCounts: concurrent.NewSimpleMap[string, int](),
processedPayments: concurrent.NewSimpleSlice[*processedPayment](),
processTime: processTime,
verifier: verifier,
errChan: errChan,
}
s.mux.HandleFunc("GET /payments", s.GetPaymentsHandler)
s.mux.HandleFunc("POST /payments", s.PostPaymentsHandler)
Expand All @@ -39,5 +52,4 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {

func (s *Server) Close() {
s.closed = true
s.queue.close()
}
9 changes: 9 additions & 0 deletions development/matching.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// while true; do curl -s http://nginx/api/internal/matching; sleep 0.5; done と同じことをnodejs で実装

const f = () => {
try {
fetch("http://localhost:8080/api/internal/matching");
} catch (e) {}
};

setInterval(f, 500);

0 comments on commit b98490d

Please sign in to comment.