Auf der letzten BASTA! im Februar wurde mir die Frage gestellt: Kann ich die Priorität (Priority) eines Tasks festlegen?
Diese Frage möchte ich nun in dem folgenden Post ausführlich beantworten. Wie ich bereits sagte, kann die Priorität beeinflusst werden. Aber zunächst der Reihe nach.
Task und Thread
Konkret geht es darum die Priorität eines Threads zu ändern der einen Task ausführt. Die Priorität eines Tasks kann nicht festgelegt werden. Ein Task Objekt kann höchstens eine Information über die gewünschte Priorität mitführen. Das Task Objekt selbst ist ja bekanntlich nicht ausführungsfähig, sondern benötigt immer einen Thread. D. h. jeder Task wird früher oder später an einen Thread gebunden. An dieser Stelle muss nun eingegriffen werden und ein Thread mit gewünschter Priorität an den Task gebunden werden. Dies ist mittels eigenen Task Scheduler möglich.
Implementierung eines eigenen Task Scheduler
Durch die Umsetzung eines eigenen Task Scheduler besteht die Möglichkeit, auf die Einstellungen der (Ausführungs-) Threads Einfluss zu nehmen. Innerhalb eines eigenen Task Schedulers werden die verfügbaren Threads mit den gewünschten Werten (Apartment State, Priority usw.) angelegt. Anschließend kann der umgesetzte Task Scheduler dazu verwendet werden, die Tasks auszuführen.
Wie sieht so was konkret aus?
Ein eigener Task Scheduler wird von der Klasse TaskScheduler aus dem Namensraum System.Threading.Tasks abgeleitet. Die wichtigsten zu implementierenden Methoden sind:
- QueueTask
- GetScheduledTasks
- TryExecuteTaskInline
Der Methode QueueTask wird ein neuer Task zu Ausführung übergeben. Dieser wird dann zunächst einer Warteschlange hinzugefügt und verarbeitet, sobald Ressourcen verfügbar werden. Die Methode GetScheduledTasks gibt eine Liste der aktuell eingeplanten Task Objekte zurück. Die Methode TryExecuteTaskInline versucht einen Task auf dem aktuellen Thread auszuführen.
Nachfolgend werden die wichtigsten Implementierungen erläutert:
public class TaskThreadPriority : TaskScheduler, IDisposable
{
private ThreadPriority priority = ThreadPriority.Normal;
private ApartmentState apState = ApartmentState.MTA;
private Lazy<Thread[]> availableThreads = null;
private BlockingCollection<Task> queuedTasks;
[ThreadStatic]
private static bool currentThreadIsActive;
private int degreeOfParallelism = 1;
public TaskThreadPriority(ThreadPriority priority, int degreeOfParallelism)
{
this.degreeOfParallelism = degreeOfParallelism;
this.priority = priority;
queuedTasks = new BlockingCollection<Task>();
InitializeScheduler(this.degreeOfParallelism, this.priority);
}
private void InitializeScheduler(int parallelismLevel, ThreadPriority priority)
{
// Set up threads
availableThreads = new Lazy<Thread[]>(() =>
{
Thread[] threads = new Thread[parallelismLevel];
for (int i = 0; i < threads.Length; i++)
{
threads[i] = new Thread(ProcessingRequests);
threads[i].IsBackground = true;
threads[i].Priority = priority;
threads[i].SetApartmentState(apState);
threads[i].Start();
}
return threads;
});
}
...
}
Dem Konstruktor können die wesentlichen Informationen, die zur Anlage des eigenen Task Schedulers notwendig sind, übergeben werden. Der Parameter priority legt die gewünschte Priorität für die späteren Threads fest und über den Parameter degreeOfParallelism kann die maximale Anzahl aktiver Thread eingestellt werden.
Die private Methode InitializeScheduler bereitet den Scheduler für die Ausführung vor. Innerhalb der Methode werden die gewünschte Anzahl Threads angelegt. Als Thread-Delegate wird die Methode ProcessingRequest mit folgender Implementierung übergeben:
private void ProcessingRequests()
{
foreach (var task in queuedTasks.GetConsumingEnumerable())
{
currentThreadIsActive = true;
base.TryExecuteTask(task);
currentThreadIsActive = false;
}
}
Die Methode ProcessingRequests ist für die Verarbeitung anstehender Task-Objekte zuständig. Neue Tasks Objekte werden über die Methode QueueTask der Warteschlange hinzugefügt. Sobald ein wartender Task aus der Warteschlange entnommen wurde, wird dieser der TryExecuteTask Methode zur Ausführung übergeben.
protected override void QueueTask(Task task)
{
var forceIgnore = availableThreads.Value;
lock (queuedTasks)
{
queuedTasks.Add(task);
}
}
Als Warteschlange wird eine BlockingCollection verwendet. Diese ist zu finden in dem Namensraum System.Collections.Concurrent und gehöhrt zu den neuen - unter .NET 4.0 eingeführten - thread-safe Collections. D. h. ein separater Schutz gegen gleichzeitige/konkurrierende Zugriffe ist out-of-the-box gewährleistet. Der eigene Scheduler setzt mithilfe der BlockingCollection das typische Design-Pattern Producer/Consumer um. Der Producer ist in diesem Fall die Anwendung, die den Scheduler verwendet. Die Consumer sind die einzelnen Threads, die anstehende Tasks verarbeiten.
Verwendung des Custom Task Schedulers
Die Verwendung des eigenen Schedulers ist relativ einfach, wie das nachfolgende Beispiel verdeutlicht:
public void UsingTasksCustomScheduler()
{
CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
TaskThreadPriority taskPrio = new TaskThreadPriority(ThreadPriority.Lowest, 8);
Task[] tasks = new Task[WORKLOAD];
int i = 0;
foreach (List<int> list in allLists)
{
tasks[i] = Task.Factory.StartNew(() => { list.Sort(); },
token, TaskCreationOptions.None, taskPrio);
i++;
}
Task.WaitAll(tasks);
}
Es muss lediglich eine Instanz des eigenen Task Schedulers erstellt werden und diese kann dann z. B. der Task.Factory.StartNew Methode übergeben werden.
Die gesamte Implementierung des Schedulers ist im Anhang zu finden.
HINWEIS:
Die hier gezeigte und zur Verfügung gestellte Implementierung dient lediglich der Demonstration und eignet sich nicht für den Einsatz in einer produktiven Umgebung. Innerhalb der Dispose-Methode muss z. B. noch sichergestellt werden, dass alle Threads ordnungsgemäß beendet wurden.
TaskThreadPriority.cs (2,80 kb)