Skip to content

Commit

Permalink
Merge pull request #33 from Layr-Labs/make-avssync-package
Browse files Browse the repository at this point in the history
Make `avssync` package
  • Loading branch information
ian-shim authored May 2, 2024
2 parents 1651a5d + 88303bd commit 7b17542
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 114 deletions.
40 changes: 23 additions & 17 deletions avssync.go → avssync/avssync.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package avssync

import (
"context"
Expand All @@ -16,16 +16,16 @@ import (
)

type AvsSync struct {
logger sdklogging.Logger
avsReader avsregistry.AvsRegistryReader
avsWriter avsregistry.AvsRegistryWriter
AvsReader avsregistry.AvsRegistryReader
AvsWriter avsregistry.AvsRegistryWriter
RetrySyncNTimes int

logger sdklogging.Logger
sleepBeforeFirstSyncDuration time.Duration
syncInterval time.Duration
operators []common.Address // empty means we update all operators
quorums []byte
fetchQuorumsDynamically bool
retrySyncNTimes int

readerTimeoutDuration time.Duration
writerTimeoutDuration time.Duration
Expand All @@ -46,22 +46,22 @@ func NewAvsSync(
prometheusServerAddr string,
) *AvsSync {
return &AvsSync{
AvsReader: avsReader,
AvsWriter: avsWriter,
RetrySyncNTimes: retrySyncNTimes,
logger: logger,
avsReader: avsReader,
avsWriter: avsWriter,
sleepBeforeFirstSyncDuration: sleepBeforeFirstSyncDuration,
syncInterval: syncInterval,
operators: operators,
quorums: quorums,
fetchQuorumsDynamically: fetchQuorumsDynamically,
retrySyncNTimes: retrySyncNTimes,
readerTimeoutDuration: readerTimeoutDuration,
writerTimeoutDuration: writerTimeoutDuration,
prometheusServerAddr: prometheusServerAddr,
}
}

func (a *AvsSync) Start() {
func (a *AvsSync) Start(ctx context.Context) {
// TODO: should prob put all of these in a config struct, to make sure we don't forget to print any of them
// when we add new ones.
a.logger.Info("Avssync config",
Expand Down Expand Up @@ -97,9 +97,15 @@ func (a *AvsSync) Start() {
ticker := time.NewTicker(a.syncInterval)
defer ticker.Stop()

for range ticker.C {
a.updateStakes()
a.logger.Infof("Sleeping for %s", a.syncInterval)
for {
select {
case <-ctx.Done():
a.logger.Info("Context done, exiting")
return
case <-ticker.C:
a.updateStakes()
a.logger.Infof("Sleeping for %s", a.syncInterval)
}
}
}

Expand All @@ -112,15 +118,15 @@ func (a *AvsSync) updateStakes() {
// we update one quorum at a time, just to make sure we don't run into any gas limit issues
// in case there are a lot of operators in a given quorum
for _, quorum := range a.quorums {
a.tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum, a.retrySyncNTimes)
a.tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum, a.RetrySyncNTimes)
}
a.logger.Info("Completed stake update. Check logs to make sure every quorum update succeeded successfully.")
} else {
a.logger.Infof("Updating stakes of operators: %v", a.operators)
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.writerTimeoutDuration)
defer cancel()
// this one we update all quorums at once, since we're only updating a subset of operators (which should be a small number)
receipt, err := a.avsWriter.UpdateStakesOfOperatorSubsetForAllQuorums(timeoutCtx, a.operators)
receipt, err := a.AvsWriter.UpdateStakesOfOperatorSubsetForAllQuorums(timeoutCtx, a.operators)
if err != nil {
// no quorum label means we are updating all quorums
updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusError), "quorum": ""}).Inc()
Expand All @@ -143,7 +149,7 @@ func (a *AvsSync) maybeUpdateQuorumSet() {
a.logger.Info("Fetching quorum set dynamically")
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.readerTimeoutDuration)
defer cancel()
quorumCount, err := a.avsReader.GetQuorumCount(&bind.CallOpts{Context: timeoutCtx})
quorumCount, err := a.AvsReader.GetQuorumCount(&bind.CallOpts{Context: timeoutCtx})
if err != nil {
a.logger.Error("Error fetching quorum set dynamically", err)
return
Expand All @@ -165,7 +171,7 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte,
defer cancel()
// we need to refetch the operator set because one reason for update stakes failing is that the operator set has changed
// in between us fetching it and trying to update it (the contract makes sure the entire operator set is updated and reverts if not)
operatorAddrsPerQuorum, err := a.avsReader.GetOperatorAddrsInQuorumsAtCurrentBlock(&bind.CallOpts{Context: timeoutCtx}, types.QuorumNums{types.QuorumNum(quorum)})
operatorAddrsPerQuorum, err := a.AvsReader.GetOperatorAddrsInQuorumsAtCurrentBlock(&bind.CallOpts{Context: timeoutCtx}, types.QuorumNums{types.QuorumNum(quorum)})
if err != nil {
a.logger.Warn("Error fetching operator addresses in quorums", "err", err, "quorum", quorum, "retryNTimes", retryNTimes, "try", i+1)
continue
Expand All @@ -179,7 +185,7 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte,
a.logger.Infof("Updating stakes of operators in quorum %d: %v", int(quorum), operators)
timeoutCtx, cancel = context.WithTimeout(context.Background(), a.writerTimeoutDuration)
defer cancel()
receipt, err := a.avsWriter.UpdateStakesOfEntireOperatorSetForQuorums(timeoutCtx, [][]common.Address{operators}, types.QuorumNums{types.QuorumNum(quorum)})
receipt, err := a.AvsWriter.UpdateStakesOfEntireOperatorSetForQuorums(timeoutCtx, [][]common.Address{operators}, types.QuorumNums{types.QuorumNum(quorum)})
if err != nil {
a.logger.Warn("Error updating stakes of entire operator set for quorum", "err", err, "quorum", int(quorum), "retryNTimes", retryNTimes, "try", i+1)
continue
Expand Down
6 changes: 4 additions & 2 deletions metrics.go → avssync/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package avssync

import (
"net/http"
Expand Down Expand Up @@ -40,5 +40,7 @@ func StartMetricsServer(metricsAddr string) {
registry.MustRegister(updateStakeAttempt, txRevertedTotal, operatorsUpdated)
http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
// not sure if we need to handle this error, since if metric server errors, then we will get alerts from grafana
go http.ListenAndServe(metricsAddr, nil)
go func() {
_ = http.ListenAndServe(metricsAddr, nil)
}()
}
129 changes: 36 additions & 93 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/Layr-Labs/eigensdk-go/signerv2"
"github.com/Layr-Labs/eigensdk-go/types"

"github.com/Layr-Labs/avs-sync/avssync"
contractreg "github.com/Layr-Labs/avs-sync/bindings/ContractsRegistry"
)

Expand Down Expand Up @@ -64,14 +65,14 @@ func TestIntegrationUpdateSingleOperatorPath(t *testing.T) {
}
operatorAddr := crypto.PubkeyToAddress(operatorEcdsaPrivKey.PublicKey)
operatorBlsPrivKey := "0x1"
c := NewAvsSyncComponents(t, anvilHttpEndpoint, contractAddresses, []common.Address{operatorAddr}, 30*time.Second)
c := NewAvsSyncComponents(t, anvilHttpEndpoint, contractAddresses, []common.Address{operatorAddr}, 0)
avsSync := c.avsSync

// first register operator into avs. at this point, the operator will have whatever stake it had registered in eigenlayer in the avs
registerOperatorWithAvs(c.wallet, anvilHttpEndpoint, contractAddresses, operatorEcdsaPrivKeyHex, operatorBlsPrivKey)

// get stake of operator before sync
operatorsPerQuorumBeforeSync, err := avsSync.avsReader.GetOperatorsStakeInQuorumsAtCurrentBlock(&bind.CallOpts{}, []types.QuorumNum{0})
operatorsPerQuorumBeforeSync, err := c.avsReader.GetOperatorsStakeInQuorumsAtCurrentBlock(&bind.CallOpts{}, []types.QuorumNum{0})
if err != nil {
t.Fatal(err)
}
Expand All @@ -82,69 +83,11 @@ func TestIntegrationUpdateSingleOperatorPath(t *testing.T) {
depositErc20IntoStrategyForOperator(c.wallet, anvilHttpEndpoint, contractAddresses.DelegationManager, contractAddresses.Erc20MockStrategy, operatorEcdsaPrivKeyHex, operatorAddr.Hex(), depositAmount)

// run avsSync
go avsSync.Start()
go avsSync.Start(context.Background())
time.Sleep(5 * time.Second)

// get stake of operator after sync
operatorsPerQuorumAfterSync, err := avsSync.avsReader.GetOperatorsStakeInQuorumsAtCurrentBlock(&bind.CallOpts{}, []types.QuorumNum{0})
if err != nil {
t.Fatal(err)
}
operatorStakeAfterSync := operatorsPerQuorumAfterSync[0][0].Stake
operatorStakeDiff := new(big.Int).Sub(operatorStakeAfterSync, operatorStakeBeforeSync)

// we just check that the diff is equal to the deposited amount
if operatorStakeDiff.Cmp(depositAmount) != 0 {
t.Errorf("expected operator stake diff to be equal to deposit amount, got %v", operatorStakeDiff)
}

}

// here we test the case where we call avsSync without a list of operators
// although the operator set here consists of a single operator, the code path is different
func TestIntegrationFullOperatorSet(t *testing.T) {

/* Start the anvil chain */
anvilC := startAnvilTestContainer()
// Not sure why but deferring anvilC.Terminate() causes a panic when the test finishes...
// so letting it terminate silently for now
anvilHttpEndpoint, err := anvilC.Endpoint(context.Background(), "http")
if err != nil {
t.Fatal(err)
}

contractAddresses := getContractAddressesFromContractRegistry(anvilHttpEndpoint)
operatorEcdsaPrivKeyHex := "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"
operatorEcdsaPrivKey, err := crypto.HexToECDSA(operatorEcdsaPrivKeyHex)
if err != nil {
t.Fatal(err)
}
operatorAddr := crypto.PubkeyToAddress(operatorEcdsaPrivKey.PublicKey)
operatorBlsPrivKey := "0x1"
c := NewAvsSyncComponents(t, anvilHttpEndpoint, contractAddresses, []common.Address{}, 30*time.Second)
avsSync := c.avsSync

// first register operator into avs. at this point, the operator will have whatever stake it had registered in eigenlayer in the avs
registerOperatorWithAvs(c.wallet, anvilHttpEndpoint, contractAddresses, operatorEcdsaPrivKeyHex, operatorBlsPrivKey)

// get stake of operator before sync
operatorsPerQuorumBeforeSync, err := avsSync.avsReader.GetOperatorsStakeInQuorumsAtCurrentBlock(&bind.CallOpts{}, []types.QuorumNum{0})
if err != nil {
t.Fatal(err)
}
// TODO: should be checking all operators, not just the first one
operatorStakeBeforeSync := operatorsPerQuorumBeforeSync[0][0].Stake

// deposit into strategy to create a diff between eigenlayer and avs stakes
depositAmount := big.NewInt(100)
depositErc20IntoStrategyForOperator(c.wallet, anvilHttpEndpoint, contractAddresses.DelegationManager, contractAddresses.Erc20MockStrategy, operatorEcdsaPrivKeyHex, operatorAddr.Hex(), depositAmount)

// run avsSync
go avsSync.Start()
time.Sleep(5 * time.Second)

// get stake of operator after sync
operatorsPerQuorumAfterSync, err := avsSync.avsReader.GetOperatorsStakeInQuorumsAtCurrentBlock(&bind.CallOpts{}, []types.QuorumNum{0})
operatorsPerQuorumAfterSync, err := c.avsReader.GetOperatorsStakeInQuorumsAtCurrentBlock(&bind.CallOpts{}, []types.QuorumNum{0})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -181,7 +124,7 @@ func TestIntegrationFullOperatorSetWithRetry(t *testing.T) {
operatorBlsPrivKey := "0x1"
// we create avs sync and replace its avsWriter with a mock that will fail the first 2 times we call UpdateStakesOfEntireOperatorSetForQuorums
// and succeed on the third time
c := NewAvsSyncComponents(t, anvilHttpEndpoint, contractAddresses, []common.Address{}, 30*time.Second)
c := NewAvsSyncComponents(t, anvilHttpEndpoint, contractAddresses, []common.Address{}, 0)
avsSync := c.avsSync

// first register operator into avs. at this point, the operator will have whatever stake it had registered in eigenlayer in the avs
Expand All @@ -192,16 +135,16 @@ func TestIntegrationFullOperatorSetWithRetry(t *testing.T) {
// this is the test. we just make sure this is called 3 times
mockAvsRegistryWriter.EXPECT().UpdateStakesOfEntireOperatorSetForQuorums(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).Times(2)
mockAvsRegistryWriter.EXPECT().UpdateStakesOfEntireOperatorSetForQuorums(gomock.Any(), gomock.Any(), gomock.Any()).Return(&gethtypes.Receipt{Status: gethtypes.ReceiptStatusSuccessful}, nil)
avsSync.avsWriter = mockAvsRegistryWriter
avsSync.retrySyncNTimes = 3
avsSync.AvsWriter = mockAvsRegistryWriter
avsSync.RetrySyncNTimes = 3

// run avsSync
go avsSync.Start()
time.Sleep(5 * time.Second)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
avsSync.Start(ctx)
}

func TestSingleRun(t *testing.T) {
func TestIntegrationFullOperatorSet(t *testing.T) {
/* Start the anvil chain */
anvilC := startAnvilTestContainer()
// Not sure why but deferring anvilC.Terminate() causes a panic when the test finishes...
Expand All @@ -227,7 +170,7 @@ func TestSingleRun(t *testing.T) {
registerOperatorWithAvs(c.wallet, anvilHttpEndpoint, contractAddresses, operatorEcdsaPrivKeyHex, operatorBlsPrivKey)

// get stake of operator before sync
operatorsPerQuorumBeforeSync, err := avsSync.avsReader.GetOperatorsStakeInQuorumsAtCurrentBlock(&bind.CallOpts{}, []types.QuorumNum{0})
operatorsPerQuorumBeforeSync, err := avsSync.AvsReader.GetOperatorsStakeInQuorumsAtCurrentBlock(&bind.CallOpts{}, []types.QuorumNum{0})
if err != nil {
t.Fatal(err)
}
Expand All @@ -238,10 +181,10 @@ func TestSingleRun(t *testing.T) {
depositAmount := big.NewInt(100)
depositErc20IntoStrategyForOperator(c.wallet, anvilHttpEndpoint, contractAddresses.DelegationManager, contractAddresses.Erc20MockStrategy, operatorEcdsaPrivKeyHex, operatorAddr.Hex(), depositAmount)

avsSync.Start()
avsSync.Start(context.Background())

// get stake of operator after sync
operatorsPerQuorumAfterSync, err := avsSync.avsReader.GetOperatorsStakeInQuorumsAtCurrentBlock(&bind.CallOpts{}, []types.QuorumNum{0})
operatorsPerQuorumAfterSync, err := avsSync.AvsReader.GetOperatorsStakeInQuorumsAtCurrentBlock(&bind.CallOpts{}, []types.QuorumNum{0})
if err != nil {
t.Fatal(err)
}
Expand All @@ -255,8 +198,10 @@ func TestSingleRun(t *testing.T) {
}

type AvsSyncComponents struct {
avsSync *AvsSync
wallet walletsdk.Wallet
avsSync *avssync.AvsSync
wallet walletsdk.Wallet
avsReader *avsregistry.AvsRegistryChainReader
avsWriter *avsregistry.AvsRegistryChainWriter
}

func NewAvsSyncComponents(t *testing.T, anvilHttpEndpoint string, contractAddresses ContractAddresses, operators []common.Address, syncInterval time.Duration) *AvsSyncComponents {
Expand Down Expand Up @@ -315,7 +260,7 @@ func NewAvsSyncComponents(t *testing.T, anvilHttpEndpoint string, contractAddres
logger.Fatalf("Cannot create avs reader", "err", err)
}

avsSync := NewAvsSync(
avsSync := avssync.NewAvsSync(
logger,
avsReader,
avsWriter,
Expand All @@ -326,13 +271,15 @@ func NewAvsSyncComponents(t *testing.T, anvilHttpEndpoint string, contractAddres
[]byte{0},
false,
1, // 1 retry
5*time.Second,
5*time.Second,
time.Second,
time.Second,
"", // no metrics server (otherwise parallel tests all try to start server at same endpoint and error out)
)
return &AvsSyncComponents{
avsSync: avsSync,
wallet: wallet,
avsSync: avsSync,
wallet: wallet,
avsReader: avsReader,
avsWriter: avsWriter,
}
}

Expand All @@ -344,7 +291,7 @@ func startAnvilTestContainer() testcontainers.Container {

ctx := context.Background()
req := testcontainers.ContainerRequest{
Image: "ghcr.io/foundry-rs/foundry:latest",
Image: "ghcr.io/foundry-rs/foundry:nightly-5b7e4cb3c882b28f3c32ba580de27ce7381f415a",
Mounts: testcontainers.ContainerMounts{
testcontainers.ContainerMount{
Source: testcontainers.GenericBindMountSource{
Expand Down Expand Up @@ -380,19 +327,15 @@ func advanceChainByNBlocks(n int, anvilC testcontainers.Container) {
}
rpcUrl := "http://" + anvilEndpoint
// this is just the first anvil address, which is funded so can send ether
address := "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"
privateKey := "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"
for i := 0; i < n; i++ {
// we just send a transaction to ourselves to advance the chain
cmd := exec.Command("bash", "-c",
fmt.Sprintf(
`cast send %s --value 0.01ether --private-key %s --rpc-url %s`,
address, privateKey, rpcUrl),
)
err = cmd.Run()
if err != nil {
panic(err)
}
// we just send a transaction to ourselves to advance the chain
cmd := exec.Command("bash", "-c",
fmt.Sprintf(
`cast rpc anvil_mine %d --rpc-url %s`,
n, rpcUrl),
)
err = cmd.Run()
if err != nil {
panic(err)
}
}

Expand Down
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"time"

"github.com/Layr-Labs/avs-sync/avssync"
"github.com/Layr-Labs/eigensdk-go/aws/secretmanager"
"github.com/Layr-Labs/eigensdk-go/chainio/clients/avsregistry"
"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
Expand Down Expand Up @@ -207,7 +208,7 @@ func avsSyncMain(cliCtx *cli.Context) error {
sleepBeforeFirstSyncDuration = firstSyncTime.Sub(now)
}
logger.Infof("Sleeping for %v before first sync, so that it happens at %v", sleepBeforeFirstSyncDuration, time.Now().Add(sleepBeforeFirstSyncDuration))
avsSync := NewAvsSync(
avsSync := avssync.NewAvsSync(
logger,
avsReader,
avsWriter,
Expand All @@ -222,6 +223,6 @@ func avsSyncMain(cliCtx *cli.Context) error {
cliCtx.String(MetricsAddrFlag.Name),
)

avsSync.Start()
avsSync.Start(context.Background())
return nil
}

0 comments on commit 7b17542

Please sign in to comment.