-
Notifications
You must be signed in to change notification settings - Fork 38.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for batch operations (R2DBC) #27229
Add support for batch operations (R2DBC) #27229
Conversation
Batching through We could follow the idea of Other than that, please check out the contribution guide to learn how we accept contributions. |
Thanks for the response.
Do you mean that we should check the expanded parameters count in NamedParameterUtils.substituteNamedParameters() or NamedParameter.addPlaceholder()? |
Is there any plans to add batching? |
@mp911de, correct me if I'm wrong, but the SPI has two ways of batching, right? One that allows you to send a single command with multiple parameters (via parameter binding) & another that allows you to pass multiple commands and, although both are sent in a single roundtrip, the latter one is slower since the database will have to parse every single command, so if your command won't change, you should stick with the first approach. I'm asking because the implementation of this PR is probably the most optimized (since it uses the first approach), although less flexible. Isn't there a way to add both ways to the |
@gabfssilva your perception is correct. Parametrized batching parses the command once and sends a execute command to the database for each binding set. I think it would be possible to bridge the |
It works for me now. @Autowired
private io.r2dbc.spi.ConnectionFactory connectionFactory;
@Transactional
public Mono<Void> update(List<User> users) {
String sql = "update user u set u.address=? where s.id=?";
return DatabaseClient.create(connectionFactory).sql(sql).filter(statement -> {
for (User u : users) {
statement.bind(0, u.getAddress()).bind(1, s.getId()).add();
}
return statement;
}).then();
} |
@mp911de I am happy helping the merge process but I could use a review on your side first. |
From the spec, we've learned that Considering batching with As a starting point, how about a databaseClient.sql("INSERT INTO legoset (id, name, manual) VALUES(:id, :name, :manual)")
.bind(params -> params.bind("name", name1).bind("manual", manual1))
.bind(params -> params.bind("name", name2).bind("manual", manual2))
.fetch(); The lambda parameter would be a simple interface carrying interface Params {
Params bind(String name, Object value);
Params bind(int index, Object value);
Params bindNull(String name, Class<?> type);
Params bindNull(int index, Class<?> type);
} |
@MarkMarkyMarkus would you have some cycle to amend your PR with the review of Mark? If not, that's cool we can take over when time permits. |
Yes, I will take a look. |
063786f
to
9bf751a
Compare
Note that we intentionally designed our new |
Hope there is a bind method to accept a list/array as params directly to avoid bind object one by one. .bind(
List.of(
Tuple.of(param1, param2, param3...),
Tuple.of(param1, param2, param3...),
...
)
) |
I did some test with https://github.com/TechEmpower/FrameworkBenchmarks/tree/master/frameworks/Java/spring-webflux in order to use batch update instead of individual requests. Under high latency, batch update from this PR allows to increase the throughput by 50%, so I would be in favor of trying to move forward on this PR if we can find the right API. Speaking about the API and related to @hantsy comment, I think we should evolve the API to make it easier to use a collection of data to bind, because if I am not mistaken, the current one is not practical to use for this very popular (likely the most popular) use case. I had to do something like that: // Wrapper class is needed because variable used in lambda must be final or effectively final
var wrapper = new Object(){ GenericExecuteSpec spec =
databaseClient.sql("UPDATE world SET randomnumber=:randomnumber WHERE id = :id"); };
worlds.forEach(world -> wrapper.spec = wrapper.spec.bind(params ->
params.bind("id", world.id).bind("randomnumber", world.randomnumber)));
return wrapper.spec.fetch().rowsUpdated(); Maybe we should just have a I was not able to make it work with @bwhyman I was not able to make your |
databaseClient.inConnection(conn -> {
Statement statement = conn.createStatement(sql);
for (int i = 0; i < timetables.size(); i++) {
var tb = timetables.get(i);
statement.bind(0, snowflake.nextId())
.bind(1, collid)
.bind(2, tb.getStartweek())
.bind(3, tb.getEndweek());
// The add() method cannot be called for the last time
if (i < timetables.size() - 1) {
statement.add();
}
}
return Flux.from(statement.execute()).collectList();
}); R2DBC:3.3.1 |
@bwhyman I was able to use batch updates with what you suggested but using @mp911de @jhoeller Do you think we could relax the requirement of skipping the last databaseClient.inConnectionMany(conn -> {
Statement statement = conn.createStatement(sql);
for (World world : worlds) {
statement.bind(0, world.randomnumber).bind(1, world.id).add();
}
return Flux.from(statement.execute()).collectList();
}); I am also wondering if dedicated batch update support would provide even more performance benefits (the improvement I get in Techempower are visible here). |
Implicit batching with the current API opens pathways for a new class of bugs.
I like |
Ok so if nobody objects, I will close this PR unmerged, create a related issue for |
Closing this (old) PR unmerged in favor of #33812. |
Add support for batch operations to the DefaultDatabaseClient:
Closes #259
Demo app: springR2dbcBatchExample.tar.gz