Callable and Future
- 一般来说 Runnable 使用来包装一段代码,使之在另外一个线程工作。这种方式的局限性便在于不在执行过程中返回一个结果。想要获得返回结果的唯一方法便是在外部定义一个变量。
- Callable 是在java5 被引入的,用来执行一段异步代码的。Callable有一个基础方法叫做call。call方法比之runnable 方法,有一个额外的功能,返回结果并且允许抛出检查异常。
callable 返回的结果通常会封装成一个Future对象;
Callable Interface
public interface Callable<V> {
V call() throws Exception;
}
Future Interface
interface Future<V> {
V get();
V get(long timeout, TimeUnit unit);
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
}
示例
public class ComplexCalculator implements Callable<String> {
@Override
public String call() throws Exception {
// just sleep for 10 secs to simulate a lengthy computation
Thread.sleep(10000);
System.out.println("Result after a lengthy 10sec calculation");
return "Complex Result"; // the result
}
}
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newSingleThreadExecutor();
System.out.println("Time At Task Submission : " + new Date());
Future<String> result = es.submit(new ComplexCalculator());
// the call to Future.get() blocks until the result is available.So //we are in for about a 10 sec wait now
System.out.println("Result of Complex Calculation is : " +result.get());
System.out.println("Time At the Point of Printing the Result : " + new Date());
}
Future 方法介绍
- get() 获取Future 执行完之后的实际结果
- get(long timeout, TimeUnit unit)定义最大超时时间
- cancel(mayInterruptIfRunning) mayInterruptIfRunning 变量是用来标志,如果这个task 已经启动或者正在运行,是否可以停止
- isDone() 用来检查这个任务是否执行完毕
- isCancelled() 用来检查 我们取消的task 是否已经被取消。
CountDownLatch
- CountDownLatch 是一种同步工具,让你一个或者多个线程能够等待另一组线程执行完操作。
- CountDownLatch 初始化需要有固定的数量
- await方法将会一直阻塞,如果countDown() 调用次数依然没有超过初始化的givennumber,在countdown 次数执行完毕之后,所有等待的线程都会被释放,在此之后的任何调用都会迅速返回结果。
- 这是一个一次性策略,如果你需要重新设置数量没那么请使用CyclicBarrier。
关键方法
- public void countDown()
会导致当前线程await 只到latch 数到零。除非阻塞线程(非当前线程)被阻塞。 - public void countDown()
对当前的latch 的数量进行--1,当数量达到0 时候释放所有等待的线程·
示例代码
import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable {
CountDownLatch latch;
public DoSomethingInAThread(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
System.out.println("Do some thing");
latch.countDown();
} catch (Exception err) {
err.printStackTrace();
}
}
}
public class CountDownLatchDemo {
public static void main(String[] args) {
try {
int numberOfThreads = 5;
if (args.length < 1) {
System.out.println("Usage: java CountDownLatchDemo numberOfThreads");
return;
}
try {
numberOfThreads = Integer.parseInt(args[0]);
} catch (NumberFormatException ne) {
}
CountDownLatch latch = new CountDownLatch(numberOfThreads);
for (int n = 0; n < numberOfThreads; n++) {
Thread t = new Thread(new DoSomethingInAThread(latch));
t.start();
}
latch.await();
System.out.println("In Main thread after completion of " + numberOfThreads + "
threads");
} catch (Exception err) {
err.printStackTrace();
}
}
}
备注:
- CountDownLatch 在主线程初始化,count为5
- 通过调用await 方法阻塞主线程.
- 总共创建了 五个DoSomethingInAThread对象,每个对象都通过执行countDown 减少 count。
- 一旦couter 变成0 那么主线程状态就会被重新恢复。
多线程基础
- 使用条件:如果你有很多任务要进行处理,而且这些任务不依赖当前的数据,那么你就可以使用多线程。
class CountAndPrint implements Runnable {
private final String name;
CountAndPrint(String name) {
this.name = name;
}
/** This is what a CountAndPrint will do */
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
System.out.println(this.name + ": " + i);
}
}
public static void main(String[] args) {
// Launching 4 parallel threads
for (int i = 1; i <= 4; i++) {
// `start` method will call the `run` method
// of CountAndPrint in another thread
new Thread(new CountAndPrint("Instance " + i)).start();
}
// Doing some others tasks in the main Thread
for (int i = 0; i < 10000; i++) {
System.out.println("Main: " + i);
}
}
}
加锁同步措施
锁是同步机制的基础,一般用来同步代码块或者关键字
固有锁
int count = 0; // shared among multiple threads
public void doSomething() {
synchronized(this) {
++count; // a non-atomic operation
}
}
重入锁
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
try {
lockObj.lock();
++count; // a non-atomic operation
} finally {
lockObj.unlock(); // sure to release the lock without fail
}
}
- 锁提供了一些固有锁没有的的功能。例如在加锁的同时允许被打断,或者在在一定条件下才允许加锁。
加锁的同时允许被打断
class Locky {
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
try {
try {
lockObj.lockInterruptibly();
++count; // a non-atomic operation
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // stopping
}
} finally {
if (!Thread.currentThread().isInterrupted()) {
lockObj.unlock(); // sure to release the lock without fail
}
}
}
}
只有在符合某些条件下才会被加锁
public class Locky2 {
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
boolean locked = lockObj.tryLock(); // returns true upon successful lock
if (locked) {
try {
++count; // a non-atomic operation
} finally {
lockObj.unlock(); // sure to release the lock without fail
}
}
}
}
Semaphore(信号灯)
Semaphore是一个高度同步的类,其维护一组许可,这些许可可以被请求以及释放。Semaphore 可以想象成一个一组许可的计数器。当有线程请求的时候计数器-1,有线程释放的时候计数器+1. 如果计数器 为0那么当有线程尝试发起请求的时候,那么这个线程将会一直阻塞,只到许可条件产生。
Semaphore初始化方式
-
Semaphore semaphore = new Semaphore(1);
-
Semaphore 构造器接受一个boolean 值 ,用来作为公平锁。如果设置成false,那么将不保证公平性,即不保证线程请求顺序。当 设置成true 时,将会保证线程调用顺序与请求顺序一直。
-
Semaphore semaphore = new Semaphore(1, true);
-
现在我们看一下用Semaphore 控制一组对象,Semaphore被用来提供阻塞功能,以便保证当getItem() 被调用的时候一直能够取到数据。
class Pool {
/*
* Note that this DOES NOT bound the amount that may be released!
* This is only a starting value for the Semaphore and has no other
* significant meaning UNLESS you enforce this inside of the
* getNextAvailableItem() and markAsUnused() methods
*/
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
/**
* Obtains the next available item and reduces the permit count by 1.
* If there are no items available, block.
*/
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
/**
* Puts the item into the pool and add 1 permit.
*/
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
private Object getNextAvailableItem() {
// Implementation
}
private boolean markAsUnused(Object o) {
// Implementation
}
}
Runnable Object
Runnable interface 定义一个run()方法,在其中的代码都在新的线程执行。
Runnable对象可以传递给Thread 构造器。Thread 的 start 方法被调用便意味着 runnable 对象被执行了。
示例代码
public class HelloRunnable implements Runnable {
@Override
public void run() {
System.out.println("Hello from a thread");
}
public static void main(String[] args) {
new Thread(new HelloRunnable()).start();
}
}
public static void main(String[] args) {
Runnable r = () -> System.out.println("Hello world");
new Thread(r).start();
}
Runnable vs Thread subclass
- Runnable对象管理较为容易,Runnable比Thread更容易创建子类对象
- Thread 子类在简单应用比较容易,由于其必须继承Thread 限制了其的使用
- Runnable对象适合较为复杂的线程管理
创建一个死锁
死锁通常发生在两个互相等待结束的竞争操作,但是并没有。在java中一个lock 与每个对象都有关系。为了避免多线程同步修改一个对象这种情况,我们可以使用一个叫做synchronized 的关键字,不过还是要付出一些代价的。错误的使用synchronized可能会产生死锁。
考虑到有两个线程在操作一个实例,我们给线程定义为1,2,然后假设我们有两个资源文件R1,R2。线程11去请求R1而且需要R2,但是R2 被线程2 占用了 线程2 需要线程1,这样下去就比较刺激了 线程1 得到了R1 ,线程2 得到R2 。双方都在等待互相释放对象,那么神奇的死锁便产生了。
示例代码
public class Example2 {
public static void main(String[] args) throws InterruptedException {
final DeadLock dl = new DeadLock();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
dl.methodA();Java® Notes for Professionals 676
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
try {
dl.method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
t1.setName("First");
t2.setName("Second");
t1.start();
t2.start();
}
}
class DeadLock {
Object mLock1 = new Object();
Object mLock2 = new Object();
public void methodA() {
System.out.println("methodA wait for mLock1 " + Thread.currentThread().getName());
synchronized (mLock1) {
System.out.println("methodA mLock1 acquired " + Thread.currentThread().getName());
try {
Thread.sleep(100);
method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public void method2() throws InterruptedException {
System.out.println("method2 wait for mLock2 " + Thread.currentThread().getName());
synchronized (mLock2) {
System.out.println("method2 mLock2 acquired " + Thread.currentThread().getName());
Thread.sleep(100);
method3();
}
}
public void method3() throws InterruptedException {
System.out.println("method3 mLock1 "+ Thread.currentThread().getName());
synchronized (mLock1) {
System.out.println("method3 mLock1 acquired " + Thread.currentThread().getName());
}
}
}
创建一个Thread线程实例
在java 中主要有两种方式创建线程,一般来说之间新建一个线程然后执行比较容易。两种方式的主要区别就是在哪儿创建执行代码。 在java中一个线程就是一个对象,Thread的实例或者子类对象。所以第一种创建线程的方式就是创建其子类,然后复现run 方法。
示例代码
class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Thread running!");
}
}
}
MyThread t = new MyThread();
Thread 类还可以接受一个String 值作为构造方法,这在多线程编程调试的时候会特别好用
class MyThread extends Thread {
public MyThread(String name) {
super(name);
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Thread running! ");
}
}
}
MyThread t = new MyThread("Greeting Producer");
第二种创建线程的方式是用一个runnable 对象。然后就会在一个单独的线程中执行runnable中run 的操作。
Thread t = new Thread(aRunnable);
当然你还可以这样定义
Thread t = new Thread(operator::hardWork, "Pi operator");
一般来说,这两种创建方式你都可以放心使用,但是明智的做法应该采用后者。
下面我们来说一下本文要说的第四种
ThreadGroup tg = new ThreadGroup("Operators");
Thread t = new Thread(tg, operator::hardWork, "PI operator");
所以我们来总结一下,线程可以通过如下构造
Thread()
Thread(String name)
Thread(Runnable target)
Thread(Runnable target, String name)
Thread(ThreadGroup group, String name)
Thread(ThreadGroup group, Runnable target)
Thread(ThreadGroup group, Runnable target, String name)
Thread(ThreadGroup group, Runnable target, String name, long stackSize)
最后一个允许我们定义个一个期望的size 来创建新的线程
通常来说创建很多相同属性的线程会让你十分痛苦。这时候就该 java.util.concurrent.ThreadFactory 登场了。这个接口可以通过工厂模式极大的减少我们创建线程的过程,该接口使用十分简单newThread(Runnable)。
class WorkerFactory implements ThreadFactory {
private int id = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Worker " + id++);
}
}
原子操作
原子操作指的是一次操作期间这个对象只能被执行一次,在执行操作期间其他线程是没有机会观察或者改变状态。
我们来看一下负面案例
private static int t = 0;
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count
is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
t++;
System.out.println(MessageFormat.format("t: {0}", t));
});
}
executorService.shutdown();
}
这个案例存在两个问题。第一个就是++1 这个行为不是原子的。他包含多个操作:获取值,+1,赋值。这就是为什么当我们允许这个样例的时候,我们基本上看不到t:100.第二个问题两个线程有可能同时获取值然后+1赋值。当我们假设当前t=10,然后两个线程同时操作t。两个线程都对t 设置成11,既然这样后面执行的程序可以获取t的值了,即便线程1还没有关闭。
为了避免这种情况,我们一般使用java.util.concurrent.atomic.AtomicInteger,该类可以为我们提供很多原子操作。
private static AtomicInteger t = new AtomicInteger(0);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count
is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
int currentT = t.incrementAndGet();
System.out.println(MessageFormat.format("t: {0}", currentT));
});
}
executorService.shutdown();
}
网友评论