From a843ca3c663aa9aac340077928312a0a3ff33112 Mon Sep 17 00:00:00 2001 From: walter Date: Fri, 8 Nov 2024 20:36:28 +0800 Subject: [PATCH] [fix](backup) Load backup meta and job info bytes from disk #43276 (#43518) cherry pick from #43276 --- .../org/apache/doris/backup/AbstractJob.java | 2 + .../apache/doris/backup/BackupHandler.java | 50 ++++++++++++++++--- .../org/apache/doris/backup/BackupJob.java | 45 +++++++++++------ .../org/apache/doris/backup/RestoreJob.java | 5 ++ .../doris/service/FrontendServiceImpl.java | 7 ++- 5 files changed, 85 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java index 672a92c59e31b9..375aa789a773b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java @@ -155,6 +155,8 @@ public void setTypeRead(boolean isTypeRead) { public abstract boolean isCancelled(); + public abstract boolean isFinished(); + public static AbstractJob read(DataInput in) throws IOException { AbstractJob job = null; JobType type = JobType.valueOf(Text.readString(in)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 496727c1e00086..a3fd66692a2928 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -108,10 +108,10 @@ public class BackupHandler extends MasterDaemon implements Writable { private Env env; - // map to store backup info, key is label name, value is Pair, meta && info is bytes - // this map not present in persist && only in fe master memory + // map to store backup info, key is label name, value is the BackupJob + // this map not present in persist && only in fe memory // one table only keep one snapshot info, only keep last - 
private final Map<String, Snapshot> localSnapshots = new HashMap<>(); + private final Map<String, BackupJob> localSnapshots = new HashMap<>(); private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock(); public BackupHandler() { @@ -166,6 +166,7 @@ private boolean init() { return false; } } + isInit = true; return true; } @@ -484,11 +485,15 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) { return; } + List<String> removedLabels = Lists.newArrayList(); jobLock.lock(); try { Deque<AbstractJob> jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList()); while (jobs.size() >= Config.max_backup_restore_job_num_per_db) { - jobs.removeFirst(); + AbstractJob removedJob = jobs.removeFirst(); + if (removedJob instanceof BackupJob && ((BackupJob) removedJob).isLocalSnapshot()) { + removedLabels.add(removedJob.getLabel()); + } } AbstractJob lastJob = jobs.peekLast(); @@ -501,6 +506,17 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) { } finally { jobLock.unlock(); } + + if (job.isFinished() && job instanceof BackupJob) { + // Save snapshot to local repo, when reload backupHandler from image. 
+ BackupJob backupJob = (BackupJob) job; + if (backupJob.isLocalSnapshot()) { + addSnapshot(backupJob.getLabel(), backupJob); + } + } + for (String label : removedLabels) { + removeSnapshot(label); + } } private List getAllCurrentJobs() { @@ -737,22 +753,42 @@ public boolean report(TTaskType type, long jobId, long taskId, int finishedNum, return false; } - public void addSnapshot(String labelName, Snapshot snapshot) { + public void addSnapshot(String labelName, BackupJob backupJob) { + assert backupJob.isFinished(); + + LOG.info("add snapshot {} to local repo", labelName); localSnapshotsLock.writeLock().lock(); try { - localSnapshots.put(labelName, snapshot); + localSnapshots.put(labelName, backupJob); + } finally { + localSnapshotsLock.writeLock().unlock(); + } + } + + public void removeSnapshot(String labelName) { + LOG.info("remove snapshot {} from local repo", labelName); + localSnapshotsLock.writeLock().lock(); + try { + localSnapshots.remove(labelName); } finally { localSnapshotsLock.writeLock().unlock(); } } public Snapshot getSnapshot(String labelName) { + BackupJob backupJob; localSnapshotsLock.readLock().lock(); try { - return localSnapshots.get(labelName); + backupJob = localSnapshots.get(labelName); } finally { localSnapshotsLock.readLock().unlock(); } + + if (backupJob == null) { + return null; + } + + return backupJob.getSnapshot(); } public static BackupHandler read(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index 0ed342a57cdcdc..dc92e9a07c3c1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -123,9 +123,6 @@ public enum BackupJobState { // backup properties && table commit seq with table id private Map properties = Maps.newHashMap(); - private byte[] metaInfoBytes = null; - private byte[] jobInfoBytes = null; - public 
BackupJob() { super(JobType.BACKUP); } @@ -333,11 +330,7 @@ public synchronized boolean finishSnapshotUploadTask(UploadTask task, TFinishTas @Override public synchronized void replayRun() { - LOG.info("replay run backup job: {}", this); - if (state == BackupJobState.FINISHED && repoId == Repository.KEEP_ON_LOCAL_REPO_ID) { - Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes); - env.getBackupHandler().addSnapshot(label, snapshot); - } + // nothing to do } @Override @@ -355,6 +348,11 @@ public boolean isCancelled() { return state == BackupJobState.CANCELLED; } + @Override + public boolean isFinished() { + return state == BackupJobState.FINISHED; + } + // Polling the job state and do the right things. @Override public synchronized void run() { @@ -792,8 +790,6 @@ private void saveMetaInfo() { } backupMeta.writeToFile(metaInfoFile); localMetaInfoFilePath = metaInfoFile.getAbsolutePath(); - // read meta info to metaInfoBytes - metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); // 3. save job info file Map tableCommitSeqMap = Maps.newHashMap(); @@ -818,8 +814,6 @@ private void saveMetaInfo() { } jobInfo.writeToFile(jobInfoFile); localJobInfoFilePath = jobInfoFile.getAbsolutePath(); - // read job info to jobInfoBytes - jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath()); } catch (Exception e) { status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage()); return; @@ -873,7 +867,6 @@ private void uploadMetaAndJobInfoFile() { } } - finishedTime = System.currentTimeMillis(); state = BackupJobState.FINISHED; @@ -882,8 +875,7 @@ private void uploadMetaAndJobInfoFile() { LOG.info("job is finished. 
{}", this); if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) { - Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes); - env.getBackupHandler().addSnapshot(label, snapshot); + env.getBackupHandler().addSnapshot(label, this); return; } } @@ -976,6 +968,29 @@ private void cancelInternal() { LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this); } + public boolean isLocalSnapshot() { + return repoId == Repository.KEEP_ON_LOCAL_REPO_ID; + } + + // read meta and job info bytes from disk, and return the snapshot + public synchronized Snapshot getSnapshot() { + if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) { + return null; + } + + try { + File metaInfoFile = new File(localMetaInfoFilePath); + File jobInfoFile = new File(localJobInfoFilePath); + byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); + byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath()); + return new Snapshot(label, metaInfoBytes, jobInfoBytes); + } catch (IOException e) { + LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ", + localMetaInfoFilePath, localJobInfoFilePath, e); + return null; + } + } + public synchronized List getInfo() { List info = Lists.newArrayList(); info.add(String.valueOf(jobId)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 36def1738d4397..69bcb3f7941f9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -377,6 +377,11 @@ public boolean isCancelled() { return state == RestoreJobState.CANCELLED; } + @Override + public boolean isFinished() { + return state == RestoreJobState.FINISHED; + } + @Override public synchronized void run() { if (state == RestoreJobState.FINISHED || state == RestoreJobState.CANCELLED) { diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 9dbd0afd1c319f..25fa5e1524c219 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2803,15 +2803,18 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c } // Step 3: get snapshot + String label = request.getLabelName(); TGetSnapshotResult result = new TGetSnapshotResult(); result.setStatus(new TStatus(TStatusCode.OK)); - Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName()); + Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(label); if (snapshot == null) { result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST); - result.getStatus().addToErrorMsgs("snapshot not exist"); + result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label)); } else { result.setMeta(snapshot.getMeta()); result.setJobInfo(snapshot.getJobInfo()); + LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}", + label, snapshot.getMeta().length, snapshot.getJobInfo().length); } return result;