Rewrite game support algorithm

Fixes #395
This commit is contained in:
rubenwardy 2024-03-27 19:03:48 +00:00
parent f9048a8f49
commit 2c0d90e797
6 changed files with 865 additions and 149 deletions

@ -34,9 +34,9 @@ from app.logic.LogicError import LogicError
from app.logic.packages import do_edit_package
from app.querybuilder import QueryBuilder
from app.rediscache import has_key, set_temp_key
from app.tasks.importtasks import import_repo_screenshot, check_zip_release
from app.tasks.importtasks import import_repo_screenshot, check_zip_release, remove_package_game_support, \
update_package_game_support
from app.tasks.webhooktasks import post_discord_webhook
from app.logic.game_support import GameSupportResolver
from . import bp, get_package_tabs
from app.models import Package, Tag, db, User, Tags, PackageState, Permission, PackageType, MetaPackage, ForumTopic, \
@ -46,6 +46,7 @@ from app.models import Package, Tag, db, User, Tags, PackageState, Permission, P
from app.utils import is_user_bot, get_int_or_abort, is_package_page, abs_url_for, add_audit_log, get_package_by_info, \
add_notification, get_system_user, rank_required, get_games_from_csv, get_daterange_options, post_to_approval_thread
from app.logic.package_approval import validate_package_for_approval, can_move_to_state
from app.logic.game_support import game_support_set
@bp.route("/packages/")
@ -409,6 +410,7 @@ def move_to_state(package):
s.approved = True
msg = "Approved {}".format(package.title)
update_package_game_support.delay(package.id)
elif state == PackageState.READY_FOR_REVIEW:
post_discord_webhook.delay(package.author.display_name,
"Ready for Review: {}".format(package.get_url("packages.view", absolute=True)), True,
@ -478,6 +480,8 @@ def remove(package):
f"Deleted package {package.author.username}/{package.name} with reason '{reason}'",
True, package.title, package.short_desc, package.get_thumb_url(2, True, "png"))
remove_package_game_support.delay(package.id)
flash(gettext("Deleted package"), "success")
return redirect(url)
@ -498,6 +502,8 @@ def remove(package):
"Unapproved package with reason {}\n\n{}".format(reason, package.get_url("packages.view", absolute=True)), True,
package.title, package.short_desc, package.get_thumb_url(2, True, "png"))
remove_package_game_support.delay(package.id)
flash(gettext("Unapproved package"), "success")
return redirect(package.get_url("packages.view"))
@ -724,14 +730,12 @@ def game_support(package):
if can_override:
try:
resolver = GameSupportResolver(db.session)
game_is_supported = {}
for game in get_games_from_csv(db.session, form.supported.data or ""):
game_is_supported[game.id] = True
for game in get_games_from_csv(db.session, form.unsupported.data or ""):
game_is_supported[game.id] = False
resolver.set_supported(package, game_is_supported, 11)
game_support_set(db.session, package, game_is_supported, 11)
detect_update_needed = True
except LogicError as e:
flash(e.message, "danger")

@ -1,5 +1,5 @@
# ContentDB
# Copyright (C) 2022 rubenwardy
# Copyright (C) rubenwardy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -14,31 +14,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List, Dict, Optional
import sys
from typing import List, Dict
import sqlalchemy
import sqlalchemy.orm
from app.logic.LogicError import LogicError
from app.models import Package, MetaPackage, PackageType, PackageState, PackageGameSupport
"""
get_game_support(package):
if package is a game:
return [ package ]
for all hard dependencies:
support = support AND get_meta_package_support(dep)
return support
get_meta_package_support(meta):
for package implementing mod name:
support = support OR get_game_support(package)
return support
"""
from app.models import PackageType, Package, PackageState, PackageGameSupport
from app.utils import post_bot_message
minetest_game_mods = {
@ -55,123 +36,319 @@ mtg_mod_blacklist = {
}
class GameSupportResolver:
session: sqlalchemy.orm.Session
checked_packages = set()
checked_modnames = set()
resolved_packages: Dict[int, set[int]] = {}
resolved_modnames: Dict[int, set[int]] = {}
class GSPackage:
author: str
name: str
type: PackageType
def __init__(self, session):
self.session = session
provides: set[str]
depends: set[str]
def resolve_for_meta_package(self, meta: MetaPackage, history: List[str]) -> set[int]:
print(f"Resolving for {meta.name}", file=sys.stderr)
user_supported_games: set[str]
user_unsupported_games: set[str]
detected_supported_games: set[str]
supports_all_games: bool
key = meta.name
if key in self.resolved_modnames:
return self.resolved_modnames.get(key)
detection_disabled: bool
if key in self.checked_modnames:
print(f"Error, cycle found: {','.join(history)}", file=sys.stderr)
return set()
is_confirmed: bool
errors: set[str]
self.checked_modnames.add(key)
def __init__(self, author: str, name: str, type: PackageType, provides: set[str]):
self.author = author
self.name = name
self.type = type
self.provides = provides
self.depends = set()
self.user_supported_games = set()
self.user_unsupported_games = set()
self.detected_supported_games = set()
self.supports_all_games = False
self.detection_disabled = False
self.is_confirmed = type == PackageType.GAME
self.errors = set()
retval = set()
# For dodgy games, discard MTG mods
if self.type == PackageType.GAME and self.name in mtg_mod_blacklist:
self.provides.difference_update(minetest_game_mods)
for package in meta.packages:
if package.state != PackageState.APPROVED:
continue
@property
def id_(self) -> str:
return f"{self.author}/{self.name}"
if meta.name in minetest_game_mods and package.name in mtg_mod_blacklist:
continue
@property
def supported_games(self) -> set[str]:
ret = set()
ret.update(self.user_supported_games)
if not self.detection_disabled:
ret.update(self.detected_supported_games)
ret.difference_update(self.user_unsupported_games)
return ret
ret = self.resolve(package, history)
if len(ret) == 0:
retval = set()
@property
def unsupported_games(self) -> set[str]:
return self.user_unsupported_games
class GameSupport:
packages: Dict[str, GSPackage]
modified_packages: set[GSPackage]
def __init__(self):
self.packages = {}
self.modified_packages = set()
@property
def all_confirmed(self):
return all([x.is_confirmed for x in self.packages.values()])
@property
def has_errors(self):
return any([len(x.errors) > 0 for x in self.packages.values()])
@property
def error_count(self):
return sum([len(x.errors) for x in self.packages.values()])
@property
def all_errors(self) -> set[str]:
errors = set()
for package in self.packages.values():
for err in package.errors:
errors.add(package.id_ + ": " + err)
return errors
def add(self, package: GSPackage) -> GSPackage:
self.packages[package.id_] = package
return package
def get(self, id_: str) -> Optional[GSPackage]:
return self.packages[id_]
def get_all_that_provide(self, modname: str) -> List[GSPackage]:
return [package for package in self.packages.values() if modname in package.provides]
def get_all_that_depend_on(self, modname: str) -> List[GSPackage]:
return [package for package in self.packages.values() if modname in package.depends]
def _get_supported_games_for_modname(self, depend: str, visited: list[str]):
dep_supports_all = False
for_dep = set()
for provider in self.get_all_that_provide(depend):
found_in = self._get_supported_games(provider, visited)
if found_in is None:
# Unsupported, keep going
pass
elif len(found_in) == 0:
dep_supports_all = True
break
else:
for_dep.update(found_in)
retval.update(ret)
return dep_supports_all, for_dep
self.resolved_modnames[key] = retval
return retval
def _get_supported_games_for_deps(self, package: GSPackage, visited: list[str]) -> Optional[set[str]]:
ret = set()
def resolve(self, package: Package, history: List[str]) -> set[int]:
key: int = package.id
print(f"Resolving for {key}", file=sys.stderr)
for depend in package.depends:
dep_supports_all, for_dep = self._get_supported_games_for_modname(depend, visited)
history = history.copy()
history.append(package.get_id())
if dep_supports_all:
# Dep is game independent
pass
elif len(for_dep) == 0:
package.errors.add(f"Unable to fulfill dependency {depend}")
return None
elif len(ret) == 0:
ret = for_dep
else:
ret.intersection_update(for_dep)
if len(ret) == 0:
package.errors.add("Game support conflict, unable to install package on any games")
return None
return ret
def _get_supported_games(self, package: GSPackage, visited: list[str]) -> Optional[set[str]]:
if package.id_ in visited:
first_idx = visited.index(package.id_)
visited = visited[first_idx:]
err = f"Dependency cycle detected: {' -> '.join(visited)} -> {package.id_}"
for id_ in visited:
package2 = self.get(id_)
package2.errors.add(err)
return None
visited = visited.copy()
visited.append(package.id_)
if package.type == PackageType.GAME:
return {package.id}
return {package.name}
elif package.is_confirmed:
return package.supported_games
else:
ret = self._get_supported_games_for_deps(package, visited)
if ret is None:
assert len(package.errors) > 0
return None
if key in self.resolved_packages:
return self.resolved_packages.get(key)
ret = ret.copy()
ret.difference_update(package.user_unsupported_games)
package.detected_supported_games = ret
self.modified_packages.add(package)
if key in self.checked_packages:
print(f"Error, cycle found: {','.join(history)}", file=sys.stderr)
return set()
if len(ret) > 0:
for supported in package.user_supported_games:
if supported not in ret:
package.errors.add(f"`{supported}` is specified in supported_games but it is impossible to run {package.name} in that game. " +
f"Its dependencies can only be fulfilled in {', '.join([f'`{x}`' for x in ret])}. " +
"Check your hard dependencies.")
self.checked_packages.add(key)
if package.supports_all_games:
package.errors.add(
"This package cannot support all games as some dependencies require specific game(s): " +
", ".join([f'`{x}`' for x in ret]))
if package.type != PackageType.MOD:
raise LogicError(500, "Got non-mod")
package.is_confirmed = True
return package.supported_games
retval = set()
def on_update(self, package: GSPackage):
to_update = {package}
checked = set()
for dep in package.dependencies.filter_by(optional=False).all():
ret = self.resolve_for_meta_package(dep.meta_package, history)
if len(ret) == 0:
continue
elif len(retval) == 0:
retval.update(ret)
else:
retval.intersection_update(ret)
if len(retval) == 0:
raise LogicError(500, f"Detected game support contradiction, {key} may not be compatible with any games")
while len(to_update) > 0:
current_package = to_update.pop()
if current_package.id_ in self.packages and current_package.type != PackageType.GAME:
current_package.is_confirmed = False
current_package.detected_supported_games = []
self._get_supported_games(current_package, [])
self.resolved_packages[key] = retval
return retval
for modname in current_package.provides:
for depending_package in self.get_all_that_depend_on(modname):
if depending_package not in checked:
to_update.add(depending_package)
checked.add(depending_package)
def init_all(self) -> None:
for package in self.session.query(Package).filter(Package.type == PackageType.MOD, Package.state != PackageState.DELETED).all():
retval = self.resolve(package, [])
for game_id in retval:
game = self.session.query(Package).get(game_id)
support = PackageGameSupport(package, game, 1, True)
self.session.add(support)
def on_remove(self, package: GSPackage):
del self.packages[package.id_]
self.on_update(package)
"""
Update game supported package on a package, given the confidence.
def on_first_run(self):
for package in self.packages.values():
if not package.is_confirmed:
self.on_update(package)
Higher confidences outweigh lower ones.
"""
def set_supported(self, package: Package, game_is_supported: Dict[int, bool], confidence: int):
previous_supported: Dict[int, PackageGameSupport] = {}
for support in package.supported_games.all():
previous_supported[support.game.id] = support
for game_id, supports in game_is_supported.items():
game = self.session.query(Package).get(game_id)
lookup = previous_supported.pop(game_id, None)
if lookup is None:
support = PackageGameSupport(package, game, confidence, supports)
self.session.add(support)
elif lookup.confidence <= confidence:
lookup.supports = supports
lookup.confidence = confidence
def _convert_package(support: GameSupport, package: Package) -> GSPackage:
# Unapproved packages shouldn't be considered to fulfill anything
provides = set()
if package.state == PackageState.APPROVED:
provides = set([x.name for x in package.provides])
for game, support in previous_supported.items():
if support.confidence == confidence:
self.session.delete(support)
gs_package = GSPackage(package.author.username, package.name, package.type, provides)
gs_package.depends = set([x.meta_package.name for x in package.dependencies if not x.optional])
gs_package.detection_disabled = not package.enable_game_support_detection
gs_package.supports_all_games = package.supports_all_games
def update(self, package: Package) -> None:
game_is_supported = {}
if package.enable_game_support_detection:
retval = self.resolve(package, [])
for game_id in retval:
game_is_supported[game_id] = True
existing_game_support = (package.supported_games
.filter(PackageGameSupport.game.has(state=PackageState.APPROVED),
PackageGameSupport.confidence > 5)
.all())
gs_package.user_supported_games = [x.game.name for x in existing_game_support if x.supports]
gs_package.user_unsupported_games = [x.game.name for x in existing_game_support if not x.supports]
return support.add(gs_package)
self.set_supported(package, game_is_supported, 1)
def _create_instance(session: sqlalchemy.orm.Session) -> GameSupport:
support = GameSupport()
packages: List[Package] = (session.query(Package)
.filter(Package.state == PackageState.APPROVED, Package.type.in_([PackageType.GAME, PackageType.MOD]))
.all())
for package in packages:
_convert_package(support, package)
return support
def _persist(session: sqlalchemy.orm.Session, support: GameSupport):
for gs_package in support.packages.values():
if len(gs_package.errors) != 0:
msg = "\n".join([f"- {x}" for x in gs_package.errors])
package = session.query(Package).filter(
Package.author.has(username=gs_package.author),
Package.name == gs_package.name).one()
post_bot_message(package, "Error when checking game support", msg, session)
for gs_package in support.modified_packages:
if not gs_package.detection_disabled:
package = session.query(Package).filter(
Package.author.has(username=gs_package.author),
Package.name == gs_package.name).one()
# Clear existing
session.query(PackageGameSupport) \
.filter_by(package=package, confidence=1) \
.delete()
# Add new
supported_games = gs_package.supported_games \
.difference(gs_package.user_supported_games)
for game_name in supported_games:
game_id = session.query(Package.id) \
.filter(Package.type == PackageType.GAME, Package.name == game_name, Package.state == PackageState.APPROVED) \
.one()[0]
new_support = PackageGameSupport()
new_support.package = package
new_support.game_id = game_id
new_support.confidence = 1
new_support.supports = True
session.add(new_support)
def game_support_update(session: sqlalchemy.orm.Session, package: Package) -> set[str]:
support = _create_instance(session)
gs_package = support.get(package.get_id())
if gs_package is None:
gs_package = _convert_package(support, package)
support.on_update(gs_package)
_persist(session, support)
return gs_package.errors
def game_support_update_all(session: sqlalchemy.orm.Session):
support = _create_instance(session)
support.on_first_run()
_persist(session, support)
def game_support_remove(session: sqlalchemy.orm.Session, package: Package):
support = _create_instance(session)
gs_package = support.get(package.get_id())
if gs_package is None:
gs_package = _convert_package(support, package)
support.on_remove(gs_package)
_persist(session, support)
def game_support_set(session, package: Package, game_is_supported: Dict[int, bool], confidence: int):
previous_supported: Dict[int, PackageGameSupport] = {}
for support in package.supported_games.all():
previous_supported[support.game.id] = support
for game_id, supports in game_is_supported.items():
game = session.query(Package).get(game_id)
lookup = previous_supported.pop(game_id, None)
if lookup is None:
support = PackageGameSupport(package, game, confidence, supports)
session.add(support)
elif lookup.confidence <= confidence:
lookup.supports = supports
lookup.confidence = confidence
for game, support in previous_supported.items():
if support.confidence == confidence:
session.delete(support)

@ -329,12 +329,6 @@ class PackageGameSupport(db.Model):
__table_args__ = (db.UniqueConstraint("game_id", "package_id", name="_package_game_support_uc"),)
def __init__(self, package, game, confidence, supports):
self.package = package
self.game = game
self.confidence = confidence
self.supports = supports
class Package(db.Model):
query_class = PackageQuery
@ -531,14 +525,14 @@ class Package(db.Model):
def get_sorted_optional_dependencies(self):
return self.get_sorted_dependencies(False)
def get_sorted_game_support(self):
def get_sorted_game_support(self) -> list[PackageGameSupport]:
query = self.supported_games.filter(PackageGameSupport.game.has(state=PackageState.APPROVED))
supported = query.all()
supported.sort(key=lambda x: -(x.game.score + 100000*x.confidence))
return supported
def get_sorted_game_support_pair(self):
def get_sorted_game_support_pair(self) -> list[list[PackageGameSupport]]:
supported = self.get_sorted_game_support()
return [
[x for x in supported if x.supports],

@ -40,8 +40,8 @@ from .minetestcheck import build_tree, MinetestCheckError, ContentType, PackageT
from .webhooktasks import post_discord_webhook
from app import app
from app.logic.LogicError import LogicError
from app.logic.game_support import GameSupportResolver
from app.logic.packages import do_edit_package, ALIASES
from app.logic.game_support import game_support_update, game_support_set, game_support_update_all, game_support_remove
from app.utils.image import get_image_size
@ -90,8 +90,21 @@ def get_meta(urlstr, author):
@celery.task()
def update_all_game_support():
resolver = GameSupportResolver(db.session)
resolver.init_all()
game_support_update_all(db.session)
db.session.commit()
@celery.task()
def update_package_game_support(package_id: int):
package = Package.query.get(package_id)
game_support_update(db.session, package)
db.session.commit()
@celery.task()
def remove_package_game_support(package_id: int):
package = Package.query.get(package_id)
game_support_remove(db.session, package)
db.session.commit()
@ -186,8 +199,6 @@ def post_release_check_update(self, release: PackageRelease, path):
# Update game support
if package.type == PackageType.MOD or package.type == PackageType.TXP:
resolver = GameSupportResolver(db.session)
game_is_supported = {}
if "supported_games" in tree.meta:
for game in get_games_from_list(db.session, tree.meta["supported_games"]):
@ -205,17 +216,21 @@ def post_release_check_update(self, release: PackageRelease, path):
for game in get_games_from_list(db.session, tree.meta["unsupported_games"]):
game_is_supported[game.id] = False
resolver.set_supported(package, game_is_supported, 10)
game_support_set(db.session, package, game_is_supported, 10)
if package.type == PackageType.MOD:
resolver.update(package)
errors = game_support_update(db.session, package)
if len(errors) != 0:
raise TaskError("Error validating game support:\n\n" + "\n".join([f"- {x}" for x in errors]))
return tree
except (MinetestCheckError, TaskError, LogicError) as err:
db.session.rollback()
error_message = err.value if hasattr(err, "value") else str(err)
task_url = url_for('tasks.check', id=self.request.id)
msg = f"{err}\n\n[View Release]({release.get_edit_url()}) | [View Task]({task_url})"
msg = f"{error_message}\n\n[View Release]({release.get_edit_url()}) | [View Task]({task_url})"
post_bot_message(release.package, f"Release {release.title} validation failed", msg)
if "Fails validation" not in release.title:
@ -225,7 +240,7 @@ def post_release_check_update(self, release: PackageRelease, path):
release.approved = False
db.session.commit()
raise TaskError(str(err))
raise TaskError(error_message)
def update_translations(package: Package, tree: PackageTreeNode):

@ -0,0 +1,517 @@
# ContentDB
# Copyright (C) rubenwardy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List
from app.logic.game_support import GSPackage, GameSupport
from app.models import PackageType
def make_mod(name: str, provides: List[str], deps: List[str]) -> GSPackage:
ret = GSPackage("author", name, PackageType.MOD, set(provides))
ret.depends.update(deps)
return ret
def make_game(name: str, provides: List[str]) -> GSPackage:
return GSPackage("author", name, PackageType.GAME, set(provides))
def test_game_supports_itself():
"""
Games obviously support themselves
"""
support = GameSupport()
game = support.add(make_game("game1", ["default"]))
assert not support.has_errors
assert game.is_confirmed
assert len(game.detected_supported_games) == 0
support.on_update(game)
assert not support.has_errors
assert game.is_confirmed
assert len(game.detected_supported_games) == 0
def test_no_deps():
"""
Test a mod with no dependencies supports all games
"""
support = GameSupport()
support.add(make_game("game1", ["default"]))
mod1 = support.add(make_mod("mod1", ["mod1"], []))
support.on_update(mod1)
assert not support.has_errors
assert mod1.is_confirmed
assert len(mod1.detected_supported_games) == 0
def test_direct_game_dep():
"""
Test that depending on a mod in a game works
"""
support = GameSupport()
support.add(make_game("game1", ["default"]))
mod1 = support.add(make_mod("mod1", ["mod1"], ["default"]))
support.on_update(mod1)
assert not support.has_errors
assert mod1.is_confirmed
assert mod1.detected_supported_games == {"game1"}
def test_indirect_game_dep():
"""
Test that depending on a mod that depends on a game works
"""
support = GameSupport()
support.add(make_game("game1", ["default"]))
mod1 = support.add(make_mod("mod1", ["mod1"], ["default"]))
mod2 = support.add(make_mod("mod2", ["mod2"], ["mod1"]))
support.on_update(mod2)
assert not support.has_errors
assert mod1.is_confirmed
assert mod1.detected_supported_games == {"game1"}
assert mod2.is_confirmed
assert mod2.detected_supported_games == {"game1"}
def test_multiple_game_dep():
"""
Test with multiple games, with dependencies in games and as standalone mods
"""
support = GameSupport()
support.add(make_game("game1", ["default"]))
support.add(make_game("game2", ["core", "mod_b"]))
lib = support.add(make_mod("lib", ["lib"], []))
modB = support.add(make_mod("mod_b", ["mod_b"], ["default"]))
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b", "lib"]))
support.on_update(modA)
assert not support.has_errors
assert modA.is_confirmed
assert modA.detected_supported_games == {"game1", "game2"}
assert modB.is_confirmed
assert modB.detected_supported_games == {"game1"}
assert lib.is_confirmed
assert len(lib.detected_supported_games) == 0
def test_dependency_supports_all():
"""
Test with dependencies that support all games
"""
support = GameSupport()
support.add(make_game("game1", ["default"]))
mod1 = support.add(make_mod("mod1", ["mod1"], ["default"]))
lib = support.add(make_mod("lib", ["lib"], []))
mod2 = support.add(make_mod("mod2", ["mod2"], ["mod1", "lib"]))
support.on_update(mod2)
assert not support.has_errors
assert mod1.is_confirmed
assert mod1.detected_supported_games == {"game1"}
assert mod2.is_confirmed
assert mod2.detected_supported_games == {"game1"}
assert lib.is_confirmed
assert len(lib.detected_supported_games) == 0
def test_dependency_supports_all2():
"""
Test with dependencies that support all games, but are also in games
"""
support = GameSupport()
support.add(make_game("game1", ["default", "lib"]))
lib = support.add(make_mod("lib", ["lib"], []))
mod1 = support.add(make_mod("mod1", ["mod1"], ["lib"]))
support.on_update(mod1)
assert not support.has_errors
assert mod1.is_confirmed
assert len(mod1.detected_supported_games) == 0
assert lib.is_confirmed
assert len(lib.detected_supported_games) == 0
def test_dependency_game_conflict():
"""
Test situation where a mod is not installable in any games
"""
support = GameSupport()
support.add(make_game("game1", ["default", "mod_b"]))
support.add(make_game("game2", ["default", "mod_c"]))
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b", "mod_c"]))
support.on_update(modA)
assert not modA.is_confirmed
assert len(modA.detected_supported_games) == 0
assert support.all_errors == {
"author/mod_a: Game support conflict, unable to install package on any games"
}
def test_missing_hard_dep():
"""
Test missing hard dependency
"""
support = GameSupport()
support.add(make_game("game1", ["default"]))
modA = support.add(make_mod("mod_a", ["mod_a"], ["nonexist"]))
support.on_update(modA)
assert not modA.is_confirmed
assert len(modA.detected_supported_games) == 0
assert support.all_errors == {
"author/mod_a: Unable to fulfill dependency nonexist"
}
def test_cycle():
"""
Test for dependency cycles
"""
support = GameSupport()
support.add(make_game("game1", ["default"]))
support.add(make_mod("mod_b", ["mod_b"], ["mod_a"]))
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b"]))
support.on_update(modA)
assert support.all_errors == {
"author/mod_a: Dependency cycle detected: author/mod_b -> author/mod_a -> author/mod_b",
"author/mod_b: Dependency cycle detected: author/mod_a -> author/mod_b -> author/mod_a",
"author/mod_a: Dependency cycle detected: author/mod_a -> author/mod_b -> author/mod_a",
"author/mod_b: Unable to fulfill dependency mod_a",
"author/mod_b: Dependency cycle detected: author/mod_b -> author/mod_a -> author/mod_b",
"author/mod_a: Unable to fulfill dependency mod_b"
}
def test_cycle_fails_safely():
"""
A dependency cycle shouldn't completely break the graph if a mod is
available elsewhere
"""
support = GameSupport()
support.add(make_game("game1", ["default", "mod_d"]))
modC = support.add(make_mod("mod_c", ["mod_c"], ["mod_b"]))
modB = support.add(make_mod("mod_b", ["mod_b"], ["mod_c"]))
support.add(make_mod("mod_d", ["mod_d"], ["mod_b"]))
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_d"]))
support.on_update(modA)
assert not modC.is_confirmed
assert not modB.is_confirmed
assert modA.is_confirmed
assert len(modA.errors) == 0
assert modA.detected_supported_games == {"game1"}
assert support.all_errors == {
"author/mod_b: Unable to fulfill dependency mod_c",
"author/mod_d: Unable to fulfill dependency mod_b",
"author/mod_c: Unable to fulfill dependency mod_b",
"author/mod_c: Dependency cycle detected: author/mod_b -> author/mod_c -> author/mod_b",
"author/mod_b: Dependency cycle detected: author/mod_b -> author/mod_c -> author/mod_b"
}
def test_update():
"""
Test updating a mod will update mods that depend on it
"""
support = GameSupport()
support.add(make_game("game1", ["default"]))
game2 = support.add(make_game("game2", ["core"]))
lib = support.add(make_mod("lib", ["lib"], []))
modB = support.add(make_mod("mod_b", ["mod_b"], ["default"]))
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b", "lib"]))
support.on_update(modA)
assert not support.has_errors
assert modA.is_confirmed
assert modA.detected_supported_games == {"game1"}
assert modB.is_confirmed
assert modB.detected_supported_games == {"game1"}
assert lib.is_confirmed
assert len(lib.detected_supported_games) == 0
game2.provides.add("mod_b")
support.on_update(game2)
assert not support.has_errors
assert modA.is_confirmed
assert modA.detected_supported_games == {"game1", "game2"}
assert modB.is_confirmed
assert modB.detected_supported_games == {"game1"}
assert lib.is_confirmed
assert len(lib.detected_supported_games) == 0
def test_update_new_mod():
"""
Test that adding a mod will update mods that depend on the modname
"""
support = GameSupport()
support.add(make_game("game1", ["default"]))
support.add(make_game("game2", ["core", "mod_b"]))
lib = support.add(make_mod("lib", ["lib"], []))
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b", "lib"]))
support.on_update(modA)
assert not support.has_errors
assert modA.is_confirmed
assert modA.detected_supported_games == {"game2"}
assert lib.is_confirmed
assert len(lib.detected_supported_games) == 0
modB = support.add(make_mod("mod_b", ["mod_b"], ["default"]))
support.on_update(modB)
assert not support.has_errors
assert modA.is_confirmed
assert modA.detected_supported_games == {"game1", "game2"}
assert modB.is_confirmed
assert modB.detected_supported_games == {"game1"}
assert lib.is_confirmed
assert len(lib.detected_supported_games) == 0
def test_update_cycle():
"""
Test that updating a package with a cycle depending on it doesn't break
"""
support = GameSupport()
game1 = support.add(make_game("game1", ["default"]))
modB = support.add(make_mod("mod_b", ["mod_b"], ["default"]))
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b"]))
support.on_update(modA)
assert not support.has_errors
assert modA.is_confirmed
assert modA.detected_supported_games == {"game1"}
assert modB.is_confirmed
assert modB.detected_supported_games == {"game1"}
support.add(make_mod("mod_c", ["mod_c"], ["mod_a"]))
modA.depends.add("mod_c")
support.on_update(game1)
assert support.all_errors == {
"author/mod_c: Dependency cycle detected: author/mod_a -> author/mod_c -> author/mod_a",
"author/mod_a: Dependency cycle detected: author/mod_a -> author/mod_c -> author/mod_a",
"author/mod_a: Dependency cycle detected: author/mod_c -> author/mod_a -> author/mod_c",
"author/mod_a: Unable to fulfill dependency mod_c",
"author/mod_c: Dependency cycle detected: author/mod_c -> author/mod_a -> author/mod_c",
"author/mod_c: Unable to fulfill dependency mod_a"
}
def test_remove():
support = GameSupport()
support.add(make_game("game1", ["default"]))
support.add(make_game("game2", ["core", "mod_b"]))
support.add(make_mod("lib", ["lib"], []))
modB = support.add(make_mod("mod_b", ["mod_b"], ["default"]))
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b", "lib"]))
support.on_update(modA)
assert not support.has_errors
assert modA.is_confirmed
assert len(modA.detected_supported_games) == 2
assert "game1" in modA.detected_supported_games
assert "game2" in modA.detected_supported_games
assert modB.is_confirmed
assert len(modB.detected_supported_games) == 1
assert "game1" in modB.detected_supported_games
support.on_remove(modB)
assert not support.has_errors
assert modA.is_confirmed
assert len(modA.detected_supported_games) == 1
assert "game2" in modA.detected_supported_games
def test_propagates_user_unsupported_games():
support = GameSupport()
support.add(make_game("game1", ["default"]))
support.add(make_game("game2", ["default"]))
modB = support.add(make_mod("mod_b", ["mod_b"], ["default"]))
modB.user_supported_games.add("game1")
modB.user_unsupported_games.add("game2")
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b"]))
support.on_update(modA)
assert not support.has_errors
assert modA.is_confirmed
assert modA.detected_supported_games == {"game1"}
assert modA.supported_games == {"game1"}
assert modB.is_confirmed
assert modB.detected_supported_games == {"game1"}
assert modB.supported_games == {"game1"}
def test_propagates_user_supported_games():
support = GameSupport()
support.add(make_game("game1", ["default"]))
support.add(make_game("game2", ["default"]))
modB = support.add(make_mod("mod_b", ["mod_b"], []))
modB.user_supported_games.add("game1")
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b"]))
support.on_update(modA)
assert not support.has_errors
assert modA.is_confirmed
assert modA.detected_supported_games == {"game1"}
assert modB.is_confirmed
assert len(modB.detected_supported_games) == 0
def test_validate_inconsistent_user_supported_game():
support = GameSupport()
support.add(make_game("game1", ["default"]))
support.add(make_game("game2", ["core"]))
support.add(make_mod("mod_b", ["mod_b"], ["core"]))
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b"]))
modA.user_supported_games.add("game1")
support.on_update(modA)
assert support.all_errors == {
"author/mod_a: `game1` is specified in supported_games but it is impossible to run mod_a in that game. "
"Its dependencies can only be fulfilled in `game2`. "
"Check your hard dependencies.",
}
def test_validate_inconsistent_supports_all():
support = GameSupport()
support.add(make_game("game1", ["default"]))
support.add(make_game("game2", ["core"]))
support.add(make_mod("mod_b", ["mod_b"], ["core"]))
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b"]))
modA.supports_all_games = True
support.on_update(modA)
assert support.all_errors == {
"author/mod_a: This package cannot support all games as some dependencies require specific game(s): `game2`",
}
def test_enable_detection():
support = GameSupport()
support.add(make_game("game1", ["default"]))
support.add(make_game("game2", ["default"]))
modB = support.add(make_mod("mod_b", ["mod_b"], ["default"]))
modB.user_supported_games.add("game1")
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b"]))
support.on_update(modA)
assert not support.has_errors
assert modA.is_confirmed
assert modA.detected_supported_games == {"game1", "game2"}
assert modA.supported_games == {"game1", "game2"}
assert modB.is_confirmed
assert modB.detected_supported_games == {"game1", "game2"}
assert modB.supported_games == {"game1", "game2"}
def test_disable_detection():
support = GameSupport()
support.add(make_game("game1", ["default"]))
support.add(make_game("game2", ["default"]))
modB = support.add(make_mod("mod_b", ["mod_b"], ["default"]))
modB.user_supported_games.add("game1")
modB.detection_disabled = True
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b"]))
support.on_update(modA)
assert not support.has_errors
assert modA.is_confirmed
assert modA.detected_supported_games == {"game1"}
assert modA.supported_games == {"game1"}
assert modB.is_confirmed
assert modB.detected_supported_games == {"game1", "game2"}
assert modB.supported_games == {"game1"}
def test_first_run():
support = GameSupport()
support.add(make_game("game1", ["default"]))
support.add(make_game("game2", ["core", "mod_b"]))
lib = support.add(make_mod("lib", ["lib"], []))
modB = support.add(make_mod("mod_b", ["mod_b"], ["default"]))
modA = support.add(make_mod("mod_a", ["mod_a"], ["mod_b", "lib"]))
modF = support.add(make_mod("mod_f", ["mod_f"], []))
assert not support.all_confirmed
support.on_first_run()
assert support.all_confirmed
assert not support.has_errors
assert modA.detected_supported_games == {"game1", "game2"}
assert modB.detected_supported_games == {"game1"}
assert len(lib.detected_supported_games) == 0
assert len(modF.detected_supported_games) == 0
def test_ignores_mtg_in_violating_games():
support = GameSupport()
support.add(make_game("minetest_game", ["default"]))
support.add(make_game("tutorial", ["default", "tutorial"]))
modA = support.add(make_mod("mod_a", ["mod_a"], ["default"]))
modB = support.add(make_mod("mod_b", ["mod_b"], ["tutorial"]))
support.on_first_run()
assert not support.has_errors
assert modA.detected_supported_games == {"minetest_game"}
assert modB.detected_supported_games == {"tutorial"}

@ -73,23 +73,29 @@ def is_package_page(f):
return decorated_function
def add_notification(target, causer: User, type: NotificationType, title: str, url: str, package: Package = None):
def add_notification(target, causer: User, type: NotificationType, title: str, url: str,
package: Package = None, session: sqlalchemy.orm.Session = None):
if session is None:
session = db.session
try:
iter(target)
for x in target:
add_notification(x, causer, type, title, url, package)
add_notification(x, causer, type, title, url, package, session)
return
except TypeError:
pass
if target.rank.at_least(UserRank.NEW_MEMBER) and target != causer:
Notification.query.filter_by(user=target, causer=causer, type=type, title=title, url=url, package=package).delete()
session.query(Notification) \
.filter_by(user=target, causer=causer, type=type, title=title, url=url, package=package) \
.delete()
notif = Notification(target, causer, type, title, url, package)
db.session.add(notif)
session.add(notif)
def add_audit_log(severity: AuditSeverity, causer: User, title: str, url: typing.Optional[str],
package: Package = None, description: str = None):
package: Package = None, description: str = None):
entry = AuditLogEntry(causer, severity, title, url, package, description)
db.session.add(entry)
@ -114,7 +120,10 @@ def add_system_audit_log(severity: AuditSeverity, title: str, url: str, package=
return add_audit_log(severity, get_system_user(), title, url, package, description)
def post_bot_message(package: Package, title: str, message: str):
def post_bot_message(package: Package, title: str, message: str, session=None):
if session is None:
session = db.session
system_user = get_system_user()
thread = package.threads.filter_by(author=system_user).first()
@ -125,16 +134,16 @@ def post_bot_message(package: Package, title: str, message: str):
thread.author = system_user
thread.private = True
thread.watchers.extend(package.maintainers)
db.session.add(thread)
db.session.flush()
session.add(thread)
session.flush()
reply = ThreadReply()
reply.thread = thread
reply.author = system_user
reply.thread = thread
reply.author = system_user
reply.comment = "**{}**\n\n{}\n\nThis is an automated message, but you can reply if you need help".format(title, message)
db.session.add(reply)
session.add(reply)
add_notification(thread.watchers, system_user, NotificationType.BOT, title, thread.get_view_url(), thread.package)
add_notification(thread.watchers, system_user, NotificationType.BOT, title, thread.get_view_url(), thread.package, session)
thread.replies.append(reply)