# ContentDB # Copyright (C) 2023 rubenwardy # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import re import typing from flask import Blueprint, request, redirect, render_template, flash, abort, url_for, jsonify from flask_babel import lazy_gettext, gettext from flask_login import current_user, login_required from flask_wtf import FlaskForm from wtforms import StringField, BooleanField, SubmitField, FieldList, HiddenField, TextAreaField from wtforms.validators import InputRequired, Length, Optional, Regexp from app.models import Collection, db, Package, Permission, CollectionPackage, User, UserRank, AuditSeverity from app.utils import nonempty_or_none, normalize_line_endings, should_return_json from app.utils.models import is_package_page, add_audit_log, create_session bp = Blueprint("collections", __name__) regex_invalid_chars = re.compile("[^a-z0-9_]") @bp.route("/collections/") @bp.route("/collections//") def list_all(author=None): if author: user = User.query.filter_by(username=author).one_or_404() query = user.collections else: user = None query = Collection.query.filter(Collection.items.any()).order_by(db.asc(Collection.title)) if "package" in request.args: package = Package.get_by_key(request.args["package"]) if package is None: abort(404) query = query.filter(Collection.packages.contains(package)) collections = [x for x in query.all() if x.check_perm(current_user, Permission.VIEW_COLLECTION)] return render_template("collections/list.html", user=user, collections=collections, noindex=len(collections) == 0) @bp.route("/collections///") def view(author, name): collection = Collection.query \ .filter(Collection.name == name, Collection.author.has(username=author)) \ .one_or_404() if not collection.check_perm(current_user, Permission.VIEW_COLLECTION): abort(404) items = collection.items if not collection.check_perm(current_user, Permission.EDIT_COLLECTION): items = [x for x in items if x.package.check_perm(current_user, Permission.VIEW_PACKAGE)] if should_return_json(): return jsonify([ item.package.as_key_dict() for item in items ]) else: return render_template("collections/view.html", collection=collection, items=items) class CollectionForm(FlaskForm): title = StringField(lazy_gettext("Title"), [InputRequired(), Length(3, 100)]) name = StringField("URL", [Optional(), Length(1, 20), Regexp("^[a-z0-9_]", 0, "Lower case letters (a-z), digits (0-9), and underscores (_) only")]) short_description = StringField(lazy_gettext("Short Description"), [Optional(), Length(0, 200)]) long_description = TextAreaField(lazy_gettext("Page Content"), [Optional()], filters=[nonempty_or_none, normalize_line_endings]) private = BooleanField(lazy_gettext("Private")) pinned = BooleanField(lazy_gettext("Pinned to my profile")) descriptions = FieldList( StringField(lazy_gettext("Short Description"), [Optional(), Length(0, 500)], filters=[nonempty_or_none]), min_entries=0) package_ids = FieldList(HiddenField(), min_entries=0) package_removed = FieldList(HiddenField(), min_entries=0) order = HiddenField() submit = SubmitField(lazy_gettext("Save")) @bp.route("/collections/new/", methods=["GET", "POST"]) @bp.route("/collections///edit/", methods=["GET", "POST"]) @login_required def create_edit(author=None, name=None): collection: typing.Optional[Collection] = None if author is not None and name is not None: collection = Collection.query \ .filter(Collection.name == name, Collection.author.has(username=author)) \ .one_or_404() if not collection.check_perm(current_user, Permission.EDIT_COLLECTION): abort(403) elif "author" in request.args: author = request.args["author"] if author != current_user.username and not current_user.rank.at_least(UserRank.EDITOR): abort(403) if author is None: author = current_user else: author = User.query.filter_by(username=author).one() form = CollectionForm(formdata=request.form, obj=collection) initial_packages = [] if "package" in request.args: for package_id in request.args.getlist("package"): package = Package.get_by_key(package_id) if package: initial_packages.append(package) if request.method == "GET": # HACK: fix bug in wtforms form.private.data = collection.private if collection else False form.pinned.data = collection.pinned if collection else False if collection: for item in collection.items: form.descriptions.append_entry(item.description) form.package_ids.append_entry(item.package.get_id()) form.package_removed.append_entry("0") else: form.name = None form.pinned = None if form.validate_on_submit(): ret = handle_create_edit(collection, form, initial_packages, author) if ret: return ret return render_template("collections/create_edit.html", collection=collection, form=form) def handle_create_edit(collection: Collection, form: CollectionForm, initial_packages: typing.List[Package], author: User): severity = AuditSeverity.NORMAL if author == current_user else AuditSeverity.EDITOR name = form.name.data if collection else regex_invalid_chars.sub("", form.title.data.lower().replace(" ", "_")) if collection is None or name != collection.name: if Collection.query \ .filter(Collection.name == name, Collection.author == author) \ .count() > 0: flash(gettext("A collection with a similar title already exists"), "danger") return if Package.query \ .filter(Package.name == name, Package.author == author) \ .count() > 0: flash(gettext("Unable to create collection as a package with that name already exists"), "danger") return if collection is None: collection = Collection() collection.author = author form.populate_obj(collection) collection.name = name db.session.add(collection) for package in initial_packages: link = CollectionPackage() link.package = package link.collection = collection link.order = len(collection.items) db.session.add(link) add_audit_log(severity, current_user, f"Created collection {collection.author.username}/{collection.name}", collection.get_url("collections.view"), None) else: form.populate_obj(collection) collection.name = name link_lookup = {} for link in collection.items: link_lookup[link.package.get_id()] = link for i, package_id in enumerate(form.package_ids): link = link_lookup.get(package_id.data) to_delete = form.package_removed[i].data == "1" if link is None: if to_delete: continue package = Package.get_by_key(package_id.data) if package is None: abort(400) link = CollectionPackage() link.package = package link.collection = collection link.description = form.descriptions[i].data link_lookup[link.package.get_id()] = link db.session.add(link) elif to_delete: db.session.delete(link) else: link.description = form.descriptions[i].data for i, package_id in enumerate(form.order.data.split(",")): if package_id != "": link_lookup[package_id].order = i + 1 add_audit_log(severity, current_user, f"Edited collection {collection.author.username}/{collection.name}", collection.get_url("collections.view"), None) db.session.commit() return redirect(collection.get_url("collections.view")) @bp.route("/collections///delete/", methods=["GET", "POST"]) @login_required def delete(author, name): collection = Collection.query \ .filter(Collection.name == name, Collection.author.has(username=author)) \ .one_or_404() if not collection.check_perm(current_user, Permission.EDIT_COLLECTION): abort(403) if request.method == "POST": add_audit_log(AuditSeverity.NORMAL, current_user, f"Deleted collection {collection.author.username}/{collection.name}", collection.get_url("collections.view"), None) db.session.delete(collection) db.session.commit() return redirect(url_for("collections.list_all", author=author)) return render_template("collections/delete.html", collection=collection) def toggle_package(collection: Collection, package: Package): severity = AuditSeverity.NORMAL if collection.author == current_user else AuditSeverity.EDITOR author = User.query.get(collection.author_id) if collection.author is None else collection.author if package in collection.packages: CollectionPackage.query \ .filter(CollectionPackage.collection == collection, CollectionPackage.package == package) \ .delete(synchronize_session=False) add_audit_log(severity, current_user, f"Removed {package.get_id()} from collection {author.username}/{collection.name}", collection.get_url("collections.view"), None) db.session.commit() return False else: link = CollectionPackage() link.package = package link.collection = collection link.order = len(collection.items) db.session.add(link) add_audit_log(severity, current_user, f"Added {package.get_id()} to collection {author.username}/{collection.name}", collection.get_url("collections.view"), None) db.session.commit() return True def get_or_create_favorites(session): collection = Collection.query.filter(Collection.name == "favorites", Collection.author == current_user).first() if collection is None: is_new = True collection = Collection() collection.title = "Favorites" collection.name = "favorites" collection.short_description = "My favorites" collection.author_id = current_user.id session.add(collection) else: is_new = False return collection, is_new @bp.route("/packages///add-to/", methods=["GET", "POST"]) @is_package_page @login_required def package_add(package): with create_session() as new_session: collection, is_new = get_or_create_favorites(new_session) if is_new: new_session.commit() if request.method == "POST": collection_id = request.form["collection"] collection = Collection.query.get(collection_id) if collection is None: abort(404) if not collection.check_perm(current_user, Permission.EDIT_COLLECTION): abort(403) if toggle_package(collection, package): flash(gettext("Added package to collection"), "success") else: flash(gettext("Removed package from collection"), "success") return redirect(package.get_url("collections.package_add")) collections = current_user.collections.all() if current_user.rank.at_least(UserRank.EDITOR) and current_user.username != "ContentDB": collections.extend(Collection.query.filter(Collection.author.has(username="ContentDB")).all()) return render_template("collections/package_add_to.html", package=package, collections=collections) @bp.route("/packages///favorite/", methods=["POST"]) @is_package_page @login_required def package_toggle_favorite(package): collection, _is_new = get_or_create_favorites(db.session) collection.author = current_user if toggle_package(collection, package): msg = gettext("Added package to favorites collection") if not collection.private: msg += " " + gettext("(Public, change from Profile > My Collections)") flash(msg, "success") else: flash(gettext("Removed package from favorites collection"), "success") return redirect(package.get_url("packages.view")) @bp.route("/collections///clone/", methods=["POST"]) @login_required def clone(author, name): old_collection: typing.Optional[Collection] = Collection.query \ .filter(Collection.name == name, Collection.author.has(username=author)) \ .one_or_404() index = 0 new_name = name new_title = old_collection.title while True: if Collection.query \ .filter(Collection.name == new_name, Collection.author == current_user) \ .count() == 0: break index += 1 new_name = f"{name}_{index}" new_title = f"{old_collection.title} ({index})" collection = Collection() collection.title = new_title collection.author = current_user collection.short_description = old_collection.short_description collection.name = new_name collection.private = True db.session.add(collection) for item in old_collection.items: new_item = CollectionPackage() new_item.package = item.package new_item.collection = collection new_item.description = item.description new_item.order = item.order db.session.add(new_item) add_audit_log(AuditSeverity.NORMAL, current_user, f"Created collection {collection.name} from {old_collection.author.username}/{old_collection.name} ", collection.get_url("collections.view"), None) db.session.commit() return redirect(collection.get_url("collections.view"))