本教程提供了一个基本的Node.js程序员介绍如何使用gRPC。
通过阅读这个例子,你将学习如何:
- 在.proto文件中定义一个服务。
- 使用Node.js gRPC API为您的服务编写一个简单的客户端和服务器。
它假设您已经阅读了概述,并熟悉协议缓冲区。请注意,本教程中的示例使用协议缓冲区语言的proto3版本,该协议缓冲区语言目前处于alpha版本:您可以在proto3语言指南中找到更多信息,并参阅协议缓冲区Github存储库中新版本的 发行说明。
为什么使用gRPC?
我们的例子是一个简单的路由映射应用程序,它可以让客户端获取有关路由功能的信息,创建路由摘要,并与服务器和其他客户端交换路由信息(如流量更新)。
使用gRPC,我们可以在.proto文件中定义我们的服务,并以gRPC所支持的任何语言来实现客户端和服务器,而这些语言又可以在从Google内部的服务器到您自己的平板电脑的环境中运行 - 所有不同的通信语言和环境由gRPC为您处理。我们还获得了使用协议缓冲区的所有优点,包括高效的序列化,简单的IDL以及简单的接口更新。
示例代码和设置
本教程的示例代码位于 grpc / grpc / examples / node / dynamic_codegen / route_guide中。正如你将看到的,如果你看看版本库,在grpc / grpc / examples / node / static_codegen / route_guide中也有一个类似的例子 。我们有两个版本的路由指南示例,因为有两种方法可以生成在Node.js中使用协议缓冲区所需的代码 - 一种方法用于Protobuf.js
在运行时动态生成代码,另一种方法使用通过协议缓冲区静态生成的代码编译器protoc
。这些示例的行为完全相同,并且任一服务器都可以与任一客户端一起使用。如目录名称所示,我们将在本文档中使用动态生成代码的版本,但也可以*查看静态代码示例。
要下载该示例,请grpc
通过运行以下命令来克隆存储库:
$ git clone -b v1.8.x https://github.com/grpc/grpc
$ cd grpc
然后将您的当前目录更改为examples/node
:
$ cd examples/node
您还应该安装相关工具来生成服务器和客户端界面代码 - 如果您尚未安装,请按照Node.js快速入门指南中的设置说明进行操作 。
定义服务
我们的第一步(从概述中可以知道)是使用协议缓冲区定义gRPC 服务以及方法请求和响应类型 。你可以看到完整的.proto文件 。examples/protos/route_guide.proto
要定义一个服务,你需要service
在.proto文件中指定一个名字:
service RouteGuide {
...
}
然后,rpc
在服务定义中定义方法,指定它们的请求和响应类型。gRPC允许您定义四种服务方法,所有这些方法都在RouteGuide
服务中使用:
- 一个简单的RPC,其中客户端使用存根发送请求到服务器,并等待响应返回,就像正常的函数调用一样。
// Obtains the feature at a given position. rpc GetFeature(Point) returns (Feature) {}
- 甲服务器端流RPC,其中客户端发送请求到服务器,并获得一个流中读取消息的序列后面。客户端从返回的流中读取,直到没有更多的消息。正如您在我们的示例中所看到的那样,您可以通过将
stream
关键字放置在响应类型之前来指定服务器端的流式方法。
// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}
- 甲客户端流传输的RPC,其中客户端将消息写入的序列,并且将它们发送到服务器,再次使用提供的流。客户端完成写入消息后,等待服务器将其全部读取并返回其响应。您可以通过
stream
在请求类型之前放置关键字来指定客户端流式传输方法。
// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}
- 一个双向流动的RPC双方都派出使用读写流的消息序列。这两个流独立运行,因此客户端和服务器可以按照自己喜欢的顺序读取和写入:例如,服务器可以在写入响应之前等待接收所有客户端消息,或者可以交替地读取消息然后写入消息,或读写的其他组合。每个流中消息的顺序被保留。通过
stream
在请求和响应之前放置关键字来指定这种类型的方法。
// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
我们的.proto文件还包含协议缓冲区消息类型定义,用于我们的服务方法中使用的所有请求和响应类型 - 例如,以下是Point
消息类型:
// Points are represented as latitude-longitude pairs in the E7 representation // (degrees multiplied by 10**7 and rounded to the nearest integer). // Latitudes should be in the range +/- 90 degrees and longitude should be in // the range +/- 180 degrees (inclusive). message Point {
int32 latitude = 1;
int32 longitude = 2;
}
从原型文件加载服务描述符
Node.js库从.proto
运行时加载的文件动态生成服务描述符和客户端存根定义。
要加载一个.proto
文件,只需要require
使用gRPC库,然后使用它的 load()
方法:
var PROTO_PATH = __dirname + '/../../../protos/route_guide.proto';
var grpc = require('grpc');
var protoDescriptor = grpc.load(PROTO_PATH);
// The protoDescriptor object has the full package hierarchy
var routeguide = protoDescriptor.routeguide;
一旦你完成了这个,存根构造函数就在routeguide
namespace(protoDescriptor.routeguide.RouteGuide
)中,而服务描述符(用于创建服务器)是stub(protoDescriptor.routeguide.RouteGuide.service
)的一个属性。
创建服务器
首先让我们看看我们如何创建一个RouteGuide
服务器。如果你只是想创建gRPC客户端,你可以跳过这一节,直接去创建客户端(尽管你可能会发现它很有趣!)。
有两个部分使我们的RouteGuide
服务能够完成工作:
- 实现从我们的服务定义生成的服务接口:做我们的服务的实际“工作”。
- 运行gRPC服务器来侦听来自客户端的请求并返回服务响应。
您可以RouteGuide
在examples / node / dynamic_codegen / route_guide / route_guide_server.js中找到我们的示例服务器 。让我们仔细看看它是如何工作的。
实施RouteGuide
正如你所看到的,我们的服务器有一个Server
从RouteGuide.service
描述符对象生成的构造 函数
var Server = new grpc.Server();
在这种情况下,我们正在实现异步版本RouteGuide
,它提供了我们的默认gRPC服务器行为。
这些功能route_guide_server.js
实现我们所有的服务方法。我们先来看最简单的类型getFeature
,它只是Point
从客户端获取一个,并从其数据库中返回相应的特征信息Feature
。
function checkFeature(point) {
var feature;
// Check if there is already a feature object for the given point
for (var i = 0; i < feature_list.length; i++) {
feature = feature_list[i];
if (feature.location.latitude === point.latitude &&
feature.location.longitude === point.longitude) {
return feature;
}
}
var name = '';
feature = {
name: name,
location: point
};
return feature;
}
function getFeature(call, callback) {
callback(null, checkFeature(call.request));
}
该方法传递了RPC的调用对象,该对象具有Point
作为属性的参数以及可以传递返回的回调Feature
。在方法体中,我们填充一个Feature
对应的给定点,并将其传递给回调函数,用空的第一个参数表示没有错误。
现在让我们看一些更复杂的东西 - 流RPC。 listFeatures
是一个服务器端流RPC,所以我们需要发回多个 Feature
s到我们的客户端。
function listFeatures(call) {
var lo = call.request.lo;
var hi = call.request.hi;
var left = _.min([lo.longitude, hi.longitude]);
var right = _.max([lo.longitude, hi.longitude]);
var top = _.max([lo.latitude, hi.latitude]);
var bottom = _.min([lo.latitude, hi.latitude]);
// For each feature, check if it is in the given bounding box
_.each(feature_list, function(feature) {
if (feature.name === '') {
return;
}
if (feature.location.longitude >= left &&
feature.location.longitude <= right &&
feature.location.latitude >= bottom &&
feature.location.latitude <= top) {
call.write(feature);
}
});
call.end();
}
正如你所看到的,而不是在我们的方法参数中获得调用对象和回调,这次我们得到一个call
实现Writable
接口的对象。在该方法中,我们创建了许多Feature
对象,我们需要返回,将其写入到call
使用它的write()
方法。最后,我们打电话 call.end()
来表明我们已经发送了所有消息。
如果你看看客户端的流媒体方法,RecordRoute
你会发现它与一元调用非常相似,除了这次call
参数实现Reader
接口。该call
的'data'
事件触发,每次有新的数据,以及'end'
事件触发时,所有的数据已被读取。就像一元的情况一样,我们通过调用回调来回应
call.on('data', function(point) {
// Process user data
});
call.on('end', function() {
callback(null, result);
});
最后,我们来看看我们的双向流式RPC RouteChat()
。
function routeChat(call) {
call.on('data', function(note) {
var key = pointKey(note.location);
/* For each note sent, respond with all previous notes that correspond to * the same point */
if (route_notes.hasOwnProperty(key)) {
_.each(route_notes[key], function(note) {
call.write(note);
});
} else {
route_notes[key] = [];
}
// Then add the new note to the list
route_notes[key].push(JSON.parse(JSON.stringify(note)));
});
call.on('end', function() {
call.end();
});
}
这一次,我们得到了call
实现Duplex
,可以用来读取和 写入消息。这里的读写语法与我们的客户端流式传输和服务器流式传输方法完全相同。尽管每一方都会按照写入的顺序获取其他消息,但客户端和服务器都可以按任意顺序读取和写入 - 这些流完全独立运行。
启动服务器
一旦我们实现了所有的方法,我们还需要启动一个gRPC服务器,以便客户端可以真正使用我们的服务。以下片段显示了我们如何为我们的RouteGuide
服务做到这一点:
function getServer() {
var server = new grpc.Server();
server.addProtoService(routeguide.RouteGuide.service, {
getFeature: getFeature,
listFeatures: listFeatures,
recordRoute: recordRoute,
routeChat: routeChat
});
return server;
}
var routeServer = getServer();
routeServer.bind('0.0.0.0:50051', grpc.ServerCredentials.createInsecure());
routeServer.start();
如您所见,我们使用以下步骤构建并启动服务器:
-
Server
从RouteGuide
服务描述符创建一个构造函数。 - 实施服务方法。
- 通过调用
Server
具有方法实现的构造函数来创建服务器的一个实例。 - 指定我们要使用实例的
bind()
方法侦听客户端请求的地址和端口。 - 调用
start()
实例来启动RPC服务器。
创建客户端
在本节中,我们将看看为我们的RouteGuide
服务创建一个Node.js客户端。您可以在examples / node / dynamic_codegen / route_guide / route_guide_client.js中看到完整的示例客户端代码 。
创建一个存根
要调用服务方法,我们首先需要创建一个存根。为此,我们只需要调用RouteGuide存根构造函数,指定服务器地址和端口。
new example.RouteGuide('localhost:50051', grpc.credentials.createInsecure());
调用服务方法
现在让我们看看我们如何调用我们的服务方法。请注意,所有这些方法都是异步的:它们使用事件或回调来检索结果。
简单的RPC
调用简单的RPC GetFeature
几乎和调用本地异步方法一样简单。
var point = {latitude: 409146138, longitude: -746188906};
stub.getFeature(point, function(err, feature) {
if (err) {
// process error
} else {
// process feature
}
});
正如你所看到的,我们创建并填充一个请求对象。最后,我们调用存根上的方法,将请求和回调传递给它。如果没有错误,那么我们可以从响应对象中读取来自服务器的响应信息。
console.log('Found feature called "' + feature.name + '" at ' +
feature.location.latitude/COORD_FACTOR + ', ' +
feature.location.longitude/COORD_FACTOR);
流式RPC
现在我们来看看我们的流式传输方法。如果您已经阅读过创建服务器,其中的一些可能看起来非常熟悉 - 流式RPC在两端都以类似的方式实现。这就是我们所说的服务器端流式传输方法ListFeatures
,它返回一个地理流 Feature
:
var call = client.listFeatures(rectangle);
call.on('data', function(feature) {
console.log('Found feature called "' + feature.name + '" at ' +
feature.location.latitude/COORD_FACTOR + ', ' +
feature.location.longitude/COORD_FACTOR);
});
call.on('end', function() {
// The server has finished sending
});
call.on('status', function(status) {
// process status
});
我们传递一个请求并Readable
返回一个流对象,而不是传递一个请求和回调方法。该客户端可以使用Readable
的 'data'
情况下读取服务器的响应。此事件与每个Feature
消息对象一起触发, 直到不再有消息:'end'
事件指示该呼叫已完成。最后,状态事件在服务器发送状态时触发。
客户端的流式传输方法RecordRoute
是类似的,除了我们通过回调方法并取回一个Writable
。
var call = client.recordRoute(function(error, stats) {
if (error) {
callback(error);
}
console.log('Finished trip with', stats.point_count, 'points');
console.log('Passed', stats.feature_count, 'features');
console.log('Travelled', stats.distance, 'meters');
console.log('It took', stats.elapsed_time, 'seconds');
});
function pointSender(lat, lng) {
return function(callback) {
console.log('Visiting point ' + lat/COORD_FACTOR + ', ' +
lng/COORD_FACTOR);
call.write({
latitude: lat,
longitude: lng
});
_.delay(callback, _.random(500, 1500));
};
}
var point_senders = [];
for (var i = 0; i < num_points; i++) {
var rand_point = feature_list[_.random(0, feature_list.length - 1)];
point_senders[i] = pointSender(rand_point.location.latitude,
rand_point.location.longitude);
}
async.series(point_senders, function() {
call.end();
});
一旦我们完成了将客户端的请求写入流write()
,我们需要调用end()
流来让gRPC知道我们已经写完了。如果状态是OK
,该stats
对象将填充服务器的响应。
最后,我们来看看我们的双向流式RPC routeChat()
。在这种情况下,我们只是将一个上下文传递给方法并获取一个Duplex
流对象,我们可以使用它来写和读消息。
var call = client.routeChat();
这里的读写语法与我们的客户端流式传输和服务器流式传输方法完全相同。尽管每一方都会按照写入的顺序获取其他消息,但客户端和服务器都可以按任意顺序读取和写入 - 这些数据流完全独立运行。
试试看!
建立客户端和服务器:
$ npm install
运行将在端口50051上侦听的服务器:
$ node ./dynamic_codegen/route_guide/route_guide_server.js --db_path=./dynamic_codegen/route_guide/route_guide_db.json
运行客户端(在不同的终端中):
$ node ./dynamic_codegen/route_guide/route_guide_client.js --db_path=./dynamic_codegen/route_guide/route_guide_db.json