Monday, April 26, 2010

Threads: Producer-Consumer Problem

Producer-Consumer problem is a very common question related to threads. Following is a simple solution to the problem. QueueClass represents the queue which both the Producer and Consumer has to access. The critical methods in the QueueClass - add() and remove() are synchronized in order to maintain the status quo between the Producer and Consumer threads.

  1. import java.util.List;  
  2. import java.util.ArrayList;  
  3.   
  4. public class ProducerConsumerTest {  
  5.   
  6.  /** 
  7.   * @param args 
  8.   */  
  9.  public static void main(String[] args) {  
  10.   QueueClass q = new QueueClass();  
  11.   Producer p = new Producer(q, 10);  
  12.   Consumer c = new Consumer(q, 10);  
  13.     
  14.   Thread t1 = new Thread(p);  
  15.   Thread t2 = new Thread(c);  
  16.   c.setpThread(t1);  
  17.   t1.start();  
  18.   t2.start();  
  19.  }  
  20.   
  21. }  
  22.   
  23. class Producer implements Runnable {  
  24.  QueueClass q;  
  25.  int size;  
  26.    
  27.  Producer(QueueClass q, int size) {  
  28.   this.q = q;  
  29.   this.size = size;  
  30.  }  
  31.   
  32.  public void run() {  
  33.   int index = -1;  
  34.   while(true) {  
  35.    q.add(new String("" + ++index));  
  36.    try {  
  37.     Thread.sleep(1000);  
  38.    }  
  39.    catch(InterruptedException e) {  
  40.     e.printStackTrace();  
  41.    }  
  42.   }  
  43.  }  
  44. }  
  45.   
  46. class Consumer implements Runnable {  
  47.  QueueClass q;  
  48.  int size;  
  49.  Thread pThread;  
  50.    
  51.  public void setpThread(Thread pThread) {  
  52.   this.pThread = pThread;  
  53.  }  
  54.   
  55.  Consumer(QueueClass q, int size) {  
  56.   this.q = q;  
  57.   this.size = size;  
  58.  }  
  59.   
  60.  public void run() {  
  61.   while(true) {  
  62.    q.remove();  
  63.    try {  
  64.     Thread.sleep(1000);  
  65.    }  
  66.    catch(InterruptedException e) {  
  67.     e.printStackTrace();  
  68.    }  
  69.   }  
  70.  }  
  71. }  
  72.   
  73. class QueueClass {  
  74.  List<string> queue = new ArrayList<string>();  
  75.  int size = 10;  
  76.    
  77.  public synchronized void add(String s) {  
  78.   if(getSize() < size) queue.add(s);  
  79.   System.out.println("Added: " + queue);  
  80.   try {  
  81.    if(getSize() >= size) {  
  82.     notifyAll();  
  83.     wait();  
  84.    }  
  85.   }  
  86.   catch(InterruptedException e) {  
  87.    e.printStackTrace();  
  88.   }  
  89.  }  
  90.    
  91.  public synchronized void remove() {  
  92.   if(getSize() > 0) queue.remove(queue.size()-1);  
  93.   System.out.println("Removed: " + queue);  
  94.   try {  
  95.    if(getSize() <= 0) {  
  96.     notifyAll();  
  97.     wait();  
  98.    }  
  99.   }  
  100.   catch(InterruptedException e) {  
  101.    e.printStackTrace();  
  102.   }  
  103.  }  
  104.    
  105.  public synchronized int getSize() {  
  106.   return queue.size();  
  107.  }  
  108. }  
  109.   
  110. </string></string>  

import java.util.List;
import java.util.ArrayList;

public class ProducerConsumerTest {

/**
* @param args
*/
public static void main(String[] args) {
QueueClass q = new QueueClass();
Producer p = new Producer(q, 10);
Consumer c = new Consumer(q, 10);

Thread t1 = new Thread(p);
Thread t2 = new Thread(c);
c.setpThread(t1);
t1.start();
t2.start();
}

}

class Producer implements Runnable {
QueueClass q;
int size;

Producer(QueueClass q, int size) {
this.q = q;
this.size = size;
}

public void run() {
int index = -1;
while(true) {
q.add(new String("" + ++index));
try {
Thread.sleep(1000);
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Consumer implements Runnable {
QueueClass q;
int size;
Thread pThread;

public void setpThread(Thread pThread) {
this.pThread = pThread;
}

Consumer(QueueClass q, int size) {
this.q = q;
this.size = size;
}

public void run() {
while(true) {
q.remove();
try {
Thread.sleep(1000);
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}

class QueueClass {
List queue = new ArrayList();
int size = 10;

public synchronized void add(String s) {
if(getSize() < size) queue.add(s);
System.out.println("Added: " + queue);
try {
if(getSize() >= size) {
notifyAll();
wait();
}
}
catch(InterruptedException e) {
e.printStackTrace();
}
}

public synchronized void remove() {
if(getSize() > 0) queue.remove(queue.size()-1);
System.out.println("Removed: " + queue);
try {
if(getSize() <= 0) {
notifyAll();
wait();
}
}
catch(InterruptedException e) {
e.printStackTrace();
}
}

public synchronized int getSize() {
return queue.size();
}
}