Browse Source

PYADPT-110: "Add Mutlithreaded Sync Implementation" - Implementation

PYADPT-110
heck 7 months ago
parent
commit
f858556460
3 changed files with 138 additions and 63 deletions
  1. +73
    -3
      src/pEp/__init__.py
  2. +63
    -58
      src/pEp/_pEp/pEpmodule.cc
  3. +2
    -2
      src/pEp/_pEp/pEpmodule.hh

+ 73
- 3
src/pEp/__init__.py View File

@ -14,9 +14,11 @@ try:
from .__version__ import version as __version__
except ImportError:
import warnings
warnings.warn("Error loading build-time defined __version__.py, trying setuptools now...")
try:
import setuptools_scm
__version__ = setuptools_scm.get_version()
del setuptools_scm
except Exception:
@ -30,18 +32,36 @@ from ._pEp import *
# with an underscore (of _pEp), but we dont want to import them into this module
import pEp._pEp
# 3rd party imports
from threading import Thread, Barrier
from time import sleep
# Executed on module import
def init():
print(init, "called")
# print(init, "called")
_pEp._init_after_main_module()
def start_sync() -> None:
"""starts the sync thread"""
Sync.start_sync()
def shutdown_sync() -> None :
"""call this to shut down the sync thread"""
Sync.shutdown_sync()
def is_sync_active() -> bool:
"""True if sync is active, False otherwise"""
return Sync.getInstance().isAlive()
def message_to_send(msg):
"""
message_to_send(msg)
override pEp.message_to_send(msg) with your own implementation
this callback is being called when a pp management message needs to be sent
GIL CAVEAT
"""
print("message_to_send() - default callback\n")
print("overwrite this method")
@ -54,10 +74,60 @@ def notify_handshake(me, partner, signal):
partner identity of communication partner
signal the handshake signal
overwrite this method with an implementation of a handshake dialog
GIL CAVEAT
"""
print("notify_handshake() - default callback\n")
print("overwrite this method")
class Sync(Thread):
__instance:'Sync' = None
barr = Barrier(2)
def __init__(self):
if Sync.__instance != None:
raise Exception("singleton!")
else:
Sync.__instance = self
Thread.__init__(self)
@staticmethod
def getInstance() -> 'Sync':
if Sync.__instance == None:
Sync()
return Sync.__instance
def run(self):
pEp.register_sync_callbacks()
self.barr.wait()
while pEp.do_protocol_step():
sleep(1)
pEp.unregister_sync_callbacks()
def start(self):
"""
1. NEW ADLIB FUNC register_sync_callbacks() ONLY
2. create python thread that does do_sync_protocol()
3. NEWADLIB FUNC CallbackDispatcher.Signal_all_SYNC_NOTIFY_START()
"""
Thread.start(self)
self.barr.wait()
# pEp.notifyHandshake_sync_start()
# sleep(2)
@staticmethod
def start_sync():
Sync.getInstance().start()
@staticmethod
def shutdown_sync():
if Sync.__instance:
if Sync.__instance.isAlive():
pEp.inject_sync_shutdown()
Sync.__instance.join()
Sync.__instance = None
# pEp.notifyHandshake_sync_stop()
init()

+ 63
- 58
src/pEp/_pEp/pEpmodule.cc View File

@ -37,11 +37,12 @@ namespace pEp {
pEpLog("called");
}
// hidden init function, wrapped by hello_world.init()
// hidden init function, wrapped by _pEp.init()
void _init_after_main_module() {
pEpLog("called");
callback_dispatcher.add(_messageToSend, notifyHandshake, nullptr, nullptr);
callback_dispatcher.add(_messageToSend, _notifyHandshake, nullptr, nullptr);
Adapter::_messageToSend = CallbackDispatcher::messageToSend;
Adapter::_notifyHandshake = CallbackDispatcher::notifyHandshake;
}
@ -96,6 +97,7 @@ namespace pEp {
}
}
// TODO: GIL handling isnt really required here, i think
PEP_STATUS _messageToSend(::message *msg) {
pEpLog("called");
try {
@ -103,7 +105,7 @@ namespace pEp {
pEpLog("GIL Aquired");
object modref = import("pEp");
object funcref = modref.attr("message_to_send");
call<void>(funcref.ptr(), Message());
call<void>(funcref.ptr(), Message(msg));
PyGILState_Release(gil);
pEpLog("GIL released");
} catch (exception &e) {}
@ -111,14 +113,15 @@ namespace pEp {
return PEP_STATUS_OK;
}
PEP_STATUS notifyHandshake(pEp_identity *me, pEp_identity *partner, sync_handshake_signal signal) {
// TODO: GIL handling isnt really required here, i think
PEP_STATUS _notifyHandshake(pEp_identity *me, pEp_identity *partner, sync_handshake_signal signal) {
pEpLog("called");
try {
PyGILState_STATE gil = PyGILState_Ensure();
pEpLog("GIL Aquired");
object modref = import("pEp");
object funcref = modref.attr("notify_handshake");
call<void>(funcref.ptr(), me, partner, signal);
call<void>(funcref.ptr(), Identity(me), Identity(partner), signal);
PyGILState_Release(gil);
pEpLog("GIL released");
} catch (exception &e) {}
@ -126,13 +129,43 @@ namespace pEp {
return PEP_STATUS_OK;
}
bool do_protocol_step() {
pEpLog("called");
SYNC_EVENT event = Adapter::_retrieve_next_sync_event(nullptr, 0);
if (event != NULL) {
::do_sync_protocol_step(Adapter::session(), (void *)&callback_dispatcher, event);
return true;
} else {
pEpLog("null event, signaling sync shutdown");
return false;
}
}
void register_sync_callbacks() {
pEpLog("called");
PEP_STATUS status = ::register_sync_callbacks(Adapter::session(), nullptr, Adapter::_notifyHandshake, Adapter::_retrieve_next_sync_event);
_throw_status(status);
}
void start_sync() {
CallbackDispatcher::start_sync();
void unregister_sync_callbacks() {
::unregister_sync_callbacks(Adapter::session());
}
void shutdown_sync() {
CallbackDispatcher::stop_sync();
void inject_sync_shutdown() {
pEpLog("injecting null event");
Adapter::_inject_sync_event(nullptr,nullptr);
}
// TODO: Integrate this (currently SEGFAULTING)
void notifyHandshake_sync_start() {
pEpLog("all targets signal: SYNC_NOTIFY_START");
CallbackDispatcher::notifyHandshake(nullptr, nullptr, SYNC_NOTIFY_START);
}
// TODO: Integrate this (currently SEGFAULTING)
void notifyHandshake_sync_stop() {
pEpLog("all targets signal: SYNC_NOTIFY_STOP");
CallbackDispatcher::notifyHandshake(nullptr, nullptr, SYNC_NOTIFY_STOP);
}
void debug_color(int ansi_color) {
@ -143,10 +176,6 @@ namespace pEp {
::leave_device_group(Adapter::session());
}
bool is_sync_active() {
return Adapter::is_sync_running();
}
void testfunc() {
_messageToSend(NULL);
}
@ -196,6 +225,27 @@ namespace pEp {
scope().attr("engine_version") = get_engine_version();
scope().attr("protocol_version") = get_protocol_version();
def("set_debug_log_enabled", &Adapter::pEpLog::set_enabled,
"Switch debug logging on/off");
def("register_sync_callbacks", register_sync_callbacks,
"");
def("unregister_sync_callbacks", unregister_sync_callbacks,
"");
def("do_protocol_step", do_protocol_step,
"");
def("inject_sync_shutdown", inject_sync_shutdown,
"");
def("notifyHandshake_sync_start", notifyHandshake_sync_start,
"");
def("notifyHandshake_sync_stop", notifyHandshake_sync_stop,
"");
def("passive_mode", config_passive_mode,
"do not attach pub keys to all messages");
@ -584,32 +634,6 @@ namespace pEp {
.value("SYNC_NOTIFY_SOLE", SYNC_NOTIFY_SOLE)
.value("SYNC_NOTIFY_IN_GROUP", SYNC_NOTIFY_IN_GROUP);
// auto user_interface_class = class_<UserInterface, UserInterface_callback, boost::noncopyable>(
// "UserInterface",
// "class MyUserInterface(UserInterface):\n"
// " def notifyHandshake(self, me, partner):\n"
// " ...\n"
// "\n"
// "p≡p User Interface class\n"
// "To be used as a mixin\n"
// )
// .def("notifyHandshake", &UserInterface::notifyHandshake,
// "notifyHandshake(self, me, partner)\n"
// "\n"
// " me own identity\n"
// " partner identity of communication partner\n"
// "\n"
// "overwrite this method with an implementation of a handshake dialog")
// .def("deliverHandshakeResult", &UserInterface::deliverHandshakeResult,
// boost::python::arg("identities")=object(),
// "deliverHandshakeResult(self, result, identities=None)\n"
// "\n"
// " result -1: cancel, 0: accepted, 1: rejected\n"
// " identities list of identities to share or None for all\n"
// "\n"
// "call to deliver the handshake result of the handshake dialog"
// );
def("deliver_handshake_result", &deliverHandshakeResult, boost::python::arg("identities")=object(),
"deliverHandshakeResult(self, result, identities=None)\n"
"\n"
@ -619,18 +643,6 @@ namespace pEp {
"call to deliver the handshake result of the handshake dialog"
);
def("start_sync", &start_sync,
"start_sync()\n"
"\n"
"starts the sync thread"
);
def("shutdown_sync", &shutdown_sync,
"shutdown_sync()\n"
"\n"
"call this from another thread to shut down the sync thread\n"
);
def("debug_color", &debug_color,
"for debug builds set ANSI color value");
@ -640,13 +652,6 @@ namespace pEp {
"call this for a grouped device, which should leave\n"
);
def("is_sync_active", &is_sync_active,
"is_sync_active()\n"
"\n"
"True if sync is active, False otherwise\n"
);
// codecs
call< object >(((object)(import("codecs").attr("register"))).ptr(), make_function(sync_search));
call< object >(((object)(import("codecs").attr("register"))).ptr(), make_function(distribution_search));


+ 2
- 2
src/pEp/_pEp/pEpmodule.hh View File

@ -27,9 +27,9 @@ namespace pEp {
PEP_STATUS _messageToSend(::message *msg);
PEP_STATUS notifyHandshake(pEp_identity *me, pEp_identity *partner, sync_handshake_signal signal);
PEP_STATUS _notifyHandshake(pEp_identity *me, pEp_identity *partner, sync_handshake_signal signal);
} /* namespace PythonAdapter */
} /* namespace pEp */
#endif /* PEPMODULE_HH */
#endif /* PEPMODULE_HH */

Loading…
Cancel
Save