# ContentDB # Copyright (C) 2023 rubenwardy # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import re import typing from flask import Blueprint, request, redirect, render_template, flash, abort, url_for from flask_babel import lazy_gettext, gettext from flask_login import current_user, login_required from flask_wtf import FlaskForm from wtforms import StringField, BooleanField, SubmitField, FieldList, HiddenField from wtforms.validators import InputRequired, Length, Optional, Regexp from app.models import Collection, db, Package, Permission, CollectionPackage, User, UserRank, AuditSeverity from app.utils import is_package_page, nonempty_or_none, add_audit_log bp = Blueprint("collections", __name__) regex_invalid_chars = re.compile("[^a-z0-9_]") @bp.route("/collections/") @bp.route("/collections//") def list_all(author=None): if author: user = User.query.filter_by(username=author).one_or_404() query = user.collections else: user = None query = Collection.query.order_by(db.asc(Collection.title)) if "package" in request.args: package = Package.get_by_key(request.args["package"]) if package is None: abort(404) query = query.filter(Collection.packages.contains(package)) collections = [x for x in query.all() if x.check_perm(current_user, Permission.VIEW_COLLECTION)] return render_template("collections/list.html", user=user, collections=collections, noindex=user is None or len(collections) == 0) @bp.route("/collections///") def view(author, name): collection = Collection.query \ .filter(Collection.name == name, Collection.author.has(username=author)) \ .one_or_404() if not collection.check_perm(current_user, Permission.VIEW_COLLECTION): abort(404) items = collection.items if collection.check_perm(current_user, Permission.EDIT_COLLECTION): items = [x for x in items if x.package.check_perm(current_user, Permission.VIEW_PACKAGE)] return render_template("collections/view.html", collection=collection, items=items) class CollectionForm(FlaskForm): title = StringField(lazy_gettext("Title"), [InputRequired(), Length(3, 100)]) name = StringField("URL", [Optional(), Length(1, 20), Regexp("^[a-z0-9_]", 0, "Lower case letters (a-z), digits (0-9), and underscores (_) only")]) short_description = StringField(lazy_gettext("Short Description"), [Optional(), Length(0, 200)]) private = BooleanField(lazy_gettext("Private")) descriptions = FieldList( StringField(lazy_gettext("Short Description"), [Optional(), Length(0, 500)], filters=[nonempty_or_none]), min_entries=0) package_ids = FieldList(HiddenField(), min_entries=0) submit = SubmitField(lazy_gettext("Save")) @bp.route("/collections/new/", methods=["GET", "POST"]) @bp.route("/collections///edit/", methods=["GET", "POST"]) @login_required def create_edit(author=None, name=None): collection: typing.Optional[Collection] = None if author is not None and name is not None: collection = Collection.query \ .filter(Collection.name == name, Collection.author.has(username=author)) \ .one_or_404() if not collection.check_perm(current_user, Permission.EDIT_COLLECTION): abort(403) elif "author" in request.args: author = request.args["author"] if author != current_user.username and not current_user.rank.at_least(UserRank.EDITOR): abort(403) if author is None: author = current_user else: author = User.query.filter_by(username=author).one() form = CollectionForm(formdata=request.form, obj=collection) initial_packages = [] if "package" in request.args: for package_id in request.args.getlist("package"): package = Package.get_by_key(package_id) if package: initial_packages.append(package) if request.method == "GET": # HACK: fix bug in wtforms form.private.data = collection.private if collection else False if collection: for item in collection.items: form.descriptions.append_entry(item.description) form.package_ids.append_entry(item.package.id) else: form.name = None if form.validate_on_submit(): ret = handle_create_edit(collection, form, initial_packages, author) if ret: return ret return render_template("collections/create_edit.html", collection=collection, form=form) def handle_create_edit(collection: Collection, form: CollectionForm, initial_packages: typing.List[Package], author: User): severity = AuditSeverity.NORMAL if author == current_user else AuditSeverity.EDITOR name = form.name.data if collection else regex_invalid_chars.sub("", form.title.data.lower().replace(" ", "_")) if collection is None or name != collection.name: if Collection.query \ .filter(Collection.name == name, Collection.author == author) \ .count() > 0: flash(gettext("A collection with a similar title already exists"), "danger") return if Package.query \ .filter(Package.name == name, Package.author == author) \ .count() > 0: flash(gettext("Unable to create collection as a package with that name already exists"), "danger") return if collection is None: collection = Collection() collection.author = author form.populate_obj(collection) collection.name = name db.session.add(collection) for package in initial_packages: link = CollectionPackage() link.package = package link.collection = collection link.order = len(collection.items) db.session.add(link) add_audit_log(severity, current_user, f"Created collection {collection.author.username}/{collection.name}", collection.get_url("collections.view"), None) else: form.populate_obj(collection) collection.name = name for i, package_id in enumerate(form.package_ids): item = next((x for x in collection.items if str(x.package.id) == package_id.data), None) if item is None: abort(400) item.description = form.descriptions[i].data add_audit_log(severity, current_user, f"Edited collection {collection.author.username}/{collection.name}", collection.get_url("collections.view"), None) db.session.commit() return redirect(collection.get_url("collections.view")) @bp.route("/collections///delete/", methods=["GET", "POST"]) @login_required def delete(author, name): collection = Collection.query \ .filter(Collection.name == name, Collection.author.has(username=author)) \ .one_or_404() if not collection.check_perm(current_user, Permission.EDIT_COLLECTION): abort(403) if request.method == "POST": add_audit_log(AuditSeverity.NORMAL, current_user, f"Deleted collection {collection.author.username}/{collection.name}", collection.get_url("collections.view"), None) db.session.delete(collection) db.session.commit() return redirect(url_for("collections.list_all", author=author)) return render_template("collections/delete.html", collection=collection) def toggle_package(collection: Collection, package: Package): severity = AuditSeverity.NORMAL if collection.author == current_user else AuditSeverity.EDITOR if package in collection.packages: CollectionPackage.query \ .filter(CollectionPackage.collection == collection, CollectionPackage.package == package) \ .delete(synchronize_session=False) add_audit_log(severity, current_user, f"Removed {package.get_id()} from collection {collection.author.username}/{collection.name}", collection.get_url("collections.view"), None) db.session.commit() return False else: link = CollectionPackage() link.package = package link.collection = collection link.order = len(collection.items) db.session.add(link) add_audit_log(severity, current_user, f"Added {package.get_id()} to collection {collection.author.username}/{collection.name}", collection.get_url("collections.view"), None) db.session.commit() return True @bp.route("/packages///add-to/", methods=["GET", "POST"]) @is_package_page @login_required def package_add(package): if request.method == "POST": collection_id = request.form["collection"] collection = Collection.query.get(collection_id) if collection is None: abort(404) if not collection.check_perm(current_user, Permission.EDIT_COLLECTION): abort(403) if toggle_package(collection, package): flash(gettext("Added package to collection"), "success") else: flash(gettext("Removed package from collection"), "success") return redirect(package.get_url("collections.package_add")) collections = current_user.collections.all() if current_user.rank.at_least(UserRank.EDITOR) and current_user.username != "ContentDB": collections.extend(Collection.query.filter(Collection.author.has(username="ContentDB")).all()) return render_template("collections/package_add_to.html", package=package, collections=collections) @bp.route("/packages///favorite/", methods=["POST"]) @is_package_page @login_required def package_toggle_favorite(package): collection = Collection.query.filter(Collection.name == "favorites", Collection.author == current_user).first() if collection is None: collection = Collection() collection.title = "Favorites" collection.name = "favorites" collection.short_description = "My favorites" collection.author = current_user db.session.add(collection) if toggle_package(collection, package): flash(gettext("Added package to favorites collection"), "success") else: flash(gettext("Removed package from favorites collection"), "success") return redirect(package.get_url("packages.view")) @bp.route("/collections///clone/", methods=["POST"]) @login_required def clone(author, name): old_collection: typing.Optional[Collection] = Collection.query \ .filter(Collection.name == name, Collection.author.has(username=author)) \ .one_or_404() index = 0 new_name = name new_title = old_collection.title while True: if Collection.query \ .filter(Collection.name == new_name, Collection.author == current_user) \ .count() == 0: break index += 1 new_name = f"{name}_{index}" new_title = f"{old_collection.title} ({index})" collection = Collection() collection.title = new_title collection.author = current_user collection.short_description = old_collection.short_description collection.name = new_name collection.private = True db.session.add(collection) for item in old_collection.items: new_item = CollectionPackage() new_item.package = item.package new_item.collection = collection new_item.description = item.description new_item.order = item.order db.session.add(new_item) add_audit_log(AuditSeverity.NORMAL, current_user, f"Created collection {collection.name} from {old_collection.author.username}/{old_collection.name} ", collection.get_url("collections.view"), None) db.session.commit() return redirect(collection.get_url("collections.view"))