Skip to content

Commit

Permalink
add jpa to date type storage (#1431)
Browse files Browse the repository at this point in the history
Signed-off-by: Clownsw <[email protected]>
  • Loading branch information
Clownsw authored Dec 19, 2023
1 parent 38ef882 commit f6aeb48
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class History {
@Schema(title = "Monitoring Metric usage speed count")
private String metric;

@Column(length = 2048)
@Column(length = 5000)
private String instance;

@Schema(title = "Metric Type 0: Number 1:String")
Expand All @@ -56,6 +56,9 @@ public class History {
@Column(length = 2048)
private String str;

@Schema(title = "Metric Integer Value")
private Integer int32;

@Schema(title = "Metric Number Value")
private Double dou;

Expand Down
3 changes: 2 additions & 1 deletion script/sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,10 @@ CREATE TABLE hzb_history
app varchar(100) not null comment '监控类型 mysql oracle db2',
metrics varchar(100) not null comment '指标集合名称 innodb disk cpu',
metric varchar(100) not null comment '指标名称 usage speed count',
instance varchar(1024) comment '实例',
instance varchar(5000) comment '实例',
metric_type tinyint not null comment '字段类型 0: 数值 1:字符串',
str varchar(1024) comment '字符值',
int32 int comment '整数',
dou float comment '数值',
time bigint comment '采集时间戳',
primary key (id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package org.dromara.hertzbeat.warehouse.store;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.entity.dto.Value;
import org.dromara.hertzbeat.common.entity.message.CollectRep;
import org.dromara.hertzbeat.common.entity.warehouse.History;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.util.JsonUtil;
import org.dromara.hertzbeat.common.util.TimePeriodUtil;
import org.dromara.hertzbeat.warehouse.config.WarehouseProperties;
import org.dromara.hertzbeat.warehouse.dao.HistoryDao;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
Expand All @@ -49,7 +50,6 @@
* data storage by mysql/h2 - jpa
*
* @author tom
*
*/
@Component
@ConditionalOnProperty(prefix = "warehouse.store.jpa",
Expand All @@ -62,55 +62,56 @@ public class HistoryJpaDatabaseDataStorage extends AbstractHistoryDataStorage {
private static final int STRING_MAX_LENGTH = 1024;

public HistoryJpaDatabaseDataStorage(WarehouseProperties properties,
HistoryDao historyDao) {
HistoryDao historyDao) {
this.jpaProperties = properties.getStore().getJpa();
this.serverAvailable = true;
this.historyDao = historyDao;
expiredDataCleaner();
expiredDataCleaner();
}

public void expiredDataCleaner() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("Jpa metrics store has uncaughtException.");
log.error(throwable.getMessage(), throwable); })
.setDaemon(true)
.setNameFormat("jpa-metrics-cleaner-%d")
.build();
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
scheduledExecutor.scheduleAtFixedRate(() -> {
log.warn("[jpa-metrics-store]-start running expired data cleaner." +
"Please use time series db instead of jpa for better performance");
String expireTimeStr = jpaProperties.getExpireTime();
long expireTime = 0;
try {
if (NumberUtils.isParsable(expireTimeStr)) {
expireTime = NumberUtils.toLong(expireTimeStr);
expireTime = (ZonedDateTime.now().toEpochSecond() + expireTime) * 1000;
} else {
TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(expireTimeStr);
ZonedDateTime dateTime = ZonedDateTime.now().minus(temporalAmount);
expireTime = dateTime.toEpochSecond() * 1000;
}
} catch (Exception e) {
log.error("expiredDataCleaner time error: {}. use default expire time to clean: 1h", e.getMessage());
ZonedDateTime dateTime = ZonedDateTime.now().minus(Duration.ofHours(1));
expireTime = dateTime.toEpochSecond() * 1000;
}
try {
int rows = historyDao.deleteHistoriesByTimeBefore(expireTime);
log.info("[jpa-metrics-store]-delete {} rows.", rows);
long total = historyDao.count();
if (total > jpaProperties.getMaxHistoryRecordNum()) {
rows = historyDao.deleteOlderHistoriesRecord(jpaProperties.getMaxHistoryRecordNum() / 2);
log.warn("[jpa-metrics-store]-force delete {} rows due too many. Please use time series db instead of jpa for better performance.", rows);
}
} catch (Exception e) {
log.error("expiredDataCleaner database error: {}.", e.getMessage());
log.error("try to truncate table hzb_history. Please use time series db instead of jpa for better performance.");
historyDao.truncateTable();
}
}, 5, 30, TimeUnit.SECONDS);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("Jpa metrics store has uncaughtException.");
log.error(throwable.getMessage(), throwable);
})
.setDaemon(true)
.setNameFormat("jpa-metrics-cleaner-%d")
.build();
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
scheduledExecutor.scheduleAtFixedRate(() -> {
log.warn("[jpa-metrics-store]-start running expired data cleaner." +
"Please use time series db instead of jpa for better performance");
String expireTimeStr = jpaProperties.getExpireTime();
long expireTime = 0;
try {
if (NumberUtils.isParsable(expireTimeStr)) {
expireTime = NumberUtils.toLong(expireTimeStr);
expireTime = (ZonedDateTime.now().toEpochSecond() + expireTime) * 1000;
} else {
TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(expireTimeStr);
ZonedDateTime dateTime = ZonedDateTime.now().minus(temporalAmount);
expireTime = dateTime.toEpochSecond() * 1000;
}
} catch (Exception e) {
log.error("expiredDataCleaner time error: {}. use default expire time to clean: 1h", e.getMessage());
ZonedDateTime dateTime = ZonedDateTime.now().minus(Duration.ofHours(1));
expireTime = dateTime.toEpochSecond() * 1000;
}
try {
int rows = historyDao.deleteHistoriesByTimeBefore(expireTime);
log.info("[jpa-metrics-store]-delete {} rows.", rows);
long total = historyDao.count();
if (total > jpaProperties.getMaxHistoryRecordNum()) {
rows = historyDao.deleteOlderHistoriesRecord(jpaProperties.getMaxHistoryRecordNum() / 2);
log.warn("[jpa-metrics-store]-force delete {} rows due too many. Please use time series db instead of jpa for better performance.", rows);
}
} catch (Exception e) {
log.error("expiredDataCleaner database error: {}.", e.getMessage());
log.error("try to truncate table hzb_history. Please use time series db instead of jpa for better performance.");
historyDao.truncateTable();
}
}, 5, 30, TimeUnit.SECONDS);
}

@Override
Expand All @@ -135,43 +136,69 @@ void saveData(CollectRep.MetricsData metricsData) {
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
Map<String, String> labels = new HashMap<>(8);
for (int i = 0; i < fieldsList.size(); i++) {
CollectRep.Field field = fieldsList.get(i);
if (field.getLabel() && !CommonConstants.NULL_VALUE.equals(valueRow.getColumns(i))) {
labels.put(field.getName(), valueRow.getColumns(i));
}
}
for (int i = 0; i < fieldsList.size(); i++) {
CollectRep.Field field = fieldsList.get(i);
// ignore string value store in db
if (field.getType() == CommonConstants.TYPE_STRING) {
continue;
}
historyBuilder.metric(field.getName());
historyBuilder.instance(JsonUtil.toJson(labels));
if (!CommonConstants.NULL_VALUE.equals(valueRow.getColumns(i))) {
if (field.getType() == CommonConstants.TYPE_NUMBER) {
historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
.dou(Double.parseDouble(valueRow.getColumns(i)));
} else if (field.getType() == CommonConstants.TYPE_STRING) {
historyBuilder.metricType(CommonConstants.TYPE_STRING)
.str(formatStrValue(valueRow.getColumns(i)));
final CollectRep.Field field = fieldsList.get(i);
final int fieldType = field.getType();
final String fieldName = field.getName();
final String columnValue = valueRow.getColumns(i);

historyBuilder.metric(fieldName);

if (CommonConstants.NULL_VALUE.equals(columnValue)) {
switch (fieldType) {
case CommonConstants.TYPE_NUMBER: {
historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
.dou(null);
break;
}

case CommonConstants.TYPE_STRING: {
historyBuilder.metricType(CommonConstants.TYPE_STRING)
.str(null);
break;
}

case CommonConstants.TYPE_TIME: {
historyBuilder.metricType(CommonConstants.TYPE_TIME)
.int32(null);
break;
}
}
} else {
if (field.getType() == CommonConstants.TYPE_NUMBER) {
historyBuilder.metricType(CommonConstants.TYPE_NUMBER).dou(null);
} else if (field.getType() == CommonConstants.TYPE_STRING) {
historyBuilder.metricType(CommonConstants.TYPE_STRING).str(null);
switch (fieldType) {
case CommonConstants.TYPE_NUMBER: {
historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
.dou(Double.parseDouble(columnValue));
break;
}

case CommonConstants.TYPE_STRING: {
historyBuilder.metricType(CommonConstants.TYPE_STRING)
.str(formatStrValue(columnValue));
break;
}

case CommonConstants.TYPE_TIME: {
historyBuilder.metricType(CommonConstants.TYPE_TIME)
.int32(Integer.parseInt(columnValue));
break;
}
}

if (field.getLabel()) {
labels.put(fieldName, columnValue);
}
}

historyList.add(historyBuilder.build());
}
historyBuilder.instance(JsonUtil.toJson(labels));
}
historyDao.saveAll(historyList);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

@Override
public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app, String metrics, String metric, String label, String history) {
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
Expand All @@ -185,10 +212,12 @@ public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app,
andList.add(predicateMonitorType);
andList.add(predicateMonitorMetrics);
andList.add(predicateMonitorMetric);
if (label != null && !"".equals(label)) {

if (StringUtils.isNotBlank(label)) {
Predicate predicateMonitorInstance = criteriaBuilder.equal(root.get("instance"), label);
andList.add(predicateMonitorInstance);
}

if (history != null) {
try {
TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(history);
Expand Down

0 comments on commit f6aeb48

Please sign in to comment.