重要前提
安装AI Skills的关键前提是:必须科学上网,且开启TUN模式,这一点至关重要,直接决定安装能否顺利完成,在此郑重提醒三遍:科学上网,科学上网,科学上网。查看完整安装教程 →
grpc-microservices by manutej/luxor-claude-marketplace
npx skills add https://github.com/manutej/luxor-claude-marketplace --skill grpc-microservices一项全面的技能,用于使用 gRPC 和 Protocol Buffers 构建高性能、类型安全的微服务。此技能涵盖服务设计、所有流模式、拦截器、负载均衡、错误处理以及分布式系统的生产部署模式。
在以下情况使用此技能:
gRPC 是一个现代的开源 RPC 框架,可以在任何地方运行。它使客户端和服务器应用程序能够透明地通信,并使构建连接系统变得更加容易。
关键特性:
Protocol Buffers 是一种与语言无关、平台无关的可扩展机制,用于序列化结构化数据。
优势:
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
基本语法:
syntax = "proto3";
message User {
int32 id = 1;
string name = 2;
string email = 3;
}
gRPC 服务在 .proto 文件中定义,并指定可用的方法及其输入/输出类型。
基本服务:
service UserService {
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
}
简单的请求-响应模式,类似于传统的 REST API 调用。
rpc GetUser(GetUserRequest) returns (GetUserResponse);
使用场景:
客户端发送一个请求,服务器返回一个响应流。
rpc ListUsers(ListUsersRequest) returns (stream User);
使用场景:
客户端发送一个请求流,服务器返回一个响应。
rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);
使用场景:
客户端和服务器都独立地发送消息流。
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
使用场景:
1. 使用显式字段编号
字段编号对于向后兼容性至关重要,绝不应重复使用。
message User {
int32 id = 1; // 永远不要更改此编号
string name = 2; // 永远不要更改此编号
string email = 3; // 永远不要更改此编号
// int32 age = 4; // 已弃用 - 不要重用 4
string phone = 5; // 新字段 - 使用下一个可用的编号
}
2. 对固定集合使用枚举
enum UserRole {
USER_ROLE_UNSPECIFIED = 0; // 始终有一个零值
USER_ROLE_ADMIN = 1;
USER_ROLE_MODERATOR = 2;
USER_ROLE_MEMBER = 3;
}
message User {
int32 id = 1;
string name = 2;
UserRole role = 3;
}
3. 对复杂类型使用嵌套消息
message User {
int32 id = 1;
string name = 2;
message Address {
string street = 1;
string city = 2;
string state = 3;
string zip = 4;
}
Address address = 3;
repeated Address additional_addresses = 4;
}
4. 对数组使用 repeated
message UserList {
repeated User users = 1;
}
message User {
int32 id = 1;
string name = 2;
repeated string tags = 3;
}
5. 对联合类型使用 oneof
message SearchRequest {
string query = 1;
oneof filter {
string category = 2;
int32 user_id = 3;
string tag = 4;
}
}
6. 使用 google.protobuf 知名类型
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
message Event {
string id = 1;
string name = 2;
google.protobuf.Timestamp created_at = 3;
google.protobuf.Duration duration = 4;
google.protobuf.Int32Value optional_count = 5;
}
1. 面向资源的设计
遵循适用于 RPC 的 RESTful 原则:
service UserService {
// 获取单个资源
rpc GetUser(GetUserRequest) returns (User);
// 列出资源
rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
// 创建资源
rpc CreateUser(CreateUserRequest) returns (User);
// 更新资源
rpc UpdateUser(UpdateUserRequest) returns (User);
// 删除资源
rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
}
2. 分页模式
message ListUsersRequest {
int32 page_size = 1;
string page_token = 2;
string filter = 3;
}
message ListUsersResponse {
repeated User users = 1;
string next_page_token = 2;
int32 total_count = 3;
}
3. 批量操作模式
message BatchGetUsersRequest {
repeated int32 user_ids = 1;
}
message BatchGetUsersResponse {
map<int32, User> users = 1;
repeated int32 not_found = 2;
}
4. 长时间运行操作模式
import "google/longrunning/operations.proto";
service BatchJobService {
rpc ProcessBatch(BatchRequest) returns (google.longrunning.Operation);
rpc GetOperation(GetOperationRequest) returns (google.longrunning.Operation);
}
1. 分页流
高效地流式传输大型结果集:
service ProductService {
rpc SearchProducts(SearchRequest) returns (stream Product);
}
message SearchRequest {
string query = 1;
int32 limit = 2;
}
实现(Go):
func (s *server) SearchProducts(req *pb.SearchRequest, stream pb.ProductService_SearchProductsServer) error {
products := s.db.Search(req.Query, req.Limit)
for _, product := range products {
if err := stream.Send(&product); err != nil {
return err
}
}
return nil
}
2. 实时更新
在事件发生时将更新推送给客户端:
service EventService {
rpc SubscribeToEvents(SubscribeRequest) returns (stream Event);
}
message SubscribeRequest {
repeated string event_types = 1;
google.protobuf.Timestamp since = 2;
}
3. 日志跟踪
流式传输日志或审计跟踪:
service LogService {
rpc TailLogs(TailRequest) returns (stream LogEntry);
}
message TailRequest {
string service_name = 1;
string level = 2;
int32 lines = 3;
}
1. 批量上传
客户端流式传输数据,服务器处理并返回摘要:
service UploadService {
rpc UploadImages(stream ImageChunk) returns (UploadSummary);
}
message ImageChunk {
string filename = 1;
bytes data = 2;
int32 chunk_number = 3;
}
message UploadSummary {
int32 total_images = 1;
int64 total_bytes = 2;
repeated string uploaded_filenames = 3;
}
实现(Go):
func (s *server) UploadImages(stream pb.UploadService_UploadImagesServer) error {
var count int32
var totalBytes int64
var filenames []string
for {
chunk, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.UploadSummary{
TotalImages: count,
TotalBytes: totalBytes,
UploadedFilenames: filenames,
})
}
if err != nil {
return err
}
// Process chunk
totalBytes += int64(len(chunk.Data))
if chunk.ChunkNumber == 0 {
count++
filenames = append(filenames, chunk.Filename)
}
}
}
2. 聚合
客户端发送多个数据点,服务器聚合:
service AnalyticsService {
rpc RecordMetrics(stream Metric) returns (AggregateResult);
}
message Metric {
string name = 1;
double value = 2;
google.protobuf.Timestamp timestamp = 3;
}
1. 聊天应用
实时双向通信:
service ChatService {
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
message ChatMessage {
string user_id = 1;
string room_id = 2;
string content = 3;
google.protobuf.Timestamp timestamp = 4;
}
实现(Go):
func (s *server) Chat(stream pb.ChatService_ChatServer) error {
// Create channel for this client
clientID := uuid.New().String()
msgChan := make(chan *pb.ChatMessage, 10)
// Register client
s.mu.Lock()
s.clients[clientID] = msgChan
s.mu.Unlock()
defer func() {
s.mu.Lock()
delete(s.clients, clientID)
close(msgChan)
s.mu.Unlock()
}()
// Goroutine to send messages to client
go func() {
for msg := range msgChan {
if err := stream.Send(msg); err != nil {
return
}
}
}()
// Receive messages from client
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// Broadcast to all clients in room
s.broadcast(msg)
}
}
2. 实时协作
实时文档编辑:
service CollaborationService {
rpc Collaborate(stream DocumentEdit) returns (stream DocumentEdit);
}
message DocumentEdit {
string document_id = 1;
string user_id = 2;
int32 position = 3;
string content = 4;
enum Operation {
OPERATION_UNSPECIFIED = 0;
OPERATION_INSERT = 1;
OPERATION_DELETE = 2;
OPERATION_UPDATE = 3;
}
Operation operation = 5;
}
3. 游戏状态同步
实时多人游戏更新:
service GameService {
rpc PlayGame(stream GameAction) returns (stream GameState);
}
message GameAction {
string player_id = 1;
string game_id = 2;
string action_type = 3;
bytes action_data = 4;
}
message GameState {
string game_id = 1;
repeated PlayerState players = 2;
bytes world_state = 3;
google.protobuf.Timestamp timestamp = 4;
}
拦截器提供了一种向 gRPC 服务添加横切关注点的方式。
服务器端一元拦截器:
func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// Pre-processing
start := time.Now()
log.Printf("Method: %s, Start: %v", info.FullMethod, start)
// Call the handler
resp, err := handler(ctx, req)
// Post-processing
duration := time.Since(start)
log.Printf("Method: %s, Duration: %v, Error: %v",
info.FullMethod, duration, err)
return resp, err
}
}
// Usage
server := grpc.NewServer(
grpc.UnaryInterceptor(UnaryServerInterceptor()),
)
客户端一元拦截器:
func UnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
start := time.Now()
// Call the remote method
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("Method: %s, Duration: %v, Error: %v",
method, time.Since(start), err)
return err
}
}
// Usage
conn, err := grpc.Dial(
address,
grpc.WithUnaryInterceptor(UnaryClientInterceptor()),
)
服务器端流式拦截器:
func StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
log.Printf("Stream started: %s", info.FullMethod)
err := handler(srv, ss)
log.Printf("Stream ended: %s, Error: %v", info.FullMethod, err)
return err
}
}
func AuthInterceptor(secret string) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// Extract metadata
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "no metadata")
}
// Get authorization token
tokens := md["authorization"]
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "no token")
}
// Validate token
token := tokens[0]
claims, err := validateJWT(token, secret)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "invalid token")
}
// Add claims to context
ctx = context.WithValue(ctx, "claims", claims)
return handler(ctx, req)
}
}
func LoggingInterceptor(logger *log.Logger) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
// Get request ID from metadata
requestID := getRequestID(ctx)
logger.Printf("[%s] Request: %s", requestID, info.FullMethod)
resp, err := handler(ctx, req)
duration := time.Since(start)
statusCode := status.Code(err)
logger.Printf("[%s] Response: %s, Duration: %v, Status: %v",
requestID, info.FullMethod, duration, statusCode)
return resp, err
}
}
func RateLimitInterceptor(limiter *rate.Limiter) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
if !limiter.Allow() {
return nil, status.Error(
codes.ResourceExhausted,
"rate limit exceeded",
)
}
return handler(ctx, req)
}
}
func TracingInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
ctx, span := tracer.Start(ctx, info.FullMethod)
defer span.End()
// Add attributes
span.SetAttributes(
attribute.String("rpc.method", info.FullMethod),
attribute.String("rpc.service", "MyService"),
)
resp, err := handler(ctx, req)
if err != nil {
span.RecordError(err)
span.SetStatus(codes2.Error, err.Error())
} else {
span.SetStatus(codes2.Ok, "")
}
return resp, err
}
}
func RecoveryInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("Panic recovered: %v\n%s", r, debug.Stack())
err = status.Error(codes.Internal, "internal server error")
}
}()
return handler(ctx, req)
}
}
server := grpc.NewServer(
grpc.ChainUnaryInterceptor(
RecoveryInterceptor(),
LoggingInterceptor(logger),
TracingInterceptor(tracer),
AuthInterceptor(jwtSecret),
RateLimitInterceptor(limiter),
),
grpc.ChainStreamInterceptor(
StreamRecoveryInterceptor(),
StreamLoggingInterceptor(logger),
),
)
gRPC 提供内置的客户端负载均衡,支持多种策略。
1. 轮询
import "google.golang.org/grpc/balancer/roundrobin"
conn, err := grpc.Dial(
"dns:///my-service.example.com:50051",
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
grpc.WithInsecure(),
)
2. 优先选择(默认)
conn, err := grpc.Dial(
"dns:///my-service.example.com:50051",
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`),
grpc.WithInsecure(),
)
3. 自定义解析器
实现自定义服务发现:
type exampleResolver struct {
target resolver.Target
cc resolver.ClientConn
addrsStore map[string][]string
}
func (r *exampleResolver) ResolveNow(resolver.ResolveNowOptions) {
// Discover service addresses
addresses := r.discoverServices()
var addrs []resolver.Address
for _, addr := range addresses {
addrs = append(addrs, resolver.Address{Addr: addr})
}
r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func init() {
resolver.Register(&exampleResolverBuilder{})
}
Kubernetes 与服务网格(Istio/Linkerd):
apiVersion: v1
kind: Service
metadata:
name: grpc-service
spec:
selector:
app: grpc-app
ports:
- name: grpc
port: 50051
targetPort: 50051
protocol: TCP
type: ClusterIP
---
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
name: grpc-service
spec:
host: grpc-service
trafficPolicy:
loadBalancer:
simple: ROUND_ROBIN
connectionPool:
http:
http2MaxRequests: 1000
maxRequestsPerConnection: 10
实现健康检查服务:
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3;
}
ServingStatus status = 1;
}
实现:
import "google.golang.org/grpc/health"
import healthpb "google.golang.org/grpc/health/grpc_health_v1"
healthServer := health.NewServer()
healthpb.RegisterHealthServer(grpcServer, healthServer)
// Set service status
healthServer.SetServingStatus("UserService", healthpb.HealthCheckResponse_SERVING)
gRPC 使用标准化的状态码进行错误处理:
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/status"
// Return errors with appropriate codes
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
if req.Id <= 0 {
return nil, status.Error(codes.InvalidArgument, "id must be positive")
}
user, err := s.db.GetUser(req.Id)
if err == sql.ErrNoRows {
return nil, status.Error(codes.NotFound, "user not found")
}
if err != nil {
return nil, status.Error(codes.Internal, "database error")
}
return user, nil
}
常见状态码:
OK:成功Canceled:操作被取消Unknown:未知错误InvalidArgument:客户端指定了无效参数DeadlineExceeded:操作在截止时间前未完成NotFound:实体未找到AlreadyExists:实体已存在PermissionDenied:权限被拒绝ResourceExhausted:资源耗尽(速率限制)FailedPrecondition:操作被拒绝(系统不在有效状态)Aborted:操作中止OutOfRange:超出有效范围Unimplemented:操作未实现Internal:内部服务器错误Unavailable:服务不可用DataLoss:不可恢复的数据丢失Unauthenticated:请求缺少有效身份验证添加结构化的错误详情:
import "google.golang.org/genproto/googleapis/rpc/errdetails"
func (s *server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
// Validate request
violations := validateCreateUserRequest(req)
if len(violations) > 0 {
badRequest := &errdetails.BadRequest{}
for field, msg := range violations {
badRequest.FieldViolations = append(
badRequest.FieldViolations,
&errdetails.BadRequest_FieldViolation{
Field: field,
Description: msg,
},
)
}
st := status.New(codes.InvalidArgument, "invalid request")
st, _ = st.WithDetails(badRequest)
return nil, st.Err()
}
// Create user...
}
客户端错误处理:
resp, err := client.CreateUser(ctx, req)
if err != nil {
st := status.Convert(err)
for _, detail := range st.Details() {
switch t := detail.(type) {
case *errdetails.BadRequest:
for _, violation := range t.FieldViolations {
fmt.Printf("Invalid field %s: %s\n",
violation.Field, violation.Description)
}
}
}
}
func (s *server) ProcessOrder(ctx context.Context, req *pb.OrderRequest) (*pb.OrderResponse, error) {
// Call inventory service
inventory, err := s.inventoryClient.CheckInventory(ctx, &pb.InventoryRequest{
ProductId: req.ProductId,
})
if err != nil {
// Propagate error with additional context
st := status.Convert(err)
return nil, status.Errorf(st.Code(),
"inventory check failed: %v", st.Message())
}
// Continue processing...
}
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/status"
func CallWithRetry(ctx context.Context, maxRetries int, fn func() error) error {
var err error
for i := 0; i < maxRetries; i++ {
err = fn()
if err == nil {
return nil
}
// Check if error is retryable
st := status.Convert(err)
if !isRetryable(st.Code()) {
return err
}
// Exponential backoff
backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
time.Sleep(backoff)
}
return err
}
func isRetryable(code codes.Code) bool {
return code == codes.Unavailable ||
code == codes.DeadlineExceeded ||
code == codes.ResourceExhausted
}
应该:
syntax = "proto3"reservedmessage User {
int32 id = 1;
string name = 2;
// string age = 3; // Deprecated
reserved 3;
reserved "age";
string email = 4;
google.protobuf.Int32Value phone = 5; // Optional
}
不应该:
连接管理:
// Reuse connections
var conn *grpc.ClientConn
var once sync.Once
func getConnection() *grpc.ClientConn {
once.Do(func() {
var err error
conn, err = grpc.Dial(
address,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}),
)
if err != nil {
log.Fatal(err)
}
})
return conn
}
连接池:
type ConnectionPool struct {
connections []*grpc.ClientConn
next uint32
}
func (p *ConnectionPool) GetConnection() *grpc.ClientConn {
n := atomic.AddUint32(&p.next, 1)
return p.connections[n%uint32(len(p.connections))]
}
大数据流式传输:
// Instead of this:
rpc GetAllUsers(Empty) returns (UserList); // Large response
// Use this:
rpc ListUsers(ListUsersRequest) returns (stream User); // Streamed
TLS 配置:
// Server-side
creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
server := grpc.NewServer(grpc.Creds(creds))
// Client-side
creds, err := credentials.NewClientTLSFromFile(certFile, "")
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(creds))
双向 TLS(mTLS):
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile(caFile)
certPool.AppendCertsFromPEM(ca)
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: certPool,
})
server := grpc.NewServer(grpc.Creds(creds))
令牌身份验证:
type tokenAuth struct {
token string
}
func (t tokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + t.token,
}, nil
}
func (t tokenAuth) RequireTransportSecurity() bool {
return true
}
// Usage
conn, err := grpc.Dial(
address,
grpc.WithPerRPCCredentials(tokenAuth{token: "my-token"}),
)
// Set deadline for request
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 123})
if err != nil {
if status.Code(err) == codes.DeadlineExceeded {
log.Println("Request timed out")
}
}
服务器端截止时间传播:
func (s *server) ComplexOperation(ctx context.Context, req *pb.Request) (*pb.Response, error) {
// Check if deadline is already exceeded
deadline, ok := ctx.Deadline()
if ok && time.Now().After(deadline) {
return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded")
}
// Propagate context to downstream calls
user, err := s.userClient.GetUser(ctx, &pb.GetUserRequest{Id: req.UserId})
if err != nil {
return nil, err
}
// Continue with remaining time...
}
Prometheus 指标:
import "github.com/grpc-ecosystem/go-grpc-prometheus"
// Server metrics
grpcMetrics := grpc_prometheus.NewServerMetrics()
server := grpc.NewServer(
grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
grpc.StreamInterceptor(grpcMetrics.StreamServerInterceptor()),
)
grpcMetrics.InitializeMetrics(server)
// Expose metrics
http.Handle("/metrics", promhttp.Handler())
server := grpc.NewServer()
// Register services...
go func() {
if err := server.Serve(listener); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
// Wait for interrupt signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
// Graceful shutdown
server.GracefulStop()
log.Println("Server stopped")
基于 URL 的版本控制:
package api.v1;
service UserServiceV1 {
rpc GetUser(GetUserRequest) returns (User);
}
package api.v2;
service UserServiceV2 {
rpc GetUser(GetUserRequest) returns (User);
}
基于字段的版本控制:
message User {
int32 id = 1;
string name = 2;
string email = 3;
// v2 additions
string phone = 4;
Address address = 5;
}
使用模拟进行单元测试:
type mockUserClient struct {
pb.UserServiceClient
getUserFunc func(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error)
}
func (m *mockUserClient) GetUser(ctx context.Context, req *pb.GetUserRequest, opts ...grpc.CallOption) (*pb.User, error) {
return m.getUserFunc(ctx, req)
}
func TestOrderService(t *testing.T) {
mockClient := &mockUserClient{
getUserFunc: func(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
return &pb.User{Id: 1, Name: "Test User"}, nil
},
}
// Test with mock...
}
集成测试:
func TestIntegration(t *testing.T) {
// Start test server
lis, err := net.Listen("tcp", ":0")
require.NoError(t, err)
server := grpc.NewServer()
pb.RegisterUserServiceServer(server, &userServer{})
go server.Serve(lis)
defer server.Stop()
// Connect client
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
require.NoError(t, err)
defer conn.Close()
client := pb.NewUserServiceClient(conn)
// Test requests...
}
Dockerfile:
FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o server ./cmd/server
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/server .
COPY --from=builder /app/proto ./proto
EXPOSE 50051
CMD ["./server"]
deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: grpc-service
spec:
replicas: 3
selector:
matchLabels:
app: grpc-service
template:
metadata:
labels:
app: grpc-service
spec:
containers:
- name: grpc-service
image: grpc-service:latest
ports:
- containerPort: 50051
name: grpc
protocol: TCP
env:
- name: PORT
value: "50051"
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:50051"]
A comprehensive skill for building high-performance, type-safe microservices using gRPC and Protocol Buffers. This skill covers service design, all streaming patterns, interceptors, load balancing, error handling, and production deployment patterns for distributed systems.
Use this skill when:
gRPC is a modern open-source RPC framework that can run anywhere. It enables client and server applications to communicate transparently and makes it easier to build connected systems.
Key Characteristics:
Protocol Buffers is a language-neutral, platform-neutral extensible mechanism for serializing structured data.
Advantages:
Basic Syntax:
syntax = "proto3";
message User {
int32 id = 1;
string name = 2;
string email = 3;
}
gRPC services are defined in .proto files and specify available methods and their input/output types.
Basic Service:
service UserService {
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
}
Simple request-response pattern, like a traditional REST API call.
rpc GetUser(GetUserRequest) returns (GetUserResponse);
Use Cases:
Client sends one request, server returns a stream of responses.
rpc ListUsers(ListUsersRequest) returns (stream User);
Use Cases:
Client sends a stream of requests, server returns one response.
rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);
Use Cases:
Both client and server send streams of messages independently.
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
Use Cases:
1. Use Explicit Field Numbers
Field numbers are critical for backward compatibility and should never be reused.
message User {
int32 id = 1; // Never change this number
string name = 2; // Never change this number
string email = 3; // Never change this number
// int32 age = 4; // DEPRECATED - don't reuse 4
string phone = 5; // New field - use next available
}
2. Use Enumerations for Fixed Sets
enum UserRole {
USER_ROLE_UNSPECIFIED = 0; // Always have a zero value
USER_ROLE_ADMIN = 1;
USER_ROLE_MODERATOR = 2;
USER_ROLE_MEMBER = 3;
}
message User {
int32 id = 1;
string name = 2;
UserRole role = 3;
}
3. Use Nested Messages for Complex Types
message User {
int32 id = 1;
string name = 2;
message Address {
string street = 1;
string city = 2;
string state = 3;
string zip = 4;
}
Address address = 3;
repeated Address additional_addresses = 4;
}
4. Userepeated for Arrays
message UserList {
repeated User users = 1;
}
message User {
int32 id = 1;
string name = 2;
repeated string tags = 3;
}
5. Useoneof for Union Types
message SearchRequest {
string query = 1;
oneof filter {
string category = 2;
int32 user_id = 3;
string tag = 4;
}
}
6. Usegoogle.protobuf Well-Known Types
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
message Event {
string id = 1;
string name = 2;
google.protobuf.Timestamp created_at = 3;
google.protobuf.Duration duration = 4;
google.protobuf.Int32Value optional_count = 5;
}
1. Resource-Oriented Design
Follow RESTful principles adapted for RPC:
service UserService {
// Get single resource
rpc GetUser(GetUserRequest) returns (User);
// List resources
rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
// Create resource
rpc CreateUser(CreateUserRequest) returns (User);
// Update resource
rpc UpdateUser(UpdateUserRequest) returns (User);
// Delete resource
rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
}
2. Pagination Pattern
message ListUsersRequest {
int32 page_size = 1;
string page_token = 2;
string filter = 3;
}
message ListUsersResponse {
repeated User users = 1;
string next_page_token = 2;
int32 total_count = 3;
}
3. Batch Operations Pattern
message BatchGetUsersRequest {
repeated int32 user_ids = 1;
}
message BatchGetUsersResponse {
map<int32, User> users = 1;
repeated int32 not_found = 2;
}
4. Long-Running Operations Pattern
import "google/longrunning/operations.proto";
service BatchJobService {
rpc ProcessBatch(BatchRequest) returns (google.longrunning.Operation);
rpc GetOperation(GetOperationRequest) returns (google.longrunning.Operation);
}
1. Pagination Streaming
Stream large result sets efficiently:
service ProductService {
rpc SearchProducts(SearchRequest) returns (stream Product);
}
message SearchRequest {
string query = 1;
int32 limit = 2;
}
Implementation (Go):
func (s *server) SearchProducts(req *pb.SearchRequest, stream pb.ProductService_SearchProductsServer) error {
products := s.db.Search(req.Query, req.Limit)
for _, product := range products {
if err := stream.Send(&product); err != nil {
return err
}
}
return nil
}
2. Real-Time Updates
Push updates to clients as they occur:
service EventService {
rpc SubscribeToEvents(SubscribeRequest) returns (stream Event);
}
message SubscribeRequest {
repeated string event_types = 1;
google.protobuf.Timestamp since = 2;
}
3. Log Tailing
Stream logs or audit trails:
service LogService {
rpc TailLogs(TailRequest) returns (stream LogEntry);
}
message TailRequest {
string service_name = 1;
string level = 2;
int32 lines = 3;
}
1. Bulk Upload
Client streams data, server processes and returns summary:
service UploadService {
rpc UploadImages(stream ImageChunk) returns (UploadSummary);
}
message ImageChunk {
string filename = 1;
bytes data = 2;
int32 chunk_number = 3;
}
message UploadSummary {
int32 total_images = 1;
int64 total_bytes = 2;
repeated string uploaded_filenames = 3;
}
Implementation (Go):
func (s *server) UploadImages(stream pb.UploadService_UploadImagesServer) error {
var count int32
var totalBytes int64
var filenames []string
for {
chunk, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.UploadSummary{
TotalImages: count,
TotalBytes: totalBytes,
UploadedFilenames: filenames,
})
}
if err != nil {
return err
}
// Process chunk
totalBytes += int64(len(chunk.Data))
if chunk.ChunkNumber == 0 {
count++
filenames = append(filenames, chunk.Filename)
}
}
}
2. Aggregation
Client sends multiple data points, server aggregates:
service AnalyticsService {
rpc RecordMetrics(stream Metric) returns (AggregateResult);
}
message Metric {
string name = 1;
double value = 2;
google.protobuf.Timestamp timestamp = 3;
}
1. Chat Application
Real-time bidirectional communication:
service ChatService {
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
message ChatMessage {
string user_id = 1;
string room_id = 2;
string content = 3;
google.protobuf.Timestamp timestamp = 4;
}
Implementation (Go):
func (s *server) Chat(stream pb.ChatService_ChatServer) error {
// Create channel for this client
clientID := uuid.New().String()
msgChan := make(chan *pb.ChatMessage, 10)
// Register client
s.mu.Lock()
s.clients[clientID] = msgChan
s.mu.Unlock()
defer func() {
s.mu.Lock()
delete(s.clients, clientID)
close(msgChan)
s.mu.Unlock()
}()
// Goroutine to send messages to client
go func() {
for msg := range msgChan {
if err := stream.Send(msg); err != nil {
return
}
}
}()
// Receive messages from client
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// Broadcast to all clients in room
s.broadcast(msg)
}
}
2. Live Collaboration
Real-time document editing:
service CollaborationService {
rpc Collaborate(stream DocumentEdit) returns (stream DocumentEdit);
}
message DocumentEdit {
string document_id = 1;
string user_id = 2;
int32 position = 3;
string content = 4;
enum Operation {
OPERATION_UNSPECIFIED = 0;
OPERATION_INSERT = 1;
OPERATION_DELETE = 2;
OPERATION_UPDATE = 3;
}
Operation operation = 5;
}
3. Game State Synchronization
Real-time multiplayer game updates:
service GameService {
rpc PlayGame(stream GameAction) returns (stream GameState);
}
message GameAction {
string player_id = 1;
string game_id = 2;
string action_type = 3;
bytes action_data = 4;
}
message GameState {
string game_id = 1;
repeated PlayerState players = 2;
bytes world_state = 3;
google.protobuf.Timestamp timestamp = 4;
}
Interceptors provide a way to add cross-cutting concerns to gRPC services.
Server-side Unary Interceptor:
func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// Pre-processing
start := time.Now()
log.Printf("Method: %s, Start: %v", info.FullMethod, start)
// Call the handler
resp, err := handler(ctx, req)
// Post-processing
duration := time.Since(start)
log.Printf("Method: %s, Duration: %v, Error: %v",
info.FullMethod, duration, err)
return resp, err
}
}
// Usage
server := grpc.NewServer(
grpc.UnaryInterceptor(UnaryServerInterceptor()),
)
Client-side Unary Interceptor:
func UnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
start := time.Now()
// Call the remote method
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("Method: %s, Duration: %v, Error: %v",
method, time.Since(start), err)
return err
}
}
// Usage
conn, err := grpc.Dial(
address,
grpc.WithUnaryInterceptor(UnaryClientInterceptor()),
)
Server-side Stream Interceptor:
func StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
log.Printf("Stream started: %s", info.FullMethod)
err := handler(srv, ss)
log.Printf("Stream ended: %s, Error: %v", info.FullMethod, err)
return err
}
}
func AuthInterceptor(secret string) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// Extract metadata
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "no metadata")
}
// Get authorization token
tokens := md["authorization"]
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "no token")
}
// Validate token
token := tokens[0]
claims, err := validateJWT(token, secret)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "invalid token")
}
// Add claims to context
ctx = context.WithValue(ctx, "claims", claims)
return handler(ctx, req)
}
}
func LoggingInterceptor(logger *log.Logger) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
// Get request ID from metadata
requestID := getRequestID(ctx)
logger.Printf("[%s] Request: %s", requestID, info.FullMethod)
resp, err := handler(ctx, req)
duration := time.Since(start)
statusCode := status.Code(err)
logger.Printf("[%s] Response: %s, Duration: %v, Status: %v",
requestID, info.FullMethod, duration, statusCode)
return resp, err
}
}
func RateLimitInterceptor(limiter *rate.Limiter) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
if !limiter.Allow() {
return nil, status.Error(
codes.ResourceExhausted,
"rate limit exceeded",
)
}
return handler(ctx, req)
}
}
func TracingInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
ctx, span := tracer.Start(ctx, info.FullMethod)
defer span.End()
// Add attributes
span.SetAttributes(
attribute.String("rpc.method", info.FullMethod),
attribute.String("rpc.service", "MyService"),
)
resp, err := handler(ctx, req)
if err != nil {
span.RecordError(err)
span.SetStatus(codes2.Error, err.Error())
} else {
span.SetStatus(codes2.Ok, "")
}
return resp, err
}
}
func RecoveryInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("Panic recovered: %v\n%s", r, debug.Stack())
err = status.Error(codes.Internal, "internal server error")
}
}()
return handler(ctx, req)
}
}
server := grpc.NewServer(
grpc.ChainUnaryInterceptor(
RecoveryInterceptor(),
LoggingInterceptor(logger),
TracingInterceptor(tracer),
AuthInterceptor(jwtSecret),
RateLimitInterceptor(limiter),
),
grpc.ChainStreamInterceptor(
StreamRecoveryInterceptor(),
StreamLoggingInterceptor(logger),
),
)
gRPC provides built-in client-side load balancing with multiple policies.
1. Round Robin
import "google.golang.org/grpc/balancer/roundrobin"
conn, err := grpc.Dial(
"dns:///my-service.example.com:50051",
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
grpc.WithInsecure(),
)
2. Pick First (Default)
conn, err := grpc.Dial(
"dns:///my-service.example.com:50051",
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`),
grpc.WithInsecure(),
)
3. Custom Resolver
Implement custom service discovery:
type exampleResolver struct {
target resolver.Target
cc resolver.ClientConn
addrsStore map[string][]string
}
func (r *exampleResolver) ResolveNow(resolver.ResolveNowOptions) {
// Discover service addresses
addresses := r.discoverServices()
var addrs []resolver.Address
for _, addr := range addresses {
addrs = append(addrs, resolver.Address{Addr: addr})
}
r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func init() {
resolver.Register(&exampleResolverBuilder{})
}
Kubernetes with Service Mesh (Istio/Linkerd):
apiVersion: v1
kind: Service
metadata:
name: grpc-service
spec:
selector:
app: grpc-app
ports:
- name: grpc
port: 50051
targetPort: 50051
protocol: TCP
type: ClusterIP
---
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
name: grpc-service
spec:
host: grpc-service
trafficPolicy:
loadBalancer:
simple: ROUND_ROBIN
connectionPool:
http:
http2MaxRequests: 1000
maxRequestsPerConnection: 10
Implement health check service:
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3;
}
ServingStatus status = 1;
}
Implementation:
import "google.golang.org/grpc/health"
import healthpb "google.golang.org/grpc/health/grpc_health_v1"
healthServer := health.NewServer()
healthpb.RegisterHealthServer(grpcServer, healthServer)
// Set service status
healthServer.SetServingStatus("UserService", healthpb.HealthCheckResponse_SERVING)
gRPC uses standardized status codes for error handling:
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/status"
// Return errors with appropriate codes
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
if req.Id <= 0 {
return nil, status.Error(codes.InvalidArgument, "id must be positive")
}
user, err := s.db.GetUser(req.Id)
if err == sql.ErrNoRows {
return nil, status.Error(codes.NotFound, "user not found")
}
if err != nil {
return nil, status.Error(codes.Internal, "database error")
}
return user, nil
}
Common Status Codes:
OK: SuccessCanceled: Operation was cancelledUnknown: Unknown errorInvalidArgument: Client specified invalid argumentDeadlineExceeded: Deadline expired before operationNotFound: Entity not foundAlreadyExists: Entity already existsPermissionDenied: Permission deniedResourceExhausted: Resource exhausted (rate limit)FailedPrecondition: Operation rejected (system not in valid state)Add structured error details:
import "google.golang.org/genproto/googleapis/rpc/errdetails"
func (s *server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
// Validate request
violations := validateCreateUserRequest(req)
if len(violations) > 0 {
badRequest := &errdetails.BadRequest{}
for field, msg := range violations {
badRequest.FieldViolations = append(
badRequest.FieldViolations,
&errdetails.BadRequest_FieldViolation{
Field: field,
Description: msg,
},
)
}
st := status.New(codes.InvalidArgument, "invalid request")
st, _ = st.WithDetails(badRequest)
return nil, st.Err()
}
// Create user...
}
Client-side error handling:
resp, err := client.CreateUser(ctx, req)
if err != nil {
st := status.Convert(err)
for _, detail := range st.Details() {
switch t := detail.(type) {
case *errdetails.BadRequest:
for _, violation := range t.FieldViolations {
fmt.Printf("Invalid field %s: %s\n",
violation.Field, violation.Description)
}
}
}
}
func (s *server) ProcessOrder(ctx context.Context, req *pb.OrderRequest) (*pb.OrderResponse, error) {
// Call inventory service
inventory, err := s.inventoryClient.CheckInventory(ctx, &pb.InventoryRequest{
ProductId: req.ProductId,
})
if err != nil {
// Propagate error with additional context
st := status.Convert(err)
return nil, status.Errorf(st.Code(),
"inventory check failed: %v", st.Message())
}
// Continue processing...
}
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/status"
func CallWithRetry(ctx context.Context, maxRetries int, fn func() error) error {
var err error
for i := 0; i < maxRetries; i++ {
err = fn()
if err == nil {
return nil
}
// Check if error is retryable
st := status.Convert(err)
if !isRetryable(st.Code()) {
return err
}
// Exponential backoff
backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
time.Sleep(backoff)
}
return err
}
func isRetryable(code codes.Code) bool {
return code == codes.Unavailable ||
code == codes.DeadlineExceeded ||
code == codes.ResourceExhausted
}
DO:
Always use syntax = "proto3"
Never reuse field numbers
Use reserved for deprecated fields
Add new fields with new numbers
Use optional wrappers for nullable fields
message User { int32 id = 1; string name = 2; // string age = 3; // Deprecated reserved 3; reserved "age";
string email = 4; google.protobuf.Int32Value phone = 5; // Optional }
DON'T:
Connection Management:
// Reuse connections
var conn *grpc.ClientConn
var once sync.Once
func getConnection() *grpc.ClientConn {
once.Do(func() {
var err error
conn, err = grpc.Dial(
address,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}),
)
if err != nil {
log.Fatal(err)
}
})
return conn
}
Connection Pooling:
type ConnectionPool struct {
connections []*grpc.ClientConn
next uint32
}
func (p *ConnectionPool) GetConnection() *grpc.ClientConn {
n := atomic.AddUint32(&p.next, 1)
return p.connections[n%uint32(len(p.connections))]
}
Streaming for Large Data:
// Instead of this:
rpc GetAllUsers(Empty) returns (UserList); // Large response
// Use this:
rpc ListUsers(ListUsersRequest) returns (stream User); // Streamed
TLS Configuration:
// Server-side
creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
server := grpc.NewServer(grpc.Creds(creds))
// Client-side
creds, err := credentials.NewClientTLSFromFile(certFile, "")
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(creds))
Mutual TLS (mTLS):
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile(caFile)
certPool.AppendCertsFromPEM(ca)
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: certPool,
})
server := grpc.NewServer(grpc.Creds(creds))
Token Authentication:
type tokenAuth struct {
token string
}
func (t tokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + t.token,
}, nil
}
func (t tokenAuth) RequireTransportSecurity() bool {
return true
}
// Usage
conn, err := grpc.Dial(
address,
grpc.WithPerRPCCredentials(tokenAuth{token: "my-token"}),
)
// Set deadline for request
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 123})
if err != nil {
if status.Code(err) == codes.DeadlineExceeded {
log.Println("Request timed out")
}
}
Server-side deadline propagation:
func (s *server) ComplexOperation(ctx context.Context, req *pb.Request) (*pb.Response, error) {
// Check if deadline is already exceeded
deadline, ok := ctx.Deadline()
if ok && time.Now().After(deadline) {
return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded")
}
// Propagate context to downstream calls
user, err := s.userClient.GetUser(ctx, &pb.GetUserRequest{Id: req.UserId})
if err != nil {
return nil, err
}
// Continue with remaining time...
}
Prometheus Metrics:
import "github.com/grpc-ecosystem/go-grpc-prometheus"
// Server metrics
grpcMetrics := grpc_prometheus.NewServerMetrics()
server := grpc.NewServer(
grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
grpc.StreamInterceptor(grpcMetrics.StreamServerInterceptor()),
)
grpcMetrics.InitializeMetrics(server)
// Expose metrics
http.Handle("/metrics", promhttp.Handler())
server := grpc.NewServer()
// Register services...
go func() {
if err := server.Serve(listener); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
// Wait for interrupt signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
// Graceful shutdown
server.GracefulStop()
log.Println("Server stopped")
URL-based versioning:
package api.v1;
service UserServiceV1 {
rpc GetUser(GetUserRequest) returns (User);
}
package api.v2;
service UserServiceV2 {
rpc GetUser(GetUserRequest) returns (User);
}
Field-based versioning:
message User {
int32 id = 1;
string name = 2;
string email = 3;
// v2 additions
string phone = 4;
Address address = 5;
}
Unit Testing with Mocks:
type mockUserClient struct {
pb.UserServiceClient
getUserFunc func(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error)
}
func (m *mockUserClient) GetUser(ctx context.Context, req *pb.GetUserRequest, opts ...grpc.CallOption) (*pb.User, error) {
return m.getUserFunc(ctx, req)
}
func TestOrderService(t *testing.T) {
mockClient := &mockUserClient{
getUserFunc: func(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
return &pb.User{Id: 1, Name: "Test User"}, nil
},
}
// Test with mock...
}
Integration Testing:
func TestIntegration(t *testing.T) {
// Start test server
lis, err := net.Listen("tcp", ":0")
require.NoError(t, err)
server := grpc.NewServer()
pb.RegisterUserServiceServer(server, &userServer{})
go server.Serve(lis)
defer server.Stop()
// Connect client
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
require.NoError(t, err)
defer conn.Close()
client := pb.NewUserServiceClient(conn)
// Test requests...
}
Dockerfile:
FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o server ./cmd/server
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/server .
COPY --from=builder /app/proto ./proto
EXPOSE 50051
CMD ["./server"]
deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: grpc-service
spec:
replicas: 3
selector:
matchLabels:
app: grpc-service
template:
metadata:
labels:
app: grpc-service
spec:
containers:
- name: grpc-service
image: grpc-service:latest
ports:
- containerPort: 50051
name: grpc
protocol: TCP
env:
- name: PORT
value: "50051"
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:50051"]
initialDelaySeconds: 10
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:50051"]
initialDelaySeconds: 5
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
name: grpc-service
spec:
selector:
app: grpc-service
ports:
- port: 50051
targetPort: 50051
protocol: TCP
name: grpc
type: ClusterIP
VirtualService for traffic routing:
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: grpc-service
spec:
hosts:
- grpc-service
http:
- match:
- headers:
version:
exact: v2
route:
- destination:
host: grpc-service
subset: v2
- route:
- destination:
host: grpc-service
subset: v1
Skill Version : 1.0.0 Last Updated : October 2025 Skill Category : Microservices, gRPC, Distributed Systems, API Design Compatible With : Go, Python, Node.js, Java, C++, C#, Ruby, and more
Weekly Installs
67
Repository
GitHub Stars
44
First Seen
Jan 22, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykWarn
Installed on
gemini-cli54
opencode53
codex52
cursor48
github-copilot48
claude-code48
Lark CLI IM 即时消息管理工具:机器人/用户身份操作聊天、消息、文件下载
41,800 周安装
Ghost Security Secrets Scanner - 机密信息扫描编排器,自动化检测代码中的敏感数据
1,100 周安装
Salesforce CRM 集成指南:使用 Membrane CLI 连接与自动化操作
1,100 周安装
MS OneDrive API 集成指南:使用 Membrane CLI 实现文件上传、共享与管理
1,100 周安装
Google Drive API 集成指南:使用 Membrane CLI 实现云端文件管理与自动化
1,100 周安装
Zoho CRM自动化指南:通过Rube MCP实现记录搜索、创建、更新与转换
62 周安装
Dropbox API 集成指南:使用 Membrane CLI 实现云存储自动化与文件管理
1,100 周安装
Aborted: Operation abortedOutOfRange: Out of valid rangeUnimplemented: Operation not implementedInternal: Internal server errorUnavailable: Service unavailableDataLoss: Unrecoverable data lossUnauthenticated: Request lacks valid authentication