public class BlockingQueue { private Queuequeue = new LinkedList<>(); private int maxCount; private Lock lock = new ReentrantLock(); BlockingQueue(int maxCount){ this.maxCount=maxCount; } public void enQueue(int d) throws InterruptedException { lock.lock(); if(queue.size() >= maxCount){ System.out.println("正在等待入队 " + Thread.currentThread().getName() + " 大小 " + queue.size()); lock.wait(); } System.out.println(" 添加 " + d); queue.add(d); lock.notify(); lock.unlock(); } public Integer deQueue() throws InterruptedException { lock.lock(); if(queue.size()==0){ System.out.println("正在等待出队 " + Thread.currentThread().getName() + " 大小 " + queue.size()); lock.wait(); } int data = queue.remove(); lock.notify(); lock.unlock(); return data; } }
public class Main { public static void main(String args[]){ BlockingQueue queue=new BlockingQueue(10); Producer p = new Producer(queue); Consumer c = new Consumer(queue); Thread t1=new Thread(c,"消费者"); Thread t2=new Thread(p, "生产者"); t1.start(); t2.start(); } }
public class Producer implements Runnable { private BlockingQueue q; Producer(BlockingQueue qu){ this.q=qu; } public Integer generateWork() throws InterruptedException { return new Random().nextInt(100); } @Override public void run() { for(int i =0; i<100; i++){ try { Thread.sleep(100); q.enQueue(generateWork()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class Consumer implements Runnable { private BlockingQueue queue; Consumer(BlockingQueue q){ this.queue=q; } public void consume(int data){ System.out.println(" 消费 " + data); } @Override public void run() { for(int i=0; i < 100; i++){ try { Thread.sleep(1000); consume(queue.deQueue()); } catch (InterruptedException e) { System.out.println("中断"); } } } }
\n输出:\nAdding 94\nException in thread \"producer\" java.lang.IllegalMonitorStateException\nat java.lang.Object.notify(Native Method)\nat BlockingQueue.enQueue(\nat\nat
public class BlockingQueue { private Queuequeue = new LinkedList<>(); private int maxCount; private Lock lock = new ReentrantLock(); BlockingQueue(int maxCount) { this.maxCount = maxCount; } public void enQueue(int d) throws InterruptedException { lock.lock(); if (queue.size() >= maxCount) { System.out.println("going to wait enqueu " + Thread.currentThread().getName() + " size " + queue.size()); waitInternal(); } System.out.println(" Adding " + d); queue.add(d); notifyInternal(); lock.unlock(); } public Integer deQueue() throws InterruptedException { lock.lock(); if (queue.size() == 0) { System.out.println("going to wait dequeue " + Thread.currentThread().getName() + " size " + queue.size()); waitInternal(); } int data = queue.remove(); notifyInternal(); lock.unlock(); return data; } private void waitInternal() throws InterruptedException { synchronized (lock) { lock.wait(); } } private void notifyInternal() throws InterruptedException { synchronized (lock) { lock.notify(); } } }