COM-121
Volker Birk 4 years ago
parent 31ef02f2c9
commit 9c91e87568
  1. 124
      CpEpEngine.cpp
  2. 25
      CpEpEngine.h

@ -17,10 +17,6 @@ using namespace pEp::Adapter;
// keysync thread actually has finished before we're destructed.
std::mutex CpEpEngine::init_mutex;
std::list< IpEpEngineCallbacks * > CpEpEngine::all_callbacks;
std::mutex CpEpEngine::callbacks_mutex;
atomic< int > CpEpEngine::count = 0;
STDMETHODIMP CpEpEngine::InterfaceSupportsErrorInfo(REFIID riid)
@ -738,65 +734,87 @@ int CpEpEngine::examine_identity(pEp_identity *ident, void *management)
return _ident;
}
PEP_STATUS CpEpEngine::messageToSend(message *msg)
PEP_STATUS CpEpEngine::_messageToSend(message *msg, bool in_sync)
{
assert(msg);
if (!msg)
return PEP_ILLEGAL_VALUE;
lock_guard< mutex > lock(callbacks_mutex);
// use the first one
IpEpEngineCallbacks *cb = all_callbacks.front();
for (auto p = sync_callbacks.begin(); p != sync_callbacks.end(); ++p) {
IpEpEngineCallbacks *cb = p->pdata->unmarshaled;
if (cb) {
TextMessage _msg;
memset(&_msg, 0, sizeof(TextMessage));
text_message_from_C(&_msg, msg);
HRESULT r = cb->MessageToSend(&_msg);
assert(r == S_OK);
clear_text_message(&_msg);
if (r == E_OUTOFMEMORY)
return PEP_OUT_OF_MEMORY;
if (r != S_OK)
return PEP_UNKNOWN_ERROR;
}
}
TextMessage _msg;
memset(&_msg, 0, sizeof(TextMessage));
return PEP_STATUS_OK;
}
text_message_from_C(&_msg, msg);
HRESULT r = cb->MessageToSend(&_msg);
assert(r == S_OK);
clear_text_message(&_msg);
if (r == E_OUTOFMEMORY)
return PEP_OUT_OF_MEMORY;
if (r != S_OK)
return PEP_UNKNOWN_ERROR;
PEP_STATUS CpEpEngine::messageToSend(message *msg)
{
return _messageToSend(msg);
}
return PEP_STATUS_OK;
PEP_STATUS CpEpEngine::messageToSend_sync(message *msg)
{
return _messageToSend(msg, true);
}
PEP_STATUS CpEpEngine::notifyHandshake(::pEp_identity *self, ::pEp_identity *partner, sync_handshake_signal signal)
PEP_STATUS CpEpEngine::_notifyHandshake(::pEp_identity *self, ::pEp_identity *partner, sync_handshake_signal signal, bool in_sync)
{
assert(self && partner);
if (!(self && partner))
return PEP_ILLEGAL_VALUE;
lock_guard< mutex > lock(callbacks_mutex);
if (all_callbacks.size() == 0)
return PEP_SYNC_NO_NOTIFY_CALLBACK;
// fire all of them
for (auto i = all_callbacks.begin(); i != all_callbacks.end(); ++i) {
IpEpEngineCallbacks *cb = *i;
pEpIdentity _self;
copy_identity(&_self, self);
pEpIdentity _partner;
copy_identity(&_partner, partner);
SyncHandshakeSignal _signal = (SyncHandshakeSignal) signal;
SyncHandshakeResult result;
HRESULT r = cb->NotifyHandshake(&_self, &_partner, _signal, &result);
assert(r == S_OK);
clear_identity_s(_self);
clear_identity_s(_partner);
if (r == E_OUTOFMEMORY)
return PEP_OUT_OF_MEMORY;
for (auto p = sync_callbacks.begin(); p != sync_callbacks.end(); ++p) {
IpEpEngineCallbacks *cb = nullptr;
if (in_sync)
cb = p->cdata;
else
cb = p->pdata->unmarshaled;
if (cb) {
pEpIdentity _self;
copy_identity(&_self, self);
pEpIdentity _partner;
copy_identity(&_partner, partner);
SyncHandshakeSignal _signal = (SyncHandshakeSignal)signal;
SyncHandshakeResult result;
HRESULT r = cb->NotifyHandshake(&_self, &_partner, _signal, &result);
assert(r == S_OK);
clear_identity_s(_self);
clear_identity_s(_partner);
if (r == E_OUTOFMEMORY)
return PEP_OUT_OF_MEMORY;
}
}
return PEP_STATUS_OK;
}
PEP_STATUS CpEpEngine::notifyHandshake(::pEp_identity *self, ::pEp_identity *partner, sync_handshake_signal signal)
{
return _notifyHandshake(self, partner, signal);
}
PEP_STATUS CpEpEngine::notifyHandshake_sync(::pEp_identity *self, ::pEp_identity *partner, sync_handshake_signal signal)
{
return _notifyHandshake(self, partner, signal, true);
}
STDMETHODIMP CpEpEngine::BlacklistAdd(BSTR fpr)
{
assert(fpr);
@ -1340,10 +1358,12 @@ STDMETHODIMP CpEpEngine::RegisterCallbacks(IpEpEngineCallbacks* new_callbacks)
this->client_callbacks = new_callbacks;
new_callbacks->AddRef();
{
lock_guard< mutex > lock(callbacks_mutex);
all_callbacks.push_back(this->client_callbacks);
}
// provide callbacks to sync
LPSTREAM marshaled_callbacks;
auto result = CoMarshalInterThreadInterfaceInStream(IID_IpEpEngineCallbacks, client_callbacks, &marshaled_callbacks);
assert(SUCCEEDED(result));
assert(marshaled_callbacks);
sync_callbacks.insert(new MarshaledCallbacks({ this->client_callbacks, marshaled_callbacks }));
return S_OK;
}
@ -1355,13 +1375,11 @@ STDMETHODIMP CpEpEngine::UnregisterCallbacks()
if (!this->client_callbacks)
return S_FALSE;
{
lock_guard< mutex > lock(callbacks_mutex);
for (auto i = all_callbacks.begin(); i != all_callbacks.end(); ++i) {
if (*i == this->client_callbacks) {
all_callbacks.erase(i);
break;
}
for (auto p = sync_callbacks.begin(); p != sync_callbacks.end(); ++p) {
if (p->pdata->unmarshaled == this->client_callbacks) {
delete p->pdata;
sync_callbacks.erase(p);
break;
}
}

@ -11,6 +11,7 @@
#include <queue>
#include <mutex>
#include <vector>
#include "..\libpEpAdapter\pc_container.hh"
#if defined(_WIN32_WCE) && !defined(_CE_DCOM) && !defined(_CE_ALLOW_SINGLE_THREADED_OBJECTS_IN_MTA)
#error "Single-threaded COM objects are not properly supported on Windows CE platform, such as the Windows Mobile platforms that do not include full DCOM support. Define _CE_ALLOW_SINGLE_THREADED_OBJECTS_IN_MTA to force ATL to support creating single-thread COM object's and allow use of it's single-threaded COM object implementations. The threading model in your rgs file was set to 'Free' as that is the only threading model supported in non DCOM Windows CE platforms."
@ -85,7 +86,9 @@ public:
::register_examine_function(session(), CpEpEngine::examine_identity, (void *)this);
::log_event(session(), "Startup", "pEp COM Adapter", NULL, NULL);
startup(messageToSend, notifyHandshake);
startup<CpEpEngine>(messageToSend, notifyHandshake, messageToSend_sync, notifyHandshake_sync, this);
return S_OK;
}
@ -93,13 +96,19 @@ public:
{
}
protected:
typedef locked_queue<pEp_identity_cpp> identity_queue_t;
static ::pEp_identity * retrieve_next_identity(void *management);
static PEP_STATUS _messageToSend(message *msg, bool in_sync = false);
static PEP_STATUS _notifyHandshake(pEp_identity *self, pEp_identity *partner, sync_handshake_signal signal, bool in_sync = false);
static PEP_STATUS messageToSend(message *msg);
static PEP_STATUS notifyHandshake(pEp_identity *self, pEp_identity *partner, sync_handshake_signal signal);
static PEP_STATUS messageToSend_sync(message *msg);
static PEP_STATUS notifyHandshake_sync(pEp_identity *self, pEp_identity *partner, sync_handshake_signal signal);
HRESULT error(_bstr_t msg);
HRESULT error(_bstr_t msg, PEP_STATUS errorcode);
@ -113,6 +122,14 @@ protected:
}
private:
// callbacks for sync
struct MarshaledCallbacks {
IpEpEngineCallbacks *unmarshaled;
LPSTREAM marshaled;
};
static pEp::pc_container< MarshaledCallbacks, IpEpEngineCallbacks > sync_callbacks;
atomic< identity_queue_t * > identity_queue;
thread *keymanagement_thread;
bool verbose_mode;
@ -121,10 +138,6 @@ private:
bool client_last_signalled_polling_state = true;
static std::mutex init_mutex;
static std::list< IpEpEngineCallbacks * > all_callbacks;
static std::mutex callbacks_mutex;
static atomic< int > count;
public:

Loading…
Cancel
Save