API: session(init/release) removed.

Before using the static object Adapter::session, initialize it using session.initialize().
Release it using session.release().
LIB-11
heck 2 years ago
parent 9b922a39dc
commit 080076582e

@ -8,6 +8,9 @@
#include "status_to_string.hh"
#include "pEpLog.hh"
#include "passphrase_cache.hh"
#include "callback_dispatcher.hh"
#include "group_manager_api.h"
#include <iostream>
using namespace std;
@ -36,18 +39,12 @@ namespace pEp {
throw RuntimeError(_status, status);
}
RuntimeError::RuntimeError(const std::string &_text, ::PEP_STATUS _status)
: std::runtime_error(_text.c_str()), text(_text), status(_status)
RuntimeError::RuntimeError(const std::string &_text, ::PEP_STATUS _status) :
std::runtime_error(_text.c_str()), text(_text), status(_status)
{
}
namespace Adapter {
// private
SyncModes _sync_mode = SyncModes::Async;
::messageToSend_t _messageToSend = nullptr;
::notifyHandshake_t _notifyHandshake = nullptr;
bool _adapter_manages_sync_thread = false;
::inject_sync_event_t _inject_action = _inject_sync_event;
std::thread _sync_thread;
::utility::locked_queue<SYNC_EVENT, ::free_Sync_event> sync_evt_q;
std::mutex mut;
@ -58,67 +55,6 @@ namespace pEp {
return _sync_thread.get_id();
}
// public
void sync_initialize(
SyncModes mode,
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
bool adapter_manages_sync_thread)
{
_messageToSend = messageToSend;
_notifyHandshake = notifyHandshake;
_adapter_manages_sync_thread = adapter_manages_sync_thread;
set_sync_mode(mode);
return;
}
// public
void set_sync_mode(SyncModes mode)
{
// std::lock_guard<mutex> lock(mut);
_sync_mode = mode;
if (_sync_mode == SyncModes::Sync) {
// init session with inject_sync = process
// stop sync
session(release);
_inject_action = _process_sync_event;
session(init);
::register_sync_callbacks(session(), nullptr, _notifyHandshake, _retrieve_next_sync_event);
if(!_adapter_manages_sync_thread) {
shutdown();
} else {
// The adapter need to shutdown sync thread
}
}
if (_sync_mode == SyncModes::Async) {
// init session with inject_sync = queue
// start sync thread
session(release);
_inject_action = _inject_sync_event;
session(init);
if(!_adapter_manages_sync_thread) {
if (!is_sync_running()) {
startup<void>(_messageToSend, _notifyHandshake, nullptr, nullptr);
}
} else {
// The adapter need to do sync thread start up
}
}
if (_sync_mode == SyncModes::Off) {
// init sesssion with inject_sync = null
// stop sync thread
if(!_adapter_manages_sync_thread) {
shutdown();
} else {
// Adapter needs to shutdown sync thread
}
session(release);
_inject_action = _inject_sync_event;
session(init);
}
return;
}
// private
int _process_sync_event(::SYNC_EVENT ev, void *management)
{
@ -171,34 +107,104 @@ namespace pEp {
return _sync_thread.get_id() == this_thread::get_id();
}
// public
::PEP_SESSION Session::operator()(session_action action)
// ---------------------------------------------------------------------------------------
Session::Session() :
_messageToSend{ nullptr }, _notifyHandshake{ nullptr }, _sync_mode{ SyncModes::Async },
_adapter_manages_sync_thread{ false }
{
std::lock_guard<mutex> lock(mut);
}
::PEP_STATUS status = ::PEP_STATUS_OK;
void Session::initialize(SyncModes sync_mode, bool adapter_manages_sync_thread)
{
pEpLog("Initializing session with CallbackDispatcher...");
_init(
pEp::CallbackDispatcher::messageToSend,
pEp::CallbackDispatcher::notifyHandshake,
sync_mode,
adapter_manages_sync_thread);
}
switch (action) {
case release:
if (_session.get()) {
_session = nullptr;
}
break;
void Session::initialize(
SyncModes sync_mode,
bool adapter_manages_sync_thread,
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake)
{
pEpLog("Initializing session...");
_init(messageToSend, notifyHandshake, sync_mode, adapter_manages_sync_thread);
}
case init:
if (!_session.get()) {
::PEP_SESSION session_;
status = ::init(&session_, _messageToSend, _inject_action, _ensure_passphrase);
throw_status(status);
_session = SessionPtr{session_, ::release};
}
break;
default:
status = ::PEP_ILLEGAL_VALUE;
void Session::_init(
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
SyncModes sync_mode,
bool adapter_manages_sync_thread)
{
// cache the values for sync-thread session creation
_messageToSend = messageToSend;
_notifyHandshake = notifyHandshake;
_sync_mode = sync_mode;
_adapter_manages_sync_thread = adapter_manages_sync_thread;
refresh();
::adapter_group_init();
}
void Session::refresh()
{
std::lock_guard<mutex> lock(mut);
release();
// Switch to mode "Sync" ensures the sync thread to be shutdown
if (_sync_mode == SyncModes::Sync) {
// process the event directly
_inject_action = _process_sync_event;
if (!_adapter_manages_sync_thread) {
stop_sync();
} else {
// The adapter needs to shutdown sync thread
}
}
// Switch to mode "ASync", sync thread needs to be started using start_sync
if (_sync_mode == SyncModes::Async) {
// put the event on queue
_inject_action = _inject_sync_event;
}
// create
::PEP_SESSION session_;
::PEP_STATUS status;
status = ::init(&session_, _messageToSend, _inject_action, _ensure_passphrase);
throw_status(status);
return _session.get();
status = ::register_sync_callbacks(
session_,
nullptr,
_notifyHandshake,
_retrieve_next_sync_event);
if (status != PEP_STATUS_OK) {
pEpLog("libpEpAdapter: WARNING - session is initialized but without sync/callbacks. "
"This is normal if there are no own identities yet. Call session.init() again to "
"re-initialize the session after creating an own identity.");
}
// store
_session = SessionPtr{ session_, ::release };
}
void Session::release()
{
if (_session.get()) {
_session = nullptr;
}
}
// public
::PEP_SESSION Session::operator()()
{
if (!_session.get()) {
throw std::runtime_error(
"libpEpAdapter: No session! Before use, call session.initialize() for each thread");
} else {
return _session.get();
}
}
// public
@ -230,7 +236,7 @@ namespace pEp {
// public
bool is_sync_running()
{
if(!_adapter_manages_sync_thread) {
if (!session._adapter_manages_sync_thread) {
return _sync_thread.joinable();
} else {
return false;

@ -11,6 +11,7 @@
#include <thread>
#include <pEp/sync_api.h>
#include "callback_dispatcher.hh"
namespace pEp {
@ -29,19 +30,10 @@ namespace pEp {
// public
enum class SyncModes
{
Off,
Sync,
Async
};
void sync_initialize(
SyncModes mode,
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
bool adapter_manages_sync_thread);
void set_sync_mode(SyncModes mode);
int _inject_sync_event(::SYNC_EVENT ev, void *management);
int _process_sync_event(::SYNC_EVENT ev, void *management);
@ -51,8 +43,6 @@ namespace pEp {
template<class T = void>
void startup(
messageToSend_t messageToSend,
notifyHandshake_t notifyHandshake,
T *obj = nullptr,
std::function<void(T *)> _startup = nullptr,
std::function<void(T *)> _shutdown = nullptr);
@ -63,18 +53,53 @@ namespace pEp {
// returns the thread id of the sync thread
std::thread::id sync_thread_id();
enum session_action
{
init,
release,
};
class Session {
public:
// TODO: needed because libpEpAdapter provides a static instance
// the session needs to be initialized in order to be usable.
Session();
// Init using CallbackDispatcher
// CAUTION: This may result in a partially initialized session.
// If there are any problem with register_sync_callbacks(), it will still
// succeed. (e.g. due to no own identities yet)
// BUT
// * Sync will not work
// * Group Encryption will not work
// TODO: This needs to be resolved in the engine, new func register_callbacks()
// that is not sync specific, and move the sync-checks to "start-sync()"
void initialize(SyncModes sync_mode = SyncModes::Async, bool adapter_manages_sync_thread = false);
// Arbitrary callbacks
void initialize(
SyncModes sync_mode,
bool adapter_manages_sync_thread,
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake);
// re-creates the session using same values
void refresh();
// Not copyable
Session(const Session &) = delete;
Session operator=(const Session&) = delete;
void release();
PEP_SESSION operator()();
SyncModes _sync_mode;
::messageToSend_t _messageToSend;
::notifyHandshake_t _notifyHandshake;
bool _adapter_manages_sync_thread;
::inject_sync_event_t _inject_action;
private:
void _init(
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
SyncModes sync_mode,
bool adapter_manages_sync_thread);
using SessionPtr = std::unique_ptr<_pEpSession, std::function<void(PEP_SESSION)>>;
SessionPtr _session = nullptr;
public:
PEP_SESSION operator()(session_action action = init);
};
extern thread_local Session session;

@ -14,8 +14,6 @@ namespace pEp {
namespace Adapter {
using std::function;
extern ::messageToSend_t _messageToSend;
extern ::notifyHandshake_t _notifyHandshake;
extern std::thread _sync_thread;
extern ::utility::locked_queue<::SYNC_EVENT, ::free_Sync_event> sync_evt_q;
@ -38,12 +36,10 @@ namespace pEp {
*/
// private
template<class T>
void sync_thread(T *obj, function<void(T *)> _startup, function<void(T *)> _shutdown)
void sync_thread(Session *rhs, T *obj, function<void(T *)> _startup, function<void(T *)> _shutdown)
{
pEpLog("called");
_ex = nullptr;
assert(_messageToSend);
assert(_notifyHandshake);
// 1. Execute registered startup function
if (obj && _startup) {
@ -52,29 +48,18 @@ namespace pEp {
pEpLog("creating session for the sync thread");
// 2. Create session for the sync thread
session();
// 3. register_sync_callbacks()
{
// TODO: Do we need to use a passphraseWrap here???
pEpLog("register_sync_callbacks()");
::PEP_STATUS status = ::register_sync_callbacks(
session(),
nullptr,
_notifyHandshake,
_retrieve_next_sync_event);
pEpLog("register_sync_callbacks() return:" << status);
// Convert status into exception and store it
// set register_done AFTER that
try {
throw_status(status);
register_done.store(true);
} catch (...) {
_ex = std::current_exception();
register_done.store(true);
return;
}
// 3. register_sync_callbacks() (in session.initialize())
try {
session.initialize(
rhs->_sync_mode,
rhs->_adapter_manages_sync_thread,
rhs->_messageToSend,
rhs->_notifyHandshake);
register_done.store(true);
} catch (...) {
_ex = std::current_exception();
register_done.store(true);
return;
}
pEpLog("sync protocol loop started");
@ -86,8 +71,7 @@ namespace pEp {
unregister_sync_callbacks(session());
// 6. Release the session
// TODO: Maybe do that AFTER shutdown?
session(release);
session.release();
// 7. Execute registered shutdown function
if (obj && _shutdown) {
@ -97,38 +81,25 @@ namespace pEp {
/*
* Sync Thread Startup
* 1. ensure session for the main thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase)
* 1. throw if main thread session is not initialized
* 2. Start the sync thread
* 3. Defer execution until sync thread register_sync_callbacks() has returned
* 4. Throw pending exception from the sync thread
*/
// private
template<class T>
void startup(
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
T *obj,
function<void(T *)> _startup,
function<void(T *)> _shutdown)
void startup(T *obj, std::function<void(T *)> _startup, std::function<void(T *)> _shutdown)
{
pEpLog("called");
if (messageToSend) {
_messageToSend = messageToSend;
}
if (notifyHandshake) {
_notifyHandshake = notifyHandshake;
}
pEpLog("ensure session for the main thread");
// 1. re-initialize session for the main thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase)
session(release);
session(init);
// refresh the session
// due to partially initialized session, see session.initialize()
session.refresh();
if (!_sync_thread.joinable()) {
register_done.store(false);
pEpLog("creating sync-thread");
// 2. Start the sync thread
_sync_thread = std::thread(sync_thread<T>, obj, _startup, _shutdown);
_sync_thread = std::thread(sync_thread<T>, &session, obj, _startup, _shutdown);
// 3. Defer execution until sync thread register_sync_callbacks() has returned
while (register_done.load() == false) {
pEpLog("waiting for sync-thread to init...");

Loading…
Cancel
Save