> Как работает периодический таск, который читает таблицу уведомлений и отправляет их? (Python)

Уровень: senior · Роль: backend · Язык: Python · Категория: Технические вопросы

Компании: HeadHunter

Стек: Python

> Пример ответа

Периодический таск для чтения таблицы уведомлений и их отправки обычно реализуется с помощью фонового воркера (например, Celery Beat, APScheduler или простого threading.Timer). Основная логика выглядит так:

  1. Планировщик запускает задачу с заданным интервалом (например, каждые 10 секунд).
  2. Запрос к БД: выбираются уведомления со статусом pending или new, которые ещё не были отправлены. Часто добавляется условие по времени (например, scheduled_at <= NOW()).
  3. Блокировка (опционально): чтобы избежать дублирования при параллельных запусках, используется SELECT ... FOR UPDATE или оптимистичная блокировка через поле version.
  4. Отправка: для каждого уведомления вызывается соответствующий канал (email, push, SMS и т.д.). Ошибки логируются, а статус записи меняется на failed или sent.
  5. Обновление статуса: после успешной отправки запись помечается как sent, чтобы повторно не обрабатываться.

Пример на Python с использованием APScheduler и SQLAlchemy:

PYTHON
from apscheduler.schedulers.background import BackgroundScheduler
from sqlalchemy import select, update
from your_app.models import Notification, NotificationStatus
from your_app.services import send_notification
def process_notifications():
with db_session() as session:
# Выбираем неотправленные уведомления с блокировкой строк
stmt = select(Notification).where(
Notification.status == NotificationStatus.PENDING
).with_for_update(skip_locked=True).limit(100)
notifications = session.execute(stmt).scalars().all()
for notif in notifications:
try:
send_notification(notif)
notif.status = NotificationStatus.SENT
except Exception as e:
notif.status = NotificationStatus.FAILED
notif.error_message = str(e)
session.commit()
# Настройка планировщика
scheduler = BackgroundScheduler()
scheduler.add_job(process_notifications, 'interval', seconds=10)
scheduler.start()

Важные нюансы:

  • Используйте skip_locked=True в блокировке, чтобы разные воркеры не ждали друг друга.
  • Ограничивайте количество записей за один раз (LIMIT), чтобы не перегружать память.
  • Для высокой нагрузки рассмотрите очередь (RabbitMQ, Redis) вместо прямого опроса БД.
  • Мониторьте количество уведомлений в статусе pending - если оно растёт, увеличьте частоту или масштабируйте воркеры.

> ГОТОВЫ К СЛЕДУЮЩЕМУ СОБЕСЕДОВАНИЮ?

Запустите тренировочную сессию с ИИ и получите детальную обратную связь, чтобы увереннее проходить реальные интервью