Go微服务实战
上QQ阅读APP看书,第一时间看更新

7.3 服务器端

本节将实现聊天服务器的服务器端,相关代码都在chatserver/server包内。

实现这个包的代码开发时,还是先从定义接口开始。将服务器端(server)该有的方法都进行定义,这样有利于后面代码逻辑的具体实现和思路梳理,代码如下:


chatserver/server/server.go
1. package server
2.
3. type Server interface {
4.     Listen(address string) error
5.     Broadcast(command interface{}) error
6.     Start()
7.     Close()
8. }

这里定义了四个方法:Listen方法用于监听信息的写入;Broadcast方法用于将收到的信息发送给其他用户;Start和Close方法则分别用于启动和关闭服务器。

接下来完成服务器端这四个方法的具体实现。同时还要注意,因为Broadcast需要向连接在服务器上的所有客户端广播收到的某客户端信息,所以应该有个struct来保存所有的客户端。同样地,也需要对服务器端的信息定义struct。具体的实现如下:


chatserver/server/tcp_server.go
1. package server
2.
3. import (
4.     "errors"
5.     "io"
6.     "log"
7.     "net"
8.     "sync"
9.     "github.com/ScottAI/chatserver/protocol"
10. )
11.
12. type client struct {
13.     conn net.Conn
14.     name string
15.     writer *protocol.Writer
16. }
17.
18. type TcpServer struct {
19.     listener net.Listener
20.     clients []*client
21.     mutex *sync.Mutex
22. }
23.
24. var (
25.     UnknownClient = errors.New("Unknown client")
26. )
27.
28. func NewServer() *TcpServer  {
29.     return &TcpServer{
30.         mutex:&sync.Mutex{},
31.     }
32. }
33.
34. func (s *TcpServer) Listen(address string) error{
35.     l,err := net.Listen("tcp",address)
36.
37.     if err == nil{
38.         s.listener = l
39.     }
40.
41.     log.Printf("Listening on %v",address)
42.
43.     return err
44. }
45.
46. func (s *TcpServer) Close(){
47.     s.listener.Close()
48. }
49.
50. func (s *TcpServer) Start(){
51.     for{
52.         conn,err := s.listener.Accept()
53.
54.         if err != nil{
55.             log.Print(err)
56.         }else{
57.             client := s.accept(conn)
58.             go s.serve(client)
59.         }
60.     }
61. }
62.
63. func (s *TcpServer) Broadcast(command interface{}) error {
64.     for _,client := range s.clients {
65.         client.writer.Write(command)
66.     }
67.     return nil
68. }
69.
70. func (s *TcpServer) Send(name string,command interface{}) error {
71.     for _,client := range s.clients{
72.         if client.name == name{
73.             return client.writer.Write(command)
74.         }
75.     }
76.     return UnknownClient
77. }
78.
79. func (s *TcpServer) accept(conn net.Conn) *client  {
80.     log.Printf("Accepting connection from %v,total clients:%v",conn.
            RemoteAddr().String(),len(s.clients)+1)
81.
82.     s.mutex.Lock()
83.     defer s.mutex.Unlock()
84.
85.     client := &client{
86.         conn:conn,
87.         writer:protocol.NewWriter(conn),
88.     }
89.
90.     s.clients = append(s.clients,client)
91.     return client
92. }
93.
94. func (s *TcpServer) remove(client *client)  {
95.     s.mutex.Lock()
96.     defer s.mutex.Unlock()
97.
98.     for i,check := range s.clients{
99.         if check == client {
100.             s.clients = append(s.clients[:i],s.clients[i+1:]...)
101.         }
102.     }
103.     log.Printf("Closing connection from %v",client.conn.RemoteAddr().String())
104.     client.conn.Close()
105. }
106.
107. func (s *TcpServer) serve(client *client)  {
108.     cmdReader := protocol.NewReader(client.conn)
109.
110.     defer s.remove(client)
111.
112.     for {
113.         cmd,err := cmdReader.Read()
114.         if err != nil && err != io.EOF {
115.             log.Printf("Read error: %v",err)
116.         }
117.
118.         if cmd != nil {
119.             switch v := cmd.(type) {
120.             case protocol.SendCmd:
121.                 go s.Broadcast(protocol.MessCmd{
122.                     Message: v.Message,
123.                     Name : client.name,
124.                 })
125.             case protocol.NameCmd:
126.                 client.name = v.Name
127.             }
128.         }
129.
130.         if err == io.EOF {
131.             break
132.         }
133.     }
134. }

第12行至第16行,定义client结构体,因为当有客户端连接服务器的时候,需要记录客户端的信息,便于保存客户端的名字信息以及向客户端进行信息广播。

第18行至第22行,TcpServer结构体用于描述服务器,其中定义了net.Listener类型的变量listener,这是一个监听器。关于net包会在本书第二部分进行详细介绍。clients变量是client结构体的切片,用于保存所有连接到服务器的客户端。因为客户端连接服务器端是并发的,所以要为服务器端加上互斥锁,避免竞态。关于竞态也会在本书第二部分进行介绍,读者现在先了解sync.Mutex可以定义互斥锁即可。

第24行至第26行,定义一个错误,当不能识别客户端时抛出此错误。

第28行至第32行,NewServer函数用于返回一个TcpServer类型的实体地址。

第34行至第39行,Listen方法内通过net.Listener.Listen方法启动对特定端口的监听。

第46行至第48行,Close方法用于关闭端口监听。

第50行至第61行,Start方法用于启动服务器端,方法内通过net.Listener.Accept方法接收到新的连接,然后通过自己定义的accept方法处理连接,再在第58行启动一个goroutine来运行serve方法。

第63行至第68行,Broadcast方法用于向所有的客户端广播服务器收到的信息。

第70行至第77行,Send方法用于向特定的用户发送信息。因为是通过slice来存储所有的client的,所以需要循环遍历所有的client,然后逐个判断。如果要求用户名不重复,可以改为通过map来存储所有的client,读者可以在书中源码的基础上自己动手改造一下。

第79行至第92行,accept方法用于接收一个请求,并且创建一个client结构体,然后保存到client切片中。注意该方法是互斥的,因为涉及对client切片的操作。

第94行至第105行,remove方法在tcp连接结束时断开并且从client切片删除对应的client结构体。本方法也是互斥的,同样是因为对client切片的操作。请读者注意第98行至第102行的切片删除方法的使用,这些内容在3.2节做过介绍。

第107行至第137行,在服务器端和客户端创建连接以后,会为每一个连接启动一个goroutine,在第58行(Start方法内)启动serve方法。serve方法不停地从tcp连接中读取字符串,并且判断是名字还是信息,如果是信息就进行广播,如果是名字就保存。这里通过defer关键字调用remove方法,同时也意味着goroutine结束的时候会从切片删除本client。

注意

defer关键字在实战中非常实用,如果有在程序遇到panic崩溃后仍然需要处理的动作则会使用defer处理。虽然defer有些许的性能问题,但是这是官方提供的标准的且遇到panic也会执行的方式。当然,能少用还是少用。

整个tcp_server.go完全实现了server.go定义的接口,下面只需给server包一个启动入口就可以了。程序的启动都是从main包的main方法开始的,所以要在server包内创建启动用的main包main函数。因为其作用仅仅是启动程序,所以为了方便区分,专门在server包下面再建了一个cmd包,用来放启动代码,如下:


chatserver/server/cmd/main.go
1. package main
2.
3. import (
4.     "github.com/ScottAI/chatserver/server"
5. )
6.
7. func main()  {
8.     var s server.Server
9.     s = server.NewServer()
10.     s.Listen(":3333")
11.     s.Start()
12. }

main函数很简单,用于启动服务端并监听3333端口。

到这里,整个服务器端也完成了。因为服务器端是单独的,在部署的时候是可以不管客户端而只把服务端单独部署到服务器的,所以可以直接运行测试效果。这里就不再赘述,读者可以自行尝试。

现在来看一下server包文件的结构:


--chatserver
----server
------server.go
------tcp_server.go
------cmd
--------main.go

服务器端的实现并不复杂,因为很多通信相关的功能在protocol包已经实现了,可以看出包的功能划分在项目中非常重要,包功能划分清晰便于厘清思路,也便于代码实现。