update for new sync interface

pull/14/head
heck 3 weeks ago
parent 44f66c4744
commit 5955eba1b8

@ -14,11 +14,9 @@ try:
from .__version__ import version as __version__
except ImportError:
import warnings
warnings.warn("Error loading build-time defined __version__.py, trying setuptools now...")
try:
import setuptools_scm
__version__ = setuptools_scm.get_version()
del setuptools_scm
except Exception:
@ -33,6 +31,7 @@ from ._pEp import *
import pEp._pEp
# 3rd party imports
from os import environ
from threading import Thread, Barrier
from time import sleep
from enum import Enum
@ -40,8 +39,42 @@ from enum import Enum
# Executed on module import
def init():
# enable log until the desired setting is clear
_pEp.set_debug_log_enabled(True)
env_var_log_adapter_enabled: str = "PEP_LOG_ADAPTER"
log_adapter_enabled: bool = False
try:
if environ[env_var_log_adapter_enabled] == "0":
log_adapter_enabled = False
_pEp._log("env var {} set to 0".format(env_var_log_adapter_enabled))
elif environ[env_var_log_adapter_enabled] == "1":
log_adapter_enabled = True
else:
_pEp._log("env var {}: valid values are 1 or 0".format(env_var_log_adapter_enabled))
except:
_pEp._log("env var {} not set. Defaulting to {}".format(env_var_log_adapter_enabled, log_adapter_enabled))
_pEp.set_debug_log_enabled(log_adapter_enabled)
# Sync event processing (Sync/Async)
use_sync_thread: bool = True
env_var_use_sync_thread: str = "PEP_MULTITHREAD"
try:
if environ[env_var_use_sync_thread] == "0":
use_sync_thread = False
print("env var {} set to 0, Sync-event processing set to synchronous".format(env_var_use_sync_thread,use_sync_thread))
elif environ[env_var_use_sync_thread] == "1":
use_sync_thread = True
print("env var {} set to 1, Sync-event processing set to asynchronous".format(env_var_use_sync_thread,use_sync_thread))
else:
_pEp._log("env var {}: valid values are 1 or 0".format(env_var_log_adapter_enabled))
except:
_pEp._log("env var {} not set. Defaulting to {}".format(env_var_use_sync_thread,use_sync_thread))
_pEp._init_callbackdispatcher()
_pEp._init_session(True)
_pEp._init_session(use_sync_thread)
def start_sync() -> None:
@ -52,24 +85,6 @@ def start_sync() -> None:
def shutdown_sync() -> None:
"""call this to shut down the sync thread"""
Sync.shutdown_sync()
# set_sync_mode(SyncModes.Off)
class SyncModes(Enum):
OFF: int = 0
SYNC: int = 1
ASYNC: int = 2
def set_sync_mode(mode):
if mode == SyncModes.SYNC:
Sync.shutdown_sync()
_pEp._init_session(False)
if mode == SyncModes.ASYNC:
_pEp._init_session(True)
Sync.start_sync()
if mode == SyncModes.OFF:
Sync.shutdown_sync()
def is_sync_active() -> bool:
@ -105,7 +120,7 @@ class Sync(Thread):
def __init__(self):
if Sync.__instance != None:
raise Exception("singleton!")
raise Exception("singleton already instantiated. Dont use constructor, use getInstance()")
else:
Sync.__instance = self
Thread.__init__(self)
@ -123,18 +138,15 @@ class Sync(Thread):
register_sync_callbacks
* 2. Create session for the sync thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase)
* 3. register_sync_callbacks() (registers: _notifyHandshake, _retrieve_next_sync_event)
* 4. Enter Sync Event Dispatching Loop (do_sync_protocol())
* 4. Enter Sync Event Dispatching Loop (do_sync_protocol())
unregister_sync_callbacks
* 5. unregister_sync_callbacks()
* 6. Release the session
* NOPE 7. Execute registered shutdown function
"""
# TODO catch exception, and rethrow in start()
# _pEp._register_sync_callbacks()
_pEp._init_session(True)
self.barr.wait()
while _pEp._do_protocol_step():
while _pEp._do_protocol_step_from_queue():
sleep(1)
_pEp._free_session()

@ -106,46 +106,30 @@ namespace pEp {
}
}
// TODO: GIL handling isnt really required here, i think
PEP_STATUS _messageToSend(::message *msg)
{
pEpLog("called");
try {
PyGILState_STATE gil = PyGILState_Ensure();
pEpLog("GIL Aquired");
object modref = import("pEp");
object funcref = modref.attr("message_to_send");
call<void>(funcref.ptr(), Message(msg));
PyGILState_Release(gil);
pEpLog("GIL released");
} catch (std::exception &e) {
}
object modref = import("pEp");
object funcref = modref.attr("message_to_send");
call<void>(funcref.ptr(), Message(msg));
return PEP_STATUS_OK;
}
// TODO: GIL handling isnt really required here, i think
PEP_STATUS _notifyHandshake(pEp_identity *me, pEp_identity *partner, sync_handshake_signal signal)
{
pEpLog("called");
try {
PyGILState_STATE gil = PyGILState_Ensure();
pEpLog("GIL Aquired");
object modref = import("pEp");
object funcref = modref.attr("notify_handshake");
call<void>(funcref.ptr(), Identity(me), Identity(partner), signal);
PyGILState_Release(gil);
pEpLog("GIL released");
} catch (std::exception &e) {
}
object modref = import("pEp");
object funcref = modref.attr("notify_handshake");
call<void>(funcref.ptr(), Identity(me), Identity(partner), signal);
return PEP_STATUS_OK;
}
bool _do_protocol_step()
bool _do_protocol_step_from_queue()
{
pEpLog("called");
SYNC_EVENT event = Adapter::_cb_retrieve_next_sync_event_dequeue_next_sync_event(nullptr, 0);
SYNC_EVENT event = Adapter::_cb_retrieve_next_sync_event_dequeue_next_sync_event(
nullptr,
0);
if (event != NULL) {
::do_sync_protocol_step(Adapter::session(), event);
return true;
@ -204,8 +188,9 @@ namespace pEp {
for (int i = 0; i < boost::python::len(identities); ++i) {
Identity ident = extract<Identity>(identities[i]);
si = identity_list_add(si, ident);
if (!si)
if (!si) {
throw std::bad_alloc();
}
}
} catch (std::exception &ex) {
free_identity_list(shared_identities);
@ -221,6 +206,10 @@ namespace pEp {
_throw_status(status);
}
void _log(const std::string& msg) {
pEpLog(msg);
}
BOOST_PYTHON_MODULE(_pEp)
{
def("_init_callbackdispatcher", _init_callbackdispatcher);
@ -236,11 +225,11 @@ namespace pEp {
scope().attr("protocol_version") = get_protocol_version();
def("set_debug_log_enabled", &Adapter::pEpLog::set_enabled, "Switch debug logging on/off");
def("_log", _log);
def("_init_session", _init_session);
def("_free_session", _free_session);
def("_do_protocol_step", _do_protocol_step);
def("_do_protocol_step_from_queue", _do_protocol_step_from_queue);
def("_inject_sync_shutdown", Adapter::inject_sync_shutdown);
def("_notifyHandshake_sync_start", _notifyHandshake_sync_start);
def("_notifyHandshake_sync_stop", _notifyHandshake_sync_stop);

Loading…
Cancel
Save