异步传输官方示例只给了普通Unary元对象的传输,没有流式传输示例,经过摸索调试,实现了grpc的异步流式传输(目前只是单向流,服务端推流至客户端,或者客户端上送流至服务端)。
proto 文件与上一篇同步传图的一样,由它自动生成的 protobuf/gRPC 代码文件自然也是一样的。
2.服务端程序
#include <algorithm>
#include <chrono>
#include <cmath>
#include <iostream>
#include <fstream>
#include <memory>
#include <string>
#include <map>
#include <exception>
#include <grpcpp/>
#include ""
#include ""
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerCompletionQueue;
using grpc::ServerAsyncResponseWriter;
using ImgTransmit::ImgInfo;
using ImgTransmit::ImgInfo_Img;
using ImgTransmit::ImgInfo_ImgType;
typedef ImgTransmit::Status My_Status;
using ImgTransmit::ImgDemo;
class ServerImpl final {
public:
enum CallStatus { CREATE, PROCESS, FINISH };
// Shuts down the server first, then the completion queue, and finally drains
// the queue. Both pointers are checked because they are only assigned inside
// Run(); destroying an instance whose Run() was never called must not crash.
~ServerImpl() {
  if (server_) {
    server_->Shutdown();
  }
  // Always shutdown the completion queue after the server.
  if (cq_) {
    cq_->Shutdown();
    // Pull the remaining events until Next() reports that the queue is fully
    // shut down. The handlers behind the drained tags are not released here.
    void* ignored_tag = nullptr;
    bool ignored_ok = false;
    while (cq_->Next(&ignored_tag, &ignored_ok)) {
    }
  }
}
// There is no shutdown handling in this code.
void Run() {
std::string server_address("0.0.0.0:50051");
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
(server_address, grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
(&service_);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
cq_ = ();
// Finally assemble the server.
server_ = ();
std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
HandleRpcs();
}
private:
// Common base of all per-RPC handlers. It stores the state shared by every
// handler (service, completion queue, server context, state-machine status);
// each derived class implements Proceed() as its own state machine.
class CallData {
 public:
  // The constructor only stores its arguments. It deliberately does not call
  // Proceed(): during base-class construction a virtual call would resolve to
  // CallData::Proceed(), which does nothing. Derived constructors call
  // Proceed() themselves.
  CallData(ImgDemo::AsyncService* service, ServerCompletionQueue* cq)
      : status_(CREATE), service_(service), cq_(cq) {}

  // Virtual so that a handler may also be destroyed through a CallData*.
  virtual ~CallData() = default;

  // Advances the handler's state machine; the base implementation is a no-op.
  virtual void Proceed() {}

  CallStatus status_;  // The current serving state.

 protected:
  // The means of communication with the gRPC runtime for an asynchronous
  // server.
  ImgDemo::AsyncService* service_;
  // The producer-consumer queue where for asynchronous server notifications.
  ServerCompletionQueue* cq_;
  // Context for the rpc, allowing to tweak aspects of it such as the use
  // of compression, authentication, as well as to send metadata back to the
  // client.
  ServerContext ctx_;
};
class HandleImgUpload :public CallData {
public:
HandleImgUpload(ImgDemo::AsyncService* service, ServerCompletionQueue* cq,int len=5)
: CallData(service,cq), imgload_reader(&ctx_), times_(0), max_stream_length(len){
Proceed();
}
void Proceed() override {
ImgUpload_proceed();
}
private:
void ImgUpload_proceed() {
if (status_ == CREATE) {
status_ = PROCESS;
service_->RequestImgUpload(&ctx_, &imgload_reader, cq_, cq_, this);
}
else if (status_ == PROCESS) {
if (times_ == 0)
new HandleImgUpload(service_, cq_, max_stream_length);
if(++times_> max_stream_length){//次数超过max_stream_length,终止读取流(代表达到一次请求的最大上传量)
status_ = FINISH;
imgload_response.set_code(1);
imgload_reader.Finish(imgload_response, Status::OK, this);
auto maps = rev_info.maps();
auto it = ();
auto img = it->second;
std::cout<<"get image form client: "<< rev_info.name() << ", img size :" << () << "," << () << "," << () << ", data length:" << ().size() << std::endl;
std::cout << "HandleImgUpload end... " << std::endl;
}
else{
if(times_>1){
auto maps = rev_info.maps();
auto it = ();
auto img = it->second;
std::cout <<"get image form client: "<< rev_info.name()<<", img size :" << () << "," << () << "," << () << ", data length:" << ().size() << std::endl;
/****************do something else*****************************/
//这里rev_info不能用临时变量,小坑。
}
imgload_reader.Read(&rev_info, this);// 继续读取下一帧
}
}
else {
GPR_ASSERT(status_ == FINISH);
delete this;
}
}
private:
int times_ = 0;
int max_stream_length;
ImgTransmit::ImgInfo rev_info;
::grpc::ServerAsyncReader<My_Status, ::ImgTransmit::ImgInfo> imgload_reader;
My_Status imgload_response;
};
class HandleResImgFetched :public CallData {
public:
HandleResImgFetched(ImgDemo::AsyncService* service, ServerCompletionQueue* cq,int len=10)
: CallData(service,cq), resImgfetch_writer(&ctx_), times_(0),max_stream_length(len) {
Proceed();
}
void Proceed() override{
resImgFetched_proceed();
}
private:
void resImgFetched_proceed() {
if (status_ == CREATE) {
status_ = PROCESS;
service_->RequestresImgFetched(&ctx_, &resImgfetch_name, &resImgfetch_writer, cq_, cq_, this);
}
else if (status_ == PROCESS) {
if (times_ == 0)
new HandleResImgFetched(service_, cq_, max_stream_length);
if(times_++>= max_stream_length){
status_ = FINISH;
resImgfetch_writer.Finish(Status::OK, this);//次数超过max_stream_length,终止写入流(表示客户端本次请求的资源已经全部发出)
}
else {
//if(times_ != 1 && times_%10 == 1)
//{
//如果此轮不调用Write,则以this为tag向完成队列尾部写一个延时唤醒任务,及时让出队头位置,使其他客户端请求流得到处理
//(&cq, std::chrono::system_clock::now()+std::chrono::milliseconds(500), (void*)this);//500 ms后唤醒(唤醒不一定马上能得到处理,需要等队列前面的任务让出执行权限)
//times_--;
//return;
//}
ImgTransmit::ImgInfo info;
std::cout << "Handle resImgFetched Call, query image name is: " << resImgfetch_name.name()[0] << std::endl;
ImgInfo_Img img_detail;
ImgInfo_ImgType type = ImgInfo_ImgType::ImgInfo_ImgType_JPG;
img_detail.set_height(224);
img_detail.set_width(224);
img_detail.set_channel(3);
img_detail.set_type(type);
img_detail.set_data("binary img data here",20);
info.set_name("query suceed");
google::protobuf::Map<google::protobuf::int32, ImgInfo_Img>* maps = info.mutable_maps();
google::protobuf::MapPair<google::protobuf::int32, ImgInfo_Img> item(0, img_detail);
maps->insert(item);
/****************do something else*****************************/
resImgfetch_writer.Write(info, this);
}
}
else {
GPR_ASSERT(status_ == FINISH);
delete this;
}
}
private:
ImgTransmit::BaseName resImgfetch_name;
::grpc::ServerAsyncWriter< ::ImgTransmit::ImgInfo> resImgfetch_writer;
grpc::Alarm alarm;
int times_;
int max_stream_length;
};
class HandleDescFetched :public CallData {
public:
HandleDescFetched(ImgDemo::AsyncService* service, ServerCompletionQueue* cq) :CallData(service, cq), descFetch_responder(&ctx_){
Proceed();
}
void Proceed() override {
descFetched_proceed();
}
private:
void descFetched_proceed() {
if (status_ == CREATE) {
status_ = PROCESS;
service_->RequestresDescFetched(&ctx_,&resDescfetch_name,&descFetch_responder,cq_,cq_,this);
}
else if (status_ == PROCESS) {
// 创建一个新的对象,用以处理下一个客户端(连接)的调用事件
new HandleDescFetched(service_, cq_);
status_ = FINISH;
/*
do something with resDescfetch_name
*/
std::cout << "Handle DescFetched Call, query image name is: " << resDescfetch_name.name()[0] << std::endl;
::ImgTransmit::Description desc;
desc.add_desc("request accepted.");
descFetch_responder.Finish(desc, Status::OK, this);
}
else {
GPR_ASSERT(status_ == FINISH);
delete this;
}
}
private:
ImgTransmit::BaseName resDescfetch_name;
ServerAsyncResponseWriter<ImgTransmit::Description> descFetch_responder;
};
// Event loop of the server: creates one initial handler per RPC type and then
// dispatches completion-queue events to the handler stored in each tag.
// Returns once the completion queue has been shut down and drained.
// This can be run in multiple threads if needed.
void HandleRpcs() {
  // Each handler registers itself with the service in its constructor and
  // deletes itself when its call is finished.
  new HandleResImgFetched(&service_, cq_.get(), 20);
  new HandleImgUpload(&service_, cq_.get(), 3);
  new HandleDescFetched(&service_, cq_.get());
  void* tag = nullptr;  // uniquely identifies a request.
  bool ok = false;
  // Block waiting to read the next event from the completion queue. Next()
  // returns false only after the queue is shut down and empty, which ends the
  // loop instead of aborting the process.
  while (cq_->Next(&tag, &ok)) {
    CallData* handle = static_cast<CallData*>(tag);
    if (handle == nullptr) {
      continue;
    }
    if (!ok) {
      // The operation behind this tag did not succeed; move the handler to
      // FINISH so that the Proceed() call below releases it.
      std::cout << "Got a canceled events, Maybe connection is closed unusually.\n";
      handle->status_ = FINISH;
    }
    handle->Proceed();
  }
}
std::unique_ptr<ServerCompletionQueue> cq_;
ImgDemo::AsyncService service_;
std::unique_ptr<Server> server_;
};
int main(int argc, char** argv) {
ServerImpl server;
();
return 0;
}
3.客户端程序
#include <iostream>
#include <chrono>
#include <fstream>
#include <memory>
#include <string>
#include <grpc/support/>
#include <grpcpp/>
#include <grpcpp/>
#ifdef BAZEL_BUILD
#include "examples/protos/"
#else
#include ""
#endif
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using grpc::StatusCode;
using ImgTransmit::ImgDemo;
using ImgTransmit::ImgInfo;
using ImgTransmit::ImgInfo_Img;
using ImgTransmit::ImgInfo_ImgType;
typedef ImgTransmit::Status My_Status;
class ImageClientAsyncImpl {
public:
// Creates the ImgDemo stub on the given channel.
explicit ImageClientAsyncImpl(std::shared_ptr<Channel> channel)
: stub_(ImgDemo::NewStub(channel))/* the same stub_ can open several request streams at the same time */ {}
void imgUpload(const std::vector<std::string>& file_list) {
if (file_list.size()<1)
return;
AsyncImgUploadCall* call = new AsyncImgUploadCall(file_list.size(),cq_);
call->img_upload_list = file_list;
//创建一个写入流
call->writer = stub_->PrepareAsyncImgUpload(&call->context, &call->reply, &cq_);
call->writer->StartCall((void*)call);//开启写入流,一旦可写,cq_事件管理器就会检测到
}
void resImgFetched(const std::string& imgname,int count=3) {
ImgTransmit::BaseName request;
request.add_name(imgname);
AsyncResImageFetchedCall* call = new AsyncResImageFetchedCall(count);
//创建一个读取流
call->response_reader = stub_->PrepareAsyncresImgFetched(&call->context, request, &cq_);
call->response_reader->StartCall((void*)call);//开启读取流,一旦可读,cq_事件管理器就会检测到
}
void resDescFetched(const std::string& imgname) {
// Data we are sending to the server.
ImgTransmit::BaseName request;
request.add_name(imgname);
AsyncResDescFetchedCall* call = new AsyncResDescFetchedCall;
//创建一个读取流
call->response_reader =stub_->PrepareAsyncresDescFetched(&call->context, request, &cq_);
//这个接口是单个元对象传输,与流式传输有所不同。
call->response_reader->StartCall();
call->response_reader->Finish(&call->reply,&call->status, (void*)call);
}
// Loop while listening for completed responses.
void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
while (cq_.Next(&got_tag, &ok)) {
//GPR_ASSERT(ok);//Failed Maybe, Please Check Status of Network Status and Server Process
AsyncRpcCall* tmpcall = static_cast<AsyncRpcCall*>(got_tag);
if (tmpcall !=nullptr){
if (!ok) {
tmpcall->status = Status::OK;//事件异常断开,强行终止。
}
tmpcall->Proceed();
}
}
}
private:
// Abstract base of every in-flight client call. A pointer to it travels
// through the completion queue as the tag of the call.
class AsyncRpcCall {
 public:
  AsyncRpcCall() = default;

  // Returns a type-erased pointer to the call's result message.
  virtual void* GetResult() = 0;

  // Handles the next completion-queue event of this call.
  virtual void Proceed() = 0;

  virtual ~AsyncRpcCall() = default;

 public:
  // Per-call context of the RPC.
  ClientContext context;
  // Starts out as a non-OK value; the stream calls treat an OK status as
  // "this call is finished".
  Status status{StatusCode::PERMISSION_DENIED, "finished code."};
};
//普通异步接口(Unary元数据传输)
class AsyncResDescFetchedCall:public AsyncRpcCall {
public:
AsyncResDescFetchedCall(){}
void* GetResult() override {
return (void*)&reply;
}
void Proceed() override {
std::cout << "resDescFetched call, reply info: " << ()[0] << std::endl;
/**********do something else.*************/
//one-shot call for non-stream API
delete this;
return;
}
public:
ImgTransmit::Description reply;
std::unique_ptr<ClientAsyncResponseReader<ImgTransmit::Description> > response_reader;
};
// 异步流式接口(单向,客户端接收服务端的流对象)
class AsyncResImageFetchedCall:public AsyncRpcCall {
public:
AsyncResImageFetchedCall(int len):times_(0), max_stream_length(len){
}
void* GetResult() override {
return (void*)(&info);
}
void Proceed() override {
if (()) {
std::cout << "rpc stream fetched ended..."<<std::endl;
delete this;
return;
}
times_++;
if (times_ > max_stream_length) {
status = Status::OK;
response_reader->Finish(&status, (void*)this);//向完成队列中加入终止读流事件
std::cout << "resImageFetched call, reply name: " << () << std::endl;
std::cout << "ResImageFetchedCall end... " << std::endl;
}
else {
if (times_ > 1) {
std::cout << "resImageFetched call, reply name: " << () << std::endl;
}
response_reader->Read(&info, (void*)this);//继续读取下一帧
}
}
public:
ImgTransmit::ImgInfo info;
std::unique_ptr< ::grpc::ClientAsyncReader< ImgTransmit::ImgInfo>> response_reader;
int max_stream_length;
int times_;
};
// 异步流式接口(单向,客户端推送流对象至服务端)
class AsyncImgUploadCall :public AsyncRpcCall {
public:
AsyncImgUploadCall(int len,CompletionQueue& q_): times_(0), max_stream_length(len),q(q_){}
void* GetResult() override {
return (void*)&reply;
}
void Proceed() override {
if (()) {
std::cout << "rpc stream upload ended, with final reply code: " << () << std::endl;
delete this;
}
else
{
std::cout << "rpc stream upload call. times is: "<<times_ << std::endl;
if (++times_> max_stream_length) {
status = Status::OK;
writer->Finish(&status, (void*)this);//向完成队列中加入终止写流事件
}
else {
if (FillingupImginfo()){
writer->Write(info, (void*)this);//继续写入下一帧
}
else
{
std::cout << "fill information failed." << std::endl;
status = Status::OK;
writer->Finish(&(status), (void*)this);//直接写入终止读流事件
}
}
}
}
private:
bool FillingupImginfo() {
int id = times_ - 1;
const std::string& _path = img_upload_list[id];
std::ifstream imgreader(_path, std::ifstream::in | std::ios::binary);
long file_length = 0;
char* buffer = nullptr;
if(imgreader.is_open()){
(0, ); //将文件流指针定位到文件末尾
file_length = ();
(0, ); //将文件流指针重新定位到文件开头
buffer = (char*)malloc(sizeof(char) * file_length);
(buffer, file_length);
();
}
ImgInfo_Img img_detail;
if(buffer!=nullptr){
ImgInfo_ImgType type = ImgInfo_ImgType::ImgInfo_ImgType_JPG;
img_detail.set_height(224);// you can set real parameters here.
img_detail.set_width(224);
img_detail.set_channel(3);
img_detail.set_type(type);
img_detail.set_data(buffer, file_length);
free(buffer);
}
();
info.set_name(_path);
google::protobuf::Map<google::protobuf::int32, ImgInfo_Img>* maps = info.mutable_maps();
google::protobuf::MapPair<google::protobuf::int32, ImgInfo_Img> item(id, img_detail);
maps->insert(item);
return true;
}
public:
My_Status reply;
ImgTransmit::ImgInfo info;
std::unique_ptr< grpc::ClientAsyncWriter< ::ImgTransmit::ImgInfo>> writer;
std::vector<std::string> img_upload_list;
private:
int times_;
int max_stream_length;
CompletionQueue& q;
};
std::unique_ptr<ImgDemo::Stub> stub_;
CompletionQueue cq_;
};
// Entry point of the client program: starts the event-loop thread, issues one
// request of each kind and then waits on the event loop.
// Needs <thread> and <vector> in addition to the headers of the client.
int main(int argc, char** argv) {
  (void)argc;  // command-line arguments are not used
  (void)argv;
  ImageClientAsyncImpl imgclient(grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials()));
  // Spawn reader thread that loops indefinitely
  std::thread thread_ = std::thread(&ImageClientAsyncImpl::AsyncCompleteRpc, &imgclient);
  std::string queryname("imgs/");
  // NOTE(review): the relative order of these two requests is an assumption;
  // both are asynchronous and independent of each other.
  imgclient.resDescFetched(queryname);
  imgclient.resImgFetched(queryname);
  // Files to upload; imgUpload() does nothing while the list is empty.
  std::vector<std::string> lis /*= {"imgs/","imgs/"}*/;
  imgclient.imgUpload(lis);
  std::cout << "Press control-c to quit" << std::endl << std::endl;
  thread_.join();  // blocks forever
  return 0;
}
4.做一下笔记,后面有空再慢慢说吧。
流式传输与元对象传输的区别,在于流式传输需要传输多个元对象才算完成一次完整的数据传输。这多个元对象既可以同步传输,也可以异步传输。同步传输就是用一个循环,按顺序把这些元对象逐个发送出去或接收进来,期间每个元对象的发送或接收都是阻塞的。而异步传输这些元对象时,利用了一个称为 CompletionQueue 的组件,把一个元对象的发送或接收视作一次网络 IO 的写或读 task,task 需要添加到 CompletionQueue 中才能被处理。CompletionQueue 不允许 task 的 tag 重复,即不能把多个具有相同 tag 的 task 同时添加到队列中,只有当前一个 task 执行完了,才允许重用这个 tag 添加新的 task。也就是说,对于同一个 tag 而言(同一个 tag 也意味着同一次客户端请求),数据流中的元对象是一个接一个串行地传输的。
在实际使用 CompletionQueue 做异步流传输时,服务端有个比较奇怪的现象:先入队的客户端请求如果不断有 task 被添加,就会一直优先得到处理;期间其他客户端请求虽然也已经向 CompletionQueue 中添加了 task,但这些 task 总得不到执行,只能等到前面的客户端请求处理结束后才轮到它们。(个人猜测,可能是重用 tag 所致:比如最先到达的客户端请求向 CompletionQueue 中添加了一个带 tag 的 task,当该 task 完成后立即又重用该 tag 向 CompletionQueue 添加新 task,这可能只是更新了队列中这个 tag 对应的任务内容,而不是把任务追加到队列尾部,所以队头的任务会一直被优先处理。)为了解决这个问题,需要借助 grpc::Alarm 向 CompletionQueue 尾部添加超时任务,及时让出队头位置,具体请参考 G-Research 的文章《Lessons learnt from writing asynchronous streaming gRPC services in C++》。