kcg-ai-monitoring/prediction/scripts/hourly-analysis-snapshot.sh
htlee b15a94066a docs: prediction 2차 개편 릴리즈 노트 + hourly snapshot 스크립트
- RELEASE-NOTES [Unreleased] 섹션에 dark 의심 점수화 + transship 재설계 변경사항 추가
- prediction/scripts/hourly-analysis-snapshot.sh: 시간별 상태 스냅샷 수집 (25개 섹션)
2026-04-09 09:55:46 +09:00

341 lines
13 KiB
Bash
Executable File
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/bin/bash
# Hourly status snapshot collector for the prediction service.
# Runtime host: redis-211 server (the prediction service host).
# cron: 0 * * * * /home/apps/kcg-ai-prediction/scripts/hourly-analysis-snapshot.sh
#
# Output: /home/apps/kcg-ai-prediction/data/hourly-analysis/YYYYMMDD-HHMM.txt
# What gets collected (summary; the script body emits further numbered and
# G-prefixed sections beyond this list):
#   1. vessel_analysis_results overall distribution (pipeline vs lightweight, dark/spoof/risk)
#   2. zone_code distribution cross-tabulated with dark
#   3. dark vessel gap_duration_min distribution
#   4. dark vessel activity_state distribution
#   5. dark vessel detail sample, 20 rows (mmsi/zone/gap/lat/lon)
#   6. prediction_events category x level distribution
#   7. prediction_stats_hourly, most recent rows (the query takes 3)
#   8. prediction_kpi_realtime, all rows
#   9. risk_score histogram
#  10. cycle log for the preceding hour (journalctl)

# Abort on any reference to an unset variable.
set -o nounset

# Snapshot directory; created on every run so a fresh host works too.
OUTDIR='/home/apps/kcg-ai-prediction/data/hourly-analysis'
mkdir -p "${OUTDIR}"

# One file per run, named after the local time with minute resolution.
STAMP="$(date '+%Y%m%d-%H%M')"
OUT="${OUTDIR}/${STAMP}.txt"

# NOTE(review): the database password is hard-coded here and repeated inline
# further down the script; consider moving it to ~/.pgpass or the cron
# environment.
PGPASSWORD='Kcg2026ai'
export PGPASSWORD

# Kept as a plain string on purpose: callers expand it unquoted ($PSQL) so it
# word-splits into the psql command plus its options.
PSQL='psql -U kcg-app -d kcgaidb -h 211.208.115.83 -P pager=off'
{
# Snapshot preamble: title, generation time (with zone abbreviation), the
# reporting host, then one blank separator line.
printf '%s\n' \
  "# prediction hourly snapshot" \
  "# generated: $(date '+%Y-%m-%d %H:%M:%S %Z')" \
  "# host: $(hostname)" \
  ""
# Main report batch: one psql session against kcgaidb running sections 1-10-2
# and G1-G10. Every statement is a SELECT; nothing is written to the database.
#
# - $PSQL is expanded unquoted on purpose so the string word-splits into the
#   psql command and its options; the password comes from the exported
#   PGPASSWORD.
# - The heredoc delimiter is quoted ('SQL'), so the shell expands nothing and
#   psql receives the text verbatim.
# - Each `\echo === ... ===` line prints a section banner into the snapshot;
#   a bare `\echo` prints a blank separator line.
# - Most sections look at rows from the last hour; G4 and G9 use a 24-hour
#   window, and 7, 8, G3 and G3-1 have no time filter.
#
# NOTE(review): in the CASE bucket expressions (sections 3, 9, 10-2, G5) a NULL
#   input fails every `<` test and therefore lands in the ELSE bucket (e.g.
#   'f_gte1440'); confirm those columns are never NULL or add a NULL bucket.
# NOTE(review): banner G7 says IS_PERMITTED, but the query under it does not
#   reference such a column - confirm the intended label or query.
# NOTE(review): sections 6 and G9 filter on created_at while G9 buckets on
#   occurred_at - presumably intentional; verify.
$PSQL << 'SQL'
\echo === 1. VESSEL_ANALYSIS overview (last 1h) ===
SELECT count(*) total,
count(*) FILTER (WHERE vessel_type != 'UNKNOWN') pipeline_path,
count(*) FILTER (WHERE vessel_type = 'UNKNOWN') lightweight_path,
count(*) FILTER (WHERE is_dark) dark,
count(*) FILTER (WHERE spoofing_score > 0.5) spoof_hi,
count(*) FILTER (WHERE spoofing_score > 0) spoof_any,
count(*) FILTER (WHERE risk_score >= 70) crit_score,
count(*) FILTER (WHERE risk_level='CRITICAL') crit_lvl,
count(*) FILTER (WHERE risk_level='HIGH') high_lvl,
max(risk_score) max_risk,
round(avg(risk_score)::numeric, 2) avg_risk,
count(*) FILTER (WHERE transship_suspect) transship
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour';
\echo
\echo === 2. ZONE × DARK distribution ===
SELECT zone_code,
count(*) total,
count(*) FILTER (WHERE is_dark) dark,
count(*) FILTER (WHERE risk_score >= 70) crit,
round(avg(risk_score)::numeric, 1) avg_risk
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour'
GROUP BY zone_code ORDER BY total DESC;
\echo
\echo === 3. DARK GAP distribution (all vessels in last 1h) ===
SELECT CASE
WHEN gap_duration_min < 30 THEN 'a_lt30'
WHEN gap_duration_min < 60 THEN 'b_30-59'
WHEN gap_duration_min < 120 THEN 'c_60-119'
WHEN gap_duration_min < 360 THEN 'd_120-359'
WHEN gap_duration_min < 1440 THEN 'e_360-1439'
ELSE 'f_gte1440' END gap_bucket,
count(*) total,
count(*) FILTER (WHERE is_dark) dark,
count(*) FILTER (WHERE is_dark AND vessel_type='UNKNOWN') dark_lightweight,
count(*) FILTER (WHERE is_dark AND vessel_type!='UNKNOWN') dark_pipeline
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour'
GROUP BY gap_bucket ORDER BY gap_bucket;
\echo
\echo === 4. DARK vessels by activity_state ===
SELECT activity_state, count(*), round(avg(gap_duration_min)::numeric, 0) avg_gap_min
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour' AND is_dark
GROUP BY activity_state ORDER BY count DESC;
\echo
\echo === 5. DARK sample top 20 by gap (mmsi/zone/gap/state) ===
SELECT mmsi, zone_code, activity_state, gap_duration_min, risk_score
FROM (
SELECT DISTINCT ON (mmsi) mmsi, zone_code, activity_state, gap_duration_min,
risk_score, analyzed_at
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour' AND is_dark
ORDER BY mmsi, analyzed_at DESC
) latest
ORDER BY gap_duration_min DESC LIMIT 20;
\echo
\echo === 6. PREDICTION_EVENTS last 1h by category×level ===
SELECT category, level, count(*) cnt
FROM kcg.prediction_events
WHERE created_at > now() - interval '1 hour'
GROUP BY category, level ORDER BY cnt DESC;
\echo
\echo === 7. STATS_HOURLY latest 3 rows ===
SELECT stat_hour, total_detections, event_count, critical_count,
by_category::text, by_zone::text
FROM kcg.prediction_stats_hourly
ORDER BY stat_hour DESC LIMIT 3;
\echo
\echo === 8. KPI REALTIME ===
SELECT kpi_key, value, trend, delta_pct, updated_at
FROM kcg.prediction_kpi_realtime ORDER BY kpi_key;
\echo
\echo === 9. RISK_SCORE histogram (last 1h) ===
SELECT CASE
WHEN risk_score < 10 THEN 'a_0-9'
WHEN risk_score < 30 THEN 'b_10-29'
WHEN risk_score < 50 THEN 'c_30-49'
WHEN risk_score < 70 THEN 'd_50-69'
WHEN risk_score < 90 THEN 'e_70-89'
ELSE 'f_90-100' END bucket,
count(*) cnt,
count(*) FILTER (WHERE vessel_type='UNKNOWN') lightweight
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour'
GROUP BY bucket ORDER BY bucket;
\echo
\echo === 10. TRANSSHIP, SPOOFING, FLEET 요약 ===
SELECT
count(*) FILTER (WHERE transship_suspect) transship_ct,
count(*) FILTER (WHERE spoofing_score > 0.7) spoof_gt070,
count(*) FILTER (WHERE spoofing_score > 0.5 AND spoofing_score <= 0.7) spoof_050_070,
count(*) FILTER (WHERE speed_jump_count > 0) speed_jumps,
count(*) FILTER (WHERE fleet_is_leader) fleet_leader,
count(DISTINCT fleet_cluster_id) FILTER (WHERE fleet_cluster_id IS NOT NULL AND fleet_cluster_id > 0) fleet_clusters
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour';
\echo
\echo === 10-1. FLEET_ROLE distribution ===
SELECT fleet_role, count(*), count(DISTINCT mmsi) uniq_mmsi
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour'
GROUP BY fleet_role ORDER BY count DESC;
\echo
\echo === 10-2. TRANSSHIPMENT duration histogram ===
SELECT CASE
WHEN transship_duration_min < 5 THEN 'a_0-4'
WHEN transship_duration_min < 15 THEN 'b_5-14'
WHEN transship_duration_min < 30 THEN 'c_15-29'
WHEN transship_duration_min < 60 THEN 'd_30-59'
WHEN transship_duration_min < 120 THEN 'e_60-119'
ELSE 'f_gte120' END bucket, count(*)
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour' AND transship_suspect
GROUP BY bucket ORDER BY bucket;
\echo
\echo === G1. PIPELINE vessel_type (어구 타입) distribution ===
SELECT vessel_type,
count(*),
count(*) FILTER (WHERE fishing_pct > 50) active_fishing,
round(avg(fishing_pct)::numeric, 1) avg_fish_pct,
round(avg(ucaf_score)::numeric, 3) avg_ucaf,
round(avg(ucft_score)::numeric, 3) avg_ucft,
round(avg(risk_score)::numeric, 1) avg_risk
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour'
GROUP BY vessel_type ORDER BY count DESC;
\echo
\echo === G2. ACTIVITY_STATE distribution (전체) ===
SELECT activity_state, count(*),
count(*) FILTER (WHERE vessel_type != 'UNKNOWN') pipeline,
round(avg(risk_score)::numeric, 1) avg_risk
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour'
GROUP BY activity_state ORDER BY count DESC;
\echo
\echo === G3. GEAR_GROUP_PARENT_RESOLUTION status + confidence ===
SELECT status, count(*),
round(avg(confidence)::numeric, 3) avg_conf,
round(avg(top_score)::numeric, 3) avg_top,
round(avg(score_margin)::numeric, 3) avg_margin,
round(avg(stable_cycles)::numeric, 1) avg_stable
FROM kcg.gear_group_parent_resolution
GROUP BY status ORDER BY count DESC;
\echo
\echo === G3-1. PARENT_RESOLUTION decision_source ===
SELECT coalesce(decision_source, '(null)') ds, status, count(*)
FROM kcg.gear_group_parent_resolution
GROUP BY ds, status ORDER BY count DESC LIMIT 20;
\echo
\echo === G4. GEAR_GROUP_EPISODES (active) ===
SELECT status, continuity_source, count(*),
round(avg(continuity_score)::numeric, 3) avg_cont,
round(avg(current_member_count)::numeric, 1) avg_members,
round(avg(EXTRACT(EPOCH FROM (now() - first_seen_at))/3600)::numeric, 1) avg_age_h
FROM kcg.gear_group_episodes
WHERE last_seen_at > now() - interval '24 hours'
GROUP BY status, continuity_source ORDER BY count DESC;
\echo
\echo === G5. GEAR_CORRELATION_SCORES (current_score) 분포 ===
SELECT CASE
WHEN current_score < 0.3 THEN 'a_lt0.3'
WHEN current_score < 0.5 THEN 'b_0.3-0.5'
WHEN current_score < 0.7 THEN 'c_0.5-0.7'
WHEN current_score < 0.85 THEN 'd_0.7-0.85'
ELSE 'e_gte0.85' END bucket,
count(*),
count(DISTINCT group_key) uniq_groups,
count(DISTINCT target_mmsi) uniq_targets,
round(avg(streak_count)::numeric, 1) avg_streak
FROM kcg.gear_correlation_scores
WHERE updated_at > now() - interval '1 hour'
GROUP BY bucket ORDER BY bucket;
\echo
\echo === G5-1. CORRELATION freeze_state ===
SELECT freeze_state, count(*), round(avg(current_score)::numeric, 3) avg_score
FROM kcg.gear_correlation_scores
WHERE updated_at > now() - interval '1 hour'
GROUP BY freeze_state ORDER BY count DESC;
\echo
\echo === G6. GROUP_POLYGON_SNAPSHOTS (last 1h, by type × zone) ===
SELECT group_type,
coalesce(zone_id, '(null)') zone,
count(*),
round(avg(area_sq_nm)::numeric, 2) avg_area_nm,
round(avg(member_count)::numeric, 1) avg_members
FROM kcg.group_polygon_snapshots
WHERE snapshot_time > now() - interval '1 hour'
GROUP BY group_type, zone_id ORDER BY count DESC LIMIT 20;
\echo
\echo === G7. IS_PERMITTED breakdown (lightweight path 기준) ===
SELECT
count(*) FILTER (WHERE vessel_type != 'UNKNOWN') pipeline_ct,
count(*) FILTER (WHERE vessel_type = 'UNKNOWN') lightweight_ct,
count(DISTINCT mmsi) FILTER (WHERE risk_score >= 20) risk_gte20_uniq,
count(DISTINCT mmsi) FILTER (WHERE risk_score >= 50) risk_gte50_uniq,
count(DISTINCT mmsi) FILTER (WHERE risk_score >= 70) risk_gte70_uniq
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour';
\echo
\echo === G8. VIOLATION_CATEGORIES (last 1h, unnest) ===
SELECT unnest(violation_categories) vcat, count(*)
FROM kcg.vessel_analysis_results
WHERE analyzed_at > now() - interval '1 hour' AND violation_categories IS NOT NULL
GROUP BY vcat ORDER BY count DESC LIMIT 20;
\echo
\echo === G9. PREDICTION_EVENTS 24h hourly trend (KST) ===
SELECT date_trunc('hour', occurred_at AT TIME ZONE 'Asia/Seoul') hr,
count(*) tot,
count(*) FILTER (WHERE category='DARK_VESSEL') dark,
count(*) FILTER (WHERE category='ILLEGAL_TRANSSHIP') transship,
count(*) FILTER (WHERE category='EEZ_INTRUSION') eez,
count(*) FILTER (WHERE category='HIGH_RISK_VESSEL') high_risk,
count(*) FILTER (WHERE category='ZONE_DEPARTURE') zone_dep,
count(*) FILTER (WHERE level='CRITICAL') critical
FROM kcg.prediction_events
WHERE created_at > now() - interval '24 hours'
GROUP BY hr ORDER BY hr DESC LIMIT 25;
\echo
\echo === G10. PREDICTION_ALERTS (last 1h) ===
SELECT channel, delivery_status, count(*),
round(avg(ai_confidence)::numeric, 3) avg_conf
FROM kcg.prediction_alerts
WHERE sent_at > now() - interval '1 hour'
GROUP BY channel, delivery_status ORDER BY count DESC;
SQL
echo ""
echo "=== 11. DARK SAMPLE latest position (snpdb t_vessel_tracks_5min) ==="
# Cross-database queries are not possible, so this runs in two steps:
#   1) kcgaidb: take the latest row per mmsi among dark vessels analysed in the
#      last hour, keep the 20 with the longest gap, and return their mmsi
#      values as one comma-separated list of SQL literals (quote_literal).
#   2) snpdb: look up the most recent track end point (lat/lon) within the
#      last 24 hours for each of those mmsi values.
#
# Step 1 keeps psql's stderr (no 2>/dev/null) and checks its exit status:
# otherwise a connection or SQL failure yields an empty list and would be
# reported as "no dark vessels", which is indistinguishable from a quiet hour.
# -tA makes psql print the bare value only (tuples-only, unaligned).
if DARK_MMSIS=$(PGPASSWORD=Kcg2026ai psql -U kcg-app -d kcgaidb -h 211.208.115.83 -tA -c "
SELECT string_agg(quote_literal(mmsi), ',')
FROM (SELECT mmsi
      FROM (SELECT DISTINCT ON (mmsi) mmsi, gap_duration_min, analyzed_at
            FROM kcg.vessel_analysis_results
            WHERE analyzed_at > now() - interval '1 hour' AND is_dark
            ORDER BY mmsi, analyzed_at DESC) latest
      ORDER BY gap_duration_min DESC LIMIT 20) top20;"); then
  if [ -n "$DARK_MMSIS" ]; then
    # $DARK_MMSIS is spliced into the IN (...) list; it was produced by
    # quote_literal() in step 1, so every element is already a quoted literal.
    # head -30 caps the output: 20 data rows plus psql's header/footer lines.
    PGPASSWORD='snp#8932' psql -U snp -d snpdb -h 211.208.115.83 -P pager=off -c "
SELECT DISTINCT ON (mmsi) mmsi, time_bucket,
round(ST_Y(ST_EndPoint(track_geom))::numeric, 4) lat,
round(ST_X(ST_EndPoint(track_geom))::numeric, 4) lon
FROM signal.t_vessel_tracks_5min
WHERE mmsi IN ($DARK_MMSIS) AND time_bucket > now() - interval '24 hours'
ORDER BY mmsi, time_bucket DESC;
" 2>&1 | head -30
  else
    echo "(no dark vessels in last 1h)"
  fi
else
  # $? here is still the exit status of the failed psql command substitution.
  echo "(dark mmsi lookup failed: psql exit status $?)"
fi
printf '\n'
printf '%s\n' "=== 12. PREDICTION_EVENTS occurred_at distribution by 10-min buckets ==="
# Events created in the last hour, counted per category and per 10-minute
# bucket of occurred_at (hour start + floor(minute / 10) * 10 minutes).
# Newest bucket first, at most 30 rows.
events_bucket_sql="
SELECT date_trunc('hour', occurred_at) + (date_part('minute', occurred_at)::int / 10 * interval '10 minutes') bucket,
category, count(*)
FROM kcg.prediction_events
WHERE created_at > now() - interval '1 hour'
GROUP BY bucket, category
ORDER BY bucket DESC, count DESC LIMIT 30;
"
# stderr is merged into stdout so psql errors appear next to the section banner.
PGPASSWORD=Kcg2026ai psql -U kcg-app -d kcgaidb -h 211.208.115.83 -P pager=off -c "$events_bucket_sql" 2>&1
printf '\n'
printf '%s\n' "=== 13. CYCLE LOG (last 65 min) ==="
# Journal lines of the kcg-ai-prediction unit from the last 65 minutes, reduced
# to the lines matching the filter below; only the final 60 matches are kept.
# journalctl's own stderr is discarded.
cycle_log_filter='lightweight analysis|event_generator:|stats_aggregator hourly|kpi_writer:|analysis cycle:|ERROR|Traceback'
journalctl -u kcg-ai-prediction --since '65 minutes ago' --no-pager 2>/dev/null \
  | grep -E "$cycle_log_filter" \
  | tail -60
printf '\n'
printf '%s\n' "=== END ==="
} > "$OUT" 2>&1
# Report the outcome to the caller (cron). If "$OUT" could not be opened for
# writing, the redirected group above is skipped entirely, so verify that the
# snapshot file exists and is non-empty before claiming success. A successful
# run always writes at least the header lines, so an empty file is a failure.
if [ -s "$OUT" ]; then
  echo "[snapshot] saved: $OUT"
else
  echo "[snapshot] FAILED: $OUT was not written" >&2
  exit 1
fi