首页 > Java > 高级篇 > 高新技术(九) JAVA多线程与并发库(二)
2014
07-21

高新技术(九) JAVA多线程与并发库(二)

1、线程池(Executors)

从Java5开始,Java内建支持线程池,Java5新增了一个Executors工厂类来产生线程池,该类包含如下几个静态工厂方法来创建线程池:

newFixedThreadPool(int nThreads):创建一个具有固定线程数的线程池

newCachedThreadPool():创建带缓存的线程池,即系统可以根据需要增加线程数

newSingleThreadExecutor():创建一个只有单线程的线程池

public static void main(String[] args) {
	//ExecutorService threadPool = Executors.newFixedThreadPool(3);
	//ExecutorService threadPool = Executors.newCachedThreadPool();
	ExecutorService threadPool = Executors.newSingleThreadExecutor();
	for(int i =0;i < 10;i++){
		final int taskId = i;
		threadPool.execute(new Runnable(){
			public void run() {
				System.out.println(Thread.currentThread().getName() + " execute task:" + taskId);
			}
		});
	}

	threadPool.shutdown();
}

newScheduledThreadPool(int corePoolSize)、newSingleThreadScheduledExecutor():创建带定时器的线程池。指定的延迟不可以用绝对时间表示(可以用指定的时间时间戳减去当前时间戳)。

ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(3);
for(int i =0;i < 10;i++){
	final int taskId = i;
	threadPool.schedule(new Runnable(){
		public void run() {
			System.out.println(Thread.currentThread().getName() + " execute task:" + taskId);
		}
	},
	5,
	TimeUnit.SECONDS);
}

threadPool.shutdown();

前三个方法返回一个ExecutorService对象,该对象代表一个线程池,它可以执行Runnable对象传入的任务;后两个方法返回一个ScheduledExecutorService线程池,它是ExecutorService的子类,它可以在指定延迟后执行线程任务。

2、Future & Callable接口

本节的内容用到的比较少,工作的时候如果有类似需求,能想到就行。

2.1 从Executors的submit()方法说起

在第1节中可以看到,线程池的execute()返回值为空,如果要想在线程任务结束时有返回值,在提交任务的时候就需要使用线程池的submit()方法,且线程任务需实现Callable接口,复写其中的call方法。返回的结果保存在Future对象中。

public static void main(String[] args) throws Exception{
	ExecutorService threadPool = Executors.newFixedThreadPool(1);

	Future<String> future =
		threadPool.submit(new Callable<String>() {
			public String call() throws Exception {
				Thread.sleep(2000);
				return "cqu";
			}
		});

	System.out.println("wait for result:" + future.get());
	threadPool.shutdown();
}

注意:Future取得的结果类型和Callable返回的结果类型必须一致,这是通过泛型来实现的。

2.2 CompletionService接口

CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象。(好比同时种了几块地的麦子,然后就等待收割。收割时,则是那块先成熟了,则先去收割哪块麦子。)

public static void main(String[] args) throws Exception{
	ExecutorService threadPool = Executors.newFixedThreadPool(3);

	CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
	for(int i = 0 ;i < 10 ;i ++){
		final int taskId = i;
		completionService.submit(new Callable<Integer>(){
			public Integer call() throws Exception {
				Thread.sleep(new Random().nextInt(5000));
				return taskId;
			}
		});
	}

	for(int i = 0 ;i < 10 ;i ++){
		System.out.println(completionService.take().get());
	}
}

3、Lock & Condition 接口

从Java5开始,Java提供了一种功能更强大的线程同步机制——通过显示定义同步锁对象来实现同步。

①Lock

Lock比传统线程模型中的synchronized方式更加面向对象,与生活中的锁类似,锁本身也应该是一个对象。多个线程执行的代码片段要实现同步互斥的效果,它们必须用同一个Lock对象

使用Lock对象改写JAVA多线程与并发库(一)3.1节中的Outter对象,代码如下(使用finally块保证释放锁):

class Outter{
	Lock lock = new ReentrantLock();
	public void print(String s){
		lock.lock();
		try{
			for(int i = 0;i < s.length();i ++){
				System.out.print(s.charAt(i));
			}
		}finally{
			lock.unlock();
		}

		System.out.println();
	}
}

② ReadWriteLock

读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由jvm自己控制的,你只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁!

Demo1:下面的代码模拟使用读写锁对共享数据封装

class SharedData{
	private Object data = null;//共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
	private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

	public void get(){
		rwl.readLock().lock();
		try {
			System.out.println(Thread.currentThread().getName() + " be ready to read data!");
			Thread.sleep((long)(Math.random()*1000));
			System.out.println(Thread.currentThread().getName() + " have read data :" + data);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}finally{
			rwl.readLock().unlock();
		}
	}

	public void put(Object data){
		rwl.writeLock().lock();
		try {
			System.out.println(Thread.currentThread().getName() + " be ready to write data!");
			Thread.sleep((long)(Math.random()*1000));
			this.data = data;
			System.out.println(Thread.currentThread().getName() + " have write data: " + data);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}finally{
			rwl.writeLock().unlock();
		}
	}
}

Demo2:下面的代码模拟实现了一个缓存系统

public class CacheSimulation {
	private Map<String , Object> cacheData = new HashMap<String , Object>();
	ReadWriteLock rwl = new ReentrantReadWriteLock();

	public Object get(String key){
		Object value = null;
		rwl.readLock().lock();
		try{
			value = cacheData.get(key);

			if(value == null){
				rwl.readLock().unlock();
				rwl.writeLock().lock();
				try{
					if(value == null){
						value = "aaa";//如果内存中没找到,则要从数据库读取数据,此处用"aaa"模拟
					}
					cacheData.put(key, value);
				}finally{
					rwl.writeLock().unlock();
				}
				rwl.readLock().lock();
			}

		}finally{
			rwl.readLock().unlock();
		}
		return value;
	}
}

③ Condition

当使用Lock对象来保证互斥时,系统中不存在隐式的同步监视器。为此,Java提供了一个Condition接口来保持协调,Condition的功能类似在传统线程技术中的Object.wait()和notify()。下面使用Lock和Condition改写JAVA多线程与并发库(一)3.2节中的Business类:

class Business{
	private Lock lock = new ReentrantLock();
	private Condition condition = lock.newCondition();
	boolean subFlag = true;

	public void sub(int i){
		lock.lock();
		try{
			while(!subFlag){
				try {
					condition.await();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			for(int j = 1;j <= 10;j++){
				System.out.println("sub Thread sequence of " + j + "loop of " + i);
			}
			subFlag = false;
			condition.signal();
		}finally{
			lock.unlock();
		}
	}

	public void main(int i){
		lock.lock();
		try {
			while(subFlag){
				try {
					condition.await();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			for(int j = 1;j <= 100;j++){
				System.out.println("main Thread sequence of " + j + "loop of " + i);
			}
			subFlag = true;
			condition.signal();
		} finally{
			lock.unlock();
		}
	}
}

高新技术(九) JAVA多线程与并发库(二)》有 2 条评论

  1. 你同桌 说:

    宇哥,大数据学的咋样了,你先在从事那方面

留下一个回复

你的email不会被公开。