本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长


+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

2024-11(1)

手写简易线程池-----java(可直接运行,解释清晰)

发布于2021-03-13 14:19     阅读(467)     评论(0)     点赞(18)     收藏(5)


## 可直接运行已测试

1.先定义一个工作类(需要被运行的工作)

public class Job implements Runnable {
    private String name;
    public String getName() {
        return name;
    }
    public Job(String name) {
        this.name = name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println("这个是Job"+name);
    }
}

2.创建一个自己的线程池(分段介绍,完整代码在后面)

2.1属性定义

// 线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 10;
    // 线程池默认的数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    // 线程池最小的数量
    private static final int MIN_WORKER_NUMBERS = 1;
    // 这是一个工作列表,将会向里面插入工作
    private final LinkedList<Job> jobs = new LinkedList<Job>();
    // 工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new
            ArrayList<Worker>());
    // 工作者线程的数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    // 线程编号生成
    private AtomicLong threadNum = new AtomicLong();

这里面定义了一些属性,包括最大的线程数,默认线程数,最小线程数,以及定义了工作队列等等。

2.2构造方法

 public DefaultThreadPool() throws InterruptedException {
        initializeWokers(DEFAULT_WORKER_NUMBERS);
    }
    public DefaultThreadPool(int num) throws InterruptedException {
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : (Math.max(num, MIN_WORKER_NUMBERS));
        initializeWokers(workerNum);
    }

这是两个构造方法,如果给定创建的线程数就按照参数创建,没有就创建默认个线程数。然后执行初始化。

2.3worker(内部类)工作者线程(就是线程池产生的都是工作线程)

// 工作者,负责消费任务
class Worker implements Runnable {
    // 是否工作
    private volatile boolean running = true;
    private String name;

    public Worker(String name) {
        this.name = name;
    }
    public void run() {
        while (running) {
            Job job = null;
            synchronized (jobs) {
// 如果工作者列表是空的,那么就wait
                while (jobs.isEmpty()) {
                    try {
                        System.out.println("worker"+name+"现在空了");
                        jobs.wait();
                    } catch (InterruptedException ex) {
// 感知到外部对WorkerThread的中断操作,返回
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
// 取出一个Job
                job = jobs.removeFirst();
                if (job != null) {
                    try {
                        System.out.print("worker"+name);
                        job.run();
                    } catch (Exception ex) {
// 忽略Job执行中的Exception
                    }
                }
            }
        }
    }
    public void shutdown() {
        running = false;
    }
}

每次去jobs取job都会给全部jobs加锁,如果获取不到就等待,获取到了就消费一个工作,之后释放锁。定义了关闭工作线程的方法。

2.4线程初始化

// 初始化线程工作者
    private void initializeWokers(int num) throws InterruptedException {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker(i+"");
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.
                    incrementAndGet());
            System.out.println(thread.getName());
            thread.sleep(200);
            thread.start();
        }
    }

2.5增加减少工作线程(worker)以及其他方法

 public void shutdown() {
        for (Worker worker : workers) {
            worker.shutdown();
        }
    }
    public void addWorkers(int num) throws InterruptedException {
        synchronized (jobs) {
// 限制新增的Worker数量不能超过最大值
            if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWokers(num);
            this.workerNum += num;
}
    }
    public void removeWorker(int num) {
        synchronized (jobs) {
            if (num >= this.workerNum) {
                throw new IllegalArgumentException("beyond workNum");
            }
// 按照给定的数量停止Worker
            int count = 0;
            while (count < num) {
                Worker worker = workers.get(count);
                if (workers.remove(worker)) {
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }
    public int getJobSize() {
        return jobs.size();
    }

这部分没什么难度。

2.6给工作列表增加工作并通知等待在工作列表上的一个线程

public void execute(Job job) {
        if (job != null) {
// 添加一个工作,然后进行通知
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }

2.7此部分完整代码

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    // 线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 10;
    // 线程池默认的数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    // 线程池最小的数量
    private static final int MIN_WORKER_NUMBERS = 1;
    // 这是一个工作列表,将会向里面插入工作
    private final LinkedList<Job> jobs = new LinkedList<Job>();
    // 工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new
            ArrayList<Worker>());
    // 工作者线程的数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    // 线程编号生成
    private AtomicLong threadNum = new AtomicLong();
    public DefaultThreadPool() throws InterruptedException {
        initializeWokers(DEFAULT_WORKER_NUMBERS);
    }
    public DefaultThreadPool(int num) throws InterruptedException {
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : (Math.max(num, MIN_WORKER_NUMBERS));
        initializeWokers(workerNum);
    }
    public void execute(Job job) {
        if (job != null) {
// 添加一个工作,然后进行通知
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }
    public void shutdown() {
        for (Worker worker : workers) {
            worker.shutdown();
        }
    }
    public void addWorkers(int num) throws InterruptedException {
        synchronized (jobs) {
// 限制新增的Worker数量不能超过最大值
            if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWokers(num);
            this.workerNum += num;
}
    }
    public void removeWorker(int num) {
        synchronized (jobs) {
            if (num >= this.workerNum) {
                throw new IllegalArgumentException("beyond workNum");
            }
// 按照给定的数量停止Worker
            int count = 0;
            while (count < num) {
                Worker worker = workers.get(count);
                if (workers.remove(worker)) {
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }
    public int getJobSize() {
        return jobs.size();
    }
    // 初始化线程工作者
    private void initializeWokers(int num) throws InterruptedException {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker(i+"");
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.
                    incrementAndGet());
            System.out.println(thread.getName());
            thread.sleep(200);
            thread.start();
        }
    }
// 工作者,负责消费任务
class Worker implements Runnable {
    // 是否工作
    private volatile boolean running = true;
    private String name;

    public Worker(String name) {
        this.name = name;
    }
    public void run() {
        while (running) {
            Job job = null;
            synchronized (jobs) {
// 如果工作者列表是空的,那么就wait
                while (jobs.isEmpty()) {
                    try {
                        System.out.println("worker"+name+"现在空了");
                        jobs.wait();
                    } catch (InterruptedException ex) {
// 感知到外部对WorkerThread的中断操作,返回
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
// 取出一个Job
                job = jobs.removeFirst();
                if (job != null) {
                    try {
                        System.out.print("worker"+name);
                        job.run();
                    } catch (Exception ex) {
// 忽略Job执行中的Exception
                    }
                }
            }

        }
    }
    public void shutdown() {
        running = false;
    }
}
}

3.测试代码和结果

public class Test {
    public static void main(String[] args) throws InterruptedException {
        DefaultThreadPool<Runnable> pool = new DefaultThreadPool<>(2);
        for (int i = 0; i < 10; i++) {
            Job job = new Job(""+i);
            pool.execute(job);
        }
    }
}

在这里插入图片描述
结语:这只是简单的线程池功能,有很多不足的地方,敬请指正。

原文链接:https://blog.csdn.net/mmxgl/article/details/114704064



所属网站分类: 技术文章 > 博客

作者:天天在家

链接:http://www.javaheidong.com/blog/article/114226/682685ad07b2bc25cc60/

来源:java黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

18 0
收藏该文
已收藏

评论内容:(最多支持255个字符)