Clean up user registration code

This commit is contained in:
rubenwardy 2024-06-02 21:17:15 +01:00
parent 4872ea9e6a
commit 5e122279ec
3 changed files with 77 additions and 55 deletions

@ -16,11 +16,9 @@
import datetime import datetime
from flask import Blueprint, abort from flask import Blueprint, abort, Response
from flask_babel import gettext from flask_babel import gettext
from app.logic.users import create_user
bp = Blueprint("github", __name__)
from flask import redirect, url_for, request, flash, jsonify, current_app from flask import redirect, url_for, request, flash, jsonify, current_app
from flask_login import current_user from flask_login import current_user
from sqlalchemy import or_, and_ from sqlalchemy import or_, and_
@ -30,6 +28,8 @@ from app.utils import abs_url_for, add_audit_log, login_user_set_active, is_safe
from app.blueprints.api.support import error, api_create_vcs_release from app.blueprints.api.support import error, api_create_vcs_release
import hmac, requests import hmac, requests
bp = Blueprint("github", __name__)
@bp.route("/github/start/") @bp.route("/github/start/")
def start(): def start():
@ -103,17 +103,12 @@ def callback(oauth_token):
# Sign up # Sign up
else: else:
existing_user = (User.query user = create_user(github_username, github_username, None, "GitHub")
.filter(or_(User.username == github_username, User.forums_username == github_username)) if isinstance(user, Response):
.first()) return user
if existing_user: elif user is None:
flash(gettext("Unable to create an account as the username is already taken. "
"If you meant to log in, you need to connect GitHub to your account first"), "danger")
return redirect(url_for("users.login")) return redirect(url_for("users.login"))
user = User(github_username, True)
db.session.add(user)
add_audit_log(AuditSeverity.USER, user, "Registered with GitHub, display name=" + user.display_name, add_audit_log(AuditSeverity.USER, user, "Registered with GitHub, display name=" + user.display_name,
url_for("users.profile", username=user.username)) url_for("users.profile", username=user.username))

@ -16,7 +16,7 @@
import datetime import datetime
from flask import redirect, abort, render_template, flash, request, url_for from flask import redirect, abort, render_template, flash, request, url_for, Response
from flask_babel import gettext, get_locale, lazy_gettext from flask_babel import gettext, get_locale, lazy_gettext
from flask_login import current_user, login_required, logout_user, login_user from flask_login import current_user, login_required, logout_user, login_user
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
@ -26,10 +26,10 @@ from wtforms.validators import InputRequired, Length, Regexp, DataRequired, Opti
from app.tasks.emails import send_verify_email, send_anon_email, send_unsubscribe_verify, send_user_email from app.tasks.emails import send_verify_email, send_anon_email, send_unsubscribe_verify, send_user_email
from app.utils import random_string, make_flask_login_password, is_safe_url, check_password_hash, add_audit_log, \ from app.utils import random_string, make_flask_login_password, is_safe_url, check_password_hash, add_audit_log, \
nonempty_or_none, post_login, is_username_valid nonempty_or_none, post_login
from . import bp from . import bp
from app.models import User, AuditSeverity, db, UserRank, PackageAlias, EmailSubscription, UserNotificationPreferences, \ from app.models import User, AuditSeverity, db, EmailSubscription, UserEmailVerification
UserEmailVerification from app.logic.users import create_user
class LoginForm(FlaskForm): class LoginForm(FlaskForm):
@ -113,46 +113,13 @@ def handle_register(form):
flash(gettext("Incorrect captcha answer"), "danger") flash(gettext("Incorrect captcha answer"), "danger")
return return
if not is_username_valid(form.username.data): user = create_user(form.username.data, form.display_name.data, form.email.data)
flash(gettext("Username is invalid")) if isinstance(user, Response):
return user
elif user is None:
return return
user_by_name = User.query.filter(or_( user.password = make_flask_login_password(form.password.data)
User.username == form.username.data,
User.username == form.display_name.data,
User.display_name == form.display_name.data,
User.forums_username == form.username.data,
User.github_username == form.username.data)).first()
if user_by_name:
if user_by_name.rank == UserRank.NOT_JOINED and user_by_name.forums_username:
flash(gettext("An account already exists for that username but hasn't been claimed yet."), "danger")
return redirect(url_for("users.claim_forums", username=user_by_name.forums_username))
else:
flash(gettext("That username/display name is already in use, please choose another."), "danger")
return
alias_by_name = PackageAlias.query.filter(or_(
PackageAlias.author==form.username.data,
PackageAlias.author==form.display_name.data)).first()
if alias_by_name:
flash(gettext("That username/display name is already in use, please choose another."), "danger")
return
user_by_email = User.query.filter_by(email=form.email.data).first()
if user_by_email:
send_anon_email.delay(form.email.data, get_locale().language, gettext("Email already in use"),
gettext("We were unable to create the account as the email is already in use by %(display_name)s. Try a different email address.",
display_name=user_by_email.display_name))
return redirect(url_for("users.email_sent"))
elif EmailSubscription.query.filter_by(email=form.email.data, blacklisted=True).count() > 0:
flash(gettext("That email address has been unsubscribed/blacklisted, and cannot be used"), "danger")
return
user = User(form.username.data, False, form.email.data, make_flask_login_password(form.password.data))
user.notification_preferences = UserNotificationPreferences(user)
if form.display_name.data:
user.display_name = form.display_name.data
db.session.add(user)
add_audit_log(AuditSeverity.USER, user, "Registered with email, display name=" + user.display_name, add_audit_log(AuditSeverity.USER, user, "Registered with email, display name=" + user.display_name,
url_for("users.profile", username=user.username)) url_for("users.profile", username=user.username))

60
app/logic/users.py Normal file

@ -0,0 +1,60 @@
from typing import Optional
from flask import flash, redirect, url_for
from flask_babel import gettext, get_locale
from sqlalchemy import or_
from werkzeug import Response
from app.models import User, UserRank, PackageAlias, EmailSubscription, UserNotificationPreferences, db
from app.utils import is_username_valid
from app.tasks.emails import send_anon_email
def create_user(username: str, display_name: str, email: Optional[str], oauth_provider: Optional[str] = None) -> None | Response | User:
if not is_username_valid(username):
flash(gettext("Username is invalid"))
return
user_by_name = User.query.filter(or_(
User.username == username,
User.username == display_name,
User.display_name == display_name,
User.forums_username == username,
User.github_username == username)).first()
if user_by_name:
if user_by_name.rank == UserRank.NOT_JOINED and user_by_name.forums_username:
flash(gettext("An account already exists for that username but hasn't been claimed yet."), "danger")
return redirect(url_for("users.claim_forums", username=user_by_name.forums_username))
elif oauth_provider:
flash(gettext("Unable to create an account as the username is already taken. "
"If you meant to log in, you need to connect %(provider)s to your account first", provider=oauth_provider), "danger")
return
else:
flash(gettext("That username/display name is already in use, please choose another."), "danger")
return
alias_by_name = (PackageAlias.query
.filter(or_(PackageAlias.author == username, PackageAlias.author == display_name))
.first())
if alias_by_name:
flash(gettext("Unable to create an account as the username was used in the past."), "danger")
return
if email:
user_by_email = User.query.filter_by(email=email).first()
if user_by_email:
send_anon_email.delay(email, get_locale().language, gettext("Email already in use"),
gettext("We were unable to create the account as the email is already in use by %(display_name)s. Try a different email address.",
display_name=user_by_email.display_name))
return redirect(url_for("users.email_sent"))
elif EmailSubscription.query.filter_by(email=email, blacklisted=True).count() > 0:
flash(gettext("That email address has been unsubscribed/blacklisted, and cannot be used"), "danger")
return
user = User(username, False, email)
user.notification_preferences = UserNotificationPreferences(user)
if display_name:
user.display_name = display_name
db.session.add(user)
return user