Fix VCS webhooks assuming repo URLs are unique

Fixes #264
This commit is contained in:
rubenwardy 2024-06-22 11:42:51 +01:00
parent ca961cb35f
commit 09e06a159a
8 changed files with 216 additions and 169 deletions

@ -1,86 +0,0 @@
# ContentDB
# Copyright (C) 2020 rubenwardy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import datetime
from flask import Blueprint, request, jsonify
bp = Blueprint("gitlab", __name__)
from app import csrf
from app.models import Package, APIToken, Permission, PackageState
from app.blueprints.api.support import error, api_create_vcs_release
def webhook_impl():
json = request.json
# Get package
gitlab_url = json["project"]["web_url"].replace("https://", "").replace("http://", "")
package = Package.query.filter(
Package.repo.ilike("%{}%".format(gitlab_url)), Package.state != PackageState.DELETED).first()
if package is None:
return error(400,
"Could not find package, did you set the VCS repo in CDB correctly? Expected {}".format(gitlab_url))
# Get all tokens for package
secret = request.headers.get("X-Gitlab-Token")
if secret is None:
return error(403, "Token required")
token = APIToken.query.filter_by(access_token=secret).first()
if token is None:
return error(403, "Invalid authentication")
if not package.check_perm(token.owner, Permission.APPROVE_RELEASE):
return error(403, "You do not have the permission to approve releases")
#
# Check event
#
event = json["event_name"]
if event == "push":
ref = json["after"]
title = datetime.datetime.utcnow().strftime("%Y-%m-%d") + " " + ref[:5]
branch = json["ref"].replace("refs/heads/", "")
if branch not in ["master", "main"]:
return jsonify({"success": False,
"message": "Webhook ignored, as it's not on the master/main branch"})
elif event == "tag_push":
ref = json["ref"]
title = ref.replace("refs/tags/", "")
else:
return error(400, "Unsupported event: '{}'. Only 'push', 'create:tag', and 'ping' are supported."
.format(event or "null"))
#
# Perform release
#
if package.releases.filter_by(commit_hash=ref).count() > 0:
return
return api_create_vcs_release(token, package, title, ref, reason="Webhook")
@bp.route("/gitlab/webhook/", methods=["POST"])
@csrf.exempt
def webhook():
try:
return webhook_impl()
except KeyError as err:
return error(400, "Missing field: {}".format(err.args[0]))

@ -50,7 +50,7 @@ def claim_forums():
flash(gettext("Unable to get GitHub username for user"), "danger") flash(gettext("Unable to get GitHub username for user"), "danger")
return redirect(url_for("users.claim_forums", username=username)) return redirect(url_for("users.claim_forums", username=username))
else: else:
return redirect(url_for("github.start")) return redirect(url_for("vcs.github_start"))
if "forum_token" in session: if "forum_token" in session:
token = session["forum_token"] token = session["forum_token"]

@ -0,0 +1,22 @@
# ContentDB
# Copyright (C) 2024 rubenwardy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from flask import Blueprint
bp = Blueprint("vcs", __name__)
from . import github, gitlab

@ -0,0 +1,42 @@
# ContentDB
# Copyright (C) 2024 rubenwardy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from app.blueprints.api.support import error
from app.models import Package, APIToken, Permission, PackageState
def get_packages_for_vcs_and_token(token: APIToken, repo_url: str) -> list[Package]:
if token.package:
packages = [token.package]
if not token.package.check_perm(token.owner, Permission.APPROVE_RELEASE):
return error(403, "You do not have the permission to approve releases")
actual_repo_url: str = token.package.repo or ""
if repo_url not in actual_repo_url.lower():
return error(400, "Repo URL does not match the API token's package")
else:
# Get package
packages = Package.query.filter(
Package.repo.ilike("%{}%".format(repo_url)), Package.state != PackageState.DELETED).all()
if len(packages) == 0:
return error(400,
"Could not find package, did you set the VCS repo in CDB correctly? Expected {}".format(repo_url))
packages = [x for x in packages if x.check_perm(token.owner, Permission.APPROVE_RELEASE)]
if len(packages) == 0:
return error(403, "You do not have the permission to approve releases")
return packages

@ -1,5 +1,5 @@
# ContentDB # ContentDB
# Copyright (C) 2018-21 rubenwardy # Copyright (C) 2018-24 rubenwardy
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -15,33 +15,35 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
import datetime import datetime
import hmac
from flask import Blueprint, abort, Response import requests
from flask_babel import gettext from flask import abort, Response
from app.logic.users import create_user
from flask import redirect, url_for, request, flash, jsonify, current_app from flask import redirect, url_for, request, flash, jsonify, current_app
from flask_babel import gettext
from flask_login import current_user from flask_login import current_user
from sqlalchemy import or_, and_
from app import github, csrf
from app.models import db, User, APIToken, Package, Permission, AuditSeverity, PackageState
from app.utils import abs_url_for, add_audit_log, login_user_set_active, is_safe_url
from app.blueprints.api.support import error, api_create_vcs_release
import hmac, requests
bp = Blueprint("github", __name__) from app import github, csrf
from app.blueprints.api.support import error, api_create_vcs_release
from app.logic.users import create_user
from app.models import db, User, APIToken, AuditSeverity
from app.utils import abs_url_for, add_audit_log, login_user_set_active, is_safe_url
from . import bp
from .common import get_packages_for_vcs_and_token
@bp.route("/github/start/") @bp.route("/github/start/")
def start(): def github_start():
next = request.args.get("next") next = request.args.get("next")
if next and not is_safe_url(next): if next and not is_safe_url(next):
abort(400) abort(400)
return github.authorize("", redirect_uri=abs_url_for("github.callback", next=next)) return github.authorize("", redirect_uri=abs_url_for("vcs.github_callback", next=next))
@bp.route("/github/view/") @bp.route("/github/view/")
def view_permissions(): def github_view_permissions():
url = "https://github.com/settings/connections/applications/" + \ url = "https://github.com/settings/connections/applications/" + \
current_app.config["GITHUB_CLIENT_ID"] current_app.config["GITHUB_CLIENT_ID"]
return redirect(url) return redirect(url)
@ -49,7 +51,7 @@ def view_permissions():
@bp.route("/github/callback/") @bp.route("/github/callback/")
@github.authorized_handler @github.authorized_handler
def callback(oauth_token): def github_callback(oauth_token):
if oauth_token is None: if oauth_token is None:
flash(gettext("Authorization failed [err=gh-oauth-login-failed]"), "danger") flash(gettext("Authorization failed [err=gh-oauth-login-failed]"), "danger")
return redirect(url_for("users.login")) return redirect(url_for("users.login"))
@ -125,61 +127,43 @@ def callback(oauth_token):
return ret return ret
def _find_api_token(header_signature: str) -> APIToken:
sha_name, signature = header_signature.split('=')
if sha_name != 'sha1':
error(403, "Expected SHA1 payload signature")
for token in APIToken.query.all():
mac = hmac.new(token.access_token.encode("utf-8"), msg=request.data, digestmod='sha1')
if hmac.compare_digest(str(mac.hexdigest()), signature):
return token
error(401, "Invalid authentication, couldn't validate API token")
@bp.route("/github/webhook/", methods=["POST"]) @bp.route("/github/webhook/", methods=["POST"])
@csrf.exempt @csrf.exempt
def webhook(): def github_webhook():
json = request.json json = request.json
# Get package
github_url = "github.com/" + json["repository"]["full_name"]
package = Package.query.filter(
Package.repo.ilike("%{}%".format(github_url)), Package.state != PackageState.DELETED).first()
if package is None:
return error(400, "Could not find package, did you set the VCS repo in CDB correctly? Expected {}".format(github_url))
# Get all tokens for package
tokens_query = APIToken.query.filter(or_(APIToken.package==package,
and_(APIToken.package==None, APIToken.owner==package.author)))
possible_tokens = tokens_query.all()
actual_token = None
#
# Check signature
#
header_signature = request.headers.get('X-Hub-Signature') header_signature = request.headers.get('X-Hub-Signature')
if header_signature is None: if header_signature is None:
return error(403, "Expected payload signature") return error(403, "Expected payload signature")
sha_name, signature = header_signature.split('=') token = _find_api_token(header_signature)
if sha_name != 'sha1': packages = get_packages_for_vcs_and_token(token, "github.com/" + json["repository"]["full_name"])
return error(403, "Expected SHA1 payload signature")
for token in possible_tokens:
mac = hmac.new(token.access_token.encode("utf-8"), msg=request.data, digestmod='sha1')
if hmac.compare_digest(str(mac.hexdigest()), signature):
actual_token = token
break
if actual_token is None:
return error(403, "Invalid authentication, couldn't validate API token")
if not package.check_perm(actual_token.owner, Permission.APPROVE_RELEASE):
return error(403, "You do not have the permission to approve releases")
for package in packages:
# #
# Check event # Check event
# #
event = request.headers.get("X-GitHub-Event") event = request.headers.get("X-GitHub-Event")
if event == "push": if event == "push":
ref = json["after"] ref = json["after"]
title = datetime.datetime.utcnow().strftime("%Y-%m-%d") + " " + ref[:5] title = datetime.datetime.utcnow().strftime("%Y-%m-%d") + " " + ref[:5]
branch = json["ref"].replace("refs/heads/", "") branch = json["ref"].replace("refs/heads/", "")
if branch not in [ "master", "main" ]: if branch not in [ "master", "main" ]:
return jsonify({ "success": False, "message": "Webhook ignored, as it's not on the master/main branch" }) continue
elif event == "create": elif event == "create":
ref_type = json.get("ref_type") ref_type = json.get("ref_type")
@ -202,8 +186,12 @@ def webhook():
# #
# Perform release # Perform release
# #
if package.releases.filter_by(commit_hash=ref).count() > 0: if package.releases.filter_by(commit_hash=ref).count() > 0:
return return
return api_create_vcs_release(actual_token, package, title, ref, reason="Webhook") return api_create_vcs_release(token, package, title, ref, reason="Webhook")
return jsonify({
"success": False,
"message": "No release made. Either the release already exists or the event was filtered based on the branch"
})

@ -0,0 +1,81 @@
# ContentDB
# Copyright (C) 2020-24 rubenwardy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import datetime
from flask import request, jsonify
from app import csrf
from app.blueprints.api.support import error, api_create_vcs_release
from app.models import APIToken
from . import bp
from .common import get_packages_for_vcs_and_token
def webhook_impl():
json = request.json
# Get all tokens for package
secret = request.headers.get("X-Gitlab-Token")
if secret is None:
return error(403, "Token required")
token: APIToken = APIToken.query.filter_by(access_token=secret).first()
if token is None:
return error(403, "Invalid authentication")
packages = get_packages_for_vcs_and_token(token, json["project"]["web_url"].replace("https://", "").replace("http://", ""))
for package in packages:
#
# Check event
#
event = json["event_name"]
if event == "push":
ref = json["after"]
title = datetime.datetime.utcnow().strftime("%Y-%m-%d") + " " + ref[:5]
branch = json["ref"].replace("refs/heads/", "")
if branch not in ["master", "main"]:
continue
elif event == "tag_push":
ref = json["ref"]
title = ref.replace("refs/tags/", "")
else:
return error(400, "Unsupported event: '{}'. Only 'push', 'create:tag', and 'ping' are supported."
.format(event or "null"))
#
# Perform release
#
if package.releases.filter_by(commit_hash=ref).count() > 0:
continue
return api_create_vcs_release(token, package, title, ref, reason="Webhook")
return jsonify({
"success": False,
"message": "No release made. Either the release already exists or the event was filtered based on the branch"
})
@bp.route("/gitlab/webhook/", methods=["POST"])
@csrf.exempt
def gitlab_webhook():
try:
return webhook_impl()
except KeyError as err:
return error(400, "Missing field: {}".format(err.args[0]))

@ -55,7 +55,7 @@
</p> </p>
{% if user == current_user %} {% if user == current_user %}
<a class="btn btn-secondary" href="{{ url_for('github.view_permissions') }}"> <a class="btn btn-secondary" href="{{ url_for('vcs.github_view_permissions') }}">
{{ _("View ContentDB's GitHub Permissions") }} {{ _("View ContentDB's GitHub Permissions") }}
</a> </a>
{% endif %} {% endif %}
@ -66,7 +66,7 @@
</form> </form>
{% endif %} {% endif %}
{% elif user == current_user %} {% elif user == current_user %}
<a class="btn btn-secondary" href="{{ url_for('github.start') }}"> <a class="btn btn-secondary" href="{{ url_for('vcs.github_start') }}">
{{ _("Link Github") }} {{ _("Link Github") }}
</a> </a>
{% else %} {% else %}

@ -25,7 +25,7 @@
<hr class="my-5" /> <hr class="my-5" />
<p> <p>
<a class="btn btn-secondary me-3" href="{{ url_for('github.start', next=next) }}"> <a class="btn btn-secondary me-3" href="{{ url_for('vcs.github_start', next=next) }}">
<i class="fab fa-github me-1"></i> <i class="fab fa-github me-1"></i>
{{ _("GitHub") }} {{ _("GitHub") }}
</a> </a>