#!/usr/bin/env python """ General Utilities (part of web.py) """ __all__ = [ "Storage", "storage", "storify", "Counter", "counter", "iters", "rstrips", "lstrips", "strips", "safeunicode", "safestr", "utf8", "TimeoutError", "timelimit", "Memoize", "memoize", "re_compile", "re_subm", "group", "uniq", "iterview", "IterBetter", "iterbetter", "safeiter", "safewrite", "dictreverse", "dictfind", "dictfindall", "dictincr", "dictadd", "requeue", "restack", "listget", "intget", "datestr", "numify", "denumify", "commify", "dateify", "nthstr", "cond", "CaptureStdout", "capturestdout", "Profile", "profile", "tryall", "ThreadedDict", "threadeddict", "autoassign", "to36", "safemarkdown", "sendmail" ] import re, sys, time, threading, itertools, traceback, os try: import subprocess except ImportError: subprocess = None try: import datetime except ImportError: pass try: set except NameError: from sets import Set as set try: from threading import local as threadlocal except ImportError: from python23 import threadlocal class Storage(dict): """ A Storage object is like a dictionary except `obj.foo` can be used in addition to `obj['foo']`. >>> o = storage(a=1) >>> o.a 1 >>> o['a'] 1 >>> o.a = 2 >>> o['a'] 2 >>> del o.a >>> o.a Traceback (most recent call last): ... AttributeError: 'a' """ def __getattr__(self, key): try: return self[key] except KeyError, k: raise AttributeError, k def __setattr__(self, key, value): self[key] = value def __delattr__(self, key): try: del self[key] except KeyError, k: raise AttributeError, k def __repr__(self): return '' storage = Storage def storify(mapping, *requireds, **defaults): """ Creates a `storage` object from dictionary `mapping`, raising `KeyError` if d doesn't have all of the keys in `requireds` and using the default values for keys found in `defaults`. For example, `storify({'a':1, 'c':3}, b=2, c=0)` will return the equivalent of `storage({'a':1, 'b':2, 'c':3})`. If a `storify` value is a list (e.g. multiple values in a form submission), `storify` returns the last element of the list, unless the key appears in `defaults` as a list. Thus: >>> storify({'a':[1, 2]}).a 2 >>> storify({'a':[1, 2]}, a=[]).a [1, 2] >>> storify({'a':1}, a=[]).a [1] >>> storify({}, a=[]).a [] Similarly, if the value has a `value` attribute, `storify will return _its_ value, unless the key appears in `defaults` as a dictionary. >>> storify({'a':storage(value=1)}).a 1 >>> storify({'a':storage(value=1)}, a={}).a >>> storify({}, a={}).a {} Optionally, keyword parameter `_unicode` can be passed to convert all values to unicode. >>> storify({'x': 'a'}, _unicode=True) >>> storify({'x': storage(value='a')}, x={}, _unicode=True) }> >>> storify({'x': storage(value='a')}, _unicode=True) """ _unicode = defaults.pop('_unicode', False) # if _unicode is callable object, use it convert a string to unicode. to_unicode = safeunicode if _unicode is not False and hasattr(_unicode, "__call__"): to_unicode = _unicode def unicodify(s): if _unicode and isinstance(s, str): return to_unicode(s) else: return s def getvalue(x): if hasattr(x, 'file') and hasattr(x, 'value'): return x.value elif hasattr(x, 'value'): return unicodify(x.value) else: return unicodify(x) stor = Storage() for key in requireds + tuple(mapping.keys()): value = mapping[key] if isinstance(value, list): if isinstance(defaults.get(key), list): value = [getvalue(x) for x in value] else: value = value[-1] if not isinstance(defaults.get(key), dict): value = getvalue(value) if isinstance(defaults.get(key), list) and not isinstance(value, list): value = [value] setattr(stor, key, value) for (key, value) in defaults.iteritems(): result = value if hasattr(stor, key): result = stor[key] if value == () and not isinstance(result, tuple): result = (result,) setattr(stor, key, result) return stor class Counter(storage): """Keeps count of how many times something is added. >>> c = counter() >>> c.add('x') >>> c.add('x') >>> c.add('x') >>> c.add('x') >>> c.add('x') >>> c.add('y') >>> c >>> c.most() ['x'] """ def add(self, n): self.setdefault(n, 0) self[n] += 1 def most(self): """Returns the keys with maximum count.""" m = max(self.itervalues()) return [k for k, v in self.iteritems() if v == m] def least(self): """Returns the keys with mininum count.""" m = min(self.itervalues()) return [k for k, v in self.iteritems() if v == m] def percent(self, key): """Returns what percentage a certain key is of all entries. >>> c = counter() >>> c.add('x') >>> c.add('x') >>> c.add('x') >>> c.add('y') >>> c.percent('x') 0.75 >>> c.percent('y') 0.25 """ return float(self[key])/sum(self.values()) def sorted_keys(self): """Returns keys sorted by value. >>> c = counter() >>> c.add('x') >>> c.add('x') >>> c.add('y') >>> c.sorted_keys() ['x', 'y'] """ return sorted(self.keys(), key=lambda k: self[k], reverse=True) def sorted_values(self): """Returns values sorted by value. >>> c = counter() >>> c.add('x') >>> c.add('x') >>> c.add('y') >>> c.sorted_values() [2, 1] """ return [self[k] for k in self.sorted_keys()] def sorted_items(self): """Returns items sorted by value. >>> c = counter() >>> c.add('x') >>> c.add('x') >>> c.add('y') >>> c.sorted_items() [('x', 2), ('y', 1)] """ return [(k, self[k]) for k in self.sorted_keys()] def __repr__(self): return '' counter = Counter iters = [list, tuple] import __builtin__ if hasattr(__builtin__, 'set'): iters.append(set) if hasattr(__builtin__, 'frozenset'): iters.append(set) if sys.version_info < (2,6): # sets module deprecated in 2.6 try: from sets import Set iters.append(Set) except ImportError: pass class _hack(tuple): pass iters = _hack(iters) iters.__doc__ = """ A list of iterable items (like lists, but not strings). Includes whichever of lists, tuples, sets, and Sets are available in this version of Python. """ def _strips(direction, text, remove): if isinstance(remove, iters): for subr in remove: text = _strips(direction, text, subr) return text if direction == 'l': if text.startswith(remove): return text[len(remove):] elif direction == 'r': if text.endswith(remove): return text[:-len(remove)] else: raise ValueError, "Direction needs to be r or l." return text def rstrips(text, remove): """ removes the string `remove` from the right of `text` >>> rstrips("foobar", "bar") 'foo' """ return _strips('r', text, remove) def lstrips(text, remove): """ removes the string `remove` from the left of `text` >>> lstrips("foobar", "foo") 'bar' >>> lstrips('http://foo.org/', ['http://', 'https://']) 'foo.org/' >>> lstrips('FOOBARBAZ', ['FOO', 'BAR']) 'BAZ' >>> lstrips('FOOBARBAZ', ['BAR', 'FOO']) 'BARBAZ' """ return _strips('l', text, remove) def strips(text, remove): """ removes the string `remove` from the both sides of `text` >>> strips("foobarfoo", "foo") 'bar' """ return rstrips(lstrips(text, remove), remove) def safeunicode(obj, encoding='utf-8'): r""" Converts any given object to unicode string. >>> safeunicode('hello') u'hello' >>> safeunicode(2) u'2' >>> safeunicode('\xe1\x88\xb4') u'\u1234' """ t = type(obj) if t is unicode: return obj elif t is str: return obj.decode(encoding) elif t in [int, float, bool]: return unicode(obj) elif hasattr(obj, '__unicode__') or isinstance(obj, unicode): return unicode(obj) else: return str(obj).decode(encoding) def safestr(obj, encoding='utf-8'): r""" Converts any given object to utf-8 encoded string. >>> safestr('hello') 'hello' >>> safestr(u'\u1234') '\xe1\x88\xb4' >>> safestr(2) '2' """ if isinstance(obj, unicode): return obj.encode(encoding) elif isinstance(obj, str): return obj elif hasattr(obj, 'next'): # iterator return itertools.imap(safestr, obj) else: return str(obj) # for backward-compatibility utf8 = safestr class TimeoutError(Exception): pass def timelimit(timeout): """ A decorator to limit a function to `timeout` seconds, raising `TimeoutError` if it takes longer. >>> import time >>> def meaningoflife(): ... time.sleep(.2) ... return 42 >>> >>> timelimit(.1)(meaningoflife)() Traceback (most recent call last): ... TimeoutError: took too long >>> timelimit(1)(meaningoflife)() 42 _Caveat:_ The function isn't stopped after `timeout` seconds but continues executing in a separate thread. (There seems to be no way to kill a thread.) inspired by """ def _1(function): def _2(*args, **kw): class Dispatch(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.result = None self.error = None self.setDaemon(True) self.start() def run(self): try: self.result = function(*args, **kw) except: self.error = sys.exc_info() c = Dispatch() c.join(timeout) if c.isAlive(): raise TimeoutError, 'took too long' if c.error: raise c.error[0], c.error[1] return c.result return _2 return _1 class Memoize: """ 'Memoizes' a function, caching its return values for each input. If `expires` is specified, values are recalculated after `expires` seconds. If `background` is specified, values are recalculated in a separate thread. >>> calls = 0 >>> def howmanytimeshaveibeencalled(): ... global calls ... calls += 1 ... return calls >>> fastcalls = memoize(howmanytimeshaveibeencalled) >>> howmanytimeshaveibeencalled() 1 >>> howmanytimeshaveibeencalled() 2 >>> fastcalls() 3 >>> fastcalls() 3 >>> import time >>> fastcalls = memoize(howmanytimeshaveibeencalled, .1, background=False) >>> fastcalls() 4 >>> fastcalls() 4 >>> time.sleep(.2) >>> fastcalls() 5 >>> def slowfunc(): ... time.sleep(.1) ... return howmanytimeshaveibeencalled() >>> fastcalls = memoize(slowfunc, .2, background=True) >>> fastcalls() 6 >>> timelimit(.05)(fastcalls)() 6 >>> time.sleep(.2) >>> timelimit(.05)(fastcalls)() 6 >>> timelimit(.05)(fastcalls)() 6 >>> time.sleep(.2) >>> timelimit(.05)(fastcalls)() 7 >>> fastcalls = memoize(slowfunc, None, background=True) >>> threading.Thread(target=fastcalls).start() >>> time.sleep(.01) >>> fastcalls() 9 """ def __init__(self, func, expires=None, background=True): self.func = func self.cache = {} self.expires = expires self.background = background self.running = {} def __call__(self, *args, **keywords): key = (args, tuple(keywords.items())) if not self.running.get(key): self.running[key] = threading.Lock() def update(block=False): if self.running[key].acquire(block): try: self.cache[key] = (self.func(*args, **keywords), time.time()) finally: self.running[key].release() if key not in self.cache: update(block=True) elif self.expires and (time.time() - self.cache[key][1]) > self.expires: if self.background: threading.Thread(target=update).start() else: update() return self.cache[key][0] memoize = Memoize re_compile = memoize(re.compile) #@@ threadsafe? re_compile.__doc__ = """ A memoized version of re.compile. """ class _re_subm_proxy: def __init__(self): self.match = None def __call__(self, match): self.match = match return '' def re_subm(pat, repl, string): """ Like re.sub, but returns the replacement _and_ the match object. >>> t, m = re_subm('g(oo+)fball', r'f\\1lish', 'goooooofball') >>> t 'foooooolish' >>> m.groups() ('oooooo',) """ compiled_pat = re_compile(pat) proxy = _re_subm_proxy() compiled_pat.sub(proxy.__call__, string) return compiled_pat.sub(repl, string), proxy.match def group(seq, size): """ Returns an iterator over a series of lists of length size from iterable. >>> list(group([1,2,3,4], 2)) [[1, 2], [3, 4]] >>> list(group([1,2,3,4,5], 2)) [[1, 2], [3, 4], [5]] """ def take(seq, n): for i in xrange(n): yield seq.next() if not hasattr(seq, 'next'): seq = iter(seq) while True: x = list(take(seq, size)) if x: yield x else: break def uniq(seq, key=None): """ Removes duplicate elements from a list while preserving the order of the rest. >>> uniq([9,0,2,1,0]) [9, 0, 2, 1] The value of the optional `key` parameter should be a function that takes a single argument and returns a key to test the uniqueness. >>> uniq(["Foo", "foo", "bar"], key=lambda s: s.lower()) ['Foo', 'bar'] """ key = key or (lambda x: x) seen = set() result = [] for v in seq: k = key(v) if k in seen: continue seen.add(k) result.append(v) return result def iterview(x): """ Takes an iterable `x` and returns an iterator over it which prints its progress to stderr as it iterates through. """ WIDTH = 70 def plainformat(n, lenx): return '%5.1f%% (%*d/%d)' % ((float(n)/lenx)*100, len(str(lenx)), n, lenx) def bars(size, n, lenx): val = int((float(n)*size)/lenx + 0.5) if size - val: spacing = ">" + (" "*(size-val))[1:] else: spacing = "" return "[%s%s]" % ("="*val, spacing) def eta(elapsed, n, lenx): if n == 0: return '--:--:--' if n == lenx: secs = int(elapsed) else: secs = int((elapsed/n) * (lenx-n)) mins, secs = divmod(secs, 60) hrs, mins = divmod(mins, 60) return '%02d:%02d:%02d' % (hrs, mins, secs) def format(starttime, n, lenx): out = plainformat(n, lenx) + ' ' if n == lenx: end = ' ' else: end = ' ETA ' end += eta(time.time() - starttime, n, lenx) out += bars(WIDTH - len(out) - len(end), n, lenx) out += end return out starttime = time.time() lenx = len(x) for n, y in enumerate(x): sys.stderr.write('\r' + format(starttime, n, lenx)) yield y sys.stderr.write('\r' + format(starttime, n+1, lenx) + '\n') class IterBetter: """ Returns an object that can be used as an iterator but can also be used via __getitem__ (although it cannot go backwards -- that is, you cannot request `iterbetter[0]` after requesting `iterbetter[1]`). >>> import itertools >>> c = iterbetter(itertools.count()) >>> c[1] 1 >>> c[5] 5 >>> c[3] Traceback (most recent call last): ... IndexError: already passed 3 For boolean test, IterBetter peeps at first value in the itertor without effecting the iteration. >>> c = iterbetter(iter(range(5))) >>> bool(c) True >>> list(c) [0, 1, 2, 3, 4] >>> c = iterbetter(iter([])) >>> bool(c) False >>> list(c) [] """ def __init__(self, iterator): self.i, self.c = iterator, 0 def __iter__(self): if hasattr(self, "_head"): yield self._head while 1: yield self.i.next() self.c += 1 def __getitem__(self, i): #todo: slices if i < self.c: raise IndexError, "already passed "+str(i) try: while i > self.c: self.i.next() self.c += 1 # now self.c == i self.c += 1 return self.i.next() except StopIteration: raise IndexError, str(i) def __nonzero__(self): if hasattr(self, "__len__"): return len(self) != 0 elif hasattr(self, "_head"): return True else: try: self._head = self.i.next() except StopIteration: return False else: return True iterbetter = IterBetter def safeiter(it, cleanup=None, ignore_errors=True): """Makes an iterator safe by ignoring the exceptions occured during the iteration. """ def next(): while True: try: return it.next() except StopIteration: raise except: traceback.print_exc() it = iter(it) while True: yield next() def safewrite(filename, content): """Writes the content to a temp file and then moves the temp file to given filename to avoid overwriting the existing file in case of errors. """ f = file(filename + '.tmp', 'w') f.write(content) f.close() os.rename(f.name, filename) def dictreverse(mapping): """ Returns a new dictionary with keys and values swapped. >>> dictreverse({1: 2, 3: 4}) {2: 1, 4: 3} """ return dict([(value, key) for (key, value) in mapping.iteritems()]) def dictfind(dictionary, element): """ Returns a key whose value in `dictionary` is `element` or, if none exists, None. >>> d = {1:2, 3:4} >>> dictfind(d, 4) 3 >>> dictfind(d, 5) """ for (key, value) in dictionary.iteritems(): if element is value: return key def dictfindall(dictionary, element): """ Returns the keys whose values in `dictionary` are `element` or, if none exists, []. >>> d = {1:4, 3:4} >>> dictfindall(d, 4) [1, 3] >>> dictfindall(d, 5) [] """ res = [] for (key, value) in dictionary.iteritems(): if element is value: res.append(key) return res def dictincr(dictionary, element): """ Increments `element` in `dictionary`, setting it to one if it doesn't exist. >>> d = {1:2, 3:4} >>> dictincr(d, 1) 3 >>> d[1] 3 >>> dictincr(d, 5) 1 >>> d[5] 1 """ dictionary.setdefault(element, 0) dictionary[element] += 1 return dictionary[element] def dictadd(*dicts): """ Returns a dictionary consisting of the keys in the argument dictionaries. If they share a key, the value from the last argument is used. >>> dictadd({1: 0, 2: 0}, {2: 1, 3: 1}) {1: 0, 2: 1, 3: 1} """ result = {} for dct in dicts: result.update(dct) return result def requeue(queue, index=-1): """Returns the element at index after moving it to the beginning of the queue. >>> x = [1, 2, 3, 4] >>> requeue(x) 4 >>> x [4, 1, 2, 3] """ x = queue.pop(index) queue.insert(0, x) return x def restack(stack, index=0): """Returns the element at index after moving it to the top of stack. >>> x = [1, 2, 3, 4] >>> restack(x) 1 >>> x [2, 3, 4, 1] """ x = stack.pop(index) stack.append(x) return x def listget(lst, ind, default=None): """ Returns `lst[ind]` if it exists, `default` otherwise. >>> listget(['a'], 0) 'a' >>> listget(['a'], 1) >>> listget(['a'], 1, 'b') 'b' """ if len(lst)-1 < ind: return default return lst[ind] def intget(integer, default=None): """ Returns `integer` as an int or `default` if it can't. >>> intget('3') 3 >>> intget('3a') >>> intget('3a', 0) 0 """ try: return int(integer) except (TypeError, ValueError): return default def datestr(then, now=None): """ Converts a (UTC) datetime object to a nice string representation. >>> from datetime import datetime, timedelta >>> d = datetime(1970, 5, 1) >>> datestr(d, now=d) '0 microseconds ago' >>> for t, v in { ... timedelta(microseconds=1): '1 microsecond ago', ... timedelta(microseconds=2): '2 microseconds ago', ... -timedelta(microseconds=1): '1 microsecond from now', ... -timedelta(microseconds=2): '2 microseconds from now', ... timedelta(microseconds=2000): '2 milliseconds ago', ... timedelta(seconds=2): '2 seconds ago', ... timedelta(seconds=2*60): '2 minutes ago', ... timedelta(seconds=2*60*60): '2 hours ago', ... timedelta(days=2): '2 days ago', ... }.iteritems(): ... assert datestr(d, now=d+t) == v >>> datestr(datetime(1970, 1, 1), now=d) 'January 1' >>> datestr(datetime(1969, 1, 1), now=d) 'January 1, 1969' >>> datestr(datetime(1970, 6, 1), now=d) 'June 1, 1970' >>> datestr(None) '' """ def agohence(n, what, divisor=None): if divisor: n = n // divisor out = str(abs(n)) + ' ' + what # '2 day' if abs(n) != 1: out += 's' # '2 days' out += ' ' # '2 days ' if n < 0: out += 'from now' else: out += 'ago' return out # '2 days ago' oneday = 24 * 60 * 60 if not then: return "" if not now: now = datetime.datetime.utcnow() if type(now).__name__ == "DateTime": now = datetime.datetime.fromtimestamp(now) if type(then).__name__ == "DateTime": then = datetime.datetime.fromtimestamp(then) elif type(then).__name__ == "date": then = datetime.datetime(then.year, then.month, then.day) delta = now - then deltaseconds = int(delta.days * oneday + delta.seconds + delta.microseconds * 1e-06) deltadays = abs(deltaseconds) // oneday if deltaseconds < 0: deltadays *= -1 # fix for oddity of floor if deltadays: if abs(deltadays) < 4: return agohence(deltadays, 'day') try: out = then.strftime('%B %e') # e.g. 'June 3' except ValueError: # %e doesn't work on Windows. out = then.strftime('%B %d') # e.g. 'June 03' if then.year != now.year or deltadays < 0: out += ', %s' % then.year return out if int(deltaseconds): if abs(deltaseconds) > (60 * 60): return agohence(deltaseconds, 'hour', 60 * 60) elif abs(deltaseconds) > 60: return agohence(deltaseconds, 'minute', 60) else: return agohence(deltaseconds, 'second') deltamicroseconds = delta.microseconds if delta.days: deltamicroseconds = int(delta.microseconds - 1e6) # datetime oddity if abs(deltamicroseconds) > 1000: return agohence(deltamicroseconds, 'millisecond', 1000) return agohence(deltamicroseconds, 'microsecond') def numify(string): """ Removes all non-digit characters from `string`. >>> numify('800-555-1212') '8005551212' >>> numify('800.555.1212') '8005551212' """ return ''.join([c for c in str(string) if c.isdigit()]) def denumify(string, pattern): """ Formats `string` according to `pattern`, where the letter X gets replaced by characters from `string`. >>> denumify("8005551212", "(XXX) XXX-XXXX") '(800) 555-1212' """ out = [] for c in pattern: if c == "X": out.append(string[0]) string = string[1:] else: out.append(c) return ''.join(out) def commify(n): """ Add commas to an integer `n`. >>> commify(1) '1' >>> commify(123) '123' >>> commify(1234) '1,234' >>> commify(1234567890) '1,234,567,890' >>> commify(123.0) '123.0' >>> commify(1234.5) '1,234.5' >>> commify(1234.56789) '1,234.56789' >>> commify('%.2f' % 1234.5) '1,234.50' >>> commify(None) >>> """ if n is None: return None n = str(n) if '.' in n: dollars, cents = n.split('.') else: dollars, cents = n, None r = [] for i, c in enumerate(str(dollars)[::-1]): if i and (not (i % 3)): r.insert(0, ',') r.insert(0, c) out = ''.join(r) if cents: out += '.' + cents return out def dateify(datestring): """ Formats a numified `datestring` properly. """ return denumify(datestring, "XXXX-XX-XX XX:XX:XX") def nthstr(n): """ Formats an ordinal. Doesn't handle negative numbers. >>> nthstr(1) '1st' >>> nthstr(0) '0th' >>> [nthstr(x) for x in [2, 3, 4, 5, 10, 11, 12, 13, 14, 15]] ['2nd', '3rd', '4th', '5th', '10th', '11th', '12th', '13th', '14th', '15th'] >>> [nthstr(x) for x in [91, 92, 93, 94, 99, 100, 101, 102]] ['91st', '92nd', '93rd', '94th', '99th', '100th', '101st', '102nd'] >>> [nthstr(x) for x in [111, 112, 113, 114, 115]] ['111th', '112th', '113th', '114th', '115th'] """ assert n >= 0 if n % 100 in [11, 12, 13]: return '%sth' % n return {1: '%sst', 2: '%snd', 3: '%srd'}.get(n % 10, '%sth') % n def cond(predicate, consequence, alternative=None): """ Function replacement for if-else to use in expressions. >>> x = 2 >>> cond(x % 2 == 0, "even", "odd") 'even' >>> cond(x % 2 == 0, "even", "odd") + '_row' 'even_row' """ if predicate: return consequence else: return alternative class CaptureStdout: """ Captures everything `func` prints to stdout and returns it instead. >>> def idiot(): ... print "foo" >>> capturestdout(idiot)() 'foo\\n' **WARNING:** Not threadsafe! """ def __init__(self, func): self.func = func def __call__(self, *args, **keywords): from cStringIO import StringIO # Not threadsafe! out = StringIO() oldstdout = sys.stdout sys.stdout = out try: self.func(*args, **keywords) finally: sys.stdout = oldstdout return out.getvalue() capturestdout = CaptureStdout class Profile: """ Profiles `func` and returns a tuple containing its output and a string with human-readable profiling information. >>> import time >>> out, inf = profile(time.sleep)(.001) >>> out >>> inf[:10].strip() 'took 0.0' """ def __init__(self, func): self.func = func def __call__(self, *args): ##, **kw): kw unused import hotshot, hotshot.stats, os, tempfile ##, time already imported f, filename = tempfile.mkstemp() os.close(f) prof = hotshot.Profile(filename) stime = time.time() result = prof.runcall(self.func, *args) stime = time.time() - stime prof.close() import cStringIO out = cStringIO.StringIO() stats = hotshot.stats.load(filename) stats.stream = out stats.strip_dirs() stats.sort_stats('time', 'calls') stats.print_stats(40) stats.print_callers() x = '\n\ntook '+ str(stime) + ' seconds\n' x += out.getvalue() # remove the tempfile try: os.remove(filename) except IOError: pass return result, x profile = Profile import traceback # hack for compatibility with Python 2.3: if not hasattr(traceback, 'format_exc'): from cStringIO import StringIO def format_exc(limit=None): strbuf = StringIO() traceback.print_exc(limit, strbuf) return strbuf.getvalue() traceback.format_exc = format_exc def tryall(context, prefix=None): """ Tries a series of functions and prints their results. `context` is a dictionary mapping names to values; the value will only be tried if it's callable. >>> tryall(dict(j=lambda: True)) j: True ---------------------------------------- results: True: 1 For example, you might have a file `test/stuff.py` with a series of functions testing various things in it. At the bottom, have a line: if __name__ == "__main__": tryall(globals()) Then you can run `python test/stuff.py` and get the results of all the tests. """ context = context.copy() # vars() would update results = {} for (key, value) in context.iteritems(): if not hasattr(value, '__call__'): continue if prefix and not key.startswith(prefix): continue print key + ':', try: r = value() dictincr(results, r) print r except: print 'ERROR' dictincr(results, 'ERROR') print ' ' + '\n '.join(traceback.format_exc().split('\n')) print '-'*40 print 'results:' for (key, value) in results.iteritems(): print ' '*2, str(key)+':', value class ThreadedDict(threadlocal): """ Thread local storage. >>> d = ThreadedDict() >>> d.x = 1 >>> d.x 1 >>> import threading >>> def f(): d.x = 2 ... >>> t = threading.Thread(target=f) >>> t.start() >>> t.join() >>> d.x 1 """ _instances = set() def __init__(self): ThreadedDict._instances.add(self) def __del__(self): ThreadedDict._instances.remove(self) def __hash__(self): return id(self) def clear_all(): """Clears all ThreadedDict instances. """ for t in list(ThreadedDict._instances): t.clear() clear_all = staticmethod(clear_all) # Define all these methods to more or less fully emulate dict -- attribute access # is built into threading.local. def __getitem__(self, key): return self.__dict__[key] def __setitem__(self, key, value): self.__dict__[key] = value def __delitem__(self, key): del self.__dict__[key] def __contains__(self, key): return key in self.__dict__ has_key = __contains__ def clear(self): self.__dict__.clear() def copy(self): return self.__dict__.copy() def get(self, key, default=None): return self.__dict__.get(key, default) def items(self): return self.__dict__.items() def iteritems(self): return self.__dict__.iteritems() def keys(self): return self.__dict__.keys() def iterkeys(self): return self.__dict__.iterkeys() iter = iterkeys def values(self): return self.__dict__.values() def itervalues(self): return self.__dict__.itervalues() def pop(self, key, *args): return self.__dict__.pop(key, *args) def popitem(self): return self.__dict__.popitem() def setdefault(self, key, default=None): return self.__dict__.setdefault(key, default) def update(self, *args, **kwargs): self.__dict__.update(*args, **kwargs) def __repr__(self): return '' % self.__dict__ __str__ = __repr__ threadeddict = ThreadedDict def autoassign(self, locals): """ Automatically assigns local variables to `self`. >>> self = storage() >>> autoassign(self, dict(a=1, b=2)) >>> self Generally used in `__init__` methods, as in: def __init__(self, foo, bar, baz=1): autoassign(self, locals()) """ for (key, value) in locals.iteritems(): if key == 'self': continue setattr(self, key, value) def to36(q): """ Converts an integer to base 36 (a useful scheme for human-sayable IDs). >>> to36(35) 'z' >>> to36(119292) '2k1o' >>> int(to36(939387374), 36) 939387374 >>> to36(0) '0' >>> to36(-393) Traceback (most recent call last): ... ValueError: must supply a positive integer """ if q < 0: raise ValueError, "must supply a positive integer" letters = "0123456789abcdefghijklmnopqrstuvwxyz" converted = [] while q != 0: q, r = divmod(q, 36) converted.insert(0, letters[r]) return "".join(converted) or '0' r_url = re_compile('(?', text) text = markdown(text) return text def sendmail(from_address, to_address, subject, message, headers=None, **kw): """ Sends the email message `message` with mail and envelope headers for from `from_address_` to `to_address` with `subject`. Additional email headers can be specified with the dictionary `headers. Optionally cc, bcc and attachments can be specified as keyword arguments. Attachments must be an iterable and each attachment can be either a filename or a file object or a dictionary with filename, content and optionally content_type keys. If `web.config.smtp_server` is set, it will send the message to that SMTP server. Otherwise it will look for `/usr/sbin/sendmail`, the typical location for the sendmail-style binary. To use sendmail from a different path, set `web.config.sendmail_path`. """ attachments = kw.pop("attachments", []) mail = _EmailMessage(from_address, to_address, subject, message, headers, **kw) for a in attachments: if isinstance(a, dict): mail.attach(a['filename'], a['content'], a.get('content_type')) elif hasattr(a, 'read'): # file filename = os.path.basename(getattr(a, "name", "")) content_type = getattr(a, 'content_type', None) mail.attach(filename, a.read(), content_type) elif isinstance(a, basestring): f = open(a, 'rb') content = f.read() f.close() filename = os.path.basename(a) mail.attach(filename, content, None) else: raise ValueError, "Invalid attachment: %s" % repr(a) mail.send() class _EmailMessage: def __init__(self, from_address, to_address, subject, message, headers=None, **kw): def listify(x): if not isinstance(x, list): return [safestr(x)] else: return [safestr(a) for a in x] subject = safestr(subject) message = safestr(message) from_address = safestr(from_address) to_address = listify(to_address) cc = listify(kw.get('cc', [])) bcc = listify(kw.get('bcc', [])) recipients = to_address + cc + bcc import email.Utils self.from_address = email.Utils.parseaddr(from_address)[1] self.recipients = [email.Utils.parseaddr(r)[1] for r in recipients] self.headers = dictadd({ 'From': from_address, 'To': ", ".join(to_address), 'Subject': subject }, headers or {}) if cc: self.headers['Cc'] = ", ".join(cc) self.message = self.new_message() self.message.add_header("Content-Transfer-Encoding", "7bit") self.message.add_header("Content-Disposition", "inline") self.message.add_header("MIME-Version", "1.0") self.message.set_payload(message, 'utf-8') self.multipart = False def new_message(self): from email.Message import Message return Message() def attach(self, filename, content, content_type=None): if not self.multipart: msg = self.new_message() msg.add_header("Content-Type", "multipart/mixed") msg.attach(self.message) self.message = msg self.multipart = True import mimetypes try: from email import encoders except: from email import Encoders as encoders content_type = content_type or mimetypes.guess_type(filename)[0] or "applcation/octet-stream" msg = self.new_message() msg.set_payload(content) msg.add_header('Content-Type', content_type) msg.add_header('Content-Disposition', 'attachment', filename=filename) if not content_type.startswith("text/"): encoders.encode_base64(msg) self.message.attach(msg) def prepare_message(self): for k, v in self.headers.iteritems(): if k.lower() == "content-type": self.message.set_type(v) else: self.message.add_header(k, v) self.headers = {} def send(self): try: import webapi except ImportError: webapi = Storage(config=Storage()) self.prepare_message() message_text = self.message.as_string() if webapi.config.get('smtp_server'): server = webapi.config.get('smtp_server') port = webapi.config.get('smtp_port', 0) username = webapi.config.get('smtp_username') password = webapi.config.get('smtp_password') debug_level = webapi.config.get('smtp_debuglevel', None) starttls = webapi.config.get('smtp_starttls', False) import smtplib smtpserver = smtplib.SMTP(server, port) if debug_level: smtpserver.set_debuglevel(debug_level) if starttls: smtpserver.ehlo() smtpserver.starttls() smtpserver.ehlo() if username and password: smtpserver.login(username, password) smtpserver.sendmail(self.from_address, self.recipients, message_text) smtpserver.quit() elif webapi.config.get('email_engine') == 'aws': import boto.ses c = boto.ses.SESConnection( aws_access_key_id=webapi.config.get('aws_access_key_id'), aws_secret_access_key=web.api.config.get('aws_secret_access_key')) c.send_raw_email(self.from_address, message_text, self.from_recipients) else: sendmail = webapi.config.get('sendmail_path', '/usr/sbin/sendmail') assert not self.from_address.startswith('-'), 'security' for r in self.recipients: assert not r.startswith('-'), 'security' cmd = [sendmail, '-f', self.from_address] + self.recipients if subprocess: p = subprocess.Popen(cmd, stdin=subprocess.PIPE) p.stdin.write(message_text) p.stdin.close() p.wait() else: i, o = os.popen2(cmd) i.write(message) i.close() o.close() del i, o def __repr__(self): return "" def __str__(self): return self.message.as_string() if __name__ == "__main__": import doctest doctest.testmod()