using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Collections;
namespace MyTest
{
class ManagedThreadPoolcs
{
public class Semaphore
{
#region Member Variables
/// The number of units alloted by this semaphore.
private int _count;
/// Lock for the semaphore.
private object _semLock = new object();
#endregion
#region Construction
/// Initialize the semaphore as a binary semaphore.
public Semaphore()
: this(1)
{
}
/// Initialize the semaphore as a counting semaphore.
/// Initial number of threads that can take out units from this semaphore.
/// Throws if the count argument is less than 0.
public Semaphore(int count)
{
if (count < 0) throw new ArgumentException("Semaphore must have a count of at least 0.", "count");
_count = count;
}
#endregion
#region Synchronization Operations
/// V the semaphore (add 1 unit to it).
public void AddOne() { V(); }
/// P the semaphore (take out 1 unit from it).
public void WaitOne() { P(); }
/// P the semaphore (take out 1 unit from it).
public void P()
{
// Lock so we can work in peace. This works because lock is actually
// built around Monitor.
lock (_semLock)
{
// Wait until a unit becomes available. We need to wait
// in a loop in case someone else wakes up before us. This could
// happen if the Monitor.Pulse statements were changed to Monitor.PulseAll
// statements in order to introduce some randomness into the order
// in which threads are woken.
while (_count <= 0) Monitor.Wait(_semLock, Timeout.Infinite);
_count--;
}
}
/// V the semaphore (add 1 unit to it).
public void V()
{
// Lock so we can work in peace. This works because lock is actually
// built around Monitor.
lock (_semLock)
{
// Release our hold on the unit of control. Then tell everyone
// waiting on this object that there is a unit available.
_count++;
Monitor.Pulse(_semLock);
}
}
/// Resets the semaphore to the specified count. Should be used cautiously.
public void Reset(int count)
{
lock (_semLock) { _count = count; }
}
#endregion
}
/// Managed thread pool.
public class ManagedThreadPool
{
#region Constants
/// Maximum number of threads the thread pool has at its disposal.
private const int _maxWorkerThreads = 80;
#endregion
#region Member Variables
/// Queue of all the callbacks waiting to be executed.
private static Queue _waitingCallbacks;
///
/// Used to signal that a worker thread is needed for processing. Note that multiple
/// threads may be needed simultaneously and as such we use a semaphore instead of
/// an auto reset event.
///
private static Semaphore _workerThreadNeeded;
/// List of all worker threads at the disposal of the thread pool.
private static ArrayList _workerThreads;
/// Number of threads currently active.
private static int _inUseThreads;
/// Lockable object for the pool.
private static object _poolLock = new object();
#endregion
#region Construction and Finalization
/// Initialize the thread pool.
static ManagedThreadPool() { Initialize(); }
/// Initializes the thread pool.
private static void Initialize()
{
// Create our thread stores; we handle synchronization ourself
// as we may run into situtations where multiple operations need to be atomic.
// We keep track of the threads we've created just for good measure; not actually
// needed for any core functionality.
_waitingCallbacks = new Queue();
_workerThreads = new ArrayList();
_inUseThreads = 0;
// Create our "thread needed" event
_workerThreadNeeded = new Semaphore(0);
// Create all of the worker threads
for (int i = 0; i < _maxWorkerThreads; i++)
{
// Create a new thread and add it to the list of threads.
Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
_workerThreads.Add(newThread);
// Configure the new thread and start it
newThread.Name = "ManagedPoolThread #" + i.ToString();
newThread.IsBackground = true;
newThread.Start();
}
}
#endregion
#region Public Methods
/// Queues a user work item to the thread pool.
///
/// A WaitCallback representing the delegate to invoke when the thread in the
/// thread pool picks up the work item.
/// {
/* "caption": "销售查询",
"children": [
{
"caption": "店铺货品销售明细",
"url": 'frame?url=http://www.lfx1848.com/ERP/ReportView/Store_Sale.aspx',
}
]
},
{
"caption": "库存查询",
"children": [
{
"caption": "实时库存查询",
"url": 'frame?url=http://www.lfx1848.com/ERP/ReportView/Store_Stock.aspx',
}
]
}*/
///
public static void QueueUserWorkItem(WaitCallback callback)
{
// Queue the delegate with no state
QueueUserWorkItem(callback, null);
}
/// Queues a user work item to the thread pool.
///
/// A WaitCallback representing the delegate to invoke when the thread in the
/// thread pool picks up the work item.
///
///
/// The object that is passed to the delegate when serviced from the thread pool.
///
public static void QueueUserWorkItem(WaitCallback callback, object state)
{
// Create a waiting callback that contains the delegate and its state.
// At it to the processing queue, and signal that data is waiting.
WaitingCallback waiting = new WaitingCallback(callback, state);
lock (_poolLock) { _waitingCallbacks.Enqueue(waiting); }
_workerThreadNeeded.AddOne();
}
public static void CancellingAWorkItem()
{
CancellationTokenSource cts = new CancellationTokenSource();
// Pass the CancellationToken and the number-to-count-to into the operation
QueueUserWorkItem(o => Count(cts.Token, 1000));
cts.Cancel(); // If Count returned already, Cancel has no effect on it
// Cancel returns immediately, and the method continues running here...
}
private static void Count(CancellationToken token, Int32 countTo)
{
for (Int32 count = 0; count < countTo; count++)
{
if (token.IsCancellationRequested)
{
// Console.WriteLine("Count is cancelled");
break; // Exit the loop to stop the operation
}
Console.WriteLine(count);
Thread.Sleep(10); // For demo, waste some time
}
Console.WriteLine("Count is done");
}
/// Empties the work queue of any queued work items. Resets all threads in the pool.
public static void Reset()
{
lock (_poolLock)
{
// Cleanup any waiting callbacks
try
{
// Try to dispose of all remaining state
foreach (object obj in _waitingCallbacks)
{
WaitingCallback callback = (WaitingCallback)obj;
if (callback.State is IDisposable) ((IDisposable)callback.State).Dispose();
}
}
catch { }
// Shutdown all existing threads
try
{
foreach (Thread thread in _workerThreads)
{
if (thread != null) thread.Abort("reset");
}
}
catch { }
// Reinitialize the pool (create new threads, etc.)
Initialize();
}
}
#endregion
#region Properties
/// Gets the number of threads at the disposal of the thread pool.
public static int MaxThreads { get { return _maxWorkerThreads; } }
/// Gets the number of currently active threads in the thread pool.
public static int ActiveThreads { get { return _inUseThreads; } }
/// Gets the number of callback delegates currently waiting in the thread pool.
public static int WaitingCallbacks { get { lock (_poolLock) { return _waitingCallbacks.Count; } } }
#endregion
#region Thread Processing
/// Event raised when there is an exception on a threadpool thread.
public static event UnhandledExceptionEventHandler UnhandledException;
/// A thread worker function that processes items from the work queue.
private static void ProcessQueuedItems()
{
// Process indefinitely
while (true)
{
_workerThreadNeeded.WaitOne();
// Get the next item in the queue. If there is nothing there, go to sleep
// for a while until we're woken up when a callback is waiting.
WaitingCallback callback = null;
// Try to get the next callback available. We need to lock on the
// queue in order to make our count check and retrieval atomic.
lock (_poolLock)
{
if (_waitingCallbacks.Count > 0)
{
try { callback = (WaitingCallback)_waitingCallbacks.Dequeue(); }
catch { } // make sure not to fail here
}
}
if (callback != null)
{
// We now have a callback. Execute it. Make sure to accurately
// record how many callbacks are currently executing.
try
{
Interlocked.Increment(ref _inUseThreads);
callback.Callback(callback.State);
}
catch (Exception exc)
{
try
{
UnhandledExceptionEventHandler handler = UnhandledException;
if (handler != null) handler(typeof(ManagedThreadPool), new UnhandledExceptionEventArgs(exc, false));
}
catch { }
}
finally
{
Interlocked.Decrement(ref _inUseThreads);
}
}
}
}
#endregion
/// Used to hold a callback delegate and the state for that delegate.
private class WaitingCallback
{
#region Member Variables
/// Callback delegate for the callback.
private WaitCallback _callback;
/// State with which to call the callback delegate.
private object _state;
#endregion
#region Construction
/// Initialize the callback holding object.
/// Callback delegate for the callback.
/// State with which to call the callback delegate.
public WaitingCallback(WaitCallback callback, object state)
{
_callback = callback;
_state = state;
}
#endregion
#region Properties
/// Gets the callback delegate for the callback.
public WaitCallback Callback { get { return _callback; } }
/// Gets the state with which to call the callback delegate.
public object State { get { return _state; } }
#endregion
}
}
}
}