Skip to content

Commit

Permalink
add jpa to date type storage (#1431)
Browse files Browse the repository at this point in the history
Signed-off-by: Clownsw <[email protected]>
  • Loading branch information
Clownsw authored Dec 19, 2023
1 parent 8c10c83 commit 911d6c1
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class History {
@Schema(title = "Monitoring Metric usage speed count")
private String metric;

@Column(length = 2048)
@Column(length = 5000)
private String instance;

@Schema(title = "Metric Type 0: Number 1:String")
Expand All @@ -56,6 +56,9 @@ public class History {
@Column(length = 2048)
private String str;

@Schema(title = "Metric Integer Value")
private Integer int32;

@Schema(title = "Metric Number Value")
private Double dou;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package org.dromara.hertzbeat.warehouse.store;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.entity.dto.Value;
import org.dromara.hertzbeat.common.entity.message.CollectRep;
import org.dromara.hertzbeat.common.entity.warehouse.History;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.util.JsonUtil;
import org.dromara.hertzbeat.common.util.TimePeriodUtil;
import org.dromara.hertzbeat.warehouse.config.WarehouseProperties;
import org.dromara.hertzbeat.warehouse.dao.HistoryDao;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
Expand All @@ -49,7 +50,6 @@
* data storage by mysql/h2 - jpa
*
*
*
*/
@Component
@ConditionalOnProperty(prefix = "warehouse.store.jpa",
Expand All @@ -62,55 +62,56 @@ public class HistoryJpaDatabaseDataStorage extends AbstractHistoryDataStorage {
private static final int STRING_MAX_LENGTH = 1024;

public HistoryJpaDatabaseDataStorage(WarehouseProperties properties,
HistoryDao historyDao) {
HistoryDao historyDao) {
this.jpaProperties = properties.getStore().getJpa();
this.serverAvailable = true;
this.historyDao = historyDao;
expiredDataCleaner();
expiredDataCleaner();
}

public void expiredDataCleaner() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("Jpa metrics store has uncaughtException.");
log.error(throwable.getMessage(), throwable); })
.setDaemon(true)
.setNameFormat("jpa-metrics-cleaner-%d")
.build();
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
scheduledExecutor.scheduleAtFixedRate(() -> {
log.warn("[jpa-metrics-store]-start running expired data cleaner." +
"Please use time series db instead of jpa for better performance");
String expireTimeStr = jpaProperties.getExpireTime();
long expireTime = 0;
try {
if (NumberUtils.isParsable(expireTimeStr)) {
expireTime = NumberUtils.toLong(expireTimeStr);
expireTime = (ZonedDateTime.now().toEpochSecond() + expireTime) * 1000;
} else {
TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(expireTimeStr);
ZonedDateTime dateTime = ZonedDateTime.now().minus(temporalAmount);
expireTime = dateTime.toEpochSecond() * 1000;
}
} catch (Exception e) {
log.error("expiredDataCleaner time error: {}. use default expire time to clean: 1h", e.getMessage());
ZonedDateTime dateTime = ZonedDateTime.now().minus(Duration.ofHours(1));
expireTime = dateTime.toEpochSecond() * 1000;
}
try {
int rows = historyDao.deleteHistoriesByTimeBefore(expireTime);
log.info("[jpa-metrics-store]-delete {} rows.", rows);
long total = historyDao.count();
if (total > jpaProperties.getMaxHistoryRecordNum()) {
rows = historyDao.deleteOlderHistoriesRecord(jpaProperties.getMaxHistoryRecordNum() / 2);
log.warn("[jpa-metrics-store]-force delete {} rows due too many. Please use time series db instead of jpa for better performance.", rows);
}
} catch (Exception e) {
log.error("expiredDataCleaner database error: {}.", e.getMessage());
log.error("try to truncate table hzb_history. Please use time series db instead of jpa for better performance.");
historyDao.truncateTable();
}
}, 5, 30, TimeUnit.SECONDS);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("Jpa metrics store has uncaughtException.");
log.error(throwable.getMessage(), throwable);
})
.setDaemon(true)
.setNameFormat("jpa-metrics-cleaner-%d")
.build();
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
scheduledExecutor.scheduleAtFixedRate(() -> {
log.warn("[jpa-metrics-store]-start running expired data cleaner." +
"Please use time series db instead of jpa for better performance");
String expireTimeStr = jpaProperties.getExpireTime();
long expireTime = 0;
try {
if (NumberUtils.isParsable(expireTimeStr)) {
expireTime = NumberUtils.toLong(expireTimeStr);
expireTime = (ZonedDateTime.now().toEpochSecond() + expireTime) * 1000;
} else {
TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(expireTimeStr);
ZonedDateTime dateTime = ZonedDateTime.now().minus(temporalAmount);
expireTime = dateTime.toEpochSecond() * 1000;
}
} catch (Exception e) {
log.error("expiredDataCleaner time error: {}. use default expire time to clean: 1h", e.getMessage());
ZonedDateTime dateTime = ZonedDateTime.now().minus(Duration.ofHours(1));
expireTime = dateTime.toEpochSecond() * 1000;
}
try {
int rows = historyDao.deleteHistoriesByTimeBefore(expireTime);
log.info("[jpa-metrics-store]-delete {} rows.", rows);
long total = historyDao.count();
if (total > jpaProperties.getMaxHistoryRecordNum()) {
rows = historyDao.deleteOlderHistoriesRecord(jpaProperties.getMaxHistoryRecordNum() / 2);
log.warn("[jpa-metrics-store]-force delete {} rows due too many. Please use time series db instead of jpa for better performance.", rows);
}
} catch (Exception e) {
log.error("expiredDataCleaner database error: {}.", e.getMessage());
log.error("try to truncate table hzb_history. Please use time series db instead of jpa for better performance.");
historyDao.truncateTable();
}
}, 5, 30, TimeUnit.SECONDS);
}

@Override
Expand All @@ -135,43 +136,69 @@ void saveData(CollectRep.MetricsData metricsData) {
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
Map<String, String> labels = new HashMap<>(8);
for (int i = 0; i < fieldsList.size(); i++) {
CollectRep.Field field = fieldsList.get(i);
if (field.getLabel() && !CommonConstants.NULL_VALUE.equals(valueRow.getColumns(i))) {
labels.put(field.getName(), valueRow.getColumns(i));
}
}
for (int i = 0; i < fieldsList.size(); i++) {
CollectRep.Field field = fieldsList.get(i);
// ignore string value store in db
if (field.getType() == CommonConstants.TYPE_STRING) {
continue;
}
historyBuilder.metric(field.getName());
historyBuilder.instance(JsonUtil.toJson(labels));
if (!CommonConstants.NULL_VALUE.equals(valueRow.getColumns(i))) {
if (field.getType() == CommonConstants.TYPE_NUMBER) {
historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
.dou(Double.parseDouble(valueRow.getColumns(i)));
} else if (field.getType() == CommonConstants.TYPE_STRING) {
historyBuilder.metricType(CommonConstants.TYPE_STRING)
.str(formatStrValue(valueRow.getColumns(i)));
final CollectRep.Field field = fieldsList.get(i);
final int fieldType = field.getType();
final String fieldName = field.getName();
final String columnValue = valueRow.getColumns(i);

historyBuilder.metric(fieldName);

if (CommonConstants.NULL_VALUE.equals(columnValue)) {
switch (fieldType) {
case CommonConstants.TYPE_NUMBER: {
historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
.dou(null);
break;
}

case CommonConstants.TYPE_STRING: {
historyBuilder.metricType(CommonConstants.TYPE_STRING)
.str(null);
break;
}

case CommonConstants.TYPE_TIME: {
historyBuilder.metricType(CommonConstants.TYPE_TIME)
.int32(null);
break;
}
}
} else {
if (field.getType() == CommonConstants.TYPE_NUMBER) {
historyBuilder.metricType(CommonConstants.TYPE_NUMBER).dou(null);
} else if (field.getType() == CommonConstants.TYPE_STRING) {
historyBuilder.metricType(CommonConstants.TYPE_STRING).str(null);
switch (fieldType) {
case CommonConstants.TYPE_NUMBER: {
historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
.dou(Double.parseDouble(columnValue));
break;
}

case CommonConstants.TYPE_STRING: {
historyBuilder.metricType(CommonConstants.TYPE_STRING)
.str(formatStrValue(columnValue));
break;
}

case CommonConstants.TYPE_TIME: {
historyBuilder.metricType(CommonConstants.TYPE_TIME)
.int32(Integer.parseInt(columnValue));
break;
}
}

if (field.getLabel()) {
labels.put(fieldName, columnValue);
}
}

historyList.add(historyBuilder.build());
}
historyBuilder.instance(JsonUtil.toJson(labels));
}
historyDao.saveAll(historyList);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

@Override
public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app, String metrics, String metric, String label, String history) {
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
Expand All @@ -185,10 +212,12 @@ public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app,
andList.add(predicateMonitorType);
andList.add(predicateMonitorMetrics);
andList.add(predicateMonitorMetric);
if (label != null && !"".equals(label)) {

if (StringUtils.isNotBlank(label)) {
Predicate predicateMonitorInstance = criteriaBuilder.equal(root.get("instance"), label);
andList.add(predicateMonitorInstance);
}

if (history != null) {
try {
TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(history);
Expand Down

0 comments on commit 911d6c1

Please sign in to comment.