Source code for open_facebook.utils

import logging
import re
import sys
import functools
import json

from django.utils import six
try:
    unicode = unicode
except NameError:
    unicode = str

logger = logging.getLogger(__name__)
URL_PARAM_RE = re.compile('(?P<k>[^(=|&)]+)=(?P<v>[^&]+)(&|$)')
URL_PARAM_NO_VALUE_RE = re.compile('(?P<k>[^(&|?)]+)(&|$)')


def import_statsd():
    """Import only the statsd by wolph, not the mozilla statsd.

    TODO: Move to mozilla statsd which is more widely used.

    :returns: the ``django_statsd`` module when it exposes ``start``/``stop``
        timing hooks, otherwise ``None``.
    """
    try:
        import django_statsd
        # Only wolph's fork provides start/stop timing; treat any
        # django_statsd without those hooks as unavailable.
        if not (hasattr(django_statsd, 'start') and
                hasattr(django_statsd, 'stop')):
            django_statsd = None
    except ImportError:
        django_statsd = None
    return django_statsd
# Resolve the optional statsd backend once at import time; None when the
# (wolph) django_statsd package is not installed or lacks start/stop hooks.
django_statsd = import_statsd()
def start_statsd(path):
    """Start a statsd timer for ``path``; a no-op without a statsd backend."""
    if not django_statsd:
        return
    django_statsd.start(path)
def stop_statsd(path):
    """Stop the statsd timer for ``path``; a no-op without a statsd backend."""
    if not django_statsd:
        return
    django_statsd.stop(path)
def base64_url_decode_php_style(inp):
    '''
    PHP follows a slightly different protocol for base64 url decode.
    For a full explanation see:
    http://stackoverflow.com/questions/3302946/how-to-base64-url-decode-in-python
    and
    http://sunilarora.org/parsing-signedrequest-parameter-in-python-bas

    :param inp: url-safe base64 string, possibly missing its ``=`` padding
        (PHP strips it)
    :returns: the decoded bytes
    '''
    import base64
    # Restore stripped padding; (4 - len % 4) % 4 is 0 when already aligned.
    padding_factor = (4 - len(inp) % 4) % 4
    inp += "=" * padding_factor
    # urlsafe_b64decode performs the same '-' -> '+' and '_' -> '/'
    # translation the original implemented by hand via unicode.translate(),
    # without needing the py2/py3 `unicode` compatibility shim.
    return base64.urlsafe_b64decode(inp)
def encode_params(params_dict):
    '''
    Take the dictionary of params and encode keys and
    values to ascii if it's unicode

    :param params_dict: mapping of parameter names to values
    :returns: a new dict with every key and value passed through smart_str
    '''
    return dict(
        (smart_str(key), smart_str(value))
        for key, value in params_dict.items()
    )
def smart_str(s, encoding='utf-8', strings_only=False, errors='strict'):
    """
    Adapted from django, needed for urlencoding
    Returns a bytestring version of 's', encoded as specified in 'encoding'.
    If strings_only is True, don't convert (some) non-string-like objects.
    """
    # BUG FIX: the original checked isinstance(s, types.NoneType), but
    # types.NoneType does not exist on Python 3 and raised AttributeError.
    if strings_only and (s is None or isinstance(s, int)):
        return s
    elif not isinstance(s, six.string_types):
        try:
            return str(s)
        except UnicodeEncodeError:
            if isinstance(s, Exception):
                # An Exception subclass containing non-ASCII data that doesn't
                # know how to print itself properly. We shouldn't raise a
                # further exception.
                return ' '.join([smart_str(arg, encoding, strings_only, errors)
                                 for arg in s])
            return unicode(s).encode(encoding, errors)
    elif isinstance(s, unicode):
        return s.encode(encoding, errors)
    elif s and encoding != 'utf-8':
        # Bytestring in a different target encoding; assumes the input
        # bytes are utf-8 encoded.
        return s.decode('utf-8', errors).encode(encoding, errors)
    else:
        return s
def send_warning(message, request=None, e=None, **extra_data):
    '''
    Uses the logging system to send a message to logging and sentry

    :param message: warning text passed to the log handlers
    :param request: optional Django request; the authenticated user's
        username is attached to the logged data
    :param e: optional exception whose text becomes the logged body
    :param extra_data: additional key/values merged into the data dict
    '''
    username = None
    # NOTE(review): is_authenticated became a property in Django >= 1.10;
    # the call form here matches the Django versions this module targets —
    # confirm before upgrading Django.
    if request and request.user.is_authenticated():
        username = request.user.username
    error_message = None
    if e:
        error_message = unicode(e)
    data = {
        'username': username,
        'body': error_message,
    }
    data.update(extra_data)
    # logger.warn is a deprecated alias for logger.warning.
    logger.warning(message, exc_info=sys.exc_info(), extra={
        'request': request,
        'data': data
    })
def merge_urls(generated_url, human_url):
    '''
    merge the generated_url with the human_url following this rules:
    params introduced by generated_url are kept
    final params order comes from generated_url
    there's an hack to support things like this
    http://url?param&param=value

    >>> gen = "http://mysite.com?p1=a&p2=b&p3=c&p4=d"
    >>> hum = "http://mysite.com?p4=D&p3=C&p2=B"
    >>> merge_urls(gen, hum)
    u'http://mysite.com?p1=a&p2=B&p3=C&p4=D'

    >>> gen = "http://mysite.com?id=a&id_s=b&p_id=d"
    >>> hum = "http://mysite.com?id=A&id_s=B&p_id=D"
    >>> merge_urls(gen, hum)
    u'http://mysite.com?id=A&id_s=B&p_id=D'

    >>> gen = "http://mysite.com?p1=a&p2=b&p3=c&p4=d"
    >>> hum = "http://mysite.com"
    >>> merge_urls(gen, hum)
    u'http://mysite.com'

    >>> gen = "http://ad.zanox.com/ppc/?18595160C2000463397T&zpar4=scrapbook&zpar0=e2494344_c4385641&zpar1=not_authenticated&zpar2=unknown_campaign&zpar3=unknown_ref&ULP=http://www.asos.com/ASOS/ASOS-MARS-Loafer-Shoes/Prod/pgeproduct.aspx?iid=1703516&cid=4172&sh=0&pge=2&pgesize=20&sort=-1&clr=Black&affId=2441"
    >>> hum = "http://ad.zanox.com/ppc/?18595160C2000463397T&zpar3=scrapbook&ULP=http://www.asos.com/ASOS/ASOS-MARS-Loafer-Shoes/Prod/pgeproduct.aspx?iid=1703516&cid=4172&sh=0&pge=2&pgesize=20&sort=-1&clr=Black&affId=2441"
    >>> merge_urls(gen, hum)
    u'http://ad.zanox.com/ppc/?18595160C2000463397T&zpar4=scrapbook&zpar0=e2494344_c4385641&zpar1=not_authenticated&zpar2=unknown_campaign&zpar3=scrapbook&ULP=http://www.asos.com/ASOS/ASOS-MARS-Loafer-Shoes/Prod/pgeproduct.aspx?iid=1703516&cid=4172&sh=0&pge=2&pgesize=20&sort=-1&clr=Black&affId=2441'

    >>> gen = "http://mysite.com?invalidparam&p=2"
    >>> hum = "http://mysite.com?p=1"
    >>> merge_urls(gen, hum)
    u'http://mysite.com?invalidparam&p=1'
    '''
    # A human URL without a query string wins outright.
    if '?' not in human_url:
        return u'%s' % human_url
    # Split each URL into its path and raw query-string parts (first '?' only,
    # so later '?'s inside param values survive, as in the zanox example).
    gen_path, gen_args = generated_url.split('?', 1)
    hum_path, hum_args = human_url.split('?', 1)
    # key=value pairs, in order of appearance in the query string.
    get_args = lambda args: [(m.group('k'), m.group('v'))
                             for m in URL_PARAM_RE.finditer(args)]
    # bare params without a value ("?param&k=v" hack); the "=" filter drops
    # normal key=value matches picked up by the looser regex.
    get_novalues_args = lambda args: [m.group('k')
                                      for m in URL_PARAM_NO_VALUE_RE.finditer(
                                          args) if "=" not in m.group('k')]
    # Human values, looked up by key to override the generated ones.
    hum_dict = dict(get_args(hum_args))
    out_args = []
    # prepend crazy param w/o values
    for param in get_novalues_args(gen_args):
        out_args.append(u'%s' % param)
    # replace gen url params
    for k, v in get_args(gen_args):
        out_args.append(u'%s=%s' % (k, hum_dict.get(k, v)))
    # Generated path + merged params, keeping the generated ordering.
    return u'%s?%s' % (gen_path, '&'.join(out_args))
class memoized(object):
    '''Decorator. Caches a function's return value each time it is called.
    If called later with the same arguments, the cached value is returned
    (not reevaluated).
    '''

    def __init__(self, func):
        # The wrapped callable and its per-argument-tuple result cache.
        self.func = func
        self.cache = {}

    def __call__(self, *args):
        try:
            return self.cache[args]
        except KeyError:
            value = self.func(*args)
            self.cache[args] = value
            return value
        except TypeError:
            # uncachable -- for instance, passing a list as an argument.
            # Better to not cache than to blow up entirely.
            return self.func(*args)

    def __repr__(self):
        '''Return the function's docstring (or its repr if undocumented).'''
        # BUG FIX: the original returned self.func.__doc__ directly, which is
        # None for undocumented functions and made repr() raise
        # "TypeError: __repr__ returned non-string".
        return self.func.__doc__ or repr(self.func)

    def __get__(self, obj, objtype):
        '''Support instance methods.'''
        # Bind the instance as the first argument, like a bound method.
        return functools.partial(self.__call__, obj)
def camel_to_underscore(name):
    '''Convert camelcase style naming to underscore style naming

    e.g. SpamEggs -> spam_eggs
    '''
    # Insert an underscore before every ASCII uppercase letter (only A-Z,
    # matching string.ascii_uppercase in the original), then normalise:
    # strip edge underscores and lowercase the whole result.
    with_breaks = ''.join(
        '_%c' % char if 'A' <= char <= 'Z' else char for char in name)
    return with_breaks.strip('_').lower()
def validate_is_instance(instance, classes):
    '''
    Usage
    validate_is_instance(10, int)
    validate_is_instance('a', (str, unicode))

    Raises ValueError when ``instance`` is not an instance of ``classes``.
    '''
    # Normalise a single class into the tuple form isinstance expects.
    if not isinstance(classes, tuple):
        classes = (classes,)
    if not isinstance(instance, classes):
        raise ValueError(
            'Expected instance type %s found %s' % (classes, type(instance)))
def is_json(content):
    '''
    Unfortunately facebook returns 500s which mean they are down
    Or 500s with a nice error message because you use open graph wrong
    So we have to figure out which is which :)

    :param content: the response body to probe
    :returns: True when ``content`` parses as JSON, False otherwise
    '''
    try:
        json.loads(content)
    except (ValueError, TypeError):
        # BUG FIX: the original bare ``except:`` also swallowed
        # KeyboardInterrupt/SystemExit.  json.loads only raises ValueError
        # (malformed JSON) or TypeError (non-string input).
        return False
    return True