#!/bin/bash # prediction 시간당 상태 스냅샷 수집기 # 실행 환경: redis-211 서버 (prediction 서비스 호스트) # cron: 0 * * * * /home/apps/kcg-ai-prediction/scripts/hourly-analysis-snapshot.sh # # 출력: /home/apps/kcg-ai-prediction/data/hourly-analysis/YYYYMMDD-HHMM.txt # 수집 대상: # 1. vessel_analysis_results 전체 분포 (pipeline vs lightweight, dark/spoof/risk) # 2. zone_code 분포 + dark 교차 집계 # 3. dark vessel gap_duration_min 분포 # 4. dark vessel activity_state 분포 # 5. dark vessel 상세 샘플 20건 (mmsi/zone/gap/lat/lon) # 6. prediction_events 카테고리×level 분포 # 7. prediction_stats_hourly 최근 2건 # 8. prediction_kpi_realtime 전체 # 9. risk_score 히스토그램 # 10. 직전 1시간 사이클 로그 (journalctl) set -u OUTDIR=/home/apps/kcg-ai-prediction/data/hourly-analysis mkdir -p "$OUTDIR" STAMP=$(date '+%Y%m%d-%H%M') OUT="$OUTDIR/$STAMP.txt" export PGPASSWORD=Kcg2026ai PSQL="psql -U kcg-app -d kcgaidb -h 211.208.115.83 -P pager=off" { echo "# prediction hourly snapshot" echo "# generated: $(date '+%Y-%m-%d %H:%M:%S %Z')" echo "# host: $(hostname)" echo "" $PSQL << 'SQL' \echo === 1. VESSEL_ANALYSIS overview (last 1h) === SELECT count(*) total, count(*) FILTER (WHERE vessel_type != 'UNKNOWN') pipeline_path, count(*) FILTER (WHERE vessel_type = 'UNKNOWN') lightweight_path, count(*) FILTER (WHERE is_dark) dark, count(*) FILTER (WHERE spoofing_score > 0.5) spoof_hi, count(*) FILTER (WHERE spoofing_score > 0) spoof_any, count(*) FILTER (WHERE risk_score >= 70) crit_score, count(*) FILTER (WHERE risk_level='CRITICAL') crit_lvl, count(*) FILTER (WHERE risk_level='HIGH') high_lvl, max(risk_score) max_risk, round(avg(risk_score)::numeric, 2) avg_risk, count(*) FILTER (WHERE transship_suspect) transship FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour'; \echo \echo === 2. ZONE × DARK distribution === SELECT zone_code, count(*) total, count(*) FILTER (WHERE is_dark) dark, count(*) FILTER (WHERE risk_score >= 70) crit, round(avg(risk_score)::numeric, 1) avg_risk FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour' GROUP BY zone_code ORDER BY total DESC; \echo \echo === 3. DARK GAP distribution (all vessels in last 1h) === SELECT CASE WHEN gap_duration_min < 30 THEN 'a_lt30' WHEN gap_duration_min < 60 THEN 'b_30-59' WHEN gap_duration_min < 120 THEN 'c_60-119' WHEN gap_duration_min < 360 THEN 'd_120-359' WHEN gap_duration_min < 1440 THEN 'e_360-1439' ELSE 'f_gte1440' END gap_bucket, count(*) total, count(*) FILTER (WHERE is_dark) dark, count(*) FILTER (WHERE is_dark AND vessel_type='UNKNOWN') dark_lightweight, count(*) FILTER (WHERE is_dark AND vessel_type!='UNKNOWN') dark_pipeline FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour' GROUP BY gap_bucket ORDER BY gap_bucket; \echo \echo === 4. DARK vessels by activity_state === SELECT activity_state, count(*), round(avg(gap_duration_min)::numeric, 0) avg_gap_min FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour' AND is_dark GROUP BY activity_state ORDER BY count DESC; \echo \echo === 5. DARK sample top 20 by gap (mmsi/zone/gap/state) === SELECT mmsi, zone_code, activity_state, gap_duration_min, risk_score FROM ( SELECT DISTINCT ON (mmsi) mmsi, zone_code, activity_state, gap_duration_min, risk_score, analyzed_at FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour' AND is_dark ORDER BY mmsi, analyzed_at DESC ) latest ORDER BY gap_duration_min DESC LIMIT 20; \echo \echo === 6. PREDICTION_EVENTS last 1h by category×level === SELECT category, level, count(*) cnt FROM kcg.prediction_events WHERE created_at > now() - interval '1 hour' GROUP BY category, level ORDER BY cnt DESC; \echo \echo === 7. STATS_HOURLY latest 3 rows === SELECT stat_hour, total_detections, event_count, critical_count, by_category::text, by_zone::text FROM kcg.prediction_stats_hourly ORDER BY stat_hour DESC LIMIT 3; \echo \echo === 8. KPI REALTIME === SELECT kpi_key, value, trend, delta_pct, updated_at FROM kcg.prediction_kpi_realtime ORDER BY kpi_key; \echo \echo === 9. RISK_SCORE histogram (last 1h) === SELECT CASE WHEN risk_score < 10 THEN 'a_0-9' WHEN risk_score < 30 THEN 'b_10-29' WHEN risk_score < 50 THEN 'c_30-49' WHEN risk_score < 70 THEN 'd_50-69' WHEN risk_score < 90 THEN 'e_70-89' ELSE 'f_90-100' END bucket, count(*) cnt, count(*) FILTER (WHERE vessel_type='UNKNOWN') lightweight FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour' GROUP BY bucket ORDER BY bucket; \echo \echo === 10. TRANSSHIP, SPOOFING, FLEET 요약 === SELECT count(*) FILTER (WHERE transship_suspect) transship_ct, count(*) FILTER (WHERE spoofing_score > 0.7) spoof_gt070, count(*) FILTER (WHERE spoofing_score > 0.5 AND spoofing_score <= 0.7) spoof_050_070, count(*) FILTER (WHERE speed_jump_count > 0) speed_jumps, count(*) FILTER (WHERE fleet_is_leader) fleet_leader, count(DISTINCT fleet_cluster_id) FILTER (WHERE fleet_cluster_id IS NOT NULL AND fleet_cluster_id > 0) fleet_clusters FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour'; \echo \echo === 10-1. FLEET_ROLE distribution === SELECT fleet_role, count(*), count(DISTINCT mmsi) uniq_mmsi FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour' GROUP BY fleet_role ORDER BY count DESC; \echo \echo === 10-2. TRANSSHIPMENT duration histogram === SELECT CASE WHEN transship_duration_min < 5 THEN 'a_0-4' WHEN transship_duration_min < 15 THEN 'b_5-14' WHEN transship_duration_min < 30 THEN 'c_15-29' WHEN transship_duration_min < 60 THEN 'd_30-59' WHEN transship_duration_min < 120 THEN 'e_60-119' ELSE 'f_gte120' END bucket, count(*) FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour' AND transship_suspect GROUP BY bucket ORDER BY bucket; \echo \echo === G1. PIPELINE vessel_type (어구 타입) distribution === SELECT vessel_type, count(*), count(*) FILTER (WHERE fishing_pct > 50) active_fishing, round(avg(fishing_pct)::numeric, 1) avg_fish_pct, round(avg(ucaf_score)::numeric, 3) avg_ucaf, round(avg(ucft_score)::numeric, 3) avg_ucft, round(avg(risk_score)::numeric, 1) avg_risk FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour' GROUP BY vessel_type ORDER BY count DESC; \echo \echo === G2. ACTIVITY_STATE distribution (전체) === SELECT activity_state, count(*), count(*) FILTER (WHERE vessel_type != 'UNKNOWN') pipeline, round(avg(risk_score)::numeric, 1) avg_risk FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour' GROUP BY activity_state ORDER BY count DESC; \echo \echo === G3. GEAR_GROUP_PARENT_RESOLUTION status + confidence === SELECT status, count(*), round(avg(confidence)::numeric, 3) avg_conf, round(avg(top_score)::numeric, 3) avg_top, round(avg(score_margin)::numeric, 3) avg_margin, round(avg(stable_cycles)::numeric, 1) avg_stable FROM kcg.gear_group_parent_resolution GROUP BY status ORDER BY count DESC; \echo \echo === G3-1. PARENT_RESOLUTION decision_source === SELECT coalesce(decision_source, '(null)') ds, status, count(*) FROM kcg.gear_group_parent_resolution GROUP BY ds, status ORDER BY count DESC LIMIT 20; \echo \echo === G4. GEAR_GROUP_EPISODES (active) === SELECT status, continuity_source, count(*), round(avg(continuity_score)::numeric, 3) avg_cont, round(avg(current_member_count)::numeric, 1) avg_members, round(avg(EXTRACT(EPOCH FROM (now() - first_seen_at))/3600)::numeric, 1) avg_age_h FROM kcg.gear_group_episodes WHERE last_seen_at > now() - interval '24 hours' GROUP BY status, continuity_source ORDER BY count DESC; \echo \echo === G5. GEAR_CORRELATION_SCORES (current_score) 분포 === SELECT CASE WHEN current_score < 0.3 THEN 'a_lt0.3' WHEN current_score < 0.5 THEN 'b_0.3-0.5' WHEN current_score < 0.7 THEN 'c_0.5-0.7' WHEN current_score < 0.85 THEN 'd_0.7-0.85' ELSE 'e_gte0.85' END bucket, count(*), count(DISTINCT group_key) uniq_groups, count(DISTINCT target_mmsi) uniq_targets, round(avg(streak_count)::numeric, 1) avg_streak FROM kcg.gear_correlation_scores WHERE updated_at > now() - interval '1 hour' GROUP BY bucket ORDER BY bucket; \echo \echo === G5-1. CORRELATION freeze_state === SELECT freeze_state, count(*), round(avg(current_score)::numeric, 3) avg_score FROM kcg.gear_correlation_scores WHERE updated_at > now() - interval '1 hour' GROUP BY freeze_state ORDER BY count DESC; \echo \echo === G6. GROUP_POLYGON_SNAPSHOTS (last 1h, by type × zone) === SELECT group_type, coalesce(zone_id, '(null)') zone, count(*), round(avg(area_sq_nm)::numeric, 2) avg_area_nm, round(avg(member_count)::numeric, 1) avg_members FROM kcg.group_polygon_snapshots WHERE snapshot_time > now() - interval '1 hour' GROUP BY group_type, zone_id ORDER BY count DESC LIMIT 20; \echo \echo === G7. IS_PERMITTED breakdown (lightweight path 기준) === SELECT count(*) FILTER (WHERE vessel_type != 'UNKNOWN') pipeline_ct, count(*) FILTER (WHERE vessel_type = 'UNKNOWN') lightweight_ct, count(DISTINCT mmsi) FILTER (WHERE risk_score >= 20) risk_gte20_uniq, count(DISTINCT mmsi) FILTER (WHERE risk_score >= 50) risk_gte50_uniq, count(DISTINCT mmsi) FILTER (WHERE risk_score >= 70) risk_gte70_uniq FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour'; \echo \echo === G8. VIOLATION_CATEGORIES (last 1h, unnest) === SELECT unnest(violation_categories) vcat, count(*) FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour' AND violation_categories IS NOT NULL GROUP BY vcat ORDER BY count DESC LIMIT 20; \echo \echo === G9. PREDICTION_EVENTS 24h hourly trend (KST) === SELECT date_trunc('hour', occurred_at AT TIME ZONE 'Asia/Seoul') hr, count(*) tot, count(*) FILTER (WHERE category='DARK_VESSEL') dark, count(*) FILTER (WHERE category='ILLEGAL_TRANSSHIP') transship, count(*) FILTER (WHERE category='EEZ_INTRUSION') eez, count(*) FILTER (WHERE category='HIGH_RISK_VESSEL') high_risk, count(*) FILTER (WHERE category='ZONE_DEPARTURE') zone_dep, count(*) FILTER (WHERE level='CRITICAL') critical FROM kcg.prediction_events WHERE created_at > now() - interval '24 hours' GROUP BY hr ORDER BY hr DESC LIMIT 25; \echo \echo === G10. PREDICTION_ALERTS (last 1h) === SELECT channel, delivery_status, count(*), round(avg(ai_confidence)::numeric, 3) avg_conf FROM kcg.prediction_alerts WHERE sent_at > now() - interval '1 hour' GROUP BY channel, delivery_status ORDER BY count DESC; SQL echo "" echo "=== 11. DARK SAMPLE latest position (snpdb t_vessel_tracks_5min) ===" # Cross-database 불가 → 두 단계: kcgaidb에서 mmsi 추출 → snpdb에 별도 쿼리 DARK_MMSIS=$(PGPASSWORD=Kcg2026ai psql -U kcg-app -d kcgaidb -h 211.208.115.83 -tA -c " SELECT string_agg(quote_literal(mmsi), ',') FROM (SELECT DISTINCT ON (mmsi) mmsi, gap_duration_min, analyzed_at FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour' AND is_dark ORDER BY mmsi, analyzed_at DESC) v WHERE v.mmsi IN ( SELECT mmsi FROM (SELECT DISTINCT ON (mmsi) mmsi, gap_duration_min, analyzed_at FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour' AND is_dark ORDER BY mmsi, analyzed_at DESC) x ORDER BY gap_duration_min DESC LIMIT 20 );" 2>/dev/null) if [ -n "$DARK_MMSIS" ]; then PGPASSWORD='snp#8932' psql -U snp -d snpdb -h 211.208.115.83 -P pager=off -c " SELECT DISTINCT ON (mmsi) mmsi, time_bucket, round(ST_Y(ST_EndPoint(track_geom))::numeric, 4) lat, round(ST_X(ST_EndPoint(track_geom))::numeric, 4) lon FROM signal.t_vessel_tracks_5min WHERE mmsi IN ($DARK_MMSIS) AND time_bucket > now() - interval '24 hours' ORDER BY mmsi, time_bucket DESC; " 2>&1 | head -30 else echo "(no dark vessels in last 1h)" fi echo "" echo "=== 12. PREDICTION_EVENTS occurred_at distribution by 10-min buckets ===" PGPASSWORD=Kcg2026ai psql -U kcg-app -d kcgaidb -h 211.208.115.83 -P pager=off -c " SELECT date_trunc('hour', occurred_at) + (date_part('minute', occurred_at)::int / 10 * interval '10 minutes') bucket, category, count(*) FROM kcg.prediction_events WHERE created_at > now() - interval '1 hour' GROUP BY bucket, category ORDER BY bucket DESC, count DESC LIMIT 30; " 2>&1 echo "" echo "=== 13. CYCLE LOG (last 65 min) ===" journalctl -u kcg-ai-prediction --since '65 minutes ago' --no-pager 2>/dev/null | \ grep -E 'lightweight analysis|event_generator:|stats_aggregator hourly|kpi_writer:|analysis cycle:|ERROR|Traceback' | \ tail -60 echo "" echo "=== END ===" } > "$OUT" 2>&1 echo "[snapshot] saved: $OUT"