Skip to content

Instantly share code, notes, and snippets.

@amedama41
Created December 12, 2015 08:54
Show Gist options
  • Save amedama41/50ec73706f6039e591b4 to your computer and use it in GitHub Desktop.
Save amedama41/50ec73706f6039e591b4 to your computer and use it in GitHub Desktop.
#include <cstddef>
#include <algorithm>
#include <atomic>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
#include <vector>
#include <boost/asio/io_service.hpp>
#include <boost/asio/strand.hpp>
#include <boost/program_options.hpp>
#include <boost/range/irange.hpp>
#include <boost/range/algorithm/for_each.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/timer/timer.hpp>
namespace asio = boost::asio;
// Benchmark parameters. main() binds these to the nwork / npost / nloop
// command-line options.
std::size_t nwork = 0;      // handlers stop their io_service once the count exceeds this
std::size_t npost = 0;      // how many times each handler re-posts itself
std::size_t nloop_max = 0;  // upper bound of the per-handler busy loop

// Handler-invocation counters: one atomic, one plain. Which one a benchmark
// variant uses is decided by the counter passed to run().
std::atomic<std::size_t> atomic_counter(0);
std::size_t counter = 0;

// Index of the current worker thread; run() assigns it at thread start.
thread_local std::size_t thread_index = 0;

// num_invoked[t] is the number of handlers executed by worker thread t.
std::vector<std::size_t> num_invoked;

// Per-thread random engine. It is default-constructed, so every thread starts
// from the same default seed.
thread_local std::mt19937 rnd;
// Handler for the atomic-counter variant of the benchmark.
//
// Each invocation records itself in the calling thread's tally, bumps the
// shared atomic counter, spins for a randomized number of iterations and then
// posts `npost` copies of itself back to the same io_service. Once more than
// `nwork` invocations have been counted it stops the io_service instead.
struct io_service_post_work
{
    void operator()()
    {
        num_invoked[thread_index] += 1;

        // fetch_add yields the value held before the increment, so add one to
        // obtain this invocation's ordinal.
        auto const previously_counted
            = atomic_counter.fetch_add(1, std::memory_order_relaxed);
        auto const ordinal = previously_counted + 1;
        if (ordinal > nwork) {
            io_service.stop();
            return;
        }

        // Simulated work: an empty loop of between 90% and 100% of nloop_max
        // iterations. The bound is volatile, so it is re-read on every
        // comparison (presumably to keep the loop from being optimized away).
        std::uniform_int_distribution<std::size_t> spin_range(
                nloop_max * 0.9, nloop_max);
        std::size_t const volatile spin_limit = spin_range(rnd);
        for (std::size_t spin = 0; spin < spin_limit; ++spin) {}

        // Fan out: queue npost copies of this handler.
        for (std::size_t posted = 0; posted < npost; ++posted) {
            io_service.post(*this);
        }
    }

    asio::io_service& io_service;
};
// Adapter handler that owns a strand. When invoked, it builds a StrandWork
// from that strand and hands it to the strand via dispatch().
//
// StrandWork must be constructible as `StrandWork{strand}`.
template <class StrandWork>
struct strand_wrapper
{
    void operator()()
    {
        auto& serializer = strand;
        serializer.dispatch(StrandWork{serializer});
    }

    asio::io_service::strand strand;
};
// Handler for the strand::wrap variant of the benchmark.
//
// Each invocation records itself in the calling thread's tally, increments
// the plain global `counter`, spins for a randomized number of iterations and
// then posts `npost` strand-wrapped copies of itself to the strand's
// io_service. Once more than `nwork` invocations have been counted it stops
// that io_service instead.
//
// NOTE(review): `counter` is a plain std::size_t and the only serialization
// visible here is the strand. When main() creates more than one strand,
// handlers on different strands may run concurrently and this increment
// looks unsynchronized - confirm whether that is intended.
struct strand_wrap_work
{
    void operator()()
    {
        num_invoked[thread_index] += 1;

        auto& owner = strand.get_io_service();

        auto const ordinal = ++counter;
        if (ordinal > nwork) {
            owner.stop();
            return;
        }

        // Simulated work: an empty loop of between 90% and 100% of nloop_max
        // iterations. The bound is volatile, so it is re-read on every
        // comparison (presumably to keep the loop from being optimized away).
        std::uniform_int_distribution<std::size_t> spin_range(
                nloop_max * 0.9, nloop_max);
        std::size_t const volatile spin_limit = spin_range(rnd);
        for (std::size_t spin = 0; spin < spin_limit; ++spin) {}

        // Fan out through the io_service, with each copy wrapped by the strand.
        for (std::size_t posted = 0; posted < npost; ++posted) {
            owner.post(strand.wrap(*this));
        }
    }

    asio::io_service::strand strand;
};
// Handler for the strand::post variant of the benchmark.
//
// Each invocation records itself in the calling thread's tally, increments
// the plain global `counter`, spins for a randomized number of iterations and
// then posts `npost` copies of itself directly to its strand. Once more than
// `nwork` invocations have been counted it stops the strand's io_service
// instead.
//
// NOTE(review): `counter` is a plain std::size_t and the only serialization
// visible here is the strand. When main() creates more than one strand,
// handlers on different strands may run concurrently and this increment
// looks unsynchronized - confirm whether that is intended.
struct strand_post_work
{
    void operator()()
    {
        num_invoked[thread_index] += 1;

        auto const ordinal = ++counter;
        if (ordinal > nwork) {
            strand.get_io_service().stop();
            return;
        }

        // Simulated work: an empty loop of between 90% and 100% of nloop_max
        // iterations. The bound is volatile, so it is re-read on every
        // comparison (presumably to keep the loop from being optimized away).
        std::uniform_int_distribution<std::size_t> spin_range(
                nloop_max * 0.9, nloop_max);
        std::size_t const volatile spin_limit = spin_range(rnd);
        for (std::size_t spin = 0; spin < spin_limit; ++spin) {}

        // Fan out: queue npost copies of this handler on the strand.
        for (std::size_t posted = 0; posted < npost; ++posted) {
            strand.post(*this);
        }
    }

    asio::io_service::strand strand;
};
// Handler for the mutex variant of the benchmark.
//
// Each invocation records itself in the calling thread's tally, increments
// the plain global `counter` while holding a mutex shared by all instances,
// spins for a randomized number of iterations and then posts `npost` copies
// of itself back to the same io_service. Once more than `nwork` invocations
// have been counted it stops the io_service (still holding the mutex) and
// returns.
struct mutex_io_servcie_post_work
{
    void operator()()
    {
        num_invoked[thread_index] += 1;

        {
            // The lock covers only the counter update and the stop decision;
            // the busy loop and the re-posting below run unlocked.
            std::lock_guard<std::mutex> guard{mutex};
            auto const ordinal = ++counter;
            if (ordinal > nwork) {
                io_service.stop();
                return;
            }
        }

        // Simulated work: an empty loop of between 90% and 100% of nloop_max
        // iterations. The bound is volatile, so it is re-read on every
        // comparison (presumably to keep the loop from being optimized away).
        std::uniform_int_distribution<std::size_t> spin_range(
                nloop_max * 0.9, nloop_max);
        std::size_t const volatile spin_limit = spin_range(rnd);
        for (std::size_t spin = 0; spin < spin_limit; ++spin) {}

        // Fan out: queue npost copies of this handler.
        for (std::size_t posted = 0; posted < npost; ++posted) {
            io_service.post(*this);
        }
    }

    asio::io_service& io_service;

    // One mutex for every instance; it guards the global `counter`.
    static std::mutex mutex;
};

std::mutex mutex_io_servcie_post_work::mutex;
// Runs one benchmark variant and prints its statistics.
//
// Work      handler type; constructed as `Work{executor[k]}` and posted once
//           per thread.
// Executor  element type of `executor` (what each Work is built from).
// Counter   type of the invocation counter; reset to 0 before the run and
//           printed afterwards.
//
// io_service  io_services the worker threads run; thread i uses
//             io_service[i % io_service.size()].
// executor    objects the initial handlers are built from; thread i uses
//             executor[i % executor.size()].
// nthread     number of worker threads to start.
// counter     counter the handlers of this variant increment.
//
// Both vectors are indexed with a modulo by their size, so they must not be
// empty when nthread is non-zero.
template <class Work, class Executor, class Counter>
void run(std::vector<asio::io_service>& io_service
       , std::vector<Executor>& executor
       , std::size_t const nthread, Counter& counter)
{
    std::vector<std::thread> workers;
    workers.reserve(nthread);

    counter = 0;
    ::num_invoked.resize(nthread);

    std::cout << "setup " << nthread << " thread(s)..." << std::endl;

    // nthread workers plus this thread meet at the barrier twice: once before
    // any worker calls run(), and once after every worker's run() returned.
    boost::barrier rendezvous(nthread + 1);

    for (std::size_t index = 0; index < nthread; ++index) {
        workers.emplace_back([&, index] {
            ::thread_index = index;
            ::num_invoked[index] = 0;

            auto& service = io_service[index % io_service.size()];
            auto& source = executor[index % executor.size()];
            // Seed the io_service with one handler before the start line.
            service.post(Work{source});

            rendezvous.wait();
            service.run();
            rendezvous.wait();
        });
    }

    {
        std::cout << "running..." << std::endl;
        rendezvous.wait();
        // The timer lives exactly between the two barrier waits.
        boost::timer::auto_cpu_timer stopwatch{};
        rendezvous.wait();
    }

    for (auto& worker : workers) {
        worker.join();
    }

    std::cout << "the number of invoked handler: " << counter << std::endl;
    for (std::size_t index = 0; index < nthread; ++index) {
        std::cout << "thread " << index << ": "
                  << ::num_invoked[index] << std::endl;
    }
}
int main(int argc, char* argv[])
{
try {
namespace popts = boost::program_options;
popts::options_description desc{"performe io_service"};
desc.add_options()
("help,h", "display help message")
("nio_service,i", popts::value<std::size_t>()->default_value(1), "the number of io_services")
("nthread,t", popts::value<std::size_t>()->default_value(1), "the number of threads")
("nwork,w", popts::value<std::size_t>(&nwork)->default_value(10000), "the number of works that producers post to io_services")
("npost,p", popts::value<std::size_t>(&npost)->default_value(1), "the number of posts per work")
("nloop,l", popts::value<std::size_t>(&nloop_max)->default_value(1000), "max loop count per work")
("strand,s", "use strand as executor")
("mutex,m", "use mutex instead of atomic counter")
("wrap,r", "use strand::wrap instead of atomic counter")
;
auto vm = popts::variables_map{};
popts::store(popts::parse_command_line(argc, argv, desc), vm);
popts::notify(vm);
if (vm.count("help")) {
std::cout << desc << std::endl;
return 0;
}
auto const nio_service = vm["nio_service"].as<std::size_t>();
auto const nthread
= std::max(vm["nthread"].as<std::size_t>(), nio_service);
if (!vm.count("strand") && !vm.count("mutex") && !vm.count("wrap")) {
std::cout << "use " << nio_service << " io_service(s) with atomic counter" << std::endl;
auto io_service = std::vector<asio::io_service>(nio_service);
run<io_service_post_work>(io_service, io_service, nthread, atomic_counter);
}
if (vm.count("strand")) {
std::cout << "use " << nio_service << " strand(s) with strad::post" << std::endl;
auto io_service = std::vector<asio::io_service>(1);
auto strand = std::vector<asio::io_service::strand>{};
strand.reserve(nio_service);
for (auto i = std::size_t{}; i < nio_service; ++i) {
strand.emplace_back(io_service[0]);
}
run<strand_wrapper<strand_post_work>>(io_service, strand, nthread, counter);
}
if (vm.count("mutex")) {
std::cout << "use " << nio_service << " io_service(s) with mutex" << std::endl;
auto io_service = std::vector<asio::io_service>(nio_service);
run<mutex_io_servcie_post_work>(io_service, io_service, nthread, counter);
}
if (vm.count("wrap")) {
std::cout << "use " << nio_service << " strand(s) with strand::wrap" << std::endl;
auto io_service = std::vector<asio::io_service>(1);
auto strand = std::vector<asio::io_service::strand>{};
strand.reserve(nio_service);
for (auto i = std::size_t{}; i < nio_service; ++i) {
strand.emplace_back(io_service[0]);
}
run<strand_wrapper<strand_wrap_work>>(io_service, strand, nthread, counter);
}
}
catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment