1. Object 类作为锁,缓存队列>1
package lock91_reentrant_procon;
import java.io.ObjectInputStream;
/**
* Created by lenovo on 2018/5/11.
*/
public class CommonService {
private Product[] buffer;
public CommonService(int length) {
buffer = new Product[length];
}
private int nowCount = 0;
private int dataBeginIndex = 0;
private int dataInsertIndex = 0;
private Object lock = new Object();
public void producer(Product product) {
synchronized (lock) {
try {
while (nowCount == buffer.length) {
System.out.println("producer: " + Thread.currentThread().getName() + "等待...");
lock.wait();
}
buffer[dataInsertIndex] = product;
System.out.println("producer: " + Thread.currentThread().getName() + "插入的索引为:" + dataInsertIndex + ",剩余产品:" + (nowCount + 1));
lock.notify();
dataInsertIndex = (dataInsertIndex + 1) % buffer.length;
nowCount++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public Product consumer() {
Product product = null;
synchronized (lock) {
try {
while (nowCount == 0) {
System.out.println("consumer: " + Thread.currentThread().getName() + "等待...");
lock.wait();
}
product = buffer[dataBeginIndex];
lock.notify();
System.out.println("consumer: " + Thread.currentThread().getName() + "消费的索引为:" + dataBeginIndex + ",剩余产品数量:" + (nowCount - 1));
dataBeginIndex = (dataBeginIndex + 1) % buffer.length;
nowCount--;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return product;
}
}
package lock91_reentrant_procon;
import java.io.Serializable;
import java.util.Date;
/**
* Created by lenovo on 2018/5/11.
*/
public class CommonMain {
public static void main(String[] args) throws InterruptedException {
CommonService commonService = new CommonService(15);
Runnable consumerRunnable = new Runnable() {
@Override
public void run() {
commonService.consumer();
}
};
Runnable producerRunnable = new Runnable() {
@Override
public void run() {
Product product = new Product();
product.setName("产品名(" + Thread.currentThread().getName() + ")");
commonService.producer(product);
}
};
Thread[] consumer = new Thread[50];
Thread[] producer = new Thread[50];
for (int i = 0; i < 50; i++) {
consumer[i] = new Thread(consumerRunnable);
producer[i] = new Thread(producerRunnable);
consumer[i].setName("consumer" + i);
producer[i].setName("producer" + i);
}
for (int i = 0; i < 25; i++) {
producer[i].start();
}
for (int i = 0; i < 25; i++) {
consumer[i].start();
}
for (int i = 25; i < 50; i++) {
producer[i].start();
}
for (int i = 25; i < 50; i++) {
consumer[i].start();
}
Thread.sleep(5000);
}
}
输出
producer: producer0插入的索引为:0,剩余产品:1
producer: producer33插入的索引为:1,剩余产品:2
producer: producer2插入的索引为:2,剩余产品:3
producer: producer3插入的索引为:3,剩余产品:4
producer: producer4插入的索引为:4,剩余产品:5
producer: producer5插入的索引为:5,剩余产品:6
producer: producer6插入的索引为:6,剩余产品:7
producer: producer7插入的索引为:7,剩余产品:8
producer: producer8插入的索引为:8,剩余产品:9
producer: producer9插入的索引为:9,剩余产品:10
producer: producer10插入的索引为:10,剩余产品:11
producer: producer11插入的索引为:11,剩余产品:12
producer: producer12插入的索引为:12,剩余产品:13
producer: producer13插入的索引为:13,剩余产品:14
producer: producer14插入的索引为:14,剩余产品:15
producer: producer15等待...
producer: producer16等待...
producer: producer17等待...
2. ReentrantLock 和 Condition 实现
package lock91_reentrant_procon;
import java.io.Serializable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by lenovo on 2018/5/11.
*/
public class Service {
private ReentrantLock lock = new ReentrantLock();
private Condition consumerCondition = lock.newCondition();
private Condition producerCondition = lock.newCondition();
private Product[] buffer;
private int nowCount = 0; //
private int dataBeginIndex = 0; //第一条数据在 buffer中的索引,初始时为0,当生产者生产第一个时,才会调用
private int dataInsertIndex = 0; // 插入数据应放置的索引位置
public Service(int bufferLength) {
buffer = new Product[bufferLength]; // 指定缓存大小
}
public Product consumer() {
Product product = null;
try {
lock.lock();
// System.out.println(Thread.currentThread().getName() + "--start-------------------");
while (nowCount == 0) {
System.out.println(Thread.currentThread().getName() + " 等待...");
consumerCondition.await();
}
producerCondition.signal();
product = buffer[dataBeginIndex];
System.out.println(Thread.currentThread().getName() + "消费的索引为 : " + dataBeginIndex + ",剩余" + (nowCount - 1));
dataBeginIndex = (dataBeginIndex + 1) % buffer.length;
nowCount--;
} catch (InterruptedException i) {
i.printStackTrace();
} finally {
// System.out.println(Thread.currentThread().getName() + "--end---------------------");
lock.unlock();
}
return product;
}
public void producer(Product product) {
try {
lock.lock();
// System.out.println(Thread.currentThread().getName() + "--start-------------------");
while (nowCount == buffer.length) {
System.out.println(Thread.currentThread().getName() + " 等待...");
producerCondition.await();
}
consumerCondition.signal();
System.out.println(Thread.currentThread().getName() + " 插入的索引为 : " + dataInsertIndex + ", 剩余 " + (nowCount + 1));
buffer[dataInsertIndex] = product;
dataInsertIndex = (dataInsertIndex + 1) % buffer.length;
nowCount++;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// System.out.println(Thread.currentThread().getName() + "--end---------------------");
lock.unlock();
}
}
}
package lock91_reentrant_procon;
/**
* Created by lenovo on 2018/5/11.
*/
public class Product {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Product{" +
"name='" + name + '\'' +
'}';
}
}
package lock91_reentrant_procon;
import reentrant.Run;
import java.util.Date;
/**
* Created by lenovo on 2018/5/11.
*/
public class Main {
public static void main(String[] args) {
Service service = new Service(20);
/*
单一生产者、消费者,只有一个生产者线程、消费者线程,每个生产者/消费者可以生产/消费多次
Thread consumer = new Thread(() -> {
while (true) {
Product product = service.consumer();
System.out.println("消费的产品为: "+product.getName());
}
});
Thread producer = new Thread(() -> {
int count = 1;
while (true) {
Product product = new Product();
product.setName("产品" + ++count);
service.producer(product);
}
});
consumer.start();
producer.start();
*/
Runnable consumerRunnable = new Runnable() {
@Override
public void run() {
Product product = service.consumer();
System.out.println(Thread.currentThread().getName()+" 消费产品->"+product.getName());
}
};
Runnable producerRunnable = new Runnable() {
@Override
public void run() {
Product product = new Product();
product.setName("product of " + Thread.currentThread().getName());
service.producer(product);
}
};
Thread[] consumers = new Thread[50];
Thread[] producers = new Thread[50];
for (int i = 0; i < 50; i++) {
consumers[i] = new Thread(consumerRunnable);
producers[i] = new Thread(producerRunnable);
consumers[i].setName("consumer" + i);
producers[i].setName("producer" + i);
}
for (int i = 0; i < 50; i++) {
producers[i].start();
consumers[i].start();
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出
consumer10消费的索引为 : 8,剩余0
consumer10 消费产品->product of producer9
producer11 插入的索引为 : 9, 剩余 1
consumer11消费的索引为 : 9,剩余0
consumer11 消费产品->product of producer11
producer12 插入的索引为 : 10, 剩余 1
consumer12消费的索引为 : 10,剩余0
consumer12 消费产品->product of producer12
consumer13 等待...
producer14 插入的索引为 : 11, 剩余 1
consumer13消费的索引为 : 11,剩余0
consumer13 消费产品->product of producer14
consumer14 等待...
consumer15 等待...
producer16 插入的索引为 : 12, 剩余 1
consumer17消费的索引为 : 12,剩余0
consumer14 等待...
consumer17 消费产品->product of producer16
producer17 插入的索引为 : 13, 剩余 1
网友评论