WedX - журнал о программировании и компьютерных науках

Как отправить сообщение в BufferBlock и получить результат от ActionBlock?

Есть объект, который может обрабатывать только один запрос за раз, и для его обработки требуется немного времени. После выполнения задачи вызывается событие, возвращающее результат. Объект Computer в следующем коде, и допустим, я не могу изменить поведение этого класса.

Теперь я хочу создать класс-оболочку, чтобы создать у клиентов впечатление, что они могут отправить запрос в любое время. Запрос теперь является асинхронным методом, так что клиент может просто ждать, пока не будет возвращен результат. Конечно, базовый объект может обрабатывать один запрос за раз, поэтому оболочке необходимо поставить запрос в очередь, а когда приходит событие завершения обработки, он должен вернуть результат соответствующему клиенту. Этот класс-оболочка SharedComputer в следующем коде.

Я думаю, мне нужно вернуть значение, которое я получил от Place1 в Place2. Какова рекомендуемая практика для этого? Разве у BufferBlock/ActionBlock нет механизма для возврата значения?

    static void Main(string[] args)
    {
        SharedComputer pc = new SharedComputer();
        for(int i =0; i<10; i++)
        {
            Task.Factory.StartNew(async() =>
            {
                var r = new Random();
                int randomDelay = r.Next(500, 5000);
                Thread.Sleep(randomDelay);
                int random1 = r.Next(0, 10);
                int random2 = r.Next(0, 10);
                int sum = await pc.Add(random1, random2);
                if(random1 + random2 == sum)
                {
                    Debug.WriteLine($"Got correct answer: {random1} + {random2} = {sum}.");
                }
                else
                {
                    Debug.WriteLine($"Got incorrect answer: {random1} + {random2} = {sum}.");
                }
            });
        }
        System.Console.Read();
    }
}

class SharedComputer
{
    Computer Mainframe= Computer.GetInstance();
    BufferBlock<TwoNumbers> JobQueue = new BufferBlock<TwoNumbers>();
    TaskCompletionSource<int> TCS;

    public SharedComputer()
    {
        Mainframe.Calculated += Mainframe_Calculated;
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1
        };

        var jobProcessor = new ActionBlock<TwoNumbers>(async e =>
        {
            Debug.WriteLine("Starting an adding");
            TCS = new TaskCompletionSource<int>();
            Mainframe.StartAdding(e.A, e.B);
            var res = await TCS.Task; // Place1
            Debug.WriteLine("Got the result."); 
        }, options);

        JobQueue.LinkTo(jobProcessor);
    }

    private void Mainframe_Calculated(object sender, int e)
    {
        TCS.SetResult(e);
    }

    public async Task<int> Add(int a, int b)
    {
        var data = new TwoNumbers()
        {
            A = a,
            B = b
        };
        Debug.WriteLine("Queuing a new adding.");
        JobQueue.Post<TwoNumbers>(data);

        return 1;//Place2: Return the value received at Place1.
    }

    struct TwoNumbers
    {
        public int A;
        public int B;
    }
}

class Computer
{
    static Computer Instance;
    bool IsWorking = false;
    private Computer()
    {
    }

    public static Computer GetInstance()
    {
        if (Instance == null)
            Instance = new Computer();
        return Instance;
    }

    public event EventHandler<int> Calculated;
    public void StartAdding(int a, int b)
    {
        if (IsWorking)
        {
            throw new InvalidOperationException("Already working.");
        }
        IsWorking = true;
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(3000);
            IsWorking = false;
            Calculated(this, a + b);
        });
    }
}

  • В качестве примечания: ваш код создает 10 экземпляров Random в быстрой последовательности, что делает возможным заполнение некоторых из них одним и тем же начальным числом. Также BufferBlock, вероятно, избыточен. ActionBlock имеет собственную внутреннюю очередь ввода. Вы можете увидеть здесь идиоматический способ отправки работы ActionBlock, и получать уведомления о завершении работы (используются вложенные задачи вместо TaskCompletionSources). 25.03.2021
  • @TheodorZoulias Спасибо. Вы, вероятно, правы насчет избыточности. В примере, который я видел, использовался BufferBlock, поэтому я использовал его таким образом, но похоже, что ActionBlock также имеет Post(). И ActionBlock‹Task‹Task‹int›››, кажется, обеспечивает поведение, которое я хочу; хотя это трудно понять, чтобы иметь задачу задачи. 25.03.2021
  • Да, использование задач задач - довольно продвинутая техника. Вы могли бы добиться того же, используя вместо этого TaskCompletionSources, но код был бы более подробным. Вам нужно будет опубликовать TwoNumbers и TaskCompletionSource в ActionBlock, создав потребность в типе оболочки, или использовать кортежи в качестве оболочек. 25.03.2021

Ответы:


1

базовый объект может обрабатывать один запрос за раз, поэтому оболочке необходимо поставить запрос в очередь, а когда поступит событие завершения обработки, он должен вернуть результат соответствующему клиенту.

Итак, вам нужно взаимное исключение. Хотя вы можете создать мьютекс из потока данных TPL и TaskCompletionSource<T>, гораздо проще просто использовать встроенный SemaphoreSlim.

IMO чище сначала написать асинхронную абстракцию, а затем добавить взаимное исключение. Асинхронная абстракция будет выглядеть так:

public static class ComputerExtensions
{
  public static Task<int> AddAsync(this Computer computer, int a, int b)
  {
    var tcs = new TaskCompletionSource<int>();
    EventHandler<int> handler = null;
    handler = (_, result) =>
    {
      computer.Calculated -= handler;
      tcs.TrySetResult(result);
    };
    computer.Calculated += handler;
    computer.StartAdding(a, b);
  }
}

Если у вас есть асинхронный API, вы можете легко применить асинхронное регулирование (или взаимное исключение) через SemaphoreSlim:

class SharedComputer
{
  Computer Mainframe = Computer.GetInstance();
  readonly SemaphoreSlim _mutex = new();

  public async Task<int> AddAsync(int a, int b)
  {
    await _mutex.WaitAsync();
    try { return Mainframe.AddAsync(a, b); }
    finally { _mutex.Release(); }
  }
}

Кстати, используйте Task.Run вместо Task.Factory.StartNew.

25.03.2021
Новые материалы

Объяснение документов 02: BERT
BERT представил двухступенчатую структуру обучения: предварительное обучение и тонкая настройка. Во время предварительного обучения модель обучается на неразмеченных данных с помощью..

Как проанализировать работу вашего классификатора?
Не всегда просто знать, какие показатели использовать С развитием глубокого обучения все больше и больше людей учатся обучать свой первый классификатор. Но как только вы закончите..

Работа с цепями Маркова, часть 4 (Машинное обучение)
Нелинейные цепи Маркова с агрегатором и их приложения (arXiv) Автор : Бар Лайт Аннотация: Изучаются свойства подкласса случайных процессов, называемых дискретными нелинейными цепями Маркова..

Crazy Laravel Livewire упростил мне создание электронной коммерции (панель администратора и API) [Часть 3]
Как вы сегодня, ребята? В этой части мы создадим CRUD для данных о продукте. Думаю, в этой части я не буду слишком много делиться теорией, но чаще буду делиться своим кодом. Потому что..

Использование машинного обучения и Python для классификации 1000 сезонов новичков MLB Hitter
Чему может научиться машина, глядя на сезоны новичков 1000 игроков MLB? Это то, что исследует это приложение. В этом процессе мы будем использовать неконтролируемое обучение, чтобы..

Учебные заметки: создание моего первого пакета Node.js
Это мои обучающие заметки, когда я научился создавать свой самый первый пакет Node.js, распространяемый через npm. Оглавление Глоссарий I. Новый пакет 1.1 советы по инициализации..

Забудьте о Matplotlib: улучшите визуализацию данных с помощью умопомрачительных функций Seaborn!
Примечание. Эта запись в блоге предполагает базовое знакомство с Python и концепциями анализа данных. Привет, энтузиасты данных! Добро пожаловать в мой блог, где я расскажу о невероятных..


Для любых предложений по сайту: [email protected]