diff --git a/server/json-adapter.cc b/server/json-adapter.cc index 34c6f75..10ceb31 100644 --- a/server/json-adapter.cc +++ b/server/json-adapter.cc @@ -17,6 +17,7 @@ #include "function_map.hh" #include "pep-types.hh" #include "json_rpc.hh" +#include "nulllogger.hh" #include "security-token.hh" #include "pep-utils.hh" @@ -326,7 +327,7 @@ void OnApiRequest(evhttp_request* req, void* obj) const std::string data_string(data.data(), data.data() + nr ); if(nr>0) { - std::cout << "\tData: «" << data_string << "»\n"; + ja->Log() << "\tData: «" << data_string << "»\n"; bool b = js::read( data_string, p); if(p.type() == js::obj_type) { @@ -336,14 +337,14 @@ void OnApiRequest(evhttp_request* req, void* obj) answer = make_error( JSON_RPC::PARSE_ERROR, "evbuffer_copyout does not return a JSON string. b=" + std::to_string(b), js::Value{data_string}, 42 ); } }else{ - std::cout << "\tError: " << nr << ".\n"; + ja->Log() << "\tError: " << nr << ".\n"; answer = make_error( JSON_RPC::INTERNAL_ERROR, "evbuffer_copyout returns negative value", p, request_id ); } } catch(const std::exception& e) { - std::cout << "\tException: \"" << e.what() << "\"\n"; + std::cerr << "\tException: \"" << e.what() << "\"\n"; answer = make_error( JSON_RPC::INTERNAL_ERROR, "Got a std::exception: \"" + std::string(e.what()) + "\"", p, request_id ); } @@ -439,19 +440,23 @@ struct JsonAdapter::Internal std::string token; std::map eventListener; + std::ostream& Log; unsigned start_port = 0; unsigned end_port = 0; unsigned port = 0; unsigned request_count = 0; evutil_socket_t sock = -1; bool running = false; + bool silent = false; ThreadPool threads; - - // Sync - locked_queue< sync_msg_t * > *sync_queue; - PEP_SESSION sync_session; - pthread_t sync_thread; - + + // Sync + locked_queue< sync_msg_t* >* sync_queue = nullptr; + PEP_SESSION sync_session; + pthread_t sync_thread; + + Internal(std::ostream& logger) : Log(logger) {} + static void requestDone(evhttp_request* req, void* userdata) { @@ -519,39 +524,40 @@ struct JsonAdapter::Internal return status; } - - int injectSyncMsg(void *msg) - { - sync_queue->push_back((sync_msg_t *)msg); - return 0; - } - - void *retrieveNextSyncMsg() - { - while (!sync_queue->size()) - // TODO: add blocking dequeue - usleep(100000); - - void *msg = sync_queue->front(); - sync_queue->pop_front(); - return msg; - } - - void *syncThreadRoutine(void *arg) - { - PEP_STATUS status = do_sync_protocol(sync_session, arg); - - while (sync_queue->size()) { - sync_msg_t *msg = sync_queue->front(); - sync_queue->pop_front(); - free_sync_msg(msg); - } - - return (void *) status; - } - + + int injectSyncMsg(void* msg) + { + sync_queue->push_back((sync_msg_t*)msg); + return 0; + } + + void* retrieveNextSyncMsg() + { + while (!sync_queue->size()) + // TODO: add blocking dequeue + usleep(100000); + + void* msg = sync_queue->front(); + sync_queue->pop_front(); + return msg; + } + + void* syncThreadRoutine(void* arg) + { + PEP_STATUS status = do_sync_protocol(sync_session, arg); + + while (sync_queue->size()) + { + sync_msg_t* msg = sync_queue->front(); + sync_queue->pop_front(); + free_sync_msg(msg); + } + + return (void*) status; + } }; + PEP_STATUS JsonAdapter::messageToSend(void* obj, message* msg) { JsonAdapter* ja = static_cast(obj); @@ -565,76 +571,82 @@ PEP_STATUS JsonAdapter::showHandshake(void* obj, pEp_identity* self, pEp_identit return ja->i->showHandshake(self, partner); } -int JsonAdapter::injectSyncMsg(void* obj, void *msg) + +int JsonAdapter::injectSyncMsg(void* obj, void* msg) { JsonAdapter* ja = static_cast(obj); return ja->i->injectSyncMsg(msg); } -void *JsonAdapter::retrieveNextSyncMsg(void* obj) + +void* JsonAdapter::retrieveNextSyncMsg(void* obj) { JsonAdapter* ja = static_cast(obj); return ja->i->retrieveNextSyncMsg(); } -void *JsonAdapter::syncThreadRoutine(void *arg) + +void* JsonAdapter::syncThreadRoutine(void* arg) { JsonAdapter* ja = static_cast(arg); return ja->i->syncThreadRoutine(arg); } -void JsonAdapter::startSync(void) + +void JsonAdapter::startSync() { - PEP_STATUS status = init(&i->sync_session); - if(status != PEP_STATUS_OK || i->sync_session==nullptr) - { - throw std::runtime_error("Cannot create sync session! status: " + status_to_string(status)); - } - - i->sync_queue = new locked_queue< sync_msg_t * >(); - - status = register_sync_callbacks(i->sync_session, - (void *) this, - JsonAdapter::messageToSend, - JsonAdapter::showHandshake, - JsonAdapter::injectSyncMsg, - JsonAdapter::retrieveNextSyncMsg); - if (status != PEP_STATUS_OK) - throw std::runtime_error("Cannot register sync callbacks! status: " + status_to_string(status)); - - if(pthread_create(&i->sync_thread, NULL, JsonAdapter::syncThreadRoutine, (void *) this) != 0) - throw std::runtime_error("Cannot create sync session thread !"); + PEP_STATUS status = init(&i->sync_session); + if(status != PEP_STATUS_OK || i->sync_session==nullptr) + { + throw std::runtime_error("Cannot create sync session! status: " + status_to_string(status)); + } + + i->sync_queue = new locked_queue< sync_msg_t* >(); + + status = register_sync_callbacks(i->sync_session, + (void*) this, + JsonAdapter::messageToSend, + JsonAdapter::showHandshake, + JsonAdapter::injectSyncMsg, + JsonAdapter::retrieveNextSyncMsg); + if (status != PEP_STATUS_OK) + throw std::runtime_error("Cannot register sync callbacks! status: " + status_to_string(status)); + + if(pthread_create(&i->sync_thread, NULL, JsonAdapter::syncThreadRoutine, (void*) this) != 0) + throw std::runtime_error("Cannot create sync session thread !"); } -void JsonAdapter::stopSync(void) -{ - i->sync_queue->push_front(NULL); - pthread_join(i->sync_thread, NULL); - - unregister_sync_callbacks(i->sync_session); - - delete i->sync_queue; - - release(i->sync_session); +void JsonAdapter::stopSync() +{ + i->sync_queue->push_front(NULL); + pthread_join(i->sync_thread, NULL); + + unregister_sync_callbacks(i->sync_session); + delete i->sync_queue; + i->sync_queue = nullptr; + + release(i->sync_session); } -JsonAdapter::JsonAdapter(const std::string& address, unsigned start_port, unsigned end_port) -: i(new Internal) + +JsonAdapter::JsonAdapter(const std::string& address, unsigned start_port, unsigned end_port, bool silent) +: i(new Internal( silent ? nulllogger : std::cerr )) { i->eventBase.reset(event_base_new()); if (!i->eventBase) throw std::runtime_error("Failed to create new base_event."); - i->evHttp.reset( evhttp_new(i->eventBase.get())); + i->evHttp.reset( evhttp_new(i->eventBase.get()) ); if (!i->evHttp) throw std::runtime_error("Failed to create new evhttp."); i->address = address; i->start_port = start_port; i->end_port = end_port; - - startSync(); + i->silent = silent; + + startSync(); } @@ -648,7 +660,7 @@ JsonAdapter::~JsonAdapter() void JsonAdapter::run() try { - std::cout << "I have " << session_registry.size() << " registered session(s).\n"; + Log() << "I have " << session_registry.size() << " registered session(s).\n"; std::exception_ptr initExcept; auto ThreadFunc = [&] () @@ -656,7 +668,7 @@ try try { const auto id = std::this_thread::get_id(); - std::cerr << " +++ Thread starts: isRun=" << i->running << ", id=" << id << ". +++\n"; + Log() << " +++ Thread starts: isRun=" << i->running << ", id=" << id << ". +++\n"; const auto q=session_registry.find(id); if(q==session_registry.end()) { @@ -668,14 +680,13 @@ try } session_registry.emplace( id, session); - std::cerr << "\tcreated new session for this thread: " << static_cast(session) << ".\n"; + Log() << "\tcreated new session for this thread: " << static_cast(session) << ".\n"; - status = attach_sync_session(session, i->sync_session); - if(status != PEP_STATUS_OK) - throw std::runtime_error("Cannot attach to sync session! status: " + status_to_string(status)); - + status = attach_sync_session(session, i->sync_session); + if(status != PEP_STATUS_OK) + throw std::runtime_error("Cannot attach to sync session! status: " + status_to_string(status)); }else{ - std::cerr << "\tsession for this thread: " << static_cast(q->second) << ".\n"; + Log() << "\tsession for this thread: " << static_cast(q->second) << ".\n"; } evhttp_set_cb(i->evHttp.get(), ApiRequestUrl.c_str() , OnApiRequest , this); @@ -685,7 +696,7 @@ try if (i->sock == -1) // no port bound, yet { // initialize the pEp engine - std::cout << "I have " << session_registry.size() << " registered session(s).\n"; + Log() << "I have " << session_registry.size() << " registered session(s).\n"; unsigned port_ofs = 0; try_next_port: @@ -708,7 +719,7 @@ try_next_port: i->port = i->start_port + port_ofs; i->token = create_security_token(i->address, i->port, BaseUrl); - std::cout << "Bound to port " << i->port << ", sec_token=\"" << i->token << "\"\n"; + Log() << "Bound to port " << i->port << ", sec_token=\"" << i->token << "\"\n"; } else { @@ -719,28 +730,32 @@ try_next_port: unsigned numnum = 1000000; while(i->running) { - event_base_loop(i->eventBase.get(), EVLOOP_NONBLOCK); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - std::cerr << "\r" << ++numnum << ". "; + // once we have libevent 2.1: + //event_base_loop(i->eventBase.get(), EVLOOP_NO_EXIT_ON_EMPTY); + + // for libevent 2.0: + event_base_loop(i->eventBase.get(), 0); + std::this_thread::sleep_for(std::chrono::milliseconds(333)); + Log() << "\r" << ++numnum << ". "; } } catch (const std::exception& e) { - std::cerr << " +++ std::exception in ThreadFunc: " << e.what() << "\n"; + Log() << " +++ std::exception in ThreadFunc: " << e.what() << "\n"; initExcept = std::current_exception(); } catch (...) { - std::cerr << " +++ UNKNOWN EXCEPTION in ThreadFunc +++ "; + Log() << " +++ UNKNOWN EXCEPTION in ThreadFunc +++ "; initExcept = std::current_exception(); } - std::cerr << " +++ Thread exit? isRun=" << i->running << ", id=" << std::this_thread::get_id() << ". +++\n"; + Log() << " +++ Thread exit? isRun=" << i->running << ", id=" << std::this_thread::get_id() << ". +++\n"; }; i->running = true; for(int t=0; tthreads.push_back(std::move(thread)); } - std::cout << "All " << SrvThreadCount << " thread(s) started.\n"; + Log() << "All " << SrvThreadCount << " thread(s) started.\n"; } catch (std::exception const &e) { - std::cerr << "Exception catched in main(): \"" << e.what() << "\"" << std::endl; + Log() << "Exception catched in main(): \"" << e.what() << "\"" << std::endl; + throw; } @@ -774,7 +790,7 @@ bool JsonAdapter::verify_security_token(const std::string& s) const { if(s!=i->token) { - std::cerr << "sec_token=\"" << i->token << "\" (len=" << i->token.size() << ") is unequal to \"" << s << "\" (len=" << s.size() << ")!\n"; + Log() << "sec_token=\"" << i->token << "\" (len=" << i->token.size() << ") is unequal to \"" << s << "\" (len=" << s.size() << ")!\n"; } return s == i->token; } @@ -808,3 +824,9 @@ void JsonAdapter::unregisterEventListener(const std::string& address, unsigned p i->eventListener.erase(q); } + +std::ostream& JsonAdapter::Log() const +{ + return i->Log; +} + diff --git a/server/json-adapter.hh b/server/json-adapter.hh index 2d36c5a..bd1042e 100644 --- a/server/json-adapter.hh +++ b/server/json-adapter.hh @@ -9,8 +9,9 @@ class JsonAdapter : public Context { public: // creates an instance of the JSON adapter. It tries to bind the first available port in the given range + // if chatty==true there is some debug output on stderr, if chatty==false the server is silent. // throws std::runtime_error if no port cannot be bound. - JsonAdapter(const std::string& address, unsigned start_port, unsigned end_port); + JsonAdapter(const std::string& address, unsigned start_port, unsigned end_port, bool chatty); // calls abort() on the instance if it is still running(). virtual ~JsonAdapter(); @@ -52,12 +53,15 @@ public: static PEP_STATUS messageToSend(void* obj, message* msg); static PEP_STATUS showHandshake(void* obj, pEp_identity* self, pEp_identity* partner); - static int injectSyncMsg(void* obj, void *msg); - static void *retrieveNextSyncMsg(void* obj); - static void *syncThreadRoutine(void *arg); - - void startSync(void); - void stopSync(void); + static int injectSyncMsg(void* obj, void* msg); + static void *retrieveNextSyncMsg(void* obj); + static void *syncThreadRoutine(void* arg); + + void startSync(void); + void stopSync(void); + + // returns the associated log stream (either std::cerr or nulllogger) + std::ostream& Log() const; private: struct Internal; diff --git a/server/main.cc b/server/main.cc index 708091f..f95da6e 100644 --- a/server/main.cc +++ b/server/main.cc @@ -1,27 +1,69 @@ #include "json-adapter.hh" #include +#include +namespace po = boost::program_options; - +bool debug_mode = false; std::string address = "127.0.0.1"; unsigned start_port = 4223; unsigned end_port = 9999; +void print_version() +{ + std::cout << "pEp JSON Adapter.\n" + "\tversion \"" << JsonAdapter::version() << "\"\n" + "\tAPI version " << JsonAdapter::apiVersion() << "\n" + "\tpEpEngine version " << get_engine_version() << "\n" + "\n"; +} + int main(int argc, char** argv) try { - JsonAdapter ja( address, start_port, end_port ); + po::options_description desc("Program options for the JSON Server Adapter"); + desc.add_options() + ("help,h", "print this help messages") + ("version,v", "print program version") + ("debug,d", po::value(&debug_mode)->default_value(false), "Run in debug mode, don't fork() in background") + ("start-port,s", po::value(&start_port)->default_value(start_port), "First port to bind on") + ("end-port,e", po::value(&end_port)->default_value(end_port), "Last port to bind on") + ("address,a", po::value(&address)->default_value(address), "Address to bind on") + ; - ja.run(); + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); - int input = 0; - do{ - std::cout << "Press to quit." << std::endl; - input = std::cin.get(); - std::cout << "Oh, I got a '" << input << "'. \n"; - }while(input != 'q' && input != 'Q'); + if (vm.count("help")) + { + std::cout << desc << "\n"; + return 0; + } + if (vm.count("version")) + { + print_version(); + return 0; + } + + if( vm.count("debug")) + { + JsonAdapter ja( address, start_port, end_port, !debug_mode ); + ja.run(); + + int input = 0; + do{ + std::cout << "Press to quit." << std::endl; + input = std::cin.get(); + std::cout << "Oh, I got a '" << input << "'. \n"; + }while(input != 'q' && input != 'Q'); + + ja.shutdown(nullptr); + std::cout << "Good bye. :-)" << std::endl; + }else{ - ja.shutdown(nullptr); - std::cout << "Good bye. :-)" << std::endl; + + + } } catch (std::exception const &e) {