`

java.util.concurrent.ExecutorService与Executors例子的简单剖析

    博客分类:
  • java
阅读更多

java.util.concurrent.ExecutorService与Executors例子的简单剖析

 

对于多线程有了一点了解之后,那么来看看java.lang.concurrent包下面的一些东西。在此之前,我们运行一个线程都是显式调用了Thread的start()方法。我们用concurrent下面的类来实现一下线程的运行,而且这将成为以后常用的方法或者实现思路。 

 

        看一个简单的例子: 

 

Java代码  

public class CacheThreadPool {  

    public static void main(String[] args) {  

        ExecutorService exec=Executors.newCachedThreadPool();  

        for(int i=0;i<5;i++)  

            exec.execute(new LiftOff());  

        exec.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务  

    }  

}  

 

        这个例子其实很容易看懂,ExecutorService中有一个execute方法,这个方法的参数是Runnable类型。也就是说,将一个实现了Runnable类型的类的实例作为参数传入execute方法并执行,那么线程就相应的执行了。 

 

        一、ExecutorService 

        先看看ExecutorService,这是一个接口,简单的列一下这个接口: 

 

Java代码  

public interface ExecutorService extends Executor {  

  

    void shutdown();  

  

    List<Runnable> shutdownNow();  

  

    boolean isShutdown();  

  

    boolean isTerminated();  

  

    boolean awaitTermination(long timeout, TimeUnit unit)  

  

    <T> Future<T> submit(Callable<T> task);  

  

    <T> Future<T> submit(Runnable task, T result);  

  

    Future<?> submit(Runnable task);  

  

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  

  

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)  

  

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)  

  

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,  

                    long timeout, TimeUnit unit)  

}  

 

        ExecuteService继承了Executor,Executor也是一个接口,里面只有一个方法: 

 

Java代码  

void execute(Runnable command)  

 

 

        二、Executors 

        Executors是一个类,直接援引JDK文档的说明来说一下这个类的作用: 

       

 

        Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods: 

 

 

       

Methods that create and return an ExecutorService set up with commonly useful configuration settings.

       

Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.

       

Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.

       

Methods that create and return a ThreadFactory that sets newly created threads to a known state.

       

Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.

       

 

        在上面的例子中,我们用到了newCachedThreadPool()方法。看一下这个方法: 

 

Java代码  

public static ExecutorService newCachedThreadPool() {  

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  

                                      60L, TimeUnit.SECONDS,  

                                      new SynchronousQueue<Runnable>());  

    }  

 

        在源码中我们可以知道两点,1、这个方法返回类型是ExecutorService;2、此方法返回值实际是另一个类的实例。看一下这个类的信息: 

 

Java代码  

public class ThreadPoolExecutor extends AbstractExecutorService {  

    ..........  

    private final BlockingQueue<Runnable> workQueue;//这个变量在下面会提到  

    ..........  

}  

 

        ThreadPoolExecutor继承了AbstractExecutorService,而AbstractExecutorService又实现了ExecutorService接口。所以,根据多态,ThreadPoolExecutor可以看作是ExecutorService类型。 

 

        线程执行的最关键的一步是执行了executor方法,根据java的动态绑定,实际执行的是ThreadPoolExecutor所实现的executor方法。看看源码: 

 

Java代码  

public class ThreadPoolExecutor extends AbstractExecutorService {  

    ..........  

    public void execute(Runnable command) {  

        if (command == null)  

            throw new NullPointerException();  

        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  

            if (runState == RUNNING && workQueue.offer(command)) {  

                if (runState != RUNNING || poolSize == 0)  

                    ensureQueuedTaskHandled(command);  

            }  

            else if (!addIfUnderMaximumPoolSize(command))  

                reject(command); // is shutdown or saturated  

        }  

    }  

    ..........  

}  

 

        根据程序正常执行的路线来看,这个方法中比较重要的两个地方分别是: 

        1、workQueue.offer(command) 

        workQueue在上面提到过,是BlockingQueue<Runnable>类型的变量,这条语句就是将Runnable类型的实例加入到队列中。 

        2、ensureQueuedTaskHandled(command) 

        这个是线程执行的关键语句。看看它的源码: 

 

Java代码  

public class ThreadPoolExecutor extends AbstractExecutorService {  

    ..........  

    private void ensureQueuedTaskHandled(Runnable command) {  

        final ReentrantLock mainLock = this.mainLock;  

        mainLock.lock();  

        boolean reject = false;  

        Thread t = null;  

        try {  

            int state = runState;  

            if (state != RUNNING && workQueue.remove(command))  

                reject = true;  

            else if (state < STOP &&  

                     poolSize < Math.max(corePoolSize, 1) &&  

                     !workQueue.isEmpty())  

                t = addThread(null);  

        } finally {  

            mainLock.unlock();  

        }  

        if (reject)  

            reject(command);  

        else if (t != null)  

            t.start();  

    }  

    ..........  

}  

 

        在这里我们就可以看到最终执行了t.start()方法来运行线程。在这之前的重点是t=addThread(null)方法,看看addThread方法的源码: 

 

Java代码  

public class ThreadPoolExecutor extends AbstractExecutorService {  

    ..........  

    private Thread addThread(Runnable firstTask) {  

        Worker w = new Worker(firstTask);  

        Thread t = threadFactory.newThread(w);  

        if (t != null) {  

            w.thread = t;  

            workers.add(w);  

            int nt = ++poolSize;  

            if (nt > largestPoolSize)  

                largestPoolSize = nt;  

        }  

        return t;  

    }  

    ..........  

}  

 

        这里两个重点,很明显: 

        1、Worker w = new Worker(firstTask) 

        2、Thread t = threadFactory.newThread(w) 

        先看Worker是个什么结构: 

 

Java代码  

public class ThreadPoolExecutor extends AbstractExecutorService {  

    ..........  

    private final class Worker implements Runnable {  

        ..........  

        Worker(Runnable firstTask) {  

            this.firstTask = firstTask;  

        }  

  

        private Runnable firstTask;  

        ..........  

  

        public void run() {  

            try {  

                Runnable task = firstTask;  

                firstTask = null;  

                while (task != null || (task = getTask()) != null) {  

                    runTask(task);  

                    task = null;  

                }  

            } finally {  

                workerDone(this);  

            }  

        }  

    }  

  

    Runnable getTask() {  

        for (;;) {  

            try {  

                int state = runState;  

                if (state > SHUTDOWN)  

                    return null;  

                Runnable r;  

                if (state == SHUTDOWN)  // Help drain queue  

                    r = workQueue.poll();  

                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  

                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  

                else  

                    r = workQueue.take();  

                if (r != null)  

                    return r;  

                if (workerCanExit()) {  

                    if (runState >= SHUTDOWN) // Wake up others  

                        interruptIdleWorkers();  

                    return null;  

                }  

                // Else retry  

            } catch (InterruptedException ie) {  

                // On interruption, re-check runState  

            }  

        }  

    }  

    }  

    ..........  

}  

 

        Worker是一个内部类。根据之前可以知道,传入addThread的参数是null,也就是说Work中firstTask为null。

        在看看newThread是一个什么方法: 

 

Java代码  

public class Executors {  

    ..........  

    static class DefaultThreadFactory implements ThreadFactory {  

        ..........  

        public Thread newThread(Runnable r) {  

            Thread t = new Thread(group, r,  

                                  namePrefix + threadNumber.getAndIncrement(),  

                                  0);  

            if (t.isDaemon())  

                t.setDaemon(false);  

            if (t.getPriority() != Thread.NORM_PRIORITY)  

                t.setPriority(Thread.NORM_PRIORITY);  

            return t;  

        }  

        ..........  

    }  

    ..........  

}  

 

        通过源码可以得知threadFactory的实际类型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一个嵌套内部类。 

 

        之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知道,t=addThread(null),而addThread内部执行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;这里两个t是一致的。 

        从这里可以看出,t.start()实际上执行的是Worker内部的run方法。run()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue!在execute方法中,执行ensureQueuedTaskHandled(command)之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。

分享到:
评论

相关推荐

    Server100 代码

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.io.*; class Server{ static ServerSocket ss=null; static Socket s=null; static List l=new ArrayList();...

    Executor,Executors,ExecutorService比较.docx

    Executors: 是java.util.concurrent包下的一个类,提供了若干个静态方法,用于生成不同类型的线程池。Executors一共可以创建下面这四类线程池: 1.newFixedThreadPool创建一个可缓存线程池,如果线程池长度超过...

    thread count

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountDownLatchDemo { private static final int PLAYER_AMOUNT = 5; public CountDownLatchDemo() { ...

    Java的Future使用方法

    首先,Future是一个接口,该接口用来返回异步的结果。 ... import java.util.ArrayList... import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future

    redis使用watch秒杀抢购实现思路

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import redis.clients.jedis.Jedis; /** * redis测试抢购 * * @author 10255_000 * */ public class Redis

    [Java]多线程:共享资源同步——不认真看你会后悔的

    共享资源同步 在进行多线程开发时最令人头痛的...import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /* * 这是一个生成器的抽象类 */ abstract class Generator { private b

    android实现定时拍照并发送微博功能

    最近在做android方面的开发,下面是android自动对焦并拍照的小例子: ... import java.io.File; ...import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import

    Java线程池以及Lambda表达式简单总结

    1. 线程池的使用 public static ExecutorService newFixedThreadPool(int nThreads); 得到一个线程对象,初始化参数是要求的当前...import java.util.concurrent.ExecutorService; import java.util.concurrent.Execut

    Android图片上传下载小框架

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SmartImageView extends ImageView { private static final int LOADING_THREADS = 4; private static ...

    java socket线程池

    import java.util.concurrent.*; public class EchoServer { private int port=8000;   private ServerSocket serverSocket;   private ExecutorService executorService; //线程池   ...

Global site tag (gtag.js) - Google Analytics