-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
attempting to create a gRPC service #116
Comments
Hi, The general direction seems right. However, an unbounded channel doesn't wait for the signal to be processed and once you receive Your best bet here is probably to make things sync on the side where you interact with windows-service because that thing is designed to be blocking. That can be achieved by making a oneshot channel and passing one end of it along with the signal and then wait on the other end of it before returning from your event handler (see example below). This way you can ensure that your signal handler receives and handles the signal before the lights go off. use tokio::sync::oneshot;
let event_handler = move |control| match control {
ServiceControl::Interrogate => ServiceControlHandlerResult::NoError,
ServiceControl::Stop => {
tracing::debug!("received stop request");
let (completion_tx, completion_rx) = oneshot::channel();
signal_sender.send(Signal::Stop(completion_tx)).unwrap();
// Make sure to wait for Stop to be processed by the receiver side before returning!
rt.block_on(async move {
let _ = completion_rx.await;
println!("Ok now the Stop signal is handled and we can quit!")
});
ServiceControlHandlerResult::NoError
}
ServiceControl::Shutdown => {
// [redacted]
ServiceControlHandlerResult::NoError
}
_ => ServiceControlHandlerResult::NotImplemented,
}; For simplicity you could probably wrap the entire event handler into let event_handler = move |control| {
rt.block_on(handle_control_event(control))
}
async fn handle_control_event(ServiceControl) -> ServiceControlHandlerResult {
// do the thing
} |
hmm there's a catch- |
Event handler is defined as I'd have to boot to windows to verify the assumption, but I am pretty sure you should be able to create a oneshot channel inside of event handler and then pass it along with signal event and wait for reply before returning the Where does it yell at you, can you share the compiler error? |
i'm creating it outside the closure, so that could be the issue |
@danieleades Please have a look at b7daad514846dcc36eb9301d0c26f6dfe409e138 I hope it's clear how it works. The key part is to clone the sender inside of use tokio::runtime;
use tokio::sync::mpsc;
// Create a channel to be able to poll a stop event from the service worker loop.
let (event_tx, event_rx) = mpsc::unbounded_channel();
// Take current runtime handle.
let rt_handle = runtime::Handle::current();
// Define system service event handler that will be receiving service events.
let event_handler = move |control_event| -> ServiceControlHandlerResult {
rt_handle.block_on(handle_control_event(event_tx.clone(), control_event))
}; Then the control code handling routine is trivial, you simply send the event (or signal?) over the channel, let the other side handle it and then return from the function: async fn handle_control_event(
event_tx: mpsc::UnboundedSender<Event>,
control_event: ServiceControl,
) -> ServiceControlHandlerResult {
match control_event {
ServiceControl::Interrogate => ServiceControlHandlerResult::NoError,
ServiceControl::Stop => {
// Create a oneshot channel to get notified when the event is processed.
let (completion_tx, completion_rx) = oneshot::channel();
// Send the event.
_ = event_tx.send(Event::Shutdown { completion_tx });
// Wait for event to be processed.
completion_rx.await;
ServiceControlHandlerResult::NoError
}
_ => ServiceControlHandlerResult::NotImplemented,
}
}
enum Event {
// Shutdown event with completion_tx that is used to notify the caller once the event is processed.
Shutdown { completion_tx: oneshot::Sender<()> },
} Last but not least, you can use the following little program to test that ping_service works. When running ping_service sends a "ping" utf8 message over the local udp port 1234. In the past we used netcat but it seems to be broken on Windows these days. use std::net::UdpSocket;
use std::sync::{Arc, Mutex};
use std::str;
fn main() -> std::io::Result<()> {
let socket = UdpSocket::bind(format!("{}:{}", "0.0.0.0", "1234"))?;
let mut buf = [0; 10];
loop {
let (num_bytes, src_addr) = socket.recv_from(&mut buf)?;
let msg = &mut buf[..num_bytes];
if let Ok(utf8_str) = str::from_utf8(&msg) {
print!("{}", utf8_str);
}
}
Ok(())
} |
Very much obliged. I should get some time to take a look over the weekend |
had a quick look just now- it should be enough to get me unstuck, will know more once i play around with it. I'm not too clear on the service lifecycle yet- does it matter that the service control handling loop blocks? // Wait for event to be processed.
completion_rx.await; in the general case this might take a "long" time. also, i'm not yet clear on how the service control commands map to the service commands exposed in the Windows Services app (start, stop, pause, resume, restart). not that it's your job to walk me through all of this of course. I'll just keep tinkering away. |
if my memory doesn’t fail me, I think that it doesn’t have to block and if you have a lengthy start or stop you can return in general you need a way to talk from sync to async code so with tokio you do that with If handling of service control command takes a long time, you may consider spawning another sub-task and move a long-running work there and then return as soon as possible to unblock the event handler. See the docs from msdn:
|
in terms of the lifecycle, the service provides (in the Services UI) the control commands are (from https://learn.microsoft.com/en-us/windows/win32/services/service-control-requests):
but the control commands exposed by this library (from https://docs.rs/windows-service/latest/windows_service/service/enum.ServiceControl.html) are:
i'm just having a bit of trouble reconciling these:
And finally, assuming I get good answers to all these questions, would you consider a PR to add some documentation to the |
It probably stops and starts the service again. I don't believe there is a dedicated call to restart. In a single service process the call to stop leads to service process exiting as the service dispatcher unblocks the main thread (from I also suggest you to familiarize yourself with However in a shared process binary (
Shutdown is typically called on OS shutdown, there is also pre-shutdown which is called a bit earlier in the shutdown sequence. This is all well documented in https://learn.microsoft.com/en-us/windows/win32/services/service-control-handler-function
Pause shouldn't lead to process termination, instead your process should simply put whatever it's doing on hold, resume is in reverse. I think it's described in https://learn.microsoft.com/en-us/windows/win32/services/service-status-transitions When you set the service status ( For the rest of questions, I'd suggest you to take the associated raw type of We could improve docs for |
i'm revisiting this now, but still struggling to make progress. updated reproduction - https://github.com/danieleades/grpc-service it appears to run fine, and stays running continuously. the problem occurs when i 'stop' the service from the windows Services app my control handler looks like this: let event_handler = move |control: ServiceControl| {
tracing::debug!("received a control command: {:?}", &control);
match control {
ServiceControl::Interrogate => ServiceControlHandlerResult::NoError,
ServiceControl::Stop => {
tracing::debug!("received stop request");
// send shutdown command to gRPC server
tracing::debug!("sending gRPC shutdown command");
rt.block_on(grpc_shutdown_tx.send());
tracing::debug!("gRPC shutdown command sent");
ServiceControlHandlerResult::NoError
}
_ => ServiceControlHandlerResult::NotImplemented,
}
};
let status_handle = service_control_handler::register(SERVICE_NAME, event_handler)?; i get the following logs:
and no further logging. So it seems that the edit: I'm definitely missing a call to set the service status to stopped before exiting- but i still don't understand why the control handler appears to never fire |
I have looked at your repo and I think there are few issues:
I have spent a bit of time redoing your work and ended up with something like that: pronebird/grpc-service@5e0201e Which then terminates properly from what I can tell: |
Personally, I'd wrap the event loop into channels and then pass events back and forth instead of just a single shutdown event. It's a bit more future proof as you'll probably want to support some more control events and not shoot yourself in the foot with the differences in how event handler and your async code work. // fn run_service()
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
let event_handler = move |control: ServiceControl| {
tracing::debug!("Received a control command: {:?}", control);
let (completion_tx, completion_rx) = oneshot::channel();
match event_tx.send(ServiceEvent::new(control, completion_tx)) {
Ok(()) => completion_rx
.blocking_recv()
.inspect_err(|e| {
tracing::error!("Couldn't receive a completion reply: {}", e);
})
.unwrap_or(ServiceControlHandlerResult::Other(127)),
Err(e) => {
tracing::error!("Couldn't send the event: {}", e);
ServiceControlHandlerResult::Other(128)
}
}
};
#[derive(Debug)]
struct ServiceEvent {
service_control: ServiceControl,
completion_tx: oneshot::Sender<ServiceControlHandlerResult>,
}
// [impl ServiceEvent omitted for clarity] and then the receiver side could block rt.block_on(async {
let shutdown_token = CancellationToken::new();
let shutdown_fut = shutdown_token.clone().cancelled_owned();
let address = SocketAddr::from_str("[::1]:10000").unwrap();
let mut service_join_handle = Some(tokio::spawn(service::run(address, shutdown_fut)));
while let Some(service_event) = event_rx.recv().await {
let service_control_result: ServiceControlHandlerResult =
match service_event.service_control() {
ServiceControl::Interrogate => ServiceControlHandlerResult::NoError,
ServiceControl::Stop => {
shutdown_token.cancel();
if let Some(service_join_handle) = service_join_handle.take() {
if let Err(e) = service_join_handle.await {
tracing::error!("Couldn't join on service: {}", e);
}
}
ServiceControlHandlerResult::NoError
}
_ => ServiceControlHandlerResult::NotImplemented,
};
service_event.complete(service_control_result);
}
}); Complete gist: pronebird/grpc-service@5e78208 Also note that you can move the tokio runtime creation into |
that's embarrassing.
agreed- that was just a quick and dirty (and temporary) hack on my part to save messing about with lifetimes
thank you so much for taking the time |
i'm attempting to use this library to make a long-running gRPC service. I'm really struggling to get this working.
I've got a minimal reproduction here - https://github.com/danieleades/grpc-service
i'm struggling to get the control handler to fire. The service starts ok, but judging from the logging I never receive a control signal to stop. If i call 'stop' from the windows Service app i get an error telling me that the service exited unexpectedly.
There are no asynchronous examples in this repo, possibly i've messed something up there.
Would really appreciate any assistance to get this working
The text was updated successfully, but these errors were encountered: