Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): fix backup conflict checking, refine log recycling #3909

Merged
merged 2 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ data:
offsets.persist.interval.ms={{ .Values.offsetsPersistIntervalMs }}
file.meta.store.path={{ .Values.fileMetaStorePath }}
log.recycle.enable={{ .Values.logRecycleEnable }}
log.recycle.offset.reserve={{ .Values.logRecycleOffsetReserve }}

## Extra Config
{{- if .Values.extraConfig }}
Expand Down
1 change: 1 addition & 0 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ snapshotIncreaseIntervalMs: 1000
offsetsPersistIntervalMs: 3000
fileMetaStorePath: "/etc/groot/my.meta"
logRecycleEnable: true
logRecycleOffsetReserve: 86400

## Store Config
storeDataPath: "/var/lib/graphscope-store"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public class CoordinatorConfig {
public static final Config<Long> LOG_RECYCLE_INTERVAL_SECOND =
Config.longConfig("log.recycle.interval.second", 3600L);

/**
 * Number of offsets kept behind each queue's offset when logs are recycled: the recycler
 * subtracts this value from the queue offset (clamped at 0) before deleting older entries.
 * Note that the unit is offsets, not seconds.
 */
public static final Config<Long> LOG_RECYCLE_OFFSET_RESERVE =
        Config.longConfig("log.recycle.offset.reserve", 86400L);

public static final Config<String> FILE_META_STORE_PATH =
Config.stringConfig("file.meta.store.path", "./meta");
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,16 @@ public static boolean isLockAvailable(Configs configs) {
}
return true;
}

/**
 * Tells whether the {@code meta} directory under the configured store data path has been
 * modified recently.
 *
 * @param configs configuration from which the store data path is read
 * @param delta freshness window, compared against a difference of millisecond timestamps
 * @return {@code true} when the directory does not exist, or when the time elapsed since its
 *     last modification is strictly less than {@code delta}; {@code false} otherwise
 */
public static boolean isMetaFreshEnough(Configs configs, long delta) {
    File metaDir =
            Paths.get(StoreConfig.STORE_DATA_PATH.get(configs), "meta")
                    .toAbsolutePath()
                    .toFile();
    if (!metaDir.exists()) {
        // A missing meta directory is also reported as fresh enough.
        return true;
    }
    long modifiedAt = metaDir.lastModified();
    long elapsed = System.currentTimeMillis() - modifiedAt;
    return elapsed < delta;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ public class LogRecycler {
private ScheduledExecutorService scheduler;
private final boolean recycleEnable;
private final long recycleIntervalSeconds;
private final long recycleOffsetReserve;

/**
 * Creates a recycler bound to the given log service and snapshot manager.
 *
 * <p>The enable flag, the recycle interval (seconds) and the offset reserve are read once
 * from {@code configs} at construction time.
 *
 * @param configs source of the {@link CoordinatorConfig} recycle settings
 * @param logService log service kept for later use by this recycler
 * @param snapshotManager snapshot manager kept for later use by this recycler
 */
public LogRecycler(Configs configs, LogService logService, SnapshotManager snapshotManager) {
    // Tunables resolved from the coordinator configuration.
    this.recycleOffsetReserve = CoordinatorConfig.LOG_RECYCLE_OFFSET_RESERVE.get(configs);
    this.recycleIntervalSeconds = CoordinatorConfig.LOG_RECYCLE_INTERVAL_SECOND.get(configs);
    this.recycleEnable = CoordinatorConfig.LOG_RECYCLE_ENABLE.get(configs);

    // Collaborators supplied by the caller.
    this.snapshotManager = snapshotManager;
    this.logService = logService;
}

public void start() {
Expand Down Expand Up @@ -84,7 +86,7 @@ private void doRecycle() {
List<Long> queueOffsets = this.snapshotManager.getQueueOffsets();
for (int i = 0; i < queueOffsets.size(); i++) {
long offset = queueOffsets.get(i);
offset = Math.max(offset - 3600, 0); // Leave some spaces
offset = Math.max(offset - recycleOffsetReserve, 0); // Leave some spaces
try {
logService.deleteBeforeOffset(i, offset);
logger.info("recycled queue [{}] offset [{}]", i, offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ public void init() {
AdminClient admin = getAdmin();
NewTopic newTopic = new NewTopic(this.topic, this.storeCount, this.replicationFactor);
Map<String, String> configs = new HashMap<>();
configs.put("retention.ms", "-1");
configs.put("retention.bytes", "-1");
// Respecting the global (broker-level) retention settings is enough for us
// configs.put("retention.ms", "-1");
// configs.put("retention.bytes", "-1");
configs.put("max.message.bytes", String.valueOf(this.maxMessageMb * 1024 * 1024));
newTopic.configs(configs);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,17 @@ public static void main(String[] args) throws Exception {
latch.getLeader(),
latch.getState());
latch.await();
// Sleep 5s before check the lock to prevent the leader has not
// Sleep 10s before checking the lock, in case the previous leader has not
// released the resource yet.
Thread.sleep(5000);
if (Utils.isLockAvailable(conf)) {
logger.info("LOCK is available, node starting");
Thread.sleep(10000);
if (Utils.isLockAvailable(conf) && !Utils.isMetaFreshEnough(conf, 9000)) {
logger.info("LOCK is available and meta stop updating, node starting");
break;
}
latch.close();
logger.info("LOCK is unavailable, the leader may still exists");
logger.info(
"LOCK is unavailable or the meta is still updating, the leader may"
+ " still exists");
// The leader has lost connection but still alive,
// give it another chance
Thread.sleep(60000);
Expand All @@ -95,7 +97,7 @@ public static void main(String[] args) throws Exception {
logger.error("Exception while leader election", e);
throw e;
}
// curator.close();
// curator.close();
}
}
NodeLauncher launcher = new NodeLauncher(node);
Expand Down
Loading