Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lock on windows platform using multiple workers #74

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

im-cgtribe
Copy link

granian use shared socket.

hyper make it nonblocking socket.

internals of WS2_32 accept in pseudo code:

def accept(socket):
[...]
  EnterCriticalSection(socket.cs)
  if socket.non_blocking:
    select(...)
    if not FD_ISSET(...):
      return error(WOULDBLOCK)

  blocking_accept()	// here will be reset of socket signalling that accept is available
[...]

"A critical section object provides synchronization similar to that provided by a mutex object, except that a critical section can be used only by the threads of a single process. "
https://learn.microsoft.com/en-us/windows/win32/sync/critical-section-objects

2 processes waiting to accept for connections
3 request came simultaneously

First two requests were accepted without problems, and start to process them simultaneously.
As we are async, as soon as it possible we are going to accept the rest of requests.

And now, two workers calling accept, both came across select and found that there is accept possible.
But when both of them going to actual accept, only one will obtain the connection, the second one will stuck in blocking manner until new connection received.

As result, event_loop blocked. Already accepted request will not be answered until event_loop unblocked.

callstack:

   7  Id: 3104.6bb4 Suspend: 1 Teb: 000000df`e6447000 Unfrozen
 # Child-SP          RetAddr           Call Site
00 000000df`e75ae088 00007ffa`e91d80fc ntdll!NtWaitForSingleObject+0x14
01 000000df`e75ae090 00007ffa`e91e225b mswsock!SockWaitForSingleObject+0x10c
02 000000df`e75ae130 00007ffa`eb531453 mswsock!WSPAccept+0x1116b
03 000000df`e75ae540 00007ffa`eb531372 WS2_32!WSAAccept+0xd3
04 000000df`e75ae5c0 00007ffa`9aa4d66d WS2_32!accept+0x12
05 (Inline Function) --------`-------- _granian_cp310_win_amd64!std::sys::windows::net::Socket::accept+0xf
06 (Inline Function) --------`-------- _granian_cp310_win_amd64!std::sys_common::net::TcpListener::accept+0x59
07 (Inline Function) --------`-------- _granian_cp310_win_amd64!std::net::tcp::TcpListener::accept+0x59
08 (Inline Function) --------`-------- _granian_cp310_win_amd64!mio::sys::windows::tcp::accept+0x59
09 (Inline Function) --------`-------- _granian_cp310_win_amd64!mio::net::tcp::listener::impl$0::accept::closure$0+0x59
0a (Inline Function) --------`-------- _granian_cp310_win_amd64!mio::sys::windows::IoSourceState::do_io+0x59
0b (Inline Function) --------`-------- _granian_cp310_win_amd64!mio::io_source::IoSource<std::net::tcp::TcpListener>::do_io+0x59
0c (Inline Function) --------`-------- _granian_cp310_win_amd64!mio::net::tcp::listener::TcpListener::accept+0x59
0d (Inline Function) --------`-------- _granian_cp310_win_amd64!tokio::net::tcp::listener::TcpListener::poll_accept+0xad
0e (Inline Function) --------`-------- _granian_cp310_win_amd64!hyper::server::tcp::AddrIncoming::poll_next_+0x182
0f 000000df`e75ae600 00007ffa`9a90fddd _granian_cp310_win_amd64!hyper::server::tcp::impl$2::poll_accept+0x1dd
10 (Inline Function) --------`-------- _granian_cp310_win_amd64!tokio::task::local::impl$8::poll::closure$0+0x75f
11 (Inline Function) --------`-------- _granian_cp310_win_amd64!tokio::task::local::impl$2::with::closure$0+0x797
12 (Inline Function) --------`-------- _granian_cp310_win_amd64!std::thread::local::LocalKey<tokio::task::local::LocalData>::try_with+0x7eb
13 (Inline Function) --------`-------- _granian_cp310_win_amd64!std::thread::local::LocalKey<tokio::task::local::LocalData>::with+0x7eb
14 (Inline Function) --------`-------- _granian_cp310_win_amd64!tokio::task::local::LocalSet::with+0x7eb
15 (Inline Function) --------`-------- _granian_cp310_win_amd64!tokio::task::local::impl$8::poll+0x817
16 000000df`e75aeb00 00007ffa`9a8d79ad _granian_cp310_win_amd64!tokio::task::local::impl$2::run_until::async_fn$0<enum2$<_granian::wsgi::serve::impl$0::_serve_wth::closure$0::async_block_env$0> >+0x85d
17 (Inline Function) --------`-------- _granian_cp310_win_amd64!tokio::runtime::scheduler::current_thread::CoreGuard::enter+0x157
18 (Inline Function) --------`-------- _granian_cp310_win_amd64!tokio::runtime::scheduler::current_thread::CoreGuard::block_on+0x17f
19 (Inline Function) --------`-------- _granian_cp310_win_amd64!tokio::runtime::scheduler::current_thread::CurrentThread::block_on+0x76d
1a (Inline Function) --------`-------- _granian_cp310_win_amd64!tokio::runtime::runtime::Runtime::block_on+0xb75
1b (Inline Function) --------`-------- _granian_cp310_win_amd64!tokio::task::local::LocalSet::block_on+0xb75
1c (Inline Function) --------`-------- _granian_cp310_win_amd64!_granian::runtime::block_on_local+0xb75
1d (Inline Function) --------`-------- _granian_cp310_win_amd64!_granian::wsgi::serve::impl$0::_serve_wth::closure$0+0xc6b
1e 000000df`e75af060 00007ffa`9a8e4bfd _granian_cp310_win_amd64!std::sys_common::backtrace::__rust_begin_short_backtrace<_granian::wsgi::serve::impl$0::_serve_wth::closure_env$0,tuple$<> >+0xccd
1f (Inline Function) --------`-------- _granian_cp310_win_amd64!std::thread::impl$0::spawn_unchecked_::closure$1::closure$0+0x31
20 (Inline Function) --------`-------- _granian_cp310_win_amd64!core::panic::unwind_safe::impl$23::call_once+0x31
21 (Inline Function) --------`-------- _granian_cp310_win_amd64!std::panicking::try::do_call+0x31
22 (Inline Function) --------`-------- _granian_cp310_win_amd64!std::panicking::try+0x31
23 (Inline Function) --------`-------- _granian_cp310_win_amd64!std::panic::catch_unwind+0x31
24 (Inline Function) --------`-------- _granian_cp310_win_amd64!std::thread::impl$0::spawn_unchecked_::closure$1+0xe0
25 000000df`e75afbe0 00007ffa`9aac75dc _granian_cp310_win_amd64!core::ops::function::FnOnce::call_once<std::thread::impl$0::spawn_unchecked_::closure_env$1<_granian::wsgi::serve::impl$0::_serve_wth::closure_env$0,tuple$<> >,tuple$<> >+0xfd
26 (Inline Function) --------`-------- _granian_cp310_win_amd64!alloc::boxed::impl$45::call_once+0xb
27 (Inline Function) --------`-------- _granian_cp310_win_amd64!alloc::boxed::impl$45::call_once+0x16
28 000000df`e75afce0 00007ffa`eb2e7614 _granian_cp310_win_amd64!std::sys::windows::thread::impl$0::new::thread_start+0x4c
29 000000df`e75afd70 00007ffa`ec9026a1 KERNEL32!BaseThreadInitThunk+0x14
2a 000000df`e75afda0 00000000`00000000 ntdll!RtlUserThreadStart+0x21

I'm not familiar with rust and tokio, so I'm still seeking for last direct evidence where is request task in tokio queues.
But indirect confirmation of this hypothesis exists:

  1. suspend not locked worker process (so it will not accept connections)
  2. make new request, so locked worker will return from blocked accept
  3. answer for locked request will be received, so it were managed by locked worker process

@gi0baro
Copy link
Member

gi0baro commented Apr 11, 2023

Hello, I'm not sure I got the scope of this.
Can you provide more context about the issue? Considering the description you provided I don't get the purpose of the added test.

@izmmisha
Copy link

Hello,
Ok, I finally found direct evidence that there is task queued

0:007> .frame 1c
1c (Inline Function) --------`-------- _granian_cp310_win_amd64!_granian::runtime::block_on_local+0xb75 [C:\repository\git\granian\src\runtime.rs @ 301] 

0:007> dx local.context.shared.local_state.owned.inner.__0.list
local.context.shared.local_state.owned.inner.__0.list                 [Type: tokio::util::linked_list::LinkedList<tokio::runtime::task::Task<alloc::sync::Arc<tokio::task::local::Shared> >,tokio::runtime::task::core::Header>]
    [+0x000] head             : Some [Type: enum2$<core::option::Option<core::ptr::non_null::NonNull<tokio::runtime::task::core::Header> > >]
    [+0x008] tail             : Some [Type: enum2$<core::option::Option<core::ptr::non_null::NonNull<tokio::runtime::task::core::Header> > >]
    [+0x000] _marker          [Type: core::marker::PhantomData<ptr_const$<tokio::runtime::task::Task<alloc::sync::Arc<tokio::task::local::Shared> > > >]

This list contains task to work on, but event_loop blocked in accept.

Can you provide more context about the issue?

The issue is that granian fail to respond on 3 simultaneous requests, when more than one worker used (on windows platform).

Considering the description you provided I don't get the purpose of the added test.

This test case is reproducing the problem and failing.

In addition to description here is sequence diagram:
image

@gi0baro
Copy link
Member

gi0baro commented Apr 14, 2023

Ok, thank you for the detailed explanation.

So the actual theme here is either to:

  • find a way to fix this
  • prevent multiple processes on windows

I need to dig quite a bit into socket's implementation in windows, which is not exactly my cup of tea.
If someone has any valid point/solution to add to the discussion feel free to do so.

@izmmisha
Copy link

izmmisha commented Apr 14, 2023

If you need any assistance in socket's implementation in windows, I can try to help you.
Do you need any simple code to reproduce windows shared non-blocking socket problem? I can write it on python.
But this is not the point, we will not be able to fix it.

Before starting to fix something, first we need to figure out what we need to fix:
Problem is that accept call on non-blocking socket became blocking.
This is not expected by tokio, so it is not the tokio bug, neither the hyper bug, neither the granian bug.

accept call on non-blocking socket became blocking because there is data race.

  1. avoid data race, to prevent accept became blocking.
    accept calls can be wrapped into interprocess synchronisation primitives.

runtime blocked in accept, make it work again
2) hack tokio to keep working if accept can be blocked.
in this case we probably can move work for accepting connection in dedicated runtime, which will unblock on new connections.

@gi0baro
Copy link
Member

gi0baro commented Apr 14, 2023

Before starting to fix something, first we need to figure out what we need to fix: Problem is that accept call on non-blocking socket became blocking. This is not expected by tokio, so it is not the tokio bug, neither the hyper bug, neither the granian bug.

accept call on non-blocking socket became blocking because there is data race.

Understood, but I still don't get why on Windows is there a data race. The socket created in the main process is set to be inheritable, thus the sub-processes (workers that actually accepts connections) should not have any data races as the file-descriptor is shared across all of them. Also, the rust code which set the socket to non blocking should fail if that call cannot succeed.

@izmmisha
Copy link

The socket created in the main process is set to be inheritable, thus the sub-processes (workers that actually accepts connections) should not have any data races as the file-descriptor is shared across all of them. Also, the rust code which set the socket to non blocking should fail if that call cannot succeed.

Correct.

but I still don't get why on Windows is there a data race.

data race is inside windows socket implementation.

exactly it is inside mswsock!WSPAccept function.
accept

The data race is in blocks select, FD_ISSET and NtDeviceIoControlFile.
select will say that there is accept available until NtDeviceIoControlFile will be called.

So two concurrent calls of select will say you can go and accept for connection.
But as soon as NtDeviceIoControlFile called, the next NtDeviceIoControlFile will say that you should wait for connection (if there was only one connection in queue).

Critical section is making this block of code data race free, but it works only process wide.

We have two processes, and they can't serialize these sequences of calls, as result we have data race.

Here is an example to reproduce this problem without granian, hyper, tokio.

import asyncio
import socket
from contextlib import contextmanager
from multiprocessing import Process
from concurrent.futures import ThreadPoolExecutor
from select import select
import httpx

HOST = 'localhost'

@contextmanager
def process_spawner():
    class Spawn():
        def __init__(self):
            self.procs = []

        def spawn(self, func, *args):
            proc = Process(target=func, args=args)
            proc.start()
            self.procs.append(proc)

        def stop(self):
            for proc in self.procs:
                proc.terminate()

            for proc in self.procs:
                proc.join()

    spawn = Spawn()
    try:
        yield spawn
    finally:
        spawn.stop()

def process_req(conn, _):
    conn.sendall(b'HTTP/1.1 200 OK\n'
                 b'content-type: text/plain; charset=utf-8\n'
                 b'content-length: 0\n'
                 b'\n\n')

def acceptor_blocking(sock, worker_id):
    sock.setblocking(True)

    def do_response(conn):
        with conn:
            data = conn.recv(1024)
            print(worker_id, 'process')
            process_req(conn, data)

    with ThreadPoolExecutor(max_workers=1) as executor:
        while True:
            conn, addr = sock.accept()
            print(worker_id, 'Connected by', addr)

            executor.submit(do_response, conn)

def acceptor_nonblocking(sock, worker_id):
    sock.setblocking(False)

    conns = [sock]
    while True:
        r_set, _, _ = select(conns, [], [])
        for conn in r_set:
            if sock == conn:
                # we still need try...except because other
                # processes will steal our connections
                try:
                    conn, addr = sock.accept()
                    print(worker_id, 'Connected by', addr)
                    conns.append(conn)
                except BlockingIOError:
                    pass
            else:
                with conn:
                    data = conn.recv(1024)
                    print(worker_id, 'process')
                    process_req(conn, data)
                    conns.remove(conn)


def test(port):
    async def make_req():
        async with httpx.AsyncClient() as client:
            return await client.get(f"http://{HOST}:{port}/")

    async def do_requests():
        return await asyncio.gather(*[make_req() for _ in range(3)], return_exceptions=True)

    requests = asyncio.run(do_requests())

    return tuple(map(lambda req: not isinstance(req, Exception)
                     and req.status_code == 200, requests))

def main():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((HOST, 0))
        port = sock.getsockname()[1]
        sock.listen(10)
        sock.set_inheritable(True)

        with process_spawner() as process:
            for worker_id in range(2):
                process.spawn(acceptor_nonblocking, sock, worker_id)

            for iteration in range(100):
                res = test(port)
                if res != (True, True, True):
                    print(res, iteration)
                    break
                print('-------')

if __name__ == '__main__':
    main()

@izmmisha
Copy link

Just realized that we can check will synchronisation fix the problem or not.
Here is the changes to repro sample:

--- shared_socket.py~	2023-04-17 13:27:57.000000000 +0400
+++ shared_socket.py	2023-04-17 15:38:43.000000000 +0400
@@ -1,7 +1,7 @@
 import asyncio
 import socket
 from contextlib import contextmanager
-from multiprocessing import Process
+from multiprocessing import Process, Lock
 from concurrent.futures import ThreadPoolExecutor
 from select import select
 import httpx
@@ -14,8 +14,8 @@
         def __init__(self):
             self.procs = []
 
-        def spawn(self, func, *args):
-            proc = Process(target=func, args=args)
+        def spawn(self, func, *args, **kwargs):
+            proc = Process(target=func, args=args, kwargs=kwargs)
             proc.start()
             self.procs.append(proc)
 
@@ -54,7 +54,7 @@
 
             executor.submit(do_response, conn)
 
-def acceptor_nonblocking(sock, worker_id):
+def acceptor_nonblocking(sock, worker_id, lock=None):
     sock.setblocking(False)
 
     conns = [sock]
@@ -65,11 +65,16 @@
                 # we still need try...except because other
                 # processes will steal our connections
                 try:
+                    if lock:
+                        lock.acquire()
                     conn, addr = sock.accept()
                     print(worker_id, 'Connected by', addr)
                     conns.append(conn)
                 except BlockingIOError:
                     pass
+                finally:
+                    if lock:
+                        lock.release()
             else:
                 with conn:
                     data = conn.recv(1024)
@@ -99,8 +104,9 @@
         sock.set_inheritable(True)
 
         with process_spawner() as process:
+            lock = Lock()
             for worker_id in range(2):
-                process.spawn(acceptor_nonblocking, sock, worker_id)
+                process.spawn(acceptor_nonblocking, sock, worker_id, lock=lock)
 
             for iteration in range(100):
                 res = test(port)

Yes it is fixing blocking on windows platform.

@gi0baro
Copy link
Member

gi0baro commented May 25, 2023

@izmmisha sorry for the super-late reply.

I understand the fix on the sample code but:

  • IMHO using a multi-process lock to "patch" an underlying bug, kinda nullify the advantage of having multiple processes sharing the same socket, as in the end you can accept a connection 1 process at time, and probably degrades performance
  • Even being up tu this patch, I'm quite sure I simply can't change the way tokio's TCPListener accepts new connections

So let's say at the moment, given the information I have, I would be more prone to lock windows on just 1 worker.

@gi0baro gi0baro force-pushed the master branch 4 times, most recently from 403a2ea to a0d86e7 Compare November 15, 2023 13:24
@gi0baro gi0baro force-pushed the master branch 2 times, most recently from ff9588a to 1814868 Compare May 1, 2024 22:24
@gi0baro gi0baro force-pushed the master branch 3 times, most recently from eb5297c to 8f05250 Compare December 12, 2024 18:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants