Моя проблема здесь во времени. Я делаю тесты с двумястами тысячами регистров и... Это занимает полчаса, а шесть тысяч - это слишком медленно. Мой код выглядит примерно так:
Код: Выделить всё
public class TestKafka {
public static Connection conexion = null;
public static void main(){
conexion = C3P0DataSource.getInstance().getConnection();
runConsumer();
}
.
..
public static void runConsumer(){
try // ( Connection conexion C3P0DataSource.getInstance().getConnection();)
{
conexion.setAutoCommit(false);
while (true) { // with kafka connector - I try to simulate data streaming
final ConsumerRecords consumerRecords = consumer.poll(Long.MAX_VALUE);
List recordData = new ArrayList();
ObjectMapper mapper = new ObjectMapper();
for (ConsumerRecord record : consumerRecords) {
Map map = new HashMap();
DataStructure_Topic config = mapper.readValue(record.value(), DataStructure_Topic.class);
map.put("row_id_1", config.getCodent());
map.put("row_id_2", config.getCentalta());
map.put("row_id_3", config.getCuenta());
datosAComprobar.add(map);
recordData = firstConsult(recordData, conexion);
if (recordData.size() > 0) {
recordData = SecondConsult(recordData, conexion);
// few petitions to the database
if (recordData.size() > 0) {
// ..data processing.. and update
}
}
datosAComprobar.clear();
}
consumer.commitSync();
Thread.sleep(100);
}
} catch(){...}
}
Код: Выделить всё
public static List FirstConsult(List MyList, Connection conn) {
PreparedStatement stmt = null;
ResultSet rs = null;
List list = new ArrayList();
String query = "";
int contador = 1;
for (Map val : MyList) {
query += "select " + val.get("row1") + " as r1, " + val.get("row2") + " as row2,"+val.get("cuenta")+"from table_a inner join table_b...."
if (contador < MyList.size()) {
query += "\r\nunion\r\n";
}
contador += 1;
}
try {
stmt = conn.prepareStatement(query);
rs = stmt.executeQuery();
ResultSetMetaData rsmd = rs.getMetaData();
int columnsNumber = rsmd.getColumnCount();
if (rs.next()) {
do {
Map map = new HashMap();
for (int i = 1; i
Подробнее здесь: [url]https://stackoverflow.com/questions/58991161/streaming-data-and-processing-with-kafkajava[/url]