Producer and Consumer
生产者-消费者模型可以通过平衡生产者的生产能力和消费者的消费能力来提升整个系统的运行效率,同时实现解耦
生产者消费者-basic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
/**
 * Bounded producer/consumer demo built on the intrinsic lock of this object
 * with {@code wait()} / {@code notifyAll()}.
 *
 * <p>{@link #produce()} and {@link #consume()} are both {@code synchronized}
 * on the same instance, so the queue is only ever touched by one thread at a
 * time.
 */
class ProducerAndConsumerBasic {

    /** Maximum number of products the queue may hold before producers wait. */
    private static final int MAX_VALUE = 10;

    private final Queue<String> queue = new LinkedList<>();

    /**
     * Adds one product, waiting while the queue already holds
     * {@code MAX_VALUE} products.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the method returns without producing, so callers
     * can observe the interrupt and stop.
     */
    public synchronized void produce() {
        // Loop (not "if"): the condition must be re-checked after every wake-up.
        while (queue.size() == MAX_VALUE) {
            System.out.println("inability to produce");
            try {
                wait();
            } catch (InterruptedException e) {
                // wait() cleared the flag; put it back instead of waiting again.
                Thread.currentThread().interrupt();
                return;
            }
        }
        queue.offer("product");
        System.out.println(Thread.currentThread().getName() + " has producted a product");
        // Producers and consumers share one monitor, so wake everybody.
        notifyAll();
    }

    /**
     * Removes one product, waiting while the queue is empty.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the method returns without consuming.
     */
    public synchronized void consume() {
        while (queue.size() == 0) {
            System.out.println("inability to consume");
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        queue.poll();
        System.out.println(Thread.currentThread().getName() + " has consumed a product");
        notifyAll();
    }

    /** Starts two producer tasks and three consumer tasks that run until interrupted. */
    public static void main(String[] args) {
        ProducerAndConsumerBasic pac = new ProducerAndConsumerBasic();
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 2; i++) {
            service.execute(() -> {
                // Stop looping once interrupted; produce() leaves the flag set.
                while (!Thread.currentThread().isInterrupted()) {
                    pac.produce();
                }
            });
        }
        for (int i = 0; i < 3; i++) {
            service.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    pac.consume();
                }
            });
        }
        service.shutdown();
    }
}
|
生产者消费者-Condition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
/**
 * Bounded producer/consumer demo built on a {@link ReentrantLock} with two
 * {@link Condition}s, so producers and consumers wait on separate queues and
 * a single {@code signal()} wakes only a thread of the opposite role.
 */
public class ProducerAndConsumerCondition {

    /** Queue size at which producers start waiting. */
    private final int capacity;

    private final Queue<String> queue = new LinkedList<>();
    private final Lock lock = new ReentrantLock();

    /** Producers wait here while the queue is full; consumers signal it. */
    private final Condition notFull = lock.newCondition();

    /** Consumers wait here while the queue is empty; producers signal it. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * @param MAX_VALUE queue size at which {@link #produce()} starts waiting
     */
    public ProducerAndConsumerCondition(int MAX_VALUE) {
        this.capacity = MAX_VALUE;
    }

    /**
     * Adds one product, waiting while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is
     * produced and the interrupt status is restored before returning.
     */
    public void produce() {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                System.out.println("queue's size is " + queue.size() + ", inability to produce");
                notFull.await();
            }
            queue.offer("product");
            System.out.println("producting " + queue.size());
            notEmpty.signal();
        } catch (InterruptedException e) {
            // await() cleared the flag; restore it so the caller can see the interrupt.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes one product, waiting while the queue is empty.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is
     * consumed and the interrupt status is restored before returning.
     */
    public void consume() {
        lock.lock();
        try {
            while (queue.size() == 0) {
                System.out.println("queue's size is " + queue.size() + ", inability to consume");
                notEmpty.await();
            }
            queue.poll();
            System.out.println("consuming " + queue.size());
            notFull.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /** Submits 20 one-shot produce tasks followed by 20 one-shot consume tasks. */
    public static void main(String[] args) {
        ProducerAndConsumerCondition pac = new ProducerAndConsumerCondition(10);
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            service.execute(pac::produce);
        }
        for (int i = 0; i < 20; i++) {
            service.execute(pac::consume);
        }
        service.shutdown();
    }
}
|
生产者消费者-BlockingQueue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
/**
 * Producer/consumer demo that delegates all blocking and synchronization to
 * an {@link ArrayBlockingQueue} of capacity 10.
 *
 * <p>{@link Producer} and {@link Consumer} are inner (non-static) classes
 * because they read the enclosing instance's queue and are created with
 * {@code pc.new Producer(..)} / {@code pc.new Consumer(..)}.
 */
class ProducerAndConsumerBlockingQueue2 {

    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

    /** Puts a random integer in {@code [0, 10000)} on the queue every 500 ms until interrupted. */
    public class Producer implements Runnable {

        private final String name;

        /**
         * @param name label used in this producer's log lines
         */
        public Producer(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    int d = (int) (Math.random() * 10000);
                    System.out.println(this.name + " is producting " + d);
                    queue.put(d); // blocks while the queue is full
                    System.out.println(this.name + " has producted " + d);
                    System.out.println("===============");
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: keep the flag set for the
                // code that owns this thread and leave the loop.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Takes one integer from the queue every 500 ms until interrupted. */
    public class Consumer implements Runnable {

        private final String name;

        /**
         * @param name label used in this consumer's log lines
         */
        public Consumer(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println(this.name + " is consuming");
                    int d = queue.take(); // blocks while the queue is empty
                    System.out.println(this.name + " has consumed " + d);
                    System.out.println("===============");
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Runs two producers and three consumers on a cached thread pool. */
    public static void main(String[] args) {
        ProducerAndConsumerBlockingQueue2 pc = new ProducerAndConsumerBlockingQueue2();
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(pc.new Producer("producer 1"));
        service.execute(pc.new Producer("producer 2"));
        service.execute(pc.new Consumer("consumer 1"));
        service.execute(pc.new Consumer("consumer 2"));
        service.execute(pc.new Consumer("consumer 3"));
        service.shutdown();
    }
}
|
生产者消费者-PipedStream
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.util.concurrent.atomic.AtomicInteger;
/**
 * Producer/consumer demo in which the bounded buffer is the internal buffer
 * of a connected {@link PipedInputStream} / {@link PipedOutputStream} pair.
 *
 * <p>{@code count} is incremented per written byte and decremented per read
 * byte; it is only used for the log output.
 */
class ProducerAndConsumerStream {

    private final PipedInputStream pis;
    private final PipedOutputStream pos;

    private final AtomicInteger count = new AtomicInteger();

    /**
     * Creates the pipe and connects both ends.
     *
     * @param size buffer size handed to {@link PipedInputStream#PipedInputStream(int)}
     * @throws java.io.UncheckedIOException if the two ends cannot be connected
     */
    public ProducerAndConsumerStream(int size) {
        PipedInputStream in = new PipedInputStream(size);
        PipedOutputStream out = new PipedOutputStream();
        try {
            in.connect(out);
        } catch (IOException e) {
            // Fail fast: a half-built instance would only throw NullPointerException later.
            throw new java.io.UncheckedIOException("cannot connect piped streams", e);
        }
        this.pis = in;
        this.pos = out;
    }

    /**
     * Writes bytes into the pipe until an I/O error or an interrupt occurs,
     * then closes both ends of the pipe.
     *
     * <p>On interrupt the thread's interrupt status is restored.
     */
    public void produce() {
        try {
            while (true) {
                // write(int) stores the low-order 8 bits of the counter value.
                pos.write(count.getAndIncrement());
                pos.flush();
                System.out.println(Thread.currentThread().getName() + " has produced a product " + count);
            }
        } catch (java.io.InterruptedIOException e) {
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeStreams();
        }
    }

    /**
     * Reads bytes from the pipe until end of stream, an I/O error or an
     * interrupt occurs, then closes both ends of the pipe.
     *
     * <p>On interrupt the thread's interrupt status is restored.
     */
    public void consume() {
        try {
            // read() returns -1 once the writer has closed its end; stop then
            // instead of counting phantom products.
            while (pis.read() != -1) {
                count.getAndDecrement();
                System.out.println(Thread.currentThread().getName() + " has consumed a product " + count);
            }
        } catch (java.io.InterruptedIOException e) {
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeStreams();
        }
    }

    /** Closes both pipe ends; a failure on one end does not skip the other. */
    private void closeStreams() {
        try {
            pis.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            pos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts one consumer thread and then one producer thread on a 10-byte pipe. */
    public static void main(String[] args) {
        ProducerAndConsumerStream pac = new ProducerAndConsumerStream(10);
        new Thread(pac::consume).start();
        new Thread(pac::produce).start();
    }
}
|
Reader and Writer
读写锁-读者写者问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| import java.util.Random; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * Readers/writers demo guarding a single value with a
 * {@link ReentrantReadWriteLock}: reads take the read lock, writes take the
 * write lock.
 */
class ReaderAndWriter {

    private Object data;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * Prints the current value, sleeps 500 ms while holding the read lock,
     * then prints the value again.
     *
     * <p>If the thread is interrupted during the sleep, the second print is
     * skipped and the interrupt status is restored.
     */
    public void read() {
        lock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " before read: " + this.data);
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName() + " after read: " + this.data);
        } catch (InterruptedException e) {
            // sleep() cleared the flag; restore it so callers can stop their loop.
            Thread.currentThread().interrupt();
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Sleeps 1000 ms while holding the write lock and then stores {@code data}.
     *
     * <p>If the thread is interrupted during the sleep, the value is left
     * unchanged and the interrupt status is restored.
     *
     * @param data the new value
     */
    public void write(Object data) {
        lock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " before write: " + this.data);
            Thread.sleep(1000);
            this.data = data;
            System.out.println(Thread.currentThread().getName() + " after write: " + this.data);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Starts three reader threads and three writer threads, alternating, that run until interrupted. */
    public static void main(String[] args) {
        ReaderAndWriter raw = new ReaderAndWriter();
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                // read()/write() leave the flag set on interrupt, which ends these loops.
                while (!Thread.currentThread().isInterrupted()) {
                    raw.read();
                }
            }).start();

            new Thread(() -> {
                Random random = new Random();
                while (!Thread.currentThread().isInterrupted()) {
                    raw.write(random.nextInt(10000));
                }
            }).start();
        }
    }
}
|
读写锁-缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * Read-mostly cache demo guarded by a {@link ReentrantReadWriteLock}: lookups
 * share the read lock, and only a miss takes the write lock to fill the entry.
 */
class Cache {

    private final Map<String, Object> map = Collections.synchronizedMap(new HashMap<>());
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * Returns the cached value for {@code key}, creating and storing it on
     * the first request.
     *
     * <p>Every lock is released in a {@code finally} block, so an exception
     * inside a critical section cannot leave the lock held.
     *
     * @param key cache key
     * @return the cached value; newly created values have the form
     *         {@code "value " + System.currentTimeMillis()}
     */
    public Object getData(String key) {
        // Fast path: hits only need the shared read lock.
        lock.readLock().lock();
        try {
            Object cached = map.get(key);
            if (cached != null) {
                return cached;
            }
        } finally {
            lock.readLock().unlock();
        }

        // Slow path: the read lock cannot be upgraded, so it was released above.
        lock.writeLock().lock();
        try {
            // Re-check: another thread may have filled the entry in between.
            Object value = map.get(key);
            if (value == null) {
                System.out.println(Thread.currentThread().getName() + " is writing to cache for " + key);
                value = "value " + System.currentTimeMillis();
                map.put(key, value);
                System.out.println(Thread.currentThread().getName() + " has writed to cache for " + key);
            }
            return value;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Starts three threads requesting "key one", then three requesting "key two". */
    public static void main(String[] args) {
        Cache cache = new Cache();
        startReaders(cache, "key one");
        startReaders(cache, "key two");
    }

    /** Starts three threads that each look up {@code key} once and print the result. */
    private static void startReaders(Cache cache, String key) {
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                Object value = cache.getData(key);
                System.out.println(Thread.currentThread().getName() + " gets " + key + "'s cache: " + value);
            }).start();
        }
    }
}
|
Deadlock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public class DeadBock { public static void main(String[] args) { new Thread(new MyRunnable(true)).start(); new Thread(new MyRunnable(false)).start();; } }
/**
 * Runnable that loops forever acquiring the two {@code MyBock} monitors
 * nested inside each other; {@code tag} selects which monitor is taken first.
 */
class MyRunnable implements Runnable {

    /** {@code true}: take lock_a then lock_b; {@code false}: take lock_b then lock_a. */
    boolean tag;

    MyRunnable(boolean tag) {
        this.tag = tag;
    }

    @Override
    public void run() {
        if (tag) {
            acquireForever(MyBock.lock_a, " : lock_a", MyBock.lock_b, " : lock_b");
        } else {
            acquireForever(MyBock.lock_b, " : lock_b", MyBock.lock_a, " : lock_a");
        }
    }

    /**
     * Endlessly enters {@code outer}, logs, enters {@code inner} while still
     * holding {@code outer}, and logs again. Never returns.
     */
    private static void acquireForever(Object outer, String outerLabel, Object inner, String innerLabel) {
        for (;;) {
            synchronized (outer) {
                System.out.println(Thread.currentThread().getName() + outerLabel);
                synchronized (inner) {
                    System.out.println(Thread.currentThread().getName() + innerLabel);
                }
            }
        }
    }
}
/** Holds the two shared monitor objects used by the deadlock demo. */
class MyBock {

    /** First monitor object. */
    public static final Object lock_a = new Object();

    /** Second monitor object, distinct from {@link #lock_a}. */
    public static final Object lock_b = new Object();
}
|
CyclicBarrier
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
/**
 * {@link CyclicBarrier} demo: three pool threads walk through three
 * checkpoints, and at each one every thread waits on the shared barrier
 * until all three have arrived.
 */
class CyclicBarrierDemo {

    public static void main(String[] args) {
        final int parties = 3;
        ExecutorService service = Executors.newCachedThreadPool();
        CyclicBarrier cb = new CyclicBarrier(parties);

        // One stateless task shared by all workers; the barrier is reused per checkpoint.
        Runnable walker = () -> {
            String[] checkpoints = {" one", " two", " three"};
            try {
                for (String checkpoint : checkpoints) {
                    System.out.println(Thread.currentThread().getName() + " is coming, " + checkpoint);
                    Thread.sleep(1000);
                    cb.await();
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        };

        for (int worker = 0; worker < parties; worker++) {
            service.execute(walker);
        }
        service.shutdown();
    }
}
|