contentdb/app/logic/approval_stats.py
2024-05-17 18:52:55 +01:00

149 lines
4.8 KiB
Python

# ContentDB
# Copyright (C) 2024 rubenwardy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import datetime
from collections import namedtuple, defaultdict
from typing import Dict, Optional
from sqlalchemy import or_
from app.models import AuditLogEntry, db, PackageState
class PackageInfo:
state: Optional[PackageState]
first_submitted: Optional[datetime.datetime]
last_change: Optional[datetime.datetime]
approved_at: Optional[datetime.datetime]
wait_time: int
total_approval_time: int
is_in_range: bool
events: list[tuple[str, str, str]]
def __init__(self):
self.state = None
self.first_submitted = None
self.last_change = None
self.approved_at = None
self.wait_time = 0
self.total_approval_time = -1
self.is_in_range = False
self.events = []
def __lt__(self, other):
return self.wait_time < other.wait_time
def __dict__(self):
return {
"first_submitted": self.first_submitted.isoformat(),
"last_change": self.last_change.isoformat(),
"approved_at": self.approved_at.isoformat() if self.approved_at else None,
"wait_time": self.wait_time,
"total_approval_time": self.total_approval_time if self.total_approval_time >= 0 else None,
"events": [ { "date": x[0], "by": x[1], "title": x[2] } for x in self.events ],
}
def add_event(self, created_at: datetime.datetime, causer: str, title: str):
self.events.append((created_at.isoformat(), causer, title))
def get_state(title: str):
if title.startswith("Approved "):
return PackageState.APPROVED
assert title.startswith("Marked ")
for state in PackageState:
if state.value in title:
return state
if "Work in Progress" in title:
return PackageState.WIP
raise Exception(f"Unable to get state for title {title}")
Result = namedtuple("Result", "editor_approvals packages_info avg_turnaround_time max_turnaround_time")
def _get_approval_statistics(entries: list[AuditLogEntry], start_date: Optional[datetime.datetime] = None, end_date: Optional[datetime.datetime] = None) -> Result:
editor_approvals = defaultdict(int)
package_info: Dict[str, PackageInfo] = {}
ignored_packages = set()
turnaround_times: list[int] = []
for entry in entries:
package_id = str(entry.package.get_id())
if package_id in ignored_packages:
continue
info = package_info.get(package_id, PackageInfo())
package_info[package_id] = info
is_in_range = (((start_date is None or entry.created_at >= start_date) and
(end_date is None or entry.created_at <= end_date)))
info.is_in_range = info.is_in_range or is_in_range
new_state = get_state(entry.title)
if new_state == info.state:
continue
info.add_event(entry.created_at, entry.causer.username if entry.causer else None, new_state.value)
if info.state == PackageState.READY_FOR_REVIEW:
seconds = int((entry.created_at - info.last_change).total_seconds())
info.wait_time += seconds
if is_in_range:
turnaround_times.append(seconds)
if new_state == PackageState.APPROVED:
ignored_packages.add(package_id)
info.approved_at = entry.created_at
if is_in_range:
editor_approvals[entry.causer.username] += 1
if info.first_submitted is not None:
info.total_approval_time = int((entry.created_at - info.first_submitted).total_seconds())
elif new_state == PackageState.READY_FOR_REVIEW:
if info.first_submitted is None:
info.first_submitted = entry.created_at
info.state = new_state
info.last_change = entry.created_at
packages_info_2 = {}
package_count = 0
for package_id, info in package_info.items():
if info.first_submitted and info.is_in_range:
package_count += 1
packages_info_2[package_id] = info
if len(turnaround_times) > 0:
avg_turnaround_time = sum(turnaround_times) / len(turnaround_times)
max_turnaround_time = max(turnaround_times)
else:
avg_turnaround_time = 0
max_turnaround_time = 0
return Result(editor_approvals, packages_info_2, avg_turnaround_time, max_turnaround_time)
def get_approval_statistics(start_date: Optional[datetime.datetime] = None, end_date: Optional[datetime.datetime] = None) -> Result:
entries = AuditLogEntry.query.filter(AuditLogEntry.package).filter(or_(
AuditLogEntry.title.like("Approved %"),
AuditLogEntry.title.like("Marked %"))
).order_by(db.asc(AuditLogEntry.created_at)).all()
return _get_approval_statistics(entries, start_date, end_date)