Потоковый работник Laravel Redis заполняется быстрее, чем обработка в часы пикMySql

Форум по Mysql
Ответить
Anonymous
 Потоковый работник Laravel Redis заполняется быстрее, чем обработка в часы пик

Сообщение Anonymous »

Ищу помощь с моим пользовательским инструментом upsert для Redis 7 и Laravel 11. Моему проекту нужна быстрая и расширенная отчетность. Одна из моих моделей называется StatisticAggregate, которая агрегирует данные в течение дня, принимает сеансы, увеличивает строку или создает ее, если она не существует, для каждого сеанса.
У меня есть несколько сотен записей, которые необходимо обновить или вставить, поэтому здесь уместно добавить в мою базу данных MySQL 8.
Данные добавляются в Redis поток, а затем пользовательский artisan, похожая на Laravel Pulse, работает бесконечно, извлекает данные порциями и добавляет их в базу данных, я прикреплю код.
Проблема, которую я вижу, заключается в том, что В течение дня поток приема в Redis заполняет все больше записей, чем то, что извлекается, до такой степени, что обработка фрагмента из 1000 элементов занимает около 20-30 секунд, то есть 2000 в минуту, однако в часы пик каждую минуту в поток поступает около 5000 или более записей.
Похоже, что это не является узким местом базы данных, поскольку, несмотря на огромное отставание в Redis и В таблице 100 тысяч строк, за ночь отставание полностью прорабатывается, когда нагрузка на систему падает, если бы таблица работала медленно, это наверняка было бы здесь видно.
Что могу ли я отсутствовать в своем код?
Мой рабочий php artisan statistic:aggregates:stream:work:

Код: Выделить всё

namespace App\Console\Commands\StatisticAggregates;

use Illuminate\Console\Command;
use Illuminate\Contracts\Console\Isolatable;
use Illuminate\Support\Sleep;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
use App\Services\StatisticAggregateStream;
use Carbon\CarbonImmutable;

class StatisticAggregatesStreamWork extends Command implements Isolatable
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'statistic:aggregates:stream:work';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Process incoming statistic aggregate ingest stream entries';

/**
* Schedule Telescope to store entries if enabled.
*/
protected function ensureTelescopeEntriesAreCollected(): void
{
if (config('telescope.enabled') && $this->laravel->bound(\Laravel\Telescope\Contracts\EntriesRepository::class)) {
\Laravel\Telescope\Telescope::store($this->laravel->make(\Laravel\Telescope\Contracts\EntriesRepository::class));
}
}

/**
* Ensure we collect any garbage
*/
protected function collectGarbage(int $garbage = 96): void
{
if ($garbage && (memory_get_usage() / 1024 / 1024) >  $garbage) {
gc_collect_cycles();
}
}

/**
* Execute the console command.
*/
public function handle(): int
{
$lastRestart = Cache::get('statistic_aggregates:ingest:restart');

$lastTrimmedStorageAt = CarbonImmutable::now()->startOfMinute();

while (true) {
$now = CarbonImmutable::now();

if ($lastRestart !== Cache::get('statistic_aggregates:ingest:restart')) {
Cache::forget('statistic_aggregates:ingest:restart');

return self::SUCCESS;
}

StatisticAggregateStream::digest();

$this->ensureTelescopeEntriesAreCollected();
$this->collectGarbage();

Sleep::for(1)->second();
}
}
}
И функция дайджеста:

Код: Выделить всё

namespace App\Services;

use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Arr;
use Illuminate\Support\Sleep;
use Illuminate\Support\Lottery;
use Illuminate\Support\Collection;
use Illuminate\Database\QueryException;
use Illuminate\Database\Query\Expression;
use App\Models\Reports\StatisticAggregate;
use App\Models\Reports\StatisticAggregateRelation;
use Carbon\CarbonImmutable;
use Carbon\CarbonInterval;
use Throwable;

class StatisticAggregateStream
{
/**
* The redis stream.
*/
protected static string $stream = 'statistic_aggregates:stream';

/**
* The chunk size for the stream
*/
protected static int $chunk = 1000;

/**
* Gets the stream key
*/
private static function key(string $key = ''): string
{
return self::$stream . ':' .  $key;
}

/**
* Store recent digest runtime stats
*/
private static function logDigestRuntime(int $start = 0): void
{
Redis::connection('stream')->xadd(self::key('runtime-stats'), '*', [
'data' => json_encode([
'digest_runtime_in_seconds' => round(microtime(true) - $start)
])
], 100);
}

/**
* Store recent digest upsert stats
*/
private static function logDigestUpsert(int $statisticAggregate = 0, int $statisticAggregateRelation = 0): void
{
Redis::connection('stream')->xadd(self::key('upsert-stats'), '*', [
'data' => json_encode([
'statistic_aggregate_duration_in_ms' => $statisticAggregate,
'statistic_aggregate_relation_duration_in_ms' => $statisticAggregateRelation
])
], 100);
}

/**
* Restart the stream
*/
private static function restart(): void
{
Cache::store('stream')->forever(self::key('restart'), now());
}

/**
* Clear the stream
*/
private static function clear(): void
{
// ...
}

/**
* Store the entries in storage
*/
private static function store(Collection $items): void
{
$start = microtime(true);

if ($items->isEmpty()) {
self::logDigestRuntime($start);

return;
}

$items->each(function (array $aggregate) {
try {
$column = Arr::get($aggregate, 'column', 'total_other');

$filteredAggregates = collect(Arr::get($aggregate, 'aggregates', []))->map(function ($aggregate) {
return collect($aggregate)->except([
'company_id', 'affiliate_id', 'affiliate_campaign_id', 'buyer_id', 'buyer_tier_id'
])->toArray();
})->toArray();

$statistics = StatisticAggregate::upsert($filteredAggregates, [
'for',
'product',
'country',
'bucket',
'period',
'modelable_type',
'modelable_id',
'bucket_starts_at',
'bucket_ends_at',
'key_hash'
], [
$column => new Expression("`$column` + VALUES(`$column`)")
]);

$statistics = StatisticAggregateRelation::upsert(Arr::get($aggregate, 'aggregates', []), [
'for',
'product',
'country',
'bucket',
'period',
'modelable_type',
'modelable_id',
'bucket_starts_at',
'bucket_ends_at',
'key_hash'
], [
$column => new Expression("`$column` + VALUES(`$column`)")
]);
} catch (QueryException $err) {
if (config('app.debug')) {
Log::warning('function: store QueryException error', [
'err' => $err->getMessage(),
'file' => $err->getFile(),
'line' => $err->getLine(),
]);
}
} catch (Throwable $err) {
\Sentry\captureException($err);

if (config('app.debug')) {
Log::warning('function: store throwable error', [
'err' => $err->getMessage(),
'file' => $err->getFile(),
'line' => $err->getLine(),
]);
}
}
});

Lottery::odds(1, 2)->winner(fn () => self::logDigestRuntime($start))->choose();
}

/**
* Returns the ingest key
*/
public static function ingestKey(): string
{
return self::key('ingest');
}

/**
* Restart the stream
*/
public static function command(string $cmd = ''): void
{
match ($cmd) {
'restart' =>  self::restart(),
'clear' => self::clear(),
default => null
};
}

/**
* Append data to the stream
*/
public static function append(array $data): void
{
Redis::connection('stream')->xadd(self::ingestKey(), '*', [
'data' => serialize($data)
]);
}

/**
* Digest the stored ingest entries
*/
public static function digest(): int
{
$total = 0;

while (true) {
$entries = collect(Redis::connection('stream')->xrange(self::ingestKey(), '-', '+', self::$chunk));

if ($entries->isEmpty()) {
return $total;
}

$keys = $entries->keys();

self::store(
$entries->map(fn (array $payload) => unserialize($payload['data']))
);

Redis::connection('stream')->xdel(self::ingestKey(), $keys->toArray());

if ($entries->count() < self::$chunk) {
return $total + $entries->count();
}

$total = $total + $entries->count();
}
}

/**
* Statistics for stream
*/
public static function stats(): array
{
$stats = [
'stream' => [
'max_wait_time_in_minutes' => 0
]
];

$oldestEntry = Redis::connection('stream')->xrange(self::ingestKey(), '-', '+', 1);
$newestEntry = Redis::connection('stream')->xrevrange(self::ingestKey(), '+', '-', 1);

if ($oldestEntry && $newestEntry) {
$oldestTimestamp = (int) explode('-', array_key_first($oldestEntry))[0];
$newestTimestamp = (int) explode('-', array_key_first($newestEntry))[0];

if ($oldestEntry && $newestTimestamp) {
Arr::set($stats, 'stream.max_wait_time_in_minutes', intval(($newestTimestamp - $oldestTimestamp) / 1000 / 60, 0));
}
}

return $stats;
}
}
Чтобы внести ясность в часть upsert, как StatisticAggregate::upsert, так и StatisticAggregateRelation::upsert независимо занимают около 15 мс каждый, и каждый upsert здесь может быть добавлено от 3 до 5 строк. В key_hash есть уникальный индекс


Подробнее здесь: https://stackoverflow.com/questions/793 ... peak-times
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «MySql»