Boost.Cobalt: как использовать каналы из обратного вызова C API?C++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Boost.Cobalt: как использовать каналы из обратного вызова C API?

Сообщение Anonymous »

Я хочу использовать C API в конвейере сопрограмм, взаимодействующих через каналы.
Это моя первая попытка работы с сопрограммами, и мои знания о них ограничены.
форма трубопровода:

Код: Выделить всё

 --------    1    ---------    2    ------
| source | ----> | process | ----> | sink |
--------         ---------         ------
Каждое поле представляет сопрограмму, а каждая стрелка — канал.
В сопрограмме процесса используется API C. >
Его подпись примерно такая: bool start_work(consumer_callback). Этот API
синхронен и вызывает Consumer_callback один раз для каждых создаваемых им данных.
Сначала я рассматривал возможность записи в канал 2 (см. диаграмму выше). в
обратном вызове, но это изменит сигнатуру обратного вызова, поэтому это
невозможно.
Я изменил передачу дескриптора сопрограммы в обратный вызов, который возобновляется это. Затем
возобновленная сопрограмма записывает данные в канал 2.
Упрощенный код:

Код: Выделить всё

#include 
#include 
#include 
#include 
#include 
#include 
#include 

namespace cobalt = boost::cobalt;

// Data to communicate between the callback and the channel writer.
struct Data {
std::optional result;
bool done = false;
std::coroutine_handle coro_handle;
};

using Callback = void (*)(int, void*, bool);

void consumer_callback(int i, void* data, bool done) {
Data& data_ = *reinterpret_cast(data);
data_.done = done;
if (!done) {
data_.result = i;
}
data_.coro_handle.resume();
}

// C API that produces results and calls the callback to consume each result.
// Results are integers.
void start_work(void* data, Callback cb) {
bool done = false;
for (int i = 0; i < 10; ++i) {
cb(i, data, done); // !done case
}
done = true;
cb(0, data, done); // done case
}

struct Awaiter : std::suspend_always {
Data& data;
bool first;

bool await_ready() {
return data.result.has_value();
}

void await_suspend(std::coroutine_handle h) {
data.coro_handle = h;
if (first) start_work(&data, consumer_callback);
}

int await_resume() {
assert(data.result.has_value());
auto opt = std::exchange(data.result, std::nullopt);
return opt.value();
}
};

Awaiter nextResult(Data& data, bool first) {
return {{}, data, first};
}

cobalt::promise source(cobalt::channel& out) {
co_await out.write("Hello world!");
out.close();
}

cobalt::promise process(cobalt::channel& in, cobalt::channel& out) {
Data data;
while (in.is_open() && out.is_open()) {
auto _ = co_await in.read(); // ignore result for now
auto first = true;
while (!data.done || data.result.has_value()) {
auto i = co_await nextResult(data, first);
co_await out.write(i);
first = false;
}
}
in.close();
out.close();
}

cobalt::promise sink(cobalt::channel& in) {
while (in.is_open()) {
auto i = co_await in.read(); // ignore result for now
}
in.close();
}

cobalt::main co_main(int argc, char* argv[]) {
cobalt::channel a;
cobalt::channel b;
co_await cobalt::join(
source(a),
process(a, b),
sink(b)
);
co_return 0;
}
Приемник правильно получает все данные, но когда сопрограмма процесса завершается, внутри Asio находится сопрограмма, возобновляющая работу с нулевым указателем. Что я делаю не так?
Спасибо!
Среда:
Ubuntu 20.04
Boost 1.85
g++13 -std=gnu++2a

Подробнее здесь: https://stackoverflow.com/questions/787 ... i-callback
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»