aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbmedx <bmesick@edx.org>2017-09-13 11:31:37 -0400
committerbmedx <bmesick@edx.org>2017-09-13 11:31:37 -0400
commit8b210641f9f75fc923e531bc093687ab18901f34 (patch)
tree2a0d9a30513732177a9a2ca467866892ea28bf71
parent86b822c24fe16e62bfb85e23ee54fca0bd7cd748 (diff)
Update repo from upstream revision 128
- Previous pull was revision 115 - Diff of versions: http://bazaar.launchpad.net/~ubuntuone-pqm-team/django-openid-auth/trunk/revision/128?remember=115&compare_revid=115 - Should get us Py3 and Django up to 1.10 support
-rw-r--r--.gitignore56
-rw-r--r--django_openid_auth/__init__.py3
-rw-r--r--django_openid_auth/admin.py10
-rw-r--r--django_openid_auth/auth.py124
-rw-r--r--django_openid_auth/forms.py12
-rw-r--r--django_openid_auth/signals.py1
-rw-r--r--django_openid_auth/store.py12
-rw-r--r--django_openid_auth/teams.py7
-rw-r--r--django_openid_auth/templates/openid/login.html1
-rw-r--r--django_openid_auth/tests/models.py9
-rw-r--r--django_openid_auth/tests/test_auth.py949
-rw-r--r--django_openid_auth/tests/test_store.py9
-rw-r--r--django_openid_auth/tests/test_views.py162
-rw-r--r--django_openid_auth/tests/urls.py11
-rw-r--r--django_openid_auth/urls.py18
-rw-r--r--django_openid_auth/views.py53
-rw-r--r--example_consumer/settings.py18
-rw-r--r--example_consumer/urls.py12
-rw-r--r--setup.py17
-rw-r--r--tox.ini48
20 files changed, 1234 insertions, 298 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4ab0ba0
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,56 @@
+# Created by .ignore support plugin (hsz.mobi)
+### Python template
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*,cover
+.hypothesis/
+
+# Django stuff:
+*.log
+local_settings.py
+
+# dotenv
+.env
+
+# virtualenv
+.venv
+venv/
+ENV/
+
+.gitignore
+.idea/
+django_openid_auth.egg-info/
diff --git a/django_openid_auth/__init__.py b/django_openid_auth/__init__.py
index ca9566c..a51a8ca 100644
--- a/django_openid_auth/__init__.py
+++ b/django_openid_auth/__init__.py
@@ -26,3 +26,6 @@
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
+import sys
+
+PY3 = sys.version_info.major >= 3
diff --git a/django_openid_auth/admin.py b/django_openid_auth/admin.py
index e837d96..7babc49 100644
--- a/django_openid_auth/admin.py
+++ b/django_openid_auth/admin.py
@@ -29,8 +29,11 @@
from __future__ import unicode_literals
-from urllib import urlencode
-from urlparse import parse_qsl, urlparse
+try:
+ from urllib.parse import parse_qsl, urlencode, urlparse
+except ImportError:
+ from urllib import urlencode
+ from urlparse import parse_qsl, urlparse
from django.conf import settings
from django.contrib import admin
@@ -50,6 +53,7 @@ class NonceAdmin(admin.ModelAdmin):
self.message_user(request, "%d expired nonces removed" % count)
cleanup_nonces.short_description = "Clean up expired nonces"
+
admin.site.register(Nonce, NonceAdmin)
@@ -65,6 +69,7 @@ class AssociationAdmin(admin.ModelAdmin):
self.message_user(request, "%d expired associations removed" % count)
cleanup_associations.short_description = "Clean up expired associations"
+
admin.site.register(Association, AssociationAdmin)
@@ -73,6 +78,7 @@ class UserOpenIDAdmin(admin.ModelAdmin):
list_display = ('user', 'claimed_id')
search_fields = ('claimed_id',)
+
admin.site.register(UserOpenID, UserOpenIDAdmin)
diff --git a/django_openid_auth/auth.py b/django_openid_auth/auth.py
index 6a572b5..1c01a7e 100644
--- a/django_openid_auth/auth.py
+++ b/django_openid_auth/auth.py
@@ -30,13 +30,12 @@
from __future__ import unicode_literals
-__metaclass__ = type
-
import re
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group, Permission
+from django.core.exceptions import ImproperlyConfigured
from openid.consumer.consumer import SUCCESS
from openid.extensions import ax, sreg, pape
@@ -49,12 +48,51 @@ from django_openid_auth.exceptions import (
MissingPhysicalMultiFactor,
RequiredAttributeNotReturned,
)
+from django_openid_auth.signals import openid_duplicate_username
User = get_user_model()
-class OpenIDBackend:
+def get_user_group_model():
+ """Returns the model used for mapping users to groups."""
+ user_group_model_name = getattr(settings, 'AUTH_USER_GROUP_MODEL', None)
+ if user_group_model_name is None:
+ return User.groups.through
+ else:
+ try:
+ # django.apps available starting from django 1.7
+ from django.apps import apps
+ get_model = apps.get_model
+ args = (user_group_model_name,)
+ except ImportError:
+ # if we can't import, then it must be django 1.6, still using
+ # the old django.db.models.loading code
+ from django.db.models.loading import get_model
+ app_label, model_name = user_group_model_name.split('.', 1)
+ args = (app_label, model_name)
+ try:
+ model = get_model(*args)
+ if model is None:
+ # in django 1.6 referring to a non-installed app will
+ # return None for get_model, but in 1.7 onwards it will
+ # raise a LookupError exception.
+ raise LookupError()
+ return model
+ except ValueError:
+ raise ImproperlyConfigured(
+ "AUTH_USER_GROUP_MODEL must be of the form "
+ "'app_label.model_name'")
+ except LookupError:
+ raise ImproperlyConfigured(
+ "AUTH_USER_GROUP_MODEL refers to model '%s' that has not been "
+ "installed" % user_group_model_name)
+
+
+UserGroup = get_user_group_model()
+
+
+class OpenIDBackend(object):
"""A django.contrib.auth backend that authenticates the user based on
an OpenID response."""
@@ -116,8 +154,9 @@ class OpenIDBackend:
teams_mapping = self.get_teams_mapping()
groups_required = [group for team, group in teams_mapping.items()
if team in teams_required]
+ user_groups = UserGroup.objects.filter(user=user)
matches = set(groups_required).intersection(
- user.groups.values_list('name', flat=True))
+ user_groups.values_list('group__name', flat=True))
if not matches:
name = 'OPENID_EMAIL_WHITELIST_REGEXP_LIST'
whitelist_regexp_list = getattr(settings, name, [])
@@ -194,28 +233,19 @@ class OpenIDBackend:
return suggestion
return 'openiduser'
- def _get_available_username(self, nickname, identity_url):
- # If we're being strict about usernames, throw an error if we didn't
- # get one back from the provider
- if getattr(settings, 'OPENID_STRICT_USERNAMES', False):
- if nickname is None or nickname == '':
- raise MissingUsernameViolation()
-
+ def _get_available_username_for_nickname(self, nickname, identity_url):
# If we don't have a nickname, and we're not being strict, use a
# default
nickname = nickname or 'openiduser'
# See if we already have this nickname assigned to a username
- try:
- User.objects.get(username__exact=nickname)
- except User.DoesNotExist:
- # No conflict, we can use this nickname
+ if not User.objects.filter(username=nickname).exists():
return nickname
# Check if we already have nickname+i for this identity_url
try:
user_openid = UserOpenID.objects.get(
- claimed_id__exact=identity_url,
+ claimed_id=identity_url,
user__username__startswith=nickname)
# No exception means we have an existing user for this identity
# that starts with this nickname.
@@ -239,28 +269,48 @@ class OpenIDBackend:
# No user associated with this identity_url
pass
- if getattr(settings, 'OPENID_STRICT_USERNAMES', False):
- if User.objects.filter(username__exact=nickname).count() > 0:
- raise DuplicateUsernameViolation(
- "The username (%s) with which you tried to log in is "
- "already in use for a different account." % nickname)
-
# Pick a username for the user based on their nickname,
# checking for conflicts. Start with number of existing users who's
# username starts with this nickname to avoid having to iterate over
# all of the existing ones.
i = User.objects.filter(username__startswith=nickname).count() + 1
- while True:
- username = nickname
- if i > 1:
- username += str(i)
- try:
- User.objects.get(username__exact=username)
- except User.DoesNotExist:
- break
+ username = nickname
+ while User.objects.filter(username=username).exists():
+ username = nickname + str(i)
i += 1
+
return username
+ def _ensure_available_username(self, nickname, identity_url):
+ if not nickname:
+ raise MissingUsernameViolation()
+
+ # As long as the `QuerySet` does not get evaluated, no
+ # caching should be involved in our multiple `exists()`
+ # calls. See docs for details: http://bit.ly/2aYCmkw
+ user_with_same_username = User.objects.exclude(
+ useropenid__claimed_id=identity_url
+ ).filter(username=nickname)
+
+ if user_with_same_username.exists():
+ # Notify any listeners that a duplicated username was
+ # found and give the opportunity to handle conflict.
+ openid_duplicate_username.send(sender=User, username=nickname)
+
+ # Check for conflicts again as the signal could have handled it.
+ if user_with_same_username.exists():
+ raise DuplicateUsernameViolation(
+ "The username (%s) with which you tried to log in is "
+ "already in use for a different account." % nickname)
+
+ def _get_available_username(self, nickname, identity_url):
+ if getattr(settings, 'OPENID_STRICT_USERNAMES', False):
+ self._ensure_available_username(nickname, identity_url)
+ else:
+ nickname = self._get_available_username_for_nickname(
+ nickname, identity_url)
+ return nickname
+
def create_user_from_openid(self, openid_response):
details = self._extract_user_details(openid_response)
required_attrs = getattr(settings, 'OPENID_SREG_REQUIRED_FIELDS', [])
@@ -357,13 +407,17 @@ class OpenIDBackend:
mapping = [
teams_mapping[lp_team] for lp_team in teams_response.is_member
if lp_team in teams_mapping]
+ user_groups = UserGroup.objects.filter(user=user)
+ matching_groups = user_groups.filter(
+ group__name__in=teams_mapping.values())
current_groups = set(
- user.groups.filter(name__in=teams_mapping.values()))
+ user_group.group for user_group in matching_groups)
desired_groups = set(Group.objects.filter(name__in=mapping))
- for group in current_groups - desired_groups:
- user.groups.remove(group)
- for group in desired_groups - current_groups:
- user.groups.add(group)
+ groups_to_remove = current_groups - desired_groups
+ groups_to_add = desired_groups - current_groups
+ user_groups.filter(group__in=groups_to_remove).delete()
+ for group in groups_to_add:
+ UserGroup.objects.create(user=user, group=group)
def update_staff_status_from_teams(self, user, teams_response):
if not hasattr(settings, 'OPENID_LAUNCHPAD_STAFF_TEAMS'):
diff --git a/django_openid_auth/forms.py b/django_openid_auth/forms.py
index 88bbc50..9f62257 100644
--- a/django_openid_auth/forms.py
+++ b/django_openid_auth/forms.py
@@ -38,6 +38,8 @@ from django.conf import settings
from openid.yadis import xri
+from django_openid_auth import PY3
+
def teams_new_unicode(self):
"""
@@ -52,8 +54,13 @@ def teams_new_unicode(self):
else:
return name
-Group.unicode_before_teams = Group.__unicode__
-Group.__unicode__ = teams_new_unicode
+
+if PY3:
+ Group.unicode_before_teams = Group.__str__
+ Group.__str__ = teams_new_unicode
+else:
+ Group.unicode_before_teams = Group.__unicode__
+ Group.__unicode__ = teams_new_unicode
class UserChangeFormWithTeamRestriction(UserChangeForm):
@@ -72,6 +79,7 @@ class UserChangeFormWithTeamRestriction(UserChangeForm):
"You cannot assign it manually." % group.name)
return data
+
UserAdmin.form = UserChangeFormWithTeamRestriction
diff --git a/django_openid_auth/signals.py b/django_openid_auth/signals.py
index e3b3d1d..65f24a3 100644
--- a/django_openid_auth/signals.py
+++ b/django_openid_auth/signals.py
@@ -34,3 +34,4 @@ import django.dispatch
openid_login_complete = django.dispatch.Signal(providing_args=[
'request', 'openid_response'])
+openid_duplicate_username = django.dispatch.Signal(providing_args=['username'])
diff --git a/django_openid_auth/store.py b/django_openid_auth/store.py
index 3c1c3e8..1c7de97 100644
--- a/django_openid_auth/store.py
+++ b/django_openid_auth/store.py
@@ -36,6 +36,7 @@ from openid.association import Association as OIDAssociation
from openid.store.interface import OpenIDStore
from openid.store.nonce import SKEW
+from django_openid_auth import PY3
from django_openid_auth.models import Association, Nonce
@@ -75,10 +76,15 @@ class DjangoOpenIDStore(OpenIDStore):
expired = []
for assoc in assocs:
association = OIDAssociation(
- assoc.handle, base64.decodestring(assoc.secret), assoc.issued,
- assoc.lifetime, assoc.assoc_type
+ assoc.handle,
+ base64.decodestring(assoc.secret.encode('utf-8')),
+ assoc.issued, assoc.lifetime, assoc.assoc_type
)
- if association.getExpiresIn() == 0:
+ if PY3:
+ expires_in = association.expiresIn
+ else:
+ expires_in = association.getExpiresIn()
+ if expires_in == 0:
expired.append(assoc)
else:
associations.append((association.issued, association))
diff --git a/django_openid_auth/teams.py b/django_openid_auth/teams.py
index ed84fe3..6e5e984 100644
--- a/django_openid_auth/teams.py
+++ b/django_openid_auth/teams.py
@@ -72,6 +72,7 @@ from openid.message import (
registerNamespaceAlias,
NamespaceAliasRegistrationError,
)
+from six import string_types
__all__ = [
'TeamsRequest',
@@ -84,7 +85,7 @@ ns_uri = 'http://ns.launchpad.net/2007/openid-teams'
try:
registerNamespaceAlias(ns_uri, 'lp')
-except NamespaceAliasRegistrationError, e:
+except NamespaceAliasRegistrationError as e:
oidutil.log(
'registerNamespaceAlias(%r, %r) failed: %s' % (ns_uri, 'lp', str(e)))
@@ -139,7 +140,7 @@ def getTeamsNS(message):
# There is no alias, so try to add one. (OpenID version 1)
try:
message.namespaces.addAlias(ns_uri, 'lp')
- except KeyError, why:
+ except KeyError as why:
# An alias for the string 'lp' already exists, but it's
# defined for something other than Launchpad teams
raise TeamsNamespaceError(why[0])
@@ -287,7 +288,7 @@ class TeamsRequest(Extension):
@raise ValueError: when a team requested is not a string
or strict is set and a team was requested more than once
"""
- if isinstance(query_membership, basestring):
+ if isinstance(query_membership, string_types):
raise TypeError('Teams should be passed as a list of '
'strings (not %r)' % (type(query_membership),))
diff --git a/django_openid_auth/templates/openid/login.html b/django_openid_auth/templates/openid/login.html
index 79bc08a..7f00c6b 100644
--- a/django_openid_auth/templates/openid/login.html
+++ b/django_openid_auth/templates/openid/login.html
@@ -1,5 +1,4 @@
{% load i18n %}
-{% load url from future %}
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
<html>
<head>
diff --git a/django_openid_auth/tests/models.py b/django_openid_auth/tests/models.py
new file mode 100644
index 0000000..bdb6c0c
--- /dev/null
+++ b/django_openid_auth/tests/models.py
@@ -0,0 +1,9 @@
+from django.conf import settings
+from django.contrib.auth.models import Group
+from django.db import models
+
+
+class UserGroup(models.Model):
+
+ user = models.ForeignKey(settings.AUTH_USER_MODEL)
+ group = models.ForeignKey(Group)
diff --git a/django_openid_auth/tests/test_auth.py b/django_openid_auth/tests/test_auth.py
index b37bedd..1d12509 100644
--- a/django_openid_auth/tests/test_auth.py
+++ b/django_openid_auth/tests/test_auth.py
@@ -28,56 +28,69 @@
from __future__ import unicode_literals
+import re
+try:
+ from urllib.parse import urljoin
+except ImportError:
+ from urlparse import urljoin
+
+from django.conf import settings
from django.contrib.auth.models import Group, Permission, User
+from django.core.exceptions import ImproperlyConfigured
from django.test import TestCase
from django.test.utils import override_settings
-
-from openid.consumer.consumer import SuccessResponse
+from openid.consumer.consumer import (
+ CancelResponse,
+ FailureResponse,
+ SetupNeededResponse,
+ SuccessResponse,
+)
from openid.consumer.discover import OpenIDServiceEndpoint
+from openid.extensions import pape
from openid.message import Message, OPENID2_NS
-from django_openid_auth.auth import OpenIDBackend
+from django_openid_auth.auth import OpenIDBackend, get_user_group_model
+from django_openid_auth.exceptions import (
+ DuplicateUsernameViolation,
+ MissingPhysicalMultiFactor,
+ MissingUsernameViolation,
+ RequiredAttributeNotReturned,
+)
from django_openid_auth.models import UserOpenID
+from django_openid_auth.signals import openid_duplicate_username
from django_openid_auth.teams import ns_uri as TEAMS_NS
from django_openid_auth.tests.helpers import override_session_serializer
+
SREG_NS = "http://openid.net/sreg/1.0"
AX_NS = "http://openid.net/srv/ax/1.0"
+SERVER_URL = 'http://example.com'
-@override_session_serializer
-@override_settings(
- OPENID_USE_EMAIL_FOR_USERNAME=False,
- OPENID_LAUNCHPAD_TEAMS_REQUIRED=[],
- OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=False,
- OPENID_EMAIL_WHITELIST_REGEXP_LIST=[])
-class OpenIDBackendTests(TestCase):
+def make_claimed_id(id_):
+ return urljoin(SERVER_URL, id_)
- def setUp(self):
- super(OpenIDBackendTests, self).setUp()
- self.backend = OpenIDBackend()
- def make_openid_response(self, sreg_args=None, teams_args=None):
- endpoint = OpenIDServiceEndpoint()
- endpoint.claimed_id = 'some-id'
-
- message = Message(OPENID2_NS)
- if sreg_args is not None:
- for key, value in sreg_args.items():
- message.setArg(SREG_NS, key, value)
- if teams_args is not None:
- for key, value in teams_args.items():
- message.setArg(TEAMS_NS, key, value)
- response = SuccessResponse(
- endpoint, message, signed_fields=message.toPostArgs().keys())
- return response
-
- def make_response_ax(
- self, schema="http://axschema.org/",
- fullname="Some User", nickname="someuser", email="foo@example.com",
- first=None, last=None, verified=False):
+class TestMessage(Message):
+ """Convenience class to construct test OpenID messages and responses."""
+
+ def __init__(self, openid_namespace=OPENID2_NS):
+ super(TestMessage, self).__init__(openid_namespace=openid_namespace)
endpoint = OpenIDServiceEndpoint()
- message = Message(OPENID2_NS)
+ endpoint.claimed_id = make_claimed_id('some-id')
+ endpoint.server_url = SERVER_URL
+ self.endpoint = endpoint
+
+ def set_ax_args(
+ self,
+ email="foo@example.com",
+ first=None,
+ fullname="Some User",
+ last=None,
+ nickname="someuser",
+ schema="http://axschema.org/",
+ verified=False):
+
attributes = [
("nickname", schema + "namePerson/friendly", nickname),
("fullname", schema + "namePerson", fullname),
@@ -93,53 +106,79 @@ class OpenIDBackendTests(TestCase):
attributes.append(
("last", "http://axschema.org/namePerson/last", last))
- message.setArg(AX_NS, "mode", "fetch_response")
+ self.setArg(AX_NS, "mode", "fetch_response")
for (alias, uri, value) in attributes:
- message.setArg(AX_NS, "type.%s" % alias, uri)
- message.setArg(AX_NS, "value.%s" % alias, value)
+ self.setArg(AX_NS, "type.%s" % alias, uri)
+ self.setArg(AX_NS, "value.%s" % alias, value)
+
+ def set_pape_args(self, *auth_policies):
+ self.setArg(pape.ns_uri, 'auth_policies', ' '.join(auth_policies))
+
+ def _set_args(self, ns, **kwargs):
+ for key, value in kwargs.items():
+ if value is not None:
+ self.setArg(ns, key, value)
+ elif self.hasKey(ns, key):
+ self.delArg(ns, key)
+
+ def set_sreg_args(self, **kwargs):
+ self._set_args(SREG_NS, **kwargs)
+
+ def set_team_args(self, **kwargs):
+ self._set_args(TEAMS_NS, **kwargs)
+
+ def to_response(self):
return SuccessResponse(
- endpoint, message, signed_fields=message.toPostArgs().keys())
+ self.endpoint, self, signed_fields=self.toPostArgs().keys())
+
- def make_user_openid(self, user=None,
- claimed_id='http://example.com/existing_identity',
- display_id='http://example.com/existing_identity'):
+@override_session_serializer
+@override_settings(
+ OPENID_USE_EMAIL_FOR_USERNAME=False,
+ OPENID_LAUNCHPAD_TEAMS_REQUIRED=[],
+ OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=False,
+ OPENID_EMAIL_WHITELIST_REGEXP_LIST=[])
+class OpenIDBackendTests(TestCase):
+
+ def setUp(self):
+ super(OpenIDBackendTests, self).setUp()
+ self.backend = OpenIDBackend()
+ self.message = TestMessage()
+
+ def make_user_openid(
+ self, user=None, claimed_id=make_claimed_id('existing_identity')):
if user is None:
user = User.objects.create_user(
username='someuser', email='someuser@example.com',
password='12345678')
user_openid, created = UserOpenID.objects.get_or_create(
- user=user, claimed_id=claimed_id, display_id=display_id)
+ user=user, claimed_id=claimed_id, display_id=claimed_id)
return user_openid
- def assert_account_verified(self, user, initially_verified, verified):
- # set user's verification status
+ def _assert_account_verified(self, user, expected):
permission = Permission.objects.get(codename='account_verified')
- if initially_verified:
- user.user_permissions.add(permission)
- else:
- user.user_permissions.remove(permission)
+ perm_label = '%s.%s' % (permission.content_type.app_label,
+ permission.codename)
+ # Always invalidate the per-request perm cache
+ attrs = list(user.__dict__.keys())
+ for attr in attrs:
+ if attr.endswith('_perm_cache'):
+ delattr(user, attr)
- user = User.objects.get(pk=user.pk)
- has_perm = user.has_perm('django_openid_auth.account_verified')
- assert has_perm == initially_verified
+ self.assertEqual(user.has_perm(perm_label), expected)
- if hasattr(user, '_perm_cache'):
- del user._perm_cache
+ def assert_account_not_verified(self, user):
+ self._assert_account_verified(user, False)
- # get a response including verification status
- response = self.make_response_ax()
- data = dict(first_name=u"Some56789012345678901234567890123",
- last_name=u"User56789012345678901234567890123",
- email=u"someotheruser@example.com",
- account_verified=verified)
- self.backend.update_user_details(user, data, response)
+ def assert_account_verified(self, user):
+ self._assert_account_verified(user, True)
- # refresh object from the database
- user = User.objects.get(pk=user.pk)
- # check the verification status
- self.assertEqual(
- user.has_perm('django_openid_auth.account_verified'), verified)
+ def assert_no_users_created(self, expected_count=0):
+ current_count = User.objects.count()
+ msg = 'New users found (expected: %i, current: %i)' % (
+ expected_count, current_count)
+ self.assertEqual(current_count, expected_count, msg)
def test_extract_user_details_sreg(self):
expected = {
@@ -155,16 +194,19 @@ class OpenIDBackendTests(TestCase):
expected['last_name']),
'email': expected['email'],
}
- response = self.make_openid_response(sreg_args=data)
+ self.message.set_sreg_args(**data)
- details = self.backend._extract_user_details(response)
+ details = self.backend._extract_user_details(
+ self.message.to_response())
self.assertEqual(details, expected)
def test_extract_user_details_ax(self):
- response = self.make_response_ax(
- fullname="Some User", nickname="someuser", email="foo@example.com")
-
- data = self.backend._extract_user_details(response)
+ self.message.set_ax_args(
+ email="foo@example.com",
+ fullname="Some User",
+ nickname="someuser",
+ )
+ data = self.backend._extract_user_details(self.message.to_response())
self.assertEqual(data, {"nickname": "someuser",
"first_name": "Some",
@@ -175,10 +217,9 @@ class OpenIDBackendTests(TestCase):
def test_extract_user_details_ax_split_name(self):
# Include fullname too to show that the split data takes
# precedence.
- response = self.make_response_ax(
+ self.message.set_ax_args(
fullname="Bad Data", first="Some", last="User")
-
- data = self.backend._extract_user_details(response)
+ data = self.backend._extract_user_details(self.message.to_response())
self.assertEqual(data, {"nickname": "someuser",
"first_name": "Some",
@@ -187,11 +228,10 @@ class OpenIDBackendTests(TestCase):
"account_verified": False})
def test_extract_user_details_ax_broken_myopenid(self):
- response = self.make_response_ax(
+ self.message.set_ax_args(
schema="http://schema.openid.net/", fullname="Some User",
nickname="someuser", email="foo@example.com")
-
- data = self.backend._extract_user_details(response)
+ data = self.backend._extract_user_details(self.message.to_response())
self.assertEqual(data, {"nickname": "someuser",
"first_name": "Some",
@@ -200,7 +240,7 @@ class OpenIDBackendTests(TestCase):
"account_verified": False})
def test_update_user_details_long_names(self):
- response = self.make_response_ax()
+ self.message.set_ax_args()
user = User.objects.create_user(
'someuser', 'someuser@example.com', password=None)
user_openid, created = UserOpenID.objects.get_or_create(
@@ -212,49 +252,92 @@ class OpenIDBackendTests(TestCase):
last_name=u"User56789012345678901234567890123",
email=u"someotheruser@example.com", account_verified=False)
- self.backend.update_user_details(user, data, response)
+ self.backend.update_user_details(
+ user, data, self.message.to_response())
self.assertEqual("Some56789012345678901234567890", user.first_name)
self.assertEqual("User56789012345678901234567890", user.last_name)
+ def _test_update_user_perms_account_verified(
+ self, user, initially_verified, verified):
+ # set user's verification status
+ permission = Permission.objects.get(codename='account_verified')
+ if initially_verified:
+ user.user_permissions.add(permission)
+ else:
+ user.user_permissions.remove(permission)
+
+ if initially_verified:
+ self.assert_account_verified(user)
+ else:
+ self.assert_account_not_verified(user)
+
+ # get a response including verification status
+ self.message.set_ax_args()
+ data = dict(first_name=u"Some56789012345678901234567890123",
+ last_name=u"User56789012345678901234567890123",
+ email=u"someotheruser@example.com",
+ account_verified=verified)
+ self.backend.update_user_details(
+ user, data, self.message.to_response())
+
+ # refresh object from the database
+ user = User.objects.get(pk=user.pk)
+
+ if verified:
+ self.assert_account_verified(user)
+ else:
+ self.assert_account_not_verified(user)
+
def test_update_user_perms_initially_verified_then_verified(self):
- self.assert_account_verified(
+ self._test_update_user_perms_account_verified(
self.make_user_openid().user,
initially_verified=True, verified=True)
def test_update_user_perms_initially_verified_then_unverified(self):
- self.assert_account_verified(
+ self._test_update_user_perms_account_verified(
self.make_user_openid().user,
initially_verified=True, verified=False)
def test_update_user_perms_initially_not_verified_then_verified(self):
- self.assert_account_verified(
+ self._test_update_user_perms_account_verified(
self.make_user_openid().user,
initially_verified=False, verified=True)
def test_update_user_perms_initially_not_verified_then_unverified(self):
- self.assert_account_verified(
+ self._test_update_user_perms_account_verified(
self.make_user_openid().user,
initially_verified=False, verified=False)
def test_extract_user_details_name_with_trailing_space(self):
- response = self.make_response_ax(fullname="SomeUser ")
+ self.message.set_ax_args(fullname="SomeUser ")
- data = self.backend._extract_user_details(response)
+ data = self.backend._extract_user_details(self.message.to_response())
self.assertEqual("", data['first_name'])
self.assertEqual("SomeUser", data['last_name'])
def test_extract_user_details_name_with_thin_space(self):
- response = self.make_response_ax(fullname=u"Some\u2009User")
+ self.message.set_ax_args(fullname=u"Some\u2009User")
- data = self.backend._extract_user_details(response)
+ data = self.backend._extract_user_details(self.message.to_response())
self.assertEqual("Some", data['first_name'])
self.assertEqual("User", data['last_name'])
+ @override_settings(OPENID_CREATE_USERS=True)
+ def test_auth_username_when_no_nickname(self):
+ self.message.set_sreg_args(nickname='')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertIsNotNone(user)
+ self.assertEqual(
+ user.username, 'openiduser',
+ "username must default to 'openiduser'")
+
@override_settings(OPENID_USE_EMAIL_FOR_USERNAME=True)
- def test_preferred_username_email_munging(self):
+ def test_auth_username_email_munging(self):
for nick, email, expected in [
('nickcomesfirst', 'foo@example.com', 'nickcomesfirst'),
('', 'foo@example.com', 'fooexamplecom'),
@@ -262,11 +345,17 @@ class OpenIDBackendTests(TestCase):
('', '@%.-', 'openiduser'),
('', '', 'openiduser'),
(None, None, 'openiduser')]:
- self.assertEqual(
- expected,
- self.backend._get_preferred_username(nick, email))
+ self.message.set_sreg_args(nickname=nick, email=email)
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+ # Cleanup user for further tests
+ user.delete()
- def test_preferred_username_no_email_munging(self):
+ self.assertIsNotNone(user)
+ self.assertEqual(user.username, expected)
+
+ @override_settings(OPENID_USE_EMAIL_FOR_USERNAME=False)
+ def test_auth_username_no_email_munging(self):
for nick, email, expected in [
('nickcomesfirst', 'foo@example.com', 'nickcomesfirst'),
('', 'foo@example.com', 'openiduser'),
@@ -274,9 +363,89 @@ class OpenIDBackendTests(TestCase):
('', '@%.-', 'openiduser'),
('', '', 'openiduser'),
(None, None, 'openiduser')]:
- self.assertEqual(
- expected,
- self.backend._get_preferred_username(nick, email))
+ self.message.set_sreg_args(nickname=nick, email=email)
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+ # Cleanup user for further tests
+ user.delete()
+
+ self.assertIsNotNone(user)
+ self.assertEqual(user.username, expected)
+
+ @override_settings(
+ OPENID_CREATE_USERS=True,
+ OPENID_FOLLOW_RENAMES=False,
+ OPENID_UPDATE_DETAILS_FROM_SREG=True)
+ def test_auth_username_duplicate_numbering(self):
+ # Setup existing user to conflict with
+ User.objects.create_user('testuser')
+
+ self.message.set_sreg_args(nickname='testuser')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertIsNotNone(user)
+ self.assertEqual(
+ user.username, 'testuser2',
+ 'Username must contain numeric suffix to avoid collisions.')
+
+ def test_auth_username_duplicate_numbering_with_conflicts(self):
+ # Setup existing users to conflict with
+ User.objects.create_user('testuser')
+ User.objects.create_user('testuser3')
+
+ self.message.set_sreg_args(nickname='testuser')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ # Since this username is already taken by someone else, we go through
+ # the process of adding +i to it starting with the count of users with
+ # username starting with 'testuser', of which there are 2. i should
+ # start at 3, which already exists, so it should skip to 4.
+ self.assertIsNotNone(user)
+ self.assertEqual(
+ user.username, 'testuser4',
+ 'Username must contain numeric suffix to avoid collisions.')
+
+ def test_auth_username_duplicate_numbering_with_holes(self):
+ # Setup existing users to conflict with
+ User.objects.create_user('testuser')
+ User.objects.create_user('testuser1')
+ User.objects.create_user('testuser6')
+ User.objects.create_user('testuser7')
+ User.objects.create_user('testuser8')
+
+ self.message.set_sreg_args(nickname='testuser')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ # Since this username is already taken by someone else, we go through
+ # the process of adding +i to it starting with the count of users with
+ # username starting with 'testuser', of which there are 5. i should
+ # start at 6, and increment until it reaches 9.
+ self.assertIsNotNone(user)
+ self.assertEqual(
+ user.username, 'testuser9',
+ 'Username must contain numeric suffix to avoid collisions.')
+
+ def test_auth_username_duplicate_numbering_with_nonsequential_matches(
+ self):
+ # Setup existing users to conflict with
+ User.objects.create_user('testuser')
+ User.objects.create_user('testuserfoo')
+
+ self.message.set_sreg_args(nickname='testuser')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ # Since this username is already taken by someone else, we go through
+ # the process of adding +i to it starting with the count of users with
+ # username starting with 'testuser', of which there are 2. i should
+ # start at 3, which will be available.
+ self.assertIsNotNone(user)
+ self.assertEqual(
+ user.username, 'testuser3',
+ 'Username must contain numeric suffix to avoid collisions.')
@override_settings(
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=True,
@@ -284,10 +453,10 @@ class OpenIDBackendTests(TestCase):
def test_authenticate_when_not_member_of_teams_required(self):
Group.objects.create(name='team')
- response = self.make_openid_response(
- sreg_args=dict(nickname='someuser'),
- teams_args=dict(is_member='foo'))
- user = self.backend.authenticate(openid_response=response)
+ self.message.set_sreg_args(nickname='someuser')
+ self.message.set_team_args(is_member='foo')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
self.assertIsNone(user)
@@ -297,10 +466,10 @@ class OpenIDBackendTests(TestCase):
def test_authenticate_when_no_group_mapping_to_required_team(self):
assert Group.objects.filter(name='team').count() == 0
- response = self.make_openid_response(
- sreg_args=dict(nickname='someuser'),
- teams_args=dict(is_member='foo'))
- user = self.backend.authenticate(openid_response=response)
+ self.message.set_sreg_args(nickname='someuser')
+ self.message.set_team_args(is_member='foo')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
self.assertIsNone(user)
@@ -310,19 +479,19 @@ class OpenIDBackendTests(TestCase):
def test_authenticate_when_member_of_teams_required(self):
Group.objects.create(name='team')
- response = self.make_openid_response(
- sreg_args=dict(nickname='someuser'),
- teams_args=dict(is_member='foo,team'))
- user = self.backend.authenticate(openid_response=response)
+ self.message.set_sreg_args(nickname='someuser')
+ self.message.set_team_args(is_member='foo,team')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
self.assertIsNotNone(user)
@override_settings(OPENID_LAUNCHPAD_TEAMS_REQUIRED=[])
def test_authenticate_when_no_teams_required(self):
- response = self.make_openid_response(
- sreg_args=dict(nickname='someuser'),
- teams_args=dict(is_member='team'))
- user = self.backend.authenticate(openid_response=response)
+ self.message.set_sreg_args(nickname='someuser')
+ self.message.set_team_args(is_member='team')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
self.assertIsNotNone(user)
@@ -332,10 +501,10 @@ class OpenIDBackendTests(TestCase):
def test_authenticate_when_member_of_at_least_one_team(self):
Group.objects.create(name='team1')
- response = self.make_openid_response(
- sreg_args=dict(nickname='someuser'),
- teams_args=dict(is_member='foo,team1'))
- user = self.backend.authenticate(openid_response=response)
+ self.message.set_sreg_args(nickname='someuser')
+ self.message.set_team_args(is_member='foo,team1')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
self.assertIsNotNone(user)
@@ -347,17 +516,17 @@ class OpenIDBackendTests(TestCase):
self):
assert Group.objects.filter(name='team').count() == 0
- response = self.make_openid_response(
- sreg_args=dict(nickname='someuser', email='foo@foo.com'),
- teams_args=dict(is_member='foo'))
- user = self.backend.authenticate(openid_response=response)
+ self.message.set_sreg_args(
+ nickname='someuser', email='foo@foo.com')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
self.assertIsNotNone(user)
- response = self.make_openid_response(
- sreg_args=dict(nickname='someuser', email='foo+bar@foo.com'),
- teams_args=dict(is_member='foo'))
- user = self.backend.authenticate(openid_response=response)
+ self.message.set_sreg_args(
+ nickname='someuser', email='foo+bar@foo.com')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
self.assertIsNotNone(user)
@@ -368,10 +537,9 @@ class OpenIDBackendTests(TestCase):
def test_authenticate_whitelisted_email_multiple_patterns(self):
assert Group.objects.filter(name='team').count() == 0
- response = self.make_openid_response(
- sreg_args=dict(nickname='someuser', email='bar@foo.com'),
- teams_args=dict(is_member='foo'))
- user = self.backend.authenticate(openid_response=response)
+ self.message.set_sreg_args(nickname='someuser', email='bar@foo.com')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
self.assertIsNotNone(user)
@@ -382,9 +550,530 @@ class OpenIDBackendTests(TestCase):
def test_authenticate_whitelisted_email_not_match(self):
assert Group.objects.filter(name='team').count() == 0
- response = self.make_openid_response(
- sreg_args=dict(nickname='someuser', email='bar@foo.com'),
- teams_args=dict(is_member='foo'))
- user = self.backend.authenticate(openid_response=response)
+ self.message.set_sreg_args(nickname='someuser', email='bar@foo.com')
+ self.message.set_team_args(is_member='foo')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertIsNone(user)
+
+ def test_auth_no_response(self):
+ self.assertIsNone(self.backend.authenticate())
+ self.assert_no_users_created()
+
+ def test_auth_cancel_response(self):
+ response = CancelResponse(OpenIDServiceEndpoint())
+
+ self.assertIsNone(self.backend.authenticate(openid_response=response))
+ self.assert_no_users_created()
+
+ def test_auth_failure_response(self):
+ response = FailureResponse(OpenIDServiceEndpoint())
+
+ self.assertIsNone(self.backend.authenticate(openid_response=response))
+ self.assert_no_users_created()
+
+ def test_auth_setup_needed_response(self):
+ response = SetupNeededResponse(OpenIDServiceEndpoint())
+
+ self.assertIsNone(self.backend.authenticate(openid_response=response))
+ self.assert_no_users_created()
+
+ @override_settings(OPENID_CREATE_USERS=False)
+ def test_auth_no_create_users(self):
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
self.assertIsNone(user)
+ self.assert_no_users_created()
+
+ @override_settings(OPENID_CREATE_USERS=False)
+ def test_auth_no_create_users_existing_user(self):
+ existing_openid = self.make_user_openid(
+ claimed_id=self.message.endpoint.claimed_id)
+ expected_user_count = User.objects.count()
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertIsNotNone(user)
+ self.assertEqual(user, existing_openid.user)
+ self.assert_no_users_created(expected_count=expected_user_count)
+
+ @override_settings(
+ OPENID_UPDATE_DETAILS_FROM_SREG=True,
+ OPENID_VALID_VERIFICATION_SCHEMES={
+ SERVER_URL: {'token_via_email'}})
+ def test_auth_update_details_from_sreg(self):
+ first_name = 'a' * 31
+ last_name = 'b' * 31
+ email = 'new@email.com'
+ self.message.set_ax_args(
+ fullname=first_name + ' ' + last_name,
+ nickname='newnickname',
+ email=email,
+ first=first_name,
+ last=last_name,
+ verified=True,
+ )
+ existing_openid = self.make_user_openid(
+ claimed_id=self.message.endpoint.claimed_id)
+ original_username = existing_openid.user.username
+ expected_user_count = User.objects.count()
+
+ self.assert_account_not_verified(existing_openid.user)
+
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertEqual(user, existing_openid.user)
+ self.assertEqual(
+ user.username, original_username,
+ 'Username must not be updated unless OPENID_FOLLOW_RENAMES=True.')
+ self.assertEqual(user.email, email)
+ self.assertEqual(user.first_name, first_name[:30])
+ self.assertEqual(user.last_name, last_name[:30])
+ self.assert_account_verified(user)
+ self.assert_no_users_created(expected_count=expected_user_count)
+
+ @override_settings(
+ OPENID_UPDATE_DETAILS_FROM_SREG=True,
+ OPENID_VALID_VERIFICATION_SCHEMES={
+ SERVER_URL: {'token_via_email'}})
+ def test_auth_update_details_from_sreg_unverifies_account(self):
+ first_name = 'a' * 31
+ last_name = 'b' * 31
+ email = 'new@email.com'
+ kwargs = dict(
+ fullname=first_name + ' ' + last_name,
+ nickname='newnickname',
+ email=email,
+ first=first_name,
+ last=last_name,
+ verified=True,
+ )
+ self.message.set_ax_args(**kwargs)
+ verified_user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assert_account_verified(verified_user)
+ expected_user_count = User.objects.count()
+
+ kwargs['verified'] = False
+ self.message.set_ax_args(**kwargs)
+ unverified_user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertEqual(verified_user, unverified_user)
+ self.assert_account_not_verified(unverified_user)
+ self.assert_no_users_created(expected_count=expected_user_count)
+
+ @override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
+ def test_physical_multifactor_required_not_given(self):
+ response = self.message.to_response()
+
+ with self.assertRaises(MissingPhysicalMultiFactor):
+ self.backend.authenticate(openid_response=response)
+
+ self.assertTrue(
+ UserOpenID.objects.filter(
+ claimed_id=self.message.endpoint.claimed_id).exists(),
+ 'User must be created anyways.')
+
+ @override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
+ def test_physical_multifactor_required_invalid_auth_policy(self):
+ self.message.set_pape_args(
+ pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT)
+
+ with self.assertRaises(MissingPhysicalMultiFactor):
+ self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertTrue(
+ UserOpenID.objects.filter(
+ claimed_id=self.message.endpoint.claimed_id).exists(),
+ 'User must be created anyways.')
+
+ @override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
+ def test_physical_multifactor_required_valid_auth_policy(self):
+ self.message.set_pape_args(
+ pape.AUTH_MULTI_FACTOR, pape.AUTH_MULTI_FACTOR_PHYSICAL,
+ pape.AUTH_PHISHING_RESISTANT)
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertIsNotNone(user)
+
+ @override_settings(OPENID_STRICT_USERNAMES=True)
+ def test_auth_strict_usernames(self):
+ username = 'nickname'
+ self.message.set_sreg_args(nickname=username)
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertIsNotNone(user, 'User must be created')
+ self.assertEqual(user.username, username)
+
+ @override_settings(OPENID_STRICT_USERNAMES=True)
+ def test_auth_strict_usernames_no_nickname(self):
+ self.message.set_sreg_args(nickname='')
+
+ msg = re.escape(
+ "An attribute required for logging in was not returned (nickname)")
+
+ with self.assertRaisesRegexp(RequiredAttributeNotReturned, msg):
+ self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assert_no_users_created()
+
+ @override_settings(
+ OPENID_STRICT_USERNAMES=True,
+ OPENID_UPDATE_DETAILS_FROM_SREG=True)
+ def test_auth_strict_usernames_conflict(self):
+ existing_openid = self.make_user_openid()
+ expected_user_count = User.objects.count()
+ self.message.set_sreg_args(
+ nickname=existing_openid.user.username)
+
+ with self.assertRaises(DuplicateUsernameViolation):
+ self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assert_no_users_created(expected_count=expected_user_count)
+
+ @override_settings(
+ OPENID_FOLLOW_RENAMES=True,
+ OPENID_UPDATE_DETAILS_FROM_SREG=True)
+ def test_auth_follow_renames(self):
+ new_username = 'new'
+ self.message.set_sreg_args(nickname='username')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+ expected_user_count = User.objects.count()
+
+ self.assertIsNotNone(user, 'User must be created')
+
+ self.message.set_sreg_args(nickname=new_username)
+ renamed_user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertEqual(user.pk, renamed_user.pk)
+ self.assertEqual(renamed_user.username, new_username)
+ self.assert_no_users_created(expected_count=expected_user_count)
+
+ @override_settings(
+ OPENID_FOLLOW_RENAMES=True,
+ OPENID_STRICT_USERNAMES=True,
+ OPENID_UPDATE_DETAILS_FROM_SREG=True)
+ def test_auth_follow_renames_strict_usernames_no_nickname(self):
+ self.message.set_sreg_args(nickname='nickame')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+ expected_user_count = User.objects.count()
+
+ self.assertIsNotNone(user, 'User must be created')
+
+ self.message.set_sreg_args(nickname='')
+
+ # XXX: Check possibilities to normalize this error into a
+ # `RequiredAttributeNotReturned`.
+ with self.assertRaises(MissingUsernameViolation):
+ self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assert_no_users_created(expected_count=expected_user_count)
+
+ @override_settings(
+ OPENID_FOLLOW_RENAMES=True,
+ OPENID_STRICT_USERNAMES=True,
+ OPENID_UPDATE_DETAILS_FROM_SREG=True)
+ def test_auth_follow_renames_strict_usernames_rename_conflict(self):
+ # Setup existing user to conflict with
+ User.objects.create_user('testuser')
+
+ self.message.set_sreg_args(nickname='nickname')
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+ expected_user_count = User.objects.count()
+
+ self.assertIsNotNone(user, 'First request should succeed')
+
+ self.message.set_sreg_args(nickname='testuser')
+
+ with self.assertRaises(DuplicateUsernameViolation):
+ self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ db_user = User.objects.get(pk=user.pk)
+ self.assertEqual(db_user.username, 'nickname')
+ self.assert_no_users_created(expected_count=expected_user_count)
+
+ @override_settings(
+ OPENID_FOLLOW_RENAMES=True,
+ OPENID_STRICT_USERNAMES=False,
+ OPENID_UPDATE_DETAILS_FROM_SREG=True)
+ def test_auth_follow_renames_to_conflict(self):
+ # Setup existing user to conflict with
+ User.objects.create_user('testuser')
+ # Setup user to rename
+ user = User.objects.create_user('nickname')
+ self.make_user_openid(
+ user=user, claimed_id=self.message.endpoint.claimed_id)
+ # Trigger rename
+ self.message.set_sreg_args(nickname='testuser')
+ renamed_user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertEqual(renamed_user.pk, user.pk)
+ self.assertEqual(
+ renamed_user.username, 'testuser2',
+ 'Username must have a numbered suffix to avoid conflict.')
+
+ @override_settings(
+ OPENID_FOLLOW_RENAMES=True,
+ OPENID_UPDATE_DETAILS_FROM_SREG=True)
+ def test_auth_follow_renames_no_change(self):
+ # Setup user to rename
+ user = User.objects.create_user('username')
+ self.make_user_openid(
+ user=user, claimed_id=self.message.endpoint.claimed_id)
+ expected_user_count = User.objects.count()
+ # Trigger rename
+ self.message.set_sreg_args(nickname=user.username)
+ renamed_user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertEqual(renamed_user.pk, user.pk)
+ self.assertEqual(
+ renamed_user.username, user.username,
+ 'No numeric suffix should be appended for username owner.')
+ self.assert_no_users_created(expected_count=expected_user_count)
+
+ @override_settings(
+ OPENID_FOLLOW_RENAMES=True,
+ OPENID_UPDATE_DETAILS_FROM_SREG=True)
+ def test_auth_follow_renames_to_numbered_suffix(self):
+ # Setup user to rename to numbered suffix pattern
+ user = User.objects.create_user('testuser2000eight')
+ self.make_user_openid(
+ user=user, claimed_id=self.message.endpoint.claimed_id)
+ # Trigger rename
+ self.message.set_sreg_args(nickname='testuser2')
+ renamed_user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertEqual(renamed_user.pk, user.pk)
+ self.assertEqual(
+ renamed_user.username, 'testuser2',
+ 'The numbered suffix must be kept.')
+
+ @override_settings(
+ OPENID_FOLLOW_RENAMES=True,
+ OPENID_UPDATE_DETAILS_FROM_SREG=True)
+ def test_auth_follow_renames_to_numbered_suffix_with_existing(self):
+ # Setup existing user to conflict with
+ User.objects.create_user('testuser')
+ # Setup user to rename to numbered suffix pattern
+ user = User.objects.create_user('testuser2000eight')
+ self.make_user_openid(
+ user=user, claimed_id=self.message.endpoint.claimed_id)
+ # Trigger rename
+ self.message.set_sreg_args(nickname='testuser3')
+ renamed_user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertEqual(renamed_user.pk, user.pk)
+ self.assertEqual(
+ renamed_user.username, 'testuser3',
+ 'Username must be kept as there are no conflicts.')
+
+ @override_settings(
+ OPENID_FOLLOW_RENAMES=True,
+ OPENID_UPDATE_DETAILS_FROM_SREG=True)
+ def test_auth_follow_renames_from_numbered_suffix_to_conflict(self):
+ # Setup existing user to conflict with
+ User.objects.create_user('testuser')
+ # Setup user with numbered suffix pattern
+ user = User.objects.create_user('testuser2000')
+ self.make_user_openid(
+ user=user, claimed_id=self.message.endpoint.claimed_id)
+ # Trigger rename
+ self.message.set_sreg_args(nickname='testuser')
+ renamed_user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertEqual(renamed_user.pk, user.pk)
+ self.assertEqual(
+ user.username, 'testuser2000',
+ 'Since testuser conflicts, username must remain unchanged as it '
+ 'maches the number suffix pattern.')
+
+ @override_settings(
+ OPENID_FOLLOW_RENAMES=True,
+ OPENID_UPDATE_DETAILS_FROM_SREG=True)
+ def test_auth_follow_renames_from_numbered_suffix_no_conflict(self):
+ # Setup user with numbered suffix pattern
+ user = User.objects.create_user('testuser2')
+ self.make_user_openid(
+ user=user, claimed_id=self.message.endpoint.claimed_id)
+ # Trigger rename
+ self.message.set_sreg_args(nickname='testuser')
+ renamed_user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertEqual(renamed_user.pk, user.pk)
+ self.assertEqual(
+ renamed_user.username, 'testuser',
+ 'Username must be updated as there are no conflicts.')
+
+ @override_settings(OPENID_STRICT_USERNAMES=True)
+ def test_auth_duplicate_username_signal_is_sent(self):
+ existing_openid = self.make_user_openid()
+ expected_user_count = User.objects.count()
+ signal_kwargs = {}
+
+ def duplicate_username_handler(sender, **kwargs):
+ signal_kwargs.update(kwargs)
+ self.addCleanup(
+ openid_duplicate_username.disconnect,
+ duplicate_username_handler, sender=User, dispatch_uid='testing')
+ openid_duplicate_username.connect(
+ duplicate_username_handler, sender=User, weak=False,
+ dispatch_uid='testing')
+
+ self.message.set_sreg_args(
+ nickname=existing_openid.user.username)
+
+ with self.assertRaises(DuplicateUsernameViolation):
+ self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertIn('username', signal_kwargs)
+ self.assertEqual(
+ signal_kwargs['username'], existing_openid.user.username)
+ self.assert_no_users_created(expected_count=expected_user_count)
+
+ @override_settings(OPENID_STRICT_USERNAMES=True)
+ def test_auth_duplicate_username_signal_can_prevent_duplicate_error(self):
+ existing_openid = self.make_user_openid()
+
+ def duplicate_username_handler(sender, **kwargs):
+ existing_user = existing_openid.user
+ existing_user.username += '_other'
+ existing_user.save()
+ self.addCleanup(
+ openid_duplicate_username.disconnect,
+ duplicate_username_handler, sender=User, dispatch_uid='testing')
+ openid_duplicate_username.connect(
+ duplicate_username_handler, sender=User, weak=False,
+ dispatch_uid='testing')
+
+ self.message.set_sreg_args(
+ nickname=existing_openid.user.username)
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertIsNotNone(user)
+ self.assertNotEqual(user, existing_openid.user)
+
+ @override_settings(OPENID_STRICT_USERNAMES=True)
+ def test_auth_duplicate_username_is_not_called_if_no_conflict(self):
+ def duplicate_username_handler(sender, **kwargs):
+ assert False, 'This should never have been called.'
+ self.addCleanup(
+ openid_duplicate_username.disconnect,
+ duplicate_username_handler, sender=User, dispatch_uid='testing')
+ openid_duplicate_username.connect(
+ duplicate_username_handler, sender=User, weak=False,
+ dispatch_uid='testing')
+
+ self.message.set_sreg_args(nickname='nickname')
+ self.backend.authenticate(openid_response=self.message.to_response())
+
+ @override_settings(OPENID_STRICT_USERNAMES=False)
+ def test_auth_duplicate_username_is_not_called_if_not_strict(self):
+ existing_openid = self.make_user_openid()
+
+ def duplicate_username_handler(sender, **kwargs):
+ assert False, 'This should never have been called.'
+ self.addCleanup(
+ openid_duplicate_username.disconnect,
+ duplicate_username_handler, sender=User, dispatch_uid='testing')
+ openid_duplicate_username.connect(
+ duplicate_username_handler, sender=User, weak=False,
+ dispatch_uid='testing')
+
+ self.message.set_sreg_args(nickname=existing_openid.user.username)
+ self.backend.authenticate(openid_response=self.message.to_response())
+
+ @override_settings(OPENID_STRICT_USERNAMES=True)
+ def test_auth_duplicate_username_handling_bypass_numbered_suffix(self):
+ nickname = 'nickname87'
+ existing_openid = self.make_user_openid(
+ user=User.objects.create_user(nickname))
+
+ def duplicate_username_handler(sender, **kwargs):
+ existing_user = existing_openid.user
+ existing_user.username += '00'
+ existing_user.save()
+ self.addCleanup(
+ openid_duplicate_username.disconnect,
+ duplicate_username_handler, sender=User, dispatch_uid='testing')
+ openid_duplicate_username.connect(
+ duplicate_username_handler, sender=User, weak=False,
+ dispatch_uid='testing')
+
+ self.message.set_sreg_args(nickname=existing_openid.user.username)
+ user = self.backend.authenticate(
+ openid_response=self.message.to_response())
+
+ self.assertIsNotNone(user)
+ self.assertNotEqual(user, existing_openid.user)
+ self.assertEqual(
+ user.username, nickname,
+ 'In strict mode, when conflicts are handled, the username must '
+ 'be kept unmodified without numbered suffixes.')
+
+
+class GetGroupModelTestCase(TestCase):
+
+ def setUp(self):
+ super(GetGroupModelTestCase, self).setUp()
+ self.inject_test_models()
+
+ def inject_test_models(self):
+ installed_apps = settings.INSTALLED_APPS + (
+ 'django_openid_auth.tests',
+ )
+ p = self.settings(INSTALLED_APPS=installed_apps)
+ p.enable()
+ self.addCleanup(p.disable)
+ self.clear_app_cache()
+
+ def clear_app_cache(self):
+ try:
+ from django.apps import apps
+ apps.clear_cache()
+ except ImportError:
+ from django.db.models.loading import cache
+ cache.loaded = False
+
+ def test_default_group_model(self):
+ model = get_user_group_model()
+ self.assertEqual(model, User.groups.through)
+
+ @override_settings(AUTH_USER_GROUP_MODEL='tests.UserGroup')
+ def test_custom_group_model(self):
+ from django_openid_auth.tests.models import UserGroup
+ model = get_user_group_model()
+ self.assertEqual(model, UserGroup)
+
+ @override_settings(
+ AUTH_USER_GROUP_MODEL='django_openid_auth.models.UserGroup')
+ def test_improperly_configured_invalid_name(self):
+ self.assertRaises(ImproperlyConfigured, get_user_group_model)
+
+ @override_settings(
+ AUTH_USER_GROUP_MODEL='invalid.UserGroup')
+ def test_improperly_configured_invalid_app(self):
+ self.assertRaises(ImproperlyConfigured, get_user_group_model)
diff --git a/django_openid_auth/tests/test_store.py b/django_openid_auth/tests/test_store.py
index e5e1451..3b71632 100644
--- a/django_openid_auth/tests/test_store.py
+++ b/django_openid_auth/tests/test_store.py
@@ -28,6 +28,7 @@
from __future__ import unicode_literals
+import base64
import time
from django.test import TestCase
@@ -52,7 +53,8 @@ class OpenIDStoreTests(TestCase):
server_url='server-url', handle='handle')
self.assertEquals(dbassoc.server_url, 'server-url')
self.assertEquals(dbassoc.handle, 'handle')
- self.assertEquals(dbassoc.secret, 'secret'.encode('base-64'))
+ self.assertEquals(
+ dbassoc.secret, base64.encodestring(b'secret').decode('utf-8'))
self.assertEquals(dbassoc.issued, 42)
self.assertEquals(dbassoc.lifetime, 600)
self.assertEquals(dbassoc.assoc_type, 'HMAC-SHA1')
@@ -66,7 +68,8 @@ class OpenIDStoreTests(TestCase):
self.store.storeAssociation('server-url', assoc)
dbassoc = Association.objects.get(
server_url='server-url', handle='handle')
- self.assertEqual(dbassoc.secret, 'secret2'.encode('base-64'))
+ self.assertEqual(
+ dbassoc.secret, base64.encodestring(b'secret2').decode('utf-8'))
self.assertEqual(dbassoc.issued, 420)
self.assertEqual(dbassoc.lifetime, 900)
self.assertEqual(dbassoc.assoc_type, 'HMAC-SHA256')
@@ -80,7 +83,7 @@ class OpenIDStoreTests(TestCase):
self.assertTrue(isinstance(assoc, OIDAssociation))
self.assertEquals(assoc.handle, 'handle')
- self.assertEquals(assoc.secret, 'secret')
+ self.assertEquals(assoc.secret, b'secret')
self.assertEquals(assoc.issued, timestamp)
self.assertEquals(assoc.lifetime, 600)
self.assertEquals(assoc.assoc_type, 'HMAC-SHA1')
diff --git a/django_openid_auth/tests/test_views.py b/django_openid_auth/tests/test_views.py
index 835ddd5..daa632d 100644
--- a/django_openid_auth/tests/test_views.py
+++ b/django_openid_auth/tests/test_views.py
@@ -31,13 +31,17 @@ from __future__ import unicode_literals
import cgi
-from urlparse import parse_qs
+try:
+ from urllib.parse import parse_qs
+except ImportError:
+ from urlparse import parse_qs
from django.conf import settings
from django.contrib.auth.models import User, Group, Permission
from django.core.urlresolvers import reverse
from django.http import HttpRequest, HttpResponse
from django.test import TestCase
+from django.test.client import RequestFactory
from django.test.utils import override_settings
from mock import patch
from openid.consumer.consumer import Consumer, SuccessResponse
@@ -54,8 +58,9 @@ from django_openid_auth import teams
from django_openid_auth.models import UserOpenID
from django_openid_auth.tests.helpers import override_session_serializer
from django_openid_auth.views import (
- sanitise_redirect_url,
+ get_request_data,
make_consumer,
+ sanitise_redirect_url,
)
from django_openid_auth.signals import openid_login_complete
from django_openid_auth.store import DjangoOpenIDStore
@@ -123,8 +128,8 @@ class StubOpenIDProvider(HTTPFetcher):
def parseFormPost(self, content):
"""Parse an HTML form post to create an OpenID request."""
# Hack to make the javascript XML compliant ...
- content = content.replace('i < elements.length',
- 'i &lt; elements.length')
+ content = content.replace(
+ 'i < elements.length', 'i &lt; elements.length')
tree = ET.XML(content)
form = tree.find('.//form')
assert form is not None, 'No form in document'
@@ -135,8 +140,7 @@ class StubOpenIDProvider(HTTPFetcher):
for input in form.findall('input'):
if input.get('type') != 'hidden':
continue
- query[input.get('name').encode('UTF-8')] = \
- input.get('value').encode('UTF-8')
+ query[input.get('name')] = input.get('value')
self.last_request = self.server.decodeRequest(query)
return self.last_request
@@ -163,13 +167,6 @@ class DummyDjangoRequest(object):
def build_absolute_uri(self):
return self.META['SCRIPT_NAME'] + self.request_path
- def _combined_request(self):
- request = {}
- request.update(self.POST)
- request.update(self.GET)
- return request
- REQUEST = property(_combined_request)
-
@override_session_serializer
@override_settings(
@@ -184,10 +181,10 @@ class DummyDjangoRequest(object):
OPENID_SREG_REQUIRED_FIELDS=[],
OPENID_USE_EMAIL_FOR_USERNAME=False,
OPENID_VALID_VERIFICATION_SCHEMES={},
+ ROOT_URLCONF='django_openid_auth.tests.urls',
)
class RelyingPartyTests(TestCase):
- urls = 'django_openid_auth.tests.urls'
login_url = reverse('openid-login')
def setUp(self):
@@ -241,7 +238,8 @@ class RelyingPartyTests(TestCase):
response = self.client.post(self.login_url, self.openid_req)
self.assertContains(response, 'OpenID transaction in progress')
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
self.assertEqual(openid_request.mode, 'checkid_setup')
self.assertTrue(openid_request.return_to.startswith(
'http://testserver/openid/complete/'))
@@ -253,7 +251,7 @@ class RelyingPartyTests(TestCase):
# And they are now logged in:
response = self.client.get('/getuser/')
- self.assertEqual(response.content, 'someuser')
+ self.assertEqual(response.content.decode('utf-8'), 'someuser')
def test_login_with_nonascii_return_to(self):
"""Ensure non-ascii characters can be used for the 'next' arg."""
@@ -275,7 +273,8 @@ class RelyingPartyTests(TestCase):
self.assertContains(response, 'OpenID transaction in progress')
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
self.assertEqual(openid_request.mode, 'checkid_setup')
self.assertTrue(openid_request.return_to.startswith(
'http://testserver/openid/complete/'))
@@ -302,7 +301,8 @@ class RelyingPartyTests(TestCase):
self.assertEqual(response.status_code, 200)
self.assertContains(response, 'OpenID transaction in progress')
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
self.assertEqual(openid_request.mode, 'checkid_setup')
self.assertTrue(openid_request.return_to.startswith(
'http://testserver/openid/complete/'))
@@ -314,7 +314,7 @@ class RelyingPartyTests(TestCase):
# And they are now logged in:
response = self.client.get('/getuser/')
- self.assertEqual(response.content, 'someuser')
+ self.assertEqual(response.content.decode('utf-8'), 'someuser')
def test_login_create_users(self):
# Create a user with the same name as we'll pass back via sreg.
@@ -326,7 +326,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
@@ -340,7 +341,7 @@ class RelyingPartyTests(TestCase):
# And they are now logged in as a new user (they haven't taken
# over the existing "someuser" user).
response = self.client.get('/getuser/')
- self.assertEqual(response.content, 'someuser2')
+ self.assertEqual(response.content.decode('utf-8'), 'someuser2')
# Check the details of the new user.
user = User.objects.get(username='someuser2')
@@ -364,7 +365,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
return openid_request
def _get_login_response(self, openid_request, resp_data, use_sreg,
@@ -388,7 +390,8 @@ class RelyingPartyTests(TestCase):
self.provider.type_uris.append(pape.ns_uri)
response = self.client.post(self.login_url, self.openid_req)
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
request_auth = openid_request.message.getArg(
'http://specs.openid.net/extensions/pape/1.0',
@@ -436,7 +439,7 @@ class RelyingPartyTests(TestCase):
query['openid.pape.auth_policies'], [preferred_auth])
response = self.client.get('/getuser/')
- self.assertEqual(response.content, 'testuser')
+ self.assertEqual(response.content.decode('utf-8'), 'testuser')
@override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
def test_login_physical_multifactor_not_provided(self):
@@ -552,10 +555,11 @@ class RelyingPartyTests(TestCase):
response = self.client.get('/getuser/')
# username defaults to 'openiduser'
- self.assertEqual(response.content, 'openiduser')
+ username = response.content.decode('utf-8')
+ self.assertEqual(username, 'openiduser')
# The user's full name and email have been updated.
- user = User.objects.get(username=response.content)
+ user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Openid')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
@@ -570,7 +574,7 @@ class RelyingPartyTests(TestCase):
response = self.client.get('/getuser/')
# username defaults to a munged version of the email
- self.assertEqual(response.content, 'fooexamplecom')
+ self.assertEqual(response.content.decode('utf-8'), 'fooexamplecom')
def test_login_duplicate_username_numbering(self):
# Setup existing user who's name we're going to conflict with
@@ -587,7 +591,7 @@ class RelyingPartyTests(TestCase):
# Since this username is already taken by someone else, we go through
# the process of adding +i to it, and get testuser2.
- self.assertEqual(response.content, 'testuser2')
+ self.assertEqual(response.content.decode('utf-8'), 'testuser2')
def test_login_duplicate_username_numbering_with_conflicts(self):
# Setup existing user who's name we're going to conflict with
@@ -607,7 +611,7 @@ class RelyingPartyTests(TestCase):
# the process of adding +i to it starting with the count of users with
# username starting with 'testuser', of which there are 2. i should
# start at 3, which already exists, so it should skip to 4.
- self.assertEqual(response.content, 'testuser4')
+ self.assertEqual(response.content.decode('utf-8'), 'testuser4')
def test_login_duplicate_username_numbering_with_holes(self):
# Setup existing user who's name we're going to conflict with
@@ -630,7 +634,7 @@ class RelyingPartyTests(TestCase):
# the process of adding +i to it starting with the count of users with
# username starting with 'testuser', of which there are 5. i should
# start at 6, and increment until it reaches 9.
- self.assertEqual(response.content, 'testuser9')
+ self.assertEqual(response.content.decode('utf-8'), 'testuser9')
def test_login_duplicate_username_numbering_with_nonsequential_matches(
self):
@@ -651,7 +655,7 @@ class RelyingPartyTests(TestCase):
# the process of adding +i to it starting with the count of users with
# username starting with 'testuser', of which there are 2. i should
# start at 3, which will be available.
- self.assertEqual(response.content, 'testuser3')
+ self.assertEqual(response.content.decode('utf-8'), 'testuser3')
def test_login_follow_rename(self):
user = User.objects.create_user('testuser', 'someone@example.com')
@@ -671,10 +675,11 @@ class RelyingPartyTests(TestCase):
# If OPENID_FOLLOW_RENAMES, they are logged in as
# someuser (the passed in nickname has changed the username)
- self.assertEqual(response.content, 'someuser')
+ username = response.content.decode('utf-8')
+ self.assertEqual(username, 'someuser')
# The user's full name and email have been updated.
- user = User.objects.get(username=response.content)
+ user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Some')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
@@ -696,10 +701,11 @@ class RelyingPartyTests(TestCase):
response = self.client.get('/getuser/')
# Username should not have changed
- self.assertEqual(response.content, 'testuser')
+ username = response.content.decode('utf-8')
+ self.assertEqual(username, 'testuser')
# The user's full name and email have been updated.
- user = User.objects.get(username=response.content)
+ user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Some')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
@@ -735,10 +741,11 @@ class RelyingPartyTests(TestCase):
# If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
# but since that username is already taken by someone else, we go
# through the process of adding +i to it, and get testuser2.
- self.assertEqual(response.content, 'testuser2')
+ username = response.content.decode('utf-8')
+ self.assertEqual(username, 'testuser2')
# The user's full name and email have been updated.
- user = User.objects.get(username=response.content)
+ user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Rename')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'rename@example.com')
@@ -777,10 +784,11 @@ class RelyingPartyTests(TestCase):
# the username follows the nickname+i scheme, it has non-numbers in the
# suffix, so it's not an auto-generated one. The regular process of
# renaming to 'testuser' has a conflict, so we get +2 at the end.
- self.assertEqual(response.content, 'testuser2')
+ username = response.content.decode('utf-8')
+ self.assertEqual(username, 'testuser2')
# The user's full name and email have been updated.
- user = User.objects.get(username=response.content)
+ user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Rename')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'rename@example.com')
@@ -817,10 +825,11 @@ class RelyingPartyTests(TestCase):
# but since that username is already taken by someone else, we go
# through the process of adding +i to it. Since the user for this
# identity url already has a name matching that pattern, check if first
- self.assertEqual(response.content, 'testuser2000')
+ username = response.content.decode('utf-8')
+ self.assertEqual(username, 'testuser2000')
# The user's full name and email have been updated.
- user = User.objects.get(username=response.content)
+ user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Rename')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'rename@example.com')
@@ -847,10 +856,11 @@ class RelyingPartyTests(TestCase):
# If OPENID_FOLLOW_RENAMES, username should be changed to 'testuser'
# because it wasn't currently taken
- self.assertEqual(response.content, 'testuser')
+ username = response.content.decode('utf-8')
+ self.assertEqual(username, 'testuser')
# The user's full name and email have been updated.
- user = User.objects.get(username=response.content)
+ user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Same')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'same@example.com')
@@ -865,7 +875,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
@@ -903,7 +914,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
@@ -932,7 +944,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
@@ -974,7 +987,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
@@ -997,7 +1011,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
@@ -1033,10 +1048,11 @@ class RelyingPartyTests(TestCase):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
- self.assertEqual(response.content, 'testuser')
+ username = response.content.decode('utf-8')
+ self.assertEqual(username, 'testuser')
# The user's full name and email have been updated.
- user = User.objects.get(username=response.content)
+ user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Some')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
@@ -1052,7 +1068,8 @@ class RelyingPartyTests(TestCase):
with self.settings(OPENID_SREG_EXTRA_FIELDS=('language',)):
response = self.client.post(self.login_url, self.openid_req)
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
for field in ('email', 'fullname', 'nickname', 'language'):
self.assertTrue(field in sreg_request)
@@ -1069,7 +1086,8 @@ class RelyingPartyTests(TestCase):
with self.settings(OPENID_SREG_REQUIRED_FIELDS=('email', 'language')):
response = self.client.post(self.login_url, self.openid_req)
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
self.assertEqual(['email', 'language'], sreg_request.required)
@@ -1091,7 +1109,8 @@ class RelyingPartyTests(TestCase):
# The resulting OpenID request uses the Attribute Exchange
# extension rather than the Simple Registration extension.
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
self.assertEqual(sreg_request.required, [])
self.assertEqual(sreg_request.optional, [])
@@ -1137,7 +1156,7 @@ class RelyingPartyTests(TestCase):
assert not settings.OPENID_FOLLOW_RENAMES, (
'OPENID_FOLLOW_RENAMES must be False')
response = self.client.get('/getuser/')
- self.assertEqual(response.content, 'testuser')
+ self.assertEqual(response.content.decode('utf-8'), 'testuser')
# The user's full name and email have been updated.
user = User.objects.get(username='testuser')
@@ -1221,7 +1240,8 @@ class RelyingPartyTests(TestCase):
self.assertContains(response, 'OpenID transaction in progress')
# Complete the request
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
openid_response = openid_request.answer(True)
teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
teams_response = teams.TeamsResponse.extractResponse(
@@ -1236,7 +1256,7 @@ class RelyingPartyTests(TestCase):
# And they are now logged in as testuser
response = self.client.get('/getuser/')
- self.assertEqual(response.content, 'testuser')
+ self.assertEqual(response.content.decode('utf-8'), 'testuser')
# The user's groups have been updated.
User.objects.get(username='testuser')
@@ -1268,7 +1288,8 @@ class RelyingPartyTests(TestCase):
OPENID_LAUNCHPAD_TEAMS_MAPPING=mapping,
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=True,
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO_BLACKLIST=blacklist):
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
openid_request.answer(True)
teams.TeamsRequest.fromOpenIDRequest(openid_request)
@@ -1321,7 +1342,8 @@ class RelyingPartyTests(TestCase):
response = self.client.post(self.login_url, self.openid_req_no_next)
# Complete the request
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
openid_response = openid_request.answer(True)
teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
teams_response = teams.TeamsResponse.extractResponse(
@@ -1340,7 +1362,8 @@ class RelyingPartyTests(TestCase):
display_id='http://example.com/identity')
response = self.client.post(self.login_url, self.openid_req_no_next)
- openid_request = self.provider.parseFormPost(response.content)
+ openid_request = self.provider.parseFormPost(
+ response.content.decode('utf-8'))
openid_response = openid_request.answer(True)
# Use a closure to test whether the signal handler was called.
self.signal_handler_called = False
@@ -1388,3 +1411,24 @@ class HelperFunctionsTest(TestCase):
self.assertEqual(url, sanitised)
else:
self.assertEqual(settings.LOGIN_REDIRECT_URL, sanitised)
+
+ def test_get_request_data_from_post(self):
+ request = RequestFactory().post('/', data={'foo': 'bar'})
+ data = get_request_data(request)
+ self.assertEqual(dict(data), {'foo': ['bar']})
+
+ def test_get_request_data_from_get(self):
+ request = RequestFactory().get('/', data={'foo': 'bar'})
+ data = get_request_data(request)
+ self.assertEqual(dict(data), {'foo': ['bar']})
+
+ def test_get_request_data_merged(self):
+ request = RequestFactory().post('/?baz=42', data={'foo': 'bar'})
+ data = get_request_data(request)
+ self.assertEqual(dict(data), {'foo': ['bar'], 'baz': ['42']})
+
+ def test_get_request_data_override_order(self):
+ request = RequestFactory().post('/?foo=42', data={'foo': 'bar'})
+ data = get_request_data(request)
+ self.assertEqual(dict(data), {'foo': ['42', 'bar']})
+ self.assertEqual(data['foo'], 'bar')
diff --git a/django_openid_auth/tests/urls.py b/django_openid_auth/tests/urls.py
index d30e01d..448fb23 100644
--- a/django_openid_auth/tests/urls.py
+++ b/django_openid_auth/tests/urls.py
@@ -28,7 +28,7 @@
from __future__ import unicode_literals
-from django.conf.urls import patterns, include
+from django.conf.urls import include, url
from django.http import HttpResponse
@@ -36,8 +36,7 @@ def get_user(request):
return HttpResponse(request.user.username)
-urlpatterns = patterns(
- '',
- (r'^getuser/$', get_user),
- (r'^openid/', include('django_openid_auth.urls')),
-)
+urlpatterns = [
+ url(r'^getuser/$', get_user),
+ url(r'^openid/', include('django_openid_auth.urls')),
+]
diff --git a/django_openid_auth/urls.py b/django_openid_auth/urls.py
index c984671..623d534 100644
--- a/django_openid_auth/urls.py
+++ b/django_openid_auth/urls.py
@@ -29,11 +29,17 @@
from __future__ import unicode_literals
-from django.conf.urls import patterns, url
+from django.conf.urls import url
-urlpatterns = patterns(
- 'django_openid_auth.views',
- url(r'^login/$', 'login_begin', name='openid-login'),
- url(r'^complete/$', 'login_complete', name='openid-complete'),
- url(r'^logo.gif$', 'logo', name='openid-logo'),
+from django_openid_auth.views import (
+ login_begin,
+ login_complete,
+ logo,
)
+
+
+urlpatterns = [
+ url(r'^login/$', login_begin, name='openid-login'),
+ url(r'^complete/$', login_complete, name='openid-complete'),
+ url(r'^logo.gif$', logo, name='openid-logo'),
+]
diff --git a/django_openid_auth/views.py b/django_openid_auth/views.py
index dc9c248..fdadc23 100644
--- a/django_openid_auth/views.py
+++ b/django_openid_auth/views.py
@@ -30,8 +30,11 @@
from __future__ import unicode_literals
import re
-import urllib
-from urlparse import urlsplit
+try:
+ from urllib.parse import urlencode, urlsplit
+except ImportError:
+ from urllib import urlencode
+ from urlparse import urlsplit
from django.conf import settings
from django.contrib.auth import (
@@ -39,6 +42,7 @@ from django.contrib.auth import (
from django.contrib.auth.models import Group
from django.core.urlresolvers import reverse
from django.http import HttpResponse, HttpResponseRedirect
+from django.http.request import QueryDict
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.template.loader import render_to_string
@@ -129,22 +133,27 @@ def default_render_failure(request, message, status=403,
template_name='openid/failure.html',
exception=None):
"""Render an error page to the user."""
- data = render_to_string(
- template_name, dict(message=message, exception=exception),
- context_instance=RequestContext(request))
+ context = RequestContext(request)
+ context.update(dict(message=message, exception=exception))
+ data = render_to_string(template_name, context)
return HttpResponse(data, status=status)
def parse_openid_response(request):
"""Parse an OpenID response from a Django request."""
- # Short cut if there is no request parameters.
- # if len(request.REQUEST) == 0:
- # return None
-
current_url = request.build_absolute_uri()
consumer = make_consumer(request)
- return consumer.complete(dict(request.REQUEST.items()), current_url)
+ data = get_request_data(request)
+ return consumer.complete(data, current_url)
+
+
+def get_request_data(request):
+ # simulate old request.REQUEST for backwards compatibility
+ data = QueryDict(query_string=None, mutable=True)
+ data.update(request.GET)
+ data.update(request.POST)
+ return data
def login_begin(request, template_name='openid/login.html',
@@ -153,7 +162,8 @@ def login_begin(request, template_name='openid/login.html',
render_failure=default_render_failure,
redirect_field_name=REDIRECT_FIELD_NAME):
"""Begin an OpenID login request, possibly asking for an identity URL."""
- redirect_to = request.REQUEST.get(redirect_field_name, '')
+ data = get_request_data(request)
+ redirect_to = data.get(redirect_field_name, '')
# Get the OpenID URL to try. First see if we've been configured
# to use a fixed server URL.
@@ -169,10 +179,12 @@ def login_begin(request, template_name='openid/login.html',
# Invalid or no form data:
if openid_url is None:
- context = {'form': login_form, redirect_field_name: redirect_to}
- return render_to_response(
- template_name, context,
- context_instance=RequestContext(request))
+ context = RequestContext(request)
+ context.update({
+ 'form': login_form,
+ redirect_field_name: redirect_to,
+ })
+ return render_to_response(template_name, context)
consumer = make_consumer(request)
try:
@@ -268,7 +280,7 @@ def login_begin(request, template_name='openid/login.html',
# Django gives us Unicode, which is great. We must encode URI.
# urllib enforces str. We can't trust anything about the default
# encoding inside str(foo) , so we must explicitly make foo a str.
- return_to += urllib.urlencode(
+ return_to += urlencode(
{redirect_field_name: redirect_to.encode("UTF-8")})
return render_openid_request(request, openid_request, return_to)
@@ -277,7 +289,8 @@ def login_begin(request, template_name='openid/login.html',
@csrf_exempt
def login_complete(request, redirect_field_name=REDIRECT_FIELD_NAME,
render_failure=None):
- redirect_to = request.REQUEST.get(redirect_field_name, '')
+ data = get_request_data(request)
+ redirect_to = data.get(redirect_field_name, '')
render_failure = (
render_failure or getattr(settings, 'OPENID_RENDER_FAILURE', None) or
default_render_failure)
@@ -290,8 +303,9 @@ def login_complete(request, redirect_field_name=REDIRECT_FIELD_NAME,
if openid_response.status == SUCCESS:
try:
user = authenticate(openid_response=openid_response)
- except DjangoOpenIDException, e:
- return render_failure(request, e.message, exception=e)
+ except DjangoOpenIDException as e:
+ return render_failure(
+ request, getattr(e, 'message', str(e)), exception=e)
if user is not None:
if user.is_active:
@@ -325,6 +339,7 @@ def logo(request):
OPENID_LOGO_BASE_64.decode('base64'), mimetype='image/gif'
)
+
# Logo from http://openid.net/login-bg.gif
# Embedded here for convenience; you should serve this as a static file
OPENID_LOGO_BASE_64 = """
diff --git a/example_consumer/settings.py b/example_consumer/settings.py
index bd5e5cb..e19e2b3 100644
--- a/example_consumer/settings.py
+++ b/example_consumer/settings.py
@@ -54,6 +54,24 @@ SECRET_KEY = '34958734985734985734985798437'
DEBUG = True
TEMPLATE_DEBUG = True
+TEMPLATES = [
+ {
+ 'BACKEND': 'django.template.backends.django.DjangoTemplates',
+ 'DIRS': [],
+ 'APP_DIRS': True,
+ 'OPTIONS': {
+ 'context_processors': [
+ 'django.contrib.auth.context_processors.auth',
+ 'django.template.context_processors.debug',
+ 'django.template.context_processors.i18n',
+ 'django.template.context_processors.media',
+ 'django.template.context_processors.static',
+ 'django.template.context_processors.tz',
+ 'django.contrib.messages.context_processors.messages',
+ ]
+ }
+ }
+]
ALLOWED_HOSTS = []
diff --git a/example_consumer/urls.py b/example_consumer/urls.py
index 40cb233..3c8f586 100644
--- a/example_consumer/urls.py
+++ b/example_consumer/urls.py
@@ -27,20 +27,20 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
-from django.conf.urls import patterns, include, url
+from django.conf.urls import include, url
from django.contrib import admin
+from django.contrib.auth import views as auth_views
-import views
+from example_consumer import views
admin.autodiscover()
-urlpatterns = patterns(
- '',
+urlpatterns = [
url(r'^$', views.index),
url(r'^openid/', include('django_openid_auth.urls')),
- url(r'^logout/$', 'django.contrib.auth.views.logout'),
+ url(r'^logout/$', auth_views.logout),
url(r'^private/$', views.require_authentication),
url(r'^admin/', include(admin.site.urls)),
-)
+]
diff --git a/setup.py b/setup.py
index ebc2e24..c73e435 100644
--- a/setup.py
+++ b/setup.py
@@ -39,21 +39,28 @@ library also includes the following features:
info.
"""
+import sys
+
from setuptools import find_packages, setup
+PY3 = sys.version_info.major >= 3
+
description, long_description = __doc__.split('\n\n', 1)
-VERSION = '0.8'
+VERSION = '0.14'
+
+install_requires = ['django>=1.6', 'six']
+if PY3:
+ install_requires.append('python3-openid')
+else:
+ install_requires.append('python-openid>=2.2.0')
setup(
name='django-openid-auth',
version=VERSION,
packages=find_packages(),
- install_requires=[
- 'django>=1.5',
- 'python-openid>=2.2.0',
- ],
+ install_requires=install_requires,
package_data={
'django_openid_auth': ['templates/openid/*.html'],
},
diff --git a/tox.ini b/tox.ini
index aba7e72..d980d1b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,35 +1,47 @@
[tox]
envlist =
- py2.7-django1.5, py2.7-django1.6, py2.7-django1.7, py2.7-django1.8
+ py27-django{1.8,1.9,1.10}
+ # py3-django{1.11}
[testenv]
commands = python manage.py test django_openid_auth
-deps=
+deps =
mock
- python-openid
-[testenv:py2.7-django1.5]
+[testenv:py27]
basepython = python2.7
deps =
- django >= 1.5, < 1.6
+ python-openid
{[testenv]deps}
- south==1.0
-[testenv:py2.7-django1.6]
-basepython = python2.7
+[testenv:py3]
+basepython = python3
deps =
- django >= 1.6, < 1.7
+ python3-openid
{[testenv]deps}
- south==1.0
-[testenv:py2.7-django1.7]
-basepython = python2.7
+[testenv:py27-django1.8]
deps =
- django >= 1.7, < 1.8
- {[testenv]deps}
+ django >= 1.8, < 1.9
+ {[testenv:py27]deps}
-[testenv:py2.7-django1.8]
-basepython = python2.7
+[testenv:py27-django1.9]
deps =
- django >= 1.8, < 1.9
- {[testenv]deps}
+ django >= 1.9, < 1.10
+ {[testenv:py27]deps}
+
+[testenv:py27-django1.10]
+deps =
+ django >= 1.10, < 1.11
+ {[testenv:py27]deps}
+
+[testenv:py27-django1.11]
+deps =
+ django >= 1.11, < 2
+ {[testenv:py27]deps}
+
+
+[testenv:py3-django1.11]
+deps =
+ django >= 1.11, < 2
+ {[testenv:py3]deps}