> Как использовать паттерн outbox для обработки задержек записи в базу и обеспечения отказоустойчивости (Go)
Уровень: senior · Роль: backend · Язык: Go · Категория: Технические вопросы
Компании: BrightPattern
Стек: Go
> Пример ответа
Паттерн Outbox решает проблему атомарности между записью в базу данных и отправкой сообщения (или вызовом внешнего сервиса). В Go это реализуется так:
-
Основная идея: Вместо прямой отправки сообщения в брокер, мы сначала сохраняем "исходящее сообщение" в той же транзакции, что и бизнес-данные. Отдельный процесс (outbox relay) читает эти сообщения и отправляет их.
-
Пример реализации на Go:
GOtype Order struct {ID int64Status string}type OutboxMessage struct {ID int64Topic stringPayload []byteCreatedAt time.Time}// Сохранение заказа и outbox-сообщения в одной транзакцииfunc CreateOrder(ctx context.Context, db *sql.DB, order Order) error {tx, err := db.BeginTx(ctx, nil)if err != nil {return err}defer tx.Rollback()// Бизнес-логика_, err = tx.ExecContext(ctx, "INSERT INTO orders (id, status) VALUES ($1, $2)", order.ID, order.Status)if err != nil {return err}// Сообщение в outboxpayload, _ := json.Marshal(order)_, err = tx.ExecContext(ctx,"INSERT INTO outbox (topic, payload, created_at) VALUES ($1, $2, $3)","order_created", payload, time.Now())if err != nil {return err}return tx.Commit()}// Outbox relay - отдельная горутинаfunc OutboxRelay(ctx context.Context, db *sql.DB, producer MessageProducer) {for {select {case <-ctx.Done():returndefault:// Читаем неотправленные сообщенияrows, err := db.QueryContext(ctx,"SELECT id, topic, payload FROM outbox WHERE sent_at IS NULL ORDER BY id LIMIT 100")if err != nil {time.Sleep(time.Second)continue}for rows.Next() {var msg OutboxMessagerows.Scan(&msg.ID, &msg.Topic, &msg.Payload)// Отправляем в брокерerr := producer.Send(msg.Topic, msg.Payload)if err != nil {log.Printf("Failed to send message %d: %v", msg.ID, err)continue}// Помечаем как отправленноеdb.ExecContext(ctx, "UPDATE outbox SET sent_at = NOW() WHERE id = $1", msg.ID)}rows.Close()time.Sleep(100 * time.Millisecond) // Пауза между итерациями}}}
-
Обработка задержек и отказоустойчивость:
- Задержки записи: Outbox relay работает асинхронно, не блокируя основной поток. Если БД медленная, сообщения накапливаются в таблице и отправляются позже.
- Отказоустойчивость: При падении сервиса после коммита транзакции, но до отправки сообщения, relay при перезапуске подхватит неотправленные сообщения (sent_at IS NULL).
- Идемпотентность: Добавьте уникальный ключ (например,
event_id) в outbox, чтобы избежать дубликатов при повторной отправке.
-
Дополнительные улучшения:
- Используйте
SELECT ... FOR UPDATE SKIP LOCKEDдля параллельной обработки несколькими relay. - Добавьте TTL для старых сообщений, чтобы не засорять таблицу.
- Для высоких нагрузок используйте batch-отправку сообщений.
- Используйте
Этот паттерн гарантирует, что каждое бизнес-событие будет доставлено хотя бы один раз (at-least-once), даже при сбоях.
> Похожие задачи по Go
Как хранить историю изменений цен акций в базе данных
Как организовать запись данных из WebSocket в базу данных и кэш
Как организовать буферизацию данных для предотвращения потери сообщений
Как организовать таблицы и шардирование для хранения исторических данных
> Похожие задачи по backend
Как хранить историю изменений цен акций в базе данных
Как организовать запись данных из WebSocket в базу данных и кэш
Как организовать буферизацию данных для предотвращения потери сообщений
Как организовать таблицы и шардирование для хранения исторических данных
> ГОТОВЫ К СЛЕДУЮЩЕМУ СОБЕСЕДОВАНИЮ?
Запустите тренировочную сессию с ИИ и получите детальную обратную связь, чтобы увереннее проходить реальные интервью