add command line options, make json-adapter quit per default

JSON-15
Roker 7 years ago
parent a303d12b2c
commit c9df333a9d

@ -17,6 +17,7 @@
#include "function_map.hh"
#include "pep-types.hh"
#include "json_rpc.hh"
#include "nulllogger.hh"
#include "security-token.hh"
#include "pep-utils.hh"
@ -326,7 +327,7 @@ void OnApiRequest(evhttp_request* req, void* obj)
const std::string data_string(data.data(), data.data() + nr );
if(nr>0)
{
std::cout << "\tData: «" << data_string << "»\n";
ja->Log() << "\tData: «" << data_string << "»\n";
bool b = js::read( data_string, p);
if(p.type() == js::obj_type)
{
@ -336,14 +337,14 @@ void OnApiRequest(evhttp_request* req, void* obj)
answer = make_error( JSON_RPC::PARSE_ERROR, "evbuffer_copyout does not return a JSON string. b=" + std::to_string(b), js::Value{data_string}, 42 );
}
}else{
std::cout << "\tError: " << nr << ".\n";
ja->Log() << "\tError: " << nr << ".\n";
answer = make_error( JSON_RPC::INTERNAL_ERROR, "evbuffer_copyout returns negative value", p, request_id );
}
}
catch(const std::exception& e)
{
std::cout << "\tException: \"" << e.what() << "\"\n";
std::cerr << "\tException: \"" << e.what() << "\"\n";
answer = make_error( JSON_RPC::INTERNAL_ERROR, "Got a std::exception: \"" + std::string(e.what()) + "\"", p, request_id );
}
@ -439,19 +440,23 @@ struct JsonAdapter::Internal
std::string token;
std::map<EventListenerKey, EventListenerValue> eventListener;
std::ostream& Log;
unsigned start_port = 0;
unsigned end_port = 0;
unsigned port = 0;
unsigned request_count = 0;
evutil_socket_t sock = -1;
bool running = false;
bool silent = false;
ThreadPool threads;
// Sync
locked_queue< sync_msg_t * > *sync_queue;
PEP_SESSION sync_session;
pthread_t sync_thread;
// Sync
locked_queue< sync_msg_t* >* sync_queue = nullptr;
PEP_SESSION sync_session;
pthread_t sync_thread;
Internal(std::ostream& logger) : Log(logger) {}
static
void requestDone(evhttp_request* req, void* userdata)
{
@ -519,39 +524,40 @@ struct JsonAdapter::Internal
return status;
}
int injectSyncMsg(void *msg)
{
sync_queue->push_back((sync_msg_t *)msg);
return 0;
}
void *retrieveNextSyncMsg()
{
while (!sync_queue->size())
// TODO: add blocking dequeue
usleep(100000);
void *msg = sync_queue->front();
sync_queue->pop_front();
return msg;
}
void *syncThreadRoutine(void *arg)
{
PEP_STATUS status = do_sync_protocol(sync_session, arg);
while (sync_queue->size()) {
sync_msg_t *msg = sync_queue->front();
sync_queue->pop_front();
free_sync_msg(msg);
}
return (void *) status;
}
int injectSyncMsg(void* msg)
{
sync_queue->push_back((sync_msg_t*)msg);
return 0;
}
void* retrieveNextSyncMsg()
{
while (!sync_queue->size())
// TODO: add blocking dequeue
usleep(100000);
void* msg = sync_queue->front();
sync_queue->pop_front();
return msg;
}
void* syncThreadRoutine(void* arg)
{
PEP_STATUS status = do_sync_protocol(sync_session, arg);
while (sync_queue->size())
{
sync_msg_t* msg = sync_queue->front();
sync_queue->pop_front();
free_sync_msg(msg);
}
return (void*) status;
}
};
PEP_STATUS JsonAdapter::messageToSend(void* obj, message* msg)
{
JsonAdapter* ja = static_cast<JsonAdapter*>(obj);
@ -565,76 +571,82 @@ PEP_STATUS JsonAdapter::showHandshake(void* obj, pEp_identity* self, pEp_identit
return ja->i->showHandshake(self, partner);
}
int JsonAdapter::injectSyncMsg(void* obj, void *msg)
int JsonAdapter::injectSyncMsg(void* obj, void* msg)
{
JsonAdapter* ja = static_cast<JsonAdapter*>(obj);
return ja->i->injectSyncMsg(msg);
}
void *JsonAdapter::retrieveNextSyncMsg(void* obj)
void* JsonAdapter::retrieveNextSyncMsg(void* obj)
{
JsonAdapter* ja = static_cast<JsonAdapter*>(obj);
return ja->i->retrieveNextSyncMsg();
}
void *JsonAdapter::syncThreadRoutine(void *arg)
void* JsonAdapter::syncThreadRoutine(void* arg)
{
JsonAdapter* ja = static_cast<JsonAdapter*>(arg);
return ja->i->syncThreadRoutine(arg);
}
void JsonAdapter::startSync(void)
{
PEP_STATUS status = init(&i->sync_session);
if(status != PEP_STATUS_OK || i->sync_session==nullptr)
{
throw std::runtime_error("Cannot create sync session! status: " + status_to_string(status));
}
i->sync_queue = new locked_queue< sync_msg_t * >();
status = register_sync_callbacks(i->sync_session,
(void *) this,
JsonAdapter::messageToSend,
JsonAdapter::showHandshake,
JsonAdapter::injectSyncMsg,
JsonAdapter::retrieveNextSyncMsg);
if (status != PEP_STATUS_OK)
throw std::runtime_error("Cannot register sync callbacks! status: " + status_to_string(status));
if(pthread_create(&i->sync_thread, NULL, JsonAdapter::syncThreadRoutine, (void *) this) != 0)
throw std::runtime_error("Cannot create sync session thread !");
void JsonAdapter::startSync()
{
PEP_STATUS status = init(&i->sync_session);
if(status != PEP_STATUS_OK || i->sync_session==nullptr)
{
throw std::runtime_error("Cannot create sync session! status: " + status_to_string(status));
}
i->sync_queue = new locked_queue< sync_msg_t* >();
status = register_sync_callbacks(i->sync_session,
(void*) this,
JsonAdapter::messageToSend,
JsonAdapter::showHandshake,
JsonAdapter::injectSyncMsg,
JsonAdapter::retrieveNextSyncMsg);
if (status != PEP_STATUS_OK)
throw std::runtime_error("Cannot register sync callbacks! status: " + status_to_string(status));
if(pthread_create(&i->sync_thread, NULL, JsonAdapter::syncThreadRoutine, (void*) this) != 0)
throw std::runtime_error("Cannot create sync session thread !");
}
void JsonAdapter::stopSync(void)
{
i->sync_queue->push_front(NULL);
pthread_join(i->sync_thread, NULL);
unregister_sync_callbacks(i->sync_session);
delete i->sync_queue;
release(i->sync_session);
void JsonAdapter::stopSync()
{
i->sync_queue->push_front(NULL);
pthread_join(i->sync_thread, NULL);
unregister_sync_callbacks(i->sync_session);
delete i->sync_queue;
i->sync_queue = nullptr;
release(i->sync_session);
}
JsonAdapter::JsonAdapter(const std::string& address, unsigned start_port, unsigned end_port)
: i(new Internal)
JsonAdapter::JsonAdapter(const std::string& address, unsigned start_port, unsigned end_port, bool silent)
: i(new Internal( silent ? nulllogger : std::cerr ))
{
i->eventBase.reset(event_base_new());
if (!i->eventBase)
throw std::runtime_error("Failed to create new base_event.");
i->evHttp.reset( evhttp_new(i->eventBase.get()));
i->evHttp.reset( evhttp_new(i->eventBase.get()) );
if (!i->evHttp)
throw std::runtime_error("Failed to create new evhttp.");
i->address = address;
i->start_port = start_port;
i->end_port = end_port;
startSync();
i->silent = silent;
startSync();
}
@ -648,7 +660,7 @@ JsonAdapter::~JsonAdapter()
void JsonAdapter::run()
try
{
std::cout << "I have " << session_registry.size() << " registered session(s).\n";
Log() << "I have " << session_registry.size() << " registered session(s).\n";
std::exception_ptr initExcept;
auto ThreadFunc = [&] ()
@ -656,7 +668,7 @@ try
try
{
const auto id = std::this_thread::get_id();
std::cerr << " +++ Thread starts: isRun=" << i->running << ", id=" << id << ". +++\n";
Log() << " +++ Thread starts: isRun=" << i->running << ", id=" << id << ". +++\n";
const auto q=session_registry.find(id);
if(q==session_registry.end())
{
@ -668,14 +680,13 @@ try
}
session_registry.emplace( id, session);
std::cerr << "\tcreated new session for this thread: " << static_cast<void*>(session) << ".\n";
Log() << "\tcreated new session for this thread: " << static_cast<void*>(session) << ".\n";
status = attach_sync_session(session, i->sync_session);
if(status != PEP_STATUS_OK)
throw std::runtime_error("Cannot attach to sync session! status: " + status_to_string(status));
status = attach_sync_session(session, i->sync_session);
if(status != PEP_STATUS_OK)
throw std::runtime_error("Cannot attach to sync session! status: " + status_to_string(status));
}else{
std::cerr << "\tsession for this thread: " << static_cast<void*>(q->second) << ".\n";
Log() << "\tsession for this thread: " << static_cast<void*>(q->second) << ".\n";
}
evhttp_set_cb(i->evHttp.get(), ApiRequestUrl.c_str() , OnApiRequest , this);
@ -685,7 +696,7 @@ try
if (i->sock == -1) // no port bound, yet
{
// initialize the pEp engine
std::cout << "I have " << session_registry.size() << " registered session(s).\n";
Log() << "I have " << session_registry.size() << " registered session(s).\n";
unsigned port_ofs = 0;
try_next_port:
@ -708,7 +719,7 @@ try_next_port:
i->port = i->start_port + port_ofs;
i->token = create_security_token(i->address, i->port, BaseUrl);
std::cout << "Bound to port " << i->port << ", sec_token=\"" << i->token << "\"\n";
Log() << "Bound to port " << i->port << ", sec_token=\"" << i->token << "\"\n";
}
else
{
@ -719,28 +730,32 @@ try_next_port:
unsigned numnum = 1000000;
while(i->running)
{
event_base_loop(i->eventBase.get(), EVLOOP_NONBLOCK);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cerr << "\r" << ++numnum << ". ";
// once we have libevent 2.1:
//event_base_loop(i->eventBase.get(), EVLOOP_NO_EXIT_ON_EMPTY);
// for libevent 2.0:
event_base_loop(i->eventBase.get(), 0);
std::this_thread::sleep_for(std::chrono::milliseconds(333));
Log() << "\r" << ++numnum << ". ";
}
}
catch (const std::exception& e)
{
std::cerr << " +++ std::exception in ThreadFunc: " << e.what() << "\n";
Log() << " +++ std::exception in ThreadFunc: " << e.what() << "\n";
initExcept = std::current_exception();
}
catch (...)
{
std::cerr << " +++ UNKNOWN EXCEPTION in ThreadFunc +++ ";
Log() << " +++ UNKNOWN EXCEPTION in ThreadFunc +++ ";
initExcept = std::current_exception();
}
std::cerr << " +++ Thread exit? isRun=" << i->running << ", id=" << std::this_thread::get_id() << ". +++\n";
Log() << " +++ Thread exit? isRun=" << i->running << ", id=" << std::this_thread::get_id() << ". +++\n";
};
i->running = true;
for(int t=0; t<SrvThreadCount; ++t)
{
std::cout << "Start Thread #" << t << "...\n";
Log() << "Start Thread #" << t << "...\n";
ThreadPtr thread(new std::thread(ThreadFunc), ThreadDeleter);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
if (initExcept != std::exception_ptr())
@ -750,11 +765,12 @@ try_next_port:
}
i->threads.push_back(std::move(thread));
}
std::cout << "All " << SrvThreadCount << " thread(s) started.\n";
Log() << "All " << SrvThreadCount << " thread(s) started.\n";
}
catch (std::exception const &e)
{
std::cerr << "Exception catched in main(): \"" << e.what() << "\"" << std::endl;
Log() << "Exception catched in main(): \"" << e.what() << "\"" << std::endl;
throw;
}
@ -774,7 +790,7 @@ bool JsonAdapter::verify_security_token(const std::string& s) const
{
if(s!=i->token)
{
std::cerr << "sec_token=\"" << i->token << "\" (len=" << i->token.size() << ") is unequal to \"" << s << "\" (len=" << s.size() << ")!\n";
Log() << "sec_token=\"" << i->token << "\" (len=" << i->token.size() << ") is unequal to \"" << s << "\" (len=" << s.size() << ")!\n";
}
return s == i->token;
}
@ -808,3 +824,9 @@ void JsonAdapter::unregisterEventListener(const std::string& address, unsigned p
i->eventListener.erase(q);
}
std::ostream& JsonAdapter::Log() const
{
return i->Log;
}

@ -9,8 +9,9 @@ class JsonAdapter : public Context
{
public:
// creates an instance of the JSON adapter. It tries to bind the first available port in the given range
// if chatty==true there is some debug output on stderr, if chatty==false the server is silent.
// throws std::runtime_error if no port cannot be bound.
JsonAdapter(const std::string& address, unsigned start_port, unsigned end_port);
JsonAdapter(const std::string& address, unsigned start_port, unsigned end_port, bool chatty);
// calls abort() on the instance if it is still running().
virtual ~JsonAdapter();
@ -52,12 +53,15 @@ public:
static PEP_STATUS messageToSend(void* obj, message* msg);
static PEP_STATUS showHandshake(void* obj, pEp_identity* self, pEp_identity* partner);
static int injectSyncMsg(void* obj, void *msg);
static void *retrieveNextSyncMsg(void* obj);
static void *syncThreadRoutine(void *arg);
void startSync(void);
void stopSync(void);
static int injectSyncMsg(void* obj, void* msg);
static void *retrieveNextSyncMsg(void* obj);
static void *syncThreadRoutine(void* arg);
void startSync(void);
void stopSync(void);
// returns the associated log stream (either std::cerr or nulllogger)
std::ostream& Log() const;
private:
struct Internal;

@ -1,27 +1,69 @@
#include "json-adapter.hh"
#include <iostream>
#include <boost/program_options.hpp>
namespace po = boost::program_options;
bool debug_mode = false;
std::string address = "127.0.0.1";
unsigned start_port = 4223;
unsigned end_port = 9999;
void print_version()
{
std::cout << "pEp JSON Adapter.\n"
"\tversion \"" << JsonAdapter::version() << "\"\n"
"\tAPI version " << JsonAdapter::apiVersion() << "\n"
"\tpEpEngine version " << get_engine_version() << "\n"
"\n";
}
int main(int argc, char** argv)
try
{
JsonAdapter ja( address, start_port, end_port );
po::options_description desc("Program options for the JSON Server Adapter");
desc.add_options()
("help,h", "print this help messages")
("version,v", "print program version")
("debug,d", po::value<bool>(&debug_mode)->default_value(false), "Run in debug mode, don't fork() in background")
("start-port,s", po::value<unsigned>(&start_port)->default_value(start_port), "First port to bind on")
("end-port,e", po::value<unsigned>(&end_port)->default_value(end_port), "Last port to bind on")
("address,a", po::value<std::string>(&address)->default_value(address), "Address to bind on")
;
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
ja.run();
if (vm.count("help"))
{
std::cout << desc << "\n";
return 0;
}
if (vm.count("version"))
{
print_version();
return 0;
}
if( vm.count("debug"))
{
JsonAdapter ja( address, start_port, end_port, !debug_mode );
ja.run();
int input = 0;
do{
std::cout << "Press <Q> <Enter> to quit." << std::endl;
input = std::cin.get();
std::cout << "Oh, I got a '" << input << "'. \n";
}while(input != 'q' && input != 'Q');
ja.shutdown(nullptr);
std::cout << "Good bye. :-)" << std::endl;
}else{
int input = 0;
do{
std::cout << "Press <Q> <Enter> to quit." << std::endl;
input = std::cin.get();
std::cout << "Oh, I got a '" << input << "'. \n";
}while(input != 'q' && input != 'Q');
ja.shutdown(nullptr);
std::cout << "Good bye. :-)" << std::endl;
}
}
catch (std::exception const &e)
{

Loading…
Cancel
Save