近期对 json-rpc 比较感兴趣,其思想非常简单,并且看到了许多不同语言的实现。GitHub 上 hmngomes 的 json-rpc-c(实现的是 server 端,基于 TCP 流)短小精悍,提供了非常好的框架:代码十分清晰,易于扩展,也容易看懂,非常经典。该实现依赖于另外两个库 libev 和 cJSON,值得认真学习。
测试的时候先启动 server,而后通过 nc 命令发送对应的 JSON 格式数据,就会有对应的效果:
vonzhou@de15:~$ echo "{\"method\":\"sayHello\"}" | nc localhost 1234
{
"result": "Hello!"
}
vonzhou@de15:~$ echo "{\"method\":\"exit\"}" | nc localhost 1234
{
"result": "Bye!"
}
{
"result": "Hello!"
}
vonzhou@de15:~$ echo "{\"method\":\"exit\"}" | nc localhost 1234
{
"result": "Bye!"
}
以下贴出代码,便于温习。
-------------------server.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include "jsonrpc-c.h"
/* TCP port the example server listens on; the port users will be connecting to. */
#define PORT 1234
/* The single server instance, shared by main() and the exit_server handler. */
struct jrpc_server my_server;
/*
 * RPC handler that answers with the JSON string "Hello!".
 *
 * None of the arguments are used; the returned cJSON string is newly
 * created on every call.
 */
cJSON *say_hello(jrpc_context *ctx, cJSON *params, cJSON *id) {
    (void) ctx;
    (void) params;
    (void) id;
    return cJSON_CreateString("Hello!");
}
/*
 * RPC handler that asks the global server to stop and answers with the
 * JSON string "Bye!".
 *
 * None of the arguments are used.
 */
cJSON *exit_server(jrpc_context *ctx, cJSON *params, cJSON *id) {
    (void) ctx;
    (void) params;
    (void) id;
    jrpc_server_stop(&my_server);
    return cJSON_CreateString("Bye!");
}
/*
 * Example entry point: starts a JSON-RPC server on PORT, registers the
 * "sayHello" and "exit" procedures, runs the event loop until it is
 * stopped, then releases the procedure table.
 *
 * Returns 0 on a normal shutdown and EXIT_FAILURE when the server cannot
 * be initialised or a procedure cannot be registered.
 */
int main(void)
{
    if (jrpc_server_init(&my_server, PORT) != 0) {
        fprintf(stderr, "failed to start server on port %d\n", PORT);
        return EXIT_FAILURE;
    }

    /* Register the two procedures; neither needs per-procedure data. */
    if (jrpc_register_procedure(&my_server, say_hello, "sayHello", NULL) != 0
            || jrpc_register_procedure(&my_server, exit_server, "exit", NULL) != 0) {
        fprintf(stderr, "failed to register procedures\n");
        jrpc_server_destroy(&my_server);
        return EXIT_FAILURE;
    }

    jrpc_server_run(&my_server);
    jrpc_server_destroy(&my_server);
    return 0;
}
-----------------json-rpc.h
#ifndef JSONRPCC_H_
#define JSONRPCC_H_
#include "cJSON.h"
#include <ev.h>
/*
*
* http://www.jsonrpc.org/specification
*
* code message meaning
* -32700 Parse error Invalid JSON was received by the server.
* An error occurred on the server while parsing the JSON text.
* -32600 Invalid Request The JSON sent is not a valid Request object.
* -32601 Method not found The method does not exist / is not available.
* -32602 Invalid params Invalid method parameter(s).
* -32603 Internal error Internal JSON-RPC error.
* -32000 to -32099 Server error Reserved for implementation-defined server-errors.
*/
/* JSON-RPC 2.0 pre-defined error codes (see the table in the comment above). */
#define JRPC_PARSE_ERROR -32700
#define JRPC_INVALID_REQUEST -32600
#define JRPC_METHOD_NOT_FOUND -32601
#define JRPC_INVALID_PARAMS -32602
#define JRPC_INTERNAL_ERROR -32603
/*
 * Per-call context handed to every procedure.
 *
 * data          - the user pointer that was registered with the procedure.
 * error_code    - cleared to 0 before the call; a procedure sets it to a
 *                 non-zero value to make the server reply with an error.
 * error_message - cleared to NULL before the call; when an error is
 *                 reported the server passes it to free(), so it has to be
 *                 heap-allocated.
 */
typedef struct {
    void *data;
    int   error_code;
    char *error_message;
} jrpc_context;
/*
 * Signature of a JSON-RPC procedure.
 *
 * context - per-call context (user data in, error code/message out).
 * params  - the request's "params" member, or NULL when it is absent.
 * id      - a copy of the request's "id" member, or NULL when it is absent.
 *
 * Returns the cJSON value to send back as "result" (may be NULL).
 */
typedef cJSON *(*jrpc_function)(jrpc_context *context, cJSON *params,
        cJSON *id);
/* One registered procedure: what it is called, what to run, and its user data. */
struct jrpc_procedure {
    char          *name;      /* method name (heap copy owned by the server) */
    jrpc_function  function;  /* handler invoked for this method */
    void          *data;      /* extra user data, passed to the handler via the context */
};
struct jrpc_server {
int port_number;
struct ev_loop
* loop; //eventloop类型
* loop; //eventloop类型
ev_io listen_watcher ;
//
//
int procedure_count;
struct jrpc_procedure
* procedures ;
* procedures ;
int debug_level;
};
/*
 * State of one client connection.
 *
 * `io` must stay the first member: the connection callback casts the
 * ev_io pointer it receives back to a struct jrpc_connection pointer.
 */
struct jrpc_connection {
    struct ev_io  io;           /* read watcher for the client socket */
    int           fd;           /* client socket descriptor */
    int           pos;          /* current write position inside `buffer` */
    unsigned int  buffer_size;  /* allocated size of `buffer` in bytes */
    char         *buffer;       /* receive buffer holding not-yet-parsed input */
    int           debug_level;  /* copy of the owning server's debug level */
};
/*
 * Initialise `server` on the default libev loop and start listening on
 * `port_number`. Returns 0 on success, non-zero on failure.
 */
int jrpc_server_init(struct jrpc_server *server, int port_number);

/*
 * Same as jrpc_server_init(), but attaches the server to the caller's
 * event loop instead of the default one.
 */
int jrpc_server_init_with_ev_loop(struct jrpc_server *server,
        int port_number, struct ev_loop *loop);

/* Internal: create, bind and listen on the server socket. */
static int __jrpc_server_start(struct jrpc_server *server);

/* Run the server's event loop until it is stopped. */
void jrpc_server_run(struct jrpc_server *server);

/* Ask the server's event loop to stop. Always returns 0. */
int jrpc_server_stop(struct jrpc_server *server);

/* Release the procedure table; the server struct itself is not freed. */
void jrpc_server_destroy(struct jrpc_server *server);

/* Internal: free the name and data owned by one procedure entry. */
static void jrpc_procedure_destroy(struct jrpc_procedure *procedure);

/*
 * Register `function_pointer` under `name` (the name is copied).
 * `data` is handed to the procedure through the call context.
 * Returns 0 on success, -1 on allocation failure.
 */
int jrpc_register_procedure(struct jrpc_server *server,
        jrpc_function function_pointer, char *name, void *data);

/* Remove the procedure registered under `name`. Returns 0 or -1. */
int jrpc_deregister_procedure(struct jrpc_server *server, char *name);
#endif
-----------------json-rpc.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include "jsonrpc-c.h"
/* File-level event loop pointer; jrpc_server_init() stores EV_DEFAULT here. */
struct ev_loop *loop;
/*
 * Return a pointer to the address field inside a generic socket address:
 * sin_addr when the family is AF_INET, sin6_addr for anything else
 * (callers are expected to pass only IPv4 or IPv6 addresses).
 */
static void *get_in_addr(struct sockaddr *sa) {
    if (sa->sa_family == AF_INET) {
        return &(((struct sockaddr_in *) sa)->sin_addr);
    }
    return &(((struct sockaddr_in6 *) sa)->sin6_addr);
}
/*
 * Write exactly `len` bytes of `data` to `fd`, retrying after short
 * writes and EINTR. Returns 0 on success, -1 on any other write error.
 */
static int write_all(int fd, const char *data, size_t len) {
    while (len > 0) {
        ssize_t written = write(fd, data, len);
        if (written < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        data += written;
        len -= (size_t) written;
    }
    return 0;
}

/*
 * Send `response` followed by a newline to the client of `conn`.
 *
 * When the connection's debug level is above 1 the response is also
 * printed to stdout. Returns 0 on success, -1 when `response` is NULL or
 * the socket write fails.
 */
static int send_response(struct jrpc_connection *conn, char *response) {
    int fd = conn->fd;

    if (response == NULL)
        return -1;
    if (conn->debug_level > 1)
        printf("JSON Response:\n%s\n", response);
    if (write_all(fd, response, strlen(response)) != 0)
        return -1;
    return write_all(fd, "\n", 1);
}
/*
 * Send a JSON-RPC error reply of the form
 * {"error": {"code": ..., "message": ...}, "id": ...}.
 *
 * Ownership: `message` is passed to free() and `id` is attached to the
 * reply object and deleted together with it, so the caller must not use
 * either afterwards. A NULL `message` is sent as an empty string.
 *
 * Returns the result of send_response(), or -1 when the reply cannot be
 * serialised.
 */
static int send_error(struct jrpc_connection *conn, int code, char *message,
        cJSON *id) {
    int return_value = -1;
    cJSON *result_root = cJSON_CreateObject();
    cJSON *error_root = cJSON_CreateObject();

    cJSON_AddNumberToObject(error_root, "code", code);
    /* A procedure may set an error code without providing a message. */
    cJSON_AddStringToObject(error_root, "message", message ? message : "");
    cJSON_AddItemToObject(result_root, "error", error_root);
    cJSON_AddItemToObject(result_root, "id", id);

    char *str_result = cJSON_Print(result_root);
    if (str_result != NULL)
        return_value = send_response(conn, str_result);

    free(str_result);
    cJSON_Delete(result_root);
    free(message);
    return return_value;
}
/*
 * Send a JSON-RPC success reply of the form {"result": ..., "id": ...}.
 * The "result" member is omitted when `result` is NULL.
 *
 * Ownership: `result` and `id` are attached to the reply object and
 * deleted together with it.
 *
 * Returns the result of send_response(), or -1 when the reply cannot be
 * serialised.
 */
static int send_result(struct jrpc_connection *conn, cJSON *result,
        cJSON *id) {
    int return_value = -1;
    cJSON *result_root = cJSON_CreateObject();

    if (result)
        cJSON_AddItemToObject(result_root, "result", result);
    cJSON_AddItemToObject(result_root, "id", id);

    char *str_result = cJSON_Print(result_root);
    if (str_result != NULL)
        return_value = send_response(conn, str_result);

    free(str_result);
    cJSON_Delete(result_root);
    return return_value;
}
/*
 * Look up the procedure called `name`, run it and send the reply.
 *
 * The table is searched from the last registered entry to the first, so
 * the most recently registered procedure wins when names collide.
 *
 * - No match: replies with JRPC_METHOD_NOT_FOUND.
 * - The procedure set ctx.error_code: replies with that code and
 *   ctx.error_message (which send_error() frees); any value the
 *   procedure returned is discarded.
 * - Otherwise: replies with the returned value as "result".
 *
 * `id` is handed over to the reply in every case. Returns the value of
 * the send_error()/send_result() call that was made.
 */
static int invoke_procedure(struct jrpc_server *server,
        struct jrpc_connection *conn, char *name, cJSON *params, cJSON *id) {
    cJSON *returned = NULL;
    int procedure_found = 0;
    jrpc_context ctx;

    ctx.error_code = 0;
    ctx.error_message = NULL;

    int i = server->procedure_count;
    while (i--) {
        if (!strcmp(server->procedures[i].name, name)) {
            procedure_found = 1;
            ctx.data = server->procedures[i].data;
            returned = server->procedures[i].function(&ctx, params, id);
            break;
        }
    }

    if (!procedure_found)
        return send_error(conn, JRPC_METHOD_NOT_FOUND,
                strdup("Method not found."), id);

    if (ctx.error_code) {
        /* The error reply carries no result; do not leak one. */
        if (returned)
            cJSON_Delete(returned);
        return send_error(conn, ctx.error_code, ctx.error_message, id);
    }
    return send_result(conn, returned, id);
}
/*
 * Validate a parsed request object and dispatch it.
 *
 * A request is accepted when "method" is a string, "params" is absent or
 * an array/object, and "id" is absent or a string/number. Anything else
 * is answered with JRPC_INVALID_REQUEST and -1 is returned.
 *
 * For an accepted request the return value is that of invoke_procedure().
 */
static int eval_request(struct jrpc_server *server,
        struct jrpc_connection *conn, cJSON *root) {
    cJSON *method = cJSON_GetObjectItem(root, "method");
    cJSON *params = cJSON_GetObjectItem(root, "params");
    cJSON *id = cJSON_GetObjectItem(root, "id");

    int method_ok = method != NULL && method->type == cJSON_String;
    int params_ok = params == NULL || params->type == cJSON_Array
            || params->type == cJSON_Object;
    int id_ok = id == NULL || id->type == cJSON_String
            || id->type == cJSON_Number;

    if (!(method_ok && params_ok && id_ok)) {
        send_error(conn, JRPC_INVALID_REQUEST,
                strdup("The JSON sent is not a valid Request object."), NULL);
        return -1;
    }

    /*
     * We have to copy the id: it is attached to the reply, and deleting
     * the reply object would otherwise also delete part of the request.
     */
    cJSON *id_copy = NULL;
    if (id != NULL)
        id_copy = (id->type == cJSON_String)
                ? cJSON_CreateString(id->valuestring)
                : cJSON_CreateNumber(id->valueint);

    if (server->debug_level)
        printf("Method Invoked: %s\n", method->valuestring);

    return invoke_procedure(server, conn, method->valuestring, params,
            id_copy);
}
/*
 * Tear down a client connection: stop its watcher, close the socket and
 * free the receive buffer and the connection object itself.
 *
 * `w` must be the watcher embedded at the start of a struct
 * jrpc_connection; it is invalid after this call.
 */
static void close_connection(struct ev_loop *loop, ev_io *w) {
    struct jrpc_connection *conn = (struct jrpc_connection *) w;

    ev_io_stop(loop, w);
    close(conn->fd);
    free(conn->buffer);
    free(conn);
}
/*
 * libev read callback for a client socket.
 *
 * Appends the newly available bytes to the connection buffer (doubling
 * the buffer when it is full), then tries to parse one JSON value from
 * the start of the buffer:
 *  - parsed: object values are dispatched through eval_request(); the
 *    consumed bytes are then removed from the buffer;
 *  - not parsed and the parser stopped before the end of the data: a
 *    parse error is sent and the connection is closed;
 *  - not parsed but all data was consumed: wait for more input.
 *
 * A read error or end-of-file closes the connection.
 *
 * NOTE(review): at most one request is handled per invocation; further
 * complete requests already in the buffer wait for the next read event.
 */
static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
    /* get our 'subclassed' event watcher */
    struct jrpc_connection *conn = (struct jrpc_connection *) w;
    struct jrpc_server *server = (struct jrpc_server *) w->data;
    int fd = conn->fd;

    (void) revents;

    /* The buffer for this session is full: double it. */
    if ((unsigned int) conn->pos == (conn->buffer_size - 1)) {
        unsigned int new_size = conn->buffer_size * 2;
        char *new_buffer = realloc(conn->buffer, new_size);
        if (new_buffer == NULL) {
            perror("Memory error");
            close_connection(loop, w);
            return;
        }
        conn->buffer = new_buffer;
        /* Only record the new size once the reallocation succeeded. */
        conn->buffer_size = new_size;
        /* Zero the newly added space. */
        memset(conn->buffer + conn->pos, 0, conn->buffer_size - conn->pos);
    }

    /* can not fill the entire buffer, string must be NUL terminated */
    int max_read_size = conn->buffer_size - conn->pos - 1;

    ssize_t bytes_read = read(fd, conn->buffer + conn->pos,
            (size_t) max_read_size);
    if (bytes_read == -1) {
        perror("read");
        close_connection(loop, w);
        return;
    }

    if (bytes_read == 0) {
        /* client closed the sending half of the connection */
        if (server->debug_level)
            printf("Client closed connection.\n");
        close_connection(loop, w);
        return;
    }

    char *end_ptr = NULL;
    conn->pos += (int) bytes_read;

    cJSON *root = cJSON_Parse_Stream(conn->buffer, &end_ptr);
    if (root != NULL) {
        if (server->debug_level > 1) {
            char *str_result = cJSON_Print(root);
            printf("Valid JSON Received:\n%s\n", str_result);
            free(str_result);
        }

        if (root->type == cJSON_Object) {
            eval_request(server, conn, root);
        }

        /*
         * Shift the processed request out of the buffer. Only the bytes
         * that are actually left are moved, then the whole tail is
         * zeroed, so nothing beyond the buffer is ever read.
         */
        size_t remaining = strlen(end_ptr);
        memmove(conn->buffer, end_ptr, remaining);
        conn->pos = (int) remaining;
        memset(conn->buffer + conn->pos, 0, conn->buffer_size - conn->pos);

        cJSON_Delete(root);
        return;
    }

    /*
     * Did we parse the whole buffer? If so, just wait for more.
     * Otherwise there was an error before the buffer's end.
     */
    if (end_ptr != (conn->buffer + conn->pos)) {
        if (server->debug_level) {
            printf("INVALID JSON Received:\n---\n%s\n---\n", conn->buffer);
        }
        send_error(conn, JRPC_PARSE_ERROR,
                strdup("Parse error. Invalid JSON was received by the server."),
                NULL);
        close_connection(loop, w);
    }
}
/*
 * libev read callback for the listening socket: accepts one pending
 * connection and starts a read watcher (connection_cb) for it.
 *
 * The connection is accepted before any memory is allocated, so that an
 * allocation failure can simply close the new socket instead of leaving
 * the connection pending on the listening socket.
 */
static void accept_cb(struct ev_loop *loop, ev_io *w, int revents) {
    enum { INITIAL_BUFFER_SIZE = 1500 };
    struct jrpc_server *server = (struct jrpc_server *) w->data;
    struct sockaddr_storage their_addr; /* connector's address information */
    socklen_t sin_size = sizeof their_addr;

    (void) revents;

    int client_fd = accept(w->fd, (struct sockaddr *) &their_addr, &sin_size);
    if (client_fd == -1) {
        perror("accept");
        return;
    }

    struct jrpc_connection *connection_watcher =
            malloc(sizeof(struct jrpc_connection));
    /* calloc hands back an already zeroed receive buffer. */
    char *buffer = calloc(1, INITIAL_BUFFER_SIZE);
    if (connection_watcher == NULL || buffer == NULL) {
        perror("Memory error");
        free(buffer);
        free(connection_watcher);
        close(client_fd);
        return;
    }

    if (server->debug_level) {
        char s[INET6_ADDRSTRLEN];
        inet_ntop(their_addr.ss_family,
                get_in_addr((struct sockaddr *) &their_addr), s, sizeof s);
        printf("server: got connection from %s\n", s);
    }

    connection_watcher->fd = client_fd;
    ev_io_init(&connection_watcher->io, connection_cb, client_fd, EV_READ);
    /* copy pointer to struct jrpc_server */
    connection_watcher->io.data = w->data;
    connection_watcher->buffer_size = INITIAL_BUFFER_SIZE;
    connection_watcher->buffer = buffer;
    connection_watcher->pos = 0;
    /* copy debug_level: struct jrpc_connection has no pointer to the server */
    connection_watcher->debug_level = server->debug_level;
    ev_io_start(loop, &connection_watcher->io);
}
/*
 * Initialise `server` on libev's default loop and start listening on
 * `port_number`.
 *
 * Stores the default loop in the file-level `loop` variable and returns
 * whatever jrpc_server_init_with_ev_loop() returns.
 */
int jrpc_server_init(struct jrpc_server *server, int port_number) {
    loop = EV_DEFAULT;
    return jrpc_server_init_with_ev_loop(server, port_number, loop);
}
int jrpc_server_init_with_ev_loop( struct jrpc_server
*server,
*server,
int port_number, struct ev_loop
*loop) {
*loop) {
memset(server,
0, sizeof (struct jrpc_server));
0, sizeof (struct jrpc_server));
server-> loop =
loop;
loop;
server-> port_number =
port_number;
port_number;
char *
debug_level_env = getenv ("JRPC_DEBUG" );
debug_level_env = getenv ("JRPC_DEBUG" );
if (debug_level_env
== NULL)
== NULL)
server-> debug_level =
0;
0;
else {
server-> debug_level = strtol (debug_level_env,
NULL, 10);
NULL, 10);
printf ("JSONRPC-C
Debug level %d\n" , server->debug_level );
Debug level %d\n" , server->debug_level );
}
return __jrpc_server_start(server);
}
/*
 * Create the listening socket for `server` and register its accept
 * watcher on the server's event loop.
 *
 * Resolves the wildcard address for server->port_number, binds to the
 * first usable result, and stores the port actually bound back into
 * server->port_number.
 *
 * Returns 0 on success, 1 when getaddrinfo() fails and 2 when no address
 * could be bound. A setsockopt() or listen() failure terminates the
 * process with exit(1).
 */
static int __jrpc_server_start(struct jrpc_server *server) {
    int sockfd = -1;
    struct addrinfo hints, *servinfo, *p;
    int yes = 1;
    int rv;
    char port_str[16]; /* large enough for any int, unlike a 6-byte buffer */

    snprintf(port_str, sizeof port_str, "%d", server->port_number);

    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE; /* use my IP */

    if ((rv = getaddrinfo(NULL, port_str, &hints, &servinfo)) != 0) {
        fprintf(stderr, " getaddrinfo: %s\n", gai_strerror(rv));
        return 1;
    }

    /* loop through all the results and bind to the first we can */
    for (p = servinfo; p != NULL; p = p->ai_next) {
        sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
        if (sockfd == -1) {
            perror("server: socket");
            continue;
        }

        if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int))
                == -1) {
            perror(" setsockopt");
            close(sockfd);
            freeaddrinfo(servinfo);
            exit(1);
        }

        if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
            close(sockfd);
            perror("server: bind");
            continue;
        }

        /* Read back the bound address to learn the real port number. */
        struct sockaddr_storage bound_addr;
        socklen_t bound_len = sizeof bound_addr;
        if (getsockname(sockfd, (struct sockaddr *) &bound_addr, &bound_len)
                == -1) {
            close(sockfd);
            perror("server: getsockname");
            continue;
        }
        if (bound_addr.ss_family == AF_INET6)
            server->port_number = ntohs(
                    ((struct sockaddr_in6 *) &bound_addr)->sin6_port);
        else
            server->port_number = ntohs(
                    ((struct sockaddr_in *) &bound_addr)->sin_port);

        break;
    }

    /* all done with this structure, whether or not a bind succeeded */
    freeaddrinfo(servinfo);

    if (p == NULL) {
        fprintf(stderr, "server: failed to bind\n");
        return 2;
    }

    if (listen(sockfd, 5) == -1) {
        perror("listen");
        exit(1);
    }
    if (server->debug_level)
        printf("server: waiting for connections...\n");

    ev_io_init(&server->listen_watcher, accept_cb, sockfd, EV_READ);
    server->listen_watcher.data = server;
    ev_io_start(server->loop, &server->listen_watcher);
    return 0;
}
// Make the code work with both the old (ev_loop/ev_unloop)
// and new (ev_run/ev_break) versions of libev.
// EVUNLOOP_ALL is only defined when the old API names are available.
#ifdef EVUNLOOP_ALL
#define EV_RUN ev_loop
#define EV_BREAK ev_unloop
#define EVBREAK_ALL EVUNLOOP_ALL
#else
#define EV_RUN ev_run
#define EV_BREAK ev_break
#endif
/*
 * Run the server's event loop (flags 0). Does not return until the loop
 * ends, e.g. after jrpc_server_stop() has been called.
 */
void jrpc_server_run(struct jrpc_server *server) {
    EV_RUN(server->loop, 0);
}
/*
 * Ask the server's event loop to break out of all nested runs.
 * Always returns 0.
 */
int jrpc_server_stop(struct jrpc_server *server) {
    EV_BREAK(server->loop, EVBREAK_ALL);
    return 0;
}
/*
 * Release every registered procedure and the procedure table.
 *
 * The server structure itself is not freed (the caller owns it). The
 * table pointer and count are reset so that calling this twice, or
 * registering again afterwards, does not touch freed memory.
 */
void jrpc_server_destroy(struct jrpc_server *server) {
    int i;

    for (i = 0; i < server->procedure_count; i++) {
        jrpc_procedure_destroy(&(server->procedures[i]));
    }
    free(server->procedures);
    server->procedures = NULL;
    server->procedure_count = 0;
}
/*
 * Free the name and the user data of one procedure entry and reset both
 * pointers to NULL. The entry itself is not freed.
 *
 * Note that `data` is passed to free(), so user data registered with a
 * procedure must be heap-allocated or NULL.
 */
static void jrpc_procedure_destroy(struct jrpc_procedure *procedure) {
    /* free(NULL) is a no-op, so no guards are needed. */
    free(procedure->name);
    procedure->name = NULL;
    free(procedure->data);
    procedure->data = NULL;
}
/*
 * Append a procedure to the server's table.
 *
 * server           - server to register with.
 * function_pointer - handler to invoke for the method.
 * name             - method name; copied, so the caller keeps ownership.
 * data             - user pointer handed to the handler via the context.
 *
 * Returns 0 on success and -1 when memory cannot be allocated. On
 * failure procedure_count is left unchanged, so the table never exposes
 * a half-initialised entry.
 */
int jrpc_register_procedure(struct jrpc_server *server,
        jrpc_function function_pointer, char *name, void *data) {
    int index = server->procedure_count;

    /* Grow the table by one entry; realloc(NULL, n) acts like malloc(n). */
    struct jrpc_procedure *grown = realloc(server->procedures,
            sizeof(struct jrpc_procedure) * (size_t) (index + 1));
    if (grown == NULL)
        return -1;
    server->procedures = grown;

    /* The caller's name may not outlive this call, so keep our own copy. */
    char *name_copy = strdup(name);
    if (name_copy == NULL)
        return -1;

    grown[index].name = name_copy;
    grown[index].function = function_pointer;
    grown[index].data = data;
    server->procedure_count = index + 1;
    return 0;
}
/*
 * Remove the first procedure registered under `name`.
 *
 * The entry's name and data are freed, later entries move down one slot
 * and the table is shrunk (or freed entirely when it becomes empty).
 *
 * Returns 0 on success. Returns -1 when no procedure has that name (a
 * message is printed to stderr) or when shrinking the table fails.
 */
int jrpc_deregister_procedure(struct jrpc_server *server, char *name) {
    int found_at = -1;
    int i;

    /* Search the procedure to deregister */
    if (server->procedures) {
        for (i = 0; i < server->procedure_count; i++) {
            if (!strcmp(name, server->procedures[i].name)) {
                found_at = i;
                break;
            }
        }
    }

    if (found_at < 0) {
        fprintf(stderr, "server : procedure '%s' not found\n", name);
        return -1;
    }

    jrpc_procedure_destroy(&(server->procedures[found_at]));

    /* Close the gap left by the removed entry. */
    for (i = found_at + 1; i < server->procedure_count; i++)
        server->procedures[i - 1] = server->procedures[i];
    server->procedure_count--;

    if (server->procedure_count == 0) {
        /* Nothing left: release the table instead of leaking it. */
        free(server->procedures);
        server->procedures = NULL;
        return 0;
    }

    struct jrpc_procedure *ptr = realloc(server->procedures,
            sizeof(struct jrpc_procedure) * (size_t) server->procedure_count);
    if (!ptr) {
        perror(" realloc");
        return -1;
    }
    server->procedures = ptr;
    return 0;
}
----------其中关于 cJSON 的代码部分,见《C语言解析JSON》部分。
参考: