Skip to content

Commit

Permalink
support gremlin http service
Browse files Browse the repository at this point in the history
  • Loading branch information
shirly121 committed Dec 31, 2024
1 parent a9a865b commit 8b3f9f9
Show file tree
Hide file tree
Showing 5 changed files with 466 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.Attribute;
import io.netty.util.AttributeMap;

Expand All @@ -42,6 +44,7 @@
import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
import org.apache.tinkerpop.gremlin.server.handler.AbstractAuthenticationHandler;
import org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtils;
import org.apache.tinkerpop.gremlin.server.handler.SaslAuthenticationHandler;
import org.apache.tinkerpop.gremlin.server.handler.StateKey;
import org.slf4j.Logger;
Expand All @@ -50,6 +53,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -205,9 +209,70 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw
ctx.writeAndFlush(error);
}
}
} else if (msg instanceof FullHttpMessage) { // add Authentication for HTTP requests
FullHttpMessage request = (FullHttpMessage) msg;

if (!authenticator.requireAuthentication()) {
ctx.fireChannelRead(request);
return;
}

String errorMsg =
"Invalid HTTP Header for Authentication. Expected format: 'Authorization: Basic"
+ " <Base64(user:password)>'";

if (!request.headers().contains("Authorization")) {
sendError(ctx, errorMsg, request);
return;
}

String authorizationHeader = request.headers().get("Authorization");
if (!authorizationHeader.startsWith("Basic ")) {
sendError(ctx, errorMsg, request);
return;
}

String authorization;
byte[] decodedUserPass;
try {
authorization = authorizationHeader.substring("Basic ".length());
decodedUserPass = BASE64_DECODER.decode(authorization);
} catch (Exception e) {
sendError(ctx, errorMsg, request);
return;
}

authorization = new String(decodedUserPass, Charset.forName("UTF-8"));
String[] split = authorization.split(":");
if (split.length != 2) {
sendError(
ctx,
"Invalid username or password after decoding the Base64 Authorization"
+ " header.",
request);
return;
}

Map<String, String> credentials = new HashMap();
credentials.put("username", split[0]);
credentials.put("password", split[1]);
String address = ctx.channel().remoteAddress().toString();
if (address.startsWith("/") && address.length() > 1) {
address = address.substring(1);
}

credentials.put("address", address);

try {
AuthenticatedUser user = authenticator.authenticate(credentials);
ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
ctx.fireChannelRead(request);
} catch (AuthenticationException e) {
sendError(ctx, e.getMessage(), request);
}
} else {
logger.warn(
"{} only processes RequestMessage instances - received {} - channel closing",
"{} received invalid request message {} - channel closing",
this.getClass().getSimpleName(),
msg.getClass());
ctx.close();
Expand All @@ -226,4 +291,17 @@ private InetAddress getRemoteInetAddress(final ChannelHandlerContext ctx) {

return ((InetSocketAddress) genericSocketAddr).getAddress();
}

private void sendError(
final ChannelHandlerContext ctx, String errorMsg, FullHttpMessage request) {
HttpHandlerUtils.sendError(ctx, HttpResponseStatus.UNAUTHORIZED, errorMsg, false);
if (request.refCnt() > 0) {
boolean fullyReleased = request.release();
if (!fullyReleased) {
logger.warn(
"http request message was not fully released, may cause a"
+ " memory leak");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
*
* * Copyright 2020 Alibaba Group Holding Limited.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.alibaba.graphscope.gremlin.plugin.processor;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;

import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
import org.apache.tinkerpop.gremlin.driver.ser.SerializationException;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import org.apache.tinkerpop.gremlin.server.Context;
import org.apache.tinkerpop.gremlin.server.GraphManager;
import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtils;
import org.javatuples.Pair;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

/**
* Maintain the gremlin execution context for http request.
*/
public class HttpContext extends Context {
private final Pair<String, MessageTextSerializer<?>> serializer;
private final boolean keepAlive;
private final AtomicReference<Boolean> headerSent;

public HttpContext(
RequestMessage requestMessage,
ChannelHandlerContext ctx,
Settings settings,
GraphManager graphManager,
GremlinExecutor gremlinExecutor,
ScheduledExecutorService scheduledExecutorService,
Pair<String, MessageTextSerializer<?>> serializer,
boolean keepAlive) {
super(
requestMessage,
ctx,
settings,
graphManager,
gremlinExecutor,
scheduledExecutorService);
this.serializer = Objects.requireNonNull(serializer);
this.keepAlive = keepAlive;
this.headerSent = new AtomicReference<>(false);
}

/**
* serialize the response message to http response and write to http channel.
* @param responseMessage
*/
@Override
public void writeAndFlush(final ResponseMessage responseMessage) {
try {
// send header once
if (!headerSent.compareAndSet(false, true)) {
FullHttpResponse chunkedResponse =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
chunkedResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, serializer.getValue0());
chunkedResponse
.headers()
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
this.getChannelHandlerContext().writeAndFlush(chunkedResponse);
}
ByteBuf byteBuf =
Unpooled.wrappedBuffer(
serializer
.getValue1()
.serializeResponseAsString(responseMessage)
.getBytes(StandardCharsets.UTF_8));
FullHttpResponse response =
new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
ChannelFuture channelFuture = this.getChannelHandlerContext().writeAndFlush(response);
ResponseStatusCode statusCode = responseMessage.getStatus().getCode();
if (!keepAlive && statusCode.isFinalResponse()) {
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
} catch (SerializationException e) {
HttpHandlerUtils.sendError(
this.getChannelHandlerContext(),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
e.getMessage(),
keepAlive);
}
}
}
Loading

0 comments on commit 8b3f9f9

Please sign in to comment.