You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

278 lines
8.8 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
4 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
  1. # vim: set fileencoding=utf-8 :
  2. """start a handshake test for one simulated device
  3. this is being started once per device by sync_test.py
  4. you can use this manually for debugging purposes
  5. For debugging try:
  6. $ cd $DEV && HOME=$PWD lldb python3 -- ../sync_handshake.py -e $DEV
  7. """
  8. # Sync test 2.0
  9. # Copyleft 2018, 2019, p≡p foundation
  10. # this file is under GNU General Public License 3.0
  11. import pathlib
  12. import os
  13. import sys
  14. import re
  15. import pEp
  16. import minimail
  17. from datetime import datetime
  18. from time import sleep
  19. try:
  20. from termcolor import colored
  21. except:
  22. colored = lambda x, y: x
  23. try:
  24. from lxml import etree
  25. from lxml import objectify
  26. except:
  27. etree = None
  28. inbox = pathlib.Path("..") / "TestInbox"
  29. device_name = ""
  30. output = print
  31. multithreaded = False
  32. DONT_TRIGGER_SYNC = 0x200
  33. SYNC_HANDSHAKE_ACCEPTED = 0
  34. SYNC_HANDSHAKE_REJECTED = 1
  35. the_end = False
  36. end_on = [
  37. pEp.sync_handshake_signal.SYNC_NOTIFY_ACCEPTED_DEVICE_ADDED,
  38. pEp.sync_handshake_signal.SYNC_NOTIFY_ACCEPTED_GROUP_CREATED,
  39. pEp.sync_handshake_signal.SYNC_NOTIFY_ACCEPTED_DEVICE_ACCEPTED,
  40. ]
  41. def print_msg(p):
  42. if isinstance(p, pathlib.Path):
  43. if p.name[:5] == "Phone":
  44. color = "red"
  45. elif p.name[:6] == "Laptop":
  46. color = "green"
  47. else:
  48. color = "cyan"
  49. with open(p, "r") as f:
  50. t = f.read(-1)
  51. msg = pEp.Message(t)
  52. print("\n" + colored(p.name, color))
  53. print(colored(str(datetime.fromtimestamp(p.stat().st_mtime)), color))
  54. elif isinstance(p, pEp.Message):
  55. msg = p
  56. else:
  57. raise TypeError("print_msg(): pathlib.Path and pEp.Message supported, but "
  58. + str(type(p)) + " delivered")
  59. m = re.search("<keysync>(.*)</keysync>", msg.opt_fields["pEp.sync"].replace("\n", " "))
  60. if m:
  61. if etree:
  62. tree = objectify.fromstring(m.group(1).replace("\r", ""))
  63. text = etree.tostring(tree, pretty_print=True, encoding="unicode")
  64. else:
  65. text = m.group(1).replace("\r", "").strip()
  66. while text.count(" "):
  67. text = text.replace(" ", " ")
  68. print(text)
  69. def messageToSend(msg):
  70. msg = add_debug_info(msg)
  71. minimail.send(inbox, msg, device_name)
  72. def messageImapToSend(msg):
  73. import miniimap
  74. msg = add_debug_info(msg)
  75. miniimap.send('Inbox', msg)
  76. def add_debug_info(msg):
  77. if msg.enc_format:
  78. m, keys, rating, flags = msg.decrypt(DONT_TRIGGER_SYNC)
  79. else:
  80. m = msg
  81. try:
  82. text = "<!-- sending from " + device_name + " -->\n" + m.attachments[0].decode()
  83. except UnicodeDecodeError as e:
  84. text = "<!-- sending from " + device_name + " -->\n *** NO DECODER AVAILABLE FOR THIS MESSAGE TYPE ***\n"
  85. output(text)
  86. msg.opt_fields = { "pEp.sync": text }
  87. return msg
  88. class UserInterface(pEp.UserInterface):
  89. def notifyHandshake(self, me, partner, signal):
  90. print(colored(str(signal), "yellow"), end=" ")
  91. output("on " + device_name + "" if not me.fpr else
  92. "for identities " + str(me.fpr) + " " + str(partner.fpr))
  93. if me.fpr and partner.fpr:
  94. assert me.fpr != partner.fpr
  95. if signal in (
  96. pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_ADD_OTHER_DEVICE,
  97. pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_ADD_OUR_DEVICE,
  98. pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_FORM_GROUP
  99. ):
  100. if isinstance(end_on, list):
  101. end_on.extend([
  102. pEp.sync_handshake_signal.SYNC_NOTIFY_SOLE,
  103. pEp.sync_handshake_signal.SYNC_NOTIFY_IN_GROUP,
  104. ])
  105. sleep(.5) # user is reading message
  106. try:
  107. if not options.noanswer:
  108. if options.reject:
  109. self.deliverHandshakeResult(SYNC_HANDSHAKE_REJECTED)
  110. else:
  111. self.deliverHandshakeResult(SYNC_HANDSHAKE_ACCEPTED)
  112. except NameError:
  113. self.deliverHandshakeResult(SYNC_HANDSHAKE_ACCEPTED)
  114. if signal in end_on:
  115. global the_end
  116. the_end = True
  117. def shutdown_sync():
  118. pEp.shutdown_sync()
  119. def run(name, color=None, imap=False, own_ident=1, leave=False):
  120. global device_name
  121. device_name = name
  122. if color:
  123. global output
  124. output = lambda x: print(colored(x, color))
  125. if color == "red":
  126. pEp.debug_color(31)
  127. elif color == "green":
  128. pEp.debug_color(32)
  129. elif color == "cyan":
  130. pEp.debug_color(36)
  131. if imap:
  132. import miniimap
  133. import imap_settings
  134. me = pEp.Identity(imap_settings.IMAP_EMAIL, name + " of " + imap_settings.IMAP_USER, name)
  135. pEp.myself(me)
  136. pEp.messageToSend = messageImapToSend
  137. else:
  138. me = pEp.Identity("alice@peptest.ch", name + " of Alice Neuman", name)
  139. pEp.myself(me)
  140. if own_ident >= 2:
  141. me2 = pEp.Identity("alice@pep.security", name + " of Alice Neuman", name)
  142. pEp.myself(me2)
  143. if own_ident == 3:
  144. me3 = pEp.Identity("alice@pep.foundation", name + " of Alice Neuman", name)
  145. pEp.myself(me3)
  146. pEp.messageToSend = messageToSend
  147. if multithreaded:
  148. from threading import Thread
  149. def sync_thread():
  150. print(colored("********* ", "yellow") + colored("sync_thread entered", color))
  151. ui = UserInterface()
  152. print(colored("********* ", "yellow") + colored("UserInterface object created", color))
  153. pEp.do_sync_protocol()
  154. print(colored("********* ", "yellow") + colored("leaving sync_thread", color))
  155. sync = Thread(target=sync_thread)
  156. sync.start()
  157. else:
  158. pEp.script_is_implementing_sync()
  159. sync = None
  160. ui = UserInterface()
  161. try:
  162. if leave:
  163. pEp.leave_device_group()
  164. while not the_end:
  165. if pEp.is_sync_active():
  166. pass # we could react on this
  167. if imap:
  168. l = miniimap.recv_all()
  169. else:
  170. l = minimail.recv_all(inbox, name)
  171. for n, m in l:
  172. msg = pEp.Message(m)
  173. output("*** Reading")
  174. print_msg(msg)
  175. msg2, keys, rating, flags = msg.decrypt()
  176. except KeyboardInterrupt:
  177. shutdown_sync()
  178. sys.exit()
  179. if __name__=="__main__":
  180. from optparse import OptionParser
  181. optParser = OptionParser()
  182. optParser.description = __doc__
  183. optParser.add_option("-e", "--exec-for", action="store", type="string",
  184. dest="exec_for", help="execute for name of simulated device " +
  185. "(default: name of actual directory)")
  186. optParser.add_option("--color", action="store", type="string",
  187. dest="color", help="print debug output in this color", default=None)
  188. optParser.add_option("--reject", action="store_true", dest="reject",
  189. help="reject device group")
  190. optParser.add_option("--accept", action="store_false", dest="reject",
  191. help="accept device group (default)")
  192. optParser.add_option("--no-answer", action="store_true", dest="noanswer",
  193. help="do not answer device group handshake")
  194. optParser.add_option("-E", "--end-on", dest="notifications",
  195. help="end test on these notifications")
  196. optParser.add_option("-j", "--multi-threaded", action="store_true",
  197. dest="multithreaded",
  198. help="use multithreaded instead of single threaded implementation")
  199. optParser.add_option("-n", "--noend", action="store_true",
  200. dest="noend", help="do not end")
  201. optParser.add_option("-i", "--imap", action="store_true",
  202. dest="imap",
  203. help="use imap instead of minimail")
  204. optParser.add_option("-o", "--own-identities", type="int", dest="own_ident",
  205. help="simulate having OWN_IDENT own identities (1 to 3)", default=1)
  206. optParser.add_option("-L", "--leave-device-group", action="store_true",
  207. dest="leave",
  208. help="after a successful sync run this to make the device leave the "
  209. "device group again")
  210. options, args = optParser.parse_args()
  211. if not options.exec_for:
  212. options.exec_for = os.path.basename(os.getcwd())
  213. if options.own_ident < 1 or options.own_ident > 3:
  214. raise ValueError("illegal number of own identities (allowed are 1 to 3)")
  215. if options.notifications:
  216. end_on = eval(options.notifications)
  217. try: None in end_on
  218. except TypeError:
  219. end_on = (end_on,)
  220. if options.noend:
  221. end_on = (None,)
  222. if options.imap and options.own_ident >1:
  223. raise ValueError("Multiple own identities not supported for imap mode")
  224. multithreaded = options.multithreaded
  225. run(options.exec_for, options.color, options.imap, options.own_ident, options.leave)