# ContentDB
# Copyright (C) 2018-21 rubenwardy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
{escape(text)}" msg.html = render_template("emails/base.html", subject=subject, content=html, reason=reason, sub=sub) if conn: conn.send(msg) else: mail.send(msg) @celery.task(rate_limit="25/m") def send_user_email(email: str, locale: str, subject: str, text: str, html=None, conn=None): return send_email_with_reason(email, locale, subject, text, html, lazy_gettext("You are receiving this email because you are a registered user of ContentDB."), conn) @celery.task(rate_limit="25/m") def send_anon_email(email: str, locale: str, subject: str, text: str, html=None): return send_email_with_reason(email, locale, subject, text, html, lazy_gettext("You are receiving this email because someone (hopefully you) entered your email address as a user's email."), None) def send_single_email(notification, locale): sub = get_email_subscription(notification.user.email) if sub.blacklisted: return with force_locale(locale or "en"): msg = Message(notification.title, recipients=[notification.user.email], extra_headers=gen_headers(sub, False)) msg.body = """ New notification: {} View: {} Manage email settings: {} Unsubscribe: {} """.format(notification.title, abs_url(notification.url), abs_url_for("users.email_notifications", username=notification.user.username), abs_url_for("users.unsubscribe", token=sub.token)) msg.html = render_template("emails/notification.html", notification=notification, sub=sub) mail.send(msg) def send_notification_digest(notifications: [Notification], locale): user = notifications[0].user sub = get_email_subscription(user.email) if sub.blacklisted: return with force_locale(locale or "en"): msg = Message(gettext("%(num)d new notifications", num=len(notifications)), recipients=[user.email]) msg.body = "".join(["<{}> {}\n{}: {}\n\n".format(notification.causer.display_name, notification.title, gettext("View"), abs_url(notification.url)) for notification in notifications]) msg.body += "{}: {}\n{}: {}".format( gettext("Manage email settings"), 
abs_url_for("users.email_notifications", username=user.username), gettext("Unsubscribe"), abs_url_for("users.unsubscribe", token=sub.token)) msg.html = render_template("emails/notification_digest.html", notifications=notifications, user=user, sub=sub) mail.send(msg) @celery.task() def send_pending_digests(): for user in User.query.filter(User.notifications.any(emailed=False)).all(): to_send = [] for notification in user.notifications: if not notification.emailed and notification.can_send_digest(): to_send.append(notification) notification.emailed = True if len(to_send) > 0: send_notification_digest(to_send, user.locale or "en") db.session.commit() @celery.task() def send_pending_notifications(): for user in User.query.filter(User.notifications.any(emailed=False)).all(): to_send = [] for notification in user.notifications: if not notification.emailed: if notification.can_send_email(): to_send.append(notification) notification.emailed = True elif not notification.can_send_digest(): notification.emailed = True db.session.commit() if len(to_send) > 1: send_notification_digest(to_send, user.locale or "en") elif len(to_send) > 0: send_single_email(to_send[0], user.locale or "en") @celery.task() def send_bulk_email(subject: str, text: str, html=None): with mail.connect() as conn: for user in User.query.filter(User.email.isnot(None)).all(): send_user_email(user.email, user.locale or "en", subject, text, html, conn)