From 41bc5ae9bec44e25b3dde3d3824c16660eed8eba Mon Sep 17 00:00:00 2001 From: bufapiqi <26716594@qq.com> Date: Thu, 16 Nov 2023 19:16:36 +0800 Subject: [PATCH] fix(interactive): Support for option of using hash(srcId, dstId, edgeLabelId, edgePks) instead of nanotime as eid (#3357) Support edge updating scenarios where eid is not known in advance. User can leverage `enable.hash.generate.eid` to switch from previous eid generation behavior. --- .../templates/configmap.yaml | 1 + charts/graphscope-store-one-pod/values.yaml | 3 + .../graphscope-store/templates/configmap.yaml | 1 + charts/graphscope-store/values.yaml | 1 + .../groot/common/config/FrontendConfig.java | 3 + .../common/schema/impl/DefaultGraphEdge.java | 37 +++++++- .../schema/impl/DefaultGraphSchema.java | 31 ++++--- .../common/schema/mapper/EdgeTypeMapper.java | 27 ++++++ ...dexMapper.java => ElementIndexMapper.java} | 2 +- .../schema/mapper/VertexTypeMapper.java | 20 ++--- .../groot/common/util/PkHashUtils.java | 25 ++++++ .../groot/dataload/databuild/DataEncoder.java | 13 ++- .../dataload/databuild/OfflineBuildOdps.java | 3 + .../write/DefaultEdgeIdGenerator.java | 10 +++ .../groot/frontend/write/EdgeIdGenerator.java | 4 + .../groot/frontend/write/GraphWriter.java | 84 ++++++++++++++++++- .../graphscope/groot/servers/Frontend.java | 3 +- 17 files changed, 237 insertions(+), 31 deletions(-) rename interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/{VertexIndexMapper.java => ElementIndexMapper.java} (97%) diff --git a/charts/graphscope-store-one-pod/templates/configmap.yaml b/charts/graphscope-store-one-pod/templates/configmap.yaml index aeab8b31177e..e0841a57085b 100644 --- a/charts/graphscope-store-one-pod/templates/configmap.yaml +++ b/charts/graphscope-store-one-pod/templates/configmap.yaml @@ -31,6 +31,7 @@ data: ## Frontend Config frontend.server.num=1 + enable.hash.generate.eid={{ .Values.enableHashGenerateEid }} ## Ingestor Config 
ingestor.queue.buffer.size={{ .Values.ingestorQueueBufferSize }} diff --git a/charts/graphscope-store-one-pod/values.yaml b/charts/graphscope-store-one-pod/values.yaml index 4bb9598779b4..91198a6b3e5e 100644 --- a/charts/graphscope-store-one-pod/values.yaml +++ b/charts/graphscope-store-one-pod/values.yaml @@ -362,6 +362,9 @@ snapshotIncreaseIntervalMs: 1000 offsetsPersistIntervalMs: 3000 fileMetaStorePath: "/var/lib/graphscope-store/meta" +## Frontend config +enableHashGenerateEid: false + ## Store Config storeDataPath: "/var/lib/graphscope-store" storeWriteThreadCount: 1 diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index 081fa9465e7c..cd8c11fc51b9 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -33,6 +33,7 @@ data: frontend.service.port=55556 frontend.server.id=INDEX frontend.server.num={{ .Values.frontend.replicaCount }} + enable.hash.generate.eid={{ .Values.enableHashGenerateEid }} ## Ingestor Config ingestor.queue.buffer.size={{ .Values.ingestorQueueBufferSize }} diff --git a/charts/graphscope-store/values.yaml b/charts/graphscope-store/values.yaml index 00002c908384..fbfdf81fa808 100644 --- a/charts/graphscope-store/values.yaml +++ b/charts/graphscope-store/values.yaml @@ -522,6 +522,7 @@ kafkaTopic: "graphscope" kafkaProducerCustomConfigs: "" ## Frontend Config +enableHashGenerateEid: false # gremlinServerPort: 12312 executorWorkerPerProcess: 2 diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/FrontendConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/FrontendConfig.java index 5c3df04d4b83..2be5e01ea2ed 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/FrontendConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/FrontendConfig.java @@ 
-25,4 +25,7 @@ public class FrontendConfig { Config.intConfig( "frontend.service.thread.count", Math.max(Math.min(Runtime.getRuntime().availableProcessors() / 2, 64), 4)); + + public static final Config ENABLE_HASH_GENERATE_EID = + Config.boolConfig("enable.hash.generate.eid", false); } diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/impl/DefaultGraphEdge.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/impl/DefaultGraphEdge.java index a9d32ba02aa1..9a43d3f58e6b 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/impl/DefaultGraphEdge.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/impl/DefaultGraphEdge.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.groot.common.schema.wrapper.TypeDef; import com.google.common.base.MoreObjects; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -29,6 +30,7 @@ public class DefaultGraphEdge implements GraphEdge { private String label; private List propertyList; private List relationList; + private List primaryKeyList; private final int versionId; @@ -45,11 +47,35 @@ public DefaultGraphEdge( String label, List propertyList, List relationList, + List primaryKeyList) { + this(id, label, propertyList, relationList, primaryKeyList, 0); + } + + public DefaultGraphEdge( + int id, + String label, + List propertyList, + List relationList, + int versionId) { + this.id = id; + this.label = label; + this.propertyList = propertyList; + this.relationList = relationList; + this.versionId = versionId; + } + + public DefaultGraphEdge( + int id, + String label, + List propertyList, + List relationList, + List primaryKeyList, int versionId) { this.id = id; this.label = label; this.propertyList = propertyList; this.relationList = relationList; + this.primaryKeyList = primaryKeyList; this.versionId = versionId; } @@ -59,6 +85,7 @@ public 
DefaultGraphEdge(TypeDef typeDef, List edgeRelations) { typeDef.getLabel(), typeDef.getPropertyList(), edgeRelations, + typeDef.getPrimaryKeyNameList(), typeDef.getVersionId()); } @@ -111,12 +138,18 @@ public int getVersionId() { @Override public List getPrimaryKeyList() { - return null; + List props = new ArrayList<>(); + if (this.primaryKeyList != null) { + for (String name : primaryKeyList) { + props.add(getProperty(name)); + } + } + return props; } @Override public List getPrimaryKeyNameList() { - return null; + return primaryKeyList; } @Override diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/impl/DefaultGraphSchema.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/impl/DefaultGraphSchema.java index 83f2afc6d8cc..d15c45d0f70c 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/impl/DefaultGraphSchema.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/impl/DefaultGraphSchema.java @@ -187,22 +187,15 @@ public static GraphSchema buildSchemaFromJson(String schemaJson) { } if (type.equalsIgnoreCase("VERTEX")) { - List primaryKeyList = Lists.newArrayList(); - - JsonNode indexArray = typeObject.get("indexes"); - if (indexArray != null) { - for (JsonNode indexObject : indexArray) { - JsonNode priNameList = indexObject.get("propertyNames"); - for (JsonNode pri : priNameList) { - primaryKeyList.add(pri.asText()); - } - } - } + List primaryKeyList = getPrimaryKeyList(typeObject.get("indexes")); DefaultGraphVertex graphVertex = new DefaultGraphVertex( labelId, label, propertyList, primaryKeyList); vertexList.put(label, graphVertex); } else { + // get edge pk name list + List primaryKeyList = getPrimaryKeyList(typeObject.get("indexes")); + List relationList = Lists.newArrayList(); JsonNode relationArray = typeObject.get("rawRelationShips"); if (null != relationArray) { @@ -218,7 +211,8 @@ public 
static GraphSchema buildSchemaFromJson(String schemaJson) { logger.warn("There's no relation def in edge " + label); } DefaultGraphEdge graphEdge = - new DefaultGraphEdge(labelId, label, propertyList, relationList); + new DefaultGraphEdge( + labelId, label, propertyList, relationList, primaryKeyList); edgeList.put(label, graphEdge); } } @@ -233,6 +227,19 @@ public static GraphSchema buildSchemaFromJson(String schemaJson) { } } + private static List getPrimaryKeyList(JsonNode indexArray) { + List primaryKeyList = Lists.newArrayList(); + if (indexArray != null) { + for (JsonNode indexObject : indexArray) { + JsonNode priNameList = indexObject.get("propertyNames"); + for (JsonNode pri : priNameList) { + primaryKeyList.add(pri.asText()); + } + } + } + return primaryKeyList; + } + public static void main(String[] args) throws JsonProcessingException { String schemaJson = "{\"partitionNum\": 2, \"types\": [{\"id\": 0, \"indexes\": [{\"propertyNames\":" diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/EdgeTypeMapper.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/EdgeTypeMapper.java index b882af5d7e1a..1df42344114f 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/EdgeTypeMapper.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/EdgeTypeMapper.java @@ -30,12 +30,21 @@ public class EdgeTypeMapper extends SchemaElementMapper { private List relationShips; + private List indexes; + public static SchemaElementMapper parseFromEdgeType(GraphEdge graphEdge) { EdgeTypeMapper edgeTypeMapper = new EdgeTypeMapper(); edgeTypeMapper.setId(graphEdge.getLabelId()); edgeTypeMapper.setLabel(graphEdge.getLabel()); edgeTypeMapper.setType(TypeEnum.EDGE.toString()); + ElementIndexMapper elementIndexMapper = new ElementIndexMapper(); + elementIndexMapper.setName("primary_key"); + 
elementIndexMapper.setIndexType("PRIMARY_KEY"); + elementIndexMapper.setPropertyNames(graphEdge.getPrimaryKeyNameList()); + ArrayList elementIndexMapperList = new ArrayList<>(); + elementIndexMapperList.add(elementIndexMapper); + edgeTypeMapper.setIndexes(elementIndexMapperList); List relationMapperList = new ArrayList<>(); for (EdgeRelation edgeRelation : graphEdge.getRelationList()) { relationMapperList.add( @@ -56,6 +65,14 @@ public List getRelationShips() { return relationShips; } + public List getIndexes() { + return indexes; + } + + public void setIndexes(List indexes) { + this.indexes = indexes; + } + public void setRelationShips(List relationShips) { this.relationShips = relationShips; } @@ -73,11 +90,21 @@ public GraphEdge toEdgeType(Map vertexTypeMap) { relationList.add(relationMapper.toEdgeRelation(vertexTypeMap)); } } + List primaryKeyList = new ArrayList<>(); + if (indexes != null && indexes.size() > 0) { + if (indexes.size() > 1) { + throw new IllegalArgumentException( + "Only support primary key now for " + this.indexes); + } + primaryKeyList = indexes.get(0).getPropertyNames(); + } + return new DefaultGraphEdge( this.getId(), this.getLabel(), graphPropertyList, relationList, + primaryKeyList, this.getVersionId()); } } diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/VertexIndexMapper.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/ElementIndexMapper.java similarity index 97% rename from interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/VertexIndexMapper.java rename to interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/ElementIndexMapper.java index 78e2df91567e..e6107639de2c 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/VertexIndexMapper.java +++ 
b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/ElementIndexMapper.java @@ -18,7 +18,7 @@ import java.util.List; import java.util.stream.Collectors; -public class VertexIndexMapper { +public class ElementIndexMapper { private String name; private String indexType; private List propertyNames; diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/VertexTypeMapper.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/VertexTypeMapper.java index e454634073ab..cd3d2cd2fe7c 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/VertexTypeMapper.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/mapper/VertexTypeMapper.java @@ -25,7 +25,7 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class VertexTypeMapper extends SchemaElementMapper { - private List indexes; + private List indexes; private long tableId; public static VertexTypeMapper parseFromVertexType(GraphVertex graphVertex) { @@ -34,13 +34,13 @@ public static VertexTypeMapper parseFromVertexType(GraphVertex graphVertex) { vertexTypeMapper.setLabel(graphVertex.getLabel()); vertexTypeMapper.setType(TypeEnum.VERTEX.toString()); - VertexIndexMapper vertexIndexMapper = new VertexIndexMapper(); - vertexIndexMapper.setName("primary_key"); - vertexIndexMapper.setIndexType("PRIMARY_KEY"); - vertexIndexMapper.setPropertyNames(graphVertex.getPrimaryKeyNameList()); - ArrayList vertexIndexMapperList = new ArrayList<>(); - vertexIndexMapperList.add(vertexIndexMapper); - vertexTypeMapper.setIndexes(vertexIndexMapperList); + ElementIndexMapper elementIndexMapper = new ElementIndexMapper(); + elementIndexMapper.setName("primary_key"); + elementIndexMapper.setIndexType("PRIMARY_KEY"); + elementIndexMapper.setPropertyNames(graphVertex.getPrimaryKeyNameList()); + ArrayList elementIndexMapperList = new 
ArrayList<>(); + elementIndexMapperList.add(elementIndexMapper); + vertexTypeMapper.setIndexes(elementIndexMapperList); List propertyMapperList = new ArrayList<>(); for (GraphProperty graphProperty : graphVertex.getPropertyList()) { propertyMapperList.add(GraphPropertyMapper.parseFromGraphProperty(graphProperty)); @@ -51,11 +51,11 @@ public static VertexTypeMapper parseFromVertexType(GraphVertex graphVertex) { return vertexTypeMapper; } - public List getIndexes() { + public List getIndexes() { return indexes; } - public void setIndexes(List indexes) { + public void setIndexes(List indexes) { this.indexes = indexes; } diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/PkHashUtils.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/PkHashUtils.java index 3fe3d737c188..260c87de7617 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/PkHashUtils.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/PkHashUtils.java @@ -42,6 +42,31 @@ public static long hash(int labelId, List pks) { return hash64(buffer.array(), buffer.limit()); } + public static long hash(long srcId, long dstId, int labelId, List pks) { + ByteBuffer buffer = THREAD_BUFFER.get(); + clear(buffer); + buffer.putLong(srcId); + buffer.putLong(dstId); + buffer.putInt(labelId); + for (byte[] pk : pks) { + buffer.putInt(pk.length); + buffer.put(pk); + } + flip(buffer); + return hash64(buffer.array(), buffer.limit()); + } + + public static long hash(long srcId, long dstId, int labelId, long nanoTime) { + ByteBuffer buffer = THREAD_BUFFER.get(); + clear(buffer); + buffer.putLong(srcId); + buffer.putLong(dstId); + buffer.putInt(labelId); + buffer.putLong(nanoTime); + flip(buffer); + return hash64(buffer.array(), buffer.limit()); + } + /** * Generates 64-bit hash from byte array of the given length and seed. 
* diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java index ec14593ca650..759fde66b3ca 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java @@ -74,9 +74,10 @@ public BytesRef encodeEdgeKey( labelPkIds.computeIfAbsent(type, k -> SchemaUtils.getEdgePrimaryKeyList(type)); long eid; if (edgePkIds != null && edgePkIds.size() > 0) { - eid = getHashId(type.getLabelId(), propertiesMap, edgePkIds); + List pkBytes = getPkBytes(type.getLabelId(), propertiesMap, edgePkIds); + eid = PkHashUtils.hash(srcId, dstId, type.getLabelId(), pkBytes); } else { - eid = System.nanoTime(); + eid = PkHashUtils.hash(srcId, dstId, type.getLabelId(), System.nanoTime()); } if (outEdge) { @@ -102,7 +103,7 @@ public BytesRef encodeProperties(int labelId, Map proper return new BytesRef(scratch.array(), 0, scratch.limit()); } - private static long getHashId( + private static List getPkBytes( int labelId, Map operationProperties, List pkIds) { List pks = new ArrayList<>(pkIds.size()); for (int pkId : pkIds) { @@ -114,6 +115,12 @@ private static long getHashId( byte[] valBytes = propertyValue.getValBytes(); pks.add(valBytes); } + return pks; + } + + private static long getHashId( + int labelId, Map operationProperties, List pkIds) { + List pks = getPkBytes(labelId, operationProperties, pkIds); return PkHashUtils.hash(labelId, pks); } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java index 6bdd55e3d1a3..52958cf6ed2b 100644 --- 
a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java @@ -17,6 +17,7 @@ import com.alibaba.graphscope.groot.common.schema.api.GraphSchema; import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; +import com.alibaba.graphscope.groot.common.util.JSON; import com.alibaba.graphscope.groot.common.util.UuidUtils; import com.alibaba.graphscope.groot.dataload.util.OSSFS; import com.alibaba.graphscope.groot.dataload.util.VolumeFS; @@ -72,6 +73,7 @@ public static void main(String[] args) throws IOException { GraphDefPb graphDefPb = client.prepareDataLoad(targets); System.out.println("GraphDef: " + graphDefPb); GraphSchema schema = GraphDef.parseProto(graphDefPb); + System.out.println("GraphSchema: " + JSON.toJson(schema)); // number of reduce task int partitionNum = client.getPartitionNum(); @@ -124,6 +126,7 @@ public static void main(String[] args) throws IOException { } String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); + System.out.println("schemaJson is :" + schemaJson); Map info = Utils.getMappingInfo(odps, schema, mappingConfig); ObjectMapper mapper = new ObjectMapper(); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/DefaultEdgeIdGenerator.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/DefaultEdgeIdGenerator.java index 6e5c19b7c21f..55212f220d0f 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/DefaultEdgeIdGenerator.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/DefaultEdgeIdGenerator.java @@ -3,9 +3,11 @@ import com.alibaba.graphscope.groot.common.RoleType; 
import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; +import com.alibaba.graphscope.groot.common.util.PkHashUtils; import com.alibaba.graphscope.groot.rpc.ChannelManager; import com.alibaba.graphscope.groot.rpc.RoleClients; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; public class DefaultEdgeIdGenerator extends RoleClients @@ -35,6 +37,14 @@ public long getNextId() { return getNextId(); } + @Override + public long getHashId(long srcId, long dstId, int labelId, List pks) { + if (pks != null && pks.size() > 0) { + return PkHashUtils.hash(srcId, dstId, labelId, pks); + } + return PkHashUtils.hash(srcId, dstId, labelId, System.nanoTime()); + } + private void allocateNewIds() { long startId = getClient(0).allocateId(this.idAllocateSize); this.currentId.set(startId); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/EdgeIdGenerator.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/EdgeIdGenerator.java index bb2874738d45..d55e46282051 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/EdgeIdGenerator.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/EdgeIdGenerator.java @@ -1,6 +1,10 @@ package com.alibaba.graphscope.groot.frontend.write; +import java.util.List; + public interface EdgeIdGenerator { long getNextId(); + + long getHashId(long srcId, long dstId, int labelId, List pks); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java index a15359d1877b..8c86c5574764 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java +++ 
b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java @@ -2,6 +2,8 @@ import com.alibaba.graphscope.groot.CompletionCallback; import com.alibaba.graphscope.groot.SnapshotCache; +import com.alibaba.graphscope.groot.common.config.Configs; +import com.alibaba.graphscope.groot.common.config.FrontendConfig; import com.alibaba.graphscope.groot.common.exception.GrootException; import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException; import com.alibaba.graphscope.groot.common.schema.api.GraphElement; @@ -26,6 +28,9 @@ import com.alibaba.graphscope.groot.operation.dml.*; import com.alibaba.graphscope.groot.rpc.RoleClients; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -50,6 +55,10 @@ public class GraphWriter implements MetricsAgent { private volatile long ingestorBlockTimeAvgMs; private volatile long lastUpdateIngestorBlockTimeNano; private AtomicInteger pendingWriteCount; + /** + * When true, the eid is generated as hash64(srcId, dstId, edgeLabelId, edgePks). + */ + private boolean enableHashEid; private SnapshotCache snapshotCache; private EdgeIdGenerator edgeIdGenerator; @@ -57,6 +66,8 @@ public class GraphWriter implements MetricsAgent { private RoleClients ingestWriteClients; private AtomicLong lastWrittenSnapshotId = new AtomicLong(0L); + private static final Logger logger = LoggerFactory.getLogger(GraphWriter.class); + public GraphWriter( SnapshotCache snapshotCache, EdgeIdGenerator edgeIdGenerator, @@ -71,6 +82,23 @@ public GraphWriter( metricsCollector.register(this, () -> updateMetrics()); } + public GraphWriter( + SnapshotCache snapshotCache, + EdgeIdGenerator edgeIdGenerator, + MetaService metaService, + RoleClients ingestWriteClients, + MetricsCollector metricsCollector, + Configs configs) { + this.snapshotCache = snapshotCache; + this.edgeIdGenerator = 
edgeIdGenerator; + this.metaService = metaService; + this.ingestWriteClients = ingestWriteClients; + initMetrics(); + metricsCollector.register(this, () -> updateMetrics()); + // default for incr eid generate + this.enableHashEid = FrontendConfig.ENABLE_HASH_GENERATE_EID.get(configs); + } + public long writeBatch( String requestId, String writeSession, List writeRequests) { CompletableFuture future = new CompletableFuture<>(); @@ -359,13 +387,60 @@ private EdgeId getEdgeId(GraphSchema schema, DataRecord dataRecord, boolean over parseRawProperties(dstVertexDef, dstVertexRecordKey.getProperties()); long dstVertexHashId = getPrimaryKeysHashId(dstVertexDef.getLabelId(), dstVertexPkVals, dstVertexDef); + // long edgeInnerId = + // overwrite ? edgeIdGenerator.getNextId() : + // edgeRecordKey.getEdgeInnerId(); long edgeInnerId = - overwrite ? edgeIdGenerator.getNextId() : edgeRecordKey.getEdgeInnerId(); + getEdgeInnerId( + srcVertexHashId, + dstVertexHashId, + overwrite, + edgeRecordKey, + schema, + dataRecord); return new EdgeId( new VertexId(srcVertexHashId), new VertexId(dstVertexHashId), edgeInnerId); } } + /** + * If enableHashEid == true: returns the hashed eid on overwrite or when the client supplied no eid (eid == 0); otherwise keeps the client-supplied eid. + * If enableHashEid == false: returns the next generated id on overwrite; otherwise returns the client-supplied eid unchanged (0 if none was given). + * @param srcId hash id of the source vertex + * @param dstId hash id of the destination vertex + * @param overwrite true when the edge is being inserted, so a generated eid is used + * @param edgeRecordKey key of the edge record; supplies the edge label and the client-supplied eid + * @param schema graph schema used to look up the edge definition + * @param dataRecord record whose properties provide the edge primary key values + * @return the eid to use for this edge + */ + private long getEdgeInnerId( + long srcId, + long dstId, + boolean overwrite, + EdgeRecordKey edgeRecordKey, + GraphSchema schema, + DataRecord dataRecord) { + long edgeInnerId; + if (this.enableHashEid) { + GraphElement edgeDef = schema.getElement(edgeRecordKey.getLabel()); + Map edgePkVals = + parseRawProperties(edgeDef, dataRecord.getProperties()); + List edgePkBytes = getPkBytes(edgePkVals, edgeDef); + int edgeLabelId = edgeDef.getLabelId(); + long eid = 
edgeIdGenerator.getHashId(srcId, dstId, edgeLabelId, edgePkBytes); + edgeInnerId = + overwrite + ? eid + : (edgeRecordKey.getEdgeInnerId() == 0 + ? eid + : edgeRecordKey.getEdgeInnerId()); + } else { + edgeInnerId = overwrite ? edgeIdGenerator.getNextId() : edgeRecordKey.getEdgeInnerId(); + } + return edgeInnerId; + } + private EdgeKind getEdgeKind(GraphSchema schema, DataRecord dataRecord) { EdgeTarget edgeTarget = dataRecord.getEdgeTarget(); if (edgeTarget != null) { @@ -392,13 +467,18 @@ private EdgeKind getEdgeKind(GraphSchema schema, DataRecord dataRecord) { public static long getPrimaryKeysHashId( int labelId, Map properties, GraphElement graphElement) { + return PkHashUtils.hash(labelId, getPkBytes(properties, graphElement)); + } + + public static List getPkBytes( + Map properties, GraphElement graphElement) { List pklist = graphElement.getPrimaryKeyList(); List pks = new ArrayList<>(pklist.size()); for (GraphProperty pk : pklist) { byte[] valBytes = properties.get(pk.getId()).getValBytes(); pks.add(valBytes); } - return PkHashUtils.hash(labelId, pks); + return pks; } public static long getPrimaryKeysHashIdFromRaw( diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java index 40347b0dba71..4f32812a5f25 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java @@ -141,7 +141,8 @@ public Frontend(Configs configs) { edgeIdGenerator, this.metaService, ingestorWriteClients, - metricsCollector); + metricsCollector, + configs); WriteSessionGenerator writeSessionGenerator = new WriteSessionGenerator(configs); ClientWriteService clientWriteService = new ClientWriteService(writeSessionGenerator, graphWriter);