From bb70cf0169323d77c90437e63d8246dfd3cb5a7c Mon Sep 17 00:00:00 2001 From: heck Date: Sat, 27 Mar 2021 05:15:16 +0100 Subject: [PATCH] PYADPT-111: "Add Single Threaded Sync Implementation" - Implementation --- src/pEp/__init__.py | 57 +++++++++++++++++++++++++++++---------- src/pEp/_pEp/pEpmodule.cc | 48 ++++++++++++++++++++++----------- 2 files changed, 76 insertions(+), 29 deletions(-) diff --git a/src/pEp/__init__.py b/src/pEp/__init__.py index 2c0bcf3..02c0b05 100755 --- a/src/pEp/__init__.py +++ b/src/pEp/__init__.py @@ -36,6 +36,7 @@ import pEp._pEp from threading import Thread, Barrier from time import sleep + # Executed on module import def init(): # print(init, "called") @@ -44,12 +45,23 @@ def init(): def start_sync() -> None: """starts the sync thread""" - Sync.start_sync() + set_sync_mode(SyncModes.Async) -def shutdown_sync() -> None : +def shutdown_sync() -> None: """call this to shut down the sync thread""" - Sync.shutdown_sync() + set_sync_mode(SyncModes.Off) + + +def set_sync_mode(mode): + _pEp._set_sync_mode(mode) + if mode == SyncModes.Sync: + Sync.shutdown_sync() + if mode == SyncModes.Async: + Sync.start_sync() + if mode == SyncModes.Off: + Sync.shutdown_sync() + def is_sync_active() -> bool: @@ -80,7 +92,7 @@ def notify_handshake(me, partner, signal): class Sync(Thread): - __instance:'Sync' = None + __instance: 'Sync' = None barr = Barrier(2) def __init__(self): @@ -97,37 +109,54 @@ class Sync(Thread): return Sync.__instance def run(self): - pEp.register_sync_callbacks() + """ + * Sync Thread + * NOPE 1. Execute registered startup function + register_sync_callbacks + * 2. Create session for the sync thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase) + * 3. register_sync_callbacks() (registers: _notifyHandshake, _retrieve_next_sync_event) + * 4. Enter Sync Event Dispatching Loop (do_sync_protocol()) + unregister_sync_callbacks + * 5. unregister_sync_callbacks() + * 6. Release the session + * NOPE 7. Execute registered shutdown function + """ + # TODO catch exception, and rethrow in start() + _pEp._register_sync_callbacks() self.barr.wait() - while pEp.do_protocol_step(): + while _pEp._do_protocol_step(): sleep(1) - pEp.unregister_sync_callbacks() + _pEp._unregister_sync_callbacks() def start(self): """ - 1. NEW ADLIB FUNC register_sync_callbacks() ONLY - 2. create python thread that does do_sync_protocol() - 3. NEWADLIB FUNC CallbackDispatcher.Signal_all_SYNC_NOTIFY_START() + * (1. Done on init(): ensure session for the main thread + (registers: messageToSend, _inject_sync_event, _ensure_passphrase)) + * 2. Start the sync thread + * 3. Defer execution until sync thread register_sync_callbacks() has returned + * 4. TODO: Throw pending exception from the sync thread """ Thread.start(self) self.barr.wait() - # pEp.notifyHandshake_sync_start() + # TODO: Throw exceptions from sync thread + # _pEp._notifyHandshake_sync_start() # sleep(2) @staticmethod def start_sync(): - Sync.getInstance().start() + if not Sync.getInstance().is_alive(): + Sync.getInstance().start() @staticmethod def shutdown_sync(): if Sync.__instance: if Sync.__instance.is_alive(): - pEp.inject_sync_shutdown() + _pEp._inject_sync_shutdown() Sync.__instance.join() Sync.__instance = None - # pEp.notifyHandshake_sync_stop() + # _pEp._notifyHandshake_sync_stop() init() diff --git a/src/pEp/_pEp/pEpmodule.cc b/src/pEp/_pEp/pEpmodule.cc index d747c5c..8fc2a7f 100644 --- a/src/pEp/_pEp/pEpmodule.cc +++ b/src/pEp/_pEp/pEpmodule.cc @@ -41,8 +41,16 @@ namespace pEp { void _init_after_main_module() { pEpLog("called"); callback_dispatcher.add(_messageToSend, _notifyHandshake, nullptr, nullptr); - Adapter::_messageToSend = CallbackDispatcher::messageToSend; - Adapter::_notifyHandshake = CallbackDispatcher::notifyHandshake; +// Adapter::sync_initialize( +// Adapter::SyncModes::Off, +// CallbackDispatcher::messageToSend, +// CallbackDispatcher::notifyHandshake, +// true); + Adapter::sync_initialize( + Adapter::SyncModes::Off, + _messageToSend, + _notifyHandshake, + true); } @@ -129,7 +137,7 @@ namespace pEp { return PEP_STATUS_OK; } - bool do_protocol_step() { + bool _do_protocol_step() { pEpLog("called"); SYNC_EVENT event = Adapter::_retrieve_next_sync_event(nullptr, 0); if (event != NULL) { @@ -141,29 +149,31 @@ namespace pEp { } } - void register_sync_callbacks() { + void _register_sync_callbacks() { pEpLog("called"); + Adapter::session(); PEP_STATUS status = ::register_sync_callbacks(Adapter::session(), nullptr, Adapter::_notifyHandshake, Adapter::_retrieve_next_sync_event); _throw_status(status); } - void unregister_sync_callbacks() { + void _unregister_sync_callbacks() { ::unregister_sync_callbacks(Adapter::session()); +// Adapter::session(release); } - void inject_sync_shutdown() { + void _inject_sync_shutdown() { pEpLog("injecting null event"); - Adapter::_inject_sync_event(nullptr,nullptr); + Adapter::_queue_sync_event(nullptr,nullptr); } // TODO: Integrate this (currently SEGFAULTING) - void notifyHandshake_sync_start() { + void _notifyHandshake_sync_start() { pEpLog("all targets signal: SYNC_NOTIFY_START"); CallbackDispatcher::notifyHandshake(nullptr, nullptr, SYNC_NOTIFY_START); } // TODO: Integrate this (currently SEGFAULTING) - void notifyHandshake_sync_stop() { + void _notifyHandshake_sync_stop() { pEpLog("all targets signal: SYNC_NOTIFY_STOP"); CallbackDispatcher::notifyHandshake(nullptr, nullptr, SYNC_NOTIFY_STOP); } @@ -228,24 +238,32 @@ namespace pEp { def("set_debug_log_enabled", &Adapter::pEpLog::set_enabled, "Switch debug logging on/off"); - def("register_sync_callbacks", register_sync_callbacks, + def("_register_sync_callbacks", _register_sync_callbacks, ""); - def("unregister_sync_callbacks", unregister_sync_callbacks, + def("_unregister_sync_callbacks", _unregister_sync_callbacks, ""); - def("do_protocol_step", do_protocol_step, + def("_do_protocol_step", _do_protocol_step, ""); - def("inject_sync_shutdown", inject_sync_shutdown, + def("_inject_sync_shutdown", _inject_sync_shutdown, ""); - def("notifyHandshake_sync_start", notifyHandshake_sync_start, + def("_notifyHandshake_sync_start", _notifyHandshake_sync_start, ""); - def("notifyHandshake_sync_stop", notifyHandshake_sync_stop, + def("_notifyHandshake_sync_stop", _notifyHandshake_sync_stop, ""); + def("_set_sync_mode", pEp::Adapter::set_sync_mode, + ""); + + enum_("SyncModes") + .value("Off", pEp::Adapter::SyncModes::Off) + .value("Async", pEp::Adapter::SyncModes::Async) + .value("Sync", pEp::Adapter::SyncModes::Sync); + def("passive_mode", config_passive_mode, "do not attach pub keys to all messages");