剖析tomcat线程池的源码,本文以源码来解析tomcat的线程池使用策略

查找来源

首先先在tomcat官网找到对应的tomcat线程池配置,具体定位在:Tomcat线程池 file 然后对其配置的默认参数进行解释: file

threadPriority :优先级,默认是Normal daemon :是否守护线程,默认是true namePrefix:线程名字:tomcat-exc-1+ maxThreads:最大线程:默认 200 minSpareThreads :最小在线线程:默认25 maxIdleTime :最大在线时间:默认60s(线程执行完成之后60s就会被shutdown) maxQueueSize:队列的最大值:Integer的最大值> Integer.MAX_VALUE prestartminSpareThreads :是否在启动的时候占用最小在线线程:默认 false(如果为true,即在启动tomcat的时候就会启动minSpareThreads个线程) threadRenewalDelay:重建线程池内的线程:默认值为1000(为了避免线程同时重建,每隔threadRenewalDelay(单位: ms )重建一个线程)

好了,看完这些参数之后,来进行分析tomcat启动线程的源码,这里以springboot内置的tomcat源码分析为主~

首先找到这个类,这个类即为启动的核心

org.apache.catalina.core.StandardThreadExecutor

file

分析源码

protected void startInternal() throws LifecycleException {
  //创建一个任务队列  容量为Int的最大值
	taskqueue = new TaskQueue(maxQueueSize);
	//创建线程factory  
	TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
	//创建一个线程池    核心线程25个  最大200个  存活60s
	executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
   //设置重建线程时间  1s
   executor.setThreadRenewalDelay(threadRenewalDelay);
   //是否开启占用核心线程,默认关闭
	if (prestartminSpareThreads) {
	//如果开启  见下面的代码@1.1
		executor.prestartAllCoreThreads();
	}
	//@1.0 设置parent,关联线程池对象
	taskqueue.setParent(executor); 
	setState(LifecycleState.STARTING);
}
@1.1
public int prestartAllCoreThreads() {
	int n = 0;
	while (addWorker(null, true))
		++n;
	return n;
}

继续跟源码:

当有一个请求执行自然也是会执行execute方法 找到StandardThreadExecutor#execute

public void execute(Runnable command, long timeout, TimeUnit unit) {
	if (executor != null) {
		//执行线程操作
		executor.execute(command,timeout,unit);
	} else {
		throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
	}
}

它这个是将jdk的execute二次封装了一下,我们不用管,继续跟源码…… file

public void execute(Runnable command, long timeout, TimeUnit unit) {
//这个字段是记录的线程提交的数量,如果线程执行完毕,这个字段会减1 等下会用到 
submittedCount.incrementAndGet();
try {
//执行jdk的线程池代码
super.execute(command);
} catch (RejectedExecutionException rx) {
//拒绝策略,发生的情况是线程超过最大值(maxThreads),并且队列也已经满了,也就是(Integer.MAX+maxThreads)
if (super.getQueue() instanceof TaskQueue) {
	final TaskQueue queue = (TaskQueue)super.getQueue();
	try {
	//这里其实就是讲这个线程(60s后)丢到队列里面,如果60s后队列还是满的,那就没办法了,抛异常……
		if (!queue.force(command, timeout, unit)) {
			submittedCount.decrementAndGet();
			throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
		}
	} catch (InterruptedException x) {
		submittedCount.decrementAndGet();
		throw new RejectedExecutionException(x);
	}
} else {
	submittedCount.decrementAndGet();
	throw rx;
} 
}
}
@1.2
/**
* The number of tasks submitted but not yet finished. This includes tasks
* in the queue and tasks that have been handed to a worker thread but the
* latter did not start executing the task yet. 
大概意思是任务提交了但是没有执行结束,统计的是队列里的任务和已经在执行但是还没有执行完的任务
*/
private final AtomicInteger submittedCount = new AtomicInteger(0);

分析线程池源码:

(其实前面已经分析过了:线程池原理分析

// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();

//  下面会涉及到 3 步 操作
// 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize
// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
	if (addWorker(command, true))
		return;
	c = ctl.get();
}
// 2.注意这里是重点!!!
// 如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
//断点达到workQueue,发现workQueue对象是我们tomcat启动类的TaskQueue对象,那么我们源码打到taskQueue的offer方法
//@1.3 workQueue.offer(command)如果返回false 则会执行@1.4操作

if (isRunning(c) && workQueue.offer(command)) {
	int recheck = ctl.get();
	// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
	if (!isRunning(recheck) && remove(command))
		reject(command);
   // 如果当前线程池为空就新创建一个线程并执行。 
	else if (workerCountOf(recheck) == 0)
		addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
@1.4 
else if (!addWorker(command, false))
	reject(command);
}

file

public boolean offer(Runnable o) {
//we can't do any checks >>> 如果parent为null直接丢到队列里面 当然,parent肯定不会为null,本页面搜@1.0 
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object>>>如果工作线程数量等于最大线程数量>>>丢到队列里面
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue>>>如果队列里的任务加正在执行的任务(见@1.2) 小于等于正在执行的任务,丢到队列里面
if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread >>> 如果正在执行的任务小于最大任务数量,返回false???
//这里看看,返回false是什么意思呢?见@1.3
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}

因此,这里我们就分析完了,原来他是这么搞的,再用一张流程图来解释一下线程池中的队列执行流程 file

声明:本图来源 https://mp.weixin.qq.com/s/n28nH8xL6dTlw_vCi0wI6w

拒绝策略: 具体自己写个demo(工作线程大于将队列的容量+最大线程数量,则会触发拒绝策略)

dubbo线程池的拒绝策略

最后:看一下dubbo的拒绝策略: 类:

org.apache.dubbo.common.threadpool.support.eager.EagerThreadPoolExecutor

file

跟源码…… 类名一样,还是TaskQueue,不过它是org.apache.dubbo.common.threadpool.support.eager.TaskQueue

public boolean offer(Runnable runnable) {
	if (executor == null) {
		throw new RejectedExecutionException("The task queue does not have executor!");
	}
//当前工作线程大小
	int currentPoolThreadSize = executor.getPoolSize();
	// have free worker. put task into queue to let the worker deal with task.>>已经提交的线程如果小于工作线程 丢到队列里面
	if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
		return super.offer(runnable);
	}

	// return false to let executor create new worker.>> 当前线程小于最大线程容量 可以进行创建线程并执行
	if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
		return false;
	}

	// currentPoolThreadSize >= max
	return super.offer(runnable);
}

然后再看一下它的拒绝策略,具体在下面那行代码

queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)

/**
* retry offer task
*
* @param o task
* @return offer success or not
* @throws RejectedExecutionException if executor is terminated.
*/
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
	throw new RejectedExecutionException("Executor is shutdown!");
}
//不就是立马丢到队列里面???感觉又包装了一层……可能它想的是公用性吧
return super.offer(o, timeout, unit);
}

file

//dubbo的线程池拒绝策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
				" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
				" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
		threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
		e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
		url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
//dump栈信息
dumpJStack();

throw new RejectedExecutionException(msg);
}

private void dumpJStack() {
long now = System.currentTimeMillis();

//dump every 10 minutes
if (now - lastPrintTime < 10 * 60 * 1000) {
	return;
} 
if (!guard.tryAcquire()) {
	return;
}
//开启一个新的线程将jstack信息dump下来,并保存到服务器上
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {
	String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home")); 
	SimpleDateFormat sdf; 
	String os = System.getProperty("os.name").toLowerCase(); 
	// window system don't support ":" in file name
	if (os.contains("win")) {
		sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
	} else {
		sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
	} 
	String dateStr = sdf.format(new Date());
	//try-with-resources
	//栈文件日志名为……
	try (FileOutputStream jStackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
		JVMUtil.jstack(jStackStream);
	} catch (Throwable t) {
		logger.error("dump jStack error", t);
	} finally {
		guard.release();
	}
	lastPrintTime = System.currentTimeMillis();
});
//must shutdown thread pool ,if not will lead to OOM
//这里使用完线程池之后一定要shutdown,不然会oom
pool.shutdown(); 
}

这里的话tomcat线程池的源码和拒绝策略就解释完了

结尾

面试题:

tomcat线程池是怎样执行的?拒绝策略是如何?

看完这个源码后,balabala……其实我们也可以先把最大线程数用完,然后再让任务进入队列。通过自定义队列,重写其 offer 方法就可以实现。目前我知道的 Tomcat 和 Dubbo 都提供了这样策略的线程池,扯一堆。 面试官:好了,我们聊聊其他的吧!