| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- <?php
- declare(strict_types=1);
- namespace App\Tests\Integration\Jobs;
- use App\Domain\Jobs\Job;
- use App\Domain\Jobs\JobContext;
- use App\Domain\Jobs\JobResult;
- use App\Domain\Jobs\JobStatus;
- use App\Domain\Time\FixedClock;
- use App\Infrastructure\Jobs\JobLockRepository;
- use App\Infrastructure\Jobs\JobRunner;
- use App\Infrastructure\Jobs\JobRunRepository;
- use App\Tests\Integration\Support\AppTestCase;
- use Monolog\Handler\NullHandler;
- use Monolog\Logger;
- use RuntimeException;
- /**
- * Lock contention is the headline concurrency property M05 has to get
- * right. This test simulates two callers racing for the same job by
- * pre-acquiring the lock manually before invoking the runner — far simpler
- * than spawning processes, and tests exactly the same code path.
- */
- final class JobRunnerTest extends AppTestCase
- {
- private JobLockRepository $locks;
- private JobRunRepository $runs;
- private FixedClock $clock;
- private JobRunner $runner;
- protected function setUp(): void
- {
- parent::setUp();
- $this->locks = new JobLockRepository($this->db);
- $this->runs = new JobRunRepository($this->db);
- $this->clock = FixedClock::at('2026-04-29T00:00:00Z');
- $logger = new Logger('test');
- $logger->pushHandler(new NullHandler());
- $this->runner = new JobRunner($this->locks, $this->runs, $this->clock, $logger);
- }
- public function testSuccessfulRunWritesRowAndReleasesLock(): void
- {
- $job = new class () implements Job {
- public function name(): string
- {
- return 'demo';
- }
- public function defaultIntervalSeconds(): int
- {
- return 60;
- }
- public function maxRuntimeSeconds(): int
- {
- return 30;
- }
- public function run(JobContext $context): JobResult
- {
- return new JobResult(itemsProcessed: 7);
- }
- };
- $outcome = $this->runner->run($job, [], 'manual');
- self::assertSame(JobStatus::Success, $outcome->status);
- self::assertSame(7, $outcome->itemsProcessed);
- // Lock is released — a second run must succeed.
- $second = $this->runner->run($job, [], 'manual');
- self::assertSame(JobStatus::Success, $second->status);
- $rows = $this->db->fetchAllAssociative("SELECT status FROM job_runs WHERE job_name='demo'");
- self::assertCount(2, $rows);
- foreach ($rows as $row) {
- self::assertSame('success', $row['status']);
- }
- }
- public function testFailedRunCapturesErrorMessageAndReleasesLock(): void
- {
- $job = new class () implements Job {
- public function name(): string
- {
- return 'failing';
- }
- public function defaultIntervalSeconds(): int
- {
- return 60;
- }
- public function maxRuntimeSeconds(): int
- {
- return 30;
- }
- public function run(JobContext $context): JobResult
- {
- throw new RuntimeException('intentional');
- }
- };
- $outcome = $this->runner->run($job, [], 'manual');
- self::assertSame(JobStatus::Failure, $outcome->status);
- self::assertSame('intentional', $outcome->errorMessage);
- // Lock released even on failure → next run also fails (not skipped).
- $second = $this->runner->run($job, [], 'manual');
- self::assertSame(JobStatus::Failure, $second->status);
- }
- public function testLockContentionProducesOneSuccessAndOneSkipped(): void
- {
- // Pre-acquire the lock to simulate "another worker is already
- // running this job", then dispatch a fresh runner invocation. The
- // second one must come back as skipped_locked, not block.
- $now = $this->clock->now();
- $this->locks->tryAcquire(
- 'busy',
- $now,
- $now->modify('+10 minutes'),
- 'OTHER-WORKER',
- );
- $job = new class () implements Job {
- public bool $bodyExecuted = false;
- public function name(): string
- {
- return 'busy';
- }
- public function defaultIntervalSeconds(): int
- {
- return 60;
- }
- public function maxRuntimeSeconds(): int
- {
- return 30;
- }
- public function run(JobContext $context): JobResult
- {
- $this->bodyExecuted = true; // @codeCoverageIgnore — must NOT be reached
- return new JobResult(0);
- }
- };
- $outcome = $this->runner->run($job, [], 'api');
- self::assertSame(JobStatus::SkippedLocked, $outcome->status);
- self::assertFalse($job->bodyExecuted, 'job body must not run when lock is held');
- $rows = $this->db->fetchAllAssociative("SELECT status FROM job_runs WHERE job_name='busy'");
- self::assertCount(1, $rows);
- self::assertSame('skipped_locked', $rows[0]['status']);
- }
- }
|