Share This
Связаться со мной
Крути в низ
Categories
//🏃 Параллельное программирование в Go

🏃 Параллельное программирование в Go

Изучаем основы параллельного программирования в Go, а также пытаемся разобраться на примерах, почему конкурентность в Go – это не совсем параллелизм. Обсудить

parallelnoe programmirovanie v go 69c3389 - 🏃 Параллельное программирование в Go

Перевод публикуется с сокращениями, автор оригинальной статьи Stefan Nilsson.

Для начала необходимо разобраться, как писать базовые программы на Go. Освежим знания, чтобы помочь себе быстрее освоиться.

Основы

The Go Playground – интерактивный веб-сервис, который позволяет запускать в песочнице небольшие программы в духе «Hello world!». Попробуйте!

         package main  import "fmt"  func main() {     fmt.Println("Hello, world!") }     

Изучите основы Go

A Tour of Go – еще один интерактивный учебник с кучей примеров. Он берет начало на официальном сайте и обучает вас основам программирования Go в браузере.

Установите инструменты Go

В Getting Started объясняется, как установить инструменты Go. Доступны бинарные пакеты для FreeBSD, Linux, Mac OS X и Windows, а также инструкции по развертыванию и настройке.

Начните проект Go

How to Write Go Code посвящен разработке простых пакетов Go. Он рассказывает про организацию и тестирование кода, а также про использование команд fetch, build и install.

Горутины

Вы можете создать новый поток (горутину) с помощью оператора go. Все горутины в одной программе используют одно и то же адресное пространство.

         go list.Sort() //запускается list.Sort параллельно, без ожидания     

Программа выводит сообщение «Hello from main goroutine». Она также может напечатать «Hello from another goroutine», в зависимости от того, какая из двух горутин завершится первой.

         func main() {     go fmt.Println("Hello from another goroutine")     fmt.Println("Hello from main goroutine")      // В этот момент выполнение программы останавливается и убиваются все     // активные горутины }     

Следующая программа скорее всего выведет «Hello from main goroutine» и «Hello from another goroutine». Они могут появиться в любом порядке. Еще одна особенность заключается в том, что вторая горутина работает очень медленно и не печатает сообщение до завершения программы.

         func main() {     go fmt.Println("Hello from another goroutine")     fmt.Println("Hello from main goroutine")      time.Sleep(time.Second) // дадим другой гороутине время завершиться }     

Вот более реалистичный пример, где определяется функция, которая использует concurrency для отсрочки события:

         // Publish печатает текст в stdout по истечении заданного времени. // Он не блокируется и сразу же возвращается. func Publish(text string, delay time.Duration) {     go func() {         time.Sleep(delay)         fmt.Println("BREAKING NEWS:", text)     }() // Обратите внимание на круглые скобки. Мы должны вызвать          // анонимную функцию. }     

Вот как вы можете использовать функцию Publish:

         func main() {     Publish("A goroutine starts a new thread.", 5*time.Second)     fmt.Println("Let’s hope the news will published before I leave.")      // Дожидаемся публикации новостей     time.Sleep(10 * time.Second)      fmt.Println("Ten seconds later: I’m leaving now.") }     

Скорее всего программа напечатает три строки в заданном порядке с пятисекундными перерывами между ними.

         $ go run publish1.go Let’s hope the news will published before I leave. BREAKING NEWS: A goroutine starts a new thread. Ten seconds later: I’m leaving now.     

Невозможно реализовать ожидание потоков в процессе «сна», но есть метод синхронизации – использование каналов.

Реализация

Горутины имеют небольшой вес и стоят немногим больше, чем выделение места в стеке. Место в куче выделяется и освобождается по мере необходимости.

Внутри горутины действуют как корутины, которые мультиплексируются между несколькими потоками операционной системы. Если одна горутина блокирует поток ОС, например, ожидая ввода, другие горутины в этом потоке будут мигрировать, чтобы продолжать работать.

Каналы обеспечивают синхронизированную связь

Каналы – это механизм, с помощью которого горутины синхронизируют выполнение и обмениваются данными, передавая значения.

Новое значение канала можно задать с помощью встроенной функции make.

         // небуферизованный канал int-ов ic := make(chan int)  // буферизованный канал на 10 строк sc := make(chan string, 10)     

Чтобы отправить значение в канал, используйте бинарный оператор «<-», а для получения – унарный оператор.

         ic <- 3   // отправляем 3 в канал n := <-sc // получаем строку из канала     

Оператор задает направление канала на отправку или получение. По умолчанию канал является двунаправленным.

         chan Sushi    // может использоваться для отправки и получения значений типа Sushi chan<- string // может использоваться только для отправки строк <-chan int    // может использоваться только для получения int     

Буферизованные и небуферизованные каналы

  • Если пропускная способность канала равна нулю или отсутствует, канал не буферизуется и отправитель блокируется до тех пор, пока получатель не получит значение.
  • Если канал имеет буфер, отправитель блокируется только до тех пор, пока значение не будет скопировано в буфер. Если буфер заполнен, ждем пока какой-либо получатель не получит значение.
  • Приемники всегда блокируются, пока не появятся данные для приема.
  • Отправка или получение с nil-канала блокируется навсегда.

Закрытие канала

Функция закрытия помечает, что никакие значения больше не будут отправляться по каналу. Обратите внимание, что закрывать канал необходимо только в том случае, если приемник этого ожидает.

  • После вызова close и после получения любых ранее отправленных значений, операции приема вернут нулевое значение без блокировки.
  • Операция приема множества значений дополнительно возвращает состояние канала.
  • Отправка в закрытый канал или его закрытие, а также закрытие nil-канала, вызовут run-time panic.
         ch := make(chan string) go func() {     ch <- "Hello!"     close(ch) }()  fmt.Println(<-ch) // напечатает «Hello!» fmt.Println(<-ch) // выведет нулевое значение «» без блокировки fmt.Println(<-ch) // еще раз напечатает «» v, ok := <-ch     // v - это «», ok – false  // получать значения от ch до закрытия for v := range ch {     fmt.Println(v) // не выполнится }     

Пример

В следующем примере функция Publish вернет канал, который используется для броадкастинга сообщения после публикации текста:

         // Publish напечатает текст в stdout по истечении заданного времени. // Когда текст будет опубликован, закрываем канал, который на «паузе». func Publish(text string, delay time.Duration) (wait <-chan struct{}) { 	ch := make(chan struct{}) 	go func() { 		time.Sleep(delay) 		fmt.Println(text) 		close(ch) 	}() 	return ch }     

Обратите внимание: мы используем канал пустых структур для указания, что канал будет использоваться только для сигнализации, а не для передачи данных. Выглядит это так:

         wait := Publish("important news", 2 * time.Minute) // выполним что-нибудь <-wait // в блоке, пока текст не будет опубликован     

Select ожидает группы каналов

Оператор select одновременно ожидает нескольких операций отправки или получения.

  • Оператор блокируется до тех пор, пока одна из операций не будет разблокирована.
  • Если выполняется несколько операций, то одна из них будет выбрана случайным образом.
         // блокируется до тех пор, пока данные не появятся в ch1 или ch2 select { case <-ch1:     fmt.Println("Received from ch1") case <-ch2:     fmt.Println("Received from ch2") }     

Операции отправки и приема в nil-канале блокируются навсегда. Это можно использовать для отключения канала в инструкции select:

         ch1 = nil // отключает этот канал select { case <-ch1:     fmt.Println("Received from ch1") // не произойдет case <-ch2:     fmt.Println("Received from ch2") }     

Вариант по умолчанию

Вариант по умолчанию будет выполнен, если все остальные заблокированы.

         // никогда не заблокируется select { case x := <-ch:     fmt.Println("Received", x) default:     fmt.Println("Nothing available") }     

Примеры

Бесконечная случайная двоичная последовательность

В качестве примера можно использовать случайный выбор вариантов, которые могут генерировать случайные биты.

         rand := make(chan int) for {     select {     case rand <- 0: // no statement     case rand <- 1:     } }     

Операция блокировки по таймауту

Функция time.After входит в стандартную библиотеку. Она ожидает истечения указанного времени, а затем отправляет текущее время в возвращаемый канал:

         select { case news := <-AFP:     fmt.Println(news) case <-time.After(time.Minute):     fmt.Println("Time out: No news in one minute") }     

Оператор select блокируется до тех пор, пока по крайней мере один case не сможет выполниться. С нулевыми кейсами этого никогда не произойдет:

         select {}     

Гонки данных

Гонка данных происходит, когда две горутины одновременно обращаются к одной и той же переменной и хотя бы одно из обращение является записью.

Такая ситуация возникает часто и может усложнить отладку.

Показанная ниже функция приводит к гонке данных, и ее поведение не определено – она может, например, напечатать число 1. Попробуем выяснить, как это происходит:

         func race() {     wait := make(chan struct{})     n := 0     go func() {         n++ // чтение, увеличение, запись         close(wait)     }()     n++ // конфликтующий доступ     <-wait     fmt.Println(n) // Вывод: <unspecified> }     

Две горутины g1 и g2, участвуют в гонке, и нет никакого способа узнать, в каком порядке будут выполняться операции. Ниже приведен один из нескольких возможных вариантов:

parallelnoe programmirovanie v go ad082d8 - 🏃 Параллельное программирование в Go

Как избежать гонки данных?

Единственный способ избежать гонки – синхронизировать доступ ко всем mutable-данным, которые используются потоками совместно. Есть несколько способов добиться этого. В Go обычно используется канал или блокировка (низкоуровневые механизмы доступны в пакетах sync и sync/atomic).

Предпочтительный способ обработки одновременного доступа к данным в Go – использовать канал для передачи данных от одной горутины к следующей.

         func sharingIsCaring() {     ch := make(chan int)     go func() {         n := 0 // Локальная переменная видна только для первой горутины         n++         ch <- n // Данные отправляются из первой горутины     }()     n := <-ch // ...и благополучно прибывают во вторую     n++     fmt.Println(n) // Вывод: 2 }     

В этом коде канала происходят два события:

  • передаются данные от одной горутины к другой – точка синхронизации;
  • отправляющая горутина будет ждать, пока другая получит данные и наоборот.

Модель памяти Go довольно сложна: переменная в одной горутине может гарантированно наблюдать значения, полученные при записи в ту же переменную в другой горутине, но до тех пор, пока вы делитесь всеми mutable-данными между горутинами по каналам, вы защищены от гонки данных.

Как обнаружить гонку данных?

Гонки данных могут легко появляться, но обнаружить их трудно. К счастью среда выполнения Go может помочь и в этом. Используйте ключ -race для включения встроенного детектора гонки данных.

         $ go test -race [packages] $ go run -race [packages]     

Пример

Программа с гонкой данных:

         package main import "fmt"  func main() {     i := 0     go func() {         i++ // запись     }()     fmt.Println(i) // конкурентное чтение }     

Запуск этой программы с параметром -race покажет нам, что существует гонка между записью в строке 7 и чтением в строке 9:

         $ go run -race main.go 0 ================== WARNING: DATA RACE Write by goroutine 6:   main.main.func1()       /tmp/main.go:7 +0x44  Previous read by main goroutine:   main.main()       /tmp/main.go:9 +0x7e  Goroutine 6 (running) created at:   main.main()       /tmp/main.go:8 +0x70 ================== Found 1 data race(s) exit status 66     

Подробности

Детектор гонки не выполняет никакого статического анализа. Он проверяет доступ к памяти во время выполнения только для фактически работающего кода.

Он работает на darwin/amd64, freebsd/amd64, linux/amd64 и Windows/amd64.

Накладные расходы варьируются, но обычно происходит увеличение использования памяти в 5-10 раз и увеличение времени выполнения в 2-20 раз.

Как отлаживать deadlock-и

Дэдлоки возникают, когда горутины ждут друг друга и ни одна из них не может завершиться.

Взглянем на пример:

         func main() { 	ch := make(chan int) 	ch <- 1 	fmt.Println(<-ch) }     

Программа застрянет на операции отправки, ожидая вечно, пока кто-то прочитает значение. Go способен обнаруживать подобные ситуации во время выполнения. Вот результат нашей программы:

         fatal error: all goroutines are asleep - deadlock!  goroutine 1 [chan send]: main.main() 	.../deadlock.go:7 +0x6c     

Советы по отладке

Горутина может застрять:

  • когда она ждет канал;
  • либо когда она ждет одну из блокировок в пакете sync.

Общие причины:

  • ни одна горутина не имеет доступа к каналу или блокировке;
  • горутины ждут друг друга.

Сейчас Go обнаруживает только зависание всей программы в целом, а не когда застревает некое подмножество горутин.

С помощью каналов легко понять, что вызвало дедлок. С другой стороны, интенсивно использующие мьютексы программы могут быть заведомо трудными для отладки.

Ожидание горутин

Группа sync.WaitGroup ожидает завершения работы группы горутин:

         var wg sync.WaitGroup wg.Add(2) go func() {     // Do work.     wg.Done() }() go func() {     // Do work.     wg.Done() }() wg.Wait()     
  • сначала основная горутина вызывает Add, чтобы установить количество ожидающих горутин;
  • затем запускаются две новые горутины и вызывают Done при завершении.

В то же время Wait используется для блокировки до тех пор, пока эти две горутины не завершатся.

Замечание: группа ожидания не должна копироваться после первого использования.

Трансляция сигнала по каналу

В этом примере функция Publish возвращает канал, который используется для передачи сигнала при публикации сообщения.

         // печать текста по истечении заданного времени // когда это будет выполнено, канал ожидания будет закрыт func Publish(text string, delay time.Duration) (wait <-chan struct{}) {     ch := make(chan struct{})     go func() {         time.Sleep(delay)         fmt.Println("BREAKING NEWS:", text)         close(ch) // трансляция на все приемники     }()     return ch }     

Обратите внимание, что мы используем канал пустых структур: struct{}. Это явно указывает на то, что канал предназначен только для сигнализации, а не для передачи данных.

Вот как можно это использовать:

         func main() {     wait := Publish("Channels let goroutines communicate.", 5*time.Second)     fmt.Println("Waiting for news...")     <-wait     fmt.Println("Time to leave.") }     
         Waiting for news... BREAKING NEWS: Channels let goroutines communicate. Time to leave.     

Как убить горутину

Чтобы горутина остановилась, ей необходимо прослушивать сигнал остановки на выделенном выходном канале и проверять его.

         quit := make(chan bool) go func() {     for {         select {         case <-quit:             return         default:             // …         }     } }() // … quit <- true     

Вот более полный пример, где используется один канал как для передачи данных, так и для сигнализации:

         // генератор возвращает канал, который производит числа 1, 2, 3… // чтобы остановить основную горутину, необходимо отправить  // номер этому каналу func Generator() chan int {     ch := make(chan int)     go func() {         n := 1         for {             select {             case ch <- n:                 n++             case <-ch:                 return             }         }     }()     return ch }  func main() {     number := Generator()     fmt.Println(<-number)     fmt.Println(<-number)     number <- 0           // остановка основной горутины     fmt.Println(<-numberм) // ошибка, больше никто не отправляет     }     
         1 2 fatal error: all goroutines are asleep - deadlock!     

Timer и Ticker

Таймеры и тикеры позволяют выполнять код по расписанию один или несколько раз.

Timeout (Timer)

time.After ожидает в течение заданного промежутка, а затем отправляет текущее время по возвращаемому каналу:

         select { case news := <-AFP: 	fmt.Println(news) case <-time.After(time.Hour): 	fmt.Println("No news in an hour.") }     

time.Timer не будет обработан сборщиком мусора до тех пор, пока таймер не сработает. Используйте time.NewTimer вместо вызова метода Stop, когда таймер больше не нужен:

         for alive := true; alive; { 	timer := time.NewTimer(time.Hour) 	select { 	case news := <-AFP: 		timer.Stop() 		fmt.Println(news) 	case <-timer.C: 		alive = false 		fmt.Println("No news in an hour. Service aborting.") 	} }     

Repeat (Ticker)

time.Tick возвращает канал, который обеспечивает тиканье часов с четными интервалами:

         go func() { 	for now := range time.Tick(time.Minute) { 		fmt.Println(now, statusUpdate()) 	} }()     

time.Ticker не будет обработан сборщиком мусора до тех пор, пока таймер не сработает. Используйте time.NewTicker вместо вызова метода Stop, когда тикер больше не нужен:

         func Foo() {     timer = time.AfterFunc(time.Minute, func() {         log.Println("Foo run for more than a minute.")     })     defer timer.Stop()      // Do heavy work }     

Блокировка взаимного исключения (мьютекс)

Иногда удобнее синхронизировать доступ к данным с помощью явной блокировки, а не с помощью каналов. Стандартная библиотека Go предлагает для этой цели блокировку взаимного исключения sync.Mutex.

Используйте с осторожностью

Чтобы этот тип блокировки был безопасным, крайне важно, чтобы все обращения к общим данным выполнялись только тогда, когда горутина находится в блокировке. Одной ошибки в одной горутине достаточно, чтобы ввести гонку данных и сломать программу.

Из-за этого вам следует подумать о разработке кастомной структуры данных с чистым API и убедиться, что вся синхронизация выполняется внутри.

В этом примере мы создаем безопасную и простую в использовании конкурентную структуру данных AtomicInt, в которой хранится integer. Любое количество горутин может безопасно получить доступ к этому числу с помощью методов Add и Value.

         // AtomicInt – это параллельная структура данных, содержащая int // его значение равно 0 type AtomicInt struct {     mu sync.Mutex // блокировка может удерживаться одной горутиной за раз     n  int }  // добавляет n к AtomicInt func (a *AtomicInt) Add(n int) {     a.mu.Lock() //  ждем пока блокировка освободится     a.n += n     a.mu.Unlock() // освобождение блокировки }  // Value возвращает значение a func (a *AtomicInt) Value() int {     a.mu.Lock()     n := a.n     a.mu.Unlock()     return n }  func main() {     wait := make(chan struct{})     var n AtomicInt     go func() {         n.Add(1) // один доступ         close(wait)     }()     n.Add(1) // другой конкурентный доступ     <-wait     fmt.Println(n.Value()) // 2 }     

Заключение

Мы рассмотрели распространенные проблемы, относящиеся к конкурентности в Go. Это не весь материал по теме – остальное вам придется самостоятельно изучать на официальном сайте. Не ленитесь, развивайтесь и удачи в обучении!

Дополнительные материалы:

  • Где используется язык программирования Go?
  • Конкурентность в Golang и WorkerPool [Часть 2]
  • Golang против Python: какой язык программирования выбрать?
  • Язык Go: как стать востребованным программистом

  • 12 views
  • 0 Comment

Leave a Reply

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Этот сайт использует Akismet для борьбы со спамом. Узнайте, как обрабатываются ваши данные комментариев.

Связаться со мной
Close