implement proper locked_queue and use it

JSON-15
Roker 7 years ago
parent 49b6f05a2c
commit b96884c494

@ -452,7 +452,7 @@ struct JsonAdapter::Internal
ThreadPool threads;
// Sync
locked_queue< sync_msg_t* > sync_queue;
locked_queue< sync_msg_t*, &free_sync_msg> sync_queue;
PEP_SESSION sync_session = nullptr;
ThreadPtr sync_thread{nullptr, ThreadDeleter};
@ -514,32 +514,27 @@ struct JsonAdapter::Internal
int injectSyncMsg(void* msg)
{
sync_queue.push_back((sync_msg_t*)msg);
sync_queue.push_back( static_cast<sync_msg_t*>(msg) );
return 0;
}
void* retrieveNextSyncMsg(time_t* timeout)
{
while (sync_queue.empty())
// TODO: add blocking dequeue
usleep(100000);
void* msg = sync_queue.front();
sync_queue.pop_front();
const std::chrono::milliseconds timeout_ms(1000*long(*timeout));
sync_msg_t* msg = nullptr;
const bool success = sync_queue.try_pop_front(msg, timeout_ms);
if(!success)
{
*timeout = 0;
return nullptr;
}
return msg;
}
void* syncThreadRoutine(void* arg)
{
PEP_STATUS status = do_sync_protocol(sync_session, arg);
while (sync_queue.size())
{
sync_msg_t* msg = sync_queue.front();
sync_queue.pop_front();
free_sync_msg(msg);
}
PEP_STATUS status = do_sync_protocol(sync_session, arg); // does the whole work
sync_queue.clear(); // remove remaining messages
return (void*) status;
}
};

@ -2,66 +2,143 @@
#include <list>
#include <mutex>
#include <cerrno>
#include <condition_variable>
namespace pEp {
namespace utility {
namespace pEp
{
namespace utility
{
// a thread-safe queue of T elements. Deleter is a functor that is called in clear() for each elements
// interface differs from std::queue because "top() and pop() if not empty()" does not work atomically!
// elements must be copied without exceptions!
// pop() blocks when queue is empty
template<class T, void(*Deleter)(T)> class locked_queue
{
typedef std::recursive_mutex Mutex;
typedef std::unique_lock<Mutex> Lock;
Mutex _mtx;
std::condition_variable_any _cv;
std::list<T> _q;
template<class T> class locked_queue
{
typedef std::recursive_mutex Mutex;
typedef std::lock_guard<Mutex> Lock;
Mutex _mtx;
std::list<T> _q;
public:
T& back()
{
Lock L(_mtx);
return _q.back();
}
T& front()
{
Lock L(_mtx);
return _q.front();
}
void clear()
{
Lock L(_mtx);
_q.clear();
}
void pop_back()
{
Lock L(_mtx);
_q.pop_back();
}
void pop_front()
{
Lock L(_mtx);
_q.pop_front();
}
void push_back(const T& data)
{
Lock L(_mtx);
_q.push_back(data);
}
void push_front(const T& data)
{
Lock L(_mtx);
_q.push_front(data);
}
size_t size()
{
Lock L(_mtx);
return _q.size();
}
bool empty()
{
Lock L(_mtx);
return _q.empty();
}
};
}
}
public:
~locked_queue()
{
clear();
}
void clear()
{
Lock L(_mtx);
for(auto element : _q)
{
Deleter(element);
}
_q.clear();
}
// returns a copy of the last element.
// blocks when queue is empty
T pop_back()
{
Lock L(_mtx);
_cv.wait(L, [&]{ return !_q.empty(); } );
T ret{std::move(_q.back())};
_q.pop_back();
return ret;
}
// returns a copy of the first element.
// blocks when queue is empty
T pop_front()
{
Lock L(_mtx);
_cv.wait(L, [&]{ return !_q.empty(); } );
T ret{std::move(_q.front())};
_q.pop_front();
return ret;
}
// returns true and set a copy of the last element and pop it from queue if there is any
// returns false and leaves 'out' untouched if queue is empty even after 'timeout' milliseconds
bool try_pop_back(T& out, std::chrono::milliseconds timeout)
{
Lock L(_mtx);
if(! _cv.wait_for(L, timeout, [this]{ return !_q.empty(); } ) )
{
return false;
}
out = std::move(_q.back());
_q.pop_back();
return true;
}
// returns true and set a copy of the first element and pop it from queue if there is any
// returns false and leaves 'out' untouched if queue is empty even after 'timeout' milliseconds
bool try_pop_front(T& out, std::chrono::milliseconds timeout)
{
Lock L(_mtx);
if(! _cv.wait_for(L, timeout, [this]{ return !_q.empty(); } ) )
{
return false;
}
out = std::move(_q.front());
_q.pop_front();
return true;
}
void push_back(const T& data)
{
{
Lock L(_mtx);
_q.push_back(data);
}
_cv.notify_one();
}
void push_front(const T& data)
{
{
Lock L(_mtx);
_q.push_front(data);
}
_cv.notify_one();
}
void push_back(T&& data)
{
{
Lock L(_mtx);
_q.push_back( std::move(data) );
}
_cv.notify_one();
}
void push_front(T&& data)
{
{
Lock L(_mtx);
_q.push_front( std::move(data) );
}
_cv.notify_one();
}
size_t size()
{
Lock L(_mtx);
return _q.size();
}
bool empty()
{
Lock L(_mtx);
return _q.empty();
}
};
} // end of namespace pEp::util
} // end of namespace pEp

Loading…
Cancel
Save