Skip to content

Commit

Permalink
update flow-go
Browse files Browse the repository at this point in the history
  • Loading branch information
Guitarheroua committed Sep 26, 2024
2 parents a68bcda + 5374159 commit bab4756
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 26 deletions.
21 changes: 18 additions & 3 deletions emulator/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1323,7 +1323,7 @@ func (b *Blockchain) executeNextTransaction(ctx fvm.Context) (*types.Transaction
tr.Debug = b.debugSignatureError(tr.Error, txnBody)
}

//add to source map if any pragma
// add to source map if any pragma
if pragmas.Contains(PragmaSourceFile) {
location := common.NewTransactionLocation(nil, tr.TransactionID.Bytes())
sourceFile := pragmas.FilterByName(PragmaSourceFile).First().Argument()
Expand Down Expand Up @@ -1412,7 +1412,7 @@ func (b *Blockchain) commitBlock() (*flowgo.Block, error) {
return nil, err
}

//notify listeners on new block
// notify listeners on new block
b.broadcaster.Publish()

// reset pending block using current block and ledger state
Expand Down Expand Up @@ -1555,7 +1555,7 @@ func (b *Blockchain) executeScriptAtBlockID(script []byte, arguments [][]byte, i
scriptError = convert.VMErrorToEmulator(output.Err)
}

//add to source map if any pragma
// add to source map if any pragma
if pragmas.Contains(PragmaSourceFile) {
location := common.NewScriptLocation(nil, scriptID.Bytes())
sourceFile := pragmas.FilterByName(PragmaSourceFile).First().Argument()
Expand Down Expand Up @@ -1870,3 +1870,18 @@ func (b *Blockchain) executeSystemChunkTransaction() error {

return nil
}

func (b *Blockchain) GetRegisterValues(registerIDs flowgo.RegisterIDs, height uint64) (values []flowgo.RegisterValue, err error) {
ledger, err := b.storage.LedgerByHeight(context.Background(), height)
if err != nil {
return nil, err
}
for _, registerID := range registerIDs {
value, err := ledger.Get(registerID)
if err != nil {
return nil, err
}
values = append(values, value)
}
return values, nil
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ require (
github.com/onflow/cadence v1.0.0-preview.52
github.com/onflow/crypto v0.25.2
github.com/onflow/flow-core-contracts/lib/go/templates v1.3.1
github.com/onflow/flow-go v0.37.7-0.20240830182756-9ac9e1889c34
github.com/onflow/flow-go v0.38.0-preview.0
github.com/onflow/flow-go-sdk v1.0.0-preview.55
github.com/onflow/flow-nft/lib/go/contracts v1.2.1
github.com/onflow/flow/protobuf/go/flow v0.4.6
github.com/onflow/flow/protobuf/go/flow v0.4.7
github.com/prometheus/client_golang v1.18.0
github.com/psiemens/graceland v1.0.0
github.com/psiemens/sconfig v0.1.0
Expand Down Expand Up @@ -237,4 +237,4 @@ require (
rsc.io/tmplfunc v0.0.3 // indirect
)

replace github.com/onflow/flow-go v0.37.7-0.20240830182756-9ac9e1889c34 => github.com/The-K-R-O-K/flow-go v0.0.0-20240903093254-2a0462191007
replace github.com/onflow/flow-go v0.38.0-preview.0 => github.com/The-K-R-O-K/flow-go v0.0.0-20240926130756-162e6244a810
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -979,8 +979,8 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMx
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/The-K-R-O-K/flow-go v0.0.0-20240903093254-2a0462191007 h1:ntpHF2kERa8n5VmUClBQPk1reZO7gad4VmBGzA6nBgM=
github.com/The-K-R-O-K/flow-go v0.0.0-20240903093254-2a0462191007/go.mod h1:A3/V6VO8ASI0pHyL5iEWXVd9qdNHYSPxdNt0tMxUYeY=
github.com/The-K-R-O-K/flow-go v0.0.0-20240926130756-162e6244a810 h1:V4AsE3Xej9ygMy0/sjnbrvHcxjSGyKfbB04AKqx9fCg=
github.com/The-K-R-O-K/flow-go v0.0.0-20240926130756-162e6244a810/go.mod h1:Gdqw1ptnAUuB0izif88PWMK8abe655Hr8iEkXXuUJl4=
github.com/VictoriaMetrics/fastcache v1.6.0/go.mod h1:0qHz5QP0GMX4pfmMA/zt5RgfNuXJrTP0zS7DqpHGGTw=
github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o=
github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=
Expand Down Expand Up @@ -2069,8 +2069,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp
github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc=
github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0=
github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk=
github.com/onflow/flow/protobuf/go/flow v0.4.6 h1:KE/CsRVfyG5lGBtm1aNcjojMciQyS5GfPF3ixOWRfi0=
github.com/onflow/flow/protobuf/go/flow v0.4.6/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk=
github.com/onflow/flow/protobuf/go/flow v0.4.7 h1:iP6DFx4wZ3ETORsyeqzHu7neFT3d1CXF6wdK+AOOjmc=
github.com/onflow/flow/protobuf/go/flow v0.4.7/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk=
github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc=
github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I=
github.com/onflow/nft-storefront/lib/go/contracts v1.0.0 h1:sxyWLqGm/p4EKT6DUlQESDG1ZNMN9GjPCm1gTq7NGfc=
Expand Down
93 changes: 77 additions & 16 deletions server/access/streamBackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,50 +68,87 @@ func NewStateStreamBackend(blockchain *emulator.Blockchain, log zerolog.Logger)

var _ state_stream.API = &StateStreamBackend{}

func (b *StateStreamBackend) newSubscriptionByBlockId(
ctx context.Context,
startBlockID flow.Identifier,
f subscription.GetDataByHeightFunc,
) subscription.Subscription {
block, err := b.blockchain.GetBlockByID(startBlockID)
if err != nil {
return subscription.NewFailedSubscription(err, "could not get block by ID")
}
sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, block.Header.Height, f)
go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx)
return sub
}

func (b *StateStreamBackend) newSubscriptionByHeight(
ctx context.Context,
startHeight uint64,
f subscription.GetDataByHeightFunc,
) subscription.Subscription {
sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, startHeight, f)
go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx)
return sub
}

func (b *StateStreamBackend) newSubscriptionByLatestHeight(
ctx context.Context,
f subscription.GetDataByHeightFunc,
) subscription.Subscription {
block, err := b.blockchain.GetLatestBlock()
if err != nil {
return subscription.NewFailedSubscription(err, "could not get latest block")
}
sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, block.Header.Height, f)
go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx)
return sub
}

func (b *StateStreamBackend) SubscribeEventsFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter state_stream.EventFilter,
) subscription.Subscription {
return nil
return b.newSubscriptionByBlockId(ctx, startBlockID, b.getEventsResponseFactory(filter))
}

func (b *StateStreamBackend) SubscribeEventsFromStartHeight(
ctx context.Context,
startHeight uint64,
filter state_stream.EventFilter,
) subscription.Subscription {
return nil
return b.newSubscriptionByHeight(ctx, startHeight, b.getEventsResponseFactory(filter))
}

func (b *StateStreamBackend) SubscribeEventsFromLatest(
ctx context.Context,
filter state_stream.EventFilter,
) subscription.Subscription {
return nil
return b.newSubscriptionByLatestHeight(ctx, b.getEventsResponseFactory(filter))
}

func (b *StateStreamBackend) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter state_stream.AccountStatusFilter,
) subscription.Subscription {
return nil
return b.newSubscriptionByBlockId(ctx, startBlockID, b.getAccountStatusResponseFactory(filter))
}

func (b *StateStreamBackend) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startHeight uint64,
filter state_stream.AccountStatusFilter,
) subscription.Subscription {
return nil
return b.newSubscriptionByHeight(ctx, startHeight, b.getAccountStatusResponseFactory(filter))
}

func (b *StateStreamBackend) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter state_stream.AccountStatusFilter,
) subscription.Subscription {
return nil
return b.newSubscriptionByLatestHeight(ctx, b.getAccountStatusResponseFactory(filter))
}

func getStartHeightFunc(blockchain *emulator.Blockchain) GetStartHeightFunc {
Expand Down Expand Up @@ -261,26 +298,26 @@ func (b *StateStreamBackend) SubscribeExecutionData(ctx context.Context, startBl
return subscription.NewFailedSubscription(err, "could not get start height")
}

sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getResponse)
sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getExecutionDataResponse)

go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx)

return sub
}

func (b *StateStreamBackend) SubscribeExecutionDataFromStartBlockID(ctx context.Context, startBlockID flow.Identifier) subscription.Subscription {
return nil
return b.newSubscriptionByBlockId(ctx, startBlockID, b.getExecutionDataResponse)
}

func (b *StateStreamBackend) SubscribeExecutionDataFromStartBlockHeight(ctx context.Context, startBlockHeight uint64) subscription.Subscription {
return nil
return b.newSubscriptionByHeight(ctx, startBlockHeight, b.getExecutionDataResponse)
}

func (b *StateStreamBackend) SubscribeExecutionDataFromLatest(ctx context.Context) subscription.Subscription {
return nil
return b.newSubscriptionByLatestHeight(ctx, b.getExecutionDataResponse)
}

func (b *StateStreamBackend) getResponse(ctx context.Context, height uint64) (interface{}, error) {
func (b *StateStreamBackend) getExecutionDataResponse(ctx context.Context, height uint64) (interface{}, error) {
executionData, err := b.getExecutionData(ctx, height)
if err != nil {
return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err)
Expand All @@ -296,20 +333,20 @@ type GetExecutionDataFunc func(context.Context, uint64) (*execution_data.BlockEx

type GetStartHeightFunc func(flow.Identifier, uint64) (uint64, error)

func (b StateStreamBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) subscription.Subscription {
func (b *StateStreamBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) subscription.Subscription {
nextHeight, err := b.getStartHeight(startBlockID, startHeight)
if err != nil {
return subscription.NewFailedSubscription(err, "could not get start height")
}

sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getResponseFactory(filter))
sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getEventsResponseFactory(filter))

go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx)

return sub
}

func (b StateStreamBackend) getResponseFactory(filter state_stream.EventFilter) subscription.GetDataByHeightFunc {
func (b *StateStreamBackend) getEventsResponseFactory(filter state_stream.EventFilter) subscription.GetDataByHeightFunc {
return func(ctx context.Context, height uint64) (interface{}, error) {
executionData, err := b.getExecutionData(ctx, height)
if err != nil {
Expand All @@ -334,6 +371,30 @@ func (b StateStreamBackend) getResponseFactory(filter state_stream.EventFilter)
}
}

func (b StateStreamBackend) GetRegisterValues(registerIDs flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error) {
return nil, status.Errorf(codes.Unimplemented, "not implemented")
func (b *StateStreamBackend) getAccountStatusResponseFactory(
filter state_stream.AccountStatusFilter,
) subscription.GetDataByHeightFunc {
return func(ctx context.Context, height uint64) (interface{}, error) {
executionData, err := b.getExecutionData(ctx, height)
if err != nil {
return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err)
}

events := []flow.Event{}
for _, chunkExecutionData := range executionData.ChunkExecutionDatas {
events = append(events, filter.Filter(chunkExecutionData.Events)...)
}

allAccountProtocolEvents := filter.GroupCoreEventsByAccountAddress(events, b.log)

return &backend.AccountStatusesResponse{
BlockID: executionData.BlockID,
Height: height,
AccountEvents: allAccountProtocolEvents,
}, nil
}
}

func (b *StateStreamBackend) GetRegisterValues(registerIDs flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error) {
return b.blockchain.GetRegisterValues(registerIDs, height)
}

0 comments on commit bab4756

Please sign in to comment.