$params */
    public function run(Job $job, array $params, string $triggeredBy): JobOutcome
    {
        // Per-invocation owner token "<pid>/<8 hex chars>"; the pid falls back to 0 when getmypid() fails.
        $owner = sprintf('%d/%s', getmypid() ?: 0, bin2hex(random_bytes(4)));
        $start = $this->clock->now();
        // Expiry handed to the lock store: the job's maximum runtime plus the configured buffer.
        $expires = $start->modify(
            sprintf('+%d seconds', $job->maxRuntimeSeconds() + $this->lockBufferSeconds),
        );

        if (!$this->locks->tryAcquire($job->name(), $start, $expires, $owner)) {
            // Lock not obtained: record an already-completed "skipped" run and report it; the job is not executed.
            $reason = 'lock held by another worker';
            $finish = $this->clock->now();
            $runId = $this->runs->recordCompleted(
                $job->name(),
                JobStatus::SkippedLocked,
                0,
                $start,
                $finish,
                $triggeredBy,
                $reason,
            );

            return new JobOutcome(
                jobName: $job->name(),
                status: JobStatus::SkippedLocked,
                itemsProcessed: 0,
                durationMs: self::elapsedMs($start, $finish),
                runId: $runId,
                errorMessage: $reason,
            );
        }

        // The lock is held from here on. Everything below, including the run bookkeeping and the
        // context construction, sits inside try/finally so the lock is released even when
        // startRun() or the JobContext constructor throws (such an exception still propagates).
        try {
            $runId = $this->runs->startRun($job->name(), $start, $triggeredBy);
            $context = new JobContext($this->clock, $this->logger, $params);

            try {
                $result = $job->run($context);
                $finish = $this->clock->now();
                $this->runs->finishRun($runId, JobStatus::Success, $result->itemsProcessed, $finish);

                return new JobOutcome(
                    jobName: $job->name(),
                    status: JobStatus::Success,
                    itemsProcessed: $result->itemsProcessed,
                    durationMs: self::elapsedMs($start, $finish),
                    runId: $runId,
                );
            } catch (Throwable $e) {
                // Any throwable raised while running the job (or while recording its success) is
                // logged, stored on the run as a failure, and returned as a Failure outcome
                // instead of being rethrown.
                $finish = $this->clock->now();
                $this->logger->error('job_failed', [
                    'job' => $job->name(),
                    'run_id' => $runId,
                    'error' => $e->getMessage(),
                    'trace' => SafeTrace::format($e),
                ]);
                $this->runs->finishRun($runId, JobStatus::Failure, 0, $finish, $e->getMessage());

                return new JobOutcome(
                    jobName: $job->name(),
                    status: JobStatus::Failure,
                    itemsProcessed: 0,
                    durationMs: self::elapsedMs($start, $finish),
                    runId: $runId,
                    errorMessage: $e->getMessage(),
                );
            }
        } finally {
            $this->locks->release($job->name(), $owner);
        }
    }

    /**
     * Whole milliseconds elapsed between two instants, never negative.
     *
     * The difference is computed in microseconds (whole-second delta plus the
     * difference of the fractional "u" parts) and truncated to milliseconds;
     * a finish that precedes the start yields 0.
     */
    private static function elapsedMs(\DateTimeImmutable $start, \DateTimeImmutable $finish): int
    {
        $wholeSeconds = $finish->getTimestamp() - $start->getTimestamp();
        $fractionMicros = (int) $finish->format('u') - (int) $start->format('u');
        $elapsedMicros = $wholeSeconds * 1_000_000 + $fractionMicros;

        return max(0, intdiv($elapsedMicros, 1000));
    }
}