-
Notifications
You must be signed in to change notification settings - Fork 0
高级使用
本文档包括:
- 超时控制
- 压缩控制
- 带宽限制
- 日志打印
- RpcController使用
- RPC失败处理
- Server/Client配置参数
- 多Server负载均衡与容错
- 辅助开发工具类
- Mock测试
- sofa-pbrpc-client工具
- 统计与监控
- HTTP支持
- Web监控
sofa-pbrpc支持三个级别的超时控制,按照生效的优先级由高到低依次为:
- Request超时:在RpcController中使用SetTimeout()函数设置超时时间(毫秒),该超时只对本次请求有效;如果不设置,则使用“Method超时”或者“Service超时”;
- Method超时:在proto文件中指定method的超时时间(毫秒),该超时对该方法的所有请求有效;如果不设置,则使用Service超时;
- Service超时:在proto文件中指定service的超时时间(毫秒),该超时对该服务的所有方法的所有请求都有效;如果不设置,则使用默认值(10000毫秒)。
在proto文件中指定“method超时”和“Service超时”的样例如下:
import "sofa/pbrpc/rpc_option.proto";
package sofa.pbrpc.test;
option cc_generic_services = true;
message SleepRequest {
required int32 sleep_time = 1; // in seconds
}
message SleepResponse {
required string message = 1;
}
service SleepServer {
// The service timeout is 2 seconds.
option (sofa.pbrpc.service_timeout) = 2000;
rpc SleepWithServiceTimeout(SleepRequest) returns(SleepResponse);
// The method timeout is 4 seconds.
rpc SleepWithMethodTimeout(SleepRequest) returns(SleepResponse) {
option (sofa.pbrpc.method_timeout) = 4000;
}
}
注意:
- 如果在proto中指定超时,需要import "sofa/pbrpc/rpc_option.proto",并且编译proto文件时还需要添加protobuf头文件路径到proto_path中,具体可参考样例“sample/timeout_sample”;
RpcController中设置超时时间的接口如下:
// Set expect timeout in milli-seconds of the call. If timeout is not set
// or set no more than 0, actual timeout will be taken from proto options.
void SetTimeout(int64 timeout_in_ms);
sofa-pbrpc支持两个级别的数据压缩控制,按照生效优先级由高到低依次为:
- Request压缩控制:在RpcController中使用SetRequestCompressType()和SetResponseCompressType()函数设置请求和回复的数据压缩类型,该设置只对本次请求有效;如果不设置,则使用“Method压缩控制”;
- Method压缩控制:在proto文件中使用扩展option来指定method的Request或者Response的压缩类型,该超时对该方法的所有请求有效;如果不设置,则使用默认值(不压缩);
目前支持的压缩类型包括:
- None
- Gzip
- Zlib
- Snappy
- LZ4
在proto文件中指定压缩类型的样例如下:
import "sofa/pbrpc/rpc_option.proto";
package sofa.pbrpc.test;
option cc_generic_services = true;
message EchoRequest {
required string message = 1;
}
message EchoResponse {
required string message = 1;
}
service EchoServer {
rpc Echo(EchoRequest) returns(EchoResponse) {
option (sofa.pbrpc.request_compress_type) = CompressTypeGzip;
option (sofa.pbrpc.response_compress_type) = CompressTypeGzip;
}
}
RpcController中设置压缩类型的接口如下:
// Set compress type of the request message.
// Supported types:
// CompressTypeNone
// CompressTypeGzip
// CompressTypeZlib
// CompressTypeSnappy
// CompressTypeLZ4
void SetRequestCompressType(CompressType compress_type);
// Set expected compress type of the response message.
// Supported types:
// CompressTypeNone
// CompressTypeGzip
// CompressTypeZlib
// CompressTypeSnappy
// CompressTypeLZ4
void SetResponseCompressType(CompressType compress_type);
sofa-pbrpc支持在RpcServer和RpcClient级别在Options中指定参数就可以控制网络带宽限制,包括“出带宽”、“入带宽”,使流量控制对应用层透明,简化应用层的工作。
在RpcServerOptions和RpcClientOptions中可以分别指定Server端和Client端的最大入带宽“max_throughput_in”和最大出带宽“max_throughput_out”,参数用法与语义如下(默认为-1,表示不控制):
// Network throughput limit.
// The network bandwidth is shared by all connections:
// * busy connections get more bandwidth.
// * the total bandwidth of all connections will not exceed the limit.
int max_throughput_in; // max network in throughput for all connections.
// in MB/s, should >= -1, -1 means no limit, default -1.
int max_throughput_out; // max network out throughput for all connections.
// in MB/s, should >= -1, -1 means no limit, default -1.
对于client端,如果限定了带宽流量,同时请求发送速度过快,来不及发出去的请求就会积压在Request Pending Buffer中。用户可以在Options指定该buffer的大小,如果buffer满了,请求就会立即返回RPC_ERROR_SEND_BUFFER_FULL的错误,用户可以根据需要进行处理,譬如减缓发送速度。在 RpcServerOptions 和 RpcClientOptions 中都可以指定“max_pending_buffer_size”:
int max_pending_buffer_size; // max size of pending buffer for one connection.
// in MB, should >= 0, 0 means no buffer, default 2.
另外需注意:流量控制依据的是用户业务数据的数据量,不包含socket底层的包头协议数据的数据量,所以一般实际出入的数据量会比指定的略大。
以下是某实际系统中的网络流量监控图(带宽限制设置为15MB/s):
sofa-pbrpc提供日志打印函数SLOG(),将日志打印到标准错误输出,使用方式类似于printf()。
日志级别由高到低为:
- FATAL
- ERROR
- WARNING
- INFO
- TRACE
- DEBUG
通过打印级别控制日志输出,只会输出高于或者等于打印级别的日志,默认级别为ERROR。
用户可通过SOFA_PBRPC_SET_LOG_LEVEL()设置打印级别,譬如设置为INFO:
SOFA_PBRPC_SET_LOG_LEVEL(INFO);
日志使用样例:
SLOG(ERROR, "error message: %s", message);
RpcController用于控制单次请求的调用过程。
在Client端调用方法时,都需要传入一个RpcController,其作用是:
- 在发送请求前:控制请求的超时时间,设置Request或者Response的压缩类型;
- 在请求完成后:获取本地和远程的网络地址,获取(成功/失败)状态,如果失败可获取错误信息,检查请求是否已发送至远端;
在Server端服务实现中,外层也会传入一个RpcController,其作用是:
- 获取本地和远程的网络地址;
- 如果服务失败,设置失败标志和错误信息;
使用RpcController时需注意如下几点:
- RpcController用于控制单次请求的调用过程,每次RPC请求都需要创建一个RpcController,在本次请求完成前,其不得释放或者重用;
- 请求完成后,该RpcController可以使用Reset()方法恢复初始状态以进行重用;
- RpcController的不同接口有不同的使用时间范围,只有在规定的时间范围内使用,接口才能返回正确结果,否则行为是不确定的;
RPC请求失败时,RpcController的Failed()接口会返回true,同时错误信息可以从ErrorCode()和ErrorText()中获得。
错误的原因主要有两大类:
- RPC类的错误:包括缓冲区满、请求没有发出去(网络错误)、请求发出去了但没有在规定时间内得到回复(超时);其中网络错误的原因又有多种,譬如连接不上、网络异常、请求的服务不存在等。
- 服务执行本身的错误(非RPC错误):Server端在服务执行过程中遇到了错误,使用SetFailed(reason)接口设置错误标识和失败原因;Client端的RpcController行为如下:
- Failed()会返回true;
- ErrorCode()返回的错误码是RPC_ERROR_FROM_USER(错误码的更多定义可参见文件"sofa/pbrpc/rpc_error_code.h");
- ErrorText()返回的错误描述就是Server端通过SetFailed()传入的reason;
参数名 | 参数说明 | 默认值 |
---|---|---|
work_thread_num | 工作线程数,建议等于机器核数 | 8 |
keep_alive_time | 空闲连接维持时间(s),-1表示总是保持连接 | -1 |
max_pending_buffer_size | 发送队列buffer大小(MB) | 2 |
max_throughput_in | 最大入带宽限制(MB/s),-1表示无限制 | -1 |
max_throughput_out | 最大出带宽限制(MB/s),-1表示无限制 | -1 |
disable_builtin_services | 禁止导出内建服务BuiltinService | false |
disable_list_service | 禁止导出ListService,避免公开服务的protobuf协议 | false |
参数名 | 参数说明 | 默认值 |
---|---|---|
work_thread_num | 工作线程数 | 4 |
callback_thread_num | 回调线程数 | 4 |
max_pending_buffer_size | 发送队列buffer大小(MB) | 2 |
max_throughput_in | 最大入带宽限制(MB/s),-1表示无限制 | -1 |
max_throughput_out | 最大出带宽限制(MB/s),-1表示无限制 | -1 |
在创建RpcChannel时,可以指定多个功能对等的Server地址。在调用服务时,会根据负载均衡策略选择合适的Server发送请求,并自动进行容错和探活处理。
RpcChannel支持点对多点的构造函数:
// Create multiple server points by server address list.
// The "rpc_client" is owned by the caller.
RpcChannel(RpcClient* rpc_client,
const std::vector<std::string>& address_list,
const RpcChannelOptions& options = RpcChannelOptions());
// Create multiple server points by address provider.
// The "rpc_client" is owned by the caller.
// The "address_provider" is owned by the caller.
RpcChannel(RpcClient* rpc_client,
AddressProvider* address_provider,
const RpcChannelOptions& options = RpcChannelOptions());
在上面第二个函数中,用户可以提供一个AddressProvider,支持动态增加或者删除server地址。
负载均衡策略:
- 对于每个单点channel,记录其“未完成的调用数(Not Done Calling Count)”,数量越少,表示负载越轻;
- 下次选择server的时候,优先选择“未完成的调用数”最少的机器;
- 在“未完成的调用数”相同的情况下,优先选择“最近最长时间未使用的机器”。
容错与探活策略:
- 每个server内置一个BuiltinService,提供Health()方法用于探活;
- 在Client端维护两个队列:“活动队列”和“待探活队列”;
- RPC调用选择server的时候,优先从“活动队列”中选择;
- 一旦某个server的RPC调用出错,则将其加入“待探活队列”;
- Client周期性地(每隔5秒)向“待探活队列”中的机器发送探活请求,如果探活成功,则重新放回到“活动队列”。
具体策略实现可以参考src/sofa/pbrpc/dynamic_rpc_channel_impl.cc。
使用样例代码可以参考test/perf_test/client_multi_server.cc。
AddressProvider的高级使用可以参考sample/multi_server_sample。
sofa-pbrpc提供了一些方便用户开发的工具类,包括:
类别 | 头文件 | 说明 |
---|---|---|
智能指针 | sofa/pbrpc/smart_ptr/smart_ptr.hpp | boost/smart_ptr的精简版,包括scoped_ptr, shared_ptr, weak_ptr, enable_shared_from_this |
原子操作 | sofa/pbrpc/atomic.h | 包括atomic_inc, atomic_dec, atomic_add, atomic_sub, atomic_comp_swap等 |
原子计数器 | sofa/pbrpc/counter.h | 包括AtomicCounter, AtomicCounter64 |
锁操作 | sofa/pbrpc/locks.h | 包括ScopedLocker, SpinLock, MutexLock, RWLock, ConditionVariable |
线程组 | sofa/pbrpc/thread_group.h | 线程组ThreadGroup,可以使用dispatch和post两种方式分派执行体 |
超时管理器 | sofa/pbrpc/timeout_manager.h | 超时管理器TimeoutManager,可以注册One-Time或者Repeating的超时事件 |
Closure | sofa/pbrpc/closure.h | goole::protobuf::Closure的扩展,以及NewClosure()和NewPermanentClosure()方法;只支持pre-bind;支持shared_ptr作为类方法绑定的this指针 |
ExtClosure | sofa/pbrpc/closure.h | Closure的加强版,以及NewExtClosure()和NewPermanentExtClosure()方法;同时支持pre-bind和post-bind;支持shared_ptr作为类方法绑定的this指针 |
具体使用方法请直接参考头文件。
sofa-pbrpc提供了Mock测试支持,便于用户编写测试程序。
Mock功能的接口主要参考"sofa/pbrpc/mock_test_helper.h"文件,使用样例可参考"sample/mock_sample/"。
主要提供了五个宏:
// Enable or disable the mock feature. Default disabled.
// The mock channel and mock methods will take effect iff mock enabled.
#define SOFA_PBRPC_ENABLE_MOCK() ::sofa::pbrpc::enable_mock()
#define SOFA_PBRPC_DISABLE_MOCK() ::sofa::pbrpc::disable_mock()
// If you create a channel with address of SOFA_PBRPC_MOCK_CHANNEL_ADDRESS, then the channel is a mock
// channel. The mock channel will not create real socket connection, but just uses mock methods.
#define SOFA_PBRPC_MOCK_CHANNEL_ADDRESS "/mock/"
// All mock method implements should use this function signature.
typedef ExtClosure<void(
::google::protobuf::RpcController*,
const ::google::protobuf::Message*,
::google::protobuf::Message*,
::google::protobuf::Closure*)> MockMethodHookFunction;
// Register a mock method implement. If mock enabled, all channels will prefer to call mock
// method first. If the corresponding mock method is not registered, then call the real method.
//
// "method_name" is the full name of the method to be mocked, should be a c-style string.
// "mock_method" is the mock method hook function, should be type of "MockMethodHookFunction*".
//
// For example:
// MockMethodHookFunction* mock_method = sofa::pbrpc::NewPermanentExtClosure(&MockEcho);
// SOFA_PBRPC_REGISTER_MOCK_METHOD("sofa.pbrpc.test.EchoServer.Echo", mock_method);
#define SOFA_PBRPC_REGISTER_MOCK_METHOD(method_name, mock_method) \
::sofa::pbrpc::MockTestHelper::GlobalInstance()->RegisterMockMethod(method_name, (mock_method))
// Clear all registered mock methods. This will not delete the cleared hook functions, which
// are take ownership by user.
#define SOFA_PBRPC_CLEAR_MOCK_METHOD() \
::sofa::pbrpc::MockTestHelper::GlobalInstance()->ClearMockMethod()
sofa-pbrpc提供了一个客户端工具sofa-pbrpc-client(位于"bin/"文件夹下),用于查询server状态、获取服务列表及proto描述、获取服务调用统计、构建文本快速发送rpc请求等。
具体功能:
- 查询server的健康状况(health)、配置参数(option)、负载情况(status);
- 查询server对外提供的服务列表(list);
- 获取服务的protobuf描述信息(desc);
- 获取服务调用统计(stat),包括处理请求数、平均处理时间、最大处理时间等;
- 使用text格式的请求数据,以通用的方式向server的某个method发送rpc请求调用(call);
使用帮助:
Usage: sofa-pbrpc-client <server-address> <sub-command> [args]
Available subcommands:
help
Print this usage help.
health
Check if the server is healthy.
status
Get status of the server.
option
Get RpcServerOptions of the server.
list
List all services provided by the server.
desc <protobuf-type-name>
Get descriptor of a protobuf type (service/message/enum).
call <method-full-name> <request-message-text> [timeout-in-ms]
Call a method using the text format of request message.
The "timeout-in-ms" is optional, default is 3000 milli-seconds.
stat [service-full-name] [period-in-seconds]
Get the service statistics in the latest period of seconds.
The "service-full-name" is optional, default is "all".
The "period-in-seconds" is optional, default is 60 seconds.
使用注意:
- health、status、option、stat命令要求server端配置满足:disable_builtin_service = false;
- list、desc、call命令要求server端配置满足:disable_builtin_service = false && disable_list_service = false;
- 使用"call"命令发起rpc调用时,用户需要按照protobuf的text格式构造Request数据,其使用google.protobuf.TextFormat进行序列化和反序列化。返回的Response数据也使用text格式打印出来。实际上,text格式也就是message->DebugString()打印出来的格式。如果字符串中包含回车,在作为命令行参数时建议使用单引号括起来。实现上,sofa-pbrpc-client首先获取Server端服务的描述信息,然后通过反射的方式构建请求数据,发送rpc请求。这个功能对调试很有帮助,另外这个思路也可以用来实现http代理。
使用样例:
向监听地址为127.0.0.1:12321的server发送rpc调用"sofa.pbrpc.test.EchoServer.Echo"
$ ./sofa-pbrpc-client 127.0.0.1:12321 call sofa.pbrpc.test.EchoServer.Echo \
'message:"hello from qinzuoyan01"'
Response:
------------------------------
message: "echo message: hello from qinzuoyan01"
------------------------------
sofa-pbrpc支持对服务的调用进行统计,并以内建服务的方式导出统计信息。内建服务的proto定义参见"sofa/pbrpc/builtin_service.proto"。
包含如下统计项:
- 服务在最近N秒中调用的总成功次数、总失败次数;
- 服务各个方法在最近N秒中调用的总成功次数、总失败次数、成功请求的平均耗时和最大耗时、失败请求的平均耗时和最大耗时;
- server端最多存储最近600秒的统计数据;
使用sofa-pbrpc-client工具,可以很容易地获取统计信息,譬如:
$ ./sofa-pbrpc-client 127.0.0.1:12321 stat sofa.pbrpc.test.EchoServer 10
上述命令用于获取"sofa.pbrpc.test.EchoServer.Echo"在最近10秒的调用统计,结果样例如下:
Response:
------------------------------
service_stats {
service_name: "sofa.pbrpc.test.EchoServer"
period_seconds: 10
succeed_count: 1151699
failed_count: 0
method_stats {
method_name: "sofa.pbrpc.test.EchoServer.Echo"
succeed_count: 1151699
succeed_avg_time_us: 0.98021704
succeed_max_time_us: 1108
failed_count: 0
failed_avg_time_us: 0
failed_max_time_us: 0
}
}
------------------------------
除此之外,内建服务还提供了一些监控server状态的接口。sofa-pbrpc-client工具也提供了这些子命令:
- 使用"health"子命令检查server是否存在;
- 使用"status"子命令查看server的一些状态信息,包括连接数、服务数、以及Pending Buffer的一些信息;
- 使用"option"子命令查看server当前的配置参数;
- 使用"list"子命令查看server注册了哪些服务;
- 使用"desc"子命令查询服务相关类型的proto描述;
sofa-pbrpc从1.0.1开始支持HTTP方式调用服务,同时支持原生POST,GET以及扩展的POST PROTOBUF三种方式提交。另外还提供了Web监控页面,方便开发和测试。
样例可以参见sample/echo/client_http.sh。
curl -d '{"message":"Hello, world!"}' http://localhost:12321/sofa.pbrpc.test.EchoServer.Echo
规则:
- URL:http://host:port/method_full_name
- Body:Request Message的json字符串;
- 返回:Content-Type: application/json;Body: Response Message的json字符串;
curl http://localhost:12321/sofa.pbrpc.test.EchoServer.Echo?request=%7B%22message%22%3A%22Hello%2C%20world%21%22%7D
规则:
- URL:http://host:port/method_full_name?request=request_json_string
- request_json_string需要对特殊字符({}[]:",)进行url encode;
- 返回:同POST;
普通的GET POST请求使用JSON传递数据,在字段增多的情况下性能下降严重。 所以在条件许可的情况下请使用HTTP POST PROTOBUF的接口,样例可以参见:python/sample/client_http_protobuf.py
提供的页面(假设server监听端口为8080):
- 主页:http://localhost:8080/
- 配置参数: http://localhost:8080/options
- 状态:http://localhost:8080/status
- 服务列表:http://localhost:8080/services
主页示例: