# ContentDB # Copyright (C) rubenwardy # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import datetime from datetime import timedelta from typing import Optional from app.models import User, Package, PackageDailyStats, db, PackageState from sqlalchemy import func def daterange(start_date, end_date): for n in range(int((end_date - start_date).days) + 1): yield start_date + timedelta(n) keys = ["platform_minetest", "platform_other", "reason_new", "reason_dependency", "reason_update", "views_minetest"] def flatten_data(stats): start_date = stats[0].date end_date = stats[-1].date result = { "start": start_date.isoformat(), "end": end_date.isoformat(), } for key in keys: result[key] = [] i = 0 for date in daterange(start_date, end_date): stat = stats[i] if stat.date == date: for key in keys: result[key].append(getattr(stat, key)) i += 1 else: for key in keys: result[key].append(0) return result def get_package_stats(package: Package, start_date: Optional[datetime.date], end_date: Optional[datetime.date]): query = package.daily_stats.order_by(db.asc(PackageDailyStats.date)) if start_date: query = query.filter(PackageDailyStats.date >= start_date) if end_date: query = query.filter(PackageDailyStats.date <= end_date) stats = query.all() if len(stats) == 0: return None return flatten_data(stats) def get_package_stats_for_user(user: User, start_date: Optional[datetime.date], end_date: Optional[datetime.date]): query = db.session \ .query(PackageDailyStats.date, func.sum(PackageDailyStats.platform_minetest).label("platform_minetest"), func.sum(PackageDailyStats.platform_other).label("platform_other"), func.sum(PackageDailyStats.reason_new).label("reason_new"), func.sum(PackageDailyStats.reason_dependency).label("reason_dependency"), func.sum(PackageDailyStats.reason_update).label("reason_update"), func.sum(PackageDailyStats.views_minetest).label("views_minetest")) \ .filter(PackageDailyStats.package.has(author_id=user.id)) if start_date: query = query.filter(PackageDailyStats.date >= start_date) if end_date: query = query.filter(PackageDailyStats.date <= end_date) stats = query.order_by(db.asc(PackageDailyStats.date)) \ .group_by(PackageDailyStats.date) \ .all() if len(stats) == 0: return None results = flatten_data(stats) results["package_downloads"] = get_package_overview_for_user(user, stats[0].date, stats[-1].date) return results def get_package_overview_for_user(user: Optional[User], start_date: datetime.date, end_date: datetime.date): query = db.session \ .query(PackageDailyStats.package_id, PackageDailyStats.date, (PackageDailyStats.platform_minetest + PackageDailyStats.platform_other).label("downloads")) if user: query = query.filter(PackageDailyStats.package.has(author_id=user.id)) all_stats = query \ .filter(PackageDailyStats.package.has(state=PackageState.APPROVED), PackageDailyStats.date >= start_date, PackageDailyStats.date <= end_date) \ .order_by(db.asc(PackageDailyStats.package_id), db.asc(PackageDailyStats.date)) \ .all() stats_by_package = {} for stat in all_stats: bucket = stats_by_package.get(stat.package_id, []) stats_by_package[stat.package_id] = bucket bucket.append(stat) package_title_by_id = {} pkg_query = user.packages if user else Package.query for package in pkg_query.filter_by(state=PackageState.APPROVED).all(): if user: package_title_by_id[package.id] = package.title else: package_title_by_id[package.id] = package.get_id() result = {} for package_id, stats in stats_by_package.items(): i = 0 row = [] result[package_title_by_id[package_id]] = row for date in daterange(start_date, end_date): if i >= len(stats): row.append(0) continue stat = stats[i] if stat.date == date: row.append(stat.downloads) i += 1 elif stat.date > date: row.append(0) else: raise Exception(f"Invalid logic, expected stat {stat.date} to be later than {date}") return result def get_all_package_stats(start_date: Optional[datetime.date] = None, end_date: Optional[datetime.date] = None): now_date = datetime.datetime.utcnow().date() if end_date is None or end_date > now_date: end_date = now_date min_start_date = (datetime.datetime.utcnow() - datetime.timedelta(days=29)).date() if start_date is None or start_date < min_start_date: start_date = min_start_date return { "start": start_date.isoformat(), "end": end_date.isoformat(), "package_downloads": get_package_overview_for_user(None, start_date, end_date), }