Skip to content

Instantly share code, notes, and snippets.

@ToniRV
Created January 3, 2019 10:00
Show Gist options
  • Save ToniRV/7515c07122741d0cfe5090638cf209d3 to your computer and use it in GitHub Desktop.
Save ToniRV/7515c07122741d0cfe5090638cf209d3 to your computer and use it in GitHub Desktop.
// Based on [1]: Anthony A. Williams, "C++ Concurrency in Action".
/*******************************************************************************
* MIT License
*
* Copyright (c) 2018 Toni Rosinol
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
******************************************************************************/
#pragma once
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
/// @brief Unbounded FIFO queue that can be shared between threads.
///
/// Every access to the underlying std::queue is serialized by an internal
/// mutex. Consumers may either poll (pop) or block until data is available
/// (popBlocking). Calling shutdown() wakes every blocked consumer and makes
/// all subsequent popBlocking() calls return immediately without a value.
///
/// @tparam T Element type. Copy-pushing and copy-constructing the queue require
///           T to be copyable; all other operations only require T to be
///           movable.
template <typename T>
class ThreadsafeQueue {
 public:
  ThreadsafeQueue() = default;

  /// @brief Copies the elements currently stored in @p other.
  ///
  /// The source is locked while its contents are copied. The shutdown flag is
  /// not copied: the new queue always starts in the non-shutdown state.
  ThreadsafeQueue(const ThreadsafeQueue& other) {
    std::lock_guard<std::mutex> lk(other.mutex_);
    data_queue_ = other.data_queue_;
  }

  // The mutex and condition variable members are not assignable; state the
  // resulting lack of copy-assignment explicitly.
  ThreadsafeQueue& operator=(const ThreadsafeQueue&) = delete;

  /// @brief Appends a copy of @p new_value and wakes one waiting consumer.
  ///
  /// Taken by const reference (rather than by value) so that this overload
  /// does not compete with the rvalue overload below: a by-value overload
  /// next to a T&& overload makes every call with an rvalue ambiguous.
  void push(const T& new_value) {
    {
      std::lock_guard<std::mutex> lk(mutex_);
      data_queue_.push(new_value);
    }  // Unlock before notify, so the woken thread can take the lock at once.
    data_cond_.notify_one();
  }

  /// @brief Moves @p new_value into the queue and wakes one waiting consumer.
  void push(T&& new_value) {
    {
      std::lock_guard<std::mutex> lk(mutex_);
      data_queue_.push(std::move(new_value));
    }  // Unlock before notify, so the woken thread can take the lock at once.
    data_cond_.notify_one();
  }

  /// @brief Pops the front element into @p value, waiting until the queue
  ///        holds data or shutdown() has been called.
  /// @param[out] value Receives the popped element. Left untouched when the
  ///                   call returns because of a shutdown.
  /// @return true if an element was written to @p value, false if the call
  ///         returned because the queue was shut down.
  bool popBlocking(T& value) {
    std::unique_lock<std::mutex> lk(mutex_);
    // Wait until there is data in the queue or shutdown is requested.
    data_cond_.wait(lk, [this] { return !data_queue_.empty() || shutdown_; });
    // Shutdown takes precedence, even if elements are still queued.
    if (shutdown_) return false;
    // The front element is discarded right after, so move instead of copy.
    value = std::move(data_queue_.front());
    data_queue_.pop();
    return true;
  }

  /// @brief Pops the front element, waiting until the queue holds data or
  ///        shutdown() has been called.
  /// @return A shared_ptr owning the popped element, or a null shared_ptr if
  ///         the call returned because the queue was shut down.
  std::shared_ptr<T> popBlocking() {
    std::unique_lock<std::mutex> lk(mutex_);
    data_cond_.wait(lk, [this] { return !data_queue_.empty() || shutdown_; });
    if (shutdown_) return std::shared_ptr<T>();
    // The shared_ptr allocation might throw an exception.
    // Making the queue hold shared_ptr instead, would avoid this issue.
    // See listing 6.3 in [1].
    std::shared_ptr<T> value(
        std::make_shared<T>(std::move(data_queue_.front())));
    data_queue_.pop();
    return value;
  }

  /// @brief Pops the front element into @p value without blocking.
  /// @param[out] value Receives the popped element. Left untouched when the
  ///                   queue is empty.
  /// @return true if an element was retrieved, false if the queue was empty.
  bool pop(T& value) {
    std::lock_guard<std::mutex> lk(mutex_);
    if (data_queue_.empty()) return false;
    value = std::move(data_queue_.front());
    data_queue_.pop();
    return true;
  }

  /// @brief Pops the front element without blocking.
  /// @return A shared_ptr owning the popped element, or a null shared_ptr if
  ///         the queue was empty.
  std::shared_ptr<T> pop() {
    std::lock_guard<std::mutex> lk(mutex_);
    if (data_queue_.empty()) return std::shared_ptr<T>();
    // The shared_ptr allocation might throw an exception.
    // Making the queue hold shared_ptr instead, would avoid this issue.
    // See listing 6.3 in [1].
    std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue_.front())));
    data_queue_.pop();
    return res;
  }

  /// @brief Requests shutdown and wakes every thread blocked in popBlocking().
  ///
  /// After this call popBlocking() returns immediately without a value. The
  /// non-blocking pop() overloads do not look at the shutdown flag.
  void shutdown() {
    {
      std::lock_guard<std::mutex> lk(mutex_);
      // Even if the shared variable is atomic, it must be modified under the
      // mutex in order to correctly publish the modification to the waiting
      // thread.
      shutdown_ = true;
    }
    data_cond_.notify_all();
  }

 private:
  /// @brief Checks if the queue is empty.
  ///
  /// Kept private because it might be misused by the user, since the state of
  /// the queue might change right after this query.
  bool empty() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return data_queue_.empty();
  }

 private:
  mutable std::mutex mutex_;  // mutable for empty() and copy-constructor.
  std::queue<T> data_queue_;
  std::condition_variable data_cond_;
  std::atomic_bool shutdown_{false};  // flag for signaling queue shutdown.
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment