Skip to content
124 changes: 124 additions & 0 deletions dj_rest_auth/registration/password_validations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import re
from typing import Union, List
from dataclasses import dataclass
from enum import Enum

class PasswordStrength(Enum):
WEAK = "weak"
MEDIUM = "medium"
STRONG = "strong"

@dataclass
class PasswordValidationResult:
is_valid: bool
errors: List[str]
strength: PasswordStrength

class PasswordValidator:
def __init__(self,
min_length: int = 8,
require_uppercase: bool = True,
require_lowercase: bool = True,
require_digits: bool = True,
require_special_chars: bool = True):

self.min_length = min_length
self.require_uppercase = require_uppercase
self.require_lowercase = require_lowercase
self.require_digits = require_digits
self.require_special_chars = require_special_chars

def validate_password(self, password1, password2) -> PasswordValidationResult:
errors = []

# Basic validation
if not password1 or not password2:
errors.append("Password fields cannot be empty")
return PasswordValidationResult(False, errors, PasswordStrength.WEAK)

# Check if passwords match
if password1 != password2:
errors.append("Passwords do not match")
return PasswordValidationResult(False, errors, PasswordStrength.WEAK)

# Length check
if len(password1) < self.min_length:
errors.append(f"Password must be at least {self.min_length} characters long")

# Uppercase check
if self.require_uppercase and not any(c.isupper() for c in password1):
errors.append("Password must contain at least one uppercase letter")

# Lowercase check
if self.require_lowercase and not any(c.islower() for c in password1):
errors.append("Password must contain at least one lowercase letter")

# Digit check
if self.require_digits and not any(c.isdigit() for c in password1):
errors.append("Password must contain at least one number")

# Special character check
if self.require_special_chars:
special_chars = re.compile(r'[!@#$%^&*(),.?":{}|<>]')
if not special_chars.search(password1):
errors.append("Password must contain at least one special character")


# Sequential characters check
if self._has_sequential_chars(password1):
errors.append("Password contains sequential characters")

# Determine password strength
strength = self._calculate_strength(password1)

return PasswordValidationResult

def _has_sequential_chars(self, password: str) -> bool:
"""Check for sequential characters (e.g., 'abc', '123')"""
sequences = ('abcdefghijklmnopqrstuvwxyz', '0123456789')
lowercase_pass = password.lower()

for seq in sequences:
for i in range(len(seq) - 2):
if seq[i:i+3] in lowercase_pass:
return True
return False

def _calculate_strength(self, password: str) -> PasswordStrength:
"""Calculate password strength based on various factors"""
score = 0

# Length points (up to 5)
score += min(5, len(password) // 2)

# Character variety points
if any(c.isupper() for c in password):
score += 2
if any(c.islower() for c in password):
score += 2
if any(c.isdigit() for c in password):
score += 2
if any(not c.isalnum() for c in password):
score += 3

# Unique character points
score += min(3, len(set(password)) // 3)

match score:
case s if s >= 10:
return PasswordStrength.STRONG
case s if s >= 6:
return PasswordStrength.MEDIUM
case _:
return PasswordStrength.WEAK

# Example usage:
def validate_password(password1: str, password2: str) -> Union[str, List[str]]:
validator = PasswordValidator()
result = validator.validate_password(password1, password2)

if not result.is_valid:
return result.errors

return f"Password is valid (Strength: {result.strength.value})"

164 changes: 72 additions & 92 deletions dj_rest_auth/registration/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ class SocialLoginSerializer(serializers.Serializer):
id_token = serializers.CharField(required=False, allow_blank=True)

def _get_request(self):
request = self.context.get('request')
if not isinstance(request, HttpRequest):
request = request._request
return request
return self.context.get('request', getattr(self.context.get('request'), '_request', None))

def get_social_login(self, adapter, app, token, response):
"""
Expand Down Expand Up @@ -75,7 +72,7 @@ def set_callback_url(self, view, adapter_class):
)
except NoReverseMatch:
raise serializers.ValidationError(
_('Define callback_url in view'),
({'callback_url': _('Define callback_url in view or ensure URL name exists.')})
)

def validate(self, attrs):
Expand All @@ -99,99 +96,97 @@ def validate(self, attrs):

access_token = attrs.get('access_token')
code = attrs.get('code')
# Case 1: We received the access_token

if access_token:
tokens_to_parse = {'access_token': access_token}
token = access_token
# For sign in with apple

id_token = attrs.get('id_token')
if id_token:
tokens_to_parse['id_token'] = id_token

# Case 2: We received the authorization code
tokens_to_parse['id_token'] = id_token
elif code:
self.set_callback_url(view=view, adapter_class=adapter_class)
self.client_class = getattr(view, 'client_class', None)

if not self.client_class:
raise serializers.ValidationError(
_('Define client_class in view'),
)

provider = adapter.get_provider()
scope = provider.get_scope_from_request(request)
client = self.client_class(
request,
app.client_id,
app.secret,
adapter.access_token_method,
adapter.access_token_url,
self.callback_url,
scope,
scope_delimiter=adapter.scope_delimiter,
headers=adapter.headers,
basic_auth=adapter.basic_auth,
)
try:
token = client.get_access_token(code)
except OAuth2Error as ex:
raise serializers.ValidationError(
_('Failed to exchange code for access token')
) from ex
access_token = token['access_token']
tokens_to_parse = {'access_token': access_token}

# If available we add additional data to the dictionary
for key in ['refresh_token', 'id_token', adapter.expires_in_key]:
if key in token:
tokens_to_parse[key] = token[key]
self._handle_code_flow(view, adapter_class, adapter, app, code, request, tokens_to_parse)
else:
raise serializers.ValidationError(
_('Incorrect input. access_token or code is required.'),
)

raise serializers.ValidationError(_('Incorrect input. access_token or code is required.'))

social_token = adapter.parse_token(tokens_to_parse)
social_token.app = app

try:
if adapter.provider_id == 'google' and not code:
login = self.get_social_login(adapter, app, social_token, response={'id_token': id_token})
else:
login = self.get_social_login(adapter, app, social_token, token)
ret = complete_social_login(request, login)
login = self._attempt_login(adapter, app, social_token, code, attrs)
except HTTPError:
raise serializers.ValidationError(_('Incorrect value'))


if isinstance(ret, HttpResponseBadRequest):
raise serializers.ValidationError(ret.content)
if isinstance(login, HttpResponseBadRequest):
raise serializers.ValidationError(login.content)

if not login.is_existing:
# We have an account already signed up in a different flow
# with the same email address: raise an exception.
# This needs to be handled in the frontend. We can not just
# link up the accounts due to security constraints
if allauth_account_settings.UNIQUE_EMAIL:
# Do we have an account already with this email address?
account_exists = get_user_model().objects.filter(
email=login.user.email,
).exists()
if account_exists:
raise serializers.ValidationError(
_('User is already registered with this e-mail address.'),
)

login.lookup()
try:
login.save(request, connect=True)
except IntegrityError as ex:
raise serializers.ValidationError(
_('User is already registered with this e-mail address.'),
) from ex
self.post_signup(login, attrs)
self._new_user_registration(login, request, attrs)

attrs['user'] = login.account.user

return attrs

def _handle_code_flow(self, view, adapter_class, adapter, app, code, request, tokens_to_parse):
"""Handles the auth flow when an authorization code is provided."""
self.set_callback_url(view=view, adapter_class=adapter_class)
self.client_class = getattr(view, 'client_class', None)

if not self.client_class:
raise serializers.ValidationError(_('Define client_class in view'))

provider = adapter.get_provider()
scope = provider.get_scope_from_request(request)
client = self.client_class(
request,
app.client_id,
app.secret,
adapter.access_token_method,
adapter.access_token_url,
self.callback_url,
scope,
scope_delimiter=adapter.scope_delimiter,
headers=adapter.headers,
basic_auth=adapter.basic_auth,
)
try:
token = client.get_access_token(code)
except OAuth2Error as ex:
raise serializers.ValidationError(
('Failed to exchange code for access token')
) from ex

access_token = token['access_token']
tokens_to_parse = {'access_token': access_token}
for key in ['refresh_token', 'id_token', adapter.expires_in_key]:
if key in token:
tokens_to_parse[key] = token[key]



def _attempt_login(self, adapter, app, social_token, code, attrs):
"""Attempts to log in the user using the adapter."""
if adapter.provider_id == 'google' and not code:
return self.get_social_login(adapter, app, social_token, response={'id_token': attrs.get('id_token')})
return self.get_social_login(adapter, app, social_token, token=attrs.get('access_token'))

def _new_user_registration(self, login, request, attrs):
"""Handles user registration if the user does not exist."""
if allauth_account_settings.UNIQUE_EMAIL:
account_exists = get_user_model().objects.filter(email=login.user.email).exists()
if account_exists:
raise serializers.ValidationError(_('User is already registered with this e-mail address.'))
login.lookup()
try:
login.save(request, connect=True)
except IntegrityError as ex:
raise serializers.ValidationError(
_('User is already registered with this e-mail address.'),
) from ex
self.post_signup(login, attrs)


def post_signup(self, login, attrs):
"""
Expand Down Expand Up @@ -244,21 +239,13 @@ def validate_email(self, email):
)
return email

def validate_password1(self, password):
return get_adapter().clean_password(password)

def validate(self, data):
if data['password1'] != data['password2']:
raise serializers.ValidationError(_("The two password fields didn't match."))
return data

def custom_signup(self, request, user):
pass

def get_cleaned_data(self):
return {
'username': self.validated_data.get('username', ''),
'password1': self.validated_data.get('password1', ''),
'email': self.validated_data.get('email', ''),
}

Expand All @@ -267,13 +254,6 @@ def save(self, request):
user = adapter.new_user(request)
self.cleaned_data = self.get_cleaned_data()
user = adapter.save_user(request, user, self, commit=False)
if "password1" in self.cleaned_data:
try:
adapter.clean_password(self.cleaned_data['password1'], user=user)
except DjangoValidationError as exc:
raise serializers.ValidationError(
detail=serializers.as_serializer_error(exc)
)
user.save()
self.custom_signup(request, user)
setup_user_email(request, user, [])
Expand Down