Share This
Связаться со мной
Крути в низ
Categories
//Параллелизм в Golang и WorkerPool [Часть 1]

Параллелизм в Golang и WorkerPool [Часть 1]

В современных языках программирования параллелизм стал безусловной потребностью. В этой статье речь пойдет об устройстве и использовании concurrency в Go. Обсудить

parallelizm v golang i workerpool chast 1 83ca61e - Параллелизм в Golang и WorkerPool [Часть 1]

Перевод публикуется с сокращениями, автор оригинальной статьи Ahad Hasan.

В некоторых языках программирования имеются мощные конструкции, которые могут выгружать работу в разные потоки ОС (например, Java), а другие только имитируют это поведение в одном потоке (например, Ruby).

parallelizm v golang i workerpool chast 1 d88c77a - Параллелизм в Golang и WorkerPool [Часть 1]

У Golang есть мощная модель параллелизма – CSP (communicating sequential processes), которая разбивает проблему на более мелкие последовательные процессы, а затем планирует несколько экземпляров этих процессов, называемых Goroutines (горутины). Связь между горутинами осуществляется путем передачи неизменяемых сообщений через Channels.

Рассмотрим, как можно воспользоваться преимуществами параллелизма в Golang и как ограничить его использование с рабочими пулами.

Простой пример

Представим, что у нас есть внешний вызов API, выполняющийся около 100 мс. Если будет 1000 таких вызовов, и мы вызовем их синхронно, то для завершения потребуется около 100 секунд.

         //// model/data.go  package model  type SimpleData struct { 	ID int }  //// basic/basic.go  package basic  import ( 	"fmt" 	"github.com/Joker666/goworkerpool/model" 	"time" )  func Work(allData []model.SimpleData) { 	start := time.Now() 	for i, _ := range allData { 		Process(allData[i]) 	} 	elapsed := time.Since(start) 	fmt.Printf("Took ===============> %sn", elapsed) }  func Process(data model.SimpleData) { 	fmt.Printf("Start processing %dn", data.ID) 	time.Sleep(100 * time.Millisecond) 	fmt.Printf("Finish processing %dn", data.ID) }  //// main.go  package main  import ( 	"fmt" 	"github.com/Joker666/goworkerpool/basic" 	"github.com/Joker666/goworkerpool/model" 	"github.com/Joker666/goworkerpool/worker" )  func main() { 	// Prepare the data 	var allData []model.SimpleData 	for i := 0; i < 1000; i++ { 		data := model.SimpleData{ ID: i } 		allData = append(allData, data) 	} 	fmt.Printf("Start processing all work n")  	// Process 	basic.Work(allData) }     
         Start processing all work  Took ===============> 1m40.226679665s     

Здесь у нас простая модель, содержащая структуру данных только с целочисленными значениями. Массив данных мы обрабатываем массив синхронно: очевидно, что такое решение неоптимально решение, поскольку задачи можно выполнить одновременно. Давайте превратим это в асинхронный процесс с Goroutines и Channels.

Асинхронность

parallelizm v golang i workerpool chast 1 073c540 - Параллелизм в Golang и WorkerPool [Часть 1]

         //// worker/notPooled.go  func NotPooledWork(allData []model.SimpleData) { 	start := time.Now() 	var wg sync.WaitGroup  	dataCh := make(chan model.SimpleData, 100)  	wg.Add(1) 	go func() { 		defer wg.Done() 		for data := range dataCh { 			wg.Add(1) 			go func(data model.SimpleData) { 				defer wg.Done() 				basic.Process(data) 			}(data) 		} 	}()  	for i, _ := range allData { 		dataCh <- allData[i] 	}  	close(dataCh) 	wg.Wait() 	elapsed := time.Since(start) 	fmt.Printf("Took ===============> %sn", elapsed) }  //// main.go  // Process worker.NotPooledWork(allData)     
         Start processing all work  Took ===============> 101.191534ms     

Здесь мы создаем буферизованный канал 100 и добавляем в него все данные, переданные NoPooledWork. Поскольку канал буферизованный, нельзя ввести больше 100 экземпляров данных до полного извлечения из него, что и происходит внутри горутины. Мы перемещаемся по каналу, извлекаем из него данные, добавляем горутину и обрабатываем. Здесь нет ограничений на количество созданных горутин, как нет и ограничений на обработку задач (следует учитывать необходимые ресурсы) – сколько пришло, столько обработали. Если мы запустим такой код, то выполним 1000 задач примерно за 100 мс.

Проблема

Если у нас нет безграничных ресурсов, их нужно ограниченно распределять в течение определенного периода времени. Минимальный размер объекта Goroutine составляет 2 К, но он может достигать 1 ГБ. Приведенное выше решение выполняет все задачи параллельно, а для миллиона таких задач оно может быстро исчерпать память и ресурсы процессора. Придется либо модернизировать машину, либо найти другой подход.

Существует блестящее решение под названием Thread Pool или Worker Pool. Идея состоит в том, чтобы иметь ограниченный пул worker-ов для обработки задач. Как только «рабочий» закончит с одной из них, он переходит к следующей. Это уменьшает нагрузку на процессор и память, а также оперативнее распределяет задачи с течением времени.

Решение: Worker Pool

parallelizm v golang i workerpool chast 1 d09ba4e - Параллелизм в Golang и WorkerPool [Часть 1]

Исправим описанную проблему и реализуем рабочий пул:

         //// worker/pooled.go  func PooledWork(allData []model.SimpleData) { 	start := time.Now() 	var wg sync.WaitGroup 	workerPoolSize := 100  	dataCh := make(chan model.SimpleData, workerPoolSize)  	for i := 0; i < workerPoolSize; i++ { 		wg.Add(1) 		go func() { 			defer wg.Done()  			for data := range dataCh { 				basic.Process(data) 			} 		}() 	}  	for i, _ := range allData { 		dataCh <- allData[i] 	}  	close(dataCh) 	wg.Wait() 	elapsed := time.Since(start) 	fmt.Printf("Took ===============> %sn", elapsed) }  //// main.go  // Process worker.PooledWork(allData)     
         Start processing all work  Took ===============> 1.002972449s     

Здесь у нас есть ограниченное количество worker-ов (100) и ровно 100 горутин для обработки задач. Каналы можно рассматривать как очереди, а каждый worker – как клиента. Несколько горутин могут прослушивать один и тот же канал, но каждый элемент в нем будет обработан только один раз.

Это хорошее решение, и если мы запустим его, то увидим, что для завершения всех задач требуется 1 секунда. Не совсем 100 мс, но нам это и не нужно. Мы получаем гораздо лучшее решение, которое распределяет нагрузку во времени.

Обработка ошибок

Код уже выглядит как готовый продукт, но это не так, поскольку мы не обрабатываем ошибки. Давайте создадим сценарий, в котором посмотрим, как можно это реализовать:

         //// worker/pooledError.go  func PooledWorkError(allData []model.SimpleData) { 	start := time.Now() 	var wg sync.WaitGroup 	workerPoolSize := 100  	dataCh := make(chan model.SimpleData, workerPoolSize) 	errors := make(chan error, 1000)  	for i := 0; i < workerPoolSize; i++ { 		wg.Add(1) 		go func() { 			defer wg.Done()  			for data := range dataCh { 				process(data, errors) 			} 		}() 	}  	for i, _ := range allData { 		dataCh <- allData[i] 	}  	close(dataCh)  	wg.Add(1) 	go func() { 		defer wg.Done() 		for { 			select { 			case err := <-errors: 				fmt.Println("finished with error:", err.Error()) 			case <-time.After(time.Second * 1): 				fmt.Println("Timeout: errors finished") 				return 			} 		} 	}()  	defer close(errors) 	wg.Wait() 	elapsed := time.Since(start) 	fmt.Printf("Took ===============> %sn", elapsed) }  func process(data model.SimpleData, errors chan<- error) { 	fmt.Printf("Start processing %dn", data.ID) 	time.Sleep(100 * time.Millisecond) 	if data.ID % 29 == 0 { 		errors <- fmt.Errorf("error on job %v", data.ID) 	} else { 		fmt.Printf("Finish processing %dn", data.ID) 	} }  //// main.go  // Process worker.PooledWorkError(allData)     

Мы модифицировали метод обработки некоторых ошибок и добавили его в канал переданных ошибок. Таким образом для обработки ошибок в параллельной модели нам нужен канал для хранения данных о них. После того, как все задачи завершены, мы его проверяем. Объект error содержит идентификатор задачи, так что при необходимости мы можем обработать их снова.

Это лучшее решение, чем то, которое вообще не учитывало ошибки. Во второй части туториала рассмотрим, как сделать выделенный и надежный пакет рабочих пулов, который сможет обрабатывать параллельные задачи с ограниченным количеством Worker pool.

Заключение

Мы рассмотрели синхронный и асинхронный подходы, обработку ошибок, горутины и функционирование worker-ов. Модель параллелизма Golang достаточно мощна, чтобы просто построить решение Worker pool без особых накладных расходов, поэтому она не включена в стандартную библиотеку. Однако мы всегда можем создать собственное решение, которое соответствует нашим потребностям. Скоро будет следующая статья, а пока следите за обновлениями.

***

Дополнительные материалы:

  • Язык Go: как стать востребованным программистом
  • Программирование на Go с нуля: 9 полезных видеоуроков
  • ТОП-10 книг по языку программирования Go: от новичка до профессионала
  • Чем хорош язык Go и зачем его изучать? Все плюшки Golang
  • Более 200 избранных ссылок на материалы о языке Go

  • 41 views
  • 0 Comment

Leave a Reply

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Этот сайт использует Akismet для борьбы со спамом. Узнайте, как обрабатываются ваши данные комментариев.

Связаться со мной
Close