Я работаю над инструментом ETL, который может получать данные из разных источников (например, Snowflake, SQL и т. Д.), А затем, в этом сценарии, вставьте его в базу данных SQL. GRPC), у меня нет ни одного идатарера, который приведет меня к созданию собственной реализации. /> Все логика работает, но я стараюсь сделать его более эффективным и масштабируемым при работе с огромными наборами данных.var bigQueryClient = await bigQueryClientProvider.GetBigQueryRestClientAsync(sourceCredentialId);
var results = await bigQueryClient.ExecuteQueryAsync(
selectQuery,
parameters: [],
queryOptions: new QueryOptions
{
UseQueryCache = true
},
cancellationToken: cancellationToken);
// results = all records
var fieldsMetadata = new List();
var columnIndex = 0;
foreach (var field in results.Schema.Fields)
{
fieldsMetadata.Add(new BigQueryFieldMetadata(field.Name, columnIndex++, field.Type, field.Mode));
}
return (new BigQueryQueryDataReader(results), fieldsMetadata);
class BigQueryQueryDataReader : IDataReader { .... }
< /code>
Пример кода реализации BQ grpc: < /p>
var bigQueryGrpcClient = await bigQueryClientProvider.GetBigQueryGrpcClientAsync(sourceCredentialId);
var projectId = await bigQueryClientProvider.GetProjectIdAsync(sourceCredentialId);
var tableNameRef = TableName.FromProjectDatasetTable(projectId, sourceDataset, sourceTableName);
var readSession = bigQueryGrpcClient.CreateReadSession(new CreateReadSessionRequest
{
Parent = $"projects/{tableNameRef}",
ReadSession = new ReadSession
{
Table = tableNameRef.ToString(),
DataFormat = DataFormat.Arrow,
ReadOptions = new ReadSession.Types.TableReadOptions(),
},
MaxStreamCount = 1
});
if (sourceSelectedColumns?.Count > 0)
{
readSession.ReadOptions.SelectedFields.AddRange(sourceSelectedColumns);
readSession.ReadOptions.RowRestriction = sourceFilterBy;
}
var results = new List();
var fieldsMetadata = new List();
foreach (var stream in readSession.Streams)
{
cancellationToken.ThrowIfCancellationRequested();
var streamRows = await this.ReadFromStreamAsync(bigQueryGrpcClient, stream.Name, fieldsMetadata, cancellationToken);
results.AddRange(streamRows);
// inserting rows into a list of key-value dictionary
}
return (new BigQueryTableScanDataReader(results, results.FirstOrDefault()?.Keys?.ToList() ?? []), fieldsMetadata);
class BigQueryTableScanDataReader : IDataReader { .... }
Подробнее здесь: https://stackoverflow.com/questions/796 ... ng-c-sharp
Передача данных набора данных больших запросов на SQL Server с помощью C# ⇐ C#
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение