Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Boost fiber #173

Draft
wants to merge 27 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e1ba2c4
Compiles without tests
pmconrad Sep 29, 2019
a6df917
Adapted test
pmconrad Sep 29, 2019
17d9c29
Add comment about deadlock
pmconrad Oct 6, 2019
37cd419
Switched fc::async and fc::schedule to boost::fibers
pmconrad Oct 7, 2019
39fa142
Added synchronization for pool initialization
pmconrad Oct 7, 2019
0e166f6
Intermediate
pmconrad Oct 10, 2019
1d5ea8b
Added get/set_thread_name + ...fiber_name
pmconrad Oct 16, 2019
422905e
Imported helper files from https://github.com/boostorg/fiber/blob/dev…
pmconrad Nov 1, 2019
f8a5255
Compiles
pmconrad Nov 1, 2019
1e3b20b
Intermediate version with non-working tests
pmconrad Nov 4, 2019
348e38d
Fixup
pmconrad Nov 4, 2019
9f5a7f9
Get rid of target_thread_properties
pmconrad Nov 6, 2019
6fd0e60
Tests working
pmconrad Nov 6, 2019
1f7cb42
Add method for thread initialization
pmconrad Nov 6, 2019
c058d74
Avoid wrong deadlock exception in pool destructor
pmconrad Nov 6, 2019
72ec010
Fix test initialization
pmconrad Nov 11, 2019
b4c738f
Auto-initialize sending threads
pmconrad Nov 11, 2019
87896be
Fixed synchronization in websocket_test
pmconrad Nov 11, 2019
b826d15
Fixup initialization
pmconrad Nov 11, 2019
d4c7c11
Make initialize_fibers multi-call safe
pmconrad Nov 11, 2019
50fa4d3
Fix synchronization
pmconrad Nov 11, 2019
625000b
Move class sync_point into common header file
pmconrad Nov 12, 2019
6750442
Use a std::thread for deleting files instead of a fiber
pmconrad Nov 14, 2019
50f1a22
Avoid using locks in destructors of static objects
pmconrad Nov 14, 2019
4f45da9
Switch from boost::thread to std::thread
pmconrad Nov 15, 2019
dc672fa
Fixed pool implementation
pmconrad Nov 15, 2019
93bab40
Handle delete thread being quicker than create thread
pmconrad Dec 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ if(CMAKE_SIZEOF_VOID_P EQUAL 8)
endif()

SET(BOOST_COMPONENTS)
LIST(APPEND BOOST_COMPONENTS coroutine thread date_time filesystem system program_options chrono unit_test_framework context iostreams regex)
LIST(APPEND BOOST_COMPONENTS coroutine thread date_time filesystem system program_options chrono
unit_test_framework context iostreams regex fiber)
# boost::endian is also required, but FindBoost can't handle header-only libs
SET( Boost_USE_STATIC_LIBS ON CACHE STRING "ON or OFF" )

Expand Down Expand Up @@ -167,15 +168,8 @@ set( fc_sources
src/exception.cpp
src/variant_object.cpp
src/static_variant.cpp
src/thread/thread.cpp
src/thread/thread_specific.cpp
src/thread/future.cpp
src/thread/task.cpp
src/thread/spin_lock.cpp
src/thread/spin_yield_lock.cpp
src/thread/mutex.cpp
src/thread/fibers.cpp
src/thread/parallel.cpp
src/thread/non_preemptable_scope_check.cpp
src/asio.cpp
src/string.cpp
src/stacktrace.cpp
Expand All @@ -188,6 +182,7 @@ set( fc_sources
src/io/sstream.cpp
src/io/json.cpp
src/io/varint.cpp
src/io/stdio.cpp
src/filesystem.cpp
src/interprocess/signals.cpp
src/interprocess/file_mapping.cpp
Expand Down Expand Up @@ -310,6 +305,8 @@ else()
set( ZLIB_LIBRARIES "" )
endif( ZLIB_FOUND )

target_include_directories(fc PUBLIC vendor)

# This will become unnecessary once we update to websocketpp which fixes upstream issue #395
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DWEBSOCKETPP_STRICT_MASKING")

Expand Down
5 changes: 4 additions & 1 deletion include/fc/api.hpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#pragma once
#include <fc/thread/future.hpp>
#include <functional>

#include <boost/any.hpp>
#include <boost/config.hpp>

#include <fc/exception/exception.hpp>
#include <fc/optional.hpp>

// ms visual c++ (as of 2013) doesn't accept the standard syntax for calling a
// templated member function (foo->template bar();)
#ifdef _MSC_VER
Expand Down
242 changes: 65 additions & 177 deletions include/fc/asio.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
/**
* @file fc/cmt/asio.hpp
* @file fc/asio.hpp
* @brief defines wrappers for boost::asio functions
*/
#pragma once
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/fiber/asio/yield.hpp>

#include <vector>
#include <fc/thread/future.hpp>

#include <fc/exception/exception.hpp>
#include <fc/io/iostream.hpp>

namespace fc {
Expand All @@ -19,39 +20,6 @@ namespace asio {
* @brief internal implementation types/methods for fc::asio
*/
namespace detail {

class read_write_handler
{
public:
read_write_handler(const promise<size_t>::ptr& p);
void operator()(const boost::system::error_code& ec, size_t bytes_transferred);
private:
promise<size_t>::ptr _completion_promise;
};

class read_write_handler_with_buffer
{
public:
read_write_handler_with_buffer(const promise<size_t>::ptr& p,
const std::shared_ptr<const char>& buffer);
void operator()(const boost::system::error_code& ec, size_t bytes_transferred);
private:
promise<size_t>::ptr _completion_promise;
std::shared_ptr<const char> _buffer;
};

//void read_write_handler( const promise<size_t>::ptr& p,
// const boost::system::error_code& ec,
// size_t bytes_transferred );
void read_write_handler_ec( promise<size_t>* p,
boost::system::error_code* oec,
const boost::system::error_code& ec,
size_t bytes_transferred );
void error_handler( const promise<void>::ptr& p,
const boost::system::error_code& ec );
void error_handler_ec( promise<boost::system::error_code>* p,
const boost::system::error_code& ec );

template<typename C>
struct non_blocking {
bool operator()( C& c ) { return c.non_blocking(); }
Expand Down Expand Up @@ -81,7 +49,7 @@ namespace asio {
static uint16_t get_num_threads();
boost::asio::io_service* io;
private:
std::vector<boost::thread*> asio_threads;
std::vector<std::thread> asio_threads;
boost::asio::io_service::work* the_work;
protected:
static uint16_t num_io_threads; // marked protected to help with testing
Expand All @@ -95,145 +63,59 @@ namespace asio {
*/
boost::asio::io_service& default_io_service();

/**
* @brief wraps boost::asio::async_read
* @pre s.non_blocking() == true
* @return the number of bytes read.
*/
template<typename AsyncReadStream, typename MutableBufferSequence>
size_t read( AsyncReadStream& s, const MutableBufferSequence& buf ) {
promise<size_t>::ptr p = promise<size_t>::create("fc::asio::read");
boost::asio::async_read( s, buf, detail::read_write_handler(p) );
return p->wait();
}
/**
* This method will read at least 1 byte from the stream and will
* cooperatively block until that byte is available or an error occurs.
*
* If the stream is not in 'non-blocking' mode it will be put in 'non-blocking'
* mode it the stream supports s.non_blocking() and s.non_blocking(bool).
*
* If in non blocking mode, the call will be synchronous avoiding heap allocs
* and context switching. If the sync call returns 'would block' then an
* promise is created and an async read is generated.
*
* @return the number of bytes read.
*/
template<typename AsyncReadStream, typename MutableBufferSequence>
future<size_t> read_some(AsyncReadStream& s, const MutableBufferSequence& buf)
{
promise<size_t>::ptr completion_promise = promise<size_t>::create("fc::asio::async_read_some");
s.async_read_some(buf, detail::read_write_handler(completion_promise));
return completion_promise;//->wait();
}

template<typename AsyncReadStream>
future<size_t> read_some(AsyncReadStream& s, char* buffer, size_t length, size_t offset = 0)
{
promise<size_t>::ptr completion_promise = promise<size_t>::create("fc::asio::async_read_some");
s.async_read_some(boost::asio::buffer(buffer + offset, length),
detail::read_write_handler(completion_promise));
return completion_promise;//->wait();
}

template<typename AsyncReadStream>
future<size_t> read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer, size_t length, size_t offset)
{
promise<size_t>::ptr completion_promise = promise<size_t>::create("fc::asio::async_read_some");
s.async_read_some(boost::asio::buffer(buffer.get() + offset, length),
detail::read_write_handler_with_buffer(completion_promise, buffer));
return completion_promise;//->wait();
}

template<typename AsyncReadStream, typename MutableBufferSequence>
void async_read_some(AsyncReadStream& s, const MutableBufferSequence& buf, promise<size_t>::ptr completion_promise)
{
s.async_read_some(buf, detail::read_write_handler(completion_promise));
}

template<typename AsyncReadStream>
void async_read_some(AsyncReadStream& s, char* buffer,
size_t length, promise<size_t>::ptr completion_promise)
size_t read_some( AsyncReadStream& s, char* buffer, size_t length, size_t offset = 0 )
{
s.async_read_some(boost::asio::buffer(buffer, length), detail::read_write_handler(completion_promise));
boost::system::error_code ec;
std::size_t rlen = s.async_read_some( boost::asio::buffer(buffer + offset, length),
boost::fibers::asio::yield_t()[ec] );
if ( ec == boost::asio::error::eof) {
throw fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) );
} else if ( ec) {
throw fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) );
}
return rlen;
}

template<typename AsyncReadStream>
void async_read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer,
size_t length, size_t offset, promise<size_t>::ptr completion_promise)
size_t read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer, size_t length, size_t offset)
{
s.async_read_some(boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(completion_promise, buffer));
}

template<typename AsyncReadStream>
size_t read_some( AsyncReadStream& s, boost::asio::streambuf& buf )
{
char buffer[1024];
size_t bytes_read = read_some( s, boost::asio::buffer( buffer, sizeof(buffer) ) );
buf.sputn( buffer, bytes_read );
return bytes_read;
}

/** @brief wraps boost::asio::async_write
* @return the number of bytes written
*/
template<typename AsyncWriteStream, typename ConstBufferSequence>
size_t write( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write");
boost::asio::async_write(s, buf, detail::read_write_handler(p));
return p->wait();
}

/**
* @pre s.non_blocking() == true
* @brief wraps boost::asio::async_write_some
* @return the number of bytes written
*/
template<typename AsyncWriteStream, typename ConstBufferSequence>
future<size_t> write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write_some");
s.async_write_some( buf, detail::read_write_handler(p));
return p; //->wait();
}

template<typename AsyncWriteStream>
future<size_t> write_some( AsyncWriteStream& s, const char* buffer,
size_t length, size_t offset = 0) {
promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write_some");
s.async_write_some( boost::asio::buffer(buffer + offset, length), detail::read_write_handler(p));
return p; //->wait();
}

template<typename AsyncWriteStream>
future<size_t> write_some( AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
size_t length, size_t offset ) {
promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write_some");
s.async_write_some( boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(p, buffer));
return p; //->wait();
}

/**
* @pre s.non_blocking() == true
* @brief wraps boost::asio::async_write_some
* @return the number of bytes written
*/
template<typename AsyncWriteStream, typename ConstBufferSequence>
void async_write_some(AsyncWriteStream& s, const ConstBufferSequence& buf, promise<size_t>::ptr completion_promise) {
s.async_write_some(buf, detail::read_write_handler(completion_promise));
boost::system::error_code ec;
std::size_t rlen = s.async_read_some( boost::asio::buffer(buffer.get() + offset, length),
boost::fibers::asio::yield_t()[ec] );
if ( ec == boost::asio::error::eof) {
throw fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) );
} else if ( ec) {
throw fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) );
}
return rlen;
}

template<typename AsyncWriteStream>
void async_write_some(AsyncWriteStream& s, const char* buffer,
size_t length, promise<size_t>::ptr completion_promise) {
s.async_write_some(boost::asio::buffer(buffer, length),
detail::read_write_handler(completion_promise));
size_t write_some( AsyncWriteStream& s, const char* buffer, size_t length, size_t offset = 0 ) {
boost::system::error_code ec;
std::size_t rlen = s.async_write_some( boost::asio::buffer(buffer + offset, length),
boost::fibers::asio::yield_t()[ec] );
if ( ec == boost::asio::error::eof) {
throw fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) );
} else if ( ec) {
throw fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) );
}
return rlen;
}

template<typename AsyncWriteStream>
void async_write_some(AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
size_t length, size_t offset, promise<size_t>::ptr completion_promise) {
s.async_write_some(boost::asio::buffer(buffer.get() + offset, length),
detail::read_write_handler_with_buffer(completion_promise, buffer));
size_t write_some( AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
size_t length, size_t offset ) {
boost::system::error_code ec;
std::size_t rlen = s.async_write_some( boost::asio::buffer(buffer.get() + offset, length),
boost::fibers::asio::yield_t()[ec] );
if ( ec == boost::asio::error::eof) {
throw fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) );
} else if ( ec) {
throw fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) );
}
return rlen;
}

namespace tcp {
Expand All @@ -249,10 +131,13 @@ namespace asio {
*/
template<typename SocketType, typename AcceptorType>
void accept( AcceptorType& acc, SocketType& sock ) {
promise<void>::ptr p = promise<void>::create("fc::asio::tcp::accept");
acc.async_accept( sock, boost::bind( fc::asio::detail::error_handler, p, _1 ) );
p->wait();
//if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );
boost::system::error_code ec;
acc.async_accept( sock, boost::fibers::asio::yield_t()[ec] );
if ( ec == boost::asio::error::eof) {
throw fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) );
} else if ( ec) {
throw fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) );
}
}

/** @brief wraps boost::asio::socket::async_connect
Expand All @@ -261,10 +146,13 @@ namespace asio {
*/
template<typename AsyncSocket, typename EndpointType>
void connect( AsyncSocket& sock, const EndpointType& ep ) {
promise<void>::ptr p = promise<void>::create("fc::asio::tcp::connect");
sock.async_connect( ep, boost::bind( fc::asio::detail::error_handler, p, _1 ) );
p->wait();
//if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );
boost::system::error_code ec;
sock.async_connect( ep, boost::fibers::asio::yield_t()[ec] );
if ( ec == boost::asio::error::eof) {
throw fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) );
} else if ( ec) {
throw fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) );
}
}
}
namespace udp {
Expand All @@ -285,11 +173,11 @@ namespace asio {

virtual size_t readsome( char* buf, size_t len )
{
return fc::asio::read_some(*_stream, buf, len).wait();
return fc::asio::read_some(*_stream, buf, len);
}
virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset )
{
return fc::asio::read_some(*_stream, buf, len, offset).wait();
return fc::asio::read_some(*_stream, buf, len, offset);
}

private:
Expand All @@ -305,12 +193,12 @@ namespace asio {

virtual size_t writesome( const char* buf, size_t len )
{
return fc::asio::write_some(*_stream, buf, len).wait();
return fc::asio::write_some(*_stream, buf, len);
}

virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
{
return fc::asio::write_some(*_stream, buf, len, offset).wait();
return fc::asio::write_some(*_stream, buf, len, offset);
}

virtual void close(){ _stream->close(); }
Expand Down
1 change: 0 additions & 1 deletion include/fc/io/stdio.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ namespace fc

class cin_t : virtual public istream {
public:
~cin_t();
virtual size_t readsome( char* buf, size_t len );
virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset );
virtual istream& read( char* buf, size_t len );
Expand Down
Loading