JobRunner.php 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Infrastructure\Jobs;
  4. use App\Domain\Jobs\Job;
  5. use App\Domain\Jobs\JobContext;
  6. use App\Domain\Jobs\JobOutcome;
  7. use App\Domain\Jobs\JobStatus;
  8. use App\Domain\Time\Clock;
  9. use App\Infrastructure\Logging\SafeTrace;
  10. use Psr\Log\LoggerInterface;
  11. use Throwable;
  12. /**
  13. * Orchestrates a single job invocation:
  14. * - Tries to acquire the job_locks row; on collision returns
  15. * `skipped_locked` (still writes a `job_runs` row so the failure is
  16. * auditable).
  17. * - On acquisition: writes a `running` row, runs the job, captures
  18. * success or exception, updates the row, releases the lock.
  19. * - Returns a `JobOutcome` describing what happened.
  20. *
  21. * Each invocation gets a unique `owner` string (`pid/random_hex`) so
  22. * `release()` can refuse to delete a lock that's been re-acquired by a
  23. * different process after an `expires_at` reclaim.
  24. */
  25. final class JobRunner
  26. {
  27. public function __construct(
  28. private readonly JobLockRepository $locks,
  29. private readonly JobRunRepository $runs,
  30. private readonly Clock $clock,
  31. private readonly LoggerInterface $logger,
  32. private readonly int $lockBufferSeconds = 30,
  33. ) {
  34. }
  35. /**
  36. * @param array<string, mixed> $params
  37. */
  38. public function run(Job $job, array $params, string $triggeredBy): JobOutcome
  39. {
  40. $owner = sprintf('%d/%s', getmypid() ?: 0, bin2hex(random_bytes(4)));
  41. $start = $this->clock->now();
  42. $expires = $start->modify(sprintf('+%d seconds', $job->maxRuntimeSeconds() + $this->lockBufferSeconds));
  43. if (!$this->locks->tryAcquire($job->name(), $start, $expires, $owner)) {
  44. $finish = $this->clock->now();
  45. $runId = $this->runs->recordCompleted(
  46. $job->name(),
  47. JobStatus::SkippedLocked,
  48. 0,
  49. $start,
  50. $finish,
  51. $triggeredBy,
  52. 'lock held by another worker',
  53. );
  54. return new JobOutcome(
  55. jobName: $job->name(),
  56. status: JobStatus::SkippedLocked,
  57. itemsProcessed: 0,
  58. durationMs: self::elapsedMs($start, $finish),
  59. runId: $runId,
  60. errorMessage: 'lock held by another worker',
  61. );
  62. }
  63. $runId = $this->runs->startRun($job->name(), $start, $triggeredBy);
  64. $context = new JobContext($this->clock, $this->logger, $params);
  65. try {
  66. $result = $job->run($context);
  67. $finish = $this->clock->now();
  68. $this->runs->finishRun($runId, JobStatus::Success, $result->itemsProcessed, $finish);
  69. return new JobOutcome(
  70. jobName: $job->name(),
  71. status: JobStatus::Success,
  72. itemsProcessed: $result->itemsProcessed,
  73. durationMs: self::elapsedMs($start, $finish),
  74. runId: $runId,
  75. );
  76. } catch (Throwable $e) {
  77. $finish = $this->clock->now();
  78. $this->logger->error('job_failed', [
  79. 'job' => $job->name(),
  80. 'run_id' => $runId,
  81. 'error' => $e->getMessage(),
  82. 'trace' => SafeTrace::format($e),
  83. ]);
  84. $this->runs->finishRun($runId, JobStatus::Failure, 0, $finish, $e->getMessage());
  85. return new JobOutcome(
  86. jobName: $job->name(),
  87. status: JobStatus::Failure,
  88. itemsProcessed: 0,
  89. durationMs: self::elapsedMs($start, $finish),
  90. runId: $runId,
  91. errorMessage: $e->getMessage(),
  92. );
  93. } finally {
  94. $this->locks->release($job->name(), $owner);
  95. }
  96. }
  97. private static function elapsedMs(\DateTimeImmutable $start, \DateTimeImmutable $finish): int
  98. {
  99. $diffMicros = $finish->getTimestamp() - $start->getTimestamp();
  100. $diffMicros = $diffMicros * 1_000_000
  101. + ((int) $finish->format('u') - (int) $start->format('u'));
  102. return (int) max(0, intdiv($diffMicros, 1000));
  103. }
  104. }