diff --git a/Net/src/SocketReactor.cpp b/Net/src/SocketReactor.cpp index f68457a081..32700ee376 100644 --- a/Net/src/SocketReactor.cpp +++ b/Net/src/SocketReactor.cpp @@ -160,13 +160,14 @@ void SocketReactor::sleep() void SocketReactor::stop() { - _stop = true; + if (_stop.exchange(true)) return; wakeUp(); } void SocketReactor::wakeUp() { + if (_stop) return; _pollSet.wakeUp(); _event.set(); } diff --git a/Net/testsuite/src/SocketReactorTest.cpp b/Net/testsuite/src/SocketReactorTest.cpp index 06a26e0467..2319c2618d 100644 --- a/Net/testsuite/src/SocketReactorTest.cpp +++ b/Net/testsuite/src/SocketReactorTest.cpp @@ -395,6 +395,45 @@ namespace Poco::Thread::sleep(500); } }; + + class DummyServiceHandler + { + public: + DummyServiceHandler(StreamSocket& socket, SocketReactor& reactor) : _socket(socket), + _reactor(reactor) + { + _reactor.addEventHandler(_socket, Observer(*this, &DummyServiceHandler::onReadable)); + _reactor.addEventHandler(_socket, Observer(*this, &DummyServiceHandler::onShutdown)); + _socket.setBlocking(false); + } + + ~DummyServiceHandler() + { + _reactor.removeEventHandler(_socket, Observer(*this, &DummyServiceHandler::onReadable)); + _reactor.removeEventHandler(_socket, Observer(*this, &DummyServiceHandler::onShutdown)); + } + + void onReadable(ReadableNotification* pNf) + { + pNf->release(); + char buffer[64]; + do + { + if (0 == _socket.receiveBytes(&buffer[0], sizeof(buffer))) + break; + } while (true); + } + + void onShutdown(ShutdownNotification* pNf) + { + pNf->release(); + delete this; + } + + private: + StreamSocket _socket; + SocketReactor& _reactor; + }; } @@ -589,19 +628,40 @@ void SocketReactorTest::testDataCollection() void SocketReactorTest::testSocketConnectorDeadlock() { - SocketAddress ssa; - ServerSocket ss(ssa); - SocketAddress sa("127.0.0.1", ss.address().port()); - SocketReactor reactor; - Thread thread; + { + SocketAddress ssa; + ServerSocket ss(ssa); + SocketAddress sa("127.0.0.1", ss.address().port()); + SocketReactor reactor; + Thread thread; + int i = 0; + while (++i < 10) + { + auto sc = new SocketConnector(sa, reactor); + thread.startFunc([&reactor]() { reactor.run(); }); + reactor.stop(); + thread.join(); + delete sc; + } + } + int i = 0; while (++i < 10) { - auto sc = new SocketConnector(sa, reactor); - thread.startFunc([&reactor]() { reactor.run(); }); + SocketAddress ssa; + ServerSocket ss(ssa); + SocketReactor reactor; + SocketAcceptor acceptor(ss, reactor); + Thread thread; + thread.start(reactor); + + SocketAddress sa("127.0.0.1", ss.address().port()); + StreamSocket sock(sa); + + std::string data("HELLO"); + sock.sendBytes(data.data(), static_cast(data.size())); reactor.stop(); thread.join(); - delete sc; } }