add leader election retry #66

Merged: 37 commits, Feb 7, 2024
Changes from 24 commits

Commits (37):
75b2fd8
fix leader election retry
samos123 Feb 3, 2024
9a049f1
increase sleep from 20 to 30
samos123 Feb 3, 2024
d8fdf7f
add single replica e2e tests
samos123 Feb 3, 2024
23f6904
add more descriptive name to e2e replica test
samos123 Feb 3, 2024
fd8725a
stream all logs of lingo
samos123 Feb 3, 2024
e365dd7
add retry to leader election process
samos123 Feb 3, 2024
4b2acd8
increase apiserver unavailability from 30s to 60s
samos123 Feb 3, 2024
c149556
recreate context if context deadline exceeded
samos123 Feb 4, 2024
1dbb678
ensure apiserver outage is 2 minutes
samos123 Feb 4, 2024
ba46c40
add log to indicate context deadline exceeded
samos123 Feb 4, 2024
3f1d48d
kubectl sometimes returns errors when apiserver went away for too long
samos123 Feb 4, 2024
7339da4
make wait for backoff blocking
samos123 Feb 4, 2024
228920b
fix logging in e2e test after apiserver went down
samos123 Feb 4, 2024
2405e3b
remove unneeded check for context deadline exceeds
samos123 Feb 4, 2024
83c81ab
wait for apiserver to be ready
samos123 Feb 4, 2024
c711ae6
Add more logs in e2e test
samos123 Feb 4, 2024
b8fc6b7
address PR comments
samos123 Feb 4, 2024
8236150
simplify tests and run in parallel
samos123 Feb 4, 2024
c63fd0e
improve GHA job names
samos123 Feb 5, 2024
e7cedfa
simplify leader election retry
samos123 Feb 5, 2024
19a89af
increase wait time for scale back to 0 in e2e
samos123 Feb 5, 2024
6ad6446
fix #67 flapping scale from 0 to 1 to 0 to 1
samos123 Feb 5, 2024
38ff2ad
add hostname to leader log messages
samos123 Feb 5, 2024
d4a3947
maybe this fixes #67
samos123 Feb 5, 2024
b056bce
fix PR comment, thanks Alex!
samos123 Feb 6, 2024
ba9d1e0
improve string formatting
samos123 Feb 6, 2024
985831e
sleep for 20 sec after apiserver outage
samos123 Feb 6, 2024
610ba31
wait wasn't long enough
samos123 Feb 6, 2024
b7bd4cf
revert fix for #67 because it breaks scale down to 0
samos123 Feb 6, 2024
8bea0bf
fix #67 only the leader should scale
samos123 Feb 6, 2024
9552e9e
simplify fix for #67 and unit tests
samos123 Feb 6, 2024
d720074
print lingo logs of all replicas on failure
samos123 Feb 6, 2024
fae4bab
in some cases state is incorrect so just scale to desired scale
samos123 Feb 6, 2024
af888e6
Revert "in some cases state is incorrect so just scale to desired scale"
samos123 Feb 6, 2024
89b8a6e
Revert "simplify fix for #67 and unit tests"
samos123 Feb 6, 2024
3f6bab5
Revert "fix #67 only the leader should scale"
samos123 Feb 6, 2024
7310411
remove broken test
samos123 Feb 7, 2024
12 changes: 11 additions & 1 deletion .github/workflows/tests.yml
@@ -27,8 +27,14 @@ jobs:
run: make test-integration

e2e:
strategy:
matrix:
replicas: ["1", "3"]
test_cases:
- { requests: 60, expected_replicas: 1 }
- { requests: 300, expected_replicas: 2 }
runs-on: ubuntu-latest

name: E2E Lingo.replicas=${{ matrix.replicas }} requests=${{ matrix.test_cases.requests }} expected_replicas=${{ matrix.test_cases.expected_replicas }}
steps:
- name: Checkout code
uses: actions/checkout@v2
@@ -49,4 +55,8 @@ jobs:
sudo mv skaffold /usr/local/bin

- name: Run e2e tests
env:
REPLICAS: ${{ matrix.replicas }}
REQUESTS: ${{ matrix.test_cases.requests }}
EXPECTED_REPLICAS: ${{ matrix.test_cases.expected_replicas }}
run: make test-e2e
6 changes: 5 additions & 1 deletion cmd/lingo/main.go
@@ -182,7 +182,11 @@ func run() error {
}()
go func() {
setupLog.Info("Starting leader election")
le.Start(ctx)
err := le.Start(ctx)
if err != nil {
setupLog.Error(err, "starting leader election")
os.Exit(1)
Contributor:
When the context is canceled on a graceful shutdown (SIGTERM), Start() would return the "cancel" error, which results in an Exit(1). This error-handling strategy is used in other places in this file as well. Instead, we could wrap the context on L110 as ctx, cancel := context.WithCancel(ctrl.SetupSignalHandler()) and call cancel() instead. WDYT?

Contributor:

As the error comes from the context already, it may be simpler to do nothing and let the goroutine complete.

Contributor (author):

I didn't fully understand how to change it. Could you elaborate? Are you saying instead of calling os.Exit() just call cancel()?

I didn't see any other example of using cancel on the context in main.go. I tried to follow the same pattern as the other start functions.

Contributor:

The only error returned from Start() now comes from the context, thanks to your for loop, so calling cancel on the context again is not needed. I would not return an error in Start() and would let the goroutine complete; the main thread should handle the shutdown. I should have deleted my first comment, sorry for the confusion.
To elaborate for a different use case where other errors can happen, this is an example:

	ctx, cancel := context.WithCancel(ctrl.SetupSignalHandler())
	go func() {
		// in subroutine
		if err := doSomething(ctx); err != nil {
			// todo: log error
			cancel()
		}
	}()
	// main thread
	if err := doSomethingElse(ctx); err != nil {
		// todo: log error
		os.Exit(1)
	}

Please note that the error is not returned to the main thread.

Contributor (author):

Thanks for that example! I think I get it now. To me it feels cleaner to exit directly instead of cancelling a context that may later cause an exit. I favor being explicit and taking the exit path as soon as it makes sense.

}
}()
defer func() {
setupLog.Info("waiting on manager to stop")
2 changes: 1 addition & 1 deletion pkg/deployments/scaler.go
@@ -79,7 +79,7 @@ func (s *scaler) compareScales(current, desired int32) {
s.desiredScale = desired
}

if s.currentScale == -1 || s.desiredScale == -1 {
if s.currentScale == -1 || s.desiredScale == -1 || desired == -1 {
Contributor:

good catch 👍

Contributor (author):

It doesn't actually solve issue #67, so I might revert this.

// Nothing to compare if we only have partial information
return
}
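The sentinel guard changed in this hunk can be illustrated in isolation. This is a sketch under the assumption that -1 means "scale not yet known"; shouldCompare is a hypothetical helper, not the actual lingo function.

```go
package main

import "fmt"

// shouldCompare mirrors the guard added to compareScales (a sketch, not
// the real implementation): -1 is a sentinel for "scale not yet known",
// so with only partial information there is nothing to compare.
func shouldCompare(currentScale, desiredScale, desired int32) bool {
	if currentScale == -1 || desiredScale == -1 || desired == -1 {
		return false // partial information: skip the comparison
	}
	return true
}

func main() {
	fmt.Println(shouldCompare(2, 3, 3))  // true: everything is known
	fmt.Println(shouldCompare(-1, 3, 3)) // false: current scale unknown
	fmt.Println(shouldCompare(2, 3, -1)) // false: caller passed no desired scale
}
```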
23 changes: 19 additions & 4 deletions pkg/leader/election.go
@@ -10,6 +10,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/flowcontrol"
)

func NewElection(clientset kubernetes.Interface, id, namespace string) *Election {
@@ -35,11 +36,11 @@ func NewElection(clientset kubernetes.Interface, id, namespace string) *Election
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
log.Println("Started leading")
log.Printf("%v started leading", id)
Contributor:

nit: %v does the job, but it is more common to use %s for a string, or %q for a quoted string.

Contributor (author):

I didn't know about %q that might actually be nice here.

isLeader.Store(true)
},
OnStoppedLeading: func() {
log.Println("Stopped leading")
log.Printf("%v stopped leading", id)
isLeader.Store(false)
},
OnNewLeader: func(identity string) {
@@ -54,14 +55,28 @@ func NewElection(clientset kubernetes.Interface, id, namespace string) *Election
return &Election{
IsLeader: isLeader,
config: config,
ID: id,
}
}

type Election struct {
config leaderelection.LeaderElectionConfig
IsLeader *atomic.Bool
ID string
}

func (le *Election) Start(ctx context.Context) {
leaderelection.RunOrDie(ctx, le.config)
func (le *Election) Start(ctx context.Context) error {
backoff := flowcontrol.NewBackOff(1*time.Second, 15*time.Second)
const backoffID = "lingo-leader-election"
for {
leaderelection.RunOrDie(ctx, le.config)
backoff.Next(backoffID, backoff.Clock.Now())
delay := backoff.Get(backoffID)
log.Printf("Leader election stopped on %v, retrying in %v", le.ID, delay)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(delay):
}
}
}
63 changes: 29 additions & 34 deletions tests/e2e/test.sh
@@ -6,6 +6,9 @@ set -xe
HOST=127.0.0.1
PORT=30080
BASE_URL="http://$HOST:$PORT/v1"
REPLICAS=${REPLICAS:-3}
REQUESTS=60
EXPECTED_REPLICAS=1


if kind get clusters | grep -q substratus-test; then
@@ -42,6 +45,9 @@ if ! kubectl get deployment lingo; then
skaffold run
fi

kubectl patch deployment lingo --patch "{\"spec\": {\"replicas\": $REPLICAS}}"

kubectl logs -f deployment/lingo &

kubectl wait --for=condition=available --timeout=30s deployment/lingo

@@ -77,53 +83,42 @@ pip3 install openai==1.2.3

# Send 60 requests in parallel to stapi backend using openai python client and threading
python3 $SCRIPT_DIR/test_openai_embedding.py \
--requests 60 --timeout 300 --base-url "${BASE_URL}" \
--requests ${REQUESTS} --timeout 300 --base-url "${BASE_URL}" \
--model text-embedding-ada-002

# Ensure replicas has been scaled up to 1 after sending 60 requests
replicas=$(kubectl get deployment stapi-minilm-l6-v2 -o jsonpath='{.spec.replicas}')
if [ "$replicas" -eq 1 ]; then
echo "Test passed: Expected 1 replica after sending requests 60 requests"
if [ "$replicas" -ge "${EXPECTED_REPLICAS}" ]; then
echo "Test passed: Expected ${EXPECTED_REPLICAS} or more replicas and got ${replicas} after sending requests ${REQUESTS} requests"
else
echo "Test failed: Expected 1 replica after sending requests 60 requests, got $replicas"
echo "Test failed: Expected ${EXPECTED_REPLICAS} or more replicas after sending requests ${REQUESTS} requests, got ${replicas}"
exit 1
fi

echo "Waiting for deployment to scale down back to 0 within 2 minutes"
for i in {1..15}; do
if [ "$i" -eq 15 ]; then
# Verify that leader election works by forcing a 120 second apiserver outage
KIND_NODE=$(kind get nodes --name=substratus-test)
docker exec ${KIND_NODE} iptables -I INPUT -p tcp --dport 6443 -j DROP
Contributor:

Is 6443 the port of the API Server running on kind?

Contributor (author):

Yep, and credit to ChatGPT for coming up with the high-level idea to simply block the traffic.

k describe svc kubernetes 
Name:              kubernetes
Namespace:         default
Labels:            component=apiserver
                   provider=kubernetes
Annotations:       <none>
Selector:          <none>
Type:              ClusterIP
IP Family Policy:  SingleStack
IP Families:       IPv4
IP:                10.96.0.1
IPs:               10.96.0.1
Port:              https  443/TCP
TargetPort:        6443/TCP
Endpoints:         172.18.0.2:6443
Session Affinity:  None
Events:            <none>

sleep 120
docker exec ${KIND_NODE} iptables -D INPUT -p tcp --dport 6443 -j DROP

until kubectl get pods; do
Contributor:

What is until kubectl get pods; do doing?

Contributor (author):

So even after the iptables rule is removed, it sometimes takes a while for Kubernetes to recover. The until loop waits until kubectl get pods works again: it only continues once kubectl get pods returns exit code 0.

echo "Waiting for apiserver to be back up, waiting for 1 second and trying again"
sleep 1
done

# rerun kubectl logs because previous one got killed when apiserver was down
kubectl logs --tail=500 -f deployment/lingo &
Contributor:

I think you might want --tail=-1 if you are trying to get all logs since the restart.

Contributor (author):

I didn't want all the logs, which may already have 1000+ entries, only the last 500. I tried the last 100 and it wasn't enough, but the last 500 was more than enough.


echo "Waiting for deployment to scale down back to 0 within ~2 minutes"
for i in {1..30}; do
if [ "$i" -eq 30 ]; then
echo "Test failed: Expected 0 replica after not having requests for more than 1 minute, got $replicas"
exit 1
fi
replicas=$(kubectl get deployment stapi-minilm-l6-v2 -o jsonpath='{.spec.replicas}')
replicas=$(kubectl get deployment stapi-minilm-l6-v2 -o jsonpath='{.spec.replicas}' || true)
if [ "$replicas" -eq 0 ]; then
echo "Test passed: Expected 0 replica after not having requests for more than 1 minute"
break
fi
sleep 8
sleep 6
done

echo "Patching stapi deployment to sleep on startup"
cat <<EOF | kubectl patch deployment stapi-minilm-l6-v2 --type merge --patch "$(cat)"
spec:
template:
spec:
initContainers:
- name: sleep
image: busybox
command: ["sh", "-c", "sleep 10"]
EOF

requests=300
echo "Send $requests requests in parallel to stapi backend using openai python client and threading"
python3 $SCRIPT_DIR/test_openai_embedding.py \
--requests $requests --timeout 600 --base-url "${BASE_URL}" \
--model text-embedding-ada-002

replicas=$(kubectl get deployment stapi-minilm-l6-v2 -o jsonpath='{.spec.replicas}')
if [ "$replicas" -ge 2 ]; then
echo "Test passed: Expected 2 or more replicas after sending more than $requests requests, got $replicas"
else
echo "Test failed: Expected 2 or more replicas after sending more than $requests requests, got $replicas"
exit 1
fi