JAVA多线程内容比较多,今天写完了第七篇,最后一篇了,历时将近两周,终于写完了。
1. 多线程断点续传原理
(1)多线程断点续传,实际上就是传递文件过程中断了,下次再重新传递的时候,记住中断的点,重新传递文件。
(2)但是如何记录断开的点呢?多线程的断点续传是通过判断客户端的文件长度来来判定传输的位置的,然后服务端设置到已经传输完毕的下一个位置开始继续传输
(3)然后拆分成了多份,多线程同时下载,下载过程中通过临时文件(拆成多少份需要多少个临时文件)记录当前下载完成的长度,多个文件都下载完成就完成了
(4)如果到了哪个位置即使断了,多份文件中记录着每份下载完成长度,重新执行多线程的时候仍然分成多份文件,每份文件独立判断是否完成,完成多少,跳转到未完成的位置开始继续下载
2. 断点续传的实现
这个是我网上找了几个例子,只有这个才是真正的断点续传,其他的帖子都是骗子,只有分份下载,压根没续传。
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
public class MutilDownload {
private static String path = "https://mirrors.cnnic.cn/apache/tomcat/tomcat-8/v8.5.73/bin/apache-tomcat-8.5.73-deployer.tar.gz";
private static final int threadCount = 3;
private static int runningThread; //标识 正在运行的线程数量
public static void main(String[] args) {
try {
URL url = new URL(path);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setConnectTimeout(5000);
int responseCode = conn.getResponseCode();
if (responseCode == 200) {
int contentLength = conn.getContentLength();
runningThread = threadCount;
System.out.println("length" + contentLength);
RandomAccessFile rafAccessFile = new RandomAccessFile("D://WUDownloadCache//apache-tomcat-8.5.73-deployer.tar.gz", "rw");
rafAccessFile.setLength(contentLength);
int blockSize = contentLength / threadCount;
for (int i = 0; i < threadCount; i++) {
int startIndex = i * blockSize; //每个现成下载的开始位置
int endIndex = (i + 1) * blockSize - 1;// 每个线程的结束位置
if (i == threadCount - 1) {
//最后一个线程
endIndex = contentLength - 1;
}
new DownloadThread(startIndex, endIndex, i).start();
}
}
} catch (Exception e) {
}
}
private static class DownloadThread extends Thread {
private int startIndex;
private int endIndex;
private int threadId;
public DownloadThread(int startIndex, int endIndex, int threadId) {
this.startIndex = startIndex;
this.endIndex = endIndex;
this.threadId = threadId;
}
@Override
public void run() {
try {
URL url = new URL(path);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setConnectTimeout(5000);
File file = new File("D://WUDownloadCache//"+threadId + ".txt");
if(file.exists() && file.length() > 0) {
FileInputStream fis = new FileInputStream(file);
BufferedReader buff = new BufferedReader(new InputStreamReader(fis));
String lastPosition = buff.readLine();// 读取出来的内容就是上次下载的位置
int lastPos = Integer.parseInt(lastPosition);
System.out.println("线程id:" + threadId + "当前线程下载的位置:-----"+ lastPos);
startIndex = lastPos;
fis.close();
buff.close();
}
conn.setRequestProperty("Range", "bytes=" + startIndex + "-" + endIndex); //固定写法,请求部分资源
int responseCode = conn.getResponseCode(); // 206表示请求部分资源
if (responseCode == 206) {
RandomAccessFile rafAccessFile = new RandomAccessFile("D://WUDownloadCache//apache-tomcat-8.5.73-deployer.tar.gz", "rw");
rafAccessFile.seek(startIndex);
InputStream is = conn.getInputStream();
int len = -1;
byte[] buffer = new byte[1024];
int total = 0; // 代表当前线程下载的大小
while ((len = is.read(buffer)) != -1) {
rafAccessFile.write(buffer, 0, len);
total += len;
//断点续传, 保存当前线程下载的位置
int currentThreadPosition = startIndex + total; //当前线程下载的位置
// 存储当线程的下载五位置
RandomAccessFile raff = new RandomAccessFile("D://WUDownloadCache//"+threadId +".txt", "rwd");
raff.write(String.valueOf(currentThreadPosition).getBytes());
raff.close();
}
rafAccessFile.close();
System.out.println("线程" + threadId + "下载完成");
//删除临时文件
// File tempFile = new File(threadId + ".txt");
// if(tempFile.exists()) {
// file.delete();
// }
synchronized (MutilDownload.class) {
runningThread--;
if(runningThread == 0) {
for (int i = 0; i < threadCount; i++) {
File deleteFile = new File(i + ".txt");
deleteFile.delete();
}
}
}
}
} catch (Exception e) {
}
}
}
}
3. 实现生产者消费者模式
简易版本,只有发送和消费,中间用queue实现。
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import static java.lang.Thread.*;
public class ProducerAndConsumer {
public static final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(20);
public static void main(String[] args) {
new Thread(new Producer()).start();
try {
sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Consumer()).start();
}
static class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 25; i++) {
try {
queue.add(i + "");
}catch(Exception e){
e.printStackTrace();
if(e.getClass().equals(IllegalStateException.class)&&e.getMessage().equals("Queue full")){
waitTest();
}
}
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
while(true) {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
String result = queue.poll();
System.out.println(result);
if(Objects.isNull(result)){
notifyTest();
}
}
}
}
public static synchronized void notifyTest() {
ProducerAndConsumer.class.notify();
}
public static synchronized void waitTest() {
try {
ProducerAndConsumer.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4. 用Java实现阻塞队列
(1)什么是阻塞队列:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
(2)主要就是存入和取出的处理,存入时候需要判断是否队列已经满了,满了需要等待队列有数据被取出才能放入,如果存入数据要通知等待取数据的队列;取出时候,要判断队列是否为空,如果空了就要等待队列有数据再取出,如果取出数据了要通知等待存入数据的队列。
(3)另外注意一定要在存/取的时候加锁,避免线程并发存储或者取出数据导致的问题
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockingQueueTest<E> {
public static void main(String[] args) {
BlockingQueueTest<String> test = new BlockingQueueTest<String>(10);
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 15; i++) {
test.add(i+"");
System.out.println("add "+ i );
}
}
}).start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("get started");
while (true) {
String get = test.get();
System.out.println("get "+ get );
}
}
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private LinkedList<E> list = new LinkedList<>();
private static int MAX_SIZE;
private final static Object lock = new Object();
public BlockingQueueTest(int size) {
this.MAX_SIZE = size;
}
//原子性操作数字的加减
private AtomicInteger count = new AtomicInteger(0);
public synchronized void waitTest() {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void add(E e) {
synchronized (lock) {
//满了
while(MAX_SIZE == count.get()){
waitTest();
}
list.add(e);
//count++
count.incrementAndGet();
lock.notify();
}
}
public E get(){
synchronized (lock) {
//满了
while(0 == count.get()){
waitTest();
}
E e = list.removeFirst();
//count--
count.decrementAndGet();
lock.notify();
return e;
}
}
}
5. BlockingQueue介绍:
(1)上边54代码讲的够详细了,下面主要讲讲他的方法吧
(2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
(3)offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败
(4)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续
(5)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
(6)poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
(7)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
(8)drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
6. Java中的同步集合与并发集合有什么区别?
(1)同步集合,就是集合的方法都有synchronized的集合,线程安全,效率不高。
(2)并发集合,Concurrent下面的集合,比如ConcurrentHashMap通过分段的方式控制多线程并发冲突,也是线程安全,效率较高。
网友评论