mirror of
https://github.com/minetest/contentdb.git
synced 2025-01-24 06:51:29 +01:00
Split utils.py into package
This commit is contained in:
parent
b22ef5ae83
commit
b3237b0c49
@ -346,7 +346,7 @@ def check_update_config_impl(package):
|
||||
rel.task_id = uuid()
|
||||
db.session.add(rel)
|
||||
|
||||
msg = "Created release {} (Git Update Config)".format(rel.title)
|
||||
msg = "Created release {} (Git Update Detection)".format(rel.title)
|
||||
addSystemAuditLog(AuditSeverity.NORMAL, msg, package.getDetailsURL(), package)
|
||||
|
||||
db.session.commit()
|
||||
|
296
app/utils.py
296
app/utils.py
@ -1,296 +0,0 @@
|
||||
# ContentDB
|
||||
# Copyright (C) 2018-21 rubenwardy
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
import imghdr
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
from functools import wraps
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import user_agents
|
||||
from flask import request, flash, abort, redirect
|
||||
from flask_login import login_user, current_user
|
||||
from werkzeug.datastructures import MultiDict
|
||||
from passlib.hash import bcrypt
|
||||
|
||||
from .models import *
|
||||
|
||||
|
||||
def is_safe_url(target):
|
||||
ref_url = urlparse(request.host_url)
|
||||
test_url = urlparse(urljoin(request.host_url, target))
|
||||
return test_url.scheme in ('http', 'https') and \
|
||||
ref_url.netloc == test_url.netloc
|
||||
|
||||
|
||||
# These are given to Jinja in template_filters.py
|
||||
|
||||
def abs_url_for(path, **kwargs):
|
||||
scheme = "https" if app.config["BASE_URL"][:5] == "https" else "http"
|
||||
return url_for(path, _external=True, _scheme=scheme, **kwargs)
|
||||
|
||||
def abs_url(path):
|
||||
return urljoin(app.config["BASE_URL"], path)
|
||||
|
||||
def url_set_query(**kwargs):
|
||||
args = MultiDict(request.args)
|
||||
|
||||
for key, value in kwargs.items():
|
||||
if key == "_add":
|
||||
for key2, value_to_add in value.items():
|
||||
values = set(args.getlist(key2))
|
||||
values.add(value_to_add)
|
||||
args.setlist(key2, list(values))
|
||||
elif key == "_remove":
|
||||
for key2, value_to_remove in value.items():
|
||||
values = set(args.getlist(key2))
|
||||
values.discard(value_to_remove)
|
||||
args.setlist(key2, list(values))
|
||||
else:
|
||||
args.setlist(key, [ value ])
|
||||
|
||||
|
||||
dargs = dict(args.lists())
|
||||
|
||||
return url_for(request.endpoint, **dargs)
|
||||
|
||||
def get_int_or_abort(v, default=None):
|
||||
if v is None:
|
||||
return default
|
||||
|
||||
try:
|
||||
return int(v or default)
|
||||
except ValueError:
|
||||
abort(400)
|
||||
|
||||
def is_user_bot():
|
||||
user_agent = request.headers.get('User-Agent')
|
||||
if user_agent is None:
|
||||
return True
|
||||
|
||||
user_agent = user_agents.parse(user_agent)
|
||||
return user_agent.is_bot
|
||||
|
||||
def getExtension(filename):
|
||||
return filename.rsplit(".", 1)[1].lower() if "." in filename else None
|
||||
|
||||
def isFilenameAllowed(filename, exts):
|
||||
return getExtension(filename) in exts
|
||||
|
||||
ALLOWED_IMAGES = {"jpeg", "png"}
|
||||
def isAllowedImage(data):
|
||||
return imghdr.what(None, data) in ALLOWED_IMAGES
|
||||
|
||||
def shouldReturnJson():
|
||||
return "application/json" in request.accept_mimetypes and \
|
||||
not "text/html" in request.accept_mimetypes
|
||||
|
||||
def randomString(n):
|
||||
return ''.join(random.choice(string.ascii_lowercase + \
|
||||
string.ascii_uppercase + string.digits) for _ in range(n))
|
||||
|
||||
def doFileUpload(file, fileType, fileTypeDesc):
|
||||
if not file or file is None or file.filename == "":
|
||||
flash("No selected file", "danger")
|
||||
return None, None
|
||||
|
||||
assert os.path.isdir(app.config["UPLOAD_DIR"]), "UPLOAD_DIR must exist"
|
||||
|
||||
allowedExtensions = []
|
||||
isImage = False
|
||||
if fileType == "image":
|
||||
allowedExtensions = ["jpg", "jpeg", "png"]
|
||||
isImage = True
|
||||
elif fileType == "zip":
|
||||
allowedExtensions = ["zip"]
|
||||
else:
|
||||
raise Exception("Invalid fileType")
|
||||
|
||||
ext = getExtension(file.filename)
|
||||
if ext is None or not ext in allowedExtensions:
|
||||
flash("Please upload " + fileTypeDesc, "danger")
|
||||
return None, None
|
||||
|
||||
if isImage and not isAllowedImage(file.stream.read()):
|
||||
flash("Uploaded image isn't actually an image", "danger")
|
||||
return None, None
|
||||
|
||||
file.stream.seek(0)
|
||||
|
||||
filename = randomString(10) + "." + ext
|
||||
filepath = os.path.join(app.config["UPLOAD_DIR"], filename)
|
||||
file.save(filepath)
|
||||
return "/uploads/" + filename, filepath
|
||||
|
||||
|
||||
def check_password_hash(stored, given):
|
||||
if stored is None or stored == "":
|
||||
return False
|
||||
|
||||
return bcrypt.verify(given.encode("UTF-8"), stored)
|
||||
|
||||
|
||||
def make_flask_login_password(plaintext):
|
||||
return bcrypt.hash(plaintext.encode("UTF-8"))
|
||||
|
||||
|
||||
def login_user_set_active(user: User, *args, **kwargs):
|
||||
if user.rank == UserRank.NOT_JOINED and user.email is None:
|
||||
user.rank = UserRank.MEMBER
|
||||
user.notification_preferences = UserNotificationPreferences(user)
|
||||
user.is_active = True
|
||||
db.session.commit()
|
||||
|
||||
return login_user(user, *args, **kwargs)
|
||||
|
||||
|
||||
def rank_required(rank):
|
||||
def decorator(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
if not current_user.is_authenticated:
|
||||
return redirect(url_for("users.login"))
|
||||
if not current_user.rank.atLeast(rank):
|
||||
abort(403)
|
||||
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return decorated_function
|
||||
return decorator
|
||||
|
||||
|
||||
def getPackageByInfo(author, name):
|
||||
user = User.query.filter_by(username=author).first()
|
||||
if user is None:
|
||||
return None
|
||||
|
||||
package = Package.query.filter_by(name=name, author_id=user.id) \
|
||||
.filter(Package.state!=PackageState.DELETED).first()
|
||||
if package is None:
|
||||
return None
|
||||
|
||||
return package
|
||||
|
||||
def is_package_page(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
if not ("author" in kwargs and "name" in kwargs):
|
||||
abort(400)
|
||||
|
||||
author = kwargs["author"]
|
||||
name = kwargs["name"]
|
||||
|
||||
package = getPackageByInfo(author, name)
|
||||
if package is None:
|
||||
package = getPackageByInfo(author, name + "_game")
|
||||
if package is None or package.type != PackageType.GAME:
|
||||
abort(404)
|
||||
|
||||
args = dict(kwargs)
|
||||
args["name"] = name + "_game"
|
||||
return redirect(url_for(request.endpoint, **args))
|
||||
|
||||
del kwargs["author"]
|
||||
del kwargs["name"]
|
||||
|
||||
return f(package=package, *args, **kwargs)
|
||||
|
||||
return decorated_function
|
||||
|
||||
|
||||
def addNotification(target, causer: User, type: NotificationType, title: str, url: str, package: Package = None):
|
||||
try:
|
||||
iter(target)
|
||||
for x in target:
|
||||
addNotification(x, causer, type, title, url, package)
|
||||
return
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
if target.rank.atLeast(UserRank.NEW_MEMBER) and target != causer:
|
||||
Notification.query.filter_by(user=target, causer=causer, type=type, title=title, url=url, package=package).delete()
|
||||
notif = Notification(target, causer, type, title, url, package)
|
||||
db.session.add(notif)
|
||||
|
||||
|
||||
def addAuditLog(severity: AuditSeverity, causer: User, title: str, url: str, package : Package =None, description : str =None):
|
||||
entry = AuditLogEntry(causer, severity, title, url, package, description)
|
||||
db.session.add(entry)
|
||||
|
||||
|
||||
def clearNotifications(url):
|
||||
if current_user.is_authenticated:
|
||||
Notification.query.filter_by(user=current_user, url=url).delete()
|
||||
db.session.commit()
|
||||
|
||||
|
||||
YESES = ["yes", "true", "1", "on"]
|
||||
|
||||
def isYes(val):
|
||||
return val and val.lower() in YESES
|
||||
|
||||
|
||||
def isNo(val):
|
||||
return val and not isYes(val)
|
||||
|
||||
def nonEmptyOrNone(str):
|
||||
if str is None or str == "":
|
||||
return None
|
||||
|
||||
return str
|
||||
|
||||
|
||||
def addSystemNotification(target, type: NotificationType, title: str, url: str, package: Package = None):
|
||||
system_user = User.query.filter_by(username="ContentDB").first()
|
||||
assert system_user
|
||||
|
||||
return addNotification(target, system_user, type, title, url, package)
|
||||
|
||||
|
||||
def addSystemAuditLog(severity: AuditSeverity, title: str, url: str, package=None, description=None):
|
||||
system_user = User.query.filter_by(username="ContentDB").first()
|
||||
assert system_user
|
||||
|
||||
return addAuditLog(severity, system_user, title, url, package, description)
|
||||
|
||||
|
||||
def post_bot_message(package: Package, title: str, message: str):
|
||||
system_user = User.query.filter_by(username="ContentDB").first()
|
||||
assert system_user
|
||||
|
||||
thread = package.threads.filter_by(author=system_user).first()
|
||||
if not thread:
|
||||
thread = Thread()
|
||||
thread.package = package
|
||||
thread.title = "Bot messages for {}".format(package.title)
|
||||
thread.author = system_user
|
||||
thread.private = True
|
||||
thread.watchers.append(package.author)
|
||||
db.session.add(thread)
|
||||
db.session.flush()
|
||||
|
||||
reply = ThreadReply()
|
||||
reply.thread = thread
|
||||
reply.author = system_user
|
||||
reply.comment = "**{}**\n\n{}".format(title, message)
|
||||
db.session.add(reply)
|
||||
|
||||
addNotification(thread.watchers, system_user, NotificationType.BOT,
|
||||
title, thread.getViewURL(), thread.package)
|
||||
|
||||
thread.replies.append(reply)
|
36
app/utils/__init__.py
Normal file
36
app/utils/__init__.py
Normal file
@ -0,0 +1,36 @@
|
||||
# ContentDB
|
||||
# Copyright (C) 2018-21 rubenwardy
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
from .flask import *
|
||||
from .uploads import *
|
||||
from .models import *
|
||||
from .user import *
|
||||
|
||||
|
||||
YESES = ["yes", "true", "1", "on"]
|
||||
|
||||
def isYes(val):
|
||||
return val and val.lower() in YESES
|
||||
|
||||
def isNo(val):
|
||||
return val and not isYes(val)
|
||||
|
||||
def nonEmptyOrNone(str):
|
||||
if str is None or str == "":
|
||||
return None
|
||||
|
||||
return str
|
80
app/utils/flask.py
Normal file
80
app/utils/flask.py
Normal file
@ -0,0 +1,80 @@
|
||||
# ContentDB
|
||||
# Copyright (C) 2018-21 rubenwardy
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import user_agents
|
||||
from flask import request, abort
|
||||
from werkzeug.datastructures import MultiDict
|
||||
|
||||
from app.models import *
|
||||
|
||||
|
||||
def is_safe_url(target):
|
||||
ref_url = urlparse(request.host_url)
|
||||
test_url = urlparse(urljoin(request.host_url, target))
|
||||
return test_url.scheme in ('http', 'https') and \
|
||||
ref_url.netloc == test_url.netloc
|
||||
|
||||
|
||||
# These are given to Jinja in template_filters.py
|
||||
|
||||
def abs_url_for(path, **kwargs):
|
||||
scheme = "https" if app.config["BASE_URL"][:5] == "https" else "http"
|
||||
return url_for(path, _external=True, _scheme=scheme, **kwargs)
|
||||
|
||||
def abs_url(path):
|
||||
return urljoin(app.config["BASE_URL"], path)
|
||||
|
||||
def url_set_query(**kwargs):
|
||||
args = MultiDict(request.args)
|
||||
|
||||
for key, value in kwargs.items():
|
||||
if key == "_add":
|
||||
for key2, value_to_add in value.items():
|
||||
values = set(args.getlist(key2))
|
||||
values.add(value_to_add)
|
||||
args.setlist(key2, list(values))
|
||||
elif key == "_remove":
|
||||
for key2, value_to_remove in value.items():
|
||||
values = set(args.getlist(key2))
|
||||
values.discard(value_to_remove)
|
||||
args.setlist(key2, list(values))
|
||||
else:
|
||||
args.setlist(key, [ value ])
|
||||
|
||||
|
||||
dargs = dict(args.lists())
|
||||
|
||||
return url_for(request.endpoint, **dargs)
|
||||
|
||||
def get_int_or_abort(v, default=None):
|
||||
if v is None:
|
||||
return default
|
||||
|
||||
try:
|
||||
return int(v or default)
|
||||
except ValueError:
|
||||
abort(400)
|
||||
|
||||
def is_user_bot():
|
||||
user_agent = request.headers.get('User-Agent')
|
||||
if user_agent is None:
|
||||
return True
|
||||
|
||||
user_agent = user_agents.parse(user_agent)
|
||||
return user_agent.is_bot
|
126
app/utils/models.py
Normal file
126
app/utils/models.py
Normal file
@ -0,0 +1,126 @@
|
||||
# ContentDB
|
||||
# Copyright (C) 2018-21 rubenwardy
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
from functools import wraps
|
||||
from flask import abort, redirect, url_for, request
|
||||
from flask_login import current_user
|
||||
from app.models import User, NotificationType, Package, UserRank, Notification, db, AuditSeverity, AuditLogEntry, ThreadReply, Thread, PackageState, PackageType
|
||||
|
||||
|
||||
def getPackageByInfo(author, name):
|
||||
user = User.query.filter_by(username=author).first()
|
||||
if user is None:
|
||||
return None
|
||||
|
||||
package = Package.query.filter_by(name=name, author_id=user.id) \
|
||||
.filter(Package.state!=PackageState.DELETED).first()
|
||||
if package is None:
|
||||
return None
|
||||
|
||||
return package
|
||||
|
||||
def is_package_page(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
if not ("author" in kwargs and "name" in kwargs):
|
||||
abort(400)
|
||||
|
||||
author = kwargs["author"]
|
||||
name = kwargs["name"]
|
||||
|
||||
package = getPackageByInfo(author, name)
|
||||
if package is None:
|
||||
package = getPackageByInfo(author, name + "_game")
|
||||
if package is None or package.type != PackageType.GAME:
|
||||
abort(404)
|
||||
|
||||
args = dict(kwargs)
|
||||
args["name"] = name + "_game"
|
||||
return redirect(url_for(request.endpoint, **args))
|
||||
|
||||
del kwargs["author"]
|
||||
del kwargs["name"]
|
||||
|
||||
return f(package=package, *args, **kwargs)
|
||||
|
||||
return decorated_function
|
||||
|
||||
|
||||
def addNotification(target, causer: User, type: NotificationType, title: str, url: str, package: Package = None):
|
||||
try:
|
||||
iter(target)
|
||||
for x in target:
|
||||
addNotification(x, causer, type, title, url, package)
|
||||
return
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
if target.rank.atLeast(UserRank.NEW_MEMBER) and target != causer:
|
||||
Notification.query.filter_by(user=target, causer=causer, type=type, title=title, url=url, package=package).delete()
|
||||
notif = Notification(target, causer, type, title, url, package)
|
||||
db.session.add(notif)
|
||||
|
||||
|
||||
def addAuditLog(severity: AuditSeverity, causer: User, title: str, url: str, package : Package =None, description : str =None):
|
||||
entry = AuditLogEntry(causer, severity, title, url, package, description)
|
||||
db.session.add(entry)
|
||||
|
||||
|
||||
def clearNotifications(url):
|
||||
if current_user.is_authenticated:
|
||||
Notification.query.filter_by(user=current_user, url=url).delete()
|
||||
db.session.commit()
|
||||
|
||||
def addSystemNotification(target, type: NotificationType, title: str, url: str, package: Package = None):
|
||||
system_user = User.query.filter_by(username="ContentDB").first()
|
||||
assert system_user
|
||||
|
||||
return addNotification(target, system_user, type, title, url, package)
|
||||
|
||||
|
||||
def addSystemAuditLog(severity: AuditSeverity, title: str, url: str, package=None, description=None):
|
||||
system_user = User.query.filter_by(username="ContentDB").first()
|
||||
assert system_user
|
||||
|
||||
return addAuditLog(severity, system_user, title, url, package, description)
|
||||
|
||||
|
||||
def post_bot_message(package: Package, title: str, message: str):
|
||||
system_user = User.query.filter_by(username="ContentDB").first()
|
||||
assert system_user
|
||||
|
||||
thread = package.threads.filter_by(author=system_user).first()
|
||||
if not thread:
|
||||
thread = Thread()
|
||||
thread.package = package
|
||||
thread.title = "Bot messages for {}".format(package.title)
|
||||
thread.author = system_user
|
||||
thread.private = True
|
||||
thread.watchers.append(package.author)
|
||||
db.session.add(thread)
|
||||
db.session.flush()
|
||||
|
||||
reply = ThreadReply()
|
||||
reply.thread = thread
|
||||
reply.author = system_user
|
||||
reply.comment = "**{}**\n\n{}".format(title, message)
|
||||
db.session.add(reply)
|
||||
|
||||
addNotification(thread.watchers, system_user, NotificationType.BOT,
|
||||
title, thread.getViewURL(), thread.package)
|
||||
|
||||
thread.replies.append(reply)
|
74
app/utils/uploads.py
Normal file
74
app/utils/uploads.py
Normal file
@ -0,0 +1,74 @@
|
||||
# ContentDB
|
||||
# Copyright (C) 2018-21 rubenwardy
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
import imghdr
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
|
||||
from flask import request, flash
|
||||
|
||||
from app.models import *
|
||||
|
||||
|
||||
def getExtension(filename):
|
||||
return filename.rsplit(".", 1)[1].lower() if "." in filename else None
|
||||
|
||||
ALLOWED_IMAGES = {"jpeg", "png"}
|
||||
def isAllowedImage(data):
|
||||
return imghdr.what(None, data) in ALLOWED_IMAGES
|
||||
|
||||
def shouldReturnJson():
|
||||
return "application/json" in request.accept_mimetypes and \
|
||||
not "text/html" in request.accept_mimetypes
|
||||
|
||||
def randomString(n):
|
||||
return ''.join(random.choice(string.ascii_lowercase + \
|
||||
string.ascii_uppercase + string.digits) for _ in range(n))
|
||||
|
||||
def doFileUpload(file, fileType, fileTypeDesc):
|
||||
if not file or file is None or file.filename == "":
|
||||
flash("No selected file", "danger")
|
||||
return None, None
|
||||
|
||||
assert os.path.isdir(app.config["UPLOAD_DIR"]), "UPLOAD_DIR must exist"
|
||||
|
||||
allowedExtensions = []
|
||||
isImage = False
|
||||
if fileType == "image":
|
||||
allowedExtensions = ["jpg", "jpeg", "png"]
|
||||
isImage = True
|
||||
elif fileType == "zip":
|
||||
allowedExtensions = ["zip"]
|
||||
else:
|
||||
raise Exception("Invalid fileType")
|
||||
|
||||
ext = getExtension(file.filename)
|
||||
if ext is None or not ext in allowedExtensions:
|
||||
flash("Please upload " + fileTypeDesc, "danger")
|
||||
return None, None
|
||||
|
||||
if isImage and not isAllowedImage(file.stream.read()):
|
||||
flash("Uploaded image isn't actually an image", "danger")
|
||||
return None, None
|
||||
|
||||
file.stream.seek(0)
|
||||
|
||||
filename = randomString(10) + "." + ext
|
||||
filepath = os.path.join(app.config["UPLOAD_DIR"], filename)
|
||||
file.save(filepath)
|
||||
return "/uploads/" + filename, filepath
|
60
app/utils/user.py
Normal file
60
app/utils/user.py
Normal file
@ -0,0 +1,60 @@
|
||||
# ContentDB
|
||||
# Copyright (C) 2018-21 rubenwardy
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
from functools import wraps
|
||||
|
||||
from flask_login import login_user, current_user
|
||||
from passlib.handlers.bcrypt import bcrypt
|
||||
from flask import redirect, url_for, abort
|
||||
|
||||
from app.models import User, UserRank, UserNotificationPreferences, db
|
||||
|
||||
|
||||
def check_password_hash(stored, given):
|
||||
if stored is None or stored == "":
|
||||
return False
|
||||
|
||||
return bcrypt.verify(given.encode("UTF-8"), stored)
|
||||
|
||||
|
||||
def make_flask_login_password(plaintext):
|
||||
return bcrypt.hash(plaintext.encode("UTF-8"))
|
||||
|
||||
|
||||
def login_user_set_active(user: User, *args, **kwargs):
|
||||
if user.rank == UserRank.NOT_JOINED and user.email is None:
|
||||
user.rank = UserRank.MEMBER
|
||||
user.notification_preferences = UserNotificationPreferences(user)
|
||||
user.is_active = True
|
||||
db.session.commit()
|
||||
|
||||
return login_user(user, *args, **kwargs)
|
||||
|
||||
|
||||
def rank_required(rank):
|
||||
def decorator(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
if not current_user.is_authenticated:
|
||||
return redirect(url_for("users.login"))
|
||||
if not current_user.rank.atLeast(rank):
|
||||
abort(403)
|
||||
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return decorated_function
|
||||
return decorator
|
Loading…
Reference in New Issue
Block a user