Назад к урокам

Паттерны конкурентности в Go

Урок раскрывает ключевые паттерны конкурентности в Go: построение конвейеров (pipelines), объединение потоков данных (fan-in), корректное закрытие каналов, механизмы отмены через quit и context, а также использование генераторов для ленивой обработки данных.

Начинающий14 min

🔀 Паттерны конкурентности в Go

Паттерны конкурентности помогают организовать взаимодействие между горутинами, строить безопасные конвейеры обработки данных, управлять жизненным циклом каналов и останавливать работу системы по сигналу отмены. Эти техники --- фундамент эффективного конкурентного кода в Go.

Этот урок охватывает ключевые паттерны: pipelines, fan-in, cancellation, context, а также generators.


🧩 Pipelines --- конвейеры обработки данных

Pipeline --- это последовательность этапов обработки, где каждый этап:

  • получает данные из входного канала,
  • обрабатывает их,
  • отправляет в выходной канал.

Каждый этап работает в собственной горутине.

Пример этапа:

func stage(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            out <- v * 2
        }
    }()
    return out
}

Важные принципы

  • у каждого этапа есть хотя бы один вход и один выход;
  • этап должен закрывать канал, когда данные закончились;
  • каждая стадия зависит от результатов предыдущей.

🛠 Fan-in --- объединение нескольких каналов в один

Fan-in позволяет собрать данные из нескольких источников в один поток.

func fanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(len(channels))

    for _, ch := range channels {
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

Используется, когда необходимо объединить результаты нескольких параллельных обработчиков.


❌ Завершение и очистка каналов

Закрытие каналов --- важная часть pipeline-паттернов.

Если канал не закрыть:

  • горутины могут зависнуть навсегда,
  • произойдёт утечка ресурсов,
  • выполнение программы будет заблокировано.

Если канал не закрыть --- горутины могут зависнуть навсегда.
Каждый этап pipeline должен закрывать свой выходной канал, когда данные заканчиваются.

Пример закрытия

func stage(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            out <- process(v)
        }
    }()
    return out
}

🛑 Отмена (Cancellation) через канал (сигнал) quit

Стадии pipeline могут завершаться досрочно, если приходят сигналы об отмене.

Иногда нужно остановить pipeline досрочно:

func stage(in <-chan int, quit <-chan struct{}) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case v, ok := <-in:
                if !ok { return }
                out <- v
            case <-quit:
                return
            }
        }
    }()
    return out
}

Если приходит сообщение в канал quit --- стадия прекращает работу.


🌀 Context --- универсальный механизм отмены

context.Context --- стандартный способ управлять временем выполнения и отменой горутин.

Преимущества:

  • единая точка отмены всей цепочки,
  • не требуется вручную управлять каналами,
  • можно хранить доп. данные (session ID, request ID и т.д.).

Пример:

ctx, cancel := context.WithCancel(context.Background())

go func() {
    time.Sleep(time.Second)
    cancel() // останавливает всю цепочку
}()

Горутины должны слушать <-ctx.Done(). Любая стадия должна слушать:

select {
case <-ctx.Done():
    return
}

⚙ Generators --- ленивое вычисление значений

Генератор создаёт значения по требованию, когда потребитель читает из канала.

func generator() <-chan int {
    out := make(chan int, 5)
    go func() {
        for i := 0; ; i++ {
            out <- i
        }
    }()
    return out
}

Преимущества:

  • вычисления происходят по мере необходимости,
  • избегается лишняя обработка.

🎯 Итоги

  • Паттерны конкурентности помогают структурировать код и избегать утечек.
  • Pipelines --- цепочки обработчиков, передающих данные через каналы.
  • Fan-in объединяет несколько потоков в один.
  • Закрытие каналов обязательно для корректной остановки.
  • Cancellation возможен через quit или context.Context.
  • Generators позволяют генерировать данные по требованию.