mirror of
https://github.com/minetest/contentdb.git
synced 2024-11-10 17:43:46 +01:00
208 lines
6.4 KiB
Python
208 lines
6.4 KiB
Python
|
# ContentDB
|
||
|
# Copyright (C) 2023 rubenwardy
|
||
|
#
|
||
|
# This program is free software: you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU Affero General Public License as published by
|
||
|
# the Free Software Foundation, either version 3 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# This program is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU Affero General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU Affero General Public License
|
||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||
|
|
||
|
import urllib.parse as urlparse
|
||
|
from typing import Optional
|
||
|
from urllib.parse import urlencode
|
||
|
|
||
|
from flask import Blueprint, render_template, redirect, url_for, request, jsonify, abort, make_response
|
||
|
from flask_babel import lazy_gettext
|
||
|
from flask_login import current_user, login_required
|
||
|
from flask_wtf import FlaskForm
|
||
|
from wtforms import StringField, SubmitField, URLField
|
||
|
from wtforms.validators import InputRequired, Length
|
||
|
|
||
|
from app import csrf
|
||
|
from app.blueprints.users.settings import get_setting_tabs
|
||
|
from app.models import db, OAuthClient, User, Permission, APIToken
|
||
|
from app.utils import random_string
|
||
|
|
||
|
bp = Blueprint("oauth", __name__)
|
||
|
|
||
|
|
||
|
def build_redirect_url(url: str, code: str, state: Optional[str]):
|
||
|
params = {"code": code}
|
||
|
if state is not None:
|
||
|
params["state"] = state
|
||
|
url_parts = list(urlparse.urlparse(url))
|
||
|
query = dict(urlparse.parse_qsl(url_parts[4]))
|
||
|
query.update(params)
|
||
|
url_parts[4] = urlencode(query)
|
||
|
return urlparse.urlunparse(url_parts)
|
||
|
|
||
|
|
||
|
@bp.route("/oauth/authorize/", methods=["GET", "POST"])
|
||
|
@login_required
|
||
|
def oauth_start():
|
||
|
response_type = request.args.get("response_type", "code")
|
||
|
if response_type != "code":
|
||
|
return "Unsupported response_type, only code is supported", 400
|
||
|
|
||
|
client_id = request.args.get("client_id")
|
||
|
if client_id is None:
|
||
|
return "Missing client_id", 400
|
||
|
|
||
|
redirect_uri = request.args.get("redirect_uri")
|
||
|
if redirect_uri is None:
|
||
|
return "Missing redirect_uri", 400
|
||
|
|
||
|
client = OAuthClient.query.get_or_404(client_id)
|
||
|
if client.redirect_url != redirect_uri:
|
||
|
return "redirect_uri does not match client", 400
|
||
|
|
||
|
state = request.args.get("state")
|
||
|
|
||
|
token = APIToken.query.filter(APIToken.client == client, APIToken.owner == current_user).first()
|
||
|
if token:
|
||
|
token.access_token = random_string(32)
|
||
|
token.auth_code = random_string(32)
|
||
|
db.session.commit()
|
||
|
return redirect(build_redirect_url(client.redirect_url, token.auth_code, state))
|
||
|
|
||
|
if request.method == "POST":
|
||
|
action = request.form["action"]
|
||
|
if action == "cancel":
|
||
|
return redirect(client.redirect_url)
|
||
|
|
||
|
elif action == "authorize":
|
||
|
token = APIToken()
|
||
|
token.access_token = random_string(32)
|
||
|
token.name = f"Token for {client.title} by {client.owner.username}"
|
||
|
token.owner = current_user
|
||
|
token.client = client
|
||
|
assert client is not None
|
||
|
token.auth_code = random_string(32)
|
||
|
db.session.add(token)
|
||
|
db.session.commit()
|
||
|
|
||
|
return redirect(build_redirect_url(client.redirect_url, token.auth_code, state))
|
||
|
|
||
|
return render_template("oauth/authorize.html", client=client)
|
||
|
|
||
|
|
||
|
def error(code: int, msg: str):
|
||
|
abort(make_response(jsonify({"success": False, "error": msg}), code))
|
||
|
|
||
|
|
||
|
@bp.route("/oauth/token/", methods=["POST"])
|
||
|
@csrf.exempt
|
||
|
def oauth_grant():
|
||
|
form = request.form
|
||
|
|
||
|
grant_type = request.args.get("grant_type", "authorization_code")
|
||
|
if grant_type != "authorization_code":
|
||
|
error(400, "Unsupported grant_type, only authorization_code is supported")
|
||
|
|
||
|
client_id = form.get("client_id")
|
||
|
if client_id is None:
|
||
|
error(400, "Missing client_id")
|
||
|
|
||
|
client_secret = form.get("client_secret")
|
||
|
if client_secret is None:
|
||
|
error(400, "Missing client_secret")
|
||
|
|
||
|
code = form.get("code")
|
||
|
if code is None:
|
||
|
error(400, "Missing code")
|
||
|
|
||
|
client = OAuthClient.query.filter_by(id=client_id, secret=client_secret).first()
|
||
|
if client is None:
|
||
|
error(400, "client_id and/or client_secret is incorrect")
|
||
|
|
||
|
token = APIToken.query.filter_by(auth_code=code).first()
|
||
|
if token is None or token.client != client:
|
||
|
error(400, "Incorrect code. It may have already been redeemed")
|
||
|
|
||
|
token.auth_code = None
|
||
|
db.session.commit()
|
||
|
|
||
|
return jsonify({
|
||
|
"access_token": token.access_token,
|
||
|
"refresh_token": "abc",
|
||
|
"token_type": "Bearer",
|
||
|
})
|
||
|
|
||
|
|
||
|
@bp.route("/user/apps/")
|
||
|
@login_required
|
||
|
def list_clients_redirect():
|
||
|
return redirect(url_for("oauth.list_clients", username=current_user.username))
|
||
|
|
||
|
|
||
|
@bp.route("/users/<username>/apps/")
|
||
|
@login_required
|
||
|
def list_clients(username):
|
||
|
user = User.query.filter_by(username=username).first_or_404()
|
||
|
if not user.check_perm(current_user, Permission.CREATE_OAUTH_CLIENT):
|
||
|
abort(403)
|
||
|
|
||
|
return render_template("oauth/list_clients.html", user=user, tabs=get_setting_tabs(user), current_tab="oauth_clients")
|
||
|
|
||
|
|
||
|
class OAuthClientForm(FlaskForm):
|
||
|
title = StringField(lazy_gettext("Title"), [InputRequired(), Length(5, 30)])
|
||
|
redirect_url = URLField(lazy_gettext("Redirect URL"), [InputRequired(), Length(5, 123)])
|
||
|
submit = SubmitField(lazy_gettext("Save"))
|
||
|
|
||
|
|
||
|
@bp.route("/users/<username>/apps/new/", methods=["GET", "POST"])
|
||
|
@bp.route("/users/<username>/apps/<id_>/edit/", methods=["GET", "POST"])
|
||
|
@login_required
|
||
|
def create_edit_client(username, id_=None):
|
||
|
user = User.query.filter_by(username=username).first_or_404()
|
||
|
if not user.check_perm(current_user, Permission.CREATE_OAUTH_CLIENT):
|
||
|
abort(403)
|
||
|
|
||
|
is_new = id_ is None
|
||
|
client = None
|
||
|
if id_ is not None:
|
||
|
client = OAuthClient.query.get_or_404(id_)
|
||
|
|
||
|
form = OAuthClientForm(formdata=request.form, obj=client)
|
||
|
if form.validate_on_submit():
|
||
|
if is_new:
|
||
|
client = OAuthClient()
|
||
|
db.session.add(client)
|
||
|
client.owner = user
|
||
|
client.id = random_string(24)
|
||
|
client.secret = random_string(32)
|
||
|
|
||
|
form.populate_obj(client)
|
||
|
db.session.commit()
|
||
|
|
||
|
return redirect(url_for("oauth.create_edit_client", username=username, id_=client.id))
|
||
|
|
||
|
return render_template("oauth/create_edit.html", user=user, form=form, client=client)
|
||
|
|
||
|
|
||
|
@bp.route("/users/<username>/apps/<id_>/delete/", methods=["POST"])
|
||
|
@login_required
|
||
|
def delete_client(username, id_):
|
||
|
user = User.query.filter_by(username=username).first_or_404()
|
||
|
if not user.check_perm(current_user, Permission.CREATE_OAUTH_CLIENT):
|
||
|
abort(403)
|
||
|
|
||
|
client = OAuthClient.query.get(id_)
|
||
|
if client is None:
|
||
|
abort(404)
|
||
|
elif client.owner != user:
|
||
|
abort(403)
|
||
|
|
||
|
db.session.delete(client)
|
||
|
db.session.commit()
|
||
|
|
||
|
return redirect(url_for("oauth.list_clients", username=username))
|