State distribution #292

Merged
merged 20 commits into main from state-distribution on Oct 12, 2024

Conversation

Contributor
@hosie commented Oct 10, 2024

Adds reliable messaging between nodes for the distribution of state, as per the handshake described at https://github.com/kaleido-io/paladin/blob/engine-docs/architecture/distributed-transaction-management.md#distribution-of-private-state-data

Also takes steps towards improving the threading model and error handling for the co-ordinator / privateTransactionManager orchestrator. (TODO: sort out consistent naming for these things.)

TODO:

  • refactor: move stateDistributer out of privatetxnmgr and into internal/statedistributer
  • clean up the usage of the identityLocator in transport messages. We are currently abusing the who part of who@where for internal component routing
  • use the flush writer to store the state on the receiving node (currently a new DB transaction is created for each one)
  • unit test coverage
    • some of the new code does not have unit tests, but is covered by the new component test, so overall coverage has not dropped below the threshold
  • manual exploratory test with the zeto domain. As far as I know this should work, but I don't know what I don't know
    • had to fix an issue where VerifierType was missing on some data structures
    • given that zeto does not require any signatures, the PostAssembly.Signatures array was never being initialised and failed the validation here https://github.com/kaleido-io/paladin/blob/09d9081f68185c5d9585472c35ce6f8ae5fa2d20/core/go/internal/domainmgr/private_smart_contract.go#L299
    • had to use the debugger and add a bunch of debug logging because we don't record the error reasons for failed transactions, or for transactions that are currently in a retry loop (and we sometimes don't even realise that we should be in a retry loop). The next priority is to improve this error handling and reporting; not sure whether it will make it into this PR before it gets merged.
  • add a white-box test for reliability - e.g. similar to the component test, but rather than use the real gRPC transport, use a fake transport that simulates network unreliability
  • restart recovery testing - test that message delivery is reliable across a node restart

The last 2 bullets may be deferred until we have implemented all of the other reliable message exchanges (endorsement requests, delegation to a remote coordinator, dispatch to a remote submitter, etc.).

I think there is potential for some generic code to be teased out of this and reused for the other cross-node message exchanges where we need reliable delivery, but for now I'll just look to get the code into a sensible structure as a step towards that.

One particular decision point that I could do with @peterbroadhurst's review on is that I added an optional parameter to StateManager.GetSchema where the caller can choose to pass a database transaction. Previously, if this was called during a DB transaction (e.g. WriteReceivedStates) then it always created a new transaction, which flat out doesn't work on SQLite and would be wasteful of DB resources on Postgres. I did consider calling GetSchema before calling WriteReceivedStates to force the schema to be cached, but wasn't completely comfortable that would always be safe.
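
A minimal sketch of the shape I mean is below (illustrative only: the function, the schemaRow type and the column names are placeholders rather than the real StateManager API; it assumes gorm, which the persistence layer already uses, plus the standard context package):

// Sketch only: an optional dbTX parameter lets the caller supply an in-flight
// transaction instead of forcing a new one.
type schemaRow struct {
    ID         string `gorm:"column:id"`
    DomainName string `gorm:"column:domain_name"`
    Definition string `gorm:"column:definition"`
}

func getSchema(ctx context.Context, db *gorm.DB, dbTX *gorm.DB, domainName, schemaID string) (*schemaRow, error) {
    conn := dbTX
    if conn == nil {
        // no caller-supplied transaction: run as a standalone query
        conn = db
    }
    var row schemaRow
    err := conn.WithContext(ctx).
        Table("schemas").
        Where("domain_name = ? AND id = ?", domainName, schemaID).
        First(&row).Error
    if err != nil {
        return nil, err
    }
    return &row, nil
}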

Signed-off-by: John Hosie <[email protected]>
@hosie marked this pull request as ready for review October 12, 2024 08:40
}
}

_, err := rsw.stateManager.WriteReceivedStates(ctx, tx, values[0].DomainName, stateUpserts)
Contributor Author

We are using the DomainName from the first value in the array, but it applies to all stateUpserts, so we are making an assumption that they are all on the same domain. This is a safe assumption while WriteKey returns DomainName https://github.com/kaleido-io/paladin/pull/292/files#diff-81f7ddfed27a2e83ea5d097162f6fbcc56a85dce3836841563a3dbd3ec353e0dR51

However, I feel that this could be a brittle assumption in future, and I wonder whether we want to find some way to codify this assertion.

Contributor

This is a safe assumption while WriteKey returns DomainName

Not sure I understand why this helps. It definitely means that all writes for the same domain go to the same writer, but it doesn't give us any assurance that all writes in the batch are for the same domain.

I think the logic in this function needs to group the writes in a map[string][]*components.StateUpsertOutsideContext by domain, and then call rsw.stateManager.WriteReceivedStates multiple times (once per domain).

Or we update WriteReceivedStates to take the domain in each record.

Happy with either, and happy to help with that (but after I make the task switch)
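
As an illustrative sketch of the grouping option (the per-value field names DomainName and StateUpsert are assumptions about the batch value type, not the actual code):

// Sketch: group the batch by domain, then call WriteReceivedStates once per
// domain, all inside the same flush-writer transaction.
byDomain := make(map[string][]*components.StateUpsertOutsideContext)
for _, v := range values {
    byDomain[v.DomainName] = append(byDomain[v.DomainName], v.StateUpsert) // field name assumed
}
for domainName, upserts := range byDomain {
    if _, err := rsw.stateManager.WriteReceivedStates(ctx, tx, domainName, upserts); err != nil {
        return nil, err // propagate however the surrounding batch handler does today
    }
}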

Contributor Author

Ah, OK. Worse than I thought then. I'll fix this. I have a similar situation in my next PR too, so I'll get it right there off the bat.

Contributor Author

Will probably copy the pattern proven out here: 647d234

nodeID: nodeID,
}
sd.acknowledgementWriter = NewAcknowledgementWriter(ctx, sd.persistence, &conf.AcknowledgementWriter)
sd.receivedStateWriter = NewReceivedStateWriter(ctx, stateManager, persistence, &conf.ReceivedStateWriter)
Contributor Author

Was in two minds here whether to combine these into a single writer and therefore a single pool of flush workers. Decided to keep them separate in the interest of simpler code (albeit more LOC) and to allow finer-grained tuning through config. Since making this decision I have become less confident that it is the correct one, and I think I might come back to this and fold them into one. But interested in review comments to sway my thinking.

Contributor

😄 - I'd indeed made comments earlier in the PR review on this. Merging into one seems good to me, but I also don't think it's urgent.

Maybe it goes along with resolving the different-domain-per-state issue discussed above

Contributor Author

In the PR I am currently working on for error handling, I am taking the single-writer approach in privatetxmgr (for persisting dispatches to public and for finalizing reverts to TxMgr), so I will work out an idiomatic code structure for that and maybe copy the pattern here later.

Contributor

@peterbroadhurst left a comment

This is really great @hosie - having an eventually-consistent state distribution model is an approach I really like.

  • Sender records that it's trying to get states to the receiver
  • Sender records persistently, but lazily, that it's got the acknowledgement
  • On restart (and, I think, periodically going forwards) the sender tries to re-send states

This feels both performant, and resilient.

Some comments as I went through, for you/I to factor in at an appropriate point. Nothing at all that stops this going into main though, so merging.
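
As a rough, illustrative sketch of the sender-side loop that model implies (the names and fields here are invented for illustration, not the types in this PR; assumes the standard context and time packages):

// Sketch of the eventually-consistent sender described above: durable record
// first, best-effort send, lazy ack persistence, re-send of anything un-acked
// on startup and then periodically.
type pendingDistribution struct {
    ID      string // unique distribution ID, echoed back in the acknowledgement
    StateID string
    Party   string // identity we are distributing the state to
}

type senderSketch struct {
    resendInterval time.Duration
    acks           chan string // distribution IDs acknowledged by receivers
}

func (s *senderSketch) run(
    ctx context.Context,
    send func(pendingDistribution) error,     // fire-and-forget transport send
    loadUnacked func() []pendingDistribution, // read the durable record of un-acked sends
    persistAck func(distributionID string),   // lazily record the ack (flush writer in the PR)
) {
    // First pass runs immediately, so anything left un-acked before a restart
    // is re-sent without waiting for the first tick.
    for _, d := range loadUnacked() {
        _ = send(d)
    }
    ticker := time.NewTicker(s.resendInterval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case id := <-s.acks:
            persistAck(id)
        case <-ticker.C:
            for _, d := range loadUnacked() {
                _ = send(d) // best effort; it stays recorded until acknowledged
            }
        }
    }
}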

}

type StateDistributerConfig struct {
AcknowledgementWriter FlushWriterConfig `json:"acknowledgementWriter"`
Contributor

Conversation for another day - but I wonder how valuable it is to have lots of individual ones of these, with individual thread pools, vs. just a single one.

If we go for lots - I think there's a bit of work I should do to have the go-routines spin up on demand rather than sitting around indefinitely as they do today.
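
A rough sketch of what on-demand spin-up could look like (illustrative only, not the current flushwriter code; assumes the standard context and sync packages):

// Sketch: start the worker goroutine lazily on first use instead of at
// construction time. Idle shutdown is omitted but would follow the same shape.
type lazyWriter struct {
    startOnce sync.Once
    work      chan func()
}

func (w *lazyWriter) submit(ctx context.Context, op func()) {
    w.startOnce.Do(func() {
        w.work = make(chan func(), 50)
        go func() {
            for queued := range w.work {
                queued() // the real writer would batch these and flush in one DB transaction
            }
        }()
    })
    select {
    case w.work <- op:
    case <-ctx.Done():
    }
}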

@@ -233,6 +238,10 @@ func newInstanceForComponentTesting(t *testing.T, domainRegistryAddress *tktypes
},
}

//uncomment for debugging
//i.conf.DB.SQLite.DSN = "./sql." + i.name + ".db"
Contributor

This pokes a bit on why PostgreSQL isn't supported in some of these tests.

  • Being able to switch to postgres is extremely helpful for debugging - I do this a lot for SQL debugging in the tests that support it and run with both in the build
  • In the build we should run these more complete component tests with PSQL as well as SQLite; or if we can only do one, it should be PSQL, as that's the one that's most important

Contributor Author

I had assumed SQLite in-memory was preferred for speed of test execution. I guess we could use the t.Short flag to control that, and maybe have a gradle testShort task for developers to run if they want a quick test run that doesn't interrupt their dev flow too much. There are some other tests that I'd probably just skip altogether in short mode.
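
As a sketch of that switch (function names and DSNs here are placeholders; it just uses the standard testing package):

// Sketch: choose the database per test mode.
func testDSN() string {
    if testing.Short() {
        return ":memory:" // fast developer loop on in-memory SQLite
    }
    // full / CI run: PostgreSQL, the configuration that matters most
    return "postgres://postgres:password@localhost:5432/postgres?sslmode=disable"
}

func TestSlowComponentFlow(t *testing.T) {
    if testing.Short() {
        t.Skip("skipped in -short mode")
    }
    // ... full component test against the DSN from testDSN() ...
}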

@@ -10,5 +10,26 @@ CREATE TABLE dispatches (

CREATE UNIQUE INDEX dispatches_public_private ON dispatches("public_transaction_address","public_transaction_nonce","private_transaction_id");

CREATE TABLE state_distributions (
Contributor

I think we're going to want timestamps in these tables, if we're taking the overhead of using the DB as a log (and resilient queue) of what we have sent / received.

We'll be very likely to want to query this using a suitable JSON/RPC a lot in debugging scenarios... in fact adding that query is likely something I'll do quite soon after starting using this.

@@ -244,8 +244,8 @@ func (ir *identityResolver) handleResolveVerifierRequest(ctx context.Context, me
err = ir.transportManager.Send(ctx, &components.TransportMessage{
MessageType: "ResolveVerifierResponse",
CorrelationID: requestID,
ReplyTo: tktypes.PrivateIdentityLocator(fmt.Sprintf("%s@%s", IDENTITY_RESOLVER_DESTINATION, ir.nodeID)),
Destination: replyTo,
Component: IDENTITY_RESOLVER_DESTINATION,
Contributor

Really pleased to see this worked through 👍

MsgComponentIdentityResolverStartError = ffe("PD010030", "Error starting identity resolver")
MsgComponentAdditionalMgrInitError = ffe("PD010031", "Error initializing %s manager")
MsgComponentAdditionalMgrStartError = ffe("PD010032", "Error initializing %s manager")
MsgPrivateTxManagerInvalidEventMissingField = ffe("PD010033", "Invalid event: missing field %s")
Contributor

Noting wrong prefix on this error.

I have a TODO to sort out prefixes (public TX mgr in particular is not following convention) so happy to pick that up.

}

var stateDistributions []StateDistributionPersisted
err = sd.persistence.DB().Table("state_distributions").
Contributor

This query needs pagination, and probably also some consideration of fairness across the parties we're distributing to.

It's feasible, and probably likely, that we'll have situations where a party we're looking to share states with for transactions becomes unreachable for a long period. We build up a lot of cruft for them, which might never get resolved, but we also need to make sure we're distributing states efficiently to other parties.

Simplest is probably to run a state distribution worker per peer.
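
As an illustrative sketch of a per-peer, keyset-paginated read (the column names here are placeholders, not the real migration; assumes gorm as used above):

// Sketch: page through un-acknowledged distributions for one peer at a time,
// so a long-unreachable party cannot starve distribution to the others.
func loadPageForPeer(db *gorm.DB, peer string, afterID string, pageSize int) ([]StateDistributionPersisted, error) {
    var page []StateDistributionPersisted
    err := db.Table("state_distributions").
        Where("target_node = ?", peer).   // placeholder column
        Where("acknowledged = ?", false). // placeholder column
        Where("id > ?", afterID).         // keyset pagination on the primary key
        Order("id").
        Limit(pageSize).
        Find(&page).Error
    return page, err
}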

Contributor Author

This is a great point. I don't want to forget about this, so I have opened issue #299

}

func (sd *stateDistributer) sendStateAcknowledgement(ctx context.Context, domainName string, contractAddress string, stateId string, receivingParty string, distributingNode string, distributionID string) error {
log.L(ctx).Debugf("stateDistributer:sendStateAcknowledgement %s %s %s %s %s %s", domainName, contractAddress, stateId, receivingParty, distributingNode, distributionID)
Contributor

in logs it's super helpful to have domainName=%s in these key multi-input log lines
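
For example, the Debugf line above could become:

log.L(ctx).Debugf("stateDistributer:sendStateAcknowledgement domainName=%s contractAddress=%s stateId=%s receivingParty=%s distributingNode=%s distributionID=%s",
    domainName, contractAddress, stateId, receivingParty, distributingNode, distributionID)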

@peterbroadhurst merged commit 5bf1c7d into main Oct 12, 2024
3 checks passed
@peterbroadhurst deleted the state-distribution branch October 12, 2024 12:10