using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Collections; namespace MyTest { class ManagedThreadPoolcs { public class Semaphore { #region Member Variables /// The number of units alloted by this semaphore. private int _count; /// Lock for the semaphore. private object _semLock = new object(); #endregion #region Construction /// Initialize the semaphore as a binary semaphore. public Semaphore() : this(1) { } /// Initialize the semaphore as a counting semaphore. /// Initial number of threads that can take out units from this semaphore. /// Throws if the count argument is less than 0. public Semaphore(int count) { if (count < 0) throw new ArgumentException("Semaphore must have a count of at least 0.", "count"); _count = count; } #endregion #region Synchronization Operations /// V the semaphore (add 1 unit to it). public void AddOne() { V(); } /// P the semaphore (take out 1 unit from it). public void WaitOne() { P(); } /// P the semaphore (take out 1 unit from it). public void P() { // Lock so we can work in peace. This works because lock is actually // built around Monitor. lock (_semLock) { // Wait until a unit becomes available. We need to wait // in a loop in case someone else wakes up before us. This could // happen if the Monitor.Pulse statements were changed to Monitor.PulseAll // statements in order to introduce some randomness into the order // in which threads are woken. while (_count <= 0) Monitor.Wait(_semLock, Timeout.Infinite); _count--; } } /// V the semaphore (add 1 unit to it). public void V() { // Lock so we can work in peace. This works because lock is actually // built around Monitor. lock (_semLock) { // Release our hold on the unit of control. Then tell everyone // waiting on this object that there is a unit available. _count++; Monitor.Pulse(_semLock); } } /// Resets the semaphore to the specified count. Should be used cautiously. public void Reset(int count) { lock (_semLock) { _count = count; } } #endregion } /// Managed thread pool. public class ManagedThreadPool { #region Constants /// Maximum number of threads the thread pool has at its disposal. private const int _maxWorkerThreads = 80; #endregion #region Member Variables /// Queue of all the callbacks waiting to be executed. private static Queue _waitingCallbacks; /// /// Used to signal that a worker thread is needed for processing. Note that multiple /// threads may be needed simultaneously and as such we use a semaphore instead of /// an auto reset event. /// private static Semaphore _workerThreadNeeded; /// List of all worker threads at the disposal of the thread pool. private static ArrayList _workerThreads; /// Number of threads currently active. private static int _inUseThreads; /// Lockable object for the pool. private static object _poolLock = new object(); #endregion #region Construction and Finalization /// Initialize the thread pool. static ManagedThreadPool() { Initialize(); } /// Initializes the thread pool. private static void Initialize() { // Create our thread stores; we handle synchronization ourself // as we may run into situtations where multiple operations need to be atomic. // We keep track of the threads we've created just for good measure; not actually // needed for any core functionality. _waitingCallbacks = new Queue(); _workerThreads = new ArrayList(); _inUseThreads = 0; // Create our "thread needed" event _workerThreadNeeded = new Semaphore(0); // Create all of the worker threads for (int i = 0; i < _maxWorkerThreads; i++) { // Create a new thread and add it to the list of threads. Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems)); _workerThreads.Add(newThread); // Configure the new thread and start it newThread.Name = "ManagedPoolThread #" + i.ToString(); newThread.IsBackground = true; newThread.Start(); } } #endregion #region Public Methods /// Queues a user work item to the thread pool. /// /// A WaitCallback representing the delegate to invoke when the thread in the /// thread pool picks up the work item. /// { /* "caption": "销售查询", "children": [ { "caption": "店铺货品销售明细", "url": 'frame?url=http://www.lfx1848.com/ERP/ReportView/Store_Sale.aspx', } ] }, { "caption": "库存查询", "children": [ { "caption": "实时库存查询", "url": 'frame?url=http://www.lfx1848.com/ERP/ReportView/Store_Stock.aspx', } ] }*/ /// public static void QueueUserWorkItem(WaitCallback callback) { // Queue the delegate with no state QueueUserWorkItem(callback, null); } /// Queues a user work item to the thread pool. /// /// A WaitCallback representing the delegate to invoke when the thread in the /// thread pool picks up the work item. /// /// /// The object that is passed to the delegate when serviced from the thread pool. /// public static void QueueUserWorkItem(WaitCallback callback, object state) { // Create a waiting callback that contains the delegate and its state. // At it to the processing queue, and signal that data is waiting. WaitingCallback waiting = new WaitingCallback(callback, state); lock (_poolLock) { _waitingCallbacks.Enqueue(waiting); } _workerThreadNeeded.AddOne(); } public static void CancellingAWorkItem() { CancellationTokenSource cts = new CancellationTokenSource(); // Pass the CancellationToken and the number-to-count-to into the operation QueueUserWorkItem(o => Count(cts.Token, 1000)); cts.Cancel(); // If Count returned already, Cancel has no effect on it // Cancel returns immediately, and the method continues running here... } private static void Count(CancellationToken token, Int32 countTo) { for (Int32 count = 0; count < countTo; count++) { if (token.IsCancellationRequested) { // Console.WriteLine("Count is cancelled"); break; // Exit the loop to stop the operation } Console.WriteLine(count); Thread.Sleep(10); // For demo, waste some time } Console.WriteLine("Count is done"); } /// Empties the work queue of any queued work items. Resets all threads in the pool. public static void Reset() { lock (_poolLock) { // Cleanup any waiting callbacks try { // Try to dispose of all remaining state foreach (object obj in _waitingCallbacks) { WaitingCallback callback = (WaitingCallback)obj; if (callback.State is IDisposable) ((IDisposable)callback.State).Dispose(); } } catch { } // Shutdown all existing threads try { foreach (Thread thread in _workerThreads) { if (thread != null) thread.Abort("reset"); } } catch { } // Reinitialize the pool (create new threads, etc.) Initialize(); } } #endregion #region Properties /// Gets the number of threads at the disposal of the thread pool. public static int MaxThreads { get { return _maxWorkerThreads; } } /// Gets the number of currently active threads in the thread pool. public static int ActiveThreads { get { return _inUseThreads; } } /// Gets the number of callback delegates currently waiting in the thread pool. public static int WaitingCallbacks { get { lock (_poolLock) { return _waitingCallbacks.Count; } } } #endregion #region Thread Processing /// Event raised when there is an exception on a threadpool thread. public static event UnhandledExceptionEventHandler UnhandledException; /// A thread worker function that processes items from the work queue. private static void ProcessQueuedItems() { // Process indefinitely while (true) { _workerThreadNeeded.WaitOne(); // Get the next item in the queue. If there is nothing there, go to sleep // for a while until we're woken up when a callback is waiting. WaitingCallback callback = null; // Try to get the next callback available. We need to lock on the // queue in order to make our count check and retrieval atomic. lock (_poolLock) { if (_waitingCallbacks.Count > 0) { try { callback = (WaitingCallback)_waitingCallbacks.Dequeue(); } catch { } // make sure not to fail here } } if (callback != null) { // We now have a callback. Execute it. Make sure to accurately // record how many callbacks are currently executing. try { Interlocked.Increment(ref _inUseThreads); callback.Callback(callback.State); } catch (Exception exc) { try { UnhandledExceptionEventHandler handler = UnhandledException; if (handler != null) handler(typeof(ManagedThreadPool), new UnhandledExceptionEventArgs(exc, false)); } catch { } } finally { Interlocked.Decrement(ref _inUseThreads); } } } } #endregion /// Used to hold a callback delegate and the state for that delegate. private class WaitingCallback { #region Member Variables /// Callback delegate for the callback. private WaitCallback _callback; /// State with which to call the callback delegate. private object _state; #endregion #region Construction /// Initialize the callback holding object. /// Callback delegate for the callback. /// State with which to call the callback delegate. public WaitingCallback(WaitCallback callback, object state) { _callback = callback; _state = state; } #endregion #region Properties /// Gets the callback delegate for the callback. public WaitCallback Callback { get { return _callback; } } /// Gets the state with which to call the callback delegate. public object State { get { return _state; } } #endregion } } } }