Skip to content

Commit

Permalink
Support task retry (#81)
Browse files Browse the repository at this point in the history
* remove rollup storage

* remove cache

* remove recover check

* remote cloud storage r/w

* add comments

* check tx with off-chin indexer

* fix clippy

* simply graph setup

* delete unused code

* remove graph

* update

* 1) simplify executor setup
  - remove rollup setup
  - remove graph setup
  - introduce remote storage setup
2) update console.js
3) update play document

* update

* fix storage writing

* reduce storage reading

* fix

* comment gas estimation in uniswap executor

* update console.js

* bring back gas estimation

* task retry support

* ensure executor running when retry

* fix task destroy

* fix
  • Loading branch information
tolak authored Aug 30, 2023
1 parent 83cb395 commit fbab12e
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 66 deletions.
40 changes: 40 additions & 0 deletions contracts/index_executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ mod index_executor {
FailedToInitTask,
FailedToDestoryTask,
FailedToUploadTask,
FailedToReApplyNonce,
FailedToReRunTask,
TaskNotFoundInStorage,
UnexpectedChainType,
ExecutorPaused,
Expand Down Expand Up @@ -258,6 +260,44 @@ mod index_executor {
Ok(())
}

/// Re-apply nonce for task steps from current execution index, then re-run the failed step
///
/// This is used to retry task when it failed to execute in current step indicated by `execute_index`
#[ink(message)]
pub fn retry(&self, id: TaskId) -> Result<()> {
self.ensure_running()?;
let config = self.ensure_configured()?;
let client = StorageClient::new(config.db_url.clone(), config.db_token.clone());
let (mut task, task_doc) = client
.read::<Task>(&id)
.map_err(|_| Error::FailedToReadStorage)?
.ok_or(Error::TaskNotFoundInStorage)?;

if let TaskStatus::Executing(execute_index, _) = task.status {
let context = Context {
signer: self.pub_to_prv(task.worker).unwrap(),
worker_accounts: self.worker_accounts.clone(),
registry: &self.registry,
};
task.reapply_nonce(execute_index as u64, &context, &client)
.map_err(|_| Error::FailedToReApplyNonce)?;
pink_extension::info!(
"Step nonce re-applied from execution index: {:?}",
&execute_index
);
// Now re-run the step
let _ = task
.execute_step(&context, &client)
.map_err(|_| Error::FailedToReRunTask)?;
// Upload task data to storage
client
.update(task.id.as_ref(), &task.encode(), task_doc)
.map_err(|_| Error::FailedToUploadTask)?;
}

Ok(())
}

/// Return config information
#[ink(message)]
pub fn get_config(&self) -> Result<Option<Config>> {
Expand Down
7 changes: 5 additions & 2 deletions contracts/index_executor/src/steps/claimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@ impl Runner for ClaimStep {
.get_chain(self.chain.clone())
.ok_or("MissingChain")?;

// TODO: Check if the transaction is successed or not
Ok(true)
let account = match chain.chain_type {
ChainType::Evm => worker_account.account20.to_vec(),
ChainType::Sub => worker_account.account32.to_vec(),
};
tx::check_tx(&chain.tx_indexer_url, &account, nonce)
}
}

Expand Down
1 change: 1 addition & 0 deletions contracts/index_executor/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ mod tests {
execute_index: 0,
sender: vec![],
recipient: vec![],
retry_counter: 0,
};

assert_eq!(client.read::<Task>(&task.id).unwrap(), None);
Expand Down
144 changes: 89 additions & 55 deletions contracts/index_executor/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub struct Task {
pub sender: Vec<u8>,
/// Recipient address on dest chain
pub recipient: Vec<u8>,
// Retry counter, retry counter will be cleared after one step executed successfully
pub retry_counter: u8,
}

impl Default for Task {
Expand All @@ -57,6 +59,7 @@ impl Default for Task {
execute_index: 0,
sender: vec![],
recipient: vec![],
retry_counter: 0,
}
}
}
Expand Down Expand Up @@ -98,7 +101,7 @@ impl Task {
}

// Apply worker nonce for each step in task
self.apply_nonce(context, client)?;
self.apply_nonce(0, context, client)?;
// Apply recipient for each step in task
self.apply_recipient(context)?;
// TODO: query initial balance of worker account and setup to specific step
Expand Down Expand Up @@ -132,71 +135,91 @@ impl Task {
return Ok(TaskStatus::Completed);
}

// If step already executed successfully, execute next step
if self.steps[self.execute_index as usize].check(
match self.steps[self.execute_index as usize].check(
// An executing task must have nonce applied
self.steps[self.execute_index as usize].nonce.unwrap(),
context,
) == Ok(true)
// FIXME: handle returned error
{
self.execute_index += 1;
// If all step executed successfully, set task as `Completed`
if self.execute_index as usize == self.steps.len() {
self.status = TaskStatus::Completed;
return Ok(self.status.clone());
}

// Settle before execute next step
let settle_balance = self.settle(context)?;
pink_extension::debug!(
"Settle balance of last step[{:?}], settle amount: {:?}",
(self.execute_index - 1),
settle_balance,
);
// Update balance that actually can be consumed
self.update_balance(settle_balance, context)?;
pink_extension::debug!("Finished previous step execution");
) {
// If step already executed successfully, execute next step
Ok(true) => {
self.execute_index += 1;
self.retry_counter = 0;
// If all step executed successfully, set task as `Completed`
if self.execute_index as usize == self.steps.len() {
self.status = TaskStatus::Completed;
return Ok(self.status.clone());
}

// An executing task must have nonce applied
let nonce = self.steps[self.execute_index as usize].nonce.unwrap();
// FIXME: handle returned error
if self.steps[self.execute_index as usize].runnable(nonce, context, Some(client))
== Ok(true)
{
// Settle before execute next step
let settle_balance = self.settle(context)?;
pink_extension::debug!(
"Trying to run step[{:?}] with nonce {:?}",
self.execute_index,
nonce
"Settle balance of last step[{:?}], settle amount: {:?}",
(self.execute_index - 1),
settle_balance,
);
self.steps[self.execute_index as usize].run(nonce, context)?;
self.status = TaskStatus::Executing(self.execute_index, Some(nonce));
} else {
pink_extension::debug!("Step[{:?}] not runnable, return", self.execute_index);
}
} else {
let nonce = self.steps[self.execute_index as usize].nonce.unwrap();
// Claim step should be considered separately
if let StepMeta::Claim(claim_step) = &mut self.steps[self.execute_index as usize].meta {
let worker_account = AccountInfo::from(context.signer);
let latest_balance = worker_account.get_balance(
claim_step.chain.clone(),
claim_step.asset.clone(),
context,
)?;
claim_step.b0 = Some(latest_balance);
// Update balance that actually can be consumed
self.update_balance(settle_balance, context)?;
pink_extension::debug!("Finished previous step execution");

// FIXME: handle returned error
if self.steps[self.execute_index as usize].runnable(nonce, context, Some(client))
== Ok(true)
let _ = self.execute_step(context, client)?;
}
// There are several situations that indexer return `false`:
// - Step hasn't been executed yet
// - Step failed to execute
// - Step has been executed, but off-chain indexer hasn't caught up
Ok(false) => {
// Claim step should be considered separately
if let StepMeta::Claim(claim_step) =
&mut self.steps[self.execute_index as usize].meta
{
pink_extension::debug!("Trying to claim task with nonce {:?}", nonce);
self.steps[self.execute_index as usize].run(nonce, context)?;
self.status = TaskStatus::Executing(self.execute_index, Some(nonce));
let worker_account = AccountInfo::from(context.signer);
let latest_balance = worker_account.get_balance(
claim_step.chain.clone(),
claim_step.asset.clone(),
context,
)?;
claim_step.b0 = Some(latest_balance);
}
// Since we don't actually understand what happened, retry is the only choice.
// To avoid we retry too many times, we involved `retry_counter`
self.retry_counter += 1;
if self.retry_counter < 5 {
// FIXME: handle returned error
let _ = self.execute_step(context, client)?;
} else {
pink_extension::debug!("Claim step not runnable, return");
return Err("TooManyRetry");
}
}
Err(e) => return Err(e),
}

Ok(self.status.clone())
}

/// Check and execute a single step. Only can be executed when the step is ready to run.
///
/// Note this method assume that the last step has been settled, e.g. finished
pub fn execute_step(
&mut self,
context: &Context,
client: &StorageClient,
) -> Result<TaskStatus, &'static str> {
// An executing task must have nonce applied
let nonce = self.steps[self.execute_index as usize].nonce.unwrap();

if self.steps[self.execute_index as usize].runnable(nonce, context, Some(client))
== Ok(true)
{
pink_extension::debug!(
"Trying to run step[{:?}] with nonce {:?}",
self.execute_index,
nonce
);
self.steps[self.execute_index as usize].run(nonce, context)?;
self.status = TaskStatus::Executing(self.execute_index, Some(nonce));
} else {
pink_extension::debug!("Step[{:?}] not runnable, return", self.execute_index);
}
Ok(self.status.clone())
}
Expand Down Expand Up @@ -234,15 +257,26 @@ impl Task {
Ok(())
}

pub fn reapply_nonce(
&mut self,
start_index: u64,
context: &Context,
client: &StorageClient,
) -> Result<(), &'static str> {
self.apply_nonce(start_index, context, client)
}

fn apply_nonce(
&mut self,
start_index: u64,
context: &Context,
_client: &StorageClient,
) -> Result<(), &'static str> {
// Only in last step the recipient with be set as real recipient on destchain,
// or else it will be the worker account under the hood
let mut nonce_map: Mapping<String, u64> = Mapping::default();
for step in self.steps.iter_mut() {
for index in start_index as usize..self.steps.len() {
let step = &mut self.steps[index];
let nonce = match nonce_map.get(&step.chain) {
Some(nonce) => nonce,
None => {
Expand Down
6 changes: 3 additions & 3 deletions doc/play-with-index.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ node src/console.js --config src/config.poc5.json executor setup --resume

with `--resume` will unpause the executor (executor is paused by default after deployed). You will get the output like below:
```sh
✅ Config done
Setup worker on remote storage done
✅ Resume executor done
✅ Config executor
Config storage
✅ Resume executor
🎉 Finished executor configuration!
```

Expand Down
12 changes: 6 additions & 6 deletions scripts/src/console.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ executor

{
// costs estimation
let { gasRequired, storageDeposit } = await executor.query.config(cert, {},
let { gasRequired, storageDeposit } = await executor.query.configEngine(cert, {},
storageUrl,
storageKey,
config.key_store_contract_id,
Expand All @@ -187,19 +187,19 @@ executor
gasLimit: gasRequired.refTime,
storageDepositLimit: storageDeposit.isCharge ? storageDeposit.asCharge : null,
};
await executor.tx.config(options,
await executor.tx.configEngine(options,
storageUrl,
storageKey,
config.key_store_contract_id,
opt.resume
).signAndSend(pair, { nonce: -1 });
console.log(`✅ Config done`)
console.log(`✅ Config executor`)
}

await delay(10*1000); // 10 seconds
{
await executor.query.setupWorkerOnStorage(cert, {});
console.log(`✅ Setup worker on remote storage done`)
await executor.query.configStorage(cert, {});
console.log(`✅ Config storage`)
}

if (opt.resume !== false) {
Expand All @@ -211,7 +211,7 @@ executor
storageDepositLimit: storageDeposit.isCharge ? storageDeposit.asCharge : null,
};
await executor.tx.resumeExecutor(options).signAndSend(pair, { nonce: -1 });
console.log(`✅ Resume executor done`);
console.log(`✅ Resume executor`);
}
console.log(`🎉 Finished executor configuration!`);
}));
Expand Down

0 comments on commit fbab12e

Please sign in to comment.