Как публиковать потоковые данные WebSocket в Kafka?C#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Как публиковать потоковые данные WebSocket в Kafka?

Сообщение Anonymous »

Я получаю данные тика от подключения к WebSocket. Как только я получу данные, мне нужно опубликовать их в Кафку. То, что я попробовал до сих пор, то, как только я получу данные от WebSocket, я запускаю асинхронную задачу, чтобы опубликовать данные в Kafka и хранить задание Async в хэшсете, называемом ProductaskList . Я также добавил обратный вызов к задаче, который, когда задача завершена, она удаляется из хэшса. быть пустым перед закрытием приложения.
Код, который я написал до сих пор:

Код: Выделить всё

class KafkaObject
{
// producer, config variables
private HashSet produceTaskList;

public void KafkaProduceAsync(string jsonString)
{
try
{
// extract topic, data from jsonString
var task = producer.ProduceAsync(topic, new Message { Key = topic, Value = data });

_ = task.ContinueWith(self =>
{
produceTaskList.Remove(self);
if (self.IsFaulted)
{
logger.LogError($"Error producing message: {self.Exception}");
}
});

produceTaskList.Add(task);
}
// catch blocks for error handling
}

public async Task WaitProduce()
{
try
{
await Task.WhenAll(produceTaskList);
}
catch (Exception ex)
{
logger.LogError($"Error in WaitProduce: {ex.Message}");
}
logger.LogInformation("Pending Kafka Produce Tasks Finished.");
producer.Flush();
producer.Dispose();
}
}
< /code>
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text;

namespace TickDataKafka;
internal class Program
{
static async Task Main()
{
// set url, other variables
Console.WriteLine("Hello world!");

var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

var reconnectTokenSource = new CancellationTokenSource();
ClientWebSocket socket = new();
bool close = false;

Console.CancelKeyPress += async (s, e) =>
{
e.Cancel = true;
cancellationTokenSource.Cancel();
await kafkaObject.WaitProduce();
};

while (!cancellationToken.IsCancellationRequested)
{
try
{
socket = new();
reconnectTokenSource = new();
var reconnectToken = reconnectTokenSource.Token;
await socket.ConnectAsync(new Uri(url), cancellationToken);
while (socket.State == WebSocketState.Open)
{
var recvBuffer = new byte[512];
reconnectTokenSource.CancelAfter(TimeSpan.FromSeconds(15));
var result = await socket.ReceiveAsync(new ArraySegment(recvBuffer), CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, reconnectToken).Token);
if (result.MessageType == WebSocketMessageType.Close)
{
close = true;
break;
}
Console.WriteLine(Encoding.UTF8.GetString(recvBuffer, 0, result.Count).TrimEnd());
}
}
catch (OperationCanceledException)
{
if (cancellationToken.IsCancellationRequested)
{
Console.WriteLine("Closing WebSocket connection.");
break;
}
else
{
reconnectTokenSource.Dispose();
socket.Dispose();
Console.WriteLine("Connection dropped.  Attempting to reconnect...");
}
}
catch (Exception ex)
{
Console.WriteLine($"Error connecting: {ex.Message}, {ex.StackTrace}.");
break;
}
if (close)
{
Console.WriteLine("Vendor closed connection.");
break;
}
}
reconnectTokenSource.Dispose();
socket.Dispose();
Console.WriteLine("Socket destroyed.");
}
}
Проблема заключается в том, что когда я отправляю сигнал sigint , я не получаю сообщение «В ожидании завершенных задач Kafka». Программа, кажется, не ожидает, пока функция waitProduce () завершится.

Я также получаю сообщение об ошибке от kafka %4 | 1738735993.355 | derminate | rdkafka#Производитель-1 | [thrd: app]: производитель, завершающий 1 сообщение (279 байтов), все еще в очереди или транзит: используйте fload (), чтобы ждать выдающейся доставки сообщений , даже если я использую производительность. flush () перед Удаление производителя.


Подробнее здесь: https://stackoverflow.com/questions/794 ... a-to-kafka
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Потоковые данные Telnet отображаются в текстовом поле tkinter
    Гость » » в форуме Python
    0 Ответы
    27 Просмотры
    Последнее сообщение Гость
  • Потоковые данные Telnet отображаются в текстовом поле tkinter
    Anonymous » » в форуме Python
    0 Ответы
    21 Просмотры
    Последнее сообщение Anonymous
  • Как декодировать потоковые данные с помощью библиотеки ffmpeg
    Anonymous » » в форуме C++
    0 Ответы
    13 Просмотры
    Последнее сообщение Anonymous
  • Python запрашивает потоковые данные из API
    Anonymous » » в форуме Python
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Потоковые данные из сети в другое приложение на Android
    Anonymous » » в форуме Android
    0 Ответы
    3 Просмотры
    Последнее сообщение Anonymous

Вернуться в «C#»