addAggNodeField(String field, int size) {
+ ObjectNode aggNode = JsonUtil.createObjectNode();
+ ObjectNode node = addNodeField(aggNode, field, size);
+ this.aggNode = aggNode;
+ this.node = node;
+ return this;
+ }
+
+ /**
+ * agg节点下添加group by 字段
+ *
+ * @param aggNode aggNode
+ * @param field group by 字段
+ * @param size 结果集size,默认为1
+ */
+ private ObjectNode addNodeField(ObjectNode aggNode, String field, int size) {
+ ObjectNode node = JsonUtil.createObjectNode();
+ node.set("terms", JsonUtil.createObjectNode().put("field", field).put("size", size));
+ ObjectNode groupByNode = JsonUtil.createObjectNode();
+ groupByNode.set("group_by_" + field, node);
+ aggNode.set(ElasticSearchConst.AGGS, groupByNode);
+ searchRequest.getGroupByAggFields().add(field);
+ return node;
+ }
+
+ ObjectNode getAggNode() {
+ return aggNode;
+ }
+ }
+}
diff --git a/message-push-common/src/main/java/org/kangspace/messagepush/core/elasticsearch/response/AggregationsResult.java b/message-push-common/src/main/java/org/kangspace/messagepush/core/elasticsearch/response/AggregationsResult.java
new file mode 100644
index 0000000..3abb6ee
--- /dev/null
+++ b/message-push-common/src/main/java/org/kangspace/messagepush/core/elasticsearch/response/AggregationsResult.java
@@ -0,0 +1,247 @@
+package org.kangspace.messagepush.core.elasticsearch.response;
+
+import lombok.Data;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 分组查询结果基础Bean
+ *
+ * 查询:
+ * GET message-gateway.index_operators_info/_search
+ * {
+ * "size": 0,
+ * "query": {
+ * "bool": {
+ * "filter": [
+ * {
+ * "range": {
+ * "reportTime": {
+ * "gt": "2022-05-10 14:00:00",
+ * "lt": "2022-05-10 15:00:00"
+ * }
+ * }
+ * }
+ * ],
+ * "must": [],
+ * "must_not": [],
+ * "should": []
+ * }
+ * },
+ * "aggs": {
+ * "totalSmsCountPerHourGroup": {
+ * "date_histogram": {
+ * "field": "reportTime",
+ * "interval": "hour",
+ * "format": "yyyy-MM-dd HH"
+ * },
+ * "aggs": {
+ * "smsSendResultGroup": {
+ * "terms": {
+ * "field": "operatorsSendStatus",
+ * "size": 10
+ * },
+ * "aggs": {
+ * "smsTypeGroup": {
+ * "terms": {
+ * "field": "smsType",
+ * "size": 10
+ * },
+ * "aggs": {
+ * "serviceProviderGroup": {
+ * "terms": {
+ * "field": "operatorsType",
+ * "size": 10
+ * },
+ * "aggs": {
+ * "smsSizeSum": {
+ * "sum": {
+ * "field": "smsSize"
+ * }
+ * }
+ * }
+ * }
+ * }
+ * }
+ * }
+ * }
+ * }
+ * }
+ * }
+ * }
+ * 结果样例:
+ * "aggregations" : {
+ * "agroup" : {
+ * "value" : 1.65625
+ * },
+ * "totalSmsCountPerHourGroup" : {
+ * "buckets" : [
+ * {
+ * "key_as_string" : "2022-05-10 14",
+ * "key" : 1652191200000,
+ * "doc_count" : 32,
+ * "smsSendResultGroup" : {
+ * "doc_count_error_upper_bound" : 0,
+ * "sum_other_doc_count" : 0,
+ * "buckets" : [
+ * {
+ * "key" : "3",
+ * "doc_count" : 31,
+ * "smsTypeGroup" : {
+ * "doc_count_error_upper_bound" : 0,
+ * "sum_other_doc_count" : 0,
+ * "buckets" : [
+ * {
+ * "key" : "2",
+ * "doc_count" : 20,
+ * "serviceProviderGroup" : {
+ * "doc_count_error_upper_bound" : 0,
+ * "sum_other_doc_count" : 0,
+ * "buckets" : [
+ * {
+ * "key" : "6",
+ * "doc_count" : 20,
+ * "smsSizeSum" : {
+ * "value" : 41.0
+ * }
+ * }
+ * ]
+ * }
+ * },
+ * {
+ * "key" : "1",
+ * "doc_count" : 8,
+ * "serviceProviderGroup" : {
+ * "doc_count_error_upper_bound" : 0,
+ * "sum_other_doc_count" : 0,
+ * "buckets" : [
+ * {
+ * "key" : "6",
+ * "doc_count" : 8,
+ * "smsSizeSum" : {
+ * "value" : 8.0
+ * }
+ * }
+ * ]
+ * }
+ * },
+ * {
+ * "key" : "3",
+ * "doc_count" : 3,
+ * "serviceProviderGroup" : {
+ * "doc_count_error_upper_bound" : 0,
+ * "sum_other_doc_count" : 0,
+ * "buckets" : [
+ * {
+ * "key" : "5",
+ * "doc_count" : 3,
+ * "smsSizeSum" : {
+ * "value" : 3.0
+ * }
+ * }
+ * ]
+ * }
+ * }
+ * ]
+ * }
+ * },
+ * {
+ * "key" : "2",
+ * "doc_count" : 1,
+ * "smsTypeGroup" : {
+ * "doc_count_error_upper_bound" : 0,
+ * "sum_other_doc_count" : 0,
+ * "buckets" : [
+ * {
+ * "key" : "1",
+ * "doc_count" : 1,
+ * "serviceProviderGroup" : {
+ * "doc_count_error_upper_bound" : 0,
+ * "sum_other_doc_count" : 0,
+ * "buckets" : [
+ * {
+ * "key" : "6",
+ * "doc_count" : 1,
+ * "smsSizeSum" : {
+ * "value" : 1.0
+ * }
+ * }
+ * ]
+ * }
+ * }
+ * ]
+ * }
+ * }
+ * ]
+ * }
+ * }
+ * ]
+ * }
+ * }
+ *
+ *
+ *
+ *
+ *
+ * @author kango2gler@gmail.com
+ * @since 2022/5/16
+ */
+@Data
+public class AggregationsResult extends Result {
+ /**
+ * key: group name
+ * value: {buckets...}
+ */
+ Map aggregations;
+
+
+ /**
+ * 分组结果
+ */
+ @Data
+ public static class GroupBuckets {
+ /**
+ * buckets列表
+ */
+ private List buckets;
+
+ private Long docCountErrorUpperBound;
+ private Long sumOtherDocCount;
+ /**
+ * 简单分组函数值(key: 分组名称, value: 分组结果),可能为double类型.
+ * 如sum, avg分组等
+ */
+ private Object singleNumericValue;
+ }
+
+ /**
+ * 分组结果详情
+ */
+ @Data
+ public static class GroupBucketDetail {
+ /**
+ * 内部分组
+ * key: group name
+ * value: {buckets...}
+ */
+ Map aggs;
+ private Object key;
+ private String keyAsString;
+ /**
+ * 文档总数
+ */
+ private Long docCount;
+
+ public GroupBucketDetail() {
+ }
+
+ public GroupBucketDetail(Object key, String keyAsString, Long docCount) {
+ this.key = key;
+ this.keyAsString = keyAsString;
+ this.docCount = docCount;
+ }
+
+
+ }
+}
diff --git a/message-push-common/src/main/java/org/kangspace/messagepush/core/elasticsearch/response/Result.java b/message-push-common/src/main/java/org/kangspace/messagepush/core/elasticsearch/response/Result.java
new file mode 100644
index 0000000..4568d44
--- /dev/null
+++ b/message-push-common/src/main/java/org/kangspace/messagepush/core/elasticsearch/response/Result.java
@@ -0,0 +1,47 @@
+package org.kangspace.messagepush.core.elasticsearch.response;
+
+import lombok.Data;
+
+/**
+ * @author kango2gler@gmail.com
+ * @since 2022/5/16
+ */
+@Data
+public class Result {
+ /**
+ * 返回结果中的took
+ */
+ private Long took;
+ /**
+ * 返回结果中的timeout
+ */
+ private Boolean timeout;
+ /**
+ * 返回结果中 hits.total.value
+ */
+ private Long total;
+
+ /**
+ * 响应码
+ */
+ private Integer status;
+
+ /**
+ * 错误信息
+ */
+ private String error;
+
+ public Result() {
+ }
+
+ public Result(Integer status, String error) {
+ this.status = status;
+ this.error = error;
+ }
+
+ public Result(Long took, Boolean timeout, Long total) {
+ this.took = took;
+ this.timeout = timeout;
+ this.total = total;
+ }
+}
diff --git a/message-push-common/src/main/java/org/kangspace/messagepush/core/elasticsearch/util/QueryUtil.java b/message-push-common/src/main/java/org/kangspace/messagepush/core/elasticsearch/util/QueryUtil.java
new file mode 100644
index 0000000..5e40908
--- /dev/null
+++ b/message-push-common/src/main/java/org/kangspace/messagepush/core/elasticsearch/util/QueryUtil.java
@@ -0,0 +1,565 @@
+package org.kangspace.messagepush.core.elasticsearch.util;
+
+import cn.hutool.core.date.DateUtil;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.extern.slf4j.Slf4j;
+import org.kangspace.messagepush.core.elasticsearch.ElasticSearchConst;
+import org.kangspace.messagepush.core.elasticsearch.annotation.QueryField;
+import org.kangspace.messagepush.core.elasticsearch.domain.FullTextQuery;
+import org.kangspace.messagepush.core.elasticsearch.enumeration.OccurEnum;
+import org.kangspace.messagepush.core.elasticsearch.query.FulltextQueryBuilder;
+import org.kangspace.messagepush.core.elasticsearch.request.JsonSearchRequest;
+import org.kangspace.messagepush.core.util.JsonUtil;
+import org.kangspace.messagepush.core.util.ListUtil;
+import org.kangspace.messagepush.core.util.StrUtil;
+
+import java.beans.PropertyDescriptor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * 查询工具类
+ *
+ * @author kango2gler@gmail.com
+ */
+@Slf4j
+public class QueryUtil {
+
+ /**
+ * Date类型默认格式
+ */
+ private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss";
+
+ /**
+ * LocalDateTime类型默认格式
+ */
+ private static final String DEFAULT_LOCAL_DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'+'08:00";
+
+ /**
+ * 全文检索构建器
+ */
+ private static FulltextQueryBuilder fulltextQueryBuilder;
+
+ /**
+ * 初始化
+ *
+ * @param fulltextQueryBuilder 全文检索构建对象
+ */
+ public static void init(FulltextQueryBuilder fulltextQueryBuilder) {
+ QueryUtil.fulltextQueryBuilder = fulltextQueryBuilder;
+ }
+
+ /**
+ * 自动拼装
+ *
+ * @param jsonSearchRequest 查询请求对象
+ * @param entity 实体
+ * @param 泛型
+ * @throws Exception 异常
+ */
+ public static void entityToQuery(JsonSearchRequest jsonSearchRequest, T entity) throws Exception {
+
+ //初始化查询框架
+ initQueryNode(jsonSearchRequest);
+ if (entity == null) {
+ return;
+ }
+ //遍历实体类属性
+ for (Field field : entity.getClass().getDeclaredFields()) {
+ if (!field.isAnnotationPresent(QueryField.class)) {
+ continue;
+ }
+ //属性名
+ String fieldName = field.getName();
+ QueryField fieldAnnotation = field.getAnnotation(QueryField.class);
+ if (StrUtil.isNotEmpty(fieldAnnotation.field())) {
+ fieldName = fieldAnnotation.field().trim();
+ }
+
+ //属性值
+ Object fieldValue = getMethod(field, entity);
+ if (fieldValue == null) {
+ continue;
+ }
+ fieldValue = formatValue(fieldValue, fieldAnnotation.format());
+
+ //按处理方式拼装查询条件
+ switch (fieldAnnotation.value()) {
+ case TERM:
+ term(jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue);
+ break;
+ case TERM_GT_ZERO:
+ termGtZero(jsonSearchRequest, fieldAnnotation.occur(), fieldName, Integer.parseInt(String.valueOf(fieldValue)));
+ break;
+ case TERM_GTE_ZERO:
+ termGteZero(jsonSearchRequest, fieldAnnotation.occur(), fieldName, Integer.parseInt(String.valueOf(fieldValue)));
+ break;
+ case TERMS:
+ terms(jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue);
+ break;
+ case TERMS_GT_ZERO:
+ termsGtZero(jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue);
+ break;
+ case TERMS_GTE_ZERO:
+ termsGteZero(jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue);
+ break;
+ case TERMS_AND:
+ termsAnd(jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue);
+ break;
+ case MATCH:
+ match(jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue);
+ break;
+ case MATCH_PHRASE:
+ matchPhrase(jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue);
+ break;
+ case MATCH_PHRASE_PREFIX:
+ buildQueryNode(ElasticSearchConst.MATCH_PHRASE_PREFIX, jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue);
+ break;
+ case PREFIX:
+ buildQueryNode(ElasticSearchConst.PREFIX, jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue);
+ break;
+ case RANGE_GT:
+ range(jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue, ElasticSearchConst.GT);
+ break;
+ case RANGE_GTE:
+ range(jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue, ElasticSearchConst.GTE);
+ break;
+ case RANGE_LT:
+ range(jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue, ElasticSearchConst.LT);
+ break;
+ case RANGE_LTE:
+ range(jsonSearchRequest, fieldAnnotation.occur(), fieldName, fieldValue, ElasticSearchConst.LTE);
+ break;
+ case FULL_TEXT:
+ fulltext(jsonSearchRequest, fieldValue);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ /**
+ * 全文检索
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param fieldValue 属性值
+ */
+ private static void fulltext(JsonSearchRequest jsonSearchRequest, Object fieldValue) {
+ if (fulltextQueryBuilder == null || !(fieldValue instanceof FullTextQuery)) {
+ return;
+ }
+ fulltextQueryBuilder.handler(jsonSearchRequest, ((FullTextQuery) fieldValue));
+ }
+
+ /**
+ * 初始化查询节点
+ *
+ * @param jsonSearchRequest 查询对象
+ */
+ private static void initQueryNode(JsonSearchRequest jsonSearchRequest) {
+ ObjectNode queryNode = getNode();
+ ObjectNode boolNode = getNode();
+ ArrayNode filterNode = JsonUtil.createArrayNode();
+ ArrayNode mustNode = JsonUtil.createArrayNode();
+ ArrayNode mustNotNode = JsonUtil.createArrayNode();
+ ArrayNode shouldNode = JsonUtil.createArrayNode();
+ boolNode.set(ElasticSearchConst.FILTER, filterNode);
+ boolNode.set(ElasticSearchConst.MUST, mustNode);
+ boolNode.set(ElasticSearchConst.MUST_NOT, mustNotNode);
+ boolNode.set(ElasticSearchConst.SHOULD, shouldNode);
+ queryNode.set(ElasticSearchConst.BOOL, boolNode);
+ jsonSearchRequest.getRootNode().set(ElasticSearchConst.QUERY, queryNode);
+ }
+
+ /**
+ * 得到属性get方法的值
+ *
+ * @param field 字段域
+ * @param entity 实体
+ * @param 泛型
+ * @return 调用get方法返回字段值
+ * @throws Exception 异常
+ */
+ private static Object getMethod(Field field, T entity) throws Exception {
+ PropertyDescriptor pd = new PropertyDescriptor(field.getName(), entity.getClass());
+ Method method = pd.getReadMethod();
+ method.setAccessible(true);
+ return method.invoke(entity);
+ }
+
+ /**
+ * 格式化属性值
+ *
+ * @param fieldValue 属性值
+ * @param format 格式
+ * @return 格式化后值
+ */
+ private static Object formatValue(Object fieldValue, String format) {
+ if (fieldValue == null) {
+ return null;
+ }
+ Object value;
+ if (fieldValue instanceof LocalDateTime) {
+ if (StrUtil.isEmpty(format)) {
+ format = DEFAULT_LOCAL_DATE_TIME_FORMAT;
+ }
+ value = DateUtil.format(((LocalDateTime) fieldValue), format);
+ } else if (fieldValue instanceof Date) {
+ if (StrUtil.isEmpty(format)) {
+ format = DEFAULT_DATE_FORMAT;
+ }
+ value = DateUtil.format(((Date) fieldValue), format);
+ } else {
+ value = fieldValue;
+ }
+ return value;
+ }
+
+
+ /**
+ * 等于
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ */
+ public static void term(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, String fieldName, Object fieldValue) {
+ if (jsonSearchRequest == null || occurEnum == null || StrUtil.isEmpty(fieldName) || fieldValue == null) {
+ return;
+ }
+ getOccurNode(jsonSearchRequest, occurEnum).add(getNode(ElasticSearchConst.TERM, getNode(fieldName, fieldValue)));
+
+ }
+
+ /**
+ * 等于,值大于0才拼装
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ */
+ public static void termGtZero(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, String fieldName, Integer fieldValue) {
+ if (jsonSearchRequest == null || occurEnum == null || StrUtil.isEmpty(fieldName) || fieldValue == null || fieldValue <= 0) {
+ return;
+ }
+ getOccurNode(jsonSearchRequest, occurEnum).add(getNode(ElasticSearchConst.TERM, getNode(fieldName, fieldValue)));
+ }
+
+ /**
+ * 等于,值大于等于0才拼装
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ */
+ public static void termGteZero(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, String fieldName, Integer fieldValue) {
+ if (jsonSearchRequest == null || occurEnum == null || StrUtil.isEmpty(fieldName) || fieldValue == null || fieldValue < 0) {
+ return;
+ }
+ getOccurNode(jsonSearchRequest, occurEnum).add(getNode(ElasticSearchConst.TERM, getNode(fieldName, fieldValue)));
+ }
+
+ /**
+ * 多项或完全匹配
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ */
+ public static void terms(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, String fieldName, Object fieldValue) {
+ List values = getValues(fieldValue);
+ if (values == null) {
+ return;
+ }
+ baseTerms(jsonSearchRequest, occurEnum, fieldName, values);
+ }
+
+ /**
+ * 多项或完全匹配,值为正整数才拼装
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ */
+ public static void termsGtZero(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, String fieldName, Object fieldValue) {
+ List values = getValues(fieldValue);
+ if (values == null) {
+ return;
+ }
+ for (int i = values.size() - 1; i >= 0; i--) {
+ Object value = values.get(i);
+ if (value instanceof Integer) {
+ if ((Integer) value <= 0) {
+ values.remove(i);
+ }
+ } else if (value instanceof Double) {
+ if ((Double) value <= 0) {
+ values.remove(i);
+ }
+ } else if (value instanceof String) {
+ if (StrUtil.strToInt((String) value, 0) <= 0) {
+ values.remove(i);
+ }
+ } else {
+ values.remove(i);
+ }
+ }
+ baseTerms(jsonSearchRequest, occurEnum, fieldName, values);
+ }
+
+ /**
+ * 多项或完全匹配,值为自然数才拼装
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ */
+ public static void termsGteZero(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, String fieldName, Object fieldValue) {
+ List values = getValues(fieldValue);
+ if (values == null) {
+ return;
+ }
+ for (int i = values.size() - 1; i >= 0; i--) {
+ Object value = values.get(i);
+ if (value instanceof Integer) {
+ if ((Integer) value < 0) {
+ values.remove(i);
+ }
+ } else if (value instanceof Double) {
+ if ((Double) value < 0) {
+ values.remove(i);
+ }
+ } else if (value instanceof String) {
+ if (StrUtil.strToInt((String) value, 0) < 0) {
+ values.remove(i);
+ }
+ } else {
+ values.remove(i);
+ }
+ }
+ baseTerms(jsonSearchRequest, occurEnum, fieldName, values);
+ }
+
+ /**
+ * 多项与完全匹配
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ */
+ public static void termsAnd(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, String fieldName, Object fieldValue) {
+ List values = getValues(fieldValue);
+ if (values == null) {
+ return;
+ }
+ values.forEach(value -> getOccurNode(jsonSearchRequest, occurEnum).add(getNode(ElasticSearchConst.TERM, getNode(fieldName, value))));
+ }
+
+ /**
+ * 基于Object对象获取List
+ *
+ * @param fieldValue 属性值
+ * @return
+ */
+ private static List getValues(Object fieldValue) {
+ if (fieldValue == null) {
+ return null;
+ }
+ if (fieldValue instanceof List) {
+ if (((List) fieldValue).size() == 0) {
+ return null;
+ }
+ return (List) fieldValue;
+ } else if (fieldValue instanceof String) {
+ if (StrUtil.isEmpty((String) fieldValue)) {
+ return null;
+ }
+ return StrUtil.strToList((String) fieldValue);
+ }
+ return null;
+ }
+
+ /**
+ * 基础terms方法
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ */
+ private static void baseTerms(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, String fieldName, List fieldValue) {
+ if (jsonSearchRequest == null || occurEnum == null || StrUtil.isEmpty(fieldName) || ListUtil.isEmpty(fieldValue)) {
+ return;
+ }
+ getOccurNode(jsonSearchRequest, occurEnum).add(getNode(ElasticSearchConst.TERMS, getNode(fieldName, fieldValue)));
+ }
+
+ /**
+ * 分词匹配查询
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ */
+ public static void match(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, String fieldName, Object fieldValue) {
+ if (jsonSearchRequest == null || occurEnum == null || StrUtil.isEmpty(fieldName) || fieldValue == null) {
+ return;
+ }
+ getOccurNode(jsonSearchRequest, occurEnum).add(getNode(ElasticSearchConst.MATCH, getNode(fieldName, fieldValue)));
+ }
+
+ /**
+ * 分词匹配-短语匹配查询
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ */
+ public static void matchPhrase(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, String fieldName, Object fieldValue) {
+ if (jsonSearchRequest == null || occurEnum == null || StrUtil.isEmpty(fieldName) || fieldValue == null) {
+ return;
+ }
+ getOccurNode(jsonSearchRequest, occurEnum).add(getNode(ElasticSearchConst.MATCH_PHRASE, getNode(fieldName, fieldValue)));
+ }
+
+ /**
+ * 构建查询节点
+ *
+ * @param queryType 查询类型 如:{@link ElasticSearchConst#MATCH_PHRASE_PREFIX},{@link ElasticSearchConst#MATCH_PHRASE}等
+ * @param jsonSearchRequest jsonSearchRequest
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ */
+ public static void buildQueryNode(String queryType, JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, String fieldName, Object fieldValue) {
+ if (jsonSearchRequest == null || occurEnum == null || StrUtil.isEmpty(fieldName) || fieldValue == null) {
+ return;
+ }
+ getOccurNode(jsonSearchRequest, occurEnum).add(getNode(queryType, getNode(fieldName, fieldValue)));
+ }
+
+ /**
+ * 范围查询
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ * @param relation 关系运算符 ElasticSearchConst.GT、ElasticSearchConst.GTE、ElasticSearchConst.LT、ElasticSearchConst.LTE
+ */
+ public static void range(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, String fieldName, Object fieldValue, String relation) {
+ if (jsonSearchRequest == null || occurEnum == null || StrUtil.isEmpty(fieldName) || fieldValue == null) {
+ return;
+ }
+ getOccurNode(jsonSearchRequest, occurEnum).add(getNode(ElasticSearchConst.RANGE, getNode(fieldName, getNode(relation, fieldValue))));
+ }
+
+ /**
+ * 增加查询条件
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @param jsonNode
+ */
+ public static void addQuery(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum, JsonNode jsonNode) {
+ if (jsonSearchRequest == null || occurEnum == null || jsonNode == null) {
+ return;
+ }
+ getOccurNode(jsonSearchRequest, occurEnum).add(jsonNode);
+ }
+
+ /**
+ * 获取一个空JsonNode
+ *
+ * @return
+ */
+ private static ObjectNode getNode() {
+ return JsonUtil.createObjectNode();
+ }
+
+ /**
+ * 获取一个节点jsonNode
+ *
+ * @param fieldName 属性名
+ * @param fieldValue 属性值
+ * @return
+ */
+ public static JsonNode getNode(String fieldName, Object fieldValue) {
+ if (fieldValue instanceof JsonNode) {
+ return JsonUtil.createObjectNode().set(fieldName, (JsonNode) fieldValue);
+ } else if (fieldValue instanceof List) {
+ ArrayNode arrayNode = JsonUtil.createArrayNode();
+ ((List) fieldValue).forEach(value -> {
+ if (value instanceof String) {
+ arrayNode.add((String) value);
+ } else {
+ arrayNode.addPOJO(value);
+ }
+ });
+ return JsonUtil.createObjectNode().set(fieldName, arrayNode);
+ } else if (fieldValue instanceof String) {
+ return JsonUtil.createObjectNode().put(fieldName, (String) fieldValue);
+ } else if (fieldValue instanceof Integer) {
+ return JsonUtil.createObjectNode().put(fieldName, (Integer) fieldValue);
+ } else if (fieldValue instanceof Long) {
+ return JsonUtil.createObjectNode().put(fieldName, (Long) fieldValue);
+ } else if (fieldValue instanceof Short) {
+ return JsonUtil.createObjectNode().put(fieldName, (Short) fieldValue);
+ } else if (fieldValue instanceof Float) {
+ return JsonUtil.createObjectNode().put(fieldName, (Float) fieldValue);
+ } else if (fieldValue instanceof Double) {
+ return JsonUtil.createObjectNode().put(fieldName, (Double) fieldValue);
+ } else if (fieldValue instanceof Boolean) {
+ return JsonUtil.createObjectNode().put(fieldName, (Boolean) fieldValue);
+ } else if (fieldValue instanceof BigDecimal) {
+ return JsonUtil.createObjectNode().put(fieldName, (BigDecimal) fieldValue);
+ } else {
+ return JsonUtil.createObjectNode().putPOJO(fieldName, fieldValue);
+ }
+ }
+
+ /**
+ * 获取查询条件拼装的位置
+ *
+ * @param jsonSearchRequest 查询对象
+ * @param occurEnum 查询条件拼装的位置,是在must、filter、should中
+ * @return 查询条件拼装的位置
+ */
+ public static ArrayNode getOccurNode(JsonSearchRequest jsonSearchRequest, OccurEnum occurEnum) {
+ ArrayNode occurNode = null;
+ switch (occurEnum) {
+ case FILTER:
+ occurNode = ((ArrayNode) jsonSearchRequest.getRootNode().get(ElasticSearchConst.QUERY).get(ElasticSearchConst.BOOL).get(ElasticSearchConst.FILTER));
+ break;
+ case MUST:
+ occurNode = ((ArrayNode) jsonSearchRequest.getRootNode().get(ElasticSearchConst.QUERY).get(ElasticSearchConst.BOOL).get(ElasticSearchConst.MUST));
+ break;
+ case MUST_NOT:
+ occurNode = ((ArrayNode) jsonSearchRequest.getRootNode().get(ElasticSearchConst.QUERY).get(ElasticSearchConst.BOOL).get(ElasticSearchConst.MUST_NOT));
+ break;
+ case SHOULD:
+ occurNode = ((ArrayNode) jsonSearchRequest.getRootNode().get(ElasticSearchConst.QUERY).get(ElasticSearchConst.BOOL).get(ElasticSearchConst.SHOULD));
+ break;
+ default:
+ break;
+ }
+ return occurNode;
+ }
+
+}
diff --git a/message-push-common/src/main/java/org/kangspace/messagepush/core/enums/ResponseEnum.java b/message-push-common/src/main/java/org/kangspace/messagepush/core/enums/ResponseEnum.java
new file mode 100644
index 0000000..70eece4
--- /dev/null
+++ b/message-push-common/src/main/java/org/kangspace/messagepush/core/enums/ResponseEnum.java
@@ -0,0 +1,160 @@
+package org.kangspace.messagepush.core.enums;
+
+import org.springframework.lang.Nullable;
+
+import java.util.Objects;
+
+/**
+ * @author kango2gler@gmail.com
+ * @date 2024/7/13
+ * @since
+ */
+public enum ResponseEnum {
+ CONTINUE(100, "Continue"),
+ SWITCHING_PROTOCOLS(101, "Switching Protocols"),
+ PROCESSING(102, "Processing"),
+ CHECKPOINT(103, "Checkpoint"),
+ OK(1, "OK"),
+ CREATED(201, "Created"),
+ ACCEPTED(202, "Accepted"),
+ NON_AUTHORITATIVE_INFORMATION(203, "Non-Authoritative Information"),
+ NO_CONTENT(204, "No Content"),
+ RESET_CONTENT(205, "Reset Content"),
+ PARTIAL_CONTENT(206, "Partial Content"),
+ MULTI_STATUS(207, "Multi-Status"),
+ ALREADY_REPORTED(208, "Already Reported"),
+ IM_USED(226, "IM Used"),
+ MULTIPLE_CHOICES(300, "Multiple Choices"),
+ MOVED_PERMANENTLY(301, "Moved Permanently"),
+ FOUND(302, "Found"),
+ /**
+ * @deprecated
+ */
+ @Deprecated
+ MOVED_TEMPORARILY(302, "Moved Temporarily"),
+ SEE_OTHER(303, "See Other"),
+ NOT_MODIFIED(304, "Not Modified"),
+ /**
+ * @deprecated
+ */
+ @Deprecated
+ USE_PROXY(305, "Use Proxy"),
+ TEMPORARY_REDIRECT(307, "Temporary Redirect"),
+ PERMANENT_REDIRECT(308, "Permanent Redirect"),
+ BAD_REQUEST(400, "Bad Request"),
+ UNAUTHORIZED(401, "Unauthorized"),
+ PAYMENT_REQUIRED(402, "Payment Required"),
+ FORBIDDEN(403, "Forbidden"),
+ NOT_FOUND(404, "Not Found"),
+ METHOD_NOT_ALLOWED(405, "Method Not Allowed"),
+ NOT_ACCEPTABLE(406, "Not Acceptable"),
+ PROXY_AUTHENTICATION_REQUIRED(407, "Proxy Authentication Required"),
+ REQUEST_TIMEOUT(408, "Request Timeout"),
+ CONFLICT(409, "Conflict"),
+ GONE(410, "Gone"),
+ LENGTH_REQUIRED(411, "Length Required"),
+ PRECONDITION_FAILED(412, "Precondition Failed"),
+ PAYLOAD_TOO_LARGE(413, "Payload Too Large"),
+ /**
+ * @deprecated
+ */
+ @Deprecated
+ REQUEST_ENTITY_TOO_LARGE(413, "Request Entity Too Large"),
+ URI_TOO_LONG(414, "URI Too Long"),
+ /**
+ * @deprecated
+ */
+ @Deprecated
+ REQUEST_URI_TOO_LONG(414, "Request-URI Too Long"),
+ UNSUPPORTED_MEDIA_TYPE(415, "Unsupported Media Type"),
+ REQUESTED_RANGE_NOT_SATISFIABLE(416, "Requested range not satisfiable"),
+ EXPECTATION_FAILED(417, "Expectation Failed"),
+ I_AM_A_TEAPOT(418, "I'm a teapot"),
+ /**
+ * @deprecated
+ */
+ @Deprecated
+ INSUFFICIENT_SPACE_ON_RESOURCE(419, "Insufficient Space On Resource"),
+ /**
+ * @deprecated
+ */
+ @Deprecated
+ METHOD_FAILURE(420, "Method Failure"),
+ /**
+ * @deprecated
+ */
+ @Deprecated
+ DESTINATION_LOCKED(421, "Destination Locked"),
+ UNPROCESSABLE_ENTITY(422, "Unprocessable Entity"),
+ LOCKED(423, "Locked"),
+ FAILED_DEPENDENCY(424, "Failed Dependency"),
+ UPGRADE_REQUIRED(426, "Upgrade Required"),
+ PRECONDITION_REQUIRED(428, "Precondition Required"),
+ TOO_MANY_REQUESTS(429, "Too Many Requests"),
+ REQUEST_HEADER_FIELDS_TOO_LARGE(431, "Request Header Fields Too Large"),
+ UNAVAILABLE_FOR_LEGAL_REASONS(451, "Unavailable For Legal Reasons"),
+ INTERNAL_SERVER_ERROR(0, "Internal Server Error"),
+ NOT_IMPLEMENTED(501, "Not Implemented"),
+ BAD_GATEWAY(502, "Bad Gateway"),
+ SERVICE_UNAVAILABLE(503, "Service Unavailable"),
+ GATEWAY_TIMEOUT(504, "Gateway Timeout"),
+ HTTP_VERSION_NOT_SUPPORTED(505, "HTTP Version not supported"),
+ VARIANT_ALSO_NEGOTIATES(506, "Variant Also Negotiates"),
+ INSUFFICIENT_STORAGE(507, "Insufficient Storage"),
+ LOOP_DETECTED(508, "Loop Detected"),
+ BANDWIDTH_LIMIT_EXCEEDED(509, "Bandwidth Limit Exceeded"),
+ NOT_EXTENDED(510, "Not Extended"),
+ NETWORK_AUTHENTICATION_REQUIRED(511, "Network Authentication Required"),
+ BCL_CLIENT_FEIGN_ERROR(700, "Feign异常"),
+ BCL_CLIENT_HYSTRIX_CIRCUIT_ERROR(701, "触发熔断"),
+ BCL_CLIENT_HYSTRIX_TIME_OUT_ERROR(702, "HYSTRIX超时"),
+ BCL_CLIENT_GATEWAY_NET_ERROR(703, "网关网络异常"),
+ BCL_GATEWAY_AUTH_ERROR(800, "鉴权异常"),
+ BCL_GATEWAY_SIGN_ERROR(801, "无效签名"),
+ BCL_GATEWAY_APP_ERROR(802, "无效系统"),
+ BCL_GATEWAY_API_ERROR(803, "无效接口"),
+ BCL_GATEWAY_PERM_ERROR(804, "无权限"),
+ BCL_GATEWAY_TS_ERROR(805, "无效TS"),
+ BCL_GATEWAY_TS_TIME_OUT_ERROR(806, "TS超时"),
+ BCL_GATEWAY_ERROR(807, "网关代码异常"),
+ BCL_GATEWAY_SERVER_NET_ERROR(808, "服务端网络异常"),
+ BCL_GATEWAY_SERVER_CODE_ERROR(809, "服务端代码异常"),
+ BCL_GATEWAY_HYSTRIX_CIRCUIT_ERROR(810, "触发熔断"),
+ BCL_DB_ERROR(900, "数据库异常"),
+ BCL_REDIS_ERROR(901, "Redis异常"),
+ BCL_ES_ERROR(902, "ES异常"),
+ BCL_BUSINESS_ERROR(903, "业务异常"),
+ BCL_ILLEGAL_ARGUMENT_ERROR(904, "参数异常"),
+ BCL_UNKNOWN_ERROR(999, "未知异常");
+
+ private final int value;
+ private final String reasonPhrase;
+
+ private ResponseEnum(int value, String reasonPhrase) {
+ this.value = value;
+ this.reasonPhrase = reasonPhrase;
+ }
+
+ @Nullable
+ public static ResponseEnum resolve(int statusCode) {
+ ResponseEnum[] var1 = values();
+ int var2 = var1.length;
+
+ for (int var3 = 0; var3 < var2; ++var3) {
+ ResponseEnum status = var1[var3];
+ if (Objects.equals(status.value, statusCode)) {
+ return status;
+ }
+ }
+
+ return null;
+ }
+
+ public int getValue() {
+ return this.value;
+ }
+
+ public String getReasonPhrase() {
+ return this.reasonPhrase;
+ }
+}
diff --git a/message-push-common/src/main/java/org/kangspace/messagepush/core/event/NacosServiceUpInfo.java b/message-push-common/src/main/java/org/kangspace/messagepush/core/event/NacosServiceUpInfo.java
new file mode 100644
index 0000000..8578488
--- /dev/null
+++ b/message-push-common/src/main/java/org/kangspace/messagepush/core/event/NacosServiceUpInfo.java
@@ -0,0 +1,23 @@
+package org.kangspace.messagepush.core.event;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.kangspace.messagepush.core.hash.ConsistencyHashing;
+
+import java.io.Serializable;
+
+/**
+ * 服务上线信息(用于Rehash时客户端剔除下线)
+ *
+ * @author kango2gler@gmail.com
+ * @since 2021/11/2
+ */
+@Slf4j
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class NacosServiceUpInfo implements Serializable {
+ private ConsistencyHashing consistencyHashing;
+}
diff --git a/message-push-common/src/main/java/org/kangspace/messagepush/core/event/NacosServiceUpdateEvent.java b/message-push-common/src/main/java/org/kangspace/messagepush/core/event/NacosServiceUpdateEvent.java
new file mode 100644
index 0000000..97297d4
--- /dev/null
+++ b/message-push-common/src/main/java/org/kangspace/messagepush/core/event/NacosServiceUpdateEvent.java
@@ -0,0 +1,15 @@
+package org.kangspace.messagepush.core.event;
+
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * Nacos服务更新事件
+ *
+ * @author kango2gler@gmail.com
+ * @since 2021/11/2
+ */
+public class NacosServiceUpdateEvent extends ApplicationEvent {
+ public NacosServiceUpdateEvent(T source) {
+ super(source);
+ }
+}
diff --git a/message-push-common/src/main/java/org/kangspace/messagepush/core/event/NacosServiceUpdateInfo.java b/message-push-common/src/main/java/org/kangspace/messagepush/core/event/NacosServiceUpdateInfo.java
new file mode 100644
index 0000000..311ed29
--- /dev/null
+++ b/message-push-common/src/main/java/org/kangspace/messagepush/core/event/NacosServiceUpdateInfo.java
@@ -0,0 +1,52 @@
+package org.kangspace.messagepush.core.event;
+
+import com.alibaba.nacos.api.naming.pojo.Instance;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.kangspace.messagepush.core.util.MD5Util;
+import org.springframework.util.CollectionUtils;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * 服务变化信息
+ *
+ * @author kango2gler@gmail.com
+ * @since 2021/11/2
+ */
+@Slf4j
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class NacosServiceUpdateInfo implements Serializable {
+ /**
+ * 服务ID
+ */
+ private String serviceId;
+ /**
+ * 实例服务信息
+ */
+ private List instances;
+
+ /**
+ * 获取服务列表摘要字符串
+ * 摘要逻辑: 1. instances 按 "ip:端口" Hash排序
+ * 2. 转换为按,分割的字符串
+ * 3. 对字符串取MD5
+ *
+ * @return
+ * @see MD5Util#hashDigest(Collection)
+ */
+ public String getInstancesDigest() {
+ if (CollectionUtils.isEmpty(instances)) {
+ return null;
+ }
+ return MD5Util.hashDigest(instances.stream().map(t -> t.getIp() + ":" + t.getPort())
+ .collect(Collectors.toList()));
+ }
+}
diff --git a/message-push-common/src/main/java/org/kangspace/messagepush/core/hash/ConsistencyHashing.java b/message-push-common/src/main/java/org/kangspace/messagepush/core/hash/ConsistencyHashing.java
new file mode 100644
index 0000000..a4b588f
--- /dev/null
+++ b/message-push-common/src/main/java/org/kangspace/messagepush/core/hash/ConsistencyHashing.java
@@ -0,0 +1,239 @@
+package org.kangspace.messagepush.core.hash;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.kangspace.messagepush.core.hash.algoithm.HashAlgorithm;
+import org.kangspace.messagepush.core.hash.algoithm.KetamaHashAlgorithm;
+import org.kangspace.messagepush.core.util.MD5Util;
+import org.springframework.util.CollectionUtils;
+
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * 一致性hash实现
+ * 基于Ketama 一致性Hash算法:
+ * 1. 取node的md5
+ * 2. 再将md5值每4字节计算一个Hash Key存到Hash环中,即每个node会有4个hash节点
+ * 3. 为node的所有虚拟节点做2的处理,并将结果存到Hash环中
+ * 4. 每个物理节点的虚拟节点在hash环上最好分配100-200个点来抑制分布不均匀,最大限度地减小服务器增减时的缓存重新分布
+ *
+ * @author kango2gler@gmail.com
+ * @since 2021/10/22
+ */
+@Data
+@Slf4j
+public class ConsistencyHashing {
+ /**
+ * 虚拟节点分割符
+ */
+ private static final String VIRTUAL_DELIMITER = "#VN";
+ /**
+ * hash 分组大小
+ */
+ private static final int HASH_GROUP_SIZE = 4;
+ /**
+ * 并发锁
+ */
+ private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
+ private final Lock r = rwl.readLock();
+ private final Lock w = rwl.writeLock();
+ /**
+ *
+ * 每个物理节点的虚拟节点数
+ * 每个物理节点的虚拟节点在hash环上最好分配100-200个点来抑制分布不均匀,最大限度地减小服务器增减时的缓存重新分布
+ * 计算虚拟节点总数时需考虑:numberOfVirtualNode中的每个节点会创建4个虚拟节点
+ *
+ */
+ private Integer numberOfVirtualNode;
+ /**
+ * Hash算法
+ */
+ private HashAlgorithm hashAlgorithm = new KetamaHashAlgorithm();
+ /**
+ * 物理节点的摘要
+ *
+ * @see MD5Util#hashDigest(Collection)
+ */
+ private String physicalNodesDigest;
+ /**
+ * 物理节点列表
+ */
+ private List> physicalNodes = new ArrayList<>();
+ /**
+ * Hash环
+ * key: hash值
+ * value: 虚拟节点
+ */
+ private TreeMap> ring = new TreeMap<>();
+
+ public ConsistencyHashing(int numberOfVirtualNode, List physicalNodes) {
+ this.numberOfVirtualNode = numberOfVirtualNode;
+ if (!CollectionUtils.isEmpty(physicalNodes)) {
+ List> pNodes = physicalNodes.stream().distinct()
+ .map(node -> new PhysicalNode(hashAlgorithm.hashing(node), node))
+ .collect(Collectors.toList());
+ pNodes.forEach(node -> addVirtualNode(node, numberOfVirtualNode));
+ setPhysicalNodes(pNodes);
+ }
+ }
+
+ public ConsistencyHashing(List> physicalNodes, int numberOfVirtualNode) {
+ this.numberOfVirtualNode = numberOfVirtualNode;
+ if (!CollectionUtils.isEmpty(physicalNodes)) {
+ physicalNodes.forEach(node -> addVirtualNode(node, numberOfVirtualNode));
+ setPhysicalNodes(physicalNodes);
+ }
+ }
+
+ public ConsistencyHashing(Map> fromRing, int numberOfVirtualNode) {
+ if (!CollectionUtils.isEmpty(fromRing)) {
+ fromRing.forEach((k, v) -> this.ring.put(Long.valueOf(k), v));
+ List> physicalNodes = fromRing.values().stream().map(t -> t.getPhysicalNode()).collect(Collectors.toList());
+ List> distinctPhysicalNodes = new ArrayList<>(physicalNodes.stream().collect(Collectors.toMap(k -> k.getNode(), v -> v, (v1, v2) -> v1)).values());
+ int totalVirtualCount = fromRing.size();
+ long physicalNodeCount = distinctPhysicalNodes.size();
+ this.numberOfVirtualNode = Math.toIntExact(physicalNodeCount > 0 ? totalVirtualCount / 4 / physicalNodeCount : numberOfVirtualNode);
+ setPhysicalNodes(distinctPhysicalNodes);
+ }
+ }
+
+ private void setPhysicalNodes(List> physicalNodes) {
+ this.physicalNodes = physicalNodes;
+ List nodeIpPorts = physicalNodes.stream().map(t -> t.getNode()).distinct().collect(Collectors.toList());
+ this.physicalNodesDigest = MD5Util.hashDigest(nodeIpPorts);
+ }
+
+ private void addPhysicalNodes(PhysicalNode physicalNode) {
+ this.physicalNodes.add(physicalNode);
+ List nodeIpPorts = this.physicalNodes.stream().map(t -> t.getNode()).collect(Collectors.toList());
+ this.physicalNodesDigest = MD5Util.hashDigest(nodeIpPorts);
+ }
+
+ private void removePhysicalNodes(PhysicalNode physicalNode) {
+ this.physicalNodes = this.physicalNodes.stream().filter(t -> !t.getNode().equals(physicalNode.getNode())).collect(Collectors.toList());
+ List nodeIpPorts = this.physicalNodes.stream().map(t -> t.getNode()).collect(Collectors.toList());
+ this.physicalNodesDigest = MD5Util.hashDigest(nodeIpPorts);
+ }
+
+ /**
+ * 获取节点hash
+ *
+ * @param node node
+ * @return hash结果
+ */
+ public Long getNodeHash(String node) {
+ return this.hashAlgorithm.hashing(node);
+ }
+
+
+ /**
+ * 获取data所在环的虚拟节点
+ *
+ * @param data 数据节点
+ * @return virtualNode
+ */
+ public VirtualNode getVirtualNode(String data) {
+ if (ring.isEmpty()) {
+ return null;
+ }
+ Long hash = hashAlgorithm.hashing(data);
+ r.lock();
+ try {
+ if (!ring.containsKey(hash)) {
+ SortedMap> tailMap = ring.tailMap(hash);
+ hash = tailMap.isEmpty() ? ring.firstKey() : tailMap.firstKey();
+ }
+ return ring.get(hash);
+ } finally {
+ r.unlock();
+ }
+ }
+
+ /**
+ * 添加虚拟节点(使用Ketama一致性HASH算法)
+ *
+ * @param physicalNode 物理节点
+ * @param numberOfVirtualNode 每个物理节点的虚拟节点数
+ */
+ private void addVirtualNode(Node physicalNode, int numberOfVirtualNode) {
+ w.lock();
+ try {
+ // / HASH_GROUP_SIZE
+ for (int i = 0; i < numberOfVirtualNode; i++) {
+ String virtualNode = getVirtualNode(physicalNode, i);
+ byte[] digest = hashAlgorithm.md5(virtualNode);
+ for (int j = 0; j < HASH_GROUP_SIZE; j++) {
+ Long hash = hashAlgorithm.hashing(digest, j);
+ ring.put(hash, new VirtualNode(hash, virtualNode, (PhysicalNode) physicalNode));
+ }
+ }
+ } finally {
+ w.unlock();
+ }
+ }
+
+
+ /**
+ * 删除一个物理节点
+ *
+ * @param physicalNode 物理节点
+ */
+ public void removeNode(PhysicalNode physicalNode) {
+ if (ring.isEmpty()) {
+ return;
+ }
+ w.lock();
+ try {
+ // 实现注意遍历删除可能存在的并发修改异常
+ Iterator iterator = ring.keySet().iterator();
+ while (iterator.hasNext()) {
+ Long nodeHashKey = iterator.next();
+ VirtualNode virtualNode = ring.get(nodeHashKey);
+ if (virtualNode.isVirtualOf(physicalNode)) {
+ iterator.remove();
+ }
+ }
+ removePhysicalNodes(physicalNode);
+ } finally {
+ w.unlock();
+ }
+ }
+
+ /**
+ * 添加一个物理节点
+ *
+ * @param physicalNode 物理节点
+ */
+ public void addNode(PhysicalNode physicalNode) {
+ w.lock();
+ try {
+ addPhysicalNodes(physicalNode);
+ addVirtualNode(physicalNode, this.numberOfVirtualNode);
+ } finally {
+ w.unlock();
+ }
+ }
+
+ /**
+ * 获取虚拟节点节点node值
+ *
+ * @param physicalNode 物理节点
+ * @param number 下表
+ * @return 新的node字符串
+ */
+ private String getVirtualNode(Node physicalNode, int number) {
+ return physicalNode.getNode() + VIRTUAL_DELIMITER + number;
+ }
+
+ /**
+ * 获取虚拟节点数量
+ *
+ * @return 虚拟节点数量
+ */
+ public int getVirtualNodeCount() {
+ return ring.size();
+ }
+}
diff --git a/message-push-common/src/main/java/org/kangspace/messagepush/core/hash/Node.java b/message-push-common/src/main/java/org/kangspace/messagepush/core/hash/Node.java
new file mode 100644
index 0000000..39c7eb8
--- /dev/null
+++ b/message-push-common/src/main/java/org/kangspace/messagepush/core/hash/Node.java
@@ -0,0 +1,38 @@
+package org.kangspace.messagepush.core.hash;
+
+/**
+ * 基本的服务节点(hash环上的节点)
+ *
+ * @author kango2gler@gmail.com
+ * @since 2021/10/22
+ */
+public interface Node {
+ /**
+ * 获取节点Key(一般为HASH值)
+ *
+ * @return key
+ */
+ Long getKey();
+
+ /**
+ * 获取node原始值
+ *
+ * @return String
+ */
+ String getNode();
+
+ /**
+ * 是否物理节点
+ *
+ * @return boolean
+ */
+ boolean isPhysicalNode();
+
+ /**
+ * 获取节点数据
+ *
+ * @return T
+ */
+ T getData();
+
+}
diff --git a/message-push-common/src/main/java/org/kangspace/messagepush/core/hash/PhysicalNode.java b/message-push-common/src/main/java/org/kangspace/messagepush/core/hash/PhysicalNode.java
new file mode 100644
index 0000000..f013d3c
--- /dev/null
+++ b/message-push-common/src/main/java/org/kangspace/messagepush/core/hash/PhysicalNode.java
@@ -0,0 +1,46 @@
+package org.kangspace.messagepush.core.hash;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Objects;
+
+/**
+ * 物理服务节点
+ *
+ * @author kango2gler@gmail.com
+ * @since 2021/10/22
+ */
+@Data
+@NoArgsConstructor
+public class PhysicalNode