Merge branch 'master' of ssh://gitea.pep.foundation:23065/pEp.foundation/libpEpAdapter

pull/8/head
Volker Birk 2 years ago
commit 25379daed8

@ -0,0 +1,41 @@
BasedOnStyle: LLVM
Language: Cpp
Standard: c++14
DerivePointerAlignment: true
SortIncludes: false
ReflowComments: false
PointerAlignment: Left
AlignAfterOpenBracket: AlwaysBreak
AlignOperands: AlignAfterOperator
BreakConstructorInitializers: AfterColon
AlignTrailingComments: true
AllowAllArgumentsOnNextLine: false
AllowAllParametersOfDeclarationOnNextLine: false
AllowShortEnumsOnASingleLine: false
AllowShortFunctionsOnASingleLine: Empty
AllowShortIfStatementsOnASingleLine: Never
AllowShortLoopsOnASingleLine: false
AlwaysBreakTemplateDeclarations: Yes
BinPackArguments: false
BinPackParameters: false
ExperimentalAutoDetectBinPacking: true
BreakBeforeBraces: Custom
BraceWrapping:
AfterFunction: true
ColumnLimit: 100
AllowAllConstructorInitializersOnNextLine: false
AlwaysBreakAfterDefinitionReturnType: None
AlwaysBreakAfterReturnType: None
PenaltyBreakBeforeFirstCallParameter: 0
PenaltyReturnTypeOnItsOwnLine: 1000000
PenaltyBreakAssignment: 1000000
PenaltyExcessCharacter: 10
IndentCaseLabels: true
IndentWidth: 4
MaxEmptyLinesToKeep: 2
NamespaceIndentation: All
SpaceAfterTemplateKeyword: false
AccessModifierOffset: -4
AllowShortBlocksOnASingleLine: Always
IndentPPDirectives: BeforeHash
Cpp11BracedListStyle: false

@ -1,154 +0,0 @@
// This file is under GNU General Public License 3.0
// see LICENSE.txt
#include "Adapter.hh"
#include <sstream>
#include <iomanip>
#include <assert.h>
#include "status_to_string.hh"
#include "pEpLog.hh"
#include "passphrase_cache.hh"
using namespace std;
thread_local pEp::Adapter::Session pEp::Adapter::session;
namespace pEp {
void throw_status(PEP_STATUS status)
{
if (status == PEP_STATUS_OK)
return;
if (status >= 0x400 && status <= 0x4ff)
return;
if (status == PEP_STATEMACHINE_CANNOT_SEND)
return;
if (status == PEP_OUT_OF_MEMORY)
throw bad_alloc();
if (status == PEP_ILLEGAL_VALUE)
throw invalid_argument("illegal value");
string _status = status_to_string(status);
throw RuntimeError(_status, status);
}
RuntimeError::RuntimeError(const std::string& _text, PEP_STATUS _status)
: std::runtime_error(_text.c_str()), text(_text), status(_status)
{
}
namespace Adapter {
messageToSend_t _messageToSend = nullptr;
notifyHandshake_t _notifyHandshake = nullptr;
std::thread _sync_thread;
::utility::locked_queue< SYNC_EVENT, ::free_Sync_event > sync_evt_q;
std::mutex m;
std::thread::id sync_thread_id()
{
return _sync_thread.get_id();
}
int _inject_sync_event(SYNC_EVENT ev, void *management)
{
try {
if (ev == nullptr) {
sync_evt_q.clear();
sync_evt_q.push_back(ev);
}
else {
sync_evt_q.push_front(ev);
}
}
catch (exception&) {
return 1;
}
return 0;
}
PEP_STATUS _ensure_passphrase(PEP_SESSION session, const char *fpr)
{
return passphrase_cache.ensure_passphrase(session, fpr);
}
// threshold: max waiting time in seconds
SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold)
{
SYNC_EVENT syncEvent = nullptr;
const bool success = sync_evt_q.try_pop_front(syncEvent, std::chrono::seconds(threshold));
if (!success) {
return new_sync_timeout_event();
}
return syncEvent;
}
bool on_sync_thread()
{
return _sync_thread.get_id() == this_thread::get_id();
}
PEP_SESSION Session::operator()(session_action action)
{
std::lock_guard<mutex> lock(m);
PEP_STATUS status = PEP_STATUS_OK;
switch (action) {
case release:
if (_session.get()) {
_session = nullptr;
}
break;
case init:
if (!_session.get()) {
PEP_SESSION session_;
status = ::init(&session_, _messageToSend, _inject_sync_event, _ensure_passphrase);
throw_status(status);
_session = SessionPtr{session_, ::release};
}
break;
default:
status = PEP_ILLEGAL_VALUE;
}
throw_status(status);
return _session.get();
}
void shutdown()
{
pEpLog("called");
if (_sync_thread.joinable()) {
pEpLog("sync_is_running - injecting null event");
_inject_sync_event(nullptr, nullptr);
_sync_thread.join();
}
}
bool is_sync_running()
{
return _sync_thread.joinable();
}
bool in_shutdown()
{
SYNC_EVENT ev;
try {
ev = sync_evt_q.back();
}
catch (std::underflow_error&) {
return false;
}
if (ev) {
return false;
} else {
return true;
}
}
}
}

@ -1,110 +0,0 @@
// This file is under GNU General Public License 3.0
// see LICENSE.txt
#ifndef LIBPEPADAPTER_ADAPTER_HXX
#define LIBPEPADAPTER_ADAPTER_HXX
#include <thread>
#include "locked_queue.hh"
#include <cassert>
#include "pEpLog.hh"
#include <atomic>
namespace pEp {
namespace Adapter {
using std::function;
extern messageToSend_t _messageToSend;
extern notifyHandshake_t _notifyHandshake;
extern std::thread _sync_thread;
extern ::utility::locked_queue< SYNC_EVENT, ::free_Sync_event > sync_evt_q;
extern std::mutex m;
SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold);
static std::exception_ptr _ex;
static std::atomic_bool register_done{false};
template< class T >
void sync_thread(T *obj, function< void(T *) > _startup, function< void(T *) > _shutdown)
{
pEpLog("called");
_ex = nullptr;
assert(_messageToSend);
assert(_notifyHandshake);
if (obj && _startup) {
_startup(obj);
}
pEpLog("creating session");
session();
{
//TODO: Do we need to use a passphraseWrap here???
pEpLog("register_sync_callbacks()");
PEP_STATUS status = register_sync_callbacks(session(), nullptr,
_notifyHandshake, _retrieve_next_sync_event);
pEpLog("register_sync_callbacks() return:" << status);
try {
throw_status(status);
register_done.store(true);
}
catch (...) {
_ex = std::current_exception();
register_done.store(true);
return;
}
}
pEpLog("sync protocol loop started");
do_sync_protocol(session(), (void *)obj);
pEpLog("sync protocol loop ended");
unregister_sync_callbacks(session());
session(release);
if (obj && _shutdown) {
_shutdown(obj);
}
}
template< class T >
void startup(
messageToSend_t messageToSend,
notifyHandshake_t notifyHandshake,
T *obj,
function< void(T *) > _startup,
function< void(T *) > _shutdown)
{
pEpLog("called");
if (messageToSend) {
_messageToSend = messageToSend;
}
if (notifyHandshake) {
_notifyHandshake = notifyHandshake;
}
pEpLog("creating session");
session();
if (!_sync_thread.joinable()) {
register_done.store(false);
pEpLog("creating sync-thread");
_sync_thread = std::thread(sync_thread<T>, obj, _startup, _shutdown);
while (register_done.load() == false) {
pEpLog("waiting for sync-thread to init...");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
if (_ex) {
pEpLog("exception pending, rethrowing");
std::rethrow_exception(_ex);
}
}
}
}
}
#endif //LIBPEPADAPTER_ADAPTER_HXX

@ -3,45 +3,22 @@
# This file may be used under the terms of the GNU General Public License version 3
# see LICENSE.txt
include Makefile.conf
.PHONY: src test install uninstall clean
TARGET=libpEpAdapter.a
all: src
.PHONY: install uninstall clean
src:
$(MAKE) -C src
SOURCE=$(wildcard *.cc)
HEADERS=$(wildcard *.hh *.hxx)
OBJECTS=$(subst .cc,.o,$(SOURCE))
DEPENDS=$(subst .cc,.d,$(SOURCE))
CXXFLAGS+= -MMD -MP
all: $(TARGET)
ifneq ($(MAKECMDGOALS),clean)
-include $(DEPENDS)
endif
lib: $(TARGET)
all: lib
test: lib
test: src
$(MAKE) -C test
$(TARGET): $(OBJECTS)
$(AR) -rc $@ $^
clean:
rm -vf $(TARGET) $(OBJECTS) $(DEPENDS)
rm -f *.d.*
$(MAKE) -C src clean
$(MAKE) -C test clean
install: $(TARGET)
mkdir -p $(PREFIX)/include/pEp
mkdir -p $(PREFIX)/lib
cp -v $(HEADERS) $(PREFIX)/include/pEp/
cp -v $(TARGET) $(PREFIX)/lib/
install:
$(MAKE) -C src install
uninstall:
cd $(PREFIX)/include/pEp && rm -vf $(HEADERS)
cd $(PREFIX)/lib && rm -vf $(TARGET)
$(MAKE) -C src uninstall

@ -5,6 +5,7 @@
HERE:=$(dir $(lastword $(MAKEFILE_LIST)))
TARGET=libpEpAdapter.a
# Defaults
DEBUG=1
@ -12,7 +13,7 @@ PREFIX?=$(HOME)
ENGINE_LIB_PATH=$(PREFIX)/lib
ENGINE_INC_PATH=$(PREFIX)/include
CXXFLAGS+=-std=c++11 -fPIC
CXXFLAGS+=-std=c++14 -fPIC
# Build target
BUILD_FOR:=$(shell uname)

@ -1,26 +0,0 @@
// This file is under GNU General Public License 3.0
// see LICENSE.txt
#ifndef LIBPEPADAPTER_CALL_WITH_LOCK_HH
#define LIBPEPADAPTER_CALL_WITH_LOCK_HH
#include <mutex>
namespace pEp
{
extern std::mutex call_with_lock_mutex;
// TODO: use && and std::forward<> to avoid copying of the arguments.
// It is not relevant, yet, because at the moment we use this function template only
// for init() and release() which have cheap-to-copy pointer parameters only
template<class R, class... Args>
R call_with_lock( R(*fn)(Args...), Args... args)
{
std::lock_guard<std::mutex> L(call_with_lock_mutex);
return fn(args...);
}
}
#endif // LIBPEPADAPTER_CALL_WITH_LOCK_HH

@ -1,23 +0,0 @@
// This file is under GNU General Public License 3.0
// see LICENSE.txt
#include "constant_time_algo.hh"
namespace pEp
{
bool constant_time_equal(const std::string& a, const std::string& b)
{
if(a.size() != b.size())
return false;
unsigned d = 0;
for(std::size_t idx = 0; idx<a.size(); ++idx)
{
d |= ( static_cast<unsigned>(a[idx]) ^ static_cast<unsigned>(b[idx]) );
}
// if d is still 0, the strings are equal.
return d == 0;
}
} // end of namespace pEp

@ -1,135 +0,0 @@
// This file is under GNU General Public License 3.0
// see LICENSE.txt
#ifndef LIBPEPADAPTER_MESSAGE_CACHE_HH
#define LIBPEPADAPTER_MESSAGE_CACHE_HH
#include <string>
#include <unordered_map>
#include <mutex>
#include <pEp/message_api.h>
#include <pEp/mime.h>
namespace pEp {
class MessageCache {
struct cache_entry {
cache_entry(::message *s, ::message *d)
: src(s), dst(d) { }
::message *src;
::message *dst;
};
using cache = std::unordered_map<std::string, cache_entry>;
cache _cache;
std::mutex _mtx;
long long id_range = 42;
long long next_id = 23;
public:
MessageCache();
enum which { msg_src = 0, msg_dst = 1 };
static PEP_STATUS cache_mime_decode_message(
const char *mimetext,
size_t size,
message **msg,
bool* has_possible_pEp_msg
);
static PEP_STATUS cache_mime_encode_message(
int one,
const message * msg,
bool omit_fields,
char **mimetext,
bool has_pEp_msg_attachment
);
static PEP_STATUS cache_decrypt_message(
PEP_SESSION session,
message *src,
message **dst,
stringlist_t **keylist,
PEP_rating *rating,
PEP_decrypt_flags_t *flags
);
static PEP_STATUS cache_encrypt_message(
PEP_SESSION session,
message *src,
stringlist_t *extra,
message **dst,
PEP_enc_format enc_format,
PEP_encrypt_flags_t flags
);
static PEP_STATUS cache_encrypt_message_for_self(
PEP_SESSION session,
pEp_identity* target_id,
message *src,
stringlist_t* extra,
message **dst,
PEP_enc_format enc_format,
PEP_encrypt_flags_t flags
);
static PEP_STATUS cache_release(std::string id);
static void removeCacheID(::message* msg);
protected:
void release(std::string id);
PEP_STATUS mime_decode_message(
const char *mimetext,
size_t size,
message **msg,
bool* has_possible_pEp_msg
);
PEP_STATUS mime_encode_message(
which one,
const message * src,
bool omit_fields,
char **mimetext,
bool has_pEp_msg_attachment
);
PEP_STATUS decrypt_message(
PEP_SESSION session,
message *src,
message **dst,
stringlist_t **keylist,
PEP_rating *rating,
PEP_decrypt_flags_t *flags
);
PEP_STATUS encrypt_message(
PEP_SESSION session,
message *src,
stringlist_t *extra,
message **dst,
PEP_enc_format enc_format,
PEP_encrypt_flags_t flags
);
PEP_STATUS encrypt_message_for_self(
PEP_SESSION session,
pEp_identity* target_id,
message *src,
stringlist_t* extra,
message **dst,
PEP_enc_format enc_format,
PEP_encrypt_flags_t flags
);
void generateCacheID(::message* msg);
static std::string cacheID(const ::message* msg);
};
extern MessageCache message_cache;
};
#endif // LIBPEPADAPTER_MESSAGE_CACHE_HH

@ -1,40 +0,0 @@
// This file is under GNU General Public License 3.0
// see LICENSE.txt
#include "pEpLog.hh"
#include <iostream>
#include <sstream>
#include <mutex>
#include <atomic>
namespace pEp {
namespace Adapter {
namespace pEpLog {
std::mutex mtx;
std::atomic_bool is_enabled{false};
void set_enabled(bool enabled) {
is_enabled.store(enabled);
}
bool get_enabled() {
return is_enabled.load();
}
void log(std::string msg) {
if (is_enabled.load()) {
std::lock_guard<std::mutex> l(mtx);
#ifdef ANDROID
__android_log_print(ANDROID_LOG_DEBUG, "pEpDebugLog", "%s", msg.c_str());
#else
std::cout << msg << std::endl; //std::endl also flushes
#endif
}
}
} // pEpLog
} // Adapter
} // pEp

@ -1,25 +0,0 @@
// This file is under GNU General Public License 3.0
// see LICENSE.txt
#include "slurp.hh"
#include <fstream>
#include <sstream>
#include <stdexcept>
namespace pEp
{
std::string slurp(const std::string& filename)
{
std::ifstream input(filename.c_str(), std::ios_base::binary);
if(!input)
{
throw std::runtime_error("Cannot read file \"" + filename + "\"! ");
}
std::stringstream sstr;
sstr << input.rdbuf();
return sstr.str();
}
} // end of namespace pEp

@ -0,0 +1,242 @@
// This file is under GNU General Public License 3.0
// see LICENSE.txt
#include "Adapter.hh"
#include <sstream>
#include <iomanip>
#include <assert.h>
#include "status_to_string.hh"
#include "pEpLog.hh"
#include "passphrase_cache.hh"
using namespace std;
thread_local pEp::Adapter::Session pEp::Adapter::session;
namespace pEp {
void throw_status(::PEP_STATUS status)
{
if (status == ::PEP_STATUS_OK) {
return;
}
if (status >= 0x400 && status <= 0x4ff) {
return;
}
if (status == ::PEP_STATEMACHINE_CANNOT_SEND) {
return;
}
if (status == ::PEP_OUT_OF_MEMORY) {
throw bad_alloc();
}
if (status == ::PEP_ILLEGAL_VALUE) {
throw invalid_argument("illegal value");
}
string _status = status_to_string(status);
throw RuntimeError(_status, status);
}
RuntimeError::RuntimeError(const std::string &_text, ::PEP_STATUS _status)
: std::runtime_error(_text.c_str()), text(_text), status(_status)
{
}
namespace Adapter {
// private
SyncModes _sync_mode = SyncModes::Async;
::messageToSend_t _messageToSend = nullptr;
::notifyHandshake_t _notifyHandshake = nullptr;
bool _adapter_manages_sync_thread = false;
::inject_sync_event_t _inject_action = _queue_sync_event;
std::thread _sync_thread;
::utility::locked_queue<SYNC_EVENT, ::free_Sync_event> sync_evt_q;
std::mutex mut;
// private
std::thread::id sync_thread_id()
{
return _sync_thread.get_id();
}
// public
void sync_initialize(
SyncModes mode,
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
bool adapter_manages_sync_thread)
{
_messageToSend = messageToSend;
_notifyHandshake = notifyHandshake;
_adapter_manages_sync_thread = adapter_manages_sync_thread;
set_sync_mode(mode);
return;
}
// public
void set_sync_mode(SyncModes mode)
{
// std::lock_guard<mutex> lock(mut);
_sync_mode = mode;
if (_sync_mode == SyncModes::Sync) {
// init sesssion with inject_sync = process
// stop sync
session(release);
_inject_action = _process_sync_event;
session(init);
::register_sync_callbacks(session(), nullptr, _notifyHandshake, _retrieve_next_sync_event);
if(!_adapter_manages_sync_thread) {
shutdown();
} else {
// The adapter need to shutdown sync thread
}
}
if (_sync_mode == SyncModes::Async) {
// init session with inject_sync = queue
// start sync thread
session(release);
_inject_action = _queue_sync_event;
session(init);
if(!_adapter_manages_sync_thread) {
if (!is_sync_running()) {
startup<void>(_messageToSend, _notifyHandshake, nullptr, nullptr);
}
} else {
// The adapter need to do sync thread start up
}
}
if (_sync_mode == SyncModes::Off) {
// init sesssion with inject_sync = null
// stop sync thread
if(!_adapter_manages_sync_thread) {
shutdown();
} else {
// Adapter needs to shutdown sync thread
}
session(release);
_inject_action = _queue_sync_event;
session(init);
}
return;
}
// private
int _process_sync_event(::SYNC_EVENT ev, void *management)
{
if (ev != nullptr) {
::do_sync_protocol_step(session(), nullptr, ev);
return 0;
} else {
return 0;
}
}
// private
int _queue_sync_event(::SYNC_EVENT ev, void *management)
{
try {
if (ev == nullptr) {
sync_evt_q.clear();
sync_evt_q.push_back(ev);
} else {
sync_evt_q.push_front(ev);
}
} catch (exception &) {
return 1;
}
return 0;
}
// private
PEP_STATUS _ensure_passphrase(::PEP_SESSION session, const char *fpr)
{
return passphrase_cache.ensure_passphrase(session, fpr);
}
// public
::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold)
{
::SYNC_EVENT syncEvent = nullptr;
const bool success = sync_evt_q.try_pop_front(syncEvent, std::chrono::seconds(threshold));
if (!success) {
return ::new_sync_timeout_event();
}
return syncEvent;
}
// public
bool on_sync_thread()
{
return _sync_thread.get_id() == this_thread::get_id();
}
// public
::PEP_SESSION Session::operator()(session_action action)
{
std::lock_guard<mutex> lock(mut);
::PEP_STATUS status = ::PEP_STATUS_OK;
switch (action) {
case release:
if (_session.get()) {
_session = nullptr;
}
break;
case init:
if (!_session.get()) {
::PEP_SESSION session_;
status = ::init(&session_, _messageToSend, _inject_action, _ensure_passphrase);
throw_status(status);
_session = SessionPtr{session_, ::release};
}
break;
default:
status = ::PEP_ILLEGAL_VALUE;
}
throw_status(status);
return _session.get();
}
// public
void shutdown()
{
pEpLog("called");
if (_sync_thread.joinable()) {
pEpLog("sync_is_running - injecting null event");
_queue_sync_event(nullptr, nullptr);
_sync_thread.join();
}
}
// public
bool is_sync_running()
{
if(!_adapter_manages_sync_thread) {
return _sync_thread.joinable();
} else {
return false;
}
}
// public
// Works even if adapter is managing sync thread, BUT must be using this queue
bool in_shutdown()
{
SYNC_EVENT ev;
try {
ev = sync_evt_q.back();
} catch (std::underflow_error &) {
return false;
}
if (ev) {
return false;
} else {
return true;
}
}
} // namespace Adapter
} // namespace pEp

@ -5,10 +5,10 @@
#define LIBPEPADAPTER_ADAPTER_HH
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <stdexcept>
#include <memory>
#include <pEp/sync_api.h>
@ -17,26 +17,43 @@ namespace pEp {
// throws std::bad_alloc if status==PEP_OUT_OF_MEMORY,
// throws std::invalid_argument if status==PEP_ILLEGAL_VALUE,
// throws RuntimeError when 'status' represents another exceptional value.
void throw_status(PEP_STATUS status);
void throw_status(::PEP_STATUS status);
struct RuntimeError : std::runtime_error {
RuntimeError(const std::string& _text, PEP_STATUS _status);
RuntimeError(const std::string &_text, ::PEP_STATUS _status);
std::string text;
PEP_STATUS status;
::PEP_STATUS status;
};
namespace Adapter {
int _inject_sync_event(SYNC_EVENT ev, void *management);
PEP_STATUS _ensure_passphrase(PEP_SESSION session, const char *fpr);
// public
enum class SyncModes
{
Off,
Sync,
Async
};
void sync_initialize(
SyncModes mode,
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
bool adapter_manages_sync_thread);
void set_sync_mode(SyncModes mode);
int _queue_sync_event(::SYNC_EVENT ev, void *management);
int _process_sync_event(::SYNC_EVENT ev, void *management);
::PEP_STATUS _ensure_passphrase(::PEP_SESSION session, const char *fpr);
template<class T = void>
void startup(
messageToSend_t messageToSend,
notifyHandshake_t notifyHandshake,
T *obj = nullptr,
std::function< void (T *) > _startup = nullptr,
std::function< void (T *) > _shutdown = nullptr
);
std::function<void(T *)> _startup = nullptr,
std::function<void(T *)> _shutdown = nullptr);
// returns 'true' when called from the "sync" thread, 'false' otherwise.
bool on_sync_thread();
@ -44,9 +61,10 @@ namespace pEp {
// returns the thread id of the sync thread
std::thread::id sync_thread_id();
enum session_action {
enum session_action
{
init,
release
release,
};
class Session {
@ -59,15 +77,15 @@ namespace pEp {
extern thread_local Session session;
// injects a NULL event into sync_event_queue to denote sync thread to shutdown,
// and joins & removes the sync thread
// injects a NULL event into sync_event_queue to denote sync thread to
// shutdown, and joins & removes the sync thread
void shutdown();
bool is_sync_running();
bool in_shutdown();
}
}
} // namespace Adapter
} // namespace pEp
#include "Adapter.hxx"
#endif //LIBPEPADAPTER_ADAPTER_HH
#endif // LIBPEPADAPTER_ADAPTER_HH

@ -0,0 +1,148 @@
// This file is under GNU General Public License 3.0
// see LICENSE.txt
#ifndef LIBPEPADAPTER_ADAPTER_HXX
#define LIBPEPADAPTER_ADAPTER_HXX
#include <thread>
#include "locked_queue.hh"
#include <cassert>
#include "pEpLog.hh"
#include <atomic>
namespace pEp {
namespace Adapter {
using std::function;
extern ::messageToSend_t _messageToSend;
extern ::notifyHandshake_t _notifyHandshake;
extern std::thread _sync_thread;
extern ::utility::locked_queue<::SYNC_EVENT, ::free_Sync_event> sync_evt_q;
extern std::mutex mut;
::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold);
static std::exception_ptr _ex;
static std::atomic_bool register_done{false};
/*
* Sync Thread
* 1. Execute registered startup function
* 2. Create session for the sync thread (registers: messageToSend, inject_sync_event, ensure_passphrase)
* 3. register_sync_callbacks() (registers: _notifyHandshake, _retrieve_next_sync_event)
* 4. Enter Sync Event Dispatching Loop (do_sync_protocol())
* 5. unregister_sync_callbacks()
* 6. Release the session
* 7. Execute registered shutdown function
*/
// private
template<class T>
void sync_thread(T *obj, function<void(T *)> _startup, function<void(T *)> _shutdown)
{
pEpLog("called");
_ex = nullptr;
assert(_messageToSend);
assert(_notifyHandshake);
// 1. Execute registered startup function
if (obj && _startup) {
_startup(obj);
}
pEpLog("creating session for the sync thread");
// 2. Create session for the sync thread
session();
// 3. register_sync_callbacks()
{
// TODO: Do we need to use a passphraseWrap here???
pEpLog("register_sync_callbacks()");
::PEP_STATUS status = ::register_sync_callbacks(
session(),
nullptr,
_notifyHandshake,
_retrieve_next_sync_event);
pEpLog("register_sync_callbacks() return:" << status);
// Convert status into exception and store it
// set register_done AFTER that
try {
throw_status(status);
register_done.store(true);
} catch (...) {
_ex = std::current_exception();
register_done.store(true);
return;
}
}
pEpLog("sync protocol loop started");
// 4. Enter Sync Event Dispatching Loop (do_sync_protocol())
::do_sync_protocol(session(), (void *)obj);
pEpLog("sync protocol loop ended");
// 5. unregister_sync_callbacks()
unregister_sync_callbacks(session());
// 6. Release the session
// TODO: Maybe do that AFTER shutdown?
session(release);
// 7. Execute registered shutdown function
if (obj && _shutdown) {
_shutdown(obj);
}
}
/*
* Sync Thread Startup
* 1. ensure session for the main thread (registers: messageToSend, _queue_sync_event, _ensure_passphrase)
* 2. Start the sync thread
* 3. Defer execution until sync thread register_sync_callbacks() has returned
* 4. Throw pending exception from the sync thread
*/
// private
template<class T>
void startup(
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
T *obj,
function<void(T *)> _startup,
function<void(T *)> _shutdown)
{
pEpLog("called");
if (messageToSend) {
_messageToSend = messageToSend;
}
if (notifyHandshake) {
_notifyHandshake = notifyHandshake;
}
pEpLog("ensure session for the main thread");
// 1. re-initialize session for the main thread (registers: messageToSend, _queue_sync_event, _ensure_passphrase)
session(release);
session(init);
if (!_sync_thread.joinable()) {
register_done.store(false);
pEpLog("creating sync-thread");
// 2. Start the sync thread
_sync_thread = std::thread(sync_thread<T>, obj, _startup, _shutdown);
// 3. Defer execution until sync thread register_sync_callbacks() has returned
while (register_done.load() == false) {
pEpLog("waiting for sync-thread to init...");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 4. Throw pending exception from the sync thread
if (_ex) {
pEpLog("exception pending, rethrowing");
std::rethrow_exception(_ex);
}
}
}
} // namespace Adapter
} // namespace pEp
#endif //LIBPEPADAPTER_ADAPTER_HXX

@ -0,0 +1,37 @@
# Copyright 2018, pEp Foundation
# This file is part of lib pEp Adapter
# This file may be used under the terms of the GNU General Public License version 3
# see LICENSE.txt
include ../Makefile.conf
SOURCE=$(wildcard *.cc)
HEADERS=$(wildcard *.hh *.hxx)
OBJECTS=$(subst .cc,.o,$(SOURCE))
DEPENDS=$(subst .cc,.d,$(SOURCE))
CXXFLAGS+= -MMD -MP
ifneq ($(MAKECMDGOALS),clean)
-include $(DEPENDS)
endif
.PHONY: install uninstall clean
all: $(TARGET)
$(TARGET): $(OBJECTS)
$(AR) -rc $@ $^
clean:
rm -vf $(TARGET) $(OBJECTS) $(DEPENDS)
rm -f *.d.*
install: $(TARGET)
mkdir -p $(PREFIX)/include/pEp
mkdir -p $(PREFIX)/lib
cp -v $(HEADERS) $(PREFIX)/include/pEp/
cp -v $(TARGET) $(PREFIX)/lib/
uninstall:
cd $(PREFIX)/include/pEp && rm -vf $(HEADERS)
cd $(PREFIX)/lib && rm -vf $(TARGET)

@ -19,7 +19,7 @@ namespace pEp {
std::atomic_bool _stop;
public:
Semaphore() : _stop(false) { }
Semaphore() : _stop(false) {}
void stop()
{
@ -34,7 +34,7 @@ namespace pEp {
return;
}
while(_stop.load()) {
while (_stop.load()) {
cv.wait(lock);
}
}
@ -46,8 +46,6 @@ namespace pEp {
cv.notify_all();
}
};
}
} // namespace pEp
#endif // LIBPEPADAPTER_SEMAPHORE_HH

@ -3,7 +3,6 @@
#include "call_with_lock.hh"
namespace pEp
{
std::mutex call_with_lock_mutex;
namespace pEp {
std::mutex call_with_lock_mutex;
}

@ -0,0 +1,26 @@
// This file is under GNU General Public License 3.0
// see LICENSE.txt
#ifndef LIBPEPADAPTER_CALL_WITH_LOCK_HH
#define LIBPEPADAPTER_CALL_WITH_LOCK_HH
#include <mutex>
namespace pEp {
extern std::mutex call_with_lock_mutex;
// TODO: use && and std::forward<> to avoid copying of the arguments.
// It is not relevant, yet, because at the moment we use this function
// template only for init() and release() which have cheap-to-copy pointer
// parameters only
template<class R, class... Args>
R call_with_lock(R (*fn)(Args...), Args... args)
{
std::lock_guard<std::mutex> L(call_with_lock_mutex);
return fn(args...);
}
} // namespace pEp
#endif // LIBPEPADAPTER_CALL_WITH_LOCK_HH

@ -14,18 +14,19 @@ namespace pEp {
return callback_dispatcher._messageToSend(msg);
}
PEP_STATUS CallbackDispatcher::notifyHandshake(::pEp_identity *me,
::pEp_identity *partner, ::sync_handshake_signal signal)
PEP_STATUS CallbackDispatcher::notifyHandshake(
::pEp_identity *me,
::pEp_identity *partner,
::sync_handshake_signal signal)
{
return callback_dispatcher._notifyHandshake(me, partner, signal);
}
void CallbackDispatcher::add(
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
proc on_startup,
proc shutdown
)
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
proc on_startup,
proc shutdown)
{
assert(messageToSend);
if (!messageToSend) {
@ -77,10 +78,12 @@ namespace pEp {
pEpLog("called");
callback_dispatcher.semaphore.go();
pEp::Adapter::startup<CallbackDispatcher>(CallbackDispatcher::messageToSend,
CallbackDispatcher::notifyHandshake, &callback_dispatcher,
&CallbackDispatcher::on_startup,
&CallbackDispatcher::on_shutdown);
pEp::Adapter::startup<CallbackDispatcher>(
CallbackDispatcher::messageToSend,
CallbackDispatcher::notifyHandshake,
&callback_dispatcher,
&CallbackDispatcher::on_startup,
&CallbackDispatcher::on_shutdown);
pEpLog("all targets signal: SYNC_NOTIFY_START");
for (auto target : callback_dispatcher.targets) {
@ -143,8 +146,10 @@ namespace pEp {
return PEP_STATUS_OK;
}
PEP_STATUS CallbackDispatcher::_notifyHandshake(::pEp_identity *me,
::pEp_identity *partner, ::sync_handshake_signal signal)
PEP_STATUS CallbackDispatcher::_notifyHandshake(
::pEp_identity *me,
::pEp_identity *partner,
::sync_handshake_signal signal)
{
for (auto target : targets) {
if (target.notifyHandshake) {
@ -169,5 +174,4 @@ namespace pEp {
return PEP_STATUS_OK;
}
};
}; // namespace pEp

@ -34,8 +34,7 @@ namespace pEp {
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
proc on_startup = nullptr,
proc on_shutdown = nullptr
);
proc on_shutdown = nullptr);
void remove(::messageToSend_t messageToSend);
static void start_sync();
@ -45,8 +44,8 @@ namespace pEp {
static PEP_STATUS notifyHandshake(
::pEp_identity *me,
::pEp_identity *partner,
::sync_handshake_signal signal
);
::sync_handshake_signal signal);
protected:
void on_startup();
void on_shutdown();
@ -55,13 +54,12 @@ namespace pEp {
PEP_STATUS _notifyHandshake(
::pEp_identity *me,
::pEp_identity *partner,
::sync_handshake_signal signal
);
::sync_handshake_signal signal);
friend const char *PassphraseCache::add(const std::string& passphrase);
friend const char *PassphraseCache::add(const std::string &passphrase);
};
extern CallbackDispatcher callback_dispatcher;
}
} // namespace pEp
#endif // LIBPEPADAPTER_CALLBACK_DISPATCHER_HH

@ -0,0 +1,21 @@
// This file is under GNU General Public License 3.0
// see LICENSE.txt
#include "constant_time_algo.hh"
namespace pEp {
bool constant_time_equal(const std::string &a, const std::string &b)
{
if (a.size() != b.size())
return false;
unsigned d = 0;
for (std::size_t idx = 0; idx < a.size(); ++idx) {
d |= (static_cast<unsigned>(a[idx]) ^ static_cast<unsigned>(b[idx]));
}
// if d is still 0, the strings are equal.
return d == 0;
}
} // end of namespace pEp

@ -6,14 +6,13 @@
#include <string>
namespace pEp
{
namespace pEp {
// Returns false if a.size() != b.size().
// Compares always _all_ characters of 'a' and 'b' so runtime does not
// depends on the character position where the strings differ.
// Use this function instead of operator== if timing sidechannel attack
// might be a security problem.
bool constant_time_equal(const std::string& a, const std::string& b);
bool constant_time_equal(const std::string &a, const std::string &b);
} // end of namespace pEp

@ -8,17 +8,15 @@
#include <condition_variable>
#include <mutex>
namespace utility
{
template<class T, void(*Deleter)(T) = nullptr>
class locked_queue
{
typedef std::recursive_mutex Mutex;
typedef std::unique_lock<Mutex> Lock;
int _waiting = 0;
Mutex _mtx;
std::condition_variable_any _cv;
namespace utility {
template<class T, void (*Deleter)(T) = nullptr>
class locked_queue {
typedef std::recursive_mutex Mutex;
typedef std::unique_lock<Mutex> Lock;
int _waiting = 0;
Mutex _mtx;
std::condition_variable_any _cv;
std::deque<T> _q;
public:
@ -30,10 +28,8 @@ namespace utility
void clear()
{
Lock L(_mtx);
if(Deleter != nullptr)
{
for(auto q : _q)
{
if (Deleter != nullptr) {
for (auto q : _q) {
Deleter(q);
}
}
@ -63,7 +59,7 @@ namespace utility
T pop_back()
{
Lock L(_mtx);
_cv.wait(L, [&]{ return !_q.empty(); } );
_cv.wait(L, [&] { return !_q.empty(); });
T r{std::move(_q.back())};
_q.pop_back();
return r;
@ -74,7 +70,7 @@ namespace utility
T pop_front()
{
Lock L(_mtx);
_cv.wait(L, [&]{ return !_q.empty(); } );
_cv.wait(L, [&] { return !_q.empty(); });
T r{std::move(_q.front())};
_q.pop_front();
return r;
@ -86,30 +82,28 @@ namespace utility
{
Lock L(_mtx);
++_waiting;
if(! _cv.wait_until(L, end_time, [this]{ return !_q.empty(); } ) )
{
if (!_cv.wait_until(L, end_time, [this] { return !_q.empty(); })) {
--_waiting;
return false;
}
--_waiting;
out = std::move(_q.back());
_q.pop_back();
return true;
}
// returns true and set a copy of the first element and pop it from queue if there is any
// returns false and leaves 'out' untouched if queue is empty even after 'end_time'
bool try_pop_front(T& out, std::chrono::steady_clock::time_point end_time)
{
Lock L(_mtx);
++_waiting;
if(! _cv.wait_until(L, end_time, [this]{ return !_q.empty(); } ) )
{
if (!_cv.wait_until(L, end_time, [this] { return !_q.empty(); })) {
--_waiting;
return false;
}