Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed Nov 28, 2017
1 parent 931bb1d commit 8c27b67
Showing 1 changed file with 93 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,96 +32,97 @@

public class NettyReactiveStreamsBody implements NettyBody {

private static final Logger LOGGER = LoggerFactory.getLogger(NettyReactiveStreamsBody.class);
private static final String NAME_IN_CHANNEL_PIPELINE = "request-body-streamer";

private final Publisher<ByteBuf> publisher;

private final long contentLength;

public NettyReactiveStreamsBody(Publisher<ByteBuf> publisher, long contentLength) {
this.publisher = publisher;
this.contentLength = contentLength;
}

@Override
public long getContentLength() {
return contentLength;
}

@Override
public void write(Channel channel, NettyResponseFuture<?> future) throws IOException {
if (future.isStreamConsumed()) {
LOGGER.warn("Stream has already been consumed and cannot be reset");
} else {
future.setStreamConsumed(true);
NettySubscriber subscriber = new NettySubscriber(channel, future);
channel.pipeline().addLast(NAME_IN_CHANNEL_PIPELINE, subscriber);
publisher.subscribe(new SubscriberAdapter(subscriber));
}
}

private static class SubscriberAdapter implements Subscriber<ByteBuf> {
private final Subscriber<HttpContent> subscriber;

public SubscriberAdapter(Subscriber<HttpContent> subscriber) {
this.subscriber = subscriber;
}

@Override
public void onSubscribe(Subscription s) {
subscriber.onSubscribe(s);
}

@Override
public void onNext(ByteBuf buffer) {
HttpContent content = new DefaultHttpContent(buffer);
subscriber.onNext(content);
}

@Override
public void onError(Throwable t) {
subscriber.onError(t);
}

@Override
public void onComplete() {
subscriber.onComplete();
}
}

private static class NettySubscriber extends HandlerSubscriber<HttpContent> {
private static final Logger LOGGER = LoggerFactory.getLogger(NettySubscriber.class);

private final Channel channel;
private final NettyResponseFuture<?> future;

public NettySubscriber(Channel channel, NettyResponseFuture<?> future) {
super(channel.eventLoop());
this.channel = channel;
this.future = future;
}

@Override
protected void complete() {
channel.eventLoop().execute(() -> channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(future -> removeFromPipeline()));
}

@Override
protected void error(Throwable error) {
if (error == null)
throw null;
removeFromPipeline();
future.abort(error);
}

private void removeFromPipeline() {
try {
channel.pipeline().remove(this);
LOGGER.debug(String.format("Removed handler %s from pipeline.", NAME_IN_CHANNEL_PIPELINE));
} catch (NoSuchElementException e) {
LOGGER.debug(String.format("Failed to remove handler %s from pipeline.", NAME_IN_CHANNEL_PIPELINE), e);
}
}
}
private static final Logger LOGGER = LoggerFactory.getLogger(NettyReactiveStreamsBody.class);
private static final String NAME_IN_CHANNEL_PIPELINE = "request-body-streamer";

private final Publisher<ByteBuf> publisher;

private final long contentLength;

public NettyReactiveStreamsBody(Publisher<ByteBuf> publisher, long contentLength) {
this.publisher = publisher;
this.contentLength = contentLength;
}

@Override
public long getContentLength() {
return contentLength;
}

@Override
public void write(Channel channel, NettyResponseFuture<?> future) throws IOException {
if (future.isStreamConsumed()) {
LOGGER.warn("Stream has already been consumed and cannot be reset");
} else {
future.setStreamConsumed(true);
NettySubscriber subscriber = new NettySubscriber(channel, future);
channel.pipeline().addLast(NAME_IN_CHANNEL_PIPELINE, subscriber);
publisher.subscribe(new SubscriberAdapter(subscriber));
}
}

private static class SubscriberAdapter implements Subscriber<ByteBuf> {
private final Subscriber<HttpContent> subscriber;

public SubscriberAdapter(Subscriber<HttpContent> subscriber) {
this.subscriber = subscriber;
}

@Override
public void onSubscribe(Subscription s) {
subscriber.onSubscribe(s);
}

@Override
public void onNext(ByteBuf buffer) {
HttpContent content = new DefaultHttpContent(buffer);
subscriber.onNext(content);
}

@Override
public void onError(Throwable t) {
subscriber.onError(t);
}

@Override
public void onComplete() {
subscriber.onComplete();
}
}

private static class NettySubscriber extends HandlerSubscriber<HttpContent> {
private static final Logger LOGGER = LoggerFactory.getLogger(NettySubscriber.class);

private final Channel channel;
private final NettyResponseFuture<?> future;

public NettySubscriber(Channel channel, NettyResponseFuture<?> future) {
super(channel.eventLoop());
this.channel = channel;
this.future = future;
}

@Override
protected void complete() {
channel.eventLoop().execute(() -> channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
.addListener(future -> removeFromPipeline()));
}

@Override
protected void error(Throwable error) {
if (error == null)
throw null;
removeFromPipeline();
future.abort(error);
}

private void removeFromPipeline() {
try {
channel.pipeline().remove(this);
LOGGER.debug(String.format("Removed handler %s from pipeline.", NAME_IN_CHANNEL_PIPELINE));
} catch (NoSuchElementException e) {
LOGGER.debug(String.format("Failed to remove handler %s from pipeline.", NAME_IN_CHANNEL_PIPELINE), e);
}
}
}
}

0 comments on commit 8c27b67

Please sign in to comment.