# -*- coding: utf-8 -*-
'''
Open Facebook allows you to use Facebook's open graph API with simple python code
**Features**
* Supported and maintained
* Tested so people can contribute
* Facebook exceptions are mapped
* Logging
**Basic examples**::
facebook = OpenFacebook(access_token)
# Getting info about me
facebook.get('me')
# Learning some more about fashiolista
facebook.get('fashiolista')
# Writing your first comment
facebook.set('fashiolista/comments', message='I love Fashiolista!')
# Posting to a users wall
facebook.set('me/feed', message='check out fashiolista',
url='http://www.fashiolista.com')
# Liking a page
facebook.set('fashiolista/likes')
# Getting who likes cocacola
facebook.get('cocacola/likes')
# Use fql to retrieve your name
facebook.fql('SELECT name FROM user WHERE uid = me()')
# Executing fql in batch
facebook.batch_fql({
'user': 'SELECT uid, name, pic_square FROM user WHERE uid = me()',
'rsvp': 'SELECT uid, rsvp_status FROM event_member WHERE eid=12345678',
})
# Uploading pictures
photo_urls = [
'http://e.fashiocdn.com/images/entities/0/7/B/I/9/0.365x365.jpg',
'http://e.fashiocdn.com/images/entities/0/5/e/e/r/0.365x365.jpg',
]
for photo in photo_urls:
print facebook.set('me/feed', message='Check out Fashiolista',
picture=photo, url='http://www.fashiolista.com')
**Getting an access token**
Once you get your access token, Open Facebook gives you access to the Facebook API
There are 3 ways of getting a facebook access_token and these are currently
implemented by Django Facebook.
1. code is passed as request parameter and traded for an
access_token using the api
2. code is passed through a signed cookie and traded for an access_token
3. access_token is passed directly (retrieved through javascript, which
would be bad security, or through one of the mobile flows.)
If you are looking to develop your own flow for a different framework have a look at
Facebook's documentation:
http://developers.facebook.com/docs/authentication/
Also have a look at the :class:`.FacebookRequired` decorator and :func:`get_persistent_graph` function to
understand the required functionality
**Api docs**:
'''
from django.http import QueryDict
from django.utils import six
from django.utils.http import urlencode
from django_facebook import settings as facebook_settings
from open_facebook import exceptions as facebook_exceptions
from open_facebook.utils import json, encode_params, send_warning, memoized, \
stop_statsd, start_statsd
import logging
from django_facebook.utils import to_int
import ssl
import re
try:
# python 2 imports
from urlparse import urlparse
from urllib2 import build_opener, HTTPError, URLError
except ImportError:
# python 3 imports
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from urllib.request import build_opener
# module level logger, named after this module
logger = logging.getLogger(__name__)
# base timeout handed to the url opener (logged as seconds), _request
# multiplies it by the attempt number so every retry waits a bit longer
REQUEST_TIMEOUT = 10
# total number of attempts _request makes before raising, fewer attempts
# was too little, sometimes facebook is a bit flaky
REQUEST_ATTEMPTS = 3
class FacebookConnection(object):
    '''
    Shared utility class implementing the sending of requests to Facebook
    and the parsing of Facebook API responses

    All methods are classmethods, subclasses reuse them to build the
    authorization and graph api classes.
    '''
    api_url = 'https://graph.facebook.com/'
    # this older url is still used for fql requests
    old_api_url = 'https://api.facebook.com/method/'

    @classmethod
    def request(cls, path='', post_data=None, old_api=False, **params):
        '''
        Main function for sending the request to facebook

        **Example**::

            FacebookConnection.request('me')

        :param path:
            The path to request, examples: /me/friends/, /me/likes/
        :param post_data:
            A dictionary of data to post
        :param old_api:
            When True the request is sent to ``old_api_url``
            instead of ``api_url``
        :param params:
            The get params to include

        :returns: the parsed response, see :meth:`_request`
        '''
        api_base_url = cls.old_api_url if old_api else cls.api_url
        # only present when a subclass (or instance) defines an access token
        if getattr(cls, 'access_token', None):
            params['access_token'] = cls.access_token
        url = '%s%s?%s' % (api_base_url, path, urlencode(params))
        return cls._request(url, post_data)

    @classmethod
    def _request(cls, url, post_data=None, timeout=REQUEST_TIMEOUT,
                 attempts=REQUEST_ATTEMPTS):
        '''
        Sends the request to the given url, retries on connection problems
        and parses the response

        :param url:
            The full url to request, including the querystring
        :param post_data:
            Optional dictionary to post, double underscores in the keys
            are replaced by a colon
        :param timeout:
            The base timeout, multiplied by the attempt number on each try
        :param attempts:
            The number of times to try before raising the error

        :returns: the decoded json or, if the response isn't json,
            a QueryDict built from the response

        Raises the exception returned by
        ``facebook_exceptions.convert_unreachable_exception`` when all
        attempts failed, and the error raised by :meth:`raise_error` when
        the parsed response contains an error.
        '''
        # change fb__explicitly_shared to fb:explicitly_shared
        if post_data:
            post_data = dict(
                (k.replace('__', ':'), v) for k, v in post_data.items())

        logger.info('requesting url %s with post data %s', url, post_data)
        post_request = (post_data is not None or 'method=post' in url)

        if post_request and facebook_settings.FACEBOOK_READ_ONLY:
            # never write to facebook in read only mode, fake a response
            logger.info('running in readonly mode')
            response = dict(id=123456789, setting_read_only=True)
            return response

        # nicely identify ourselves before sending the request
        opener = build_opener()
        opener.addheaders = [('User-agent', 'Open Facebook Python')]

        # get the statsd path to track response times with
        path = urlparse(url).path
        statsd_path = path.replace('.', '_')
        statsd_key = 'facebook.%s' % statsd_path

        # the post body doesn't change between attempts, encode it once
        post_string = None
        if post_data:
            post_string = urlencode(encode_params(post_data))
            # urllib on python 3 only accepts bytes as the request body,
            # on python 2 str already is bytes so this is skipped
            if not isinstance(post_string, bytes):
                post_string = post_string.encode('utf-8')

        # give it a few shots, connection is buggy at times
        timeout_mp = 0
        while attempts:
            # gradually increase the timeout upon failure
            timeout_mp += 1
            extended_timeout = timeout * timeout_mp
            response_file = None

            try:
                start_statsd(statsd_key)
                try:
                    response_file = opener.open(
                        url, post_string, timeout=extended_timeout)
                    response = response_file.read().decode('utf8')
                except (HTTPError,) as e:
                    # the HTTPError doubles as the response file
                    response_file = e
                    response = response_file.read().decode('utf8')
                    # Facebook sents error codes for many of their flows
                    # we still want the json to allow for proper handling
                    msg_format = 'FB request, error type %s, code %s'
                    logger.warning(
                        msg_format, type(e), getattr(e, 'code', None))
                    # detect if its a server or application error
                    server_error = cls.is_server_error(e, response)
                    if server_error:
                        # trigger a retry
                        raise URLError(
                            'Facebook is down %s' % response)
                break
            except (HTTPError, URLError, ssl.SSLError) as e:
                # These are often temporary errors, so we will retry before
                # failing
                error_format = 'Facebook encountered a timeout (%ss) or error %s'
                logger.warning(error_format, extended_timeout, str(e))
                attempts -= 1
                if not attempts:
                    # if we have no more attempts actually raise the error
                    error_instance = facebook_exceptions.convert_unreachable_exception(
                        e)
                    error_msg = 'Facebook request failed after several retries, raising error %s'
                    logger.warning(error_msg, error_instance)
                    raise error_instance
            finally:
                if response_file:
                    response_file.close()
                stop_statsd(statsd_key)

        # Faceboook response is either
        # Valid json
        # A string which is a querydict (a=b&c=d...etc)
        # A html page stating FB is having trouble (but that shouldnt reach
        # this part of the code)
        try:
            parsed_response = json.loads(response)
            logger.info('facebook send response %s', parsed_response)
        except Exception:
            # using exception because we need to support multiple json libs :S
            parsed_response = QueryDict(response, True)
            logger.info('facebook send response %s', parsed_response)

        if parsed_response and isinstance(parsed_response, dict):
            # of course we have two different syntaxes
            if parsed_response.get('error'):
                cls.raise_error(parsed_response['error']['type'],
                                parsed_response['error']['message'],
                                parsed_response['error'].get('code'))
            elif parsed_response.get('error_code'):
                cls.raise_error(parsed_response['error_code'],
                                parsed_response['error_msg'])

        return parsed_response

    @classmethod
    def is_server_error(cls, e, response):
        '''
        Checks an HTTPError to see if Facebook is down or we are using the
        API in the wrong way
        Facebook doesn't clearly distinquish between the two, so this is a bit
        of a hack

        :param e:
            The error raised while requesting
        :param response:
            The decoded response body

        :returns: bool
        '''
        from open_facebook.utils import is_json
        server_error = False
        if hasattr(e, 'code') and e.code == 500:
            server_error = True

        # Facebook status codes are used for application logic
        # http://fbdevwiki.com/wiki/Error_codes#User_Permission_Errors
        # The only way I know to detect an actual server error is to check if
        # it looks like their error page
        # TODO: think of a better solution....
        error_matchers = [
            '<title>Facebook | Error</title>',
            'Sorry, something went wrong.'
        ]
        is_error_page = all(
            matcher in response for matcher in error_matchers)
        if is_error_page:
            server_error = True

        # if it looks like json, facebook is probably not down
        # this check wins over both checks above
        if is_json(response):
            server_error = False

        return server_error

    @classmethod
    def raise_error(cls, error_type, message, error_code=None):
        '''
        Lookup the best error class for the error and raise it

        **Example**::

            FacebookConnection.raise_error(
                'OAuthException', '(#10) Application does not have permission')

        :param error_type:
            the error type from the facebook api call
        :param message:
            the error message from the facebook api call
        :param error_code:
            optionally the error code which facebook send

        Always raises, the default error class is
        ``facebook_exceptions.OpenFacebookException``
        '''
        default_error_class = facebook_exceptions.OpenFacebookException

        # get the error code
        error_code = error_code or cls.get_code_from_message(message)
        # also see http://fbdevwiki.com/wiki/Error_codes#User_Permission_Errors
        logger.info('Trying to match error code %s to error class', error_code)

        # lookup by error code takes precedence
        error_class = cls.match_error_code(error_code)

        # try to get error class by direct lookup
        if not error_class:
            if not isinstance(error_type, int):
                error_class = getattr(facebook_exceptions, error_type, None)
            if error_class and not issubclass(error_class, default_error_class):
                error_class = None

        # hack for missing parameters
        if 'Missing' in message and 'parameter' in message:
            error_class = facebook_exceptions.MissingParameter

        # hack for Unsupported delete request
        if 'Unsupported delete request' in message:
            error_class = facebook_exceptions.UnsupportedDeleteRequest

        # fallback to the default
        if not error_class:
            error_class = default_error_class

        logger.info('Matched error to class %s', error_class)

        error_message = message
        if error_code:
            # this is handy when adding new exceptions for facebook errors
            error_message = u'%s (error code %s)' % (message, error_code)

        raise error_class(error_message)

    @classmethod
    def get_code_from_message(cls, message):
        '''
        Extracts the error code from a message which starts with the code
        in the format ``(#123)``

        :param message:
            the error message from the facebook api call

        :returns: the error code or None when no code was found
        '''
        error_code = None
        # match only looks at the start of the message
        error_code_re = re.compile(r'\(#(\d+)\)')
        matches = error_code_re.match(message)
        matching_groups = matches.groups() if matches else None
        if matching_groups:
            error_code = to_int(matching_groups[0]) or None
        return error_code

    @classmethod
    def get_sorted_exceptions(cls):
        '''
        Returns the exception classes given by
        ``open_facebook.exceptions.get_exception_classes``,
        sorted by the value of their ``range()``
        '''
        from open_facebook.exceptions import get_exception_classes
        exception_classes = get_exception_classes()
        exception_classes.sort(key=lambda e: e.range())
        return exception_classes

    @classmethod
    def match_error_code(cls, error_code):
        '''
        Return the right exception class for the error code

        :param error_code:
            the error code to look up

        :returns: the first exception class, in the order given by
            :meth:`get_sorted_exceptions`, of which the codes list contains
            the error code. None if no class matches.

        Raises a ValueError when a codes list contains something
        other than integers and (start, stop) tuples.
        '''
        exception_classes = cls.get_sorted_exceptions()
        error_class = None

        for class_ in exception_classes:
            codes_list = class_.codes_list()
            # match the error class
            matching_error_class = None
            for code in codes_list:
                if isinstance(code, tuple):
                    # an inclusive range of codes
                    start, stop = code
                    if error_code and start <= error_code <= stop:
                        matching_error_class = class_
                        logger.info('Matched error on code %s', code)
                elif isinstance(code, (int, six.integer_types)):
                    if int(code) == error_code:
                        matching_error_class = class_
                        logger.info('Matched error on code %s', code)
                else:
                    raise ValueError(
                        'Dont know how to handle %s of '
                        'type %s' % (code, type(code)))

            # tell about the happy news if we found something
            if matching_error_class:
                error_class = matching_error_class
                break

        return error_class
class FacebookAuthorization(FacebookConnection):
    '''
    Methods for getting us an access token

    There are several flows we must support
    * js authentication flow (signed cookie)
    * facebook app authentication flow (signed cookie)
    * facebook oauth redirect (code param in url)
    These 3 options need to be converted to an access token

    Also handles several testing scenarios
    * get app access token
    * create test user
    * get_or_create_test_user
    '''
    @classmethod
    def convert_code(cls, code,
                     redirect_uri='http://local.mellowmorning.com:8000/facebook/connect/'):
        '''
        Turns a code into an access token

        **Example**::

            FacebookAuthorization.convert_code(code)

        :param code:
            The code to convert
        :param redirect_uri:
            The redirect uri with which the code was requested

        :returns: dict
        '''
        kwargs = cls._client_info()
        kwargs['code'] = code
        kwargs['redirect_uri'] = redirect_uri
        response = cls.request('oauth/access_token', **kwargs)
        return response

    @classmethod
    def extend_access_token(cls, access_token):
        '''
        https://developers.facebook.com/roadmap/offline-access-removal/
        We can extend the token only once per day
        Normal short lived tokens last 1-2 hours
        Long lived tokens (given by extending) last 60 days

        **Example**::

            FacebookAuthorization.extend_access_token(access_token)

        :param access_token:
            The access_token to extend

        :returns: dict
        '''
        kwargs = cls._client_info()
        kwargs['grant_type'] = 'fb_exchange_token'
        kwargs['fb_exchange_token'] = access_token
        response = cls.request('oauth/access_token', **kwargs)
        return response

    @classmethod
    def _client_info(cls):
        '''
        Returns a dict with the client_id and client_secret from the settings
        '''
        kwargs = dict(client_id=facebook_settings.FACEBOOK_APP_ID)
        kwargs['client_secret'] = facebook_settings.FACEBOOK_APP_SECRET
        return kwargs

    @classmethod
    def parse_signed_data(cls, signed_request,
                          secret=facebook_settings.FACEBOOK_APP_SECRET):
        '''
        Validates the signature of a signed request and returns its payload

        Thanks to
        http://stackoverflow.com/questions/3302946/how-to-base64-url-decode-in-python
        and
        http://sunilarora.org/parsing-signedrequest-parameter-in-python-bas

        :param signed_request:
            The signed request in the format ``signature.payload``
        :param secret:
            The secret used for signing, defaults to the app secret
            (read from the settings when this module is imported)

        :returns: the decoded payload or None if the algorithm isn't
            HMAC-SHA256 or the signature doesn't match
        '''
        from open_facebook.utils import base64_url_decode_php_style, smart_str
        import hmac
        import hashlib

        parts = signed_request.split('.', 2)
        encoded_sig = parts[0]
        payload = parts[1]
        sig = base64_url_decode_php_style(encoded_sig)
        data = json.loads(base64_url_decode_php_style(payload).decode('utf-8'))

        # a payload without an algorithm is handled as an unknown algorithm
        algo = (data.get('algorithm') or '').upper()
        if algo != 'HMAC-SHA256':
            error_format = 'Unknown algorithm we only support HMAC-SHA256 user asked for %s'
            error_message = error_format % algo
            send_warning(error_message)
            logger.error('Unknown algorithm')
            return None

        expected_sig = hmac.new(smart_str(secret), msg=smart_str(payload),
                                digestmod=hashlib.sha256).digest()
        # the signed request comes from the client, compare in constant
        # time when the python version supports it
        compare_digest = getattr(hmac, 'compare_digest', None)
        if compare_digest is not None:
            valid = compare_digest(sig, expected_sig)
        else:
            valid = sig == expected_sig

        if not valid:
            error_format = 'Signature %s didnt match the expected signature %s'
            error_message = error_format % (sig, expected_sig)
            send_warning(error_message)
            return None

        logger.debug('valid signed request received..')
        return data

    @classmethod
    def get_app_access_token(cls):
        '''
        Get the access_token for the app that can be used for
        insights and creating test users
        application_id = retrieved from the developer page
        application_secret = retrieved from the developer page

        returns the application access_token
        '''
        kwargs = {
            'grant_type': 'client_credentials',
            'client_id': facebook_settings.FACEBOOK_APP_ID,
            'client_secret': facebook_settings.FACEBOOK_APP_SECRET,
        }
        response = cls.request('oauth/access_token', **kwargs)
        return response['access_token']

    # NOTE(review): memoized wraps the classmethod object here instead of
    # the plain function, confirm that open_facebook.utils.memoized
    # supports this before changing the decorator order
    @memoized
    @classmethod
    def get_cached_app_access_token(cls):
        '''
        Caches the access token in memory, good for speeding up testing
        '''
        app_access_token = cls.get_app_access_token()
        return app_access_token

    @classmethod
    def create_test_user(cls, app_access_token, permissions=None, name=None):
        '''
        Creates a test user with the given permissions and name

        :param app_access_token:
            The application's access token
        :param permissions:
            The list of permissions to request for the test user
        :param name:
            Optionally specify the name

        :returns: TestUser
        '''
        if not permissions:
            permissions = ['read_stream', 'publish_stream',
                           'user_photos,offline_access']
        if isinstance(permissions, list):
            permissions = ','.join(permissions)

        # the permissions are stored in the name of the test user
        default_name = 'Permissions %s' % permissions.replace(
            ',', ' ').replace('_', '')
        name = name or default_name

        kwargs = {
            'access_token': app_access_token,
            'installed': True,
            'name': name,
            'method': 'post',
            'permissions': permissions,
        }
        path = '%s/accounts/test-users' % facebook_settings.FACEBOOK_APP_ID
        # add the test user data to the test user data class
        test_user_data = cls.request(path, **kwargs)
        test_user_data['name'] = name
        test_user = TestUser(test_user_data)

        return test_user

    @classmethod
    def get_or_create_test_user(cls, app_access_token, name=None, permissions=None, force_create=False):
        '''
        There is no supported way of get or creating a test user
        However
        - creating a test user takes around 5s
        - you an only create 500 test users
        So this slows your testing flow quite a bit.

        This method checks your test users
        Queries their names (stores the permissions in the name)

        :param app_access_token:
            The application's access token
        :param name:
            Optionally specify the name, defaults to a name based
            on the permissions
        :param permissions:
            The list of permissions to request for the test user
        :param force_create:
            When True an existing user with this name is deleted
            and created again

        :returns: TestUser
        '''
        if not permissions:
            permissions = ['read_stream', 'publish_stream', 'publish_actions',
                           'user_photos,offline_access']
        if isinstance(permissions, list):
            permissions = ','.join(permissions)

        # hacking the permissions into the name of the test user
        default_name = 'Permissions %s' % permissions.replace(
            ',', ' ').replace('_', '')
        name = name or default_name

        # retrieve all test users
        test_users = cls.get_test_users(app_access_token)
        user_id_dict = dict((int(u['id']), u) for u in test_users)
        user_ids = [str(user_id) for user_id in user_id_dict]

        users = []
        if user_ids:
            # use fql to figure out their names
            # without test users the query would end with an empty "in ()"
            facebook = OpenFacebook(app_access_token)
            users = facebook.fql('SELECT uid, name FROM user WHERE uid in (%s)' %
                                 ','.join(user_ids))
        # user_id_dict is keyed by int, so store the uid as an int as well
        users_dict = dict((u['name'], int(u['uid'])) for u in users)
        user_id = users_dict.get(name)

        if force_create and user_id:
            # we need the users access_token, the app access token doesn't
            # always work, seems to be a bug in the Facebook api
            test_user_data = user_id_dict[user_id]
            cls.delete_test_user(test_user_data['access_token'], user_id)
            user_id = None

        if user_id:
            # we found our user, extend the data a bit
            test_user_data = user_id_dict[user_id]
            test_user_data['name'] = name
            test_user = TestUser(test_user_data)
        else:
            # create the user
            test_user = cls.create_test_user(
                app_access_token, permissions, name)

        return test_user

    @classmethod
    def get_test_users(cls, app_access_token):
        '''
        Returns the ``data`` of the app's test-users response

        :param app_access_token:
            The application's access token
        '''
        kwargs = dict(access_token=app_access_token)
        path = '%s/accounts/test-users' % facebook_settings.FACEBOOK_APP_ID
        # retrieve all test users
        response = cls.request(path, **kwargs)
        test_users = response['data']
        return test_users

    @classmethod
    def delete_test_user(cls, app_access_token, test_user_id):
        '''
        Deletes the test user with the given id

        :param app_access_token:
            The access token to send along with the delete request
        :param test_user_id:
            The id of the test user to delete

        :returns: the response of the delete request
        '''
        kwargs = dict(access_token=app_access_token, method='delete')
        path = '%s/' % test_user_id
        # send the delete request for this user
        response = cls.request(path, **kwargs)
        return response

    @classmethod
    def delete_test_users(cls, app_access_token):
        '''
        Deletes all test users of the app

        :param app_access_token:
            The application's access token
        '''
        # retrieve all test users
        test_users = cls.get_test_users(app_access_token)
        test_user_ids = [u['id'] for u in test_users]
        for test_user_id in test_user_ids:
            cls.delete_test_user(app_access_token, test_user_id)
class OpenFacebook(FacebookConnection):
    '''
    The main api class, initialize using

    **Example**::

        graph = OpenFacebook(access_token)
        print(graph.get('me'))
    '''

    def __init__(self, access_token=None, prefetched_data=None,
                 expires=None, current_user_id=None, version=None):
        '''
        :param access_token:
            The facebook Access token
        :param prefetched_data:
            Extra data coming from signed cookies
        :param expires:
            Stored to enable detection for offline usage
        :param current_user_id:
            The id of the logged in user this connection represents
        :param version:
            The graph api version to use, defaults to v1.0
        '''
        self.access_token = access_token
        # extra data coming from signed cookies
        self.prefetched_data = prefetched_data

        # store to enable detection for offline usage
        self.expires = expires

        # hook to store the current user id if representing the
        # facebook connection to a logged in user :)
        self.current_user_id = current_user_id

        if version is None:
            version = 'v1.0'
        self.version = version

    def __getstate__(self):
        '''
        Turns the object into something easy to serialize
        '''
        state = dict(
            access_token=self.access_token,
            prefetched_data=self.prefetched_data,
            expires=self.expires,
            # without these two a restored object breaks on self.version
            current_user_id=self.current_user_id,
            version=self.version,
        )
        return state

    def __setstate__(self, state):
        '''
        Restores the object from the state dict
        '''
        self.access_token = state['access_token']
        self.prefetched_data = state['prefetched_data']
        self.expires = state['expires']
        # older state dicts don't have these keys, use the __init__ defaults
        self.current_user_id = state.get('current_user_id')
        self.version = state.get('version') or 'v1.0'

    def is_authenticated(self):
        '''
        Ask facebook if we have access to the users data

        OAuthExceptions are raised, any other OpenFacebookException
        counts as not being authenticated

        :returns: bool
        '''
        try:
            me = self.me()
        except facebook_exceptions.OAuthException:
            raise
        except facebook_exceptions.OpenFacebookException:
            me = None
        authenticated = bool(me)
        return authenticated

    def get(self, path, version=None, **kwargs):
        '''
        Make a Facebook API call

        **Example**::

            open_facebook.get('me')
            open_facebook.get('me', fields='id,name')

        :param path:
            The path to use for making the API call
        :param version:
            The api version to use, defaults to the version of this object

        :returns: dict
        '''
        version = version or self.version
        kwargs['version'] = version
        response = self.request(path, **kwargs)
        return response

    def get_many(self, *ids, **kwargs):
        '''
        Make a batched Facebook API call
        For multiple ids

        **Example**::

            open_facebook.get_many('me', 'starbucks')
            open_facebook.get_many('me', 'starbucks', fields='id,name')

        :param ids:
            The ids to request, sent as the comma separated ``ids`` param

        :returns: dict
        '''
        kwargs['ids'] = ','.join(ids)
        return self.request(**kwargs)

    def set(self, path, params=None, version=None, **post_data):
        '''
        Write data to facebook

        **Example**::

            open_facebook.set('me/feed', message='testing open facebook')

        :param path:
            The path to use for making the API call
        :param params:
            A dictionary of get params
        :param version:
            The api version to use, defaults to the version of this object
        :param post_data:
            The kwargs for posting to facebook

        :returns: dict
        '''
        version = version or self.version
        assert self.access_token, 'Write operations require an access token'
        # work on a copy, the dictionary of the caller is left untouched
        params = dict(params) if params else {}
        params['method'] = 'post'
        params['version'] = version

        response = self.request(path, post_data=post_data, **params)
        return response

    def delete(self, path, *args, **kwargs):
        '''
        Delete the given bit of data

        **Example**::

            graph.delete(12345)

        :param path:
            the id of the element to remove

        :returns: the response of the delete request
        '''
        kwargs['method'] = 'delete'
        return self.request(path, *args, **kwargs)

    def fql(self, query, **kwargs):
        '''
        Runs the specified query against the Facebook FQL API.

        **Example**::

            open_facebook.fql('SELECT name FROM user WHERE uid = me()')

        :param query:
            The query to execute
        :param kwargs:
            Extra options to send to facebook

        :returns: the ``data`` part of the response
        '''
        kwargs['q'] = query
        path = 'fql'

        response = self.request(path, **kwargs)

        # return only the data for backward compatability
        return response['data']

    def batch_fql(self, queries_dict):
        '''
        queries_dict a dict with the required queries
        returns the query results keyed by the name of the query

        **Example**::

            response = facebook.batch_fql({
                'name': 'SELECT uid, name, pic_square FROM user WHERE uid = me()',
                'rsvp': 'SELECT uid, rsvp_status FROM event_member WHERE eid=12345678',
            })

            # accessing the results
            response['name']
            response['rsvp']

        :param queries_dict:
            A dictionary of queries to execute

        :returns: dict
        '''
        query = json.dumps(queries_dict)
        query_results = self.fql(query)
        named_results = dict(
            (r['name'], r['fql_result_set']) for r in query_results)
        return named_results

    def me(self):
        '''
        Cached method of requesting information about me
        '''
        me = getattr(self, '_me', None)
        if me is None:
            # NOTE(review): get_request_url replaces the fields for the
            # path 'me', so the fields given here are not the ones sent
            self._me = me = self.get('me', fields="id,name,email,verified")

        return me

    def permissions(self):
        '''
        Shortcut for self.get('me/permissions') with some extra parsing
        to turn it into a dictionary of booleans

        Only granted permissions end up in the dictionary, an OAuthException
        results in an empty dictionary

        :returns: dict
        '''
        permissions_dict = {}
        try:
            permissions_response = self.get('me/permissions')

            # determine whether we're dealing with 1.0 or 2.0+
            # the first entry decides, the loop always stops after it
            for permission in permissions_response.get('data', []):
                values = list(permission.values())
                # graph api 2.0+, returns multiple dicts with keys 'status' and
                # 'permission'
                if any(value in ['granted', 'declined'] for value in values):
                    for perm in permissions_response['data']:
                        grant = perm.get('status') == 'granted'
                        name = perm.get('permission')
                        # just in case something goes sideways
                        if grant and name:
                            permissions_dict[name] = grant
                # graph api 1.0, returns single dict as {permission: intval}
                elif any(value in [0, 1, '0', '1'] for value in values):
                    permissions = permissions_response['data'][0]
                    permissions_dict = dict(
                        (k, bool(int(v)))
                        for k, v in permissions.items()
                        if v == '1' or v == 1)
                break
        except facebook_exceptions.OAuthException:
            pass
        return permissions_dict

    def has_permissions(self, required_permissions):
        '''
        Validate if all the required_permissions are currently given
        by the user

        **Example**::

            open_facebook.has_permissions(['publish_actions','read_stream'])

        :param required_permissions:
            A list of required permissions

        :returns: bool
        '''
        permissions_dict = self.permissions()
        # see if we have all permissions
        has_permissions = all(
            permission in permissions_dict
            for permission in required_permissions)
        return has_permissions

    def my_image_url(self, size='large'):
        '''
        Returns the image url from your profile
        Shortcut for me/picture

        :param size:
            the type of the image to request, see facebook for available formats

        :returns: string
        '''
        query_dict = QueryDict('', True)
        query_dict['type'] = size
        query_dict['access_token'] = self.access_token
        url = '%sme/picture?%s' % (self.api_url, query_dict.urlencode())
        return url

    def request(self, path='', post_data=None, old_api=False, version=None, **params):
        '''
        Builds the url using get_request_url and sends the request

        :param path:
            The path to request
        :param post_data:
            A dictionary of data to post
        :param old_api:
            When True the old api url is used
        :param version:
            The api version to use, defaults to the version of this object
        :param params:
            The get params to include

        :returns: the parsed response
        '''
        url = self.get_request_url(path=path, old_api=old_api, version=version,
                                   **params)
        logger.info('requesting url %s', url)
        response = self._request(url, post_data)
        return response

    def get_request_url(self, path='', old_api=False, version=None, **params):
        '''
        Gets the url for the request.

        :param path:
            The path to request, a leading slash is removed
        :param old_api:
            When True the old api url is used
        :param version:
            The api version to use, defaults to the version of this object
        :param params:
            The get params to include

        :returns: string
        '''
        api_base_url = self.old_api_url if old_api else self.api_url
        version = version or self.version
        if getattr(self, 'access_token', None):
            params['access_token'] = self.access_token
        # the parts are joined with a slash, so strip the ones we have
        if api_base_url.endswith('/'):
            api_base_url = api_base_url[:-1]
        if path and path.startswith('/'):
            path = path[1:]
        if path == 'me':
            # NOTE(review): this replaces any fields given by the caller,
            # confirm if that is intended before changing it
            params['fields'] = 'email,first_name,last_name,name,cover,picture'

        url = '/'.join([api_base_url, version, path])
        return '%s?%s' % (url, urlencode(params))
class TestUser(object):
    '''
    Simple wrapper around test users

    Exposes the name, id and access_token of the test user data
    as attributes, the full data stays available as ``data``
    '''

    def __init__(self, data):
        '''
        :param data:
            The test user data, requires the keys
            name, id and access_token
        '''
        self.name = data['name']
        self.id = data['id']
        self.access_token = data['access_token']
        self.data = data

    def graph(self):
        '''
        Returns an OpenFacebook object using the access token
        of this test user
        '''
        graph = OpenFacebook(self.access_token)
        return graph

    def __repr__(self):
        return 'Test user %s' % self.name