Урок раскрывает ключевые паттерны конкурентности в Go: построение конвейеров (pipelines), объединение потоков данных (fan-in), корректное закрытие каналов, механизмы отмены через quit и context, а также использование генераторов для ленивой обработки данных.
Паттерны конкурентности помогают организовать взаимодействие между горутинами, строить безопасные конвейеры обработки данных, управлять жизненным циклом каналов и останавливать работу системы по сигналу отмены. Эти техники --- фундамент эффективного конкурентного кода в Go.
Этот урок охватывает ключевые паттерны: pipelines, fan-in, cancellation, context, а также generators.
Pipeline --- это последовательность этапов обработки, где каждый этап:
Каждый этап работает в собственной горутине.
func stage(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range in {
out <- v * 2
}
}()
return out
}
Fan-in позволяет собрать данные из нескольких источников в один поток.
func fanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
Используется, когда необходимо объединить результаты нескольких параллельных обработчиков.
Закрытие каналов --- важная часть pipeline-паттернов.
Если канал не закрыть:
Если канал не закрыть --- горутины могут зависнуть навсегда.
Каждый этап pipeline должен закрывать свой выходной канал, когда данные
заканчиваются.
func stage(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range in {
out <- process(v)
}
}()
return out
}
quitСтадии pipeline могут завершаться досрочно, если приходят сигналы об отмене.
Иногда нужно остановить pipeline досрочно:
func stage(in <-chan int, quit <-chan struct{}) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case v, ok := <-in:
if !ok { return }
out <- v
case <-quit:
return
}
}
}()
return out
}
Если приходит сообщение в канал quit --- стадия прекращает работу.
context.Context --- стандартный способ управлять временем выполнения и отменой горутин.
Преимущества:
Пример:
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(time.Second)
cancel() // останавливает всю цепочку
}()
Горутины должны слушать <-ctx.Done().
Любая стадия должна слушать:
select {
case <-ctx.Done():
return
}
Генератор создаёт значения по требованию, когда потребитель читает из канала.
func generator() <-chan int {
out := make(chan int, 5)
go func() {
for i := 0; ; i++ {
out <- i
}
}()
return out
}
Преимущества:
quit или context.Context.