mirror of
https://github.com/minetest/contentdb.git
synced 2024-12-23 14:32:25 +01:00
303 lines
10 KiB
Python
303 lines
10 KiB
Python
from flask import *
|
|
from flask_login import current_user, login_required, logout_user
|
|
from flask_wtf import FlaskForm
|
|
from sqlalchemy import or_
|
|
from wtforms import *
|
|
from wtforms.validators import *
|
|
|
|
from app.models import *
|
|
from app.utils import nonEmptyOrNone, addAuditLog, randomString, rank_required
|
|
from app.tasks.emails import send_verify_email
|
|
from . import bp
|
|
|
|
|
|
def get_setting_tabs(user):
|
|
return [
|
|
{
|
|
"id": "edit_profile",
|
|
"title": "Edit Profile",
|
|
"url": url_for("users.profile_edit", username=user.username)
|
|
},
|
|
{
|
|
"id": "account",
|
|
"title": "Account and Security",
|
|
"url": url_for("users.account", username=user.username)
|
|
},
|
|
{
|
|
"id": "notifications",
|
|
"title": "Email and Notifications",
|
|
"url": url_for("users.email_notifications", username=user.username)
|
|
},
|
|
{
|
|
"id": "api_tokens",
|
|
"title": "API Tokens",
|
|
"url": url_for("api.list_tokens", username=user.username)
|
|
},
|
|
]
|
|
|
|
|
|
class UserProfileForm(FlaskForm):
|
|
display_name = StringField("Display Name", [Optional(), Length(1, 20)], filters=[lambda x: nonEmptyOrNone(x)])
|
|
website_url = StringField("Website URL", [Optional(), URL()], filters = [lambda x: x or None])
|
|
donate_url = StringField("Donation URL", [Optional(), URL()], filters = [lambda x: x or None])
|
|
submit = SubmitField("Save")
|
|
|
|
|
|
def handle_profile_edit(form, user, username):
|
|
severity = AuditSeverity.NORMAL if current_user == user else AuditSeverity.MODERATION
|
|
addAuditLog(severity, current_user, "Edited {}'s profile".format(user.display_name),
|
|
url_for("users.profile", username=username))
|
|
|
|
if user.checkPerm(current_user, Permission.CHANGE_DISPLAY_NAME) and \
|
|
user.display_name != form.display_name.data:
|
|
if User.query.filter(User.id != user.id,
|
|
or_(User.username == form.display_name.data,
|
|
User.display_name.ilike(form.display_name.data))).count() > 0:
|
|
flash("A user already has that name", "danger")
|
|
return None
|
|
|
|
alias_by_name = PackageAlias.query.filter(or_(
|
|
PackageAlias.author == form.display_name.data)).first()
|
|
if alias_by_name:
|
|
flash("A user already has that name", "danger")
|
|
return
|
|
|
|
user.display_name = form.display_name.data
|
|
|
|
severity = AuditSeverity.USER if current_user == user else AuditSeverity.MODERATION
|
|
addAuditLog(severity, current_user, "Changed display name of {} to {}"
|
|
.format(user.username, user.display_name),
|
|
url_for("users.profile", username=username))
|
|
|
|
if user.checkPerm(current_user, Permission.CHANGE_PROFILE_URLS):
|
|
user.website_url = form["website_url"].data
|
|
user.donate_url = form["donate_url"].data
|
|
|
|
db.session.commit()
|
|
|
|
return redirect(url_for("users.profile", username=username))
|
|
|
|
|
|
@bp.route("/users/<username>/settings/profile/", methods=["GET", "POST"])
|
|
@login_required
|
|
def profile_edit(username):
|
|
user : User = User.query.filter_by(username=username).first()
|
|
if not user:
|
|
abort(404)
|
|
|
|
if not user.can_see_edit_profile(current_user):
|
|
flash("Permission denied", "danger")
|
|
return redirect(url_for("users.profile", username=username))
|
|
|
|
form = UserProfileForm(obj=user)
|
|
if form.validate_on_submit():
|
|
ret = handle_profile_edit(form, user, username)
|
|
if ret:
|
|
return ret
|
|
|
|
# Process GET or invalid POST
|
|
return render_template("users/profile_edit.html", user=user, form=form, tabs=get_setting_tabs(user), current_tab="edit_profile")
|
|
|
|
|
|
def make_settings_form():
|
|
attrs = {
|
|
"email": StringField("Email", [Optional(), Email()]),
|
|
"submit": SubmitField("Save")
|
|
}
|
|
|
|
for notificationType in NotificationType:
|
|
key = "pref_" + notificationType.toName()
|
|
attrs[key] = BooleanField("")
|
|
attrs[key + "_digest"] = BooleanField("")
|
|
|
|
return type("SettingsForm", (FlaskForm,), attrs)
|
|
|
|
SettingsForm = make_settings_form()
|
|
|
|
|
|
def handle_email_notifications(user, prefs: UserNotificationPreferences, is_new, form):
|
|
for notificationType in NotificationType:
|
|
field_email = getattr(form, "pref_" + notificationType.toName()).data
|
|
field_digest = getattr(form, "pref_" + notificationType.toName() + "_digest").data or field_email
|
|
prefs.set_can_email(notificationType, field_email)
|
|
prefs.set_can_digest(notificationType, field_digest)
|
|
|
|
if is_new:
|
|
db.session.add(prefs)
|
|
|
|
if user.checkPerm(current_user, Permission.CHANGE_EMAIL):
|
|
newEmail = form.email.data
|
|
if newEmail and newEmail != user.email and newEmail.strip() != "":
|
|
if EmailSubscription.query.filter_by(email=form.email.data, blacklisted=True).count() > 0:
|
|
flash("That email address has been unsubscribed/blacklisted, and cannot be used", "danger")
|
|
return
|
|
|
|
token = randomString(32)
|
|
|
|
severity = AuditSeverity.NORMAL if current_user == user else AuditSeverity.MODERATION
|
|
|
|
msg = "Changed email of {}".format(user.display_name)
|
|
addAuditLog(severity, current_user, msg, url_for("users.profile", username=user.username))
|
|
|
|
ver = UserEmailVerification()
|
|
ver.user = user
|
|
ver.token = token
|
|
ver.email = newEmail
|
|
db.session.add(ver)
|
|
db.session.commit()
|
|
|
|
flash("Check your email to confirm it", "success")
|
|
|
|
send_verify_email.delay(newEmail, token)
|
|
return redirect(url_for("users.email_notifications", username=user.username))
|
|
|
|
db.session.commit()
|
|
return redirect(url_for("users.email_notifications", username=user.username))
|
|
|
|
|
|
@bp.route("/user/settings/email/")
|
|
@bp.route("/users/<username>/settings/email/", methods=["GET", "POST"])
|
|
@login_required
|
|
def email_notifications(username=None):
|
|
if username is None:
|
|
return redirect(url_for("users.email_notifications", username=current_user.username))
|
|
|
|
user: User = User.query.filter_by(username=username).first()
|
|
if not user:
|
|
abort(404)
|
|
|
|
if not user.checkPerm(current_user, Permission.CHANGE_EMAIL):
|
|
abort(403)
|
|
|
|
is_new = False
|
|
prefs = user.notification_preferences
|
|
if prefs is None:
|
|
is_new = True
|
|
prefs = UserNotificationPreferences(user)
|
|
|
|
data = {}
|
|
types = []
|
|
for notificationType in NotificationType:
|
|
types.append(notificationType)
|
|
data["pref_" + notificationType.toName()] = prefs.get_can_email(notificationType)
|
|
data["pref_" + notificationType.toName() + "_digest"] = prefs.get_can_digest(notificationType)
|
|
|
|
data["email"] = user.email
|
|
|
|
form = SettingsForm(data=data)
|
|
if form.validate_on_submit():
|
|
ret = handle_email_notifications(user, prefs, is_new, form)
|
|
if ret:
|
|
return ret
|
|
|
|
return render_template("users/settings_email.html",
|
|
form=form, user=user, types=types, is_new=is_new,
|
|
tabs=get_setting_tabs(user), current_tab="notifications")
|
|
|
|
|
|
class UserAccountForm(FlaskForm):
|
|
username = StringField("Username", [Optional(), Length(1, 50)])
|
|
display_name = StringField("Display name", [Optional(), Length(2, 100)])
|
|
forums_username = StringField("Forums Username", [Optional(), Length(2, 50)])
|
|
github_username = StringField("GitHub Username", [Optional(), Length(2, 50)])
|
|
rank = SelectField("Rank", [Optional()], choices=UserRank.choices(), coerce=UserRank.coerce,
|
|
default=UserRank.NEW_MEMBER)
|
|
submit = SubmitField("Save")
|
|
|
|
|
|
@bp.route("/users/<username>/settings/account/", methods=["GET", "POST"])
|
|
@login_required
|
|
def account(username):
|
|
user : User = User.query.filter_by(username=username).first()
|
|
if not user:
|
|
abort(404)
|
|
|
|
if not user.can_see_edit_profile(current_user):
|
|
flash("Permission denied", "danger")
|
|
return redirect(url_for("users.profile", username=username))
|
|
|
|
can_edit_account_settings = user.checkPerm(current_user, Permission.CHANGE_USERNAMES) or \
|
|
user.checkPerm(current_user, Permission.CHANGE_RANK)
|
|
form = UserAccountForm(obj=user) if can_edit_account_settings else None
|
|
if form and form.validate_on_submit():
|
|
severity = AuditSeverity.NORMAL if current_user == user else AuditSeverity.MODERATION
|
|
addAuditLog(severity, current_user, "Edited {}'s profile".format(user.display_name),
|
|
url_for("users.profile", username=username))
|
|
|
|
# Copy form fields to user_profile fields
|
|
if user.checkPerm(current_user, Permission.CHANGE_USERNAMES):
|
|
if user.username != form.username.data:
|
|
for package in user.packages:
|
|
alias = PackageAlias(user.username, package.name)
|
|
package.aliases.append(alias)
|
|
db.session.add(alias)
|
|
|
|
user.username = form.username.data
|
|
|
|
user.display_name = form.display_name.data
|
|
user.forums_username = nonEmptyOrNone(form.forums_username.data)
|
|
user.github_username = nonEmptyOrNone(form.github_username.data)
|
|
|
|
if user.checkPerm(current_user, Permission.CHANGE_RANK):
|
|
newRank = form["rank"].data
|
|
if current_user.rank.atLeast(newRank):
|
|
if newRank != user.rank:
|
|
user.rank = form["rank"].data
|
|
msg = "Set rank of {} to {}".format(user.display_name, user.rank.getTitle())
|
|
addAuditLog(AuditSeverity.MODERATION, current_user, msg,
|
|
url_for("users.profile", username=username))
|
|
else:
|
|
flash("Can't promote a user to a rank higher than yourself!", "danger")
|
|
|
|
db.session.commit()
|
|
|
|
return redirect(url_for("users.account", username=username))
|
|
|
|
return render_template("users/account.html", user=user, form=form, tabs=get_setting_tabs(user), current_tab="account")
|
|
|
|
|
|
@bp.route("/users/<username>/delete/", methods=["GET", "POST"])
|
|
@rank_required(UserRank.ADMIN)
|
|
def delete(username):
|
|
user: User = User.query.filter_by(username=username).first()
|
|
if not user:
|
|
abort(404)
|
|
|
|
if user.rank.atLeast(UserRank.MODERATOR):
|
|
flash("Users with moderator rank or above cannot be deleted", "danger")
|
|
return redirect(url_for("users.account", username=username))
|
|
|
|
if request.method == "GET":
|
|
return render_template("users/delete.html", user=user, can_delete=user.can_delete())
|
|
|
|
if "delete" in request.form and (user.can_delete() or current_user.rank.atLeast(UserRank.ADMIN)):
|
|
msg = "Deleted user {}".format(user.username)
|
|
flash(msg, "success")
|
|
addAuditLog(AuditSeverity.MODERATION, current_user, msg, None)
|
|
|
|
if current_user.rank.atLeast(UserRank.ADMIN):
|
|
for pkg in user.packages.all():
|
|
pkg.review_thread = None
|
|
db.session.delete(pkg)
|
|
|
|
db.session.delete(user)
|
|
elif "deactivate" in request.form:
|
|
user.replies.delete()
|
|
for thread in user.threads.all():
|
|
db.session.delete(thread)
|
|
user.email = None
|
|
user.rank = UserRank.NOT_JOINED
|
|
|
|
msg = "Deactivated user {}".format(user.username)
|
|
flash(msg, "success")
|
|
addAuditLog(AuditSeverity.MODERATION, current_user, msg, None)
|
|
else:
|
|
assert False
|
|
|
|
db.session.commit()
|
|
|
|
if user == current_user:
|
|
logout_user()
|
|
|
|
return redirect(url_for("homepage.home"))
|