
M05 — Reputation Engine & Internal Job Endpoints

Fresh Claude Code agent prompt. M04 must be complete and committed. Estimated effort: large.

Mission

Build the reputation engine (full bulk recompute with decay reapplication) and the internal job framework: locks, run history, runner abstraction, the /internal/jobs/* endpoints, network and token middlewares, the tick dispatcher, and a CLI runner. Three job types are wired: recompute-scores, cleanup-audit, enrich-pending (skeleton — full enrichment is M11).

Before you start

  1. Verify M04:

    git log --oneline -4
    cd api && composer test && composer stan && cd ..
    
  2. Read SPEC.md §4 (job_locks, job_runs), §5 (Reputation Engine — recomputation rules), §6 (Internal Jobs API — endpoints, middlewares, response envelope), §10 (where the scheduler comes in).

  3. Confirm clean tree.

Tasks

1. Clock & decay (extend M04)

You already have Decay.php (linear + exponential) and PairScorer.php from M04. Verify they handle hard cutoff (365 days default) correctly. Add tests for:

  • An age beyond cutoff → decay returns 0.
  • Linear with decay_param=30, age=0 → 1.0; age=15 → 0.5; age=30 → 0.0.
  • Exponential with decay_param=14 (half-life), age=14 → 0.5; age=28 → 0.25.
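The expected values above follow from two small formulas. A minimal sketch, with illustrative function names — your actual Decay.php signatures from M04 may differ:

```php
<?php
// Illustrative decay curves matching the test expectations above.
// Both apply the hard cutoff first, then the curve.
function linearDecay(float $ageDays, float $decayParam, float $cutoffDays = 365.0): float
{
    if ($ageDays >= $cutoffDays) {
        return 0.0; // hard cutoff: anything this old contributes nothing
    }
    return max(0.0, 1.0 - $ageDays / $decayParam);
}

function exponentialDecay(float $ageDays, float $halfLifeDays, float $cutoffDays = 365.0): float
{
    if ($ageDays >= $cutoffDays) {
        return 0.0; // hard cutoff
    }
    return pow(0.5, $ageDays / $halfLifeDays); // halves every $halfLifeDays
}

var_dump(linearDecay(15, 30));       // 0.5
var_dump(exponentialDecay(28, 14));  // 0.25
var_dump(exponentialDecay(400, 14)); // 0.0 (beyond cutoff)
```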

2. Job framework

In api/src/Infrastructure/Jobs/:

  • Job.php — interface: name(): string, defaultIntervalSeconds(): int, maxRuntimeSeconds(): int, run(JobContext $ctx): JobResult.
  • JobContext.php — carries the Clock, a logger, and any per-invocation params ($ctx->param('full', false)).
  • JobResult.php — itemsProcessed: int, details: array.
  • JobLockRepository.php:
    • tryAcquire(string $name, int $maxRuntimeSeconds, string $owner): bool — atomic. Implementation:
      1. Begin transaction.
      2. Delete rows where expires_at < now.
      3. INSERT INTO job_locks (job_name, acquired_at, acquired_by, expires_at) VALUES (...) — fails on PK conflict if held.
      4. Commit. Return success/failure.
    • release(string $name, string $owner) — DELETE WHERE job_name = ? AND acquired_by = ?.
  • JobRunRepository.php — append rows, query latest per job, query overdue.
  • JobRunner.php:
    • run(Job $job, array $params, string $triggeredBy): JobOutcome — orchestrates: try-acquire → write running row → run → on success/failure write final row → release lock. Always writes a final row even on skipped_locked.
    • Generates a unique owner per invocation (e.g. getmypid() . '/' . bin2hex(random_bytes(4))).
  • JobRegistry.php — registers job classes by name; resolves by name.
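The acquire sequence can be sketched against an in-memory SQLite database. This is illustrative, not the real repository: column names follow SPEC §4, and the real class wraps your shared connection rather than creating one.

```php
<?php
// Sketch of the atomic acquire: reclaim expired locks, then rely on the
// job_name primary key to reject a second holder, all in one transaction.
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec('CREATE TABLE job_locks (
    job_name    TEXT PRIMARY KEY,
    acquired_at INTEGER NOT NULL,
    acquired_by TEXT NOT NULL,
    expires_at  INTEGER NOT NULL
)');

function tryAcquire(PDO $pdo, string $name, int $maxRuntimeSeconds, string $owner): bool
{
    $now = time();
    $pdo->beginTransaction();
    try {
        // Reclaim locks abandoned by crashed processes.
        $pdo->prepare('DELETE FROM job_locks WHERE expires_at < ?')->execute([$now]);
        // PK conflict here means the lock is still held by a live owner.
        $pdo->prepare(
            'INSERT INTO job_locks (job_name, acquired_at, acquired_by, expires_at)
             VALUES (?, ?, ?, ?)'
        )->execute([$name, $now, $owner, $now + $maxRuntimeSeconds + 30]); // +30s buffer
        $pdo->commit();
        return true;
    } catch (PDOException $e) {
        $pdo->rollBack();
        return false;
    }
}

var_dump(tryAcquire($pdo, 'recompute-scores', 240, 'owner-a')); // true
var_dump(tryAcquire($pdo, 'recompute-scores', 240, 'owner-b')); // false: still held
```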

3. Concrete jobs

In api/src/Application/Jobs/ (or api/src/Infrastructure/Jobs/Tasks/ — pick one and stay consistent):

  • RecomputeScoresJob.php:
    • Default interval: 300s. Max runtime: 240s.
    • Runs in two modes: full (full=true) and incremental (default).
    • Incremental: pairs (ip_bin, category_id) from reports with received_at >= now - interval UNION pairs from ip_scores where recomputed_at < now - freshness_window (default 1 hour). Cap at JOB_RECOMPUTE_MAX_ROWS_PER_TICK.
    • Full: every pair in ip_scores plus every pair in reports. No cap (but bounded by maxRuntimeSeconds).
    • For each pair: call PairScorer::score(), upsert ip_scores. Drop rows where score < 0.01 AND last_report_at < now - 90 days.
  • CleanupAuditJob.php:
    • Default interval: 86400s (daily). Max runtime: 60s.
    • Deletes audit_log rows older than JOB_AUDIT_RETENTION_DAYS. Audit table exists from M02 even though emitter doesn't yet — that's fine.
  • EnrichPendingJob.php:
    • Skeleton only. Default interval: 300s. Max runtime: 60s. For now: no-op that returns items_processed: 0 and logs a debug line. Full implementation in M11.
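The incremental pair selection can be expressed as a single UNION query. A hedged illustration against in-memory SQLite — the real table schemas come from your migrations, and the integer timestamps here are a simplification:

```php
<?php
// Sketch of incremental pair selection: pairs touched by recent reports
// UNION pairs whose score has gone stale, capped per tick.
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec('CREATE TABLE reports (ip_bin BLOB, category_id INTEGER, received_at INTEGER)');
$pdo->exec('CREATE TABLE ip_scores (ip_bin BLOB, category_id INTEGER, score REAL, recomputed_at INTEGER)');

$now = time();
$pdo->prepare('INSERT INTO reports VALUES (?, ?, ?)')->execute(['ip1', 1, $now - 60]);   // recent report
$pdo->prepare('INSERT INTO reports VALUES (?, ?, ?)')->execute(['ip2', 1, $now - 7200]); // too old
$pdo->prepare('INSERT INTO ip_scores VALUES (?, ?, ?, ?)')->execute(['ip3', 2, 0.4, $now - 7200]); // stale score

$stmt = $pdo->prepare('
    SELECT ip_bin, category_id FROM reports   WHERE received_at  >= :since
    UNION
    SELECT ip_bin, category_id FROM ip_scores WHERE recomputed_at <  :stale
    LIMIT :cap');
$stmt->bindValue(':since', $now - 300, PDO::PARAM_INT);  // job interval
$stmt->bindValue(':stale', $now - 3600, PDO::PARAM_INT); // freshness window
$stmt->bindValue(':cap', 1000, PDO::PARAM_INT);          // JOB_RECOMPUTE_MAX_ROWS_PER_TICK
$stmt->execute();
$pairs = $stmt->fetchAll(PDO::FETCH_NUM);
print_r($pairs); // (ip1, 1) from recent reports and (ip3, 2) from stale scores; ip2 is untouched
```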

4. Tick dispatcher

TickJob.php (or TickDispatcher.php — kept in same dir):

  • Iterates the registry. For each job, reads the latest job_runs entry for that name. If now - last_finished_at >= job.defaultInterval (or no row exists), invokes JobRunner::run() for that job. Per-job exceptions are caught and recorded but don't abort the dispatcher.
  • Itself recorded in job_runs as tick. Its default interval doesn't apply (it's invoked directly by the scheduler), but cap its max runtime at ~5 minutes total so scheduler invocations don't pile up.
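The due-check at the heart of the dispatcher can be sketched as a pure function. Names here are hypothetical, and the JobRunner wiring is omitted — the real loop also wraps each invocation in try/catch so one failure doesn't abort the rest:

```php
<?php
// Sketch: which registered jobs are due, given each job's interval and the
// timestamp of its last finished run (null = never ran).
function dueJobs(array $registry, array $lastFinishedAt, int $now): array
{
    $due = [];
    foreach ($registry as $name => $intervalSeconds) {
        $last = $lastFinishedAt[$name] ?? null;
        if ($last === null || $now - $last >= $intervalSeconds) {
            $due[] = $name; // never ran, or the interval has elapsed
        }
    }
    return $due;
}

$registry = [
    'recompute-scores' => 300,
    'cleanup-audit'    => 86400,
    'enrich-pending'   => 300,
];
$now = 1_000_000;
$history = [
    'recompute-scores' => $now - 400,  // overdue
    'cleanup-audit'    => $now - 3600, // ran an hour ago, daily interval: not due
    // enrich-pending has never run
];
print_r(dueJobs($registry, $history, $now)); // recompute-scores, enrich-pending
```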

5. HTTP endpoints

In api/src/Application/Internal/JobsController.php:

  • POST /internal/jobs/recompute-scores — body {full?: bool, max_rows?: int}.
  • POST /internal/jobs/cleanup-audit
  • POST /internal/jobs/enrich-pending
  • POST /internal/jobs/tick
  • POST /internal/jobs/refresh-geoip — for now: returns 412 Precondition Failed with {"error":"not_implemented"}. Real implementation in M11.
  • GET /internal/jobs/status — returns latest job_runs per known job, lock state, overdue: bool, computed against defaultIntervalSeconds.

Response envelope (POST endpoints):

{"job":"recompute-scores","status":"success","items_processed":1284,"duration_ms":8421,"run_id":42}

Statuses: success, failure, skipped_locked. failure returns HTTP 500. skipped_locked returns HTTP 409. Both still write a job_runs row and return the envelope.

6. Middlewares

In api/src/Infrastructure/Http/Middleware/:

  • InternalNetworkMiddleware.php — checks $_SERVER['REMOTE_ADDR'] against the CIDR list 127.0.0.1/32, ::1/128, 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16. Reject with 404 (NOT 403 — be opaque about the existence of these endpoints to outsiders). Use IpAddress and Cidr from M02 for parsing.
  • InternalTokenMiddleware.php — checks Authorization: Bearer <INTERNAL_JOB_TOKEN> (hash_equals). Reject with 401 if mismatch.
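For illustration, an IPv4-only membership check of the kind the network middleware performs. The real code should reuse IpAddress and Cidr from M02, which also cover ::1/128 and other IPv6 forms:

```php
<?php
// Sketch: is $ip inside $cidr? IPv4 only, assumes a 64-bit PHP build.
function inCidr4(string $ip, string $cidr): bool
{
    [$net, $bits] = explode('/', $cidr);
    $ipLong  = ip2long($ip);
    $netLong = ip2long($net);
    if ($ipLong === false || $netLong === false) {
        return false; // not parseable as IPv4
    }
    $mask = (~0 << (32 - (int) $bits)) & 0xFFFFFFFF;
    return ($ipLong & $mask) === ($netLong & $mask);
}

// The IPv4 portion of the middleware's allow-list.
$internal = ['127.0.0.1/32', '10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'];
$isInternal = fn (string $ip): bool =>
    array_reduce($internal, fn ($hit, $cidr) => $hit || inCidr4($ip, $cidr), false);

var_dump($isInternal('172.20.3.4')); // true: inside 172.16.0.0/12
var_dump($isInternal('8.8.8.8'));    // false: the middleware answers 404
```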

Apply both to all /internal/* routes. Order: network → token. (If network fails, don't even acknowledge the auth attempt.)

Also confirm the Caddyfile (from M01) actually applies the network restriction for defense in depth — Caddy returns 404 for non-RFC1918 sources. The PHP middleware is belt-and-suspenders.

7. CLI

Extend api/bin/console:

  • jobs:run <name> [--full] — invokes JobRunner::run() directly. Useful for dev/debugging without HTTP.
  • jobs:status — prints the same data as GET /internal/jobs/status.
  • scores:rebuild — convenience alias for jobs:run recompute-scores --full.

Implementation notes

  • Concurrency: lock acquire+release must survive process crash. The expires_at reclaim handles crashed processes; pick expires_at = now + maxRuntimeSeconds + 30s buffer.
  • Long-running jobs in HTTP: FrankenPHP's worker mode has a per-request timeout. Configure max_execution_time to be longer than your longest maxRuntimeSeconds for /internal/jobs/* routes. Keep public/admin routes at the default lower timeout.
  • DB perf: incremental recompute should batch by reading all touched pair-keys first, then iterating. Avoid N+1 queries — fetch all relevant reports for a batch of pairs in one IN-list query.
  • Drop-stale rule: be careful — drop only when both conditions hold: score < 0.01 AND last_report_at more than 90 days old. Don't drop pairs with recent reports just because their score dropped temporarily (shouldn't happen with correct math, but defensive).
  • Tests: three critical scenarios:
    1. Decay over time. Inject Clock to advance; verify scores fall predictably.
    2. Lock contention. Two concurrent RecomputeScoresJob runs (use a barrier in tests). Exactly one success, one skipped_locked.
    3. Tick dispatcher invokes only what's due. Set up job_runs history; verify only the right jobs run.
  • Network middleware tests: integration tests bind to 127.0.0.1 so they should pass naturally; add a unit test that constructs a request with a public IP via REMOTE_ADDR mock and asserts 404.
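The batched fetch from the DB-perf note can be sketched as one IN-list over row values (supported by SQLite 3.15+). The column list is illustrative; only the placeholder-building pattern is the point:

```php
<?php
// Sketch: build one query fetching all reports for a batch of
// (ip_bin, category_id) pairs, instead of one query per pair.
function reportsQueryForPairs(array $pairs): array
{
    // SQLite row values: (ip_bin, category_id) IN ((?,?),(?,?),...)
    $tuples = implode(',', array_fill(0, count($pairs), '(?,?)'));
    $sql = "SELECT ip_bin, category_id, received_at
            FROM reports
            WHERE (ip_bin, category_id) IN ($tuples)";
    $params = [];
    foreach ($pairs as [$ip, $cat]) {
        $params[] = $ip;
        $params[] = $cat;
    }
    return [$sql, $params]; // bind $params positionally when executing
}

[$sql, $params] = reportsQueryForPairs([['ip1', 1], ['ip2', 3]]);
echo $sql, "\n";
print_r($params); // ip1, 1, ip2, 3
```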

Out of scope (DO NOT)

  • Audit log emission (M12). The cleanup job runs but the table will mostly be empty.
  • GeoIP enrichment logic (M11). The skeleton job no-ops.
  • Allowlist / manual block evaluation (M06). Recompute only updates ip_scores; final blocklist filtering is M07.
  • Distribution endpoint (M07).
  • UI changes.
  • Calling /internal/jobs/* from the UI directly (UI uses the admin job-trigger wrapper added in M12).
  • New dependencies.

Acceptance

cd api && composer cs && composer stan && composer test && cd ..

docker compose down -v
cp .env.example .env
docker compose up -d
sleep 15

ADMIN_TOKEN=$(docker compose exec -T api php bin/console auth:create-token --kind=admin --role=admin --quiet)
INTERNAL_TOKEN=$(grep ^INTERNAL_JOB_TOKEN= .env | cut -d= -f2)

# Internal endpoint requires the internal token
test "$(curl -s -o /dev/null -w '%{http_code}' -X POST http://localhost:8081/internal/jobs/tick)" = "401"
test "$(curl -s -o /dev/null -w '%{http_code}' \
  -H "Authorization: Bearer wrong" \
  -X POST http://localhost:8081/internal/jobs/tick)" = "401"

# tick succeeds
RESP=$(curl -s -X POST -H "Authorization: Bearer $INTERNAL_TOKEN" \
  http://localhost:8081/internal/jobs/tick)
echo "$RESP" | grep -q '"job":"tick"'

# recompute-scores runs
RESP=$(curl -s -X POST -H "Authorization: Bearer $INTERNAL_TOKEN" \
  http://localhost:8081/internal/jobs/recompute-scores)
echo "$RESP" | grep -q '"status":"success"'

# Concurrent calls: exactly one success + one skipped_locked
RESP1_FILE=$(mktemp); RESP2_FILE=$(mktemp)
curl -s -X POST -H "Authorization: Bearer $INTERNAL_TOKEN" \
  -d '{"full":true}' http://localhost:8081/internal/jobs/recompute-scores > $RESP1_FILE &
curl -s -X POST -H "Authorization: Bearer $INTERNAL_TOKEN" \
  -d '{"full":true}' http://localhost:8081/internal/jobs/recompute-scores > $RESP2_FILE &
wait
STATUSES=$(cat $RESP1_FILE $RESP2_FILE | grep -oE '"status":"[a-z_]+"' | sort)
echo "$STATUSES" | grep -q '"status":"success"'
echo "$STATUSES" | grep -q '"status":"skipped_locked"'

# /internal/jobs/status returns per-job state
curl -s -H "Authorization: Bearer $INTERNAL_TOKEN" \
  http://localhost:8081/internal/jobs/status | grep -q '"recompute-scores"'

# Decay over time: insert old reports, recompute, expect lower scores than fresh
# (use the CLI scores:rebuild and inspect ip_scores; this is the trickiest acceptance step)
docker compose exec -T api php bin/console scores:rebuild
docker compose exec -T api sqlite3 /data/irdb.sqlite "SELECT COUNT(*) FROM ip_scores;"

docker compose down -v

Add a focused integration test in PHP that clocks-forward 30 days between reports and asserts a known score with an exponential half-life of 14 days.
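For reference when writing that assertion, the expected decay factor follows from the exponential curve 0.5^(age / half-life); with a 30-day-old report and a 14-day half-life:

```php
<?php
// Expected decay factor for the clock-forward integration test:
// a report aged 30 days under a 14-day exponential half-life.
$factor = pow(0.5, 30 / 14);
printf("%.4f\n", $factor); // ~0.2264
```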

Handoff

  1. Commit:

    feat(M05): reputation engine + internal jobs framework
    
    - Job interface, JobLockRepository (atomic acquire), JobRunner, JobRegistry
    - RecomputeScoresJob (full + incremental), CleanupAuditJob, EnrichPendingJob (skeleton)
    - tick dispatcher; /internal/jobs/{recompute-scores,cleanup-audit,enrich-pending,tick,status}
    - InternalNetworkMiddleware + InternalTokenMiddleware (network-bound + token)
    - CLI: jobs:run, jobs:status, scores:rebuild
    
  2. Append to PROGRESS.md:

    ## M05 — Reputation engine & jobs (done)
    
    **Built:** decay math, bulk recompute (incremental + full), job framework with locks, /internal/jobs/*.
    
    **Notes for next milestone:**
    - PairScorer (from M04) is reused by RecomputeScoresJob; both produce identical scores for the same pair.
    - EnrichPendingJob is a skeleton — M11 fills it in.
    - refresh-geoip endpoint returns 412 — M11 wires it up.
    - Job results are returned synchronously; long jobs may exceed default request timeout. /internal/* routes have an extended timeout configured.
    - Drop rule: score < 0.01 AND last_report_at older than 90 days.
    
    **Deviations from SPEC:** none.
    **Added dependencies:** none.
    
  3. Stop. Do not start M06.