一、memcached_set
在设置好memcached_st之后,就调用memcached_set(memcached_st *ptr, const char *key, size_t key_length, const char *value, size_t value_length, time_t expiration, uint32_t flags),它内部只是调用memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags, 0, SET_OP)。SET_OP表示这是set操作。
二、memcached_send
1、initialize_query(memcached_st *self, bool increment_query_id),此处increment_query_id为true,主要完成self->query_id++。
2、memcached_validate_key_length(key_length, memcached_is_binary(ptr))检查key的长度。
3、memcached_key_test(*ptr, (const char **)&key, &key_length, 1)检查key。
4、uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length),此处group_key就是key。该函数选择一个服务器id,应该是一致性哈希的核心函数,暂不分析。
5、memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key)获得服务器。
6、根据协议类型,决定调用memcached_send_binary(...),还是memcached_send_ascii(ptr, instance, key, key_length, value, value_length, expiration, flags, cas, flush, reply, verb)。此外verb为SET_OP。
三、memcached_send_ascii
将参数放入libmemcached_io_vector_st vector[ ],然后调用memcached_vdo(instance, vector, 12, flush)。如果成功,memcached_server_response_increment(instance)即(instance)->cursor_active++;并且reply和flush为true,则还要执行memcached_response(instance, buffer, sizeof(buffer), NULL)。
四、memcached_vdo
1、memcached_connect(memcached_server_write_instance_st server)
memcached_return_t memcached_connect(memcached_server_write_instance_st server)
{
if (server->fd != INVALID_SOCKET) //这说明已经连接了
{
return MEMCACHED_SUCCESS;
}
bool in_timeout= false;
memcached_return_t rc;
if (memcached_failed(rc= backoff_handling(server, in_timeout))) //防止由于出错而一直处于某个状态持续等待。其中也有可能调用run_distribution
{
set_last_disconnected_host(server);
return rc;
}
if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root-> and memcached_is_udp(server->root))
{
return memcached_set_error(*server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT, memcached_literal_param("SASL is not supported for UDP connections"));
}
/* We need to clean up the multi startup piece */
switch (server->type)
{
case MEMCACHED_CONNECTION_UDP:
case MEMCACHED_CONNECTION_TCP:
rc= network_connect(server);
if (LIBMEMCACHED_WITH_SASL_SUPPORT)
{
if (server->fd != INVALID_SOCKET and server->root->)
{
rc= memcached_sasl_authenticate_connection(server);
if (memcached_failed(rc) and server->fd != INVALID_SOCKET)
{
WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
(void)closesocket(server->fd);
server->fd= INVALID_SOCKET;
}
}
}
break;
case MEMCACHED_CONNECTION_UNIX_SOCKET:
rc= unix_socket_connect(server);
break;
}
if (memcached_success(rc))
{
memcached_mark_server_as_clean(server); //设置server->server_failure_counter= 0;server->next_retry= 0;
return rc;
}
set_last_disconnected_host(server);
if (memcached_has_current_error(*server)) //这里的出错处理,以后再看了
{
memcached_mark_server_for_timeout(server);
assert(memcached_failed(memcached_server_error_return(server)));
}
else
{
memcached_set_error(*server, rc, MEMCACHED_AT);
memcached_mark_server_for_timeout(server);
}
LIBMEMCACHED_MEMCACHED_CONNECT_END();
if (in_timeout)
{
return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
}
return rc;
}
其中,对于MEMCACHED_CONNECTION_TCP,调用network_connect(server):
static memcached_return_t network_connect(memcached_server_st *server)
{
bool timeout_error_occured= false;
WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
WATCHPOINT_ASSERT(server->cursor_active == 0);
/*
We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
*/
if (server->address_info == NULL or server->address_info_next == NULL)
{
WATCHPOINT_ASSERT(server->state == MEMCACHED_SERVER_STATE_NEW);
server->address_info_next= NULL;
memcached_return_t rc;
uint32_t counter= 5;
while (--counter)
{
if ((rc= set_hostinfo(server)) != MEMCACHED_TIMEOUT)
{
break;
}
}
if (memcached_failed(rc))
{
return rc;
}
}
if (server->address_info_next == NULL)
{
server->address_info_next= server->address_info;
server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
}
/* Create the socket */
while (server->address_info_next and server->fd == INVALID_SOCKET)
{
/* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
if (memcached_is_udp(server->root) and server->address_info_next->ai_family != AF_INET)
{
server->address_info_next= server->address_info_next->ai_next;
continue;
}
if ((server->fd= socket(server->address_info_next->ai_family,
server->address_info_next->ai_socktype,
server->address_info_next->ai_protocol)) < 0)
{
return memcached_set_errno(*server, get_socket_errno(), NULL);
}
set_socket_options(server); //设置O_NONBLOCK标志(总是使用非阻塞IO以防写死锁);同时根据server->root->flags,进行相应标志设置。
/* connect to server */
if ((connect(server->fd, server->address_info_next->ai_addr, server->address_info_next->ai_addrlen) != SOCKET_ERROR))
{
server->state= MEMCACHED_SERVER_STATE_CONNECTED;
return MEMCACHED_SUCCESS;
}
/* An error occurred */
switch (get_socket_errno())
{
case ETIMEDOUT:
timeout_error_occured= true;
break;
case EWOULDBLOCK:
case EINPROGRESS: // nonblocking mode - first return
case EALREADY: // nonblocking mode - subsequent returns
{
server->state= MEMCACHED_SERVER_STATE_IN_PROGRESS;
memcached_return_t rc= connect_poll(server); //其中调用poll()来处理memcached的回应
if (memcached_success(rc))
{
server->state= MEMCACHED_SERVER_STATE_CONNECTED;
return MEMCACHED_SUCCESS;
}
// A timeout here is treated as an error, we will not retry
if (rc == MEMCACHED_TIMEOUT)
{
timeout_error_occured= true;
}
}
break;
case EISCONN: // we are connected :-)
WATCHPOINT_ASSERT(0); // This is a programmer's error
break;
case EINTR: // Special case, we retry ai_addr
WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
(void)closesocket(server->fd);
server->fd= INVALID_SOCKET;
continue;
default:
break;
}
WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
(void)closesocket(server->fd);
server->fd= INVALID_SOCKET;
server->address_info_next= server->address_info_next->ai_next;
}
WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
if (timeout_error_occured)
{
if (server->fd != INVALID_SOCKET)
{
(void)closesocket(server->fd);
server->fd= INVALID_SOCKET;
}
}
WATCHPOINT_STRING("Never got a good file descriptor");
if (memcached_has_current_error(*server))
{
return memcached_server_error_return(server);
}
if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)
{
return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
}
return memcached_set_error(*server, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); /* The last error should be from connect() */
}
由于此处为非阻塞模式,所以connect()立即返回,并get_socket_errno()为EINPROGRESS,通过connect_poll()使用poll()完成最终的连接。最后server->state =
MEMCACHED_SERVER_STATE_CONNECTED。
2、ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, libmemcached_io_vector_st vector[], const size_t number_of, const bool with_flush)
其中vector为参数数组。
ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
libmemcached_io_vector_st vector[],
const size_t number_of, const bool with_flush)
{
ssize_t total= 0;
for (size_t x= 0; x < number_of; x++, vector++)
{
ssize_t returnable;
if (vector->length)
{
if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1) //将数据写入ptr->write_buffer
{
return -1;
}
total+= returnable;
}
}
if (with_flush)
{
if (memcached_io_write(ptr) == false) //内部调用_io_write(),进一步调用io_flush(),内部调用系统全局函数::send();若::send()失败,则可能调用io_wait()(内部调用poll)
{ //io_flush()和io_wait()(写等待时)中,都会调用memcached_purge(),它的作用?清理输入缓冲区中的内容,和以前的版本保持兼容。
return -1;
}
}
return total;
}
五、memcached_response
定义于libmemcached/,它会调用同名函数memcached_response(memcached_server_write_instance_st ptr, char *buffer, size_t buffer_length, memcached_result_st *result, uint64_t& numeric_value)。进一步会调用_read_one_response(ptr, buffer, buffer_length, result, numeric_value)。该函数先memcached_server_response_decrement(ptr)即(ptr)->cursor_active--,可以看出cursor_acive表示未处理的response的个数;
之后对于memcached_result_st * result == NULL,设置result为(memcached_st *)ptr->root中的result;最后调用textual_read_one_response()或者binary_read_one_response()(二进制协议)。
static memcached_return_t textual_read_one_response(memcached_server_write_instance_st ptr,
char *buffer, const size_t buffer_length,
memcached_result_st *result,
uint64_t& numeric_value)
{
numeric_value= UINT64_MAX;
size_t total_read;
memcached_return_t rc= memcached_io_readline(ptr, buffer, buffer_length, total_read);
//内部调用memcached_io_read,进一步调用::recv()。先一次性读入buffer,再分析出一行。
if (memcached_failed(rc))
{
return rc;
}
assert(total_read);
switch(buffer[0])
{
case 'V':
{
// VALUE
if (buffer[1] == 'A' and buffer[2] == 'L' and buffer[3] == 'U' and buffer[4] == 'E') /* VALUE */
{
/* We add back in one because we will need to search for END */
memcached_server_response_increment(ptr);
return textual_value_fetch(ptr, buffer, result);
}
// VERSION
else if (buffer[1] == 'E' and buffer[2] == 'R' and buffer[3] == 'S' and buffer[4] == 'I' and buffer[5] == 'O' and buffer[6] == 'N') /* VERSION */
{
return MEMCACHED_SUCCESS;
}
}
break;
case 'S':
{
// STAT
if (buffer[1] == 'T' and buffer[2] == 'A' and buffer[3] == 'T') /* STORED STATS */
{
memcached_server_response_increment(ptr);
return MEMCACHED_STAT;
}
// SERVER_ERROR
else if (buffer[1] == 'E' and buffer[2] == 'R' and buffer[3] == 'V' and buffer[4] == 'E' and buffer[5] == 'R'
and buffer[6] == '_'
and buffer[7] == 'E' and buffer[8] == 'R' and buffer[9] == 'R' and buffer[10] == 'O' and buffer[11] == 'R' )
{
if (total_read == memcached_literal_param_size("SERVER_ERROR"))
{
return MEMCACHED_SERVER_ERROR;
}
if (total_read > memcached_literal_param_size("SERVER_ERROR object too large for cache") and
(memcmp(buffer, memcached_literal_param("SERVER_ERROR object too large for cache")) == 0))
{
return MEMCACHED_E2BIG;
}
// Move past the basic error message and whitespace
char *startptr= buffer + memcached_literal_param_size("SERVER_ERROR");
if (startptr[0] == ' ')
{
startptr++;
}
char *endptr= startptr;
while (*endptr != '\r' && *endptr != '\n') endptr++;
return memcached_set_error(*ptr, MEMCACHED_SERVER_ERROR, MEMCACHED_AT, startptr, size_t(endptr - startptr));
}
// STORED
else if (buffer[1] == 'T' and buffer[2] == 'O' and buffer[3] == 'R') // and buffer[4] == 'E' and buffer[5] == 'D')
{
return MEMCACHED_STORED;
}
}
break;
... //多个case
default:
break;
}
buffer[total_read]= 0;
return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT,
buffer, total_read);
}
最终,返回MEMCACHED_STORED,至此memcached_set()完成。