clean up, do not clear whole registry when one instance dies

JSON-28
Roker 6 years ago
parent 5bdad99a8a
commit 48d00be9df

@ -96,7 +96,7 @@ const std::string server_version =
"(28) Olpe-Süd"; // add re_evaluate_message_rating(). Jira: JSON-29
typedef std::map<std::thread::id, PEP_SESSION> SessionRegistry;
typedef std::map<std::thread::id, JsonAdapter*> SessionRegistry;
SessionRegistry session_registry;
@ -428,55 +428,13 @@ void OnApiRequest(evhttp_request* req, void* obj)
} // end of anonymous namespace
std::string getSessions()
{
js::Array a;
a.reserve(session_registry.size());
for(const auto& s : session_registry)
{
std::stringstream ss;
js::Object o;
ss << s.first;
o.emplace_back("tid", ss.str() );
ss.str("");
ss << static_cast<void*>(s.second);
o.emplace_back("session", ss.str() );
if(s.first == std::this_thread::get_id())
{
o.emplace_back("mine", true);
}
a.push_back( std::move(o) );
}
return js::write( a, js::pretty_print | js::raw_utf8 | js::single_line_arrays );
}
template<>
PEP_SESSION from_json(const js::Value& /* not used */)
{
const auto id = std::this_thread::get_id();
const auto q = session_registry.find( id );
if(q == session_registry.end())
{
std::stringstream ss;
ss << "There is no SESSION for this thread (" << id << ")!";
throw std::logic_error( ss.str() );
}
return q->second;
}
std::string JsonAdapter::version()
{
return server_version;
}
typedef std::pair<std::string, unsigned> EventListenerKey;
unsigned JsonAdapter::apiVersion()
struct EventListenerValue
{
return API_VERSION;
}
std::string securityContext;
std::unique_ptr<evhttp_connection, decltype(&evhttp_connection_free)> connection = { nullptr, &evhttp_connection_free};
};
auto ThreadDeleter = [](std::thread* t)
@ -485,24 +443,16 @@ auto ThreadDeleter = [](std::thread* t)
const auto q = session_registry.find( id );
if(q != session_registry.end())
{
detach_sync_session(q->second);
release(q->second);
session_registry.erase( q );
delete q->second;
}
delete t;
};
typedef std::unique_ptr<std::thread, decltype(ThreadDeleter)> ThreadPtr;
typedef std::vector<ThreadPtr> ThreadPool;
typedef std::pair<std::string, unsigned> EventListenerKey;
struct EventListenerValue
{
std::string securityContext;
std::unique_ptr<evhttp_connection, decltype(&evhttp_connection_free)> connection = { nullptr, &evhttp_connection_free};
};
struct JsonAdapter::Internal
{
@ -521,6 +471,7 @@ struct JsonAdapter::Internal
bool running = false;
bool silent = false;
ThreadPool threads;
PEP_SESSION session = nullptr;
// Sync
locked_queue< sync_msg_t*, &free_sync_msg> sync_queue;
@ -532,7 +483,18 @@ struct JsonAdapter::Internal
PEP_SESSION keyserver_lookup_session = nullptr;
ThreadPtr keyserver_lookup_thread{nullptr, ThreadDeleter};
Internal(std::ostream& logger) : Log(logger) {}
explicit Internal(std::ostream& logger) : Log(logger) {}
Internal(const Internal&) = delete;
void operator=(const Internal&) = delete;
~Internal()
{
stopSync();
release(session);
session=nullptr;
}
void stopSync();
static
void requestDone(evhttp_request* req, void* userdata)
@ -583,9 +545,9 @@ struct JsonAdapter::Internal
template<class... Params>
PEP_STATUS makeAndDeliverRequest2(const char* msg_name, Params&&... params)
{
js::Array param;
addToArray( param, params...);
return makeAndDeliverRequest(msg_name, param);
js::Array param_array;
addToArray( param_array, params...);
return makeAndDeliverRequest(msg_name, param_array);
}
int injectSyncMsg(void* msg)
@ -645,6 +607,60 @@ struct JsonAdapter::Internal
};
std::string getSessions()
{
js::Array a;
a.reserve(session_registry.size());
for(const auto& s : session_registry)
{
std::stringstream ss;
js::Object o;
ss << s.first;
o.emplace_back("tid", ss.str() );
ss.str("");
ss << static_cast<void*>(s.second);
o.emplace_back("session", ss.str() );
if(s.first == std::this_thread::get_id())
{
o.emplace_back("mine", true);
}
a.push_back( std::move(o) );
}
return js::write( a, js::pretty_print | js::raw_utf8 | js::single_line_arrays );
}
template<>
PEP_SESSION from_json(const js::Value& /* not used */)
{
const auto id = std::this_thread::get_id();
const auto q = session_registry.find( id );
if(q == session_registry.end())
{
std::stringstream ss;
ss << "There is no SESSION for this thread (" << id << ")!";
throw std::logic_error( ss.str() );
}
return q->second->i->session;
}
std::string JsonAdapter::version()
{
return server_version;
}
unsigned JsonAdapter::apiVersion()
{
return API_VERSION;
}
PEP_STATUS JsonAdapter::messageToSend(void* obj, message* msg)
{
JsonAdapter* ja = static_cast<JsonAdapter*>(obj);
@ -689,6 +705,10 @@ void JsonAdapter::startSync()
throw std::runtime_error("Cannot create sync session! status: " + status_to_string(status));
}
status = attach_sync_session(i->session, i->sync_session);
if(status != PEP_STATUS_OK)
throw std::runtime_error("Cannot attach to sync session! status: " + status_to_string(status));
i->sync_queue.clear();
status = register_sync_callbacks(i->sync_session,
@ -705,19 +725,25 @@ void JsonAdapter::startSync()
void JsonAdapter::stopSync()
{
i->stopSync();
}
void JsonAdapter::Internal::stopSync()
{
// No sync session active
if(i->sync_session == nullptr)
if(sync_session == nullptr)
return
i->sync_queue.push_front(NULL);
i->sync_thread->join();
sync_queue.push_front(NULL);
sync_thread->join();
unregister_sync_callbacks(i->sync_session);
i->sync_queue.clear();
unregister_sync_callbacks(sync_session);
sync_queue.clear();
release(i->sync_session);
i->sync_session = nullptr;
release(sync_session);
sync_session = nullptr;
}
@ -805,15 +831,10 @@ JsonAdapter::~JsonAdapter()
{
Log() << "~JsonAdapter(): " << session_registry.size() << " sessions registered.\n";
stopSync();
shutdown(nullptr);
Log() << "\t After stopSync() and shutdown() there are " << session_registry.size() << " sessions registered.\n";
for(auto& s : session_registry)
{
release(s.second);
}
session_registry.clear();
this->shutdown(nullptr);
delete i;
i=nullptr;
Log() << "\t After stopSync() and shutdown() there are " << session_registry.size() << " sessions registered.\n";
}
@ -821,6 +842,7 @@ void JsonAdapter::run()
try
{
Log() << "I have " << session_registry.size() << " registered session(s).\n";
JsonAdapter* ja = this;
std::exception_ptr initExcept;
auto ThreadFunc = [&] ()
@ -832,19 +854,16 @@ try
const auto q=session_registry.find(id);
if(q==session_registry.end())
{
PEP_SESSION session = nullptr;
PEP_STATUS status = init(&session); // release(session) in ThreadDeleter
if(status != PEP_STATUS_OK || session==nullptr)
i->session = nullptr;
PEP_STATUS status = init(&i->session); // release(session) in ThreadDeleter
if(status != PEP_STATUS_OK || i->session==nullptr)
{
throw std::runtime_error("Cannot create session! status: " + status_to_string(status));
}
session_registry.emplace( id, session);
Log() << "\tcreated new session for this thread: " << static_cast<void*>(session) << ".\n";
session_registry.emplace(id, ja);
Log() << "\tcreated new session for this thread: " << static_cast<void*>(i->session) << ".\n";
status = attach_sync_session(session, i->sync_session);
if(status != PEP_STATUS_OK)
throw std::runtime_error("Cannot attach to sync session! status: " + status_to_string(status));
}else{
Log() << "\tsession for this thread: " << static_cast<void*>(q->second) << ".\n";
}

@ -76,7 +76,7 @@ public:
// returns the associated log stream (either std::cerr or nulllogger)
std::ostream& Log() const;
private:
//private:
struct Internal;
Internal* i; // pimpl for stable interface.
};

Loading…
Cancel
Save