# Content DB # Copyright (C) 2018 rubenwardy # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from flask import Blueprint bp = Blueprint("github", __name__) from flask import redirect, url_for, request, flash, abort from flask_user import current_user from sqlalchemy import func from flask_github import GitHub from app import github, csrf from app.models import db, User, APIToken, Package from app.utils import loginUser from app.blueprints.api.support import error, handleCreateRelease import hmac @bp.route("/github/start/") def start(): return github.authorize("") @bp.route("/github/callback/") @github.authorized_handler def callback(oauth_token): next_url = request.args.get("next") if oauth_token is None: flash("Authorization failed [err=gh-oauth-login-failed]", "danger") return redirect(url_for("user.login")) import requests # Get Github username url = "https://api.github.com/user" r = requests.get(url, headers={"Authorization": "token " + oauth_token}) username = r.json()["login"] # Get user by github username userByGithub = User.query.filter(func.lower(User.github_username) == func.lower(username)).first() # If logged in, connect if current_user and current_user.is_authenticated: if userByGithub is None: current_user.github_username = username db.session.commit() flash("Linked github to account", "success") return redirect(url_for("homepage.home")) else: flash("Github account is already associated with another user", "danger") return redirect(url_for("homepage.home")) # If not logged in, log in else: if userByGithub is None: flash("Unable to find an account for that Github user", "danger") return redirect(url_for("users.claim")) elif loginUser(userByGithub): if not current_user.hasPassword(): return redirect(next_url or url_for("users.set_password", optional=True)) else: return redirect(next_url or url_for("homepage.home")) else: flash("Authorization failed [err=gh-login-failed]", "danger") return redirect(url_for("user.login")) @bp.route("/github/webhook/", methods=["POST"]) @csrf.exempt def webhook(): json = request.json # Get package github_url = "github.com/" + json["repository"]["full_name"] package = Package.query.filter(Package.repo.like("%{}%".format(github_url))).first() if package is None: return error(400, "Unknown package") # Get all tokens for package possible_tokens = APIToken.query.filter_by(package=package).all() actual_token = None # # Check signature # header_signature = request.headers.get('X-Hub-Signature') if header_signature is None: return error(403, "Expected payload signature") sha_name, signature = header_signature.split('=') if sha_name != 'sha1': return error(403, "Expected SHA1 payload signature") for token in possible_tokens: mac = hmac.new(token.access_token.encode("utf-8"), msg=request.data, digestmod='sha1') if hmac.compare_digest(str(mac.hexdigest()), signature): actual_token = token break if actual_token is None: return error(403, "Invalid authentication") # # Check event # event = request.headers.get("X-GitHub-Event") if event == "push": title = json["head_commit"]["message"].partition("\n")[0] ref = json["after"] else: return error(400, "Unknown event, expected 'push'") # # Perform release # return handleCreateRelease(actual_token, package, title, ref)