IpEnrichmentRepository.php 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Infrastructure\Reputation;
  4. use App\Domain\Enrichment\EnrichmentResult;
  5. use App\Infrastructure\Db\RepositoryBase;
  6. use Doctrine\DBAL\ParameterType;
  /**
   * Read + write gateway for the `ip_enrichment` table.
   *
   * Originally read-only (M09); M11 grew it into the full enrichment-job
   * sink. Driver-aware UPSERT mirrors `IpScoreRepository::upsert`.
   */
  class IpEnrichmentRepository extends RepositoryBase
  {
      /**
       * Look up the enrichment row for a single IP.
       *
       * @param string $ipBin Raw `ip_bin` value identifying the row.
       *
       * @return array{country_code: ?string, asn: ?int, as_org: ?string, enriched_at: ?string}|null
       *         The normalised row, or null when no row exists for `$ipBin`.
       */
      public function findByIpBin(string $ipBin): ?array
      {
          $row = $this->fetchByIpBin('ip_enrichment', $ipBin);

          return $row === null ? null : self::mapEnrichmentRow($row);
      }

      /**
       * Batched form of `findByIpBin`. Single SELECT with `IN (…)` over the
       * bin set, returning a map keyed by raw `ip_bin` so callers can
       * dereference per-row without an extra round-trip per IP.
       *
       * SEC_REVIEW F32: the admin IPs list previously called
       * `findByIpBin` per page row; at `page_size=200` that's 200
       * round-trips just for enrichment. Bind binary IN-list params with
       * `LARGE_OBJECT` so MySQL treats them as octets, matching `upsert`.
       *
       * IPs without an enrichment row are simply absent from the result.
       *
       * @param list<string> $ipBins
       * @return array<string, array{country_code: ?string, asn: ?int, as_org: ?string, enriched_at: ?string}>
       */
      public function findByIpBins(array $ipBins): array
      {
          if ($ipBins === []) {
              return [];
          }

          // Placeholder names are derived from the array index, so re-index to
          // guarantee clean 0..n-1 integer keys even if the caller passed a
          // sparse or string-keyed array. De-duplicating first keeps the
          // IN-list as short as possible without changing the result set.
          $uniqueBins = array_values(array_unique($ipBins));

          $placeholders = [];
          $params = [];
          $types = [];
          foreach ($uniqueBins as $index => $bin) {
              $name = 'b' . $index;
              $placeholders[] = ':' . $name;
              $params[$name] = $bin;
              $types[$name] = ParameterType::LARGE_OBJECT;
          }

          $sql = 'SELECT ip_bin, country_code, asn, as_org, enriched_at FROM ip_enrichment '
              . 'WHERE ip_bin IN (' . implode(', ', $placeholders) . ')';

          /** @var list<array<string, mixed>> $rows */
          $rows = $this->connection()->fetchAllAssociative($sql, $params, $types);

          $byIpBin = [];
          foreach ($rows as $row) {
              $byIpBin[(string) $row['ip_bin']] = self::mapEnrichmentRow($row);
          }

          return $byIpBin;
      }

      /**
       * Insert or update one enrichment row. Driver-aware: SQLite uses
       * `ON CONFLICT(ip_bin) DO UPDATE`, MySQL uses `ON DUPLICATE KEY`.
       *
       * @param string           $ipBin  Raw `ip_bin` value; bound as a large object.
       * @param EnrichmentResult $result Values to store; `enrichedAt` is written
       *                                 as `Y-m-d H:i:s`.
       */
      public function upsert(string $ipBin, EnrichmentResult $result): void
      {
          // The platform is detected by class name; any platform whose class
          // name mentions MySQL or MariaDB gets the MySQL dialect, everything
          // else gets the `ON CONFLICT` form.
          $platformClass = $this->connection()->getDatabasePlatform()::class;
          $isMysql = stripos($platformClass, 'mysql') !== false
              || stripos($platformClass, 'mariadb') !== false;

          $insert = 'INSERT INTO ip_enrichment (ip_bin, country_code, asn, as_org, enriched_at) '
              . 'VALUES (:ip_bin, :country, :asn, :as_org, :enriched_at) ';

          if ($isMysql) {
              $sql = $insert
                  . 'ON DUPLICATE KEY UPDATE '
                  . 'country_code = VALUES(country_code), asn = VALUES(asn), '
                  . 'as_org = VALUES(as_org), enriched_at = VALUES(enriched_at)';
          } else {
              $sql = $insert
                  . 'ON CONFLICT(ip_bin) DO UPDATE SET '
                  . 'country_code = excluded.country_code, asn = excluded.asn, '
                  . 'as_org = excluded.as_org, enriched_at = excluded.enriched_at';
          }

          $stmt = $this->connection()->prepare($sql);
          $stmt->bindValue('ip_bin', $ipBin, ParameterType::LARGE_OBJECT);
          $stmt->bindValue('country', $result->countryCode);

          // A missing ASN must be stored as SQL NULL rather than coerced to 0.
          if ($result->asn === null) {
              $stmt->bindValue('asn', null, ParameterType::NULL);
          } else {
              $stmt->bindValue('asn', $result->asn, ParameterType::INTEGER);
          }

          $stmt->bindValue('as_org', $result->asOrg);
          $stmt->bindValue('enriched_at', $result->enrichedAt->format('Y-m-d H:i:s'));
          $stmt->executeStatement();
      }

      /**
       * IPs known to the system (reports OR manual_blocks) but missing
       * from `ip_enrichment` (or whose enriched_at was cleared by
       * ?reenrich=true). Ordered by oldest first-seen so backlogs catch
       * up before newer arrivals.
       *
       * A non-positive `$limit` yields an empty list without querying.
       *
       * @param int $limit Maximum number of IPs to return.
       *
       * @return list<string> ip_bin values, length ≤ $limit
       */
      public function findPending(int $limit): array
      {
          // SQLite treats a negative LIMIT as "no limit", which would break
          // the "length ≤ $limit" contract, so bail out before hitting the
          // database. LIMIT 0 returns nothing anyway.
          if ($limit <= 0) {
              return [];
          }

          // The LEFT JOIN only matches rows that are already enriched, so
          // `e.ip_bin IS NULL` keeps both never-enriched IPs and IPs whose
          // `enriched_at` has been reset to NULL.
          $sql = <<<SQL
          SELECT t.ip_bin AS ip_bin, MIN(t.received_at) AS received_at
          FROM (
          SELECT ip_bin, received_at FROM reports
          UNION ALL
          SELECT ip_bin, created_at AS received_at FROM manual_blocks WHERE kind = 'ip' AND ip_bin IS NOT NULL
          ) t
          LEFT JOIN ip_enrichment e ON e.ip_bin = t.ip_bin AND e.enriched_at IS NOT NULL
          WHERE e.ip_bin IS NULL
          GROUP BY t.ip_bin
          ORDER BY MIN(t.received_at) ASC
          LIMIT :limit
          SQL;

          $stmt = $this->connection()->prepare($sql);
          $stmt->bindValue('limit', $limit, ParameterType::INTEGER);

          /** @var list<array<string, mixed>> $rows */
          $rows = $stmt->executeQuery()->fetchAllAssociative();

          return array_map(
              static fn (array $row): string => (string) $row['ip_bin'],
              $rows,
          );
      }

      /**
       * Clear `enriched_at` on every row. Used only by the `?reenrich=true`
       * flag on refresh-geoip; lets `findPending` re-pick all rows up.
       * Returns the affected row count for the job's `items_processed`.
       */
      public function clearAllEnrichedAt(): int
      {
          return (int) $this->connection()->executeStatement(
              'UPDATE ip_enrichment SET enriched_at = NULL'
          );
      }

      /**
       * Distinct country codes seen so far with their populations.
       * Powers the IPs-list country dropdown.
       *
       * Rows with a NULL country are excluded; the result is ordered by
       * country code.
       *
       * @return list<array{code: string, count: int}>
       */
      public function countryCounts(): array
      {
          $rows = $this->connection()->fetchAllAssociative(
              'SELECT country_code AS code, COUNT(*) AS cnt FROM ip_enrichment '
              . 'WHERE country_code IS NOT NULL GROUP BY country_code ORDER BY country_code'
          );

          return array_map(
              static fn (array $row): array => [
                  'code' => (string) $row['code'],
                  'count' => (int) $row['cnt'],
              ],
              $rows,
          );
      }

      /**
       * Normalise one raw `ip_enrichment` row into the typed shape exposed by
       * the finders: NULL columns stay null, everything else is cast to its
       * documented scalar type.
       *
       * @param array<string, mixed> $row Raw row containing at least the
       *                                  country_code, asn, as_org and
       *                                  enriched_at columns.
       *
       * @return array{country_code: ?string, asn: ?int, as_org: ?string, enriched_at: ?string}
       */
      private static function mapEnrichmentRow(array $row): array
      {
          return [
              'country_code' => $row['country_code'] !== null ? (string) $row['country_code'] : null,
              'asn' => $row['asn'] !== null ? (int) $row['asn'] : null,
              'as_org' => $row['as_org'] !== null ? (string) $row['as_org'] : null,
              'enriched_at' => $row['enriched_at'] !== null ? (string) $row['enriched_at'] : null,
          ];
      }
  }