> Как реализовать приоритеты в микросервисе рассылки уведомлений (Python)
Уровень: senior · Роль: backend · Язык: Python · Категория: Технические вопросы
Компании: Международный аэропорт Шереметьево
Стек: Python
> Пример ответа
В микросервисе рассылки уведомлений приоритеты можно реализовать через многоуровневую очередь с использованием брокера сообщений (например, RabbitMQ или Redis). Основная идея - разделить уведомления на классы приоритета (high, medium, low) и обрабатывать их с разной интенсивностью.
Пример реализации на Python с RabbitMQ:
-
Определение приоритетов:
- Создаем отдельные очереди для каждого уровня:
notifications.high,notifications.medium,notifications.low. - В сообщении передаем метаданные (тип, получатель, контент) и приоритет.
- Создаем отдельные очереди для каждого уровня:
-
Публикация сообщений:
PYTHONimport pikaimport jsondef publish_notification(notification_data, priority='medium'):connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()queue_name = f"notifications.{priority}"channel.queue_declare(queue=queue_name, durable=True)channel.basic_publish(exchange='',routing_key=queue_name,body=json.dumps(notification_data),properties=pika.BasicProperties(delivery_mode=2, # persistent message))connection.close() -
Обработка с весами:
- Запускаем несколько воркеров, каждый из которых слушает все очереди, но с разным приоритетом выборки.
- Используем
basic_qosс разными prefetch_count для разных очередей, или реализуем взвешенный polling.
Пример воркера с взвешенным потреблением:
PYTHONimport pikaimport timefrom collections import dequeclass PriorityWorker:def __init__(self):self.queues = {'high': {'weight': 10, 'last_processed': 0},'medium': {'weight': 5, 'last_processed': 0},'low': {'weight': 1, 'last_processed': 0}}self.connection = pika.BlockingConnection(...)self.channel = self.connection.channel()def process(self):while True:for queue, config in sorted(self.queues.items(),key=lambda x: -x[1]['weight']):# Пропорциональное потреблениеif config['last_processed'] < config['weight']:method, properties, body = self.channel.basic_get(queue=f"notifications.{queue}", auto_ack=False)if method:self.handle_message(body)self.channel.basic_ack(method.delivery_tag)config['last_processed'] += 1else:config['last_processed'] = 0else:config['last_processed'] = 0time.sleep(0.1) # избегаем busy loop -
Альтернатива с Redis:
- Используем sorted sets с score как приоритет (чем выше score, тем важнее).
- Воркеры берут сообщения с максимальным score через
ZREVRANGEBYSCORE.
Такой подход гарантирует, что высокоприоритетные уведомления (например, сброс пароля или платежные оповещения) обрабатываются быстрее, а низкоприоритетные (маркетинговые рассылки) - с задержкой, без блокировки критичных сообщений.
> Похожие задачи по Python
Что такое линтеры и форматтеры и как с ними работать
Был ли опыт работы с инцидентами на продакшене и как решал баги
Как структурировать уведомления
Откуда брать данные для уведомлений
> Похожие задачи по backend
Что такое линтеры и форматтеры и как с ними работать
Был ли опыт работы с инцидентами на продакшене и как решал баги
Как структурировать уведомления
Откуда брать данные для уведомлений
> ГОТОВЫ К СЛЕДУЮЩЕМУ СОБЕСЕДОВАНИЮ?
Запустите тренировочную сессию с ИИ и получите детальную обратную связь, чтобы увереннее проходить реальные интервью