1  using System;
   2  using System.Collections.Generic;
   3  using System.Threading;
   4  using System.Diagnostics;
   5  
   6  class WorkerThread {
   7      public WorkerThread(int workerNumber, WorkerThread consumerThread) {
   8          workerNumber_ = workerNumber;
   9          consumerThread_ = consumerThread;
  10          itemEnqueuedEvent_ = new AutoResetEvent(false);
  11          terminateThreadEvent_ = new AutoResetEvent(false);
  12          }
  13  
  14      public void doWork() {
  15          // Wait on two events at once:
  16          WaitHandle[] events = new WaitHandle[2];
  17          events[0] = itemEnqueuedEvent_;
  18          events[1] = terminateThreadEvent_;
  19  
  20          try {
  21              // Wait for item enqueued or terminate event:
  22              while (WaitHandle.WaitAny(events) != 1) {  // While not terminating ...
  23                  emptyTheQueue();
  24                  }
  25  
  26              emptyTheQueue();
  27      
  28              // Ask consumer to terminate itself (when it's ready)
  29              if (consumerThread_ != null) consumerThread_.setTerminateEventAndWait();
  30              Debug.WriteLine("Worker thread " + workerNumber_ + " terminating ...");            
  31              }
  32  
  33          catch (Exception x) 
  34              {Console.WriteLine("Worker thread " + workerNumber_ + x.Message.ToString());}
  35          }
  36  
  37      private void emptyTheQueue() {
  38          while(queueIsNonempty()) {
  39              int n = dequeue();
  40  
  41              // Do work:
  42              //n += workerNumber_;
  43              n = doHardWork(n);
  44  
  45              // Dispose of work:
  46              if(consumerThread_==null) {} // Console.Write("*");  // (" " + n);
  47              else consumerThread_.enqueue(n);                
  48              }
  49          }
  50  
  51      static private int doHardWork(int n) {
  52          DateTime startTime = DateTime.Now;
  53          for(int i = 0; i<1000000; i++) {  // Takes 16 mS on Xeon 2.5 MHz
  54              n = n * n;
  55              n = n%9999;
  56              }
  57          DateTime stopTime = DateTime.Now;
  58          TimeSpan duration = stopTime - startTime;
  59          Debug.WriteLine("doHardWork() terminating after " + duration.TotalSeconds + " seconds.");
  60          return n;
  61          }
  62  
  63      public void enqueue(int k) {
  64          lock (inputQueue_) inputQueue_.Enqueue(k);            
  65          itemEnqueuedEvent_.Set();
  66          }
  67  
  68      private int dequeue() {
  69          lock (inputQueue_) return inputQueue_.Dequeue();
  70          }
  71  
  72      private Boolean queueIsNonempty() {
  73          lock (inputQueue_) return inputQueue_.Count!=0;            
  74          }
  75  
  76      public void setTerminateEventAndWait() {
  77          terminateThreadEvent_.Set();
  78          myThread_.Join();
  79          }
  80  
  81      public void setThread(Thread thread) {myThread_ = thread;}
  82  
  83      private int workerNumber_;
  84      private WorkerThread consumerThread_;  // null if no thread consumes from this thread.
  85      private Queue<int> inputQueue_ = new Queue<int>();
  86      private Thread myThread_;
  87      private AutoResetEvent itemEnqueuedEvent_;
  88      private AutoResetEvent terminateThreadEvent_;
  89  }
  90