Skip to content

Commit

Permalink
improve retry and clock check mechanisms
Browse files Browse the repository at this point in the history
  • Loading branch information
cbaker6 committed Apr 25, 2023
1 parent e763fdc commit f124e1a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 17 deletions.
38 changes: 34 additions & 4 deletions Sources/ParseCareKit/Models/RemoteSynchronizing.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ import CareKitStore
import Foundation

actor RemoteSynchronizing {
var isSynchronizing = false
var isSynchronizing = false {
willSet {
if newValue {
resetLiveQueryRetry()
}
}
}
var liveQueryRetry = 0
var clock: PCKClock?
var knowledgeVector: OCKRevisionRecord.KnowledgeVector? {
clock?.knowledgeVector
Expand All @@ -24,17 +31,40 @@ actor RemoteSynchronizing {
isSynchronizing = false
}

func resetLiveQueryRetry() {
liveQueryRetry = 0
}

func retryLiveQueryAfter() throws -> Int {
liveQueryRetry += 1
guard liveQueryRetry <= 10 else {
throw ParseCareKitError.errorString("Max retries reached")
}
return Int.random(in: 0...liveQueryRetry)
}

func updateClock(_ clock: PCKClock?) {
self.clock = clock
}

func hasNewerClock(_ vector: OCKRevisionRecord.KnowledgeVector, for uuid: UUID) -> Bool {
guard !isSynchronizing else {
return false
func updateClockIfNeeded(_ clock: PCKClock) {
guard self.clock == nil else {
return
}
self.clock = clock
}

func hasNewerClock(_ vector: OCKRevisionRecord.KnowledgeVector, for uuid: UUID) -> Bool {
guard let currentClock = knowledgeVector?.clock(for: uuid) else {
return true
}
return vector.clock(for: uuid) > currentClock
}

func hasNewerVector(_ vector: OCKRevisionRecord.KnowledgeVector) -> Bool {
guard let currentVector = knowledgeVector else {
return true
}
return vector > currentVector
}
}
43 changes: 30 additions & 13 deletions Sources/ParseCareKit/ParseRemote.swift
Original file line number Diff line number Diff line change
Expand Up @@ -254,19 +254,17 @@ public class ParseRemote: OCKRemoteSynchronizable {
Task {
do {
let parseClock = try await PCKClock.fetchFromRemote(self.uuid, createNewIfNeeded: true)

guard let parseVector = parseClock.knowledgeVector else {
await self.remoteStatus.updateClock(nil)
await self.remoteStatus.notSynchronzing()
Logger.pushRevisions.error("Could not get KnowledgeVector from Clock")
completion(ParseCareKitError.requiredValueCantBeUnwrapped)
return
}
await self.remoteStatus.updateClockIfNeeded(parseClock)

// 6. Ensure there has not been any updates to remote clock before proceeding.
let hasNewerClock = await self.remoteStatus.hasNewerClock(parseVector, for: self.uuid)
let currentClock = await self.remoteStatus.clock
guard !hasNewerClock || currentClock == nil else {
guard await !self.remoteStatus.hasNewerVector(parseVector) else {
let errorString = "New knowledge on server. Attempting to pull and try again"
Logger.pushRevisions.error("\(errorString)")
await self.remoteStatus.notSynchronzing()
Expand Down Expand Up @@ -438,17 +436,22 @@ public class ParseRemote: OCKRemoteSynchronizable {
switch event {
case .created(let updatedClock), .updated(let updatedClock):
Task {
guard let updatedVector = updatedClock.knowledgeVector,
await self.remoteStatus.hasNewerClock(updatedVector, for: self.uuid) else {
guard await !self.remoteStatus.isSynchronizing else {
// If currently syncing need to check in the future
do {
let delay = try await self.remoteStatus.retryLiveQueryAfter()
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(delay)) {
Task {
await self.requestSyncIfNewerClock(updatedClock)
}
}
} catch {
Logger.clockSubscription.error("\(error)")
await self.remoteStatus.notSynchronzing()
}
return
}
let random = Int.random(in: 0..<2)
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(random)) {
self.parseDelegate?.didRequestSynchronization(self)
}
Logger
.clockSubscription
.log("Parse subscription is notifying that there are updates on the remote")
await self.requestSyncIfNewerClock(updatedClock)
}
default:
return
Expand All @@ -463,6 +466,20 @@ public class ParseRemote: OCKRemoteSynchronizable {
}
}

func requestSyncIfNewerClock(_ clock: PCKClock?) async {
guard let vector = clock?.knowledgeVector,
await self.remoteStatus.hasNewerClock(vector, for: self.uuid) else {
return
}
let random = Int.random(in: 0..<2)
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(random)) {
self.parseDelegate?.didRequestSynchronization(self)
}
Logger
.clockSubscription
.log("Parse subscription is notifying that there are updates on the remote")
}

func incrementVectorClock(_ vector: OCKRevisionRecord.KnowledgeVector) -> OCKRevisionRecord.KnowledgeVector {
var mutableVector = vector
mutableVector.increment(clockFor: self.uuid)
Expand Down

0 comments on commit f124e1a

Please sign in to comment.