# ContentDB # Copyright (C) 2024 rubenwardy # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import datetime from collections import namedtuple, defaultdict from typing import Dict, Optional from sqlalchemy import or_ from app.models import AuditLogEntry, db, PackageState class PackageInfo: state: Optional[PackageState] first_submitted: Optional[datetime.datetime] last_change: Optional[datetime.datetime] approved_at: Optional[datetime.datetime] wait_time: int total_approval_time: int is_in_range: bool events: list[tuple[str, str, str]] def __init__(self): self.state = None self.first_submitted = None self.last_change = None self.approved_at = None self.wait_time = 0 self.total_approval_time = -1 self.is_in_range = False self.events = [] def __lt__(self, other): return self.wait_time < other.wait_time def __dict__(self): return { "first_submitted": self.first_submitted.isoformat(), "last_change": self.last_change.isoformat(), "approved_at": self.approved_at.isoformat() if self.approved_at else None, "wait_time": self.wait_time, "total_approval_time": self.total_approval_time if self.total_approval_time >= 0 else None, "events": [ { "date": x[0], "by": x[1], "title": x[2] } for x in self.events ], } def add_event(self, created_at: datetime.datetime, causer: str, title: str): self.events.append((created_at.isoformat(), causer, title)) def get_state(title: str): if title.startswith("Approved "): return PackageState.APPROVED assert title.startswith("Marked ") for state in PackageState: if state.value in title: return state if "Work in Progress" in title: return PackageState.WIP raise Exception(f"Unable to get state for title {title}") Result = namedtuple("Result", "editor_approvals packages_info avg_turnaround_time max_turnaround_time") def _get_approval_statistics(entries: list[AuditLogEntry], start_date: Optional[datetime.datetime] = None, end_date: Optional[datetime.datetime] = None) -> Result: editor_approvals = defaultdict(int) package_info: Dict[str, PackageInfo] = {} ignored_packages = set() turnaround_times: list[int] = [] for entry in entries: package_id = str(entry.package.get_id()) if package_id in ignored_packages: continue info = package_info.get(package_id, PackageInfo()) package_info[package_id] = info is_in_range = (((start_date is None or entry.created_at >= start_date) and (end_date is None or entry.created_at <= end_date))) info.is_in_range = info.is_in_range or is_in_range new_state = get_state(entry.title) if new_state == info.state: continue info.add_event(entry.created_at, entry.causer.username if entry.causer else None, new_state.value) if info.state == PackageState.READY_FOR_REVIEW: seconds = int((entry.created_at - info.last_change).total_seconds()) info.wait_time += seconds if is_in_range: turnaround_times.append(seconds) if new_state == PackageState.APPROVED: ignored_packages.add(package_id) info.approved_at = entry.created_at if is_in_range: editor_approvals[entry.causer.username] += 1 if info.first_submitted is not None: info.total_approval_time = int((entry.created_at - info.first_submitted).total_seconds()) elif new_state == PackageState.READY_FOR_REVIEW: if info.first_submitted is None: info.first_submitted = entry.created_at info.state = new_state info.last_change = entry.created_at packages_info_2 = {} package_count = 0 for package_id, info in package_info.items(): if info.first_submitted and info.is_in_range: package_count += 1 packages_info_2[package_id] = info if len(turnaround_times) > 0: avg_turnaround_time = sum(turnaround_times) / len(turnaround_times) max_turnaround_time = max(turnaround_times) else: avg_turnaround_time = 0 max_turnaround_time = 0 return Result(editor_approvals, packages_info_2, avg_turnaround_time, max_turnaround_time) def get_approval_statistics(start_date: Optional[datetime.datetime] = None, end_date: Optional[datetime.datetime] = None) -> Result: entries = AuditLogEntry.query.filter(AuditLogEntry.package).filter(or_( AuditLogEntry.title.like("Approved %"), AuditLogEntry.title.like("Marked %")) ).order_by(db.asc(AuditLogEntry.created_at)).all() return _get_approval_statistics(entries, start_date, end_date)