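This section covers multithreaded downloading in Java. In a single-threaded download we open a stream with HttpURLConnection, read from it, and write the bytes to a file. A single thread, however, does not make full use of the available resources (CPU and, in practice, network bandwidth), so let's look at multithreaded downloading.
First we need to be clear about how a multithreaded download differs from a single-threaded one in principle, and what problems it introduces.
In multithreaded mode, each of several threads reads one segment of the remote data and stores it, and each thread's segment starts where the previous thread's segment ends, until the whole file has been read. It sounds simple, but on closer inspection several questions come up: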
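1 How much each thread downloads
2 Where each thread's download starts and ends
3 How the segments join up when the threads download at the same time, and how one thread learns of the others' progress
4 Where each thread stores its data once it has been downloaded
5 Several threads write to the same file at once, and a file is a critical resource that only one thread should modify at a time, so synchronization and locking come into play
6 How the activity learns the threads' progress in real time, that is, the percentage of the whole file downloaded so far
7 Since this is a resumable download, how the download records are saved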
Listed like this, there really are quite a few questions.
Let's go through them one by one.
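Questions 1 and 2 come down to simple arithmetic, which can be sketched in plain Java. The class and method names below (`BlockPlanner`, `blockSize`, `startPos`, `endPos`) are hypothetical and are not part of the code in this post. The start-position formula is the one the download thread uses later; clamping the end position to the file size is an added assumption.

```java
/** Hypothetical helper (not from the original code): splits a file into per-thread byte ranges. */
final class BlockPlanner {
    private BlockPlanner() {}

    /** Bytes assigned to each thread: fileSize / threadCount, rounded up. */
    static long blockSize(long fileSize, int threadCount) {
        return (fileSize + threadCount - 1) / threadCount;
    }

    /** First byte a thread (ids start at 1) should request, skipping what it already has. */
    static long startPos(long block, int threadId, long alreadyDownloaded) {
        return block * (threadId - 1) + alreadyDownloaded;
    }

    /** Last byte (inclusive) of the thread's range, clamped to the end of the file. */
    static long endPos(long block, int threadId, long fileSize) {
        return Math.min(block * threadId, fileSize) - 1;
    }

    public static void main(String[] args) {
        long fileSize = 10;
        int threads = 3;
        long block = blockSize(fileSize, threads);
        for (int id = 1; id <= threads; id++) {
            System.out.println("thread " + id + ": bytes="
                    + startPos(block, id, 0) + "-" + endPos(block, id, fileSize));
        }
    }
}
```

For a 10-byte file and 3 threads the block size is 4, so the threads request bytes 0-3, 4-7 and 8-9.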
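Several threads download at the same time, so keeping the download records in a plain file is a poor fit, because files handle concurrent access badly. We'll store them in a database instead.
Creating the database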
public class DBOpenHelper extends SQLiteOpenHelper {
public DBOpenHelper(Context context) {
super(context, "downs.db", null, 1);
}
@Override
public void onCreate(SQLiteDatabase db) {
//Table filedownlog: id (primary key), downpath (URL of the resource being downloaded),
//threadid (id of the download thread), downlength (bytes that thread has downloaded so far)
db.execSQL("CREATE TABLE IF NOT EXISTS filedownlog " +
"(id integer primary key autoincrement," +
" downpath varchar(100)," +
" threadid INTEGER, downlength INTEGER)");
}
@Override
public void onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) {
/*Called when the database version number changes. Here the table is simply dropped and recreated;
* a real application would normally back up or migrate the data first*/
db.execSQL("DROP TABLE IF EXISTS filedownlog");
onCreate(db);
}
}
With the database in place, we need a data-access class that exposes the operations on it.
public class FileService {
//Declare the database manager
private DBOpenHelper openHelper;
//According to the context object instantiated in the method of constructing the database manager
public FileService(Context context) {
openHelper = new DBOpenHelper(context);
}
/**
* Get the specified URI each thread has downloaded file size
* @param path
* @return
* */
public Map<Integer, Integer> getData(String path)
{
//Get readable database handle, usually internal implementation returns are writable database handle
SQLiteDatabase db = openHelper.getReadableDatabase();
//According to the scene of the download path query all download data, return the Cursor to the first record before
Cursor cursor = db.rawQuery("select threadid, downlength from filedownlog where downpath=?",
new String[]{path});
//Create a hash table to hold each thread length of the downloaded file
Map<Integer,Integer> data = new HashMap<Integer, Integer>();
//rawQuery returns a Cursor positioned before the first row, so looping on moveToNext() alone
//visits every row (calling moveToFirst() beforehand would make the loop skip the first record)
while(cursor.moveToNext())
{
//Store each thread id together with the number of bytes that thread has already downloaded
data.put(cursor.getInt(cursor.getColumnIndexOrThrow("threadid")),
cursor.getInt(cursor.getColumnIndexOrThrow("downlength")));
}
cursor.close();//Close the cursor, releasing resources
db.close();
return data;
}
/**
* Save each thread has downloaded file size
* @param path The path of the download
* @param map Now the id and download the length of the collection
*/
public void save(String path,Map<Integer,Integer> map)
{
SQLiteDatabase db = openHelper.getWritableDatabase();
//Open a transaction, because several rows are inserted here
db.beginTransaction();
try{
//Using enhanced for loop iterates through the data collection
for(Map.Entry<Integer, Integer> entry : map.entrySet())
{
//Download path inserted at a specific thread ID already downloaded data
db.execSQL("insert into filedownlog(downpath, threadid, downlength) values(?,?,?)",
new Object[]{path, entry.getKey(), entry.getValue()});
}
//Mark the transaction as successful so that endTransaction() commits it;
//if this method is not called, endTransaction() rolls the inserts back
db.setTransactionSuccessful();
}finally{
//End a transaction
db.endTransaction();
}
db.close();
}
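/**
* Update, in real time, the number of bytes one thread has downloaded
* @param path The path of the download
* @param threadId The id of the download thread
* @param pos The number of bytes this thread has downloaded so far
*/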
public void update(String path,int threadId,int pos)
{
SQLiteDatabase db = openHelper.getWritableDatabase();
//Download path update at a specific thread length of the downloaded file
db.execSQL("update filedownlog set downlength=? where downpath=? and threadid=?",
new Object[]{pos, path, threadId});
db.close();
}
/**
*When a file download is complete, delete the corresponding download records
*@param path
*/
public void delete(String path)
{
SQLiteDatabase db = openHelper.getWritableDatabase();
db.execSQL("delete from filedownlog where downpath=?", new Object[]{path});
db.close();
}
}
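To experiment with the record-keeping logic outside Android, a stand-in without SQLite can help. The class below is a hypothetical sketch (the name `InMemoryProgressStore` is not part of this post) that mirrors the four operations of FileService using maps. One deliberate difference: its `save` replaces the records for a path, whereas the SQL version appends rows and relies on the caller to `delete` first.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for FileService; for experiments only, nothing is persisted. */
final class InMemoryProgressStore {
    // download path -> (thread id -> bytes already downloaded)
    private final Map<String, Map<Integer, Integer>> records = new ConcurrentHashMap<>();

    /** Returns a copy of the per-thread progress for a path (empty if there is no record). */
    Map<Integer, Integer> getData(String path) {
        Map<Integer, Integer> stored = records.get(path);
        return stored == null ? new HashMap<>() : new HashMap<>(stored);
    }

    /** Stores the progress of all threads for a path, replacing any earlier record. */
    void save(String path, Map<Integer, Integer> map) {
        records.put(path, new ConcurrentHashMap<>(map));
    }

    /** Like SQL UPDATE: only changes a thread's progress if a record for it already exists. */
    void update(String path, int threadId, int pos) {
        Map<Integer, Integer> stored = records.get(path);
        if (stored != null) {
            stored.replace(threadId, pos);
        }
    }

    /** Removes all records for a path, for example once the download has completed. */
    void delete(String path) {
        records.remove(path);
    }
}
```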
Next comes the download thread class.
public class DownloadThread extends Thread {
private File saveFile; //Data is saved to a file
private URL downUrl; //Download URL
private int block; //The size of the thread to download
private int threadId = -1; //Set the initialization thread id
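private volatile int downLength; //Bytes downloaded so far; volatile because the downloader's thread polls it
private volatile boolean finish = false; //Whether the download is complete; also polled by the downloader's thread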
private FileDownloadered downloader;
public DownloadThread(FileDownloadered downloader, URL downUrl
, File saveFile, int block, int downLength, int threadId) {
this.downUrl = downUrl;
this.saveFile = saveFile;
this.block = block;
this.downloader = downloader;
this.threadId = threadId;
this.downLength = downLength;
}
@Override
public void run() {
if(downLength < block){//Not the download is complete
try {
HttpURLConnection http = (HttpURLConnection) downUrl.openConnection();
http.setConnectTimeout(5 * 1000);
http.setRequestMethod("GET");
http.setRequestProperty("Accept", "image/gif, image/jpeg, image/pjpeg, image/pjpeg, application/x-shockwave-flash, application/xaml+xml, application/vnd.ms-xpsdocument, application/x-ms-xbap, application/x-ms-application, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, */*");
http.setRequestProperty("Accept-Language", "zh-CN");
http.setRequestProperty("Referer", downUrl.toString());
http.setRequestProperty("Charset", "UTF-8");
int startPos = block * (threadId - 1) + downLength;//starting position
int endPos = block * threadId -1;//end position
http.setRequestProperty("Range", "bytes=" + startPos + "-"+ endPos);//The scope of access to the entity data
http.setRequestProperty("User-Agent", "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.2; Trident/4.0; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729)");
http.setRequestProperty("Connection", "Keep-Alive");
InputStream inStream = http.getInputStream(); //Get the inputStream of the remote connection
byte[] buffer = new byte[10240]; //Local read buffer
int offset = 0; //Number of bytes returned by the most recent read
print("Thread " + this.threadId + " start download from position "+ startPos); //Print thread start download location
RandomAccessFile threadfile = new RandomAccessFile(this.saveFile, "rwd");
//Move the file pointer to the starting position
threadfile.seek(startPos);
//Read up to a full buffer on each pass until the stream ends or the user stops the download
while (!downloader.getExited() && (offset = inStream.read(buffer, 0, buffer.length)) != -1) {
threadfile.write(buffer, 0, offset); //The data written to the file
downLength += offset; //Add data in a new thread has been written to a file to download in length
downloader.update(this.threadId, downLength); //The length of data update this thread has been download to the database and memory in the hash table
downloader.append(offset); //The new download data length added to the already download data in total length
}
threadfile.close();
inStream.close();
print("Thread " + this.threadId + " download finish");
this.finish = true; //Set up complete tag to true, regardless of the download is complete or download users to interrupt
} catch (Exception e) {
this.downLength = -1; //-1 marks this thread as failed so that the downloader can restart it
print("Thread "+ this.threadId+ ":"+ e);
}
}
}
private static void print(String msg){
// Log.i(TAG, msg);
}
/**
* If the download is complete
* @return
*/
public boolean isFinish() {
return finish;
}
/**
* Number of bytes this thread has downloaded
* @return -1 if the download failed
*/
public long getDownLength() {
return downLength;
}
}
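The core of the thread above is "ask for a byte range, then write it at the matching offset". The sketch below isolates that step in plain Java, with an in-memory stream standing in for the HTTP response so it runs without a network. `SegmentWriter`, `rangeHeader` and `writeSegment` are hypothetical names. Unlike the thread above it uses try-with-resources, so the file is closed even if a read fails.

```java
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

/** Hypothetical helper (not from the original code) showing offset-based segment writes. */
final class SegmentWriter {
    private SegmentWriter() {}

    /** Value for the HTTP Range request header, for example "bytes=0-3". */
    static String rangeHeader(long startPos, long endPos) {
        return "bytes=" + startPos + "-" + endPos;
    }

    /** Copies all of {@code in} into {@code target} starting at {@code startPos}; returns bytes written. */
    static long writeSegment(File target, long startPos, InputStream in) throws IOException {
        byte[] buffer = new byte[8192];
        long written = 0;
        try (RandomAccessFile out = new RandomAccessFile(target, "rw")) {
            out.seek(startPos); // jump to this segment's position in the shared file
            int read;
            while ((read = in.read(buffer, 0, buffer.length)) != -1) {
                out.write(buffer, 0, read);
                written += read;
            }
        }
        return written;
    }

    public static void main(String[] args) throws IOException {
        File target = File.createTempFile("segments", ".bin");
        target.deleteOnExit();
        // Write the second segment first to show that the order does not matter.
        writeSegment(target, 5, new ByteArrayInputStream("World".getBytes(StandardCharsets.US_ASCII)));
        writeSegment(target, 0, new ByteArrayInputStream("Hello".getBytes(StandardCharsets.US_ASCII)));
        System.out.println(new String(Files.readAllBytes(target.toPath()), StandardCharsets.US_ASCII));
    }
}
```

Because every segment has a fixed offset, the segments can arrive in any order and the file still ends up as "HelloWorld".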
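What we have so far is a single download thread. But when several threads run at once, how is the concurrency handled, and how do the threads join up and coordinate? This is where the downloader comes in.
The downloader introduces two new things: lock striping and random-access file I/O.
Both are facilities the designers of Java provided long ago; we only need to use them, so here is just a brief outline of the principle. Locks are covered in operating-systems courses, and introductory Java covers synchronization with synchronized. Synchronization addresses the problems that can arise when several threads access one resource: a thread locks the resource while it uses it so that other threads cannot get in, and releases the lock when it is done so that the others can proceed. Locking slows multithreaded access down, though, because every access has to acquire the lock. HashMap is a hash-based Map that is not thread-safe, whereas Hashtable is thread-safe (it works by locking the whole table). So what is lock striping?
Lock striping
With the basic approach the whole resource sits behind one lock, and only one thread can access it at a time. The weakness is that a resource often contains many sub-resources, and the sub-resources that different threads want may or may not be the same. Why not lock each sub-resource independently?
(Picture a group of people going in and out of a villa. Each has a private room, but some facilities are shared, such as the hall, the garden and the swimming pool, and we assume each facility can be used by only one person at a time. Originally the whole villa is locked, so only one person can be inside at any moment. That is safe, but it wastes the villa's resources: the person inside may just be sitting in their own room, using no shared facility or only one of them. So we give each facility its own lock in addition to the lock on the villa. When someone wants to enter, they first check whether the villa is locked. If every shared facility is occupied and the newcomer only wants to use shared facilities, the villa is locked for them and they cannot enter. If they want to go to their own room first and decide later whether to swim, the villa is not locked for them, but the pool is, so they rest in their room and wait for the pool to be unlocked.
Splitting one lock into several does add overhead: before, one gatekeeper was enough; now we also need a pool attendant, a hall attendant, a garden attendant and so on, and they have to stay in touch so that the state of all the locks can be checked at any time. The extra overhead is worth paying for the better access efficiency; we only need to find the balance point between the added cost and the efficiency gained.)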
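The idea of locking sub-resources separately can be sketched with a small striped counter. This is an illustration only: `StripedCounters` is a hypothetical class, and it is not how ConcurrentHashMap is implemented (older JDKs used lock segments, while Java 8 and later lock individual buckets and use compare-and-swap). Each key maps to one stripe, and threads contend only when they hit the same stripe.

```java
/** Hypothetical illustration of lock striping: one lock per stripe instead of one global lock. */
final class StripedCounters {
    private final Object[] locks;
    private final long[] counts;

    StripedCounters(int stripes) {
        locks = new Object[stripes];
        counts = new long[stripes];
        for (int i = 0; i < stripes; i++) {
            locks[i] = new Object();
        }
    }

    /** Maps a key to its stripe; floorMod keeps the index non-negative. */
    private int stripeFor(int key) {
        return Math.floorMod(key, locks.length);
    }

    /** Adds delta to the key's stripe, blocking only threads that use the same stripe. */
    void add(int key, long delta) {
        int s = stripeFor(key);
        synchronized (locks[s]) {
            counts[s] += delta;
        }
    }

    /** Sums all stripes, taking each stripe's lock briefly in turn. */
    long total() {
        long sum = 0;
        for (int s = 0; s < locks.length; s++) {
            synchronized (locks[s]) {
                sum += counts[s];
            }
        }
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        StripedCounters counters = new StripedCounters(4);
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            final int id = i + 1; // ids start at 1, like the download threads
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    counters.add(id, 1);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(counters.total());
    }
}
```

With four stripes and four thread ids, each worker gets its own lock, so the threads never wait for one another while adding. `total()` takes the locks one at a time, so it is exact only when no thread is adding concurrently.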
public class FileDownloadered {
private static final String TAG = "File download class "; //Set up a check the mark of the log
private static final int RESPONSEOK = 200; //Set the response code is 200, on behalf of the access to success
private FileService fileService; //Access to the local database business Bean
private volatile boolean exited; //Stop flag; volatile so that the download threads see the change promptly
private Context context; //The context object of the program
private int downloadedSize = 0; //The length of the download file
private int fileSize = 0; //Began to file size
private DownloadThread[] threads; //According to the number of threads Settings download thread pool
private File saveFile; //Data saved to the local file
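//The map below is a ConcurrentHashMap, a thread-safe hash map built on the lock-striping idea (newer JDKs lock individual buckets). Thank you, Java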
private Map<Integer, Integer> data = new ConcurrentHashMap<Integer, Integer>(); //The length of the cache is a piece of thread to download
private int block; //The length of each thread to download
private String downloadUrl; //The path of the download
/**
* Get the number of threads
*/
public int getThreadSize()
{
return threads.length;
}
/**
* Exit the download
* */
public void exit()
{
this.exited = true; //Set to true will exit the logo;
}
public boolean getExited()
{
return this.exited;
}
/**
* Get the size of the file
* */
public int getFileSize()
{
return fileSize;
}
/**
* The size of the cumulative download
* Use synchronous locking to solve the problem of concurrent access
* */
protected synchronized void append(int size)
{
//The real-time download download added to the total length of the length of the
downloadedSize += size;
}
/**
* Update the position of the specified thread last download
* @param ThreadId thread id
* @param pos The location of the last download
* */
protected synchronized void update(int threadId,int pos)
{
//To specify the thread id of the thread with the latest download length, the previous value will be overwritten
this.data.put(threadId, pos);
//Update the database set the length of the thread to download
this.fileService.update(this.downloadUrl, threadId, pos);
}
/**
* Build file downloader
* @param downloadUrl Download path
* @param fileSaveDir File directory
* @param threadNum Download the number of threads
* @return
*/
public FileDownloadered(Context context,String downloadUrl,File fileSaveDir,int threadNum)
{
try {
this.context = context;
this.downloadUrl = downloadUrl; //Path assignment for download
fileService = new FileService(this.context); //Bean class instantiation database operations business, need to pass a context
URL url = new URL(this.downloadUrl); //According to the download path to instantiate the URL
if(!fileSaveDir.exists()) fileSaveDir.mkdirs(); //Create the target directory if it does not exist; mkdirs() also creates missing parent directories
this.threads = new DownloadThread[threadNum]; //Download according to the number of threads created thread pool
HttpURLConnection conn = (HttpURLConnection) url.openConnection(); //Create a remote connection handle, here is not really connected
conn.setConnectTimeout(5000); //Set the connection timeout event for 5 seconds
conn.setRequestMethod("GET"); //Set the request mode to GET
//Set the client can receive media types
conn.setRequestProperty("Accept", "image/gif, image/jpeg, image/pjpeg, " +
"image/pjpeg, application/x-shockwave-flash, application/xaml+xml, " +
"application/vnd.ms-xpsdocument, application/x-ms-xbap," +
" application/x-ms-application, application/vnd.ms-excel," +
" application/vnd.ms-powerpoint, application/msword, */*");
conn.setRequestProperty("Accept-Language", "zh-CN"); //Set user language
conn.setRequestProperty("Referer", downloadUrl); //Set the source of the request page, to facilitate the service side to source statistics
conn.setRequestProperty("Charset", "UTF-8"); //Set up the client code
//Set the user agent
conn.setRequestProperty("User-Agent", "Mozilla/4.0 (compatible; MSIE 8.0; " +
"Windows NT 5.2; Trident/4.0; .NET CLR 1.1.4322; .NET CLR 2.0.50727;" +
" .NET CLR 3.0.04506.30; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729)");
conn.setRequestProperty("Connection", "Keep-Alive"); //Set up the connection way
conn.connect(); //And remote resources building is the link, but there is no return of data flow
printResponseHeader(conn); //Print collection returns the Http header fields
//To judgment, the returned a status code is used to check whether the request is successful, back to 200 when executing the following code
if(conn.getResponseCode() == RESPONSEOK)
{
this.fileSize = conn.getContentLength(); //According to the response file size
if(this.fileSize <= 0)throw new RuntimeException("Don't know the file size"); //File size less than or equal to zero when the runtime exception is thrown
String filename = getFileName(conn); //Access to the file name
this.saveFile = new File(fileSaveDir,filename); //According to the file directory and file name save the file
Map<Integer,Integer> logdata = fileService.getData(downloadUrl); //Access to download records
//If there is a download records
if(logdata.size() > 0)
{
//Traverse a collection of data, the data length each thread has been downloaded into the data
for(Map.Entry<Integer, Integer> entry : logdata.entrySet())
{
data.put(entry.getKey(), entry.getValue());
}
}
/*If the downloaded data on the number of threads and now set the number of threads at the
* same time, calculate all the total length field already downloaded data*/
if(this.data.size() == this.threads.length)
{
//Traverse each thread has downloaded data
for(int i = 0;i < this.threads.length;i++)
{
this.downloadedSize += this.data.get(i+1);
}
print("The length of the downloaded " + this.downloadedSize + " bytes");
}
//Use conditional operator and each thread length of the need to download the data
this.block = (this.fileSize % this.threads.length) == 0?
this.fileSize / this.threads.length:
this.fileSize / this.threads.length + 1;
}else{
//Print the error information
print("The server response errors :" + conn.getResponseCode() + conn.getResponseMessage());
throw new RuntimeException("Server error feedback ");
}
}catch (Exception e)
{
print(e.toString()); //printer error
throw new RuntimeException("Unable to connect URL");
}
}
/**
* Get File Names
* */
private String getFileName(HttpURLConnection conn)
{
//Obtained from the download path string file name
String filename = this.downloadUrl.substring(this.downloadUrl.lastIndexOf('/') + 1);
if(filename == null || "".equals(filename.trim())){ //If you do not get the file name
for(int i = 0;;i++) //Using infinite loop through
{
String mine = conn.getHeaderField(i); //Value of the i-th response header field
if (mine == null) break; //No more header fields, so leave the loop
//Look for the Content-Disposition field, which may carry the file name.
//equalsIgnoreCase is null-safe: the key at index 0 (the status line) can be null
if("content-disposition".equalsIgnoreCase(conn.getHeaderFieldKey(i))){
//Match case-insensitively so that the file name keeps its original case
Matcher m = Pattern.compile(".*filename=(.*)", Pattern.CASE_INSENSITIVE).matcher(mine);
if(m.find()) return m.group(1); //Return the file name if the header contains one
}
}
filename = UUID.randomUUID()+ ".tmp";//Nothing found: fall back to a generated file name
/*UUID.randomUUID() returns a randomly generated 128-bit identifier,
* so a name collision is extremely unlikely */
}
return filename;
}
/**
* Start the download file
* @param listener Listen to download the change of the quantity, if you don't need to know the number of real-time, can be set to null
* @return Have downloaded the file size
* @throws Exception
*/
//To download, if there is one exception, throw an exception to the caller
public int download(DownloadProgressListener listener) throws Exception{
try {
//RandomAccessFile supports reading and writing at any position in a file; it is opened in "rwd" mode
RandomAccessFile randOut = new RandomAccessFile(this.saveFile, "rwd");
//Set the file size
if(this.fileSize>0) randOut.setLength(this.fileSize);
randOut.close(); //Close the file, make the Settings to take effect
URL url = new URL(this.downloadUrl);
if(this.data.size() != this.threads.length){
//If the original have not download or download the original number of threads and now do not match the number of threads
this.data.clear();
//Traverse the thread pool
for (int i = 0; i < this.threads.length; i++) {
this.data.put(i+1, 0);//Initialize each thread has downloaded data length of 0
}
this.downloadedSize = 0; //The length of the set already downloaded to 0
}
for (int i = 0; i < this.threads.length; i++) {//Open a thread to download
int downLength = this.data.get(i+1);
//By a specific thread id to get the thread has been downloaded data length
//To determine whether a thread has the download is complete, or continue to download
if(downLength < this.block && this.downloadedSize<this.fileSize){
//Initialize a specific thread id
this.threads[i] = new DownloadThread(this, url, this.saveFile, this.block, this.data.get(i+1), i+1);
//Set the thread priority ,Thread.NORM_PRIORITY = 5;
//Thread.MIN_PRIORITY = 1;Thread.MAX_PRIORITY = 10,The higher the value, the greater the priority
this.threads[i].setPriority(7);
this.threads[i].start();
}else{
this.threads[i] = null; //Indicates that the thread the download task has been completed
}
}
fileService.delete(this.downloadUrl);
//If there is a download records, delete them, and then add again
fileService.save(this.downloadUrl, this.data);
//The download of real-time data to the database
boolean notFinish = true;
//Download the unfinished
while (notFinish) {
// All threads circulation to judge whether the download is complete
Thread.sleep(900);
notFinish = false;
//Assume that all threads the download is complete
for (int i = 0; i < this.threads.length; i++){
if (this.threads[i] != null && !this.threads[i].isFinish()) {
//If it is found that the thread is not download
notFinish = true;
//Set the sign to download is not complete
if(this.threads[i].getDownLength() == -1){
//If the download fails, again on the basis of the downloaded data length to download
//Start download threads, set the priority of a thread
this.threads[i] = new DownloadThread(this, url, this.saveFile, this.block, this.data.get(i+1), i+1);
this.threads[i].setPriority(7);
this.threads[i].start();
}
}
}
if(listener!=null) listener.onDownloadSize(this.downloadedSize);
//The length of the notice has been downloaded data
}
if(downloadedSize == this.fileSize) fileService.delete(this.downloadUrl);
//The download is complete delete records
} catch (Exception e) {
print(e.toString());
throw new Exception("File download abnormal ");
}
return this.downloadedSize;
}
/**
* Get the Http response header fields
* @param http
* @return
*/
public static Map<String, String> getHttpResponseHeader(HttpURLConnection http) {
//LinkedHashMap iterates in insertion order and allows null keys and values
Map<String, String> header = new LinkedHashMap<String, String>();
//Use an infinite loop here, because the number of header fields is not known in advance
for (int i = 0;; i++) {
String mine = http.getHeaderField(i); //Access to the i the value of the field
if (mine == null) break; //No value explain header fields has been finished, use the break out of circulation
header.put(http.getHeaderFieldKey(i), mine); //Get the size I key fields
}
return header;
}
/**
* Print the Http header fields
* @param http
*/
public static void printResponseHeader(HttpURLConnection http){
//Get the HTTP response header fields
Map<String, String> header = getHttpResponseHeader(http);
//Iterate over the header fields; the LinkedHashMap preserves the order in which they were inserted
for(Map.Entry<String, String> entry : header.entrySet()){
//Prefix the value with "key:" when the key is present, otherwise with an empty string
String key = entry.getKey()!=null ? entry.getKey()+ ":" : "";
print(key+ entry.getValue()); //Print the key and its value
}
}
/**
* print information
* @param msg Information string
* */
private static void print(String msg) {
Log.i(TAG, msg);
}
}
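The file-name lookup in getFileName can be tried out on its own. The sketch below is a hypothetical helper (`ContentDispositionNames.extract` is not part of this post) that uses a slightly stricter pattern than the one above: it strips optional quotes and stops at the next parameter. It assumes the plain `filename=` form and does not handle the encoded `filename*=` form.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not from the original code) for reading a file name from Content-Disposition. */
final class ContentDispositionNames {
    // filename=, optionally quoted, ending at the closing quote or the next ';'
    private static final Pattern FILENAME =
            Pattern.compile("filename\\s*=\\s*\"?([^\";]+)\"?", Pattern.CASE_INSENSITIVE);

    private ContentDispositionNames() {}

    /** Returns the file name from a Content-Disposition value, or null if there is none. */
    static String extract(String headerValue) {
        if (headerValue == null) {
            return null;
        }
        Matcher m = FILENAME.matcher(headerValue);
        return m.find() ? m.group(1).trim() : null;
    }

    public static void main(String[] args) {
        System.out.println(extract("attachment; filename=\"Report.PDF\""));
        System.out.println(extract("inline"));
    }
}
```

The first call yields `Report.PDF` with its case preserved; the second yields `null`, in which case a caller would fall back to a generated name as the downloader does.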
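The next question is how to get the threads' download progress in real time. We use an interface here: we create a listener (much like a button click listener) that receives the download progress as it changes.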
public interface DownloadProgressListener {
public void onDownloadSize(int downloadedSize);
}
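If you read the downloader code above carefully, you will have noticed the place where the listener is invoked. It also shows what programming against an interface looks like: a listener interface is passed to the downloader as a parameter, and at the appropriate point the downloader passes the download result to the interface's method. Whoever uses the downloader therefore passes in an implementation of the interface and receives the values through the method's parameter.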
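The callback flow can be sketched without Android or a network. Everything below is hypothetical and only imitates the shape of the real classes: `ProgressListener` mirrors DownloadProgressListener, and `FakeDownloader` "downloads" by counting up in fixed chunks and reporting to the listener after each one.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mirror of DownloadProgressListener. */
interface ProgressListener {
    void onDownloadSize(int downloadedSize);
}

/** Hypothetical stand-in for the downloader: no I/O, it only simulates progress. */
final class FakeDownloader {
    private final int fileSize;

    FakeDownloader(int fileSize) {
        this.fileSize = fileSize;
    }

    /** Simulates a download in chunks, notifying the listener (if any) after each chunk. */
    int download(int chunk, ProgressListener listener) {
        if (chunk <= 0) {
            throw new IllegalArgumentException("chunk must be positive");
        }
        int downloaded = 0;
        while (downloaded < fileSize) {
            downloaded = Math.min(downloaded + chunk, fileSize);
            if (listener != null) {
                listener.onDownloadSize(downloaded);
            }
        }
        return downloaded;
    }

    /** Converts a byte count into a whole-number percentage of the file. */
    static int percent(int downloaded, int total) {
        return (int) (100L * downloaded / total);
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        new FakeDownloader(1000).download(400, size -> seen.add(percent(size, 1000)));
        System.out.println(seen);
    }
}
```

The caller decides what to do with each value: here it collects percentages (40, 80, 100), while in the activity the listener forwards the value to the UI thread.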
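Now we create a task (inside the activity).
It starts a new thread to run the task and sends messages from inside the listener.
The messages are handled on the main thread.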
private final class DownloadTask implements Runnable {
private String path;
private File saveDir;
private FileDownloadered loader;
public DownloadTask(String path, File saveDir) {
this.path = path;
this.saveDir = saveDir;
}
public void exit() {
if (loader != null)
loader.exit();
}
public void run() {
try {
loader = new FileDownloadered(WifiDownloadActivity.this, path, saveDir, 5);
download_bar.setMax(loader.getFileSize());
loader.download(new DownloadProgressListener() {
public void onDownloadSize(int size) {
Message msg = new Message();
msg.what = PROCESSING;
msg.getData().putInt("size", size);
handler.sendMessage(msg);
}
});
} catch (Exception e) {
e.printStackTrace();
handler.sendMessage(handler.obtainMessage(FAILURE));
}
}
}
That completes the multithreaded download, but there is much more to using multithreading in Java.