fix(prediction): output 5종 이상 정상화 (stats/event/lightweight)

5가지 출력 이상 동시 해결:

1. stats_aggregator (이상 1, 5)
   - aggregate_hourly에 by_category, by_zone JSON 집계 추가
   - hour_start를 KST 기준으로 변경 (대시보드 표기와 boundary 일치)

2. event_generator 룰 정리 (이상 2, 3, 4)
   - critical_risk 임계값 90→70 (risk.py CRITICAL 분류와 일치)
   - territorial_sea_violation, contiguous_zone_high_risk, special_zone_entry 신설
     (실측 zone_code: TERRITORIAL_SEA/CONTIGUOUS_ZONE/ZONE_*)
   - 잘못된 NLL/SPECIAL_FISHING_* 룰 제거
   - HIGH_RISK_VESSEL 신규 카테고리 (50~69 MEDIUM, 70+ CRITICAL)
   - break 제거: 한 분석결과가 여러 카테고리에 동시 매칭 가능

3. dedup window prime 분산 (이상 5)
   - 30/60/120/360분 → 33/67/127/367분
   - 5분 사이클 boundary와 LCM 회피하여 정시 일제 만료 패턴 완화

4. lightweight path 신호 보강 (이상 2, 3, 4 근본 해결)
   - vessel_store._tracks의 24h 누적 궤적으로 dark/spoof/speed_jump 산출
   - 6,500 vessels(전체 93%)의 is_dark, spoofing_score가 비로소 채워짐
   - compute_lightweight_risk_score에 dark gap, spoofing 가점 추가
     (max 60→100 가능, CRITICAL 도달 가능)

시간 처리 원칙 적용:
- DB 컬럼은 모두 timestamptz 확인 완료
- aggregate_hourly KST aware datetime 사용
- pandas Timestamp는 source-internal 비교만 (안전)
This commit is contained in:
htlee 2026-04-08 15:18:18 +09:00
부모 1897ff45d3
커밋 0a4d023c76
4개의 변경된 파일153개의 추가작업 그리고 37개의 파일을 삭제

파일 보기

@ -11,10 +11,15 @@ def compute_lightweight_risk_score(
zone_info: dict, zone_info: dict,
sog: float, sog: float,
is_permitted: Optional[bool] = None, is_permitted: Optional[bool] = None,
is_dark: bool = False,
gap_duration_min: int = 0,
spoofing_score: float = 0.0,
) -> Tuple[int, str]: ) -> Tuple[int, str]:
"""위치·허가 이력 기반 경량 위험도 (파이프라인 미통과 선박용). """위치·허가·다크/스푸핑 기반 경량 위험도 (파이프라인 미통과 선박용).
pipeline path의 compute_vessel_risk_score와 동일한 임계값(70/50/30) 사용해
분류 결과의 일관성을 유지한다. dark/spoofing 신호를 추가하여 max 100 도달 가능.
compute_vessel_risk_score의 1(위치)+4(허가) 로직과 동일.
Returns: (risk_score, risk_level) Returns: (risk_score, risk_level)
""" """
score = 0 score = 0
@ -29,6 +34,19 @@ def compute_lightweight_risk_score(
if is_permitted is not None and not is_permitted: if is_permitted is not None and not is_permitted:
score += 25 score += 25
# 2. 다크 베셀 (최대 25점)
if is_dark:
if gap_duration_min >= 60:
score += 25
elif gap_duration_min >= 30:
score += 10
# 3. 스푸핑 (최대 15점)
if spoofing_score > 0.7:
score += 15
elif spoofing_score > 0.5:
score += 8
# 4. 허가 이력 (최대 20점) # 4. 허가 이력 (최대 20점)
if is_permitted is not None and not is_permitted: if is_permitted is not None and not is_permitted:
score += 20 score += 20

파일 보기

@ -19,36 +19,51 @@ logger = logging.getLogger(__name__)
EVENTS_TABLE = qualified_table('prediction_events') EVENTS_TABLE = qualified_table('prediction_events')
# 카테고리별 dedup 윈도우 (분) # 카테고리별 dedup 윈도우 (분).
# 사이클이 5분 간격이므로 5의 배수를 피해서 boundary 일제 만료 패턴을 회피한다.
DEDUP_WINDOWS = { DEDUP_WINDOWS = {
'EEZ_INTRUSION': 30, 'EEZ_INTRUSION': 33,
'DARK_VESSEL': 120, 'DARK_VESSEL': 127,
'FLEET_CLUSTER': 360, 'FLEET_CLUSTER': 367,
'ILLEGAL_TRANSSHIP': 60, 'ILLEGAL_TRANSSHIP': 67,
'MMSI_TAMPERING': 30, 'MMSI_TAMPERING': 33,
'AIS_LOSS': 120, 'AIS_LOSS': 127,
'SPEED_ANOMALY': 60, 'SPEED_ANOMALY': 67,
'ZONE_DEPARTURE': 120, 'ZONE_DEPARTURE': 127,
'GEAR_ILLEGAL': 360, 'GEAR_ILLEGAL': 367,
'AIS_RESUME': 60, 'AIS_RESUME': 67,
'HIGH_RISK_VESSEL': 67,
} }
# 이벤트 생성 룰 # 이벤트 생성 룰
# 한 분석결과가 여러 룰에 매칭되면 모두 생성한다 (카테고리별 dedup_key가 분리되어 안전).
# zone_code 실측값: EEZ_OR_BEYOND/ZONE_II/III/IV/CONTIGUOUS_ZONE/TERRITORIAL_SEA
# (algorithms.location.classify_zone 결과)
RULES = [ RULES = [
{ {
'name': 'critical_risk', # 영해 침범 — 가장 심각
'condition': lambda r: r.get('risk_score', 0) >= 90, 'name': 'territorial_sea_violation',
'condition': lambda r: r.get('zone_code') == 'TERRITORIAL_SEA',
'level': 'CRITICAL', 'level': 'CRITICAL',
'category': 'EEZ_INTRUSION', 'category': 'EEZ_INTRUSION',
'title_fn': lambda r: f"고위험 선박 탐지 (위험도 {r.get('risk_score', 0)})", 'title_fn': lambda r: f"영해 침범 탐지 (위험도 {r.get('risk_score', 0)})",
}, },
{ {
'name': 'eez_violation', # 접속수역 + 고위험
'condition': lambda r: r.get('zone_code', '') in ('NLL', 'SPECIAL_FISHING_1', 'SPECIAL_FISHING_2') 'name': 'contiguous_zone_high_risk',
and r.get('risk_score', 0) >= 70, 'condition': lambda r: r.get('zone_code') == 'CONTIGUOUS_ZONE'
'level': 'CRITICAL', and (r.get('risk_score', 0) or 0) >= 50,
'level': 'HIGH',
'category': 'EEZ_INTRUSION', 'category': 'EEZ_INTRUSION',
'title_fn': lambda r: f"EEZ 침범 탐지 ({r.get('zone_code', '')})", 'title_fn': lambda r: f"접속수역 침입 (위험도 {r.get('risk_score', 0)})",
},
{
# 종합 위험도 CRITICAL — risk.py 분류와 동일 임계값
'name': 'critical_risk',
'condition': lambda r: (r.get('risk_score', 0) or 0) >= 70,
'level': 'CRITICAL',
'category': 'HIGH_RISK_VESSEL',
'title_fn': lambda r: f"고위험 선박 탐지 (위험도 {r.get('risk_score', 0)})",
}, },
{ {
'name': 'dark_vessel_long', 'name': 'dark_vessel_long',
@ -79,10 +94,20 @@ RULES = [
'title_fn': lambda r: f"선단 밀집 감지 (클러스터 {r.get('fleet_cluster_id')})", 'title_fn': lambda r: f"선단 밀집 감지 (클러스터 {r.get('fleet_cluster_id')})",
}, },
{ {
'name': 'high_risk', # 특정수역(ZONE_*) 진입 — 운영자 모니터링용
'condition': lambda r: r.get('risk_level') == 'HIGH' and r.get('risk_score', 0) >= 60, 'name': 'special_zone_entry',
'condition': lambda r: (r.get('zone_code') or '').startswith('ZONE_')
and (r.get('risk_score', 0) or 0) >= 40,
'level': 'MEDIUM', 'level': 'MEDIUM',
'category': 'ZONE_DEPARTURE', 'category': 'ZONE_DEPARTURE',
'title_fn': lambda r: f"특정수역 진입 ({r.get('zone_code')}, 위험도 {r.get('risk_score', 0)})",
},
{
# 고위험 행동 패턴 (risk_level=HIGH 이상은 위 critical_risk가 잡고, 50~69점만 여기에)
'name': 'high_risk',
'condition': lambda r: 50 <= (r.get('risk_score', 0) or 0) < 70,
'level': 'MEDIUM',
'category': 'HIGH_RISK_VESSEL',
'title_fn': lambda r: f"위험 행동 패턴 (위험도 {r.get('risk_score', 0)})", 'title_fn': lambda r: f"위험 행동 패턴 (위험도 {r.get('risk_score', 0)})",
}, },
] ]
@ -181,7 +206,8 @@ def run_event_generator(analysis_results: list[dict]) -> dict:
dedup_key, dedup_key,
)) ))
generated += 1 generated += 1
break # 한 분석결과당 최고 우선순위 룰 1개만 # break 제거: 한 분석결과가 여러 룰에 매칭되면 모두 생성
# (카테고리별 dedup_key가 분리되어 안전)
if events_to_insert: if events_to_insert:
execute_values( execute_values(

파일 보기

@ -31,10 +31,21 @@ def _jsonb(d: dict) -> str:
def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
"""현재 시간 기준 hourly 집계.""" """현재 시간 기준 hourly 집계 (KST hour boundary).
now = target_hour or datetime.now(timezone.utc)
hour_start = now.replace(minute=0, second=0, microsecond=0) DB 컬럼은 모두 timestamptz이므로 aware datetime이면 안전 비교됨.
운영자/대시보드 표기와 stat_hour boundary가 일치하도록 KST 기준.
"""
if target_hour is not None:
# 외부에서 특정 시점을 지정한 경우 KST로 정규화
if target_hour.tzinfo is None:
target_hour = target_hour.replace(tzinfo=_KST)
now_kst = target_hour.astimezone(_KST)
else:
now_kst = datetime.now(_KST)
hour_start = now_kst.replace(minute=0, second=0, microsecond=0)
hour_end = hour_start + timedelta(hours=1) hour_end = hour_start + timedelta(hours=1)
updated_at = datetime.now(timezone.utc)
with get_conn() as conn: with get_conn() as conn:
cur = conn.cursor() cur = conn.cursor()
@ -55,6 +66,15 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
) )
by_risk = dict(cur.fetchall()) by_risk = dict(cur.fetchall())
# zone별 (vessel_analysis_results.zone_code)
cur.execute(
f"""SELECT zone_code, COUNT(*) FROM {VAR_TABLE}
WHERE analyzed_at >= %s AND analyzed_at < %s AND zone_code IS NOT NULL
GROUP BY zone_code""",
(hour_start, hour_end)
)
by_zone = dict(cur.fetchall())
# 이벤트 수 # 이벤트 수
cur.execute( cur.execute(
f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s", f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s",
@ -62,6 +82,15 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
) )
events = cur.fetchone()[0] or 0 events = cur.fetchone()[0] or 0
# 카테고리별 이벤트 (prediction_events.category)
cur.execute(
f"""SELECT category, COUNT(*) FROM {EVENTS_TABLE}
WHERE occurred_at >= %s AND occurred_at < %s AND category IS NOT NULL
GROUP BY category""",
(hour_start, hour_end)
)
by_category = dict(cur.fetchall())
# CRITICAL 이벤트 # CRITICAL 이벤트
cur.execute( cur.execute(
f"""SELECT COUNT(*) FROM {EVENTS_TABLE} f"""SELECT COUNT(*) FROM {EVENTS_TABLE}
@ -72,24 +101,35 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
cur.execute( cur.execute(
f"""INSERT INTO {STATS_HOURLY} f"""INSERT INTO {STATS_HOURLY}
(stat_hour, total_detections, by_risk_level, event_count, critical_count, updated_at) (stat_hour, total_detections, by_category, by_zone, by_risk_level,
VALUES (%s, %s, %s, %s, %s, %s) event_count, critical_count, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (stat_hour) DO UPDATE SET ON CONFLICT (stat_hour) DO UPDATE SET
total_detections = EXCLUDED.total_detections, total_detections = EXCLUDED.total_detections,
by_category = EXCLUDED.by_category,
by_zone = EXCLUDED.by_zone,
by_risk_level = EXCLUDED.by_risk_level, by_risk_level = EXCLUDED.by_risk_level,
event_count = EXCLUDED.event_count, event_count = EXCLUDED.event_count,
critical_count = EXCLUDED.critical_count, critical_count = EXCLUDED.critical_count,
updated_at = EXCLUDED.updated_at""", updated_at = EXCLUDED.updated_at""",
(hour_start, total, _jsonb(by_risk), events, critical, now) (hour_start, total, _jsonb(by_category), _jsonb(by_zone),
_jsonb(by_risk), events, critical, updated_at)
) )
# 48시간 이전 정리 # 48시간 이전 정리
cutoff = now - timedelta(hours=48) cutoff = updated_at - timedelta(hours=48)
cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,)) cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,))
conn.commit() conn.commit()
result = {'hour': hour_start.isoformat(), 'detections': total, 'events': events} result = {
'hour': hour_start.isoformat(),
'detections': total,
'events': events,
'critical': critical,
'categories': len(by_category),
'zones': len(by_zone),
}
logger.info(f'stats_aggregator hourly: {result}') logger.info(f'stats_aggregator hourly: {result}')
return result return result

파일 보기

@ -213,6 +213,7 @@ def run_analysis_cycle():
)) ))
# ── 5.5 경량 분석 — 파이프라인 미통과 412* 선박 ── # ── 5.5 경량 분석 — 파이프라인 미통과 412* 선박 ──
# vessel_store._tracks의 24h 누적 궤적을 직접 활용하여 dark/spoof 신호도 산출.
from algorithms.risk import compute_lightweight_risk_score from algorithms.risk import compute_lightweight_risk_score
pipeline_mmsis = {c['mmsi'] for c in classifications} pipeline_mmsis = {c['mmsi'] for c in classifications}
@ -222,6 +223,8 @@ def run_analysis_cycle():
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
all_positions = vessel_store.get_all_latest_positions() all_positions = vessel_store.get_all_latest_positions()
lw_count = 0 lw_count = 0
lw_dark = 0
lw_spoof = 0
for mmsi in lightweight_mmsis: for mmsi in lightweight_mmsis:
pos = all_positions.get(mmsi) pos = all_positions.get(mmsi)
if pos is None or pos.get('lat') is None: if pos is None or pos.get('lat') is None:
@ -239,9 +242,35 @@ def run_analysis_cycle():
else: else:
state = 'SAILING' state = 'SAILING'
# 24h 누적 궤적으로 dark/spoofing 산출 (vessel_store._tracks 직접 접근)
df_v = vessel_store._tracks.get(mmsi)
dark = False
gap_min = 0
spoof_score = 0.0
speed_jumps = 0
if df_v is not None and len(df_v) >= 2:
try:
dark, gap_min = is_dark_vessel(df_v)
except Exception:
pass
try:
spoof_score = compute_spoofing_score(df_v)
except Exception:
pass
try:
speed_jumps = count_speed_jumps(df_v)
except Exception:
pass
if dark:
lw_dark += 1
if spoof_score > 0.5:
lw_spoof += 1
is_permitted = vessel_store.is_permitted(mmsi) is_permitted = vessel_store.is_permitted(mmsi)
risk_score, risk_level = compute_lightweight_risk_score( risk_score, risk_level = compute_lightweight_risk_score(
zone_info, sog, is_permitted=is_permitted, zone_info, sog, is_permitted=is_permitted,
is_dark=dark, gap_duration_min=gap_min,
spoofing_score=spoof_score,
) )
# BD-09 오프셋은 중국 선박이므로 제외 (412* = 중국) # BD-09 오프셋은 중국 선박이므로 제외 (412* = 중국)
@ -256,11 +285,11 @@ def run_analysis_cycle():
activity_state=state, activity_state=state,
ucaf_score=0.0, ucaf_score=0.0,
ucft_score=0.0, ucft_score=0.0,
is_dark=False, is_dark=dark,
gap_duration_min=0, gap_duration_min=gap_min,
spoofing_score=0.0, spoofing_score=spoof_score,
bd09_offset_m=0.0, bd09_offset_m=0.0,
speed_jump_count=0, speed_jump_count=speed_jumps,
cluster_id=-1, cluster_id=-1,
cluster_size=0, cluster_size=0,
is_leader=False, is_leader=False,
@ -272,7 +301,10 @@ def run_analysis_cycle():
transship_duration_min=0, transship_duration_min=0,
)) ))
lw_count += 1 lw_count += 1
logger.info('lightweight analysis: %d vessels', lw_count) logger.info(
'lightweight analysis: %d vessels (dark=%d, spoof>0.5=%d)',
lw_count, lw_dark, lw_spoof,
)
# 6. 환적 의심 탐지 (pair_history 모듈 레벨로 사이클 간 유지) # 6. 환적 의심 탐지 (pair_history 모듈 레벨로 사이클 간 유지)
from algorithms.transshipment import detect_transshipment from algorithms.transshipment import detect_transshipment