|
|
|
@ -70,7 +70,7 @@ typedef variant<std::thread::id, std::string> EventListenerKey;
|
|
|
|
|
|
|
|
|
|
struct EventListenerValue
|
|
|
|
|
{
|
|
|
|
|
utility::locked_queue<js::Object> Q;
|
|
|
|
|
std::shared_ptr<utility::locked_queue<js::Object>> Q = std::make_shared<utility::locked_queue<js::Object>>();;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
static std::hash<std::thread::id> hash_tid;
|
|
|
|
@ -144,8 +144,8 @@ struct JsonAdapter::Internal
|
|
|
|
|
|
|
|
|
|
for(auto& e : eventListener)
|
|
|
|
|
{
|
|
|
|
|
Log << Logger::Debug << " ~~~ " << to_log(e.first) << " has " << e.second.Q.size() << " old events waiting.";
|
|
|
|
|
e.second.Q.push_back(request);
|
|
|
|
|
Log << Logger::Debug << " ~~~ " << to_log(e.first) << " has " << e.second.Q->size() << " old events waiting.";
|
|
|
|
|
e.second.Q->push_back(request);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -305,13 +305,15 @@ catch (std::exception const &e)
|
|
|
|
|
void JsonAdapter::connection_close_cb()
|
|
|
|
|
{
|
|
|
|
|
Lock L{_mtx};
|
|
|
|
|
auto q = i->eventListener.find( std::this_thread::get_id() );
|
|
|
|
|
Log() << "Connection Close Callback: " << (q==i->eventListener.end() ? "NO" : "1") << " entry in eventListener map";
|
|
|
|
|
const auto tid = std::this_thread::get_id();
|
|
|
|
|
auto q = i->eventListener.find( tid );
|
|
|
|
|
i->Log << Logger::Debug << "Connection Close Callback: " << (q==i->eventListener.end() ? "NO" : "1") << " entry in eventListener map for Thread " << tid << ".";
|
|
|
|
|
if(q != i->eventListener.end())
|
|
|
|
|
{
|
|
|
|
|
while(q->second.Q.waiting() > 0)
|
|
|
|
|
i->Log.debug("%d listener(s) waiting on event queue", q->second.Q->waiting());
|
|
|
|
|
while(q->second.Q->waiting() > 0)
|
|
|
|
|
{
|
|
|
|
|
q->second.Q.push_back(queue_close_event);
|
|
|
|
|
q->second.Q->push_back(queue_close_event);
|
|
|
|
|
std::this_thread::sleep_for( std::chrono::milliseconds(333) );
|
|
|
|
|
}
|
|
|
|
|
i->eventListener.erase(q);
|
|
|
|
@ -323,12 +325,13 @@ void JsonAdapter::close_session(const std::string& session_id)
|
|
|
|
|
{
|
|
|
|
|
Lock L{_mtx};
|
|
|
|
|
auto q = i->eventListener.find( session_id );
|
|
|
|
|
Log() << "Close session \"" << session_id << "\": " << (q==i->eventListener.end() ? "NO" : "1") << " entry in eventListener map";
|
|
|
|
|
i->Log << Logger::Debug << "Close session \"" << session_id << "\": " << (q==i->eventListener.end() ? "NO" : "1") << " entry in eventListener map for session_id \"" << session_id << "\".";
|
|
|
|
|
if(q != i->eventListener.end())
|
|
|
|
|
{
|
|
|
|
|
while(q->second.Q.waiting() > 0)
|
|
|
|
|
i->Log.debug("%d listener(s) waiting on event queue", q->second.Q->waiting());
|
|
|
|
|
while(q->second.Q->waiting() > 0)
|
|
|
|
|
{
|
|
|
|
|
q->second.Q.push_back(queue_close_event);
|
|
|
|
|
q->second.Q->push_back(queue_close_event);
|
|
|
|
|
std::this_thread::sleep_for( std::chrono::milliseconds(333) );
|
|
|
|
|
}
|
|
|
|
|
i->eventListener.erase(q);
|
|
|
|
@ -389,17 +392,17 @@ js::Array JsonAdapter::Internal::pollForEvents(const EventListenerKey& key, unsi
|
|
|
|
|
Logger L("JAI:poll");
|
|
|
|
|
|
|
|
|
|
Lock LCK{_mtx};
|
|
|
|
|
EventListenerValue& el = eventListener[key]; // adds an entry, if not already there. :-)
|
|
|
|
|
auto elQ = eventListener[key].Q; // adds an entry, if not already there. :-)
|
|
|
|
|
LCK.unlock();
|
|
|
|
|
|
|
|
|
|
const size_t size = el.Q.size();
|
|
|
|
|
const size_t size = elQ->size();
|
|
|
|
|
if(size)
|
|
|
|
|
{
|
|
|
|
|
L << Logger::Debug << size << " events in queue for key " << to_log(key) << ":";
|
|
|
|
|
// fetch all elements from queue
|
|
|
|
|
for(size_t i=0; i<size; ++i)
|
|
|
|
|
{
|
|
|
|
|
js::Object obj{ el.Q.pop_front() };
|
|
|
|
|
js::Object obj{ elQ->pop_front() };
|
|
|
|
|
const std::string obj_s = js::write( obj );
|
|
|
|
|
L << Logger::Debug << "\t#" << i << ": " << obj_s;
|
|
|
|
|
|
|
|
|
@ -409,7 +412,7 @@ js::Array JsonAdapter::Internal::pollForEvents(const EventListenerKey& key, unsi
|
|
|
|
|
// block until there is at least one element or timeout
|
|
|
|
|
L << Logger::Debug << "Queue for key " << to_log(key) << " is empty. I'll block for " << timeout_seconds << " seconds.";
|
|
|
|
|
js::Object event;
|
|
|
|
|
const bool success = el.Q.try_pop_front( event, std::chrono::seconds(timeout_seconds) );
|
|
|
|
|
const bool success = elQ->try_pop_front( event, std::chrono::seconds(timeout_seconds) );
|
|
|
|
|
if(success)
|
|
|
|
|
{
|
|
|
|
|
const std::string event_s = js::write(event);
|
|
|
|
|