diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/DataLoadConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/DataLoadConfig.java
index 87f5355a6be9..6e31e0b28f4a 100644
--- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/DataLoadConfig.java
+++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/DataLoadConfig.java
@@ -17,6 +17,17 @@ public class DataLoadConfig {
     public static final String USER_NAME = "auth.username";
     public static final String PASS_WORD = "auth.password";
     public static final String WAIT_TIME_BEFORE_COMMIT = "wait.time.before.commit";
+    // Milliseconds to sleep before replaying records; values <= 0 disable the wait.
+    public static final String WAIT_TIME_BEFORE_REPLAY = "wait.time.before.replay";
+    // "true" to run a compaction at the end of the offline build.
+    public static final String COMPACT_AFTER_COMMIT = "compact.after.commit";
+    // "true" to make every secondary endpoint reopen at the end of the offline build.
+    public static final String REOPEN_AFTER_COMMIT = "reopen.after.commit";
+    // VIPServer domains used to look up the primary / secondary endpoints.
+    public static final String PRIMARY_VIP_SERVER_DOMAIN = "primary.vipserver.domain";
+    public static final String SECONDARY_VIP_SERVER_DOMAIN = "secondary.vipserver.domain";
+    // Key of the command-line argument (replay.date=yyyyMMdd) that enables the replay step.
+    public static final String REPLAY_DATE = "replay.date";
 
     /** job on HDFS configurations **/
 
diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/EndpointDTO.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/EndpointDTO.java
new file mode 100644
index 000000000000..2ef64b7297f7
--- /dev/null
+++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/EndpointDTO.java
@@ -0,0 +1,41 @@
+package com.alibaba.graphscope.groot.dataload.databuild;
+
+/** Mutable holder for a network endpoint made of an IP string and a port number. */
+public class EndpointDTO {
+
+    private String ip;
+
+    private int port;
+
+    public EndpointDTO(String ip, int port) {
+        this.ip = ip;
+        this.port = port;
+    }
+
+    /** Returns the endpoint formatted as {@code ip:port}. */
+    public String toAddress() {
+        return ip + ":" + port;
+    }
+
+    public String getIp() {
+        return ip;
+    }
+
+    public void setIp(String ip) {
+        this.ip = ip;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /** Renders as {@code ip:port} so that logged endpoint lists are readable. */
+    @Override
+    public String toString() {
+        return toAddress();
+    }
+}
diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java
index b4d74fd41800..319d81d2426a 100644
--- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java
+++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java
@@ -45,8 +45,11 @@ public class OfflineBuildOdps {
     private static final Logger logger = LoggerFactory.getLogger(OfflineBuildOdps.class);
     private static Odps odps;
 
-    public static void main(String[] args) throws IOException {
+    public static void main(String[] args) throws Exception {
         String propertiesFile = args[0];
+        for (String arg : args) {
+            logger.info("arg is {}", arg);
+        }
 
         Properties properties = new Properties();
         try (InputStream is = new FileInputStream(propertiesFile)) {
@@ -64,6 +67,39 @@ public static void main(String[] args) throws IOException {
                 Long.parseLong(
                         properties.getProperty(DataLoadConfig.WAIT_TIME_BEFORE_COMMIT, "-1"));
 
+        long waitTimeBeforeReplay =
+                Long.parseLong(
+                        properties.getProperty(DataLoadConfig.WAIT_TIME_BEFORE_REPLAY, "-1"));
+
+        String primaryVipServerDomain =
+                properties.getProperty(DataLoadConfig.PRIMARY_VIP_SERVER_DOMAIN, "");
+        String secondaryVipServerDomain =
+                properties.getProperty(DataLoadConfig.SECONDARY_VIP_SERVER_DOMAIN, "");
+        if (!"".equals(primaryVipServerDomain)) {
+            // When a primary VIPServer domain is configured, resolve it and use the first
+            // valid endpoint in place of the configured graph endpoint. Resolution failures
+            // are only logged, so the configured endpoint is kept (best effort).
+            try {
+                List<EndpointDTO> vipServerEndpoints =
+                        Utils.getEndpointFromVipServerDomain(primaryVipServerDomain);
+                logger.info("vipServerEndpoint is {}", vipServerEndpoints);
+                if (!vipServerEndpoints.isEmpty()) {
+                    graphEndpoint = vipServerEndpoints.get(0).toAddress();
+                }
+            } catch (Exception e) {
+                logger.error("Get vipserver domain endpoint has error.", e);
+            }
+        }
+
+        boolean compactAfterCommit =
+                Boolean.parseBoolean(
+                        properties.getProperty(DataLoadConfig.COMPACT_AFTER_COMMIT, "false"));
+        boolean reopenAfterCommit =
+                Boolean.parseBoolean(
+                        properties.getProperty(DataLoadConfig.REOPEN_AFTER_COMMIT, "false"));
+        Long replayTimeStamp = getReplayTimeStampFromArgs(args);
+        logger.info("replayTimeStamp is {}", replayTimeStamp);
+
         String uniquePath =
                 properties.getProperty(DataLoadConfig.UNIQUE_PATH, UuidUtils.getBase64UUIDString());
 
@@ -178,5 +214,124 @@ public static void main(String[] args) throws IOException {
                 }
             }
         }
+        replayRecords(replayTimeStamp, waitTimeBeforeReplay, client);
+        compactDb(compactAfterCommit, client, graphEndpoint);
+        reopenDb(reopenAfterCommit, secondaryVipServerDomain, username, password);
+    }
+
+    /**
+     * Asks every endpoint registered under the secondary VIPServer domain to reopen.
+     *
+     * <p>Does nothing unless {@code reopenAfterCommit} is set and the domain is non-empty.
+     *
+     * @param reopenAfterCommit whether the reopen step is enabled
+     * @param secondaryVipServerDomain VIPServer domain to resolve; {@code ""} disables the step
+     * @param username user name handed to {@code Utils.getClient}
+     * @param password password handed to {@code Utils.getClient}
+     * @throws Exception if resolving the domain or a reopen call fails (logged, then rethrown)
+     */
+    private static void reopenDb(
+            boolean reopenAfterCommit,
+            String secondaryVipServerDomain,
+            String username,
+            String password)
+            throws Exception {
+        if (!reopenAfterCommit || "".equals(secondaryVipServerDomain)) {
+            return;
+        }
+        try {
+            List<EndpointDTO> secondaryVipServerEndpoints =
+                    Utils.getEndpointFromVipServerDomain(secondaryVipServerDomain);
+            for (EndpointDTO secondaryVipServerEndpoint : secondaryVipServerEndpoints) {
+                // Only the resolved IP is used; the port is fixed to 55556 rather than the
+                // one reported by VIPServer.
+                String address = secondaryVipServerEndpoint.getIp() + ":55556";
+                logger.info("endpoint: {}, reopen start.", address);
+                GrootClient secondaryClient = Utils.getClient(address, username, password);
+                boolean reopenSuccess = secondaryClient.reopenSecondary();
+                logger.info("endpoint: {}, reopen result:{}", address, reopenSuccess);
+            }
+        } catch (Exception e) {
+            logger.error("Get secondary vipserver domain endpoint has error.", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Triggers a compaction through {@code client} when {@code compactAfterCommit} is set.
+     *
+     * @param compactAfterCommit whether the compaction step is enabled
+     * @param client client used to issue the compaction request
+     * @param graphEndpoint endpoint address; only used in the log message
+     */
+    private static void compactDb(
+            boolean compactAfterCommit, GrootClient client, String graphEndpoint) {
+        if (!compactAfterCommit) {
+            return;
+        }
+        logger.info("endpoint {} compact start:", graphEndpoint);
+        boolean compactSuccess = client.compactDB();
+        logger.info("compact result:{}", compactSuccess);
+    }
+
+    /**
+     * Calls {@code client.replayRecords(-1, replayTimeStamp)} and remote-flushes every snapshot
+     * id it returns.
+     *
+     * <p>Does nothing when {@code replayTimeStamp} is {@code null}. When
+     * {@code waitTimeBeforeReplay} is positive the thread first sleeps for that many
+     * milliseconds; an interrupt only cuts the wait short and the replay still runs.
+     *
+     * @param replayTimeStamp timestamp handed to the replay request, or {@code null} to skip
+     * @param waitTimeBeforeReplay milliseconds to wait before replaying; {@code <= 0} for none
+     * @param client client used for the replay and flush requests
+     */
+    private static void replayRecords(
+            Long replayTimeStamp, long waitTimeBeforeReplay, GrootClient client) {
+        if (replayTimeStamp == null) {
+            return;
+        }
+        if (waitTimeBeforeReplay > 0) {
+            logger.info("start wait before replay: {}", System.currentTimeMillis());
+            try {
+                Thread.sleep(waitTimeBeforeReplay);
+                logger.info("wait time has arrived. will replay soon.");
+            } catch (InterruptedException e) {
+                // The interrupt is deliberately consumed: it only ends the wait early, and
+                // the replay below is still expected to run.
+                logger.warn("wait thread has been interrupt. will replay soon.");
+            }
+        }
+        logger.info("start replay records: {}", System.currentTimeMillis());
+        List<Long> snapShotIds = client.replayRecords(-1, replayTimeStamp);
+        for (Long snapShotId : snapShotIds) {
+            client.remoteFlush(snapShotId);
+        }
+        logger.info("replay records end: {}", System.currentTimeMillis());
+    }
+
+    /**
+     * Extracts the replay timestamp from a {@code key=yyyyMMdd} command-line argument whose
+     * key contains {@link DataLoadConfig#REPLAY_DATE}.
+     *
+     * @param args raw command-line arguments
+     * @return the date as epoch milliseconds, or {@code null} when no such argument exists or
+     *     its value is not a valid date
+     */
+    private static Long getReplayTimeStampFromArgs(String[] args) {
+        for (String arg : args) {
+            // Match on the key part only, and keep scanning past arguments that merely
+            // mention the key (for example a file path) instead of giving up on them.
+            String[] kv = arg.split("=", 2);
+            if (kv.length < 2 || !kv[0].contains(DataLoadConfig.REPLAY_DATE)) {
+                continue;
+            }
+            Long replayTimeStamp = Utils.transferDateToTimeStamp(kv[1].trim(), "yyyyMMdd");
+            if (replayTimeStamp == null) {
+                logger.warn("Ignoring argument {}: value is not a valid yyyyMMdd date.", arg);
+            }
+            return replayTimeStamp;
+        }
+        return null;
     }
 }
diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java
index dcee9e8e2113..2761ee3fe142 100644
--- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java
+++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java
@@ -3,6 +3,7 @@
 import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException;
 import com.alibaba.graphscope.groot.common.schema.api.*;
 import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue;
+import com.alibaba.graphscope.groot.dataload.util.HttpClient;
 import com.alibaba.graphscope.groot.sdk.GrootClient;
 import com.alibaba.graphscope.proto.groot.DataLoadTargetPb;
 import com.aliyun.odps.Odps;
@@ -10,13 +11,28 @@
 import com.aliyun.odps.data.TableInfo;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.*;
 
 public class Utils {
+
+    private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+
+    /** URL that returns the newline-separated list of VIPServer hosts. */
+    public static final String VIP_SERVER_HOST_URL =
+            "http://jmenv.tbsite.net:8080/vipserver/serverlist";
+
+    /** Format string (VIPServer host, domain) of the per-domain endpoint lookup URL. */
+    public static final String VIP_SERVER_GET_DOMAIN_ENDPOINT_URL =
+            "http://%s:80/vipserver/api/srvIPXT?dom=%s&qps=0&clientIP=127.0.0.1&udpPort=55963&encoding=GBK&";
+
     // For Main Job
     static List getDataLoadTargets(
             Map columnMappingConfig) {
@@ -215,4 +231,75 @@ public static String convertDateForLDBC(String input) {
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * Resolves a VIPServer domain to the endpoints currently registered under it.
+     *
+     * <p>Fetches the VIPServer host list from {@link #VIP_SERVER_HOST_URL}, then queries the
+     * hosts in order and returns the valid endpoints reported by the first host that answers
+     * with a {@code hosts} array. Hosts that fail or answer with an unexpected payload are
+     * logged and skipped.
+     *
+     * @param domain VIPServer domain name to resolve
+     * @return the endpoints flagged {@code valid}; empty when no host gave a usable answer
+     * @throws Exception if the VIPServer host list itself cannot be fetched
+     */
+    public static List<EndpointDTO> getEndpointFromVipServerDomain(String domain)
+            throws Exception {
+        String srvResponse = HttpClient.doGet(VIP_SERVER_HOST_URL, null);
+        ObjectMapper mapper = new ObjectMapper();
+        for (String srvLine : srvResponse.split("\n")) {
+            // Tolerate CRLF line endings and blank lines in the host list.
+            String srv = srvLine.trim();
+            if (srv.isEmpty()) {
+                continue;
+            }
+            String url = String.format(VIP_SERVER_GET_DOMAIN_ENDPOINT_URL, srv, domain);
+            logger.info("url is {}", url);
+            try {
+                JsonNode response = mapper.readTree(HttpClient.doGet(url, null));
+                JsonNode hosts = response == null ? null : response.get("hosts");
+                if (hosts == null || !hosts.isArray()) {
+                    logger.warn("VIPServer {} returned no hosts array for {}.", srv, domain);
+                    continue;
+                }
+                // Collected per host so a failure half-way never leaks partial results.
+                List<EndpointDTO> endpoints = new ArrayList<>();
+                for (JsonNode host : hosts) {
+                    JsonNode ip = host.get("ip");
+                    JsonNode port = host.get("port");
+                    if (host.path("valid").asBoolean() && ip != null && port != null) {
+                        endpoints.add(new EndpointDTO(ip.asText(), port.asInt()));
+                    }
+                }
+                return endpoints;
+            } catch (Exception e) {
+                // Best effort: move on to the next VIPServer host, but leave a trace.
+                logger.warn("Query VIPServer {} for domain {} failed.", srv, domain, e);
+            }
+        }
+        return new ArrayList<>();
+    }
+
+    /**
+     * Parses a date string into epoch milliseconds.
+     *
+     * @param dateStr date text matching {@code dateFormatStr}
+     * @param dateFormatStr {@link SimpleDateFormat} pattern, e.g. {@code yyyyMMdd}
+     * @return the parsed date as milliseconds since the epoch, or {@code null} when
+     *     {@code dateStr} is {@code null} or cannot be parsed
+     */
+    public static Long transferDateToTimeStamp(String dateStr, String dateFormatStr) {
+        if (dateStr == null) {
+            return null;
+        }
+        SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatStr);
+        // Reject impossible dates such as 20231345 instead of silently rolling them over.
+        dateFormat.setLenient(false);
+        try {
+            return dateFormat.parse(dateStr).getTime();
+        } catch (ParseException e) {
+            return null;
+        }
+    }
 }
diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/HttpClient.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/HttpClient.java
index b97b5bc514d3..d984f4a392ec 100644
--- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/HttpClient.java
+++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/HttpClient.java
@@ -1,13 +1,21 @@
 package com.alibaba.graphscope.groot.dataload.util;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 
 import java.io.IOException;
 import java.net.Authenticator;
 import java.net.HttpURLConnection;
 import java.net.PasswordAuthentication;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
 
 /**
  * Simple HTTPClient which refer from ...
@@ -59,6 +67,42 @@ public HttpURLConnection createConnection(String urlString) throws IOException {
         return connection;
     }
 
+    /**
+     * Issues an HTTP GET and returns the response body decoded as UTF-8.
+     *
+     * @param url target URL; may already carry a query string
+     * @param param extra query parameters to append to {@code url}; may be {@code null}
+     * @return the response body
+     * @throws Exception if the URL is malformed, the request fails or the status is not 200;
+     *     the underlying failure is attached as the cause
+     */
+    public static String doGet(String url, Map<String, String> param) throws Exception {
+        // try-with-resources closes the response before the client, and a failure while
+        // closing no longer masks an exception thrown by the request itself.
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            URIBuilder builder = new URIBuilder(url);
+            if (param != null) {
+                for (Map.Entry<String, String> entry : param.entrySet()) {
+                    builder.addParameter(entry.getKey(), entry.getValue());
+                }
+            }
+            HttpGet httpGet = new HttpGet(builder.build());
+            try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
+                int statusCode = response.getStatusLine().getStatusCode();
+                if (statusCode != HttpStatus.SC_OK) {
+                    throw new Exception(
+                            "调用http-get方法返回失败code="
+                                    + statusCode
+                                    + ",msg="
+                                    + response.getStatusLine().getReasonPhrase());
+                }
+                return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+            }
+        } catch (Exception e) {
+            throw new Exception("调用http-get方法异常", e);
+        }
+    }
+
     private String createBasicAuthHeaderValue() {
         String auth = user + ":" + password;
         byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));