Browse Source

use std::shared_ptr<> to the queue instead of reference, to avoid destruction of the queue while it is used.

JSON-170
Roker 1 year ago
parent
commit
0a4a87f79a
1 changed files with 17 additions and 14 deletions
  1. +17
    -14
      server/json-adapter.cc

+ 17
- 14
server/json-adapter.cc View File

@ -70,7 +70,7 @@ typedef variant<std::thread::id, std::string> EventListenerKey;
struct EventListenerValue
{
utility::locked_queue<js::Object> Q;
std::shared_ptr<utility::locked_queue<js::Object>> Q = std::make_shared<utility::locked_queue<js::Object>>();;
};
static std::hash<std::thread::id> hash_tid;
@ -144,8 +144,8 @@ struct JsonAdapter::Internal
for(auto& e : eventListener)
{
Log << Logger::Debug << " ~~~ " << to_log(e.first) << " has " << e.second.Q.size() << " old events waiting.";
e.second.Q.push_back(request);
Log << Logger::Debug << " ~~~ " << to_log(e.first) << " has " << e.second.Q->size() << " old events waiting.";
e.second.Q->push_back(request);
}
}
@ -305,13 +305,15 @@ catch (std::exception const &e)
void JsonAdapter::connection_close_cb()
{
Lock L{_mtx};
auto q = i->eventListener.find( std::this_thread::get_id() );
Log() << "Connection Close Callback: " << (q==i->eventListener.end() ? "NO" : "1") << " entry in eventListener map";
const auto tid = std::this_thread::get_id();
auto q = i->eventListener.find( tid );
i->Log << Logger::Debug << "Connection Close Callback: " << (q==i->eventListener.end() ? "NO" : "1") << " entry in eventListener map for Thread " << tid << ".";
if(q != i->eventListener.end())
{
while(q->second.Q.waiting() > 0)
i->Log.debug("%d listener(s) waiting on event queue", q->second.Q->waiting());
while(q->second.Q->waiting() > 0)
{
q->second.Q.push_back(queue_close_event);
q->second.Q->push_back(queue_close_event);
std::this_thread::sleep_for( std::chrono::milliseconds(333) );
}
i->eventListener.erase(q);
@ -323,12 +325,13 @@ void JsonAdapter::close_session(const std::string& session_id)
{
Lock L{_mtx};
auto q = i->eventListener.find( session_id );
Log() << "Close session \"" << session_id << "\": " << (q==i->eventListener.end() ? "NO" : "1") << " entry in eventListener map";
i->Log << Logger::Debug << "Close session \"" << session_id << "\": " << (q==i->eventListener.end() ? "NO" : "1") << " entry in eventListener map for session_id \"" << session_id << "\".";
if(q != i->eventListener.end())
{
while(q->second.Q.waiting() > 0)
i->Log.debug("%d listener(s) waiting on event queue", q->second.Q->waiting());
while(q->second.Q->waiting() > 0)
{
q->second.Q.push_back(queue_close_event);
q->second.Q->push_back(queue_close_event);
std::this_thread::sleep_for( std::chrono::milliseconds(333) );
}
i->eventListener.erase(q);
@ -389,17 +392,17 @@ js::Array JsonAdapter::Internal::pollForEvents(const EventListenerKey& key, unsi
Logger L("JAI:poll");
Lock LCK{_mtx};
EventListenerValue& el = eventListener[key]; // adds an entry, if not already there. :-)
auto elQ = eventListener[key].Q; // adds an entry, if not already there. :-)
LCK.unlock();
const size_t size = el.Q.size();
const size_t size = elQ->size();
if(size)
{
L << Logger::Debug << size << " events in queue for key " << to_log(key) << ":";
// fetch all elements from queue
for(size_t i=0; i<size; ++i)
{
js::Object obj{ el.Q.pop_front() };
js::Object obj{ elQ->pop_front() };
const std::string obj_s = js::write( obj );
L << Logger::Debug << "\t#" << i << ": " << obj_s;
@ -409,7 +412,7 @@ js::Array JsonAdapter::Internal::pollForEvents(const EventListenerKey& key, unsi
// block until there is at least one element or timeout
L << Logger::Debug << "Queue for key " << to_log(key) << " is empty. I'll block for " << timeout_seconds << " seconds.";
js::Object event;
const bool success = el.Q.try_pop_front( event, std::chrono::seconds(timeout_seconds) );
const bool success = elQ->try_pop_front( event, std::chrono::seconds(timeout_seconds) );
if(success)
{
const std::string event_s = js::write(event);


Loading…
Cancel
Save