Ищу помощь с моим пользовательским инструментом upsert для Redis 7 и Laravel 11. Моему проекту нужна быстрая и расширенная отчетность. Одна из моих моделей называется StatisticAggregate, которая агрегирует данные в течение дня, принимает сеансы, увеличивает строку или создает ее, если она не существует, для каждого сеанса.
У меня есть несколько сотен записей, которые необходимо обновить или вставить, поэтому здесь уместно добавить в мою базу данных MySQL 8.
Данные добавляются в Redis поток, а затем пользовательский artisan, похожая на Laravel Pulse, работает бесконечно, извлекает данные порциями и добавляет их в базу данных, я прикреплю код.
Проблема, которую я вижу, заключается в том, что В течение дня поток приема в Redis заполняет все больше записей, чем то, что извлекается, до такой степени, что обработка фрагмента из 1000 элементов занимает около 20-30 секунд, то есть 2000 в минуту, однако в часы пик каждую минуту в поток поступает около 5000 или более записей.
Похоже, что это не является узким местом базы данных, поскольку, несмотря на огромное отставание в Redis и В таблице 100 тысяч строк, за ночь отставание полностью прорабатывается, когда нагрузка на систему падает, если бы таблица работала медленно, это наверняка было бы здесь видно.
Что могу ли я отсутствовать в своем код?
Мой рабочий php artisan statistic:aggregates:stream:work:
namespace App\Services;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Arr;
use Illuminate\Support\Sleep;
use Illuminate\Support\Lottery;
use Illuminate\Support\Collection;
use Illuminate\Database\QueryException;
use Illuminate\Database\Query\Expression;
use App\Models\Reports\StatisticAggregate;
use App\Models\Reports\StatisticAggregateRelation;
use Carbon\CarbonImmutable;
use Carbon\CarbonInterval;
use Throwable;
class StatisticAggregateStream
{
/**
* The redis stream.
*/
protected static string $stream = 'statistic_aggregates:stream';
/**
* The chunk size for the stream
*/
protected static int $chunk = 1000;
/**
* Gets the stream key
*/
private static function key(string $key = ''): string
{
return self::$stream . ':' . $key;
}
/**
* Store recent digest runtime stats
*/
private static function logDigestRuntime(int $start = 0): void
{
Redis::connection('stream')->xadd(self::key('runtime-stats'), '*', [
'data' => json_encode([
'digest_runtime_in_seconds' => round(microtime(true) - $start)
])
], 100);
}
/**
* Store recent digest upsert stats
*/
private static function logDigestUpsert(int $statisticAggregate = 0, int $statisticAggregateRelation = 0): void
{
Redis::connection('stream')->xadd(self::key('upsert-stats'), '*', [
'data' => json_encode([
'statistic_aggregate_duration_in_ms' => $statisticAggregate,
'statistic_aggregate_relation_duration_in_ms' => $statisticAggregateRelation
])
], 100);
}
/**
* Restart the stream
*/
private static function restart(): void
{
Cache::store('stream')->forever(self::key('restart'), now());
}
/**
* Clear the stream
*/
private static function clear(): void
{
// ...
}
/**
* Store the entries in storage
*/
private static function store(Collection $items): void
{
$start = microtime(true);
if ($items->isEmpty()) {
self::logDigestRuntime($start);
return;
}
$items->each(function (array $aggregate) {
try {
$column = Arr::get($aggregate, 'column', 'total_other');
$filteredAggregates = collect(Arr::get($aggregate, 'aggregates', []))->map(function ($aggregate) {
return collect($aggregate)->except([
'company_id', 'affiliate_id', 'affiliate_campaign_id', 'buyer_id', 'buyer_tier_id'
])->toArray();
})->toArray();
$statistics = StatisticAggregate::upsert($filteredAggregates, [
'for',
'product',
'country',
'bucket',
'period',
'modelable_type',
'modelable_id',
'bucket_starts_at',
'bucket_ends_at',
'key_hash'
], [
$column => new Expression("`$column` + VALUES(`$column`)")
]);
$statistics = StatisticAggregateRelation::upsert(Arr::get($aggregate, 'aggregates', []), [
'for',
'product',
'country',
'bucket',
'period',
'modelable_type',
'modelable_id',
'bucket_starts_at',
'bucket_ends_at',
'key_hash'
], [
$column => new Expression("`$column` + VALUES(`$column`)")
]);
} catch (QueryException $err) {
if (config('app.debug')) {
Log::warning('function: store QueryException error', [
'err' => $err->getMessage(),
'file' => $err->getFile(),
'line' => $err->getLine(),
]);
}
} catch (Throwable $err) {
\Sentry\captureException($err);
if (config('app.debug')) {
Log::warning('function: store throwable error', [
'err' => $err->getMessage(),
'file' => $err->getFile(),
'line' => $err->getLine(),
]);
}
}
});
Lottery::odds(1, 2)->winner(fn () => self::logDigestRuntime($start))->choose();
}
/**
* Returns the ingest key
*/
public static function ingestKey(): string
{
return self::key('ingest');
}
/**
* Restart the stream
*/
public static function command(string $cmd = ''): void
{
match ($cmd) {
'restart' => self::restart(),
'clear' => self::clear(),
default => null
};
}
/**
* Append data to the stream
*/
public static function append(array $data): void
{
Redis::connection('stream')->xadd(self::ingestKey(), '*', [
'data' => serialize($data)
]);
}
/**
* Digest the stored ingest entries
*/
public static function digest(): int
{
$total = 0;
while (true) {
$entries = collect(Redis::connection('stream')->xrange(self::ingestKey(), '-', '+', self::$chunk));
if ($entries->isEmpty()) {
return $total;
}
$keys = $entries->keys();
self::store(
$entries->map(fn (array $payload) => unserialize($payload['data']))
);
Redis::connection('stream')->xdel(self::ingestKey(), $keys->toArray());
if ($entries->count() < self::$chunk) {
return $total + $entries->count();
}
$total = $total + $entries->count();
}
}
/**
* Statistics for stream
*/
public static function stats(): array
{
$stats = [
'stream' => [
'max_wait_time_in_minutes' => 0
]
];
$oldestEntry = Redis::connection('stream')->xrange(self::ingestKey(), '-', '+', 1);
$newestEntry = Redis::connection('stream')->xrevrange(self::ingestKey(), '+', '-', 1);
if ($oldestEntry && $newestEntry) {
$oldestTimestamp = (int) explode('-', array_key_first($oldestEntry))[0];
$newestTimestamp = (int) explode('-', array_key_first($newestEntry))[0];
if ($oldestEntry && $newestTimestamp) {
Arr::set($stats, 'stream.max_wait_time_in_minutes', intval(($newestTimestamp - $oldestTimestamp) / 1000 / 60, 0));
}
}
return $stats;
}
}
Чтобы внести ясность в часть upsert, как StatisticAggregate::upsert, так и StatisticAggregateRelation::upsert независимо занимают около 15 мс каждый, и каждый upsert здесь может быть добавлено от 3 до 5 строк. В key_hash есть уникальный индекс
Ищу помощь с моим пользовательским инструментом upsert для Redis 7 и Laravel 11. Моему проекту нужна быстрая и расширенная отчетность. Одна из моих моделей называется StatisticAggregate, которая агрегирует данные в течение дня, принимает сеансы, увеличивает строку или создает ее, если она не существует, для каждого сеанса. У меня есть несколько сотен записей, которые необходимо обновить или вставить, поэтому здесь уместно добавить в мою базу данных MySQL 8. Данные добавляются в Redis поток, а затем пользовательский artisan, похожая на Laravel Pulse, работает бесконечно, извлекает данные порциями и добавляет их в базу данных, я прикреплю код. Проблема, которую я вижу, заключается в том, что В течение дня поток приема в Redis заполняет все больше записей, чем то, что извлекается, до такой степени, что обработка фрагмента из 1000 элементов занимает около 20-30 секунд, то есть 2000 в минуту, однако в часы пик каждую минуту в поток поступает около 5000 или более записей. Похоже, что это не является узким местом базы данных, поскольку, несмотря на огромное отставание в Redis и В таблице 100 тысяч строк, за ночь отставание полностью прорабатывается, когда нагрузка на систему падает, если бы таблица работала медленно, это наверняка было бы здесь видно. Что могу ли я отсутствовать в своем код? Мой рабочий php artisan statistic:aggregates:stream:work: [code]namespace App\Console\Commands\StatisticAggregates;
use Illuminate\Console\Command; use Illuminate\Contracts\Console\Isolatable; use Illuminate\Support\Sleep; use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Log; use App\Services\StatisticAggregateStream; use Carbon\CarbonImmutable;
class StatisticAggregatesStreamWork extends Command implements Isolatable { /** * The name and signature of the console command. * * @var string */ protected $signature = 'statistic:aggregates:stream:work';
/** * Schedule Telescope to store entries if enabled. */ protected function ensureTelescopeEntriesAreCollected(): void { if (config('telescope.enabled') && $this->laravel->bound(\Laravel\Telescope\Contracts\EntriesRepository::class)) { \Laravel\Telescope\Telescope::store($this->laravel->make(\Laravel\Telescope\Contracts\EntriesRepository::class)); } }
/** * Ensure we collect any garbage */ protected function collectGarbage(int $garbage = 96): void { if ($garbage && (memory_get_usage() / 1024 / 1024) > $garbage) { gc_collect_cycles(); } }
/** * Execute the console command. */ public function handle(): int { $lastRestart = Cache::get('statistic_aggregates:ingest:restart');
Sleep::for(1)->second(); } } } [/code] И функция дайджеста: [code]namespace App\Services;
use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Redis; use Illuminate\Support\Facades\Log; use Illuminate\Support\Facades\DB; use Illuminate\Support\Arr; use Illuminate\Support\Sleep; use Illuminate\Support\Lottery; use Illuminate\Support\Collection; use Illuminate\Database\QueryException; use Illuminate\Database\Query\Expression; use App\Models\Reports\StatisticAggregate; use App\Models\Reports\StatisticAggregateRelation; use Carbon\CarbonImmutable; use Carbon\CarbonInterval; use Throwable;
class StatisticAggregateStream { /** * The redis stream. */ protected static string $stream = 'statistic_aggregates:stream';
/** * The chunk size for the stream */ protected static int $chunk = 1000;
/** * Returns the ingest key */ public static function ingestKey(): string { return self::key('ingest'); }
/** * Restart the stream */ public static function command(string $cmd = ''): void { match ($cmd) { 'restart' => self::restart(), 'clear' => self::clear(), default => null }; }
/** * Append data to the stream */ public static function append(array $data): void { Redis::connection('stream')->xadd(self::ingestKey(), '*', [ 'data' => serialize($data) ]); }
/** * Digest the stored ingest entries */ public static function digest(): int { $total = 0;
while (true) { $entries = collect(Redis::connection('stream')->xrange(self::ingestKey(), '-', '+', self::$chunk));
return $stats; } } [/code] Чтобы внести ясность в часть upsert, как StatisticAggregate::upsert, так и StatisticAggregateRelation::upsert независимо занимают около 15 мс каждый, и каждый upsert здесь может быть добавлено от 3 до 5 строк. В key_hash есть уникальный индекс