Browse Source

tests: python: Make test_handshake actually work.

This test misused parallel test-processing of pytest-xdist to run two
processes in parallel - while these are actually interactig and form one
test-case. Fixed by forming one test-case which spawns two processes.
doxygen_doc
Hartmut Goebel 7 months ago
parent
commit
88686b8cef
1 changed files with 54 additions and 76 deletions
  1. +54
    -76
      test/python_tests/test_handshake.py

+ 54
- 76
test/python_tests/test_handshake.py View File

@ -1,57 +1,54 @@
# -*- coding: utf-8 -*-
#
# this file is under GNU General Public License 3.0
# Copyleft 2017, p≡p foundation
# Copyleft 2017-2021, p≡p foundation
from transport import *
from multiprocessing import Process
def setup_gnupg():
assert os.environ["GNUPGHOME"] != ""
def setup_gnupg(arthome):
print("using arthome", arthome)
os.environ["HOME"] = os.path.join(mydir, arthome)
os.environ["GNUPGHOME"] = os.path.join(mydir, arthome, 'gnupg')
os.chdir(os.path.join(mydir, arthome))
mydir = os.path.dirname(os.path.realpath(__file__))
def assert_gnupg():
assert os.environ["GNUPGHOME"] != ""
class Test1:
mydir = os.path.dirname(os.path.realpath(__file__))
def setup_method(self):
arthome = "test1"
os.environ["HOME"] = os.path.join(mydir, arthome)
os.environ["GNUPGHOME"] = os.path.join(mydir, arthome, 'gnupg')
ALICE = "test1@peptest.ch", "Alice One"
BOB = "test2@peptest.ch", "Bob Two"
os.chdir(os.path.join(mydir, arthome))
@property
def me(self):
# because of flaws of py.test these two statements are necessary
setup_gnupg() # work around a bug with initializing os.environ
import pEp # after that import pEp module, not before
def Me(who):
assert_gnupg() ; import pEp
me = pEp.Identity(*who)
pEp.myself(me)
return me
i = pEp.Identity()
i.address = "test1@peptest.ch"
i.username = "Alice One"
i.myself()
return i
def You(who):
assert_gnupg() ; import pEp
you = pEp.Identity(*who)
you.update()
return you
@property
def you(self):
setup_gnupg() ; import pEp
i = pEp.Identity()
i.address = "test2@peptest.ch"
i.username = "Bob Two"
i.update()
return i
def test_handshake():
def test_handshake(self):
setup_gnupg() ; import pEp
def process1():
print("process1 starting")
setup_gnupg("test1") ; import pEp
me = Me(ALICE)
you = You(BOB)
msg = pEp.Message(1)
msg.from_ = self.me
msg.to = [self.you]
msg = pEp.Message(1, me)
msg.to = [you]
msg.shortmsg = "Subject line"
msg.longmsg = "Message Text\n"
@ -61,59 +58,31 @@ class Test1:
txt = wait_for_message()
enc = pEp.Message(txt)
assert enc.from_.address == "test2@peptest.ch"
inc, keys, rating, consumed, flags = enc.decrypt()
inc, keys, rating, flags = enc.decrypt()
assert rating == 6
msg = pEp.Message(1)
msg.from_ = self.me
msg.to = [self.you]
msg = pEp.Message(1, me)
msg.to = [you]
msg.shortmsg = "Subject line complete"
msg.longmsg = "Message Text complete\n"
enc = msg.encrypt()
send_message("test2", str(enc))
print("process1 finishing")
class Test2:
def setup_method(self):
arthome = "test2"
os.environ["HOME"] = os.path.join(mydir, arthome)
os.environ["GNUPGHOME"] = os.path.join(mydir, arthome, 'gnupg')
os.chdir(os.path.join(mydir, arthome))
@property
def me(self):
setup_gnupg() ; import pEp
i = pEp.Identity()
i.address = "test2@peptest.ch"
i.username = "Bob Two"
i.myself()
return i
@property
def you(self):
setup_gnupg() ; import pEp
i = pEp.Identity()
i.address = "test1@peptest.ch"
i.username = "Alice One"
i.update()
return i
def test_handshake(self):
setup_gnupg() ; import pEp
def process2():
print("process2 starting")
setup_gnupg("test2") ; import pEp
me = Me(BOB)
you = You(ALICE)
txt = wait_for_message()
msg = pEp.Message(txt)
msg.decrypt()
assert msg.from_.address == self.you.address
assert msg.from_.address == you.address
out = pEp.Message(1)
out.from_ = self.me
out.to = [self.you]
out = pEp.Message(1, me)
out.to = [you]
out.shortmsg = "Subject Back"
out.longmsg = "Text Back\n"
@ -123,5 +92,14 @@ class Test2:
txt = wait_for_message()
msg = pEp.Message(txt)
msg.decrypt()
assert msg.from_.address == self.you.address
assert msg.from_.address == you.address
print("process2 finishing")
p1 = Process(target=process1, daemon=True)
p2 = Process(target=process2, daemon=True)
p1.start()
p2.start()
p1.join()
p2.join()
assert p2.exitcode == 0
assert p1.exitcode == 0

Loading…
Cancel
Save