发布于2021-03-13 14:19 阅读(467) 评论(0) 点赞(18) 收藏(5)
## 可直接运行已测试
public class Job implements Runnable {
private String name;
public String getName() {
return name;
}
public Job(String name) {
this.name = name;
}
public void setName(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("这个是Job"+name);
}
}
2.1属性定义
// 线程池最大限制数
private static final int MAX_WORKER_NUMBERS = 10;
// 线程池默认的数量
private static final int DEFAULT_WORKER_NUMBERS = 5;
// 线程池最小的数量
private static final int MIN_WORKER_NUMBERS = 1;
// 这是一个工作列表,将会向里面插入工作
private final LinkedList<Job> jobs = new LinkedList<Job>();
// 工作者列表
private final List<Worker> workers = Collections.synchronizedList(new
ArrayList<Worker>());
// 工作者线程的数量
private int workerNum = DEFAULT_WORKER_NUMBERS;
// 线程编号生成
private AtomicLong threadNum = new AtomicLong();
这里面定义了一些属性,包括最大的线程数,默认线程数,最小线程数,以及定义了工作队列等等。
2.2构造方法
public DefaultThreadPool() throws InterruptedException {
initializeWokers(DEFAULT_WORKER_NUMBERS);
}
public DefaultThreadPool(int num) throws InterruptedException {
workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : (Math.max(num, MIN_WORKER_NUMBERS));
initializeWokers(workerNum);
}
这是两个构造方法,如果给定创建的线程数就按照参数创建,没有就创建默认个线程数。然后执行初始化。
2.3worker(内部类)工作者线程(就是线程池产生的都是工作线程)
// 工作者,负责消费任务
class Worker implements Runnable {
// 是否工作
private volatile boolean running = true;
private String name;
public Worker(String name) {
this.name = name;
}
public void run() {
while (running) {
Job job = null;
synchronized (jobs) {
// 如果工作者列表是空的,那么就wait
while (jobs.isEmpty()) {
try {
System.out.println("worker"+name+"现在空了");
jobs.wait();
} catch (InterruptedException ex) {
// 感知到外部对WorkerThread的中断操作,返回
Thread.currentThread().interrupt();
return;
}
}
// 取出一个Job
job = jobs.removeFirst();
if (job != null) {
try {
System.out.print("worker"+name);
job.run();
} catch (Exception ex) {
// 忽略Job执行中的Exception
}
}
}
}
}
public void shutdown() {
running = false;
}
}
每次去jobs取job都会给全部jobs加锁,如果获取不到就等待,获取到了就消费一个工作,之后释放锁。定义了关闭工作线程的方法。
2.4线程初始化
// 初始化线程工作者
private void initializeWokers(int num) throws InterruptedException {
for (int i = 0; i < num; i++) {
Worker worker = new Worker(i+"");
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.
incrementAndGet());
System.out.println(thread.getName());
thread.sleep(200);
thread.start();
}
}
2.5增加减少工作线程(worker)以及其他方法
public void shutdown() {
for (Worker worker : workers) {
worker.shutdown();
}
}
public void addWorkers(int num) throws InterruptedException {
synchronized (jobs) {
// 限制新增的Worker数量不能超过最大值
if (num + this.workerNum > MAX_WORKER_NUMBERS) {
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWokers(num);
this.workerNum += num;
}
}
public void removeWorker(int num) {
synchronized (jobs) {
if (num >= this.workerNum) {
throw new IllegalArgumentException("beyond workNum");
}
// 按照给定的数量停止Worker
int count = 0;
while (count < num) {
Worker worker = workers.get(count);
if (workers.remove(worker)) {
worker.shutdown();
count++;
}
}
this.workerNum -= count;
}
}
public int getJobSize() {
return jobs.size();
}
这部分没什么难度。
2.6给工作列表增加工作并通知等待在工作列表上的一个线程
public void execute(Job job) {
if (job != null) {
// 添加一个工作,然后进行通知
synchronized (jobs) {
jobs.addLast(job);
jobs.notify();
}
}
}
2.7此部分完整代码
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
// 线程池最大限制数
private static final int MAX_WORKER_NUMBERS = 10;
// 线程池默认的数量
private static final int DEFAULT_WORKER_NUMBERS = 5;
// 线程池最小的数量
private static final int MIN_WORKER_NUMBERS = 1;
// 这是一个工作列表,将会向里面插入工作
private final LinkedList<Job> jobs = new LinkedList<Job>();
// 工作者列表
private final List<Worker> workers = Collections.synchronizedList(new
ArrayList<Worker>());
// 工作者线程的数量
private int workerNum = DEFAULT_WORKER_NUMBERS;
// 线程编号生成
private AtomicLong threadNum = new AtomicLong();
public DefaultThreadPool() throws InterruptedException {
initializeWokers(DEFAULT_WORKER_NUMBERS);
}
public DefaultThreadPool(int num) throws InterruptedException {
workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : (Math.max(num, MIN_WORKER_NUMBERS));
initializeWokers(workerNum);
}
public void execute(Job job) {
if (job != null) {
// 添加一个工作,然后进行通知
synchronized (jobs) {
jobs.addLast(job);
jobs.notify();
}
}
}
public void shutdown() {
for (Worker worker : workers) {
worker.shutdown();
}
}
public void addWorkers(int num) throws InterruptedException {
synchronized (jobs) {
// 限制新增的Worker数量不能超过最大值
if (num + this.workerNum > MAX_WORKER_NUMBERS) {
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWokers(num);
this.workerNum += num;
}
}
public void removeWorker(int num) {
synchronized (jobs) {
if (num >= this.workerNum) {
throw new IllegalArgumentException("beyond workNum");
}
// 按照给定的数量停止Worker
int count = 0;
while (count < num) {
Worker worker = workers.get(count);
if (workers.remove(worker)) {
worker.shutdown();
count++;
}
}
this.workerNum -= count;
}
}
public int getJobSize() {
return jobs.size();
}
// 初始化线程工作者
private void initializeWokers(int num) throws InterruptedException {
for (int i = 0; i < num; i++) {
Worker worker = new Worker(i+"");
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.
incrementAndGet());
System.out.println(thread.getName());
thread.sleep(200);
thread.start();
}
}
// 工作者,负责消费任务
class Worker implements Runnable {
// 是否工作
private volatile boolean running = true;
private String name;
public Worker(String name) {
this.name = name;
}
public void run() {
while (running) {
Job job = null;
synchronized (jobs) {
// 如果工作者列表是空的,那么就wait
while (jobs.isEmpty()) {
try {
System.out.println("worker"+name+"现在空了");
jobs.wait();
} catch (InterruptedException ex) {
// 感知到外部对WorkerThread的中断操作,返回
Thread.currentThread().interrupt();
return;
}
}
// 取出一个Job
job = jobs.removeFirst();
if (job != null) {
try {
System.out.print("worker"+name);
job.run();
} catch (Exception ex) {
// 忽略Job执行中的Exception
}
}
}
}
}
public void shutdown() {
running = false;
}
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
DefaultThreadPool<Runnable> pool = new DefaultThreadPool<>(2);
for (int i = 0; i < 10; i++) {
Job job = new Job(""+i);
pool.execute(job);
}
}
}
结语:这只是简单的线程池功能,有很多不足的地方,敬请指正。
原文链接:https://blog.csdn.net/mmxgl/article/details/114704064
作者:天天在家
链接:http://www.javaheidong.com/blog/article/114226/682685ad07b2bc25cc60/
来源:java黑洞网
任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 java黑洞网 All Rights Reserved 版权所有,并保留所有权利。京ICP备18063182号-2
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!