diff --git a/dj_rest_auth/registration/password_validations.py b/dj_rest_auth/registration/password_validations.py new file mode 100644 index 00000000..d97146dd --- /dev/null +++ b/dj_rest_auth/registration/password_validations.py @@ -0,0 +1,124 @@ +import re +from typing import Union, List +from dataclasses import dataclass +from enum import Enum + +class PasswordStrength(Enum): + WEAK = "weak" + MEDIUM = "medium" + STRONG = "strong" + +@dataclass +class PasswordValidationResult: + is_valid: bool + errors: List[str] + strength: PasswordStrength + +class PasswordValidator: + def __init__(self, + min_length: int = 8, + require_uppercase: bool = True, + require_lowercase: bool = True, + require_digits: bool = True, + require_special_chars: bool = True): + + self.min_length = min_length + self.require_uppercase = require_uppercase + self.require_lowercase = require_lowercase + self.require_digits = require_digits + self.require_special_chars = require_special_chars + + def validate_password(self, password1, password2) -> PasswordValidationResult: + errors = [] + + # Basic validation + if not password1 or not password2: + errors.append("Password fields cannot be empty") + return PasswordValidationResult(False, errors, PasswordStrength.WEAK) + + # Check if passwords match + if password1 != password2: + errors.append("Passwords do not match") + return PasswordValidationResult(False, errors, PasswordStrength.WEAK) + + # Length check + if len(password1) < self.min_length: + errors.append(f"Password must be at least {self.min_length} characters long") + + # Uppercase check + if self.require_uppercase and not any(c.isupper() for c in password1): + errors.append("Password must contain at least one uppercase letter") + + # Lowercase check + if self.require_lowercase and not any(c.islower() for c in password1): + errors.append("Password must contain at least one lowercase letter") + + # Digit check + if self.require_digits and not any(c.isdigit() for c in password1): + errors.append("Password must contain at least one number") + + # Special character check + if self.require_special_chars: + special_chars = re.compile(r'[!@#$%^&*(),.?":{}|<>]') + if not special_chars.search(password1): + errors.append("Password must contain at least one special character") + + + # Sequential characters check + if self._has_sequential_chars(password1): + errors.append("Password contains sequential characters") + + # Determine password strength + strength = self._calculate_strength(password1) + + return PasswordValidationResult + + def _has_sequential_chars(self, password: str) -> bool: + """Check for sequential characters (e.g., 'abc', '123')""" + sequences = ('abcdefghijklmnopqrstuvwxyz', '0123456789') + lowercase_pass = password.lower() + + for seq in sequences: + for i in range(len(seq) - 2): + if seq[i:i+3] in lowercase_pass: + return True + return False + + def _calculate_strength(self, password: str) -> PasswordStrength: + """Calculate password strength based on various factors""" + score = 0 + + # Length points (up to 5) + score += min(5, len(password) // 2) + + # Character variety points + if any(c.isupper() for c in password): + score += 2 + if any(c.islower() for c in password): + score += 2 + if any(c.isdigit() for c in password): + score += 2 + if any(not c.isalnum() for c in password): + score += 3 + + # Unique character points + score += min(3, len(set(password)) // 3) + + match score: + case s if s >= 10: + return PasswordStrength.STRONG + case s if s >= 6: + return PasswordStrength.MEDIUM + case _: + return PasswordStrength.WEAK + +# Example usage: +def validate_password(password1: str, password2: str) -> Union[str, List[str]]: + validator = PasswordValidator() + result = validator.validate_password(password1, password2) + + if not result.is_valid: + return result.errors + + return f"Password is valid (Strength: {result.strength.value})" + diff --git a/dj_rest_auth/registration/serializers.py b/dj_rest_auth/registration/serializers.py index ea9ababb..84b7d8bd 100644 --- a/dj_rest_auth/registration/serializers.py +++ b/dj_rest_auth/registration/serializers.py @@ -43,10 +43,7 @@ class SocialLoginSerializer(serializers.Serializer): id_token = serializers.CharField(required=False, allow_blank=True) def _get_request(self): - request = self.context.get('request') - if not isinstance(request, HttpRequest): - request = request._request - return request + return self.context.get('request', getattr(self.context.get('request'), '_request', None)) def get_social_login(self, adapter, app, token, response): """ @@ -75,7 +72,7 @@ def set_callback_url(self, view, adapter_class): ) except NoReverseMatch: raise serializers.ValidationError( - _('Define callback_url in view'), + ({'callback_url': _('Define callback_url in view or ensure URL name exists.')}) ) def validate(self, attrs): @@ -99,99 +96,97 @@ def validate(self, attrs): access_token = attrs.get('access_token') code = attrs.get('code') - # Case 1: We received the access_token + if access_token: tokens_to_parse = {'access_token': access_token} token = access_token - # For sign in with apple + id_token = attrs.get('id_token') if id_token: tokens_to_parse['id_token'] = id_token - - # Case 2: We received the authorization code + tokens_to_parse['id_token'] = id_token elif code: - self.set_callback_url(view=view, adapter_class=adapter_class) - self.client_class = getattr(view, 'client_class', None) - - if not self.client_class: - raise serializers.ValidationError( - _('Define client_class in view'), - ) - - provider = adapter.get_provider() - scope = provider.get_scope_from_request(request) - client = self.client_class( - request, - app.client_id, - app.secret, - adapter.access_token_method, - adapter.access_token_url, - self.callback_url, - scope, - scope_delimiter=adapter.scope_delimiter, - headers=adapter.headers, - basic_auth=adapter.basic_auth, - ) - try: - token = client.get_access_token(code) - except OAuth2Error as ex: - raise serializers.ValidationError( - _('Failed to exchange code for access token') - ) from ex - access_token = token['access_token'] - tokens_to_parse = {'access_token': access_token} - - # If available we add additional data to the dictionary - for key in ['refresh_token', 'id_token', adapter.expires_in_key]: - if key in token: - tokens_to_parse[key] = token[key] + self._handle_code_flow(view, adapter_class, adapter, app, code, request, tokens_to_parse) else: - raise serializers.ValidationError( - _('Incorrect input. access_token or code is required.'), - ) - + raise serializers.ValidationError(_('Incorrect input. access_token or code is required.')) + social_token = adapter.parse_token(tokens_to_parse) social_token.app = app try: - if adapter.provider_id == 'google' and not code: - login = self.get_social_login(adapter, app, social_token, response={'id_token': id_token}) - else: - login = self.get_social_login(adapter, app, social_token, token) - ret = complete_social_login(request, login) + login = self._attempt_login(adapter, app, social_token, code, attrs) except HTTPError: raise serializers.ValidationError(_('Incorrect value')) + - if isinstance(ret, HttpResponseBadRequest): - raise serializers.ValidationError(ret.content) + if isinstance(login, HttpResponseBadRequest): + raise serializers.ValidationError(login.content) if not login.is_existing: - # We have an account already signed up in a different flow - # with the same email address: raise an exception. - # This needs to be handled in the frontend. We can not just - # link up the accounts due to security constraints - if allauth_account_settings.UNIQUE_EMAIL: - # Do we have an account already with this email address? - account_exists = get_user_model().objects.filter( - email=login.user.email, - ).exists() - if account_exists: - raise serializers.ValidationError( - _('User is already registered with this e-mail address.'), - ) - - login.lookup() - try: - login.save(request, connect=True) - except IntegrityError as ex: - raise serializers.ValidationError( - _('User is already registered with this e-mail address.'), - ) from ex - self.post_signup(login, attrs) + self._new_user_registration(login, request, attrs) attrs['user'] = login.account.user return attrs + + def _handle_code_flow(self, view, adapter_class, adapter, app, code, request, tokens_to_parse): + """Handles the auth flow when an authorization code is provided.""" + self.set_callback_url(view=view, adapter_class=adapter_class) + self.client_class = getattr(view, 'client_class', None) + + if not self.client_class: + raise serializers.ValidationError(_('Define client_class in view')) + + provider = adapter.get_provider() + scope = provider.get_scope_from_request(request) + client = self.client_class( + request, + app.client_id, + app.secret, + adapter.access_token_method, + adapter.access_token_url, + self.callback_url, + scope, + scope_delimiter=adapter.scope_delimiter, + headers=adapter.headers, + basic_auth=adapter.basic_auth, + ) + try: + token = client.get_access_token(code) + except OAuth2Error as ex: + raise serializers.ValidationError( + ('Failed to exchange code for access token') + ) from ex + + access_token = token['access_token'] + tokens_to_parse = {'access_token': access_token} + for key in ['refresh_token', 'id_token', adapter.expires_in_key]: + if key in token: + tokens_to_parse[key] = token[key] + + + + def _attempt_login(self, adapter, app, social_token, code, attrs): + """Attempts to log in the user using the adapter.""" + if adapter.provider_id == 'google' and not code: + return self.get_social_login(adapter, app, social_token, response={'id_token': attrs.get('id_token')}) + return self.get_social_login(adapter, app, social_token, token=attrs.get('access_token')) + + def _new_user_registration(self, login, request, attrs): + """Handles user registration if the user does not exist.""" + if allauth_account_settings.UNIQUE_EMAIL: + account_exists = get_user_model().objects.filter(email=login.user.email).exists() + if account_exists: + raise serializers.ValidationError(_('User is already registered with this e-mail address.')) + login.lookup() + try: + login.save(request, connect=True) + except IntegrityError as ex: + raise serializers.ValidationError( + _('User is already registered with this e-mail address.'), + ) from ex + self.post_signup(login, attrs) + def post_signup(self, login, attrs): """ @@ -244,13 +239,6 @@ def validate_email(self, email): ) return email - def validate_password1(self, password): - return get_adapter().clean_password(password) - - def validate(self, data): - if data['password1'] != data['password2']: - raise serializers.ValidationError(_("The two password fields didn't match.")) - return data def custom_signup(self, request, user): pass @@ -258,7 +246,6 @@ def custom_signup(self, request, user): def get_cleaned_data(self): return { 'username': self.validated_data.get('username', ''), - 'password1': self.validated_data.get('password1', ''), 'email': self.validated_data.get('email', ''), } @@ -267,13 +254,6 @@ def save(self, request): user = adapter.new_user(request) self.cleaned_data = self.get_cleaned_data() user = adapter.save_user(request, user, self, commit=False) - if "password1" in self.cleaned_data: - try: - adapter.clean_password(self.cleaned_data['password1'], user=user) - except DjangoValidationError as exc: - raise serializers.ValidationError( - detail=serializers.as_serializer_error(exc) - ) user.save() self.custom_signup(request, user) setup_user_email(request, user, [])