# M05 — Reputation Engine & Internal Job Endpoints

> Fresh Claude Code agent prompt. M04 must be complete and committed.
> Estimated effort: large.

## Mission

Build the reputation engine (full bulk recompute with decay reapplication) and the internal job framework: locks, run history, runner abstraction, the `/internal/jobs/*` endpoints, network and token middlewares, the `tick` dispatcher, and a CLI runner. Three job types are wired: `recompute-scores`, `cleanup-audit`, and `enrich-pending` (skeleton — full enrichment is M11).

## Before you start

1. Verify M04:

   ```bash
   git log --oneline -4
   cd api && composer test && composer stan && cd ..
   ```

2. Read `SPEC.md` §4 (`job_locks`, `job_runs`), §5 (Reputation Engine — recomputation rules), §6 (Internal Jobs API — endpoints, middlewares, response envelope), §10 (where the scheduler comes in).
3. Confirm a clean tree.

## Tasks

### 1. Clock & decay (extend M04)

You already have `Decay.php` (linear + exponential) and `PairScorer.php` from M04. Verify they handle the hard cutoff (365 days default) correctly. Add tests for:

- Any age beyond the cutoff → decay returns 0.
- Linear with `decay_param=30`: age=0 → 1.0; age=15 → 0.5; age=30 → 0.0.
- Exponential with `decay_param=14` (half-life): age=14 → 0.5; age=28 → 0.25.

### 2. Job framework

In `api/src/Infrastructure/Jobs/`:

- `Job.php` — interface: `name(): string`, `defaultIntervalSeconds(): int`, `maxRuntimeSeconds(): int`, `run(JobContext $ctx): JobResult`.
- `JobContext.php` — carries the `Clock`, a logger, and any per-invocation params (`$ctx->param('full', false)`).
- `JobResult.php` — `itemsProcessed: int`, `details: array`.
- `JobLockRepository.php`:
  - `tryAcquire(string $name, int $maxRuntimeSeconds, string $owner): bool` — atomic. Implementation:
    1. Begin a transaction.
    2. Delete rows where `expires_at < now`.
    3. `INSERT INTO job_locks (job_name, acquired_at, acquired_by, expires_at) VALUES (...)` — fails on PK conflict if the lock is held.
    4. Commit. Return success/failure.
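A minimal sketch of that acquire sequence, assuming a PDO handle and the `job_locks` schema from SPEC §4 (function name, `$pdo` wiring, and error handling are illustrative, not the final implementation):

```php
// Sketch only: atomic lock acquire per the numbered steps above.
// Relies on the PK on job_name to reject a second acquirer.
function tryAcquire(PDO $pdo, string $name, int $maxRuntimeSeconds, string $owner): bool
{
    $now = time();
    $pdo->beginTransaction();
    try {
        // Steps 1–2: reclaim expired locks inside the same transaction.
        $pdo->prepare('DELETE FROM job_locks WHERE expires_at < ?')->execute([$now]);
        // Step 3: the PK conflict throws if the lock is still held.
        $pdo->prepare(
            'INSERT INTO job_locks (job_name, acquired_at, acquired_by, expires_at)
             VALUES (?, ?, ?, ?)'
        )->execute([$name, $now, $owner, $now + $maxRuntimeSeconds + 30]); // 30s buffer
        $pdo->commit(); // Step 4: commit == lock acquired.
        return true;
    } catch (PDOException $e) {
        $pdo->rollBack();
        return false; // held by another owner
    }
}
```

The expired-row delete and the insert share one transaction, so a crashed holder's lock is reclaimed and re-acquired atomically.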
  - `release(string $name, string $owner)` — `DELETE WHERE job_name = ? AND acquired_by = ?`.
- `JobRunRepository.php` — append rows, query the latest run per job, query overdue jobs.
- `JobRunner.php`:
  - `run(Job $job, array $params, string $triggeredBy): JobOutcome` — orchestrates: try-acquire → write `running` row → run → on success/failure write the final row → release the lock. Always writes a final row, even on `skipped_locked`.
  - Generates a unique `owner` per invocation (e.g. `getmypid() . '/' . bin2hex(random_bytes(4))`).
- `JobRegistry.php` — registers job classes by name; resolves them by name.

### 3. Concrete jobs

In `api/src/Application/Jobs/` (or `api/src/Infrastructure/Jobs/Tasks/` — pick one and stay consistent):

- `RecomputeScoresJob.php`:
  - Default interval: 300s. Max runtime: 240s.
  - Runs in two modes: full (`full=true`) and incremental (default).
  - Incremental: pairs `(ip_bin, category_id)` from `reports` with `received_at >= now - interval`, UNION pairs from `ip_scores` where `recomputed_at < now - freshness_window` (default 1 hour). Cap at `JOB_RECOMPUTE_MAX_ROWS_PER_TICK`.
  - Full: every pair in `ip_scores` plus every pair in `reports`. No cap (but bounded by `maxRuntimeSeconds`).
  - For each pair: call `PairScorer::score()`, upsert `ip_scores`. Drop rows where score < 0.01 AND `last_report_at < now - 90 days`.
- `CleanupAuditJob.php`:
  - Default interval: 86400s (daily). Max runtime: 60s.
  - Deletes `audit_log` rows older than `JOB_AUDIT_RETENTION_DAYS`. The audit table exists from M02 even though the emitter doesn't yet — that's fine.
- `EnrichPendingJob.php`:
  - Skeleton only. Default interval: 300s. Max runtime: 60s. For now: a no-op that returns `items_processed: 0` and logs a debug line. Full implementation in M11.

### 4. Tick dispatcher

`TickJob.php` (or `TickDispatcher.php` — keep it in the same dir):

- Iterates the registry. For each job, reads the latest `job_runs` entry for that name.
  If `now - last_finished_at >= job.defaultInterval` (or no row exists), invokes `JobRunner::run()` for that job. Per-job exceptions are caught and recorded but don't abort the dispatcher.
- Itself recorded in `job_runs` as `tick`. The default interval doesn't apply (it's invoked directly by the scheduler), but max runtime should be ~5 minutes total to avoid the cron piling up.

### 5. HTTP endpoints

In `api/src/Application/Internal/JobsController.php`:

- `POST /internal/jobs/recompute-scores` — body `{full?: bool, max_rows?: int}`.
- `POST /internal/jobs/cleanup-audit`
- `POST /internal/jobs/enrich-pending`
- `POST /internal/jobs/tick`
- `POST /internal/jobs/refresh-geoip` — for now: returns `412 Precondition Failed` with `{"error":"not_implemented"}`. Real implementation in M11.
- `GET /internal/jobs/status` — returns the latest `job_runs` entry per known job, lock state, and `overdue: bool`, computed against `defaultIntervalSeconds`.

Response envelope (POST endpoints):

```json
{"job":"recompute-scores","status":"success","items_processed":1284,"duration_ms":8421,"run_id":42}
```

Statuses: `success`, `failure`, `skipped_locked`. `failure` returns HTTP 500. `skipped_locked` returns HTTP 409. Both still write a `job_runs` row and return the envelope.

### 6. Middlewares

In `api/src/Infrastructure/Http/Middleware/`:

- `InternalNetworkMiddleware.php` — checks `$_SERVER['REMOTE_ADDR']` against the CIDR list `127.0.0.1/32, ::1/128, 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16`. Reject with `404` (NOT 403 — be opaque to outsiders about the existence of these endpoints). Use `IpAddress` and `Cidr` from M02 for parsing.
- `InternalTokenMiddleware.php` — checks the `Authorization: Bearer` header against `INTERNAL_JOB_TOKEN`, comparing with `hash_equals()`. Reject with `401` on mismatch.

Apply both to all `/internal/*` routes. Order: network → token. (If the network check fails, don't even acknowledge the auth attempt.)

Also confirm the Caddyfile (from M01) actually applies the network restriction for defense in depth — Caddy returns 404 for non-RFC1918 sources.
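The network check can be sketched as below. This is a standalone illustration using `inet_pton` bit math; the real middleware should reuse the `IpAddress`/`Cidr` classes from M02 (whose API is not shown here), and `ipInCidr`/`isInternal` are hypothetical names:

```php
// Sketch only: CIDR containment for the network middleware.
// Handles IPv4 and IPv6; mixed-family comparisons simply fail.
function ipInCidr(string $ip, string $cidr): bool
{
    [$subnet, $bits] = explode('/', $cidr);
    $ipBin = inet_pton($ip);
    $subnetBin = inet_pton($subnet);
    if ($ipBin === false || $subnetBin === false || strlen($ipBin) !== strlen($subnetBin)) {
        return false; // invalid input, or IPv4 vs IPv6 mismatch
    }
    $bits = (int) $bits;
    $fullBytes = intdiv($bits, 8);
    // Compare the whole bytes covered by the prefix first.
    if (substr($ipBin, 0, $fullBytes) !== substr($subnetBin, 0, $fullBytes)) {
        return false;
    }
    $rem = $bits % 8;
    if ($rem === 0) {
        return true;
    }
    // Mask the remaining partial byte.
    $mask = (0xFF << (8 - $rem)) & 0xFF;
    return (ord($ipBin[$fullBytes]) & $mask) === (ord($subnetBin[$fullBytes]) & $mask);
}

function isInternal(string $remoteAddr): bool
{
    $allowed = ['127.0.0.1/32', '::1/128', '10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'];
    foreach ($allowed as $cidr) {
        if (ipInCidr($remoteAddr, $cidr)) {
            return true;
        }
    }
    return false; // the middleware then responds 404, not 403
}
```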
The PHP middleware is belt-and-suspenders.

### 7. CLI

Extend `api/bin/console`:

- `jobs:run <name> [--full]` — invokes `JobRunner::run()` directly. Useful for dev/debugging without HTTP.
- `jobs:status` — prints the same data as `GET /internal/jobs/status`.
- `scores:rebuild` — convenience alias for `jobs:run recompute-scores --full`.

## Implementation notes

- **Concurrency**: lock acquire+release must survive a process crash. The `expires_at` reclaim handles crashed processes; pick `expires_at = now + maxRuntimeSeconds + 30s buffer`.
- **Long-running jobs in HTTP**: FrankenPHP's worker mode has a per-request timeout. Configure `max_execution_time` to be longer than your longest `maxRuntimeSeconds` for `/internal/jobs/*` routes. Keep public/admin routes at the default lower timeout.
- **DB perf**: incremental recompute should batch by reading all touched pair-keys first, then iterating. Avoid N+1 queries — fetch all relevant `reports` for a batch of pairs in one IN-list query.
- **Drop-stale rule**: be careful — drop only when score < 0.01 AND `last_report_at` is ≥ 90 days old. Don't drop pairs with recent reports just because their score dropped temporarily (shouldn't happen with correct math, but be defensive).
- **Tests**: three critical scenarios:
  1. Decay over time. Inject `Clock` to advance time; verify scores fall predictably.
  2. Lock contention. Two concurrent `RecomputeScoresJob` runs (use a barrier in tests). Exactly one `success`, one `skipped_locked`.
  3. Tick dispatcher invokes only what's due. Set up `job_runs` history; verify only the right jobs run.
- **Network middleware tests**: integration tests bind to `127.0.0.1`, so they should pass naturally; add a unit test that constructs a request with a public IP via a `REMOTE_ADDR` mock and asserts 404.

## Out of scope (DO NOT)

- Audit log emission (M12). The cleanup job runs but the table will mostly be empty.
- GeoIP enrichment logic (M11). The skeleton job no-ops.
- Allowlist / manual block evaluation (M06).
  Recompute only updates `ip_scores`; final blocklist filtering is M07.
- Distribution endpoint (M07).
- UI changes.
- Calling `/internal/jobs/*` from the UI directly (the UI uses the admin job-trigger wrapper added in M12).
- New dependencies.

## Acceptance

```bash
cd api && composer cs && composer stan && composer test && cd ..
docker compose down -v
cp .env.example .env
docker compose up -d
sleep 15
ADMIN_TOKEN=$(docker compose exec -T api php bin/console auth:create-token --kind=admin --role=admin --quiet)
INTERNAL_TOKEN=$(grep ^INTERNAL_JOB_TOKEN= .env | cut -d= -f2)

# Internal endpoint requires the internal token
test "$(curl -s -o /dev/null -w '%{http_code}' -X POST http://localhost:8081/internal/jobs/tick)" = "401"
test "$(curl -s -o /dev/null -w '%{http_code}' \
  -H "Authorization: Bearer wrong" \
  -X POST http://localhost:8081/internal/jobs/tick)" = "401"

# tick succeeds
RESP=$(curl -s -X POST -H "Authorization: Bearer $INTERNAL_TOKEN" \
  http://localhost:8081/internal/jobs/tick)
echo "$RESP" | grep -q '"job":"tick"'

# recompute-scores runs
RESP=$(curl -s -X POST -H "Authorization: Bearer $INTERNAL_TOKEN" \
  http://localhost:8081/internal/jobs/recompute-scores)
echo "$RESP" | grep -q '"status":"success"'

# Concurrent calls: exactly one success + one skipped_locked
RESP1_FILE=$(mktemp); RESP2_FILE=$(mktemp)
curl -s -X POST -H "Authorization: Bearer $INTERNAL_TOKEN" \
  -d '{"full":true}' http://localhost:8081/internal/jobs/recompute-scores > $RESP1_FILE &
curl -s -X POST -H "Authorization: Bearer $INTERNAL_TOKEN" \
  -d '{"full":true}' http://localhost:8081/internal/jobs/recompute-scores > $RESP2_FILE &
wait
STATUSES=$(cat $RESP1_FILE $RESP2_FILE | grep -oE '"status":"[a-z_]+"' | sort)
echo "$STATUSES" | grep -q '"status":"success"'
echo "$STATUSES" | grep -q '"status":"skipped_locked"'

# /internal/jobs/status returns per-job state
curl -s -H "Authorization: Bearer $INTERNAL_TOKEN" \
  http://localhost:8081/internal/jobs/status | grep -q '"recompute-scores"'

# Decay over time: insert old reports, recompute, expect lower scores than fresh
# (use the CLI scores:rebuild and inspect ip_scores; this is the trickiest acceptance step)
docker compose exec -T api php bin/console scores:rebuild
docker compose exec -T api sqlite3 /data/irdb.sqlite "SELECT COUNT(*) FROM ip_scores;"
docker compose down -v
```

Add a focused integration test in PHP that clocks forward 30 days between reports and asserts a known score with an exponential half-life of 14 days.

## Handoff

1. Commit:

   ```
   feat(M05): reputation engine + internal jobs framework

   - Job interface, JobLockRepository (atomic acquire), JobRunner, JobRegistry
   - RecomputeScoresJob (full + incremental), CleanupAuditJob, EnrichPendingJob (skeleton)
   - tick dispatcher; /internal/jobs/{recompute-scores,cleanup-audit,enrich-pending,tick,status}
   - InternalNetworkMiddleware + InternalTokenMiddleware (network-bound + token)
   - CLI: jobs:run, jobs:status, scores:rebuild
   ```

2. Append to `PROGRESS.md`:

   ```markdown
   ## M05 — Reputation engine & jobs (done)

   **Built:** decay math, bulk recompute (incremental + full), job framework with locks, /internal/jobs/*.

   **Notes for next milestone:**
   - PairScorer (from M04) is reused by RecomputeScoresJob; both produce identical scores for the same pair.
   - EnrichPendingJob is a skeleton — M11 fills it in.
   - refresh-geoip endpoint returns 412 — M11 wires it up.
   - Job results are returned synchronously; long jobs may exceed the default request timeout. /internal/* routes have an extended timeout configured.
   - Drop rule: score < 0.01 AND last_report_at older than 90 days.

   **Deviations from SPEC:** none.

   **Added dependencies:** none.
   ```

3. **Stop.** Do not start M06.