
Shard aware batching - add Session::shard_for_statement & Batch::enforce_target_node #738

Open · wants to merge 17 commits into main from easier_access_to_shard_awareness_api
Conversation

@Ten0 (Contributor) commented Jun 3, 2023

Resolves #468
Resolves #974
Closes #975 (since that is just an alternate way to fix #974 and this implementation is probably more direct)

This is a follow-up on #508 and #658:

  • To minimize CPU usage related to network operations when inserting a very large number of lines, it is relevant to batch.
  • To batch in the most efficient manner, these batches have to be shard-aware. Since #508 (Pick connections based on batch first statement's shard), batch picks the shard of the first statement to send the query to. However, it is left to the user to compose the batches in such a way that the target shard is the same for all the elements of the batch.
  • This was made possible by #658 (Expose calculate_token), but it was still very boilerplate-ish. I was waiting for #612 (transport: load balancing module refactor) to be merged (amazing work btw! 😃) to implement a more direct and factored API (as that would use it).
  • This new ~Session::first_shard_for_statement(self, &PreparedStatement, &SerializedValues) -> Option<(Node, Shard)> makes shard-aware batching easy for users, by providing access to the first node and shard of the query plan.
  • As pointed out in the first review, this was not enough, because the load balancer (now?) picks a random replica at each call. To make that deterministic when composing batches "by hand", a new enforce_target_node method is added on Batch to choose the target node (it uses the load balancing API via an EnforceTargetNodePolicy struct, so no significant change). A usage sketch follows below.
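
A minimal sketch of how the two additions are meant to compose from application code. The return shape of shard_for_statement follows this PR's description; the per-shard queue map, its key type, and the exact import paths are illustrative and vary by driver version:

```rust
use std::collections::HashMap;

use scylla::frame::value::SerializedValues;
use scylla::prepared_statement::PreparedStatement;
use scylla::routing::Shard;
use scylla::transport::errors::QueryError;
use scylla::Session;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;

/// Sketch: route one row to the queue owned by the task batching for its
/// first (node, shard). `shard_for_statement` is the method added by this PR;
/// the per-shard queue map is application code, not driver API.
async fn route_row(
    session: &Session,
    prepared: &PreparedStatement,
    values: SerializedValues,
    queues: &HashMap<(Uuid, Shard), Sender<SerializedValues>>,
) -> Result<(), QueryError> {
    if let Some((node, shard)) = session.shard_for_statement(prepared, &values)? {
        if let Some(tx) = queues.get(&(node.host_id, shard)) {
            // The task draining this channel builds an unlogged Batch, pins it
            // with `Batch::enforce_target_node`, and calls `Session::batch`.
            let _ = tx.send(values).await; // channel closed => shard task gone
        }
    }
    Ok(())
}
```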

Pre-review checklist

  • I have split my patch into logically separate commits. (I propose we squash-and-merge)
  • All commit messages clearly explain what they change and why.
  • I added relevant tests for new features and bug fixes.
  • All commits compile, pass static checks and pass tests.
  • PR description sums up the changes and reasons why they should be introduced.
  • I have provided docstrings for the public items that I want to introduce.
  • I have adjusted the documentation in ./docs/source/.
  • I added appropriate Fixes: annotations to PR description.

@Ten0 (Contributor, Author) commented Jun 3, 2023

One more clippy lint I don't agree with -.-
Type seems clear enough and I feel like adding a new one would instead increase overall complexity. Fine if I silence that one?

@Ten0 changed the title from "Add Session::first_shard_for_statement" to "Shard aware batching - add Session::first_shard_for_statement" on Jun 3, 2023
@wprzytula self-requested a review on June 7, 2023
@Ten0 force-pushed the easier_access_to_shard_awareness_api branch from 845feee to 20c32b8 on July 2, 2023
@Ten0 force-pushed the easier_access_to_shard_awareness_api branch from 7e47f67 to 146655e on July 2, 2023
@Ten0 force-pushed the easier_access_to_shard_awareness_api branch from 533fac4 to 6406e70 on July 2, 2023
@wprzytula (Collaborator) commented:

Have you tested your solution? How? Could you write some unit tests?

@wprzytula (Collaborator) commented:

I can see that you merged main into your branch. As our policy is to rebase, could you instead rebase your branch on main, please?

@wprzytula (Collaborator) commented Jul 3, 2023

The approach with enforce_target_node() is very interesting and it seems to be working, at least conceptually. I'm a bit worried, however, that it may turn out to be somewhat fragile. For instance, consider a situation when an enforced node is removed from the cluster. Not only will the correct shard not be targeted, but also each query will involve an expensive call to fallback(). Even worse, in order to repair this, the application will have to be restarted.

@Ten0 (Contributor, Author) commented Jul 3, 2023

For instance, consider a situation when an enforced node is removed from the cluster. Not only will the correct shard not be targeted

If the node is removed from the cluster, new lines for which one would call shard_for_statement wouldn't be associated with this shard anymore, so the task dedicated to sending batches to this shard wouldn't receive any more messages to send; it would just be a dangling sleeping Tokio task until app restart, which is reasonably inexpensive (one could even clean that up, but it seems overkill).

each query will involve an expensive call to fallback()

I'm making the call to the non-fallback function of self.fallback inside the pick function, so that should be pretty inexpensive for each message whose shard/batching task has already been picked before the node was removed.

@wprzytula (Collaborator) commented:

I'm making the call to the non-fallback function of self.fallback inside the pick function, so that should be pretty inexpensive for each message whose shard/batching task has already been picked before the node was removed.

Correct, I missed it.

If the node is removed from the cluster, new lines for which one would call shard_for_statement wouldn't be associated with this shard anymore, so the task dedicated to sending batches to this shard wouldn't receive any more messages to send [...]

It seems that I don't quite understand your use case. As I imagine it, one would once (upon application initialisation) constitute/associate a batch with a specific shard using shard_for_node() and enforce_target_node(), and then keep calling Session::batch() with that batch for the whole lifetime of the application. That would mean an incorrect node being enforced forever if the target node is removed. How does your use case prevent that?

@Ten0 (Contributor, Author) commented Jul 3, 2023

As I imagine it, one would once (upon application initialisation) constitute/associate a batch with a specific shard

No, rather I have an app that receives millions of lines per second to insert. These lines are processed, spread over multiple threads, in a manner that involves just identifying the shard, and then looking up in a HashMap<(NodeUuid, ShardIdxOnNode), Sender> for the lazily instantiated channel that sends to a task that constitutes and sends the batches for a given target shard. This allows getting rid of ~90% of the scylla driver CPU consumption (according to perf).

Now if a node is removed, the task will be left dangling, and a few messages will be sent to a default-policy-chosen shard by the time its channel is drained, but new messages will be assigned to an alive shard (and its corresponding channel).
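
A condensed sketch of that lookup-or-lazily-spawn step (purely application-side code; the Line type, key type, and channel capacity are placeholders):

```rust
use std::collections::HashMap;

use tokio::sync::mpsc;
use uuid::Uuid;

type ShardIdxOnNode = u32; // matches the HashMap key described above
type Line = Vec<u8>;       // placeholder for a parsed input line

/// Sketch: get the sender for a target shard, lazily spawning its batching task.
fn sender_for(
    queues: &mut HashMap<(Uuid, ShardIdxOnNode), mpsc::Sender<Line>>,
    target: (Uuid, ShardIdxOnNode),
) -> mpsc::Sender<Line> {
    queues
        .entry(target)
        .or_insert_with(|| {
            let (tx, mut rx) = mpsc::channel::<Line>(1024);
            // Dedicated task for this (node, shard): it drains the channel and
            // constitutes + sends the batches. If the node is later removed,
            // the task simply stops receiving and sleeps until app restart.
            tokio::spawn(async move {
                while let Some(_line) = rx.recv().await {
                    // ... accumulate into a batch and send it ...
                }
            });
            tx
        })
        .clone()
}
```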

@wprzytula (Collaborator) commented:

No, rather I have an app that receives millions of lines per second to insert. These lines are processed, spread over multiple threads, in a manner that involves just identifying the shard, and then looking up in a HashMap<(NodeUuid, ShardIdxOnNode), Sender> for the lazily instantiated channel that sends to a task that constitutes and sends the batches for a given target shard.

Let me dig deeper. What is the lifetime of your Batch structs? Do they contain PreparedStatements? What is their lifetime?
Are the lines that you get every second merely bind values to be sent with prepared statements with fixed query string?

@Ten0 (Contributor, Author) commented Jul 3, 2023

Are the lines that you get every second merely bind values to be sent with prepared statements with fixed query string?

Basically yes. There's a bunch of different prepared statements but those are prepared at application startup.
A Batch object is initialized once in each task dedicated to a shard (with its overridden execution policy from this PR). When it has constituted a set of lines to insert, it sets the corresponding prepared statements in the Batch, then sends it.

@wprzytula (Collaborator) commented:

Are the lines that you get every second merely bind values to be sent with prepared statements with fixed query string?

Basically yes. There's a bunch of different prepared statements but those are prepared at application startup. A Batch object is initialized once in each task dedicated to a shard (with its overridden execution policy from this PR). When it has constituted a set of lines to insert, it sets the corresponding prepared statements in the Batch, then sends it.

I think I've got it, many thanks. I'm convinced now that what you propose in this PR does work in your case. I still wonder, however, if we can make it more universal. So far no good ideas how to do that, though.

@wprzytula (Collaborator) commented:

Maybe you could write an example called, say, shard_aware_batching.rs, and present a complete minimal way to utilise shard_for_statement() and enforce_target_node() - similar to what you put in the docstring, but complete? This would let others follow your design more easily and adapt it to their workloads.

@Ten0 force-pushed the easier_access_to_shard_awareness_api branch from a484a4c to a651fed on July 3, 2023
@Ten0 changed the title from "Shard aware batching - add Session::first_shard_for_statement" to "Shard aware batching - add Session::shard_for_statement & Batch::enforce_target_node" on Jul 3, 2023
@Ten0 force-pushed the easier_access_to_shard_awareness_api branch from cfdeff6 to d5f71a9 on May 25, 2024
@Ten0 (Contributor, Author) commented May 25, 2024

@wprzytula

This is also related to #974 where a batch that is token-aware would be much more efficient.

Updated the PR. It looks like it indeed does solve #974. 😊
Also #788 did indeed solve this issue, thanks! 😊

(NB: I know this will need a squash because the oldest commits are very old and we don't want spaghetti on main - can we merge this by using GitHub's squash-and-merge feature rather than force-pushing here and losing history?)

@wprzytula (Collaborator) commented:

After discussion with @Lorak-mmk, our impression is that we still don't understand the approach taken by this PR. Why would we want to hardwire the destination node and shard manually, instead of relying on the default load balancer?
There is a heuristic that chooses the target node and shard based on the token of the first statement in a batch. Why is it not enough? As long as the other statements in a batch are destined to the same node and shard, performance will be optimal.

Hardwiring the target node and shard has significant drawbacks. RetryPolicy and SpeculativeExecutionPolicy are going to malfunction on single-target query plans. It's the responsibility of the user to select the correct node and shard.

In short, it's unclear to us why the problem of shard-aware batches is approached this way.

@Ten0 (Contributor, Author) commented Jun 20, 2024

Why is it not enough?

Thanks for the review.

I initially thought it was enough as well, however

As pointed out in the first review, this was not enough considering the load balancer (now?) picks a random replica at each call.

This means that if we check which node a statement would pick in order to put it in a batch, and then send the batch, the batch may randomly be sent to a different shard: one that is appropriate for that statement, but not necessarily for all the other statements of the batch, leaving them one chance out of nb_replicas to actually hit the shard-aware path.

Following this change:

I have also tested the solution in a multi-node cluster at 1M lines/s and could see a much reduced metric of non-shard-aware queries in the monitoring (that metric is documented in Grafana as being unreliable when batches are used, but the fact that it went from 60% to 10% still suggests that queries were properly targeted).

@Lorak-mmk (Collaborator) commented:

Why is it not enough?

Thanks for the review.

I initially thought it was enough as well, however

As pointed out in the first review, this was not enough considering the load balancer (now?) picks a random replica at each call.

This means that if we check which node a statement would pick in order to put it in a batch, and then send the batch, the batch may randomly be sent to a different shard: one that is appropriate for that statement, but not necessarily for all the other statements of the batch, leaving them one chance out of nb_replicas to actually hit the shard-aware path.

The solution is to batch by token, not by "first" replica.

Following this change:

I have also tested the solution in a multi-node cluster at 1M lines/s and could see a much reduced metric of non-shard-aware queries in the monitoring (that metric is documented in Grafana as being unreliable when batches are used, but the fact that it went from 60% to 10% still suggests that queries were properly targeted).

@Ten0 (Contributor, Author) commented Jun 20, 2024

The solution is to batch by token, not by "first" replica.

Unless I'm mistaken, because the space of tokens is large, this would imply that we would very rarely be able to batch, whereas batching by replica allows grouping by "one random acceptable replica" for each statement.

@Lorak-mmk (Collaborator) commented:

The solution is to batch by token, not by "first" replica.

Unless I'm mistaken, because the space of tokens is large, this would imply that we would very rarely be able to batch, whereas batching by replica allows grouping by "one random acceptable replica" for each statement.

Afaik it doesn't really make sense to perform multi partition batches. This incurs additional work for the coordinator node and you would be better off just using single, non-batched writes.
So if your data to insert is spread across partitions such that there is no significant amount of data to update in any single partition - just use single non-batched writes.
If you have data that is more grouped by partitions (meaning you have many updates to a partition) then it may make sense to use batches (but you should benchmark whether it gives you a performance advantage over single writes), and you can group them by token.

@Ten0 (Contributor, Author) commented Jun 20, 2024

Afaik it doesn't really make sense to perform multi partition batches. This incurs additional work for the coordinator node and you would be better off just using single, non-batched writes.

The main issue with not batching is that going through the heavy Tokio & networking machinery for every statement puts 5x the CPU load on the server that makes the Scylla calls using the Rust driver, whereas with batching that load is shared across coordinators (and is probably less significant than managing Tokio futures, even ignoring the fact that the work is shared, because Tokio turned out to be heavy in my benchmarks).
I'm aware that the driver currently attempts to batch networking at a low level to some extent (and have read the relevant blog post), but that 1. doesn't cut off Tokio's heaviness and 2. only waits for a single Tokio yield to try to append more, whereas I significantly benefit from waiting longer to build larger network batches.

Using batches enabled me to multiply my insertion speed by 5 thanks to this.

@lvboudre commented:

Afaik it doesn't really make sense to perform multi partition batches. This incurs additional work for the coordinator node and you would be better off just using single, non-batched writes.

I forge my own batches and I do see an increase in performance and lower latency. I did this a couple of months ago and I don't have the numbers with me, but it was a no-brainer to code more in order to do the batching locally before sending off to the scylla driver.

@Lorak-mmk (Collaborator) commented Jun 21, 2024

Afaik it doesn't really make sense to perform multi partition batches. This incurs additional work for the coordinator node and you would be better off just using single, non-batched writes.

The main issue with not batching is that going through the heavy Tokio & networking machinery for every statement puts 5x the CPU load on the server that makes the Scylla calls using the Rust driver, whereas with batching that load is shared across coordinators (and is probably less significant than managing Tokio futures, even ignoring the fact that the work is shared, because Tokio turned out to be heavy in my benchmarks). I'm aware that the driver currently attempts to batch networking at a low level to some extent (and have read the relevant blog post), but that 1. doesn't cut off Tokio's heaviness and 2. only waits for a single Tokio yield to try to append more, whereas I significantly benefit from waiting longer to build larger network batches.

Would you be able to share some benchmark we could reproduce? If the problem is driver's performance then my preferred solution would be to improve driver's performance instead of doing hacky workarounds.
As for a solution for you for now: shard_for_token makes no sense because of tablets and mixed-shards clusters. If I understand correctly, you want to batch together statements that have 1 common replica, and send them to this replica. I think you can do this today. You can use ClusterData::replica_locator to get the ReplicaLocator struct, which has a replicas_for_token method you can call. This method returns a ReplicaSet which implements IntoIterator over the replicas for a given token.
You can then use whatever heuristics you like to batch statements (IIUC you will want to use the first element of this iterator as the grouping key), and create your own LoadBalancingPolicy to use on such a batch. This LBP would return a plan consisting of 1 element. If you have trouble implementing this, let me know and I'll try to prepare some PoC.
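
A rough sketch of that grouping-key derivation, written against the pre-tablets, three-argument form of replicas_for_token; newer driver versions also take the table and yield (node, shard) pairs, so treat the exact paths and signatures as assumptions, not version-exact code:

```rust
use scylla::routing::Token;
use scylla::transport::locator::ReplicaLocator;
use scylla::transport::topology::Strategy;
use uuid::Uuid;

/// Sketch: use the first replica for the statement's token as the batching key.
/// `locator` comes from `ClusterData::replica_locator`, `strategy` from the
/// keyspace metadata; both lookups are elided here.
fn batching_key(locator: &ReplicaLocator, token: Token, strategy: &Strategy) -> Option<Uuid> {
    locator
        .replicas_for_token(token, strategy, None) // ReplicaSet: IntoIterator over replicas
        .into_iter()
        .next()
        .map(|node| node.host_id) // first replica's node as the grouping key
}
```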

Using batches enabled me to x5 my insertion speed due to this.

I'd really like to see and reproduce the benchmark to investigate if we can improve this.

@Ten0 (Contributor, Author) commented Jun 25, 2024

Thanks for your answer.

If the problem is driver's performance then my preferred solution would be to improve driver's performance instead of doing hacky workarounds.

I sure see your point - if there's a way to reduce the load in the driver without putting extra load on the servers and/or exposing less type-safe APIs it's definitely best to do that instead.

I'd really like to see and reproduce the benchmark to investigate if we can improve this.

I don't think I still have the perf graphs but I can tell you what did show up on it.

Bear in mind that for each statement, the work I otherwise do is reading a buffered line, some minor parsing to get a partition key, and pushing the parsed line into a batch of, say, 64 lines - the operations below then happen only 1 out of 64 times (thanks to batching).

What shows up on perf is:

  • Statements being handled individually and independently: independent Tokio tasks keep going to sleep and being woken up for each statement.
  • All the operations under execute being performed for every statement (notably the dedicated response channel).

NB: We should probably merge these discussions because they seem to be about the same thing, but I'll copy it here because it's also relevant to this particular sub-discussion (from #468 (comment)): for the use case of performing high-frequency append-log-style operations, as is my case and that of the author of #974, it seems clear that more Scylla servers will be required to store & perform maintenance operations than client servers (~1/10 ratio), so all things otherwise equal, being able to offload some work to the servers doesn't seem necessarily bad.

To answer the question from the same thread:

OK, could you re-iterate what kind of API you need specifically access to in order to get such client-side performance gains?

Fundamentally, what I need is to send statements into a processing queue, where I don't necessarily care about getting each individual result as an async return (which seems to be the heavy part), but I do need each line, once processed, to be sent to the next step of the pipeline.

What allows achieving optimal performance is that nowhere in the pipeline do async sleeps/wakeups occur for each statement, because:

  • Scylla part is batched, and batches are always sent to a single node (so always 1 networking call, result channel...)
  • Everything else in the pipeline uses relaxed_channel

Wakeups do otherwise cost a lot, relatively, as I tried to explain in the relaxed_channel documentation. (Please let me know if the explanation there is unclear as I could otherwise forward my talk on this topic from the Nov. 2023 Paris Rust Meetup which may give more detail.)

Currently I achieve this no-per-statement-wakeup by:

  • Looking up a target shard (that's only the first step of the relatively heavy call tree above, and notably it's not async.) (I serialize before performing this operation, and lookup the shard from the serialized values & prepared statement.)
  • Sending to a channel dedicated to that shard (reasonably lightweight thanks to relaxed_channel)
  • The task dedicated to that shard makes batches from that channel using chunks_timeout (~64 size) - see the sketch after this list
  • Batch is sent without re-serializing (IIUC this is your main concern with the proposed API with regards to reduced type safety?)
  • If batch is successfully inserted, send all the lines to the next step of the pipeline.
  • Otherwise, retry the whole batch (soon; it shouldn't go to the back of the queue because of the requirements of the next pipeline steps) (errors happen seldom so this shows no perf impact)
  • In case of unrecoverable error, error the shard task, which stops the program ~gracefully.
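
A sketch of the per-shard batching loop from the steps above, using futures-batch's chunks_timeout (linked in the quote further down this thread); the batch size and timeout are illustrative, and building/sending the Batch with its enforced target is elided:

```rust
use std::time::Duration;

use futures::StreamExt;
use futures_batch::ChunksTimeoutStreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

type Line = Vec<u8>; // placeholder for a parsed input line

/// Sketch of one per-shard task: emit chunks of up to 64 lines, waiting at
/// most 10ms for a chunk to fill.
async fn batch_loop(rx: mpsc::Receiver<Line>) {
    let chunks = ReceiverStream::new(rx).chunks_timeout(64, Duration::from_millis(10));
    tokio::pin!(chunks);
    while let Some(lines) = chunks.next().await {
        // ... append the prepared statements + values for `lines` to a Batch
        // whose policy enforces this task's (node, shard), then Session::batch();
        // on success, forward `lines` to the next pipeline step, otherwise retry.
        let _ = lines;
    }
}
```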

Indeed I don't care about the other properties of batches that Scylla may try to uphold. My understanding is that if the batch is Unlogged they are minimal - am I mistaken?

shard_for_token makes no sense because of tablets and mixed-shards clusters

My understanding is that this would result, most of the time, in hitting a shard that holds one of the copies of this particular data. Am I mistaken?
If I am indeed mistaken here, then I guess a smaller version of batch that would uphold even smaller guarantees would be a useful API.

I don't care that I hit a correct shard 100% of the time: if the Scylla nodes need to transfer the call to another node 1% of the time because there was a tablet split it doesn't look like it's going to have any impact.

Actually, even hitting a correct shard almost every time probably has almost no impact: my understanding is that the cluster will still perform as many extra network jumps for each statement as there are replicas, so that's just one more, so I could probably just random-batch.
What I'm trying to optimize here, compared to the random-batch solution, is that I hit a relevant shard most of the time, as opposed to statistically never, which would require one extra jump all the time.

My understanding is that since the Scylla servers will already perform some per-statement jumps whatever happens, letting them perform the per-statement work instead of doing it via per-statement async management in Tokio makes it possible to reduce the number of clients without significantly increasing the load on the Scylla servers in a way that would require provisioning more of them. Please let me know if I am mistaken.

Thanks!

@mykaul (Contributor) commented Jun 25, 2024

Thanks for your answer.

If the problem is driver's performance then my preferred solution would be to improve driver's performance instead of doing hacky workarounds.

I sure see your point - if there's a way to reduce the load in the driver without putting extra load on the servers and/or exposing less type-safe APIs it's definitely best to do that instead.

General comment, not specific to this issue - I think it makes sense to always offload more to the client, if possible, rather than push more work to the servers. There are most likely (in most scenarios) far more clients than servers, and client resources can be scaled out more easily than server resources.

@Ten0 (Contributor, Author) commented Jun 25, 2024

I think it makes sense to always offload more to the client - There are most likely (in most scenarios) far more clients and they can be scaled out

Definitely. Although it seems that, unless I'm mistaken, in this particular case (append-log-style operations) the load difference when a user chooses the API that offloads (namely, the batch API) may be insignificant on the servers but an order of magnitude on the clients. So the approach of using batches for that purpose may be relevant, and if that's indeed the case, then proper shard awareness for batches (to the extent possible) is just uncompromising (perf-wise) load reduction on the servers.

@Lorak-mmk (Collaborator) commented Jul 2, 2024

I don't think I still have the perf graphs but I can tell you what did show up on it.

Bear in mind that for each statement, the work I otherwise do is reading a buffered line, some minor parsing to get a partition key, and pushing the parsed line into a batch of, say, 64 lines - the operations below then happen only 1 out of 64 times (thanks to batching).

What shows up on perf is:

  • Statements being handled individually and independently, independent tokio tasks keep [going to sleep and be woken up for each statement](https://github.com/tokio-rs/tokio/blob/06582776a564c88a7b4c6f9e3d7c0ebd0ef3f34b/tokio/src/runtime/task/harness.rs#L94-L230).

  • It does for every statement [all](https://github.com/scylladb/scylla-rust-driver/blob/1fcadc99055fe21a47ec623a8f1af1bb8790d186/scylla/src/transport/session.rs#L1006-L1065) [operations](https://github.com/scylladb/scylla-rust-driver/blob/1fcadc99055fe21a47ec623a8f1af1bb8790d186/scylla/src/transport/session.rs#L1512-L1657) [under](https://github.com/scylladb/scylla-rust-driver/blob/1fcadc99055fe21a47ec623a8f1af1bb8790d186/scylla/src/transport/session.rs#L1673-L1778) [`execute`](https://github.com/scylladb/scylla-rust-driver/blob/1fcadc99055fe21a47ec623a8f1af1bb8790d186/scylla/src/transport/connection.rs#L698-L742) (notably [dedicated channel](https://github.com/scylladb/scylla-rust-driver/blob/1fcadc99055fe21a47ec623a8f1af1bb8790d186/scylla/src/transport/connection.rs#L116-L154)).

Oh sorry, I didn't mean the results of the benchmark, but how to perform the benchmark (source code that uses the driver, Scylla version, config, machines used, etc.), so we can reproduce the problem, make changes in the driver and measure the difference.

Wakeups do otherwise cost a lot, relatively, as I tried to explain in the relaxed_channel documentation. (Please let me know if the explanation there is unclear as I could otherwise forward my talk on this topic from the Nov. 2023 Paris Rust Meetup which may give more detail.)

I understand the explanation, but the talk sounds interesting anyway. Could you share it?

Currently I achieve this no-per-statement-wakeup by:

  • Looking up a target shard (that's only the first step of the _relatively_ heavy call tree above, and notably [it's not async](https://github.com/scylladb/scylla-rust-driver/pull/738/files#diff-80ba8e49418bed2ad375905c49b6784a23c454eac059d99c3e18679a8525d9feR1826).) (I serialize before performing this operation, and look up the shard from the serialized values & prepared statement.)

  • Sending to a channel dedicated to that shard (reasonably lightweight thanks to [`relaxed_channel`](https://docs.rs/relaxed_channel/latest/relaxed_channel/index.html))

  • The task dedicated to that shard makes batches from that channel using [`chunks_timeout`](https://docs.rs/futures-batch/latest/futures_batch/trait.ChunksTimeoutStreamExt.html#method.chunks_timeout) (~64 size)

  • Batch is sent without re-serializing (IIUC this is your main concern with the proposed API with regards to reduced type safety?)

  • If batch is successfully inserted, send all the lines to the next step of the pipeline.

  • Otherwise, retry the whole batch (soon; it shouldn't go to the back of the queue because of the requirements of the next pipeline steps) (errors happen seldom so this shows no perf impact)

  • In case of unrecoverable error, error the shard task, which stops the program [~gracefully](https://crates.io/crates/tokio_tasks_shutdown).

Ok, I think I understand your use case now.
Regarding serializing: what I want is to keep type safety in the APIs exposed by the driver. We'll be able to support this use case by introducing BoundStatement.
You can use the old API or custom wrappers for this in the meantime - which is what you are doing, IIUC.

Indeed I don't care about the other properties of batches that Scylla may try to uphold. My understanding is that if the batch is Unlogged they are minimal - am I mistaken?

There must be some overhead for multi-partition unlogged batches, but I'm not sure how big it is in practice. The coordinator will have to split the batch and execute its statements separately, IIUC.
Maybe @avikivity would know how big the overhead typically is?

shard_for_token makes no sense because of tablets and mixed-shards clusters

My understanding is that this would result, most of the time, in hitting a shard that holds one of the copies of this particular data. Am I mistaken? If I am indeed mistaken here, then I guess a smaller version of batch that would uphold even smaller guarantees would be a useful API.

I don't care that I hit a correct shard 100% of the time: if the Scylla nodes need to transfer the call to another node 1% of the time because there was a tablet split it doesn't look like it's going to have any impact.

Actually, even hitting a correct shard almost every time probably has almost no impact: my understanding is that the cluster will still perform as many extra network jumps for each statement as there are replicas, so that's just one more, so I could probably just random-batch. What I'm trying to optimize here, compared to the random-batch solution, is that I hit a relevant shard most of the time, as opposed to statistically never, which would require one extra jump all the time.

My understanding is that since the Scylla servers will already perform some per-statement jumps whatever happens, letting them perform the per-statement work instead of doing it via per-statement async management in Tokio makes it possible to reduce the number of clients without significantly increasing the load on the Scylla servers in a way that would require provisioning more of them. Please let me know if I am mistaken.

Your cluster consists of nodes. Each node consists of shards (usually number of shards is about the same as number of CPU cores).
If your keyspace has replication factor X, each partition is copied to X different replicas, where replica is a pair (Node, shard number).
When you perform an insert statement, the driver chooses some node and sends the request to it. This node is the coordinator for this request. The coordinator will then contact all of the X replicas for the partition this statement touches and send them the data. After receiving responses from some of them (the consistency level determines how many) it will respond to the driver.
Now what is Shard Awareness?
Let's start with Token Awareness. It is an optimization of the flow described above. The idea is for the driver to calculate which partition the statement touches and then send the statement to a node that is a replica for this partition. That way the coordinator will only have to contact (X-1) replicas, because it is itself a replica.
Shard Awareness is a natural extension of that to the Scylla architecture. As I mentioned, in Scylla a replica is (Node, shard number), so Token Awareness may send the statement to the wrong shard on the correct node. This means the coordinator would need to contact (X-1) different nodes and one different shard on its own node.
To fix this the driver sends the statement to the correct replica (node + shard), so coordinator needs to contact (X-1) replicas.

Why do I explain all that? You mentioned that you are batching by the statement's shard. I understand that you have N nodes, each with K shards, and on the driver side you have K queues (not N, not N*K, just K). Then you send the batch from queue m to shard m on some node.
This is not shard awareness and it doesn't give you the benefits of Shard Awareness. It doesn't even give you the benefit of Token Awareness. The coordinator will still need to contact X replicas for most of the statements in the batch, not (X-1).

Additionally, as I mentioned before, with new versions of Scylla the concept of "a statement's shard" will lose its meaning, because Scylla is moving from VNodes to Tablets.
With VNodes, all replicas for a given partition would have different nodes but the same shard (see the shard calculation in pub fn shard_of(&self, token: Token) -> Shard), so they would have the form (A, n), (B, n), (C, n).
That means you could tell "n is the shard for this statement".

With tablets it's possible (and it's a normal case, not some edge case) for replicas to have different shards, meaning your replicas could be (A, n), (B, l), (C, j) with n != l != j - so there isn't any shard that you could call THE shard of this statement.

I see 3 options here:

  • Just batch randomly and see how many queues give you the optimal performance.
  • Batch by replica and send to this replica. You'll have N * K queues (N - node count in the cluster, K - shards per node). Take the replica set from https://docs.rs/scylla/latest/scylla/transport/locator/struct.ReplicaLocator.html#method.replicas_for_token and use the first element as the batching key. Use a custom LBP that wraps DefaultPolicy, which in pick returns this preferred replica and in fallback calls DefaultPolicy (sketched after this list). This way you'll send the batch to a correct replica AND retain the possibility to use a retry policy / speculative execution. The coordinator will have to contact (X-1) replicas for each statement, as is the case with Shard Awareness on normal statements.
  • Batch by the node of a replica. Do this if N * K queues is too many. Basically do as above, but batch by just the node, not node+shard, and make your LBP return None for the shard. This will be similar to Token Awareness - the coordinator will have to contact (X-1) other nodes, and one other shard, for each statement.

None of the above implementations requires changes in the driver; you can do them from user code.
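
A sketch of option 2's wrapper policy. The trait shape follows what is discussed in this thread (pick returning an optional (node, shard) pair); exact paths and signatures differ between driver versions, and the wrapped policy would typically be built via DefaultPolicy::builder().build():

```rust
use std::sync::Arc;

use scylla::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo};
use scylla::routing::Shard;
use scylla::transport::{ClusterData, Node, NodeRef};

/// Sketch: always pick one precomputed replica; defer to the wrapped policy in
/// fallback, so retry policies and speculative execution still get a plan.
#[derive(Debug)]
struct PinToReplica {
    node: Arc<Node>,
    shard: Option<Shard>, // None gives the "node only" variant (option 3)
    wrapped: Arc<dyn LoadBalancingPolicy>,
}

impl LoadBalancingPolicy for PinToReplica {
    fn pick<'a>(
        &'a self,
        _query: &'a RoutingInfo,
        _cluster: &'a ClusterData,
    ) -> Option<(NodeRef<'a>, Option<Shard>)> {
        Some((&self.node, self.shard))
    }

    fn fallback<'a>(
        &'a self,
        query: &'a RoutingInfo,
        cluster: &'a ClusterData,
    ) -> FallbackPlan<'a> {
        // Reached only if the pinned replica fails.
        self.wrapped.fallback(query, cluster)
    }

    fn name(&self) -> String {
        "PinToReplica".to_owned()
    }
}
```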

Regarding possible improvements in the driver:

  • We could experiment with introducing something like performance profiles to the driver. Currently driver focuses on latency, but there could be a switch in session creation to focus on throughput. Then we could disable TCP_NODELAY, change preemption in the place you mentioned and others, or even use your channels.
  • After Session refactor we could introduce a method that executes a slice of statements, but does this more efficiently, by grouping together some operations.

There are probably more ideas to find but we need benchmarks for that.

@avikivity (Member) commented:

Indeed I don't care about the other properties of batches that Scylla may try to uphold. My understanding is that if the batch is Unlogged they are minimal - am I mistaken?

There must be some overhead for multi-partition unlogged batches, but I'm not sure how big it is in practice. The coordinator will have to split the batch and execute its statements separately, IIUC. Maybe @avikivity would know how big the overhead typically is?

It's better to coordinate on the client. The overhead isn't large but it's an extra hop.

@Ten0 (Contributor, Author) commented Jul 2, 2024

I understand the explanation, but the talk sounds interesting anyway. Could you share it?

Alright, I'll take the time to finish cleaning the auto-generated subtitles that are a disaster 😅 then post the link. 🙂

I understand that you have N nodes, each has K shards, and on the driver side you have K queues (not N, not N*K, just K)

Ah, it looks like we had a misunderstanding here: I do create N*K queues, that is, one per actual shard, not one per shard index or one per number-of-shards-per-node. (I'm using the naming discussed here, but maybe I should have said replica? I thought that was a different concept from node, associated with a particular statement.)

With tablets it's possible (and it's a normal case, not some edge case) for replicas to have different shards, meaning your replicas could be (A, n), (B, l), (C, j) with n != l != j - so there isn't any shard that you could call THE shard of this statement.

I already don't make such assumptions. Apologies if I said "the shard for a statement" anywhere above; I really only ever meant "a shard that would match this statement": I only ever manipulate pairs of (Node, ShardIndex), hence the signature of the main new function of this PR, shard_for_statement. (Note to self: if shard is not the term but replica is, I assume it should be named replica_for_statement then.)

Take replica set from https://docs.rs/scylla/latest/scylla/transport/locator/struct.ReplicaLocator.html#method.replicas_for_token , use the first element as batching key

None of the above implementations requires changes in the driver, you can do them from user code.

I know that it's already possible, it's even written in the PR's description:

This was made possible by #658, but it was still very boilerplate-ish. I was waiting for #612 to be merged (amazing work btw! 😃) to implement a more direct and factored API (as that would use it).

The issues that I have with implementing this outside of the driver are:

  1. It's not very direct: I feel like this would often break from one scylla release to another, or worse, become inaccurate, compared to using the query planning code: if not using the query planner, it may not take into account nodes being down, or other load-balancing considerations (a node being overloaded, a node's queue being fuller...) that may appear from one release to another.
  2. It's very boilerplate-ish to calculate the token for the statement, notably because, if we want to take the load balancer strategy into account, however complex it is, the methods used by the function this PR proposes are not currently public. Rather than exposing them and leaving every user who wants to do the same thing as I do to reimplement this - exposing functions that are largely internal - it looks like it may be preferable to instead just add the one function that does what people actually want to do with it.

To be clear: for me adding this function is the main point of this PR:

pub fn shard_for_statement(
    &self,
    prepared: &PreparedStatement,
    serialized_values: &SerializedValues,
) -> Result<Option<(Arc<Node>, routing::Shard)>, QueryError> {

making the code that picks a shard for a given statement easily accessible, correct, and consistent with load balancing, as efficient load balancing is effectively what I'm looking for.

Now, as pointed out by @wprzytula here, any code that uses this doesn't make sense without also enforcing the target (node, shard) afterwards, and it's nice to have usage examples, which is why the PR also adds a policy to enforce the target shard afterwards, and a test that also constitutes an example of how to write a system such as the one I was describing in my previous message.

@Ten0 (Contributor, Author) commented Jul 7, 2024

I understand the explanation, but the talk sounds interesting anyway. Could you share it?

Finally finished the subtitles (I recommend enabling them): https://youtu.be/cBElzSO4A_s

@Lorak-mmk self-assigned this on Jul 9, 2024
@Lorak-mmk removed their assignment on Jul 31, 2024
@wprzytula removed their assignment on Aug 1, 2024