@ -13,19 +13,18 @@
# include <functional>
# include <tuple>
# include <mutex>
# include "json-adapter.hh"
# include "daemonize.hh"
# include "p e p-types.hh"
# include "p E p-types.hh"
# include "json_rpc.hh"
# include "security-token.hh"
# include "p e p-utils.hh"
# include "p E p-utils.hh"
# include "ev_server.hh"
# include "logger.hh"
# include "server_version.hh"
# include <pEp/keymanagement.h>
# include <pEp/call_with_lock.hh>
# include <pEp/status_to_string.hh> // from libpEpAdapter.
# include <pEp/locked_queue.hh>
@ -74,35 +73,14 @@ auto ThreadDeleter = [](std::thread* t)
typedef std : : unique_ptr < std : : thread , decltype ( ThreadDeleter ) > ThreadPtr ;
typedef std : : vector < ThreadPtr > ThreadPool ;
// keyserver lookup
utility : : locked_queue < pEp_identity * , & free_identity > keyserver_lookup_queue ;
PEP_SESSION keyserver_lookup_session = nullptr ; // FIXME: what if another adapter started it already?
ThreadPtr keyserver_lookup_thread { nullptr , ThreadDeleter } ;
std : : mutex js_mutex ;
// TODO: use && and std::forward<> to avoid copying of the arguments.
// It is not relevant, yet, because at the moment we use this function template only
// for init() and release() which have cheap-to-copy pointer parameters only
template < class R , class . . . Args >
R call_with_lock ( R ( * fn ) ( Args . . . ) , Args . . . args )
{
std : : lock_guard < std : : mutex > L ( js_mutex ) ;
return fn ( args . . . ) ;
}
// *sigh* necessary because messageToSend() has no obj pointer anymore. :-(
JsonAdapter * ja_singleton = 0 ;
JsonAdapter * ja_singleton = nullptr ;
inject_sync_event_t sync_fn = nullptr ; // *sigh* ugly, but the Engine's API requires it.
} // end of anonymous namespace
PEP_SESSION JsonAdapter : : first_session = nullptr ;
typedef std : : pair < std : : string , unsigned > EventListenerKey ;
struct EventListenerValue
@ -127,7 +105,6 @@ struct JsonAdapter::Internal
unsigned port = 0 ;
unsigned request_count = 0 ;
evutil_socket_t sock = - 1 ;
bool shall_sync = false ; // just hold the value from config/command line.
bool running = false ;
bool silent = false ;
bool ignore_session_error = false ;
@ -135,12 +112,6 @@ struct JsonAdapter::Internal
ThreadPool threads ;
PEP_SESSION session = nullptr ;
// Sync
utility : : locked_queue < Sync_event * , & free_Sync_event > sync_queue ;
PEP_SESSION sync_session = nullptr ;
ThreadPtr sync_thread { nullptr , ThreadDeleter } ;
explicit Internal ( )
: Log ( " JAI " )
{ }
@ -150,14 +121,11 @@ struct JsonAdapter::Internal
~ Internal ( )
{
stopSync ( ) ;
if ( session )
call_with_lock( & release , session ) ;
pEp: : call_with_lock( & release , session ) ;
session = nullptr ;
}
void stopSync ( ) ;
static
void requestDone ( evhttp_request * req , void * userdata )
{
@ -211,58 +179,7 @@ struct JsonAdapter::Internal
addToArray ( param_array , params . . . ) ;
return makeAndDeliverRequest ( msg_name , param_array ) ;
}
int injectSyncMsg ( Sync_event * msg )
{
sync_queue . push_back ( msg ) ;
return 0 ;
}
int injectIdentity ( pEp_identity * idy )
{
keyserver_lookup_queue . push_back ( idy ) ;
return 0 ;
}
Sync_event * retrieveNextSyncMsg ( unsigned timeout )
{
Sync_event * msg = nullptr ;
if ( timeout )
{
const bool success = sync_queue . try_pop_front ( msg , std : : chrono : : seconds ( timeout ) ) ;
if ( ! success )
{
// this is timeout occurrence
return new_sync_timeout_event ( ) ;
}
} else {
msg = sync_queue . pop_front ( ) ;
}
return msg ;
}
pEp_identity * retrieveNextIdentity ( )
{
return keyserver_lookup_queue . pop_front ( ) ;
}
void * syncThreadRoutine ( void * arg )
{
PEP_STATUS status = call_with_lock ( & init , & sync_session , & JsonAdapter : : messageToSend , & JsonAdapter : : injectSyncMsg ) ;
if ( status ! = PEP_STATUS_OK )
throw std : : runtime_error ( " Cannot init sync_session! status: " + : : pEp : : status_to_string ( status ) ) ;
status = register_sync_callbacks ( sync_session ,
( void * ) this ,
JsonAdapter : : notifyHandshake ,
JsonAdapter : : retrieveNextSyncMsg ) ;
if ( status ! = PEP_STATUS_OK )
throw std : : runtime_error ( " Cannot register sync callbacks! status: " + : : pEp : : status_to_string ( status ) ) ;
status = do_sync_protocol ( sync_session , arg ) ; // does the whole work
sync_queue . clear ( ) ; // remove remaining messages
return ( void * ) status ;
}
} ;
@ -356,141 +273,11 @@ PEP_STATUS JsonAdapter::notifyHandshake(pEp_identity* self, pEp_identity* partne
}
// BEWARE: msg is 1st parameter, obj is second!!!
int JsonAdapter : : injectSyncMsg ( Sync_event * msg , void * obj )
{
// JsonAdapter* ja = static_cast<JsonAdapter*>(obj);
return ja_singleton - > i - > injectSyncMsg ( msg ) ;
}
Sync_event * JsonAdapter : : retrieveNextSyncMsg ( void * obj , unsigned timeout )
{
// JsonAdapter* ja = static_cast<JsonAdapter*>(obj);
ja_singleton - > check_guard ( ) ;
return ja_singleton - > i - > retrieveNextSyncMsg ( timeout ) ;
}
void * JsonAdapter : : syncThreadRoutine ( void * arg )
{
// JsonAdapter* ja = static_cast<JsonAdapter*>(arg);
return ja_singleton - > i - > syncThreadRoutine ( arg ) ;
}
void JsonAdapter : : startSync ( )
{
check_guard ( ) ;
i - > sync_queue . clear ( ) ;
// status = attach_sync_session(i->session, i->sync_session);
// if(status != PEP_STATUS_OK)
// throw std::runtime_error("Cannot attach to sync session! status: " + status_to_string(status));
i - > sync_thread . reset ( new std : : thread ( JsonAdapter : : syncThreadRoutine , ( void * ) this ) ) ;
}
void JsonAdapter : : stopSync ( )
{
check_guard ( ) ;
i - > stopSync ( ) ;
}
void JsonAdapter : : Internal : : stopSync ( )
{
// No sync session active
if ( sync_session = = nullptr )
return ;
sync_queue . push_front ( NULL ) ;
sync_thread - > join ( ) ;
unregister_sync_callbacks ( sync_session ) ;
sync_queue . clear ( ) ;
call_with_lock ( & release , sync_session ) ;
sync_session = nullptr ;
}
void JsonAdapter : : startKeyserverLookup ( )
{
if ( keyserver_lookup_session )
throw std : : runtime_error ( " KeyserverLookup already started. " ) ;
PEP_STATUS status = call_with_lock ( & init , & keyserver_lookup_session , & JsonAdapter : : messageToSend , & JsonAdapter : : injectSyncMsg ) ;
if ( status ! = PEP_STATUS_OK | | keyserver_lookup_session = = nullptr )
{
throw std : : runtime_error ( " Cannot create keyserver lookup session! status: " + : : pEp : : status_to_string ( status ) ) ;
}
keyserver_lookup_queue . clear ( ) ;
status = register_examine_function ( keyserver_lookup_session ,
JsonAdapter : : examineIdentity ,
& keyserver_lookup_session // nullptr is not accepted, so any dummy ptr is used here
) ;
if ( status ! = PEP_STATUS_OK )
throw std : : runtime_error ( " Cannot register keyserver lookup callbacks! status: " + : : pEp : : status_to_string ( status ) ) ;
keyserver_lookup_thread . reset ( new std : : thread ( JsonAdapter : : keyserverLookupThreadRoutine , & keyserver_lookup_session /* just a dummy */ ) ) ;
}
void JsonAdapter : : stopKeyserverLookup ( )
{
// No keyserver lookup session active
if ( keyserver_lookup_session = = nullptr )
return ;
keyserver_lookup_queue . push_front ( NULL ) ;
keyserver_lookup_thread - > join ( ) ;
// there is no unregister_examine_callback() function. hum...
keyserver_lookup_queue . clear ( ) ;
call_with_lock ( & release , keyserver_lookup_session ) ;
keyserver_lookup_session = nullptr ;
}
int JsonAdapter : : examineIdentity ( pEp_identity * idy , void * obj )
{
// JsonAdapter* ja = static_cast<JsonAdapter*>(obj);
return ja_singleton - > i - > injectIdentity ( idy ) ;
}
pEp_identity * JsonAdapter : : retrieveNextIdentity ( void * obj )
{
// JsonAdapter* ja = static_cast<JsonAdapter*>(obj);
return ja_singleton - > i - > retrieveNextIdentity ( ) ;
}
void * JsonAdapter : : keyserverLookupThreadRoutine ( void * arg )
{
PEP_STATUS status = do_keymanagement (
& JsonAdapter : : retrieveNextIdentity ,
arg ) ; // does the whole work
keyserver_lookup_queue . clear ( ) ;
return ( void * ) status ;
}
JsonAdapter : : JsonAdapter ( )
: guard_0 ( Guard_0 )
, i ( new Internal { } )
, guard_1 ( Guard_1 )
{
if ( ! ja_singleton )
{
ja_singleton = this ;
}
i - > eventBase . reset ( event_base_new ( ) ) ;
if ( ! i - > eventBase )
throw std : : runtime_error ( " Failed to create new base_event. " ) ;
@ -505,7 +292,6 @@ JsonAdapter::~JsonAdapter()
{
check_guard ( ) ;
Log ( ) < < " ~JsonAdapter(): " < < session_registry . size ( ) < < " sessions registered. " ;
stopSync ( ) ;
this - > shutdown ( nullptr ) ;
Log ( ) < < " \t After stopSync() and shutdown() there are " < < session_registry . size ( ) < < " sessions registered. " ;
delete i ;
@ -513,14 +299,6 @@ JsonAdapter::~JsonAdapter()
}
JsonAdapter & JsonAdapter : : do_sync ( bool _do_sync )
{
check_guard ( ) ;
i - > shall_sync = _do_sync ;
return * this ;
}
JsonAdapter & JsonAdapter : : ignore_session_errors ( bool _ig )
{
check_guard ( ) ;
@ -544,22 +322,6 @@ void JsonAdapter::prepare_run(const std::string& address, unsigned start_port, u
i - > start_port = start_port ;
i - > end_port = end_port ;
if ( first_session = = nullptr ) // okay, we are the 1st:
{
// create a dummy session just to see whether the Engine is functional.
// reason: here we still can log errors to stderr, because prepare_run() is called before daemonize().
PEP_STATUS status = call_with_lock ( & init , & first_session , & JsonAdapter : : messageToSend , & JsonAdapter : : injectSyncMsg ) ;
if ( status ! = PEP_STATUS_OK | | first_session = = nullptr )
{
const std : : string error_msg = " Cannot create first session! PEP_STATUS: " + : : pEp : : status_to_string ( status ) + " . " ;
std : : cerr < < error_msg < < std : : endl ; // Log to stderr intentionally, so Enigmail can grab that error message easily.
if ( ! i - > ignore_session_error )
{
throw std : : runtime_error ( error_msg ) ;
}
}
}
Log ( ) < < " ThreadFunc: thread id " < < std : : this_thread : : get_id ( ) < < " . \n Registry: " < < to_string ( session_registry ) ;
unsigned port_ofs = 0 ;
@ -598,7 +360,7 @@ void JsonAdapter::threadFunc()
if ( q = = session_registry . end ( ) )
{
i - > session = nullptr ;
PEP_STATUS status = call_with_lock( & init , & i - > session , & JsonAdapter : : messageToSend , & JsonAdapter : : injectSyncMsg ) ; // release(session) in ThreadDeleter
PEP_STATUS status = pEp: : call_with_lock( & init , & i - > session , & JsonAdapter : : messageToSend , sync_fn ) ; // release(session) in ThreadDeleter
if ( status ! = PEP_STATUS_OK | | i - > session = = nullptr )
{
const std : : string error_msg = " Cannot create session! PEP_STATUS: " + : : pEp : : status_to_string ( status ) + " . " ;
@ -611,11 +373,6 @@ void JsonAdapter::threadFunc()
session_registry . emplace ( id , this ) ;
L < < Logger : : Info < < " \t created new session for this thread: " < < static_cast < void * > ( i - > session ) < < " . " ;
if ( i - > shall_sync & & i - > session ) // startSync() does not make sense without session.
{
L < < Logger : : Info < < " \t startSync()... " ;
startSync ( ) ;
}
} else {
L < < Logger : : Info < < " \t session for this thread: " < < static_cast < void * > ( q - > second ) < < " . " ;
}
@ -632,7 +389,7 @@ void JsonAdapter::threadFunc()
if ( i - > deliver_html )
{
evhttp_set_cb ( evHttp . get ( ) , " /p e p_functions.js" , ev_server : : OnGetFunctions , this ) ;
evhttp_set_cb ( evHttp . get ( ) , " /p E p_functions.js" , ev_server : : OnGetFunctions , this ) ;
evhttp_set_gencb ( evHttp . get ( ) , ev_server : : OnOtherRequest , nullptr ) ;
}
@ -814,10 +571,27 @@ void JsonAdapter::check_guard() const
}
void JsonAdapter : : global_shutdown ( )
std : : recursive_mutex get_instance_mutex ;
JsonAdapter & JsonAdapter : : getInstance ( )
{
std : : lock_guard < std : : recursive_mutex > L ( get_instance_mutex ) ;
if ( ! ja_singleton )
{
ja_singleton = new JsonAdapter ( ) ;
}
return * ja_singleton ;
}
JsonAdapter & JsonAdapter : : startup ( inject_sync_event_t se )
{
call_with_lock ( & release , JsonAdapter : : first_session ) ;
JsonAdapter : : first_session = nullptr ;
sync_fn = se ;
JsonAdapter & ja = getInstance ( ) ;
ja . prepare_run ( " 127.0.0.1 " , 4223 , 9999 ) ;
ja . run ( ) ;
return ja ;
}