GRPC理解与实践

gRPC 是google开源的,可以在任何环境中运行的现代开源高性能 RPC 框架。

概念理解

RPC

首先是需要对RPC进行了解。

RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务。即本机调用其他服务器的方法就像调用本地的一样,其本质是本机发送一个请求给远程服务器,远程服务器运行方法并将运行结果完整的按照格式发送回调用者。

需要注意的是,RPC只是一种思想,实现RPC的有很多,就连http也能勉强叫做实现类RPC。

GRPC

GRPC是Google对RPC的一个实现。用户只需关注服务,调用的过程(数据序列化、网络通讯等问题由框架解决)。

在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,使得您能够更容易地创建分布式应用和服务。与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。

protocol buffers

gRPC 默认使用 protocol buffers,这是 Google 开源的一套成熟的结构数据序列化机制(当然也可以使用其他数据格式如 JSON)。在使用 proto files 创建 gRPC 服务,用 protocol buffers 消息类型来定义方法参数和返回类型。借助protobuf的代码生成器,可以轻松生成各语言的GPRC调用接口。更多实践和理论参考protocol buffers一文及官方文档。

HTTP2.0

HTTP/2 (原名HTTP/2.0)即超文本传输协议 2.0,是下一代HTTP协议

提到http2.0,就不得不提http2.0与当前常用的http1.0的关系。

数据资源更小

可以看到:

  • HTTP/1里的header对应HTTP/2里的 HEADERS frame
  • HTTP/1里的payload对应HTTP/2里的 DATA frame

h2中应用经过数据压缩的header帧和data帧替代原来的纯文本或Json数据,耗费资源更少,传输速度更快。

支持多路复用

其次,h2支持多路复用(多路复用允许同时通过单一的 HTTP/2 连接发起多重的请求-响应消息)。

  • HTTP/1.1:浏览器客户端在同一时间,针对同一域名下的请求有一定数量限制。超过限制数目的请求会被阻塞。
  • HTTP/2 支持多路复用,h2将http协议通讯的基本单位缩小为一个个帧,并发发送请求,在同一个TCP连接上实现双向交换数据。

支持服务端推送

服务端推送是一种在客户端请求之前发送数据的机制。在 HTTP/2 中,服务器可以对客户端的一个请求发送多个响应。

如:浏览器只发送一个网页请求,但是服务器可以推送网页代码、js文件、css文件、图片等都给客户端。减少客户端请求的次数。

gRPC的四种服务类型

本文以一个案例来说明以下四种类型的使用,及grpc实现。

描述一个向导服务RouteGuide。

定义四种信息类型:Point(点)、Rectangle(长方形范围)、Feature(该点的特色)、RouteSummary(路线图总结)、RecommendationRequest(特色推荐请求)

定义四个方法,分别对应四种服务类型:

  1. GetFeature(输入一个Ponit,返回这个点的Feature)
  2. ListFeature(输入一个Rectangle、输出流这个区域的所有Feature)
  3. RecordRoute(输入流为每个事件点的位置Point、返回一个RouteSummary)
  4. RouteChat(输入流RecommendationRequest、输出流Feature)
  • 简单RPC(UNARY)
  • 服务端流式RPC
  • 客户端流式RPC
  • 双向流式RPC

实现流程

项目结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
GRPC-SAMPLE-PROJECT
│ go.mod
│ go.sum
│ makefile

├─client
│ client.go

├─route
│ route.pb.go
│ route.proto

└─server
server.go

protobuf设计

构建数据结构及服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
syntax = "proto3";

option go_package = "route.go";
package route;

// 坐标
message Point{
int32 longitude = 1;
int32 latitude = 2;
}

// 长方形区域
message Rectangle{
Point lo = 1;
Point hi = 2;
}

// 某个地点
message Feature{
string name = 1;
Point location = 2;
}

// 路线总结
message RouteSummary{
int32 point_count = 1;
int32 distance = 2;
int32 elapsed_time = 3;
}

// 地点推荐
message RecommendationRequest{
enum RecommendationMode {
GetFarthest = 0; //最远
GetNearest = 1; //最近
}
RecommendationMode mode = 1;
Point point = 2;
}

service RouteGuide{
rpc GetFeature(Point) returns (Feature){}
rpc ListFeature(Rectangle) returns (stream Feature){}
rpc RecordRoute(stream Point) returns (RouteSummary){}
rpc RouteChat(stream RecommendationRequest) returns (stream Feature){}
}

使用编译器自动生成grpc-go代码

1
protoc --go_out=plugins=grpc:.  route/route.proto

Server端设计

重点是要实现刚才设计的service RouteGuide服务编译后生成的接口。

1
2
3
4
5
6
7
8
9
10
11
// RouteGuideServer is the server API for RouteGuide service.
type RouteGuideServer interface {
GetFeature(context.Context, *Point) (*Feature, error)
ListFeature(*Rectangle, RouteGuide_ListFeatureServer) error
RecordRoute(RouteGuide_RecordRouteServer) error
RouteChat(RouteGuide_RouteChatServer) error
}

// UnimplementedRouteGuideServer can be embedded to have forward compatible implementations.
type UnimplementedRouteGuideServer struct {
}

在实现这些接口的同时,把业务逻辑写进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package main

import (
"context"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
pb "grpc-sample/route"
"io"
"log"
"math"
"net"
"time"
)

type routeGuideServer struct {
pb.UnimplementedRouteGuideServer
// 模拟地点数据
features []*pb.Feature
}

// 根据经纬度获取地点信息
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _,feature := range s.features {
if proto.Equal(feature.Location,point){
return feature, nil
}
}
return nil, nil
}


// check if a point is inside a rectangle.
func inRange(point *pb.Point,rect *pb.Rectangle) bool{
left := math.Min(float64(rect.Lo.Longitude), float64(rect.Hi.Longitude))
right := math.Max(float64(rect.Lo.Longitude), float64(rect.Hi.Longitude))
top := math.Max(float64(rect.Lo.Latitude), float64(rect.Hi.Latitude))
bottom := math.Min(float64(rect.Lo.Latitude), float64(rect.Hi.Latitude))

if float64(point.Longitude) >= left &&
float64(point.Longitude) <= right &&
float64(point.Latitude) >= bottom &&
float64(point.Latitude) <= top {
return true
}
return false
}

// 根据长方形区域获取该地区的所有地点信息
func (s *routeGuideServer) ListFeature(rectangle *pb.Rectangle,stream pb.RouteGuide_ListFeatureServer) error {
for _,feature := range s.features{
if inRange(feature.Location,rectangle){
err := stream.Send(feature)
if err != nil {
panic(err)
}
}
}
return nil
}


func computeDistance(x *pb.Point, y *pb.Point) int32{
a := (x.Latitude - y.Latitude) * (x.Latitude - y.Latitude)
b := (x.Longitude - y.Longitude) * (x.Longitude - y.Longitude)
return int32(math.Sqrt(float64(a + b)))
}

// 根据客户端的地点信息输入流(路程信息),返回该段路程的汇总统计
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
startTime := time.Now()
var pointCount,distance int32
var prevPoint *pb.Point
for{
point, err := stream.Recv()
if err != nil {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
Distance: distance,
PointCount: pointCount,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}

if err != nil{
return err
}

pointCount++
if prevPoint != nil{
distance += computeDistance(prevPoint,point)
}
prevPoint = point
}
}


func (s *routeGuideServer) recommendedOnce(request *pb.RecommendationRequest) *pb.Feature {
var nearest,farthest *pb.Feature
var nearestDistance,farthestDistance int32

for _,feature := range s.features{
dis := computeDistance(feature.Location,request.Point)
if nearest == nil || dis < nearestDistance{
nearestDistance = dis
nearest = feature
}
if farthest == nil || dis > farthestDistance{
farthestDistance = dis
farthest = feature
}
}

if request.Mode == pb.RecommendationRequest_GetFarthest{
return farthest
}else{
return nearest
}
}

// 根据客户端的地点信息输入流(经纬度信息),不断返回该对该经纬度最(近、远)的地点信息
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for{
request,err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil{
return err
}
recommended := s.recommendedOnce(request)
if err := stream.Send(recommended);err != nil {
return err
}
}
}

// 实现一个 routeGuideServer
func newServer() *routeGuideServer{
return &routeGuideServer{
features: []*pb.Feature{
{Name: "清华大学",Location: &pb.Point{Longitude: 5,Latitude:5}},
{Name: "北京大学",Location: &pb.Point{Longitude: 5,Latitude:4}},
{Name: "南京大学",Location: &pb.Point{Longitude: 1,Latitude:4}},
{Name: "哈尔滨工业大学",Location: &pb.Point{Longitude: 10,Latitude:1}},
},
}
}


func main() {
// TCP连接监听端口
lis, err := net.Listen("tcp", "localhost:5000")
if err != nil {
log.Fatalln("can't create listener at the address")
}
// 创建grpc服务
grpcServer := grpc.NewServer()
// 注册RouteGuide服务的实现
pb.RegisterRouteGuideServer(grpcServer,newServer())
// 启动Server
log.Println("server started")
log.Fatalln(grpcServer.Serve(lis))
}

最后监听一个端口、创建grpc服务、注册RouteGuide服务接口的实现、启动grpc服务。

Client端设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package main

import (
"bufio"
"context"
"fmt"
"google.golang.org/grpc"
pb "grpc-sample/route"
"io"
"log"
"os"
"time"
)

// 执行 根据经纬度获取地点信息 的任务
func runFirst(client pb.RouteGuideClient) {
feature, err := client.GetFeature(context.Background(), &pb.Point{
Latitude: 5,
Longitude: 5,
})
if err != nil {
log.Fatalln(err)
}
fmt.Println(feature)
}

// 执行 根据长方形区域获取该地区的所有地点信息 的任务
func runSecond(client pb.RouteGuideClient) {
serverStream, err := client.ListFeature(context.Background(), &pb.Rectangle{
Lo: &pb.Point{Latitude: 1, Longitude: 1},
Hi: &pb.Point{Latitude: 5, Longitude: 5},
})
if err != nil {
log.Fatalln(err)
}

for {
feature, err := serverStream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalln(err)
}
fmt.Println(feature)
}
}

// 执行 根据客户端的地点信息输入流(路程信息),返回该段路程的汇总统计 的任务
func runThird(client pb.RouteGuideClient) {
clientStream, err := client.RecordRoute(context.Background())
if err != nil {
log.Fatal(err)
}

points := []*pb.Point{
{Latitude: 10, Longitude: 1},
{Latitude: 5, Longitude: 5},
{Latitude: 5, Longitude: 4},
}

for _, point := range points {
if err := clientStream.Send(point); err != nil {
log.Fatalln(err)
}
time.Sleep(time.Second)
}

summary, err := clientStream.CloseAndRecv()
if err != nil {
log.Fatalln(err)
}
fmt.Println(summary)
}

func readIntegerFromCommandLine(reader *bufio.Reader, target *int32) {
_, err := fmt.Fscanf(reader, "%d\n", target)
if err != nil {
log.Fatalln("cannot scan integer from command line")
}
}

// 执行 根据客户端的地点信息输入流(经纬度信息),不断返回该对该经纬度最(近、远)的地点信息 的任务
func runForth(client pb.RouteGuideClient) {
stream, err := client.RouteChat(context.Background())
if err != nil {
log.Fatal(err)
}

waitc := make(chan struct{})
go func() {
for {
feature, err := stream.Recv()
if err == io.EOF {
close(waitc)
return
}
if err != nil {
log.Fatal(err)
}
log.Println("Recommended: ", feature)
}
}()

reader := bufio.NewReader(os.Stdin)
for {
request := pb.RecommendationRequest{Point: new(pb.Point)}
var mode int32
fmt.Print("Enter Recommendation Mode (0 for farthest, 1 for nearest): ")
readIntegerFromCommandLine(reader, &mode)
fmt.Print("Enter Latitude: ")
readIntegerFromCommandLine(reader, &request.Point.Latitude)
fmt.Print("Enter Longitude: ")
readIntegerFromCommandLine(reader, &request.Point.Longitude)
request.Mode = pb.RecommendationRequest_RecommendationMode(mode)

if err := stream.Send(&request); err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 100)
}
}

func runFive(client pb.RouteGuideClient) {

requests := []pb.RecommendationRequest{
pb.RecommendationRequest{Mode: 1, Point: &pb.Point{Latitude: 4, Longitude: 5}},
pb.RecommendationRequest{Mode: 1, Point: &pb.Point{Longitude: 1, Latitude: 5}},
pb.RecommendationRequest{Mode: 1, Point: &pb.Point{Latitude: 2, Longitude: 10}},
}
stream, err := client.RouteChat(context.Background())
if err != nil {
log.Fatal(err)
}

// receive response from server
waitc := make(chan struct{})
go func() {
for {
feature, err := stream.Recv()
if err == io.EOF {
close(waitc)
return
}
if err != nil {
panic(err)
}
log.Println("Recommended: ", feature)
}
}()

// send request to server
for _, request := range requests {
if err := stream.Send(&request); err != nil {
log.Fatalln(err)
}
}

stream.CloseSend()
<-waitc
}

func main() {
// 连接服务端,忽略证书验证、连通后才继续执行
conn, err := grpc.Dial("localhost:5000", grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatal("client cannot dial grpc server")
}
defer conn.Close()
// 启动grpc客户端
client := pb.NewRouteGuideClient(conn)
//runFirst(client)
//runSecond(client)
//runThird(client)
//runFive(client)
runForth(client)
}

需要关注的点是:

  • 单边流,在发送完成后需要关闭。
  • 双边流,需要两个进程同时运行监听和发送任务。
  • 流任务结束后会返回io.EOF错误

参考:

gRPC 官方文档中文版

https://www.bilibili.com/video/BV1DV411s7ij