# ContentDB # Copyright (C) 2023 rubenwardy # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from flask import redirect, abort, render_template, request, flash, url_for from flask_babel import gettext, get_locale, lazy_gettext from flask_login import current_user, login_required, logout_user from flask_wtf import FlaskForm from sqlalchemy import or_ from wtforms import StringField, SubmitField, BooleanField, SelectField from wtforms.validators import Length, Optional, Email, URL from app.models import User, AuditSeverity, db, UserRank, PackageAlias, EmailSubscription, UserNotificationPreferences, \ UserEmailVerification, Permission, NotificationType, UserBan from app.tasks.emails import send_verify_email from app.utils import nonempty_or_none, add_audit_log, random_string, rank_required, has_blocked_domains from . import bp def get_setting_tabs(user): ret = [ { "id": "edit_profile", "title": gettext("Edit Profile"), "url": url_for("users.profile_edit", username=user.username) }, { "id": "account", "title": gettext("Account and Security"), "url": url_for("users.account", username=user.username) }, { "id": "notifications", "title": gettext("Email and Notifications"), "url": url_for("users.email_notifications", username=user.username) }, { "id": "api_tokens", "title": gettext("API Tokens"), "url": url_for("api.list_tokens", username=user.username) }, ] if user.check_perm(current_user, Permission.CREATE_OAUTH_CLIENT): ret.append({ "id": "oauth_clients", "title": gettext("OAuth2 Applications"), "url": url_for("oauth.list_clients", username=user.username) }) if current_user.rank.at_least(UserRank.MODERATOR): ret.append({ "id": "modtools", "title": gettext("Moderator Tools"), "url": url_for("users.modtools", username=user.username) }) return ret class UserProfileForm(FlaskForm): display_name = StringField(lazy_gettext("Display Name"), [Optional(), Length(1, 20)], filters=[lambda x: nonempty_or_none(x.strip())]) website_url = StringField(lazy_gettext("Website URL"), [Optional(), URL()], filters = [lambda x: x or None]) donate_url = StringField(lazy_gettext("Donation URL"), [Optional(), URL()], filters = [lambda x: x or None]) submit = SubmitField(lazy_gettext("Save")) def handle_profile_edit(form: UserProfileForm, user: User, username: str): severity = AuditSeverity.NORMAL if current_user == user else AuditSeverity.MODERATION add_audit_log(severity, current_user, "Edited {}'s profile".format(user.display_name), url_for("users.profile", username=username)) display_name = form.display_name.data or user.username if user.check_perm(current_user, Permission.CHANGE_DISPLAY_NAME) and \ user.display_name != display_name: if User.query.filter(User.id != user.id, or_(User.username == display_name, User.display_name.ilike(display_name))).count() > 0: flash(gettext("A user already has that name"), "danger") return None alias_by_name = PackageAlias.query.filter(or_( PackageAlias.author == display_name)).first() if alias_by_name: flash(gettext("A user already has that name"), "danger") return user.display_name = display_name severity = AuditSeverity.USER if current_user == user else AuditSeverity.MODERATION add_audit_log(severity, current_user, "Changed display name of {} to {}" .format(user.username, user.display_name), url_for("users.profile", username=username)) if user.check_perm(current_user, Permission.CHANGE_PROFILE_URLS): if has_blocked_domains(form.website_url.data, current_user.username, f"{user.username}'s website_url") or \ has_blocked_domains(form.donate_url.data, current_user.username, f"{user.username}'s donate_url"): flash(gettext("Linking to blocked sites is not allowed"), "danger") return user.website_url = form.website_url.data user.donate_url = form.donate_url.data db.session.commit() return redirect(url_for("users.profile", username=username)) @bp.route("/users//settings/profile/", methods=["GET", "POST"]) @login_required def profile_edit(username): user : User = User.query.filter_by(username=username).first() if not user: abort(404) if not user.can_see_edit_profile(current_user): flash(gettext("Permission denied"), "danger") return redirect(url_for("users.profile", username=username)) form = UserProfileForm(obj=user) if form.validate_on_submit(): ret = handle_profile_edit(form, user, username) if ret: return ret # Process GET or invalid POST return render_template("users/profile_edit.html", user=user, form=form, tabs=get_setting_tabs(user), current_tab="edit_profile") def make_settings_form(): attrs = { "email": StringField(lazy_gettext("Email"), [Optional(), Email()]), "submit": SubmitField(lazy_gettext("Save")) } for notificationType in NotificationType: key = "pref_" + notificationType.to_name() attrs[key] = BooleanField("") attrs[key + "_digest"] = BooleanField("") return type("SettingsForm", (FlaskForm,), attrs) SettingsForm = make_settings_form() def handle_email_notifications(user, prefs: UserNotificationPreferences, is_new, form): for notificationType in NotificationType: field_email = getattr(form, "pref_" + notificationType.to_name()).data field_digest = getattr(form, "pref_" + notificationType.to_name() + "_digest").data or field_email prefs.set_can_email(notificationType, field_email) prefs.set_can_digest(notificationType, field_digest) if is_new: db.session.add(prefs) if user.check_perm(current_user, Permission.CHANGE_EMAIL): newEmail = form.email.data if newEmail and newEmail != user.email and newEmail.strip() != "": if EmailSubscription.query.filter_by(email=form.email.data, blacklisted=True).count() > 0: flash(gettext("That email address has been unsubscribed/blacklisted, and cannot be used"), "danger") return token = random_string(32) severity = AuditSeverity.NORMAL if current_user == user else AuditSeverity.MODERATION msg = "Changed email of {}".format(user.display_name) add_audit_log(severity, current_user, msg, url_for("users.profile", username=user.username)) ver = UserEmailVerification() ver.user = user ver.token = token ver.email = newEmail db.session.add(ver) db.session.commit() send_verify_email.delay(newEmail, token, get_locale().language) return redirect(url_for("users.email_sent")) db.session.commit() return redirect(url_for("users.email_notifications", username=user.username)) @bp.route("/user/settings/email/") @bp.route("/users//settings/email/", methods=["GET", "POST"]) @login_required def email_notifications(username=None): if username is None: return redirect(url_for("users.email_notifications", username=current_user.username)) user: User = User.query.filter_by(username=username).first() if not user: abort(404) if not user.check_perm(current_user, Permission.CHANGE_EMAIL): abort(403) is_new = False prefs = user.notification_preferences if prefs is None: is_new = True prefs = UserNotificationPreferences(user) data = {} types = [] for notificationType in NotificationType: types.append(notificationType) data["pref_" + notificationType.to_name()] = prefs.get_can_email(notificationType) data["pref_" + notificationType.to_name() + "_digest"] = prefs.get_can_digest(notificationType) data["email"] = user.email form = SettingsForm(data=data) if form.validate_on_submit(): ret = handle_email_notifications(user, prefs, is_new, form) if ret: return ret return render_template("users/settings_email.html", form=form, user=user, types=types, is_new=is_new, tabs=get_setting_tabs(user), current_tab="notifications") @bp.route("/users//settings/account/") @login_required def account(username): user : User = User.query.filter_by(username=username).first() if not user: abort(404) return render_template("users/account.html", user=user, tabs=get_setting_tabs(user), current_tab="account") @bp.route("/users//delete/", methods=["GET", "POST"]) @rank_required(UserRank.ADMIN) def delete(username): user: User = User.query.filter_by(username=username).first() if not user: abort(404) if user.rank.at_least(UserRank.MODERATOR): flash(gettext("Users with moderator rank or above cannot be deleted"), "danger") return redirect(url_for("users.account", username=username)) if request.method == "GET": return render_template("users/delete.html", user=user, can_delete=user.can_delete()) if "delete" in request.form and (user.can_delete() or current_user.rank.at_least(UserRank.ADMIN)): msg = "Deleted user {}".format(user.username) flash(msg, "success") add_audit_log(AuditSeverity.MODERATION, current_user, msg, None) if current_user.rank.at_least(UserRank.ADMIN): for pkg in user.packages.all(): pkg.review_thread = None db.session.delete(pkg) db.session.delete(user) elif "deactivate" in request.form: for reply in user.replies.all(): db.session.delete(reply) for thread in user.threads.all(): db.session.delete(thread) user.email = None if user.rank != UserRank.BANNED: user.rank = UserRank.NOT_JOINED msg = "Deactivated user {}".format(user.username) flash(msg, "success") add_audit_log(AuditSeverity.MODERATION, current_user, msg, None) else: assert False db.session.commit() if user == current_user: logout_user() return redirect(url_for("homepage.home")) class ModToolsForm(FlaskForm): username = StringField(lazy_gettext("Username"), [Optional(), Length(1, 50)]) display_name = StringField(lazy_gettext("Display name"), [Optional(), Length(2, 100)]) forums_username = StringField(lazy_gettext("Forums Username"), [Optional(), Length(2, 50)]) github_username = StringField(lazy_gettext("GitHub Username"), [Optional(), Length(2, 50)]) rank = SelectField(lazy_gettext("Rank"), [Optional()], choices=UserRank.choices(), coerce=UserRank.coerce, default=UserRank.NEW_MEMBER) submit = SubmitField(lazy_gettext("Save")) @bp.route("/users//modtools/", methods=["GET", "POST"]) @rank_required(UserRank.MODERATOR) def modtools(username): user: User = User.query.filter_by(username=username).first() if not user: abort(404) if not user.check_perm(current_user, Permission.CHANGE_EMAIL): abort(403) form = ModToolsForm(obj=user) if form.validate_on_submit(): severity = AuditSeverity.NORMAL if current_user == user else AuditSeverity.MODERATION add_audit_log(severity, current_user, "Edited {}'s account".format(user.display_name), url_for("users.profile", username=username)) # Copy form fields to user_profile fields if user.check_perm(current_user, Permission.CHANGE_USERNAMES): if user.username != form.username.data: for package in user.packages: alias = PackageAlias(user.username, package.name) package.aliases.append(alias) db.session.add(alias) user.username = form.username.data user.display_name = form.display_name.data user.forums_username = nonempty_or_none(form.forums_username.data) user.github_username = nonempty_or_none(form.github_username.data) if user.check_perm(current_user, Permission.CHANGE_RANK): new_rank = form["rank"].data if current_user.rank.at_least(new_rank): if new_rank != user.rank: user.rank = form["rank"].data msg = "Set rank of {} to {}".format(user.display_name, user.rank.get_title()) add_audit_log(AuditSeverity.MODERATION, current_user, msg, url_for("users.profile", username=username)) else: flash(gettext("Can't promote a user to a rank higher than yourself!"), "danger") db.session.commit() return redirect(url_for("users.modtools", username=username)) return render_template("users/modtools.html", user=user, form=form, tabs=get_setting_tabs(user), current_tab="modtools") @bp.route("/users//modtools/set-email/", methods=["POST"]) @rank_required(UserRank.MODERATOR) def modtools_set_email(username): user: User = User.query.filter_by(username=username).first() if not user: abort(404) if not user.check_perm(current_user, Permission.CHANGE_EMAIL): abort(403) user.email = request.form["email"] user.is_active = False token = random_string(32) add_audit_log(AuditSeverity.MODERATION, current_user, f"Set email and sent a password reset on {user.username}", url_for("users.profile", username=user.username), None) ver = UserEmailVerification() ver.user = user ver.token = token ver.email = user.email ver.is_password_reset = True db.session.add(ver) db.session.commit() send_verify_email.delay(user.email, token, user.locale or "en") flash(f"Set email and sent a password reset on {user.username}", "success") return redirect(url_for("users.modtools", username=username)) @bp.route("/users//modtools/ban/", methods=["POST"]) @rank_required(UserRank.MODERATOR) def modtools_ban(username): user: User = User.query.filter_by(username=username).first() if not user: abort(404) if not user.check_perm(current_user, Permission.CHANGE_RANK): abort(403) message = request.form["message"] expires_at = request.form.get("expires_at") user.ban = UserBan() user.ban.banned_by = current_user user.ban.message = message if expires_at and expires_at != "": user.ban.expires_at = expires_at else: user.rank = UserRank.BANNED add_audit_log(AuditSeverity.MODERATION, current_user, f"Banned {user.username}, expires {user.ban.expires_at or '-'}, message: {message}", url_for("users.profile", username=user.username), None) db.session.commit() flash(f"Banned {user.username}", "success") return redirect(url_for("users.modtools", username=username)) @bp.route("/users//modtools/unban/", methods=["POST"]) @rank_required(UserRank.MODERATOR) def modtools_unban(username): user: User = User.query.filter_by(username=username).first() if not user: abort(404) if not user.check_perm(current_user, Permission.CHANGE_RANK): abort(403) if user.ban: db.session.delete(user.ban) if user.rank == UserRank.BANNED: user.rank = UserRank.MEMBER add_audit_log(AuditSeverity.MODERATION, current_user, f"Unbanned {user.username}", url_for("users.profile", username=user.username), None) db.session.commit() flash(f"Unbanned {user.username}", "success") return redirect(url_for("users.modtools", username=username))