《深入Java虚拟机》之对象内存布局及使用

Q1. 对象什么时候创建

当虚拟机遇到new关键字时,首先去检查这个指令的参数是否能在常量池中定位到一个类的符号引用,并且检查这个符号引用代表的类是否被加载。如果没有执行相应的加载过程,这部分内容可以看类的加载流程。

Q2. 对象在哪里创建

对象要创建时必须要为新对象分配内存空间,即在Java堆中划分出一块等大小的内存区域。然而并不是所有的对象都在堆上分配,存在一种叫做 栈上分配 的技术,先判断待创建对象是否发生逃逸,如果没有就将对象打散成不可分割的变量,分配在栈上。该技术通过 逃逸分析标量替换 得以实现。

逃逸分析、标量替换相关文章

Q3. 怎么分配空间

根据内存区域是否完整、连续,将Java堆划分为两种情况:

  • 指针碰撞:该情况是指Java堆中内存是绝对规整的,所有用过的内存都放在一边,空闲的内存放在另一边,中间放着一个指针作为分界点的指示器,当分配内存时,就将指针向空闲内存空间移动一段和内存大小一样的距离即可
  • 空闲列表:如果Java堆中的内存是不规整的,已使用的内存和空闲内存相互交错,虚拟机就必须维护一个列表,记录上哪些内存是可用的,在分配时从列表中找到一块足够大的内存空间划分给对象实例

选择哪种方式存放内存是由 java堆是否规整 决定的,而Java堆是否规整又是由 所采用的垃圾收集器是否带有压缩整理功能 决定的。因此,在使用Serial、ParNew等带Compact过程的收集器时,系统采用的分配算法是指针碰撞,而使用CMS这种基于Mark-Sweep算法的收集器时,通常采用空闲列表。

可能不同代的内存,分配方式可能不一样。

Q4. 对象创建时指针的同步问题

在并发情况中,可能正在为A分配内存空间,指针没来得及移动;对象B又同时使用了原来的指针来分配内存。虚拟机处理这种情况的方法也有两种:

  • 对分配内存空间的情况进行同步——采用CAS + 失败重试 的方式保证更新操作的原子性
  • 另一种是把内存分配的动作按照线程划分在不同的空间中,即每个线程在Java堆中预先分配一块内存,称为 本地线程分配缓存(TLAB) 。线程分配的对象放置在各自的TLAB中,等TLAB的空间用完后要分配新的内存空间时,才需要重新同步锁定。用 -XX:+/-UseTLAB 开启或关闭TLAB

Q5. 内存分配完毕后做什么

  1. 初始化内存空间
  2. 对象头的设置
  3. 调用初始化方法

内存分配完后的第一步就是先将分配到的内存空间初始化为默认值;如果使用TLAB,这一工作过程也可以 提前至TLAB分配时进行

对象头的设置包括元数据、对象的锁标志位、对象的分代年龄等等,而根据虚拟机的运行状态不同,还会有不同的设置

最后执行方法(构造函数),将所有字段按程序员的意愿进行初始化,此时,这个对象才算是真正可用

Q6.对象在内存里有什么

对象内存布局示意图

  • 对象头
    • 存储对象自身运行时的数据
    • 类型指针
    • 若对象为数组,还需保存数组的长度
  • 实例数据
  • 对齐填充

Q7. 对象头

对象头通常包含两个部分(特殊情况有三部分),第一个部分 用于保存对象运行时的数据,该部分又称 MarkWord,保存着如哈希码、GC分代年龄、锁状态标志、线程持有的锁等等信息,这部分在32位虚拟机里占32位,64位虚拟机里占64位(不开启指针压缩)。MarkWord被设计为不定长的数据结构,它会根据运行状态复用自己的存储空间(是未锁定、轻量级锁定等等)。

关于对象头锁标志位的过程可以看这里:《Java并发编程的艺术》之synchronized的底层实现原理

第二个部分 是类型指针,虚拟机通过这个指针确定 这个对象是哪个类的实例。在32位虚拟机里占32位,64位虚拟机里占64位(不开启指针压缩)。若开启指针压缩,该部分在x64位机子上只占32位,即4字节。

第三个部分 是当对象为数组时才存在的,主要用于记录数组的长度。占用一个int的大小,即32位

所以在x64机子上,普通对象头占用128位(16byte),数组对象头占用160位(20byte)。

Q8. 实例数据

实例数据就是对象里面存放的那些属性占用的大小。

基本类型占用字节数表格
类型 | 占用字节 |

  • | :-: |
    boolean | 1 |
    byte | 1 |
    short | 2 |
    char | 2 |
    int | 4 |
    float | 4 |
    double | 8 |
    long | 8 |

引用类型在64位机子上占用8个字节,在32位机子上占用4个字节

比如下面这个例子:

1
2
3
4
5
6
7
8
9
public static void main( String[] args ) throws IllegalAccessException {
A a = new A();
// SizeOfObject会在后面放出使用教程,主要涉及到java agent
System.out.println(SizeOfObject.fullSizeOf(a));
}
static class A{
int a;
double b;
}

当A对象被实例化后(未开启指针压缩),对象头占用16字节(8字节 + 8字节 + 0字节),实例数据占用12字节(int 4字节,double 8字节),内存补齐占用4字节(对象头和实例数据占用28字节,无法被8整出,所以需要补齐4字节),所以最终A对象会占用32个字节。

未开启指针压缩

当开启指针压缩时,对象头里的类型指针只会占用4字节,所以对象头和实例数据只会占用24个字节,由于能正好被8整出,所以内存补齐只占用0字节,所以最终A对象只会占用24个字节。

开启指针压缩

Q9. 对齐填充

因为HotSpot VM的自动内存管理系统要求对象起始地址必须是8字节的整数倍,换句话说,就是对象的大小必须是8字节的整数倍,对象头部分正好是8字节的整数倍,因此当实例数据没有对齐时,就需要对齐填充。对齐填充是提高效率的一种方法,并且有些CPU如果不内存对齐,程序会直接崩溃;

内存对齐的目的

Q10. 创建完一个对象后,如何访问对象实例

使用对象的方法是,通过栈上的reference数据来操作堆上的具体对象。由于Java虚拟机规范中只规定了Reference类型是一个指向对象的引用,并没有定义这个引用通过何种方式去定位。目前主流的方式有两种:

  • 使用句柄,该方案会在Java堆中划分一块区域作为句柄池,reference中存储的就是对象的句柄地址,而句柄中包含了对象实例数据与类型数据各自的具体信息
    句柄引用示意图
  • 直接指针,reference存储的地址直接是对象地址,对象实例数据里会包含数据类型信息
    直接引用示意图

句柄引用的优势是移动对象实例后(比如GC情况下),reference变量不需要改变指向的地址。
直接引用的优势是速度快,节省了一次定位的开销。积少成多后,提高的速度也是很客观的

关于虚拟机选择何种引用方式的拓展

《深入Java虚拟机》之虚拟机栈

Java内存区域图
Java内存区域

注意:深底色为共享内存区域

先讲解虚拟机栈

虚拟机栈

虚拟机是线程私有的,它的生命周期与线程相同,每个方法在执行的时候都会创建一个栈帧用于存储局部变量表、操作数栈、动态链接、方法出口等信息。每一个方法从调用到执行完成的过程,就对应着一个栈帧在虚拟机栈中入栈到出栈的过程

局部变量表

用于存放方法参数和方法内部定义的局部变量。在编译后,局部变量表的大小就确定了,具体见代码和对应的字节码:

1
2
3
4
// 对应的java代码
public static void main(String[] args){
int a = 1000;
}


其中locals=2 即代表本地局部变量有两个

局部变量表的容量以变量槽(Slot)为最小单位,虚拟机规范表示 boolean、byte、char、short、int、float、reference或returnAddress类型的数据都应该占用1个Slot。注意,只要保证slot实现的效果和32位虚拟机中的一致即可,比如,在64位虚拟机中,使用了64位物理内存空间去实现一个Slot,虚拟机上仍要使用对齐和补白让Slot看起来和32位虚拟机中的一样。

其中介绍一下reference 这个数据类型:

  • 此引用应该直接或间接地查找到对象在Java堆中的数据存放的起始地址索引
  • 此引用应该直接或间接地查找到对象所属数据类型在方法区中地存储类型信息

对于64位的数据类型(double、long),虚拟机会以高位对齐的方式为其分配两个连续的Slot空间。注意,在局部变量表里,无论是否分两次读32位数据,都不会引起数据安全问题。

局部变量表的索引值范围是从0开始直最大的Slot数量,如果是 非static 方法,下标为0的位置默认是this引用。因为方法局部变量表在编译时就确定了,所以调用方法传入数据时,其实是传入了一个引用(待确定。

同时为了尽可能节省 栈帧空间,局部变量表中的Slot是可以重用的,方法体中定义的变量,其作用域不一定会覆盖整个方法,如果该变量超出作用范围,那个变量的slot就会交给其他变量使用。这个规则也会带来一定副作用,因为超出范围后,局部变量表不会主动断开和引用的联系,就导致某个对象一直被局部变量表持有着,可以参考下面的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* 共享变量 */
final int K = 1024;
final int M = 1024 * K;

/* 情况一,回收失败 */
{
byte[] bytes = new byte[10 * M]
}
System.gc();

/* 情况二,回收成功 */
{
byte[] bytes = new byte[10 * M];
}
int a = 0;
System.gc();

《Practical Java》里面讲到“把不使用的对象设置为null” 是有一定道理的,但是不推荐这样做,理由如下:

  • 代码不整洁,应该以恰当的变量作用域来控制回收才是最优雅的
  • JIT会堆null赋值进行优化

但是遇到这样的极端条件,还是可以一试的:对象占用内存大,此方法执行时间长,方法调用次数达不到JIT优化的的编译条件

操作数栈

就是一个处理运算的中转站,如果有做过逆波兰序数的应该可以理解,将操作数压入栈,然后遇到操作符时,弹出两个操作数进行运算。

动态连接

每个栈帧都包含一个指向运行时常量池中该栈帧所属方法的引用,持有这个引用是为了支持方法调用过程中的动态连接。在类加载阶段或者第一次使用就转换为引用,这种转换被称为静态解析。另一部分要等到每一次运行期间转换为直接引用,这部分称为动态连接。

方法调用不等同于方法执行,方法调用唯一的作用就是确定被调用方法的版本(调用哪个方法),不涉及具体运行过程。Class文件的编译过程不包含传统的连接步骤,一切方法调用在Class文件里都是符号引用,而不是方法在实际运行时内存布局中的入口地址(直接引用)。

在把符号引用转换成直接引用的过程有两种:

  1. 解析,调用的方法版本一定是在编译期就确定下来的,比如静态方法、私有方法、实例构造器、父类方法四种
  2. 分派,可能是静态也可能是动态,主要是重载和重写

因为调用什么方法签名的方法在编译期就已经决定了,如下所示

1
2
3
4
// java代码
Super aSuper = new Sub();
A a = new B();
aSuper.test(a);

上述代码的字节码

字节码#6的引用点

第11行已经决定了调用Super.test(A)方法,此时称该方法为 Resolved Method,但是具体调用哪个实例的test(A),需要取决以下的规则

假设C是一个对象的类,决定哪个方法被调用的规则如下所示:

  1. 如果C声明了一个方法m并且覆盖了Resolved Method,那么方法m就会被调用
  2. 否则,如果C有一个父类,那么搜索实现了 Resolved Method 的方法的直接父类,并向上递归查找。
  3. 抛出异常

综上所是,截至JDK7,java是静态多分派,动态单分派(多分派指需要参考多个元素):方法重载要考虑调用者的类型,考虑方法的参数,这些都是在编译期就固定在字节码里面了。而动态分配要到运行时,才会去查找具体的实例。

方法返回地址

  • 正常返回,将返回值传递给上层的方法调用者
  • 异常完成出口,在方法执行过程中遇到了异常,并且这个异常没有在方法内得到处理

无论采用哪种方式退出,都需要返回到方法被调用的位置,程序才能继续执行,方法返回时可能需要在栈帧中保存一些信息,用来帮助恢复它的上层方法的执行状态。一般来说,方法正常退出时,调用者的PC计数器的值可以作为返回地址;方法异常退出时,返回地址是要通过异常处理器表来确定的

方法退出的过程实际上就等同于把栈帧出栈,因此退出时可能执行的操作有:恢复上层方法的局部变量表和操作数栈,把返回值(如果有的话)压入调用者栈帧的操作数栈中,调整PC计数器的值来指向方法调用指令后的一条指令。

队列之PriorityQueue

由于该优先级队列是实现定时等任务的基础,所以想彻底理解后续的DelayQueue及ScheduledThreadPoolExecutor就要先学习优先级队列。

该队列是更具堆排序的划分优先级的队列,默认情况下优先级是使用按照自然顺序排序的比较器(即从小到大),当然你也可以传入一个比较器,让队列按照比较器排序。因为会用到比较器,所以每个元素都需要实现Comparable接口。

注意:该队列不是线程安全的

PriorityQueue有以下几个构造方法:

  • PriorityQueue(): 创建一个可以存放11个元素的,因为没有比较器,传入的元素必须要实现Comparable(像int、double这类可以自动装箱的元素不必实现,它们在强转的时候不会出问题)
  • PriorityQueue(Collection<? extends E>): 根据给定的集合创建一个优先级队列,每个元素都必须实现Comparable
  • PriorityQueue(PriorityQueue<? extends E> c)
  • PriorityQueue(SortedSet<? extends E> c)
  • PriorityQueue(int initialCapacity): 根据给定的初始容器大小,创建一个按自然数排序的优先级队列
  • PriorityQueue(int initialCapacity, Comparator<? super E> comparator): 根据给定的初始容器大小和比较器,创建一个优先级队列,优先级队列会按照比较器进行排序
  • PriorityQueue(Comparator<? super E> comparator): 该方法从1.8之后才有

属性介绍

变量名称 变量类型 作用
queue Object[] 二叉树。每一次插入或移除都会调整树的结构
size int 元素个数。
comparator Comparator<? super E> 比较器。决定队列是从小到大还是从大到小
modCount int 修改次数。每次增删操作都会使modCount递增

offer()

在执行offer()方法前的队列状态,此时 元素个数size为7, queue.length为8

插入前的初始状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
// 如果队列包含元素的数量大于等于数组的长度
if (i >= queue.length)
// 扩充数组长度
grow(i + 1);
// 递增总元素数量
size = i + 1;
if (i == 0)
queue[0] = e;
else
// 关键点
siftUp(i, e);
return true;
}

执行offer()时,

< queue.length``` 所以不扩容,只是对size进行递增。此时, **元素个数size为8**, **queue.length**为8。状态图如下所示
1
2
3
4
5
6
7
8
9
10
11

![插入时的状态1](https://blog-1252749790.file.myqcloud.com/collections/PriorityQueue_status1.png)

offer()方法前面都在做一些准备,比如检查元素是否为null、检查数组长度。真正执行插入的地方在siftUp()。siftUp()就是将插入的元素调整到大于等于其祖先元素,说的术语点就是堆排序。这里会判断是否存在比较器,如果创建PriorityQueue没有传入Comparator时,则进入else,通过元素的Comparable判断;否则进入if语句
```java
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}

假设我们没传Comparator,进入else语句:

1
2
3
4
5
6
7
8
9
10
11
12
13
private void siftUpComparable(int k, E x) {
// 必须要让元素显式实现Comparable,如果是int、double这类可以自动装箱的元素,倒是不需要显式实现
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}

k为新元素的下标,x为新元素的值。由于新元素尚未插入,其中的元素个数实际上还是只有k - 1个,故要执行

- 1```。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

1. (k - 1) / 2 获取 **祖先元素下标 parent** ,parent是**新元素 x** 的祖先元素的下标。
2. 将祖先(该新元素的parent)和新元素进行比较
3. 如果新元素和parent比较后返回0或负数,就将**新元素所在位置**设置为parent值;反之就直接插入(这里还是要具体看比较器,通常是小到大的排序,下文也是这样)
4. 最后将**新元素的下标 k**改成**parent所在位置 parent**,即新元素值要保存在parent所在位置
5. 继续拿**新元素下标 k** 比较,重复1-3的步骤

对不明白为什么要这么做的同学,请移步这里[排序算法之堆排序]()

当`siftUpComarable`执行完后,状态图如下所示(没有体现排序的过程)
![](https://blog-1252749790.file.myqcloud.com/collections/PriorityQueue_status2.png)


## poll()
![poll前的初始状态](https://blog-1252749790.file.myqcloud.com/collections/PriorityQueue_poll_status1.png)

```java
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
E result = (E) queue[0];
E x = (E) queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
return result;
}

poll()方法和offer()有异曲同工之妙,其区别主要在于它出队后,是通过将最后一个元素放在首结点,然后 下沉 调整堆结构的。

将最后一个结点移到顶点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
int half = size >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = queue[child];
int right = child + 1;
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
c = queue[child = right];
if (key.compareTo((E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = key;
}

(k << 1) + 1 获取到首结点的左子结点,child + 1 获取到右子结点。当k < half时,(k >>> 1) + 1 <= size - 1,所以只需要判断right < size 即可知道数组是否越界。此时状态图如下所示

比较左子结点和右子节点,如果左大于右,将右下标设置给左下标,右值设置给左变量c

注意:这里只将右结点的下标设置到左结点的下标,数组里的值还没有动,然后将c设置为right的值,并没有修改数组

状态图如下所示

此时比较新值x 和 变量c(即min(leftValue, rightValue)),如果新值x最小,直接退出循环。反之,queue[k] = c; k = child

到这一步,前三个结点已经挑出最大值,看图可以得知数值“111”还没有设置进去,这是为什么呢?因为如果下标为2的结点还有子结点,还要和他们比较,最终才能确定一个最合适的位置。

当数值“111”确定下了位置queue[k] = key;将111设置到k的位置。

总结

到这一步,PriorityQueue的难点基本上没有了。让人头大的堆排序还是要归结于数据结构学的不够精,不过话说回来这段代码写的很妙,和看AQS一样的感觉,很精细,每个局部变量最大程度上被利用起来,缺一不可。

《Java并发编程的艺术》之线程池(二)

线程池核心Executor

Java的线程既是工作单元,也是执行机制。从JDK5开始,把工作单元与执行机制分离开来。工作单元包括Runnable、Callable,而执行机制由Executor提供。

Executor框架结构和流程

主要由以下三部分组成:

  • 任务:Runnable、Callable接口
  • 任务的执行:Executor,以及继承Executor的ExecutorService
  • 异步计算的结果:Future接口、实现Future接口的FutureTask类

各个部分的执行流程:

  • 主线程创建任务(实现Runnable或Callable接口的任务对象)。工具类Executors可以把一个Runnable对象封装成Callable对象。
  • 把Runnable对象直接交给ExecutorSerivce执行,或者也可以把Runnable对象或Callable对象提交给ExecutorService执行。
  • submit()会返回一个实现Future接口的对象。主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以通过FutureTask.cancel()方法取消任务执行。

Executor框架成员

Executors可以创建以下四个ThreadPoolExecutor特殊应用的类:

  • ThreadPoolExecutor
  • FixedThreadPool,固定线程数量的线程池,多余的任务会加入无界的阻塞队列中
  • SingleThreadPool,只有单个线程的线程池,多余的任务会加入无界的阻塞队列中
  • CachedThreadPool,没有上限的线程池,使用的SynchronousQueue队列

Executors可以创建以下两个ScheduledThreadPoolExecutor特殊应用的类:

  • ScheduledThreadPoolExecutor
  • SingleThreadScheduledExecutor

下面将对Executor的后两个部分进行分解讲解

ThreadPoolExecutor下的特殊应用(任务执行 部分)

FixedThreadPool

FixedThreadPool的创建方法,FixedThreadPool其实就是ThreadPoolExecutors的特殊用法:

1
2
3
4
5
6
// 只能通过Executors.newFixedThreadPool()创建
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

主要需要注意的是

1
2
3
4
5
6
7
8
9
10
11
12
13
* 执行中的线程数达到corePoolSize,新任务会在无界队列中等待,所以线程数不会超过corePoolSize
* 因为不会创建更多的线程,所以maximumPoolSize和keepAliveTime参数就没什么作用了
* 因为是无界队列,所以也不需要用到RejectPolicyHandler

### SingleThreadExecutor
该类也是ThreadPoolExecutor的特殊用法
```java
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

相比FixedThreadPool,除一个正在执行任务的线程,其他任务都会一一加入队列中等待执行。

CachedThreadPool

该类比较特殊,是FixedThreadPool的无限版本:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

一手

1
2
3
4
5
6
7
8
9
10
11
12

### ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor虽然不是对ThreadPoolExecutor的参数进行调整,但是对其执行流程进行了一个调整。该类主要用来在给定的延时之后执行任务或定期执行任务。ScheduledThreadPoolExecutor和Timer类似,但是更强大。Timer在单线程下执行,如果前一个任务执行时间过长,会影响下一个任务的执行。

```java
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 继承ThreadPoolExecutor
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

因为是无界队列,所以maximumPoolSize,aliveKeepTime和workQueue三个参数没什么意义(根据流程判断用不到它们)

ScheduledThreadPoolExecutor的执行主要分成两个部分:

  • 当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask
  • 线程池会从DelayQueue中获取ScheduledFutureTask,然后执行任务

ScheduledThreadPoolExecutor在ThreadPoolExecutor的基础上做了以下修改:

  • 使用DelayQueue作为任务队列
  • 获取任务的方式不同
  • 执行任务时会做一些修改

接下来就围绕着上面三点不同点,研究ScheduledThreadPoolExecutor

DelayQueue作为任务队列

具体的内容看这篇

DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序。time小的排在前面(时间早的任务先被执行)。如果两个task的taim相同就比较sequenceNumber,sequenceNumber小的排在前面。

获取任务的方式

线程池总从DelayQueue中获取流程:

  1. 线程1从DelayQueue中获取已到期的ScheduledFutureTask
  2. 线程1执行该task
  3. 线程1修改该task的time变量为下一次执行的时间
  4. 线程1把这个修改time后的task放回DelayQueue中

FutureTask(异步结果 部分)

该类除了Future接口外还实现了Runnable接口。因此可以将FutureTask提交给线程池使用。该任务如果有多个线程尝试去执行,最终只会有一个线程可以执行,其他线程只能等待该任务执行完毕才能继续执行。

FutureTask可以处于以下三种状态:

  • 未启动。在run()方法还没执行之前,FutureTask处于未启动状态
  • 已启动。在run()方法执行过程中,FutureTask处于已启动状态
  • 已完成。在run()方法执行完后正常结束、被取消、抛出异常。

状态迁移示例图
当FutureTask处于未启动或已启动状态,调用get()方法,会让调用线程进入阻塞状态;当FutureTask处于已完成状态,阻塞的get()方法会立即返回

执行示意图
不同状态下调用get/cancel方法,会有不同的影响

总结

无论是ScheduledThreadPoolExecutor还是FixedThreadExecutor等,都是ThreadPoolExecutor的特殊用法,只要把ThreadPoolExecutor搞懂,那么就不需要担心其他的特殊应用。

《Java并发编程的艺术》之线程池(一)

线程池的优点:

  • 降低资源开销:每次任务到达时不需要重新创建和销毁
  • 提高可管理性:统一对线程进行调优、监控
  • 提高任务响应速度:不需要等待线程创建

线程池创建的各个参数:

  • corePoolSize: 核心线程数。在核心线程数未满时,也优先创建满核心线程数(就算有空余的核心线程)
  • maximumPoolSize: 最大线程数。当workQueue满的时候,创建新线程来执行新任务
  • keepAliveTime:存活时间。非核心线程可空闲的最大时间
  • workQueue: 任务阻塞队列。当提交新任务且核心线程数已经被使用完时,新任务会先加入任务队列

线程池中线程创建流程:

  1. 提交任务时,先判断核心线程数是否已经创建完毕,如果没有,则先将核心线程数创建满;反之则进入下个阶段
  2. 判断核心线程数是否已经全部被占用,如果没有则交给核心线程执行;反之进入下个阶段
  3. 判断工作队列是否已满,如果没满加入队列等待;反之进入下个阶段
  4. 判断当前线程数是否小于MaxThread,如果是,则创建新线程执行任务;反之调用RejectPolicyHandler执行对应的策略

线程池的创建

我们可以通过ThreadPoolExecutor创建一个线程池

1
2
3
4
5
6
7
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

1) corePoolSize: 当提交一个任务时(即使其他空闲核心线程也能够执行新任务),线程池会创建一个新线程来执行任务,等到corePoolSize个线程被创建了,就不再创建了。如果要额外创建,就要看后面的参数。如果调用了线程的

方法,线程池会提前创建并启动所有核心线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

2) maximumPoolSize: 线程池允许创建的最大线程数。如果队列满了,并且已经创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果使用了无界队列,该参数没什么效果

3) keepAliveTime: 线程池的工作线程空闲后,保持存活的时间。如果每个线程执行时间较短,可以将该值调大,提高线程利用率

4) unit: 时间单位

5) workQueue: 用于保存等待执行的任务的阻塞队列,可以选择以下几个阻塞队列:

* ArrayBlockingQueue: 是一个基于数组结构的有界阻塞队列,此队列按FIFO原则队元素进行排序
* LinkedBlockingQueue: 一个基于链表结构的阻塞,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
* SynchronousQueue: 一个不存储元素的队列。每个插入操作要必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列
* PriorityQueue: 一个具有优先级的无限阻塞队列

6) handler: RejectPolicy默认有四种:
* AbortPolicy,直接抛出异常
* DiscardPolicy,丢弃该任务,放弃处理
* CallerRunsPolicy,只用调用者线程执行任务
* DiscardOldestPolicy,丢弃队列里的队首元素

## 执行任务
线程池提交任务有两个方法向线程池提交任务,分别为execute()和submit():

* execute: 执行后没有返回值,无法判断任务是否被线程池执行成功
* submit: 提交后会返回一个Future类型的对象,通过该Future对象可以判断执行是否成功。其中get方法可以阻塞当前线程直到任务完成,get(timeout, unit)方法可以阻塞当前线程一段时间后返回

## 关闭线程
关闭线程池有两个方法,分别为```shutdown()``` 和 ```shutdownNow()

共同点:
遍历线程池中的所有线程,然后逐个调用线程的

1
2
3
4
5
6
7
8

不同点:
* shutdownNow首先将线程池的状态设置为STOP状态,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表
* shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程

当任意一个关闭方法被调用时,isShutdown()方法就会返回true。当所有的任务关闭后,调用isTerminated()才会返回true。

具体要调用```shutdown()```还是```shutdownNow()```,需要根据具体业务来判断(通常调用```shutdown()```)。如果任务不一定要执行完,可以用```shutdownNow()

合理配置

  • 任务的性质:CPU密集型任务、IO密集型任务、混合型任务
  • 任务的优先级:高、中、低
  • 任务的执行时间:长、中、短
  • 任务的依赖性:是否依赖其他系统资源,如 数据库连接

如何挑选:

  • CPU密集型任务需要长时间用到CPU运算,所以线程数量最好配置: Ncpu + 1
  • IO密集型任务并不是一直占用CPU,则尽可能配置多的线程: 2 * Ncpu
  • 如果是混合型任务,尝试将其拆分为CPU密集型任务和IO密集型任务
  • 依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间越长,那么线程数应该设置的越大,这样才能更好的利用CPU。

注意:建议使用有界队列,因为如果线程池里的工作线程全部阻塞,任务挤压在线程池里,最终会导致整个系统不可用

监控要素

  • taskCount: 线程池需要执行的任务数量
  • completedTaskCount: 线程池在运行过程中已完成的任务数量,小于等于taskCount
  • targetPoolSize: 线程池里曾将创建过的最大线程数。通过这个数据可以直到线程池是否曾经瞒过。
  • getPoolSize: 线程池的线程数量。如果线程池不销毁,线程池里的线程不会自动销毁,所以该值只增不减
  • getActiveCount: 获取活动的线程数

通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的beforeExecute(), afterExecute(), terminated() 方法,也可以在任务执行前、后执行一些代码来监控。

总结

该篇主要了解了线程池的使用已经各个元素的含义以及监控时的优化。

《Java并发编程的艺术》之同步工具

CountDownLatch

该类主要实现了:让一个线程等待其他线程完成后再执行的功能,好比

1
2
3
4
5
6

该类的初始化需要一个整数值count,当每次调用```CountDownLatch.countDown()```时Count会递减。直到count降到0时,所有执行```CountDownLatch.await()```的方法都会返回。

初始化了一个共享变量latch,并赋予count为3
```java
CountDownLatch latch = new CountDownLatch(3);

创建一个任务,睡眠1秒假装执行任务,最后执行countDown

1
2
3
4
5
6
@Override
public void run() throw InterruptedException{
System.out.println("执行任务...");
Thread.sleep(1000);
latch.countDown();
}

主线程里执行如下方法

1
2
3
4
// 调用3个线程执行上述的任务
...
latch.await();
System.out.println("执行结束")

当三个任务线程全部执行完latch.countDown()时,main线程就会从阻塞的await()中返回,最后输出”执行结束”。
注意:CountDownLatch 只能使用一次,下一次使用要重新创建一个。

CylicBarrier

该类和CountDownLatch有点类似,不过从名字就可以看出它是一个可循环使用 的类。它的功能主要是等待所有线程达到某个位置,然后统一执行。可以想象成出发旅游,大家都先到集合地等待,待所有人都到了,就可以出发了。

创建一个屏障

1
CyclicBarrier barrier = new CyclicBarrier(4);

任务类,让先完成的任务进行等待,等待其他线程到达

1
2
3
4
5
6
@Override
public void run() throw InterruptedException{
System.out.println(Thread.currentThread.getName() + " -> 到达集合点");
barrier.await();
System.out.println(Thread.currentThread.getName() + "出发!")
}

睡5秒,不让主线程过早结束

1
2
3
4
// 创建4个线程执行上述的任务
...
Thread.sleep(5000);
System.out.println("执行结束")

注意,如果barrier在等待过程中某个线程被中断了一次,那么整个barrier就需要重新来过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
thread.start();
thread.interrupt();
try{
barrier.await();
}catch (Exception e){
System.out.println("无法等待...");
}

当另起的线程被中断后,后续的barrier就无法使用了,会抛出

1
2
3

## Semaphore
该类被称作信号量,用于控制同一时间的线程执行数。想象下面一副场景:


🚌 🚌 🚌 🚌↘—————
🚌 🚌 🚌 🚌🚌 🚌🚌

🚌 🚌 🚌↗—————

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

在窄路口里每次只能通过5辆车,5辆车通过后,后5辆车才能通过。Semaphore的作用就和它很类似,当有20条线程在执行IO密集型的任务,执行完后需要将处理结果存储到数据库中。如果数据库连接只有10条,那就要用semaphore去控制拿到数据库连接的线程数量。

Semaphore里没用过的方法:
* hasQueuedThreads():是否有等待的线程
* getQueueLength():返回正在等待获取许可证的线程数量

## Exchanger
该类是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个通不见,两个线程可以交换彼此间的数据。这两个线程通过exchange方法交换数据:如果第一个线程执行exchange(),它会一直等待第二个线程exchange(),当两个线程达到同步点,这两个线程就可以交换线程。
虽然我不知道这个类有啥作用(ε=ε=ε=┏(゜ロ゜;)┛
但是书上说可以用于遗传算法、校对工作:

```java
Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
String dataOfAThread = Thread.currentThread().getName() + "-----A的数据";
try {
String resultOfBThread = exchanger.exchange(dataOfAThread);
System.out.println(Thread.currentThread().getName() + "-------> " + resultOfBThread);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
String dataOfBThread = Thread.currentThread().getName() + "-----B的数据";
try {
String resultOfAThread = exchanger.exchange(dataOfBThread);
System.out.println(Thread.currentThread().getName() + "-------> " + resultOfAThread);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

最后输出:

1
2
Thread-1-------> Thread-0-----A的数据
Thread-0-------> Thread-1-----B的数据

总结

重新熟悉一下同步工具,学习到了CyclicBarrier被中断一次后,整个作废的点;学习到了Exchanger的适用场景

《Java并发编程的艺术》之AtomicX

其中主要是了解下AtomicReference以及AtomicXUpdater

AtomicReference是支持对象引用原子更新的类,仅仅是支持引用,如果要让对象内的字段支持原子更新,就一定要使用到AtomicXUpdater。

字段更新类需要特别注意,字段必须是public volatile 类型的。

AtomicStampedReference和AtomicMarkableReference均是用于解决ABA问题的类(后者不知道有没有,暂时没实践经验)。前者解决字段方面的更新,后者解决引用类型的更新。

《Java并发编程的艺术》之阻塞队列

阻塞队列是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法:

1) 支持阻塞的插入方法:当队列满时,队列会阻塞执行插入的线程
2) 支持阻塞的移除方法:当队列空时,队列会阻塞执行移除的线程

方法总结:

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 poll() take() poll(time, unit)
检测方法 element() peek()
  • 抛出异常:当队列满时,插入元素会抛出
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    * 返回特殊值:offer()是入队方法,当插入成功时返回true,插入失败返回false;poll()是出队方法,当出队成功时返回元素的值,队列为空时返回null
    * 一直阻塞:当队列满时,阻塞执行插入方法的线程;当队列空时,阻塞执行出队方法的线程
    * 超时退出:顾名思义

    在JDK7里,Java的阻塞队列总共有七个,每个类都继承```AbstractQueue```:

    * ArrayBlockingQueue
    * LinkedTransferQueue
    * SynchronousQueue
    * DelayQueue
    * LinkedBlockingDeque
    * LinkedBlockingQueue
    * PriorityBlockingQueue

    ## ArrayBlockingQueue
    该类是通过数组实现的有界阻塞队列,此队列按照FIFO的原则对元素进行排列。默认情况下不保证线程公平的访问队列,所谓公平访问是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞的先访问。公平访问是由ReentrantLock的公平锁实现的:
    ```java
    public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
    throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
    }
  1. 因为直接创建的数组,ArrayBlockingQueue的长度无法更改,
  2. 当ReentrantLock设置为公平锁后,后续所有Lock竞争中都会是公平竞争,没看过Condition原理的小伙伴会想,那和Condition有什么关系呢?答案是Condition在被唤醒时也要参与竞争Lock,那么在Lock竞争中都会遵守公平竞争。

LinkedBlockingQueue

该类是一个用链表实现的有界阻塞队列(也不算有界,只是因为计数器最多只能计Integer.MAX_VALUE)。此队列的默认和最大长度为Integer.MAX_VALUE。按照先进先出的原则对元素进行排列。

PriorityBlockingQueue

该类是一个支持优先级的无界阻塞队列。默认情况下元素采取自然升序的排序方式。也可以自定义类实现compareTo()来指定元素排序规则,或者初始化时指定Comparator来对元素排序。
详见:坑还没填,迟点填

DelayQueue

支持延时获取元素的无界阻塞队列。队列使用

1
2
3
4
5
6
7
8
9
10
11
12
运用场景:
* 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素,表示缓存有效期到了
* 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue

执行流程如下所示:

首先实现```Delayed```接口:
```java
@Override
public long getDelay(TimeUnit unit){
// 返回当前元素还需要等待多久,当返回<0的值时,表示可以出队;>0时,表示仍需要等待
}

第二步,每次offer()时,往

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

第三步,每次获取时(调用take(),才能有阻塞的功能),如果时间未满会先阻塞
```java
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}

具体的可以看: 了解之DelayQueue

SynchronousQueue

一个不存储元素的队列,生产者生产一个,消费者消费一个,每一个put操作都需要等待一个take操作。该队列主要负责生产者线程处理的数据传递给消费者线程。比较适合传递性场景。其吞吐量高于ArrayBlockingQueue和LinkedBlockingQueue

LinkedTransferQueue

该类主要添加了两个方法:transfer()tryTransfer()
transfer() 主要是当消费者线程在等待的时候,生产者一产生元素,就立刻交给消费者,不需要再进行额外的入队操作;如果没有消费者在等待,就先进入队尾。
tryTransfer() 用来试探生产者传入的元素是否能直接传给消费者。

具体的可以看:了解之LinkedTansferQueue

总结

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • LinkedBlockingDeque
  • SynchronousQueue
    上面四个是较为简单的,数据结构和各个方法都较为简单就不一一阐述了。剩下三个则是比较大头的,而且比较新颖,打算单独拎出来讲解

该章主要是粗略的了解一下其他阻塞队列的功能,对于一些不常见的类,后续会补上详细的分析(其实是先补充一波数据结构和算法)