Недавно я был оспорен с задачей создания процесса, который извлекает данные из DB SQL Server и записывает его в файлы Parquet.
я искал онлайн и нашел различные примеры, которые загружают данные в DataTable, а затем записывают данные через ParquetWriter в файлы Parquet. static void TestWriteParquet(string ConnectionString, string Query, string OutputFilePath, int rowGroupSize = 10000)
{
DataTable dt = GetData(ConnectionString, Query);
var fields = GenerateSchema(dt);
using (var stream = File.Open(OutputFilePath, FileMode.Create, FileAccess.Write))
{
using (var writer = new ParquetWriter(new Schema(fields), stream))
{
var startRow = 0;
// Keep on creating row groups until we run out of data
while (startRow < dt.Rows.Count)
{
using (var rgw = writer.CreateRowGroup())
{
// Data is written to the row group column by column
for (var i = 0; i < dt.Columns.Count; i++)
{
var columnIndex = i;
// Determine the target data type for the column
var targetType = dt.Columns[columnIndex].DataType;
if (targetType == typeof(DateTime)) targetType = typeof(DateTimeOffset);
// Generate the value type, this is to ensure it can handle null values
var valueType = targetType.IsClass
? targetType
: typeof(Nullable).MakeGenericType(targetType);
// Create a list to hold values of the required type for the column
var list = (IList)typeof(List)
.MakeGenericType(valueType)
.GetConstructor(Type.EmptyTypes)
.Invoke(null);
// Get the data to be written to the parquet stream
foreach (var row in dt.AsEnumerable().Skip(startRow).Take(rowGroupSize))
{
// Check if value is null, if so then add a null value
if (row[columnIndex] == null || row[columnIndex] == DBNull.Value)
{
list.Add(null);
}
else
{
// Add the value to the list, but if it’s a DateTime then create it as a DateTimeOffset first
list.Add(dt.Columns[columnIndex].DataType == typeof(DateTime)
? new DateTimeOffset((DateTime)row[columnIndex])
: row[columnIndex]);
}
}
// Copy the list values to an array of the same type as the WriteColumn method expects
// and Array
var valuesArray = Array.CreateInstance(valueType, list.Count);
list.CopyTo(valuesArray, 0);
// Write the column
rgw.WriteColumn(new Parquet.Data.DataColumn(fields, valuesArray));
}
}
startRow += rowGroupSize;
}
}
}
}
< /code>
Учитывая тот факт, что мы имеем дело с огромными таблицами, которые должны быть разделены на несколько файлов, мне интересно, есть ли способ потоковой передачи данных вместо загрузки в таблицу данных? Что будет альтернативой этому подходу? Но, учитывая, что наша самая большая таблица имеет примерно 200 миллиардов строк и составляет не менее 60 столбцов, я сомневаюсь, что это может быть достигнуто.
Подробнее здесь: https://stackoverflow.com/questions/702 ... rquet-file