Шаг 1. Ищем код эхо-сервера
Типовой эхо-сервер, на который дал мне первую ссылку Yandex, описан в статье «Golang: простой сервер TCP и TCP-клиент». Если вы не имеете представления, как работают сокеты и соединения, стоит её почитать. Исходный код из этой статьи можно скачать тут и желательно сразу же его запустить.
Если скомпилировать исходники и запустить в отдельных консолях клиент и сервер, то можно обмениваться сообщениями. У этого сервера есть один значительный недостаток: он работает только с одним клиентом. Попробуем запустить в еще одной консоли новый клиент и увидим, как она зависнет. То же самое будет и с четвертым, и с пятым клиентом.
Шаг 2. Реализуем прием нескольких соединений
Чтобы принять несколько одновременных соединений, необходимо:
функцию приема соединения conn.Accept()
заключить еще в один цикл for
.
весь код, который был в цикле, вынести в отдельную функцию process()
.
запустить функцию process()
как отдельную горутину в цикле for
сразу после приема соединения conn.Accept()
Подробнее о горутинах и каналах рассказывается в статье «Параллельное программирование в Go». Стоит с ней ознакомиться, поскольку на этих механизмах основывается наш будущий проект.
В результате небольших изменений наш код примет следующий вид:
// функция process запускается как горутина func process(conn net.Conn){ // определим, что перед выходом из функции, мы закроем соединение defer conn.Close for { // Будем прослушивать все сообщения разделенные n message, _ := bufio.NewReader(conn).ReadString('n') // Распечатываем полученое сообщение fmt.Print("Message Received:", string(message)) // Отправить новую строку обратно клиенту conn.Write([]byte(message + "n")){ } }
Код в main:
// Устанавливаем прослушивание порта ln, _ := net.Listen("tcp", ":8081") // выполнение цикла обработки соединений for { // Принимаем входящее соединение conn, _ := ln.Accept() // запускаем функцию process(conn) как горутину go process(conn) }
Внешний цикл (строки 31-36, код доступен в репозитории) будет принимать входящее соединение, а внутренний цикл (строки 15-19) будет обрабатывать входные данные. Исходный код клиента мы не меняем.
Если мы запустим в одной консоли сервер, а в нескольких других консолях программу клиента, то можно видеть, что сервер может обрабатывать уже несколько соединений с клиентами параллельно. На самом деле он их обрабатывает асинхронно: сперва один запрос, потом другой, осуществляя переключения между горутинами.
Шаг 3. Обрабатываем ошибки соединений
Давайте попробуем отсоединить один из клиентов, убив его процесс: наш сервер зациклится. Если что-нибудь набрать в клиенте, то данные куда-то уходят, и клиент ни о чём не подозревает.
Мы забыли обработать ошибки ввода-вывода. Функция вывода в сокет Write имеет два выходных параметра: кол-во считанных байт и ошибку:
data_len, err := conn.Write(b []byte)
Если ошибка не пустая (т.е. не равна nil
), значит мы не смогли принять данные. Какая ошибка произошла, можно узнать с помощью функции err.Error()
Заменим conn.Write(b []byte)
на следующий код:
_ , err := conn.Write(b []byte) if err != nil { fmt.Println(err.Error) break // выходим из цикла и функции }
Аналогичный код пропишем в клиенте. Еще в клиенте отсутствует отложенное закрытие соединения, которое срабатывает при выходе из функции defer conn.Close()
.
Если вы не смогли внести изменения самостоятельно, готовый код можно подглядеть на GitHub.
Теперь при закрытие клиента или сервера, у нас будет выдаваться сообщение:
write tcp 127.0.0.1:8081->127.0.0.1:40296: write: broken pipe
Шаг 4. Простой прототип мессенджера
К этому моменту мы допилили эхо-сервер, а теперь осталось сделать простой из него мессенджер. Пусть логика мессенджера будет следующей:
Клиент коннектится к серверу и получает номер сообщения. Каждай клиент имеет свой уникальный номер.
Клиент отправляет сообщение серверу с указанием номера клиента, кому адресовано это сообщение.
Сервер принимает сообщение от клиента, декодирует его и отправляет тому клиенту, которому адресовано это сообщение.
Нам осталось усовершенствовать сервер так, чтобы определить, с какого клиента было отправлено сообщение. Для простоты пусть каждый клиент будет иметь номер, соответствующий порядковому номеру соединения начиная с нуля (индексы массивов в Go начинаются с нуля).
Введем счетчик входящих соединений, а каждое новое соединение сохраним в xeштаблице, организовав таким образом пул:
conns := make( map[int] net.Conn, 1024)
Каждое соединение после conn.Accept()
мы сохраним в conns,
а в функцию process()
будем передавать весь пул (хештаблицу) и номер текущего соединения. В функции обработки соединения process()
мы можем иметь доступ ко всем активным соединениям. Не забываем увеличивать на единицу счетчик текущих соединений.
В функции process()
мы принимаем не текущее соединение, а пул и номер текущего соединения. Следовательно, чтоб получить доступ к текущему соединению, мы можем его взять из пула:
conn := conn[n]
Новый код сервера:
func process(conns map[int]net.Conn, n int) { // получаем доступ к текущему соединению conn := conns[n] // определим, что перед выходом из функции, мы закроем соединение defer conn.Close() for { // Будем прослушивать все сообщения разделенные n message, _ := bufio.NewReader(conn).ReadString('n') // Распечатываем полученое сообщение fmt.Print("Message Received:", string(message)) // Отправить новую строку обратно клиенту _, err := conn.Write([]byte(strconv.Itoa(n) + "->> " + message + "n")) // анализируем на ошибку if err != nil { fmt.Print(err.Error()) break // выходим из цикла } } } func main() { fmt.Println("Start server...") // создаем пул соединений conns := make(map[int]net.Conn, 1024) i := 0 // Устанавливаем прослушивание порта ln, _ := net.Listen("tcp", ":8081") // объвляем пул соединений на 1024 соединения conns := make(map[int]net.Conn, 1024) // Запускаем цикл обработки соединений for { // Принимаем входящее соединение conn, _ := ln.Accept() // сохраняем соединение в пул conns[i] = conn // запускаем функцию process(conn) как горутину go process(conns, i) i++ } }
Полный код примера можно найти в этом репозитории. Попробуем запустить сервер и отправить сообщение.
При тестировании мы видим, что в каждом ответном сообщении сервер возвращает клиенту номер текущего соединения:
./client Text to send: msg from 1 Message from server: 1->> msg from 1
Шаг 5. Реализация протокола обмена
Остается малость: определить как, кто и кому будет отправлять сообщения. Создадим простейший протокол обмена, в котором первые байты сообщения будут содержать номер клиента и текст через пробел.
Пример:
2 Оправляем сообщение второму клиенту
Для реализации этого протокола, необходимо сделать парсинг сообщения. Номер клиента, мы вытащим, используя fmt.Scanf()
, а само сообщение с использованием слайса:
// парсинг полученного сообщения fmt.Sscanf(message, "%d", &clientNo) // определи номер клиента pos := strings.Index(message, " ") // нашли позицию разделителя out_message := message[pos:] // взяли хвост сообщение после пробела
Дальше все очень просто: зная номер соединения (clientNo
) клиента, мы будем отправлять ответ в нужное соединение. Сообщение было немного изменено, и теперь мы выводим, от какого клиента оно исходит:
conns[clientNo].Write([]byte(strconv.Itoa(clientNo) + "->> " + out_message + "n"))
Запускаем, проверяем и видим некоторые баги. Если мы направляем сообщение самому себе, то вроде бы все работает. Если отправить сообщение другому клиенту, оно где-то теряется, а если этим клиентом отправить сообщение кому-то еще, оно появляется из ниоткуда. Что же пошло не так?
Шаг 6. Распараллеливание кода
Наш клиент работает синхронно: после отправления сообщения он переходит в режим ожидания чтения сокета. Все работает согласно алгоритму: записал и жди ответа. Если мы отправим сообщение самому себе, то примем его без проблем. А если отправим другому клиенту, то после этого встаем в режим ожиданий. Второй клиент тоже стоит в режиме ожидания и примет наше сообщение, только после того, как сам что-нибудь отправит. Как же выйти из этой ситуации?
Выход довольно простой: нужно чтение из сокета и чтение с консоли сделать асинхронными, т.е. сделать так, чтобы ввод из сокета и ввод из консоли не блокировали друг друга. Как говорилось выше, горутины позволяют коду выполняться асинхронно. Следовательно, должны быть запущены две разные горутины: одна должна осуществлять чтение с консоли, а вторая – чтение из сокета.
Первая горутина читает данные из сокета, и если в сокете есть данные, выводит их на печать. Чтение и вывод должны быть заключены в цикл:
// прием данных из сокета и вывод на печать func readSock(conn net.Conn ) { buf := make([]byte,256) for { readed_len, _ := conn.Read(buf) if readed_len > 0 { fmt.Print(string(buf)) } } }
Со второй горутиной все немного сложнее, поскольку она должна передать данные в основную программу. Почему нельзя сразу писать их в сокет, как это делает первая горутина? Увы, операция conn.Write()
– блокирующая, и если мы так сделаем, то можем заблокировать другие операции ввода-вывода. Все блокирующие операции нужно разнести по разным асинхронным частям программы.
// ввод данных с консоли и вывод их в канал func readConsole(ch chan string) { for { fmt.Print(">") line, _ := bufio.NewReader(os.Stdin).ReadString('n') out :=line[:len(line)-1] // убираем символ возврата каретки ch <- out // отправляем данные в канал } }
Основная программа должна запустить две асинхронных горутины: чтение с консоли и из сокета (в цикле читать канал и если в нем есть данные, то записать их в сокет). Чтобы наше консольное приложение не «съело» все ресурсы CPU, необходимо ввести некоторую задержку: time.Sleep(time.Seconds * 2)
Должно получиться примерно следующее:
func main(){ ch := make(chan string) defer close(ch) // закрываем канал при выходе из программы conn, _ := net.Dial("tcp", "127.0.0.1:8080") go readConsole(ch) go readSock(conn) for { val, ok := <- ch if ok { // если есть данные, то их пишем в сокет _, err := conn .Write([]bytes(val)) if err != nil { fmt.Println("Write:", err.Error()) break } } else { // данных в канале нет, задержка на 2 секунды time.Sleep(time.Second * 2) } } fmt.Println("Finished...") conn.Close() }
Шаг 7. Повышаем надежность выполнения кода
Наше приложение должно работать при любых входных данных, даже если они некорректные. Есть несколько простых правил, которые придется соблюдать при построении любых приложений:
необходимо проверять все входящие данные и если их длинна больше принимающего буфера, то либо обрезать их, либо генерировать ошибку;
необходимо обрабатывать все функции ввода-вывода на возможность возникновения ошибки.
Например, в коде было много сокращений и специально была опущена обработка функций conn.Accept()
и net.Dial()
:
ls,err := conn.Accept() if err != nil { fmt.Println(err) panic("accept") }
Также был опущен код обработки объема данных с консоли:
// ввод данных с консоли for { fmt.Print(">") line, _ := bufio.NewReader(os.Stdin).ReadString('n') if len(line) > 250 { fmt.Println("Error: message is very lagre") continue } ch <- b }
Почти готовое и работоспособное решение можно найти в репозитории: вам остается самостоятельно дописать обработку всех ошибок ввода-вывода. Если возникнет желание сделать pull request в репозиторий, то я смогу указать в комментариях на ошибки или просто похвалить. Сделайте свой код достоянием общественности. Удачи!