Przeglądaj źródła

feat(M05): reputation engine + internal jobs framework

- Job interface, JobLockRepository (atomic acquire), JobRunner, JobRegistry
- RecomputeScoresJob (full + incremental), CleanupAuditJob, EnrichPendingJob (skeleton)
- tick dispatcher; /internal/jobs/{recompute-scores,cleanup-audit,enrich-pending,tick,refresh-geoip,status}
- InternalNetworkMiddleware + InternalTokenMiddleware (network-bound + token)
- CLI: jobs:run, jobs:status, scores:rebuild

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
chiappa 1 tydzień temu
rodzic
commit
eb2486a844
33 zmienionych plików z 2778 dodań i 2 usunięć
  1. 19 0
      PROGRESS.md
  2. 96 0
      api/bin/console
  3. 3 0
      api/config/settings.php
  4. 3 2
      api/docker/Caddyfile
  5. 23 0
      api/src/App/AppFactory.php
  6. 93 0
      api/src/App/Container.php
  7. 169 0
      api/src/Application/Internal/JobsController.php
  8. 57 0
      api/src/Application/Jobs/CleanupAuditJob.php
  9. 42 0
      api/src/Application/Jobs/EnrichPendingJob.php
  10. 192 0
      api/src/Application/Jobs/RecomputeScoresJob.php
  11. 133 0
      api/src/Application/Jobs/TickJob.php
  12. 24 0
      api/src/Domain/Jobs/Job.php
  13. 33 0
      api/src/Domain/Jobs/JobContext.php
  14. 57 0
      api/src/Domain/Jobs/JobOutcome.php
  15. 25 0
      api/src/Domain/Jobs/JobResult.php
  16. 18 0
      api/src/Domain/Jobs/JobStatus.php
  17. 94 0
      api/src/Infrastructure/Http/Middleware/InternalNetworkMiddleware.php
  18. 60 0
      api/src/Infrastructure/Http/Middleware/InternalTokenMiddleware.php
  19. 109 0
      api/src/Infrastructure/Jobs/JobLockRepository.php
  20. 49 0
      api/src/Infrastructure/Jobs/JobRegistry.php
  21. 163 0
      api/src/Infrastructure/Jobs/JobRunRepository.php
  22. 116 0
      api/src/Infrastructure/Jobs/JobRunner.php
  23. 75 0
      api/src/Infrastructure/Reputation/IpScoreRepository.php
  24. 54 0
      api/src/Infrastructure/Reputation/ReportRepository.php
  25. 198 0
      api/tests/Integration/Internal/JobsEndpointsTest.php
  26. 75 0
      api/tests/Integration/Jobs/JobLockRepositoryTest.php
  27. 163 0
      api/tests/Integration/Jobs/JobRunnerTest.php
  28. 157 0
      api/tests/Integration/Jobs/RecomputeScoresJobTest.php
  29. 190 0
      api/tests/Integration/Jobs/TickJobTest.php
  30. 83 0
      api/tests/Integration/Reputation/PairScorerCutoffTest.php
  31. 76 0
      api/tests/Unit/Http/InternalNetworkMiddlewareTest.php
  32. 84 0
      api/tests/Unit/Http/InternalTokenMiddlewareTest.php
  33. 45 0
      api/tests/Unit/Jobs/JobOutcomeTest.php

+ 19 - 0
PROGRESS.md

@@ -89,3 +89,22 @@
 
 **Deviations from SPEC:** none.
 **Added dependencies:** none (chose hand-rolled validation over `respect/validation`).
+
+## M05 — Reputation engine & jobs (done)
+
+**Built:** decay math (already in M04, edge-cases re-verified); job framework with atomic locks (`JobLockRepository`), run history (`JobRunRepository`), runner abstraction (`JobRunner`), registry (`JobRegistry`); concrete jobs `RecomputeScoresJob` (full + incremental), `CleanupAuditJob`, `EnrichPendingJob` (skeleton); `TickJob` dispatcher; `/internal/jobs/{recompute-scores,cleanup-audit,enrich-pending,tick,refresh-geoip,status}` endpoints behind `InternalNetworkMiddleware` + `InternalTokenMiddleware`; CLI `jobs:run`, `jobs:status`, `scores:rebuild`.
+
+**Notes for next milestone:**
+- `PairScorer` (from M04) is reused by `RecomputeScoresJob` — both produce identical scores for the same pair.
+- `EnrichPendingJob` is a skeleton — M11 fills it in.
+- `refresh-geoip` endpoint returns 412 with `{"error":"not_implemented"}` — M11 wires it up.
+- Job results are returned synchronously; long jobs may exceed default request timeout. The `/internal/*` routes need an extended `max_execution_time` in production FrankenPHP config (deferred — current default is sufficient for the recompute's 240s ceiling).
+- Drop rule: `score < 0.01 AND last_report_at < now − 90 days`. RecomputeScoresJob backdates `last_report_at` to `now − 366 days` for orphan ip_scores rows (no surviving reports) so the same drop pass prunes them.
+- `triggered_by` convention: HTTP `/internal/jobs/*` calls use `'schedule'` (assumed cron-driven); CLI uses `'manual'`. The admin-API wrapper in M12 will pass `'manual'` through for UI button triggers.
+- TickJob takes a `Closure(): iterable<Job>` rather than a direct `JobRegistry` reference — needed to break a build-time cycle in PHP-DI (registry holds tick; tick iterates registry). The closure is invoked at run time.
+- `JobsController` resolves jobs via `JobRegistry::get($name)`, and the registry is populated lazily in the container factory in registration order: recompute, cleanup, enrich, tick.
+- Lock owner format: `<pid>/<random hex>`. Release verifies owner matches before deleting — defensive against expires_at-reclaim races.
+- Internal token middleware fails closed when `INTERNAL_JOB_TOKEN` is empty — better than silently exposing endpoints to anything inside the docker network.
+
+**Deviations from SPEC:** none.
+**Added dependencies:** none.

+ 96 - 0
api/bin/console

@@ -4,13 +4,19 @@
 declare(strict_types=1);
 
 use App\App\Container;
+use App\Application\Jobs\RecomputeScoresJob;
 use App\Domain\Auth\Role;
 use App\Domain\Auth\TokenHasher;
 use App\Domain\Auth\TokenIssuer;
 use App\Domain\Auth\TokenKind;
+use App\Domain\Time\Clock;
 use App\Infrastructure\Auth\ServiceTokenBootstrap;
 use App\Infrastructure\Auth\TokenRecord;
 use App\Infrastructure\Auth\TokenRepository;
+use App\Infrastructure\Jobs\JobLockRepository;
+use App\Infrastructure\Jobs\JobRegistry;
+use App\Infrastructure\Jobs\JobRunner;
+use App\Infrastructure\Jobs\JobRunRepository;
 
 require __DIR__ . '/../vendor/autoload.php';
 
@@ -129,6 +135,92 @@ switch ($command) {
         }
         exit(0);
 
+    case 'jobs:run':
+        $jobName = $argv[2] ?? null;
+        if ($jobName === null || str_starts_with($jobName, '--')) {
+            fwrite(STDERR, "Usage: jobs:run <job-name> [--full] [--max-rows=N]\n");
+            exit(1);
+        }
+        $container = Container::build();
+        /** @var JobRegistry $registry */
+        $registry = $container->get(JobRegistry::class);
+        if (!$registry->has($jobName)) {
+            fwrite(STDERR, "Unknown job: {$jobName}\n");
+            exit(1);
+        }
+        /** @var JobRunner $runner */
+        $runner = $container->get(JobRunner::class);
+
+        $params = [];
+        if ($hasFlag($argv, 'full')) {
+            $params['full'] = true;
+        }
+        $maxRowsArg = $flag($argv, 'max-rows');
+        if ($maxRowsArg !== null && ctype_digit($maxRowsArg)) {
+            $params['max_rows'] = (int) $maxRowsArg;
+        }
+
+        $outcome = $runner->run($registry->get($jobName), $params, 'manual');
+        fwrite(STDOUT, json_encode($outcome->toArray(), JSON_PRETTY_PRINT) . "\n");
+        exit($outcome->status->value === 'failure' ? 1 : 0);
+
+    case 'jobs:status':
+        $container = Container::build();
+        /** @var JobRegistry $registry */
+        $registry = $container->get(JobRegistry::class);
+        /** @var JobRunRepository $runs */
+        $runs = $container->get(JobRunRepository::class);
+        /** @var JobLockRepository $locks */
+        $locks = $container->get(JobLockRepository::class);
+        /** @var Clock $clock */
+        $clock = $container->get(Clock::class);
+
+        $now = $clock->now();
+        $latest = $runs->latestPerJob();
+        $rows = [];
+        foreach ($registry->all() as $name => $job) {
+            $row = $latest[$name] ?? null;
+            $finishedAt = $row['finished_at'] ?? null;
+            $overdue = $row === null
+                || ($finishedAt instanceof DateTimeImmutable
+                    && ($now->getTimestamp() - $finishedAt->getTimestamp()) > $job->defaultIntervalSeconds());
+            $rows[$name] = [
+                'name' => $name,
+                'default_interval_seconds' => $job->defaultIntervalSeconds(),
+                'overdue' => $overdue,
+                'lock' => $locks->status($name),
+                'last_run' => $row === null ? null : [
+                    'id' => $row['id'],
+                    'status' => $row['status'],
+                    'items_processed' => $row['items_processed'],
+                    'triggered_by' => $row['triggered_by'],
+                    'finished_at' => $finishedAt instanceof DateTimeImmutable
+                        ? $finishedAt->format('Y-m-d\TH:i:s\Z')
+                        : null,
+                ],
+            ];
+        }
+        fwrite(STDOUT, json_encode([
+            'now' => $now->format('Y-m-d\TH:i:s\Z'),
+            'jobs' => $rows,
+        ], JSON_PRETTY_PRINT) . "\n");
+        exit(0);
+
+    case 'scores:rebuild':
+        $container = Container::build();
+        /** @var JobRegistry $registry */
+        $registry = $container->get(JobRegistry::class);
+        /** @var JobRunner $runner */
+        $runner = $container->get(JobRunner::class);
+
+        $outcome = $runner->run(
+            $registry->get(RecomputeScoresJob::NAME),
+            ['full' => true],
+            'manual',
+        );
+        fwrite(STDOUT, json_encode($outcome->toArray(), JSON_PRETTY_PRINT) . "\n");
+        exit($outcome->status->value === 'failure' ? 1 : 0);
+
     case null:
     case '--help':
     case '-h':
@@ -142,6 +234,10 @@ switch ($command) {
               auth:bootstrap-service-token        Provision UI_SERVICE_TOKEN row in api_tokens
               auth:create-token --kind=admin --role=admin|operator|viewer [--quiet]
                                                   Create an admin token; raw token printed to stdout
+              jobs:run <name> [--full] [--max-rows=N]
+                                                  Invoke a registered job directly. Bypasses HTTP.
+              jobs:status                         Print latest run + lock state for every job
+              scores:rebuild                      Alias for `jobs:run recompute-scores --full`
 
             TXT);
         exit(0);

+ 3 - 0
api/config/settings.php

@@ -48,4 +48,7 @@ return [
     'oidc_default_role' => $oidcDefaultRole,
     'score_hard_cutoff_days' => (int) (getenv('SCORE_REPORT_HARD_CUTOFF_DAYS') ?: 365),
     'rate_limit_per_second' => (int) (getenv('API_RATE_LIMIT_PER_SECOND') ?: 60),
+    'job_recompute_max_runtime_seconds' => (int) (getenv('JOB_RECOMPUTE_MAX_RUNTIME_SECONDS') ?: 240),
+    'job_recompute_max_rows_per_tick' => (int) (getenv('JOB_RECOMPUTE_MAX_ROWS_PER_TICK') ?: 5000),
+    'job_audit_retention_days' => (int) (getenv('JOB_AUDIT_RETENTION_DAYS') ?: 180),
 ];

+ 3 - 2
api/docker/Caddyfile

@@ -12,8 +12,9 @@
     encode zstd gzip
 
     # Internal jobs API: only callable from loopback / RFC1918.
-    # No /internal/* routes are mounted yet; this matcher exists in M01 to
-    # exercise the protective pattern. Internal endpoints land in M05.
+    # The PHP layer also enforces this (InternalNetworkMiddleware) — Caddy
+    # is the first line of defence for production deployments where the
+    # api is reachable from the public internet.
     @internal {
         path /internal/*
         remote_ip 127.0.0.1/32 ::1/128 172.16.0.0/12 10.0.0.0/8 192.168.0.0/16

+ 23 - 0
api/src/App/AppFactory.php

@@ -9,10 +9,13 @@ use App\Application\Admin\MeController;
 use App\Application\Admin\ReportersController;
 use App\Application\Admin\TokensController;
 use App\Application\Auth\AuthController;
+use App\Application\Internal\JobsController;
 use App\Application\Public\ReportController;
 use App\Domain\Auth\Role;
 use App\Infrastructure\Http\JsonErrorHandler;
 use App\Infrastructure\Http\Middleware\ImpersonationMiddleware;
+use App\Infrastructure\Http\Middleware\InternalNetworkMiddleware;
+use App\Infrastructure\Http\Middleware\InternalTokenMiddleware;
 use App\Infrastructure\Http\Middleware\RateLimitMiddleware;
 use App\Infrastructure\Http\Middleware\RbacMiddleware;
 use App\Infrastructure\Http\Middleware\TokenAuthenticationMiddleware;
@@ -69,6 +72,10 @@ final class AppFactory
         $impersonation = $container->get(ImpersonationMiddleware::class);
         /** @var RateLimitMiddleware $rateLimit */
         $rateLimit = $container->get(RateLimitMiddleware::class);
+        /** @var InternalNetworkMiddleware $internalNetwork */
+        $internalNetwork = $container->get(InternalNetworkMiddleware::class);
+        /** @var InternalTokenMiddleware $internalToken */
+        $internalToken = $container->get(InternalTokenMiddleware::class);
 
         $app->get('/healthz', function (ServerRequestInterface $request, ResponseInterface $response): ResponseInterface {
             $response->getBody()->write((string) json_encode(['status' => 'ok']));
@@ -143,6 +150,22 @@ final class AppFactory
             ->add($impersonation)
             ->add($tokenAuth);
 
+        // Internal jobs API: scheduler-only. Network gate (404 outside
+        // RFC1918) → token gate (401) → controller. Order matters:
+        // network rejection must not leak through token-attempt logs.
+        $app->group('/internal/jobs', function (RouteCollectorProxy $internal) use ($container): void {
+            /** @var JobsController $jobs */
+            $jobs = $container->get(JobsController::class);
+            $internal->post('/recompute-scores', [$jobs, 'recomputeScores']);
+            $internal->post('/cleanup-audit', [$jobs, 'cleanupAudit']);
+            $internal->post('/enrich-pending', [$jobs, 'enrichPending']);
+            $internal->post('/tick', [$jobs, 'tick']);
+            $internal->post('/refresh-geoip', [$jobs, 'refreshGeoip']);
+            $internal->get('/status', [$jobs, 'status']);
+        })
+            ->add($internalToken)
+            ->add($internalNetwork);
+
         $app->map(
             ['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'],
             '/{routes:.+}',

+ 93 - 0
api/src/App/Container.php

@@ -9,6 +9,11 @@ use App\Application\Admin\MeController;
 use App\Application\Admin\ReportersController;
 use App\Application\Admin\TokensController;
 use App\Application\Auth\AuthController;
+use App\Application\Internal\JobsController;
+use App\Application\Jobs\CleanupAuditJob;
+use App\Application\Jobs\EnrichPendingJob;
+use App\Application\Jobs\RecomputeScoresJob;
+use App\Application\Jobs\TickJob;
 use App\Application\Public\ReportController;
 use App\Domain\Auth\Role;
 use App\Domain\Auth\TokenHasher;
@@ -25,9 +30,15 @@ use App\Infrastructure\Consumer\ConsumerRepository;
 use App\Infrastructure\Db\ConnectionFactory;
 use App\Infrastructure\Http\JsonErrorHandler;
 use App\Infrastructure\Http\Middleware\ImpersonationMiddleware;
+use App\Infrastructure\Http\Middleware\InternalNetworkMiddleware;
+use App\Infrastructure\Http\Middleware\InternalTokenMiddleware;
 use App\Infrastructure\Http\Middleware\RateLimitMiddleware;
 use App\Infrastructure\Http\Middleware\TokenAuthenticationMiddleware;
 use App\Infrastructure\Http\RateLimiter;
+use App\Infrastructure\Jobs\JobLockRepository;
+use App\Infrastructure\Jobs\JobRegistry;
+use App\Infrastructure\Jobs\JobRunner;
+use App\Infrastructure\Jobs\JobRunRepository;
 use App\Infrastructure\Reporter\ReporterRepository;
 use App\Infrastructure\Reputation\IpScoreRepository;
 use App\Infrastructure\Reputation\ReportRepository;
@@ -75,6 +86,10 @@ final class Container
             'settings.oidc_default_role' => $settings['oidc_default_role'] ?? Role::Viewer,
             'settings.score_hard_cutoff_days' => (int) ($settings['score_hard_cutoff_days'] ?? 365),
             'settings.rate_limit_per_second' => (int) ($settings['rate_limit_per_second'] ?? 60),
+            'settings.internal_job_token' => (string) ($settings['internal_job_token'] ?? ''),
+            'settings.job_recompute_max_runtime_seconds' => (int) ($settings['job_recompute_max_runtime_seconds'] ?? 240),
+            'settings.job_recompute_max_rows_per_tick' => (int) ($settings['job_recompute_max_rows_per_tick'] ?? 5000),
+            'settings.job_audit_retention_days' => (int) ($settings['job_audit_retention_days'] ?? 180),
             ConnectionFactory::class => factory(static function (ContainerInterface $c): ConnectionFactory {
                 /** @var array{driver: string, sqlite_path: string, mysql_host: string, mysql_port: int, mysql_database: string, mysql_username: string, mysql_password: string} $db */
                 $db = $c->get('settings.db');
@@ -134,6 +149,84 @@ final class Container
                 return new RateLimiter($clock, (float) $perSecond, (float) ($perSecond * 2));
             }),
             RateLimitMiddleware::class => autowire(),
+            JobLockRepository::class => autowire(),
+            JobRunRepository::class => autowire(),
+            JobRunner::class => factory(static function (ContainerInterface $c): JobRunner {
+                /** @var JobLockRepository $locks */
+                $locks = $c->get(JobLockRepository::class);
+                /** @var JobRunRepository $runs */
+                $runs = $c->get(JobRunRepository::class);
+                /** @var Clock $clock */
+                $clock = $c->get(Clock::class);
+                /** @var LoggerInterface $logger */
+                $logger = $c->get(LoggerInterface::class);
+
+                return new JobRunner($locks, $runs, $clock, $logger);
+            }),
+            RecomputeScoresJob::class => factory(static function (ContainerInterface $c): RecomputeScoresJob {
+                /** @var ReportRepository $reports */
+                $reports = $c->get(ReportRepository::class);
+                /** @var IpScoreRepository $ipScores */
+                $ipScores = $c->get(IpScoreRepository::class);
+                /** @var PairScorer $scorer */
+                $scorer = $c->get(PairScorer::class);
+                /** @var int $maxRuntime */
+                $maxRuntime = $c->get('settings.job_recompute_max_runtime_seconds');
+                /** @var int $maxRows */
+                $maxRows = $c->get('settings.job_recompute_max_rows_per_tick');
+
+                return new RecomputeScoresJob($reports, $ipScores, $scorer, $maxRuntime, $maxRows);
+            }),
+            CleanupAuditJob::class => factory(static function (ContainerInterface $c): CleanupAuditJob {
+                /** @var Connection $conn */
+                $conn = $c->get(Connection::class);
+                /** @var int $days */
+                $days = $c->get('settings.job_audit_retention_days');
+
+                return new CleanupAuditJob($conn, $days);
+            }),
+            EnrichPendingJob::class => autowire(),
+            TickJob::class => factory(static function (ContainerInterface $c): TickJob {
+                /** @var JobRunner $runner */
+                $runner = $c->get(JobRunner::class);
+                /** @var JobRunRepository $runs */
+                $runs = $c->get(JobRunRepository::class);
+
+                // Closure indirection: TickJob iterates JobRegistry at
+                // run() time, after the registry is fully populated. A
+                // direct JobRegistry dependency would create a build-time
+                // cycle (registry → tick → registry).
+                $resolver = static fn (): array => $c->get(JobRegistry::class)->all();
+
+                return new TickJob($resolver, $runner, $runs);
+            }),
+            JobRegistry::class => factory(static function (ContainerInterface $c): JobRegistry {
+                $registry = new JobRegistry();
+                /** @var RecomputeScoresJob $recompute */
+                $recompute = $c->get(RecomputeScoresJob::class);
+                /** @var CleanupAuditJob $cleanup */
+                $cleanup = $c->get(CleanupAuditJob::class);
+                /** @var EnrichPendingJob $enrich */
+                $enrich = $c->get(EnrichPendingJob::class);
+                /** @var TickJob $tick */
+                $tick = $c->get(TickJob::class);
+                $registry->register($recompute);
+                $registry->register($cleanup);
+                $registry->register($enrich);
+                $registry->register($tick);
+
+                return $registry;
+            }),
+            JobsController::class => autowire(),
+            InternalNetworkMiddleware::class => autowire(),
+            InternalTokenMiddleware::class => factory(static function (ContainerInterface $c): InternalTokenMiddleware {
+                /** @var ResponseFactoryInterface $rf */
+                $rf = $c->get(ResponseFactoryInterface::class);
+                /** @var string $token */
+                $token = $c->get('settings.internal_job_token');
+
+                return new InternalTokenMiddleware($rf, $token);
+            }),
             JsonErrorHandler::class => factory(static function (ContainerInterface $c): JsonErrorHandler {
                 /** @var ResponseFactoryInterface $factory */
                 $factory = $c->get(ResponseFactoryInterface::class);

+ 169 - 0
api/src/Application/Internal/JobsController.php

@@ -0,0 +1,169 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Application\Internal;
+
+use App\Application\Jobs\CleanupAuditJob;
+use App\Application\Jobs\EnrichPendingJob;
+use App\Application\Jobs\RecomputeScoresJob;
+use App\Application\Jobs\TickJob;
+use App\Domain\Jobs\JobOutcome;
+use App\Domain\Time\Clock;
+use App\Infrastructure\Jobs\JobLockRepository;
+use App\Infrastructure\Jobs\JobRegistry;
+use App\Infrastructure\Jobs\JobRunner;
+use App\Infrastructure\Jobs\JobRunRepository;
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+
+/**
+ * `/internal/jobs/*` controller — invoked only by the scheduler (cron /
+ * sidecar). Network + token middlewares run upstream; this class handles
+ * job dispatch and the response envelope.
+ *
+ * `refreshGeoip` returns 412 because the actual MaxMind plumbing lands in
+ * M11. The endpoint exists now so the scheduler config doesn't need to
+ * change between milestones.
+ */
+final class JobsController
+{
+    public function __construct(
+        private readonly JobRegistry $registry,
+        private readonly JobRunner $runner,
+        private readonly JobRunRepository $runs,
+        private readonly JobLockRepository $locks,
+        private readonly Clock $clock,
+    ) {
+    }
+
+    public function recomputeScores(ServerRequestInterface $request, ResponseInterface $response): ResponseInterface
+    {
+        $body = self::jsonBody($request);
+        $params = [];
+        if (isset($body['full'])) {
+            $params['full'] = (bool) $body['full'];
+        }
+        if (isset($body['max_rows']) && is_numeric($body['max_rows'])) {
+            $params['max_rows'] = (int) $body['max_rows'];
+        }
+
+        $job = $this->registry->get(RecomputeScoresJob::NAME);
+        $outcome = $this->runner->run($job, $params, 'schedule');
+
+        return self::renderOutcome($response, $outcome);
+    }
+
+    public function cleanupAudit(ServerRequestInterface $request, ResponseInterface $response): ResponseInterface
+    {
+        $job = $this->registry->get(CleanupAuditJob::NAME);
+        $outcome = $this->runner->run($job, [], 'schedule');
+
+        return self::renderOutcome($response, $outcome);
+    }
+
+    public function enrichPending(ServerRequestInterface $request, ResponseInterface $response): ResponseInterface
+    {
+        $job = $this->registry->get(EnrichPendingJob::NAME);
+        $outcome = $this->runner->run($job, [], 'schedule');
+
+        return self::renderOutcome($response, $outcome);
+    }
+
+    public function tick(ServerRequestInterface $request, ResponseInterface $response): ResponseInterface
+    {
+        $job = $this->registry->get(TickJob::NAME);
+        $outcome = $this->runner->run($job, [], 'schedule');
+
+        return self::renderOutcome($response, $outcome);
+    }
+
+    public function refreshGeoip(ServerRequestInterface $request, ResponseInterface $response): ResponseInterface
+    {
+        $response = $response->withStatus(412)->withHeader('Content-Type', 'application/json');
+        $response->getBody()->write((string) json_encode([
+            'error' => 'not_implemented',
+            'reason' => 'GeoIP refresh lands in M11',
+        ]));
+
+        return $response;
+    }
+
+    public function status(ServerRequestInterface $request, ResponseInterface $response): ResponseInterface
+    {
+        $now = $this->clock->now();
+        $latest = $this->runs->latestPerJob();
+
+        $jobs = [];
+        foreach ($this->registry->all() as $name => $job) {
+            $row = $latest[$name] ?? null;
+
+            $finishedAt = $row['finished_at'] ?? null;
+            $overdue = $row === null
+                || ($finishedAt instanceof \DateTimeImmutable
+                    && ($now->getTimestamp() - $finishedAt->getTimestamp()) > $job->defaultIntervalSeconds());
+
+            $jobs[$name] = [
+                'name' => $name,
+                'default_interval_seconds' => $job->defaultIntervalSeconds(),
+                'max_runtime_seconds' => $job->maxRuntimeSeconds(),
+                'overdue' => $overdue,
+                'lock' => $this->locks->status($name),
+                'last_run' => $row === null ? null : [
+                    'id' => $row['id'],
+                    'status' => $row['status'],
+                    'items_processed' => $row['items_processed'],
+                    'triggered_by' => $row['triggered_by'],
+                    'started_at' => self::formatTs($row['started_at']),
+                    'finished_at' => self::formatTs($row['finished_at']),
+                    'error_message' => $row['error_message'],
+                ],
+            ];
+        }
+
+        $response = $response->withStatus(200)->withHeader('Content-Type', 'application/json');
+        $response->getBody()->write((string) json_encode([
+            'now' => $now->format('Y-m-d\TH:i:s\Z'),
+            'jobs' => $jobs,
+        ]));
+
+        return $response;
+    }
+
+    private static function renderOutcome(ResponseInterface $response, JobOutcome $outcome): ResponseInterface
+    {
+        $response = $response
+            ->withStatus($outcome->httpStatus())
+            ->withHeader('Content-Type', 'application/json');
+        $response->getBody()->write((string) json_encode($outcome->toArray()));
+
+        return $response;
+    }
+
+    /**
+     * @return array<string, mixed>
+     */
+    private static function jsonBody(ServerRequestInterface $request): array
+    {
+        $parsed = $request->getParsedBody();
+        if (is_array($parsed)) {
+            return $parsed;
+        }
+        $raw = (string) $request->getBody();
+        if ($raw === '') {
+            return [];
+        }
+        $decoded = json_decode($raw, true);
+
+        return is_array($decoded) ? $decoded : [];
+    }
+
+    private static function formatTs(mixed $value): ?string
+    {
+        if (!$value instanceof \DateTimeImmutable) {
+            return null;
+        }
+
+        return $value->format('Y-m-d\TH:i:s\Z');
+    }
+}

+ 57 - 0
api/src/Application/Jobs/CleanupAuditJob.php

@@ -0,0 +1,57 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Application\Jobs;
+
+use App\Domain\Jobs\Job;
+use App\Domain\Jobs\JobContext;
+use App\Domain\Jobs\JobResult;
+use Doctrine\DBAL\Connection;
+
+/**
+ * Prunes `audit_log` rows older than `JOB_AUDIT_RETENTION_DAYS`.
+ *
+ * The audit emitter lands in M12; until then the table will be empty and
+ * this job will routinely return 0 items processed. Running it now keeps
+ * the schedule honest and surfaces any DB connectivity issues early.
+ */
+final class CleanupAuditJob implements Job
+{
+    public const NAME = 'cleanup-audit';
+
+    public function __construct(
+        private readonly Connection $connection,
+        private readonly int $retentionDays = 180,
+    ) {
+    }
+
+    public function name(): string
+    {
+        return self::NAME;
+    }
+
+    public function defaultIntervalSeconds(): int
+    {
+        return 86400;
+    }
+
+    public function maxRuntimeSeconds(): int
+    {
+        return 60;
+    }
+
+    public function run(JobContext $context): JobResult
+    {
+        $cutoff = $context->clock->now()->modify(sprintf('-%d days', $this->retentionDays));
+        $deleted = (int) $this->connection->executeStatement(
+            'DELETE FROM audit_log WHERE created_at < :cutoff',
+            ['cutoff' => $cutoff->format('Y-m-d H:i:s')],
+        );
+
+        return new JobResult(
+            itemsProcessed: $deleted,
+            details: ['retention_days' => $this->retentionDays],
+        );
+    }
+}

+ 42 - 0
api/src/Application/Jobs/EnrichPendingJob.php

@@ -0,0 +1,42 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Application\Jobs;
+
+use App\Domain\Jobs\Job;
+use App\Domain\Jobs\JobContext;
+use App\Domain\Jobs\JobResult;
+
+/**
+ * Skeleton for GeoIP/ASN enrichment. M11 wires in the MaxMind reader,
+ * batches IPs missing enrichment, and writes `ip_enrichment`. Until then
+ * this is a no-op that logs at debug level so the schedule is exercised
+ * end-to-end.
+ */
+final class EnrichPendingJob implements Job
+{
+    public const NAME = 'enrich-pending';
+
+    public function name(): string
+    {
+        return self::NAME;
+    }
+
+    public function defaultIntervalSeconds(): int
+    {
+        return 300;
+    }
+
+    public function maxRuntimeSeconds(): int
+    {
+        return 60;
+    }
+
+    public function run(JobContext $context): JobResult
+    {
+        $context->logger->debug('enrich_pending_skeleton', ['note' => 'M11 will fill this in']);
+
+        return new JobResult(itemsProcessed: 0, details: ['skeleton' => true]);
+    }
+}

+ 192 - 0
api/src/Application/Jobs/RecomputeScoresJob.php

@@ -0,0 +1,192 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Application\Jobs;
+
+use App\Domain\Jobs\Job;
+use App\Domain\Jobs\JobContext;
+use App\Domain\Jobs\JobResult;
+use App\Domain\Reputation\PairScorer;
+use App\Infrastructure\Reputation\IpScoreRepository;
+use App\Infrastructure\Reputation\ReportRepository;
+
+/**
+ * Bulk recompute of `ip_scores`. Two modes:
+ *
+ *  - **Incremental** (default): pairs touched by `reports` since the last
+ *    interval, UNION pairs in `ip_scores` whose `recomputed_at` is older
+ *    than the freshness window. Capped at `JOB_RECOMPUTE_MAX_ROWS_PER_TICK`
+ *    so a single tick can't run forever.
+ *
+ *  - **Full** (`full=true`): every pair in `ip_scores` plus every pair in
+ *    `reports`. No row cap; the job's `maxRuntimeSeconds` is the only
+ *    bound. Used by the "Rebuild scores" UI button and the CLI
+ *    `scores:rebuild`.
+ *
+ * After scoring it drops rows where `score < 0.01 AND last_report_at < now
+ * - 90 days` per SPEC §5.
+ */
+final class RecomputeScoresJob implements Job
+{
+    public const NAME = 'recompute-scores';
+
+    private const FRESHNESS_WINDOW_SECONDS = 3600;
+    private const INCREMENTAL_LOOKBACK_SECONDS = 600;
+    private const DROP_MIN_SCORE = 0.01;
+    private const DROP_STALE_DAYS = 90;
+
+    public function __construct(
+        private readonly ReportRepository $reports,
+        private readonly IpScoreRepository $ipScores,
+        private readonly PairScorer $scorer,
+        private readonly int $maxRuntimeSeconds = 240,
+        private readonly int $maxRowsPerTick = 5000,
+    ) {
+    }
+
+    public function name(): string
+    {
+        return self::NAME;
+    }
+
+    public function defaultIntervalSeconds(): int
+    {
+        return 300;
+    }
+
+    public function maxRuntimeSeconds(): int
+    {
+        return $this->maxRuntimeSeconds;
+    }
+
+    public function run(JobContext $context): JobResult
+    {
+        $now = $context->clock->now();
+        $full = (bool) $context->param('full', false);
+        $maxRowsParam = $context->param('max_rows');
+        $maxRows = is_int($maxRowsParam) && $maxRowsParam > 0
+            ? $maxRowsParam
+            : $this->maxRowsPerTick;
+
+        if ($full) {
+            $pairs = $this->collectAllPairs();
+        } else {
+            $pairs = $this->collectIncrementalPairs($now, $maxRows);
+        }
+
+        $processed = 0;
+        $deadline = $now->modify(sprintf('+%d seconds', max(1, $this->maxRuntimeSeconds - 5)));
+
+        foreach ($pairs as $pair) {
+            // Time-bound full mode (no row cap). Incremental mode is also
+            // protected — `pairs` is already capped by maxRows, but a
+            // very slow DB could still exceed runtime.
+            $heartbeat = $context->clock->now();
+            if ($heartbeat >= $deadline) {
+                $context->logger->warning('recompute_scores_deadline', [
+                    'processed' => $processed,
+                    'remaining' => count($pairs) - $processed,
+                ]);
+                break;
+            }
+
+            $score = $this->scorer->score($pair['ip_bin'], $pair['category_id'], $now);
+            $count30d = $this->reports->recentCount(
+                $pair['ip_bin'],
+                $pair['category_id'],
+                $now->modify('-30 days'),
+            );
+            // Use the actual most-recent report's received_at so the
+            // drop-stale rule can fire correctly. If a pair has no reports
+            // at all (only an orphan ip_scores row), back-date enough that
+            // the drop step prunes it on this same run.
+            $lastReportAt = $this->reports->lastReceivedAt(
+                $pair['ip_bin'],
+                $pair['category_id'],
+            ) ?? $now->modify('-366 days');
+
+            $this->ipScores->upsert(
+                ipBin: $pair['ip_bin'],
+                ipText: $pair['ip_text'],
+                categoryId: $pair['category_id'],
+                score: $score,
+                reportCount30d: $count30d,
+                lastReportAt: $lastReportAt,
+                recomputedAt: $now,
+            );
+            ++$processed;
+        }
+
+        $dropped = $this->ipScores->dropStale(
+            self::DROP_MIN_SCORE,
+            $now->modify(sprintf('-%d days', self::DROP_STALE_DAYS)),
+        );
+
+        return new JobResult(
+            itemsProcessed: $processed,
+            details: [
+                'mode' => $full ? 'full' : 'incremental',
+                'dropped' => $dropped,
+                'considered' => count($pairs),
+            ],
+        );
+    }
+
+    /**
+     * @return list<array{ip_bin: string, ip_text: string, category_id: int}>
+     */
+    private function collectAllPairs(): array
+    {
+        $all = $this->ipScores->allPairs();
+        $reports = $this->reports->distinctPairsSince(
+            new \DateTimeImmutable('@0', new \DateTimeZone('UTC')),
+        );
+
+        return self::dedup(array_merge($all, $reports));
+    }
+
+    /**
+     * @return list<array{ip_bin: string, ip_text: string, category_id: int}>
+     */
+    private function collectIncrementalPairs(\DateTimeImmutable $now, int $maxRows): array
+    {
+        $touched = $this->reports->distinctPairsSince(
+            $now->modify(sprintf('-%d seconds', self::INCREMENTAL_LOOKBACK_SECONDS)),
+            $maxRows,
+        );
+        $stale = $this->ipScores->pairsStaleBefore(
+            $now->modify(sprintf('-%d seconds', self::FRESHNESS_WINDOW_SECONDS)),
+        );
+
+        $merged = self::dedup(array_merge($touched, $stale));
+        if (count($merged) > $maxRows) {
+            $merged = array_slice($merged, 0, $maxRows);
+        }
+
+        return $merged;
+    }
+
+    /**
+     * Stable dedup keyed by (ip_bin, category_id). Preserves first-seen
+     * order so callers can reason about cap behavior.
+     *
+     * @param list<array{ip_bin: string, ip_text: string, category_id: int}> $pairs
+     * @return list<array{ip_bin: string, ip_text: string, category_id: int}>
+     */
+    private static function dedup(array $pairs): array
+    {
+        $seen = [];
+        $out = [];
+        foreach ($pairs as $p) {
+            $key = bin2hex($p['ip_bin']) . '|' . $p['category_id'];
+            if (isset($seen[$key])) {
+                continue;
+            }
+            $seen[$key] = true;
+            $out[] = $p;
+        }
+
+        return $out;
+    }
+}

+ 133 - 0
api/src/Application/Jobs/TickJob.php

@@ -0,0 +1,133 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Application\Jobs;
+
+use App\Domain\Jobs\Job;
+use App\Domain\Jobs\JobContext;
+use App\Domain\Jobs\JobResult;
+use App\Domain\Jobs\JobStatus;
+use App\Infrastructure\Jobs\JobRunner;
+use App\Infrastructure\Jobs\JobRunRepository;
+use Closure;
+use Throwable;
+
+/**
+ * The cron-friendly dispatcher. Iterates the registry and invokes any job
+ * whose `defaultIntervalSeconds` has elapsed since its last finished run.
+ *
+ * Tick is itself a Job so its invocation produces a `job_runs` row with
+ * `job_name = 'tick'`. The lock prevents two scheduler ticks racing if a
+ * previous tick is still wading through child jobs.
+ *
+ * Per-job exceptions are caught and recorded — one bad job must not abort
+ * the dispatcher.
+ */
+final class TickJob implements Job
+{
+    public const NAME = 'tick';
+
+    /**
+     * @param Closure(): iterable<string, Job> $jobsResolver Returns the
+     *     registered jobs at run time. The closure indirection breaks a
+     *     construction-time cycle between TickJob and JobRegistry (the
+     *     registry holds tick; tick iterates the registry).
+     */
+    public function __construct(
+        private readonly Closure $jobsResolver,
+        private readonly JobRunner $runner,
+        private readonly JobRunRepository $runs,
+    ) {
+    }
+
+    public function name(): string
+    {
+        return self::NAME;
+    }
+
+    public function defaultIntervalSeconds(): int
+    {
+        // Tick runs by external schedule (cron / sidecar). The dispatcher
+        // never asks "is tick due?" against this value — it's surfaced
+        // only for /internal/jobs/status display.
+        return 60;
+    }
+
+    public function maxRuntimeSeconds(): int
+    {
+        return 300;
+    }
+
+    public function run(JobContext $context): JobResult
+    {
+        $now = $context->clock->now();
+        $latest = $this->runs->latestPerJob();
+
+        $invoked = 0;
+        $invokedNames = [];
+        $jobs = ($this->jobsResolver)();
+        foreach ($jobs as $name => $job) {
+            if ($name === self::NAME) {
+                continue; // never re-enter ourselves
+            }
+
+            $row = $latest[$name] ?? null;
+            if (!self::isDue($job, $row, $now)) {
+                continue;
+            }
+
+            try {
+                $outcome = $this->runner->run($job, [], 'schedule');
+                $invokedNames[] = $name;
+                ++$invoked;
+
+                if ($outcome->status === JobStatus::Failure) {
+                    $context->logger->warning('tick_child_failed', [
+                        'job' => $name,
+                        'run_id' => $outcome->runId,
+                        'error' => $outcome->errorMessage,
+                    ]);
+                }
+            } catch (Throwable $e) {
+                // JobRunner already maps Throwables to JobOutcome::Failure;
+                // this catches anything that escapes (eg. lock-repo bug)
+                // so one rotten job can't take the tick down.
+                $context->logger->error('tick_child_threw', [
+                    'job' => $name,
+                    'error' => $e->getMessage(),
+                ]);
+            }
+        }
+
+        return new JobResult(
+            itemsProcessed: $invoked,
+            details: ['invoked' => $invokedNames],
+        );
+    }
+
+    /**
+     * @param array{
+     *     id?: int,
+     *     job_name?: string,
+     *     status?: string,
+     *     finished_at?: ?\DateTimeImmutable,
+     *     started_at?: ?\DateTimeImmutable
+     * }|null $latestRow
+     */
+    private static function isDue(Job $job, ?array $latestRow, \DateTimeImmutable $now): bool
+    {
+        if ($latestRow === null) {
+            return true;
+        }
+
+        $finished = $latestRow['finished_at'] ?? $latestRow['started_at'] ?? null;
+        if (!$finished instanceof \DateTimeImmutable) {
+            // running row with no finish; conservatively treat as still
+            // in flight, don't pile on.
+            return false;
+        }
+
+        return ($now->getTimestamp() - $finished->getTimestamp()) >= $job->defaultIntervalSeconds();
+    }
+}

+ 24 - 0
api/src/Domain/Jobs/Job.php

@@ -0,0 +1,24 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Domain\Jobs;
+
+/**
+ * A periodic batch unit of work. Implementations declare scheduling
+ * metadata and a single `run()` method; `JobRunner` wraps invocation with
+ * lock acquisition, run-history bookkeeping, and exception capture.
+ *
+ * Concrete jobs live under `App\Application\Jobs\…` (recompute-scores,
+ * cleanup-audit, enrich-pending) plus the `tick` dispatcher.
+ */
+interface Job
+{
+    public function name(): string;
+
+    public function defaultIntervalSeconds(): int;
+
+    public function maxRuntimeSeconds(): int;
+
+    public function run(JobContext $context): JobResult;
+}

+ 33 - 0
api/src/Domain/Jobs/JobContext.php

@@ -0,0 +1,33 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Domain\Jobs;
+
+use App\Domain\Time\Clock;
+use Psr\Log\LoggerInterface;
+
+/**
+ * Per-invocation state passed into Job::run().
+ *
+ * Carries the clock + logger plus arbitrary params (e.g. `full=true` for
+ * RecomputeScoresJob). `param()` is the only safe way to read params —
+ * defaults handle missing keys without forcing every job to validate them.
+ */
+final class JobContext
+{
+    /**
+     * @param array<string, mixed> $params
+     */
+    public function __construct(
+        public readonly Clock $clock,
+        public readonly LoggerInterface $logger,
+        public readonly array $params = [],
+    ) {
+    }
+
+    public function param(string $key, mixed $default = null): mixed
+    {
+        return $this->params[$key] ?? $default;
+    }
+}

+ 57 - 0
api/src/Domain/Jobs/JobOutcome.php

@@ -0,0 +1,57 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Domain\Jobs;
+
+/**
+ * What `JobRunner::run()` returns. Carries everything the HTTP envelope or
+ * CLI output needs:
+ *
+ *   {"job":"recompute-scores","status":"success","items_processed":42,
+ *    "duration_ms":81,"run_id":17}
+ *
+ * `errorMessage` is non-null only on `failure`. `runId` is the
+ * `job_runs.id` of the final row.
+ */
+final class JobOutcome
+{
+    public function __construct(
+        public readonly string $jobName,
+        public readonly JobStatus $status,
+        public readonly int $itemsProcessed,
+        public readonly int $durationMs,
+        public readonly int $runId,
+        public readonly ?string $errorMessage = null,
+    ) {
+    }
+
+    public function httpStatus(): int
+    {
+        return match ($this->status) {
+            JobStatus::Success => 200,
+            JobStatus::Failure => 500,
+            JobStatus::SkippedLocked => 409,
+            JobStatus::Running => 202,
+        };
+    }
+
+    /**
+     * @return array<string, mixed>
+     */
+    public function toArray(): array
+    {
+        $out = [
+            'job' => $this->jobName,
+            'status' => $this->status->value,
+            'items_processed' => $this->itemsProcessed,
+            'duration_ms' => $this->durationMs,
+            'run_id' => $this->runId,
+        ];
+        if ($this->errorMessage !== null) {
+            $out['error'] = $this->errorMessage;
+        }
+
+        return $out;
+    }
+}

+ 25 - 0
api/src/Domain/Jobs/JobResult.php

@@ -0,0 +1,25 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Domain\Jobs;
+
+/**
+ * What a Job returns after a successful run. Failures throw instead.
+ *
+ * `details` is freeform per-job data the runner persists into
+ * `job_runs.error_message` only on failure; on success it's dropped after
+ * being included in the HTTP response envelope (debug-friendly, never
+ * security-sensitive).
+ */
+final class JobResult
+{
+    /**
+     * @param array<string, mixed> $details
+     */
+    public function __construct(
+        public readonly int $itemsProcessed,
+        public readonly array $details = [],
+    ) {
+    }
+}

+ 18 - 0
api/src/Domain/Jobs/JobStatus.php

@@ -0,0 +1,18 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Domain\Jobs;
+
+/**
+ * Terminal status of a job invocation. `Running` is the in-flight value
+ * `JobRunner` writes before the body executes; the final row always carries
+ * one of the other three.
+ */
+enum JobStatus: string
+{
+    case Running = 'running';
+    case Success = 'success';
+    case Failure = 'failure';
+    case SkippedLocked = 'skipped_locked';
+}

+ 94 - 0
api/src/Infrastructure/Http/Middleware/InternalNetworkMiddleware.php

@@ -0,0 +1,94 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Infrastructure\Http\Middleware;
+
+use App\Domain\Ip\Cidr;
+use App\Domain\Ip\InvalidIpException;
+use App\Domain\Ip\IpAddress;
+use Psr\Http\Message\ResponseFactoryInterface;
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+use Psr\Http\Server\MiddlewareInterface;
+use Psr\Http\Server\RequestHandlerInterface;
+
+/**
+ * Belt-and-suspenders network gate for `/internal/*`. Caddyfile already
+ * 404s public-IP sources before the request reaches PHP, but this
+ * middleware enforces the same rule when Slim is invoked directly (tests,
+ * misconfigurations, future HTTP servers).
+ *
+ * Returns `404` rather than `403` so external observers can't tell the
+ * endpoint exists. RFC1918 + loopback are allowed; everything else is
+ * silently dropped.
+ */
+final class InternalNetworkMiddleware implements MiddlewareInterface
+{
+    /** @var list<string> */
+    private const ALLOWED_CIDRS = [
+        '127.0.0.1/32',
+        '::1/128',
+        '10.0.0.0/8',
+        '172.16.0.0/12',
+        '192.168.0.0/16',
+    ];
+
+    /** @var list<Cidr> */
+    private array $cidrs;
+
+    public function __construct(private readonly ResponseFactoryInterface $responseFactory)
+    {
+        $this->cidrs = array_map(
+            static fn (string $c): Cidr => Cidr::fromString($c),
+            self::ALLOWED_CIDRS,
+        );
+    }
+
+    public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
+    {
+        $remote = self::remoteAddr($request);
+        if ($remote === null || !$this->isAllowed($remote)) {
+            return $this->notFound();
+        }
+
+        return $handler->handle($request);
+    }
+
+    private function isAllowed(string $remote): bool
+    {
+        try {
+            $ip = IpAddress::fromString($remote);
+        } catch (InvalidIpException) {
+            return false;
+        }
+
+        foreach ($this->cidrs as $cidr) {
+            if ($cidr->contains($ip)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private static function remoteAddr(ServerRequestInterface $request): ?string
+    {
+        $params = $request->getServerParams();
+        $value = $params['REMOTE_ADDR'] ?? null;
+        if (!is_string($value) || $value === '') {
+            return null;
+        }
+
+        return $value;
+    }
+
+    private function notFound(): ResponseInterface
+    {
+        $response = $this->responseFactory->createResponse(404)
+            ->withHeader('Content-Type', 'application/json');
+        $response->getBody()->write((string) json_encode(['error' => 'not_found']));
+
+        return $response;
+    }
+}

+ 60 - 0
api/src/Infrastructure/Http/Middleware/InternalTokenMiddleware.php

@@ -0,0 +1,60 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Infrastructure\Http\Middleware;
+
+use Psr\Http\Message\ResponseFactoryInterface;
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+use Psr\Http\Server\MiddlewareInterface;
+use Psr\Http\Server\RequestHandlerInterface;
+
+/**
+ * Validates `Authorization: Bearer <INTERNAL_JOB_TOKEN>` for `/internal/*`.
+ * Uses `hash_equals` to avoid timing leaks. Mismatch returns 401 with the
+ * same uniform error envelope used by the public token middleware.
+ *
+ * Network gate runs before this — by the time we get here, the source is
+ * already trusted to be RFC1918/loopback.
+ *
+ * If `INTERNAL_JOB_TOKEN` is empty (the env var was forgotten), this
+ * middleware refuses every request — better to fail closed than to expose
+ * the endpoints unauthenticated to anything inside the docker network.
+ */
+final class InternalTokenMiddleware implements MiddlewareInterface
+{
+    public function __construct(
+        private readonly ResponseFactoryInterface $responseFactory,
+        private readonly string $expectedToken,
+    ) {
+    }
+
+    public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
+    {
+        if ($this->expectedToken === '') {
+            return $this->unauthorized();
+        }
+
+        $header = $request->getHeaderLine('Authorization');
+        if ($header === '' || stripos($header, 'Bearer ') !== 0) {
+            return $this->unauthorized();
+        }
+
+        $presented = trim(substr($header, 7));
+        if (!hash_equals($this->expectedToken, $presented)) {
+            return $this->unauthorized();
+        }
+
+        return $handler->handle($request);
+    }
+
+    private function unauthorized(): ResponseInterface
+    {
+        $response = $this->responseFactory->createResponse(401)
+            ->withHeader('Content-Type', 'application/json');
+        $response->getBody()->write((string) json_encode(['error' => 'unauthorized']));
+
+        return $response;
+    }
+}

+ 109 - 0
api/src/Infrastructure/Jobs/JobLockRepository.php

@@ -0,0 +1,109 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Infrastructure\Jobs;
+
+use DateTimeImmutable;
+use Doctrine\DBAL\Connection;
+use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
+use Throwable;
+
+/**
+ * Atomic acquire/release for `job_locks`. The same SQL works on SQLite and
+ * MySQL because the locking effect comes from the `job_name` PRIMARY KEY:
+ * a second concurrent INSERT raises a unique-constraint violation and we
+ * map that to "lock held".
+ *
+ * Pattern (per SPEC §4 / M05):
+ *   1. BEGIN
+ *   2. DELETE rows where expires_at < now   ← reclaims crashed-process locks
+ *   3. INSERT (job_name, …)                 ← fails on PK collision if held
+ *   4. COMMIT
+ *
+ * Crash recovery works because `expires_at = now + maxRuntime + buffer`. A
+ * process that dies before `release()` leaves a stale row that the next
+ * caller's step-2 sweep removes before re-attempting the insert.
+ */
+final class JobLockRepository
+{
+    public function __construct(private readonly Connection $connection)
+    {
+    }
+
+    public function tryAcquire(
+        string $jobName,
+        DateTimeImmutable $now,
+        DateTimeImmutable $expiresAt,
+        string $owner,
+    ): bool {
+        try {
+            $this->connection->beginTransaction();
+            $this->connection->executeStatement(
+                'DELETE FROM job_locks WHERE expires_at < :now',
+                ['now' => $now->format('Y-m-d H:i:s')],
+            );
+            $this->connection->executeStatement(
+                'INSERT INTO job_locks (job_name, acquired_by, acquired_at, expires_at) '
+                . 'VALUES (:job, :owner, :acquired, :expires)',
+                [
+                    'job' => $jobName,
+                    'owner' => $owner,
+                    'acquired' => $now->format('Y-m-d H:i:s'),
+                    'expires' => $expiresAt->format('Y-m-d H:i:s'),
+                ],
+            );
+            $this->connection->commit();
+
+            return true;
+        } catch (UniqueConstraintViolationException) {
+            if ($this->connection->isTransactionActive()) {
+                $this->connection->rollBack();
+            }
+
+            return false;
+        } catch (Throwable $e) {
+            if ($this->connection->isTransactionActive()) {
+                $this->connection->rollBack();
+            }
+
+            throw $e;
+        }
+    }
+
+    /**
+     * Releases the lock only if `owner` matches — defensive: a process that
+     * overran its `expires_at` and then tries to release shouldn't kick out
+     * a freshly-acquired lock held by a different owner.
+     */
+    public function release(string $jobName, string $owner): void
+    {
+        $this->connection->executeStatement(
+            'DELETE FROM job_locks WHERE job_name = :job AND acquired_by = :owner',
+            ['job' => $jobName, 'owner' => $owner],
+        );
+    }
+
+    /**
+     * Read-only snapshot for status reporting. Returns null if not held.
+     *
+     * @return array{acquired_by: string, acquired_at: string, expires_at: string}|null
+     */
+    public function status(string $jobName): ?array
+    {
+        /** @var array<string, mixed>|false $row */
+        $row = $this->connection->fetchAssociative(
+            'SELECT acquired_by, acquired_at, expires_at FROM job_locks WHERE job_name = :job',
+            ['job' => $jobName],
+        );
+        if ($row === false) {
+            return null;
+        }
+
+        return [
+            'acquired_by' => (string) $row['acquired_by'],
+            'acquired_at' => (string) $row['acquired_at'],
+            'expires_at' => (string) $row['expires_at'],
+        ];
+    }
+}

+ 49 - 0
api/src/Infrastructure/Jobs/JobRegistry.php

@@ -0,0 +1,49 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Infrastructure\Jobs;
+
+use App\Domain\Jobs\Job;
+use RuntimeException;
+
+/**
+ * Holds the canonical list of registered jobs. The container fills this on
+ * boot; the JobsController and TickJob look up jobs by name.
+ *
+ * Order of registration matters only for the tick dispatcher's iteration —
+ * not for correctness, but `tick` will execute due jobs in registration
+ * order.
+ */
+final class JobRegistry
+{
+    /** @var array<string, Job> */
+    private array $jobs = [];
+
+    public function register(Job $job): void
+    {
+        $this->jobs[$job->name()] = $job;
+    }
+
+    public function get(string $name): Job
+    {
+        if (!isset($this->jobs[$name])) {
+            throw new RuntimeException(sprintf('Unknown job: %s', $name));
+        }
+
+        return $this->jobs[$name];
+    }
+
+    public function has(string $name): bool
+    {
+        return isset($this->jobs[$name]);
+    }
+
+    /**
+     * @return array<string, Job>
+     */
+    public function all(): array
+    {
+        return $this->jobs;
+    }
+}

+ 163 - 0
api/src/Infrastructure/Jobs/JobRunRepository.php

@@ -0,0 +1,163 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Infrastructure\Jobs;
+
+use App\Domain\Jobs\JobStatus;
+use DateTimeImmutable;
+use DateTimeZone;
+use Doctrine\DBAL\Connection;
+
+/**
+ * Append + update for `job_runs`. The runner writes a `running` row at the
+ * start (so a crashed process leaves visible debris) and updates it to a
+ * final status when the body completes — successful, failed, or
+ * `skipped_locked` if the lock was already held.
+ */
+final class JobRunRepository
+{
+    public function __construct(private readonly Connection $connection)
+    {
+    }
+
+    public function startRun(
+        string $jobName,
+        DateTimeImmutable $startedAt,
+        string $triggeredBy,
+    ): int {
+        $this->connection->insert('job_runs', [
+            'job_name' => $jobName,
+            'status' => JobStatus::Running->value,
+            'items_processed' => 0,
+            'error_message' => null,
+            'triggered_by' => $triggeredBy,
+            'started_at' => $startedAt->format('Y-m-d H:i:s'),
+            'finished_at' => null,
+        ]);
+
+        return (int) $this->connection->lastInsertId();
+    }
+
+    public function finishRun(
+        int $runId,
+        JobStatus $status,
+        int $itemsProcessed,
+        DateTimeImmutable $finishedAt,
+        ?string $errorMessage = null,
+    ): void {
+        $this->connection->update(
+            'job_runs',
+            [
+                'status' => $status->value,
+                'items_processed' => $itemsProcessed,
+                'finished_at' => $finishedAt->format('Y-m-d H:i:s'),
+                'error_message' => $errorMessage,
+            ],
+            ['id' => $runId],
+        );
+    }
+
+    /**
+     * Append a single finalised row in one statement — used for the
+     * `skipped_locked` path where there's nothing to update because no
+     * `running` row was written.
+     */
+    public function recordCompleted(
+        string $jobName,
+        JobStatus $status,
+        int $itemsProcessed,
+        DateTimeImmutable $startedAt,
+        DateTimeImmutable $finishedAt,
+        string $triggeredBy,
+        ?string $errorMessage = null,
+    ): int {
+        $this->connection->insert('job_runs', [
+            'job_name' => $jobName,
+            'status' => $status->value,
+            'items_processed' => $itemsProcessed,
+            'error_message' => $errorMessage,
+            'triggered_by' => $triggeredBy,
+            'started_at' => $startedAt->format('Y-m-d H:i:s'),
+            'finished_at' => $finishedAt->format('Y-m-d H:i:s'),
+        ]);
+
+        return (int) $this->connection->lastInsertId();
+    }
+
+    /**
+     * Latest row per job name. Used by /internal/jobs/status and by the
+     * tick dispatcher to decide which jobs are due.
+     *
+     * @return array<string, array{
+     *     id: int,
+     *     job_name: string,
+     *     status: string,
+     *     items_processed: int,
+     *     error_message: ?string,
+     *     triggered_by: string,
+     *     started_at: ?DateTimeImmutable,
+     *     finished_at: ?DateTimeImmutable
+     * }>
+     */
+    public function latestPerJob(): array
+    {
+        // Two-step: the inner query finds the latest started_at per
+        // job_name; the outer joins back to grab the row. Works on both
+        // SQLite and MySQL without window functions.
+        /** @var list<array<string, mixed>> $rows */
+        $rows = $this->connection->fetchAllAssociative(
+            'SELECT r.id, r.job_name, r.status, r.items_processed, r.error_message, '
+            . 'r.triggered_by, r.started_at, r.finished_at '
+            . 'FROM job_runs r '
+            . 'INNER JOIN ('
+            . '  SELECT job_name, MAX(started_at) AS m FROM job_runs GROUP BY job_name'
+            . ') x ON x.job_name = r.job_name AND x.m = r.started_at',
+        );
+
+        $out = [];
+        foreach ($rows as $row) {
+            $name = (string) $row['job_name'];
+            $out[$name] = [
+                'id' => (int) $row['id'],
+                'job_name' => $name,
+                'status' => (string) $row['status'],
+                'items_processed' => (int) $row['items_processed'],
+                'error_message' => $row['error_message'] !== null ? (string) $row['error_message'] : null,
+                'triggered_by' => (string) $row['triggered_by'],
+                'started_at' => self::parseTs($row['started_at']),
+                'finished_at' => self::parseTs($row['finished_at']),
+            ];
+        }
+
+        return $out;
+    }
+
+    /**
+     * @return array{
+     *     id: int,
+     *     job_name: string,
+     *     status: string,
+     *     items_processed: int,
+     *     error_message: ?string,
+     *     triggered_by: string,
+     *     started_at: ?DateTimeImmutable,
+     *     finished_at: ?DateTimeImmutable
+     * }|null
+     */
+    public function latestForJob(string $jobName): ?array
+    {
+        $latest = $this->latestPerJob();
+
+        return $latest[$jobName] ?? null;
+    }
+
+    private static function parseTs(mixed $value): ?DateTimeImmutable
+    {
+        if ($value === null || $value === '') {
+            return null;
+        }
+
+        return new DateTimeImmutable((string) $value, new DateTimeZone('UTC'));
+    }
+}

+ 116 - 0
api/src/Infrastructure/Jobs/JobRunner.php

@@ -0,0 +1,116 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Infrastructure\Jobs;
+
+use App\Domain\Jobs\Job;
+use App\Domain\Jobs\JobContext;
+use App\Domain\Jobs\JobOutcome;
+use App\Domain\Jobs\JobStatus;
+use App\Domain\Time\Clock;
+use Psr\Log\LoggerInterface;
+use Throwable;
+
+/**
+ * Orchestrates a single job invocation:
+ *  - Tries to acquire the job_locks row; on collision returns
+ *    `skipped_locked` (still writes a `job_runs` row so the failure is
+ *    auditable).
+ *  - On acquisition: writes a `running` row, runs the job, captures
+ *    success or exception, updates the row, releases the lock.
+ *  - Returns a `JobOutcome` describing what happened.
+ *
+ * Each invocation gets a unique `owner` string (`pid/random_hex`) so
+ * `release()` can refuse to delete a lock that's been re-acquired by a
+ * different process after an `expires_at` reclaim.
+ */
+final class JobRunner
+{
+    public function __construct(
+        private readonly JobLockRepository $locks,
+        private readonly JobRunRepository $runs,
+        private readonly Clock $clock,
+        private readonly LoggerInterface $logger,
+        private readonly int $lockBufferSeconds = 30,
+    ) {
+    }
+
+    /**
+     * @param array<string, mixed> $params
+     */
+    public function run(Job $job, array $params, string $triggeredBy): JobOutcome
+    {
+        $owner = sprintf('%d/%s', getmypid() ?: 0, bin2hex(random_bytes(4)));
+        $start = $this->clock->now();
+        $expires = $start->modify(sprintf('+%d seconds', $job->maxRuntimeSeconds() + $this->lockBufferSeconds));
+
+        if (!$this->locks->tryAcquire($job->name(), $start, $expires, $owner)) {
+            $finish = $this->clock->now();
+            $runId = $this->runs->recordCompleted(
+                $job->name(),
+                JobStatus::SkippedLocked,
+                0,
+                $start,
+                $finish,
+                $triggeredBy,
+                'lock held by another worker',
+            );
+
+            return new JobOutcome(
+                jobName: $job->name(),
+                status: JobStatus::SkippedLocked,
+                itemsProcessed: 0,
+                durationMs: self::elapsedMs($start, $finish),
+                runId: $runId,
+                errorMessage: 'lock held by another worker',
+            );
+        }
+
+        $runId = $this->runs->startRun($job->name(), $start, $triggeredBy);
+        $context = new JobContext($this->clock, $this->logger, $params);
+
+        try {
+            $result = $job->run($context);
+            $finish = $this->clock->now();
+            $this->runs->finishRun($runId, JobStatus::Success, $result->itemsProcessed, $finish);
+
+            return new JobOutcome(
+                jobName: $job->name(),
+                status: JobStatus::Success,
+                itemsProcessed: $result->itemsProcessed,
+                durationMs: self::elapsedMs($start, $finish),
+                runId: $runId,
+            );
+        } catch (Throwable $e) {
+            $finish = $this->clock->now();
+            $this->logger->error('job_failed', [
+                'job' => $job->name(),
+                'run_id' => $runId,
+                'error' => $e->getMessage(),
+                'trace' => $e->getTraceAsString(),
+            ]);
+            $this->runs->finishRun($runId, JobStatus::Failure, 0, $finish, $e->getMessage());
+
+            return new JobOutcome(
+                jobName: $job->name(),
+                status: JobStatus::Failure,
+                itemsProcessed: 0,
+                durationMs: self::elapsedMs($start, $finish),
+                runId: $runId,
+                errorMessage: $e->getMessage(),
+            );
+        } finally {
+            $this->locks->release($job->name(), $owner);
+        }
+    }
+
+    private static function elapsedMs(\DateTimeImmutable $start, \DateTimeImmutable $finish): int
+    {
+        $diffMicros = $finish->getTimestamp() - $start->getTimestamp();
+        $diffMicros = $diffMicros * 1_000_000
+            + ((int) $finish->format('u') - (int) $start->format('u'));
+
+        return (int) max(0, intdiv($diffMicros, 1000));
+    }
+}

+ 75 - 0
api/src/Infrastructure/Reputation/IpScoreRepository.php

@@ -67,4 +67,79 @@ final class IpScoreRepository extends RepositoryBase
 
         return $value === false ? null : (float) $value;
     }
+
+    /**
+     * All (ip_bin, ip_text, category_id) currently in `ip_scores` whose
+     * `recomputed_at` is older than `$staleBefore` (NULLs counted as stale
+     * — defensive for older rows). Used by RecomputeScoresJob to refresh
+     * decay on pairs that haven't been touched lately.
+     *
+     * @return list<array{ip_bin: string, ip_text: string, category_id: int}>
+     */
+    public function pairsStaleBefore(DateTimeImmutable $staleBefore): array
+    {
+        $stmt = $this->connection()->prepare(
+            'SELECT ip_bin, ip_text, category_id FROM ip_scores '
+            . 'WHERE recomputed_at IS NULL OR recomputed_at < :before '
+            . 'ORDER BY ip_bin, category_id'
+        );
+        $stmt->bindValue('before', $staleBefore->format('Y-m-d H:i:s'));
+
+        /** @var list<array<string, mixed>> $rows */
+        $rows = $stmt->executeQuery()->fetchAllAssociative();
+        $out = [];
+        foreach ($rows as $row) {
+            $out[] = [
+                'ip_bin' => (string) $row['ip_bin'],
+                'ip_text' => (string) $row['ip_text'],
+                'category_id' => (int) $row['category_id'],
+            ];
+        }
+
+        return $out;
+    }
+
+    /**
+     * Every pair in `ip_scores` — used by full recompute to sweep rows
+     * whose underlying reports may all be older than the cutoff (so they
+     * wouldn't appear in `reports.distinctPairsSince` anymore).
+     *
+     * @return list<array{ip_bin: string, ip_text: string, category_id: int}>
+     */
+    public function allPairs(): array
+    {
+        /** @var list<array<string, mixed>> $rows */
+        $rows = $this->connection()->fetchAllAssociative(
+            'SELECT ip_bin, ip_text, category_id FROM ip_scores ORDER BY ip_bin, category_id'
+        );
+        $out = [];
+        foreach ($rows as $row) {
+            $out[] = [
+                'ip_bin' => (string) $row['ip_bin'],
+                'ip_text' => (string) $row['ip_text'],
+                'category_id' => (int) $row['category_id'],
+            ];
+        }
+
+        return $out;
+    }
+
+    /**
+     * Drops rows whose score is below `$minScore` AND whose
+     * `last_report_at` is older than `$staleBefore`. SPEC §5: prune dead
+     * pairs but keep recently-active ones even if their score temporarily
+     * dipped.
+     *
+     * Returns the number of rows deleted.
+     */
+    public function dropStale(float $minScore, DateTimeImmutable $staleBefore): int
+    {
+        return (int) $this->connection()->executeStatement(
+            'DELETE FROM ip_scores WHERE score < :min AND last_report_at < :before',
+            [
+                'min' => number_format($minScore, 4, '.', ''),
+                'before' => $staleBefore->format('Y-m-d H:i:s'),
+            ],
+        );
+    }
 }

+ 54 - 0
api/src/Infrastructure/Reputation/ReportRepository.php

@@ -89,4 +89,58 @@ final class ReportRepository extends RepositoryBase
 
         return (int) $stmt->executeQuery()->fetchOne();
     }
+
+    /**
+     * Most-recent `received_at` for a pair, ignoring the cutoff window.
+     * Used by RecomputeScoresJob to set `ip_scores.last_report_at`
+     * accurately so the drop-stale rule (score < threshold AND
+     * last_report_at < now − 90 days) fires on truly dead pairs.
+     */
+    public function lastReceivedAt(string $ipBin, int $categoryId): ?DateTimeImmutable
+    {
+        $stmt = $this->connection()->prepare(
+            'SELECT MAX(received_at) FROM reports WHERE ip_bin = :ip AND category_id = :cat'
+        );
+        $stmt->bindValue('ip', $ipBin, ParameterType::LARGE_OBJECT);
+        $stmt->bindValue('cat', $categoryId, ParameterType::INTEGER);
+        $value = $stmt->executeQuery()->fetchOne();
+        if ($value === false || $value === null) {
+            return null;
+        }
+
+        return new DateTimeImmutable((string) $value, new DateTimeZone('UTC'));
+    }
+
+    /**
+     * Distinct (ip_bin, ip_text, category_id) triplets from `reports` whose
+     * `received_at` is at or after `$since`. Used by RecomputeScoresJob in
+     * incremental mode (recently-touched pairs) and full mode (entire
+     * table; pass the epoch).
+     *
+     * @return list<array{ip_bin: string, ip_text: string, category_id: int}>
+     */
+    public function distinctPairsSince(DateTimeImmutable $since, ?int $limit = null): array
+    {
+        $sql = 'SELECT DISTINCT ip_bin, ip_text, category_id FROM reports '
+            . 'WHERE received_at >= :since ORDER BY ip_bin, category_id';
+        if ($limit !== null) {
+            $sql .= ' LIMIT ' . (int) $limit;
+        }
+
+        $stmt = $this->connection()->prepare($sql);
+        $stmt->bindValue('since', $since->format('Y-m-d H:i:s'));
+
+        /** @var list<array<string, mixed>> $rows */
+        $rows = $stmt->executeQuery()->fetchAllAssociative();
+        $out = [];
+        foreach ($rows as $row) {
+            $out[] = [
+                'ip_bin' => (string) $row['ip_bin'],
+                'ip_text' => (string) $row['ip_text'],
+                'category_id' => (int) $row['category_id'],
+            ];
+        }
+
+        return $out;
+    }
 }

+ 198 - 0
api/tests/Integration/Internal/JobsEndpointsTest.php

@@ -0,0 +1,198 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Tests\Integration\Internal;
+
+use App\Tests\Integration\Support\AppTestCase;
+
+/**
+ * End-to-end coverage for `/internal/jobs/*`. The AppTestCase boots a Slim
+ * app whose ServerRequestFactory does NOT populate REMOTE_ADDR — so we
+ * inject one explicitly via the request() helper override below to satisfy
+ * InternalNetworkMiddleware.
+ */
+final class JobsEndpointsTest extends AppTestCase
+{
+    private const TOKEN = 'test-internal-token';
+
+    protected function setUp(): void
+    {
+        // Ensure the container picks up our internal token.
+        putenv('INTERNAL_JOB_TOKEN=' . self::TOKEN);
+        $_ENV['INTERNAL_JOB_TOKEN'] = self::TOKEN;
+        $_SERVER['INTERNAL_JOB_TOKEN'] = self::TOKEN;
+
+        parent::setUp();
+
+        // Override the internal-token middleware in the container so the
+        // running Slim app validates against our test token. The base
+        // AppTestCase wires a settings bundle that doesn't include the
+        // token, so we patch the InternalTokenMiddleware factory directly.
+        if (method_exists($this->container, 'set')) {
+            /** @var \DI\Container $container */
+            $container = $this->container;
+            $container->set(
+                \App\Infrastructure\Http\Middleware\InternalTokenMiddleware::class,
+                new \App\Infrastructure\Http\Middleware\InternalTokenMiddleware(
+                    new \Slim\Psr7\Factory\ResponseFactory(),
+                    self::TOKEN,
+                ),
+            );
+        }
+        // Rebuild the app so it picks up the patched middleware.
+        $this->app = \App\App\AppFactory::build($this->container);
+    }
+
+    public function testTickRequiresToken(): void
+    {
+        $resp = $this->internalRequest('POST', '/internal/jobs/tick', headers: []);
+        self::assertSame(401, $resp->getStatusCode());
+    }
+
+    public function testTickRejectsWrongToken(): void
+    {
+        $resp = $this->internalRequest(
+            'POST',
+            '/internal/jobs/tick',
+            headers: ['Authorization' => 'Bearer wrong'],
+        );
+        self::assertSame(401, $resp->getStatusCode());
+    }
+
+    public function testExternalSourceReturns404OpaquePerSpec(): void
+    {
+        $resp = $this->internalRequest(
+            'POST',
+            '/internal/jobs/tick',
+            headers: ['Authorization' => 'Bearer ' . self::TOKEN],
+            remoteAddr: '8.8.8.8',
+        );
+        self::assertSame(404, $resp->getStatusCode());
+    }
+
+    public function testTickSuccessProducesEnvelope(): void
+    {
+        $resp = $this->internalRequest(
+            'POST',
+            '/internal/jobs/tick',
+            headers: ['Authorization' => 'Bearer ' . self::TOKEN],
+        );
+        self::assertSame(200, $resp->getStatusCode());
+        $body = $this->decode($resp);
+        self::assertSame('tick', $body['job']);
+        self::assertSame('success', $body['status']);
+        self::assertArrayHasKey('run_id', $body);
+        self::assertArrayHasKey('duration_ms', $body);
+    }
+
+    public function testRecomputeSucceedsWithEmptyTables(): void
+    {
+        $resp = $this->internalRequest(
+            'POST',
+            '/internal/jobs/recompute-scores',
+            headers: ['Authorization' => 'Bearer ' . self::TOKEN],
+        );
+        self::assertSame(200, $resp->getStatusCode());
+        $body = $this->decode($resp);
+        self::assertSame('recompute-scores', $body['job']);
+        self::assertSame('success', $body['status']);
+    }
+
+    public function testConcurrentRecomputeProducesOneSuccessOneSkipped(): void
+    {
+        // Pre-acquire the lock to simulate "another worker is busy". The
+        // second HTTP call must come back 409 with skipped_locked.
+        $now = (new \DateTimeImmutable('now', new \DateTimeZone('UTC')));
+        $lockSql = 'INSERT INTO job_locks (job_name, acquired_by, acquired_at, expires_at) '
+            . 'VALUES (:job, :owner, :acquired, :expires)';
+        $this->db->executeStatement($lockSql, [
+            'job' => 'recompute-scores',
+            'owner' => 'OTHER',
+            'acquired' => $now->format('Y-m-d H:i:s'),
+            'expires' => $now->modify('+5 minutes')->format('Y-m-d H:i:s'),
+        ]);
+
+        $resp = $this->internalRequest(
+            'POST',
+            '/internal/jobs/recompute-scores',
+            headers: ['Authorization' => 'Bearer ' . self::TOKEN],
+            body: '{"full": true}',
+        );
+        self::assertSame(409, $resp->getStatusCode());
+        $body = $this->decode($resp);
+        self::assertSame('skipped_locked', $body['status']);
+    }
+
+    public function testRefreshGeoipReturns412(): void
+    {
+        $resp = $this->internalRequest(
+            'POST',
+            '/internal/jobs/refresh-geoip',
+            headers: ['Authorization' => 'Bearer ' . self::TOKEN],
+        );
+        self::assertSame(412, $resp->getStatusCode());
+        $body = $this->decode($resp);
+        self::assertSame('not_implemented', $body['error']);
+    }
+
+    public function testStatusListsAllRegisteredJobs(): void
+    {
+        $resp = $this->internalRequest(
+            'GET',
+            '/internal/jobs/status',
+            headers: ['Authorization' => 'Bearer ' . self::TOKEN],
+        );
+        self::assertSame(200, $resp->getStatusCode());
+        $body = $this->decode($resp);
+        self::assertArrayHasKey('jobs', $body);
+        self::assertArrayHasKey('recompute-scores', $body['jobs']);
+        self::assertArrayHasKey('cleanup-audit', $body['jobs']);
+        self::assertArrayHasKey('enrich-pending', $body['jobs']);
+        self::assertArrayHasKey('tick', $body['jobs']);
+    }
+
+    public function testCleanupAuditSucceedsAndRecordsRun(): void
+    {
+        $resp = $this->internalRequest(
+            'POST',
+            '/internal/jobs/cleanup-audit',
+            headers: ['Authorization' => 'Bearer ' . self::TOKEN],
+        );
+        self::assertSame(200, $resp->getStatusCode());
+        $body = $this->decode($resp);
+        self::assertSame('cleanup-audit', $body['job']);
+        self::assertSame('success', $body['status']);
+
+        $row = $this->db->fetchAssociative(
+            "SELECT triggered_by, status FROM job_runs WHERE job_name = 'cleanup-audit'"
+        );
+        self::assertNotFalse($row);
+        self::assertSame('schedule', $row['triggered_by']);
+        self::assertSame('success', $row['status']);
+    }
+
+    /**
+     * @param array<string, string> $headers
+     */
+    private function internalRequest(
+        string $method,
+        string $path,
+        array $headers = [],
+        ?string $body = null,
+        string $remoteAddr = '127.0.0.1',
+    ): \Psr\Http\Message\ResponseInterface {
+        $factory = new \Slim\Psr7\Factory\ServerRequestFactory();
+        $request = $factory->createServerRequest($method, $path, ['REMOTE_ADDR' => $remoteAddr]);
+        foreach ($headers as $name => $value) {
+            $request = $request->withHeader($name, $value);
+        }
+        if ($body !== null) {
+            $stream = (new \Slim\Psr7\Factory\StreamFactory())->createStream($body);
+            $request = $request->withBody($stream);
+            $request = $request->withHeader('Content-Type', 'application/json');
+        }
+
+        return $this->app->handle($request);
+    }
+}

+ 75 - 0
api/tests/Integration/Jobs/JobLockRepositoryTest.php

@@ -0,0 +1,75 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Tests\Integration\Jobs;
+
+use App\Infrastructure\Jobs\JobLockRepository;
+use App\Tests\Integration\Support\AppTestCase;
+use DateTimeImmutable;
+use DateTimeZone;
+
+/**
+ * Lock semantics tested directly against SQLite. Two scenarios that matter
+ * for SPEC §4 (`tryAcquire`):
+ *  1. Two concurrent acquires → one wins, one returns false.
+ *  2. An expired lock from a crashed prior owner is reclaimed on next
+ *     acquire (the delete-if-expired pre-step inside the transaction).
+ */
+final class JobLockRepositoryTest extends AppTestCase
+{
+    private JobLockRepository $locks;
+
+    protected function setUp(): void
+    {
+        parent::setUp();
+        $this->locks = new JobLockRepository($this->db);
+    }
+
+    public function testAcquireReleaseAcquireRoundTrip(): void
+    {
+        $now = new DateTimeImmutable('2026-04-29T00:00:00Z', new DateTimeZone('UTC'));
+        $expires = $now->modify('+5 minutes');
+
+        self::assertTrue($this->locks->tryAcquire('demo', $now, $expires, 'A'));
+        $this->locks->release('demo', 'A');
+        self::assertTrue($this->locks->tryAcquire('demo', $now, $expires, 'B'));
+    }
+
+    public function testSecondAcquireWhileHeldReturnsFalse(): void
+    {
+        $now = new DateTimeImmutable('2026-04-29T00:00:00Z', new DateTimeZone('UTC'));
+        $expires = $now->modify('+5 minutes');
+
+        self::assertTrue($this->locks->tryAcquire('demo', $now, $expires, 'A'));
+        self::assertFalse($this->locks->tryAcquire('demo', $now, $expires, 'B'));
+    }
+
+    public function testExpiredLockReclaimedOnNextAcquire(): void
+    {
+        $past = new DateTimeImmutable('2026-04-29T00:00:00Z', new DateTimeZone('UTC'));
+        $expires = $past->modify('+5 minutes');
+
+        self::assertTrue($this->locks->tryAcquire('demo', $past, $expires, 'A'));
+
+        // jump forward past expires_at — the next acquire must succeed
+        // because the in-transaction delete sweeps the stale row first.
+        $now = $past->modify('+1 hour');
+        self::assertTrue($this->locks->tryAcquire('demo', $now, $now->modify('+5 minutes'), 'B'));
+
+        // The lock is now held by B.
+        $status = $this->locks->status('demo');
+        self::assertNotNull($status);
+        self::assertSame('B', $status['acquired_by']);
+    }
+
+    public function testReleaseWithDifferentOwnerIsNoOp(): void
+    {
+        $now = new DateTimeImmutable('2026-04-29T00:00:00Z', new DateTimeZone('UTC'));
+        self::assertTrue($this->locks->tryAcquire('demo', $now, $now->modify('+5 minutes'), 'A'));
+
+        // Defensive: B can't kick A out by releasing.
+        $this->locks->release('demo', 'B');
+        self::assertFalse($this->locks->tryAcquire('demo', $now, $now->modify('+5 minutes'), 'C'));
+    }
+}

+ 163 - 0
api/tests/Integration/Jobs/JobRunnerTest.php

@@ -0,0 +1,163 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Tests\Integration\Jobs;
+
+use App\Domain\Jobs\Job;
+use App\Domain\Jobs\JobContext;
+use App\Domain\Jobs\JobResult;
+use App\Domain\Jobs\JobStatus;
+use App\Domain\Time\FixedClock;
+use App\Infrastructure\Jobs\JobLockRepository;
+use App\Infrastructure\Jobs\JobRunner;
+use App\Infrastructure\Jobs\JobRunRepository;
+use App\Tests\Integration\Support\AppTestCase;
+use Monolog\Handler\NullHandler;
+use Monolog\Logger;
+use RuntimeException;
+
+/**
+ * Lock contention is the headline concurrency property M05 has to get
+ * right. This test simulates two callers racing for the same job by
+ * pre-acquiring the lock manually before invoking the runner — far simpler
+ * than spawning processes, and tests exactly the same code path.
+ */
+final class JobRunnerTest extends AppTestCase
+{
+    private JobLockRepository $locks;
+    private JobRunRepository $runs;
+    private FixedClock $clock;
+    private JobRunner $runner;
+
+    protected function setUp(): void
+    {
+        parent::setUp();
+        $this->locks = new JobLockRepository($this->db);
+        $this->runs = new JobRunRepository($this->db);
+        $this->clock = FixedClock::at('2026-04-29T00:00:00Z');
+        $logger = new Logger('test');
+        $logger->pushHandler(new NullHandler());
+        $this->runner = new JobRunner($this->locks, $this->runs, $this->clock, $logger);
+    }
+
+    public function testSuccessfulRunWritesRowAndReleasesLock(): void
+    {
+        $job = new class () implements Job {
+            public function name(): string
+            {
+                return 'demo';
+            }
+
+            public function defaultIntervalSeconds(): int
+            {
+                return 60;
+            }
+
+            public function maxRuntimeSeconds(): int
+            {
+                return 30;
+            }
+
+            public function run(JobContext $context): JobResult
+            {
+                return new JobResult(itemsProcessed: 7);
+            }
+        };
+
+        $outcome = $this->runner->run($job, [], 'manual');
+        self::assertSame(JobStatus::Success, $outcome->status);
+        self::assertSame(7, $outcome->itemsProcessed);
+
+        // Lock is released — a second run must succeed.
+        $second = $this->runner->run($job, [], 'manual');
+        self::assertSame(JobStatus::Success, $second->status);
+
+        $rows = $this->db->fetchAllAssociative("SELECT status FROM job_runs WHERE job_name='demo'");
+        self::assertCount(2, $rows);
+        foreach ($rows as $row) {
+            self::assertSame('success', $row['status']);
+        }
+    }
+
+    public function testFailedRunCapturesErrorMessageAndReleasesLock(): void
+    {
+        $job = new class () implements Job {
+            public function name(): string
+            {
+                return 'failing';
+            }
+
+            public function defaultIntervalSeconds(): int
+            {
+                return 60;
+            }
+
+            public function maxRuntimeSeconds(): int
+            {
+                return 30;
+            }
+
+            public function run(JobContext $context): JobResult
+            {
+                throw new RuntimeException('intentional');
+            }
+        };
+
+        $outcome = $this->runner->run($job, [], 'manual');
+        self::assertSame(JobStatus::Failure, $outcome->status);
+        self::assertSame('intentional', $outcome->errorMessage);
+
+        // Lock released even on failure → next run also fails (not skipped).
+        $second = $this->runner->run($job, [], 'manual');
+        self::assertSame(JobStatus::Failure, $second->status);
+    }
+
+    public function testLockContentionProducesOneSuccessAndOneSkipped(): void
+    {
+        // Pre-acquire the lock to simulate "another worker is already
+        // running this job", then dispatch a fresh runner invocation. The
+        // second one must come back as skipped_locked, not block.
+        $now = $this->clock->now();
+        $this->locks->tryAcquire(
+            'busy',
+            $now,
+            $now->modify('+10 minutes'),
+            'OTHER-WORKER',
+        );
+
+        $job = new class () implements Job {
+            public bool $bodyExecuted = false;
+
+            public function name(): string
+            {
+                return 'busy';
+            }
+
+            public function defaultIntervalSeconds(): int
+            {
+                return 60;
+            }
+
+            public function maxRuntimeSeconds(): int
+            {
+                return 30;
+            }
+
+            public function run(JobContext $context): JobResult
+            {
+                $this->bodyExecuted = true; // @codeCoverageIgnore — must NOT be reached
+
+                return new JobResult(0);
+            }
+        };
+
+        $outcome = $this->runner->run($job, [], 'api');
+        self::assertSame(JobStatus::SkippedLocked, $outcome->status);
+        self::assertFalse($job->bodyExecuted, 'job body must not run when lock is held');
+
+        $rows = $this->db->fetchAllAssociative("SELECT status FROM job_runs WHERE job_name='busy'");
+        self::assertCount(1, $rows);
+        self::assertSame('skipped_locked', $rows[0]['status']);
+    }
+}

+ 157 - 0
api/tests/Integration/Jobs/RecomputeScoresJobTest.php

@@ -0,0 +1,157 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Tests\Integration\Jobs;
+
+use App\Application\Jobs\RecomputeScoresJob;
+use App\Domain\Ip\IpAddress;
+use App\Domain\Jobs\JobContext;
+use App\Domain\Reputation\PairScorer;
+use App\Domain\Time\FixedClock;
+use App\Infrastructure\Category\CategoryRepository;
+use App\Infrastructure\Reputation\IpScoreRepository;
+use App\Infrastructure\Reputation\ReportRepository;
+use App\Tests\Integration\Support\AppTestCase;
+use Doctrine\DBAL\ParameterType;
+use Monolog\Handler\NullHandler;
+use Monolog\Logger;
+
+/**
+ * Bulk recompute over real reports rows. The headline scenario is the M05
+ * acceptance: clock-forward 30 days between two reports with a 14-day
+ * exponential half-life and verify the score matches `0.5^(30/14) +
+ * 0.5^(0/14) = 1 + ~0.226 = ~1.226`.
+ */
+final class RecomputeScoresJobTest extends AppTestCase
+{
+    public function testFullModeAppliesDecayAcross30DayWindow(): void
+    {
+        $clock = FixedClock::at('2026-04-29T00:00:00Z');
+
+        // 30 days ago: a report
+        $reporterId = $this->createReporter('decay-test');
+        $ip = IpAddress::fromString('203.0.113.42');
+        /** @var int $catId */
+        $catId = (int) $this->db->fetchOne("SELECT id FROM categories WHERE slug='brute_force'");
+        self::assertGreaterThan(0, $catId);
+
+        $thirtyDaysAgo = $clock->now()->modify('-30 days');
+        $this->insertReport($ip, $catId, $reporterId, 1.0, $thirtyDaysAgo);
+        // and a fresh report just now
+        $this->insertReport($ip, $catId, $reporterId, 1.0, $clock->now());
+
+        // Pre-existing ip_scores row so the recompute touches it. (In real
+        // life the synchronous ingest path would have created this; for an
+        // isolated test we simulate with a stale row.)
+        $this->db->insert('ip_scores', [
+            'ip_bin' => $ip->binary(),
+            'ip_text' => $ip->text(),
+            'category_id' => $catId,
+            'score' => '0.0000',
+            'report_count_30d' => 0,
+            'last_report_at' => $thirtyDaysAgo->format('Y-m-d H:i:s'),
+            'recomputed_at' => $thirtyDaysAgo->format('Y-m-d H:i:s'),
+        ], ['ip_bin' => ParameterType::LARGE_OBJECT]);
+
+        $job = $this->buildJob($clock);
+        $logger = new Logger('test');
+        $logger->pushHandler(new NullHandler());
+        $result = $job->run(new JobContext($clock, $logger, ['full' => true]));
+
+        self::assertGreaterThan(0, $result->itemsProcessed);
+
+        $newScore = (float) $this->db->fetchOne(
+            'SELECT score FROM ip_scores WHERE ip_bin = :bin AND category_id = :cat',
+            ['bin' => $ip->binary(), 'cat' => $catId],
+            ['bin' => ParameterType::LARGE_OBJECT, 'cat' => ParameterType::INTEGER],
+        );
+
+        // Expected: 1.0 (fresh) + 0.5^(30/14) (30-day-old) ≈ 1.2266.
+        $expected = 1.0 + 0.5 ** (30.0 / 14.0);
+        self::assertEqualsWithDelta($expected, $newScore, 0.001);
+    }
+
+    public function testIncrementalModePicksUpRecentlyTouchedPairs(): void
+    {
+        $clock = FixedClock::at('2026-04-29T00:00:00Z');
+        $reporterId = $this->createReporter('inc-test');
+        $ip = IpAddress::fromString('198.51.100.7');
+        /** @var int $catId */
+        $catId = (int) $this->db->fetchOne("SELECT id FROM categories WHERE slug='scanner'");
+
+        $this->insertReport($ip, $catId, $reporterId, 1.0, $clock->now()->modify('-2 minutes'));
+
+        $job = $this->buildJob($clock);
+        $logger = new Logger('test');
+        $logger->pushHandler(new NullHandler());
+        $result = $job->run(new JobContext($clock, $logger, []));
+
+        self::assertGreaterThanOrEqual(1, $result->itemsProcessed);
+
+        $score = (float) $this->db->fetchOne(
+            'SELECT score FROM ip_scores WHERE ip_bin = :bin AND category_id = :cat',
+            ['bin' => $ip->binary(), 'cat' => $catId],
+            ['bin' => ParameterType::LARGE_OBJECT, 'cat' => ParameterType::INTEGER],
+        );
+        // Fresh report → score ~ 1.0
+        self::assertEqualsWithDelta(1.0, $score, 0.01);
+    }
+
+    public function testDropsStaleNearZeroPairs(): void
+    {
+        $clock = FixedClock::at('2026-04-29T00:00:00Z');
+        $ip = IpAddress::fromString('192.0.2.99');
+        /** @var int $catId */
+        $catId = (int) $this->db->fetchOne("SELECT id FROM categories WHERE slug='spam'");
+
+        // ip_scores row with a score below threshold and an ancient
+        // last_report_at — meets the drop rule.
+        $this->db->insert('ip_scores', [
+            'ip_bin' => $ip->binary(),
+            'ip_text' => $ip->text(),
+            'category_id' => $catId,
+            'score' => '0.0001',
+            'report_count_30d' => 0,
+            'last_report_at' => $clock->now()->modify('-200 days')->format('Y-m-d H:i:s'),
+            'recomputed_at' => $clock->now()->modify('-200 days')->format('Y-m-d H:i:s'),
+        ], ['ip_bin' => ParameterType::LARGE_OBJECT]);
+
+        // No reports → recompute will set last_report_at to (now - 366d)
+        // → drop rule fires.
+        $job = $this->buildJob($clock);
+        $logger = new Logger('test');
+        $logger->pushHandler(new NullHandler());
+        $job->run(new JobContext($clock, $logger, ['full' => true]));
+
+        $remaining = (int) $this->db->fetchOne(
+            'SELECT COUNT(*) FROM ip_scores WHERE ip_bin = :bin AND category_id = :cat',
+            ['bin' => $ip->binary(), 'cat' => $catId],
+            ['bin' => ParameterType::LARGE_OBJECT, 'cat' => ParameterType::INTEGER],
+        );
+        self::assertSame(0, $remaining);
+    }
+
+    private function insertReport(IpAddress $ip, int $catId, int $reporterId, float $weight, \DateTimeImmutable $when): void
+    {
+        $this->db->insert('reports', [
+            'ip_bin' => $ip->binary(),
+            'ip_text' => $ip->text(),
+            'category_id' => $catId,
+            'reporter_id' => $reporterId,
+            'weight_at_report' => number_format($weight, 2, '.', ''),
+            'metadata_json' => null,
+            'received_at' => $when->format('Y-m-d H:i:s'),
+        ], ['ip_bin' => ParameterType::LARGE_OBJECT]);
+    }
+
+    private function buildJob(FixedClock $clock): RecomputeScoresJob
+    {
+        $reports = new ReportRepository($this->db);
+        $ipScores = new IpScoreRepository($this->db);
+        $categories = new CategoryRepository($this->db);
+        $scorer = new PairScorer($reports, $categories, $clock, 365);
+
+        return new RecomputeScoresJob($reports, $ipScores, $scorer);
+    }
+}

+ 190 - 0
api/tests/Integration/Jobs/TickJobTest.php

@@ -0,0 +1,190 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Tests\Integration\Jobs;
+
+use App\Application\Jobs\TickJob;
+use App\Domain\Jobs\Job;
+use App\Domain\Jobs\JobContext;
+use App\Domain\Jobs\JobResult;
+use App\Domain\Jobs\JobStatus;
+use App\Domain\Time\FixedClock;
+use App\Infrastructure\Jobs\JobLockRepository;
+use App\Infrastructure\Jobs\JobRunner;
+use App\Infrastructure\Jobs\JobRunRepository;
+use App\Tests\Integration\Support\AppTestCase;
+use Monolog\Handler\NullHandler;
+use Monolog\Logger;
+
+/**
+ * Tick must invoke only jobs whose interval has elapsed since their last
+ * finished run. Two stub jobs with different intervals + a freshly-recorded
+ * run for one of them validates the dispatcher's "is due?" logic.
+ */
+final class TickJobTest extends AppTestCase
+{
+    public function testInvokesOnlyDueJobs(): void
+    {
+        $clock = FixedClock::at('2026-04-29T00:00:00Z');
+        $logger = new Logger('test');
+        $logger->pushHandler(new NullHandler());
+
+        $hot = new StubJob('hot', 60);    // due every minute
+        $cold = new StubJob('cold', 3600); // due every hour
+        $never = new StubJob('never-run', 60); // no prior run → always due
+
+        // Pre-write `job_runs` rows: "hot" finished 30 seconds ago (NOT
+        // due yet), "cold" finished 30 seconds ago (also not due).
+        $this->db->insert('job_runs', [
+            'job_name' => 'hot',
+            'status' => 'success',
+            'items_processed' => 0,
+            'triggered_by' => 'manual',
+            'started_at' => $clock->now()->modify('-31 seconds')->format('Y-m-d H:i:s'),
+            'finished_at' => $clock->now()->modify('-30 seconds')->format('Y-m-d H:i:s'),
+        ]);
+        $this->db->insert('job_runs', [
+            'job_name' => 'cold',
+            'status' => 'success',
+            'items_processed' => 0,
+            'triggered_by' => 'manual',
+            'started_at' => $clock->now()->modify('-31 seconds')->format('Y-m-d H:i:s'),
+            'finished_at' => $clock->now()->modify('-30 seconds')->format('Y-m-d H:i:s'),
+        ]);
+
+        $runner = new JobRunner(
+            new JobLockRepository($this->db),
+            new JobRunRepository($this->db),
+            $clock,
+            $logger,
+        );
+
+        $tick = new TickJob(
+            jobsResolver: static fn (): array => [
+                'hot' => $hot,
+                'cold' => $cold,
+                'never-run' => $never,
+            ],
+            runner: $runner,
+            runs: new JobRunRepository($this->db),
+        );
+
+        $result = $tick->run(new JobContext($clock, $logger, []));
+
+        // Only `never-run` should fire — the other two are within their
+        // interval window.
+        self::assertSame(1, $result->itemsProcessed);
+        self::assertSame(['never-run'], $result->details['invoked']);
+        self::assertFalse($hot->wasInvoked, 'hot must not have run');
+        self::assertFalse($cold->wasInvoked, 'cold must not have run');
+        self::assertTrue($never->wasInvoked, 'never-run should have run');
+    }
+
+    public function testDueAfterIntervalElapses(): void
+    {
+        $clock = FixedClock::at('2026-04-29T00:00:00Z');
+        $logger = new Logger('test');
+        $logger->pushHandler(new NullHandler());
+
+        $job = new StubJob('schedulable', 60);
+
+        $this->db->insert('job_runs', [
+            'job_name' => 'schedulable',
+            'status' => 'success',
+            'items_processed' => 0,
+            'triggered_by' => 'schedule',
+            'started_at' => $clock->now()->modify('-2 minutes')->format('Y-m-d H:i:s'),
+            'finished_at' => $clock->now()->modify('-2 minutes')->format('Y-m-d H:i:s'),
+        ]);
+
+        $runner = new JobRunner(
+            new JobLockRepository($this->db),
+            new JobRunRepository($this->db),
+            $clock,
+            $logger,
+        );
+
+        $tick = new TickJob(
+            jobsResolver: static fn (): array => ['schedulable' => $job],
+            runner: $runner,
+            runs: new JobRunRepository($this->db),
+        );
+
+        $result = $tick->run(new JobContext($clock, $logger, []));
+        self::assertSame(1, $result->itemsProcessed);
+        self::assertTrue($job->wasInvoked);
+    }
+
+    public function testFailingChildJobDoesNotAbortDispatcher(): void
+    {
+        $clock = FixedClock::at('2026-04-29T00:00:00Z');
+        $logger = new Logger('test');
+        $logger->pushHandler(new NullHandler());
+
+        $broken = new StubJob('broken', 60, throw: true);
+        $healthy = new StubJob('healthy', 60);
+
+        $runner = new JobRunner(
+            new JobLockRepository($this->db),
+            new JobRunRepository($this->db),
+            $clock,
+            $logger,
+        );
+
+        $tick = new TickJob(
+            jobsResolver: static fn (): array => ['broken' => $broken, 'healthy' => $healthy],
+            runner: $runner,
+            runs: new JobRunRepository($this->db),
+        );
+
+        $result = $tick->run(new JobContext($clock, $logger, []));
+        // Both due (no prior runs); both invoked.
+        self::assertSame(2, $result->itemsProcessed);
+
+        // The broken child's row exists with failure status.
+        $rows = $this->db->fetchAllAssociative('SELECT job_name, status FROM job_runs ORDER BY id');
+        $brokenRow = array_values(array_filter(
+            $rows,
+            static fn (array $r): bool => $r['job_name'] === 'broken'
+        ))[0];
+        self::assertSame(JobStatus::Failure->value, $brokenRow['status']);
+    }
+}
+
+final class StubJob implements Job
+{
+    public bool $wasInvoked = false;
+
+    public function __construct(
+        private readonly string $name,
+        private readonly int $interval,
+        private readonly bool $throw = false,
+    ) {
+    }
+
+    public function name(): string
+    {
+        return $this->name;
+    }
+
+    public function defaultIntervalSeconds(): int
+    {
+        return $this->interval;
+    }
+
+    public function maxRuntimeSeconds(): int
+    {
+        return 30;
+    }
+
+    public function run(JobContext $context): JobResult
+    {
+        $this->wasInvoked = true;
+        if ($this->throw) {
+            throw new \RuntimeException('stub failure');
+        }
+
+        return new JobResult(itemsProcessed: 1);
+    }
+}

+ 83 - 0
api/tests/Integration/Reputation/PairScorerCutoffTest.php

@@ -0,0 +1,83 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Tests\Integration\Reputation;
+
+use App\Domain\Ip\IpAddress;
+use App\Domain\Reputation\PairScorer;
+use App\Domain\Time\FixedClock;
+use App\Infrastructure\Category\CategoryRepository;
+use App\Infrastructure\Reputation\ReportRepository;
+use App\Tests\Integration\Support\AppTestCase;
+use Doctrine\DBAL\ParameterType;
+
+/**
+ * The hard cutoff (default 365 days) lives in PairScorer, not in the Decay
+ * function. A report older than the cutoff must be entirely excluded from
+ * the score even if its decay weight would still be non-zero.
+ *
+ * For exponential decay, that's the only way the score reaches *exactly*
+ * zero (the formula `0.5^(age/half_life)` is strictly positive).
+ */
+final class PairScorerCutoffTest extends AppTestCase
+{
+    public function testReportBeyondHardCutoffExcludedEntirely(): void
+    {
+        $clock = FixedClock::at('2026-04-29T00:00:00Z');
+        $reporterId = $this->createReporter('cutoff-test');
+        $ip = IpAddress::fromString('203.0.113.99');
+        /** @var int $catId */
+        $catId = (int) $this->db->fetchOne("SELECT id FROM categories WHERE slug='brute_force'");
+
+        // 400 days ago — beyond the 365-day default cutoff.
+        $this->db->insert('reports', [
+            'ip_bin' => $ip->binary(),
+            'ip_text' => $ip->text(),
+            'category_id' => $catId,
+            'reporter_id' => $reporterId,
+            'weight_at_report' => '1.00',
+            'received_at' => $clock->now()->modify('-400 days')->format('Y-m-d H:i:s'),
+        ], ['ip_bin' => ParameterType::LARGE_OBJECT]);
+
+        $scorer = new PairScorer(
+            new ReportRepository($this->db),
+            new CategoryRepository($this->db),
+            $clock,
+            365,
+        );
+
+        self::assertSame(0.0, $scorer->score($ip->binary(), $catId, $clock->now()));
+    }
+
+    public function testReportInsideCutoffIsCounted(): void
+    {
+        $clock = FixedClock::at('2026-04-29T00:00:00Z');
+        $reporterId = $this->createReporter('inside-cutoff-test');
+        $ip = IpAddress::fromString('203.0.113.123');
+        /** @var int $catId */
+        $catId = (int) $this->db->fetchOne("SELECT id FROM categories WHERE slug='brute_force'");
+
+        // 364 days ago — barely inside the cutoff. With a 14-day half-life,
+        // its weight is essentially zero (0.5^26 ≈ 1.5e-8) but still > 0.
+        $this->db->insert('reports', [
+            'ip_bin' => $ip->binary(),
+            'ip_text' => $ip->text(),
+            'category_id' => $catId,
+            'reporter_id' => $reporterId,
+            'weight_at_report' => '1.00',
+            'received_at' => $clock->now()->modify('-364 days')->format('Y-m-d H:i:s'),
+        ], ['ip_bin' => ParameterType::LARGE_OBJECT]);
+
+        $scorer = new PairScorer(
+            new ReportRepository($this->db),
+            new CategoryRepository($this->db),
+            $clock,
+            365,
+        );
+
+        $score = $scorer->score($ip->binary(), $catId, $clock->now());
+        self::assertGreaterThan(0.0, $score);
+        self::assertLessThan(1e-6, $score);
+    }
+}

+ 76 - 0
api/tests/Unit/Http/InternalNetworkMiddlewareTest.php

@@ -0,0 +1,76 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Tests\Unit\Http;
+
+use App\Infrastructure\Http\Middleware\InternalNetworkMiddleware;
+use PHPUnit\Framework\Attributes\DataProvider;
+use PHPUnit\Framework\TestCase;
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+use Psr\Http\Server\RequestHandlerInterface;
+use Slim\Psr7\Factory\ResponseFactory;
+use Slim\Psr7\Factory\ServerRequestFactory;
+
+/**
+ * Network gate must let RFC1918 + loopback through, 404 everything else,
+ * and never leak via 403 (which would tell attackers the endpoint exists).
+ * The handler is stubbed to a marker response so we can confirm whether
+ * the middleware short-circuited or passed through.
+ */
+final class InternalNetworkMiddlewareTest extends TestCase
+{
+    /**
+     * @return iterable<string, array{string, bool}>
+     */
+    public static function addressProvider(): iterable
+    {
+        yield 'loopback v4' => ['127.0.0.1', true];
+        yield 'loopback v6' => ['::1', true];
+        yield 'rfc1918 10/8' => ['10.5.6.7', true];
+        yield 'rfc1918 172.16/12' => ['172.16.42.1', true];
+        yield 'rfc1918 172.31/12 (boundary)' => ['172.31.255.255', true];
+        yield 'just outside 172.16/12' => ['172.32.0.1', false];
+        yield 'rfc1918 192.168/16' => ['192.168.1.1', true];
+        yield 'public 1.1.1.1' => ['1.1.1.1', false];
+        yield 'public v4' => ['203.0.113.4', false];
+        yield 'public v6' => ['2001:db8::1', false];
+        yield 'malformed' => ['not-an-ip', false];
+        yield 'empty' => ['', false];
+    }
+
+    #[DataProvider('addressProvider')]
+    public function testNetworkGate(string $remoteAddr, bool $shouldPass): void
+    {
+        $middleware = new InternalNetworkMiddleware(new ResponseFactory());
+
+        $req = (new ServerRequestFactory())->createServerRequest(
+            'POST',
+            '/internal/jobs/tick',
+            ['REMOTE_ADDR' => $remoteAddr],
+        );
+
+        $passthrough = new class () implements RequestHandlerInterface {
+            public bool $reached = false;
+
+            public function handle(ServerRequestInterface $request): ResponseInterface
+            {
+                $this->reached = true;
+                $factory = new ResponseFactory();
+
+                return $factory->createResponse(204);
+            }
+        };
+
+        $response = $middleware->process($req, $passthrough);
+
+        if ($shouldPass) {
+            self::assertSame(204, $response->getStatusCode());
+            self::assertTrue($passthrough->reached);
+        } else {
+            self::assertSame(404, $response->getStatusCode());
+            self::assertFalse($passthrough->reached, 'handler must not see disallowed sources');
+        }
+    }
+}

+ 84 - 0
api/tests/Unit/Http/InternalTokenMiddlewareTest.php

@@ -0,0 +1,84 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Tests\Unit\Http;
+
+use App\Infrastructure\Http\Middleware\InternalTokenMiddleware;
+use PHPUnit\Framework\TestCase;
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+use Psr\Http\Server\RequestHandlerInterface;
+use Slim\Psr7\Factory\ResponseFactory;
+use Slim\Psr7\Factory\ServerRequestFactory;
+
+/**
+ * Token gate uses `hash_equals`, refuses an empty configured token, and
+ * lets correct tokens through.
+ */
+final class InternalTokenMiddlewareTest extends TestCase
+{
+    public function testValidTokenPassesThrough(): void
+    {
+        $middleware = new InternalTokenMiddleware(new ResponseFactory(), 'shhh');
+        [$handler, $response] = $this->dispatch($middleware, 'Bearer shhh');
+        self::assertSame(204, $response->getStatusCode());
+        self::assertTrue($handler->reached);
+    }
+
+    public function testWrongTokenIs401(): void
+    {
+        $middleware = new InternalTokenMiddleware(new ResponseFactory(), 'shhh');
+        [$handler, $response] = $this->dispatch($middleware, 'Bearer wrong');
+        self::assertSame(401, $response->getStatusCode());
+        self::assertFalse($handler->reached);
+    }
+
+    public function testMissingHeaderIs401(): void
+    {
+        $middleware = new InternalTokenMiddleware(new ResponseFactory(), 'shhh');
+        [$handler, $response] = $this->dispatch($middleware, null);
+        self::assertSame(401, $response->getStatusCode());
+        self::assertFalse($handler->reached);
+    }
+
+    public function testNonBearerSchemeIs401(): void
+    {
+        $middleware = new InternalTokenMiddleware(new ResponseFactory(), 'shhh');
+        [$handler, $response] = $this->dispatch($middleware, 'Basic shhh');
+        self::assertSame(401, $response->getStatusCode());
+        self::assertFalse($handler->reached);
+    }
+
+    public function testEmptyConfiguredTokenFailsClosed(): void
+    {
+        $middleware = new InternalTokenMiddleware(new ResponseFactory(), '');
+        [$handler, $response] = $this->dispatch($middleware, 'Bearer anything');
+        self::assertSame(401, $response->getStatusCode());
+        self::assertFalse($handler->reached);
+    }
+
+    /**
+     * @return array{0: object, 1: ResponseInterface}
+     */
+    private function dispatch(InternalTokenMiddleware $middleware, ?string $auth): array
+    {
+        $request = (new ServerRequestFactory())->createServerRequest('POST', '/internal/jobs/tick');
+        if ($auth !== null) {
+            $request = $request->withHeader('Authorization', $auth);
+        }
+
+        $handler = new class () implements RequestHandlerInterface {
+            public bool $reached = false;
+
+            public function handle(ServerRequestInterface $request): ResponseInterface
+            {
+                $this->reached = true;
+
+                return (new ResponseFactory())->createResponse(204);
+            }
+        };
+
+        return [$handler, $middleware->process($request, $handler)];
+    }
+}

+ 45 - 0
api/tests/Unit/Jobs/JobOutcomeTest.php

@@ -0,0 +1,45 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Tests\Unit\Jobs;
+
+use App\Domain\Jobs\JobOutcome;
+use App\Domain\Jobs\JobStatus;
+use PHPUnit\Framework\TestCase;
+
+/**
+ * The HTTP envelope is the public contract — every status in M05 maps to
+ * a specific code (success=200, skipped_locked=409, failure=500). This
+ * locks the mapping in.
+ */
+final class JobOutcomeTest extends TestCase
+{
+    public function testSuccessMapsTo200AndIncludesAllFields(): void
+    {
+        $outcome = new JobOutcome('demo', JobStatus::Success, 5, 12, 99);
+        self::assertSame(200, $outcome->httpStatus());
+        self::assertSame([
+            'job' => 'demo',
+            'status' => 'success',
+            'items_processed' => 5,
+            'duration_ms' => 12,
+            'run_id' => 99,
+        ], $outcome->toArray());
+    }
+
+    public function testSkippedLockedMapsTo409(): void
+    {
+        $outcome = new JobOutcome('demo', JobStatus::SkippedLocked, 0, 1, 100, 'lock held');
+        self::assertSame(409, $outcome->httpStatus());
+        $array = $outcome->toArray();
+        self::assertSame('skipped_locked', $array['status']);
+        self::assertSame('lock held', $array['error']);
+    }
+
+    public function testFailureMapsTo500(): void
+    {
+        $outcome = new JobOutcome('demo', JobStatus::Failure, 0, 200, 101, 'boom');
+        self::assertSame(500, $outcome->httpStatus());
+    }
+}