JobRunnerTest.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Tests\Integration\Jobs;
  4. use App\Domain\Jobs\Job;
  5. use App\Domain\Jobs\JobContext;
  6. use App\Domain\Jobs\JobResult;
  7. use App\Domain\Jobs\JobStatus;
  8. use App\Domain\Time\FixedClock;
  9. use App\Infrastructure\Jobs\JobLockRepository;
  10. use App\Infrastructure\Jobs\JobRunner;
  11. use App\Infrastructure\Jobs\JobRunRepository;
  12. use App\Tests\Integration\Support\AppTestCase;
  13. use Monolog\Handler\NullHandler;
  14. use Monolog\Logger;
  15. use RuntimeException;
  16. /**
  17. * Lock contention is the headline concurrency property M05 has to get
  18. * right. This test simulates two callers racing for the same job by
  19. * pre-acquiring the lock manually before invoking the runner — far simpler
  20. * than spawning processes, and tests exactly the same code path.
  21. */
  22. final class JobRunnerTest extends AppTestCase
  23. {
  24. private JobLockRepository $locks;
  25. private JobRunRepository $runs;
  26. private FixedClock $clock;
  27. private JobRunner $runner;
  28. protected function setUp(): void
  29. {
  30. parent::setUp();
  31. $this->locks = new JobLockRepository($this->db);
  32. $this->runs = new JobRunRepository($this->db);
  33. $this->clock = FixedClock::at('2026-04-29T00:00:00Z');
  34. $logger = new Logger('test');
  35. $logger->pushHandler(new NullHandler());
  36. $this->runner = new JobRunner($this->locks, $this->runs, $this->clock, $logger);
  37. }
  38. public function testSuccessfulRunWritesRowAndReleasesLock(): void
  39. {
  40. $job = new class () implements Job {
  41. public function name(): string
  42. {
  43. return 'demo';
  44. }
  45. public function defaultIntervalSeconds(): int
  46. {
  47. return 60;
  48. }
  49. public function maxRuntimeSeconds(): int
  50. {
  51. return 30;
  52. }
  53. public function run(JobContext $context): JobResult
  54. {
  55. return new JobResult(itemsProcessed: 7);
  56. }
  57. };
  58. $outcome = $this->runner->run($job, [], 'manual');
  59. self::assertSame(JobStatus::Success, $outcome->status);
  60. self::assertSame(7, $outcome->itemsProcessed);
  61. // Lock is released — a second run must succeed.
  62. $second = $this->runner->run($job, [], 'manual');
  63. self::assertSame(JobStatus::Success, $second->status);
  64. $rows = $this->db->fetchAllAssociative("SELECT status FROM job_runs WHERE job_name='demo'");
  65. self::assertCount(2, $rows);
  66. foreach ($rows as $row) {
  67. self::assertSame('success', $row['status']);
  68. }
  69. }
  70. public function testFailedRunCapturesErrorMessageAndReleasesLock(): void
  71. {
  72. $job = new class () implements Job {
  73. public function name(): string
  74. {
  75. return 'failing';
  76. }
  77. public function defaultIntervalSeconds(): int
  78. {
  79. return 60;
  80. }
  81. public function maxRuntimeSeconds(): int
  82. {
  83. return 30;
  84. }
  85. public function run(JobContext $context): JobResult
  86. {
  87. throw new RuntimeException('intentional');
  88. }
  89. };
  90. $outcome = $this->runner->run($job, [], 'manual');
  91. self::assertSame(JobStatus::Failure, $outcome->status);
  92. self::assertSame('intentional', $outcome->errorMessage);
  93. // Lock released even on failure → next run also fails (not skipped).
  94. $second = $this->runner->run($job, [], 'manual');
  95. self::assertSame(JobStatus::Failure, $second->status);
  96. }
  97. public function testLockContentionProducesOneSuccessAndOneSkipped(): void
  98. {
  99. // Pre-acquire the lock to simulate "another worker is already
  100. // running this job", then dispatch a fresh runner invocation. The
  101. // second one must come back as skipped_locked, not block.
  102. $now = $this->clock->now();
  103. $this->locks->tryAcquire(
  104. 'busy',
  105. $now,
  106. $now->modify('+10 minutes'),
  107. 'OTHER-WORKER',
  108. );
  109. $job = new class () implements Job {
  110. public bool $bodyExecuted = false;
  111. public function name(): string
  112. {
  113. return 'busy';
  114. }
  115. public function defaultIntervalSeconds(): int
  116. {
  117. return 60;
  118. }
  119. public function maxRuntimeSeconds(): int
  120. {
  121. return 30;
  122. }
  123. public function run(JobContext $context): JobResult
  124. {
  125. $this->bodyExecuted = true; // @codeCoverageIgnore — must NOT be reached
  126. return new JobResult(0);
  127. }
  128. };
  129. $outcome = $this->runner->run($job, [], 'api');
  130. self::assertSame(JobStatus::SkippedLocked, $outcome->status);
  131. self::assertFalse($job->bodyExecuted, 'job body must not run when lock is held');
  132. $rows = $this->db->fetchAllAssociative("SELECT status FROM job_runs WHERE job_name='busy'");
  133. self::assertCount(1, $rows);
  134. self::assertSame('skipped_locked', $rows[0]['status']);
  135. }
  136. }