diff --git a/README.md b/README.md index 178fc4e..99ba2bf 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,22 @@ Dummy-SMTP ========== -A dummy Linux SMTP server that drops emails to a folder, instead of sending them. \ No newline at end of file +A dummy Linux SMTP server that drops emails to a folder, instead of sending them. + +Allows optional options from the command line: + +Usage: + + listen.py + + listen.py [--ip=127.0.0.1] [--port=25] + + listen.py (-h | --help) + +Options: + + -h --help Show this screen + + --ip=IP IP to listen to [default: localhost] + + --port=PORT Port to bind to [default: 25] diff --git a/docopt.py b/docopt.py new file mode 100644 index 0000000..ce03468 --- /dev/null +++ b/docopt.py @@ -0,0 +1,579 @@ +"""Pythonic command-line interface parser that will make you smile. + + * http://docopt.org + * Repository and issue-tracker: https://github.com/docopt/docopt + * Licensed under terms of MIT license (see LICENSE-MIT) + * Copyright (c) 2013 Vladimir Keleshev, vladimir@keleshev.com + +""" +import sys +import re + + +__all__ = ['docopt'] +__version__ = '0.6.1' + + +class DocoptLanguageError(Exception): + + """Error in construction of usage-message by developer.""" + + +class DocoptExit(SystemExit): + + """Exit in case user invoked program with incorrect arguments.""" + + usage = '' + + def __init__(self, message=''): + SystemExit.__init__(self, (message + '\n' + self.usage).strip()) + + +class Pattern(object): + + def __eq__(self, other): + return repr(self) == repr(other) + + def __hash__(self): + return hash(repr(self)) + + def fix(self): + self.fix_identities() + self.fix_repeating_arguments() + return self + + def fix_identities(self, uniq=None): + """Make pattern-tree tips point to same object if they are equal.""" + if not hasattr(self, 'children'): + return self + uniq = list(set(self.flat())) if uniq is None else uniq + for i, c in enumerate(self.children): + if not hasattr(c, 'children'): + assert c in uniq + self.children[i] = uniq[uniq.index(c)] + else: + c.fix_identities(uniq) + + def fix_repeating_arguments(self): + """Fix elements that should accumulate/increment values.""" + either = [list(c.children) for c in self.either.children] + for case in either: + for e in [c for c in case if case.count(c) > 1]: + if type(e) is Argument or type(e) is Option and e.argcount: + if e.value is None: + e.value = [] + elif type(e.value) is not list: + e.value = e.value.split() + if type(e) is Command or type(e) is Option and e.argcount == 0: + e.value = 0 + return self + + @property + def either(self): + """Transform pattern into an equivalent, with only top-level Either.""" + # Currently the pattern will not be equivalent, but more "narrow", + # although good enough to reason about list arguments. + ret = [] + groups = [[self]] + while groups: + children = groups.pop(0) + types = [type(c) for c in children] + if Either in types: + either = [c for c in children if type(c) is Either][0] + children.pop(children.index(either)) + for c in either.children: + groups.append([c] + children) + elif Required in types: + required = [c for c in children if type(c) is Required][0] + children.pop(children.index(required)) + groups.append(list(required.children) + children) + elif Optional in types: + optional = [c for c in children if type(c) is Optional][0] + children.pop(children.index(optional)) + groups.append(list(optional.children) + children) + elif AnyOptions in types: + optional = [c for c in children if type(c) is AnyOptions][0] + children.pop(children.index(optional)) + groups.append(list(optional.children) + children) + elif OneOrMore in types: + oneormore = [c for c in children if type(c) is OneOrMore][0] + children.pop(children.index(oneormore)) + groups.append(list(oneormore.children) * 2 + children) + else: + ret.append(children) + return Either(*[Required(*e) for e in ret]) + + +class ChildPattern(Pattern): + + def __init__(self, name, value=None): + self.name = name + self.value = value + + def __repr__(self): + return '%s(%r, %r)' % (self.__class__.__name__, self.name, self.value) + + def flat(self, *types): + return [self] if not types or type(self) in types else [] + + def match(self, left, collected=None): + collected = [] if collected is None else collected + pos, match = self.single_match(left) + if match is None: + return False, left, collected + left_ = left[:pos] + left[pos + 1:] + same_name = [a for a in collected if a.name == self.name] + if type(self.value) in (int, list): + if type(self.value) is int: + increment = 1 + else: + increment = ([match.value] if type(match.value) is str + else match.value) + if not same_name: + match.value = increment + return True, left_, collected + [match] + same_name[0].value += increment + return True, left_, collected + return True, left_, collected + [match] + + +class ParentPattern(Pattern): + + def __init__(self, *children): + self.children = list(children) + + def __repr__(self): + return '%s(%s)' % (self.__class__.__name__, + ', '.join(repr(a) for a in self.children)) + + def flat(self, *types): + if type(self) in types: + return [self] + return sum([c.flat(*types) for c in self.children], []) + + +class Argument(ChildPattern): + + def single_match(self, left): + for n, p in enumerate(left): + if type(p) is Argument: + return n, Argument(self.name, p.value) + return None, None + + @classmethod + def parse(class_, source): + name = re.findall('(<\S*?>)', source)[0] + value = re.findall('\[default: (.*)\]', source, flags=re.I) + return class_(name, value[0] if value else None) + + +class Command(Argument): + + def __init__(self, name, value=False): + self.name = name + self.value = value + + def single_match(self, left): + for n, p in enumerate(left): + if type(p) is Argument: + if p.value == self.name: + return n, Command(self.name, True) + else: + break + return None, None + + +class Option(ChildPattern): + + def __init__(self, short=None, long=None, argcount=0, value=False): + assert argcount in (0, 1) + self.short, self.long = short, long + self.argcount, self.value = argcount, value + self.value = None if value is False and argcount else value + + @classmethod + def parse(class_, option_description): + short, long, argcount, value = None, None, 0, False + options, _, description = option_description.strip().partition(' ') + options = options.replace(',', ' ').replace('=', ' ') + for s in options.split(): + if s.startswith('--'): + long = s + elif s.startswith('-'): + short = s + else: + argcount = 1 + if argcount: + matched = re.findall('\[default: (.*)\]', description, flags=re.I) + value = matched[0] if matched else None + return class_(short, long, argcount, value) + + def single_match(self, left): + for n, p in enumerate(left): + if self.name == p.name: + return n, p + return None, None + + @property + def name(self): + return self.long or self.short + + def __repr__(self): + return 'Option(%r, %r, %r, %r)' % (self.short, self.long, + self.argcount, self.value) + + +class Required(ParentPattern): + + def match(self, left, collected=None): + collected = [] if collected is None else collected + l = left + c = collected + for p in self.children: + matched, l, c = p.match(l, c) + if not matched: + return False, left, collected + return True, l, c + + +class Optional(ParentPattern): + + def match(self, left, collected=None): + collected = [] if collected is None else collected + for p in self.children: + m, left, collected = p.match(left, collected) + return True, left, collected + + +class AnyOptions(Optional): + + """Marker/placeholder for [options] shortcut.""" + + +class OneOrMore(ParentPattern): + + def match(self, left, collected=None): + assert len(self.children) == 1 + collected = [] if collected is None else collected + l = left + c = collected + l_ = None + matched = True + times = 0 + while matched: + # could it be that something didn't match but changed l or c? + matched, l, c = self.children[0].match(l, c) + times += 1 if matched else 0 + if l_ == l: + break + l_ = l + if times >= 1: + return True, l, c + return False, left, collected + + +class Either(ParentPattern): + + def match(self, left, collected=None): + collected = [] if collected is None else collected + outcomes = [] + for p in self.children: + matched, _, _ = outcome = p.match(left, collected) + if matched: + outcomes.append(outcome) + if outcomes: + return min(outcomes, key=lambda outcome: len(outcome[1])) + return False, left, collected + + +class TokenStream(list): + + def __init__(self, source, error): + self += source.split() if hasattr(source, 'split') else source + self.error = error + + def move(self): + return self.pop(0) if len(self) else None + + def current(self): + return self[0] if len(self) else None + + +def parse_long(tokens, options): + """long ::= '--' chars [ ( ' ' | '=' ) chars ] ;""" + long, eq, value = tokens.move().partition('=') + assert long.startswith('--') + value = None if eq == value == '' else value + similar = [o for o in options if o.long == long] + if tokens.error is DocoptExit and similar == []: # if no exact match + similar = [o for o in options if o.long and o.long.startswith(long)] + if len(similar) > 1: # might be simply specified ambiguously 2+ times? + raise tokens.error('%s is not a unique prefix: %s?' % + (long, ', '.join(o.long for o in similar))) + elif len(similar) < 1: + argcount = 1 if eq == '=' else 0 + o = Option(None, long, argcount) + options.append(o) + if tokens.error is DocoptExit: + o = Option(None, long, argcount, value if argcount else True) + else: + o = Option(similar[0].short, similar[0].long, + similar[0].argcount, similar[0].value) + if o.argcount == 0: + if value is not None: + raise tokens.error('%s must not have an argument' % o.long) + else: + if value is None: + if tokens.current() is None: + raise tokens.error('%s requires argument' % o.long) + value = tokens.move() + if tokens.error is DocoptExit: + o.value = value if value is not None else True + return [o] + + +def parse_shorts(tokens, options): + """shorts ::= '-' ( chars )* [ [ ' ' ] chars ] ;""" + token = tokens.move() + assert token.startswith('-') and not token.startswith('--') + left = token.lstrip('-') + parsed = [] + while left != '': + short, left = '-' + left[0], left[1:] + similar = [o for o in options if o.short == short] + if len(similar) > 1: + raise tokens.error('%s is specified ambiguously %d times' % + (short, len(similar))) + elif len(similar) < 1: + o = Option(short, None, 0) + options.append(o) + if tokens.error is DocoptExit: + o = Option(short, None, 0, True) + else: # why copying is necessary here? + o = Option(short, similar[0].long, + similar[0].argcount, similar[0].value) + value = None + if o.argcount != 0: + if left == '': + if tokens.current() is None: + raise tokens.error('%s requires argument' % short) + value = tokens.move() + else: + value = left + left = '' + if tokens.error is DocoptExit: + o.value = value if value is not None else True + parsed.append(o) + return parsed + + +def parse_pattern(source, options): + tokens = TokenStream(re.sub(r'([\[\]\(\)\|]|\.\.\.)', r' \1 ', source), + DocoptLanguageError) + result = parse_expr(tokens, options) + if tokens.current() is not None: + raise tokens.error('unexpected ending: %r' % ' '.join(tokens)) + return Required(*result) + + +def parse_expr(tokens, options): + """expr ::= seq ( '|' seq )* ;""" + seq = parse_seq(tokens, options) + if tokens.current() != '|': + return seq + result = [Required(*seq)] if len(seq) > 1 else seq + while tokens.current() == '|': + tokens.move() + seq = parse_seq(tokens, options) + result += [Required(*seq)] if len(seq) > 1 else seq + return [Either(*result)] if len(result) > 1 else result + + +def parse_seq(tokens, options): + """seq ::= ( atom [ '...' ] )* ;""" + result = [] + while tokens.current() not in [None, ']', ')', '|']: + atom = parse_atom(tokens, options) + if tokens.current() == '...': + atom = [OneOrMore(*atom)] + tokens.move() + result += atom + return result + + +def parse_atom(tokens, options): + """atom ::= '(' expr ')' | '[' expr ']' | 'options' + | long | shorts | argument | command ; + """ + token = tokens.current() + result = [] + if token in '([': + tokens.move() + matching, pattern = {'(': [')', Required], '[': [']', Optional]}[token] + result = pattern(*parse_expr(tokens, options)) + if tokens.move() != matching: + raise tokens.error("unmatched '%s'" % token) + return [result] + elif token == 'options': + tokens.move() + return [AnyOptions()] + elif token.startswith('--') and token != '--': + return parse_long(tokens, options) + elif token.startswith('-') and token not in ('-', '--'): + return parse_shorts(tokens, options) + elif token.startswith('<') and token.endswith('>') or token.isupper(): + return [Argument(tokens.move())] + else: + return [Command(tokens.move())] + + +def parse_argv(tokens, options, options_first=False): + """Parse command-line argument vector. + + If options_first: + argv ::= [ long | shorts ]* [ argument ]* [ '--' [ argument ]* ] ; + else: + argv ::= [ long | shorts | argument ]* [ '--' [ argument ]* ] ; + + """ + parsed = [] + while tokens.current() is not None: + if tokens.current() == '--': + return parsed + [Argument(None, v) for v in tokens] + elif tokens.current().startswith('--'): + parsed += parse_long(tokens, options) + elif tokens.current().startswith('-') and tokens.current() != '-': + parsed += parse_shorts(tokens, options) + elif options_first: + return parsed + [Argument(None, v) for v in tokens] + else: + parsed.append(Argument(None, tokens.move())) + return parsed + + +def parse_defaults(doc): + # in python < 2.7 you can't pass flags=re.MULTILINE + split = re.split('\n *(<\S+?>|-\S+?)', doc)[1:] + split = [s1 + s2 for s1, s2 in zip(split[::2], split[1::2])] + options = [Option.parse(s) for s in split if s.startswith('-')] + #arguments = [Argument.parse(s) for s in split if s.startswith('<')] + #return options, arguments + return options + + +def printable_usage(doc): + usage_pattern = re.compile(r'(usage:)', re.IGNORECASE) + usage_split = re.split(usage_pattern, doc) + if len(usage_split) < 3: + raise DocoptLanguageError('"usage:" (case-insensitive) not found.') + if len(usage_split) > 3: + raise DocoptLanguageError('More than one "usage:" (case-insensitive).') + return re.split(r'\n\s*\n', ''.join(usage_split[1:]))[0].strip() + + +def formal_usage(printable_usage): + pu = printable_usage.split()[1:] # split and drop "usage:" + return '( ' + ' '.join(') | (' if s == pu[0] else s for s in pu[1:]) + ' )' + + +def extras(help, version, options, doc): + if help and any((o.name in ('-h', '--help')) and o.value for o in options): + print(doc.strip("\n")) + sys.exit() + if version and any(o.name == '--version' and o.value for o in options): + print(version) + sys.exit() + + +class Dict(dict): + def __repr__(self): + return '{%s}' % ',\n '.join('%r: %r' % i for i in sorted(self.items())) + + +def docopt(doc, argv=None, help=True, version=None, options_first=False): + """Parse `argv` based on command-line interface described in `doc`. + + `docopt` creates your command-line interface based on its + description that you pass as `doc`. Such description can contain + --options, , commands, which could be + [optional], (required), (mutually | exclusive) or repeated... + + Parameters + ---------- + doc : str + Description of your command-line interface. + argv : list of str, optional + Argument vector to be parsed. sys.argv[1:] is used if not + provided. + help : bool (default: True) + Set to False to disable automatic help on -h or --help + options. + version : any object + If passed, the object will be printed if --version is in + `argv`. + options_first : bool (default: False) + Set to True to require options precede positional arguments, + i.e. to forbid options and positional arguments intermix. + + Returns + ------- + args : dict + A dictionary, where keys are names of command-line elements + such as e.g. "--verbose" and "", and values are the + parsed values of those elements. + + Example + ------- + >>> from docopt import docopt + >>> doc = ''' + Usage: + my_program tcp [--timeout=] + my_program serial [--baud=] [--timeout=] + my_program (-h | --help | --version) + + Options: + -h, --help Show this screen and exit. + --baud= Baudrate [default: 9600] + ''' + >>> argv = ['tcp', '127.0.0.1', '80', '--timeout', '30'] + >>> docopt(doc, argv) + {'--baud': '9600', + '--help': False, + '--timeout': '30', + '--version': False, + '': '127.0.0.1', + '': '80', + 'serial': False, + 'tcp': True} + + See also + -------- + * For video introduction see http://docopt.org + * Full documentation is available in README.rst as well as online + at https://github.com/docopt/docopt#readme + + """ + if argv is None: + argv = sys.argv[1:] + DocoptExit.usage = printable_usage(doc) + options = parse_defaults(doc) + pattern = parse_pattern(formal_usage(DocoptExit.usage), options) + # [default] syntax for argument is disabled + #for a in pattern.flat(Argument): + # same_name = [d for d in arguments if d.name == a.name] + # if same_name: + # a.value = same_name[0].value + argv = parse_argv(TokenStream(argv, DocoptExit), list(options), + options_first) + pattern_options = set(pattern.flat(Option)) + for ao in pattern.flat(AnyOptions): + doc_options = parse_defaults(doc) + ao.children = list(set(doc_options) - pattern_options) + #if any_options: + # ao.children += [Option(o.short, o.long, o.argcount) + # for o in argv if type(o) is Option] + extras(help, version, argv, doc) + matched, left, collected = pattern.fix().match(argv) + if matched and left == []: # better error message if left? + return Dict((a.name, a.value) for a in (pattern.flat() + collected)) + raise DocoptExit() diff --git a/listen.py b/listen.py index 78800f4..d0cc07e 100755 --- a/listen.py +++ b/listen.py @@ -1,27 +1,48 @@ #!/usr/bin/env python -# Original script written by Stuart Colville: http://muffinresearch.co.uk/archives/2010/10/15/fake-smtp-server-with-python/ -"""A noddy fake smtp server.""" +"""Dummy-SMPT - A noddy fake smtp server + +Usage: + listen.py + listen.py [--ip=127.0.0.1] [--port=25] + listen.py (-h | --help) + +Options: + -h --help Show this screen + --ip=IP IP to listen to [default: localhost] + --port=PORT Port to bind to [default: 25] + +""" import smtpd import asyncore import time +import socket +from email.parser import Parser +from docopt import docopt class FakeSMTPServer(smtpd.SMTPServer): """A Fake smtp server""" def __init__(*args, **kwargs): - print "Running fake smtp server on port 25" smtpd.SMTPServer.__init__(*args, **kwargs) def process_message(*args, **kwargs): + headers = Parser().parsestr(args[4]) mail = open("mails/"+str(time.time())+".eml", "w") - print "New mail from " + args[2] + print "New mail from " + headers['from'] + " to " + headers['to'] + " - " + headers['subject'] mail.write(args[4]) mail.close pass if __name__ == "__main__": - smtp_server = FakeSMTPServer(('localhost', 25), None) + arguments = docopt(__doc__, version='Dummy-SMTP') + + try: + smtp_server = FakeSMTPServer((arguments['--ip'], int(arguments['--port'])), None) + print "Running dummy SMTP server on " + arguments['--ip'] + ":" + arguments['--port'] + except socket.error: + print "Permission denied to " + arguments['--ip'] + ":" + arguments['--port'] + ". Try using sudo to run this script." + try: asyncore.loop() except KeyboardInterrupt: