Feel Good.

03 августа 2010

NQueueing - Система Массового Обслуживания

Хочу представить Вам свою миниатюрную библиотеку (NQueueing), упрощающая C# .NET разработчикам процесс разработки приложений, в которых присутствует многопоточная обработка очередей (Система Массового Обслуживания (СМО)). Библиотеку NQueueing можно скачать с nqueueing.codeplex.com как в виде исходников, так и в виде готовой сборки.

Q: Зачем это?
A: Библиотека содержит расширяемый набор классов и интерфейсов, представляющих для .NET разработчика простой инструмент для реализации собственной СМО.

Q: Пример использования?
A: Например, Вы пишете приложение которое принимает из сети пакеты, фильтрует, разбирает и складывает их в базу, при этом, каждый из перечисленных шагов выносится в отдельную СМО (отдельный(-ые) поток(и) со своей очередью). Обработка задач из очереди на каждом шаге происходит независимо от других шагов в отдельном потоке, что позволяет достичь максимальной производительности системы целиком.

Q: Как устроена?
A: Основу библиотеки NQueueing представляет интерфейс IServerQueue.

Интерфейс является generic-типом, и типизируется типом задачи. Все обработчики очередей реализуют данный интерфейс.
Уровнем выше, находится абстрактный базовый класс для всех обработчиков очередей ServerQueueBase, инкапсулирующий в себя общие свойства любой СМО (например: объект очередь и массив потоков).
Следующий уровень абстракции - это конкретные реализации различных типов СМО, унаследованные от ServerQueueBase классы: StandartServerQueue (для поштучной обработки задач из очереди) и BatchServerQueue (для пакетной обработки):


Q: Как использовать?
A: Использовать библиотеку достаточно просто:
  1. Скачать библиотеку.
  2. Добавить на нее reference в проекте. После чего станет доступен namespace NQueueing.
  3. В зависимости от типа задачи, выбрать соответствующий тип СМО (StandartServerQueue, BatchServerQueue итп).
  4. Далее создать экземпляр СМО (выбранной на предыдущем шаге), передав в конструктор ссылку на делегат, в котором и описывается процесс обработки пришедшей задачи, и указав параметры СМО: длина очереди, число потоков обработки.
  5. При помощи потоко-безопасных TryEnqueue/Enqueue ставить задачи в очередь.
Простой пример:

using System;

using System.Threading;

 

// Добавить reference на библиотеку

using NQueueing;

 

namespace NQueueingTestApp

{

    class Program

    {

 

        // Имитация случайной задержки

        static Random rnd = new Random(DateTime.Now.Millisecond);

        static void RandomSleep(int min, int max)

        {

            Thread.Sleep(rnd.Next(min, max));

        }

 

 

        static void ProcessingOne(int item)

        {

            // Обработать здесь задачу item...

            // Обработка происходит в отдельном потоке.

            RandomSleep(10, 300); // Случайная задержка в обработке

 

            // Номер текущего потока, от 0 до (maxThreadsCount-1)

            string name = Thread.CurrentThread.Name;

            Console.WriteLine("{0} / Thread №: {1}", item, name);

        }

 

        static void Main(string[] args)

        {

            // Создадим Систему Массового Обслуживания состоящую

            // из 3-х потоков и очередью длины 10.

            IServerQueue<int> smo = new StandartServerQueue<int>(ProcessingOne, 10, 3);

 

            smo.Start(); // Запускаем процесс обработки

 

            // Добавим 15 задач в очередь

            for (int task = 0; task < 15; task++)

            {

                // Поставим задачу в очередь в текущем потоке

                EnqueueStatus result = smo.TryEnqueue(task);

 

                RandomSleep(10, 100); // Случайная задержка при добавлении

 

                Console.WriteLine("{0} / Enqueue: {1}", task, result);

            }

            smo.Close(); // Не забываем остановить

 

            Console.ReadKey();

        }

    }

}



Q: Расширяемость?
A: Да, такая возможность есть. Для этого необходимо реализовать интерфейс IServerQueue, либо реализовать наследника от базового класса ServerQueueBase.

Буду благодарен за Ваши отзывы, рекомендации и конструктивные советы.

Ссылки:
  1. Queueing model

22 комментария:

  1. "и очередью длины 10"
    что такое "длина очереди" ?

    ОтветитьУдалить
  2. @zerkms
    StandartServerQueue(ProcessingOne, 10, 3);
    Имелось в виду максимальный размер размер накопителя или максимальная длина очереди.

    Другими словами это параметр K в нотации Кендалла:
    "K: The number of places in the system
    The capacity of the system, or the maximum number of customers allowed in the system including those in service. When the number is at this maximum, further arrivals are turned away. If this number is omitted, the capacity is assumed to be unlimited, or infinite."
    http://en.wikipedia.org/wiki/Kendall%27s_notation

    ОтветитьУдалить
  3. Всё-таки не совсем понятна сфера применения, и от этого непонятно зачем нужна эта библиотека.
    Для "приложения которое принимает из сети пакеты, фильтрует, разбирает и складывает их в базу" при разумной нагрузке думаю проще использовать Task Parallel Library и BlockingCollection.

    ОтветитьУдалить
  4. @Александр
    NServiceBus - более ориентирована на взаимодействие нескольких систем. Насколько я помню, в NServiceBus главное это транспорт заявки.

    NQueueing фактически очередь и потоки Queue+Threads.

    ОтветитьУдалить
  5. @Konstantin
    Task Parallel Library и BlockingCollection все это из .NET 4.0.

    ОтветитьУдалить
  6. Здравствуйте!
    Я новичек в .NET, есть простая задача - нужна очередь потоков, максимальное количество одновременно выполняемых потоков допустим 10, в очередь можно в любой момент добавлять потоки, которые стартуют когда подойдёт их "очередь".
    Как это сделать попроще? Любые .NET-технологии.

    ОтветитьУдалить
  7. @Hermann
    Например используя NQueueing так:

    static void ProcessingOne(Thread item)
    {
    item.Start();
    item.Join();
    }

    static void Main(string[] args)
    {

    IServerQueue smo = new StandartServerQueue(ProcessingOne, 64, 10);
    smo.Start();
    for (int i = 0; i < 15; i++)
    {
    smo.Enqueue(new Thread(() => { /* Ваш thread */ }));
    RandomSleep(10, 100);
    }
    smo.Close();
    }

    ОтветитьУдалить
  8. Великолепно, спасибо большое!
    И маленький вопрос, потоки стартуют в порядке добавления в очередь, или могут в любом порядке? По этому маленькому примеру кажется что в любом порядке:

    IServerQueue smo = new StandartServerQueue(ProcessingOne, 512, 10);
    smo.Start();
    for (int i = 0; i < 200; i++)
    {
    int i1 = i;
    smo.Enqueue(new Thread(() => { Console.WriteLine(i1); Thread.Sleep(5000); }));
    }
    smo.Stop();

    ОтветитьУдалить
  9. Хм, парсер съедает знаки меньше-больше...

    ОтветитьУдалить
  10. Да, в любом порядке.
    Может произойти следующая ситуация: извлекается первый поток из очереди и пока ему делают item.Start(); другой поток извлекает уже второй поток из очереди и ему же успевает сделать item.Start(); быстрее первого. Тогда второй окажется запущен быстрее первого.

    ОтветитьУдалить
  11. По правде говоря у меня возник точно такой же вопрос как и у Konstantin Savelev'а.

    Дело в том, что Task Parallel Library и Blocking Collection можно использовать в .NET Framework 3.5 SP1.

    Есть релиз библиотеки Reactive Extensions for .NET 3.5, в которую как раз входит то, что раньше называли Parallel Extensions CTP.

    Соответственно вопрос. Есть ли какие-то выгодные отличия в API этой библиотеки, ориентированной на определенную задачу от более обобщенного API у Parallel Extensions?


    ...еще раз прочитал исходную задачу. Вполне возможно что Parallel Extensions тут не очень подойдет. Возможно что тут будет более полезно использовать как раз Reactive Extensions и прокидывать приходящие данные через разные этапы обработки, при этом на каждом из этапе использовать свой Rx-планировщик (IScheduler).

    Конечно это при условии что внешний мир проталкивает нам данные (пакеты), а не наша система вытягивает эти данные извне.

    ОтветитьУдалить
  12. Rx, TPL это инструменты для широкого применения, а nQueueing это готовый компонент, решающий конкретную задачу - как можно быстро и просто построить систему массового обслуживания.

    ОтветитьУдалить
  13. Этот комментарий был удален автором.

    ОтветитьУдалить
  14. Приветствую! Интересный проект, не совсем понятно:
    "При помощи потоко-безопасных TryEnqueue/Enqueue ставить задачи в очередь"
    А сразу обработать задачу, не ставя её в очередь, нельзя?

    ОтветитьУдалить
  15. И можно ещё вопрос, как происходит извлечение задачи из очереди?

    ОтветитьУдалить
  16. >> А сразу обработать задачу, не ставя её в очередь, нельзя?
    Если интенсивность потока заявок превышает интенсивность обработки, то мы не откидываем заявку, а ставим в очередь, иначе пришлось бы ее откинуть.

    >> И можно ещё вопрос, как происходит извлечение задачи из очереди?
    В этой версии посредством "опроса очереди" polling. В данном случае это не оптимально. Правильно было бы использовать push механизм. Хотя для batch очереди polling оправдан, собственно для таких очередей я и задумывал проект.

    Если Вам необходимо реализовать producing-consuming pattern, то я рекомендую использовать System.Collections.Concurrent.ConcurrentQueue в качестве очереди. Правда это .NET4.0

    ОтветитьУдалить
  17. >>Если интенсивность потока заявок превышает интенсивность обработки, то мы не откидываем заявку, а ставим в очередь, иначе пришлось бы ее откинуть.
    Нет, я имею ввиду случаи, когда интенсивность потока не превышает интенсивность обработки, а просто имеются не занятые обработкой потоки. Т.е. напрямую назначить свободному потоку обработку задачи, игнорируя тем самым дополнительный перенос задачи в очередь.

    Мне сегодня этот компонент помог, поставленная задача решена, спасибо)

    Ещё правда возникла проблема с тем, что в текущий момент времени не удаётся определить длину очереди и её средний показатель, за всё время работы СМО.

    ОтветитьУдалить
  18. Имею ввиду текущее колличество задач, поставленных в очередь.

    ОтветитьУдалить
  19. Ну если интенсивность потока ниже чем интенсивность обработки, то тогда очередь не нужна (так как она будет всегда пустой). В таком случае обрабатывать заявки надо в пуле потоков: либо стандартным (http://msdn.microsoft.com/ru-ru/library/system.threading.threadpool.aspx), либо написать свой.
    Длину очереди можно вычислить теоретически либо измерить практически. В первом случае надо знать показатели системы, а во втором достаточно задать размер очереди с "запасом" и погонять, при этом собирая статистику.

    ОтветитьУдалить
  20. Здравствуйте Илья. Я новичек в C# и сейчас у меня есть задачка составить СМО. Есть несколько вопросов: 1. Как мне сделать, чтобы задача подавалась раз в 20 минут? 2. Я так понимаю если есть 3 свободных потока и 10 задач, то берутся сразу 3 задачи на выполнение, по освобождению поток берутся еще 3. А если моя задача должна выполняться сначала в 1 потоке(допустим 10 минут), после должна выполняться во 2 потоке(8 минут), как мне это реализовать?

    ОтветитьУдалить
  21. >> 1. Как мне сделать, чтобы задача подавалась раз в 20 минут?
    Вам нужно использовать планировщик задач. Есть 2 пути решения: написать свой и очень простой либо использовать готовую библиотеку. Свой планировщик пишется с использованием таймера (см http://msdn.microsoft.com/en-us/library/system.threading.timer.aspx), из готовых например http://quartznet.sourceforge.net.

    >>А если моя задача должна выполняться сначала в 1 потоке(допустим 10 минут), после должна выполняться во 2 потоке(8 минут), как мне это реализовать?
    Блоки выполнения можно соединять либо параллельно либо последовательно. В вашем случае нужно именно последовательно. Если рассмотреть мой код, то в методе ProcessingOne нужно сделать новую задачу, и запланировать ее в новой СМО: smo1.TryEnqueue(task1);

    PS: Олег, моя статья очень сильно устарела, и я настоятельно рекомендую вам воспользоваться TPL для решения этой задачи: http://msdn.microsoft.com/en-us/library/dd460717.aspx, или
    TPL Dataflow: http://msdn.microsoft.com/en-us/devlabs/gg585582.aspx.

    ОтветитьУдалить