PYADPT-111: "Add Single Threaded Sync Implementation" - Implementation

Release_2.1
heck 2021-03-27 05:15:16 +01:00
parent 76ef8e0a21
commit bb70cf0169
2 changed files with 76 additions and 29 deletions

View File

@ -36,6 +36,7 @@ import pEp._pEp
from threading import Thread, Barrier
from time import sleep
# Executed on module import
def init():
# print(init, "called")
@ -44,12 +45,23 @@ def init():
def start_sync() -> None:
"""starts the sync thread"""
Sync.start_sync()
set_sync_mode(SyncModes.Async)
def shutdown_sync() -> None :
def shutdown_sync() -> None:
"""call this to shut down the sync thread"""
Sync.shutdown_sync()
set_sync_mode(SyncModes.Off)
def set_sync_mode(mode):
_pEp._set_sync_mode(mode)
if mode == SyncModes.Sync:
Sync.shutdown_sync()
if mode == SyncModes.Async:
Sync.start_sync()
if mode == SyncModes.Off:
Sync.shutdown_sync()
def is_sync_active() -> bool:
@ -80,7 +92,7 @@ def notify_handshake(me, partner, signal):
class Sync(Thread):
__instance:'Sync' = None
__instance: 'Sync' = None
barr = Barrier(2)
def __init__(self):
@ -97,37 +109,54 @@ class Sync(Thread):
return Sync.__instance
def run(self):
pEp.register_sync_callbacks()
"""
* Sync Thread
* NOPE 1. Execute registered startup function
register_sync_callbacks
* 2. Create session for the sync thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase)
* 3. register_sync_callbacks() (registers: _notifyHandshake, _retrieve_next_sync_event)
* 4. Enter Sync Event Dispatching Loop (do_sync_protocol())
unregister_sync_callbacks
* 5. unregister_sync_callbacks()
* 6. Release the session
* NOPE 7. Execute registered shutdown function
"""
# TODO catch exception, and rethrow in start()
_pEp._register_sync_callbacks()
self.barr.wait()
while pEp.do_protocol_step():
while _pEp._do_protocol_step():
sleep(1)
pEp.unregister_sync_callbacks()
_pEp._unregister_sync_callbacks()
def start(self):
"""
1. NEW ADLIB FUNC register_sync_callbacks() ONLY
2. create python thread that does do_sync_protocol()
3. NEWADLIB FUNC CallbackDispatcher.Signal_all_SYNC_NOTIFY_START()
* (1. Done on init(): ensure session for the main thread
(registers: messageToSend, _inject_sync_event, _ensure_passphrase))
* 2. Start the sync thread
* 3. Defer execution until sync thread register_sync_callbacks() has returned
* 4. TODO: Throw pending exception from the sync thread
"""
Thread.start(self)
self.barr.wait()
# pEp.notifyHandshake_sync_start()
# TODO: Throw exceptions from sync thread
# _pEp._notifyHandshake_sync_start()
# sleep(2)
@staticmethod
def start_sync():
Sync.getInstance().start()
if not Sync.getInstance().is_alive():
Sync.getInstance().start()
@staticmethod
def shutdown_sync():
if Sync.__instance:
if Sync.__instance.is_alive():
pEp.inject_sync_shutdown()
_pEp._inject_sync_shutdown()
Sync.__instance.join()
Sync.__instance = None
# pEp.notifyHandshake_sync_stop()
# _pEp._notifyHandshake_sync_stop()
init()

View File

@ -41,8 +41,16 @@ namespace pEp {
void _init_after_main_module() {
pEpLog("called");
callback_dispatcher.add(_messageToSend, _notifyHandshake, nullptr, nullptr);
Adapter::_messageToSend = CallbackDispatcher::messageToSend;
Adapter::_notifyHandshake = CallbackDispatcher::notifyHandshake;
// Adapter::sync_initialize(
// Adapter::SyncModes::Off,
// CallbackDispatcher::messageToSend,
// CallbackDispatcher::notifyHandshake,
// true);
Adapter::sync_initialize(
Adapter::SyncModes::Off,
_messageToSend,
_notifyHandshake,
true);
}
@ -129,7 +137,7 @@ namespace pEp {
return PEP_STATUS_OK;
}
bool do_protocol_step() {
bool _do_protocol_step() {
pEpLog("called");
SYNC_EVENT event = Adapter::_retrieve_next_sync_event(nullptr, 0);
if (event != NULL) {
@ -141,29 +149,31 @@ namespace pEp {
}
}
void register_sync_callbacks() {
void _register_sync_callbacks() {
pEpLog("called");
Adapter::session();
PEP_STATUS status = ::register_sync_callbacks(Adapter::session(), nullptr, Adapter::_notifyHandshake, Adapter::_retrieve_next_sync_event);
_throw_status(status);
}
void unregister_sync_callbacks() {
void _unregister_sync_callbacks() {
::unregister_sync_callbacks(Adapter::session());
// Adapter::session(release);
}
void inject_sync_shutdown() {
void _inject_sync_shutdown() {
pEpLog("injecting null event");
Adapter::_inject_sync_event(nullptr,nullptr);
Adapter::_queue_sync_event(nullptr,nullptr);
}
// TODO: Integrate this (currently SEGFAULTING)
void notifyHandshake_sync_start() {
void _notifyHandshake_sync_start() {
pEpLog("all targets signal: SYNC_NOTIFY_START");
CallbackDispatcher::notifyHandshake(nullptr, nullptr, SYNC_NOTIFY_START);
}
// TODO: Integrate this (currently SEGFAULTING)
void notifyHandshake_sync_stop() {
void _notifyHandshake_sync_stop() {
pEpLog("all targets signal: SYNC_NOTIFY_STOP");
CallbackDispatcher::notifyHandshake(nullptr, nullptr, SYNC_NOTIFY_STOP);
}
@ -228,24 +238,32 @@ namespace pEp {
def("set_debug_log_enabled", &Adapter::pEpLog::set_enabled,
"Switch debug logging on/off");
def("register_sync_callbacks", register_sync_callbacks,
def("_register_sync_callbacks", _register_sync_callbacks,
"");
def("unregister_sync_callbacks", unregister_sync_callbacks,
def("_unregister_sync_callbacks", _unregister_sync_callbacks,
"");
def("do_protocol_step", do_protocol_step,
def("_do_protocol_step", _do_protocol_step,
"");
def("inject_sync_shutdown", inject_sync_shutdown,
def("_inject_sync_shutdown", _inject_sync_shutdown,
"");
def("notifyHandshake_sync_start", notifyHandshake_sync_start,
def("_notifyHandshake_sync_start", _notifyHandshake_sync_start,
"");
def("notifyHandshake_sync_stop", notifyHandshake_sync_stop,
def("_notifyHandshake_sync_stop", _notifyHandshake_sync_stop,
"");
def("_set_sync_mode", pEp::Adapter::set_sync_mode,
"");
enum_<pEp::Adapter::SyncModes>("SyncModes")
.value("Off", pEp::Adapter::SyncModes::Off)
.value("Async", pEp::Adapter::SyncModes::Async)
.value("Sync", pEp::Adapter::SyncModes::Sync);
def("passive_mode", config_passive_mode,
"do not attach pub keys to all messages");