| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- <?php
- declare(strict_types=1);
- namespace App\Infrastructure\Jobs;
- use App\Domain\Jobs\Job;
- use App\Domain\Jobs\JobContext;
- use App\Domain\Jobs\JobOutcome;
- use App\Domain\Jobs\JobStatus;
- use App\Domain\Time\Clock;
- use App\Infrastructure\Logging\SafeTrace;
- use Psr\Log\LoggerInterface;
- use Throwable;
- /**
- * Orchestrates a single job invocation:
- * - Tries to acquire the job_locks row; on collision returns
- * `skipped_locked` (still writes a `job_runs` row so the failure is
- * auditable).
- * - On acquisition: writes a `running` row, runs the job, captures
- * success or exception, updates the row, releases the lock.
- * - Returns a `JobOutcome` describing what happened.
- *
- * Each invocation gets a unique `owner` string (`pid/random_hex`) so
- * `release()` can refuse to delete a lock that's been re-acquired by a
- * different process after an `expires_at` reclaim.
- */
final class JobRunner
{
    /**
     * @param JobLockRepository $locks             Acquires and releases the per-job lock.
     * @param JobRunRepository  $runs              Records the run rows (started, finished, skipped).
     * @param Clock             $clock             Time source for start/finish stamps; also handed to the job.
     * @param LoggerInterface   $logger            Receives the `job_failed` entry; also handed to the job.
     * @param int               $lockBufferSeconds Seconds added on top of the job's max runtime
     *                                             when computing the lock expiry.
     */
    public function __construct(
        private readonly JobLockRepository $locks,
        private readonly JobRunRepository $runs,
        private readonly Clock $clock,
        private readonly LoggerInterface $logger,
        private readonly int $lockBufferSeconds = 30,
    ) {
    }

    /**
     * Runs `$job` once under its named lock and reports the result.
     *
     * Outcomes:
     *  - lock not acquired: a completed `SkippedLocked` run is recorded and returned;
     *  - job returns: the run is finished as `Success` with the job's item count;
     *  - job (or the success bookkeeping) throws: the error is logged as `job_failed`,
     *    the run is finished as `Failure`, and a `Failure` outcome is returned.
     *
     * Exceptions thrown by the repositories outside the job's own try/catch
     * (e.g. from `startRun()`) propagate to the caller, but the lock is released first.
     *
     * @param array<string, mixed> $params      Passed through to the job via its JobContext.
     * @param string               $triggeredBy Stored on the run row as the trigger label.
     */
    public function run(Job $job, array $params, string $triggeredBy): JobOutcome
    {
        // Unique per invocation; getmypid() can return false, in which case 0 is used.
        $owner = sprintf('%d/%s', getmypid() ?: 0, bin2hex(random_bytes(4)));
        $start = $this->clock->now();
        // The lock expires at start + the job's declared max runtime + the safety buffer.
        $expires = $start->modify(sprintf('+%d seconds', $job->maxRuntimeSeconds() + $this->lockBufferSeconds));

        if (!$this->locks->tryAcquire($job->name(), $start, $expires, $owner)) {
            $finish = $this->clock->now();
            $runId = $this->runs->recordCompleted(
                $job->name(),
                JobStatus::SkippedLocked,
                0,
                $start,
                $finish,
                $triggeredBy,
                'lock held by another worker',
            );

            return new JobOutcome(
                jobName: $job->name(),
                status: JobStatus::SkippedLocked,
                itemsProcessed: 0,
                durationMs: self::elapsedMs($start, $finish),
                runId: $runId,
                errorMessage: 'lock held by another worker',
            );
        }

        // From here on the lock is held. Everything, including the bookkeeping
        // that precedes the job itself, sits inside try/finally so that a
        // failure in startRun() cannot leave the lock in place until it expires.
        try {
            $runId = $this->runs->startRun($job->name(), $start, $triggeredBy);
            $context = new JobContext($this->clock, $this->logger, $params);

            try {
                $result = $job->run($context);
                $finish = $this->clock->now();
                $this->runs->finishRun($runId, JobStatus::Success, $result->itemsProcessed, $finish);

                return new JobOutcome(
                    jobName: $job->name(),
                    status: JobStatus::Success,
                    itemsProcessed: $result->itemsProcessed,
                    durationMs: self::elapsedMs($start, $finish),
                    runId: $runId,
                );
            } catch (Throwable $e) {
                $finish = $this->clock->now();
                $this->logger->error('job_failed', [
                    'job' => $job->name(),
                    'run_id' => $runId,
                    'error' => $e->getMessage(),
                    'trace' => SafeTrace::format($e),
                ]);
                $this->runs->finishRun($runId, JobStatus::Failure, 0, $finish, $e->getMessage());

                return new JobOutcome(
                    jobName: $job->name(),
                    status: JobStatus::Failure,
                    itemsProcessed: 0,
                    durationMs: self::elapsedMs($start, $finish),
                    runId: $runId,
                    errorMessage: $e->getMessage(),
                );
            }
        } finally {
            $this->locks->release($job->name(), $owner);
        }
    }

    /**
     * Whole milliseconds elapsed from `$start` to `$finish`, never negative.
     *
     * Combines the whole-second difference with the microsecond fractions,
     * then truncates to milliseconds; a finish earlier than the start yields 0.
     */
    private static function elapsedMs(\DateTimeImmutable $start, \DateTimeImmutable $finish): int
    {
        $wholeSeconds = $finish->getTimestamp() - $start->getTimestamp();
        $fractionMicros = (int) $finish->format('u') - (int) $start->format('u');
        $elapsedMicros = $wholeSeconds * 1_000_000 + $fractionMicros;

        return max(0, intdiv($elapsedMicros, 1000));
    }
}
|