diff --git a/server/json-adapter.cc b/server/json-adapter.cc index 9c71dab..c7e4a06 100644 --- a/server/json-adapter.cc +++ b/server/json-adapter.cc @@ -70,7 +70,7 @@ typedef variant EventListenerKey; struct EventListenerValue { - utility::locked_queue Q; + std::shared_ptr> Q = std::make_shared>();; }; static std::hash hash_tid; @@ -144,8 +144,8 @@ struct JsonAdapter::Internal for(auto& e : eventListener) { - Log << Logger::Debug << " ~~~ " << to_log(e.first) << " has " << e.second.Q.size() << " old events waiting."; - e.second.Q.push_back(request); + Log << Logger::Debug << " ~~~ " << to_log(e.first) << " has " << e.second.Q->size() << " old events waiting."; + e.second.Q->push_back(request); } } @@ -305,13 +305,15 @@ catch (std::exception const &e) void JsonAdapter::connection_close_cb() { Lock L{_mtx}; - auto q = i->eventListener.find( std::this_thread::get_id() ); - Log() << "Connection Close Callback: " << (q==i->eventListener.end() ? "NO" : "1") << " entry in eventListener map"; + const auto tid = std::this_thread::get_id(); + auto q = i->eventListener.find( tid ); + i->Log << Logger::Debug << "Connection Close Callback: " << (q==i->eventListener.end() ? "NO" : "1") << " entry in eventListener map for Thread " << tid << "."; if(q != i->eventListener.end()) { - while(q->second.Q.waiting() > 0) + i->Log.debug("%d listener(s) waiting on event queue", q->second.Q->waiting()); + while(q->second.Q->waiting() > 0) { - q->second.Q.push_back(queue_close_event); + q->second.Q->push_back(queue_close_event); std::this_thread::sleep_for( std::chrono::milliseconds(333) ); } i->eventListener.erase(q); @@ -323,12 +325,13 @@ void JsonAdapter::close_session(const std::string& session_id) { Lock L{_mtx}; auto q = i->eventListener.find( session_id ); - Log() << "Close session \"" << session_id << "\": " << (q==i->eventListener.end() ? "NO" : "1") << " entry in eventListener map"; + i->Log << Logger::Debug << "Close session \"" << session_id << "\": " << (q==i->eventListener.end() ? "NO" : "1") << " entry in eventListener map for session_id \"" << session_id << "\"."; if(q != i->eventListener.end()) { - while(q->second.Q.waiting() > 0) + i->Log.debug("%d listener(s) waiting on event queue", q->second.Q->waiting()); + while(q->second.Q->waiting() > 0) { - q->second.Q.push_back(queue_close_event); + q->second.Q->push_back(queue_close_event); std::this_thread::sleep_for( std::chrono::milliseconds(333) ); } i->eventListener.erase(q); @@ -389,17 +392,17 @@ js::Array JsonAdapter::Internal::pollForEvents(const EventListenerKey& key, unsi Logger L("JAI:poll"); Lock LCK{_mtx}; - EventListenerValue& el = eventListener[key]; // adds an entry, if not already there. :-) + auto elQ = eventListener[key].Q; // adds an entry, if not already there. :-) LCK.unlock(); - const size_t size = el.Q.size(); + const size_t size = elQ->size(); if(size) { L << Logger::Debug << size << " events in queue for key " << to_log(key) << ":"; // fetch all elements from queue for(size_t i=0; ipop_front() }; const std::string obj_s = js::write( obj ); L << Logger::Debug << "\t#" << i << ": " << obj_s; @@ -409,7 +412,7 @@ js::Array JsonAdapter::Internal::pollForEvents(const EventListenerKey& key, unsi // block until there is at least one element or timeout L << Logger::Debug << "Queue for key " << to_log(key) << " is empty. I'll block for " << timeout_seconds << " seconds."; js::Object event; - const bool success = el.Q.try_pop_front( event, std::chrono::seconds(timeout_seconds) ); + const bool success = elQ->try_pop_front( event, std::chrono::seconds(timeout_seconds) ); if(success) { const std::string event_s = js::write(event);