> Как реализовать приоритеты в микросервисе рассылки уведомлений (Python)

Уровень: senior · Роль: backend · Язык: Python · Категория: Технические вопросы

Компании: Международный аэропорт Шереметьево

Стек: Python

> Пример ответа

В микросервисе рассылки уведомлений приоритеты можно реализовать через многоуровневую очередь с использованием брокера сообщений (например, RabbitMQ или Redis). Основная идея - разделить уведомления на классы приоритета (high, medium, low) и обрабатывать их с разной интенсивностью.

Пример реализации на Python с RabbitMQ:

  1. Определение приоритетов:

    • Создаем отдельные очереди для каждого уровня: notifications.high, notifications.medium, notifications.low.
    • В сообщении передаем метаданные (тип, получатель, контент) и приоритет.
  2. Публикация сообщений:

    PYTHON
    import pika
    import json
    def publish_notification(notification_data, priority='medium'):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    queue_name = f"notifications.{priority}"
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_publish(
    exchange='',
    routing_key=queue_name,
    body=json.dumps(notification_data),
    properties=pika.BasicProperties(
    delivery_mode=2, # persistent message
    )
    )
    connection.close()
  3. Обработка с весами:

    • Запускаем несколько воркеров, каждый из которых слушает все очереди, но с разным приоритетом выборки.
    • Используем basic_qos с разными prefetch_count для разных очередей, или реализуем взвешенный polling.

    Пример воркера с взвешенным потреблением:

    PYTHON
    import pika
    import time
    from collections import deque
    class PriorityWorker:
    def __init__(self):
    self.queues = {
    'high': {'weight': 10, 'last_processed': 0},
    'medium': {'weight': 5, 'last_processed': 0},
    'low': {'weight': 1, 'last_processed': 0}
    }
    self.connection = pika.BlockingConnection(...)
    self.channel = self.connection.channel()
    def process(self):
    while True:
    for queue, config in sorted(self.queues.items(),
    key=lambda x: -x[1]['weight']):
    # Пропорциональное потребление
    if config['last_processed'] < config['weight']:
    method, properties, body = self.channel.basic_get(
    queue=f"notifications.{queue}", auto_ack=False
    )
    if method:
    self.handle_message(body)
    self.channel.basic_ack(method.delivery_tag)
    config['last_processed'] += 1
    else:
    config['last_processed'] = 0
    else:
    config['last_processed'] = 0
    time.sleep(0.1) # избегаем busy loop
  4. Альтернатива с Redis:

    • Используем sorted sets с score как приоритет (чем выше score, тем важнее).
    • Воркеры берут сообщения с максимальным score через ZREVRANGEBYSCORE.

Такой подход гарантирует, что высокоприоритетные уведомления (например, сброс пароля или платежные оповещения) обрабатываются быстрее, а низкоприоритетные (маркетинговые рассылки) - с задержкой, без блокировки критичных сообщений.

> ГОТОВЫ К СЛЕДУЮЩЕМУ СОБЕСЕДОВАНИЮ?

Запустите тренировочную сессию с ИИ и получите детальную обратную связь, чтобы увереннее проходить реальные интервью