Skip to content

Commit

Permalink
fix(interactive): enhancement of data loader (#3585)
Browse files Browse the repository at this point in the history
- support auto discover endpoint by assigning a vipserver name
- support replay, compact, and reopen secondary after commit
  • Loading branch information
bufapiqi authored Feb 28, 2024
1 parent a5e1101 commit d3a40e8
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ public class DataLoadConfig {
public static final String USER_NAME = "auth.username";
public static final String PASS_WORD = "auth.password";
public static final String WAIT_TIME_BEFORE_COMMIT = "wait.time.before.commit";
public static final String WAIT_TIME_BEFORE_REPLAY = "wait.time.before.replay";
public static final String COMPACT_AFTER_COMMIT = "compact.after.commit";
public static final String REOPEN_AFTER_COMMIT = "reopen.after.commit";
public static final String PRIMARY_VIP_SERVER_DOMAIN = "primary.vipserver.domain";
public static final String SECONDARY_VIP_SERVER_DOMAIN = "secondary.vipserver.domain";
public static final String REPLAY_DATE = "replay.date";

/** job on HDFS configurations **/

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.alibaba.graphscope.groot.dataload.databuild;

public class EndpointDTO {

private String ip;

private int port;

public EndpointDTO(String ip, int port) {
this.ip = ip;
this.port = port;
}

public String toAddress() {
return ip + ":" + port;
}

public String getIp() {
return ip;
}

public void setIp(String ip) {
this.ip = ip;
}

public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ public class OfflineBuildOdps {
private static final Logger logger = LoggerFactory.getLogger(OfflineBuildOdps.class);
private static Odps odps;

public static void main(String[] args) throws IOException {
public static void main(String[] args) throws Exception {
String propertiesFile = args[0];
for (String arg : args) {
logger.info("arg is {}", arg);
}

Properties properties = new Properties();
try (InputStream is = new FileInputStream(propertiesFile)) {
Expand All @@ -64,6 +67,37 @@ public static void main(String[] args) throws IOException {
Long.parseLong(
properties.getProperty(DataLoadConfig.WAIT_TIME_BEFORE_COMMIT, "-1"));

long waitTimeBeforeReplay =
Long.parseLong(
properties.getProperty(DataLoadConfig.WAIT_TIME_BEFORE_REPLAY, "-1"));

String primaryVipServerDomain =
properties.getProperty(DataLoadConfig.PRIMARY_VIP_SERVER_DOMAIN, "");
String secondaryVipServerDomain =
properties.getProperty(DataLoadConfig.SECONDARY_VIP_SERVER_DOMAIN, "");
if (!"".equals(primaryVipServerDomain)) {
// if vipserver domain is not blank, get vipserver ip:port replace graphEndpoint param
try {
List<EndpointDTO> vipServerEndpoints =
Utils.getEndpointFromVipServerDomain(primaryVipServerDomain);
logger.info("vipServerEndpoint is {}", vipServerEndpoints);
if (vipServerEndpoints.size() > 0) {
graphEndpoint = vipServerEndpoints.get(0).toAddress();
}
} catch (Exception e) {
logger.error("Get vipserver domain endpoint has error.", e);
}
}

boolean compactAfterCommit =
Boolean.parseBoolean(
properties.getProperty(DataLoadConfig.COMPACT_AFTER_COMMIT, "false"));
boolean reopenAfterCommit =
Boolean.parseBoolean(
properties.getProperty(DataLoadConfig.REOPEN_AFTER_COMMIT, "false"));
Long replayTimeStamp = getReplayTimeStampFromArgs(args);
logger.info("replayTimeStamp is {}", replayTimeStamp);

String uniquePath =
properties.getProperty(DataLoadConfig.UNIQUE_PATH, UuidUtils.getBase64UUIDString());

Expand Down Expand Up @@ -178,5 +212,87 @@ public static void main(String[] args) throws IOException {
}
}
}
replayRecords(replayTimeStamp, waitTimeBeforeReplay, client);
compactDb(compactAfterCommit, client, graphEndpoint);
reopenDb(reopenAfterCommit, secondaryVipServerDomain, username, password);
}

private static void reopenDb(
boolean reopenAfterCommit,
String secondaryVipServerDomain,
String username,
String password)
throws Exception {
if (reopenAfterCommit) {
if (!"".equals(secondaryVipServerDomain)) {
try {
List<EndpointDTO> secondaryVipServerEndpoints =
Utils.getEndpointFromVipServerDomain(secondaryVipServerDomain);
for (EndpointDTO secondaryVipServerEndpoint : secondaryVipServerEndpoints) {
String address = secondaryVipServerEndpoint.getIp() + ":55556";
logger.info("endpoint: {}, reopen start.", address);
GrootClient secondaryClient = Utils.getClient(address, username, password);
boolean reopenSuccess = secondaryClient.reopenSecondary();
logger.info("endpoint: {}, reopen result:{}", address, reopenSuccess);
}
} catch (Exception e) {
logger.error("Get secondary vipserver domain endpoint has error.", e);
throw e;
}
}
}
}

private static void compactDb(
boolean compactAfterCommit, GrootClient client, String graphEndpoint) {
if (compactAfterCommit) {
logger.info("endpoint {} compact start:", graphEndpoint);
boolean compactSuccess = client.compactDB();
logger.info("compact result:" + compactSuccess);
}
}

private static void replayRecords(
Long replayTimeStamp, long waitTimeBeforeReplay, GrootClient client) {
if (replayTimeStamp != null) {
if (waitTimeBeforeReplay > 0) {
long waitStartTime = System.currentTimeMillis();
logger.info("start wait before replay: " + waitStartTime);
try {
Thread.sleep(waitTimeBeforeReplay);
logger.info("wait time has arrived. will replay soon.");
} catch (InterruptedException e) {
logger.warn("wait thread has been interrupt. will replay soon.");
}
}
long replayStartTime = System.currentTimeMillis();
logger.info("start replay records: " + replayStartTime);
// need replay time stamp
List<Long> snapShotIds = client.replayRecords(-1, replayTimeStamp);
for (Long snapShotId : snapShotIds) {
client.remoteFlush(snapShotId);
}
long replayEndTime = System.currentTimeMillis();
logger.info("replay records end: " + replayEndTime);
}
}

/**
* find replay timestamp
* @param args
* @return
*/
private static Long getReplayTimeStampFromArgs(String[] args) {
for (String arg : args) {
if (arg.contains(DataLoadConfig.REPLAY_DATE)) {
String[] kv = arg.split("=");
if (kv.length < 2) {
return null;
}
String replayDate = kv[1];
return Utils.transferDateToTimeStamp(replayDate, "yyyyMMdd");
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,34 @@
import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException;
import com.alibaba.graphscope.groot.common.schema.api.*;
import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue;
import com.alibaba.graphscope.groot.dataload.util.HttpClient;
import com.alibaba.graphscope.groot.sdk.GrootClient;
import com.alibaba.graphscope.proto.groot.DataLoadTargetPb;
import com.aliyun.odps.Odps;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

public class Utils {

private static final Logger logger = LoggerFactory.getLogger(Utils.class);

public static final String VIP_SERVER_HOST_URL =
"http://jmenv.tbsite.net:8080/vipserver/serverlist";

public static final String VIP_SERVER_GET_DOMAIN_ENDPOINT_URL =
"http://%s:80/vipserver/api/srvIPXT?dom=%s&qps=0&clientIP=127.0.0.1&udpPort=55963&encoding=GBK&";

// For Main Job
static List<DataLoadTargetPb> getDataLoadTargets(
Map<String, FileColumnMapping> columnMappingConfig) {
Expand Down Expand Up @@ -215,4 +229,51 @@ public static String convertDateForLDBC(String input) {
throw new RuntimeException(e);
}
}

public static List<EndpointDTO> getEndpointFromVipServerDomain(String domain) throws Exception {
String srvResponse = HttpClient.doGet(VIP_SERVER_HOST_URL, null);
String[] srvList = srvResponse.split("\n");
List<EndpointDTO> endpoints = new ArrayList<>();
for (String srv : srvList) {
String url = String.format(VIP_SERVER_GET_DOMAIN_ENDPOINT_URL, srv, domain);
logger.info("url is {}", url);
try {
String endpointResponse = HttpClient.doGet(url, null);
ObjectMapper mapper = new ObjectMapper();
JsonNode endpointResponseJson = mapper.readTree(endpointResponse);
JsonNode endpointJsonArray = endpointResponseJson.get("hosts");
if (endpointJsonArray.isArray()) {
for (int i = 0; i < endpointJsonArray.size(); i++) {
JsonNode endpointJson = endpointJsonArray.get(i);
boolean isValid = endpointJson.get("valid").asBoolean();
if (isValid) {
String ip = endpointJson.get("ip").asText();
int port = endpointJson.get("port").asInt();
endpoints.add(new EndpointDTO(ip, port));
}
}
return endpoints;
}
} catch (Exception e) {
// continue
}
}
return endpoints;
}

/**
*
* @param dateStr
* @param dateFormatStr eg. yyyyMMdd
* @return
*/
public static Long transferDateToTimeStamp(String dateStr, String dateFormatStr) {
SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatStr);
try {
Date date = dateFormat.parse(dateStr);
return date.getTime();
} catch (java.text.ParseException e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.alibaba.graphscope.groot.dataload.util;

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.net.Authenticator;
import java.net.HttpURLConnection;
import java.net.PasswordAuthentication;
import java.net.URL;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
* Simple HTTPClient which refer from <a href="https://github.com/eugenp/tutorials/blob/master/core-java-modules/core-java-networking-2/src/main/java/com/baeldung/url/auth/HttpClient.java">...</a>
Expand Down Expand Up @@ -59,6 +64,44 @@ public HttpURLConnection createConnection(String urlString) throws IOException {
return connection;
}

public static String doGet(String url, Map<String, String> param) throws Exception {
CloseableHttpClient httpClient = HttpClients.createDefault();
String resultMsg = "";
CloseableHttpResponse response = null;
try {
URIBuilder builder = new URIBuilder(url);
if (param != null) {
for (String key : param.keySet()) {
builder.addParameter(key, param.get(key));
}
}
URI uri = builder.build();
HttpGet httpGet = new HttpGet(uri);
response = httpClient.execute(httpGet);
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
resultMsg = EntityUtils.toString(response.getEntity(), "UTF-8");
} else {
throw new Exception(
"调用http-get方法返回失败code="
+ response.getStatusLine().getStatusCode()
+ ",msg="
+ response.getStatusLine().getReasonPhrase());
}
} catch (Exception e) {
throw new Exception("调用http-get方法异常", e);
} finally {
try {
if (response != null) {
response.close();
}
httpClient.close();
} catch (IOException e) {
throw new Exception("httpClient close异常", e);
}
}
return resultMsg;
}

private String createBasicAuthHeaderValue() {
String auth = user + ":" + password;
byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
Expand Down

0 comments on commit d3a40e8

Please sign in to comment.