Merge pull request 'release: 2026-04-16.4 (50건 커밋)' (#58) from develop into main

This commit is contained in:
htlee 2026-04-16 11:10:45 +09:00
커밋 7d101604cc
8개의 변경된 파일629개의 추가작업 그리고 77개의 파일을 삭제

파일 보기

@ -4,6 +4,23 @@
## [Unreleased]
## [2026-04-16.4]
### 추가
- **G-02 금어기 조업 탐지**`fishery_permit_cn.fishing_period_1/2` 파싱(YYYY/MM/DD 범위) + `classify_gear_violations()` 에 permit_periods/observation_ts 인자 추가. 허가기간 밖 조업 시 `CLOSED_SEASON_FISHING` judgment
- **G-03 미등록/허가외 어구 탐지** — 감지 어구와 `fishery_code` 허용 목록 대조(PT→trawl, GN→gillnet, FC→금지 등). 불일치 시 `UNREGISTERED_GEAR` judgment
- **NAME_FUZZY 매칭** — 선박명 정규화(공백/대소문자/'NO.' 마커 통일, 선박번호 유지) + name_en 기반 fuzzy lookup. 동명이 중복 방지. 매칭률 9.1% → 53.1%
- **서버 스크립트 tier/G-02/G-03/match_method 추적** — diagnostic(5분) + hourly(1시간) 에 pair_tier 분포, reject 사유 카운터, fishery_code×match_method 교차, G-02/G-03 상세 섹션
### 변경
- **pair_trawl tier 분류** — AND 게이트(스펙 100% 2시간) 대신 STRONG/PROBABLE/SUSPECT 3단계. 완화 임계(800m/SOG 1.5-5/sog_delta 1.0/cog 20°)로 실제 공조 신호 포착. G-06 판정은 STRONG/PROBABLE 만
- **pair_trawl join key** — raw AIS timestamp → `time_bucket`(5분 리샘플). sog/cog on-demand 계산(`_ensure_sog_cog`)으로 vessel_store._tracks 직접 사용
- **pair base 확장** — classification 500척 → 전체 중국 MID(412/413/414) 조업 속력대 선박. candidates 61→1,668, detected 0→57
- **match_ais_to_registry 대상 확장** — vessel_dfs(500척) → vessel_store._tracks 전체 중국 선박(8k+)
### 수정
- **violation_classifier**`CLOSED_SEASON_FISHING`, `UNREGISTERED_GEAR` judgment → `ILLEGAL_GEAR` 카테고리 매핑 추가
## [2026-04-16.3]
### 변경

파일 보기

@ -2,14 +2,15 @@
어구 위반 G코드 분류 프레임워크 (DAR-03)
G-01: 허가수역 조업 (zone-gear mismatch)
G-02: 금어기 조업 (fishing outside permit period)
G-03: 미등록/허가외 어구 (detected gear registered fishery_code)
G-04: MMSI 조작 의심 (gear signal on/off cycling)
G-05: 어구 인위적 이동 (fixed gear drift > threshold)
G-06: 쌍끌이 공조 조업 (pair trawl from pair_trawl.py)
G-02 (금어기), G-03 (미등록 어구) 외부 데이터 필요하여 보류.
"""
import math
import logging
from datetime import datetime
from typing import Optional
import pandas as pd
@ -18,10 +19,27 @@ logger = logging.getLogger(__name__)
# G-code score weights
G01_SCORE = 15 # 비허가 수역 조업
G02_SCORE = 18 # 금어기 조업
G03_SCORE = 12 # 미등록/허가외 어구
G04_SCORE = 10 # MMSI 조작 의심
G05_SCORE = 5 # 고정어구 인위적 이동
G06_SCORE = 20 # 쌍끌이 공조 탐지
# G-03: 허가 업종코드 → 허용 어구 유형 매핑 (fishery_permit_cn.fishery_code 기준)
# PT = 2척식저인망(쌍끌이 본선), PT-S = 부속선 → trawl/pair_trawl
# GN = 자망 → gillnet
# PS = 선망(둘러치기) → purse_seine
# OT = 외끌이저인망 → trawl
# FC = 운반선 → 조업 금지
FISHERY_CODE_ALLOWED_GEAR: dict[str, set[str]] = {
'PT': {'PT', 'TRAWL', 'PT-S'},
'PT-S': {'PT', 'TRAWL', 'PT-S'},
'GN': {'GN', 'GNS', 'GND', 'GILLNET'},
'PS': {'PS', 'PURSE'},
'OT': {'OT', 'TRAWL'},
'FC': set(),
}
# G-04 thresholds
SIGNAL_CYCLING_GAP_MIN = 30 # minutes
SIGNAL_CYCLING_MIN_COUNT = 2
@ -131,6 +149,41 @@ def _detect_gear_drift(
}
def _is_in_closed_season(
ts: Optional[datetime],
permit_periods: Optional[list[tuple[datetime, datetime]]],
) -> bool:
"""허가 조업 기간 밖이면 금어기 조업 (G-02) 으로 판정.
permit_periods 비어 있으면 데이터 부재로 판정 불가 False.
ts None 이면 판정 불가 False.
"""
if not permit_periods or ts is None:
return False
try:
ts_naive = ts.replace(tzinfo=None) if ts.tzinfo is not None else ts
except AttributeError:
return False
return not any(start <= ts_naive <= end for start, end in permit_periods)
def _is_unregistered_gear(
detected_gear: Optional[str],
registered_fishery_code: Optional[str],
) -> bool:
"""감지된 어구가 허가 업종코드의 허용 어구에 포함되지 않으면 G-03.
하나라도 없으면 판정 불가 False (데이터 부족).
"""
if not detected_gear or not registered_fishery_code:
return False
detected_norm = detected_gear.upper().strip()
allowed = FISHERY_CODE_ALLOWED_GEAR.get(registered_fishery_code.upper().strip())
if allowed is None:
return False # 미지의 업종코드 — 판정 보류
return detected_norm not in allowed
def classify_gear_violations(
mmsi: str,
gear_type: str,
@ -140,6 +193,9 @@ def classify_gear_violations(
is_permitted: bool,
gear_episodes: Optional[list[dict]] = None,
gear_positions: Optional[list[tuple[float, float]]] = None,
permit_periods: Optional[list[tuple[datetime, datetime]]] = None,
registered_fishery_code: Optional[str] = None,
observation_ts: Optional[datetime] = None,
) -> dict:
"""어구 위반 G코드 분류 메인 함수 (DAR-03).
@ -197,6 +253,52 @@ def classify_gear_violations(
mmsi, zone, gear_type, allowed_gears,
)
# ── G-02: 금어기 조업 ────────────────────────────────────────
if permit_periods:
try:
in_closed = _is_in_closed_season(observation_ts, permit_periods)
except Exception as exc:
logger.error('G-02 평가 실패 [mmsi=%s]: %s', mmsi, exc)
in_closed = False
if in_closed:
g_codes.append('G-02')
score += G02_SCORE
evidence['G-02'] = {
'observed_at': observation_ts.isoformat() if observation_ts else None,
'permit_periods': [
[s.isoformat(), e.isoformat()] for s, e in permit_periods
],
}
if not judgment:
judgment = 'CLOSED_SEASON_FISHING'
logger.debug('G-02 탐지 [mmsi=%s] ts=%s', mmsi, observation_ts)
# ── G-03: 미등록/허가외 어구 ──────────────────────────────────
if registered_fishery_code:
try:
unregistered = _is_unregistered_gear(gear_type, registered_fishery_code)
except Exception as exc:
logger.error('G-03 평가 실패 [mmsi=%s]: %s', mmsi, exc)
unregistered = False
if unregistered:
g_codes.append('G-03')
score += G03_SCORE
evidence['G-03'] = {
'detected_gear': gear_type,
'registered_fishery_code': registered_fishery_code,
'allowed_gears': sorted(
FISHERY_CODE_ALLOWED_GEAR.get(
registered_fishery_code.upper().strip(), set()
)
),
}
if not judgment:
judgment = 'UNREGISTERED_GEAR'
logger.debug(
'G-03 탐지 [mmsi=%s] detected=%s registered=%s',
mmsi, gear_type, registered_fishery_code,
)
# ── G-04: MMSI 조작 의심 (고정어구 신호 on/off 반복) ───────────
if gear_episodes is not None and gear_type in FIXED_GEAR_TYPES:
try:

파일 보기

@ -37,15 +37,30 @@ logger = logging.getLogger(__name__)
# 상수
# ──────────────────────────────────────────────────────────────
PROXIMITY_NM = 0.27 # 500m ≈ 0.27 NM
SOG_DELTA_MAX = 0.5 # kn
COG_DELTA_MAX = 10.0 # degrees
SOG_MIN = 2.0 # kn (조업 속력 하한)
SOG_MAX = 4.0 # kn (조업 속력 상한)
MIN_SYNC_CYCLES = 24 # 24 × 5min = 2시간
PROXIMITY_NM = 0.27 # 500m ≈ 0.27 NM (STRONG tier 스펙)
SOG_DELTA_MAX = 0.5 # kn (STRONG)
COG_DELTA_MAX = 10.0 # degrees (STRONG)
SOG_MIN = 2.0 # kn (조업 속력 하한 — STRONG)
SOG_MAX = 4.0 # kn (조업 속력 상한 — STRONG)
MIN_SYNC_CYCLES = 24 # 24 × 5min = 2시간 (STRONG)
SIMULTANEOUS_GAP_MIN = 30 # 동시 AIS 차단 기준 (분)
CYCLE_INTERVAL_MIN = 5 # 5분 리샘플 데이터
# Tier별 완화 임계 — AND 게이트로 0건 만들지 않고 신호 강도에 맞춰 분류
# STRONG : 스펙 100% (2h 연속 + 전 조건)
# PROBABLE : 1h 연속 또는 누적 2h + sync_ratio 0.6
# SUSPECT : 30분+ + sync_ratio 0.3 (약한 신호, 플래그만)
PROBABLE_MIN_BLOCK_CYCLES = 12 # 1h
PROBABLE_MIN_SYNC_RATIO = 0.6
SUSPECT_MIN_BLOCK_CYCLES = 6 # 30min
SUSPECT_MIN_SYNC_RATIO = 0.3
# 완화 tier 기준: proximity 800m, SOG 1.5-5.0kn, sog_delta 1.0, cog 20°
PROBABLE_PROXIMITY_NM = 0.43 # ≈ 800m
PROBABLE_SOG_DELTA_MAX = 1.0
PROBABLE_COG_DELTA_MAX = 20.0
PROBABLE_SOG_MIN = 1.5
PROBABLE_SOG_MAX = 5.0
# scan_unregistered_pairs 전용
CELL_SIZE = 0.01 # ~1.1km 격자
CANDIDATE_PROXIMITY_FACTOR = 2.0 # 후보 탐색 반경: PROXIMITY_NM × 2
@ -110,9 +125,11 @@ def _cell_key(lat: float, lon: float) -> tuple[int, int]:
return (round(lat / CELL_SIZE), round(lon / CELL_SIZE))
def _default_result(mmsi_b: str) -> dict:
def _default_result(mmsi_b: str, reject_reason: str = '') -> dict:
return {
'pair_detected': False,
'tier': None,
'reject_reason': reject_reason,
'sync_duration_min': 0.0,
'max_sync_block_min': 0.0,
'mean_separation_nm': 0.0,
@ -125,6 +142,20 @@ def _default_result(mmsi_b: str) -> dict:
}
# 사이클별 거부 사유 카운터 (scheduler 가 읽어 로그 출력 후 reset)
REJECT_COUNTERS: dict[str, int] = {
'empty_df': 0,
'missing_columns': 0,
'insufficient_aligned': 0,
'no_sync_at_any_tier': 0,
}
def reset_reject_counters() -> None:
for k in REJECT_COUNTERS:
REJECT_COUNTERS[k] = 0
# ──────────────────────────────────────────────────────────────
# 공개 API
# ──────────────────────────────────────────────────────────────
@ -151,42 +182,48 @@ def detect_pair_trawl(
Returns:
필드들 + role_a/role_b/similarity/bonus/pair_type
"""
required_cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'}
if df_a.empty or df_b.empty:
REJECT_COUNTERS['empty_df'] += 1
logger.debug('pair_trawl(%s, %s): empty DataFrame', mmsi_a, mmsi_b)
return _default_result(mmsi_b)
return _default_result(mmsi_b, 'empty_df')
df_a = _ensure_sog_cog(df_a)
df_b = _ensure_sog_cog(df_b)
join_key = _pair_join_key(df_a)
required_cols = {join_key, 'lat', 'lon', 'sog', 'cog'}
missing_a = required_cols - set(df_a.columns)
missing_b = required_cols - set(df_b.columns)
if missing_a or missing_b:
REJECT_COUNTERS['missing_columns'] += 1
logger.warning(
'pair_trawl(%s, %s): missing columns a=%s b=%s',
mmsi_a, mmsi_b, missing_a, missing_b,
)
return _default_result(mmsi_b)
return _default_result(mmsi_b, 'missing_columns')
# ── Step 1: timestamp inner join ────────────────────────
a = df_a[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy()
b = df_b[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy()
a['timestamp'] = pd.to_datetime(a['timestamp'])
b['timestamp'] = pd.to_datetime(b['timestamp'])
# ── Step 1: join_key (time_bucket 우선, fallback timestamp) inner join ──
a = df_a[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
b = df_b[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
# 같은 bucket 에 다수 샘플이 있으면 평균
a = a.groupby(join_key, as_index=False).mean(numeric_only=True)
b = b.groupby(join_key, as_index=False).mean(numeric_only=True)
merged = pd.merge(
a.rename(columns={'lat': 'lat_a', 'lon': 'lon_a', 'sog': 'sog_a', 'cog': 'cog_a'}),
b.rename(columns={'lat': 'lat_b', 'lon': 'lon_b', 'sog': 'sog_b', 'cog': 'cog_b'}),
on='timestamp',
on=join_key,
how='inner',
).sort_values('timestamp').reset_index(drop=True)
).sort_values(join_key).reset_index(drop=True)
total_aligned = len(merged)
if total_aligned < MIN_SYNC_CYCLES:
# SUSPECT tier 조차 성립 불가 (30min = 6 cycles)
if total_aligned < SUSPECT_MIN_BLOCK_CYCLES:
REJECT_COUNTERS['insufficient_aligned'] += 1
logger.debug(
'pair_trawl(%s, %s): only %d aligned rows (need %d)',
mmsi_a, mmsi_b, total_aligned, MIN_SYNC_CYCLES,
'pair_trawl(%s, %s): only %d aligned rows (need %d for SUSPECT)',
mmsi_a, mmsi_b, total_aligned, SUSPECT_MIN_BLOCK_CYCLES,
)
return _default_result(mmsi_b)
return _default_result(mmsi_b, 'insufficient_aligned')
# ── Step 2: 행별 동기화 지표 계산 ───────────────────────
merged['distance_nm'] = merged.apply(
@ -198,26 +235,53 @@ def detect_pair_trawl(
lambda r: _cog_delta(r['cog_a'], r['cog_b']),
axis=1,
)
merged['both_in_range'] = (
merged['sog_a'].between(SOG_MIN, SOG_MAX)
& merged['sog_b'].between(SOG_MIN, SOG_MAX)
)
merged['synced'] = (
# STRONG tier: 스펙 100% — 500m / SOG 2-4 / sog_delta 0.5 / cog 10° / 24 cycles 연속
merged['synced_strong'] = (
(merged['distance_nm'] <= PROXIMITY_NM)
& (merged['sog_delta'] <= SOG_DELTA_MAX)
& (merged['cog_delta'] <= COG_DELTA_MAX)
& merged['both_in_range']
& merged['sog_a'].between(SOG_MIN, SOG_MAX)
& merged['sog_b'].between(SOG_MIN, SOG_MAX)
)
# PROBABLE tier: 완화 — 800m / SOG 1.5-5 / sog_delta 1.0 / cog 20° / 12 cycles 연속
merged['synced_probable'] = (
(merged['distance_nm'] <= PROBABLE_PROXIMITY_NM)
& (merged['sog_delta'] <= PROBABLE_SOG_DELTA_MAX)
& (merged['cog_delta'] <= PROBABLE_COG_DELTA_MAX)
& merged['sog_a'].between(PROBABLE_SOG_MIN, PROBABLE_SOG_MAX)
& merged['sog_b'].between(PROBABLE_SOG_MIN, PROBABLE_SOG_MAX)
)
# ── Step 3: 연속 블록 탐지 ──────────────────────────────
max_block_cycles = _max_sync_block(merged['synced'])
if max_block_cycles < MIN_SYNC_CYCLES:
logger.debug(
'pair_trawl(%s, %s): max sync block %d cycles < %d required',
mmsi_a, mmsi_b, max_block_cycles, MIN_SYNC_CYCLES,
)
return _default_result(mmsi_b)
# ── Step 3: tier 분류 — 가장 높은 tier 채택 ──────────────
strong_block = _max_sync_block(merged['synced_strong'])
probable_block = _max_sync_block(merged['synced_probable'])
strong_total = int(merged['synced_strong'].sum())
probable_total = int(merged['synced_probable'].sum())
strong_ratio = strong_total / total_aligned if total_aligned else 0.0
probable_ratio = probable_total / total_aligned if total_aligned else 0.0
tier: Optional[str] = None
if strong_block >= MIN_SYNC_CYCLES:
tier = 'STRONG'
used_col = 'synced_strong'
max_block_cycles = strong_block
elif probable_block >= PROBABLE_MIN_BLOCK_CYCLES and probable_ratio >= PROBABLE_MIN_SYNC_RATIO:
tier = 'PROBABLE'
used_col = 'synced_probable'
max_block_cycles = probable_block
elif probable_block >= SUSPECT_MIN_BLOCK_CYCLES and probable_ratio >= SUSPECT_MIN_SYNC_RATIO:
tier = 'SUSPECT'
used_col = 'synced_probable'
max_block_cycles = probable_block
else:
REJECT_COUNTERS['no_sync_at_any_tier'] += 1
logger.debug(
'pair_trawl(%s, %s): no tier — strong_block=%d probable_block=%d probable_ratio=%.2f',
mmsi_a, mmsi_b, strong_block, probable_block, probable_ratio,
)
return _default_result(mmsi_b, 'no_sync_at_any_tier')
merged['synced'] = merged[used_col]
total_synced = int(merged['synced'].sum())
sync_duration_min = total_synced * CYCLE_INTERVAL_MIN
max_sync_block_min = max_block_cycles * CYCLE_INTERVAL_MIN
@ -267,15 +331,17 @@ def detect_pair_trawl(
bonus = 15; pair_type = 'TRANSSHIP_LIKE'
logger.info(
'pair_trawl(%s, %s): detected — sync=%.0fmin max_block=%.0fmin '
'pair_trawl(%s, %s): %s — sync=%.0fmin max_block=%.0fmin '
'sep=%.3fnm confidence=%.3f bonus=%d type=%s g_codes=%s',
mmsi_a, mmsi_b,
mmsi_a, mmsi_b, tier,
sync_duration_min, max_sync_block_min,
mean_separation_nm, confidence, bonus, pair_type, g_codes,
)
return {
'pair_detected': True,
'tier': tier,
'reject_reason': '',
'sync_duration_min': round(sync_duration_min, 1),
'max_sync_block_min': round(max_sync_block_min, 1),
'mean_separation_nm': round(mean_separation_nm, 4),
@ -321,6 +387,35 @@ def _classify_role(
return 'UNKNOWN'
def _ensure_sog_cog(df: pd.DataFrame) -> pd.DataFrame:
"""sog/cog 컬럼이 없으면 lat/lon/timestamp 로 haversine 계산하여 추가.
pair_trawl vessel_store._tracks(raw_sog 보유) 전체를 pool 받는다.
select_analysis_targets 결과에 의존하면 중국 412* 선박(한국 440xxx )
pool 에서 빠지므로, 필요 on-demand 계산한다.
"""
if 'sog' in df.columns and 'cog' in df.columns:
return df
if 'timestamp' not in df.columns or 'lat' not in df.columns or 'lon' not in df.columns:
return df
try:
# cache.vessel_store._compute_sog_cog 를 재사용
from cache.vessel_store import _compute_sog_cog
return _compute_sog_cog(df)
except Exception:
return df
def _pair_join_key(df: pd.DataFrame) -> str:
"""두 선박 궤적 inner-join 시 사용할 시간 키.
vessel_store._tracks raw AIS timestamp(ms 단위) 사용하므로 선박
동일 timestamp 우연히 일치할 확률이 거의 0 inner join 결과 빈약.
5 리샘플 단위 time_bucket 컬럼이 있으면 그것을 사용.
"""
return 'time_bucket' if 'time_bucket' in df.columns else 'timestamp'
def _trajectory_similarity(
df_a: pd.DataFrame, df_b: pd.DataFrame, min_samples: int = 6,
) -> tuple[float, int, dict]:
@ -331,18 +426,22 @@ def _trajectory_similarity(
"""
if df_a.empty or df_b.empty:
return 0.0, 0, {}
cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'}
df_a = _ensure_sog_cog(df_a)
df_b = _ensure_sog_cog(df_b)
join_key = _pair_join_key(df_a)
cols = {join_key, 'lat', 'lon', 'sog', 'cog'}
if cols - set(df_a.columns) or cols - set(df_b.columns):
return 0.0, 0, {}
try:
a = df_a[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy()
b = df_b[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy()
a['timestamp'] = pd.to_datetime(a['timestamp'])
b['timestamp'] = pd.to_datetime(b['timestamp'])
a = df_a[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
b = df_b[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
# 같은 join_key 에 다수 샘플이 있을 수 있으므로 평균값으로 bucketize
a = a.groupby(join_key, as_index=False).mean(numeric_only=True)
b = b.groupby(join_key, as_index=False).mean(numeric_only=True)
m = pd.merge(
a.rename(columns={'lat': 'la', 'lon': 'oa', 'sog': 'sa', 'cog': 'ca'}),
b.rename(columns={'lat': 'lb', 'lon': 'ob', 'sog': 'sb', 'cog': 'cb'}),
on='timestamp', how='inner',
on=join_key, how='inner',
)
except Exception:
return 0.0, 0, {}
@ -390,7 +489,7 @@ def _trajectory_similarity(
# bbox 1차 탐색 반경 (도 단위)
BBOX_DEG = 0.01 # 약 1.1km — 주변 후보만 컷
SIMILARITY_PAIR = 0.70 # 유력 페어
SIMILARITY_OBSERVE = 0.50 # 관찰 페어
SIMILARITY_OBSERVE = 0.45 # 관찰 페어 (0.50 → 0.45 완화: pool 확대 후 recall 확보)
def find_pair_candidates(

파일 보기

@ -23,13 +23,61 @@ GEAR_IDENTITY_LOG = qualified_table('gear_identity_log')
GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores')
FLEET_TRACKING_SNAPSHOT = qualified_table('fleet_tracking_snapshot')
# 선박명 정규화 (중국/한국 어선 식별자 보존):
# - 중국 어선명 = 업체명(浙岭渔) + 선박번호(20865) 로 고유 식별. 번호 제거 시 동명이 수십 개 발생 → 번호 유지
# - 통일 대상: 공백/구두점, 대소문자, NO./No. prefix
_NAME_STRIP_CHARS = re.compile(r'[\s\-_./,()\[\]·•\u3000#]+')
_NAME_STRIP_PREFIX_NUMBER_MARKER = re.compile(r'\bNO\.?\s*', re.IGNORECASE)
_PERIOD_RANGE_PATTERN = re.compile(
r'(\d{4})[/\-.](\d{1,2})[/\-.](\d{1,2})\s*[-~]\s*(\d{4})[/\-.](\d{1,2})[/\-.](\d{1,2})'
)
def _parse_period_range(raw: str) -> Optional[tuple[datetime, datetime]]:
"""fishing_period 'YYYY/MM/DD - YYYY/MM/DD' 포맷을 파싱.
실패 None. 구분자는 / - . 모두 허용, 연결자는 - ~ 허용.
"""
if not raw:
return None
m = _PERIOD_RANGE_PATTERN.search(raw)
if not m:
return None
try:
y1, m1, d1, y2, m2, d2 = (int(x) for x in m.groups())
start = datetime(y1, m1, d1, 0, 0, 0)
end = datetime(y2, m2, d2, 23, 59, 59)
if end < start:
return None
return (start, end)
except (ValueError, TypeError):
return None
def _normalize_vessel_name(name: Optional[str]) -> str:
"""선박명을 매칭용으로 정규화.
1. upper() + strip
2. 'No.123' / 'NO 123' 번호 마커만 제거 (숫자 자체는 고유 식별자로 유지)
3. 공백/구두점/중간점/전각공백/#/ 제거
"""
if not name:
return ''
s = name.strip().upper()
s = _NAME_STRIP_PREFIX_NUMBER_MARKER.sub('', s)
s = _NAME_STRIP_CHARS.sub('', s)
return s
class FleetTracker:
def __init__(self) -> None:
self._companies: dict[int, dict] = {} # id → {name_cn, name_en}
self._vessels: dict[int, dict] = {} # id → {permit_no, name_cn, ...}
self._name_cn_map: dict[str, int] = {} # name_cn → vessel_id
self._name_en_map: dict[str, int] = {} # name_en(lowercase) → vessel_id
self._name_cn_map: dict[str, int] = {} # name_cn → vessel_id (정확일치)
self._name_en_map: dict[str, int] = {} # name_en(lowercase) → vessel_id (정확일치)
self._name_fuzzy_map: dict[str, list[int]] = {} # 정규화 이름 → [vessel_id, ...]
self._mmsi_to_vid: dict[str, int] = {} # mmsi → vessel_id (매칭된 것만)
self._gear_active: dict[str, dict] = {} # mmsi → {name, parent_mmsi, ...}
self._last_registry_load: float = 0.0
@ -46,7 +94,7 @@ class FleetTracker:
# 현재 연도 허가만 조회 (연단위 갱신 정책, permit_year NULL은 legacy 허용)
cur.execute(
f"""SELECT id, company_id, permit_no, name_cn, name_en, tonnage,
gear_code, fleet_role, pair_vessel_id, mmsi
gear_code, fleet_role, pair_vessel_id, mmsi, fishery_code
FROM {FLEET_VESSELS}
WHERE permit_year = EXTRACT(YEAR FROM now())::int
OR permit_year IS NULL"""
@ -54,6 +102,7 @@ class FleetTracker:
self._vessels = {}
self._name_cn_map = {}
self._name_en_map = {}
self._name_fuzzy_map = {}
self._mmsi_to_vid = {}
for r in cur.fetchall():
@ -69,12 +118,17 @@ class FleetTracker:
'fleet_role': r[7],
'pair_vessel_id': r[8],
'mmsi': r[9],
'fishery_code': r[10],
}
self._vessels[vid] = v
if r[3]:
self._name_cn_map[r[3]] = vid
if r[4]:
self._name_en_map[r[4].lower().strip()] = vid
# FUZZY 매칭은 name_en 만 대상 (AIS 가 보고하는 이름은 영문이 주류)
key_en = _normalize_vessel_name(r[4])
if key_en:
self._name_fuzzy_map.setdefault(key_en, []).append(vid)
if r[9]:
self._mmsi_to_vid[r[9]] = vid
@ -141,6 +195,61 @@ class FleetTracker:
logger.warning('get_gear_episodes 실패 [mmsi=%s]: %s', mmsi, exc)
return []
def get_registered_fishery_code(self, mmsi: str) -> Optional[str]:
"""fleet_vessels 에 등록된 선박의 fishery_code (PT/PT-S/GN/PS/OT/FC).
V029 이후 fishery_code 컬럼을 우선 참조. legacy gear_code(C21 ) 별도 경로.
"""
vid = self._mmsi_to_vid.get(mmsi)
if vid is None:
return None
v = self._vessels.get(vid, {})
return v.get('fishery_code') or None
def get_permit_periods(
self, mmsi: str, conn, year: Optional[int] = None,
) -> list[tuple[datetime, datetime]]:
"""선박의 허가 조업 기간을 파싱하여 [(start, end), ...] 반환.
fishery_permit_cn.fishing_period_1/2 'YYYY/MM/DD - YYYY/MM/DD' 포맷을 파싱.
'-' (미사용) 또는 파싱 실패 해당 구간 생략. G-02 금어기 판정에 사용.
"""
vid = self._mmsi_to_vid.get(mmsi)
if vid is None:
return []
v = self._vessels.get(vid)
if not v:
return []
permit_no = v.get('permit_no')
target_year = year or datetime.now().year
if not permit_no:
return []
try:
cur = conn.cursor()
cur.execute(
"""SELECT fishing_period_1, fishing_period_2
FROM kcg.fishery_permit_cn
WHERE permit_year = %s AND permit_no = %s""",
(target_year, permit_no),
)
row = cur.fetchone()
cur.close()
except Exception as exc:
logger.warning('get_permit_periods DB 실패 [mmsi=%s]: %s', mmsi, exc)
return []
if not row:
return []
periods: list[tuple[datetime, datetime]] = []
for raw in row:
if not raw or raw.strip() in ('-', ''):
continue
parsed = _parse_period_range(raw)
if parsed:
periods.append(parsed)
return periods
def get_gear_positions(
self, mmsi: str, df_vessel: Optional[pd.DataFrame] = None,
) -> list[tuple[float, float]]:
@ -165,7 +274,8 @@ class FleetTracker:
ais_vessels: [{mmsi, name, lat, lon, sog, cog}, ...]
"""
cur = conn.cursor()
matched = 0
matched_exact = 0
matched_fuzzy = 0
for v in ais_vessels:
mmsi = v.get('mmsi', '')
@ -185,22 +295,47 @@ class FleetTracker:
vid: Optional[int] = self._name_cn_map.get(name)
if not vid:
vid = self._name_en_map.get(name.lower().strip())
method = 'NAME_EXACT'
confidence = 0.95
# NAME_FUZZY 매칭 (정규화 후 lookup)
if not vid:
key = _normalize_vessel_name(name)
if key:
candidates = self._name_fuzzy_map.get(key, [])
# 이미 다른 MMSI에 할당된 vid 제외 → 동명이 중복 매칭 방지
unassigned = [
c for c in candidates
if not self._vessels.get(c, {}).get('mmsi')
or self._vessels[c].get('mmsi') == mmsi
]
if len(unassigned) == 1:
vid = unassigned[0]
method = 'NAME_FUZZY'
confidence = 0.80
if vid:
cur.execute(
f"""UPDATE {FLEET_VESSELS}
SET mmsi = %s, match_confidence = 0.95, match_method = 'NAME_EXACT',
SET mmsi = %s, match_confidence = %s, match_method = %s,
last_seen_at = NOW(), updated_at = NOW()
WHERE id = %s AND (mmsi IS NULL OR mmsi = %s)""",
(mmsi, vid, mmsi),
(mmsi, confidence, method, vid, mmsi),
)
self._mmsi_to_vid[mmsi] = vid
matched += 1
self._vessels[vid]['mmsi'] = mmsi
if method == 'NAME_FUZZY':
matched_fuzzy += 1
else:
matched_exact += 1
conn.commit()
cur.close()
if matched > 0:
logger.info('AIS→registry matched: %d vessels', matched)
if matched_exact or matched_fuzzy:
logger.info(
'AIS→registry matched: exact=%d, fuzzy=%d',
matched_exact, matched_fuzzy,
)
def track_gear_identity(self, gear_signals: list[dict], conn) -> None:
"""어구/어망 정체성 추적.
@ -242,13 +377,19 @@ class FleetTracker:
if not is_trackable_parent_name(effective_parent_name):
continue
# 모선 매칭
# 모선 매칭 (EXACT → FUZZY 순)
parent_mmsi: Optional[str] = None
parent_vid: Optional[int] = None
if parent_name:
vid = self._name_cn_map.get(parent_name)
if not vid:
vid = self._name_en_map.get(parent_name.lower())
if not vid:
key = _normalize_vessel_name(parent_name)
if key:
candidates = self._name_fuzzy_map.get(key, [])
if len(candidates) == 1:
vid = candidates[0]
if vid:
parent_vid = vid
parent_mmsi = self._vessels[vid].get('mmsi')

파일 보기

@ -58,9 +58,12 @@ def classify_violations(result: dict) -> list[str]:
if transship:
violations.append('ILLEGAL_TRANSSHIP')
# 어구 불법 (gear_judgment은 classify_gear_violations()로 채워짐: G-01/G-04/G-05/G-06)
# 어구 불법 (gear_judgment은 classify_gear_violations()로 채워짐: G-01~G-06)
gear_judgment = result.get('gear_judgment', '') or ''
if gear_judgment in ('NO_PERMIT', 'GEAR_MISMATCH', 'ZONE_VIOLATION', 'SEASON_VIOLATION', 'PAIR_TRAWL'):
if gear_judgment in (
'NO_PERMIT', 'GEAR_MISMATCH', 'ZONE_VIOLATION', 'SEASON_VIOLATION',
'PAIR_TRAWL', 'CLOSED_SEASON_FISHING', 'UNREGISTERED_GEAR',
):
violations.append('ILLEGAL_GEAR')
# 위험 행동 (다른 위반 없이 고위험)

파일 보기

@ -131,16 +131,26 @@ def run_analysis_cycle():
with kcgdb.get_conn() as kcg_conn:
fleet_tracker.load_registry(kcg_conn)
# 등록 선단 매칭은 classification 통과자(500척) 만이 아닌
# 전체 중국 MID(412/413/414) 활성 선박을 대상으로 한다.
# vessel_dfs 에만 돌리면 허가선 906척 중 95% AIS 존재에도 매칭률이 크게 낮아짐.
all_ais = []
for mmsi, df in vessel_dfs.items():
if len(df) > 0:
last = df.iloc[-1]
all_ais.append({
'mmsi': mmsi,
'name': vessel_store.get_vessel_info(mmsi).get('name', ''),
'lat': float(last['lat']),
'lon': float(last['lon']),
})
all_tracks = getattr(vessel_store, '_tracks', {}) or vessel_dfs
for mmsi, df in all_tracks.items():
if not (mmsi.startswith('412') or mmsi.startswith('413') or mmsi.startswith('414')):
continue
if df is None or len(df) == 0:
continue
name = vessel_store.get_vessel_info(mmsi).get('name', '')
if not name:
continue
last = df.iloc[-1]
all_ais.append({
'mmsi': mmsi,
'name': name,
'lat': float(last['lat']),
'lon': float(last['lon']),
})
fleet_tracker.match_ais_to_registry(all_ais, kcg_conn)
@ -212,16 +222,38 @@ def run_analysis_cycle():
pt_sub_registered: set[str] = set() # TODO: fishery_code=PT-S 구분
base_mmsis: set[str] = {c['mmsi'] for c in classifications}
base_mmsis |= pt_registered
# pool은 전체 24h 누적 tracks (중국 어선 중심 8k+ 선박)
# pool 은 전체 24h 누적 tracks (중국 8k+ 한국/러시아 포함 55k).
# sog/cog 미계산 상태여도 _trajectory_similarity 내부에서 on-demand 계산.
pool_tracks = getattr(vessel_store, '_tracks', {}) or vessel_dfs
# 조업 속력대(1.5~5.0kn)에서 움직이는 모든 중국 선박을 base로 확장.
# 중국 MID: 412(본토) / 413(홍콩) / 414(마카오)
for mmsi, df in pool_tracks.items():
if not (mmsi.startswith('412') or mmsi.startswith('413') or mmsi.startswith('414')):
continue
if df is None or df.empty:
continue
sog_col = 'sog' if 'sog' in df.columns else ('raw_sog' if 'raw_sog' in df.columns else None)
if sog_col is None:
continue
try:
mean_sog = float(df[sog_col].tail(12).mean())
if 1.5 <= mean_sog <= 5.0:
base_mmsis.add(mmsi)
except Exception:
continue
pair_candidates = find_pair_candidates(
base_mmsis=base_mmsis,
vessel_dfs=pool_tracks,
get_vessel_info=vessel_store.get_vessel_info,
pt_registered=pt_registered,
pt_sub_registered=pt_sub_registered,
min_common_samples=4,
)
from algorithms.pair_trawl import REJECT_COUNTERS, reset_reject_counters
reset_reject_counters()
tier_counts = {'STRONG': 0, 'PROBABLE': 0, 'SUSPECT': 0}
pt_det = 0; coop_det = 0
for cand in pair_candidates:
ma, mb = cand['base_mmsi'], cand['target_mmsi']
@ -234,6 +266,8 @@ def run_analysis_cycle():
)
if not result.get('pair_detected'):
continue
tier = result.get('tier') or 'UNKNOWN'
tier_counts[tier] = tier_counts.get(tier, 0) + 1
pair_results[ma] = {**result, 'pair_mmsi': mb}
pair_results[mb] = {**result, 'pair_mmsi': ma}
if result.get('pair_type') == 'PT_REGISTERED':
@ -241,8 +275,14 @@ def run_analysis_cycle():
elif result.get('pair_type') == 'COOP_FISHING':
coop_det += 1
logger.info(
'pair detection: candidates=%d, detected=%d (pt=%d, coop=%d)',
len(pair_candidates), len(pair_results) // 2, pt_det, coop_det,
'pair detection: candidates=%d, detected=%d '
'(STRONG=%d PROBABLE=%d SUSPECT=%d, pt=%d coop=%d) '
'reject={empty=%d miss=%d insuf_align=%d no_sync=%d}',
len(pair_candidates), len(pair_results) // 2,
tier_counts['STRONG'], tier_counts['PROBABLE'], tier_counts['SUSPECT'],
pt_det, coop_det,
REJECT_COUNTERS['empty_df'], REJECT_COUNTERS['missing_columns'],
REJECT_COUNTERS['insufficient_aligned'], REJECT_COUNTERS['no_sync_at_any_tier'],
)
except Exception as e:
logger.warning('pair detection failed: %s', e)
@ -344,12 +384,18 @@ def run_analysis_cycle():
pair_result = pair_results.get(mmsi)
if pair_result and not pair_result.get('pair_detected'):
pair_result = None
# G-06 판정은 pair_type='PT_REGISTERED' 또는 엄격 sync 조건 만족일 때만
if pair_result and pair_result.get('pair_type') not in ('PT_REGISTERED', 'TRANSSHIP_LIKE', 'COOP_FISHING', 'GENERIC'):
pair_result = None
# G-06 판정은 STRONG/PROBABLE tier + 유효 pair_type 만. SUSPECT 는 플래그만 유지.
if pair_result:
if pair_result.get('tier') not in ('STRONG', 'PROBABLE'):
pair_result = None
elif pair_result.get('pair_type') not in (
'PT_REGISTERED', 'TRANSSHIP_LIKE', 'COOP_FISHING', 'GENERIC',
):
pair_result = None
gear_episodes: list = []
gear_positions: list = []
permit_periods: list = []
if gear in ('GN', 'TRAP', 'FYK', 'FPO', 'GNS', 'GND'):
try:
with kcgdb.get_conn() as gv_conn:
@ -358,12 +404,32 @@ def run_analysis_cycle():
except Exception as e:
logger.debug('gear episode/pos 조회 실패 [%s]: %s', mmsi, e)
# G-02/G-03 입력: 허가 조업 기간 + 등록 업종코드 (등록 매칭된 선박만 대상)
registered_fishery_code = fleet_tracker.get_registered_fishery_code(mmsi)
if registered_fishery_code:
try:
with kcgdb.get_conn() as gp_conn:
permit_periods = fleet_tracker.get_permit_periods(mmsi, gp_conn)
except Exception as e:
logger.debug('permit_periods 조회 실패 [%s]: %s', mmsi, e)
observation_ts = ts if isinstance(ts, datetime) else None
if observation_ts is None and ts is not None:
try:
import pandas as pd
observation_ts = pd.to_datetime(ts).to_pydatetime()
except Exception:
observation_ts = None
gv = classify_gear_violations(
mmsi=mmsi, gear_type=gear, zone_info=zone_info,
df_vessel=df_v, pair_result=pair_result,
is_permitted=is_permitted,
gear_episodes=gear_episodes or None,
gear_positions=gear_positions or None,
permit_periods=permit_periods or None,
registered_fishery_code=registered_fishery_code,
observation_ts=observation_ts,
)
g_codes = gv['g_codes']
gear_judgment = gv['gear_judgment']
@ -381,6 +447,9 @@ def run_analysis_cycle():
elif final_risk >= 30:
final_risk_level = 'MEDIUM'
# pair_result 는 STRONG/PROBABLE 필터링으로 SUSPECT 는 None.
# SUSPECT tier 까지 통계로 남기려면 raw pair 결과도 조회.
raw_pair = pair_results.get(mmsi) or {}
merged_features = {
**(c.get('features', {}) or {}),
**dark_features,
@ -389,6 +458,12 @@ def run_analysis_cycle():
'gear_violation_evidence': gear_violation_evidence,
'pair_trawl_detected': bool(pair_result and pair_result.get('pair_detected')),
'pair_trawl_pair_mmsi': (pair_result or {}).get('pair_mmsi', ''),
'pair_tier': raw_pair.get('tier') or '',
'pair_type': raw_pair.get('pair_type') or '',
'pair_reject_reason': raw_pair.get('reject_reason') or '',
'similarity': raw_pair.get('similarity', 0),
'confidence': raw_pair.get('confidence', 0),
'registered_fishery_code': registered_fishery_code or '',
}
results.append(AnalysisResult(

파일 보기

@ -203,9 +203,10 @@ ORDER BY risk_score DESC LIMIT 20;
SQL
echo ""
echo "--- 4-4. G-06 쌍끌이 공조 탐지 ---"
echo "--- 4-4. G-06 쌍끌이 공조 탐지 (tier 포함) ---"
$PSQL_TABLE << 'SQL'
SELECT mmsi, zone_code, vessel_type, risk_score,
features->>'pair_tier' tier,
(features->'gear_violation_evidence'->'G-06'->>'sync_duration_min') sync_min,
(features->'gear_violation_evidence'->'G-06'->>'mean_separation_nm') sep_nm,
(features->'gear_violation_evidence'->'G-06'->>'pair_mmsi') pair_mmsi,
@ -216,6 +217,50 @@ WHERE analyzed_at > now() - interval '5 minutes'
ORDER BY risk_score DESC LIMIT 20;
SQL
echo ""
echo "--- 4-4.1 pair_trawl tier 분포 (DAR-03 신호 강도별) ---"
$PSQL_TABLE << 'SQL'
SELECT coalesce(features->>'pair_tier', '(none)') tier,
count(*) cnt,
round(avg((features->>'similarity')::numeric)::numeric, 3) avg_sim,
round(avg((features->'gear_violation_evidence'->'G-06'->>'sync_duration_min')::numeric)::numeric, 1) avg_sync_min
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '5 minutes'
AND features->>'pair_trawl_detected' = 'true'
GROUP BY tier ORDER BY cnt DESC;
SQL
echo ""
echo "--- 4-4.2 pair detection reject 사유 (최근 5분 로그) ---"
ssh redis-211 "sudo journalctl -u kcg-ai-prediction --no-pager --since '5 minutes ago' | grep -oE 'pair detection:[^$]+reject=\{[^}]+\}' | tail -5" 2>/dev/null || true
echo ""
echo "--- 4-4.3 G-02 금어기 조업 탐지 ---"
$PSQL_TABLE << 'SQL'
SELECT mmsi, zone_code, vessel_type, risk_score,
features->>'g_codes' g_codes,
(features->'gear_violation_evidence'->'G-02'->>'observed_at') observed_at,
features->>'registered_fishery_code' fishery_code
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '5 minutes'
AND features->>'g_codes' LIKE '%G-02%'
ORDER BY risk_score DESC LIMIT 15;
SQL
echo ""
echo "--- 4-4.4 G-03 미등록/허가외 어구 탐지 ---"
$PSQL_TABLE << 'SQL'
SELECT mmsi, zone_code, vessel_type, risk_score,
features->>'g_codes' g_codes,
(features->'gear_violation_evidence'->'G-03'->>'detected_gear') detected,
(features->'gear_violation_evidence'->'G-03'->>'registered_fishery_code') registered,
(features->'gear_violation_evidence'->'G-03'->>'allowed_gears') allowed
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '5 minutes'
AND features->>'g_codes' LIKE '%G-03%'
ORDER BY risk_score DESC LIMIT 15;
SQL
echo ""
echo "--- 4-5. G-04 MMSI 조작 + G-05 어구 이동 ---"
$PSQL_TABLE << 'SQL'
@ -341,6 +386,30 @@ WHERE permit_year = EXTRACT(YEAR FROM now())::int
GROUP BY fishery_code ORDER BY total DESC;
SQL
echo ""
echo "--- 7.5-2b. match_method 분포 (NAME_EXACT vs NAME_FUZZY) ---"
$PSQL_TABLE << 'SQL'
SELECT coalesce(match_method, '(unmatched)') method,
count(*) cnt,
round(avg(match_confidence)::numeric, 3) avg_conf
FROM kcg.fleet_vessels
WHERE permit_year = EXTRACT(YEAR FROM now())::int
GROUP BY method ORDER BY cnt DESC;
SQL
echo ""
echo "--- 7.5-2c. fishery_code × match_method 교차 ---"
$PSQL_TABLE << 'SQL'
SELECT fishery_code,
count(*) FILTER (WHERE match_method = 'NAME_EXACT') exact,
count(*) FILTER (WHERE match_method = 'NAME_FUZZY') fuzzy,
count(*) FILTER (WHERE mmsi IS NULL) unmatched,
count(*) total
FROM kcg.fleet_vessels
WHERE permit_year = EXTRACT(YEAR FROM now())::int
GROUP BY fishery_code ORDER BY total DESC;
SQL
echo ""
echo "--- 7.5-3. vessel_analysis_results.gear_code 분포 (last 5min) ---"
$PSQL_TABLE << 'SQL'

파일 보기

@ -163,6 +163,26 @@ FROM kcg.fleet_vessels
WHERE permit_year = EXTRACT(YEAR FROM now())::int
GROUP BY fishery_code ORDER BY total DESC;
\echo
\echo === P3.5 match_method distribution (NAME_EXACT vs NAME_FUZZY) ===
SELECT coalesce(match_method, '(unmatched)') method,
count(*) cnt,
round(avg(match_confidence)::numeric, 3) avg_conf
FROM kcg.fleet_vessels
WHERE permit_year = EXTRACT(YEAR FROM now())::int
GROUP BY method ORDER BY cnt DESC;
\echo
\echo === P3.6 fishery_code × match_method cross ===
SELECT fishery_code,
count(*) FILTER (WHERE match_method = 'NAME_EXACT') exact,
count(*) FILTER (WHERE match_method = 'NAME_FUZZY') fuzzy,
count(*) FILTER (WHERE mmsi IS NULL) unmatched,
count(*) total
FROM kcg.fleet_vessels
WHERE permit_year = EXTRACT(YEAR FROM now())::int
GROUP BY fishery_code ORDER BY total DESC;
\echo
\echo === P4. vessel_analysis_results.gear_code distribution (last 1h) ===
SELECT coalesce(gear_code, '(null)') gear_code,
@ -258,6 +278,32 @@ WHERE analyzed_at > now() - interval '1 hour'
AND features->>'pair_trawl_detected' = 'true'
GROUP BY pair_type ORDER BY cnt DESC;
\echo
\echo === D3.6 pair_trawl tier distribution (signal-strength tier) ===
SELECT coalesce(features->>'pair_tier', '(none)') tier,
count(*) cnt,
round(avg((features->>'similarity')::numeric)::numeric, 3) avg_sim,
round(avg((features->'gear_violation_evidence'->'G-06'->>'sync_duration_min')::numeric)::numeric, 1) avg_sync_min,
round(avg((features->'gear_violation_evidence'->'G-06'->>'mean_separation_nm')::numeric)::numeric, 3) avg_sep_nm
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour'
AND features->>'pair_trawl_detected' = 'true'
GROUP BY tier ORDER BY cnt DESC;
\echo
\echo === D3.7 G-02 closed-season + G-03 unregistered-gear counts ===
SELECT
count(*) FILTER (WHERE features->>'g_codes' LIKE '%G-02%') g02_count,
count(*) FILTER (WHERE features->>'g_codes' LIKE '%G-03%') g03_count,
count(*) FILTER (WHERE features->>'gear_judgment' = 'CLOSED_SEASON_FISHING') judg_closed,
count(*) FILTER (WHERE features->>'gear_judgment' = 'UNREGISTERED_GEAR') judg_unreg
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour';
\echo
\echo === D3.8 pair detection reject breakdown (last 1h journal) ===
\! ssh redis-211 "sudo journalctl -u kcg-ai-prediction --no-pager --since '1 hour ago' | grep -oE 'pair detection:[^$]+reject=\{[^}]+\}' | awk -F'reject=' '{print $2}' | sort | uniq -c | sort -rn | head -10" 2>/dev/null || true
\echo
\echo === D4. G-06 pair trawl detections ===
SELECT mmsi, zone_code, vessel_type, risk_score,