contentdb/app/utils/git.py

162 lines
4.4 KiB
Python

# ContentDB
# Copyright (C) 2018-21 rubenwardy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import contextlib
from typing import List, Optional, Tuple
import git
import gitdb
import os
import shutil
import tempfile
from urllib.parse import urlsplit
from git import GitCommandError
from app.tasks import TaskError
from app.utils import random_string, normalize_line_endings
def generate_git_url(urlstr):
scheme, netloc, path, query, frag = urlsplit(urlstr)
if not scheme.startswith("http"):
scheme = "http"
return scheme + "://:@" + netloc + path + query
@contextlib.contextmanager
def get_temp_dir():
temp = os.path.join(tempfile.gettempdir(), random_string(10))
yield temp
shutil.rmtree(temp)
# Clones a repo from an unvalidated URL.
# Returns a tuple of path and repo on sucess.
# Throws `TaskError` on failure.
# Caller is responsible for deleting returned directory.
@contextlib.contextmanager
def clone_repo(url_str, ref=None, recursive=False):
git_dir = os.path.join(tempfile.gettempdir(), random_string(10))
try:
git_url = generate_git_url(url_str)
print("Cloning from " + git_url)
if ref is None:
repo = git.Repo.clone_from(git_url, git_dir,
progress=None, env=None, depth=1, recursive=recursive, kill_after_timeout=15)
else:
assert ref != ""
repo = git.Repo.init(git_dir)
origin = repo.create_remote("origin", url=git_url)
assert origin.exists()
origin.fetch()
repo.git.checkout(ref)
repo.git.submodule('update', '--init')
yield repo
shutil.rmtree(git_dir)
return
except GitCommandError as e:
# This is needed to stop the backtrace being weird
err = e.stderr
except gitdb.exc.BadName as e:
err = "Unable to find the reference " + (ref or "?") + "\n" + e.stderr
raise TaskError(err.replace("stderr: ", "") \
.replace("Cloning into '" + git_dir + "'...", "") \
.strip())
def get_latest_commit(git_url, ref_name=None):
git_url = generate_git_url(git_url)
if ref_name:
ref_name = "refs/heads/" + ref_name
else:
ref_name = "HEAD"
g = git.cmd.Git()
remote_refs = {}
for ref in g.ls_remote(git_url).split('\n'):
hash_ref_list = ref.split('\t')
remote_refs[hash_ref_list[1]] = hash_ref_list[0]
return remote_refs.get(ref_name)
# @returns (tag_name, commit_hash, tag_message)
def get_latest_tag(git_url) -> Tuple[Optional[str], Optional[str], Optional[str]]:
with get_temp_dir() as git_dir:
repo = git.Repo.init(git_dir)
origin = repo.create_remote("origin", url=git_url)
origin.fetch()
refs = repo.git.for_each_ref(sort="creatordate", format="%(objectname)\t%(refname)").split("\n")
refs = [ref for ref in refs if "refs/tags/" in ref]
if len(refs) == 0:
return None, None, None
last_ref = refs[-1]
hash_ref_list = last_ref.split('\t')
tag = hash_ref_list[1].replace("refs/tags/", "")
# "^{}" means dereference the tag until an actual commit is found
commit_hash = repo.git.rev_parse(tag + "^{}")
# Get summary message of annotated tag from GitPython
annotated_tag = repo.tag(tag).tag
if annotated_tag:
message = annotated_tag.message
message = normalize_line_endings(message)
if message == "":
message = None
else:
message = None
return tag, commit_hash, message
def get_commit_list(git_url: str, start: str, end: str) -> List[str]:
with (get_temp_dir() as git_dir):
repo = git.Repo.init(git_dir)
origin = repo.create_remote("origin", url=git_url)
origin.fetch()
commits = repo.iter_commits(f"{start}..{end}")
ret = [commit.summary for commit in commits]
ret.reverse()
return ret
def get_release_notes(git_url: str, start: str, end: str) -> Optional[str]:
commits = get_commit_list(git_url, start, end)
commits = [x for x in commits if not x.startswith("Merge ")]
if len(commits) == 0:
return None
text = "\n".join(map(lambda x: f"- {x}", commits)) + f"\n<!-- auto from {start[0:5]} to {end[0:5]} -->"
return normalize_line_endings(text)