终于在调试并不断地修改源码过程发现了此问题奔溃的原因了。发生此问题的原因是三个线程对网络数据的缓冲区进行了操作,但是没有保证线程对其的互斥访问,从而导致不可预见的问题的出现。原来的处理过程:首先管道写线程为网络数据分配堆空间,把数据拷贝到上面去,并且将此堆的指针付给管道数据的格式的缓冲区的指针中。二、读管道线程把数据取出并加入发送队列中。三、发送线程把数据从队列取出并发送到网络中,其中发送线程会对其中的数据进行。由此看出一个线程对其操作与另外两个的线程对其操作并不互斥,因此得到的修改处理流程如下:首先在管道写线程增加一个用于管道读写线程之间的全局缓冲区,并为管道读写线程添加同步操作(用于对全局缓冲区的先后操作)。而在管道读线程为网络数据分配空间,改变管道格式数据的缓冲区的指针,并将其加入队列。发送线程从队列取出元素后就只有一个线程对其中的数据进行操作了。最后,把对会议的配置文件的操作改为了对内存的操作,并每隔10分钟与ctl+c后把配置参数写入文件,代码则不罗列了。 代码如下:
管道写线程:
sem_t sem_tx; // 管道数据发送等待信号量.
uint8_t pipe_buf[TRANSMIT_DATA_BUFFER_SIZE] = {0};// 全局管道数据缓冲区, 与读管道的的线程使用.
void init_sem_tx_can( void ) // 初始化管道传输信号量->控制pipe_buf
{
sem_init( &sem_tx, 0, 0 );
}
int system_raw_queue_tx( void *frame, uint16_t frame_len, uint8_t data_type, const uint8_t dest_mac[6], bool isresp )
{
assert( frame);
int ret = -1;
if( (data_type == TRANSMIT_TYPE_ADP) || (data_type == TRANSMIT_TYPE_ACMP) || (data_type == TRANSMIT_TYPE_AECP) )
{
tx_data tx;
uint8_t *tran_buf = pipe_buf;
if( frame_len > TRANSMIT_DATA_BUFFER_SIZE )
{
DEBUG_INFO( "frame_len bigger than pipe transmit buffer!" );
return -1;
}
memset( &tx.udp_sin, 0, sizeof(struct sockaddr_in) );
memset( tran_buf, 0, sizeof(pipe_buf) );
memcpy( tx.raw_dest.value, dest_mac, sizeof(struct jdksavdecc_eui48) );
memcpy( tran_buf, (uint8_t*)frame, frame_len );
tx.frame = tran_buf;
tx.data_type = data_type;
tx.frame_len = frame_len;
tx.notification_flag = RUNINFLIGHT;
tx.resp = isresp;
if( (ret = write_pipe_tx(&tx, sizeof(tx_data))) == -1 )
{
DEBUG_INFO( "ERR transmit data to PIPE" );
assert(-1 != ret);
}
sem_wait( &sem_tx );
}
else
{
DEBUG_INFO( "ERR transmit data type" );
}
return ret;
}
管道读线程:
int thread_pipe_fn( void *pgm )
{
struct fds *kfds = ( struct fds* )pgm;
struct fds thr_fds;
sdpwqueue* send_wq = &net_send_queue;
assert( send_wq );
memcpy( &thr_fds, kfds, sizeof(struct fds));
while(1)
{
if( check_pipe_read_ready( thr_fds.tx_pipe[PIPE_RD]) )
{
tx_data tnt;
int result = read_pipe_tx( &tnt, sizeof(tx_data) );
//DEBUG_INFO( "frame len = %d ", tnt.frame_len );
if( result > 0 )
{
// 加入网络数据发送队列
uint8_t* frame_buf = NULL;
uint16_t frame_len = tnt.frame_len;
pthread_mutex_lock( &send_wq->control.mutex );
// heap using later free by sending thread.frame_buf space must to be free!
frame_buf = allot_heap_space( TRANSMIT_DATA_BUFFER_SIZE, &frame_buf );
if( NULL == frame_buf )
{
DEBUG_INFO( "system_raw_queue_tx Err: allot space for frame failed!" );
pthread_mutex_unlock( &send_wq->control.mutex ); // unlock mutex
pthread_cond_signal( &send_wq->control.cond );
sem_post( &sem_tx ); // 用于tnt.frame 指针的同步操作
continue;
}
if( frame_len > TRANSMIT_DATA_BUFFER_SIZE )
{
pthread_mutex_unlock( &send_wq->control.mutex ); // unlock mutex
pthread_cond_signal( &send_wq->control.cond );
sem_post( &sem_tx ); // 用于tnt.frame 指针的同步操作
continue;
}
memset( frame_buf, 0, TRANSMIT_DATA_BUFFER_SIZE );
memcpy( frame_buf, tnt.frame, tnt.frame_len );
tnt.frame = frame_buf;// change the tnt frame buf to heap space
send_work_queue_message_save( &tnt, send_wq );
int queue_len = get_queue_length( &send_wq->work );
DEBUG_INFO( "save queue len = %d ", queue_len );
pthread_mutex_unlock( &send_wq->control.mutex ); // unlock mutex
pthread_cond_signal( &send_wq->control.cond );
sem_post( &sem_tx );
}
else
{
assert( tnt.frame && (result >= 0) );
}
}
else
{
DEBUG_INFO( "read pipe is not ready!" );
continue;
}
}
return 0;
}
发送线程:
int thread_send_func( void *pgm )
{
sdpwqueue* p_send_wq = &net_send_queue;
assert( p_send_wq );
while( 1 )
{
pthread_mutex_lock( &p_send_wq->control.mutex ); // lock mutex
while( p_send_wq->work.front == NULL && p_send_wq->control.active )
{
DEBUG_INFO( "active = %d", p_send_wq->control.active );
pthread_cond_wait( &p_send_wq->control.cond, &p_send_wq->control.mutex );
}
// 获取队列数据
p_sdpqueue_wnode p_send_wnode = NULL;
bool is_resp_data = false;
p_send_wnode = send_queue_message_get( p_send_wq );
if( NULL == p_send_wnode )
{
DEBUG_INFO( "No send queue message: ERROR!" );
pthread_mutex_unlock( &p_send_wq->control.mutex );
continue;
}
int queue_len = get_queue_length( &p_send_wq->work );
DEBUG_INFO( "after get queue len = %d ", queue_len );
pthread_mutex_unlock( &p_send_wq->control.mutex ); // unlock mutex
// ready to sending data
is_resp_data = p_send_wnode->job_data.resp;
tx_packet_event( p_send_wnode->job_data.data_type,
p_send_wnode->job_data.notification_flag,
p_send_wnode->job_data.frame,
p_send_wnode->job_data.frame_len,
&net_fd,// network fds
command_send_guard,
p_send_wnode->job_data.raw_dest.value,
&p_send_wnode->job_data.udp_sin,
is_resp_data ); // 数据向外发送函数
release_heap_space( &p_send_wnode->job_data.frame ); // free heap space mallo by write pipe thread
assert( p_send_wnode->job_data.frame == NULL ); // free successfully and result is NULL?
if( NULL != p_send_wnode )
{
free( p_send_wnode );
p_send_wnode = NULL;
}
/*发送下一条数据的条件-数据获得响应或数据超时或时间间隔到了(注:时间间隔只适用于系统响应数据或摄像头控制数据的发送)*/
if( is_wait_messsage_primed_state() )
{
int status = -1;
struct timespec timeout;
int ret = 0;
if ( clock_gettime( CLOCK_REALTIME, &timeout ) == -1)
{
perror("clock_gettime:");
return -1;
}
timeout.tv_sec += 4;// timeouts is 4 seconds
if( !is_resp_data )
{
status = set_wait_message_active_state();
assert( status == 0 );
//sem_wait( &sem_waiting );
ret = sem_timedwait( &sem_waiting, &timeout );
if( ret == -1 )
{
if( errno == ETIMEDOUT )
{
DEBUG_INFO( "sem_timedwait(): time out!send pthread proccess continue" );
sem_post( &sem_waiting );
}
else
{
perror( "sem_timedwait():" );
}
}
status = set_wait_message_idle_state();
assert( status == 0 );
}
else
{
status = set_wait_message_active_state();
assert( status == 0 );
uart_resp_send_interval_timer_start(); // start timer
//sem_wait( &sem_waiting );
ret = sem_timedwait( &sem_waiting, &timeout );
if( ret == -1 )
{
if( errno == ETIMEDOUT )
{
DEBUG_INFO( "sem_timedwait(): time out! send pthread proccess continue" );
sem_post( &sem_waiting );
}
else
{
perror( "sem_timedwait():" );
}
}
resp_send_interval_timer_stop();
status = set_wait_message_idle_state();
assert( status == 0 );
}
}
else
{
DEBUG_INFO(" not message primed success!" );
}
}
return 0;
经过上面四处的修改,现在系统终于可以正常工作了,现在的我终于可以松一口气了,因为终于解决了这个问题。虽然此方式的动态分配内存消耗了大量资源,但是这也是最基本的队列操作了。
总结:线程间对共享变量的使用必须使用同步机制或保证其在线程间的互斥访问。 补充(2016/2/2):在上面的修改完成后,在虚拟机跑没有出现问题,但是移植到arm开发板这个问题的现象又出现了,说明上述的解决方法没有根本地解决问题。经过了几天的代码跟踪,终于找到了出现这个问题的根本原因,那就是操作系统的发送队列时出现了致命错误。因为判断队列为空时只需判断队列头为空,而不判断队尾不为空。因此取完队列的元素时,队尾并不为空,而此时在操作时并不将队尾置为空,导致下次存队列元素队列操作了不可用的内存地址(因为此时对头的元素的空间被释放了,指向这个空间的指针也就是一个垂悬指针),因而导致了上述的内存错误。解决的方法是获取队列的元素后,若队列为空,则同时置队列的尾指针为空,这样也就正常地插入队列的元素不会发生错误的引用不可用的内存地址了。上面用到的工具有:Valgrind。