Add ability to block domains

This commit is contained in:
rubenwardy 2023-01-03 12:17:01 +00:00
parent 13dcd373f2
commit 72b4029ed3
7 changed files with 141 additions and 104 deletions

@ -26,7 +26,8 @@ from wtforms import *
from wtforms.validators import *
from app.models import db, PackageReview, Thread, ThreadReply, NotificationType, PackageReviewVote, Package, UserRank, \
Permission, AuditSeverity, PackageState
from app.utils import is_package_page, addNotification, get_int_or_abort, isYes, is_safe_url, rank_required, addAuditLog
from app.utils import is_package_page, addNotification, get_int_or_abort, isYes, is_safe_url, rank_required, \
addAuditLog, has_blocked_domains
from app.tasks.webhooktasks import post_discord_webhook
@ -73,6 +74,9 @@ def review(package):
# Validate and submit
elif can_review and form.validate_on_submit():
if has_blocked_domains(form.comment.data, current_user.username, f"review of {package.getId()}"):
flash(gettext("Linking to malicious sites is not allowed."), "danger")
else:
was_new = False
if not review:
was_new = True
@ -217,7 +221,6 @@ def review_vote(package, review_id):
return redirect(review.thread.getViewURL())
@bp.route("/packages/<author>/<name>/review-votes/")
@rank_required(UserRank.ADMIN)
@is_package_page

@ -23,7 +23,7 @@ bp = Blueprint("threads", __name__)
from flask_login import current_user, login_required
from app.models import *
from app.utils import addNotification, isYes, addAuditLog, get_system_user, rank_required
from app.utils import addNotification, isYes, addAuditLog, get_system_user, rank_required, has_blocked_domains
from flask_wtf import FlaskForm
from wtforms import *
from wtforms.validators import *
@ -189,7 +189,7 @@ def edit_reply(id):
if reply_id is None:
abort(404)
reply = ThreadReply.query.get(reply_id)
reply: ThreadReply = ThreadReply.query.get(reply_id)
if reply is None or reply.thread != thread:
abort(404)
@ -199,7 +199,9 @@ def edit_reply(id):
form = CommentForm(formdata=request.form, obj=reply)
if form.validate_on_submit():
comment = form.comment.data
if has_blocked_domains(comment, current_user.username, f"edit to reply {reply.get_url(True)}"):
flash(gettext("Linking to malicious sites is not allowed."), "danger")
else:
msg = "Edited reply by {}".format(reply.author.display_name)
severity = AuditSeverity.NORMAL if current_user == reply.author else AuditSeverity.MODERATION
addNotification(reply.author, current_user, NotificationType.OTHER, msg, thread.getViewURL(), thread.package)
@ -230,6 +232,10 @@ def view(id):
flash(gettext("Please wait before commenting again"), "danger")
return redirect(thread.getViewURL())
if has_blocked_domains(comment, current_user.username, f"reply to {thread.getViewURL(True)}"):
flash(gettext("Linking to malicious sites is not allowed."), "danger")
return render_template("threads/view.html", thread=thread, form=form)
reply = ThreadReply()
reply.author = current_user
reply.comment = comment
@ -318,6 +324,9 @@ def new():
# Validate and submit
elif form.validate_on_submit():
if has_blocked_domains(form.comment.data, current_user.username, f"new thread"):
flash(gettext("Linking to malicious sites is not allowed."), "danger")
else:
thread = Thread()
thread.author = current_user
thread.title = form.title.data

@ -7,7 +7,7 @@ from wtforms import *
from wtforms.validators import *
from app.models import *
from app.utils import nonEmptyOrNone, addAuditLog, randomString, rank_required
from app.utils import nonEmptyOrNone, addAuditLog, randomString, rank_required, has_blocked_domains
from app.tasks.emails import send_verify_email
from . import bp
@ -53,7 +53,7 @@ class UserProfileForm(FlaskForm):
submit = SubmitField(lazy_gettext("Save"))
def handle_profile_edit(form, user, username):
def handle_profile_edit(form: UserProfileForm, user: User, username: str):
severity = AuditSeverity.NORMAL if current_user == user else AuditSeverity.MODERATION
addAuditLog(severity, current_user, "Edited {}'s profile".format(user.display_name),
url_for("users.profile", username=username))
@ -80,8 +80,13 @@ def handle_profile_edit(form, user, username):
url_for("users.profile", username=username))
if user.checkPerm(current_user, Permission.CHANGE_PROFILE_URLS):
user.website_url = form["website_url"].data
user.donate_url = form["donate_url"].data
if has_blocked_domains(form.website_url.data, current_user.username, f"{user.username}'s website_url") or \
has_blocked_domains(form.donate_url.data, current_user.username, f"{user.username}'s donate_url"):
flash(gettext("Linking to malicious sites is not allowed."), "danger")
return
user.website_url = form.website_url.data
user.donate_url = form.donate_url.data
db.session.commit()

@ -22,7 +22,7 @@ from flask_babel import lazy_gettext
from app.logic.LogicError import LogicError
from app.models import User, Package, PackageType, MetaPackage, Tag, ContentWarning, db, Permission, AuditSeverity, \
License, UserRank, PackageDevState
from app.utils import addAuditLog
from app.utils import addAuditLog, has_blocked_domains
from app.utils.url import clean_youtube_url
@ -118,6 +118,11 @@ def do_edit_package(user: User, package: Package, was_new: bool, was_web: bool,
validate(data)
for field in ["short_desc", "desc", "website", "issueTracker", "repo", "video_url"]:
if field in data and has_blocked_domains(data[field], user.username,
f"{field} of {package.getId()}"):
raise LogicError(403, lazy_gettext("Linking to malicious sites is not allowed."))
if "type" in data:
data["type"] = PackageType.coerce(data["type"])

@ -144,8 +144,8 @@ class ThreadReply(db.Model):
created_at = db.Column(db.DateTime, nullable=False, default=datetime.datetime.utcnow)
def get_url(self):
return url_for('threads.view', id=self.thread.id) + "#reply-" + str(self.id)
def get_url(self, absolute=False):
return self.thread.getViewURL(absolute) + "#reply-" + str(self.id)
def checkPerm(self, user, perm):
if not user.is_authenticated:

@ -20,6 +20,7 @@ import secrets
from .flask import *
from .models import *
from .user import *
from flask import current_app
YESES = ["yes", "true", "1", "on"]
@ -51,3 +52,19 @@ def shouldReturnJson():
def randomString(n):
return secrets.token_hex(int(n / 2))
def has_blocked_domains(text: str, username: str, location: str) -> bool:
if text is None:
return False
blocked_domains = current_app.config["BLOCKED_DOMAINS"]
for domain in blocked_domains:
if domain in text:
from app.tasks.webhooktasks import post_discord_webhook
post_discord_webhook.delay(username,
f"Attempted to post link to blocked domain {domain} in {location}",
True)
return True
return False

@ -34,6 +34,4 @@ DISCORD_WEBHOOK_QUEUE = None
TEMPLATES_AUTO_RELOAD = False
LOG_SQL = False
LANGUAGES = {
'en': 'English',
}
BLOCKED_DOMAINS = []