diff --git a/.gitea/workflows/deploy.yml b/.gitea/workflows/deploy.yml
index fed34a5..30977b2 100644
--- a/.gitea/workflows/deploy.yml
+++ b/.gitea/workflows/deploy.yml
@@ -68,6 +68,7 @@ jobs:
           [ -n "$GOOGLE_CLIENT_ID" ] && echo "GOOGLE_CLIENT_ID=${GOOGLE_CLIENT_ID}" >> $DEPLOY_DIR/.env
           [ -n "$JWT_SECRET" ] && echo "JWT_SECRET=${JWT_SECRET}" >> $DEPLOY_DIR/.env
           [ -n "$DB_PASSWORD" ] && echo "DB_PASSWORD=${DB_PASSWORD}" >> $DEPLOY_DIR/.env
+          echo "PREDICTION_BASE_URL=http://192.168.1.18:8001" >> $DEPLOY_DIR/.env

           # If application-prod.yml is bundled inside the JAR, remove the external copy
           if unzip -l backend/target/kcg.jar | grep -q 'application-prod.yml$'; then
@@ -143,6 +144,79 @@ jobs:
             sleep 10
           done

+      # ═══ Prediction (FastAPI → redis-211) ═══
+      - name: Deploy prediction via SSH
+        env:
+          DEPLOY_KEY: ${{ secrets.DEPLOY_SSH_KEY }}
+          PRED_HOST: 192.168.1.18
+          PRED_PORT: 32023
+        run: |
+          mkdir -p ~/.ssh
+          printf '%s\n' "$DEPLOY_KEY" > ~/.ssh/id_deploy
+          chmod 600 ~/.ssh/id_deploy
+
+          SSH_OPTS="-i ~/.ssh/id_deploy -o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=15 -p $PRED_PORT"
+          SCP_OPTS="-i ~/.ssh/id_deploy -o StrictHostKeyChecking=no -P $PRED_PORT"
+
+          REMOTE_DIR=/home/apps/kcg-prediction
+
+          # Ship the code (tar + scp in place of rsync)
+          tar czf /tmp/prediction.tar.gz -C prediction --exclude='__pycache__' --exclude='venv' --exclude='.env' .
+          for attempt in 1 2 3; do
+            echo "SCP prediction attempt $attempt/3..."
+            if scp $SCP_OPTS /tmp/prediction.tar.gz root@$PRED_HOST:/tmp/prediction.tar.gz; then break; fi
+            [ "$attempt" -eq 3 ] && { echo "ERROR: SCP failed"; exit 1; }
+            sleep 10
+          done
+
+          # Ship the systemd unit file
+          scp $SCP_OPTS deploy/kcg-prediction.service root@$PRED_HOST:/tmp/kcg-prediction.service
+
+          # Remote install + restart
+          for attempt in 1 2 3; do
+            echo "SSH deploy attempt $attempt/3..."
+            if ssh $SSH_OPTS root@$PRED_HOST bash -s << 'SCRIPT'
+          set -e
+          REMOTE_DIR=/home/apps/kcg-prediction
+          mkdir -p $REMOTE_DIR
+          cd $REMOTE_DIR
+
+          # Deploy the code
+          tar xzf /tmp/prediction.tar.gz -C $REMOTE_DIR
+          rm -f /tmp/prediction.tar.gz
+
+          # venv + dependencies
+          python3 -m venv venv 2>/dev/null || true
+          venv/bin/pip install -r requirements.txt -q
+
+          # Refresh the systemd unit
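+          # (diff -q keeps redeploys idempotent: daemon-reload/enable only run when the unit changed)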
+          if ! diff -q /tmp/kcg-prediction.service /etc/systemd/system/kcg-prediction.service >/dev/null 2>&1; then
+            cp /tmp/kcg-prediction.service /etc/systemd/system/kcg-prediction.service
+            systemctl daemon-reload
+            systemctl enable kcg-prediction
+          fi
+          rm -f /tmp/kcg-prediction.service
+
+          # Restart
+          systemctl restart kcg-prediction
+
+          # Health check (up to 30 seconds)
+          for i in $(seq 1 6); do
+            if curl -sf http://localhost:8001/health > /dev/null 2>&1; then
+              echo "Prediction healthy (${i})"
+              exit 0
+            fi
+            sleep 5
+          done
+          echo "WARNING: Prediction health timeout"
+          journalctl -u kcg-prediction --no-pager -n 10
+          exit 1
+          SCRIPT
+            then exit 0; fi
+            [ "$attempt" -eq 3 ] && { echo "ERROR: SSH failed"; exit 1; }
+            sleep 10
+          done
+
       - name: Cleanup
         if: always()
         run: rm -f ~/.ssh/id_deploy
diff --git a/backend/src/main/java/gc/mda/kcg/auth/AuthFilter.java b/backend/src/main/java/gc/mda/kcg/auth/AuthFilter.java
index 9394f43..8b172c1 100644
--- a/backend/src/main/java/gc/mda/kcg/auth/AuthFilter.java
+++ b/backend/src/main/java/gc/mda/kcg/auth/AuthFilter.java
@@ -24,6 +24,7 @@ public class AuthFilter extends OncePerRequestFilter {
     private static final String SENSOR_PATH_PREFIX = "/api/sensor/";
     private static final String CCTV_PATH_PREFIX = "/api/cctv/";
     private static final String VESSEL_ANALYSIS_PATH_PREFIX = "/api/vessel-analysis";
+    private static final String PREDICTION_PATH_PREFIX = "/api/prediction/";

     private final JwtProvider jwtProvider;

@@ -33,7 +34,8 @@ public class AuthFilter extends OncePerRequestFilter {
         return path.startsWith(AUTH_PATH_PREFIX)
                 || path.startsWith(SENSOR_PATH_PREFIX)
                 || path.startsWith(CCTV_PATH_PREFIX)
-                || path.startsWith(VESSEL_ANALYSIS_PATH_PREFIX);
+                || path.startsWith(VESSEL_ANALYSIS_PATH_PREFIX)
+                || path.startsWith(PREDICTION_PATH_PREFIX);
     }

     @Override
diff --git a/backend/src/main/java/gc/mda/kcg/config/AppProperties.java b/backend/src/main/java/gc/mda/kcg/config/AppProperties.java
index 5fb83d8..c1c2ff3 100644
--- a/backend/src/main/java/gc/mda/kcg/config/AppProperties.java
+++ b/backend/src/main/java/gc/mda/kcg/config/AppProperties.java
@@ -43,6 +43,7 @@ public class AppProperties {
         private String gdeltBaseUrl = "https://api.gdeltproject.org/api/v2/doc/doc";
         private String googleNewsBaseUrl = "https://news.google.com/rss/search";
         private String celestrakBaseUrl = "https://celestrak.org/NORAD/elements/gp.php";
+        private String predictionBaseUrl = "http://localhost:8001";
         private int requestDelayMs = 1500;
         private int backoffMs = 5000;
     }
diff --git a/backend/src/main/java/gc/mda/kcg/domain/analysis/PredictionProxyController.java b/backend/src/main/java/gc/mda/kcg/domain/analysis/PredictionProxyController.java
new file mode 100644
index 0000000..f13b8a2
--- /dev/null
+++ b/backend/src/main/java/gc/mda/kcg/domain/analysis/PredictionProxyController.java
@@ -0,0 +1,80 @@
+package gc.mda.kcg.domain.analysis;
+
+import gc.mda.kcg.config.AppProperties;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.*;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.client.ResourceAccessException;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.Map;
+
+/**
+ * Proxy for the Prediction service (FastAPI) status and trigger endpoints.
+ * GET  /api/prediction/health  → {predictionBaseUrl}/health
+ * GET  /api/prediction/status  → {predictionBaseUrl}/api/v1/analysis/status
+ * POST /api/prediction/trigger → {predictionBaseUrl}/api/v1/analysis/trigger
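+ * <p>Connection failures and upstream errors are both mapped to 503 with a
+ * small JSON body ({"status": "unreachable", ...}) rather than propagated.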
+ */
+@Slf4j
+@RestController
+@RequestMapping("/api/prediction")
+@RequiredArgsConstructor
+public class PredictionProxyController {
+
+    private final RestTemplate restTemplate;
+    private final AppProperties appProperties;
+
+    @GetMapping("/health")
+    public ResponseEntity<Object> health() {
+        String url = appProperties.getCollector().getPredictionBaseUrl() + "/health";
+        return proxyGet(url);
+    }
+
+    @GetMapping("/status")
+    public ResponseEntity<Object> status() {
+        String url = appProperties.getCollector().getPredictionBaseUrl() + "/api/v1/analysis/status";
+        return proxyGet(url);
+    }
+
+    @PostMapping("/trigger")
+    public ResponseEntity<Object> trigger(@RequestBody(required = false) Object body) {
+        String url = appProperties.getCollector().getPredictionBaseUrl() + "/api/v1/analysis/trigger";
+        return proxyPost(url, body);
+    }
+
+    private ResponseEntity<Object> proxyGet(String url) {
+        try {
+            ResponseEntity<Object> upstream = restTemplate.exchange(
+                    url, HttpMethod.GET, null, Object.class);
+            return ResponseEntity.status(upstream.getStatusCode()).body(upstream.getBody());
+        } catch (ResourceAccessException e) {
+            log.warn("Prediction service unreachable: {}", e.getMessage());
+            return unreachable(e.getMessage());
+        } catch (RestClientException e) {
+            log.warn("Prediction service error: {}", e.getMessage());
+            return unreachable(e.getMessage());
+        }
+    }
+
+    private ResponseEntity<Object> proxyPost(String url, Object body) {
+        try {
+            HttpEntity<Object> entity = new HttpEntity<>(body);
+            ResponseEntity<Object> upstream = restTemplate.exchange(
+                    url, HttpMethod.POST, entity, Object.class);
+            return ResponseEntity.status(upstream.getStatusCode()).body(upstream.getBody());
+        } catch (ResourceAccessException e) {
+            log.warn("Prediction service unreachable: {}", e.getMessage());
+            return unreachable(e.getMessage());
+        } catch (RestClientException e) {
+            log.warn("Prediction service error: {}", e.getMessage());
+            return unreachable(e.getMessage());
+        }
+    }
+
+    private ResponseEntity<Object> unreachable(String message) {
+        return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
+                .body(Map.of("status", "unreachable", "error", message != null ? message : "connection failed"));
+    }
+}
diff --git a/backend/src/main/resources/application-prod.yml b/backend/src/main/resources/application-prod.yml
index b62d921..ea2d3c9 100644
--- a/backend/src/main/resources/application-prod.yml
+++ b/backend/src/main/resources/application-prod.yml
@@ -11,5 +11,7 @@ app:
     client-id: ${GOOGLE_CLIENT_ID}
   auth:
     allowed-domain: ${AUTH_ALLOWED_DOMAIN:gcsc.co.kr}
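+  # Base URL the PredictionProxyController forwards to; the deploy workflow
+  # writes PREDICTION_BASE_URL into the host .env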
+  collector:
+    prediction-base-url: ${PREDICTION_BASE_URL:http://192.168.1.18:8001}
   cors:
     allowed-origins: http://localhost:5173,https://kcg.gc-si.dev
diff --git a/database/migration/006_vessel_analysis_unique.sql b/database/migration/006_vessel_analysis_unique.sql
new file mode 100644
index 0000000..6ecf39c
--- /dev/null
+++ b/database/migration/006_vessel_analysis_unique.sql
@@ -0,0 +1,3 @@
+-- Add the UNIQUE index required for UPSERT
+CREATE UNIQUE INDEX IF NOT EXISTS idx_vessel_analysis_mmsi_ts
+    ON kcg.vessel_analysis_results(mmsi, timestamp);
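+-- (backs the ON CONFLICT (mmsi, timestamp) clause used by prediction/db/kcgdb.py)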
diff --git a/deploy/kcg-prediction.service b/deploy/kcg-prediction.service
new file mode 100644
index 0000000..ff007bc
--- /dev/null
+++ b/deploy/kcg-prediction.service
@@ -0,0 +1,15 @@
+[Unit]
+Description=KCG Vessel Analysis (FastAPI)
+After=network.target
+
+[Service]
+Type=simple
+User=root
+WorkingDirectory=/home/apps/kcg-prediction
+ExecStart=/home/apps/kcg-prediction/venv/bin/uvicorn main:app --host 0.0.0.0 --port 8001
+EnvironmentFile=/home/apps/kcg-prediction/.env
+Restart=always
+RestartSec=5
+
+[Install]
+WantedBy=multi-user.target
diff --git a/prediction/algorithms/__init__.py b/prediction/algorithms/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/prediction/algorithms/dark_vessel.py b/prediction/algorithms/dark_vessel.py
new file mode 100644
index 0000000..9e8b9f2
--- /dev/null
+++ b/prediction/algorithms/dark_vessel.py
@@ -0,0 +1,59 @@
+import pandas as pd
+from algorithms.location import haversine_nm
+
+GAP_SUSPICIOUS_SEC = 1800       # 30 min
+GAP_HIGH_SUSPICIOUS_SEC = 3600  # 1 hour
+GAP_VIOLATION_SEC = 86400       # 24 hours
+
+
+def detect_ais_gaps(df_vessel: pd.DataFrame) -> list[dict]:
+    """Extract signal-loss gaps from the AIS reception record."""
+    if len(df_vessel) < 2:
+        return []
+
+    gaps = []
+    records = df_vessel.sort_values('timestamp').to_dict('records')
+
+    for i in range(1, len(records)):
+        prev, curr = records[i - 1], records[i]
+        prev_ts = pd.Timestamp(prev['timestamp'])
+        curr_ts = pd.Timestamp(curr['timestamp'])
+        gap_sec = (curr_ts - prev_ts).total_seconds()
+
+        if gap_sec < GAP_SUSPICIOUS_SEC:
+            continue
+
+        disp = haversine_nm(
+            prev['lat'], prev['lon'],
+            curr['lat'], curr['lon'],
+        )
+
+        if gap_sec >= GAP_VIOLATION_SEC:
+            severity = 'VIOLATION'
+        elif gap_sec >= GAP_HIGH_SUSPICIOUS_SEC:
+            severity = 'HIGH_SUSPICIOUS'
+        else:
+            severity = 'SUSPICIOUS'
+
+        gaps.append({
+            'gap_sec': int(gap_sec),
+            'gap_min': round(gap_sec / 60, 1),
+            'displacement_nm': round(disp, 2),
+            'severity': severity,
+        })
+
+    return gaps
+
+
+def is_dark_vessel(df_vessel: pd.DataFrame) -> tuple[bool, int]:
+    """Decide whether the vessel qualifies as a dark vessel.
+
+    Returns: (is_dark, max_gap_duration_min)
+    """
+    gaps = detect_ais_gaps(df_vessel)
+    if not gaps:
+        return False, 0
+
+    max_gap_min = max(g['gap_min'] for g in gaps)
+    is_dark = max_gap_min >= 30  # signal lost for 30+ minutes
+    return is_dark, int(max_gap_min)
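+
+# Minimal usage sketch (columns as consumed above: timestamp/lat/lon, one MMSI):
+#   gaps = detect_ais_gaps(df)          # [{'gap_sec': ..., 'severity': ...}, ...]
+#   dark, max_gap = is_dark_vessel(df)  # True once any gap reaches 30 minutes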
diff --git a/prediction/algorithms/fishing_pattern.py b/prediction/algorithms/fishing_pattern.py
new file mode 100644
index 0000000..c2815ec
--- /dev/null
+++ b/prediction/algorithms/fishing_pattern.py
@@ -0,0 +1,117 @@
+from __future__ import annotations
+
+import pandas as pd
+from algorithms.location import haversine_nm, classify_zone  # noqa: F401 (haversine_nm re-exported for callers)
+
+# Fishing-speed thresholds per gear type, after Yan et al. (2022)
+GEAR_SOG_THRESHOLDS: dict[str, tuple[float, float]] = {
+    'PT': (2.5, 4.5),      # pair trawl
+    'OT': (2.0, 4.0),      # single-boat (otter) trawl
+    'GN': (0.5, 2.5),      # gillnet / drift net
+    'SQ': (0.0, 1.0),      # squid jigging
+    'TRAP': (0.3, 1.5),    # pots and traps
+    'PS': (3.0, 6.0),      # purse seine
+    'TRAWL': (2.0, 4.5),   # (alias)
+    'PURSE': (3.0, 6.0),   # (alias)
+    'LONGLINE': (0.5, 2.5),
+}
+TRANSIT_SOG_MIN = 5.0
+ANCHORED_SOG_MAX = 0.5
+
+
+def classify_vessel_state(sog: float, cog_delta: float = 0.0,
+                          gear_type: str = 'PT') -> str:
+    """UCAF: gear-aware vessel state classification."""
+    if sog <= ANCHORED_SOG_MAX:
+        return 'ANCHORED'
+    if sog >= TRANSIT_SOG_MIN:
+        return 'TRANSIT'
+    sog_min, sog_max = GEAR_SOG_THRESHOLDS.get(gear_type, (1.0, 5.0))
+    if sog_min <= sog <= sog_max:
+        return 'FISHING'
+    return 'UNKNOWN'
+
+
+def compute_ucaf_score(df_vessel: pd.DataFrame, gear_type: str = 'PT') -> float:
+    """UCAF score: share of points inside the gear-specific fishing-speed band (0-1)."""
+    if len(df_vessel) == 0:
+        return 0.0
+    sog_min, sog_max = GEAR_SOG_THRESHOLDS.get(gear_type, (1.0, 5.0))
+    in_range = df_vessel['sog'].between(sog_min, sog_max).sum()
+    return round(in_range / len(df_vessel), 4)
+
+
+def compute_ucft_score(df_vessel: pd.DataFrame) -> float:
+    """UCFT score: binary fishing-vs-transit confidence (0-1)."""
+    if len(df_vessel) == 0:
+        return 0.0
+    fishing = (df_vessel['sog'].between(0.5, 5.0)).sum()
+    transit = (df_vessel['sog'] >= TRANSIT_SOG_MIN).sum()
+    total = fishing + transit
+    if total == 0:
+        return 0.0
+    return round(fishing / total, 4)
+
+
+def detect_fishing_segments(df_vessel: pd.DataFrame,
+                            window_min: int = 15,
+                            gear_type: str = 'PT') -> list[dict]:
+    """Extract contiguous fishing segments of at least window_min minutes."""
+    if len(df_vessel) < 2:
+        return []
+
+    segments: list[dict] = []
+    in_fishing = False
+    seg_start_idx = 0
+
+    records = df_vessel.to_dict('records')
+
+    def _close_segment(end_idx: int) -> None:
+        start_ts = records[seg_start_idx].get('timestamp')
+        end_ts = records[end_idx].get('timestamp')
+        if not (start_ts and end_ts):
+            return
+        dur_min = (pd.Timestamp(end_ts) - pd.Timestamp(start_ts)).total_seconds() / 60
+        if dur_min < window_min:
+            return
+        zone_info = classify_zone(
+            records[seg_start_idx].get('lat', 0),
+            records[seg_start_idx].get('lon', 0),
+        )
+        segments.append({
+            'start_idx': seg_start_idx,
+            'end_idx': end_idx,
+            'duration_min': round(dur_min, 1),
+            'zone': zone_info.get('zone', 'UNKNOWN'),
+            'in_territorial_sea': zone_info.get('zone') == 'TERRITORIAL_SEA',
+        })
+
+    for i, rec in enumerate(records):
+        sog = rec.get('sog', 0)
+        state = classify_vessel_state(sog, gear_type=gear_type)
+
+        if state == 'FISHING' and not in_fishing:
+            in_fishing = True
+            seg_start_idx = i
+        elif state != 'FISHING' and in_fishing:
+            _close_segment(i - 1)
+            in_fishing = False
+
+    # Close a segment that is still open at the end of the track
+    if in_fishing:
+        _close_segment(len(records) - 1)
+
+    return segments
+
+
+def detect_trawl_uturn(df_vessel: pd.DataFrame,
+                       uturn_threshold_deg: float = 150.0,
+                       min_uturn_count: int = 3) -> dict:
+    """Detect back-and-forth U-turn patterns (a trawling signature)."""
+    if len(df_vessel) < 2:
+        return {'uturn_count': 0, 'trawl_suspected': False}
+
+    uturn_count = 0
+    cog_vals = df_vessel['cog'].values
+    sog_vals = df_vessel['sog'].values
+
+    for i in range(1, len(cog_vals)):
+        # Smallest absolute heading change, wrapped into [0, 180]
+        delta = abs((cog_vals[i] - cog_vals[i - 1] + 180) % 360 - 180)
+        if delta >= uturn_threshold_deg and sog_vals[i] < TRANSIT_SOG_MIN:
+            uturn_count += 1
+
+    return {
+        'uturn_count': uturn_count,
+        'trawl_suspected': uturn_count >= min_uturn_count,
+    }
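+
+# Worked example: a tow-and-return track (all sog below TRANSIT_SOG_MIN):
+#   df = pd.DataFrame({'cog': [10, 190, 15, 185, 20, 200], 'sog': [3.0] * 6})
+#   detect_trawl_uturn(df)  # -> {'uturn_count': 5, 'trawl_suspected': True}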
diff --git a/prediction/algorithms/fleet.py b/prediction/algorithms/fleet.py
new file mode 100644
index 0000000..22ccd0c
--- /dev/null
+++ b/prediction/algorithms/fleet.py
@@ -0,0 +1,152 @@
+import math
+import logging
+
+import numpy as np
+import pandas as pd
+from algorithms.location import haversine_nm, dist_to_baseline, EARTH_RADIUS_NM
+
+logger = logging.getLogger(__name__)
+
+
+def detect_group_clusters(
+    vessel_snapshots: list[dict],
+    spatial_eps_nm: float = 10.0,
+    time_eps_hours: float = 2.0,
+    min_vessels: int = 3,
+) -> dict[int, list[dict]]:
+    """Detect vessel groups via spatio-temporal DBSCAN clustering."""
+    if len(vessel_snapshots) < min_vessels:
+        return {}
+
+    try:
+        from sklearn.cluster import DBSCAN
+    except ImportError:
+        logger.warning('sklearn not available for DBSCAN clustering')
+        return {}
+
+    lat_rad = [math.radians(v['lat']) * EARTH_RADIUS_NM for v in vessel_snapshots]
+    lon_rad = [math.radians(v['lon']) * EARTH_RADIUS_NM for v in vessel_snapshots]
+
+    # Normalise time onto the NM scale: time_eps_hours maps to 10 NM (the default spatial eps)
+    timestamps = [pd.Timestamp(v['timestamp']).timestamp() for v in vessel_snapshots]
+    t_min = min(timestamps)
+    time_nm = [(t - t_min) / 3600 * 10 / time_eps_hours for t in timestamps]
+
+    X = np.array(list(zip(lat_rad, lon_rad, time_nm)))
+
+    db = DBSCAN(eps=spatial_eps_nm, min_samples=min_vessels, metric='euclidean').fit(X)
+
+    clusters: dict[int, list[dict]] = {}
+    for idx, label in enumerate(db.labels_):
+        if label == -1:
+            continue
+        clusters.setdefault(int(label), []).append(vessel_snapshots[idx])
+
+    return clusters
+
+
+def identify_lead_vessel(cluster_vessels: list[dict]) -> dict:
+    """Identify the lead vessel via five-criteria scoring."""
+    if not cluster_vessels:
+        return {}
+
+    scores: dict[str, float] = {}
+
+    timestamps = [pd.Timestamp(v.get('timestamp', 0)).timestamp() for v in cluster_vessels]
+    min_ts = min(timestamps) if timestamps else 0
+
+    lats = [v['lat'] for v in cluster_vessels]
+    lons = [v['lon'] for v in cluster_vessels]
+    centroid_lat = float(np.mean(lats))
+    centroid_lon = float(np.mean(lons))
+
+    for i, v in enumerate(cluster_vessels):
+        mmsi = v['mmsi']
+        s = 0.0
+
+        # Criterion 1: earliest appearance (30 pts)
+        ts_rank = timestamps[i] - min_ts
+        s += 30.0 * (1.0 - min(ts_rank, 7200) / 7200)
+
+        # Criterion 2: gross tonnage (25 pts) — flat 12.5 until the external registry is integrated
+        s += 12.5
+
+        # Criterion 3: proximity to the cluster centroid (20 pts)
+        dist_center = haversine_nm(v['lat'], v['lon'], centroid_lat, centroid_lon)
+        s += 20.0 * (1.0 - min(dist_center, 10) / 10)
+
+        # Criterion 4: closest approach to the baseline (15 pts)
+        dist_base = dist_to_baseline(v['lat'], v['lon'])
+        s += 15.0 * (1.0 - min(dist_base, 12) / 12)
+
+        # Criterion 5: AIS-loss history (10 pts) — full marks when no history is available
+        s += 10.0
+
+        scores[mmsi] = round(s, 2)
+
+    lead_mmsi = max(scores, key=lambda k: scores[k])
+    score_vals = sorted(scores.values(), reverse=True)
+
+    if len(score_vals) > 1 and score_vals[0] - score_vals[1] > 15:
+        confidence = 'HIGH'
+    elif len(score_vals) > 1 and score_vals[0] - score_vals[1] > 8:
+        confidence = 'MED'
+    else:
+        confidence = 'LOW'
+
+    return {
+        'lead_mmsi': lead_mmsi,
+        'lead_score': scores[lead_mmsi],
+        'all_scores': scores,
+        'confidence': confidence,
+    }
+
+
+def assign_fleet_roles(
+    vessel_dfs: dict[str, pd.DataFrame],
+    cluster_map: dict[str, int],
+) -> dict[str, dict]:
+    """Assign fleet roles: LEADER/MEMBER/NOISE."""
+    results: dict[str, dict] = {}
+
+    # Group MMSIs by cluster
+    clusters: dict[int, list[str]] = {}
+    for mmsi, cid in cluster_map.items():
+        clusters.setdefault(cid, []).append(mmsi)
+
+    for cid, mmsi_list in clusters.items():
+        if cid == -1:
+            for mmsi in mmsi_list:
+                results[mmsi] = {
+                    'cluster_size': 0,
+                    'is_leader': False,
+                    'fleet_role': 'NOISE',
+                }
+            continue
+
+        cluster_size = len(mmsi_list)
+
+        # Build snapshots (each vessel's last point)
+        snapshots: list[dict] = []
+        for mmsi in mmsi_list:
+            df = vessel_dfs.get(mmsi)
+            if df is not None and len(df) > 0:
+                last = df.iloc[-1]
+                snapshots.append({
+                    'mmsi': mmsi,
+                    'lat': last['lat'],
+                    'lon': last['lon'],
+                    'timestamp': last.get('timestamp', pd.Timestamp.now()),
+                })
+
+        lead_info = identify_lead_vessel(snapshots) if len(snapshots) >= 2 else {}
+        lead_mmsi = lead_info.get('lead_mmsi')
+
+        for mmsi in mmsi_list:
+            results[mmsi] = {
+                'cluster_size': cluster_size,
+                'is_leader': mmsi == lead_mmsi,
+                'fleet_role': 'LEADER' if mmsi == lead_mmsi else 'MEMBER',
+            }
+
+    return results
diff --git a/prediction/algorithms/location.py b/prediction/algorithms/location.py
new file mode 100644
index 0000000..44ccf86
--- /dev/null
+++ b/prediction/algorithms/location.py
@@ -0,0 +1,93 @@
+from __future__ import annotations
+
+import json
+import math
+from pathlib import Path
+from typing import List, Optional, Tuple
+
+EARTH_RADIUS_NM = 3440.065
+TERRITORIAL_SEA_NM = 12.0
+CONTIGUOUS_ZONE_NM = 24.0
+
+_baseline_points: Optional[List[Tuple[float, float]]] = None
+
+
+def _load_baseline() -> List[Tuple[float, float]]:
+    global _baseline_points
+    if _baseline_points is not None:
+        return _baseline_points
+    path = Path(__file__).parent.parent / 'data' / 'korea_baseline.json'
+    with open(path, 'r') as f:
+        data = json.load(f)
+    _baseline_points = [(p['lat'], p['lon']) for p in data['points']]
+    return _baseline_points
+
+
+def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
+    """Great-circle distance between two coordinates (nautical miles)."""
+    R = EARTH_RADIUS_NM
+    phi1, phi2 = math.radians(lat1), math.radians(lat2)
+    dphi = math.radians(lat2 - lat1)
+    dlam = math.radians(lon2 - lon1)
+    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
+    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
+
+
+def dist_to_baseline(vessel_lat: float, vessel_lon: float,
+                     baseline_points: Optional[List[Tuple[float, float]]] = None) -> float:
+    """Minimum distance from a vessel position to the baseline (NM)."""
+    if baseline_points is None:
+        baseline_points = _load_baseline()
+    min_dist = float('inf')
+    for bp_lat, bp_lon in baseline_points:
+        d = haversine_nm(vessel_lat, vessel_lon, bp_lat, bp_lon)
+        if d < min_dist:
+            min_dist = d
+    return min_dist
+
+
+def classify_zone(vessel_lat: float, vessel_lon: float) -> dict:
+    """Classify the maritime zone of a vessel position."""
+    dist = dist_to_baseline(vessel_lat, vessel_lon)
+
+    if dist <= TERRITORIAL_SEA_NM:
+        return {
+            'zone': 'TERRITORIAL_SEA',
+            'dist_from_baseline_nm': round(dist, 2),
+            'violation': True,
+            'alert_level': 'CRITICAL',
+        }
+    elif dist <= CONTIGUOUS_ZONE_NM:
+        return {
+            'zone': 'CONTIGUOUS_ZONE',
+            'dist_from_baseline_nm': round(dist, 2),
+            'violation': False,
+            'alert_level': 'WATCH',
+        }
+    else:
+        return {
+            'zone': 'EEZ_OR_BEYOND',
+            'dist_from_baseline_nm': round(dist, 2),
+            'violation': False,
+            'alert_level': 'NORMAL',
+        }
+
+
+def bd09_to_wgs84(bd_lat: float, bd_lon: float) -> tuple[float, float]:
+    """Convert BD-09 coordinates to WGS84."""
+    x = bd_lon - 0.0065
+    y = bd_lat - 0.006
+    z = math.sqrt(x ** 2 + y ** 2) - 0.00002 * math.sin(y * 52.35987756)
+    theta = math.atan2(y, x) - 0.000003 * math.cos(x * 52.35987756)
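+    # The two lines above undo the BD-09 obfuscation (52.35987756 ≈ pi*3000/180);
+    # the fixed offsets below approximate the GCJ-02 -> WGS84 step, whose exact
+    # inverse is iterative.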
+    gcj_lon = z * math.cos(theta)
+    gcj_lat = z * math.sin(theta)
+    wgs_lat = gcj_lat - 0.0023
+    wgs_lon = gcj_lon - 0.0059
+    return wgs_lat, wgs_lon
+
+
+def compute_bd09_offset(lat: float, lon: float) -> float:
+    """Offset between a position read as BD-09 and its WGS84 equivalent (metres)."""
+    wgs_lat, wgs_lon = bd09_to_wgs84(lat, lon)
+    dist_nm = haversine_nm(lat, lon, wgs_lat, wgs_lon)
+    return round(dist_nm * 1852.0, 1)  # NM to metres
diff --git a/prediction/algorithms/risk.py b/prediction/algorithms/risk.py
new file mode 100644
index 0000000..b11b3c0
--- /dev/null
+++ b/prediction/algorithms/risk.py
@@ -0,0 +1,75 @@
+from typing import Optional, Tuple
+
+import pandas as pd
+from algorithms.location import classify_zone
+from algorithms.fishing_pattern import detect_fishing_segments, detect_trawl_uturn
+from algorithms.dark_vessel import detect_ais_gaps
+from algorithms.spoofing import detect_teleportation
+
+
+def compute_vessel_risk_score(
+    mmsi: str,
+    df_vessel: pd.DataFrame,
+    zone_info: Optional[dict] = None,
+    is_permitted: Optional[bool] = None,
+) -> Tuple[int, str]:
+    """Aggregate per-vessel violation risk (0-100).
+
+    Returns: (risk_score, risk_level)
+    """
+    if len(df_vessel) == 0:
+        return 0, 'LOW'
+
+    score = 0
+
+    # 1. Location (max 40 pts)
+    if zone_info is None:
+        last = df_vessel.iloc[-1]
+        zone_info = classify_zone(last['lat'], last['lon'])
+
+    zone = zone_info.get('zone', '')
+    if zone == 'TERRITORIAL_SEA':
+        score += 40
+    elif zone == 'CONTIGUOUS_ZONE':
+        score += 10
+
+    # 2. Fishing activity (max 30 pts)
+    segs = detect_fishing_segments(df_vessel)
+    ts_fishing = [s for s in segs if s.get('in_territorial_sea')]
+    if ts_fishing:
+        score += 20
+    elif segs:
+        score += 5
+
+    uturn = detect_trawl_uturn(df_vessel)
+    if uturn.get('trawl_suspected'):
+        score += 10
+
+    # 3. AIS manipulation (max 35 pts)
+    teleports = detect_teleportation(df_vessel)
+    if teleports:
+        score += 20
+
+    gaps = detect_ais_gaps(df_vessel)
+    critical_gaps = [g for g in gaps if g['gap_min'] >= 60]
+    if critical_gaps:
+        score += 15
+    elif gaps:
+        score += 5
+
+    # 4. Permit history (max 20 pts)
+    if is_permitted is not None and not is_permitted:
+        score += 20
+
+    score = min(score, 100)
+
+    if score >= 70:
+        level = 'CRITICAL'
+    elif score >= 50:
+        level = 'HIGH'
+    elif score >= 30:
+        level = 'MEDIUM'
+    else:
+        level = 'LOW'
+
+    return score, level
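+
+# Worked example: fishing inside the territorial sea (40 + 20) plus a 90-minute
+# AIS gap (+15) scores 75 -> CRITICAL; the same track without the gap scores
+# 60 -> HIGH.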
diff --git a/prediction/algorithms/spoofing.py b/prediction/algorithms/spoofing.py
new file mode 100644
index 0000000..e2ec081
--- /dev/null
+++ b/prediction/algorithms/spoofing.py
@@ -0,0 +1,80 @@
+import pandas as pd
+from algorithms.location import haversine_nm, bd09_to_wgs84, compute_bd09_offset  # noqa: F401
+
+MAX_FISHING_SPEED_KNOTS = 25.0
+
+
+def detect_teleportation(df_vessel: pd.DataFrame,
+                         max_speed_knots: float = MAX_FISHING_SPEED_KNOTS) -> list[dict]:
+    """Detect physically impossible movement between consecutive AIS points."""
+    if len(df_vessel) < 2:
+        return []
+
+    anomalies = []
+    records = df_vessel.sort_values('timestamp').to_dict('records')
+
+    for i in range(1, len(records)):
+        prev, curr = records[i - 1], records[i]
+        dist_nm = haversine_nm(prev['lat'], prev['lon'], curr['lat'], curr['lon'])
+        dt_hours = (
+            pd.Timestamp(curr['timestamp']) - pd.Timestamp(prev['timestamp'])
+        ).total_seconds() / 3600
+
+        if dt_hours <= 0:
+            continue
+
+        implied_speed = dist_nm / dt_hours
+
+        if implied_speed > max_speed_knots:
+            anomalies.append({
+                'idx': i,
+                'dist_nm': round(dist_nm, 2),
+                'implied_kn': round(implied_speed, 1),
+                'type': 'TELEPORTATION',
+                'confidence': 'HIGH' if implied_speed > 50 else 'MED',
+            })
+
+    return anomalies
+
+
+def count_speed_jumps(df_vessel: pd.DataFrame, threshold_knots: float = 10.0) -> int:
+    """Count abrupt SOG changes between consecutive points."""
+    if len(df_vessel) < 2:
+        return 0
+
+    sog = df_vessel['sog'].values
+    jumps = 0
+    for i in range(1, len(sog)):
+        if abs(sog[i] - sog[i - 1]) > threshold_knots:
+            jumps += 1
+    return jumps
+
+
+def compute_spoofing_score(df_vessel: pd.DataFrame) -> float:
+    """Composite GPS-spoofing score (0-1)."""
+    if len(df_vessel) < 2:
+        return 0.0
+
+    score = 0.0
+    n = len(df_vessel)
+
+    # Teleportation ratio
+    teleports = detect_teleportation(df_vessel)
+    if teleports:
+        score += min(0.4, len(teleports) / n * 10)
+
+    # SOG jump ratio
+    jumps = count_speed_jumps(df_vessel)
+    if jumps > 0:
+        score += min(0.3, jumps / n * 5)
+
+    # BD-09 offset (suspected use of a Chinese datum)
+    mid_idx = len(df_vessel) // 2
+    row = df_vessel.iloc[mid_idx]
+    offset = compute_bd09_offset(row['lat'], row['lon'])
+    if offset > 300:  # above 300 m
+        score += 0.3
+    elif offset > 100:
+        score += 0.1
+
+    return round(min(score, 1.0), 4)
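+
+# Score composition: teleportation contributes at most 0.4, SOG jumps at most
+# 0.3, and a BD-09-like offset 0.1-0.3; the total is capped at 1.0.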
diff --git a/prediction/cache/__init__.py b/prediction/cache/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/prediction/cache/vessel_store.py b/prediction/cache/vessel_store.py
new file mode 100644
index 0000000..8a59536
--- /dev/null
+++ b/prediction/cache/vessel_store.py
@@ -0,0 +1,335 @@
+import logging
+from datetime import datetime, timezone
+from typing import Optional
+
+import numpy as np
+import pandas as pd
+
+logger = logging.getLogger(__name__)
+
+_STATIC_REFRESH_INTERVAL_MIN = 60
+_PERMIT_REFRESH_INTERVAL_MIN = 30
+_EARTH_RADIUS_NM = 3440.065
+_MAX_REASONABLE_SOG = 30.0
+_CHINESE_MMSI_PREFIX = '412'
+
+
+def _compute_sog_cog(df: pd.DataFrame) -> pd.DataFrame:
+    """Compute SOG (knots) and COG (degrees) from consecutive lat/lon/timestamp points."""
+    df = df.sort_values(['mmsi', 'timestamp']).copy()
+
+    lat1 = np.radians(df['lat'].values[:-1])
+    lon1 = np.radians(df['lon'].values[:-1])
+    lat2 = np.radians(df['lat'].values[1:])
+    lon2 = np.radians(df['lon'].values[1:])
+
+    # Haversine distance (nautical miles)
+    dlat = lat2 - lat1
+    dlon = lon2 - lon1
+    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
+    dist_nm = _EARTH_RADIUS_NM * 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
+
+    # Time difference (hours)
+    ts = df['timestamp'].values
+    dt_sec = (ts[1:] - ts[:-1]).astype('timedelta64[s]').astype(float)
+    dt_hours = dt_sec / 3600.0
+    dt_hours[dt_hours <= 0] = np.nan
+
+    # SOG = dist / time (knots)
+    computed_sog = dist_nm / dt_hours
+    computed_sog = np.clip(np.nan_to_num(computed_sog, nan=0.0), 0, _MAX_REASONABLE_SOG)
+
+    # COG = bearing (degrees)
+    x = np.sin(dlon) * np.cos(lat2)
+    y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon)
+    bearing = (np.degrees(np.arctan2(x, y)) + 360) % 360
+
+    # Row i holds the value of segment (i, i+1); duplicate the final segment for the last row
+    sog_arr = np.append(computed_sog, computed_sog[-1:] if len(computed_sog) > 0 else [0])
+    cog_arr = np.append(bearing, bearing[-1:] if len(bearing) > 0 else [0])
+
+    # Reset at MMSI boundaries: segment idx spans two different vessels, so its
+    # computed speed/bearing is meaningless for the earlier vessel's last row
+    mmsi_vals = df['mmsi'].values
+    boundary = np.where(mmsi_vals[:-1] != mmsi_vals[1:])[0]
+    for idx in boundary:
+        sog_arr[idx] = df['raw_sog'].iloc[idx] if 'raw_sog' in df.columns else 0
+        cog_arr[idx] = 0
+
+    # Where computed SOG is 0 or NaN, fall back to raw_sog
+    df['sog'] = sog_arr
+    if 'raw_sog' in df.columns:
+        mask = (df['sog'] == 0) | np.isnan(df['sog'])
+        df.loc[mask, 'sog'] = df.loc[mask, 'raw_sog'].fillna(0)
+
+    df['cog'] = cog_arr
+    return df
+
+
+class VesselStore:
+    """In-memory vessel trajectory store for Korean waters vessel data.
+
+    Maintains a 24-hour sliding window of all vessel tracks and supports
+    incremental 5-minute updates. Chinese vessel (MMSI 412*) filtering
+    is applied only at analysis target selection time.
+    """
+
+    def __init__(self) -> None:
+        self._tracks: dict[str, pd.DataFrame] = {}
+        self._last_bucket: Optional[datetime] = None
+        self._static_info: dict[str, dict] = {}
+        self._permit_set: set[str] = set()
+        self._static_refreshed_at: Optional[datetime] = None
+        self._permit_refreshed_at: Optional[datetime] = None
+
+    # ------------------------------------------------------------------
+    # Public load / update methods
+    # ------------------------------------------------------------------
+
+    def load_initial(self, hours: int = 24) -> None:
+        """Load all Korean waters vessel data for the past N hours.
+
+        Fetches a bulk DataFrame from snpdb, groups by MMSI, and stores
+        each vessel's track separately. Also triggers static info and
+        permit registry refresh.
+        """
+        from db import snpdb
+
+        logger.info('loading initial vessel tracks (last %dh)...', hours)
+        try:
+            df_all = snpdb.fetch_all_tracks(hours)
+        except Exception as e:
+            logger.error('fetch_all_tracks failed: %s', e)
+            return
+
+        if df_all.empty:
+            logger.warning('fetch_all_tracks returned empty DataFrame')
+            return
+
+        # Rename sog column to raw_sog to preserve original AIS-reported speed
+        if 'sog' in df_all.columns and 'raw_sog' not in df_all.columns:
+            df_all = df_all.rename(columns={'sog': 'raw_sog'})
+
+        self._tracks = {}
+        for mmsi, group in df_all.groupby('mmsi'):
+            self._tracks[str(mmsi)] = group.reset_index(drop=True)
+
+        vessel_count = len(self._tracks)
+        point_count = sum(len(v) for v in self._tracks.values())
+        logger.info(
+            'initial load complete: %d vessels, %d total points',
+            vessel_count,
+            point_count,
+        )
+
+        self.refresh_static_info()
+        self.refresh_permit_registry()
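+
+    # Static-info and permit refreshes are throttled (60 min / 30 min) inside
+    # the methods below, so load_initial can call them unconditionally.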
+
+    # Note: the store has no internal locking; merge/evict/refresh are assumed
+    # to run from a single writer (the scheduler loop).
+
+    def merge_incremental(self, df_new: pd.DataFrame) -> None:
+        """Merge a new batch of vessel positions into the in-memory store.
+
+        Deduplicates by timestamp within each MMSI and updates _last_bucket.
+        """
+        if df_new.empty:
+            logger.debug('merge_incremental called with empty DataFrame, skipping')
+            return
+
+        if 'sog' in df_new.columns and 'raw_sog' not in df_new.columns:
+            df_new = df_new.rename(columns={'sog': 'raw_sog'})
+
+        new_buckets: list[datetime] = []
+
+        for mmsi, group in df_new.groupby('mmsi'):
+            mmsi_str = str(mmsi)
+            if mmsi_str in self._tracks:
+                combined = pd.concat([self._tracks[mmsi_str], group], ignore_index=True)
+                combined = combined.drop_duplicates(subset=['timestamp'])
+                self._tracks[mmsi_str] = combined.reset_index(drop=True)
+            else:
+                self._tracks[mmsi_str] = group.reset_index(drop=True)
+
+            if 'time_bucket' in group.columns and not group['time_bucket'].empty:
+                bucket_vals = pd.to_datetime(group['time_bucket'].dropna())
+                if not bucket_vals.empty:
+                    new_buckets.append(bucket_vals.max().to_pydatetime())
+
+        if new_buckets:
+            latest = max(new_buckets)
+            if isinstance(latest, datetime) and latest.tzinfo is None:
+                latest = latest.replace(tzinfo=timezone.utc)
+            if self._last_bucket is None or latest > self._last_bucket:
+                self._last_bucket = latest
+
+        logger.debug(
+            'incremental merge done: %d mmsis in batch, store has %d vessels',
+            df_new['mmsi'].nunique(),
+            len(self._tracks),
+        )
+
+    def evict_stale(self, hours: int = 24) -> None:
+        """Remove track points older than N hours and evict empty MMSI entries."""
+        import datetime as _dt
+
+        now = datetime.now(timezone.utc)
+        cutoff_aware = now - _dt.timedelta(hours=hours)
+        cutoff_naive = cutoff_aware.replace(tzinfo=None)
+
+        before_total = sum(len(v) for v in self._tracks.values())
+        evicted_mmsis: list[str] = []
+
+        for mmsi in list(self._tracks.keys()):
+            df = self._tracks[mmsi]
+            ts_col = df['timestamp']
+            # Handle tz-aware and tz-naive timestamps uniformly
+            if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None:
+                mask = ts_col >= pd.Timestamp(cutoff_aware)
+            else:
+                mask = ts_col >= pd.Timestamp(cutoff_naive)
+            filtered = df[mask].reset_index(drop=True)
+            if filtered.empty:
+                del self._tracks[mmsi]
+                evicted_mmsis.append(mmsi)
+            else:
+                self._tracks[mmsi] = filtered
+
+        after_total = sum(len(v) for v in self._tracks.values())
+        logger.info(
+            'eviction complete: removed %d points, evicted %d mmsis (threshold=%dh)',
+            before_total - after_total,
+            len(evicted_mmsis),
+            hours,
+        )
+
+    def refresh_static_info(self) -> None:
+        """Fetch vessel static info (type, name, dimensions) from snpdb.
+
+        Skips refresh if called within the last 60 minutes.
+        """
+        now = datetime.now(timezone.utc)
+        if self._static_refreshed_at is not None:
+            elapsed_min = (now - self._static_refreshed_at).total_seconds() / 60
+            if elapsed_min < _STATIC_REFRESH_INTERVAL_MIN:
+                logger.debug(
+                    'static info refresh skipped (%.1f min since last refresh)',
+                    elapsed_min,
+                )
+                return
+
+        if not self._tracks:
+            logger.debug('no tracks in store, skipping static info refresh')
+            return
+
+        from db import snpdb
+
+        mmsi_list = list(self._tracks.keys())
+        try:
+            info = snpdb.fetch_static_info(mmsi_list)
+            self._static_info.update(info)
+            self._static_refreshed_at = now
+            logger.info('static info refreshed: %d vessels', len(info))
+        except Exception as e:
+            logger.error('fetch_static_info failed: %s', e)
+
+    def refresh_permit_registry(self) -> None:
+        """Fetch permitted Chinese fishing vessel MMSIs from snpdb.
+
+        Skips refresh if called within the last 30 minutes.
+        """
+        now = datetime.now(timezone.utc)
+        if self._permit_refreshed_at is not None:
+            elapsed_min = (now - self._permit_refreshed_at).total_seconds() / 60
+            if elapsed_min < _PERMIT_REFRESH_INTERVAL_MIN:
+                logger.debug(
+                    'permit registry refresh skipped (%.1f min since last refresh)',
+                    elapsed_min,
+                )
+                return
+
+        from db import snpdb
+
+        try:
+            mmsis = snpdb.fetch_permit_mmsis()
+            self._permit_set = set(mmsis)
+            self._permit_refreshed_at = now
+            logger.info('permit registry refreshed: %d permitted vessels', len(self._permit_set))
+        except Exception as e:
+            logger.error('fetch_permit_mmsis failed: %s', e)
+
+    # ------------------------------------------------------------------
+    # Analysis target selection
+    # ------------------------------------------------------------------
+
+    def select_analysis_targets(self) -> pd.DataFrame:
+        """Build a combined DataFrame of Chinese vessel tracks with computed SOG/COG.
+
+        Filters to MMSI starting with '412', computes SOG and COG from
+        consecutive lat/lon/timestamp pairs using the haversine formula,
+        and falls back to raw_sog where computed values are zero or NaN.
+
+        Returns:
+            DataFrame with columns: mmsi, timestamp, lat, lon, sog, cog
+        """
+        chinese_mmsis = [m for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX)]
+        if not chinese_mmsis:
+            logger.info('no Chinese vessels (412*) found in store')
+            return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog'])
+
+        frames = [self._tracks[m] for m in chinese_mmsis]
+        combined = pd.concat(frames, ignore_index=True)
+
+        required_cols = {'mmsi', 'timestamp', 'lat', 'lon'}
+        missing = required_cols - set(combined.columns)
+        if missing:
+            logger.error('combined DataFrame missing required columns: %s', missing)
+            return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog'])
+
+        result = _compute_sog_cog(combined)
+
+        output_cols = ['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog']
+        available = [c for c in output_cols if c in result.columns]
+        return result[available].reset_index(drop=True)
+
+    # ------------------------------------------------------------------
+    # Lookup helpers
+    # ------------------------------------------------------------------
+
+    def is_permitted(self, mmsi: str) -> bool:
+        """Return True if the given MMSI is in the permitted Chinese fishing vessel registry."""
+        return mmsi in self._permit_set
+
+    def get_vessel_info(self, mmsi: str) -> dict:
+        """Return static vessel info dict for the given MMSI, or empty dict if not found."""
+        return self._static_info.get(mmsi, {})
+
+    # ------------------------------------------------------------------
+    # Properties
+    # ------------------------------------------------------------------
+
+    @property
+    def last_bucket(self) -> Optional[datetime]:
+        """Return the latest time bucket seen across all merged incremental batches."""
+        return self._last_bucket
+
+    # ------------------------------------------------------------------
+    # Diagnostics
+    # ------------------------------------------------------------------
+
+    def stats(self) -> dict:
+        """Return store statistics for health/status reporting."""
+        total_points = sum(len(v) for v in self._tracks.values())
+        chinese_count = sum(1 for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX))
+
+        # Rough memory estimate: each row ~200 bytes across columns
+        memory_mb = round((total_points * 200) / (1024 * 1024), 2)
+
+        return {
+            'vessels': len(self._tracks),
+            'points': total_points,
+            'memory_mb': memory_mb,
+            'last_bucket': self._last_bucket.isoformat() if self._last_bucket else None,
+            'targets': chinese_count,
+            'permitted': len(self._permit_set),
+        }
+
+
+# Module-level singleton
+vessel_store = VesselStore()
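+
+# Sketch of the scheduler-side cycle this store serves (scheduler.py is not
+# part of this diff):
+#   vessel_store.merge_incremental(snpdb.fetch_incremental(vessel_store.last_bucket))
+#   vessel_store.evict_stale(settings.CACHE_WINDOW_HOURS)
+#   df = vessel_store.select_analysis_targets()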
diff --git a/prediction/config.py b/prediction/config.py
new file mode 100644
index 0000000..7414022
--- /dev/null
+++ b/prediction/config.py
@@ -0,0 +1,40 @@
+from pydantic_settings import BaseSettings
+
+
+class Settings(BaseSettings):
+    # snpdb (trajectory data source)
+    SNPDB_HOST: str = '211.208.115.83'
+    SNPDB_PORT: int = 5432
+    SNPDB_NAME: str = 'snpdb'
+    SNPDB_USER: str = 'snp'
+    SNPDB_PASSWORD: str
+
+    # kcgdb (analysis result sink)
+    KCGDB_HOST: str = '211.208.115.83'
+    KCGDB_PORT: int = 5432
+    KCGDB_NAME: str = 'kcgdb'
+    KCGDB_SCHEMA: str = 'kcg'
+    KCGDB_USER: str = 'kcg_app'
+    KCGDB_PASSWORD: str
+
+    # Scheduler
+    SCHEDULER_INTERVAL_MIN: int = 5
+
+    # In-memory cache
+    CACHE_WINDOW_HOURS: int = 24
+    INITIAL_LOAD_HOURS: int = 24
+    STATIC_INFO_REFRESH_MIN: int = 60
+    PERMIT_REFRESH_MIN: int = 30
+
+    # Pipeline
+    TRAJECTORY_HOURS: int = 6
+    MMSI_PREFIX: str = '412'
+    MIN_TRAJ_POINTS: int = 100
+
+    # Logging
+    LOG_LEVEL: str = 'INFO'
+
+    model_config = {'env_file': '.env', 'env_file_encoding': 'utf-8', 'extra': 'ignore'}
+
+
+settings = Settings()
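+
+# Values load from .env (see prediction/env.example); keys not declared on
+# Settings are ignored via extra='ignore'. The two passwords have no default,
+# so startup fails fast when they are missing.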
diff --git a/prediction/data/korea_baseline.json b/prediction/data/korea_baseline.json
new file mode 100644
index 0000000..9b20cd7
--- /dev/null
+++ b/prediction/data/korea_baseline.json
@@ -0,0 +1 @@
+{"points": [{"lat": 37.0, "lon": 124.0}, {"lat": 35.0, "lon": 129.0}]}
\ No newline at end of file
diff --git a/prediction/db/__init__.py b/prediction/db/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/prediction/db/kcgdb.py b/prediction/db/kcgdb.py
new file mode 100644
index 0000000..1014d3d
--- /dev/null
+++ b/prediction/db/kcgdb.py
@@ -0,0 +1,134 @@
+import logging
+from contextlib import contextmanager
+from typing import TYPE_CHECKING, Optional
+
+import psycopg2
+from psycopg2 import pool
+from psycopg2.extras import execute_values
+
+from config import settings
+
+if TYPE_CHECKING:
+    from models.result import AnalysisResult
+
+logger = logging.getLogger(__name__)
+
+_pool: Optional[pool.ThreadedConnectionPool] = None
+
+
+def init_pool():
+    global _pool
+    _pool = pool.ThreadedConnectionPool(
+        minconn=1,
+        maxconn=3,
+        host=settings.KCGDB_HOST,
+        port=settings.KCGDB_PORT,
+        dbname=settings.KCGDB_NAME,
+        user=settings.KCGDB_USER,
+        password=settings.KCGDB_PASSWORD,
+        options=f'-c search_path={settings.KCGDB_SCHEMA},public',
+    )
+    logger.info('kcgdb connection pool initialized')
+
+
+def close_pool():
+    global _pool
+    if _pool:
+        _pool.closeall()
+        _pool = None
+        logger.info('kcgdb connection pool closed')
+
+
+@contextmanager
+def get_conn():
+    conn = _pool.getconn()
+    try:
+        yield conn
+    except Exception:
+        conn.rollback()
+        raise
+    finally:
+        _pool.putconn(conn)
+
+
+def check_health() -> bool:
+    try:
+        with get_conn() as conn:
+            with conn.cursor() as cur:
+                cur.execute('SELECT 1')
+        return True
+    except Exception as e:
+        logger.error('kcgdb health check failed: %s', e)
+        return False
+
+
+def upsert_results(results: list['AnalysisResult']) -> int:
+    """Upsert analysis results into the vessel_analysis_results table."""
+    if not results:
+        return 0
+
+    insert_sql = """
+        INSERT INTO vessel_analysis_results (
+            mmsi, timestamp, vessel_type, confidence, fishing_pct,
+            cluster_id, season, zone, dist_to_baseline_nm, activity_state,
+            ucaf_score, ucft_score, is_dark, gap_duration_min,
+            spoofing_score, bd09_offset_m, speed_jump_count,
+            cluster_size, is_leader, fleet_role,
+            risk_score, risk_level, features, analyzed_at
+        ) VALUES %s
+        ON CONFLICT (mmsi, timestamp) DO UPDATE SET
+            vessel_type = EXCLUDED.vessel_type,
+            confidence = EXCLUDED.confidence,
+            fishing_pct = EXCLUDED.fishing_pct,
+            cluster_id = EXCLUDED.cluster_id,
+            season = EXCLUDED.season,
+            zone = EXCLUDED.zone,
+            dist_to_baseline_nm = EXCLUDED.dist_to_baseline_nm,
+            activity_state = EXCLUDED.activity_state,
+            ucaf_score = EXCLUDED.ucaf_score,
+            ucft_score = EXCLUDED.ucft_score,
+            is_dark = EXCLUDED.is_dark,
+            gap_duration_min = EXCLUDED.gap_duration_min,
+            spoofing_score = EXCLUDED.spoofing_score,
+            bd09_offset_m = EXCLUDED.bd09_offset_m,
+            speed_jump_count = EXCLUDED.speed_jump_count,
+            cluster_size = EXCLUDED.cluster_size,
+            is_leader = EXCLUDED.is_leader,
+            fleet_role = EXCLUDED.fleet_role,
+            risk_score = EXCLUDED.risk_score,
+            risk_level = EXCLUDED.risk_level,
+            features = EXCLUDED.features,
+            analyzed_at = EXCLUDED.analyzed_at
+    """
+
+    try:
+        with get_conn() as conn:
+            with conn.cursor() as cur:
+                tuples = [r.to_db_tuple() for r in results]
+                execute_values(cur, insert_sql, tuples, page_size=100)
+            conn.commit()
+        count = len(tuples)
+        logger.info('upserted %d analysis results', count)
+        return count
+    except Exception as e:
+        logger.error('failed to upsert results: %s', e)
+        return 0
+
+
+def cleanup_old(hours: int = 48) -> int:
+    """Delete stale analysis results."""
+    try:
+        with get_conn() as conn:
+            with conn.cursor() as cur:
+                cur.execute(
+                    'DELETE FROM vessel_analysis_results WHERE analyzed_at < NOW() - (%s * INTERVAL \'1 hour\')',
+                    (hours,),
+                )
+                deleted = cur.rowcount
+            conn.commit()
+        if deleted > 0:
+            logger.info('cleaned up %d old results (older than %dh)', deleted, hours)
+        return deleted
+    except Exception as e:
+        logger.error('failed to cleanup old results: %s', e)
+        return 0
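+
+# upsert_results relies on psycopg2's execute_values: the single "VALUES %s"
+# placeholder is expanded into batches of row tuples, and page_size=100 bounds
+# each generated statement to 100 rows.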
diff --git a/prediction/db/snpdb.py b/prediction/db/snpdb.py
new file mode 100644
index 0000000..fda2397
--- /dev/null
+++ b/prediction/db/snpdb.py
@@ -0,0 +1,187 @@
+import logging
+from contextlib import contextmanager
+from datetime import datetime
+from typing import Optional
+
+import pandas as pd
+import psycopg2
+from psycopg2 import pool
+
+from config import settings
+
+logger = logging.getLogger(__name__)
+
+_pool: Optional[pool.ThreadedConnectionPool] = None
+
+
+def init_pool():
+    global _pool
+    _pool = pool.ThreadedConnectionPool(
+        minconn=1,
+        maxconn=3,
+        host=settings.SNPDB_HOST,
+        port=settings.SNPDB_PORT,
+        dbname=settings.SNPDB_NAME,
+        user=settings.SNPDB_USER,
+        password=settings.SNPDB_PASSWORD,
+    )
+    logger.info('snpdb connection pool initialized')
+
+
+def close_pool():
+    global _pool
+    if _pool:
+        _pool.closeall()
+        _pool = None
+        logger.info('snpdb connection pool closed')
+
+
+@contextmanager
+def get_conn():
+    conn = _pool.getconn()
+    try:
+        yield conn
+    finally:
+        _pool.putconn(conn)
+
+
+def check_health() -> bool:
+    try:
+        with get_conn() as conn:
+            with conn.cursor() as cur:
+                cur.execute('SELECT 1')
+        return True
+    except Exception as e:
+        logger.error('snpdb health check failed: %s', e)
+        return False
+
+
+def fetch_all_tracks(hours: int = 24) -> pd.DataFrame:
+    """Fetch trajectory points for every vessel in Korean waters.
+
+    Extracts individual points from the LineStringM geometry and returns
+    the last N hours of data inside Korean waters (124-132E, 32-39N).
+    """
+    query = """
+        SELECT
+            t.mmsi,
+            to_timestamp(ST_M((dp).geom)) as timestamp,
+            ST_Y((dp).geom) as lat,
+            ST_X((dp).geom) as lon,
+            CASE
+                WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
+                ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
+            END as raw_sog
+        FROM signal.t_vessel_tracks_5min t,
+             LATERAL ST_DumpPoints(t.track_geom) dp
+        WHERE t.time_bucket >= NOW() - (%s * INTERVAL '1 hour')
+          AND t.track_geom && ST_MakeEnvelope(124, 32, 132, 39, 4326)
+        ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
+    """
+
+    try:
+        with get_conn() as conn:
+            df = pd.read_sql_query(query, conn, params=(hours,))
+        logger.info(
+            'fetch_all_tracks: %d rows, %d vessels (last %dh)',
+            len(df),
+            df['mmsi'].nunique() if len(df) > 0 else 0,
+            hours,
+        )
+        return df
+    except Exception as e:
+        logger.error('fetch_all_tracks failed: %s', e)
+        return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'raw_sog'])
+
+
+def fetch_incremental(last_bucket: datetime) -> pd.DataFrame:
+    """Fetch new trajectory points recorded after last_bucket.
+
+    Used by the scheduler's incremental update; the time_bucket > last_bucket
+    predicate skips buckets that have already been processed.
+    """
+    # Strictly greater-than: an already-merged bucket is never re-read, so rows
+    # that land late in that bucket are skipped.
+    query = """
+        SELECT
+            t.mmsi,
+            to_timestamp(ST_M((dp).geom)) as timestamp,
+            ST_Y((dp).geom) as lat,
+            ST_X((dp).geom) as lon,
+            CASE
+                WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
+                ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
+            END as raw_sog
+        FROM signal.t_vessel_tracks_5min t,
+             LATERAL ST_DumpPoints(t.track_geom) dp
+        WHERE t.time_bucket > %s
+          AND t.track_geom && ST_MakeEnvelope(124, 32, 132, 39, 4326)
+        ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
+    """
+
+    try:
+        with get_conn() as conn:
+            df = pd.read_sql_query(query, conn, params=(last_bucket,))
+        logger.info(
+            'fetch_incremental: %d rows, %d vessels (since %s)',
+            len(df),
+            df['mmsi'].nunique() if len(df) > 0 else 0,
+            last_bucket.isoformat(),
+        )
+        return df
+    except Exception as e:
+        logger.error('fetch_incremental failed: %s', e)
+        return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'raw_sog'])
+
+
+def fetch_static_info(mmsi_list: list[str]) -> dict[str, dict]:
+    """Fetch static vessel info for the given MMSI list.
+
+    DISTINCT ON (mmsi) returns only the most recent record per vessel.
+    """
+    query = """
+        SELECT DISTINCT ON (mmsi) mmsi, name, vessel_type, length, width
+        FROM signal.t_vessel_static
+        WHERE mmsi = ANY(%s)
+        ORDER BY mmsi, time_bucket DESC
+    """
+
+    try:
+        with get_conn() as conn:
+            with conn.cursor() as cur:
+                cur.execute(query, (mmsi_list,))
+                rows = cur.fetchall()
+        result = {
+            row[0]: {
+                'name': row[1],
+                'vessel_type': row[2],
+                'length': row[3],
+                'width': row[4],
+            }
+            for row in rows
+        }
+        logger.info('fetch_static_info: %d vessels resolved', len(result))
+        return result
+    except Exception as e:
+        logger.error('fetch_static_info failed: %s', e)
+        return {}
+
+
+def fetch_permit_mmsis() -> set[str]:
+    """Fetch the MMSIs of permitted Chinese fishing vessels.
+
+    Returns DISTINCT mmsi from the signal.t_chnprmship_positions table.
+    """
+    query = """
+        SELECT DISTINCT mmsi FROM signal.t_chnprmship_positions
+    """
+
+    try:
+        with get_conn() as conn:
+            with conn.cursor() as cur:
+                cur.execute(query)
+                rows = cur.fetchall()
+        result = {row[0] for row in rows}
+        logger.info('fetch_permit_mmsis: %d permitted vessels', len(result))
+        return result
+    except Exception as e:
+        logger.error('fetch_permit_mmsis failed: %s', e)
+        return set()
diff --git a/prediction/env.example b/prediction/env.example
new file mode 100644
index 0000000..c771eac
--- /dev/null
+++ b/prediction/env.example
@@ -0,0 +1,25 @@
+# snpdb (trajectory data source)
+SNPDB_HOST=211.208.115.83
+SNPDB_PORT=5432
+SNPDB_NAME=snpdb
+SNPDB_USER=snp
+SNPDB_PASSWORD=snp#8932
+
+# kcgdb (analysis result sink)
+KCGDB_HOST=211.208.115.83
+KCGDB_PORT=5432
+KCGDB_NAME=kcgdb
+KCGDB_SCHEMA=kcg
+KCGDB_USER=kcg_app
+KCGDB_PASSWORD=Kcg2026monitor
+
+# Scheduler
+SCHEDULER_INTERVAL_MIN=5
+
+# Pipeline
+TRAJECTORY_HOURS=6
+MMSI_PREFIX=412
+MIN_TRAJ_POINTS=100
+
+# Logging
+LOG_LEVEL=INFO
diff --git a/prediction/main.py b/prediction/main.py
index fbccfe3..4f717dd 100644
--- a/prediction/main.py
+++ b/prediction/main.py
@@ -1,8 +1,66 @@
-from fastapi import FastAPI
+import logging
+import sys
+from contextlib import asynccontextmanager

-app = FastAPI(title="KCG Prediction Service", version="0.1.0")
+from fastapi import BackgroundTasks, FastAPI
+
+from config import settings
+from db import kcgdb, snpdb
+from scheduler import get_last_run, run_analysis_cycle, start_scheduler, stop_scheduler
+
+logging.basicConfig(
+    level=getattr(logging, settings.LOG_LEVEL, logging.INFO),
+    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
+    stream=sys.stdout,
+)
+logger = logging.getLogger(__name__)

-@app.get("/health")
+@asynccontextmanager
+async def lifespan(application: FastAPI):
+    from cache.vessel_store import vessel_store
+
+    logger.info('starting KCG Prediction Service')
+    snpdb.init_pool()
+    kcgdb.init_pool()
+
+    # Initial in-memory cache load (24 hours)
+    logger.info('loading initial vessel data (%dh)...', settings.INITIAL_LOAD_HOURS)
+    vessel_store.load_initial(settings.INITIAL_LOAD_HOURS)
+    logger.info('initial load complete: %s', vessel_store.stats())
+
+    start_scheduler()
+    yield
+    stop_scheduler()
+    snpdb.close_pool()
+    kcgdb.close_pool()
+    logger.info('KCG Prediction Service stopped')
+
+
+app = FastAPI(
+    title='KCG Prediction Service',
+    version='2.0.0',
+    lifespan=lifespan,
+)
+
+
+@app.get('/health')
 def health_check():
-    return {"status": "ok"}
+    from cache.vessel_store import vessel_store
+    return {
+        'status': 'ok',
+        'snpdb': snpdb.check_health(),
+        'kcgdb': kcgdb.check_health(),
+        'store': vessel_store.stats(),
+    }
+
+
+@app.get('/api/v1/analysis/status')
+def analysis_status():
+    return get_last_run()
+
+
+@app.post('/api/v1/analysis/trigger')
+def trigger_analysis(background_tasks: BackgroundTasks):
+    background_tasks.add_task(run_analysis_cycle)
+    return {'message': 'analysis cycle triggered'}
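+
+# Smoke test once deployed (port 8001 per deploy/kcg-prediction.service):
+#   curl -s http://localhost:8001/health
+#   curl -s -X POST http://localhost:8001/api/v1/analysis/trigger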
diff --git a/prediction/models/__init__.py b/prediction/models/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/prediction/models/ais.py b/prediction/models/ais.py
new file mode 100644
index 0000000..9effbc4
--- /dev/null
+++ b/prediction/models/ais.py
@@ -0,0 +1,38 @@
+from dataclasses import dataclass, field
+from typing import List, Dict
+
+import pandas as pd
+
+
+@dataclass
+class AISPoint:
+    mmsi: str
+    ts: pd.Timestamp
+    lat: float
+    lon: float
+    sog: float
+    cog: float
+    state: str = 'UNKNOWN'
+
+
+@dataclass
+class VesselTrajectory:
+    mmsi: str
+    points: List[AISPoint] = field(default_factory=list)
+    vessel_type: str = 'UNKNOWN'
+    cluster_id: int = -1
+    season: str = 'UNKNOWN'
+    fishing_pct: float = 0.0
+    features: Dict = field(default_factory=dict)
+
+
+@dataclass
+class ClassificationResult:
+    mmsi: str
+    vessel_type: str
+    confidence: float
+    dominant_state: str
+    fishing_pct: float
+    cluster_id: int
+    season: str
+    feature_vector: Dict
diff --git a/prediction/models/result.py b/prediction/models/result.py
new file mode 100644
index 0000000..9792351
--- /dev/null
+++ b/prediction/models/result.py
@@ -0,0 +1,84 @@
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from typing import Optional
+
+
+@dataclass
+class AnalysisResult:
+    """Maps onto the 28-column vessel_analysis_results table."""
+
+    mmsi: str
+    timestamp: datetime
+
+    # Classification result
+    vessel_type: str = 'UNKNOWN'
+    confidence: float = 0.0
+    fishing_pct: float = 0.0
+    cluster_id: int = -1
+    season: str = 'UNKNOWN'
+
+    # ALGO 01: location
+    zone: str = 'EEZ_OR_BEYOND'
+    dist_to_baseline_nm: float = 999.0
+
+    # ALGO 02: activity state
+    activity_state: str = 'UNKNOWN'
+    ucaf_score: float = 0.0
+    ucft_score: float = 0.0
+
+    # ALGO 03: dark vessel
+    is_dark: bool = False
+    gap_duration_min: int = 0
+
+    # ALGO 04: GPS spoofing
+    spoofing_score: float = 0.0
+    bd09_offset_m: float = 0.0
+    speed_jump_count: int = 0
+
+    # ALGO 05+06: fleet
+    cluster_size: int = 0
+    is_leader: bool = False
+    fleet_role: str = 'NOISE'
+
+    # ALGO 07: risk
+    risk_score: int = 0
+    risk_level: str = 'LOW'
+
+    # Feature vector
+    features: dict = field(default_factory=dict)
+
+    # Meta
+    analyzed_at: Optional[datetime] = None
+
+    def __post_init__(self):
+        if self.analyzed_at is None:
+            self.analyzed_at = datetime.now(timezone.utc)
+
+    def to_db_tuple(self) -> tuple:
+        import json
+        return (
+            self.mmsi,
+            self.timestamp,
+            self.vessel_type,
+            self.confidence,
+            self.fishing_pct,
+            self.cluster_id,
+            self.season,
+            self.zone,
+            self.dist_to_baseline_nm,
+            self.activity_state,
+            self.ucaf_score,
+            self.ucft_score,
+            self.is_dark,
+            self.gap_duration_min,
+            self.spoofing_score,
+            self.bd09_offset_m,
+            self.speed_jump_count,
+            self.cluster_size,
+            self.is_leader,
+            self.fleet_role,
+            self.risk_score,
+            self.risk_level,
+            json.dumps(self.features),
+            self.analyzed_at,
+        )
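+
+    # NOTE: keep the 24-value tuple above aligned with the column list in
+    # db/kcgdb.py upsert_results (features is serialised to JSON before insert).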
diff --git a/prediction/pipeline/__init__.py b/prediction/pipeline/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/prediction/pipeline/behavior.py b/prediction/pipeline/behavior.py
new file mode 100644
index 0000000..7d40a83
--- /dev/null
+++ b/prediction/pipeline/behavior.py
@@ -0,0 +1,31 @@
+import pandas as pd
+from pipeline.constants import SOG_STATIONARY_MAX, SOG_FISHING_MAX
+
+
+class BehaviorDetector:
+    """
+    Three-state speed-based behaviour classification (Yan et al. 2022,
+    Natale et al. 2015): STATIONARY / FISHING / SAILING.
+    """
+
+    @staticmethod
+    def classify_point(sog: float) -> str:
+        if sog < SOG_STATIONARY_MAX:
+            return 'STATIONARY'
+        elif sog <= SOG_FISHING_MAX:
+            return 'FISHING'
+        else:
+            return 'SAILING'
+
+    def detect(self, df: pd.DataFrame) -> pd.DataFrame:
+        df = df.copy()
+        df['state'] = df['sog'].apply(self.classify_point)
+        return df
+
+    @staticmethod
+    def compute_fishing_ratio(df_vessel: pd.DataFrame) -> float:
+        total = len(df_vessel)
+        if total == 0:
+            return 0.0
+        fishing = (df_vessel['state'] == 'FISHING').sum()
+        return round(fishing / total * 100, 2)
diff --git a/prediction/pipeline/classifier.py b/prediction/pipeline/classifier.py
new file mode 100644
index 0000000..9de9184
--- /dev/null
+++ b/prediction/pipeline/classifier.py
@@ -0,0 +1,100 @@
+import pandas as pd
+from typing import Dict, Tuple
+
+
+class VesselTypeClassifier:
+    """
+    Rule-based scoring classifier for fishing vessel types.
+
+    Scoring: for each feature in a type's profile, if the value falls within
+    the defined range a distance-based score is added (closer to the range
+    centre = higher score). Values outside the range incur a penalty.
+    Returns (vessel_type, confidence).
+
+    TRAWL    — trawling speed 2.5–4.5 kt, high COG variation
+    PURSE    — purse-seine speed 3–5 kt, circular COG pattern
+    LONGLINE — longline speed 0.5–2 kt, low COG variation, long fishing runs
+    TRAP     — trap/pot speed ~0 kt, many stationary events, short range
+    """
+
+    PROFILES: Dict[str, Dict[str, Tuple[float, float]]] = {
+        'TRAWL': {
+            'sog_fishing_mean': (2.5, 4.5),
+            'cog_change_mean': (0.15, 9.9),
+            'fishing_pct': (0.3, 0.7),
+            'fishing_run_mean': (5, 50),
+            'stationary_events': (0, 5),
+        },
+        'PURSE': {
+            'sog_fishing_mean': (3.0, 5.0),
+            'cog_circularity': (0.2, 1.0),
+            'fishing_pct': (0.1, 0.5),
+            'fishing_run_mean': (3, 30),
+            'stationary_events': (0, 3),
+        },
+        'LONGLINE': {
+            'sog_fishing_mean': (0.5, 2.5),
+            'cog_change_mean': (0.0, 0.15),
+            'fishing_pct': (0.4, 0.9),
+            'fishing_run_mean': (20, 999),
+            'stationary_events': (0, 10),
+        },
+        'TRAP': {
+            'sog_fishing_mean': (0.0, 2.0),
+            'stationary_pct': (0.2, 0.8),
+            'stationary_events': (5, 999),
+            'fishing_run_mean': (1, 10),
+            'total_distance_km': (0, 100),
+        },
+    }
+
+    def classify(self, features: Dict) -> Tuple[str, float]:
+        """Classify a vessel from its feature dict.
+
+        Returns:
+            (vessel_type, confidence) where confidence is in [0, 1].
+        """
+        if not features:
+            return 'UNKNOWN', 0.0
+
+        scores: Dict[str, float] = {}
+        for vtype, profile in self.PROFILES.items():
+            score = 0.0
+            matched = 0
+            for feat_name, (lo, hi) in profile.items():
+                val = features.get(feat_name)
+                if val is None:
+                    continue
+                matched += 1
+                if lo <= val <= hi:
+                    mid = (lo + hi) / 2
+                    span = (hi - lo) / 2 if (hi - lo) > 0 else 1
+                    score += max(0.0, 1 - abs(val - mid) / span)
+                else:
+                    overshoot = min(abs(val - lo), abs(val - hi))
+                    score -= min(0.5, overshoot / (hi - lo + 1e-9))
+            scores[vtype] = score / matched if matched > 0 else 0.0
+
+        best_type = max(scores, key=lambda k: scores[k])
+        total = sum(max(v, 0.0) for v in scores.values())
+        confidence = scores[best_type] / total if total > 0 else 0.0
+
+        return best_type, round(confidence, 3)
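+
+# Worked example (hypothetical feature values): sog_fishing_mean=3.2,
+# cog_change_mean=0.4, fishing_pct=0.5, fishing_run_mean=20, stationary_events=2
+# -> classify() returns ('TRAWL', ~0.62): every TRAWL range matches, PURSE
+# loses points on sog and fishing_pct, and LONGLINE/TRAP score negative.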
+ """ + m = ts.month + if m in [3, 4, 5]: + return 'SPRING' + elif m in [6, 7, 8]: + return 'SUMMER' + elif m in [9, 10, 11]: + return 'FALL' + else: + return 'WINTER' diff --git a/prediction/pipeline/clusterer.py b/prediction/pipeline/clusterer.py new file mode 100644 index 0000000..7f5d34d --- /dev/null +++ b/prediction/pipeline/clusterer.py @@ -0,0 +1,101 @@ +from collections import Counter +from typing import Dict, Optional + +import numpy as np +import pandas as pd + +from pipeline.constants import BIRCH_THRESHOLD, BIRCH_BRANCHING, MIN_CLUSTER_SIZE + + +class EnhancedBIRCHClusterer: + """Trajectory clustering using sklearn Birch with a simple K-means fallback. + + Based on the enhanced-BIRCH approach (Yan, Yang et al.): + 1. Resample each trajectory to a fixed-length vector. + 2. Build a BIRCH CF-tree for memory-efficient hierarchical clustering. + 3. Small clusters (< MIN_CLUSTER_SIZE) are relabelled as noise (-1). + """ + + def __init__( + self, + threshold: float = BIRCH_THRESHOLD, + branching: int = BIRCH_BRANCHING, + n_clusters: Optional[int] = None, + ) -> None: + self.threshold = threshold + self.branching = branching + self.n_clusters = n_clusters + self._model = None + + def _traj_to_vector(self, df_vessel: pd.DataFrame, n_points: int = 20) -> np.ndarray: + """Convert a vessel trajectory DataFrame to a fixed-length vector. + + Linearly samples n_points from the trajectory and interleaves lat/lon + values, then normalises to zero mean / unit variance. + """ + lats = df_vessel['lat'].values + lons = df_vessel['lon'].values + idx = np.linspace(0, len(lats) - 1, n_points).astype(int) + vec = np.concatenate([lats[idx], lons[idx]]) + vec = (vec - vec.mean()) / (vec.std() + 1e-9) + return vec + + def fit_predict(self, vessels: Dict[str, pd.DataFrame]) -> Dict[str, int]: + """Cluster vessel trajectories. + + Args: + vessels: mapping of mmsi -> resampled trajectory DataFrame. + + Returns: + Mapping of mmsi -> cluster_id. Vessels in small clusters are + assigned cluster_id -1 (noise). Vessels with fewer than 20 + points are excluded from the result. 
+ """ + mmsi_list: list[str] = [] + vectors: list[np.ndarray] = [] + + for mmsi, df_v in vessels.items(): + if len(df_v) < 20: + continue + mmsi_list.append(mmsi) + vectors.append(self._traj_to_vector(df_v)) + + if len(vectors) < 3: + return {m: 0 for m in mmsi_list} + + X = np.array(vectors) + + try: + from sklearn.cluster import Birch + model = Birch( + threshold=self.threshold, + branching_factor=self.branching, + n_clusters=self.n_clusters, + ) + labels = model.fit_predict(X) + self._model = model + except ImportError: + labels = self._simple_cluster(X) + + cnt = Counter(labels) + labels = np.array([lbl if cnt[lbl] >= MIN_CLUSTER_SIZE else -1 for lbl in labels]) + + return dict(zip(mmsi_list, labels.tolist())) + + @staticmethod + def _simple_cluster(X: np.ndarray, k: int = 5) -> np.ndarray: + """Fallback K-means used when sklearn is unavailable.""" + n = len(X) + k = min(k, n) + centers = X[np.random.choice(n, k, replace=False)] + labels = np.zeros(n, dtype=int) + for _ in range(20): + dists = np.array([[np.linalg.norm(x - c) for c in centers] for x in X]) + labels = dists.argmin(axis=1) + new_centers = np.array( + [X[labels == i].mean(axis=0) if (labels == i).any() else centers[i] for i in range(k)] + ) + if np.allclose(centers, new_centers, atol=1e-6): + break + centers = new_centers + return labels diff --git a/prediction/pipeline/constants.py b/prediction/pipeline/constants.py new file mode 100644 index 0000000..4f07866 --- /dev/null +++ b/prediction/pipeline/constants.py @@ -0,0 +1,26 @@ +SOG_STATIONARY_MAX = 1.0 +SOG_FISHING_MAX = 5.0 +SOG_SAILING_MIN = 5.0 + +VESSEL_SOG_PROFILE = { + 'TRAWL': {'min': 1.5, 'max': 4.5, 'mean': 2.8, 'cog_var': 'high'}, + 'PURSE': {'min': 2.0, 'max': 5.0, 'mean': 3.5, 'cog_var': 'circular'}, + 'LONGLINE': {'min': 0.5, 'max': 3.0, 'mean': 1.8, 'cog_var': 'low'}, + 'TRAP': {'min': 0.0, 'max': 2.0, 'mean': 0.8, 'cog_var': 'very_low'}, +} + +RESAMPLE_INTERVAL_MIN = 4 + +BIRCH_THRESHOLD = 0.35 +BIRCH_BRANCHING = 50 +MIN_CLUSTER_SIZE = 5 + +MMSI_DIGITS = 9 +MAX_VESSEL_LENGTH = 300 +MAX_SOG_KNOTS = 30.0 +MIN_TRAJ_POINTS = 100 + +KR_BOUNDS = { + 'lat_min': 32.0, 'lat_max': 39.0, + 'lon_min': 124.0, 'lon_max': 132.0, +} diff --git a/prediction/pipeline/features.py b/prediction/pipeline/features.py new file mode 100644 index 0000000..b59565e --- /dev/null +++ b/prediction/pipeline/features.py @@ -0,0 +1,93 @@ +import math +import numpy as np +import pandas as pd +from typing import Dict + + +class FeatureExtractor: + """ + 어선 유형 분류를 위한 특징 벡터 추출 + 논문 12 (남중국해 어선 유형 식별) 기반 핵심 피처: + - 속도 통계 (mean, std, 분위수) + - 침로 변동성 (COG variance → 선회 패턴) + - 조업 비율 및 조업 지속 시간 + - 이동 거리 및 해역 커버리지 + - 정박 빈도 (투망/양망 간격 추정) + """ + + @staticmethod + def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float: + """두 좌표 간 거리 (km)""" + R = 6371.0 + phi1, phi2 = math.radians(lat1), math.radians(lat2) + dphi = math.radians(lat2 - lat1) + dlam = math.radians(lon2 - lon1) + a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 + return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + + def extract(self, df_vessel: pd.DataFrame) -> Dict[str, float]: + if len(df_vessel) < 10: + return {} + + sog = df_vessel['sog'].values + cog = df_vessel['cog'].values + states = df_vessel['state'].values + + # Speed features + fishing_sog = sog[states == 'FISHING'] if (states == 'FISHING').any() else np.array([0]) + feat: Dict[str, float] = { + 'sog_mean': float(np.mean(sog)), + 'sog_std': float(np.std(sog)), + 'sog_fishing_mean': 
diff --git a/prediction/pipeline/orchestrator.py b/prediction/pipeline/orchestrator.py
new file mode 100644
index 0000000..2bcbf86
--- /dev/null
+++ b/prediction/pipeline/orchestrator.py
@@ -0,0 +1,95 @@
+import logging
+
+import pandas as pd
+
+from pipeline.preprocessor import AISPreprocessor
+from pipeline.behavior import BehaviorDetector
+from pipeline.resampler import TrajectoryResampler
+from pipeline.features import FeatureExtractor
+from pipeline.classifier import VesselTypeClassifier, get_season
+from pipeline.clusterer import EnhancedBIRCHClusterer
+from pipeline.constants import RESAMPLE_INTERVAL_MIN
+
+logger = logging.getLogger(__name__)
+
+
+class ChineseFishingVesselPipeline:
+    """7-step pipeline for classifying Chinese fishing vessel activity types.
+
+    Steps:
+    1. AIS preprocessing (Yan et al. 2022)
+    2. Behaviour-state detection (speed-based 3-class)
+    3. Trajectory resampling (Yan, Yang et al., 4-minute interval)
+    4. Feature vector extraction (paper 12)
+    5. Vessel-type classification (rule-based scoring)
+    6. Enhanced BIRCH trajectory clustering (Yan, Yang et al.)
+    7. Seasonal activity tagging (paper 12)
+    """
+
+    def __init__(self) -> None:
+        self.preprocessor = AISPreprocessor()
+        self.detector = BehaviorDetector()
+        self.resampler = TrajectoryResampler(RESAMPLE_INTERVAL_MIN)
+        self.extractor = FeatureExtractor()
+        self.classifier = VesselTypeClassifier()
+        self.clusterer = EnhancedBIRCHClusterer()
+
+    def run(
+        self, df_raw: pd.DataFrame
+    ) -> tuple[list[dict], dict[str, pd.DataFrame]]:
+        """Run the 7-step pipeline.
+
+        Args:
+            df_raw: raw AIS DataFrame with columns mmsi, timestamp, lat, lon,
+                sog, cog.
+
+        Returns:
+            (results, vessel_dfs) where:
+            - results is a list of classification dicts, each containing:
+              mmsi, vessel_type, confidence, fishing_pct, cluster_id, season,
+              n_points, features.
+            - vessel_dfs is a mapping of mmsi -> resampled trajectory DataFrame.
+        """
+        # Step 1: preprocess
+        df = self.preprocessor.run(df_raw)
+        if len(df) == 0:
+            logger.warning('pipeline: no rows after preprocessing')
+            return [], {}
+
+        # Step 2: behaviour detection
+        df = self.detector.detect(df)
+
+        # Steps 3-5: per-vessel processing
+        vessel_dfs: dict[str, pd.DataFrame] = {}
+        results: list[dict] = []
+
+        for mmsi, df_v in df.groupby('mmsi'):
+            df_resampled = self.resampler.resample(df_v)
+            vessel_dfs[mmsi] = df_resampled
+
+            features = self.extractor.extract(df_resampled)
+            vtype, confidence = self.classifier.classify(features)
+            fishing_pct = BehaviorDetector.compute_fishing_ratio(df_resampled)
+            season = get_season(df_v['timestamp'].iloc[len(df_v) // 2])
+
+            results.append({
+                'mmsi': mmsi,
+                'vessel_type': vtype,
+                'confidence': confidence,
+                'fishing_pct': fishing_pct,
+                'season': season,
+                'n_points': len(df_resampled),
+                'features': features,
+            })
+
+        # Step 6: BIRCH clustering
+        cluster_map = self.clusterer.fit_predict(vessel_dfs)
+        for r in results:
+            r['cluster_id'] = cluster_map.get(r['mmsi'], -1)
+
+        logger.info(
+            'pipeline complete: %d vessels, types=%s',
+            len(results),
+            {r['vessel_type'] for r in results},
+        )
+        return results, vessel_dfs
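+
+# Minimal end-to-end sketch (illustrative; one synthetic vessel). The
+# preprocessor keeps only trajectories with at least MIN_TRAJ_POINTS (100)
+# points, so the demo generates 120.
+if __name__ == '__main__':
+    import numpy as np
+    n = 120
+    df_demo = pd.DataFrame({
+        'mmsi': ['412000001'] * n,
+        'timestamp': pd.date_range('2024-06-01', periods=n, freq='2min'),
+        'lat': np.linspace(34.0, 34.2, n),
+        'lon': np.linspace(125.0, 125.2, n),
+        'sog': np.random.default_rng(0).uniform(2.0, 4.0, n),
+        'cog': np.linspace(0, 180, n),
+    })
+    results, dfs = ChineseFishingVesselPipeline().run(df_demo)
+    print(results[0]['vessel_type'], results[0]['season'], results[0]['cluster_id'])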
+ """ + # Step 1: preprocess + df = self.preprocessor.run(df_raw) + if len(df) == 0: + logger.warning('pipeline: no rows after preprocessing') + return [], {} + + # Step 2: behaviour detection + df = self.detector.detect(df) + + # Steps 3–5: per-vessel processing + vessel_dfs: dict[str, pd.DataFrame] = {} + results: list[dict] = [] + + for mmsi, df_v in df.groupby('mmsi'): + df_resampled = self.resampler.resample(df_v) + vessel_dfs[mmsi] = df_resampled + + features = self.extractor.extract(df_resampled) + vtype, confidence = self.classifier.classify(features) + fishing_pct = BehaviorDetector.compute_fishing_ratio(df_resampled) + season = get_season(df_v['timestamp'].iloc[len(df_v) // 2]) + + results.append({ + 'mmsi': mmsi, + 'vessel_type': vtype, + 'confidence': confidence, + 'fishing_pct': fishing_pct, + 'season': season, + 'n_points': len(df_resampled), + 'features': features, + }) + + # Step 6: BIRCH clustering + cluster_map = self.clusterer.fit_predict(vessel_dfs) + for r in results: + r['cluster_id'] = cluster_map.get(r['mmsi'], -1) + + logger.info( + 'pipeline complete: %d vessels, types=%s', + len(results), + {r['vessel_type'] for r in results}, + ) + return results, vessel_dfs diff --git a/prediction/pipeline/preprocessor.py b/prediction/pipeline/preprocessor.py new file mode 100644 index 0000000..762d651 --- /dev/null +++ b/prediction/pipeline/preprocessor.py @@ -0,0 +1,52 @@ +import pandas as pd +from collections import defaultdict + +from pipeline.constants import KR_BOUNDS, MAX_SOG_KNOTS, MIN_TRAJ_POINTS + + +class AISPreprocessor: + """Delete-Supplement-Update (Yan et al. 2022)""" + + def __init__(self): + self.stats = defaultdict(int) + + def run(self, df: pd.DataFrame) -> pd.DataFrame: + original = len(df) + + required = ['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog'] + missing = [c for c in required if c not in df.columns] + if missing: + raise ValueError(f"필수 컬럼 누락: {missing}") + + df = df.copy() + df['timestamp'] = pd.to_datetime(df['timestamp']) + + valid_mmsi = df['mmsi'].astype(str).str.match(r'^\d{9}$') + df = df[valid_mmsi] + self.stats['invalid_mmsi'] += original - len(df) + + df = df[(df['lat'].between(-90, 90)) & (df['lon'].between(-180, 180))] + + df = df[ + df['lat'].between(KR_BOUNDS['lat_min'], KR_BOUNDS['lat_max']) & + df['lon'].between(KR_BOUNDS['lon_min'], KR_BOUNDS['lon_max']) + ] + + df = df.sort_values(['mmsi', 'timestamp']) + df['sog'] = df.groupby('mmsi')['sog'].transform( + lambda x: x.where( + x.between(0, MAX_SOG_KNOTS), + x.rolling(3, center=True, min_periods=1).mean(), + ) + ) + df = df[(df['sog'] >= 0) & (df['sog'] <= MAX_SOG_KNOTS)] + + counts = df.groupby('mmsi').size() + valid_mmsi_list = counts[counts >= MIN_TRAJ_POINTS].index + df = df[df['mmsi'].isin(valid_mmsi_list)] + + df = df.drop_duplicates(subset=['mmsi', 'timestamp']) + + self.stats['final_records'] = len(df) + self.stats['retention_pct'] = round(len(df) / max(original, 1) * 100, 2) + return df.reset_index(drop=True) diff --git a/prediction/pipeline/resampler.py b/prediction/pipeline/resampler.py new file mode 100644 index 0000000..2c6330f --- /dev/null +++ b/prediction/pipeline/resampler.py @@ -0,0 +1,35 @@ +import pandas as pd +from pipeline.constants import RESAMPLE_INTERVAL_MIN +from pipeline.behavior import BehaviorDetector + + +class TrajectoryResampler: + """ + 불균등 AIS 수신 간격을 균등 시간 간격으로 보간 + 목적: BIRCH 군집화의 입력 벡터 정규화 + 방법: 선형 보간 (위도·경도·SOG·COG) + 기준: 4분 간격 (Shepperson et al. 
diff --git a/prediction/requirements.txt b/prediction/requirements.txt
index 531efda..7268415 100644
--- a/prediction/requirements.txt
+++ b/prediction/requirements.txt
@@ -1,2 +1,8 @@
 fastapi==0.115.0
 uvicorn==0.30.6
+pydantic-settings>=2.0
+psycopg2-binary>=2.9
+numpy>=1.26
+pandas>=2.2
+scikit-learn>=1.5
+apscheduler>=3.10
diff --git a/prediction/scheduler.py b/prediction/scheduler.py
new file mode 100644
index 0000000..1b6f755
--- /dev/null
+++ b/prediction/scheduler.py
@@ -0,0 +1,177 @@
+import logging
+import time
+from datetime import datetime, timezone
+from typing import Optional
+
+from apscheduler.schedulers.background import BackgroundScheduler
+
+from config import settings
+
+logger = logging.getLogger(__name__)
+
+_scheduler: Optional[BackgroundScheduler] = None
+_last_run: dict = {
+    'timestamp': None,
+    'duration_sec': 0,
+    'vessel_count': 0,
+    'upserted': 0,
+    'error': None,
+}
+
+
+def get_last_run() -> dict:
+    return _last_run.copy()
+
+
+def run_analysis_cycle():
+    """Five-minute analysis cycle, driven by the in-memory vessel cache."""
+    from cache.vessel_store import vessel_store
+    from db import snpdb, kcgdb
+    from pipeline.orchestrator import ChineseFishingVesselPipeline
+    from algorithms.location import classify_zone
+    from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score
+    from algorithms.dark_vessel import is_dark_vessel
+    from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset
+    from algorithms.fleet import assign_fleet_roles
+    from algorithms.risk import compute_vessel_risk_score
+    from models.result import AnalysisResult
+
+    start = time.time()
+    _last_run['timestamp'] = datetime.now(timezone.utc).isoformat()
+    _last_run['error'] = None
+
+    try:
+        # 1. Incremental load + stale eviction
+        if vessel_store.last_bucket is None:
+            logger.warning('last_bucket is None, skipping incremental fetch (initial load not complete)')
+            df_new = None
+        else:
+            df_new = snpdb.fetch_incremental(vessel_store.last_bucket)
+        if df_new is not None and len(df_new) > 0:
+            vessel_store.merge_incremental(df_new)
+        vessel_store.evict_stale(settings.CACHE_WINDOW_HOURS)
+
+        # Periodic refresh of static vessel info / permitted-vessel registry
+        vessel_store.refresh_static_info()
+        vessel_store.refresh_permit_registry()
+
+        # 2. Select analysis targets (includes SOG/COG derivation)
+        df_targets = vessel_store.select_analysis_targets()
+        if len(df_targets) == 0:
+            logger.info('no analysis targets, skipping cycle')
+            _last_run['vessel_count'] = 0
+            return
+
+        # 3. Run the 7-step pipeline
+        pipeline = ChineseFishingVesselPipeline()
+        classifications, vessel_dfs = pipeline.run(df_targets)
+
+        if not classifications:
+            logger.info('no vessels classified, skipping')
+            _last_run['vessel_count'] = 0
+            return
+
+        # 4. Fleet-role assignment
+        cluster_map = {c['mmsi']: c['cluster_id'] for c in classifications}
+        fleet_roles = assign_fleet_roles(vessel_dfs, cluster_map)
+
+        # 5. Per-vessel supplementary algorithms -> build AnalysisResult
+        results = []
+        for c in classifications:
+            mmsi = c['mmsi']
+            df_v = vessel_dfs.get(mmsi)
+            if df_v is None or len(df_v) == 0:
+                continue
+
+            last_row = df_v.iloc[-1]
+            ts = last_row.get('timestamp')
+
+            zone_info = classify_zone(last_row['lat'], last_row['lon'])
+
+            gear_map = {'TRAWL': 'OT', 'PURSE': 'PS', 'LONGLINE': 'GN', 'TRAP': 'TRAP'}
+            gear = gear_map.get(c['vessel_type'], 'OT')
+            ucaf = compute_ucaf_score(df_v, gear)
+            ucft = compute_ucft_score(df_v)
+
+            dark, gap_min = is_dark_vessel(df_v)
+
+            spoof_score = compute_spoofing_score(df_v)
+            speed_jumps = count_speed_jumps(df_v)
+            bd09_offset = compute_bd09_offset(last_row['lat'], last_row['lon'])
+
+            fleet_info = fleet_roles.get(mmsi, {})
+
+            is_permitted = vessel_store.is_permitted(mmsi)
+            risk_score, risk_level = compute_vessel_risk_score(
+                mmsi, df_v, zone_info, is_permitted=is_permitted,
+            )
+
+            activity = 'UNKNOWN'
+            if 'state' in df_v.columns and len(df_v) > 0:
+                activity = df_v['state'].mode().iloc[0]
+
+            results.append(AnalysisResult(
+                mmsi=mmsi,
+                timestamp=ts,
+                vessel_type=c['vessel_type'],
+                confidence=c['confidence'],
+                fishing_pct=c['fishing_pct'],
+                cluster_id=c['cluster_id'],
+                season=c['season'],
+                zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
+                dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
+                activity_state=activity,
+                ucaf_score=ucaf,
+                ucft_score=ucft,
+                is_dark=dark,
+                gap_duration_min=gap_min,
+                spoofing_score=spoof_score,
+                bd09_offset_m=bd09_offset,
+                speed_jump_count=speed_jumps,
+                cluster_size=fleet_info.get('cluster_size', 0),
+                is_leader=fleet_info.get('is_leader', False),
+                fleet_role=fleet_info.get('fleet_role', 'NOISE'),
+                risk_score=risk_score,
+                risk_level=risk_level,
+                features=c.get('features', {}),
+            ))
+
+        # 6. Persist results
+        upserted = kcgdb.upsert_results(results)
+        kcgdb.cleanup_old(hours=48)
+
+        elapsed = round(time.time() - start, 2)
+        _last_run['duration_sec'] = elapsed
+        _last_run['vessel_count'] = len(results)
+        _last_run['upserted'] = upserted
+        logger.info(
+            'analysis cycle: %d vessels, %d upserted, %.2fs',
+            len(results), upserted, elapsed,
+        )
+
+    except Exception as e:
+        _last_run['error'] = str(e)
+        logger.exception('analysis cycle failed: %s', e)
+
+
+def start_scheduler():
+    global _scheduler
+    _scheduler = BackgroundScheduler()
+    _scheduler.add_job(
+        run_analysis_cycle,
+        'interval',
+        minutes=settings.SCHEDULER_INTERVAL_MIN,
+        id='vessel_analysis',
+        max_instances=1,
+        replace_existing=True,
+    )
+    _scheduler.start()
+    logger.info('scheduler started (interval=%dm)', settings.SCHEDULER_INTERVAL_MIN)
+
+
+def stop_scheduler():
+    global _scheduler
+    if _scheduler:
+        _scheduler.shutdown(wait=False)
+        _scheduler = None
+        logger.info('scheduler stopped')
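+
+# Minimal usage sketch (illustrative; requires this service's config, cache,
+# db, and algorithms packages plus reachable databases). Triggers one cycle
+# manually and inspects the recorded run metadata; in production
+# start_scheduler() owns the cadence.
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.INFO)
+    run_analysis_cycle()
+    print(get_last_run())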