编写本文的主要目的是熟悉boost thread和asio库;主要实现下面内容;
1)实现jpeg图片传输;客户端将jpeg图片传输到服务端,服务端接收,打印接收到的jpeg数据长度
2)客户端使用boost thread开一个线程不断的读jpeg图片,然后进行传输,传输使用asio库;
3)服务端使用asio库,进行jpeg图片接收,接收到图片后,打印一行日志,接收到jpg图片的size是多少?
程序中只使用stl和boost库,不使用任何其它不利于移植的库;
实现要点:
客户端:
1)首先使用boost::filesystem,对指定路径下的jpg图片进行枚举;然后将枚举出来的图片放入STL Vector 容器;
2)文中使用了boost thread库,创建线程,为了演示多线程之间的同步,使用了2个线程,一个线程专门用读一帧jpeg图片,另外一个线程用来发送jpeg数据;
3)jpeg发送线程中调用asio库进行发送;将asio的sock单独封一个简单的类(CClientSock);
4)两个线程使用boost的互斥锁进行同步;
//多线程同步
typedef boost::try_mutex MUTEX;
typedef MUTEX::scoped_try_lock LOCK;
MUTEX iomutex;
5)文中演示了如何使用boost bind库,boost时间库
boost::this_thread::sleep(boost::posix_time::milliseconds(5));
6)文中很好的使用了STL的vector和List容器;
客户端代码如下:
1)sock 封装类
ClientSock.h
#ifndef __CLIENT_SOCK__ #define __CLIENT_SOCK__ #include <boost/asio.hpp> #include <boost/bind.hpp> using namespace boost; using boost::asio::io_service; using boost::asio::ip::tcp; using namespace boost::asio; #include <string> #include <iostream> using namespace std; class CClientSock { public: CClientSock(tcp::endpoint& endpoint); virtual ~CClientSock(void); unsigned int send (char *pBuff,unsigned int len); unsigned int recv (char *pBuff, int len); void close (); private: io_service * io_service_; tcp::socket * socket_; }; #endif
ClientSock.cpp
#include "ClientSock.h" CClientSock::CClientSock(tcp::endpoint& endpoint) { // io_service_ = new io_service(); socket_ = new tcp::socket(*io_service_); boost::system::error_code ec; socket_->connect(endpoint, ec); if (ec) { std::cerr << "Error: " << ec << std::endl; throw ec; } } CClientSock::~CClientSock(void) { // delete io_service_; delete socket_; } // unsigned int CClientSock:: send (char *pBuff,unsigned int len) { boost::asio::const_buffers_1 request(pBuff, len); return socket_->send(request); } // unsigned int CClientSock:: recv ( char *pBuff, int len) { // //return boost::asio::read( socket_ ,boost::asio::buffer(static_cast<void*>(pBuff),len)); return 0; } // void CClientSock::close () { // socket_->close(); }
2)客户端传输封装类
ClientTransfer.h
#ifndef __CLIENT_TRANSFER__ #define __CLIENT_TRANSFER__ #include <boost/bind.hpp> #include <boost/thread.hpp> #include <boost/filesystem.hpp> #include <boost/timer.hpp> using namespace boost; #include <iostream> #include <string> #include <vector> #include <algorithm> #include <fstream> #include <List> using namespace std; #include "ClientSock.h" typedef struct { unsigned char *pBuff; //图像Buff指针 unsigned int length; //Buff总的大小 unsigned int valid_len; //有效数据的长度 }image_buff; #define MAX_BUFF_SIZE 1024*1024 #define MAX_BUFF_NUMBER 4 class CClientTransfer { public: CClientTransfer(std::string jpgPath); virtual ~CClientTransfer(void); void StartThread(void); void WaitThreadTerminate(void); private: bool GetJpegData(image_buff &imageBuff,std::string jpgPath); bool scanFilesUseRecursive(const string& rootPath,vector<string>& container=*(new vector<string>())); static void ReadJpegThread(void *arg); void ReadJpegData(void); boost::thread m_ReadThread; //读Jpeg线程 // static void SendJpegThread(void *arg); void SendJpegData(void); boost::thread m_SendThread; //发送Jpeg图片 std::string m_jpgPath; vector<string> m_vecFilePic; //jpeg目录 unsigned int m_PicId; //jpeg vector 容器中的第几张图片 //图像Buff 空List list <image_buff> IdleList; //发送图像Buff List list <image_buff> SendBuffList; bool m_bQuit; //线程结束标志 //多线程同步 typedef boost::try_mutex MUTEX; typedef MUTEX::scoped_try_lock LOCK; MUTEX iomutex; //asio CClientSock *m_sock; }; #endif
ClientTransfer.cpp
#include "ClientTransfer.h" //参数jpgPath:保存jpg序列的目录 CClientTransfer::CClientTransfer(std::string jpgPath) { m_jpgPath=jpgPath; //分配内存 image_buff desc; desc.pBuff = NULL; desc.length = MAX_BUFF_SIZE; desc.valid_len = 0; for (int i = 0; i < MAX_BUFF_NUMBER; ++i) { desc.pBuff = new unsigned char [MAX_BUFF_SIZE]; IdleList.push_back(desc); } m_PicId=0; m_bQuit=false; ip::tcp::endpoint ep(ip::address_v4::from_string("127.0.0.1"), 8100); m_sock=new CClientSock(ep); } // CClientTransfer::~CClientTransfer(void) { // std::list<image_buff>::iterator iter; //注意加锁 LOCK lock(iomutex); for (iter = SendBuffList.begin();iter != SendBuffList.end(); ++iter) { delete []((*iter).pBuff); (*iter).pBuff = NULL; } SendBuffList.clear(); for (iter = IdleList.begin();iter != IdleList.end(); ++iter) { delete []((*iter).pBuff); (*iter).pBuff = NULL; } IdleList.clear(); if (m_sock) { delete m_sock; m_sock=NULL; } } //函数功能:从jpgPath中获取jpeg数据 bool CClientTransfer::GetJpegData(image_buff &imageBuff,std::string jpgPath) { // bool status=false; ifstream ifs(jpgPath.c_str(), std::ios_base::binary); unsigned int nJpgLen = 0; if ( ifs.is_open() ) { ifs.seekg( 0 , std::ios::end ); nJpgLen = ifs.tellg(); if ( nJpgLen > 0 &&nJpgLen<=imageBuff.length) { ifs.seekg( 0 , std::ios::beg ); ifs.read((char*)imageBuff.pBuff,nJpgLen); imageBuff.valid_len=nJpgLen; status=true; }else { status=false; } ifs.close(); } return status; } //函数功能:从文件夹下枚举.jpg图片,放入到jpg的vector容器中 bool CClientTransfer::scanFilesUseRecursive(const string& rootPath,vector<string>& container) { // namespace fs = boost::filesystem; fs::path fullpath (rootPath, fs::native); vector<string> &ret = container; //判断路径是否存在 if(!fs::exists(fullpath)) { return false; } fs::recursive_directory_iterator end_iter; for(fs::recursive_directory_iterator iter(fullpath);iter!=end_iter;iter++) { try{ //是文件名&&.jpg if (!fs::is_directory( *iter ) &&(fs::extension(*iter)==".jpg"||fs::extension(*iter)==".JPG"||fs::extension(*iter)==".jpeg"||fs::extension(*iter)==".JPEG") ) { // std::string jpgstring=iter->path().string(); ret.push_back(jpgstring); std::cout << *iter << " is a file" << std::endl; } } catch ( const std::exception & ex ) { std::cerr << ex.what() << std::endl; continue; } } return true; } // void TransferThread(image_buff &imageBuff) { // printf("imageBuff len=%d\n",imageBuff.valid_len); } void CClientTransfer::ReadJpegThread(void *arg) { // CClientTransfer *pThis=(CClientTransfer *)arg; while (1) { if (pThis->m_bQuit) { break; } pThis->ReadJpegData(); } } //读jpeg数据 void CClientTransfer::ReadJpegData(void) { image_buff struImageBuff; memset(&struImageBuff,0,sizeof(struImageBuff)); //多线程必须加锁 if (IdleList.size()>0) { LOCK lock(iomutex); //锁定mutex struImageBuff=IdleList.front(); IdleList.pop_front(); }else { //使用boost ::时间库替换 boost::this_thread::sleep(boost::posix_time::milliseconds(5)); return; } LOCK lock(iomutex); //锁定mutex if (m_PicId<m_vecFilePic.size()) { // std::string jpgPath=m_vecFilePic[m_PicId].c_str(); if (GetJpegData(struImageBuff,jpgPath)) { m_PicId++; SendBuffList.push_back(struImageBuff); cout<<"read a jpeg image"<<endl; }else { IdleList.push_back(struImageBuff); } }else { IdleList.push_back(struImageBuff); cout<<"all jpeg image read complete"<<endl; } } //发送Jpeg数据 void CClientTransfer::SendJpegThread(void *arg) { // CClientTransfer *pThis=(CClientTransfer *)arg; while (1) { if (pThis->m_bQuit) { // break; } pThis->SendJpegData(); } } void CClientTransfer::SendJpegData(void) { // image_buff struImageBuff; memset(&struImageBuff,0,sizeof(struImageBuff)); //加锁 if (SendBuffList.size()>0) { LOCK lock(iomutex); //锁定mutex struImageBuff=SendBuffList.front(); SendBuffList.pop_front(); }else { boost::this_thread::sleep(boost::posix_time::milliseconds(5)); return; } //调用boost Asio发送数据 m_sock->send((char*)struImageBuff.pBuff,struImageBuff.valid_len); LOCK lock(iomutex); //锁定mutex IdleList.push_back(struImageBuff); cout<<"send a jpeg image"<<endl; } void CClientTransfer::StartThread(void) { scanFilesUseRecursive(m_jpgPath,m_vecFilePic); m_ReadThread= boost::thread(boost::bind(&CClientTransfer::ReadJpegThread,this)); m_SendThread=boost::thread(boost::bind(&CClientTransfer::SendJpegThread,this)); } // void CClientTransfer::WaitThreadTerminate(void) { // m_ReadThread.join(); m_SendThread.join(); }
3)客户端传输类调用代码
client.cpp
#include "stdafx.h" #include "ClientTransfer.h" int _tmain(int argc, _TCHAR* argv[]) { CClientTransfer clientTransfer("D:\\Tool\\"); clientTransfer.StartThread(); clientTransfer.WaitThreadTerminate(); return 0; }
服务端:
1)做一个简单的服务器,对jpeg图片进行接收,服务器代码如下:
// boostAsio.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> #include <iostream> using boost::asio::ip::tcp; #define max_len 1024*1024 class clientSession :public boost::enable_shared_from_this<clientSession> { public: clientSession(boost::asio::io_service& ioservice) :m_socket(ioservice) { memset(data_,'\0',sizeof(data_)); } ~clientSession() { // } tcp::socket& socket() { return m_socket; } void start() { //boost::asio::async_write(m_socket, // boost::asio::buffer("link successed!"), // boost::bind(&clientSession::handle_write,shared_from_this(), // boost::asio::placeholders::error)); m_socket.async_read_some(boost::asio::buffer(data_,max_len),boost::bind(&clientSession::handle_read,shared_from_this() ,boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } private: void handle_write(const boost::system::error_code& error) { if(error) { m_socket.close(); } } void handle_read(const boost::system::error_code& error,size_t bytes_transferred) { if(!error) { //std::cout << data_ << std::endl; std::cout<<"read jpeg size="<<bytes_transferred<<std::endl; m_socket.async_read_some(boost::asio::buffer(data_,max_len),boost::bind(&clientSession::handle_read,shared_from_this() ,boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred)); } else { m_socket.close(); } } private: tcp::socket m_socket; char data_[max_len]; }; class serverApp { typedef boost::shared_ptr<clientSession> session_ptr; public: serverApp(boost::asio::io_service& ioservice,tcp::endpoint& endpoint) :m_ioservice(ioservice), acceptor_(ioservice,endpoint) { session_ptr new_session(new clientSession(ioservice)); acceptor_.async_accept(new_session->socket(), boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error, new_session)); } ~serverApp() { } private: void handle_accept(const boost::system::error_code& error,session_ptr& session) { if(!error) { std::cout << "get a new client!" << std::endl; session->start(); session_ptr new_session(new clientSession(m_ioservice)); acceptor_.async_accept(new_session->socket(), boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error, new_session)); } } private: boost::asio::io_service& m_ioservice; tcp::acceptor acceptor_; }; int main(int argc , char* argv[]) { boost::asio::io_service myIoService; short port = 8100; tcp::endpoint endPoint(tcp::v4(),port); serverApp sa(myIoService,endPoint); myIoService.run(); return 0; }