exponential back-off strategy #130

Open
igalic opened this issue Jul 21, 2020 · 2 comments

@igalic

igalic commented Jul 21, 2020

When an actor that is responsible for processing messages for a networked service receives a message it cannot process because the remote service is down, I would like to retry the same message with an exponential back-off strategy.

  • What's the best way to do this?
  • What happens to undelivered messages when the system restarts?
@hardliner66

Currently, Riker doesn't handle backpressure in any way, so to get that behaviour you will have to implement it yourself.

The actor could respond to the sender, saying it should try again later. In this case, different senders can implement different back-off strategies.
Alternatively, it could schedule a message to itself (https://riker.rs/scheduling/). This way you can pack the original message and the timeout you used into a separate message, and increase the timeout until processing either succeeds or a maximum timeout is reached.
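
For the first option, the sender-side logic could look roughly like the sketch below. It uses only the standard library and is not tied to Riker's API; the names `Reply`, `SenderBackOff` and `on_reply` are made up for illustration, and how the reply actually travels back to the sender is left out.

```rust
use std::time::Duration;

/// Hypothetical reply an actor could send back to the sender.
#[derive(Debug, Clone, PartialEq)]
pub enum Reply {
    /// The message was processed.
    Done,
    /// The remote service is down; the sender decides when to retry.
    TryAgainLater,
}

/// Hypothetical sender-side policy that doubles the delay on every
/// `TryAgainLater` reply.
#[derive(Debug, Clone)]
pub struct SenderBackOff {
    next_delay: Duration,
    max_delay: Duration,
}

impl SenderBackOff {
    pub fn new(min_delay: Duration, max_delay: Duration) -> Self {
        SenderBackOff { next_delay: min_delay, max_delay }
    }

    /// Returns how long to wait before resending, or `None` when no
    /// retry is needed or the policy gives up.
    pub fn on_reply(&mut self, reply: &Reply) -> Option<Duration> {
        match reply {
            Reply::Done => None,
            // The next delay would exceed the maximum: give up.
            Reply::TryAgainLater if self.next_delay > self.max_delay => None,
            Reply::TryAgainLater => {
                let delay = self.next_delay;
                self.next_delay = delay * 2;
                Some(delay)
            }
        }
    }
}
```

Because the policy lives in the sender, each sender can pick its own minimum and maximum delay, or replace the doubling with something else entirely.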

Here is an example of the second variant:

use riker::actors::*;
use std::time::Duration;

const MIN_TIMEOUT: u64 = 10;
const MAX_TIMEOUT: u64 = 2000;

#[derive(Clone, Debug)]
pub struct MyMessage {
    should_fail: bool,
}

#[derive(Clone, Debug)]
pub struct Retry {
    original_msg: MyMessage,
    last_timeout: u64,
}

#[actor(MyMessage, Retry)]
#[derive(Default, Debug)]
struct ActorBackOffTest;

impl Actor for ActorBackOffTest {
    type Msg = ActorBackOffTestMsg;

    fn supervisor_strategy(&self) -> Strategy {
        Strategy::Stop
    }

    fn recv(
        &mut self,
        ctx: &Context<ActorBackOffTestMsg>,
        msg: ActorBackOffTestMsg,
        sender: Sender,
    ) {
        self.receive(ctx, msg, sender);
    }
}

impl Receive<MyMessage> for ActorBackOffTest {
    type Msg = ActorBackOffTestMsg;

    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: MyMessage, sender: Sender) {
        if msg.should_fail {
            println!("Failed first time, retrying in {} ms!", MIN_TIMEOUT);
            ctx.system.schedule_once(
                Duration::from_millis(MIN_TIMEOUT),
                ctx.myself(),
                sender,
                Retry {
                    original_msg: msg,
                    last_timeout: MIN_TIMEOUT,
                },
            );
        } else {
            println!("Success! Stopping system.");
            ctx.stop(&ctx.myself);
        }
    }
}

impl Receive<Retry> for ActorBackOffTest {
    type Msg = ActorBackOffTestMsg;

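    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: Retry, sender: Sender) {
        if !msg.original_msg.should_fail {
            println!("Success! Stopping system.");
            ctx.stop(&ctx.myself);
        } else if msg.last_timeout < MAX_TIMEOUT {
            // Double the delay, but never exceed MAX_TIMEOUT.
            let timeout = (msg.last_timeout * 2).min(MAX_TIMEOUT);
            println!("Failed again, retrying in {} ms!", timeout);

            ctx.system.schedule_once(
                Duration::from_millis(timeout),
                ctx.myself(),
                sender,
                Retry {
                    original_msg: msg.original_msg,
                    last_timeout: timeout,
                },
            );
        } else {
            // Still failing after the longest delay: give up instead of reporting success.
            println!("Giving up after reaching {} ms. Stopping system.", MAX_TIMEOUT);
            ctx.stop(&ctx.myself);
        }
    }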
}

fn main() {
    riker_bench::logging::init();

    let sys = ActorSystem::new().unwrap();

    let act = sys.actor_of::<ActorBackOffTest>("act").unwrap();

    act.tell(MyMessage { should_fail: true }, None);

    while sys.user_root().has_children() {
        std::thread::sleep(std::time::Duration::from_millis(50));
    }
}
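
To reason about the delays independently of the actor, the doubling step can be pulled out into a plain function. The following is only a sketch: `next_timeout` and `schedule` are hypothetical helpers, not part of Riker, and capping the doubled value at the maximum is a design choice of this sketch.

```rust
/// Doubles `last_timeout`, capped at `max_timeout`.
/// Returns `None` once the cap has already been reached.
pub fn next_timeout(last_timeout: u64, max_timeout: u64) -> Option<u64> {
    if last_timeout >= max_timeout {
        None
    } else {
        // `.max(1)` keeps a zero timeout from doubling to zero forever.
        Some(last_timeout.saturating_mul(2).max(1).min(max_timeout))
    }
}

/// Lists every delay a retry loop would use, starting at `min_timeout`.
pub fn schedule(min_timeout: u64, max_timeout: u64) -> Vec<u64> {
    let mut delays = vec![min_timeout];
    let mut current = min_timeout;
    while let Some(next) = next_timeout(current, max_timeout) {
        delays.push(next);
        current = next;
    }
    delays
}
```

With a minimum of 10 ms and a maximum of 2000 ms, `schedule(10, 2000)` yields 10, 20, 40, 80, 160, 320, 640, 1280 and finally 2000.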

@leenozara

If you're referring to sending messages outside of the actor system, for example sending webhooks to customers' server URLs, then you'd need something like this:

Create one actor that is responsible for dispatching and a group of worker actors that perform the network I/O. You also need persistent storage to record the state of each message.

Your dispatcher is responsible for taking messages from persistent storage and distributing them to the workers. The dispatcher can react to:

  • A notification message that one or more messages are waiting to be sent
  • A scheduled message that it sends to itself at a fixed interval
  • A failure report from a worker, in which case it updates the record in storage with the current attempt number
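
The three cases above could be modelled as one message type for the dispatcher. This is a minimal sketch with made-up names (`DispatcherMsg`, `Action`, `react`); in a real Riker actor each variant would more likely be its own message type with a `Receive` implementation.

```rust
/// Hypothetical messages a dispatcher actor might handle.
#[derive(Debug, Clone, PartialEq)]
pub enum DispatcherMsg {
    /// Someone added one or more messages to storage.
    MessagesWaiting,
    /// The periodic message the dispatcher schedules to itself.
    Tick,
    /// A worker could not deliver message `id` on the given attempt.
    SendFailed { id: u64, attempt: u32 },
}

/// What the dispatcher decides to do in response.
#[derive(Debug, Clone, PartialEq)]
pub enum Action {
    /// Pull pending messages from storage and hand them to workers.
    PullAndDispatch,
    /// Update the stored record with the attempt number.
    RecordAttempt { id: u64, attempt: u32 },
}

pub fn react(msg: &DispatcherMsg) -> Action {
    match msg {
        DispatcherMsg::MessagesWaiting | DispatcherMsg::Tick => Action::PullAndDispatch,
        DispatcherMsg::SendFailed { id, attempt } => Action::RecordAttempt {
            id: *id,
            attempt: *attempt,
        },
    }
}
```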

When you want to send a network message, you add it to storage and tell your dispatcher that there is something to dispatch to a worker. The dispatcher pulls it from storage and sends it to a worker. If the worker fails due to network I/O, it messages the dispatcher so that the dispatcher can update storage. The message is then reattempted according to your back-off strategy when the dispatcher receives its own scheduled message and pulls the set of messages that are due to be dispatched.
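
The storage side of this flow might be sketched as follows. Everything here is an assumption made for illustration: the `Record` fields, the `Storage` methods, and the use of an in-memory `BTreeMap` as a stand-in for a real database. Times are plain millisecond counters so the logic stays easy to follow.

```rust
use std::collections::BTreeMap;

/// A queued outbound message as it might be stored (hypothetical schema).
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    pub payload: String,
    pub attempts: u32,
    /// Earliest time, in milliseconds, at which to try (again).
    pub next_attempt_at: u64,
}

/// In-memory stand-in for persistent storage; a real system would use a database.
#[derive(Default)]
pub struct Storage {
    records: BTreeMap<u64, Record>,
    next_id: u64,
}

impl Storage {
    /// Stores a new message that is due immediately and returns its id.
    pub fn add(&mut self, payload: &str, now: u64) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        let record = Record { payload: payload.to_string(), attempts: 0, next_attempt_at: now };
        self.records.insert(id, record);
        id
    }

    /// What the dispatcher does on its scheduled message: find the ids that are due.
    pub fn due(&self, now: u64) -> Vec<u64> {
        self.records
            .iter()
            .filter(|(_, r)| r.next_attempt_at <= now)
            .map(|(id, _)| *id)
            .collect()
    }

    /// A worker reported success: the record is no longer needed.
    pub fn mark_sent(&mut self, id: u64) {
        self.records.remove(&id);
    }

    /// A worker reported a network failure: bump the attempt count and
    /// push the next attempt out exponentially, capped at `max_delay`.
    pub fn mark_failed(&mut self, id: u64, now: u64, base_delay: u64, max_delay: u64) {
        if let Some(r) = self.records.get_mut(&id) {
            r.attempts += 1;
            let factor = 1u64.checked_shl(r.attempts - 1).unwrap_or(u64::MAX);
            let delay = base_delay.saturating_mul(factor).min(max_delay);
            r.next_attempt_at = now.saturating_add(delay);
        }
    }

    pub fn get(&self, id: u64) -> Option<&Record> {
        self.records.get(&id)
    }
}
```

Keeping the attempt count and the next attempt time in the record means the back-off state lives in storage rather than in the actors.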

If your system restarts, there is no impact, since your messages are stored in the DB.

This is a very basic design. For production, I would separate the responsibilities into multiple actors, so that your dispatcher only handles distributing work to the workers when the scheduled message arrives, and a separate actor is responsible for updating the database. You might also want to write to the DB only if a message fails, i.e. the first attempt is in-memory only.
