forked from pEp.foundation/pEpPythonAdapter
538 lines
17 KiB
Python
538 lines
17 KiB
Python
"""
|
|
multipEp.py : multiple process python testing framework for pEp
|
|
|
|
= Command line switches =
|
|
|
|
wait_for_debug
|
|
Block and ask if debugger should be attached each time an instance
|
|
is started
|
|
|
|
debug_${instance_name}
|
|
Launch lldb in another terminal, and attach it to given instance
|
|
immediately after instance startup.
|
|
|
|
debug_${instance_name}_${execution_number}
|
|
Launch lldb in another terminal, and attach it to given instance
|
|
when instance is at some particular step in the test.
|
|
${execution_number} is found by reading test output.
|
|
|
|
only_${test_scenario_name}
|
|
Execute only given test scenario. Scenarios with a different name
|
|
are skipped.
|
|
|
|
libs_${instance_name}=/path/to/libs
|
|
Set LD_LIBRARY_PATH to given path before launching instance,
|
|
meant to allow selection of per-instance pEpEngines flavors
|
|
|
|
wait_for_cleanup
|
|
Block at the end of each test scenario, before deleting temporary
|
|
directory. It is meant to be able to examine keyring and DBs after
|
|
test finished or crashed.
|
|
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import multiprocessing
|
|
import importlib
|
|
import tempfile
|
|
import time
|
|
import types
|
|
import itertools
|
|
from copy import deepcopy
|
|
from collections import OrderedDict
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# GLOBALS
|
|
# ----------------------------------------------------------------------------
|
|
|
|
# per-instance globals
# These live in each spawned instance process; pEp_instance_run() assigns
# most of them on start-up.
# Handler object created by pEp_instance_run() unless disable_sync is set
sync_handler = None
# addresses registered through create_account() in this instance
own_addresses = []
# current nesting depth of printheader(), rendered by printi() as ">" marks
indent = 0
# instance name, used by printi() as the prefix of every printed line
i_name = ""
# (trustwords, partner, nth_seen) while a handshake is pending, else None
handshakes_pending = None
# the pEp module, imported lazily with importlib (see below and
# pEp_instance_run())
pEp = None

# manager globals
# instance name -> (proc, conn, tmpdir, execnt); set up by run_scenario()
instances = None
if(multiprocessing.current_process().name == "MainProcess"):
    # 'spawn' context used for every Pipe, Process and Manager created here
    ctx = multiprocessing.get_context('spawn')
    # import pEp in main process to get enums
    pEp = importlib.import_module("pEp")

# both side globals (managed by MP)
# run_scenario() creates these through ctx.Manager() and start_instance()
# hands them to every instance process.
# trustwords -> number of times a handshake with these trustwords was seen
handshakes_seen = None
# manager Namespace with disable_handshake and handshake_count_to_accept
test_config = None
# recipient address -> list of messages stored as strings
msgs_folders = None

# both side globals (not managed)
# when True, pEp_instance_run() does not create a sync Handler
disable_sync = False
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# INSTANCE ACTIONS
|
|
# ----------------------------------------------------------------------------
|
|
|
|
def create_account(address, name, flags=None):
    """Create an own identity in this instance and remember its address.

    An identity is built from `address` and `name`, optionally given
    `flags`, and passed to pEp.myself(). The address is then appended to
    the instance-wide `own_addresses` list.
    """
    identity = pEp.Identity(address, name)
    if flags is not None:
        identity.flags = flags
    pEp.myself(identity)
    # appending mutates the module-level list in place, no rebinding needed
    own_addresses.append(address)
|
|
|
|
def _send_message(address, msg):
    """Stamp `msg` with the current time and store it in `address`' folder.

    The message is stored in its string form. Its `sent` attribute is set
    to the current time in whole seconds before serialising.
    """
    # Lists held by the manager dict are not proxied: fetch a copy,
    # extend it, and write the whole list back.
    folder = msgs_folders.get(address, [])
    msg.sent = int(time.time())
    folder.append(str(msg))
    msgs_folders[address] = folder
|
|
|
|
def _encrypted_message(from_address, to_address, shortmsg, longmsg):
    """Build an outgoing message, encrypt it and return the encrypted result.

    `to_address` may be a single address or a list of addresses. Each
    identity is created with the address doubling as the user name.
    The time spent in encrypt() is printed.
    """
    sender = pEp.Identity(from_address, from_address)
    outgoing = pEp.outgoing_message(sender)

    # a lone address is treated as a one-element recipient list
    recipients = to_address if type(to_address) is list else [to_address]
    outgoing.to = [pEp.Identity(rcpt, rcpt) for rcpt in recipients]
    outgoing.shortmsg = shortmsg
    outgoing.longmsg = longmsg

    started = time.time()
    encrypted = outgoing.encrypt()
    finished = time.time()
    printi("ENCRYPTION TIME:", finished - started)
    return encrypted
|
|
|
|
def encrypted_message(from_address, to_address, shortmsg, longmsg):
    """Return the string form of the message built by _encrypted_message()."""
    encrypted = _encrypted_message(from_address, to_address, shortmsg, longmsg)
    return str(encrypted)
|
|
|
|
def send_message(from_address, to_address, shortmsg, longmsg):
    """Encrypt one message and drop it into every recipient's folder.

    `to_address` may be a single address or a list of addresses. The
    message is encrypted once and the same object is delivered to each
    recipient through _send_message().
    """
    encrypted = _encrypted_message(from_address, to_address, shortmsg, longmsg)
    recipients = to_address if type(to_address) is list else [to_address]
    for recipient in recipients:
        _send_message(recipient, encrypted)
|
|
|
|
def decrypt_message(msgstr):
    """Decrypt the message given as a string and return its rating.

    Both the incoming message and the decrypted one are printed. The
    incoming message's `recv` attribute is set to the current time in
    whole seconds before decrypting.
    """
    incoming = pEp.incoming_message(msgstr)
    printi("--- decrypt()")
    incoming.recv = int(time.time())
    printmsg(incoming)

    # decrypt() yields five values; only the message and rating are used
    decrypted, _keys, rating, _consumed, _flags = incoming.decrypt()
    printi("->-", rating, "->-")
    printmsg(decrypted)
    printi("---")
    return rating
|
|
|
|
def simulate_timeout():
    """Invoke onTimeout() on this instance's sync handler."""
    # sync_handler is only read here, so no global declaration is required
    sync_handler.onTimeout()
|
|
|
|
# Instance actions for which execute_order() skips decrypting the inbox
# before running the action.
no_inbox_decrypt = [simulate_timeout, create_account]
|
|
# ----------------------------------------------------------------------------
|
|
# MANAGER ACTIONS
|
|
# ----------------------------------------------------------------------------
|
|
|
|
def flush_all_mails():
    """Empty every message folder and return how many messages were dropped."""
    dropped = sum(len(folder) for folder in msgs_folders.values())
    msgs_folders.clear()
    return dropped
|
|
|
|
def restart_instance(iname):
    """Stop instance `iname` and start it again.

    The temporary directory and the own addresses reported by the stopped
    process are handed to the new one.
    """
    tmpdir, instance_addresses = stop_instance(iname)
    restarted = start_instance(iname, tmpdir, instance_addresses)
    instances[iname] = restarted
|
|
|
|
def cycle_until_no_change(*instancelist, maxcycles=20):
    """Run empty actions on the given instances until the folders are stable.

    One cycle sends an empty order to every instance in `instancelist`,
    in order. Cycling stops as soon as a full cycle leaves `msgs_folders`
    unchanged.

    Returns the number of cycles performed.
    Raises Exception when `maxcycles` cycles were run without stability.
    """
    for cycles in itertools.count(1):
        # snapshot of all folders taken before this cycle
        before = deepcopy(dict(msgs_folders))
        for iname in instancelist:
            run_instance_action((iname, []))

        if dict(msgs_folders) == before:
            return cycles

        if cycles >= maxcycles:
            raise Exception("Too many cycles waiting for stability")
|
|
|
|
def disable_auto_handshake():
    """Set the shared test_config.disable_handshake flag to True."""
    # attribute assignment on the shared namespace; the name is not rebound
    setattr(test_config, "disable_handshake", True)
|
|
|
|
def enable_auto_handshake():
    """Set the shared test_config.disable_handshake flag to False."""
    # attribute assignment on the shared namespace; the name is not rebound
    setattr(test_config, "disable_handshake", False)
|
|
|
|
def expect(expectation):
    """Return a checker that raises when a result differs from `expectation`.

    The returned function takes `(res, action)`, the call signature
    run_scenario() uses for the optional trailing function of an action,
    and raises Exception if `expectation != res`. `action` is ignored.
    """
    def _expect(res, action):
        mismatch = expectation != res
        if mismatch:
            raise Exception("Expected %s, got %s" % (str(expectation), str(res)))

    return _expect
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# "PRETTY" PRINTING
|
|
# ----------------------------------------------------------------------------
|
|
|
|
def printi(*args):
    """Print `args` prefixed with the instance name and the indent marks."""
    # one ">" per currently open printheader() section
    prefix = i_name + ">" * indent
    print(prefix, *args)
|
|
|
|
def printheader(blah=None):
    """Open or close an indented output section.

    With a title in `blah`, the indent level is raised and a dashed title
    line is printed. With no argument, a line of 80 dashes is printed and
    the indent level is lowered again.
    """
    global indent

    if blah is not None:
        indent += 1
        half = int(len(blah) / 2)
        # dashes either side of the title, which sits between two spaces
        left = "-" * (39 - half)
        right = "-" * (39 - len(blah) + half)
        printi(left + " " + blah + " " + right)
        return

    printi("-" * 80)
    indent -= 1
|
|
|
|
def printmsg(msg):
    """Print the main fields of a message, one per line.

    At most the first 40 lines of the long message are printed; a longer
    body is followed by a "[...]" marker.
    """
    printi("from :", repr(msg.from_))
    printi("to :", repr(msg.to))
    printi("recv :", msg.recv)
    printi("sent :", msg.sent)
    printi("short :", msg.shortmsg)
    printi("opt_fields :", msg.opt_fields)

    body_lines = msg.longmsg.splitlines()
    if len(body_lines) > 40:
        body_lines = body_lines[:40] + ["[...]"]
    for position, text in enumerate(body_lines):
        # only the first body line carries the field label
        label = "long : " if position == 0 else " "
        printi(label + text)

    printi("attachments : ", msg.attachments)
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# INSTANCE TEST EXECUTION
|
|
# ----------------------------------------------------------------------------
|
|
|
|
def execute_order(order):
    """Execute one order received from the manager inside an instance.

    `order` is a list of up to four items `[func, args, kwargs, timeoff]`;
    missing trailing items default to `None`, `[]`, `{}` and `0`.

    Steps, in order:
      1. Unless `func` is listed in `no_inbox_decrypt`, decrypt every
         message found in the folders of this instance's own addresses.
         `timeoff` is added to the receive time stamped on each message.
      2. Call `func(*args, **kwargs)` if a function was given.
      3. If a handshake is pending and automatic handshakes are enabled,
         accept it once it has been seen often enough.

    Returns the value returned by `func`, or None when no function was given.
    """
    # The original declaration misspelled handshakes_seen as "hanshakes_seen".
    global handshakes_pending, handshakes_seen
    global test_config, msgs_folders, own_addresses, sync_handler

    # pad the order with defaults for the items that were left out
    func, args, kwargs, timeoff = order[0:] + [None, [], {}, 0][len(order):]

    printheader("DECRYPT messages")
    # decrypt every non-consumed message for all instance accounts
    if func not in no_inbox_decrypt:
        for own_address in own_addresses:
            msgs_for_me = msgs_folders.get(own_address, [])
            for msgstr in msgs_for_me:
                msg = pEp.incoming_message(msgstr)
                printi("--- decrypt()")
                msg.recv = int(time.time() + timeoff)
                printmsg(msg)
                msg2, keys, rating, consumed, flags = msg.decrypt()

                if consumed == "MESSAGE_CONSUME":
                    printi("--- PEP_MESSAGE_CONSUMED")
                    # folder may have changed in the meantime,
                    # remove item directly from latest version of it.
                    folder = msgs_folders[own_address]
                    folder.remove(msgstr)
                    msgs_folders[own_address] = folder
                elif consumed == "MESSAGE_IGNORE":
                    printi("--- PEP_MESSAGE_DISCARDED")
                else:
                    printi("->-", rating, "->-")
                    printmsg(msg2)
                    printi("---")
    printheader()

    res = None
    if func is not None:
        printheader("Executing instance function " + func.__name__)
        printi("args :", args)
        printi("kwargs :", kwargs)
        res = func(*args, **kwargs)
        printi("function " + func.__name__ + " returned :", res)
        printheader()

    if handshakes_pending and not test_config.disable_handshake:
        printheader("check pending handshakes accepted on other device")
        tw, partner, nth_seen = handshakes_pending
        if handshakes_seen[tw] >= test_config.handshake_count_to_accept:
            if nth_seen in [1, test_config.handshake_count_to_accept]:
                # equiv to close dialog
                handshakes_pending = None
                printi("ACCEPT pending handshake : " + tw)
                sync_handler.deliverHandshakeResult(partner, 0)
            # else dialog closed later by OVERTAKEN notification
        printheader()

    return res
|
|
|
|
def pEp_instance_run(iname, _own_addresses, conn, _msgs_folders, _handshakes_seen, _test_config):
    """Main loop of one instance process.

    Stores the arguments in the instance globals, imports the pEp module,
    creates the sync handler (unless `disable_sync` is set), then serves
    orders received on `conn` until a None order arrives.

    iname            -- instance name, used as the output prefix
    _own_addresses   -- addresses this instance already owns
    conn             -- connection on which orders are received and
                        results are sent back
    _msgs_folders    -- shared mapping of address to list of messages
    _handshakes_seen -- shared mapping of trustwords to seen count
    _test_config     -- shared namespace holding the handshake settings

    Before returning, the list of own addresses is sent on `conn`.
    """
    global pEp, sync_handler, own_addresses, i_name, msgs_folders
    global handshakes_pending
    global handshakes_seen, test_config

    # assign instance globals
    own_addresses = _own_addresses
    msgs_folders = _msgs_folders
    handshakes_seen = _handshakes_seen
    test_config = _test_config
    i_name = iname

    # The import happens here, after pEp_instance_main() has replaced $HOME.
    pEp = importlib.import_module("pEp")

    # NOTE(review): these methods are presumably callbacks invoked by the
    # pEp sync machinery through pEp.SyncMixIn - confirm against the adapter.
    class Handler(pEp.SyncMixIn):
        def messageToSend(self, msg):
            # print the sync message, then deliver it to every to/cc/bcc
            # recipient's folder
            printheader("SYNC MESSAGE to send")
            printmsg(msg)
            printheader()
            for rcpt in msg.to + msg.cc + msg.bcc:
                _send_message(rcpt.address, msg)

        def notifyHandshake(self, me, partner, signal):
            # Tracks the single simulated handshake dialog in the global
            # handshakes_pending; execute_order() accepts it later.
            global handshakes_pending
            if test_config.disable_handshake :
                printheader("HANDSHAKE disabled. Notification ignored")
                printi(signal)
                printheader()
                return

            if signal in [
                pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_ADD_OUR_DEVICE,
                pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_ADD_OTHER_DEVICE,
                pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_FORM_GROUP,
                pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_MOVE_OUR_DEVICE]:
                printheader("show HANDSHAKE dialog")
                printi(signal)
                printi("handshake needed between " + repr(me) + " and " + repr(partner))
                tw = pEp.trustwords(me, partner, 'en')
                printi(tw)

                # This is an error from pEpEngine if asked to open handshake dialog twice
                if handshakes_pending:
                    raise Exception("Asked to open a second Sync Handshake Dialog !")

                # count how many times these trustwords have been shown
                if tw in handshakes_seen :
                    handshakes_seen[tw] += 1
                else:
                    handshakes_seen[tw] = 1

                # remember the open dialog: trustwords, partner, seen count
                handshakes_pending = (tw,partner,handshakes_seen[tw])
                printheader()

            elif signal == pEp.sync_handshake_signal.SYNC_NOTIFY_OVERTAKEN:
                # equivalent to closing the open dialog
                if handshakes_pending:
                    tw, partner, nth_seen = handshakes_pending
                    printi("OVERTAKEN handshake : "+ tw)
                    handshakes_pending = None
                else:
                    raise Exception("Asked to close a non existing Sync Handshake Dialog !")
            else :
                printheader("other HANDSHAKE notification - ignored")
                printi(signal)
                printheader()

        def setTimeout(self, timeout):
            # only logged; timeouts are fired explicitly by simulate_timeout()
            printi("SET TIMEOUT :", timeout)

        def cancelTimeout(self):
            printi("CANCEL TIMEOUT")
            # NOTE(review): the meaning of the returned 42 is not visible
            # here - confirm against the SyncMixIn contract.
            return 42

    if not disable_sync:
        sync_handler = Handler()

    # serve orders until the manager sends None
    while True:
        order = conn.recv()
        if order is None:
            break

        res = execute_order(order)

        conn.send(res)

    # report the own addresses back; stop_instance() reads them
    conn.send(own_addresses)

    # drop the reference to the shared folders before the process exits
    msgs_folders = None
|
|
|
|
def pEp_instance_main(iname, tmpdirname, *args):
    """Entry point of an instance process.

    Points $HOME at `tmpdirname` so that the instance gets fresh DB and
    PGP keyrings, then runs pEp_instance_run() with the remaining
    arguments.
    """
    print("".join(("Instance ", iname, " runs into ", tmpdirname)))
    # run with a dispensable $HOME
    os.environ['HOME'] = tmpdirname

    pEp_instance_run(iname, *args)
    print("".join((iname, " exiting")))
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# MANAGER TEST EXECUTION
|
|
# ----------------------------------------------------------------------------
|
|
|
|
def start_debug(iname, proc):
    """Open lldb in a new Terminal window, attached to the process `proc`.

    A banner naming the instance and the process id is printed first.
    The function then sleeps for two seconds before returning.
    """
    pid = str(proc.pid)
    rule = "#" * 80
    banner = "\n".join((
        rule,
        "INSTANCE " + iname,
        "launching debugger attaching to process " + pid,
        rule,
        "",
    ))
    print(banner)

    # TODO : linux terminal support
    #import subprocess
    #subprocess.call(['xterm', '-e', 'lldb', '-p', str(proc.pid)])
    import appscript
    terminal = appscript.app('Terminal')
    terminal.do_script('lldb -p ' + pid)
    time.sleep(2)
|
|
|
|
def start_instance(iname, tmpdir=None, instance_addresses=None):
    """Spawn the process for instance `iname` and return its bookkeeping tuple.

    iname              -- instance name
    tmpdir             -- TemporaryDirectory to reuse as the instance's
                          $HOME; a fresh one is created when None
    instance_addresses -- own addresses the instance starts with; an empty
                          list when None

    Command line switches honoured here:
      libs_<iname>=<path>  the library search path variable is pointed at
                           a "libs" symlink to <path> while the process is
                           being started
      debug_<iname>        attach the debugger right after start-up
      wait_for_debug       ask on the console whether to attach

    Returns `(proc, conn, tmpdir, 0)`: the process, the manager side of
    the pipe, the temporary directory and an execution counter set to zero.
    """
    # A fresh list per call instead of a shared mutable default.
    if instance_addresses is None:
        instance_addresses = []

    libs_prefix = "libs_" + iname + "="
    given_libs = None
    for a in sys.argv:
        if a.startswith(libs_prefix):
            # split once only, so a path containing "=" is kept whole
            given_libs = a.split("=", 1)[1]
            break

    if tmpdir is None:
        tmpdir = tempfile.TemporaryDirectory()
        # the symlink is created only for a fresh directory; a reused one
        # already contains it
        if given_libs is not None:
            os.symlink(given_libs, os.path.join(tmpdir.name, "libs"))

    if sys.platform.startswith('darwin'):
        ld_env_name = 'DYLD_LIBRARY_PATH'
    else:
        ld_env_name = 'LD_LIBRARY_PATH'

    # temporarily override the library path in this process' environment
    # while the child is being started
    orig_ld_env_val = None
    if given_libs is not None:
        orig_ld_env_val = os.environ.pop(ld_env_name, None)
        os.environ[ld_env_name] = os.path.join(tmpdir.name, "libs")

    try:
        conn, child_conn = ctx.Pipe()
        proc = ctx.Process(
            target=pEp_instance_main,
            args=(iname, tmpdir.name, instance_addresses,
                  child_conn, msgs_folders,
                  handshakes_seen, test_config))
        proc.start()
    finally:
        # restore the manager's own environment even if start-up failed
        if orig_ld_env_val is not None:
            os.environ[ld_env_name] = orig_ld_env_val
        elif given_libs is not None:
            os.environ.pop(ld_env_name)

    debug = "debug_" + iname in sys.argv
    if not debug and "wait_for_debug" in sys.argv:
        yes = input("#"*80 + "\n" +
                    "INSTANCE " + iname + "\n" +
                    "Enter y/yes/Y/YES to attach debugger to process " +
                    str(proc.pid) + "\nor just press ENTER\n" +
                    "#"*80 + "\n")
        # A missing comma used to merge the last two answers into "yesYES".
        if yes in ("y", "Y", "yes", "YES"):
            debug = True
    if debug:
        start_debug(iname, proc)

    return (proc, conn, tmpdir, 0)
|
|
|
|
def get_instance(iname):
    """Return the bookkeeping tuple of instance `iname`, starting it if needed."""
    if iname in instances:
        return instances[iname]

    started = start_instance(iname)
    instances[iname] = started
    return started
|
|
|
|
def stop_instance(iname):
    """Terminate instance `iname` and forget it.

    Returns `(tmpdir, instance_addresses)`: the instance's temporary
    directory and the own addresses it reported while shutting down.
    """
    proc, conn, tmpdir, _execnt = instances.pop(iname)

    # a None order tells the process to terminate; it answers with its
    # own addresses before exiting
    conn.send(None)
    reported_addresses = conn.recv()
    proc.join()
    return tmpdir, reported_addresses
|
|
|
|
def purge_instances():
    """Stop every running instance."""
    # iterate over a snapshot of the names: stop_instance() removes entries
    for iname in tuple(instances):
        stop_instance(iname)
|
|
|
|
def run_instance_action(action):
    """Send an order to an instance and return the instance's answer.

    `action` is an `(iname, order)` pair. The instance is started on first
    use. Its execution counter is incremented and printed, and the
    debugger is attached when `debug_<iname>_<execution number>` is on the
    command line.
    """
    iname, order = action
    proc, conn, tmpdir, previous = get_instance(iname)

    execnt = previous + 1
    instances[iname] = (proc, conn, tmpdir, execnt)

    debug_here_arg = "_".join(("debug", iname, str(execnt)))
    print(iname, ": execution number :", execnt , "(add", debug_here_arg, "to args to debug from here)")
    if debug_here_arg in sys.argv:
        start_debug(iname, proc)

    conn.send(order)
    answer = conn.recv()
    return answer
|
|
|
|
def run_manager_action(action):
    """Call a manager-side function described by `action` and return its result.

    `action` is a tuple of up to three items `(func, args, kwargs)`;
    missing trailing items default to `None`, `[]` and `{}`. The call and
    its result are printed.
    """
    defaults = (None, [], {})
    # pad the tuple with defaults for the items that were left out
    func, args, kwargs = action[0:] + defaults[len(action):]

    print("------------------------- Executing manager function -----------------------------")
    print("function name :", func.__name__)
    print("args :", args)
    print("kwargs :", kwargs)

    outcome = func(*args, **kwargs)

    print("manager function " + func.__name__ + " returned :", outcome)
    print("-" * 80)
    return outcome
|
|
|
|
def run_scenario(scenario):
    """Run one test scenario and clean up the instances it started.

    `scenario` is a generator function. Each value it yields is an action:
      * a tuple whose first item is a string is an instance action, run
        through run_instance_action();
      * any other tuple is a manager action, run through
        run_manager_action();
      * when the tuple has more than one item and the last one is a plain
        function, that function is stripped off and called as
        `output(res, action)` once the action has returned.
    The result of each action is sent back into the generator.

    When `only_<name>` switches are on the command line, the scenario runs
    only if its own name is among them.

    Any exception raised while the scenario runs is printed, all instances
    are stopped, and the exception is re-raised afterwards. With the
    `wait_for_cleanup` switch the function waits for ENTER before stopping
    the instances.
    """
    global handshakes_seen, test_config, msgs_folders, instances

    # With several only_ switches the original loop skipped every scenario;
    # now the scenario runs when it is named by any of them.
    only_args = [a for a in sys.argv if a.startswith("only_")]
    if only_args and "only_" + scenario.__name__ not in only_args:
        print("IGNORING: " + scenario.__name__)
        return
    print("RUNNING: " + scenario.__name__)

    instances = OrderedDict()
    failure = None
    with ctx.Manager() as manager:
        msgs_folders = manager.dict()
        handshakes_seen = manager.dict()
        test_config = manager.Namespace(
            disable_handshake=False,
            handshake_count_to_accept=2)

        sc = scenario()
        try:
            action = next(sc)
            while True:
                output = None
                if len(action) > 1 and isinstance(action[-1], types.FunctionType):
                    output = action[-1]
                    action = action[:-1]

                if isinstance(action[0], str):
                    res = run_instance_action(action)
                else:
                    res = run_manager_action(action)

                if output is not None:
                    output(res, action)

                action = sc.send(res)
        except StopIteration:
            # the scenario generator is exhausted: normal end
            pass
        except BaseException as exc:
            # Catch everything, including KeyboardInterrupt, so that the
            # instances are still stopped below; re-raised after cleanup.
            import traceback
            failure = exc
            print("EXCEPTION IN: " + scenario.__name__)
            traceback.print_exc()

        if "wait_for_cleanup" in sys.argv:
            for iname, (proc, conn, tmpdir, execnt) in instances.items():
                print("Instance " + iname + " waits into " + tmpdir.name)
            input("#"*80 + "\n" +
                  "Press ENTER to cleanup\n" +
                  "#"*80 + "\n")

        purge_instances()

    if failure is not None:
        # Re-raise the original exception object, which keeps its traceback.
        # Rebuilding it as type(value) fails for exception classes whose
        # constructor needs several arguments.
        raise failure
|
|
|