Skip to content

Commit

Permalink
hacking II - is this any better?
Browse files Browse the repository at this point in the history
  • Loading branch information
christophstrobl committed Nov 26, 2024
1 parent daf8240 commit f62fb20
Showing 1 changed file with 44 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.springframework.data.mongodb.core.query.Criteria.*;

import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -113,7 +114,7 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
Streamable<S> source = Streamable.of(entities);

return source.stream().allMatch(entityInformation::isNew) ? //
insert(entities) : doItSomewhatSequentially(source, this::save);
insert(entities) : new AeonFlux<>(source).combatMap(this::save);
}

@Override
Expand All @@ -126,20 +127,6 @@ public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
mongoOperations.save(entity, entityInformation.getCollectionName()));
}

static <T> Flux<T> doItSomewhatSequentially/* how should we actually call this? */(Streamable<T> ts, Function<? super T, ? extends Publisher<? extends T>> mapper) {

List<T> list = ts.toList();
if (list.size() == 1) {
return Flux.just(list.iterator().next()).flatMap(mapper);
} else if (list.size() == 2) {
return Flux.fromIterable(list).concatMap(mapper);
}

Flux<T> first = Flux.just(list.get(0)).flatMap(mapper);
Flux<T> theRest = Flux.fromIterable(list.subList(1, list.size())).flatMapSequential(mapper);
return first.concatWith(theRest);
}

@Override
public Mono<T> findById(ID id) {

Expand Down Expand Up @@ -579,4 +566,46 @@ private ReactiveFindOperation.TerminatingFind<T> createQuery(UnaryOperator<Query
}

}

static class AeonFlux<T> extends Flux<T> {

private final Streamable<T> source;
private final Flux<T> delegate;

AeonFlux(Streamable<T> source) {
this(source, Flux.fromIterable(source));
}

private AeonFlux(Streamable<T> source, Flux<T> delegate) {
this.source = source;
this.delegate = delegate;
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
delegate.subscribe(actual);
}

Flux<T> combatMap(Function<? super T, ? extends Publisher<? extends T>> mapper) {
return new AeonFlux<>(source, combatMapList(source.toList(), mapper));
}

private static <T> Flux<T> combatMapList(List<T> list,
Function<? super T, ? extends Publisher<? extends T>> mapper) {

if (list.isEmpty()) {
return Flux.empty();
}
if (list.size() == 1) {
return Flux.just(list.iterator().next()).flatMap(mapper);
}
if (list.size() == 2) {
return Flux.fromIterable(list).concatMap(mapper);
}

Flux<T> first = Flux.just(list.get(0)).flatMap(mapper);
Flux<T> theRest = Flux.fromIterable(list.subList(1, list.size())).flatMapSequential(mapper);
return first.concatWith(theRest);
}
}
}

0 comments on commit f62fb20

Please sign in to comment.