Merge pull request 'release: Python 어선 분류기 + 배포 설정 + 모니터링 프록시' (#89) from develop into main
Some checks failed
Deploy KCG / deploy (push) Failing after 1m47s
Some checks failed
Deploy KCG / deploy (push) Failing after 1m47s
This commit is contained in:
커밋
635753f636
@ -72,6 +72,7 @@ jobs:
|
|||||||
[ -n "$DB_PASSWORD" ] && echo "DB_PASSWORD=${DB_PASSWORD}" >> $DEPLOY_DIR/.env
|
[ -n "$DB_PASSWORD" ] && echo "DB_PASSWORD=${DB_PASSWORD}" >> $DEPLOY_DIR/.env
|
||||||
[ -n "$OPENSKY_CLIENT_ID" ] && echo "OPENSKY_CLIENT_ID=${OPENSKY_CLIENT_ID}" >> $DEPLOY_DIR/.env
|
[ -n "$OPENSKY_CLIENT_ID" ] && echo "OPENSKY_CLIENT_ID=${OPENSKY_CLIENT_ID}" >> $DEPLOY_DIR/.env
|
||||||
[ -n "$OPENSKY_CLIENT_SECRET" ] && echo "OPENSKY_CLIENT_SECRET=${OPENSKY_CLIENT_SECRET}" >> $DEPLOY_DIR/.env
|
[ -n "$OPENSKY_CLIENT_SECRET" ] && echo "OPENSKY_CLIENT_SECRET=${OPENSKY_CLIENT_SECRET}" >> $DEPLOY_DIR/.env
|
||||||
|
echo "PREDICTION_BASE_URL=http://192.168.1.18:8001" >> $DEPLOY_DIR/.env
|
||||||
|
|
||||||
# JAR 내부에 application-prod.yml이 있으면 외부 파일 제거
|
# JAR 내부에 application-prod.yml이 있으면 외부 파일 제거
|
||||||
if unzip -l backend/target/kcg.jar | grep -q 'application-prod.yml$'; then
|
if unzip -l backend/target/kcg.jar | grep -q 'application-prod.yml$'; then
|
||||||
@ -147,6 +148,79 @@ jobs:
|
|||||||
sleep 10
|
sleep 10
|
||||||
done
|
done
|
||||||
|
|
||||||
|
# ═══ Prediction (FastAPI → redis-211) ═══
|
||||||
|
- name: Deploy prediction via SSH
|
||||||
|
env:
|
||||||
|
DEPLOY_KEY: ${{ secrets.DEPLOY_SSH_KEY }}
|
||||||
|
PRED_HOST: 192.168.1.18
|
||||||
|
PRED_PORT: 32023
|
||||||
|
run: |
|
||||||
|
mkdir -p ~/.ssh
|
||||||
|
printf '%s\n' "$DEPLOY_KEY" > ~/.ssh/id_deploy
|
||||||
|
chmod 600 ~/.ssh/id_deploy
|
||||||
|
|
||||||
|
SSH_OPTS="-i ~/.ssh/id_deploy -o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=15 -p $PRED_PORT"
|
||||||
|
SCP_OPTS="-i ~/.ssh/id_deploy -o StrictHostKeyChecking=no -P $PRED_PORT"
|
||||||
|
|
||||||
|
REMOTE_DIR=/home/apps/kcg-prediction
|
||||||
|
|
||||||
|
# 코드 전송 (rsync 대체: tar + scp)
|
||||||
|
tar czf /tmp/prediction.tar.gz -C prediction --exclude='__pycache__' --exclude='venv' --exclude='.env' .
|
||||||
|
for attempt in 1 2 3; do
|
||||||
|
echo "SCP prediction attempt $attempt/3..."
|
||||||
|
if scp $SCP_OPTS /tmp/prediction.tar.gz root@$PRED_HOST:/tmp/prediction.tar.gz; then break; fi
|
||||||
|
[ "$attempt" -eq 3 ] && { echo "ERROR: SCP failed"; exit 1; }
|
||||||
|
sleep 10
|
||||||
|
done
|
||||||
|
|
||||||
|
# systemd 서비스 파일 전송
|
||||||
|
scp $SCP_OPTS deploy/kcg-prediction.service root@$PRED_HOST:/tmp/kcg-prediction.service
|
||||||
|
|
||||||
|
# 원격 설치 + 재시작
|
||||||
|
for attempt in 1 2 3; do
|
||||||
|
echo "SSH deploy attempt $attempt/3..."
|
||||||
|
if ssh $SSH_OPTS root@$PRED_HOST bash -s << 'SCRIPT'
|
||||||
|
set -e
|
||||||
|
REMOTE_DIR=/home/apps/kcg-prediction
|
||||||
|
mkdir -p $REMOTE_DIR
|
||||||
|
cd $REMOTE_DIR
|
||||||
|
|
||||||
|
# 코드 배포
|
||||||
|
tar xzf /tmp/prediction.tar.gz -C $REMOTE_DIR
|
||||||
|
rm -f /tmp/prediction.tar.gz
|
||||||
|
|
||||||
|
# venv + 의존성
|
||||||
|
python3 -m venv venv 2>/dev/null || true
|
||||||
|
venv/bin/pip install -r requirements.txt -q
|
||||||
|
|
||||||
|
# systemd 서비스 갱신
|
||||||
|
if ! diff -q /tmp/kcg-prediction.service /etc/systemd/system/kcg-prediction.service >/dev/null 2>&1; then
|
||||||
|
cp /tmp/kcg-prediction.service /etc/systemd/system/kcg-prediction.service
|
||||||
|
systemctl daemon-reload
|
||||||
|
systemctl enable kcg-prediction
|
||||||
|
fi
|
||||||
|
rm -f /tmp/kcg-prediction.service
|
||||||
|
|
||||||
|
# 재시작
|
||||||
|
systemctl restart kcg-prediction
|
||||||
|
|
||||||
|
# health 확인 (30초)
|
||||||
|
for i in $(seq 1 6); do
|
||||||
|
if curl -sf http://localhost:8001/health > /dev/null 2>&1; then
|
||||||
|
echo "Prediction healthy (${i})"
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
sleep 5
|
||||||
|
done
|
||||||
|
echo "WARNING: Prediction health timeout"
|
||||||
|
journalctl -u kcg-prediction --no-pager -n 10
|
||||||
|
exit 1
|
||||||
|
SCRIPT
|
||||||
|
then exit 0; fi
|
||||||
|
[ "$attempt" -eq 3 ] && { echo "ERROR: SSH failed"; exit 1; }
|
||||||
|
sleep 10
|
||||||
|
done
|
||||||
|
|
||||||
- name: Cleanup
|
- name: Cleanup
|
||||||
if: always()
|
if: always()
|
||||||
run: rm -f ~/.ssh/id_deploy
|
run: rm -f ~/.ssh/id_deploy
|
||||||
|
|||||||
@ -23,6 +23,8 @@ public class AuthFilter extends OncePerRequestFilter {
|
|||||||
private static final String AUTH_PATH_PREFIX = "/api/auth/";
|
private static final String AUTH_PATH_PREFIX = "/api/auth/";
|
||||||
private static final String SENSOR_PATH_PREFIX = "/api/sensor/";
|
private static final String SENSOR_PATH_PREFIX = "/api/sensor/";
|
||||||
private static final String CCTV_PATH_PREFIX = "/api/cctv/";
|
private static final String CCTV_PATH_PREFIX = "/api/cctv/";
|
||||||
|
private static final String VESSEL_ANALYSIS_PATH_PREFIX = "/api/vessel-analysis";
|
||||||
|
private static final String PREDICTION_PATH_PREFIX = "/api/prediction/";
|
||||||
|
|
||||||
private final JwtProvider jwtProvider;
|
private final JwtProvider jwtProvider;
|
||||||
|
|
||||||
@ -31,7 +33,9 @@ public class AuthFilter extends OncePerRequestFilter {
|
|||||||
String path = request.getRequestURI();
|
String path = request.getRequestURI();
|
||||||
return path.startsWith(AUTH_PATH_PREFIX)
|
return path.startsWith(AUTH_PATH_PREFIX)
|
||||||
|| path.startsWith(SENSOR_PATH_PREFIX)
|
|| path.startsWith(SENSOR_PATH_PREFIX)
|
||||||
|| path.startsWith(CCTV_PATH_PREFIX);
|
|| path.startsWith(CCTV_PATH_PREFIX)
|
||||||
|
|| path.startsWith(VESSEL_ANALYSIS_PATH_PREFIX)
|
||||||
|
|| path.startsWith(PREDICTION_PATH_PREFIX);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -46,6 +46,7 @@ public class AppProperties {
|
|||||||
private String gdeltBaseUrl = "https://api.gdeltproject.org/api/v2/doc/doc";
|
private String gdeltBaseUrl = "https://api.gdeltproject.org/api/v2/doc/doc";
|
||||||
private String googleNewsBaseUrl = "https://news.google.com/rss/search";
|
private String googleNewsBaseUrl = "https://news.google.com/rss/search";
|
||||||
private String celestrakBaseUrl = "https://celestrak.org/NORAD/elements/gp.php";
|
private String celestrakBaseUrl = "https://celestrak.org/NORAD/elements/gp.php";
|
||||||
|
private String predictionBaseUrl = "http://localhost:8001";
|
||||||
private int requestDelayMs = 1500;
|
private int requestDelayMs = 1500;
|
||||||
private int backoffMs = 5000;
|
private int backoffMs = 5000;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,80 @@
|
|||||||
|
package gc.mda.kcg.domain.analysis;
|
||||||
|
|
||||||
|
import gc.mda.kcg.config.AppProperties;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.http.*;
|
||||||
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
import org.springframework.web.client.ResourceAccessException;
|
||||||
|
import org.springframework.web.client.RestClientException;
|
||||||
|
import org.springframework.web.client.RestTemplate;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prediction 서비스(FastAPI) 상태/트리거 프록시
|
||||||
|
* GET /api/prediction/health → {predictionBaseUrl}/health
|
||||||
|
* GET /api/prediction/status → {predictionBaseUrl}/api/v1/analysis/status
|
||||||
|
* POST /api/prediction/trigger → {predictionBaseUrl}/api/v1/analysis/trigger
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/api/prediction")
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class PredictionProxyController {
|
||||||
|
|
||||||
|
private final RestTemplate restTemplate;
|
||||||
|
private final AppProperties appProperties;
|
||||||
|
|
||||||
|
@GetMapping("/health")
|
||||||
|
public ResponseEntity<Object> health() {
|
||||||
|
String url = appProperties.getCollector().getPredictionBaseUrl() + "/health";
|
||||||
|
return proxyGet(url);
|
||||||
|
}
|
||||||
|
|
||||||
|
@GetMapping("/status")
|
||||||
|
public ResponseEntity<Object> status() {
|
||||||
|
String url = appProperties.getCollector().getPredictionBaseUrl() + "/api/v1/analysis/status";
|
||||||
|
return proxyGet(url);
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostMapping("/trigger")
|
||||||
|
public ResponseEntity<Object> trigger(@RequestBody(required = false) Object body) {
|
||||||
|
String url = appProperties.getCollector().getPredictionBaseUrl() + "/api/v1/analysis/trigger";
|
||||||
|
return proxyPost(url, body);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ResponseEntity<Object> proxyGet(String url) {
|
||||||
|
try {
|
||||||
|
ResponseEntity<Object> upstream = restTemplate.exchange(
|
||||||
|
url, HttpMethod.GET, null, Object.class);
|
||||||
|
return ResponseEntity.status(upstream.getStatusCode()).body(upstream.getBody());
|
||||||
|
} catch (ResourceAccessException e) {
|
||||||
|
log.warn("Prediction 서비스 접속 불가: {}", e.getMessage());
|
||||||
|
return unreachable(e.getMessage());
|
||||||
|
} catch (RestClientException e) {
|
||||||
|
log.warn("Prediction 서비스 오류: {}", e.getMessage());
|
||||||
|
return unreachable(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ResponseEntity<Object> proxyPost(String url, Object body) {
|
||||||
|
try {
|
||||||
|
HttpEntity<Object> entity = new HttpEntity<>(body);
|
||||||
|
ResponseEntity<Object> upstream = restTemplate.exchange(
|
||||||
|
url, HttpMethod.POST, entity, Object.class);
|
||||||
|
return ResponseEntity.status(upstream.getStatusCode()).body(upstream.getBody());
|
||||||
|
} catch (ResourceAccessException e) {
|
||||||
|
log.warn("Prediction 서비스 접속 불가: {}", e.getMessage());
|
||||||
|
return unreachable(e.getMessage());
|
||||||
|
} catch (RestClientException e) {
|
||||||
|
log.warn("Prediction 서비스 오류: {}", e.getMessage());
|
||||||
|
return unreachable(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ResponseEntity<Object> unreachable(String message) {
|
||||||
|
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
|
||||||
|
.body(Map.of("status", "unreachable", "error", message != null ? message : "connection failed"));
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -14,5 +14,6 @@ app:
|
|||||||
collector:
|
collector:
|
||||||
open-sky-client-id: ${OPENSKY_CLIENT_ID:}
|
open-sky-client-id: ${OPENSKY_CLIENT_ID:}
|
||||||
open-sky-client-secret: ${OPENSKY_CLIENT_SECRET:}
|
open-sky-client-secret: ${OPENSKY_CLIENT_SECRET:}
|
||||||
|
prediction-base-url: ${PREDICTION_BASE_URL:http://192.168.1.18:8001}
|
||||||
cors:
|
cors:
|
||||||
allowed-origins: http://localhost:5173,https://kcg.gc-si.dev
|
allowed-origins: http://localhost:5173,https://kcg.gc-si.dev
|
||||||
|
|||||||
3
database/migration/006_vessel_analysis_unique.sql
Normal file
3
database/migration/006_vessel_analysis_unique.sql
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
-- UPSERT를 위한 UNIQUE 인덱스 추가
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_vessel_analysis_mmsi_ts
|
||||||
|
ON kcg.vessel_analysis_results(mmsi, timestamp);
|
||||||
15
deploy/kcg-prediction.service
Normal file
15
deploy/kcg-prediction.service
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
[Unit]
|
||||||
|
Description=KCG Vessel Analysis (FastAPI)
|
||||||
|
After=network.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
User=root
|
||||||
|
WorkingDirectory=/home/apps/kcg-prediction
|
||||||
|
ExecStart=/home/apps/kcg-prediction/venv/bin/uvicorn main:app --host 0.0.0.0 --port 8001
|
||||||
|
EnvironmentFile=/home/apps/kcg-prediction/.env
|
||||||
|
Restart=always
|
||||||
|
RestartSec=5
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
0
prediction/algorithms/__init__.py
Normal file
0
prediction/algorithms/__init__.py
Normal file
59
prediction/algorithms/dark_vessel.py
Normal file
59
prediction/algorithms/dark_vessel.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
import pandas as pd
|
||||||
|
from algorithms.location import haversine_nm
|
||||||
|
|
||||||
|
# AIS gap severity thresholds (seconds).
GAP_SUSPICIOUS_SEC = 1800  # 30 minutes
GAP_HIGH_SUSPICIOUS_SEC = 3600  # 1 hour
GAP_VIOLATION_SEC = 86400  # 24 hours


def detect_ais_gaps(df_vessel: pd.DataFrame) -> list[dict]:
    """Extract AIS reception gaps from a single vessel's track.

    Consecutive reports more than GAP_SUSPICIOUS_SEC apart are emitted as
    gap records carrying a severity grade and the straight-line displacement
    (nautical miles) covered while the signal was dark.
    """
    if len(df_vessel) < 2:
        return []

    ordered = df_vessel.sort_values('timestamp').to_dict('records')
    gaps: list[dict] = []

    for earlier, later in zip(ordered, ordered[1:]):
        elapsed = (pd.Timestamp(later['timestamp'])
                   - pd.Timestamp(earlier['timestamp'])).total_seconds()
        if elapsed < GAP_SUSPICIOUS_SEC:
            continue

        # Distance travelled while no AIS reports were received.
        moved_nm = haversine_nm(
            earlier['lat'], earlier['lon'],
            later['lat'], later['lon'],
        )

        if elapsed >= GAP_VIOLATION_SEC:
            grade = 'VIOLATION'
        elif elapsed >= GAP_HIGH_SUSPICIOUS_SEC:
            grade = 'HIGH_SUSPICIOUS'
        else:
            grade = 'SUSPICIOUS'

        gaps.append({
            'gap_sec': int(elapsed),
            'gap_min': round(elapsed / 60, 1),
            'displacement_nm': round(moved_nm, 2),
            'severity': grade,
        })

    return gaps
|
||||||
|
|
||||||
|
|
||||||
|
def is_dark_vessel(df_vessel: pd.DataFrame) -> tuple[bool, int]:
    """Decide whether a vessel qualifies as a "dark vessel".

    Returns: (is_dark, max_gap_duration_min)
    """
    gap_records = detect_ais_gaps(df_vessel)
    if not gap_records:
        return False, 0

    longest_min = max(record['gap_min'] for record in gap_records)
    # Any gap of 30 minutes or more marks the vessel as dark.
    return longest_min >= 30, int(longest_min)
|
||||||
117
prediction/algorithms/fishing_pattern.py
Normal file
117
prediction/algorithms/fishing_pattern.py
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
from algorithms.location import haversine_nm, classify_zone # noqa: F401 (haversine_nm re-exported for callers)
|
||||||
|
|
||||||
|
# Yan et al. (2022) 어구별 조업 속도 임계값
|
||||||
|
# Fishing-speed bands (knots) per gear type, after Yan et al. (2022).
GEAR_SOG_THRESHOLDS: dict[str, tuple[float, float]] = {
    'PT': (2.5, 4.5),      # pair trawl
    'OT': (2.0, 4.0),      # otter trawl
    'GN': (0.5, 2.5),      # gill / drift net
    'SQ': (0.0, 1.0),      # squid jigging
    'TRAP': (0.3, 1.5),    # pots / traps
    'PS': (3.0, 6.0),      # purse seine
    'TRAWL': (2.0, 4.5),   # (alias)
    'PURSE': (3.0, 6.0),   # (alias)
    'LONGLINE': (0.5, 2.5),
}
TRANSIT_SOG_MIN = 5.0    # at/above this speed the vessel is considered transiting
ANCHORED_SOG_MAX = 0.5   # at/below this speed the vessel is considered anchored


def classify_vessel_state(sog: float, cog_delta: float = 0.0,
                          gear_type: str = 'PT') -> str:
    """UCAF: classify one AIS point into a navigation state by speed.

    Check order matters: anchored and transit bands win first, then the
    gear-specific fishing band; anything left over is UNKNOWN.
    """
    if sog <= ANCHORED_SOG_MAX:
        return 'ANCHORED'
    if sog >= TRANSIT_SOG_MIN:
        return 'TRANSIT'
    low, high = GEAR_SOG_THRESHOLDS.get(gear_type, (1.0, 5.0))
    return 'FISHING' if low <= sog <= high else 'UNKNOWN'
|
||||||
|
|
||||||
|
|
||||||
|
def compute_ucaf_score(df_vessel: pd.DataFrame, gear_type: str = 'PT') -> float:
    """UCAF score: fraction of points inside the gear's fishing-speed band (0-1)."""
    total = len(df_vessel)
    if total == 0:
        return 0.0
    low, high = GEAR_SOG_THRESHOLDS.get(gear_type, (1.0, 5.0))
    matches = df_vessel['sog'].between(low, high).sum()
    return round(matches / total, 4)
|
||||||
|
|
||||||
|
|
||||||
|
def compute_ucft_score(df_vessel: pd.DataFrame) -> float:
    """UCFT score: binary fishing-vs-transit confidence in [0, 1]."""
    if len(df_vessel) == 0:
        return 0.0
    sog = df_vessel['sog']
    fishing_pts = sog.between(0.5, 5.0).sum()
    transit_pts = (sog >= TRANSIT_SOG_MIN).sum()
    denom = fishing_pts + transit_pts
    # No point falls in either band: nothing to be confident about.
    return 0.0 if denom == 0 else round(fishing_pts / denom, 4)
|
||||||
|
|
||||||
|
|
||||||
|
def detect_fishing_segments(df_vessel: pd.DataFrame,
                            window_min: int = 15,
                            gear_type: str = 'PT') -> list[dict]:
    """Extract contiguous fishing segments from a vessel track.

    Walks the points in order, opening a segment when the per-point state
    becomes FISHING and closing it on the first non-FISHING point; segments
    shorter than *window_min* minutes are discarded.

    Args:
        df_vessel: track points; rows are read as dicts with 'sog',
            'timestamp', 'lat', 'lon' keys (missing keys default safely).
        window_min: minimum segment duration in minutes to keep.
        gear_type: gear code forwarded to classify_vessel_state.

    Returns:
        List of segment dicts with start/end indices, duration, and the
        zone of the segment's starting position.

    NOTE(review): a segment still open at the last record is never closed
    and is silently dropped — confirm this is intended.
    """
    if len(df_vessel) < 2:
        return []

    segments: list[dict] = []
    in_fishing = False
    seg_start_idx = 0

    records = df_vessel.to_dict('records')
    for i, rec in enumerate(records):
        sog = rec.get('sog', 0)
        state = classify_vessel_state(sog, gear_type=gear_type)

        if state == 'FISHING' and not in_fishing:
            # Transition into fishing: remember where the segment began.
            in_fishing = True
            seg_start_idx = i
        elif state != 'FISHING' and in_fishing:
            # Transition out of fishing: close and (maybe) record the segment.
            start_ts = records[seg_start_idx].get('timestamp')
            end_ts = rec.get('timestamp')
            if start_ts and end_ts:
                dur_sec = (pd.Timestamp(end_ts) - pd.Timestamp(start_ts)).total_seconds()
                dur_min = dur_sec / 60
                if dur_min >= window_min:
                    # Zone is judged at the segment's starting coordinate.
                    zone_info = classify_zone(
                        records[seg_start_idx].get('lat', 0),
                        records[seg_start_idx].get('lon', 0),
                    )
                    segments.append({
                        'start_idx': seg_start_idx,
                        'end_idx': i - 1,
                        'duration_min': round(dur_min, 1),
                        'zone': zone_info.get('zone', 'UNKNOWN'),
                        'in_territorial_sea': zone_info.get('zone') == 'TERRITORIAL_SEA',
                    })
            in_fishing = False

    return segments
|
||||||
|
|
||||||
|
|
||||||
|
def detect_trawl_uturn(df_vessel: pd.DataFrame,
                       uturn_threshold_deg: float = 150.0,
                       min_uturn_count: int = 3) -> dict:
    """Detect repeated back-and-forth U-turn passes, a trawler signature.

    A U-turn is a heading change >= uturn_threshold_deg between consecutive
    points while the vessel stays below transit speed.
    """
    if len(df_vessel) < 2:
        return {'uturn_count': 0, 'trawl_suspected': False}

    headings = df_vessel['cog'].values
    speeds = df_vessel['sog'].values

    turns = 0
    for prev_cog, cog, sog in zip(headings[:-1], headings[1:], speeds[1:]):
        # Smallest absolute heading change, folded into [0, 180].
        change = abs((cog - prev_cog + 180) % 360 - 180)
        if change >= uturn_threshold_deg and sog < TRANSIT_SOG_MIN:
            turns += 1

    return {
        'uturn_count': turns,
        'trawl_suspected': turns >= min_uturn_count,
    }
|
||||||
152
prediction/algorithms/fleet.py
Normal file
152
prediction/algorithms/fleet.py
Normal file
@ -0,0 +1,152 @@
|
|||||||
|
import math
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
from algorithms.location import haversine_nm, dist_to_baseline, EARTH_RADIUS_NM
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def detect_group_clusters(
    vessel_snapshots: list[dict],
    spatial_eps_nm: float = 10.0,
    time_eps_hours: float = 2.0,
    min_vessels: int = 3,
) -> dict[int, list[dict]]:
    """Group vessel snapshots into fleets via spatio-temporal DBSCAN.

    Returns {cluster_label: [snapshot, ...]}; DBSCAN noise points (-1) are
    dropped. An empty dict is returned when there are too few snapshots or
    sklearn is unavailable.
    """
    if len(vessel_snapshots) < min_vessels:
        return {}

    try:
        from sklearn.cluster import DBSCAN
    except ImportError:
        logger.warning('sklearn not available for DBSCAN clustering')
        return {}

    # Project lat/lon (radians) onto an NM scale so a single euclidean eps
    # covers both spatial axes.
    ys = [math.radians(s['lat']) * EARTH_RADIUS_NM for s in vessel_snapshots]
    xs = [math.radians(s['lon']) * EARTH_RADIUS_NM for s in vessel_snapshots]

    # Normalize time onto the same NM scale: time_eps_hours maps to 10 NM.
    epochs = [pd.Timestamp(s['timestamp']).timestamp() for s in vessel_snapshots]
    earliest = min(epochs)
    ts_axis = [(e - earliest) / 3600 * 10 / time_eps_hours for e in epochs]

    features = np.array(list(zip(ys, xs, ts_axis)))
    labels = DBSCAN(eps=spatial_eps_nm, min_samples=min_vessels,
                    metric='euclidean').fit(features).labels_

    grouped: dict[int, list[dict]] = {}
    for snapshot, label in zip(vessel_snapshots, labels):
        if label != -1:
            grouped.setdefault(int(label), []).append(snapshot)

    return grouped
|
||||||
|
|
||||||
|
|
||||||
|
def identify_lead_vessel(cluster_vessels: list[dict]) -> dict:
    """Pick the fleet's lead vessel via a five-criterion weighted score.

    Each snapshot dict must provide 'mmsi', 'lat', 'lon' (and optionally
    'timestamp'). Scores are on a 0-100-ish scale; the highest-scoring MMSI
    wins, and the gap to the runner-up sets the confidence label.

    Returns:
        {} for an empty input; otherwise a dict with 'lead_mmsi',
        'lead_score', 'all_scores', 'confidence' ('HIGH'/'MED'/'LOW').
    """
    if not cluster_vessels:
        return {}

    scores: dict[str, float] = {}

    # Missing timestamps fall back to epoch 0 — TODO confirm that is intended.
    timestamps = [pd.Timestamp(v.get('timestamp', 0)).timestamp() for v in cluster_vessels]
    min_ts = min(timestamps) if timestamps else 0

    lats = [v['lat'] for v in cluster_vessels]
    lons = [v['lon'] for v in cluster_vessels]
    centroid_lat = float(np.mean(lats))
    centroid_lon = float(np.mean(lons))

    for i, v in enumerate(cluster_vessels):
        mmsi = v['mmsi']
        s = 0.0

        # Criterion 1: earliest arrival (30 pts, linear decay over 2 h).
        ts_rank = timestamps[i] - min_ts
        s += 30.0 * (1.0 - min(ts_rank, 7200) / 7200)

        # Criterion 2: gross tonnage (25 pts) — flat half score until an
        # external vessel DB is wired up.
        s += 12.5

        # Criterion 3: proximity to the cluster centroid (20 pts, over 10 NM).
        dist_center = haversine_nm(v['lat'], v['lon'], centroid_lat, centroid_lon)
        s += 20.0 * (1.0 - min(dist_center, 10) / 10)

        # Criterion 4: closeness to the baseline (15 pts, over 12 NM).
        dist_base = dist_to_baseline(v['lat'], v['lon'])
        s += 15.0 * (1.0 - min(dist_base, 12) / 12)

        # Criterion 5: AIS-loss history (10 pts) — full marks when no history.
        s += 10.0

        scores[mmsi] = round(s, 2)

    lead_mmsi = max(scores, key=lambda k: scores[k])
    score_vals = sorted(scores.values(), reverse=True)

    # Confidence from the margin between the top two scores.
    if len(score_vals) > 1 and score_vals[0] - score_vals[1] > 15:
        confidence = 'HIGH'
    elif len(score_vals) > 1 and score_vals[0] - score_vals[1] > 8:
        confidence = 'MED'
    else:
        confidence = 'LOW'

    return {
        'lead_mmsi': lead_mmsi,
        'lead_score': scores[lead_mmsi],
        'all_scores': scores,
        'confidence': confidence,
    }
|
||||||
|
|
||||||
|
|
||||||
|
def assign_fleet_roles(
    vessel_dfs: dict[str, pd.DataFrame],
    cluster_map: dict[str, int],
) -> dict[str, dict]:
    """Assign fleet roles per vessel: LEADER / MEMBER / NOISE.

    Args:
        vessel_dfs: per-MMSI track DataFrames (rows must carry 'lat'/'lon',
            optionally 'timestamp').
        cluster_map: MMSI -> DBSCAN cluster id; -1 marks noise.

    Returns:
        MMSI -> {'cluster_size', 'is_leader', 'fleet_role'}.
    """
    results: dict[str, dict] = {}

    # Group MMSIs by cluster id.
    clusters: dict[int, list[str]] = {}
    for mmsi, cid in cluster_map.items():
        clusters.setdefault(cid, []).append(mmsi)

    for cid, mmsi_list in clusters.items():
        if cid == -1:
            # Noise vessels belong to no fleet; cluster_size reported as 0.
            for mmsi in mmsi_list:
                results[mmsi] = {
                    'cluster_size': 0,
                    'is_leader': False,
                    'fleet_role': 'NOISE',
                }
            continue

        cluster_size = len(mmsi_list)

        # Build snapshots from each vessel's most recent track point.
        snapshots: list[dict] = []
        for mmsi in mmsi_list:
            df = vessel_dfs.get(mmsi)
            if df is not None and len(df) > 0:
                last = df.iloc[-1]
                snapshots.append({
                    'mmsi': mmsi,
                    'lat': last['lat'],
                    'lon': last['lon'],
                    'timestamp': last.get('timestamp', pd.Timestamp.now()),
                })

        # A leader only makes sense with at least two scored snapshots.
        lead_info = identify_lead_vessel(snapshots) if len(snapshots) >= 2 else {}
        lead_mmsi = lead_info.get('lead_mmsi')

        for mmsi in mmsi_list:
            results[mmsi] = {
                'cluster_size': cluster_size,
                'is_leader': mmsi == lead_mmsi,
                'fleet_role': 'LEADER' if mmsi == lead_mmsi else 'MEMBER',
            }

    return results
|
||||||
93
prediction/algorithms/location.py
Normal file
93
prediction/algorithms/location.py
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import List, Optional, Tuple
|
||||||
|
|
||||||
|
# All distances in this module are expressed in nautical miles (WGS84 sphere).
EARTH_RADIUS_NM = 3440.065
TERRITORIAL_SEA_NM = 12.0
CONTIGUOUS_ZONE_NM = 24.0

# Baseline polyline cache, populated on first use and kept for process lifetime.
_baseline_points: Optional[List[Tuple[float, float]]] = None


def _load_baseline() -> List[Tuple[float, float]]:
    """Load (and memoize) Korea's baseline points from the bundled JSON file."""
    global _baseline_points
    if _baseline_points is None:
        source = Path(__file__).parent.parent / 'data' / 'korea_baseline.json'
        with open(source, 'r') as fp:
            payload = json.load(fp)
        _baseline_points = [(entry['lat'], entry['lon']) for entry in payload['points']]
    return _baseline_points


def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two coordinates, in nautical miles."""
    lat1_r, lat2_r = math.radians(lat1), math.radians(lat2)
    half_dlat = math.radians(lat2 - lat1) / 2
    half_dlon = math.radians(lon2 - lon1) / 2
    a = (math.sin(half_dlat) ** 2
         + math.cos(lat1_r) * math.cos(lat2_r) * math.sin(half_dlon) ** 2)
    return EARTH_RADIUS_NM * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
||||||
|
|
||||||
|
|
||||||
|
def dist_to_baseline(vessel_lat: float, vessel_lon: float,
                     baseline_points: Optional[List[Tuple[float, float]]] = None) -> float:
    """Minimum distance (NM) from a vessel position to the baseline polyline."""
    points = _load_baseline() if baseline_points is None else baseline_points
    # default=inf preserves the original behavior for an empty point list.
    return min(
        (haversine_nm(vessel_lat, vessel_lon, base_lat, base_lon)
         for base_lat, base_lon in points),
        default=float('inf'),
    )
|
||||||
|
|
||||||
|
|
||||||
|
def classify_zone(vessel_lat: float, vessel_lon: float) -> dict:
    """Classify a position into territorial sea / contiguous zone / beyond."""
    nm_from_baseline = dist_to_baseline(vessel_lat, vessel_lon)

    if nm_from_baseline <= TERRITORIAL_SEA_NM:
        zone, violation, alert = 'TERRITORIAL_SEA', True, 'CRITICAL'
    elif nm_from_baseline <= CONTIGUOUS_ZONE_NM:
        zone, violation, alert = 'CONTIGUOUS_ZONE', False, 'WATCH'
    else:
        zone, violation, alert = 'EEZ_OR_BEYOND', False, 'NORMAL'

    return {
        'zone': zone,
        'dist_from_baseline_nm': round(nm_from_baseline, 2),
        'violation': violation,
        'alert_level': alert,
    }
|
||||||
|
|
||||||
|
|
||||||
|
def bd09_to_wgs84(bd_lat: float, bd_lon: float) -> tuple[float, float]:
    """Convert BD-09 (Baidu) coordinates to approximate WGS84."""
    # Step 1 — BD-09 -> GCJ-02: undo Baidu's fixed offset plus the
    # sinusoidal warp applied on top of GCJ-02.
    x = bd_lon - 0.0065
    y = bd_lat - 0.006
    magic = 52.35987756  # ~= pi * 3000 / 180
    r = math.sqrt(x * x + y * y) - 0.00002 * math.sin(y * magic)
    theta = math.atan2(y, x) - 0.000003 * math.cos(x * magic)
    gcj_lon = r * math.cos(theta)
    gcj_lat = r * math.sin(theta)
    # Step 2 — GCJ-02 -> WGS84: constant-offset approximation.
    return gcj_lat - 0.0023, gcj_lon - 0.0059
|
||||||
|
|
||||||
|
|
||||||
|
def compute_bd09_offset(lat: float, lon: float) -> float:
    """Offset (meters) between a coordinate and its BD09->WGS84 conversion."""
    converted_lat, converted_lon = bd09_to_wgs84(lat, lon)
    offset_nm = haversine_nm(lat, lon, converted_lat, converted_lon)
    return round(offset_nm * 1852.0, 1)  # nautical miles -> meters
|
||||||
75
prediction/algorithms/risk.py
Normal file
75
prediction/algorithms/risk.py
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
from typing import Optional, Tuple
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
from algorithms.location import classify_zone
|
||||||
|
from algorithms.fishing_pattern import detect_fishing_segments, detect_trawl_uturn
|
||||||
|
from algorithms.dark_vessel import detect_ais_gaps
|
||||||
|
from algorithms.spoofing import detect_teleportation
|
||||||
|
|
||||||
|
|
||||||
|
def compute_vessel_risk_score(
    mmsi: str,
    df_vessel: pd.DataFrame,
    zone_info: Optional[dict] = None,
    is_permitted: Optional[bool] = None,
) -> Tuple[int, str]:
    """Aggregate violation risk for one vessel on a 0-100 scale.

    Buckets: location (max 40), fishing behaviour (max 30), AIS
    manipulation (max 35), permit history (max 20); capped at 100.

    Returns: (risk_score, risk_level)
    """
    if len(df_vessel) == 0:
        return 0, 'LOW'

    total = 0

    # 1. Location (max 40) — derive the zone from the latest point if absent.
    if zone_info is None:
        latest = df_vessel.iloc[-1]
        zone_info = classify_zone(latest['lat'], latest['lon'])

    zone = zone_info.get('zone', '')
    if zone == 'TERRITORIAL_SEA':
        total += 40
    elif zone == 'CONTIGUOUS_ZONE':
        total += 10

    # 2. Fishing behaviour (max 30).
    segments = detect_fishing_segments(df_vessel)
    if any(seg.get('in_territorial_sea') for seg in segments):
        total += 20
    elif segments:
        total += 5

    if detect_trawl_uturn(df_vessel).get('trawl_suspected'):
        total += 10

    # 3. AIS manipulation (max 35).
    if detect_teleportation(df_vessel):
        total += 20

    gap_list = detect_ais_gaps(df_vessel)
    if any(gap['gap_min'] >= 60 for gap in gap_list):
        total += 15
    elif gap_list:
        total += 5

    # 4. Permit history (max 20) — only an explicit "not permitted" counts.
    if is_permitted is not None and not is_permitted:
        total += 20

    total = min(total, 100)

    # Map the numeric score onto a four-step level.
    if total >= 70:
        level = 'CRITICAL'
    elif total >= 50:
        level = 'HIGH'
    elif total >= 30:
        level = 'MEDIUM'
    else:
        level = 'LOW'

    return total, level
|
||||||
80
prediction/algorithms/spoofing.py
Normal file
80
prediction/algorithms/spoofing.py
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
import pandas as pd
|
||||||
|
from algorithms.location import haversine_nm, bd09_to_wgs84, compute_bd09_offset # noqa: F401
|
||||||
|
|
||||||
|
# Upper bound on a plausible fishing-vessel speed (knots).
MAX_FISHING_SPEED_KNOTS = 25.0


def detect_teleportation(df_vessel: pd.DataFrame,
                         max_speed_knots: float = MAX_FISHING_SPEED_KNOTS) -> list[dict]:
    """Flag physically impossible jumps between consecutive AIS points.

    A hop whose implied speed exceeds max_speed_knots is reported as a
    TELEPORTATION anomaly; above 50 kn the confidence is upgraded to HIGH.
    """
    if len(df_vessel) < 2:
        return []

    track = df_vessel.sort_values('timestamp').to_dict('records')
    flagged: list[dict] = []

    for idx in range(1, len(track)):
        before, after = track[idx - 1], track[idx]
        hop_nm = haversine_nm(before['lat'], before['lon'], after['lat'], after['lon'])
        hours = (
            pd.Timestamp(after['timestamp']) - pd.Timestamp(before['timestamp'])
        ).total_seconds() / 3600

        # Zero or negative elapsed time (duplicate/out-of-order fix): skip.
        if hours <= 0:
            continue

        speed_kn = hop_nm / hours
        if speed_kn > max_speed_knots:
            flagged.append({
                'idx': idx,
                'dist_nm': round(hop_nm, 2),
                'implied_kn': round(speed_kn, 1),
                'type': 'TELEPORTATION',
                'confidence': 'HIGH' if speed_kn > 50 else 'MED',
            })

    return flagged
|
||||||
|
|
||||||
|
|
||||||
|
def count_speed_jumps(df_vessel: pd.DataFrame, threshold_knots: float = 10.0) -> int:
    """Count consecutive-point SOG changes larger than threshold_knots."""
    if len(df_vessel) < 2:
        return 0

    speeds = df_vessel['sog'].values
    return sum(
        1
        for prev_kn, kn in zip(speeds[:-1], speeds[1:])
        if abs(kn - prev_kn) > threshold_knots
    )
|
||||||
|
|
||||||
|
|
||||||
|
def compute_spoofing_score(df_vessel: pd.DataFrame) -> float:
    """Aggregate GPS spoofing score in [0, 1] for a single vessel track.

    Combines three signals: teleportation rate (capped at 0.4), SOG jump
    rate (capped at 0.3), and a BD-09 coordinate offset check at the
    track midpoint (up to 0.3). The sum is clamped to 1.0.
    """
    if len(df_vessel) < 2:
        return 0.0

    score = 0.0
    n = len(df_vessel)

    # Teleportation ratio: contribution scales with anomalies per point.
    teleports = detect_teleportation(df_vessel)
    if teleports:
        score += min(0.4, len(teleports) / n * 10)

    # SOG jump ratio.
    jumps = count_speed_jumps(df_vessel)
    if jumps > 0:
        score += min(0.3, jumps / n * 5)

    # BD-09 offset at the midpoint (suspected use of Chinese BD-09 datum).
    mid_idx = len(df_vessel) // 2
    row = df_vessel.iloc[mid_idx]
    offset = compute_bd09_offset(row['lat'], row['lon'])
    if offset > 300:  # more than 300 m offset
        score += 0.3
    elif offset > 100:
        score += 0.1

    return round(min(score, 1.0), 4)
|
||||||
0
prediction/cache/__init__.py
vendored
Normal file
0
prediction/cache/__init__.py
vendored
Normal file
335
prediction/cache/vessel_store.py
vendored
Normal file
335
prediction/cache/vessel_store.py
vendored
Normal file
@ -0,0 +1,335 @@
|
|||||||
|
import logging
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Minimum minutes between static-info refreshes.
_STATIC_REFRESH_INTERVAL_MIN = 60
# Minimum minutes between permit-registry refreshes.
_PERMIT_REFRESH_INTERVAL_MIN = 30
# Mean Earth radius in nautical miles (haversine).
_EARTH_RADIUS_NM = 3440.065
# Computed SOG values are clipped to this ceiling (knots).
_MAX_REASONABLE_SOG = 30.0
# MMSI MID prefix used to select Chinese-flag vessels.
_CHINESE_MMSI_PREFIX = '412'
|
||||||
|
|
||||||
|
|
||||||
|
def _compute_sog_cog(df: pd.DataFrame) -> pd.DataFrame:
    """Compute SOG (knots) and COG (degrees) from consecutive lat/lon/timestamp points."""
    df = df.sort_values(['mmsi', 'timestamp']).copy()

    lat1 = np.radians(df['lat'].values[:-1])
    lon1 = np.radians(df['lon'].values[:-1])
    lat2 = np.radians(df['lat'].values[1:])
    lon2 = np.radians(df['lon'].values[1:])

    # Haversine distance (nautical miles)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    dist_nm = _EARTH_RADIUS_NM * 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

    # Time difference (hours); non-positive gaps become NaN so they
    # cannot produce infinite speeds.
    ts = df['timestamp'].values
    dt_sec = (ts[1:] - ts[:-1]).astype('timedelta64[s]').astype(float)
    dt_hours = dt_sec / 3600.0
    dt_hours[dt_hours <= 0] = np.nan

    # SOG = dist / time (knots), clipped to the plausible maximum
    computed_sog = dist_nm / dt_hours
    computed_sog = np.clip(np.nan_to_num(computed_sog, nan=0.0), 0, _MAX_REASONABLE_SOG)

    # COG = bearing (degrees)
    x = np.sin(dlon) * np.cos(lat2)
    y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon)
    bearing = (np.degrees(np.arctan2(x, y)) + 360) % 360

    # Append last value (copy from previous): entry i describes segment i -> i+1
    sog_arr = np.append(computed_sog, computed_sog[-1:] if len(computed_sog) > 0 else [0])
    cog_arr = np.append(bearing, bearing[-1:] if len(bearing) > 0 else [0])

    # Reset at MMSI boundaries
    # NOTE(review): with the forward-looking alignment above, the
    # cross-vessel segment lands at index `idx` (last point of the
    # previous vessel), not `idx + 1` — confirm whether resetting
    # idx + 1 here is intentional.
    mmsi_vals = df['mmsi'].values
    boundary = np.where(mmsi_vals[:-1] != mmsi_vals[1:])[0]
    for idx in boundary:
        sog_arr[idx + 1] = df['raw_sog'].iloc[idx + 1] if 'raw_sog' in df.columns else 0
        cog_arr[idx + 1] = 0

    # Where computed SOG is 0 or NaN, fall back to raw_sog
    df['sog'] = sog_arr
    if 'raw_sog' in df.columns:
        mask = (df['sog'] == 0) | np.isnan(df['sog'])
        df.loc[mask, 'sog'] = df.loc[mask, 'raw_sog'].fillna(0)

    df['cog'] = cog_arr
    return df
|
||||||
|
|
||||||
|
|
||||||
|
class VesselStore:
    """In-memory vessel trajectory store for Korean waters vessel data.

    Maintains a 24-hour sliding window of all vessel tracks and supports
    incremental 5-minute updates. Chinese vessel (MMSI 412*) filtering
    is applied only at analysis target selection time.
    """

    def __init__(self) -> None:
        # mmsi -> per-vessel track DataFrame (timestamp-ordered points).
        self._tracks: dict[str, pd.DataFrame] = {}
        # Latest time_bucket merged so far; drives incremental fetches.
        self._last_bucket: Optional[datetime] = None
        # mmsi -> static vessel info (name, type, dimensions).
        self._static_info: dict[str, dict] = {}
        # MMSIs of permitted Chinese fishing vessels.
        self._permit_set: set[str] = set()
        # Timestamps of the last registry refreshes (rate limiting).
        self._static_refreshed_at: Optional[datetime] = None
        self._permit_refreshed_at: Optional[datetime] = None

    # ------------------------------------------------------------------
    # Public load / update methods
    # ------------------------------------------------------------------

    def load_initial(self, hours: int = 24) -> None:
        """Load all Korean waters vessel data for the past N hours.

        Fetches a bulk DataFrame from snpdb, groups by MMSI, and stores
        each vessel's track separately. Also triggers static info and
        permit registry refresh.
        """
        from db import snpdb

        logger.info('loading initial vessel tracks (last %dh)...', hours)
        try:
            df_all = snpdb.fetch_all_tracks(hours)
        except Exception as e:
            logger.error('fetch_all_tracks failed: %s', e)
            return

        if df_all.empty:
            logger.warning('fetch_all_tracks returned empty DataFrame')
            return

        # Rename sog column to raw_sog to preserve original AIS-reported speed
        if 'sog' in df_all.columns and 'raw_sog' not in df_all.columns:
            df_all = df_all.rename(columns={'sog': 'raw_sog'})

        self._tracks = {}
        for mmsi, group in df_all.groupby('mmsi'):
            self._tracks[str(mmsi)] = group.reset_index(drop=True)

        vessel_count = len(self._tracks)
        point_count = sum(len(v) for v in self._tracks.values())
        logger.info(
            'initial load complete: %d vessels, %d total points',
            vessel_count,
            point_count,
        )

        self.refresh_static_info()
        self.refresh_permit_registry()

    def merge_incremental(self, df_new: pd.DataFrame) -> None:
        """Merge a new batch of vessel positions into the in-memory store.

        Deduplicates by timestamp within each MMSI and updates _last_bucket.
        """
        if df_new.empty:
            logger.debug('merge_incremental called with empty DataFrame, skipping')
            return

        if 'sog' in df_new.columns and 'raw_sog' not in df_new.columns:
            df_new = df_new.rename(columns={'sog': 'raw_sog'})

        new_buckets: list[datetime] = []

        for mmsi, group in df_new.groupby('mmsi'):
            mmsi_str = str(mmsi)
            if mmsi_str in self._tracks:
                combined = pd.concat([self._tracks[mmsi_str], group], ignore_index=True)
                combined = combined.drop_duplicates(subset=['timestamp'])
                self._tracks[mmsi_str] = combined.reset_index(drop=True)
            else:
                self._tracks[mmsi_str] = group.reset_index(drop=True)

            if 'time_bucket' in group.columns and not group['time_bucket'].empty:
                bucket_vals = pd.to_datetime(group['time_bucket'].dropna())
                if not bucket_vals.empty:
                    new_buckets.append(bucket_vals.max().to_pydatetime())

        if new_buckets:
            latest = max(new_buckets)
            # Normalize naive bucket timestamps to UTC before comparing.
            if isinstance(latest, datetime) and latest.tzinfo is None:
                latest = latest.replace(tzinfo=timezone.utc)
            if self._last_bucket is None or latest > self._last_bucket:
                self._last_bucket = latest

        logger.debug(
            'incremental merge done: %d mmsis in batch, store has %d vessels',
            df_new['mmsi'].nunique(),
            len(self._tracks),
        )

    def evict_stale(self, hours: int = 24) -> None:
        """Remove track points older than N hours and evict empty MMSI entries."""
        import datetime as _dt

        now = datetime.now(timezone.utc)
        cutoff_aware = now - _dt.timedelta(hours=hours)
        cutoff_naive = cutoff_aware.replace(tzinfo=None)

        before_total = sum(len(v) for v in self._tracks.values())
        evicted_mmsis: list[str] = []

        for mmsi in list(self._tracks.keys()):
            df = self._tracks[mmsi]
            ts_col = df['timestamp']
            # Handle tz-aware and tz-naive timestamps uniformly
            if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None:
                mask = ts_col >= pd.Timestamp(cutoff_aware)
            else:
                mask = ts_col >= pd.Timestamp(cutoff_naive)
            filtered = df[mask].reset_index(drop=True)
            if filtered.empty:
                del self._tracks[mmsi]
                evicted_mmsis.append(mmsi)
            else:
                self._tracks[mmsi] = filtered

        after_total = sum(len(v) for v in self._tracks.values())
        logger.info(
            'eviction complete: removed %d points, evicted %d mmsis (threshold=%dh)',
            before_total - after_total,
            len(evicted_mmsis),
            hours,
        )

    def refresh_static_info(self) -> None:
        """Fetch vessel static info (type, name, dimensions) from snpdb.

        Skips refresh if called within the last 60 minutes.
        """
        now = datetime.now(timezone.utc)
        if self._static_refreshed_at is not None:
            elapsed_min = (now - self._static_refreshed_at).total_seconds() / 60
            if elapsed_min < _STATIC_REFRESH_INTERVAL_MIN:
                logger.debug(
                    'static info refresh skipped (%.1f min since last refresh)',
                    elapsed_min,
                )
                return

        if not self._tracks:
            logger.debug('no tracks in store, skipping static info refresh')
            return

        from db import snpdb

        mmsi_list = list(self._tracks.keys())
        try:
            info = snpdb.fetch_static_info(mmsi_list)
            self._static_info.update(info)
            self._static_refreshed_at = now
            logger.info('static info refreshed: %d vessels', len(info))
        except Exception as e:
            logger.error('fetch_static_info failed: %s', e)

    def refresh_permit_registry(self) -> None:
        """Fetch permitted Chinese fishing vessel MMSIs from snpdb.

        Skips refresh if called within the last 30 minutes.
        """
        now = datetime.now(timezone.utc)
        if self._permit_refreshed_at is not None:
            elapsed_min = (now - self._permit_refreshed_at).total_seconds() / 60
            if elapsed_min < _PERMIT_REFRESH_INTERVAL_MIN:
                logger.debug(
                    'permit registry refresh skipped (%.1f min since last refresh)',
                    elapsed_min,
                )
                return

        from db import snpdb

        try:
            mmsis = snpdb.fetch_permit_mmsis()
            self._permit_set = set(mmsis)
            self._permit_refreshed_at = now
            logger.info('permit registry refreshed: %d permitted vessels', len(self._permit_set))
        except Exception as e:
            logger.error('fetch_permit_mmsis failed: %s', e)

    # ------------------------------------------------------------------
    # Analysis target selection
    # ------------------------------------------------------------------

    def select_analysis_targets(self) -> pd.DataFrame:
        """Build a combined DataFrame of Chinese vessel tracks with computed SOG/COG.

        Filters to MMSI starting with '412', computes SOG and COG from
        consecutive lat/lon/timestamp pairs using the haversine formula,
        and falls back to raw_sog where computed values are zero or NaN.

        Returns:
            DataFrame with columns: mmsi, timestamp, lat, lon, sog, cog
        """
        chinese_mmsis = [m for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX)]
        if not chinese_mmsis:
            logger.info('no Chinese vessels (412*) found in store')
            return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog'])

        frames = [self._tracks[m] for m in chinese_mmsis]
        combined = pd.concat(frames, ignore_index=True)

        required_cols = {'mmsi', 'timestamp', 'lat', 'lon'}
        missing = required_cols - set(combined.columns)
        if missing:
            logger.error('combined DataFrame missing required columns: %s', missing)
            return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog'])

        result = _compute_sog_cog(combined)

        output_cols = ['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog']
        available = [c for c in output_cols if c in result.columns]
        return result[available].reset_index(drop=True)

    # ------------------------------------------------------------------
    # Lookup helpers
    # ------------------------------------------------------------------

    def is_permitted(self, mmsi: str) -> bool:
        """Return True if the given MMSI is in the permitted Chinese fishing vessel registry."""
        return mmsi in self._permit_set

    def get_vessel_info(self, mmsi: str) -> dict:
        """Return static vessel info dict for the given MMSI, or empty dict if not found."""
        return self._static_info.get(mmsi, {})

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def last_bucket(self) -> Optional[datetime]:
        """Return the latest time bucket seen across all merged incremental batches."""
        return self._last_bucket

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------

    def stats(self) -> dict:
        """Return store statistics for health/status reporting."""
        total_points = sum(len(v) for v in self._tracks.values())
        chinese_count = sum(1 for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX))

        # Rough memory estimate: each row ~200 bytes across columns
        memory_mb = round((total_points * 200) / (1024 * 1024), 2)

        return {
            'vessels': len(self._tracks),
            'points': total_points,
            'memory_mb': memory_mb,
            'last_bucket': self._last_bucket.isoformat() if self._last_bucket else None,
            'targets': chinese_count,
            'permitted': len(self._permit_set),
        }
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level singleton: one shared store instance per process.
vessel_store = VesselStore()
|
||||||
40
prediction/config.py
Normal file
40
prediction/config.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
from pydantic_settings import BaseSettings
|
||||||
|
|
||||||
|
|
||||||
|
class Settings(BaseSettings):
    """Service configuration loaded from environment variables / .env."""

    # snpdb (trajectory data source)
    SNPDB_HOST: str = '211.208.115.83'
    SNPDB_PORT: int = 5432
    SNPDB_NAME: str = 'snpdb'
    SNPDB_USER: str = 'snp'
    SNPDB_PASSWORD: str  # required; no default on purpose

    # kcgdb (analysis result sink)
    KCGDB_HOST: str = '211.208.115.83'
    KCGDB_PORT: int = 5432
    KCGDB_NAME: str = 'kcgdb'
    KCGDB_SCHEMA: str = 'kcg'
    KCGDB_USER: str = 'kcg_app'
    KCGDB_PASSWORD: str  # required; no default on purpose

    # Scheduler
    SCHEDULER_INTERVAL_MIN: int = 5

    # In-memory cache
    CACHE_WINDOW_HOURS: int = 24
    INITIAL_LOAD_HOURS: int = 24
    STATIC_INFO_REFRESH_MIN: int = 60
    PERMIT_REFRESH_MIN: int = 30

    # Pipeline
    TRAJECTORY_HOURS: int = 6
    MMSI_PREFIX: str = '412'
    MIN_TRAJ_POINTS: int = 100

    # Logging
    LOG_LEVEL: str = 'INFO'

    model_config = {'env_file': '.env', 'env_file_encoding': 'utf-8', 'extra': 'ignore'}


settings = Settings()
|
||||||
1
prediction/data/korea_baseline.json
Normal file
1
prediction/data/korea_baseline.json
Normal file
@ -0,0 +1 @@
|
|||||||
|
{"points": [{"lat": 37.0, "lon": 124.0}, {"lat": 35.0, "lon": 129.0}]}
|
||||||
0
prediction/db/__init__.py
Normal file
0
prediction/db/__init__.py
Normal file
134
prediction/db/kcgdb.py
Normal file
134
prediction/db/kcgdb.py
Normal file
@ -0,0 +1,134 @@
|
|||||||
|
import logging
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from typing import TYPE_CHECKING, Optional
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
|
from psycopg2 import pool
|
||||||
|
from psycopg2.extras import execute_values
|
||||||
|
|
||||||
|
from config import settings
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from models.result import AnalysisResult
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_pool: Optional[pool.ThreadedConnectionPool] = None
|
||||||
|
|
||||||
|
|
||||||
|
def init_pool():
    """Create the module-level kcgdb connection pool (1–3 connections)."""
    global _pool
    _pool = pool.ThreadedConnectionPool(
        minconn=1,
        maxconn=3,
        host=settings.KCGDB_HOST,
        port=settings.KCGDB_PORT,
        dbname=settings.KCGDB_NAME,
        user=settings.KCGDB_USER,
        password=settings.KCGDB_PASSWORD,
        # Make the configured schema the default for unqualified table names.
        options=f'-c search_path={settings.KCGDB_SCHEMA},public',
    )
    logger.info('kcgdb connection pool initialized')
|
||||||
|
|
||||||
|
|
||||||
|
def close_pool():
    """Close every pooled kcgdb connection; a no-op when never initialized."""
    global _pool
    if not _pool:
        return
    _pool.closeall()
    _pool = None
    logger.info('kcgdb connection pool closed')
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
def get_conn():
    """Borrow a pooled kcgdb connection; roll back on error, always return it."""
    conn = _pool.getconn()
    try:
        yield conn
    except Exception:
        # Never hand an aborted transaction back to the pool.
        conn.rollback()
        raise
    finally:
        _pool.putconn(conn)
|
||||||
|
|
||||||
|
|
||||||
|
def check_health() -> bool:
    """Return True when a trivial query succeeds against kcgdb."""
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute('SELECT 1')
        return True
    except Exception as e:
        logger.error('kcgdb health check failed: %s', e)
        return False
|
||||||
|
|
||||||
|
|
||||||
|
def upsert_results(results: list['AnalysisResult']) -> int:
    """Upsert analysis results into the vessel_analysis_results table.

    Conflicts on (mmsi, timestamp) overwrite every analysis column.
    Returns the number of rows written, or 0 on failure (errors are
    logged, not raised — the scheduler treats persistence as best-effort).
    """
    if not results:
        return 0

    insert_sql = """
        INSERT INTO vessel_analysis_results (
            mmsi, timestamp, vessel_type, confidence, fishing_pct,
            cluster_id, season, zone, dist_to_baseline_nm, activity_state,
            ucaf_score, ucft_score, is_dark, gap_duration_min,
            spoofing_score, bd09_offset_m, speed_jump_count,
            cluster_size, is_leader, fleet_role,
            risk_score, risk_level, features, analyzed_at
        ) VALUES %s
        ON CONFLICT (mmsi, timestamp) DO UPDATE SET
            vessel_type = EXCLUDED.vessel_type,
            confidence = EXCLUDED.confidence,
            fishing_pct = EXCLUDED.fishing_pct,
            cluster_id = EXCLUDED.cluster_id,
            season = EXCLUDED.season,
            zone = EXCLUDED.zone,
            dist_to_baseline_nm = EXCLUDED.dist_to_baseline_nm,
            activity_state = EXCLUDED.activity_state,
            ucaf_score = EXCLUDED.ucaf_score,
            ucft_score = EXCLUDED.ucft_score,
            is_dark = EXCLUDED.is_dark,
            gap_duration_min = EXCLUDED.gap_duration_min,
            spoofing_score = EXCLUDED.spoofing_score,
            bd09_offset_m = EXCLUDED.bd09_offset_m,
            speed_jump_count = EXCLUDED.speed_jump_count,
            cluster_size = EXCLUDED.cluster_size,
            is_leader = EXCLUDED.is_leader,
            fleet_role = EXCLUDED.fleet_role,
            risk_score = EXCLUDED.risk_score,
            risk_level = EXCLUDED.risk_level,
            features = EXCLUDED.features,
            analyzed_at = EXCLUDED.analyzed_at
    """

    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                # execute_values expands VALUES %s into batched row tuples.
                tuples = [r.to_db_tuple() for r in results]
                execute_values(cur, insert_sql, tuples, page_size=100)
            conn.commit()
            count = len(tuples)
            logger.info('upserted %d analysis results', count)
            return count
    except Exception as e:
        logger.error('failed to upsert results: %s', e)
        return 0
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_old(hours: int = 48) -> int:
    """Delete analysis results older than ``hours``; returns rows removed.

    Failures are logged and reported as 0 — cleanup is best-effort.
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                # The interval is bound as a parameter (hours * 1h).
                cur.execute(
                    'DELETE FROM vessel_analysis_results WHERE analyzed_at < NOW() - (%s * INTERVAL \'1 hour\')',
                    (hours,),
                )
                deleted = cur.rowcount
            conn.commit()
            if deleted > 0:
                logger.info('cleaned up %d old results (older than %dh)', deleted, hours)
            return deleted
    except Exception as e:
        logger.error('failed to cleanup old results: %s', e)
        return 0
|
||||||
187
prediction/db/snpdb.py
Normal file
187
prediction/db/snpdb.py
Normal file
@ -0,0 +1,187 @@
|
|||||||
|
import logging
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
import psycopg2
|
||||||
|
from psycopg2 import pool
|
||||||
|
|
||||||
|
from config import settings
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_pool: Optional[pool.ThreadedConnectionPool] = None
|
||||||
|
|
||||||
|
|
||||||
|
def init_pool():
    """Create the module-level snpdb connection pool (1–3 connections)."""
    global _pool
    _pool = pool.ThreadedConnectionPool(
        minconn=1,
        maxconn=3,
        host=settings.SNPDB_HOST,
        port=settings.SNPDB_PORT,
        dbname=settings.SNPDB_NAME,
        user=settings.SNPDB_USER,
        password=settings.SNPDB_PASSWORD,
    )
    logger.info('snpdb connection pool initialized')
|
||||||
|
|
||||||
|
|
||||||
|
def close_pool():
    """Close every pooled snpdb connection; safe when never initialized."""
    global _pool
    if _pool:
        _pool.closeall()
        _pool = None
        logger.info('snpdb connection pool closed')
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
def get_conn():
    """Borrow a pooled snpdb connection and always return it to the pool.

    Rolls back on error before re-raising, matching kcgdb.get_conn:
    without the rollback a failed query would hand an aborted-transaction
    connection back to the pool and poison subsequent borrowers.
    """
    conn = _pool.getconn()
    try:
        yield conn
    except Exception:
        conn.rollback()
        raise
    finally:
        _pool.putconn(conn)
|
||||||
|
|
||||||
|
|
||||||
|
def check_health() -> bool:
    """Return True when a trivial query succeeds against snpdb."""
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute('SELECT 1')
        return True
    except Exception as e:
        logger.error('snpdb health check failed: %s', e)
        return False
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_all_tracks(hours: int = 24) -> pd.DataFrame:
    """Fetch trajectory points for every vessel in Korean waters.

    Extracts individual points from the LineStringM geometry and returns
    the last N hours of data inside the Korean-waters bounding box
    (124–132E, 32–39N).

    Args:
        hours: Lookback window in hours.

    Returns:
        DataFrame with columns mmsi, timestamp, lat, lon, raw_sog;
        empty DataFrame with those columns on query failure.
    """
    # The lookback is bound as a query parameter (hours * 1h interval)
    # instead of being interpolated into the SQL string — same pattern
    # as kcgdb.cleanup_old, and immune to injection if callers ever pass
    # non-config values.
    query = """
        SELECT
            t.mmsi,
            to_timestamp(ST_M((dp).geom)) as timestamp,
            ST_Y((dp).geom) as lat,
            ST_X((dp).geom) as lon,
            CASE
                WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
                ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
            END as raw_sog
        FROM signal.t_vessel_tracks_5min t,
             LATERAL ST_DumpPoints(t.track_geom) dp
        WHERE t.time_bucket >= NOW() - (%s * INTERVAL '1 hour')
          AND t.track_geom && ST_MakeEnvelope(124, 32, 132, 39, 4326)
        ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
    """

    try:
        with get_conn() as conn:
            df = pd.read_sql_query(query, conn, params=(hours,))
        logger.info(
            'fetch_all_tracks: %d rows, %d vessels (last %dh)',
            len(df),
            df['mmsi'].nunique() if len(df) > 0 else 0,
            hours,
        )
        return df
    except Exception as e:
        logger.error('fetch_all_tracks failed: %s', e)
        return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'raw_sog'])
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_incremental(last_bucket: datetime) -> pd.DataFrame:
    """Fetch trajectory points newer than ``last_bucket``.

    Used by the scheduler's incremental update; the strict
    ``time_bucket > last_bucket`` condition skips buckets that were
    already merged.

    Returns:
        DataFrame with columns mmsi, timestamp, lat, lon, raw_sog;
        empty DataFrame with those columns on query failure.
    """
    query = """
        SELECT
            t.mmsi,
            to_timestamp(ST_M((dp).geom)) as timestamp,
            ST_Y((dp).geom) as lat,
            ST_X((dp).geom) as lon,
            CASE
                WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
                ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
            END as raw_sog
        FROM signal.t_vessel_tracks_5min t,
             LATERAL ST_DumpPoints(t.track_geom) dp
        WHERE t.time_bucket > %s
          AND t.track_geom && ST_MakeEnvelope(124, 32, 132, 39, 4326)
        ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
    """

    try:
        with get_conn() as conn:
            df = pd.read_sql_query(query, conn, params=(last_bucket,))
        logger.info(
            'fetch_incremental: %d rows, %d vessels (since %s)',
            len(df),
            df['mmsi'].nunique() if len(df) > 0 else 0,
            last_bucket.isoformat(),
        )
        return df
    except Exception as e:
        logger.error('fetch_incremental failed: %s', e)
        return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'raw_sog'])
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_static_info(mmsi_list: list[str]) -> dict[str, dict]:
    """Fetch static vessel info for the given MMSIs.

    Uses DISTINCT ON (mmsi) ordered by time_bucket DESC so only the most
    recent record per vessel is returned.

    Returns:
        Mapping mmsi -> {name, vessel_type, length, width}; empty dict
        on query failure.
    """
    query = """
        SELECT DISTINCT ON (mmsi) mmsi, name, vessel_type, length, width
        FROM signal.t_vessel_static
        WHERE mmsi = ANY(%s)
        ORDER BY mmsi, time_bucket DESC
    """

    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(query, (mmsi_list,))
                rows = cur.fetchall()
                result = {
                    row[0]: {
                        'name': row[1],
                        'vessel_type': row[2],
                        'length': row[3],
                        'width': row[4],
                    }
                    for row in rows
                }
        logger.info('fetch_static_info: %d vessels resolved', len(result))
        return result
    except Exception as e:
        logger.error('fetch_static_info failed: %s', e)
        return {}
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_permit_mmsis() -> set[str]:
    """Fetch the MMSIs of permitted Chinese fishing vessels.

    Reads DISTINCT mmsi values from signal.t_chnprmship_positions.
    Returns an empty set when the query fails (the error is logged).
    """
    query = """
        SELECT DISTINCT mmsi FROM signal.t_chnprmship_positions
    """

    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(query)
                permitted = {record[0] for record in cur.fetchall()}
        logger.info('fetch_permit_mmsis: %d permitted vessels', len(permitted))
        return permitted
    except Exception as e:
        logger.error('fetch_permit_mmsis failed: %s', e)
        return set()
|
||||||
25
prediction/env.example
Normal file
25
prediction/env.example
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
# snpdb (궤적 데이터 소스)
|
||||||
|
SNPDB_HOST=211.208.115.83
|
||||||
|
SNPDB_PORT=5432
|
||||||
|
SNPDB_NAME=snpdb
|
||||||
|
SNPDB_USER=snp
|
||||||
|
SNPDB_PASSWORD=change-me
|
||||||
|
|
||||||
|
# kcgdb (분석 결과 저장)
|
||||||
|
KCGDB_HOST=211.208.115.83
|
||||||
|
KCGDB_PORT=5432
|
||||||
|
KCGDB_NAME=kcgdb
|
||||||
|
KCGDB_SCHEMA=kcg
|
||||||
|
KCGDB_USER=kcg_app
|
||||||
|
KCGDB_PASSWORD=change-me
|
||||||
|
|
||||||
|
# 스케줄러
|
||||||
|
SCHEDULER_INTERVAL_MIN=5
|
||||||
|
|
||||||
|
# 파이프라인
|
||||||
|
TRAJECTORY_HOURS=6
|
||||||
|
MMSI_PREFIX=412
|
||||||
|
MIN_TRAJ_POINTS=100
|
||||||
|
|
||||||
|
# 로깅
|
||||||
|
LOG_LEVEL=INFO
|
||||||
@ -1,8 +1,66 @@
|
|||||||
from fastapi import FastAPI
|
import logging
|
||||||
|
import sys
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
app = FastAPI(title="KCG Prediction Service", version="0.1.0")
|
from fastapi import BackgroundTasks, FastAPI
|
||||||
|
|
||||||
|
from config import settings
|
||||||
|
from db import kcgdb, snpdb
|
||||||
|
from scheduler import get_last_run, run_analysis_cycle, start_scheduler, stop_scheduler
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=getattr(logging, settings.LOG_LEVEL, logging.INFO),
|
||||||
|
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
|
||||||
|
stream=sys.stdout,
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@app.get("/health")
|
@asynccontextmanager
async def lifespan(application: FastAPI):
    """App lifespan: init DB pools, warm the vessel cache, run the scheduler.

    Everything before ``yield`` runs at startup; everything after runs
    at shutdown in reverse-dependency order.
    """
    from cache.vessel_store import vessel_store

    logger.info('starting KCG Prediction Service')
    snpdb.init_pool()
    kcgdb.init_pool()

    # Initial in-memory cache load (24 hours)
    logger.info('loading initial vessel data (%dh)...', settings.INITIAL_LOAD_HOURS)
    vessel_store.load_initial(settings.INITIAL_LOAD_HOURS)
    logger.info('initial load complete: %s', vessel_store.stats())

    start_scheduler()
    yield
    stop_scheduler()
    snpdb.close_pool()
    kcgdb.close_pool()
    logger.info('KCG Prediction Service stopped')
|
||||||
|
|
||||||
|
|
||||||
|
# FastAPI application; lifespan handles pool/cache/scheduler setup.
app = FastAPI(
    title='KCG Prediction Service',
    version='2.0.0',
    lifespan=lifespan,
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get('/health')
def health_check():
    """Liveness endpoint reporting DB reachability and cache statistics."""
    # Imported lazily to avoid a circular import at module load time.
    from cache.vessel_store import vessel_store
    return {
        'status': 'ok',
        'snpdb': snpdb.check_health(),
        'kcgdb': kcgdb.check_health(),
        'store': vessel_store.stats(),
    }
|
||||||
|
|
||||||
|
|
||||||
|
@app.get('/api/v1/analysis/status')
def analysis_status():
    """Return metadata about the most recent analysis cycle."""
    return get_last_run()
|
||||||
|
|
||||||
|
|
||||||
|
@app.post('/api/v1/analysis/trigger')
def trigger_analysis(background_tasks: BackgroundTasks):
    """Kick off one analysis cycle in the background and return immediately."""
    background_tasks.add_task(run_analysis_cycle)
    return {'message': 'analysis cycle triggered'}
|
||||||
|
|||||||
0
prediction/models/__init__.py
Normal file
0
prediction/models/__init__.py
Normal file
38
prediction/models/ais.py
Normal file
38
prediction/models/ais.py
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
from dataclasses import dataclass, field
from typing import List, Dict

import pandas as pd


@dataclass
class AISPoint:
    """A single AIS position report for one vessel."""

    mmsi: str             # 9-digit vessel identifier
    ts: pd.Timestamp      # report timestamp
    lat: float            # latitude, degrees
    lon: float            # longitude, degrees
    sog: float            # speed over ground, knots
    cog: float            # course over ground, degrees
    state: str = 'UNKNOWN'  # behaviour state (STATIONARY / FISHING / SAILING)


@dataclass
class VesselTrajectory:
    """An ordered series of AIS points for one vessel plus analysis tags."""

    mmsi: str
    points: List[AISPoint] = field(default_factory=list)
    vessel_type: str = 'UNKNOWN'   # classified gear type (TRAWL/PURSE/LONGLINE/TRAP)
    cluster_id: int = -1           # trajectory cluster id; -1 = noise
    season: str = 'UNKNOWN'
    fishing_pct: float = 0.0       # share of points in FISHING state (percent)
    features: Dict = field(default_factory=dict)  # extracted feature vector


@dataclass
class ClassificationResult:
    """Output of the vessel-type classifier for one vessel."""

    mmsi: str
    vessel_type: str
    confidence: float     # classifier confidence in [0, 1]
    dominant_state: str   # most frequent behaviour state
    fishing_pct: float
    cluster_id: int
    season: str
    feature_vector: Dict
|
||||||
84
prediction/models/result.py
Normal file
84
prediction/models/result.py
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class AnalysisResult:
    """Maps to the 28 columns of the ``vessel_analysis_results`` table."""

    mmsi: str
    timestamp: datetime

    # Classification result
    vessel_type: str = 'UNKNOWN'
    confidence: float = 0.0
    fishing_pct: float = 0.0
    cluster_id: int = -1
    season: str = 'UNKNOWN'

    # ALGO 01: location
    zone: str = 'EEZ_OR_BEYOND'
    dist_to_baseline_nm: float = 999.0

    # ALGO 02: activity state
    activity_state: str = 'UNKNOWN'
    ucaf_score: float = 0.0
    ucft_score: float = 0.0

    # ALGO 03: dark vessel
    is_dark: bool = False
    gap_duration_min: int = 0

    # ALGO 04: GPS spoofing
    spoofing_score: float = 0.0
    bd09_offset_m: float = 0.0
    speed_jump_count: int = 0

    # ALGO 05+06: fleet
    cluster_size: int = 0
    is_leader: bool = False
    fleet_role: str = 'NOISE'

    # ALGO 07: risk
    risk_score: int = 0
    risk_level: str = 'LOW'

    # Feature vector (JSON-encoded by to_db_tuple)
    features: dict = field(default_factory=dict)

    # Metadata
    analyzed_at: Optional[datetime] = None

    def __post_init__(self):
        # Default the analysis time to "now" (UTC) when the caller omits it.
        if self.analyzed_at is None:
            self.analyzed_at = datetime.now(timezone.utc)

    def to_db_tuple(self) -> tuple:
        """Return field values in DB column order, with ``features`` JSON-encoded.

        NOTE: the tuple order must stay in sync with the INSERT column list
        used by the DB writer — do not reorder.
        """
        import json
        return (
            self.mmsi,
            self.timestamp,
            self.vessel_type,
            self.confidence,
            self.fishing_pct,
            self.cluster_id,
            self.season,
            self.zone,
            self.dist_to_baseline_nm,
            self.activity_state,
            self.ucaf_score,
            self.ucft_score,
            self.is_dark,
            self.gap_duration_min,
            self.spoofing_score,
            self.bd09_offset_m,
            self.speed_jump_count,
            self.cluster_size,
            self.is_leader,
            self.fleet_role,
            self.risk_score,
            self.risk_level,
            json.dumps(self.features),
            self.analyzed_at,
        )
|
||||||
0
prediction/pipeline/__init__.py
Normal file
0
prediction/pipeline/__init__.py
Normal file
31
prediction/pipeline/behavior.py
Normal file
31
prediction/pipeline/behavior.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
import pandas as pd
|
||||||
|
from pipeline.constants import SOG_STATIONARY_MAX, SOG_FISHING_MAX
|
||||||
|
|
||||||
|
|
||||||
|
class BehaviorDetector:
    """Speed-threshold behaviour classifier (Yan et al. 2022, Natale et al. 2015).

    Maps each point's SOG to one of three states:
    STATIONARY (anchored) / FISHING (fishing) / SAILING (transit).
    """

    @staticmethod
    def classify_point(sog: float) -> str:
        """Label a single SOG value as STATIONARY, FISHING or SAILING."""
        if sog < SOG_STATIONARY_MAX:
            return 'STATIONARY'
        return 'FISHING' if sog <= SOG_FISHING_MAX else 'SAILING'

    def detect(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of *df* with a per-row 'state' column added."""
        labelled = df.copy()
        labelled['state'] = labelled['sog'].apply(self.classify_point)
        return labelled

    @staticmethod
    def compute_fishing_ratio(df_vessel: pd.DataFrame) -> float:
        """Percentage (0–100, rounded to 2 dp) of points in FISHING state."""
        n_points = len(df_vessel)
        if n_points == 0:
            return 0.0
        n_fishing = (df_vessel['state'] == 'FISHING').sum()
        return round(n_fishing / n_points * 100, 2)
|
||||||
100
prediction/pipeline/classifier.py
Normal file
100
prediction/pipeline/classifier.py
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
import pandas as pd
|
||||||
|
from typing import Dict, Tuple
|
||||||
|
|
||||||
|
|
||||||
|
class VesselTypeClassifier:
    """Rule-based scoring classifier for fishing vessel types.

    Each gear type has a profile of feature ranges. A feature inside its
    range contributes a distance-based score (1.0 at the range centre,
    falling to 0.0 at the edges); a feature outside incurs a penalty of up
    to 0.5. Per-type scores are averaged over the features present, and the
    confidence is the winner's share of the positive score mass.

    TRAWL — trawling speed 2.5–4.5 kt, high COG variation
    PURSE — purse-seine speed 3–5 kt, circular COG pattern
    LONGLINE — longline speed 0.5–2 kt, low COG variation, long fishing runs
    TRAP — trap/pot speed ~0 kt, many stationary events, short range
    """

    PROFILES: Dict[str, Dict[str, Tuple[float, float]]] = {
        'TRAWL': {
            'sog_fishing_mean': (2.5, 4.5),
            'cog_change_mean': (0.15, 9.9),
            'fishing_pct': (0.3, 0.7),
            'fishing_run_mean': (5, 50),
            'stationary_events': (0, 5),
        },
        'PURSE': {
            'sog_fishing_mean': (3.0, 5.0),
            'cog_circularity': (0.2, 1.0),
            'fishing_pct': (0.1, 0.5),
            'fishing_run_mean': (3, 30),
            'stationary_events': (0, 3),
        },
        'LONGLINE': {
            'sog_fishing_mean': (0.5, 2.5),
            'cog_change_mean': (0.0, 0.15),
            'fishing_pct': (0.4, 0.9),
            'fishing_run_mean': (20, 999),
            'stationary_events': (0, 10),
        },
        'TRAP': {
            'sog_fishing_mean': (0.0, 2.0),
            'stationary_pct': (0.2, 0.8),
            'stationary_events': (5, 999),
            'fishing_run_mean': (1, 10),
            'total_distance_km': (0, 100),
        },
    }

    @staticmethod
    def _feature_score(val: float, lo: float, hi: float) -> float:
        """Score one feature value against a [lo, hi] profile range."""
        if lo <= val <= hi:
            centre = (lo + hi) / 2
            half_span = (hi - lo) / 2 if (hi - lo) > 0 else 1
            return max(0.0, 1 - abs(val - centre) / half_span)
        # Out of range: penalty grows with the overshoot, capped at 0.5.
        overshoot = min(abs(val - lo), abs(val - hi))
        return -min(0.5, overshoot / (hi - lo + 1e-9))

    def classify(self, features: Dict) -> Tuple[str, float]:
        """Classify a vessel from its feature dict.

        Returns:
            (vessel_type, confidence) where confidence is in [0, 1].
        """
        if not features:
            return 'UNKNOWN', 0.0

        scores: Dict[str, float] = {}
        for vtype, profile in self.PROFILES.items():
            per_feature = 0.0
            present = 0
            for name, (lo, hi) in profile.items():
                value = features.get(name)
                if value is None:
                    continue
                present += 1
                per_feature += self._feature_score(value, lo, hi)
            scores[vtype] = per_feature / present if present > 0 else 0.0

        best = max(scores, key=scores.get)
        positive_mass = sum(v for v in scores.values() if v > 0)
        confidence = scores[best] / positive_mass if positive_mass > 0 else 0.0
        return best, round(confidence, 3)
|
||||||
|
|
||||||
|
|
||||||
|
def get_season(ts: pd.Timestamp) -> str:
    """Return the Northern-Hemisphere season for a timestamp.

    Reference: paper 12 seasonal activity analysis (Chinese EEZ).
    Chinese fishing ban period: Yellow Sea / East China Sea May–Sep,
    South China Sea May–Aug.
    """
    month = ts.month
    if 3 <= month <= 5:
        return 'SPRING'
    if 6 <= month <= 8:
        return 'SUMMER'
    if 9 <= month <= 11:
        return 'FALL'
    return 'WINTER'
|
||||||
101
prediction/pipeline/clusterer.py
Normal file
101
prediction/pipeline/clusterer.py
Normal file
@ -0,0 +1,101 @@
|
|||||||
|
from collections import Counter
|
||||||
|
from typing import Dict, Optional
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from pipeline.constants import BIRCH_THRESHOLD, BIRCH_BRANCHING, MIN_CLUSTER_SIZE
|
||||||
|
|
||||||
|
|
||||||
|
class EnhancedBIRCHClusterer:
    """Trajectory clustering using sklearn Birch with a simple K-means fallback.

    Based on the enhanced-BIRCH approach (Yan, Yang et al.):
    1. Resample each trajectory to a fixed-length vector.
    2. Build a BIRCH CF-tree for memory-efficient hierarchical clustering.
    3. Small clusters (< MIN_CLUSTER_SIZE) are relabelled as noise (-1).
    """

    def __init__(
        self,
        threshold: float = BIRCH_THRESHOLD,
        branching: int = BIRCH_BRANCHING,
        n_clusters: Optional[int] = None,
    ) -> None:
        self.threshold = threshold    # BIRCH subcluster radius threshold
        self.branching = branching    # CF-tree branching factor
        self.n_clusters = n_clusters  # None lets Birch choose its own count
        self._model = None            # fitted Birch model, set by fit_predict

    def _traj_to_vector(self, df_vessel: pd.DataFrame, n_points: int = 20) -> np.ndarray:
        """Convert a vessel trajectory DataFrame to a fixed-length vector.

        Linearly samples n_points from the trajectory and interleaves lat/lon
        values, then normalises to zero mean / unit variance.
        """
        lats = df_vessel['lat'].values
        lons = df_vessel['lon'].values
        idx = np.linspace(0, len(lats) - 1, n_points).astype(int)
        vec = np.concatenate([lats[idx], lons[idx]])
        # +1e-9 guards against division by zero for degenerate trajectories.
        vec = (vec - vec.mean()) / (vec.std() + 1e-9)
        return vec

    def fit_predict(self, vessels: Dict[str, pd.DataFrame]) -> Dict[str, int]:
        """Cluster vessel trajectories.

        Args:
            vessels: mapping of mmsi -> resampled trajectory DataFrame.

        Returns:
            Mapping of mmsi -> cluster_id. Vessels in small clusters are
            assigned cluster_id -1 (noise). Vessels with fewer than 20
            points are excluded from the result.
        """
        mmsi_list: list[str] = []
        vectors: list[np.ndarray] = []

        for mmsi, df_v in vessels.items():
            if len(df_v) < 20:
                continue
            mmsi_list.append(mmsi)
            vectors.append(self._traj_to_vector(df_v))

        # Too few trajectories to cluster meaningfully: everyone in cluster 0.
        if len(vectors) < 3:
            return {m: 0 for m in mmsi_list}

        X = np.array(vectors)

        try:
            from sklearn.cluster import Birch
            model = Birch(
                threshold=self.threshold,
                branching_factor=self.branching,
                n_clusters=self.n_clusters,
            )
            labels = model.fit_predict(X)
            self._model = model
        except ImportError:
            # NOTE(review): the fallback uses unseeded np.random, so its
            # labels are nondeterministic across runs — confirm acceptable.
            labels = self._simple_cluster(X)

        # Relabel members of undersized clusters as noise (-1).
        cnt = Counter(labels)
        labels = np.array([lbl if cnt[lbl] >= MIN_CLUSTER_SIZE else -1 for lbl in labels])

        return dict(zip(mmsi_list, labels.tolist()))

    @staticmethod
    def _simple_cluster(X: np.ndarray, k: int = 5) -> np.ndarray:
        """Fallback K-means used when sklearn is unavailable."""
        n = len(X)
        k = min(k, n)
        centers = X[np.random.choice(n, k, replace=False)]
        labels = np.zeros(n, dtype=int)
        # Fixed 20-iteration Lloyd's loop with early exit on convergence.
        for _ in range(20):
            dists = np.array([[np.linalg.norm(x - c) for c in centers] for x in X])
            labels = dists.argmin(axis=1)
            new_centers = np.array(
                [X[labels == i].mean(axis=0) if (labels == i).any() else centers[i] for i in range(k)]
            )
            if np.allclose(centers, new_centers, atol=1e-6):
                break
            centers = new_centers
        return labels
|
||||||
26
prediction/pipeline/constants.py
Normal file
26
prediction/pipeline/constants.py
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
# Behaviour-state SOG thresholds (knots): <1 stationary, 1–5 fishing, >5 sailing.
SOG_STATIONARY_MAX = 1.0
SOG_FISHING_MAX = 5.0
SOG_SAILING_MIN = 5.0

# Per-gear-type speed profiles (knots) and qualitative COG-variance signature.
VESSEL_SOG_PROFILE = {
    'TRAWL': {'min': 1.5, 'max': 4.5, 'mean': 2.8, 'cog_var': 'high'},
    'PURSE': {'min': 2.0, 'max': 5.0, 'mean': 3.5, 'cog_var': 'circular'},
    'LONGLINE': {'min': 0.5, 'max': 3.0, 'mean': 1.8, 'cog_var': 'low'},
    'TRAP': {'min': 0.0, 'max': 2.0, 'mean': 0.8, 'cog_var': 'very_low'},
}

# Trajectory resampling interval, minutes.
RESAMPLE_INTERVAL_MIN = 4

# BIRCH clustering hyper-parameters; clusters below MIN_CLUSTER_SIZE become noise.
BIRCH_THRESHOLD = 0.35
BIRCH_BRANCHING = 50
MIN_CLUSTER_SIZE = 5

# Data-validation limits for preprocessing.
MMSI_DIGITS = 9
MAX_VESSEL_LENGTH = 300
MAX_SOG_KNOTS = 30.0
MIN_TRAJ_POINTS = 100

# Korean area-of-interest bounding box (degrees).
KR_BOUNDS = {
    'lat_min': 32.0, 'lat_max': 39.0,
    'lon_min': 124.0, 'lon_max': 132.0,
}
|
||||||
93
prediction/pipeline/features.py
Normal file
93
prediction/pipeline/features.py
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
import math
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
|
||||||
|
class FeatureExtractor:
    """Extract a per-vessel feature vector for vessel-type classification.

    Core features based on paper 12 (South China Sea fishing-vessel type
    identification):
    - speed statistics (mean, std, quantiles)
    - course variability (COG variance -> turning pattern)
    - fishing ratio and fishing-run duration
    - travelled distance and area coverage
    - stationary-event frequency (estimated set/haul intervals)
    """

    @staticmethod
    def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Great-circle distance between two coordinates (km)."""
        R = 6371.0  # mean Earth radius, km
        phi1, phi2 = math.radians(lat1), math.radians(lat2)
        dphi = math.radians(lat2 - lat1)
        dlam = math.radians(lon2 - lon1)
        a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
        return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    def extract(self, df_vessel: pd.DataFrame) -> Dict[str, float]:
        """Compute the feature dict for one vessel trajectory.

        Expects columns lat, lon, sog, cog, state; returns {} for
        trajectories with fewer than 10 points.
        """
        if len(df_vessel) < 10:
            return {}

        sog = df_vessel['sog'].values
        cog = df_vessel['cog'].values
        states = df_vessel['state'].values

        # Speed features; fishing-state subset falls back to [0] when absent.
        fishing_sog = sog[states == 'FISHING'] if (states == 'FISHING').any() else np.array([0])
        feat: Dict[str, float] = {
            'sog_mean': float(np.mean(sog)),
            'sog_std': float(np.std(sog)),
            'sog_fishing_mean': float(np.mean(fishing_sog)),
            'sog_fishing_std': float(np.std(fishing_sog)),
            'sog_q25': float(np.percentile(sog, 25)),
            'sog_q75': float(np.percentile(sog, 75)),
        }

        # COG features (purse-seine: circular, trawl: straight back-and-forth,
        # longline: smooth curve). unwrap avoids spurious 360-degree jumps.
        cog_diff = np.abs(np.diff(np.unwrap(np.radians(cog))))
        feat['cog_change_mean'] = float(np.mean(cog_diff))
        feat['cog_change_std'] = float(np.std(cog_diff))
        # Fraction of course changes sharper than 45 degrees.
        feat['cog_circularity'] = float(np.sum(cog_diff > np.pi / 4) / len(cog_diff))

        # Behaviour-state ratios.
        n = len(states)
        feat['fishing_pct'] = float((states == 'FISHING').sum() / n)
        feat['stationary_pct'] = float((states == 'STATIONARY').sum() / n)
        feat['sailing_pct'] = float((states == 'SAILING').sum() / n)

        # Stationary events: count transitions into STATIONARY
        # (estimated number of net set/haul operations).
        stationary_events = 0
        prev = None
        for s in states:
            if s == 'STATIONARY' and prev != 'STATIONARY':
                stationary_events += 1
            prev = s
        feat['stationary_events'] = float(stationary_events)

        # Total travelled distance (km) as the sum of point-to-point hops.
        lats = df_vessel['lat'].values
        lons = df_vessel['lon'].values
        total_dist = sum(
            self.haversine(lats[i], lons[i], lats[i + 1], lons[i + 1])
            for i in range(len(lats) - 1)
        )
        feat['total_distance_km'] = round(total_dist, 2)

        # Coverage: bounding-box area approximation, in squared degrees.
        feat['coverage_deg2'] = round(float(np.ptp(lats)) * float(np.ptp(lons)), 4)

        # Average length (in points) of consecutive FISHING runs.
        fishing_runs = []
        run = 0
        for s in states:
            if s == 'FISHING':
                run += 1
            elif run > 0:
                fishing_runs.append(run)
                run = 0
        if run > 0:
            fishing_runs.append(run)
        feat['fishing_run_mean'] = float(np.mean(fishing_runs)) if fishing_runs else 0.0

        return feat
|
||||||
95
prediction/pipeline/orchestrator.py
Normal file
95
prediction/pipeline/orchestrator.py
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from pipeline.preprocessor import AISPreprocessor
|
||||||
|
from pipeline.behavior import BehaviorDetector
|
||||||
|
from pipeline.resampler import TrajectoryResampler
|
||||||
|
from pipeline.features import FeatureExtractor
|
||||||
|
from pipeline.classifier import VesselTypeClassifier, get_season
|
||||||
|
from pipeline.clusterer import EnhancedBIRCHClusterer
|
||||||
|
from pipeline.constants import RESAMPLE_INTERVAL_MIN
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ChineseFishingVesselPipeline:
    """7-step pipeline for classifying Chinese fishing vessel activity types.

    Steps:
    1. AIS preprocessing (Yan et al. 2022)
    2. Behaviour-state detection (speed-based 3-class)
    3. Trajectory resampling (Yan, Yang et al. — 4-minute interval)
    4. Feature vector extraction (paper 12)
    5. Vessel-type classification (rule-based scoring)
    6. Enhanced BIRCH trajectory clustering (Yan, Yang et al.)
    7. Seasonal activity tagging (paper 12)
    """

    def __init__(self) -> None:
        # One component per pipeline step.
        self.preprocessor = AISPreprocessor()
        self.detector = BehaviorDetector()
        self.resampler = TrajectoryResampler(RESAMPLE_INTERVAL_MIN)
        self.extractor = FeatureExtractor()
        self.classifier = VesselTypeClassifier()
        self.clusterer = EnhancedBIRCHClusterer()

    def run(
        self, df_raw: pd.DataFrame
    ) -> tuple[list[dict], dict[str, pd.DataFrame]]:
        """Run the 7-step pipeline.

        Args:
            df_raw: raw AIS DataFrame with columns mmsi, timestamp, lat, lon,
                sog, cog.

        Returns:
            (results, vessel_dfs) where:
            - results is a list of classification dicts, each containing:
              mmsi, vessel_type, confidence, fishing_pct, cluster_id, season,
              n_points, features.
            - vessel_dfs is a mapping of mmsi -> resampled trajectory DataFrame.
        """
        # Step 1: preprocess
        df = self.preprocessor.run(df_raw)
        if len(df) == 0:
            logger.warning('pipeline: no rows after preprocessing')
            return [], {}

        # Step 2: behaviour detection
        df = self.detector.detect(df)

        # Steps 3–5: per-vessel processing
        vessel_dfs: dict[str, pd.DataFrame] = {}
        results: list[dict] = []

        for mmsi, df_v in df.groupby('mmsi'):
            df_resampled = self.resampler.resample(df_v)
            vessel_dfs[mmsi] = df_resampled

            features = self.extractor.extract(df_resampled)
            vtype, confidence = self.classifier.classify(features)
            fishing_pct = BehaviorDetector.compute_fishing_ratio(df_resampled)
            # Season is taken from the trajectory's midpoint timestamp.
            season = get_season(df_v['timestamp'].iloc[len(df_v) // 2])

            results.append({
                'mmsi': mmsi,
                'vessel_type': vtype,
                'confidence': confidence,
                'fishing_pct': fishing_pct,
                'season': season,
                'n_points': len(df_resampled),
                'features': features,
            })

        # Step 6: BIRCH clustering; vessels missing from the map become noise.
        cluster_map = self.clusterer.fit_predict(vessel_dfs)
        for r in results:
            r['cluster_id'] = cluster_map.get(r['mmsi'], -1)

        logger.info(
            'pipeline complete: %d vessels, types=%s',
            len(results),
            {r['vessel_type'] for r in results},
        )
        return results, vessel_dfs
|
||||||
52
prediction/pipeline/preprocessor.py
Normal file
52
prediction/pipeline/preprocessor.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
import pandas as pd
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
from pipeline.constants import KR_BOUNDS, MAX_SOG_KNOTS, MIN_TRAJ_POINTS
|
||||||
|
|
||||||
|
|
||||||
|
class AISPreprocessor:
    """Delete-Supplement-Update (Yan et al. 2022)"""

    def __init__(self):
        # Counters describing how many records each cleaning stage removed/kept.
        self.stats = defaultdict(int)

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean a raw AIS DataFrame: validate, bound, de-noise, deduplicate.

        Raises:
            ValueError: when a required column is missing.
        """
        original = len(df)

        required = ['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog']
        missing = [c for c in required if c not in df.columns]
        if missing:
            raise ValueError(f"필수 컬럼 누락: {missing}")

        df = df.copy()
        df['timestamp'] = pd.to_datetime(df['timestamp'])

        # Keep only well-formed 9-digit MMSI values.
        valid_mmsi = df['mmsi'].astype(str).str.match(r'^\d{9}$')
        df = df[valid_mmsi]
        self.stats['invalid_mmsi'] += original - len(df)

        # Physically valid coordinates only.
        df = df[(df['lat'].between(-90, 90)) & (df['lon'].between(-180, 180))]

        # Restrict to the Korean area of interest.
        df = df[
            df['lat'].between(KR_BOUNDS['lat_min'], KR_BOUNDS['lat_max']) &
            df['lon'].between(KR_BOUNDS['lon_min'], KR_BOUNDS['lon_max'])
        ]

        # Replace out-of-range SOG values with a centred 3-point rolling mean,
        # then drop any that are still out of range.
        df = df.sort_values(['mmsi', 'timestamp'])
        df['sog'] = df.groupby('mmsi')['sog'].transform(
            lambda x: x.where(
                x.between(0, MAX_SOG_KNOTS),
                x.rolling(3, center=True, min_periods=1).mean(),
            )
        )
        df = df[(df['sog'] >= 0) & (df['sog'] <= MAX_SOG_KNOTS)]

        # Drop vessels with too few points to form a usable trajectory.
        counts = df.groupby('mmsi').size()
        valid_mmsi_list = counts[counts >= MIN_TRAJ_POINTS].index
        df = df[df['mmsi'].isin(valid_mmsi_list)]

        df = df.drop_duplicates(subset=['mmsi', 'timestamp'])

        self.stats['final_records'] = len(df)
        self.stats['retention_pct'] = round(len(df) / max(original, 1) * 100, 2)
        return df.reset_index(drop=True)
|
||||||
35
prediction/pipeline/resampler.py
Normal file
35
prediction/pipeline/resampler.py
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
import pandas as pd
|
||||||
|
from pipeline.constants import RESAMPLE_INTERVAL_MIN
|
||||||
|
from pipeline.behavior import BehaviorDetector
|
||||||
|
|
||||||
|
|
||||||
|
class TrajectoryResampler:
    """Interpolate unevenly-spaced AIS fixes onto a uniform time grid.

    Purpose: normalise the input vectors for BIRCH clustering.
    Method: time-based interpolation of lat, lon, SOG, COG.
    Interval: 4 minutes (Shepperson et al. 2017).
    """

    def __init__(self, interval_min: int = RESAMPLE_INTERVAL_MIN):
        self.interval = pd.Timedelta(minutes=interval_min)

    def resample(self, df_vessel: pd.DataFrame) -> pd.DataFrame:
        """Return *df_vessel* resampled onto a fixed-interval time index.

        Trajectories with fewer than 2 points are returned unchanged
        (sorted copy).
        """
        df_vessel = df_vessel.sort_values('timestamp').copy()
        if len(df_vessel) < 2:
            return df_vessel

        t_start = df_vessel['timestamp'].iloc[0]
        t_end = df_vessel['timestamp'].iloc[-1]
        new_times = pd.date_range(t_start, t_end, freq=self.interval)

        # Union the target grid with the real fixes so interpolation can use
        # them, then keep only the grid timestamps.
        df_vessel = df_vessel.set_index('timestamp')
        df_vessel = df_vessel.reindex(df_vessel.index.union(new_times))
        for col in ['lat', 'lon', 'sog', 'cog']:
            if col in df_vessel.columns:
                df_vessel[col] = df_vessel[col].interpolate(method='time')

        df_vessel = df_vessel.loc[new_times].reset_index()
        df_vessel.rename(columns={'index': 'timestamp'}, inplace=True)
        # Re-derive the behaviour state from the interpolated SOG.
        df_vessel['state'] = df_vessel['sog'].apply(BehaviorDetector.classify_point)
        return df_vessel
|
||||||
@ -1,2 +1,8 @@
|
|||||||
fastapi==0.115.0
|
fastapi==0.115.0
|
||||||
uvicorn==0.30.6
|
uvicorn==0.30.6
|
||||||
|
pydantic-settings>=2.0
|
||||||
|
psycopg2-binary>=2.9
|
||||||
|
numpy>=1.26
|
||||||
|
pandas>=2.2
|
||||||
|
scikit-learn>=1.5
|
||||||
|
apscheduler>=3.10
|
||||||
|
|||||||
177
prediction/scheduler.py
Normal file
177
prediction/scheduler.py
Normal file
@ -0,0 +1,177 @@
|
|||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||||||
|
|
||||||
|
from config import settings
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Singleton background scheduler; created by start_scheduler, cleared by stop_scheduler.
_scheduler: Optional[BackgroundScheduler] = None

# Status snapshot of the most recent analysis cycle, exposed via get_last_run().
_last_run: dict = {
    'timestamp': None,      # ISO-8601 UTC time the last cycle started
    'duration_sec': 0,
    'vessel_count': 0,
    'upserted': 0,
    'error': None,          # stringified exception from the last failed cycle
}
|
||||||
|
|
||||||
|
|
||||||
|
def get_last_run() -> dict:
    """Return a shallow copy of the last-cycle status snapshot."""
    return dict(_last_run)
|
||||||
|
|
||||||
|
|
||||||
|
def run_analysis_cycle():
    """5-minute analysis cycle driven by the in-memory cache.

    Incrementally refreshes the vessel cache, runs the 7-step classification
    pipeline, applies the per-vessel algorithms (zone, UCAF/UCFT, dark
    vessel, spoofing, fleet, risk), and upserts AnalysisResult rows.
    All failures are caught, recorded in _last_run['error'] and logged —
    the scheduler keeps running.
    """
    # Imported lazily to avoid import cycles at module load.
    from cache.vessel_store import vessel_store
    from db import snpdb, kcgdb
    from pipeline.orchestrator import ChineseFishingVesselPipeline
    from algorithms.location import classify_zone
    from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score
    from algorithms.dark_vessel import is_dark_vessel
    from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset
    from algorithms.fleet import assign_fleet_roles
    from algorithms.risk import compute_vessel_risk_score
    from models.result import AnalysisResult

    start = time.time()
    _last_run['timestamp'] = datetime.now(timezone.utc).isoformat()
    _last_run['error'] = None

    try:
        # 1. Incremental load + stale eviction.
        if vessel_store.last_bucket is None:
            logger.warning('last_bucket is None, skipping incremental fetch (initial load not complete)')
            df_new = None
        else:
            df_new = snpdb.fetch_incremental(vessel_store.last_bucket)
        if df_new is not None and len(df_new) > 0:
            vessel_store.merge_incremental(df_new)
        vessel_store.evict_stale(settings.CACHE_WINDOW_HOURS)

        # Periodic refresh of static vessel info / permitted-vessel registry.
        vessel_store.refresh_static_info()
        vessel_store.refresh_permit_registry()

        # 2. Select analysis targets (includes SOG/COG computation).
        df_targets = vessel_store.select_analysis_targets()
        if len(df_targets) == 0:
            logger.info('no analysis targets, skipping cycle')
            _last_run['vessel_count'] = 0
            return

        # 3. Run the 7-step classification pipeline.
        pipeline = ChineseFishingVesselPipeline()
        classifications, vessel_dfs = pipeline.run(df_targets)

        if not classifications:
            logger.info('no vessels classified, skipping')
            _last_run['vessel_count'] = 0
            return

        # 4. Fleet role analysis over the cluster assignments.
        cluster_map = {c['mmsi']: c['cluster_id'] for c in classifications}
        fleet_roles = assign_fleet_roles(vessel_dfs, cluster_map)

        # 5. Per-vessel algorithms -> AnalysisResult rows.
        results = []
        for c in classifications:
            mmsi = c['mmsi']
            df_v = vessel_dfs.get(mmsi)
            if df_v is None or len(df_v) == 0:
                continue

            # Zone and timestamp come from the most recent position.
            last_row = df_v.iloc[-1]
            ts = last_row.get('timestamp')

            zone_info = classify_zone(last_row['lat'], last_row['lon'])

            # Map classified vessel type to the gear code used by UCAF.
            gear_map = {'TRAWL': 'OT', 'PURSE': 'PS', 'LONGLINE': 'GN', 'TRAP': 'TRAP'}
            gear = gear_map.get(c['vessel_type'], 'OT')
            ucaf = compute_ucaf_score(df_v, gear)
            ucft = compute_ucft_score(df_v)

            dark, gap_min = is_dark_vessel(df_v)

            spoof_score = compute_spoofing_score(df_v)
            speed_jumps = count_speed_jumps(df_v)
            bd09_offset = compute_bd09_offset(last_row['lat'], last_row['lon'])

            fleet_info = fleet_roles.get(mmsi, {})

            is_permitted = vessel_store.is_permitted(mmsi)
            risk_score, risk_level = compute_vessel_risk_score(
                mmsi, df_v, zone_info, is_permitted=is_permitted,
            )

            # Dominant behaviour state over the trajectory.
            activity = 'UNKNOWN'
            if 'state' in df_v.columns and len(df_v) > 0:
                activity = df_v['state'].mode().iloc[0]

            results.append(AnalysisResult(
                mmsi=mmsi,
                timestamp=ts,
                vessel_type=c['vessel_type'],
                confidence=c['confidence'],
                fishing_pct=c['fishing_pct'],
                cluster_id=c['cluster_id'],
                season=c['season'],
                zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
                dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
                activity_state=activity,
                ucaf_score=ucaf,
                ucft_score=ucft,
                is_dark=dark,
                gap_duration_min=gap_min,
                spoofing_score=spoof_score,
                bd09_offset_m=bd09_offset,
                speed_jump_count=speed_jumps,
                cluster_size=fleet_info.get('cluster_size', 0),
                is_leader=fleet_info.get('is_leader', False),
                fleet_role=fleet_info.get('fleet_role', 'NOISE'),
                risk_score=risk_score,
                risk_level=risk_level,
                features=c.get('features', {}),
            ))

        # 6. Persist results and prune anything older than 48 hours.
        upserted = kcgdb.upsert_results(results)
        kcgdb.cleanup_old(hours=48)

        elapsed = round(time.time() - start, 2)
        _last_run['duration_sec'] = elapsed
        _last_run['vessel_count'] = len(results)
        _last_run['upserted'] = upserted
        logger.info(
            'analysis cycle: %d vessels, %d upserted, %.2fs',
            len(results), upserted, elapsed,
        )

    except Exception as e:
        # Record the failure but keep the scheduler alive for the next cycle.
        _last_run['error'] = str(e)
        logger.exception('analysis cycle failed: %s', e)
|
||||||
|
|
||||||
|
|
||||||
|
def start_scheduler():
    """Start the background scheduler running run_analysis_cycle periodically.

    max_instances=1 prevents overlapping cycles when a run exceeds the
    configured interval; replace_existing makes restarts idempotent.
    """
    global _scheduler
    _scheduler = BackgroundScheduler()
    _scheduler.add_job(
        run_analysis_cycle,
        'interval',
        minutes=settings.SCHEDULER_INTERVAL_MIN,
        id='vessel_analysis',
        max_instances=1,
        replace_existing=True,
    )
    _scheduler.start()
    logger.info('scheduler started (interval=%dm)', settings.SCHEDULER_INTERVAL_MIN)
|
||||||
|
|
||||||
|
|
||||||
|
def stop_scheduler():
    """Shut down the background scheduler if one is running; no-op otherwise."""
    global _scheduler
    if not _scheduler:
        return
    _scheduler.shutdown(wait=False)
    _scheduler = None
    logger.info('scheduler stopped')
|
||||||
불러오는 중...
Reference in New Issue
Block a user