Потоковая передача данных и обработка с помощью kafka+javaJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Потоковая передача данных и обработка с помощью kafka+java

Сообщение Anonymous »

Я относительно новичок в программировании (вы можете видеть это в моем коде), но в настоящее время я больше изучаю Kafka и Java для обработки данных. Имея данные в теме, мне нужно выполнить соединения с некоторыми таблицами для проверки существования данных и получения других данных, поэтому я делаю несколько запросов к базе данных (слишком много полей для извлечения, мне нужны отдельные запросы, чтобы сделать их читабельными). Для каждой записи получено из темы. Я делаю некоторые подключения к базам данных, а затем (после обработки данных) обновляю таблицы (я делаю это пакетами для таблиц, только это быстро).

Моя проблема здесь во времени. Я делаю тесты с двумястами тысячами регистров и... Это занимает полчаса, а шесть тысяч - это слишком медленно. Мой код выглядит примерно так:

Код: Выделить всё

public class TestKafka {
public static Connection conexion = null;
public static void main(){
conexion = C3P0DataSource.getInstance().getConnection();
runConsumer();
}
.
..
public static void runConsumer(){
try // ( Connection conexion C3P0DataSource.getInstance().getConnection();)
{
conexion.setAutoCommit(false);
while (true) {  // with kafka connector - I try to simulate data streaming
final ConsumerRecords consumerRecords = consumer.poll(Long.MAX_VALUE);
List recordData = new ArrayList();
ObjectMapper mapper = new ObjectMapper();
for (ConsumerRecord record : consumerRecords) {
Map map = new HashMap();
DataStructure_Topic config = mapper.readValue(record.value(), DataStructure_Topic.class);
map.put("row_id_1", config.getCodent());
map.put("row_id_2", config.getCentalta());
map.put("row_id_3", config.getCuenta());
datosAComprobar.add(map);
recordData = firstConsult(recordData, conexion);
if (recordData.size() > 0) {
recordData = SecondConsult(recordData, conexion);
// few petitions to the database
if (recordData.size() > 0) {
// ..data processing.. and update
}
}
datosAComprobar.clear();
}
consumer.commitSync();
Thread.sleep(100);
}
} catch(){...}
}
Запрос к базе данных (одинаковая структура для каждого запроса):

Код: Выделить всё

public static List FirstConsult(List MyList, Connection conn) {
PreparedStatement stmt = null;
ResultSet rs = null;
List list = new ArrayList();
String query = "";
int contador = 1;
for (Map val : MyList) {
query += "select " + val.get("row1") + " as r1, " + val.get("row2") + " as row2,"+val.get("cuenta")+"from table_a inner join table_b...."
if (contador < MyList.size()) {
query += "\r\nunion\r\n";
}
contador += 1;
}
try {
stmt = conn.prepareStatement(query);
rs = stmt.executeQuery();
ResultSetMetaData rsmd = rs.getMetaData();
int columnsNumber = rsmd.getColumnCount();
if (rs.next()) {
do {
Map map = new HashMap();
for (int i = 1; i 

Подробнее здесь: [url]https://stackoverflow.com/questions/58991161/streaming-data-and-processing-with-kafkajava[/url]
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»