ENGINE-187 retrieveNextSyncMsg now update *timeout value with remaining time

ENGINE-187
Edouard Tisserant 6 years ago
parent 2c5136aaf9
commit b5f8df83ac

@ -587,15 +587,33 @@ struct JsonAdapter::Internal
void* retrieveNextSyncMsg(time_t* timeout)
{
const std::chrono::milliseconds timeout_ms(1000*long(*timeout));
sync_msg_t* msg = nullptr;
const bool success = sync_queue.try_pop_front(msg, timeout_ms);
if(!success)
{
*timeout = 0;
return nullptr;
}
return msg;
sync_msg_t* msg = nullptr;
if(timeout && *timeout) {
std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now()
+ std::chrono::seconds(*timeout);
const bool success = sync_queue.try_pop_front(msg, end_time);
if(!success)
{
// this is timeout occurrence
return nullptr;
}
// we got a message while waiting for timeout -> compute remaining time
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
if (now < end_time)
{
*timeout = std::chrono::duration_cast<std::chrono::seconds>(end_time - now).count();
}
else
{
*timeout = 0;
}
}else{
msg = sync_queue.pop_front();
}
return msg;
}
void* syncThreadRoutine(void* arg)

@ -62,10 +62,10 @@ namespace pEp
// returns true and set a copy of the last element and pop it from queue if there is any
// returns false and leaves 'out' untouched if queue is empty even after 'timeout' milliseconds
bool try_pop_back(T& out, std::chrono::milliseconds timeout)
bool try_pop_back(T& out, std::chrono::steady_clock::time_point end_time)
{
Lock L(_mtx);
if(! _cv.wait_for(L, timeout, [this]{ return !_q.empty(); } ) )
if(! _cv.wait_for(L, end_time, [this]{ return !_q.empty(); } ) )
{
return false;
}
@ -77,10 +77,10 @@ namespace pEp
// returns true and set a copy of the first element and pop it from queue if there is any
// returns false and leaves 'out' untouched if queue is empty even after 'timeout' milliseconds
bool try_pop_front(T& out, std::chrono::milliseconds timeout)
bool try_pop_front(T& out, std::chrono::steady_clock::time_point end_time)
{
Lock L(_mtx);
if(! _cv.wait_for(L, timeout, [this]{ return !_q.empty(); } ) )
if(! _cv.wait_for(L, end_time, [this]{ return !_q.empty(); } ) )
{
return false;
}

@ -33,7 +33,7 @@ void consu(unsigned nr)
while(! (finished && Q.empty()) )
{
uint64_t u = ~0ull;
if( !Q.try_pop_front(u, std::chrono::milliseconds(2*1000)) )
if( !Q.try_pop_front(u, std::chrono::steady_clock::now() + std::chrono::seconds(2)) )
{
std::cout << "Consumer #" << nr << " times out. Got " << v[nr].size() << " elements." << std::endl;
return;

Loading…
Cancel
Save