> Как обрабатывать дубликаты сообщений при асинхронной записи из очереди в базу (Go)

Уровень: senior · Роль: backend · Язык: Go · Категория: Технические вопросы

Компании: BrightPattern

Стек: Go

> Пример ответа

Для обработки дубликатов сообщений при асинхронной записи из очереди в базу данных в Go можно использовать идемпотентность на стороне потребителя. Основной подход - присвоить каждому сообщению уникальный идентификатор (например, UUID) и проверять его наличие в базе перед вставкой.

Пример реализации с PostgreSQL и database/sql:

GO
type Message struct {
ID string `json:"id"` // уникальный ID сообщения
Payload string `json:"payload"`
}
func (c *Consumer) ProcessMessage(ctx context.Context, msg Message) error {
// Начинаем транзакцию для атомарности
tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback() // откат при ошибке
// Проверяем, существует ли уже сообщение с таким ID
var exists bool
err = tx.QueryRowContext(ctx, "SELECT EXISTS(SELECT 1 FROM messages WHERE id = $1)", msg.ID).Scan(&exists)
if err != nil {
return fmt.Errorf("check duplicate: %w", err)
}
if exists {
// Дубликат - просто подтверждаем обработку (ack)
return tx.Commit() // коммит не обязателен, но для чистоты
}
// Вставляем новое сообщение
_, err = tx.ExecContext(ctx, "INSERT INTO messages (id, payload) VALUES ($1, $2)", msg.ID, msg.Payload)
if err != nil {
// Обработка возможного race condition при параллельной вставке
if isDuplicateKeyError(err) {
// Другой потребитель уже вставил - считаем успехом
return tx.Commit()
}
return fmt.Errorf("insert: %w", err)
}
return tx.Commit()
}
func isDuplicateKeyError(err error) bool {
// Проверка на PostgreSQL: код 23505 (unique_violation)
var pgErr *pq.Error
return errors.As(err, &pgErr) && pgErr.Code == "23505"
}

Альтернативы:

  • Использовать INSERT ... ON CONFLICT DO NOTHING в PostgreSQL для атомарной проверки и вставки без дополнительного SELECT.
  • Для Redis или других хранилищ - проверять через SETNX (set if not exists) с TTL.
  • Для Kafka - использовать ключ сообщения (message key) и идемпотентный продюсер, но на стороне потребителя всё равно нужна проверка.

Важно: всегда подтверждайте (ack) сообщение только после успешной записи в БД, чтобы избежать потери данных при сбое.

> ГОТОВЫ К СЛЕДУЮЩЕМУ СОБЕСЕДОВАНИЮ?

Запустите тренировочную сессию с ИИ и получите детальную обратную связь, чтобы увереннее проходить реальные интервью