Logo Search packages:      
Sourcecode: mailcrypt version File versions  Download package

unwind.py

#! /usr/bin/python

import GnuPGInterface
import string, re, sys

# Shared GnuPG wrapper used for every decryption in this script. It is run
# non-interactively (--batch) against the keyring found in the
# 'remailer-keys' directory (relative to the current working directory).
gnupg = GnuPGInterface.GnuPG()
gnupg.options.extra_args.extend(['--homedir', 'remailer-keys', '--batch'])

# The keyring should hold private keys for all the remailers in the chain,
# and each one should have a passphrase equal to the name of the key.

def passphrase(bigname):
    """Return the passphrase for the remailer key named by *bigname*.

    The test keyring is set up so that each key's passphrase equals the
    key's name, so the passphrase is simply the first
    ``user@host.tld``-shaped substring found in *bigname*.

    Raises ValueError if *bigname* contains no such address.
    """
    match = re.search(r'(\w+@\w+\.\w+)', bigname)
    if match is None:
        # Fail with a message naming the bad input instead of an opaque
        # AttributeError from calling .group() on None.
        raise ValueError("no key address found in %r" % (bigname,))
    return match.group(1)

def unwind(recipient, message):
    """Unwind a remailer-chain message into the list of steps it encodes.

    Takes two parameters: the To: address of the message and the message
    body itself.

    Returns a list of steps. Each element is a dict describing what
    happened on that particular step. The decryption and remailing are
    separate steps. The dict keys are as follows:

     recipient: the target of that step, as requested by the previous step
     encrypted: 1 if the 'Encrypted: PGP' flag was set, else non-existent
     anon-to: name in the Anon-To: header, if present
     subject: subject specified in a ## header, if present
     message: plaintext of the message (for the last step)

    Typical steps:
     recipient=rem1, encrypted=1                 BRANCH2, recurse
     recipient=rem1, anon-to=rem2                BRANCH3, recurse
     recipient=rem2, encrypted=1                 BRANCH2, recurse
     recipient=rem2, anon-to=user, subject=test  BRANCH3, recurse
     recipient=user, message=plaintext           BRANCH1, terminate

    A recipient is treated as a remailer hop when it starts with
    remN@test.test (N a single digit); anything else ends the recursion.

    Raises ValueError if the layout of a remailer message is incorrect:
     first line is not '::' or third line is not blank
     'Encrypted: PGP' is not followed by a PGP message block
     second line is neither 'Encrypted: PGP' nor an Anon-To: header
     a '##' section lacks its Subject: header or trailing blank line
    Any error raised by the gnupg object while decrypting propagates
    unchanged.
    """
    # An encrypted message looks like:
    #  ::\nEncrypted: PGP\n\n-----BEGIN PGP MESSAGE-----\n etc
    # and, once decrypted, like:
    #  ::\nAnon-To: dest@foo.com\n\n##\nSubject: test subject\n\nmessage body

    step = {'recipient': recipient}

    # see if we should end the recursion now
    if not re.search(r'^rem\d@test\.test', recipient):
        # BRANCH 1: final destination, the message is the plaintext
        step['message'] = message
        return [step]

    lines = message.split("\n")
    if lines[0] != '::':
        raise ValueError('bad message: first line was not ::')
    # The length check turns a truncated message into a "bad message"
    # error rather than an IndexError.
    if len(lines) < 3 or lines[2] != '':
        raise ValueError('bad message: third line was not blank')

    # see if it's encrypted
    if lines[1] == 'Encrypted: PGP':
        # BRANCH2: decrypt it and recurse with the same recipient
        # step is: recipient=rem1, encrypted=1
        crypttext = lines[3:]
        if not crypttext or crypttext[0] != '-----BEGIN PGP MESSAGE-----':
            raise ValueError('bad message: Encrypted: PGP header is not '
                             'followed by a PGP message')
        gnupg.passphrase = passphrase(recipient)  # passphrase == keyname
        p = gnupg.run(['--decrypt'],
                      create_fhs=['stdin', 'stdout'],
                      )
        p.handles['stdin'].write("\n".join(crypttext))
        p.handles['stdin'].close()
        plaintext = p.handles['stdout'].read()
        p.handles['stdout'].close()
        # NOTE(review): presumably wait() is what reports a failed
        # decryption (e.g. wrong key) -- confirm against GnuPGInterface.
        p.wait()
        step['encrypted'] = 1
        return [step] + unwind(recipient, plaintext)

    # BRANCH3: a remailing instruction.
    # message is now:
    #  ::\nAnon-To: remN@test.test (or dest@foo.com)\n\nmessage body
    # or
    #  ::\nAnon-To: dest@foo.com\n\n##\nSubject: subject\n\nmessage body
    r = re.search(r'^Anon-To: (.*)$', lines[1])
    if not r:
        print("Bad Message, no Anon-To")
        print(lines)
        raise ValueError('bad message: no anon-to')
    step['anon-to'] = r.group(1)

    body = lines[3:]
    # now: "message body", or "##\nSubject: sub\n\nmessage body"

    if body and body[0] == '##':
        # there are ## headers included
        r = None
        if len(body) > 1:
            r = re.search(r'^Subject: (.*)$', body[1])
        if not r:
            raise ValueError('bad message: ## but no Subject: header')
        step['subject'] = r.group(1)
        if len(body) < 3 or body[2] != '':
            raise ValueError('bad message: no blank line after ## headers')
        body = body[3:]
    # now: "message body"
    return [step] + unwind(step['anon-to'], "\n".join(body))

def insistEquals(one, two):
    """Raise AssertionError unless *one* equals *two*.

    The error message shows both values so a mismatch between an expected
    and an actual result can be diagnosed from the traceback alone.
    Returns None when the values are equal.
    """
    if one != two:
        # A real exception class is required: raising a plain string is
        # not supported by Python 2.6+ or Python 3.
        raise AssertionError("results don't match expected: '%s' != '%s'"
                             % (one, two))


def test_chain(firsthop, crypttext, plaintext, recipient, chain, subject=None):
    """Check that crypttext unwinds to plaintext along the given chain.

    firsthop is the address the crypttext is sent to and must equal the
    first entry of chain. chain lists the remailers by their long names
    (rem1@test.test, etc) in the order the message visits them; recipient
    is the final destination and subject, if given, is the subject the
    last remailer is asked to set.

    Prints "TEST CASE PASSED" on success; any mismatch is reported by
    insistEquals.
    """
    # Build the list of steps we expect unwind() to report.
    expected_chain = []

    # Every remailer except the last one decrypts the message and then
    # forwards it to the next remailer in the chain.
    for hop, next_hop in zip(chain[:-1], chain[1:]):
        expected_chain.append({'recipient': hop, 'encrypted': 1})
        expected_chain.append({'recipient': hop, 'anon-to': next_hop})

    # The last remailer decrypts and forwards to the real recipient,
    # optionally carrying the subject.
    final_hop = chain[-1]
    expected_chain.append({'recipient': final_hop, 'encrypted': 1})
    penultimate = {'recipient': final_hop, 'anon-to': recipient}
    if subject:
        penultimate['subject'] = subject
    expected_chain.append(penultimate)

    # Finally the recipient receives the plaintext.
    expected_chain.append({'recipient': recipient, 'message': plaintext})

    # unwind the message
    insistEquals(firsthop, chain[0])
    results = unwind(firsthop, crypttext)

    # compare against expectations, step by step
    insistEquals(len(expected_chain), len(results))
    for actual, wanted in zip(results, expected_chain):
        insistEquals(actual, wanted)
    print("TEST CASE PASSED")
    


def test1():
    """Unwind the message stored in the file "m1" and print each step.

    The message is unwound as if it had been sent to rem3@test.test.
    """
    # Use a context manager so the file handle is closed promptly.
    with open("m1") as f:
        m1 = f.read()
    chain = unwind("rem3@test.test", m1)
    for link in chain:
        print(link)

def test2():
    """Run test_chain on the message stored in the file "m1".

    Expects the message to be "test message\\n" with subject
    "test subject", addressed to warner@lothar.com and routed through
    rem3, rem2, rem1 and rem1 again.
    """
    # Use a context manager so the file handle is closed promptly.
    with open("m1") as f:
        m1 = f.read()
    # test_chain compares chain entries against full addresses (firsthop
    # and the recipients reported by unwind), so the chain must use the
    # long names rather than bare "rem3"-style short names.
    chain = ["rem3@test.test", "rem2@test.test",
             "rem1@test.test", "rem1@test.test"]
    test_chain(firsthop="rem3@test.test", crypttext=m1,
               plaintext="test message\n",
               recipient="warner@lothar.com",
               chain=chain,
               subject="test subject")

def main():
    """Command-line entry point: verify a chained message read from stdin.

    argv is: ['recipient', 'chain1,chain2,chain3', 'subject']
    The subject argument may be omitted, in which case no subject is
    expected. The plaintext is always "test message\\n" and the crypttext
    arrives on stdin. The first remailer in the chain is taken as the
    first hop.

    Exits with a usage message if the recipient or chain is missing.
    """
    if len(sys.argv) < 3:
        # Report the problem instead of failing with a bare IndexError.
        sys.exit("usage: %s recipient chain1,chain2,... [subject]"
                 % sys.argv[0])
    recipient = sys.argv[1]
    chain = sys.argv[2].split(',')
    subject = None
    if len(sys.argv) > 3:
        subject = sys.argv[3]
    crypttext = sys.stdin.read()
    test_chain(firsthop=chain[0],
               crypttext=crypttext,
               plaintext="test message\n",
               recipient=recipient,
               chain=chain,
               subject=subject)
    
        
# Run the command-line check only when executed as a script, not on import.
if __name__ == '__main__':
    main()



Generated by  Doxygen 1.6.0   Back to index