0%

Java 并发基础

bing

Producer and Consumer

生产者-消费者模型通过一个缓冲区来平衡生产者的生产能力和消费者的消费能力,从而提升整个系统的运行效率,同时实现生产者与消费者之间的解耦。

生产者消费者-basic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Bounded-buffer producer/consumer demo built on the intrinsic monitor
 * ({@code synchronized} together with {@code wait}/{@code notifyAll}).
 *
 * <p>All access to the queue happens inside {@code synchronized} instance
 * methods, so the queue is guarded by this object's monitor.
 */
class ProducerAndConsumerBasic {

    /** Maximum number of products the warehouse may hold. */
    private static final int MAX_VALUE = 10;

    /** The warehouse; guarded by {@code this}. */
    private final Queue<String> queue = new LinkedList<>();

    /**
     * Adds one product, blocking while the warehouse is full.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the method returns without producing.
     */
    public synchronized void produce() {
        // Re-check the condition in a loop: notifyAll() wakes every waiter.
        while (queue.size() == MAX_VALUE) {
            System.out.println("inability to produce");
            try {
                wait();
            } catch (InterruptedException e) {
                // Do not swallow the interrupt and go back to waiting forever.
                Thread.currentThread().interrupt();
                return;
            }
        }
        queue.offer("product");
        System.out.println(Thread.currentThread().getName() + " has producted a product");
        notifyAll();
    }

    /**
     * Removes one product, blocking while the warehouse is empty.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the method returns without consuming.
     */
    public synchronized void consume() {
        while (queue.size() == 0) {
            System.out.println("inability to consume");
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        queue.poll();
        System.out.println(Thread.currentThread().getName() + " has consumed a product");
        notifyAll();
    }

    /** Starts two producer tasks and three consumer tasks that run until interrupted. */
    public static void main(String[] args) {
        ProducerAndConsumerBasic pac = new ProducerAndConsumerBasic();
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 2; i++) {
            service.execute(() -> {
                // Stop looping once interrupted instead of spinning on produce().
                while (!Thread.currentThread().isInterrupted()) {
                    pac.produce();
                }
            });
        }
        for (int i = 0; i < 3; i++) {
            service.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    pac.consume();
                }
            });
        }
        // Already-submitted tasks keep running; only new submissions are rejected.
        service.shutdown();
    }

}

生产者消费者-Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Bounded-buffer producer/consumer demo built on an explicit {@link Lock}
 * with two {@link Condition}s, so producers and consumers wait on separate
 * wait-sets and can be signalled individually.
 */
public class ProducerAndConsumerCondition {

    /** Queue size at which producers have to wait. */
    private final int MAX_VALUE;

    /** Product queue; guarded by {@link #lock}. */
    private final Queue<String> queue = new LinkedList<>();

    private final Lock lock = new ReentrantLock();

    /** Producers wait here while the queue is full; signalled after a consume. */
    private final Condition notFull = lock.newCondition();

    /** Consumers wait here while the queue is empty; signalled after a produce. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * @param MAX_VALUE queue size at which {@link #produce()} starts blocking
     */
    public ProducerAndConsumerCondition(int MAX_VALUE) {
        this.MAX_VALUE = MAX_VALUE;
    }

    /**
     * Adds one product, blocking while the queue holds {@code MAX_VALUE} items.
     *
     * <p>If interrupted while waiting, the interrupt status is restored and
     * the method returns without producing.
     */
    public void produce() {
        lock.lock();
        try {
            while (queue.size() == MAX_VALUE) {
                System.out.println("queue's size is " + queue.size() + ", inability to produce");
                notFull.await();
            }
            queue.offer("product");
            System.out.println("producting " + queue.size());
            notEmpty.signal();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes one product, blocking while the queue is empty.
     *
     * <p>If interrupted while waiting, the interrupt status is restored and
     * the method returns without consuming.
     */
    public void consume() {
        lock.lock();
        try {
            while (queue.size() == 0) {
                System.out.println("queue's size is " + queue.size() + ", inability to consume");
                notEmpty.await();
            }
            queue.poll();
            System.out.println("consuming " + queue.size());
            notFull.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /** Submits 20 one-shot produce tasks and 20 one-shot consume tasks. */
    public static void main(String[] args) {
        ProducerAndConsumerCondition pac = new ProducerAndConsumerCondition(10);
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            service.execute(pac::produce);
        }
        for (int i = 0; i < 20; i++) {
            service.execute(pac::consume);
        }
        // Without shutdown() the idle pool threads keep the JVM from exiting promptly.
        service.shutdown();
    }
}

生产者消费者-BlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Producer/consumer demo in which all blocking and hand-off is delegated to
 * a bounded {@link BlockingQueue}.
 */
class ProducerAndConsumerBlockingQueue2 {

    /** Shared buffer; {@code put} blocks when it holds 10 items, {@code take} blocks when empty. */
    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

    /** Task that keeps putting random integers into the shared queue until interrupted. */
    public class Producer implements Runnable {

        private final String name;

        /**
         * @param name label used in the log output
         */
        public Producer(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    int d = (int) (Math.random() * 10000);
                    System.out.println(this.name + " is producting " + d);
                    queue.put(d);
                    System.out.println(this.name + " has producted " + d);
                    System.out.println("===============");
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                // Interruption ends the task; keep the status visible to the owner of the thread.
                Thread.currentThread().interrupt();
            }
        }

    }

    /** Task that keeps taking integers from the shared queue until interrupted. */
    public class Consumer implements Runnable {

        private final String name;

        /**
         * @param name label used in the log output
         */
        public Consumer(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println(this.name + " is consuming");
                    int d = queue.take();
                    System.out.println(this.name + " has consumed " + d);
                    System.out.println("===============");
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Runs two producers and three consumers against one shared queue. */
    public static void main(String[] args) {
        ProducerAndConsumerBlockingQueue2 pc = new ProducerAndConsumerBlockingQueue2();
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(pc.new Producer("producer 1"));
        service.execute(pc.new Producer("producer 2"));
        service.execute(pc.new Consumer("consumer 1"));
        service.execute(pc.new Consumer("consumer 2"));
        service.execute(pc.new Consumer("consumer 3"));
        // Already-submitted tasks keep running; only new submissions are rejected.
        service.shutdown();
    }
}

生产者消费者-PipedStream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer/consumer demo that hands bytes from one thread to another through
 * a connected {@link PipedOutputStream}/{@link PipedInputStream} pair.
 *
 * <p>Do not use both ends of the pipe from a single thread: the pipe's
 * buffer is bounded, so that thread may deadlock itself.
 */
class ProducerAndConsumerStream {

    /** Read end of the pipe (used by the consumer). */
    private final PipedInputStream pis;

    /** Write end of the pipe (used by the producer). */
    private final PipedOutputStream pos;

    /** Incremented for every byte written, decremented for every byte read. */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Creates a connected pipe.
     *
     * @param size capacity of the pipe's internal buffer in bytes
     * @throws IllegalStateException if the two ends cannot be connected
     */
    public ProducerAndConsumerStream(int size) {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in;
        try {
            // A pipe can be connected only once; do it at construction time.
            in = new PipedInputStream(out, size);
        } catch (IOException e) {
            // Fail fast instead of leaving the object with unusable streams.
            throw new IllegalStateException("failed to connect the pipe", e);
        }
        this.pis = in;
        this.pos = out;
    }

    /**
     * Writes bytes into the pipe until an I/O error or interruption occurs,
     * then closes both ends of the pipe.
     */
    public void produce() {
        try {
            while (true) {
                // write(int) transmits only the low-order 8 bits of the counter value.
                pos.write(count.getAndIncrement());
                pos.flush();
                System.out.println(Thread.currentThread().getName()
                        + " has produced a product " + count);
            }
        } catch (java.io.InterruptedIOException e) {
            // The pipe reports thread interruption as an IOException; keep the status.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closePipe();
        }
    }

    /**
     * Reads bytes from the pipe until end of stream, an I/O error or
     * interruption occurs, then closes both ends of the pipe.
     */
    public void consume() {
        try {
            while (true) {
                if (pis.read() < 0) {
                    // -1 marks end of stream (write end closed); it is not a product.
                    break;
                }
                count.getAndDecrement();
                System.out.println(Thread.currentThread().getName()
                        + " has consumed a product " + count);
            }
        } catch (java.io.InterruptedIOException e) {
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closePipe();
        }
    }

    /** Closes both ends; a failure closing one end must not skip the other. */
    private void closePipe() {
        try {
            pis.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            pos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts one consumer thread and one producer thread on a 10-byte pipe. */
    public static void main(String[] args) {
        ProducerAndConsumerStream pac = new ProducerAndConsumerStream(10);
        new Thread(pac::consume).start();
        new Thread(pac::produce).start();
    }
}

Reader and Writer

读写锁-读者写者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Readers/writers demo: readers share the read lock of a
 * {@link ReadWriteLock}, writers take the exclusive write lock.
 */
class ReaderAndWriter {

    /** Shared data; read under the read lock, replaced under the write lock. */
    private Object data;

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * Prints the shared data twice, 500 ms apart, while holding the read lock.
     *
     * <p>If interrupted during the pause, the interrupt status is restored
     * and the method returns after releasing the lock.
     */
    public void read() {
        lock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " before read: " + this.data);
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName() + " after read: " + this.data);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Replaces the shared data after a 1 s pause while holding the write lock.
     *
     * <p>If interrupted during the pause, the data is left unchanged, the
     * interrupt status is restored and the lock is released.
     *
     * @param data the new value to store
     */
    public void write(Object data) {
        lock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " before write: " + this.data);
            Thread.sleep(1000);
            this.data = data;
            System.out.println(Thread.currentThread().getName() + " after write: " + this.data);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Starts three reader threads and three writer threads that run until interrupted. */
    public static void main(String[] args) {
        ReaderAndWriter raw = new ReaderAndWriter();
        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    // Stop once interrupted instead of spinning on read().
                    while (!Thread.currentThread().isInterrupted()) {
                        raw.read();
                    }
                }
            }).start();

            new Thread(new Runnable() {

                @Override
                public void run() {
                    // One generator per writer thread instead of one per iteration.
                    Random random = new Random();
                    while (!Thread.currentThread().isInterrupted()) {
                        raw.write(random.nextInt(10000));
                    }
                }
            }).start();
        }
    }
}

读写锁-缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Read-mostly cache demo guarded by a {@link ReadWriteLock}: lookups share
 * the read lock, and a missing entry is created under the write lock.
 */
class Cache {

    /** Backing store; the synchronized wrapper makes single map calls thread-safe on their own. */
    private final Map<String, Object> map = Collections.synchronizedMap(new HashMap<>());

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * Returns the cached value for {@code key}, creating and storing it first
     * if no value is cached yet.
     *
     * @param key cache key
     * @return the cached value; never {@code null}
     */
    public Object getData(String key) {
        // Fast path: look the value up under the shared read lock.
        lock.readLock().lock();
        try {
            Object cached = map.get(key);
            if (cached != null) {
                return cached;
            }
        } finally {
            // Released in finally so an exception can never leak the lock.
            lock.readLock().unlock();
        }

        // Slow path: a read lock cannot be upgraded, so it was released above
        // and the exclusive write lock is taken instead.
        lock.writeLock().lock();
        try {
            // Check again: another thread may have stored the value between
            // releasing the read lock and acquiring the write lock.
            Object value = map.get(key);
            if (value == null) {
                System.out.println(Thread.currentThread().getName() + " is writing to cache for " + key);
                value = "value " + System.currentTimeMillis();
                map.put(key, value);
                System.out.println(Thread.currentThread().getName() + " has writed to cache for " + key);
            }
            return value;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Starts three threads asking for "key one" and three asking for "key two". */
    public static void main(String[] args) {
        Cache cache = new Cache();
        for (int i = 0; i < 3; i++) {
            new Thread() {

                @Override
                public void run() {
                    Object value = cache.getData("key one");
                    System.out.println(Thread.currentThread().getName() + " gets key one's cache: " + value);
                }

            }.start();
        }

        for (int i = 0; i < 3; i++) {
            new Thread() {

                @Override
                public void run() {
                    Object value = cache.getData("key two");
                    System.out.println(Thread.currentThread().getName() + " gets key two's cache: " + value);
                }

            }.start();
        }
    }
}

Deadlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class DeadBock {
public static void main(String[] args) {
new Thread(new MyRunnable(true)).start();
new Thread(new MyRunnable(false)).start();;
}
}

/**
 * Task that endlessly takes the two {@code MyBock} monitors, nested one
 * inside the other; the tag decides which monitor is taken first.
 */
class MyRunnable implements Runnable {
    boolean tag;

    /**
     * @param tag {@code true} to lock {@code lock_a} then {@code lock_b};
     *            {@code false} to lock {@code lock_b} then {@code lock_a}
     */
    MyRunnable(boolean tag) {
        this.tag = tag;
    }

    @Override
    public void run() {
        // Decide the acquisition order once, up front.
        final Object outer;
        final Object inner;
        final String outerLabel;
        final String innerLabel;
        if (tag) {
            outer = MyBock.lock_a;
            inner = MyBock.lock_b;
            outerLabel = " : lock_a";
            innerLabel = " : lock_b";
        } else {
            outer = MyBock.lock_b;
            inner = MyBock.lock_a;
            outerLabel = " : lock_b";
            innerLabel = " : lock_a";
        }

        // Two tasks with opposite tags take the monitors in opposite order,
        // which is what allows them to deadlock each other.
        for (;;) {
            synchronized (outer) {
                System.out.println(Thread.currentThread().getName() + outerLabel);
                synchronized (inner) {
                    System.out.println(Thread.currentThread().getName() + innerLabel);
                }
            }
        }
    }
}

/** Holder for the two shared monitor objects used by the deadlock demonstration. */
class MyBock {

    /** First shared monitor. */
    public static final Object lock_a = new Object();

    /** Second shared monitor. */
    public static final Object lock_b = new Object();
}

CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Demonstrates {@link CyclicBarrier}: each worker blocks in {@code await()}
 * until all parties have arrived, then all of them continue together. The
 * same barrier is reused for three consecutive stages.
 */
class CyclicBarrierDemo {

    public static void main(String[] args) {
        final int parties = 3;
        ExecutorService pool = Executors.newCachedThreadPool();
        CyclicBarrier barrier = new CyclicBarrier(parties); // trips once 3 threads are waiting

        Runnable worker = () -> {
            try {
                // Every stage: announce, work for a second, then wait for the others.
                for (String stage : new String[] {" one", " two", " three"}) {
                    System.out.println(Thread.currentThread().getName() + " is coming, " + stage);
                    Thread.sleep(1000);
                    barrier.await();
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        };

        for (int n = 0; n < parties; n++) {
            pool.execute(worker);
        }
        pool.shutdown();
    }
}