本博客需要你有一点基本的gRPC的常识,如果你完全是新手建议访问官网全面了解。
1. Unary RPC
proto文件如下
syntax = "proto3"; option go_package=".;service"; message HelloRequest { // Name of the person to greet string name = 1; } message HelloResponse { // Greeting message string greeting = 1; } service HelloService { // RPC method to say hello rpc SayHello (HelloRequest) returns (HelloResponse){} }
使用命令(注意命令路径和自己的对应):
protoc -I . --go-grpc_out=require_unimplemented_servers=false:. --go_out=. *.proto
对应目录上有xx.pb.go
和xx_grpc.pb.go
然后对应目录实现服务端接口
package main import ( "context" "fmt" "google.golang.org/grpc" "net" "test_grpc/service" ) type HelloService struct { } func (hs *HelloService) SayHello(ctx context.Context, req *service.HelloRequest) (*service.HelloResponse, error) { resp := &service.HelloResponse{ Greeting: fmt.Sprintf("hello %s --from Golang Server", req.Name), } return resp, nil } func main() { // listen on 127.0.0.1:50051 listen, err := net.Listen("tcp", "127.0.0.1:50051") if err != nil { fmt.Println("Error happened when listen on 127.0.0.1:50051:", err) return } // grpc server s := grpc.NewServer() // register HelloService in grpc server service.RegisterHelloServiceServer(s, &HelloService{}) // start rpc server fmt.Println("Golang rpc server is waiting messages......") if err = s.Serve(listen); err != nil { fmt.Println("Error happened when start rpc server:", err) return } }
客户端接口如下:
package main import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "test_grpc/service" "time" ) func main() { // connect to server conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { fmt.Println("Connect to rpc server err:", err) return } defer conn.Close() // init service client c := service.NewHelloServiceClient(conn) // init context with timeout ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // send message req := &service.HelloRequest{Name: "Golang"} r, err := c.SayHello(ctx, req) if err != nil { fmt.Println("Send message err:", err) return } fmt.Println("Client:", r.Greeting) }
实际上为了更好得感受gRPC这种跨语言调用的感觉,可以尝试使用python编写client端代码,直接复制proto文件,在python中使用以下命令生成对应的proto文件(注意命令和自己的对应):
python -m grpc_tools.protoc -I . --python_out=. --pyi_out=. --grpc_python_out=. *.proto
使用python实现的客户端代码如下:
# client template # grpc server address channel = grpc.insecure_channel("127.0.0.1:50051") stub = hello_pb2_grpc.HelloServiceStub(channel) # send request response = stub.SayHello(hello_pb2.HelloRequest(name="Python")) print(response.greeting) # hello Python --from Golang Server
2. Server-side streaming RPC
重新给出这个的proto文件,服务端将以流式数据的形式发送给客户端数据
syntax = "proto3"; option go_package=".;service"; message HelloRequest { // Name of the person to greet string name = 1; } message HelloResponse { // Greeting message string greeting = 1; } service HelloService { // RPC method to say hello rpc SayHello (HelloRequest) returns (stream HelloResponse){} }
同理,生成对应的proto文件后,在对应的文件先生成server端的代码:
package main import ( "fmt" "google.golang.org/grpc" "net" "test_grpc/service" ) type HelloService struct { } func (hs *HelloService) SayHello(req *service.HelloRequest, stream service.HelloService_SayHelloServer) error { resp := &service.HelloResponse{ Greeting: fmt.Sprintf("hello %s --from Golang Server", req.Name), } // 连续发送5次 for i := 0; i < 5; i++ { if err := stream.Send(resp); err != nil { return err } } return nil } func main() { // listen on 127.0.0.1:50051 listen, err := net.Listen("tcp", "127.0.0.1:50051") if err != nil { fmt.Println("Error happened when listen on 127.0.0.1:50051:", err) return } // grpc server s := grpc.NewServer() // register HelloService in grpc server service.RegisterHelloServiceServer(s, &HelloService{}) // start rpc server fmt.Println("Golang rpc server is waiting messages......") if err = s.Serve(listen); err != nil { fmt.Println("Error happened when start rpc server:", err) return } }
同理给出客户端的代码:
package main import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "io" "log" "test_grpc/service" "time" ) func main() { // connect to server conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { fmt.Println("Connect to rpc server err:", err) return } defer conn.Close() // init service client c := service.NewHelloServiceClient(conn) // init context with timeout ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // send message req := &service.HelloRequest{Name: "Golang"} stream, err := c.SayHello(ctx, req) if err != nil { fmt.Println("Send message err:", err) return } // 加载消息 for { resp, err := stream.Recv() // 读到结束标志 if err == io.EOF { log.Fatalf("end.....") break } if err != nil { log.Fatalf("failed to receive response: %v", err) } log.Printf("Greeting: %s", resp.Greeting) } }
3. Client-side streaming RPC
对应的proto文件如下
syntax = "proto3"; option go_package=".;service"; message HelloRequest { // Name of the person to greet string name = 1; } message HelloResponse { // Greeting message string greeting = 1; } service HelloService { // RPC method to say hello rpc SayHello (stream HelloRequest) returns (HelloResponse){} }
同理使用protoc命令生成对应的proto文件,后先编写client端的代码,如下:
package main import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "log" "test_grpc/service" "time" ) func main() { // connect to server conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { fmt.Println("Connect to rpc server err:", err) return } defer conn.Close() // init service client c := service.NewHelloServiceClient(conn) // init context with timeout ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // create stream stream, err := c.SayHello(ctx) if err != nil { log.Fatalf("could not greet: %v", err) } names := []string{"World", "Gophers", "Anthropic"} for _, name := range names { // request body req := &service.HelloRequest{Name: name} if err := stream.Send(req); err != nil { log.Fatalf("faild to send request: %v", err) } } resp, err := stream.CloseAndRecv() if err != nil { log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) } log.Printf("Greeting: %s", resp.Greeting) }
对应得完成服务端的代码:
package main import ( "fmt" "google.golang.org/grpc" "io" "net" "strings" "test_grpc/service" ) type HelloService struct { } func (hs *HelloService) SayHello(stream service.HelloService_SayHelloServer) error { var strs []string for { msg, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } strs = append(strs, msg.Name) } resp := &service.HelloResponse{Greeting: strings.Join(strs, " ")} err := stream.SendAndClose(resp) if err != nil { return err } return nil } func main() { // listen on 127.0.0.1:50051 listen, err := net.Listen("tcp", "127.0.0.1:50051") if err != nil { fmt.Println("Error happened when listen on 127.0.0.1:50051:", err) return } // grpc server s := grpc.NewServer() // register HelloService in grpc server service.RegisterHelloServiceServer(s, &HelloService{}) // start rpc server fmt.Println("Golang rpc server is waiting messages......") if err = s.Serve(listen); err != nil { fmt.Println("Error happened when start rpc server:", err) return } }
4. Bidirectional streaming RPC
新的proto文件被如下给出:
syntax = "proto3"; option go_package=".;service"; message HelloRequest { // Name of the person to greet string name = 1; } message HelloResponse { // Greeting message string greeting = 1; } service HelloService { // RPC method to say hello rpc SayHello (stream HelloRequest) returns (stream HelloResponse){} }
和上文中的操作一致,同时给出server端的代码:
package main import ( "fmt" "google.golang.org/grpc" "io" "log" "net" "test_grpc/service" ) type HelloService struct { } func (hs *HelloService) SayHello(stream service.HelloService_SayHelloServer) error { for { msg, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } name := msg.Name resp := &service.HelloResponse{Greeting: name} if err = stream.Send(resp); err != nil { log.Fatalf("Failed to send a resp:%s", err) } } return nil } func main() { // listen on 127.0.0.1:50051 listen, err := net.Listen("tcp", "127.0.0.1:50051") if err != nil { fmt.Println("Error happened when listen on 127.0.0.1:50051:", err) return } // grpc server s := grpc.NewServer() // register HelloService in grpc server service.RegisterHelloServiceServer(s, &HelloService{}) // start rpc server fmt.Println("Golang rpc server is waiting messages......") if err = s.Serve(listen); err != nil { fmt.Println("Error happened when start rpc server:", err) return } }
同时给出下面的client端的代码:
package main import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "io" "log" "test_grpc/service" "time" ) func main() { // connect to server conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { fmt.Println("Connect to rpc server err:", err) return } defer conn.Close() // init service client c := service.NewHelloServiceClient(conn) // init context with timeout ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // create stream stream, err := c.SayHello(ctx) if err != nil { log.Fatalf("could not greet: %v", err) } names := []string{"World", "Gophers", "Anthropic"} waitc := make(chan struct{}) go func() { for { resp, err := stream.Recv() if err == io.EOF { close(waitc) return } if err != nil { log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) } log.Printf("Greeting: %s", resp.Greeting) } }() go func() { for _, name := range names { // request body req := &service.HelloRequest{Name: name} if err := stream.Send(req); err != nil { log.Fatalf("faild to send request: %v", err) } // send delay time.Sleep(1) } // 发送结束的消息 if err := stream.CloseSend(); err != nil { log.Fatalf("failed to close stream: %v", err) } }() <-waitc }
一定要注意关闭发送或者避免针对一个已经关闭stream进行发送消息,读取消息是被允许的,这里有一点类似chan
4. ALTS
4.1 ALTS的介绍
应用层传输安全(ALTS)是谷歌开发的一种相互验证和传输加密系统。它用于确保谷歌基础设施内 RPC 通信的安全。ALTS 类似于相互 TLS,但经过设计和优化,可满足 Google 生产环境的需要。ALTS在gRPC中有以下的特征:
- 使用ALTS作为传输协议创建gRPC的服务端和客户端;
- ALSTS是一个端到端的保护,具有隐私性和完成性;
- 应用可以访问对等信息比如对等服务账户;
- 支持客户端和服务端的认知;
- 最小的代码更改就能使用ALTS;
值得注意的是ALTS被全部发挥作用如果应用程序运行在CE或者GKE中
4.2 gRPC客户端使用ALTS传输安全协议
gRPC客户端使用ALTS认证去连接服务端,正如下面代码中所描述的:
import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/alts" ) altsTC := alts.NewClientCreds(alts.DefaultClientOptions()) // connect to server conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(altsTC))
gRPC服务端能够使用ALTS认证来运行客户端连接到它,正如下面的描述:
import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/alts" ) altsTC := alts.NewServerCreds(alts.DefaultServerOptions()) server := grpc.NewServer(grpc.Creds(altsTC))
4.3 Server Authorization
gRPC 内置了使用 ALTS 的服务器授权支持。使用 ALTS 的 gRPC 客户端可以在建立连接前设置预期的服务器服务账户。然后,在握手结束时,服务器授权会保证服务器身份与客户端指定的服务账户之一相匹配。否则,连接将失败。
import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/alts" ) clientOpts := alts.DefaultClientOptions() clientOpts.TargetServiceAccounts = []string{expectedServerSA} altsTC := alts.NewClientCreds(clientOpts) conn, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(altsTC))