123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Collections;
- namespace MyTest
- {
- class ManagedThreadPoolcs
- {
/// <summary>
/// A counting semaphore implemented on top of <see cref="Monitor"/>.
/// <see cref="P"/> (alias <see cref="WaitOne"/>) takes one unit, blocking while none
/// are available; <see cref="V"/> (alias <see cref="AddOne"/>) returns one unit and
/// wakes a waiting thread.
/// </summary>
public class Semaphore
{
    #region Member Variables
    /// <summary>The number of units currently available from this semaphore.</summary>
    private int _count;
    /// <summary>Monitor object that guards <see cref="_count"/> and on which waiters block.</summary>
    private readonly object _semLock = new object();
    #endregion

    #region Construction
    /// <summary>Initializes the semaphore as a binary semaphore (one unit available).</summary>
    public Semaphore()
        : this(1)
    {
    }

    /// <summary>Initializes the semaphore as a counting semaphore.</summary>
    /// <param name="count">Initial number of units that can be taken from this semaphore.</param>
    /// <exception cref="ArgumentException">Thrown if <paramref name="count"/> is less than 0.</exception>
    public Semaphore(int count)
    {
        if (count < 0) throw new ArgumentException("Semaphore must have a count of at least 0.", "count");
        _count = count;
    }
    #endregion

    #region Synchronization Operations
    /// <summary>V the semaphore (add 1 unit to it).</summary>
    public void AddOne() { V(); }

    /// <summary>P the semaphore (take out 1 unit from it), blocking until a unit is available.</summary>
    public void WaitOne() { P(); }

    /// <summary>P the semaphore (take out 1 unit from it), blocking until a unit is available.</summary>
    public void P()
    {
        // lock is built around Monitor, so Monitor.Wait can release and
        // re-acquire the same lock while this thread sleeps.
        lock (_semLock)
        {
            // Re-check the count after every wake-up: another thread may have
            // taken the unit before this one re-acquired the lock, and Reset
            // wakes every waiter at once.
            while (_count <= 0) Monitor.Wait(_semLock, Timeout.Infinite);
            _count--;
        }
    }

    /// <summary>V the semaphore (add 1 unit to it) and wake one waiting thread, if any.</summary>
    public void V()
    {
        lock (_semLock)
        {
            // Return the unit, then signal one waiter that a unit may be available.
            _count++;
            Monitor.Pulse(_semLock);
        }
    }

    /// <summary>Resets the semaphore to the specified count. Should be used cautiously.</summary>
    /// <param name="count">The new number of available units.</param>
    public void Reset(int count)
    {
        lock (_semLock)
        {
            _count = count;
            // Threads already blocked in P() only re-check the count when pulsed.
            // Without this, raising the count here would leave them asleep even
            // though units are available. Wake them all; the loop in P() sends
            // any surplus waiters back to sleep.
            if (_count > 0) Monitor.PulseAll(_semLock);
        }
    }
    #endregion
}
- /// <summary>Managed thread pool.</summary>
/// <summary>
/// Managed thread pool: a fixed set of background worker threads that execute
/// queued <see cref="WaitCallback"/> delegates in FIFO order.
/// </summary>
public class ManagedThreadPool
{
    #region Constants
    /// <summary>Maximum number of threads the thread pool has at its disposal.</summary>
    private const int _maxWorkerThreads = 80;
    #endregion

    #region Member Variables
    /// <summary>Queue of all the callbacks waiting to be executed. Guarded by <see cref="_poolLock"/>.</summary>
    private static Queue<WaitingCallback> _waitingCallbacks;
    /// <summary>
    /// Used to signal that a worker thread is needed for processing. Note that multiple
    /// threads may be needed simultaneously and as such we use a semaphore instead of
    /// an auto reset event.
    /// </summary>
    private static Semaphore _workerThreadNeeded;
    /// <summary>List of all worker threads at the disposal of the thread pool.</summary>
    private static List<Thread> _workerThreads;
    /// <summary>Number of callbacks currently executing on worker threads.</summary>
    private static int _inUseThreads;
    /// <summary>Lockable object for the pool.</summary>
    private static readonly object _poolLock = new object();
    #endregion

    #region Construction and Finalization
    /// <summary>Initializes the thread pool the first time the type is used.</summary>
    static ManagedThreadPool() { Initialize(); }

    /// <summary>Creates fresh pool state and starts all worker threads.</summary>
    private static void Initialize()
    {
        // Create our thread stores; we handle synchronization ourselves because
        // several operations need to be atomic together. The thread list is only
        // used by Reset; it is not needed for dispatching work.
        _waitingCallbacks = new Queue<WaitingCallback>();
        _workerThreads = new List<Thread>(_maxWorkerThreads);
        // NOTE(review): a worker interrupted by Reset still runs its finally block
        // and decrements this counter, so it can briefly go negative after a Reset.
        _inUseThreads = 0;

        // The "thread needed" signal must exist before any worker starts,
        // because each worker blocks on it immediately.
        _workerThreadNeeded = new Semaphore(0);

        for (int i = 0; i < _maxWorkerThreads; i++)
        {
            Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
            _workerThreads.Add(newThread);

            newThread.Name = "ManagedPoolThread #" + i.ToString();
            // Background threads do not keep the process alive at shutdown.
            newThread.IsBackground = true;
            newThread.Start();
        }
    }
    #endregion

    #region Public Methods
    /// <summary>Queues a user work item to the thread pool with no state object.</summary>
    /// <param name="callback">
    /// A WaitCallback representing the delegate to invoke when the thread in the
    /// thread pool picks up the work item.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback)
    {
        QueueUserWorkItem(callback, null);
    }

    /// <summary>Queues a user work item to the thread pool.</summary>
    /// <param name="callback">
    /// A WaitCallback representing the delegate to invoke when the thread in the
    /// thread pool picks up the work item.
    /// </param>
    /// <param name="state">
    /// The object that is passed to the delegate when serviced from the thread pool.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback, object state)
    {
        // Fail at the call site rather than with a NullReferenceException
        // on a worker thread later.
        if (callback == null) throw new ArgumentNullException("callback");

        // Wrap the delegate and its state, add it to the processing queue,
        // and signal that one more item is waiting.
        WaitingCallback waiting = new WaitingCallback(callback, state);
        lock (_poolLock) { _waitingCallbacks.Enqueue(waiting); }
        _workerThreadNeeded.AddOne();
    }

    /// <summary>
    /// Demonstrates cooperative cancellation: queues a counting operation and
    /// requests its cancellation immediately afterwards.
    /// </summary>
    public static void CancellingAWorkItem()
    {
        using (CancellationTokenSource cts = new CancellationTokenSource())
        {
            // Capture the token now; reading cts.Token on the worker thread after
            // the source has been disposed would throw ObjectDisposedException.
            CancellationToken token = cts.Token;

            // Pass the CancellationToken and the number-to-count-to into the operation
            QueueUserWorkItem(o => Count(token, 1000));

            // If Count returned already, Cancel has no effect on it.
            // Cancel returns immediately, and the method continues running here.
            cts.Cancel();
        }
    }

    /// <summary>
    /// Writes the numbers 0 .. <paramref name="countTo"/> - 1 to the console,
    /// stopping early once cancellation is requested.
    /// </summary>
    /// <param name="token">Token polled before each number is written.</param>
    /// <param name="countTo">Exclusive upper bound of the count.</param>
    private static void Count(CancellationToken token, Int32 countTo)
    {
        for (Int32 count = 0; count < countTo; count++)
        {
            if (token.IsCancellationRequested)
            {
                break; // Exit the loop to stop the operation
            }
            Console.WriteLine(count);
            Thread.Sleep(10); // For demo, waste some time
        }
        Console.WriteLine("Count is done");
    }

    /// <summary>Empties the work queue of any queued work items. Resets all threads in the pool.</summary>
    public static void Reset()
    {
        lock (_poolLock)
        {
            // Dispose the state of every callback that never got to run. Each
            // item is guarded separately so one failing Dispose does not prevent
            // the remaining states from being cleaned up.
            foreach (WaitingCallback pending in _waitingCallbacks)
            {
                IDisposable disposableState = pending.State as IDisposable;
                if (disposableState == null) continue;
                try { disposableState.Dispose(); }
                catch { /* best effort: keep cleaning up the rest */ }
            }

            // Shut down all existing threads. Each Abort is guarded separately so
            // one failure does not leave every later thread untouched.
            foreach (Thread worker in _workerThreads)
            {
                if (worker == null) continue;
                try { worker.Abort("reset"); }
                catch { /* best effort: Abort can throw, e.g. where it is unsupported */ }
            }

            // Reinitialize the pool (create new threads, etc.)
            Initialize();
        }
    }
    #endregion

    #region Properties
    /// <summary>Gets the number of threads at the disposal of the thread pool.</summary>
    public static int MaxThreads { get { return _maxWorkerThreads; } }

    /// <summary>Gets the number of currently active threads in the thread pool.</summary>
    public static int ActiveThreads
    {
        // The counter is only ever modified through Interlocked; read it the same
        // way so the caller sees the most recent value.
        get { return Interlocked.CompareExchange(ref _inUseThreads, 0, 0); }
    }

    /// <summary>Gets the number of callback delegates currently waiting in the thread pool.</summary>
    public static int WaitingCallbacks { get { lock (_poolLock) { return _waitingCallbacks.Count; } } }
    #endregion

    #region Thread Processing
    /// <summary>Event raised when there is an exception on a threadpool thread.</summary>
    public static event UnhandledExceptionEventHandler UnhandledException;

    /// <summary>A thread worker function that processes items from the work queue.</summary>
    private static void ProcessQueuedItems()
    {
        // Process indefinitely
        while (true)
        {
            // Sleep until QueueUserWorkItem signals that an item is waiting.
            _workerThreadNeeded.WaitOne();

            // Take the next callback. The count check and the dequeue must be
            // atomic, so both happen under the pool lock. The queue can be empty
            // here if Reset swapped it out after the signal was raised.
            WaitingCallback callback = null;
            lock (_poolLock)
            {
                if (_waitingCallbacks.Count > 0) callback = _waitingCallbacks.Dequeue();
            }
            if (callback == null) continue;

            // Execute the callback, keeping an accurate record of how many
            // callbacks are currently executing.
            try
            {
                Interlocked.Increment(ref _inUseThreads);
                callback.Callback(callback.State);
            }
            catch (Exception exc)
            {
                // Report the failure to subscribers; a faulty subscriber must
                // not take the worker thread down.
                try
                {
                    UnhandledExceptionEventHandler handler = UnhandledException;
                    if (handler != null) handler(typeof(ManagedThreadPool), new UnhandledExceptionEventArgs(exc, false));
                }
                catch { }
            }
            finally
            {
                Interlocked.Decrement(ref _inUseThreads);
            }
        }
    }
    #endregion

    /// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
    private sealed class WaitingCallback
    {
        #region Member Variables
        /// <summary>Callback delegate for the callback.</summary>
        private readonly WaitCallback _callback;
        /// <summary>State with which to call the callback delegate.</summary>
        private readonly object _state;
        #endregion

        #region Construction
        /// <summary>Initialize the callback holding object.</summary>
        /// <param name="callback">Callback delegate for the callback.</param>
        /// <param name="state">State with which to call the callback delegate.</param>
        public WaitingCallback(WaitCallback callback, object state)
        {
            _callback = callback;
            _state = state;
        }
        #endregion

        #region Properties
        /// <summary>Gets the callback delegate for the callback.</summary>
        public WaitCallback Callback { get { return _callback; } }
        /// <summary>Gets the state with which to call the callback delegate.</summary>
        public object State { get { return _state; } }
        #endregion
    }
}
- }
- }
|