p≡p for Python
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

131 lines
4.7 KiB

  1. #!/usr/bin/env python3
  2. #
  3. # Copyleft 2020, p≡p Security
  4. # Copyleft 2020, Hartmut Goebel <h.goebel@crazy-compilers.com>
  5. #
  6. # This file is under GNU General Public License 3.0
  7. """
  8. This example shows how to transport pEp messages within a
  9. simple XML-based message format which is capable of carrying attachments.
  10. pEp message elements loosely follow the "pEp for XML specification" and
  11. https://pEp.software/ns/pEp-1.0.xsd.
  12. IMPORTANT: In this example, no error checking is done. Neither is
  13. the "pEp for XML" specification enforced.
  14. Structure of the simple XML based message:
  15. <msg>
  16. <from>from-addr</from>
  17. <to>to-addr</to>
  18. <body>text of the message (must not be XML)</body>
  19. <attachments>
  20. <attachment>textual data</attachment>
  21. </attachments>
  22. </msg>
  23. """
  24. from lxml import etree
  25. import email.utils
  26. import pEp
  27. __all__ = ["serialize_pEp_message", "parse_serialized_message"]
  28. CT2TAG = {
  29. 'application/pgp-keys': 'keydata',
  30. 'application/pEp.sync': 'sync',
  31. 'application/pEp.distribution': 'distribution',
  32. 'application/pgp-signature': 'signature',
  33. }
  34. TAG2CT = dict((v,k)for (k,v) in CT2TAG.items())
  35. PEP_NAMESPACE = "https://pEp.software/ns/pEp-1.0.xsd"
  36. PEP_NS = '{%s}' % PEP_NAMESPACE
  37. NSMAP = {'pEp': PEP_NAMESPACE}
  38. INCOMING = 0
  39. OUTGOING = 1
  40. UNENCRYPTED = 0
  41. def serialize_pEp_message(msg):
  42. root = etree.Element("msg", nsmap=NSMAP)
  43. etree.SubElement(root, "to").text = str(msg.to[0])
  44. etree.SubElement(root, "from").text = str(msg.from_)
  45. if msg.enc_format == UNENCRYPTED:
  46. # unencrypted
  47. etree.SubElement(root, "body").text = msg.longmsg
  48. attachments = etree.SubElement(root, "attachments")
  49. # FIXME: Namespace
  50. attachments = etree.SubElement(attachments,
  51. PEP_NS + "attachments",
  52. version=msg.opt_fields['X-pEp-Version'])
  53. # TODO: opt_fields, esp. auto-consume
  54. # TODO: Order pEp attachments by type
  55. for attach in msg.attachments:
  56. # no need to base64-encode, attachement are ascii-armoured
  57. # already
  58. #attachment = base64_encode(attachment)
  59. # FIXME: Namespace
  60. a = etree.SubElement(attachments,
  61. PEP_NS + CT2TAG[attach.mime_type])
  62. a.text = attach.decode("ascii")
  63. else: # encrypted
  64. # forget about longmsg and original body
  65. # encrypted message is an attachment and there might be
  66. # further attachments, e.g. new keys
  67. # build a new message out of these attachments
  68. etree.SubElement(root, "body") # emptry body
  69. attachments = etree.SubElement(root, "attachments")
  70. assert len(msg.attachments) == 2
  71. # first attachment is "Version: 1"
  72. attach = msg.attachments[1]
  73. # no need to base64-encode, attachements are ascii-armoured
  74. # already
  75. n = etree.SubElement(root, PEP_NS + "message")
  76. n.text = attach.decode("ascii")
  77. return etree.tostring(root)
  78. def parse_serialized_message(transport_message):
  79. def addr2identity(text):
  80. name, addr = email.utils.parseaddr(text)
  81. ident = pEp.Identity(addr, name)
  82. ident.update()
  83. return ident
  84. # parse the XML text, fetch from and to
  85. root = etree.fromstring(transport_message)
  86. from_ = addr2identity(root.xpath("./from/text()")[0])
  87. msg1 = pEp.Message(INCOMING, from_)
  88. msg1.to.append(addr2identity(root.xpath("./to/text()")[0]))
  89. enc_msg = root.find("{%s}message" % PEP_NAMESPACE)
  90. if enc_msg is not None:
  91. # this is an encrypted message, ignore all but the encrypted message
  92. msg1.attachments = [
  93. # As of Engine r4652 the encrypted message must be the second
  94. # attachment
  95. pEp.Blob(b"Version: 1", "application/pgp-encrypted"),
  96. pEp.Blob(enc_msg.text.encode(), "application/xxpgp-encrypted")]
  97. else:
  98. # this is an unencrypted message, might contain pEp attachments
  99. msg1.longmsg = root.findtext("body")
  100. pEp_attachments = None
  101. attachments = root.find("attachments")
  102. if attachments is not None:
  103. pEp_attachments = attachments.find("{%s}attachments" % PEP_NAMESPACE)
  104. if pEp_attachments is not None:
  105. msg1.opt_fields['X-pEp-Version'] = pEp_attachments.attrib["version"]
  106. pEp_attachs = []
  107. for tagname in ("keydata", "signature", "sync", "distribution"):
  108. for attach in pEp_attachments.iterfind(
  109. "{%s}%s" % (PEP_NAMESPACE, tagname)):
  110. pEp_attachs.append(
  111. pEp.Blob(attach.text.encode(), TAG2CT[tagname]))
  112. msg1.attachments = pEp_attachs
  113. msg2, keys, rating, flags = msg1.decrypt()
  114. return msg2, rating