feat: Python 어선 분류기 + 배포 설정 + 백엔드 모니터링 프록시 #87

닫힘
htlee "feature/korea-layers-enhancement 에서 develop 로 4 commits 를 머지하려 합니다"
50개의 변경된 파일3753개의 추가작업 그리고 36개의 파일을 삭제

파일 보기

@ -68,6 +68,7 @@ jobs:
[ -n "$GOOGLE_CLIENT_ID" ] && echo "GOOGLE_CLIENT_ID=${GOOGLE_CLIENT_ID}" >> $DEPLOY_DIR/.env
[ -n "$JWT_SECRET" ] && echo "JWT_SECRET=${JWT_SECRET}" >> $DEPLOY_DIR/.env
[ -n "$DB_PASSWORD" ] && echo "DB_PASSWORD=${DB_PASSWORD}" >> $DEPLOY_DIR/.env
echo "PREDICTION_BASE_URL=http://192.168.1.18:8001" >> $DEPLOY_DIR/.env
# JAR 내부에 application-prod.yml이 있으면 외부 파일 제거
if unzip -l backend/target/kcg.jar | grep -q 'application-prod.yml$'; then
@ -143,6 +144,79 @@ jobs:
sleep 10
done
# ═══ Prediction (FastAPI → redis-211) ═══
- name: Deploy prediction via SSH
env:
DEPLOY_KEY: ${{ secrets.DEPLOY_SSH_KEY }}
PRED_HOST: 192.168.1.18
PRED_PORT: 32023
run: |
mkdir -p ~/.ssh
printf '%s\n' "$DEPLOY_KEY" > ~/.ssh/id_deploy
chmod 600 ~/.ssh/id_deploy
SSH_OPTS="-i ~/.ssh/id_deploy -o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=15 -p $PRED_PORT"
SCP_OPTS="-i ~/.ssh/id_deploy -o StrictHostKeyChecking=no -P $PRED_PORT"
REMOTE_DIR=/home/apps/kcg-prediction
# 코드 전송 (rsync 대체: tar + scp)
tar czf /tmp/prediction.tar.gz -C prediction --exclude='__pycache__' --exclude='venv' --exclude='.env' .
for attempt in 1 2 3; do
echo "SCP prediction attempt $attempt/3..."
if scp $SCP_OPTS /tmp/prediction.tar.gz root@$PRED_HOST:/tmp/prediction.tar.gz; then break; fi
[ "$attempt" -eq 3 ] && { echo "ERROR: SCP failed"; exit 1; }
sleep 10
done
# systemd 서비스 파일 전송
scp $SCP_OPTS deploy/kcg-prediction.service root@$PRED_HOST:/tmp/kcg-prediction.service
# 원격 설치 + 재시작
for attempt in 1 2 3; do
echo "SSH deploy attempt $attempt/3..."
if ssh $SSH_OPTS root@$PRED_HOST bash -s << 'SCRIPT'
set -e
REMOTE_DIR=/home/apps/kcg-prediction
mkdir -p $REMOTE_DIR
cd $REMOTE_DIR
# 코드 배포
tar xzf /tmp/prediction.tar.gz -C $REMOTE_DIR
rm -f /tmp/prediction.tar.gz
# venv + 의존성
python3 -m venv venv 2>/dev/null || true
venv/bin/pip install -r requirements.txt -q
# systemd 서비스 갱신
if ! diff -q /tmp/kcg-prediction.service /etc/systemd/system/kcg-prediction.service >/dev/null 2>&1; then
cp /tmp/kcg-prediction.service /etc/systemd/system/kcg-prediction.service
systemctl daemon-reload
systemctl enable kcg-prediction
fi
rm -f /tmp/kcg-prediction.service
# 재시작
systemctl restart kcg-prediction
# health 확인 (30초)
for i in $(seq 1 6); do
if curl -sf http://localhost:8001/health > /dev/null 2>&1; then
echo "Prediction healthy (${i})"
exit 0
fi
sleep 5
done
echo "WARNING: Prediction health timeout"
journalctl -u kcg-prediction --no-pager -n 10
exit 1
SCRIPT
then exit 0; fi
[ "$attempt" -eq 3 ] && { echo "ERROR: SSH failed"; exit 1; }
sleep 10
done
- name: Cleanup
if: always()
run: rm -f ~/.ssh/id_deploy

파일 보기

@ -23,6 +23,8 @@ public class AuthFilter extends OncePerRequestFilter {
private static final String AUTH_PATH_PREFIX = "/api/auth/";
private static final String SENSOR_PATH_PREFIX = "/api/sensor/";
private static final String CCTV_PATH_PREFIX = "/api/cctv/";
private static final String VESSEL_ANALYSIS_PATH_PREFIX = "/api/vessel-analysis";
private static final String PREDICTION_PATH_PREFIX = "/api/prediction/";
private final JwtProvider jwtProvider;
@ -31,7 +33,9 @@ public class AuthFilter extends OncePerRequestFilter {
String path = request.getRequestURI();
return path.startsWith(AUTH_PATH_PREFIX)
|| path.startsWith(SENSOR_PATH_PREFIX)
|| path.startsWith(CCTV_PATH_PREFIX);
|| path.startsWith(CCTV_PATH_PREFIX)
|| path.startsWith(VESSEL_ANALYSIS_PATH_PREFIX)
|| path.startsWith(PREDICTION_PATH_PREFIX);
}
@Override

파일 보기

@ -43,6 +43,7 @@ public class AppProperties {
private String gdeltBaseUrl = "https://api.gdeltproject.org/api/v2/doc/doc";
private String googleNewsBaseUrl = "https://news.google.com/rss/search";
private String celestrakBaseUrl = "https://celestrak.org/NORAD/elements/gp.php";
private String predictionBaseUrl = "http://localhost:8001";
private int requestDelayMs = 1500;
private int backoffMs = 5000;
}

파일 보기

@ -20,6 +20,7 @@ public class CacheConfig {
public static final String SATELLITES = "satellites";
public static final String SEISMIC = "seismic";
public static final String PRESSURE = "pressure";
public static final String VESSEL_ANALYSIS = "vessel-analysis";
@Bean
public CacheManager cacheManager() {
@ -27,7 +28,8 @@ public class CacheConfig {
AIRCRAFT_IRAN, AIRCRAFT_KOREA,
OSINT_IRAN, OSINT_KOREA,
SATELLITES,
SEISMIC, PRESSURE
SEISMIC, PRESSURE,
VESSEL_ANALYSIS
);
manager.setCaffeine(Caffeine.newBuilder()
.expireAfterWrite(2, TimeUnit.DAYS)

파일 보기

@ -0,0 +1,80 @@
package gc.mda.kcg.domain.analysis;
import gc.mda.kcg.config.AppProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.*;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.util.Map;
/**
* Prediction 서비스(FastAPI) 상태/트리거 프록시
* GET /api/prediction/health {predictionBaseUrl}/health
* GET /api/prediction/status {predictionBaseUrl}/api/v1/analysis/status
* POST /api/prediction/trigger {predictionBaseUrl}/api/v1/analysis/trigger
*/
@Slf4j
@RestController
@RequestMapping("/api/prediction")
@RequiredArgsConstructor
public class PredictionProxyController {
private final RestTemplate restTemplate;
private final AppProperties appProperties;
@GetMapping("/health")
public ResponseEntity<Object> health() {
String url = appProperties.getCollector().getPredictionBaseUrl() + "/health";
return proxyGet(url);
}
@GetMapping("/status")
public ResponseEntity<Object> status() {
String url = appProperties.getCollector().getPredictionBaseUrl() + "/api/v1/analysis/status";
return proxyGet(url);
}
@PostMapping("/trigger")
public ResponseEntity<Object> trigger(@RequestBody(required = false) Object body) {
String url = appProperties.getCollector().getPredictionBaseUrl() + "/api/v1/analysis/trigger";
return proxyPost(url, body);
}
private ResponseEntity<Object> proxyGet(String url) {
try {
ResponseEntity<Object> upstream = restTemplate.exchange(
url, HttpMethod.GET, null, Object.class);
return ResponseEntity.status(upstream.getStatusCode()).body(upstream.getBody());
} catch (ResourceAccessException e) {
log.warn("Prediction 서비스 접속 불가: {}", e.getMessage());
return unreachable(e.getMessage());
} catch (RestClientException e) {
log.warn("Prediction 서비스 오류: {}", e.getMessage());
return unreachable(e.getMessage());
}
}
private ResponseEntity<Object> proxyPost(String url, Object body) {
try {
HttpEntity<Object> entity = new HttpEntity<>(body);
ResponseEntity<Object> upstream = restTemplate.exchange(
url, HttpMethod.POST, entity, Object.class);
return ResponseEntity.status(upstream.getStatusCode()).body(upstream.getBody());
} catch (ResourceAccessException e) {
log.warn("Prediction 서비스 접속 불가: {}", e.getMessage());
return unreachable(e.getMessage());
} catch (RestClientException e) {
log.warn("Prediction 서비스 오류: {}", e.getMessage());
return unreachable(e.getMessage());
}
}
private ResponseEntity<Object> unreachable(String message) {
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(Map.of("status", "unreachable", "error", message != null ? message : "connection failed"));
}
}

파일 보기

@ -0,0 +1,35 @@
package gc.mda.kcg.domain.analysis;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/api/vessel-analysis")
@RequiredArgsConstructor
public class VesselAnalysisController {
private final VesselAnalysisService vesselAnalysisService;
/**
* 최근 선박 분석 결과 조회
* @param region 지역 필터 (향후 확장용, 현재 미사용)
*/
@GetMapping
public ResponseEntity<Map<String, Object>> getVesselAnalysis(
@RequestParam(required = false) String region) {
List<VesselAnalysisDto> results = vesselAnalysisService.getLatestResults();
return ResponseEntity.ok(Map.of(
"count", results.size(),
"items", results
));
}
}

파일 보기

@ -0,0 +1,148 @@
package gc.mda.kcg.domain.analysis;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Builder;
import lombok.Getter;
import java.util.Map;
@Getter
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public class VesselAnalysisDto {
private String mmsi;
private String timestamp;
private Classification classification;
private Algorithms algorithms;
private Map<String, Double> features;
@Getter
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class Classification {
private String vesselType;
private Double confidence;
private Double fishingPct;
private Integer clusterId;
private String season;
}
@Getter
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class Algorithms {
private LocationInfo location;
private ActivityInfo activity;
private DarkVesselInfo darkVessel;
private GpsSpoofingInfo gpsSpoofing;
private ClusterInfo cluster;
private FleetRoleInfo fleetRole;
private RiskScoreInfo riskScore;
}
@Getter
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class LocationInfo {
private String zone;
private Double distToBaselineNm;
}
@Getter
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class ActivityInfo {
private String state;
private Double ucafScore;
private Double ucftScore;
}
@Getter
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class DarkVesselInfo {
private Boolean isDark;
private Integer gapDurationMin;
}
@Getter
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class GpsSpoofingInfo {
private Double spoofingScore;
private Double bd09OffsetM;
private Integer speedJumpCount;
}
@Getter
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class ClusterInfo {
private Integer clusterId;
private Integer clusterSize;
}
@Getter
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class FleetRoleInfo {
private Boolean isLeader;
private String role;
}
@Getter
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class RiskScoreInfo {
private Integer score;
private String level;
}
public static VesselAnalysisDto from(VesselAnalysisResult r) {
return VesselAnalysisDto.builder()
.mmsi(r.getMmsi())
.timestamp(r.getTimestamp().toString())
.classification(Classification.builder()
.vesselType(r.getVesselType())
.confidence(r.getConfidence())
.fishingPct(r.getFishingPct())
.clusterId(r.getClusterId())
.season(r.getSeason())
.build())
.algorithms(Algorithms.builder()
.location(LocationInfo.builder()
.zone(r.getZone())
.distToBaselineNm(r.getDistToBaselineNm())
.build())
.activity(ActivityInfo.builder()
.state(r.getActivityState())
.ucafScore(r.getUcafScore())
.ucftScore(r.getUcftScore())
.build())
.darkVessel(DarkVesselInfo.builder()
.isDark(r.getIsDark())
.gapDurationMin(r.getGapDurationMin())
.build())
.gpsSpoofing(GpsSpoofingInfo.builder()
.spoofingScore(r.getSpoofingScore())
.bd09OffsetM(r.getBd09OffsetM())
.speedJumpCount(r.getSpeedJumpCount())
.build())
.cluster(ClusterInfo.builder()
.clusterId(r.getClusterId())
.clusterSize(r.getClusterSize())
.build())
.fleetRole(FleetRoleInfo.builder()
.isLeader(r.getIsLeader())
.role(r.getFleetRole())
.build())
.riskScore(RiskScoreInfo.builder()
.score(r.getRiskScore())
.level(r.getRiskLevel())
.build())
.build())
.features(r.getFeatures())
.build();
}
}

파일 보기

@ -0,0 +1,97 @@
package gc.mda.kcg.domain.analysis;
import jakarta.persistence.*;
import lombok.*;
import org.hibernate.annotations.JdbcTypeCode;
import org.hibernate.type.SqlTypes;
import java.time.Instant;
import java.util.Map;
@Entity
@Table(name = "vessel_analysis_results", schema = "kcg")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class VesselAnalysisResult {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, length = 15)
private String mmsi;
@Column(nullable = false)
private Instant timestamp;
@Column(length = 20)
private String vesselType;
private Double confidence;
private Double fishingPct;
private Integer clusterId;
@Column(length = 10)
private String season;
@Column(length = 20)
private String zone;
private Double distToBaselineNm;
@Column(length = 20)
private String activityState;
private Double ucafScore;
private Double ucftScore;
@Column(nullable = false)
private Boolean isDark;
private Integer gapDurationMin;
private Double spoofingScore;
private Double bd09OffsetM;
private Integer speedJumpCount;
private Integer clusterSize;
@Column(nullable = false)
private Boolean isLeader;
@Column(length = 20)
private String fleetRole;
private Integer riskScore;
@Column(length = 20)
private String riskLevel;
@JdbcTypeCode(SqlTypes.JSON)
@Column(columnDefinition = "jsonb")
private Map<String, Double> features;
@Column(nullable = false)
private Instant analyzedAt;
@PrePersist
protected void onCreate() {
if (analyzedAt == null) {
analyzedAt = Instant.now();
}
if (isDark == null) {
isDark = false;
}
if (isLeader == null) {
isLeader = false;
}
}
}

파일 보기

@ -0,0 +1,11 @@
package gc.mda.kcg.domain.analysis;
import org.springframework.data.jpa.repository.JpaRepository;
import java.time.Instant;
import java.util.List;
public interface VesselAnalysisResultRepository extends JpaRepository<VesselAnalysisResult, Long> {
List<VesselAnalysisResult> findByTimestampAfter(Instant since);
}

파일 보기

@ -0,0 +1,47 @@
package gc.mda.kcg.domain.analysis;
import gc.mda.kcg.config.CacheConfig;
import lombok.RequiredArgsConstructor;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
@Service
@RequiredArgsConstructor
public class VesselAnalysisService {
private static final int RECENT_MINUTES = 10;
private final VesselAnalysisResultRepository repository;
private final CacheManager cacheManager;
/**
* 최근 10분 분석 결과를 반환한다. Caffeine 캐시(TTL 5분) 적용.
*/
@SuppressWarnings("unchecked")
public List<VesselAnalysisDto> getLatestResults() {
Cache cache = cacheManager.getCache(CacheConfig.VESSEL_ANALYSIS);
if (cache != null) {
Cache.ValueWrapper wrapper = cache.get("data");
if (wrapper != null) {
return (List<VesselAnalysisDto>) wrapper.get();
}
}
Instant since = Instant.now().minus(RECENT_MINUTES, ChronoUnit.MINUTES);
List<VesselAnalysisDto> results = repository.findByTimestampAfter(since)
.stream()
.map(VesselAnalysisDto::from)
.toList();
if (cache != null) {
cache.put("data", results);
}
return results;
}
}

파일 보기

@ -11,5 +11,7 @@ app:
client-id: ${GOOGLE_CLIENT_ID}
auth:
allowed-domain: ${AUTH_ALLOWED_DOMAIN:gcsc.co.kr}
collector:
prediction-base-url: ${PREDICTION_BASE_URL:http://192.168.1.18:8001}
cors:
allowed-origins: http://localhost:5173,https://kcg.gc-si.dev

파일 보기

@ -0,0 +1,30 @@
CREATE TABLE IF NOT EXISTS kcg.vessel_analysis_results (
id BIGSERIAL PRIMARY KEY,
mmsi VARCHAR(15) NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
vessel_type VARCHAR(20),
confidence DOUBLE PRECISION,
fishing_pct DOUBLE PRECISION,
cluster_id INTEGER,
season VARCHAR(10),
zone VARCHAR(20),
dist_to_baseline_nm DOUBLE PRECISION,
activity_state VARCHAR(20),
ucaf_score DOUBLE PRECISION,
ucft_score DOUBLE PRECISION,
is_dark BOOLEAN DEFAULT FALSE,
gap_duration_min INTEGER,
spoofing_score DOUBLE PRECISION,
bd09_offset_m DOUBLE PRECISION,
speed_jump_count INTEGER,
cluster_size INTEGER,
is_leader BOOLEAN DEFAULT FALSE,
fleet_role VARCHAR(20),
risk_score INTEGER,
risk_level VARCHAR(20),
features JSONB,
analyzed_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_vessel_analysis_mmsi ON kcg.vessel_analysis_results(mmsi);
CREATE INDEX IF NOT EXISTS idx_vessel_analysis_timestamp ON kcg.vessel_analysis_results(timestamp DESC);

파일 보기

@ -0,0 +1,3 @@
-- UPSERT를 위한 UNIQUE 인덱스 추가
CREATE UNIQUE INDEX IF NOT EXISTS idx_vessel_analysis_mmsi_ts
ON kcg.vessel_analysis_results(mmsi, timestamp);

파일 보기

@ -0,0 +1,15 @@
[Unit]
Description=KCG Vessel Analysis (FastAPI)
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/home/apps/kcg-prediction
ExecStart=/home/apps/kcg-prediction/venv/bin/uvicorn main:app --host 0.0.0.0 --port 8001
EnvironmentFile=/home/apps/kcg-prediction/.env
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target

파일 보기

@ -1881,6 +1881,11 @@
border-top-color: rgba(10, 10, 26, 0.96) !important;
}
/* 중국어선 오버레이 마커 — 이벤트 차단 */
.maplibregl-marker:has(.cn-fishing-no-events) {
pointer-events: none;
}
.gl-popup .maplibregl-popup-close-button,
.event-popup .maplibregl-popup-close-button {
color: #aaa !important;

파일 보기

@ -21,6 +21,7 @@ import { useAuth } from './hooks/useAuth';
import { useTranslation } from 'react-i18next';
import LoginPage from './components/auth/LoginPage';
import CollectorMonitor from './components/common/CollectorMonitor';
import { FieldAnalysisModal } from './components/korea/FieldAnalysisModal';
import './App.css';
function App() {
@ -147,6 +148,7 @@ function AuthenticatedApp({ user, onLogout }: AuthenticatedAppProps) {
}, []);
const [showCollectorMonitor, setShowCollectorMonitor] = useState(false);
const [showFieldAnalysis, setShowFieldAnalysis] = useState(false);
const [timeZone, setTimeZone] = useState<'KST' | 'UTC'>('KST');
const [hoveredShipMmsi, setHoveredShipMmsi] = useState<string | null>(null);
const [focusShipMmsi, setFocusShipMmsi] = useState<string | null>(null);
@ -253,15 +255,6 @@ function AuthenticatedApp({ user, onLogout }: AuthenticatedAppProps) {
{dashboardTab === 'korea' && (
<div className="mode-toggle">
<button
type="button"
className={`mode-btn ${koreaFiltersResult.filters.illegalFishing ? 'active live' : ''}`}
onClick={() => koreaFiltersResult.setFilter('illegalFishing', !koreaFiltersResult.filters.illegalFishing)}
title={t('filters.illegalFishing')}
>
<span className="text-[11px]">🚫🐟</span>
{t('filters.illegalFishing')}
</button>
<button
type="button"
className={`mode-btn ${koreaFiltersResult.filters.illegalTransship ? 'active live' : ''}`}
@ -316,6 +309,15 @@ function AuthenticatedApp({ user, onLogout }: AuthenticatedAppProps) {
<span className="text-[11px]">🎣</span>
</button>
<button
type="button"
className={`mode-btn ${showFieldAnalysis ? 'active' : ''}`}
onClick={() => setShowFieldAnalysis(v => !v)}
title="현장분석"
>
<span className="text-[11px]">📊</span>
</button>
</div>
)}
@ -548,6 +550,9 @@ function AuthenticatedApp({ user, onLogout }: AuthenticatedAppProps) {
<>
<main className="app-main">
<div className="map-panel">
{showFieldAnalysis && (
<FieldAnalysisModal ships={koreaData.ships} onClose={() => setShowFieldAnalysis(false)} />
)}
<KoreaMap
ships={koreaFiltersResult.filteredShips}
aircraft={koreaData.visibleAircraft}

파일 보기

@ -194,32 +194,33 @@ export function ChineseFishingOverlay({ ships }: Props) {
</Source>
)}
{/* 어구/어망 위치 마커 (모선 연결된 것) */}
{gearLinks.map(link => (
{/* 어구/어망 위치 마커 (모선 연결된 것) — 최대 50개 */}
{gearLinks.slice(0, 50).map(link => (
<Marker key={`gearlink-${link.gear.mmsi}`} longitude={link.gear.lng} latitude={link.gear.lat} anchor="center">
<div style={{ filter: 'drop-shadow(0 0 3px #f9731688)', pointerEvents: 'none' }}>
<div className="cn-fishing-no-events">
<div style={{ filter: 'drop-shadow(0 0 3px #f9731688)' }}>
<FishingNetIcon color="#f97316" size={10} />
</div>
<div style={{
fontSize: 5, color: '#f97316', textAlign: 'center',
textShadow: '0 0 2px #000', fontWeight: 700, marginTop: -1,
whiteSpace: 'nowrap', pointerEvents: 'none',
whiteSpace: 'nowrap',
}}>
{link.parentName}
</div>
</div>
</Marker>
))}
{/* 조업 중 어선 — 어구 아이콘 */}
{operating.map(({ ship, analysis }) => {
{/* 조업 중 어선 — 어구 아이콘 — 최대 80개 */}
{operating.slice(0, 80).map(({ ship, analysis }) => {
const meta = GEAR_LABELS[analysis.gearType];
return (
<Marker key={`gear-${ship.mmsi}`} longitude={ship.lng} latitude={ship.lat} anchor="bottom">
<div style={{
<div className="cn-fishing-no-events" style={{
marginBottom: 8,
filter: `drop-shadow(0 0 3px ${meta?.color || '#f97316'}88)`,
opacity: 0.85,
pointerEvents: 'none',
}}>
<GearIcon gear={analysis.gearType} size={12} />
</div>
@ -227,10 +228,10 @@ export function ChineseFishingOverlay({ ships }: Props) {
);
})}
{/* 본선/부속선/어선 역할 라벨 */}
{analyzed.filter(a => a.role.role).map(({ ship, role }) => (
{/* 본선/부속선/어선 역할 라벨 — 최대 100개 */}
{analyzed.filter(a => a.role.role).slice(0, 100).map(({ ship, role }) => (
<Marker key={`role-${ship.mmsi}`} longitude={ship.lng} latitude={ship.lat} anchor="top">
<div style={{
<div className="cn-fishing-no-events" style={{
marginTop: 6,
fontSize: 5,
fontWeight: 700,
@ -238,7 +239,6 @@ export function ChineseFishingOverlay({ ships }: Props) {
textShadow: '0 0 2px #000, 0 0 2px #000',
textAlign: 'center',
whiteSpace: 'nowrap',
pointerEvents: 'none',
}}>
{role.roleKo}
</div>
@ -248,7 +248,7 @@ export function ChineseFishingOverlay({ ships }: Props) {
{/* 운반선 라벨 */}
{carriers.map(s => (
<Marker key={`carrier-${s.mmsi}`} longitude={s.lng} latitude={s.lat} anchor="top">
<div style={{
<div className="cn-fishing-no-events" style={{
marginTop: 6,
fontSize: 5,
fontWeight: 700,
@ -256,7 +256,6 @@ export function ChineseFishingOverlay({ ships }: Props) {
textShadow: '0 0 2px #000, 0 0 2px #000',
textAlign: 'center',
whiteSpace: 'nowrap',
pointerEvents: 'none',
}}>
</div>

파일 보기

@ -0,0 +1,854 @@
import { useState, useMemo, useEffect, useCallback } from 'react';
import type { Ship, ChnPrmShipInfo } from '../../types';
import { analyzeFishing } from '../../utils/fishingAnalysis';
import { getMarineTrafficCategory } from '../../utils/marineTraffic';
import { lookupPermittedShip } from '../../services/chnPrmShip';
// MarineTraffic 사진 캐시 (null = 없음, undefined = 미조회)
const mtPhotoCache = new Map<string, string | null>();
async function loadMarineTrafficPhoto(mmsi: string): Promise<string | null> {
if (mtPhotoCache.has(mmsi)) return mtPhotoCache.get(mmsi) ?? null;
return new Promise(resolve => {
const url = `https://photos.marinetraffic.com/ais/showphoto.aspx?mmsi=${mmsi}&size=thumb300`;
const img = new Image();
img.onload = () => { mtPhotoCache.set(mmsi, url); resolve(url); };
img.onerror = () => { mtPhotoCache.set(mmsi, null); resolve(null); };
img.src = url;
});
}
// S&P Global 이미지 캐시
const spgCache = new Map<string, string | null>();
async function loadSpgPhoto(imo: string, shipImagePath: string): Promise<string | null> {
if (spgCache.has(imo)) return spgCache.get(imo) ?? null;
try {
const res = await fetch(`/signal-batch/api/v1/shipimg/${imo}`);
if (!res.ok) throw new Error();
const data: Array<{ picId: number; path: string }> = await res.json();
const url = data.length > 0 ? `${data[0].path}_2.jpg` : `${shipImagePath.replace(/_[12]\.\w+$/, '')}_2.jpg`;
spgCache.set(imo, url);
return url;
} catch {
const fallback = `${shipImagePath.replace(/_[12]\.\w+$/, '')}_2.jpg`;
spgCache.set(imo, fallback);
return fallback;
}
}
// ── 항상 다크 테마 색상 팔레트
const C = {
bg: '#07101A',
bg2: '#0C1825',
bg3: '#112033',
panel: '#040C14',
green: '#00E676',
cyan: '#18FFFF',
amber: '#FFD740',
red: '#FF5252',
purple: '#E040FB',
ink: '#CFE2F3',
ink2: '#7EA8C4',
ink3: '#3D6480',
border: '#1A3350',
border2: '#0E2035',
} as const;
// 황해 위치 기반 수역 분류 (근사값)
function classifyZone(lng: number): string {
if (lng > 124.8) return 'TERRITORIAL';
if (lng > 124.2) return 'CONTIGUOUS';
if (lng > 121.5) return 'EEZ';
return 'BEYOND';
}
// AIS 수신 기준 선박 상태 분류
function classifyState(ship: Ship): string {
const ageMins = (Date.now() - ship.lastSeen) / 60000;
if (ageMins > 20) return 'AIS_LOSS';
if (ship.speed <= 0.5) return 'STATIONARY';
if (ship.speed >= 5.0) return 'SAILING';
return 'FISHING';
}
function getAlertLevel(zone: string, state: string): 'CRITICAL' | 'WATCH' | 'MONITOR' | 'NORMAL' {
if (zone === 'TERRITORIAL') return 'CRITICAL';
if (state === 'AIS_LOSS') return 'WATCH';
if (zone === 'CONTIGUOUS' && state === 'FISHING') return 'WATCH';
if (zone === 'EEZ' && state === 'FISHING') return 'MONITOR';
return 'NORMAL';
}
function stateLabel(s: string): string {
const map: Record<string, string> = {
FISHING: '조업중', SAILING: '항행중', STATIONARY: '정박', AIS_LOSS: 'AIS소실',
};
return map[s] ?? s;
}
function zoneLabel(z: string): string {
const map: Record<string, string> = {
TERRITORIAL: '영해(침범!)', CONTIGUOUS: '접속수역', EEZ: 'EEZ', BEYOND: 'EEZ외측',
};
return map[z] ?? z;
}
// 근접 클러스터링 (~5NM 내 2척 이상 집단)
function buildClusters(vessels: ProcessedVessel[]): Map<string, string> {
const result = new Map<string, string>();
let clusterIdx = 0;
for (let i = 0; i < vessels.length; i++) {
if (result.has(vessels[i].ship.mmsi)) continue;
const cluster: string[] = [vessels[i].ship.mmsi];
for (let j = i + 1; j < vessels.length; j++) {
if (result.has(vessels[j].ship.mmsi)) continue;
const dlat = Math.abs(vessels[i].ship.lat - vessels[j].ship.lat);
const dlng = Math.abs(vessels[i].ship.lng - vessels[j].ship.lng);
if (dlat < 0.08 && dlng < 0.08) {
cluster.push(vessels[j].ship.mmsi);
}
}
if (cluster.length >= 2) {
clusterIdx++;
const id = `C-${String(clusterIdx).padStart(2, '0')}`;
cluster.forEach(mmsi => result.set(mmsi, id));
}
}
return result;
}
interface ProcessedVessel {
ship: Ship;
zone: string;
state: string;
alert: 'CRITICAL' | 'WATCH' | 'MONITOR' | 'NORMAL';
vtype: string;
cluster: string;
}
interface LogEntry {
ts: string;
mmsi: string;
name: string;
type: string;
level: 'critical' | 'watch' | 'info';
}
interface Props {
ships: Ship[];
onClose: () => void;
}
const PIPE_STEPS = [
{ num: '01', name: 'AIS 전처리' },
{ num: '02', name: '행동 상태 탐지' },
{ num: '03', name: '궤적 리샘플링' },
{ num: '04', name: '특징 벡터 추출' },
{ num: '05', name: '규칙 기반 분류' },
{ num: '06', name: 'BIRCH 군집화' },
{ num: '07', name: '계절 활동 분석' },
];
const ALERT_ORDER: Record<string, number> = { CRITICAL: 0, WATCH: 1, MONITOR: 2, NORMAL: 3 };
export function FieldAnalysisModal({ ships, onClose }: Props) {
const [activeFilter, setActiveFilter] = useState('ALL');
const [search, setSearch] = useState('');
const [selectedMmsi, setSelectedMmsi] = useState<string | null>(null);
const [logs, setLogs] = useState<LogEntry[]>([]);
const [pipeStep, setPipeStep] = useState(0);
const [tick, setTick] = useState(0);
// 중국 어선만 필터
const cnFishing = useMemo(() => ships.filter(s => {
if (s.flag !== 'CN') return false;
const cat = getMarineTrafficCategory(s.typecode, s.category);
return cat === 'fishing' || s.category === 'fishing';
}), [ships]);
// 선박 데이터 처리
const processed = useMemo((): ProcessedVessel[] => {
const baseList = cnFishing.map(ship => {
const zone = classifyZone(ship.lng);
const state = classifyState(ship);
const alert = getAlertLevel(zone, state);
const analysis = analyzeFishing(ship);
const gear = analysis.gearType;
const vtype =
(gear === 'trawl_pair' || gear === 'trawl_single') ? 'TRAWL' :
gear === 'purse_seine' ? 'PURSE' :
gear === 'gillnet' ? 'GILLNET' :
gear === 'stow_net' ? 'TRAP' :
'TRAWL';
return { ship, zone, state, alert, vtype, cluster: '' };
});
const clusterMap = buildClusters(baseList);
return baseList.map(v => ({ ...v, cluster: clusterMap.get(v.ship.mmsi) ?? '—' }));
}, [cnFishing]);
// 필터 + 정렬
const displayed = useMemo(() => {
return processed
.filter(v => {
if (activeFilter === 'CRITICAL' && v.alert !== 'CRITICAL') return false;
if (activeFilter === 'FISHING' && v.state !== 'FISHING') return false;
if (activeFilter === 'AIS_LOSS' && v.state !== 'AIS_LOSS') return false;
if (activeFilter === 'TERRITORIAL' && v.zone !== 'TERRITORIAL') return false;
if (search && !v.ship.mmsi.includes(search) && !v.ship.name.toLowerCase().includes(search)) return false;
return true;
})
.sort((a, b) => ALERT_ORDER[a.alert] - ALERT_ORDER[b.alert]);
}, [processed, activeFilter, search]);
// 통계
const stats = useMemo(() => ({
total: processed.length,
territorial: processed.filter(v => v.zone === 'TERRITORIAL').length,
fishing: processed.filter(v => v.state === 'FISHING').length,
aisLoss: processed.filter(v => v.state === 'AIS_LOSS').length,
gpsAnomaly: 0,
clusters: new Set(processed.filter(v => v.cluster !== '—').map(v => v.cluster)).size,
trawl: processed.filter(v => v.vtype === 'TRAWL').length,
purse: processed.filter(v => v.vtype === 'PURSE').length,
}), [processed]);
// 구역별 카운트
const zoneCounts = useMemo(() => ({
terr: processed.filter(v => v.zone === 'TERRITORIAL').length,
cont: processed.filter(v => v.zone === 'CONTIGUOUS').length,
eez: processed.filter(v => v.zone === 'EEZ').length,
beyond: processed.filter(v => v.zone === 'BEYOND').length,
}), [processed]);
// 초기 경보 로그 생성
useEffect(() => {
const initLogs: LogEntry[] = processed
.filter(v => v.alert === 'CRITICAL' || v.alert === 'WATCH')
.slice(0, 10)
.map((v, i) => {
const t = new Date(Date.now() - i * 4 * 60000);
const ts = t.toTimeString().slice(0, 8);
const type =
v.zone === 'TERRITORIAL' ? '영해 내 불법조업 탐지' :
v.state === 'AIS_LOSS' ? 'AIS 신호 소실 — Dark Vessel 의심' :
'접속수역 조업 행위 감지';
return { ts, mmsi: v.ship.mmsi, name: v.ship.name || '(Unknown)', type, level: v.alert === 'CRITICAL' ? 'critical' : 'watch' };
});
setLogs(initLogs);
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
// AI 파이프라인 애니메이션
useEffect(() => {
const t = setInterval(() => setPipeStep(s => s + 1), 1200);
return () => clearInterval(t);
}, []);
// 시계 tick
useEffect(() => {
const t = setInterval(() => setTick(s => s + 1), 1000);
return () => clearInterval(t);
}, []);
void tick; // used to force re-render for clock
// Escape 키 닫기
useEffect(() => {
const onKey = (e: KeyboardEvent) => { if (e.key === 'Escape') onClose(); };
window.addEventListener('keydown', onKey);
return () => window.removeEventListener('keydown', onKey);
}, [onClose]);
const selectedVessel = useMemo(() =>
selectedMmsi ? processed.find(v => v.ship.mmsi === selectedMmsi) ?? null : null,
[selectedMmsi, processed],
);
// 허가 정보
const [permitStatus, setPermitStatus] = useState<'idle' | 'loading' | 'found' | 'not-found'>('idle');
const [permitData, setPermitData] = useState<ChnPrmShipInfo | null>(null);
// 선박 사진
const [photoUrl, setPhotoUrl] = useState<string | null | undefined>(undefined); // undefined=로딩, null=없음
useEffect(() => {
if (!selectedVessel) return;
const { ship } = selectedVessel;
// 허가 조회
setPermitStatus('loading');
setPermitData(null);
lookupPermittedShip(ship.mmsi).then(data => {
setPermitData(data);
setPermitStatus(data ? 'found' : 'not-found');
});
// 사진 로드: S&P Global 우선, 없으면 MarineTraffic
setPhotoUrl(undefined);
if (ship.imo && ship.shipImagePath) {
loadSpgPhoto(ship.imo, ship.shipImagePath).then(url => {
if (url) { setPhotoUrl(url); return; }
loadMarineTrafficPhoto(ship.mmsi).then(setPhotoUrl);
});
} else {
loadMarineTrafficPhoto(ship.mmsi).then(setPhotoUrl);
}
}, [selectedMmsi]); // eslint-disable-line react-hooks/exhaustive-deps
const addLog = useCallback((mmsi: string, name: string, type: string, level: 'critical' | 'watch') => {
const ts = new Date().toTimeString().slice(0, 8);
setLogs(prev => [{ ts, mmsi, name, type, level }, ...prev].slice(0, 60));
}, []);
const downloadCsv = useCallback(() => {
const headers = ['MMSI', '선명', '위도', '경도', 'SOG(kt)', '침로(°)', '상태', '선종', '구역', '클러스터', '경보등급', '마지막수신(분전)'];
const rows = processed.map(v => {
const ageMins = Math.floor((Date.now() - v.ship.lastSeen) / 60000);
return [
v.ship.mmsi,
v.ship.name || '',
v.ship.lat.toFixed(5),
v.ship.lng.toFixed(5),
v.state === 'AIS_LOSS' ? '' : v.ship.speed.toFixed(1),
v.state === 'AIS_LOSS' ? '' : String(v.ship.course),
stateLabel(v.state),
v.vtype,
zoneLabel(v.zone),
v.cluster,
v.alert,
String(ageMins),
].map(s => `"${s}"`).join(',');
});
const csv = [headers.join(','), ...rows].join('\n');
const blob = new Blob(['\uFEFF' + csv], { type: 'text/csv;charset=utf-8;' });
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = `cn_fishing_vessels_${new Date().toISOString().slice(0, 10)}.csv`;
a.click();
URL.revokeObjectURL(url);
}, [processed]);
// 색상 헬퍼
const alertColor = (al: string) => ({ CRITICAL: C.red, WATCH: C.amber, MONITOR: C.cyan, NORMAL: C.green }[al] ?? C.ink3);
const zoneColor = (z: string) => ({ TERRITORIAL: C.red, CONTIGUOUS: C.amber, EEZ: C.cyan, BEYOND: C.green }[z] ?? C.ink3);
const stateColor = (s: string) => ({ FISHING: C.amber, SAILING: C.cyan, STATIONARY: C.green, AIS_LOSS: C.red }[s] ?? C.ink3);
return (
<div style={{
position: 'absolute', inset: 0, zIndex: 2000,
background: 'rgba(2,6,14,0.96)',
display: 'flex', flexDirection: 'column',
fontFamily: "'IBM Plex Mono', 'Noto Sans KR', monospace",
}}>
{/* ── 헤더 */}
<div style={{
background: C.panel,
borderBottom: `1px solid ${C.border}`,
padding: '10px 20px',
display: 'flex', alignItems: 'center', gap: 12, flexShrink: 0,
}}>
<span style={{ color: C.green, fontSize: 9, letterSpacing: 3 }}> FIELD ANALYSIS</span>
<span style={{ color: '#fff', fontSize: 14, fontWeight: 700, letterSpacing: 1 }}> </span>
<span style={{ color: C.ink3, fontSize: 10 }}>AIS · · BIRCH · Shepperson(2017) · Yan et al.(2022)</span>
<div style={{ marginLeft: 'auto', display: 'flex', gap: 16, alignItems: 'center' }}>
<span style={{ color: C.green, fontSize: 10, display: 'flex', alignItems: 'center', gap: 4 }}>
<span style={{ width: 6, height: 6, borderRadius: '50%', background: C.green, display: 'inline-block', animation: 'pulse 1.5s ease-in-out infinite' }} />
LIVE
</span>
<span style={{ color: C.ink2, fontSize: 10 }}>{new Date().toLocaleTimeString('ko-KR')}</span>
<button
type="button"
onClick={onClose}
style={{
background: 'rgba(255,82,82,0.1)', border: `1px solid rgba(255,82,82,0.4)`,
color: C.red, padding: '4px 14px', cursor: 'pointer',
fontSize: 11, borderRadius: 2, fontFamily: 'inherit',
}}
>
</button>
</div>
</div>
{/* ── 통계 스트립 */}
<div style={{
display: 'flex', gap: 8, padding: '8px 12px',
background: C.bg, flexShrink: 0,
borderBottom: `1px solid ${C.border}`,
}}>
{[
{ label: '총 탐지 어선', val: stats.total, color: C.cyan, sub: 'AIS 수신 기준' },
{ label: '영해 침범', val: stats.territorial, color: C.red, sub: '12NM 이내' },
{ label: '조업 중', val: stats.fishing, color: C.amber, sub: 'SOG 0.55.0kt' },
{ label: 'AIS 소실', val: stats.aisLoss, color: C.red, sub: '>20분 미수신' },
{ label: 'GPS 이상', val: stats.gpsAnomaly, color: C.purple, sub: 'BD-09 의심' },
{ label: '집단 클러스터', val: stats.clusters, color: C.amber, sub: 'BIRCH 군집' },
{ label: '트롤어선', val: stats.trawl, color: C.purple, sub: '규칙 기반 분류' },
{ label: '선망어선', val: stats.purse, color: C.cyan, sub: '규칙 기반 분류' },
].map(({ label, val, color, sub }) => (
<div key={label} style={{
flex: 1, background: C.bg2, border: `1px solid ${C.border}`,
borderRadius: 3, padding: '8px 10px', textAlign: 'center',
borderTop: `2px solid ${color}`,
}}>
<div style={{ fontSize: 9, color: C.ink3, letterSpacing: 1 }}>{label}</div>
<div style={{ fontSize: 22, fontWeight: 700, color, lineHeight: 1.2 }}>{val}</div>
<div style={{ fontSize: 9, color: C.ink3 }}>{sub}</div>
</div>
))}
</div>
{/* ── 메인 그리드 */}
<div style={{
display: 'flex', flex: 1, overflow: 'hidden',
background: C.bg,
}}>
{/* ── 좌측 패널: 구역 현황 + AI 파이프라인 */}
<div style={{
width: 240, flexShrink: 0,
background: C.panel, borderRight: `1px solid ${C.border}`,
overflow: 'auto', padding: '10px 12px',
}}>
<div style={{ fontSize: 9, letterSpacing: 2, color: C.cyan, marginBottom: 8, paddingBottom: 6, borderBottom: `1px solid ${C.border}` }}>
<span style={{ float: 'right', color: C.green, fontSize: 8 }}></span>
</div>
{([
{ label: '영해 (12NM)', count: zoneCounts.terr, color: C.red, sub: '즉시 퇴거 명령 필요' },
{ label: '접속수역 (24NM)', count: zoneCounts.cont, color: C.amber, sub: '조업 행위 집중 모니터링' },
{ label: 'EEZ 내측', count: zoneCounts.eez, color: C.amber, sub: '조업밀도 핫스팟 포함' },
{ label: 'EEZ 외측', count: zoneCounts.beyond, color: C.green, sub: '정상 모니터링' },
] as const).map(({ label, count, color, sub }) => {
const max = Math.max(processed.length, 1);
return (
<div key={label} style={{ marginBottom: 10 }}>
<div style={{ display: 'flex', justifyContent: 'space-between', marginBottom: 3 }}>
<span style={{ fontSize: 10, color }}>{label}</span>
<span style={{ fontSize: 11, fontWeight: 700, color }}>{count}</span>
</div>
<div style={{ height: 4, background: C.border2, borderRadius: 2, overflow: 'hidden' }}>
<div style={{ height: '100%', width: `${Math.min((count / max) * 100, 100)}%`, background: color, borderRadius: 2, transition: 'width 0.5s' }} />
</div>
<div style={{ fontSize: 9, color: C.ink3, marginTop: 2 }}>{sub}</div>
</div>
);
})}
<div style={{ fontSize: 9, letterSpacing: 2, color: C.cyan, margin: '12px 0 8px', paddingBottom: 6, borderBottom: `1px solid ${C.border}` }}>
AI
<span style={{ float: 'right', color: C.green, fontSize: 8 }}></span>
</div>
{PIPE_STEPS.map((step, idx) => {
const isRunning = idx === pipeStep % PIPE_STEPS.length;
return (
<div key={step.num} style={{ display: 'flex', alignItems: 'center', gap: 6, marginBottom: 5 }}>
<span style={{ fontSize: 9, color: C.ink3, width: 20 }}>{step.num}</span>
<span style={{ fontSize: 10, color: C.ink, flex: 1 }}>{step.name}</span>
<span style={{
fontSize: 8, padding: '1px 6px', borderRadius: 2,
background: isRunning ? 'rgba(0,230,118,0.15)' : 'rgba(0,230,118,0.06)',
border: `1px solid ${isRunning ? C.green : C.border}`,
color: isRunning ? C.green : C.ink3,
fontWeight: isRunning ? 700 : 400,
}}>
{isRunning ? 'PROC' : 'OK'}
</span>
</div>
);
})}
{[
{ num: 'GPS', name: 'BD-09 변환', status: 'STANDBY', color: C.ink3 },
{ num: 'NRD', name: '레이더 교차검증', status: '미연동', color: C.ink3 },
].map(step => (
<div key={step.num} style={{ display: 'flex', alignItems: 'center', gap: 6, marginBottom: 5 }}>
<span style={{ fontSize: 9, color: C.ink3, width: 20 }}>{step.num}</span>
<span style={{ fontSize: 10, color: C.ink, flex: 1 }}>{step.name}</span>
<span style={{
fontSize: 8, padding: '1px 6px', borderRadius: 2,
background: 'rgba(24,255,255,0.08)', border: `1px solid ${C.border}`, color: step.color,
}}>
{step.status}
</span>
</div>
))}
{/* 알고리즘 기준 요약 */}
<div style={{ fontSize: 9, letterSpacing: 2, color: C.cyan, margin: '12px 0 8px', paddingBottom: 6, borderBottom: `1px solid ${C.border}` }}>
</div>
{[
{ label: '위치 판정', val: 'Haversine + 기선', color: C.ink2 },
{ label: '조업 패턴', val: 'UCAF/UCFT SOG', color: C.ink2 },
{ label: 'AIS 소실', val: '>20분 미수신', color: C.amber },
{ label: 'GPS 조작', val: 'BD-09 좌표계', color: C.purple },
{ label: '클러스터', val: 'BIRCH 5NM', color: C.ink2 },
{ label: '선종 분류', val: '규칙 기반 (Python 연동 예정)', color: C.ink2 },
].map(({ label, val, color }) => (
<div key={label} style={{ display: 'flex', justifyContent: 'space-between', marginBottom: 4 }}>
<span style={{ fontSize: 9, color: C.ink3 }}>{label}</span>
<span style={{ fontSize: 9, color }}>{val}</span>
</div>
))}
</div>
{/* ── 중앙 패널: 선박 테이블 */}
<div style={{ flex: 1, overflow: 'hidden', display: 'flex', flexDirection: 'column' }}>
{/* 필터 바 */}
<div style={{
display: 'flex', gap: 6, padding: '8px 12px', alignItems: 'center',
background: C.bg2, borderBottom: `1px solid ${C.border}`, flexShrink: 0,
flexWrap: 'wrap',
}}>
{[
{ key: 'ALL', label: '전체' },
{ key: 'CRITICAL', label: '긴급 경보' },
{ key: 'FISHING', label: '조업 중' },
{ key: 'AIS_LOSS', label: 'AIS 소실' },
{ key: 'TERRITORIAL', label: '영해 내' },
].map(({ key, label }) => (
<button
key={key}
type="button"
onClick={() => setActiveFilter(key)}
style={{
padding: '3px 10px', fontSize: 10, cursor: 'pointer',
borderRadius: 2, fontFamily: 'inherit',
background: activeFilter === key ? 'rgba(0,230,118,0.15)' : C.bg3,
border: `1px solid ${activeFilter === key ? C.green : C.border}`,
color: activeFilter === key ? C.green : C.ink2,
}}
>
{label}
</button>
))}
<input
value={search}
onChange={e => setSearch(e.target.value.toLowerCase())}
placeholder="MMSI / 선명 검색..."
style={{
flex: 1, minWidth: 120,
background: C.bg3, border: `1px solid ${C.border}`,
color: C.ink, padding: '3px 10px', fontSize: 10,
borderRadius: 2, outline: 'none', fontFamily: 'inherit',
}}
/>
<span style={{ color: C.ink3, fontSize: 10, whiteSpace: 'nowrap' }}>
: <span style={{ color: C.cyan }}>{displayed.length}</span>
</span>
<button
type="button"
onClick={downloadCsv}
title="CSV 다운로드"
style={{
padding: '3px 10px', fontSize: 10, cursor: 'pointer',
borderRadius: 2, fontFamily: 'inherit', whiteSpace: 'nowrap',
background: 'rgba(24,255,255,0.08)', border: `1px solid ${C.border}`, color: C.cyan,
}}
>
CSV
</button>
</div>
{/* 테이블 */}
<div style={{ flex: 1, overflow: 'auto' }}>
<table style={{ width: '100%', borderCollapse: 'separate', borderSpacing: 0 }}>
<thead>
<tr style={{ position: 'sticky', top: 0, background: C.panel, zIndex: 1 }}>
{['AIS', 'MMSI', '선명', '위도', '경도', 'SOG', '침로', '상태', '선종', '구역', '클러스터', '경보', '수신'].map(h => (
<th key={h} style={{
padding: '6px 8px', fontSize: 9, color: C.ink3, fontWeight: 600,
letterSpacing: 1, textAlign: 'left',
borderBottom: `1px solid ${C.border}`, whiteSpace: 'nowrap',
}}>{h}</th>
))}
</tr>
</thead>
<tbody>
{displayed.slice(0, 120).map(v => {
const rowBg =
v.alert === 'CRITICAL' ? 'rgba(255,82,82,0.08)' :
v.alert === 'WATCH' ? 'rgba(255,215,64,0.05)' :
v.alert === 'MONITOR' ? 'rgba(24,255,255,0.04)' :
'transparent';
const isSelected = v.ship.mmsi === selectedMmsi;
const ageMins = Math.floor((Date.now() - v.ship.lastSeen) / 60000);
return (
<tr
key={v.ship.mmsi}
onClick={() => setSelectedMmsi(v.ship.mmsi)}
style={{
background: isSelected ? 'rgba(0,230,118,0.08)' : rowBg,
cursor: 'pointer',
outline: isSelected ? `1px solid ${C.green}` : undefined,
}}
>
<td style={{ padding: '5px 8px' }}>
<span style={{
display: 'inline-block', width: 7, height: 7, borderRadius: '50%',
background: v.state === 'AIS_LOSS' ? C.red : C.green,
}} />
</td>
<td style={{ fontSize: 10, color: C.cyan, whiteSpace: 'nowrap', padding: '5px 8px' }}>{v.ship.mmsi}</td>
<td style={{ fontSize: 10, color: '#fff', padding: '5px 8px', maxWidth: 90, overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap' }}>
{v.ship.name || '(Unknown)'}
</td>
<td style={{ fontSize: 10, color: C.ink2, padding: '5px 8px', whiteSpace: 'nowrap' }}>{v.ship.lat.toFixed(3)}°N</td>
<td style={{ fontSize: 10, color: C.ink2, padding: '5px 8px', whiteSpace: 'nowrap' }}>{v.ship.lng.toFixed(3)}°E</td>
<td style={{ fontSize: 10, color: C.amber, padding: '5px 8px', whiteSpace: 'nowrap' }}>
{v.state === 'AIS_LOSS' ? '—' : `${v.ship.speed.toFixed(1)}kt`}
</td>
<td style={{ fontSize: 10, color: C.ink2, padding: '5px 8px', whiteSpace: 'nowrap' }}>
{v.state !== 'AIS_LOSS' ? `${v.ship.course}°` : '—'}
</td>
<td style={{ padding: '5px 8px' }}>
<span style={{
fontSize: 9, padding: '2px 5px', borderRadius: 2,
background: `${stateColor(v.state)}22`,
border: `1px solid ${stateColor(v.state)}66`,
color: stateColor(v.state),
}}>
{stateLabel(v.state)}
</span>
</td>
<td style={{ padding: '5px 8px' }}>
<span style={{
fontSize: 9, padding: '2px 5px', borderRadius: 2,
background: 'rgba(24,255,255,0.08)', border: `1px solid ${C.border}`, color: C.cyan,
}}>
{v.vtype}
</span>
</td>
<td style={{ padding: '5px 8px' }}>
<span style={{
fontSize: 9, padding: '2px 5px', borderRadius: 2, whiteSpace: 'nowrap',
background: `${zoneColor(v.zone)}15`,
border: `1px solid ${zoneColor(v.zone)}55`,
color: zoneColor(v.zone),
}}>
{zoneLabel(v.zone)}
</span>
</td>
<td style={{ padding: '5px 8px', fontSize: 10, color: v.cluster !== '—' ? C.purple : C.ink3 }}>
{v.cluster}
</td>
<td style={{ padding: '5px 8px' }}>
<span style={{
fontSize: 9, padding: '2px 5px', borderRadius: 2,
background: `${alertColor(v.alert)}15`,
border: `1px solid ${alertColor(v.alert)}55`,
color: alertColor(v.alert),
}}>
{v.alert}
</span>
</td>
<td style={{ fontSize: 9, color: C.ink3, padding: '5px 8px', whiteSpace: 'nowrap' }}>
{ageMins < 60 ? `${ageMins}분전` : `${Math.floor(ageMins / 60)}시간전`}
</td>
</tr>
);
})}
{displayed.length === 0 && (
<tr>
<td colSpan={13} style={{ padding: 32, textAlign: 'center', color: C.ink3, fontSize: 11 }}>
</td>
</tr>
)}
</tbody>
</table>
</div>
{/* 하단 범례 */}
<div style={{
display: 'flex', gap: 16, padding: '6px 12px', alignItems: 'center',
background: C.bg2, borderTop: `1px solid ${C.border}`,
fontSize: 10, flexShrink: 0, flexWrap: 'wrap',
}}>
{[
{ color: C.red, label: 'CRITICAL — 즉시대응' },
{ color: C.amber, label: 'WATCH — 집중모니터링' },
{ color: C.cyan, label: 'MONITOR — 주시' },
{ color: C.green, label: 'NORMAL — 정상' },
].map(({ color, label }) => (
<span key={label} style={{ display: 'flex', alignItems: 'center', gap: 5, color: C.ink2 }}>
<span style={{ width: 8, height: 8, borderRadius: '50%', background: color, display: 'inline-block' }} />
{label}
</span>
))}
<span style={{ marginLeft: 'auto', color: C.ink3, fontSize: 9 }}>
AIS 4 | Shepperson(2017) | | 5NM
</span>
</div>
</div>
{/* ── 우측 패널: 선박 상세 + 허가 정보 + 사진 + 경보 로그 */}
<div style={{
width: 280, flexShrink: 0,
background: C.panel, borderLeft: `1px solid ${C.border}`,
overflow: 'hidden', display: 'flex', flexDirection: 'column',
}}>
{/* 패널 헤더 */}
<div style={{ fontSize: 9, letterSpacing: 2, color: C.cyan, padding: '10px 12px 6px', borderBottom: `1px solid ${C.border}`, flexShrink: 0 }}>
<span style={{ float: 'right', color: C.green, fontSize: 8 }}></span>
</div>
{/* 스크롤 영역: 상세 + 허가 + 사진 */}
<div style={{ flex: 1, overflow: 'auto', minHeight: 0 }}>
{selectedVessel ? (
<>
{/* 기본 상세 필드 */}
<div style={{ padding: '8px 12px' }}>
{[
{ label: 'MMSI', val: selectedVessel.ship.mmsi, color: C.cyan },
{ label: '선명', val: selectedVessel.ship.name || '(Unknown)', color: '#fff' },
{ label: '위치', val: `${selectedVessel.ship.lat.toFixed(4)}°N ${selectedVessel.ship.lng.toFixed(4)}°E`, color: C.ink },
{ label: '속도 / 침로', val: `${selectedVessel.ship.speed.toFixed(1)}kt ${selectedVessel.ship.course}°`, color: C.amber },
{ label: '행동 상태', val: stateLabel(selectedVessel.state), color: stateColor(selectedVessel.state) },
{ label: '추정 선종', val: selectedVessel.vtype, color: C.ink },
{ label: '현재 구역', val: zoneLabel(selectedVessel.zone), color: zoneColor(selectedVessel.zone) },
{ label: 'BIRCH 클러스터', val: selectedVessel.cluster, color: selectedVessel.cluster !== '—' ? C.purple : C.ink3 },
{ label: '경보 등급', val: selectedVessel.alert, color: alertColor(selectedVessel.alert) },
].map(({ label, val, color }) => (
<div key={label} style={{ display: 'flex', justifyContent: 'space-between', padding: '4px 0', borderBottom: `1px solid ${C.border2}` }}>
<span style={{ fontSize: 9, color: C.ink3 }}>{label}</span>
<span style={{ fontSize: 10, color, fontWeight: 600, textAlign: 'right', maxWidth: 150, overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap' }}>{val}</span>
</div>
))}
<div style={{ display: 'flex', gap: 6, marginTop: 8 }}>
<button
type="button"
onClick={() => addLog(selectedVessel.ship.mmsi, selectedVessel.ship.name || '', '대응 명령 발령', 'critical')}
style={{ flex: 1, padding: '5px 0', fontSize: 9, cursor: 'pointer', background: 'rgba(255,82,82,0.1)', border: `1px solid rgba(255,82,82,0.4)`, color: C.red, borderRadius: 2, fontFamily: 'inherit' }}
> </button>
<button
type="button"
onClick={() => addLog(selectedVessel.ship.mmsi, selectedVessel.ship.name || '', 'ENG/드론 투입 명령', 'watch')}
style={{ flex: 1, padding: '5px 0', fontSize: 9, cursor: 'pointer', background: 'rgba(255,215,64,0.08)', border: `1px solid rgba(255,215,64,0.3)`, color: C.amber, borderRadius: 2, fontFamily: 'inherit' }}
>ENG/</button>
</div>
</div>
{/* ── 허가 정보 */}
<div style={{ borderTop: `1px solid ${C.border}`, padding: '8px 12px' }}>
<div style={{ fontSize: 9, letterSpacing: 2, color: C.cyan, marginBottom: 7 }}> </div>
{/* 허가 여부 배지 */}
<div style={{ display: 'flex', alignItems: 'center', gap: 8, marginBottom: 8 }}>
<span style={{ fontSize: 9, color: C.ink3 }}> </span>
{permitStatus === 'loading' && (
<span style={{ fontSize: 9, color: C.ink3 }}> ...</span>
)}
{permitStatus === 'found' && (
<span style={{ fontSize: 10, fontWeight: 700, padding: '2px 10px', borderRadius: 2, background: 'rgba(0,230,118,0.15)', border: `1px solid ${C.green}`, color: C.green }}>
</span>
)}
{permitStatus === 'not-found' && (
<span style={{ fontSize: 10, fontWeight: 700, padding: '2px 10px', borderRadius: 2, background: 'rgba(255,82,82,0.12)', border: `1px solid ${C.red}`, color: C.red }}>
</span>
)}
</div>
{/* 허가 내역 (데이터 있을 때) */}
{permitStatus === 'found' && permitData && (
<div style={{ background: C.bg2, border: `1px solid ${C.border}`, borderRadius: 3, padding: '7px 10px' }}>
{[
{ label: '선명', val: permitData.name },
{ label: '선종', val: permitData.vesselType },
{ label: 'IMO', val: String(permitData.imo || '—') },
{ label: '호출부호', val: permitData.callsign || '—' },
{ label: '길이/폭', val: `${permitData.length ?? 0}m / ${permitData.width ?? 0}m` },
{ label: '흘수', val: permitData.draught ? `${permitData.draught}m` : '—' },
{ label: '목적지', val: permitData.destination || '—' },
{ label: '상태', val: permitData.status || '—' },
].map(({ label, val }) => (
<div key={label} style={{ display: 'flex', justifyContent: 'space-between', padding: '3px 0', borderBottom: `1px solid ${C.border2}` }}>
<span style={{ fontSize: 9, color: C.ink3 }}>{label}</span>
<span style={{ fontSize: 9, color: C.ink, textAlign: 'right', maxWidth: 150, overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap' }}>{val}</span>
</div>
))}
</div>
)}
{/* 미등록 안내 */}
{permitStatus === 'not-found' && (
<div style={{ background: 'rgba(255,82,82,0.06)', border: `1px solid rgba(255,82,82,0.2)`, borderRadius: 3, padding: '7px 10px' }}>
<div style={{ fontSize: 9, color: '#FF8A80', lineHeight: 1.6 }}>
DB에 .<br />
</div>
</div>
)}
</div>
{/* ── 선박 사진 */}
<div style={{ borderTop: `1px solid ${C.border}`, padding: '8px 12px 12px' }}>
<div style={{ fontSize: 9, letterSpacing: 2, color: C.cyan, marginBottom: 7 }}> </div>
<div style={{
width: '100%', height: 140,
background: C.bg3, border: `1px solid ${C.border}`,
borderRadius: 3, overflow: 'hidden',
display: 'flex', alignItems: 'center', justifyContent: 'center',
}}>
{photoUrl === undefined && (
<span style={{ fontSize: 9, color: C.ink3 }}> ...</span>
)}
{photoUrl === null && (
<span style={{ fontSize: 9, color: C.ink3 }}> </span>
)}
{photoUrl && (
<img
src={photoUrl}
alt={selectedVessel.ship.name || '선박'}
style={{ width: '100%', height: '100%', objectFit: 'cover' }}
onError={() => setPhotoUrl(null)}
/>
)}
</div>
{photoUrl && (
<div style={{ fontSize: 8, color: C.ink3, marginTop: 4, textAlign: 'right' }}>
© MarineTraffic / S&P Global
</div>
)}
</div>
</>
) : (
<div style={{ padding: '24px 0', textAlign: 'center', color: C.ink3, fontSize: 10 }}>
</div>
)}
</div>
{/* 경보 로그 — 하단 고정 */}
<div style={{ fontSize: 9, letterSpacing: 2, color: C.cyan, padding: '6px 12px', borderTop: `1px solid ${C.border}`, borderBottom: `1px solid ${C.border}`, flexShrink: 0, display: 'flex', justifyContent: 'space-between' }}>
<span> </span>
<span style={{ color: C.ink3, fontSize: 8 }}>{logs.length}</span>
</div>
<div style={{ flex: '0 0 160px', overflow: 'auto' }}>
{logs.map((log, i) => (
<div key={i} style={{
padding: '5px 12px',
borderBottom: `1px solid ${C.border2}`,
borderLeft: `2px solid ${log.level === 'critical' ? C.red : log.level === 'watch' ? C.amber : C.cyan}`,
}}>
<div style={{ fontSize: 9, color: C.ink3 }}>{log.ts}</div>
<div style={{ fontSize: 10, lineHeight: 1.4, color: log.level === 'critical' ? '#FF8A80' : log.level === 'watch' ? '#FFE57F' : '#80DEEA' }}>
<span style={{ color: C.cyan }}>{log.mmsi}</span> {log.name} {log.type}
</div>
</div>
))}
{logs.length === 0 && (
<div style={{ padding: 16, textAlign: 'center', color: C.ink3, fontSize: 10 }}> </div>
)}
</div>
</div>
</div>
</div>
);
}

파일 보기

@ -0,0 +1,54 @@
import type { ChnPrmShipInfo } from '../types';
const SIGNAL_BATCH_BASE = '/signal-batch';
const CACHE_TTL_MS = 5 * 60_000; // 5분
let cachedList: ChnPrmShipInfo[] = [];
let cacheTime = 0;
let fetchPromise: Promise<ChnPrmShipInfo[]> | null = null;
async function fetchList(): Promise<ChnPrmShipInfo[]> {
const now = Date.now();
if (cachedList.length > 0 && now - cacheTime < CACHE_TTL_MS) {
return cachedList;
}
if (fetchPromise) return fetchPromise;
fetchPromise = (async () => {
try {
const res = await fetch(
`${SIGNAL_BATCH_BASE}/api/v2/vessels/chnprmship/recent-positions?minutes=60`,
{ headers: { accept: 'application/json' } },
);
if (!res.ok) return cachedList;
const json: unknown = await res.json();
cachedList = Array.isArray(json) ? (json as ChnPrmShipInfo[]) : [];
cacheTime = Date.now();
return cachedList;
} catch {
return cachedList;
} finally {
fetchPromise = null;
}
})();
return fetchPromise;
}
/** mmsi로 허가어선 정보 조회 — 목록을 캐시하고 lookup */
export async function lookupPermittedShip(mmsi: string): Promise<ChnPrmShipInfo | null> {
const list = await fetchList();
return list.find((s) => s.mmsi === mmsi) ?? null;
}
/** 허가어선 mmsi Set (빠른 조회용) */
export async function getPermittedMmsiSet(): Promise<Set<string>> {
const list = await fetchList();
return new Set(list.map((s) => s.mmsi));
}
/** 캐시 강제 갱신 */
export function invalidateCache(): void {
cacheTime = 0;
}

파일 보기

@ -147,3 +147,60 @@ export interface LayerVisibility {
}
export type AppMode = 'replay' | 'live';
// ── 중국어선 분석 결과 (Python 분류기 → REST API → Frontend) ──
export type VesselType = 'TRAWL' | 'PURSE' | 'LONGLINE' | 'TRAP';
export type RiskLevel = 'CRITICAL' | 'WATCH' | 'MONITOR' | 'NORMAL';
export type ActivityState = 'FISHING' | 'SAILING' | 'STATIONARY' | 'AIS_LOSS';
export type ZoneType = 'TERRITORIAL' | 'CONTIGUOUS' | 'EEZ' | 'BEYOND';
export type FleetRole = 'MOTHER' | 'SUB' | 'TRANSPORT' | 'INDEPENDENT';
export interface VesselClassification {
vesselType: VesselType;
confidence: number; // 0~1
fishingPct: number; // 조업 비율 %
clusterId: number; // BIRCH 군집 ID (-1=노이즈)
season: string; // SPRING/SUMMER/FALL/WINTER
}
export interface VesselAlgorithms {
location: { zone: ZoneType; distToBaselineNm: number };
activity: { state: ActivityState; ucafScore: number; ucftScore: number };
darkVessel: { isDark: boolean; gapDurationMin: number };
gpsSpoofing: { spoofingScore: number; bd09OffsetM: number; speedJumpCount: number };
cluster: { clusterId: number; clusterSize: number; centroid?: [number, number] };
fleetRole: { isLeader: boolean; role: FleetRole };
riskScore: { score: number; level: RiskLevel };
}
export interface VesselAnalysisResult {
mmsi: string;
timestamp: string; // ISO 분석 시점
classification: VesselClassification;
algorithms: VesselAlgorithms;
features: Record<string, number>;
}
// 허가어선 정보 (signal-batch /api/v2/vessels/chnprmship)
export interface ChnPrmShipInfo {
mmsi: string;
imo: number;
name: string;
callsign: string;
vesselType: string;
lat: number;
lon: number;
sog: number;
cog: number;
heading: number;
length: number;
width: number;
draught: number;
destination: string;
status: string;
signalKindCode: string;
messageTimestamp: string;
shipImagePath?: string | null;
shipImageCount?: number;
}

파일 보기

파일 보기

@ -0,0 +1,59 @@
import pandas as pd
from algorithms.location import haversine_nm
GAP_SUSPICIOUS_SEC = 1800 # 30분
GAP_HIGH_SUSPICIOUS_SEC = 3600 # 1시간
GAP_VIOLATION_SEC = 86400 # 24시간
def detect_ais_gaps(df_vessel: pd.DataFrame) -> list[dict]:
"""AIS 수신 기록에서 소실 구간 추출."""
if len(df_vessel) < 2:
return []
gaps = []
records = df_vessel.sort_values('timestamp').to_dict('records')
for i in range(1, len(records)):
prev, curr = records[i - 1], records[i]
prev_ts = pd.Timestamp(prev['timestamp'])
curr_ts = pd.Timestamp(curr['timestamp'])
gap_sec = (curr_ts - prev_ts).total_seconds()
if gap_sec < GAP_SUSPICIOUS_SEC:
continue
disp = haversine_nm(
prev['lat'], prev['lon'],
curr['lat'], curr['lon'],
)
if gap_sec >= GAP_VIOLATION_SEC:
severity = 'VIOLATION'
elif gap_sec >= GAP_HIGH_SUSPICIOUS_SEC:
severity = 'HIGH_SUSPICIOUS'
else:
severity = 'SUSPICIOUS'
gaps.append({
'gap_sec': int(gap_sec),
'gap_min': round(gap_sec / 60, 1),
'displacement_nm': round(disp, 2),
'severity': severity,
})
return gaps
def is_dark_vessel(df_vessel: pd.DataFrame) -> tuple[bool, int]:
"""다크베셀 여부 판정.
Returns: (is_dark, max_gap_duration_min)
"""
gaps = detect_ais_gaps(df_vessel)
if not gaps:
return False, 0
max_gap_min = max(g['gap_min'] for g in gaps)
is_dark = max_gap_min >= 30 # 30분 이상 소실
return is_dark, int(max_gap_min)

파일 보기

@ -0,0 +1,117 @@
from __future__ import annotations
import pandas as pd
from algorithms.location import haversine_nm, classify_zone # noqa: F401 (haversine_nm re-exported for callers)
# Yan et al. (2022) 어구별 조업 속도 임계값
GEAR_SOG_THRESHOLDS: dict[str, tuple[float, float]] = {
'PT': (2.5, 4.5), # 쌍끌이저인망
'OT': (2.0, 4.0), # 단선저인망
'GN': (0.5, 2.5), # 자망·유망
'SQ': (0.0, 1.0), # 오징어채낚기
'TRAP': (0.3, 1.5), # 통발
'PS': (3.0, 6.0), # 선망
'TRAWL': (2.0, 4.5), # (alias)
'PURSE': (3.0, 6.0), # (alias)
'LONGLINE': (0.5, 2.5),
}
TRANSIT_SOG_MIN = 5.0
ANCHORED_SOG_MAX = 0.5
def classify_vessel_state(sog: float, cog_delta: float = 0.0,
gear_type: str = 'PT') -> str:
"""UCAF: 어구별 상태 분류."""
if sog <= ANCHORED_SOG_MAX:
return 'ANCHORED'
if sog >= TRANSIT_SOG_MIN:
return 'TRANSIT'
sog_min, sog_max = GEAR_SOG_THRESHOLDS.get(gear_type, (1.0, 5.0))
if sog_min <= sog <= sog_max:
return 'FISHING'
return 'UNKNOWN'
def compute_ucaf_score(df_vessel: pd.DataFrame, gear_type: str = 'PT') -> float:
"""UCAF 점수: 어구별 조업 상태 비율 (0~1)."""
if len(df_vessel) == 0:
return 0.0
sog_min, sog_max = GEAR_SOG_THRESHOLDS.get(gear_type, (1.0, 5.0))
in_range = df_vessel['sog'].between(sog_min, sog_max).sum()
return round(in_range / len(df_vessel), 4)
def compute_ucft_score(df_vessel: pd.DataFrame) -> float:
"""UCFT 점수: 조업 vs 항행 이진 신뢰도 (0~1)."""
if len(df_vessel) == 0:
return 0.0
fishing = (df_vessel['sog'].between(0.5, 5.0)).sum()
transit = (df_vessel['sog'] >= TRANSIT_SOG_MIN).sum()
total = fishing + transit
if total == 0:
return 0.0
return round(fishing / total, 4)
def detect_fishing_segments(df_vessel: pd.DataFrame,
window_min: int = 15,
gear_type: str = 'PT') -> list[dict]:
"""연속 조업 구간 추출."""
if len(df_vessel) < 2:
return []
segments: list[dict] = []
in_fishing = False
seg_start_idx = 0
records = df_vessel.to_dict('records')
for i, rec in enumerate(records):
sog = rec.get('sog', 0)
state = classify_vessel_state(sog, gear_type=gear_type)
if state == 'FISHING' and not in_fishing:
in_fishing = True
seg_start_idx = i
elif state != 'FISHING' and in_fishing:
start_ts = records[seg_start_idx].get('timestamp')
end_ts = rec.get('timestamp')
if start_ts and end_ts:
dur_sec = (pd.Timestamp(end_ts) - pd.Timestamp(start_ts)).total_seconds()
dur_min = dur_sec / 60
if dur_min >= window_min:
zone_info = classify_zone(
records[seg_start_idx].get('lat', 0),
records[seg_start_idx].get('lon', 0),
)
segments.append({
'start_idx': seg_start_idx,
'end_idx': i - 1,
'duration_min': round(dur_min, 1),
'zone': zone_info.get('zone', 'UNKNOWN'),
'in_territorial_sea': zone_info.get('zone') == 'TERRITORIAL_SEA',
})
in_fishing = False
return segments
def detect_trawl_uturn(df_vessel: pd.DataFrame,
uturn_threshold_deg: float = 150.0,
min_uturn_count: int = 3) -> dict:
"""U-turn 왕복 패턴 감지 (저인망 특징)."""
if len(df_vessel) < 2:
return {'uturn_count': 0, 'trawl_suspected': False}
uturn_count = 0
cog_vals = df_vessel['cog'].values
sog_vals = df_vessel['sog'].values
for i in range(1, len(cog_vals)):
delta = abs((cog_vals[i] - cog_vals[i - 1] + 180) % 360 - 180)
if delta >= uturn_threshold_deg and sog_vals[i] < TRANSIT_SOG_MIN:
uturn_count += 1
return {
'uturn_count': uturn_count,
'trawl_suspected': uturn_count >= min_uturn_count,
}

파일 보기

@ -0,0 +1,152 @@
import math
import logging
import numpy as np
import pandas as pd
from algorithms.location import haversine_nm, dist_to_baseline, EARTH_RADIUS_NM
logger = logging.getLogger(__name__)
def detect_group_clusters(
vessel_snapshots: list[dict],
spatial_eps_nm: float = 10.0,
time_eps_hours: float = 2.0,
min_vessels: int = 3,
) -> dict[int, list[dict]]:
"""DBSCAN 시공간 클러스터링으로 집단 탐지."""
if len(vessel_snapshots) < min_vessels:
return {}
try:
from sklearn.cluster import DBSCAN
except ImportError:
logger.warning('sklearn not available for DBSCAN clustering')
return {}
lat_rad = [math.radians(v['lat']) * EARTH_RADIUS_NM for v in vessel_snapshots]
lon_rad = [math.radians(v['lon']) * EARTH_RADIUS_NM for v in vessel_snapshots]
# 시간을 NM 단위로 정규화
timestamps = [pd.Timestamp(v['timestamp']).timestamp() for v in vessel_snapshots]
t_min = min(timestamps)
time_nm = [(t - t_min) / 3600 * 10 / time_eps_hours for t in timestamps]
X = np.array(list(zip(lat_rad, lon_rad, time_nm)))
db = DBSCAN(eps=spatial_eps_nm, min_samples=min_vessels, metric='euclidean').fit(X)
clusters: dict[int, list[dict]] = {}
for idx, label in enumerate(db.labels_):
if label == -1:
continue
clusters.setdefault(int(label), []).append(vessel_snapshots[idx])
return clusters
def identify_lead_vessel(cluster_vessels: list[dict]) -> dict:
"""5기준 스코어링으로 대표선 특정."""
if not cluster_vessels:
return {}
scores: dict[str, float] = {}
timestamps = [pd.Timestamp(v.get('timestamp', 0)).timestamp() for v in cluster_vessels]
min_ts = min(timestamps) if timestamps else 0
lats = [v['lat'] for v in cluster_vessels]
lons = [v['lon'] for v in cluster_vessels]
centroid_lat = float(np.mean(lats))
centroid_lon = float(np.mean(lons))
for i, v in enumerate(cluster_vessels):
mmsi = v['mmsi']
s = 0.0
# 기준 1: 최초 시각 (30점)
ts_rank = timestamps[i] - min_ts
s += 30.0 * (1.0 - min(ts_rank, 7200) / 7200)
# 기준 2: 총톤수 (25점) — 외부 DB 연동 전까지 균등 배점
s += 12.5
# 기준 3: 클러스터 중심 근접성 (20점)
dist_center = haversine_nm(v['lat'], v['lon'], centroid_lat, centroid_lon)
s += 20.0 * (1.0 - min(dist_center, 10) / 10)
# 기준 4: 기선 최근접 (15점)
dist_base = dist_to_baseline(v['lat'], v['lon'])
s += 15.0 * (1.0 - min(dist_base, 12) / 12)
# 기준 5: AIS 소실 이력 (10점) — 이력 없으면 만점
s += 10.0
scores[mmsi] = round(s, 2)
lead_mmsi = max(scores, key=lambda k: scores[k])
score_vals = sorted(scores.values(), reverse=True)
if len(score_vals) > 1 and score_vals[0] - score_vals[1] > 15:
confidence = 'HIGH'
elif len(score_vals) > 1 and score_vals[0] - score_vals[1] > 8:
confidence = 'MED'
else:
confidence = 'LOW'
return {
'lead_mmsi': lead_mmsi,
'lead_score': scores[lead_mmsi],
'all_scores': scores,
'confidence': confidence,
}
def assign_fleet_roles(
vessel_dfs: dict[str, pd.DataFrame],
cluster_map: dict[str, int],
) -> dict[str, dict]:
"""선단 역할 할당: LEADER/MEMBER/NOISE."""
results: dict[str, dict] = {}
# 클러스터별 그룹핑
clusters: dict[int, list[str]] = {}
for mmsi, cid in cluster_map.items():
clusters.setdefault(cid, []).append(mmsi)
for cid, mmsi_list in clusters.items():
if cid == -1:
for mmsi in mmsi_list:
results[mmsi] = {
'cluster_size': 0,
'is_leader': False,
'fleet_role': 'NOISE',
}
continue
cluster_size = len(mmsi_list)
# 스냅샷 생성 (각 선박의 마지막 포인트)
snapshots: list[dict] = []
for mmsi in mmsi_list:
df = vessel_dfs.get(mmsi)
if df is not None and len(df) > 0:
last = df.iloc[-1]
snapshots.append({
'mmsi': mmsi,
'lat': last['lat'],
'lon': last['lon'],
'timestamp': last.get('timestamp', pd.Timestamp.now()),
})
lead_info = identify_lead_vessel(snapshots) if len(snapshots) >= 2 else {}
lead_mmsi = lead_info.get('lead_mmsi')
for mmsi in mmsi_list:
results[mmsi] = {
'cluster_size': cluster_size,
'is_leader': mmsi == lead_mmsi,
'fleet_role': 'LEADER' if mmsi == lead_mmsi else 'MEMBER',
}
return results

파일 보기

@ -0,0 +1,93 @@
from __future__ import annotations
import json
import math
from pathlib import Path
from typing import List, Optional, Tuple
EARTH_RADIUS_NM = 3440.065
TERRITORIAL_SEA_NM = 12.0
CONTIGUOUS_ZONE_NM = 24.0
_baseline_points: Optional[List[Tuple[float, float]]] = None
def _load_baseline() -> List[Tuple[float, float]]:
global _baseline_points
if _baseline_points is not None:
return _baseline_points
path = Path(__file__).parent.parent / 'data' / 'korea_baseline.json'
with open(path, 'r') as f:
data = json.load(f)
_baseline_points = [(p['lat'], p['lon']) for p in data['points']]
return _baseline_points
def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""두 좌표 간 거리 (해리)."""
R = EARTH_RADIUS_NM
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def dist_to_baseline(vessel_lat: float, vessel_lon: float,
baseline_points: Optional[List[Tuple[float, float]]] = None) -> float:
"""선박 좌표에서 기선까지 최소 거리 (NM)."""
if baseline_points is None:
baseline_points = _load_baseline()
min_dist = float('inf')
for bp_lat, bp_lon in baseline_points:
d = haversine_nm(vessel_lat, vessel_lon, bp_lat, bp_lon)
if d < min_dist:
min_dist = d
return min_dist
def classify_zone(vessel_lat: float, vessel_lon: float) -> dict:
"""선박 위치 수역 분류."""
dist = dist_to_baseline(vessel_lat, vessel_lon)
if dist <= TERRITORIAL_SEA_NM:
return {
'zone': 'TERRITORIAL_SEA',
'dist_from_baseline_nm': round(dist, 2),
'violation': True,
'alert_level': 'CRITICAL',
}
elif dist <= CONTIGUOUS_ZONE_NM:
return {
'zone': 'CONTIGUOUS_ZONE',
'dist_from_baseline_nm': round(dist, 2),
'violation': False,
'alert_level': 'WATCH',
}
else:
return {
'zone': 'EEZ_OR_BEYOND',
'dist_from_baseline_nm': round(dist, 2),
'violation': False,
'alert_level': 'NORMAL',
}
def bd09_to_wgs84(bd_lat: float, bd_lon: float) -> tuple[float, float]:
"""BD-09 좌표계를 WGS84로 변환."""
x = bd_lon - 0.0065
y = bd_lat - 0.006
z = math.sqrt(x ** 2 + y ** 2) - 0.00002 * math.sin(y * 52.35987756)
theta = math.atan2(y, x) - 0.000003 * math.cos(x * 52.35987756)
gcj_lon = z * math.cos(theta)
gcj_lat = z * math.sin(theta)
wgs_lat = gcj_lat - 0.0023
wgs_lon = gcj_lon - 0.0059
return wgs_lat, wgs_lon
def compute_bd09_offset(lat: float, lon: float) -> float:
"""BD09 좌표와 WGS84 좌표 간 오프셋 (미터)."""
wgs_lat, wgs_lon = bd09_to_wgs84(lat, lon)
dist_nm = haversine_nm(lat, lon, wgs_lat, wgs_lon)
return round(dist_nm * 1852.0, 1) # NM to meters

파일 보기

@ -0,0 +1,75 @@
from typing import Optional, Tuple
import pandas as pd
from algorithms.location import classify_zone
from algorithms.fishing_pattern import detect_fishing_segments, detect_trawl_uturn
from algorithms.dark_vessel import detect_ais_gaps
from algorithms.spoofing import detect_teleportation
def compute_vessel_risk_score(
mmsi: str,
df_vessel: pd.DataFrame,
zone_info: Optional[dict] = None,
is_permitted: Optional[bool] = None,
) -> Tuple[int, str]:
"""선박별 종합 위반 위험도 (0~100점).
Returns: (risk_score, risk_level)
"""
if len(df_vessel) == 0:
return 0, 'LOW'
score = 0
# 1. 위치 기반 (최대 40점)
if zone_info is None:
last = df_vessel.iloc[-1]
zone_info = classify_zone(last['lat'], last['lon'])
zone = zone_info.get('zone', '')
if zone == 'TERRITORIAL_SEA':
score += 40
elif zone == 'CONTIGUOUS_ZONE':
score += 10
# 2. 조업 행위 (최대 30점)
segs = detect_fishing_segments(df_vessel)
ts_fishing = [s for s in segs if s.get('in_territorial_sea')]
if ts_fishing:
score += 20
elif segs:
score += 5
uturn = detect_trawl_uturn(df_vessel)
if uturn.get('trawl_suspected'):
score += 10
# 3. AIS 조작 (최대 35점)
teleports = detect_teleportation(df_vessel)
if teleports:
score += 20
gaps = detect_ais_gaps(df_vessel)
critical_gaps = [g for g in gaps if g['gap_min'] >= 60]
if critical_gaps:
score += 15
elif gaps:
score += 5
# 4. 허가 이력 (최대 20점)
if is_permitted is not None and not is_permitted:
score += 20
score = min(score, 100)
if score >= 70:
level = 'CRITICAL'
elif score >= 50:
level = 'HIGH'
elif score >= 30:
level = 'MEDIUM'
else:
level = 'LOW'
return score, level

파일 보기

@ -0,0 +1,80 @@
import pandas as pd
from algorithms.location import haversine_nm, bd09_to_wgs84, compute_bd09_offset # noqa: F401
MAX_FISHING_SPEED_KNOTS = 25.0
def detect_teleportation(df_vessel: pd.DataFrame,
max_speed_knots: float = MAX_FISHING_SPEED_KNOTS) -> list[dict]:
"""연속 AIS 포인트 간 물리적 불가능 이동 탐지."""
if len(df_vessel) < 2:
return []
anomalies = []
records = df_vessel.sort_values('timestamp').to_dict('records')
for i in range(1, len(records)):
prev, curr = records[i - 1], records[i]
dist_nm = haversine_nm(prev['lat'], prev['lon'], curr['lat'], curr['lon'])
dt_hours = (
pd.Timestamp(curr['timestamp']) - pd.Timestamp(prev['timestamp'])
).total_seconds() / 3600
if dt_hours <= 0:
continue
implied_speed = dist_nm / dt_hours
if implied_speed > max_speed_knots:
anomalies.append({
'idx': i,
'dist_nm': round(dist_nm, 2),
'implied_kn': round(implied_speed, 1),
'type': 'TELEPORTATION',
'confidence': 'HIGH' if implied_speed > 50 else 'MED',
})
return anomalies
def count_speed_jumps(df_vessel: pd.DataFrame, threshold_knots: float = 10.0) -> int:
"""연속 SOG 급변 횟수."""
if len(df_vessel) < 2:
return 0
sog = df_vessel['sog'].values
jumps = 0
for i in range(1, len(sog)):
if abs(sog[i] - sog[i - 1]) > threshold_knots:
jumps += 1
return jumps
def compute_spoofing_score(df_vessel: pd.DataFrame) -> float:
"""종합 GPS 스푸핑 점수 (0~1)."""
if len(df_vessel) < 2:
return 0.0
score = 0.0
n = len(df_vessel)
# 순간이동 비율
teleports = detect_teleportation(df_vessel)
if teleports:
score += min(0.4, len(teleports) / n * 10)
# SOG 급변 비율
jumps = count_speed_jumps(df_vessel)
if jumps > 0:
score += min(0.3, jumps / n * 5)
# BD09 오프셋 (중국 좌표 사용 의심)
mid_idx = len(df_vessel) // 2
row = df_vessel.iloc[mid_idx]
offset = compute_bd09_offset(row['lat'], row['lon'])
if offset > 300: # 300m 이상
score += 0.3
elif offset > 100:
score += 0.1
return round(min(score, 1.0), 4)

0
prediction/cache/__init__.py vendored Normal file
파일 보기

335
prediction/cache/vessel_store.py vendored Normal file
파일 보기

@ -0,0 +1,335 @@
import logging
from datetime import datetime, timezone
from typing import Optional
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
_STATIC_REFRESH_INTERVAL_MIN = 60
_PERMIT_REFRESH_INTERVAL_MIN = 30
_EARTH_RADIUS_NM = 3440.065
_MAX_REASONABLE_SOG = 30.0
_CHINESE_MMSI_PREFIX = '412'
def _compute_sog_cog(df: pd.DataFrame) -> pd.DataFrame:
"""Compute SOG (knots) and COG (degrees) from consecutive lat/lon/timestamp points."""
df = df.sort_values(['mmsi', 'timestamp']).copy()
lat1 = np.radians(df['lat'].values[:-1])
lon1 = np.radians(df['lon'].values[:-1])
lat2 = np.radians(df['lat'].values[1:])
lon2 = np.radians(df['lon'].values[1:])
# Haversine distance (nautical miles)
dlat = lat2 - lat1
dlon = lon2 - lon1
a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
dist_nm = _EARTH_RADIUS_NM * 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
# Time difference (hours)
ts = df['timestamp'].values
dt_sec = (ts[1:] - ts[:-1]).astype('timedelta64[s]').astype(float)
dt_hours = dt_sec / 3600.0
dt_hours[dt_hours <= 0] = np.nan
# SOG = dist / time (knots)
computed_sog = dist_nm / dt_hours
computed_sog = np.clip(np.nan_to_num(computed_sog, nan=0.0), 0, _MAX_REASONABLE_SOG)
# COG = bearing (degrees)
x = np.sin(dlon) * np.cos(lat2)
y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon)
bearing = (np.degrees(np.arctan2(x, y)) + 360) % 360
# Append last value (copy from previous)
sog_arr = np.append(computed_sog, computed_sog[-1:] if len(computed_sog) > 0 else [0])
cog_arr = np.append(bearing, bearing[-1:] if len(bearing) > 0 else [0])
# Reset at MMSI boundaries
mmsi_vals = df['mmsi'].values
boundary = np.where(mmsi_vals[:-1] != mmsi_vals[1:])[0]
for idx in boundary:
sog_arr[idx + 1] = df['raw_sog'].iloc[idx + 1] if 'raw_sog' in df.columns else 0
cog_arr[idx + 1] = 0
# Where computed SOG is 0 or NaN, fall back to raw_sog
df['sog'] = sog_arr
if 'raw_sog' in df.columns:
mask = (df['sog'] == 0) | np.isnan(df['sog'])
df.loc[mask, 'sog'] = df.loc[mask, 'raw_sog'].fillna(0)
df['cog'] = cog_arr
return df
class VesselStore:
"""In-memory vessel trajectory store for Korean waters vessel data.
Maintains a 24-hour sliding window of all vessel tracks and supports
incremental 5-minute updates. Chinese vessel (MMSI 412*) filtering
is applied only at analysis target selection time.
"""
def __init__(self) -> None:
self._tracks: dict[str, pd.DataFrame] = {}
self._last_bucket: Optional[datetime] = None
self._static_info: dict[str, dict] = {}
self._permit_set: set[str] = set()
self._static_refreshed_at: Optional[datetime] = None
self._permit_refreshed_at: Optional[datetime] = None
# ------------------------------------------------------------------
# Public load / update methods
# ------------------------------------------------------------------
def load_initial(self, hours: int = 24) -> None:
"""Load all Korean waters vessel data for the past N hours.
Fetches a bulk DataFrame from snpdb, groups by MMSI, and stores
each vessel's track separately. Also triggers static info and
permit registry refresh.
"""
from db import snpdb
logger.info('loading initial vessel tracks (last %dh)...', hours)
try:
df_all = snpdb.fetch_all_tracks(hours)
except Exception as e:
logger.error('fetch_all_tracks failed: %s', e)
return
if df_all.empty:
logger.warning('fetch_all_tracks returned empty DataFrame')
return
# Rename sog column to raw_sog to preserve original AIS-reported speed
if 'sog' in df_all.columns and 'raw_sog' not in df_all.columns:
df_all = df_all.rename(columns={'sog': 'raw_sog'})
self._tracks = {}
for mmsi, group in df_all.groupby('mmsi'):
self._tracks[str(mmsi)] = group.reset_index(drop=True)
vessel_count = len(self._tracks)
point_count = sum(len(v) for v in self._tracks.values())
logger.info(
'initial load complete: %d vessels, %d total points',
vessel_count,
point_count,
)
self.refresh_static_info()
self.refresh_permit_registry()
def merge_incremental(self, df_new: pd.DataFrame) -> None:
"""Merge a new batch of vessel positions into the in-memory store.
Deduplicates by timestamp within each MMSI and updates _last_bucket.
"""
if df_new.empty:
logger.debug('merge_incremental called with empty DataFrame, skipping')
return
if 'sog' in df_new.columns and 'raw_sog' not in df_new.columns:
df_new = df_new.rename(columns={'sog': 'raw_sog'})
new_buckets: list[datetime] = []
for mmsi, group in df_new.groupby('mmsi'):
mmsi_str = str(mmsi)
if mmsi_str in self._tracks:
combined = pd.concat([self._tracks[mmsi_str], group], ignore_index=True)
combined = combined.drop_duplicates(subset=['timestamp'])
self._tracks[mmsi_str] = combined.reset_index(drop=True)
else:
self._tracks[mmsi_str] = group.reset_index(drop=True)
if 'time_bucket' in group.columns and not group['time_bucket'].empty:
bucket_vals = pd.to_datetime(group['time_bucket'].dropna())
if not bucket_vals.empty:
new_buckets.append(bucket_vals.max().to_pydatetime())
if new_buckets:
latest = max(new_buckets)
if isinstance(latest, datetime) and latest.tzinfo is None:
latest = latest.replace(tzinfo=timezone.utc)
if self._last_bucket is None or latest > self._last_bucket:
self._last_bucket = latest
logger.debug(
'incremental merge done: %d mmsis in batch, store has %d vessels',
df_new['mmsi'].nunique(),
len(self._tracks),
)
def evict_stale(self, hours: int = 24) -> None:
"""Remove track points older than N hours and evict empty MMSI entries."""
import datetime as _dt
now = datetime.now(timezone.utc)
cutoff_aware = now - _dt.timedelta(hours=hours)
cutoff_naive = cutoff_aware.replace(tzinfo=None)
before_total = sum(len(v) for v in self._tracks.values())
evicted_mmsis: list[str] = []
for mmsi in list(self._tracks.keys()):
df = self._tracks[mmsi]
ts_col = df['timestamp']
# Handle tz-aware and tz-naive timestamps uniformly
if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None:
mask = ts_col >= pd.Timestamp(cutoff_aware)
else:
mask = ts_col >= pd.Timestamp(cutoff_naive)
filtered = df[mask].reset_index(drop=True)
if filtered.empty:
del self._tracks[mmsi]
evicted_mmsis.append(mmsi)
else:
self._tracks[mmsi] = filtered
after_total = sum(len(v) for v in self._tracks.values())
logger.info(
'eviction complete: removed %d points, evicted %d mmsis (threshold=%dh)',
before_total - after_total,
len(evicted_mmsis),
hours,
)
def refresh_static_info(self) -> None:
"""Fetch vessel static info (type, name, dimensions) from snpdb.
Skips refresh if called within the last 60 minutes.
"""
now = datetime.now(timezone.utc)
if self._static_refreshed_at is not None:
elapsed_min = (now - self._static_refreshed_at).total_seconds() / 60
if elapsed_min < _STATIC_REFRESH_INTERVAL_MIN:
logger.debug(
'static info refresh skipped (%.1f min since last refresh)',
elapsed_min,
)
return
if not self._tracks:
logger.debug('no tracks in store, skipping static info refresh')
return
from db import snpdb
mmsi_list = list(self._tracks.keys())
try:
info = snpdb.fetch_static_info(mmsi_list)
self._static_info.update(info)
self._static_refreshed_at = now
logger.info('static info refreshed: %d vessels', len(info))
except Exception as e:
logger.error('fetch_static_info failed: %s', e)
def refresh_permit_registry(self) -> None:
"""Fetch permitted Chinese fishing vessel MMSIs from snpdb.
Skips refresh if called within the last 30 minutes.
"""
now = datetime.now(timezone.utc)
if self._permit_refreshed_at is not None:
elapsed_min = (now - self._permit_refreshed_at).total_seconds() / 60
if elapsed_min < _PERMIT_REFRESH_INTERVAL_MIN:
logger.debug(
'permit registry refresh skipped (%.1f min since last refresh)',
elapsed_min,
)
return
from db import snpdb
try:
mmsis = snpdb.fetch_permit_mmsis()
self._permit_set = set(mmsis)
self._permit_refreshed_at = now
logger.info('permit registry refreshed: %d permitted vessels', len(self._permit_set))
except Exception as e:
logger.error('fetch_permit_mmsis failed: %s', e)
# ------------------------------------------------------------------
# Analysis target selection
# ------------------------------------------------------------------
def select_analysis_targets(self) -> pd.DataFrame:
"""Build a combined DataFrame of Chinese vessel tracks with computed SOG/COG.
Filters to MMSI starting with '412', computes SOG and COG from
consecutive lat/lon/timestamp pairs using the haversine formula,
and falls back to raw_sog where computed values are zero or NaN.
Returns:
DataFrame with columns: mmsi, timestamp, lat, lon, sog, cog
"""
chinese_mmsis = [m for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX)]
if not chinese_mmsis:
logger.info('no Chinese vessels (412*) found in store')
return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog'])
frames = [self._tracks[m] for m in chinese_mmsis]
combined = pd.concat(frames, ignore_index=True)
required_cols = {'mmsi', 'timestamp', 'lat', 'lon'}
missing = required_cols - set(combined.columns)
if missing:
logger.error('combined DataFrame missing required columns: %s', missing)
return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog'])
result = _compute_sog_cog(combined)
output_cols = ['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog']
available = [c for c in output_cols if c in result.columns]
return result[available].reset_index(drop=True)
# ------------------------------------------------------------------
# Lookup helpers
# ------------------------------------------------------------------
def is_permitted(self, mmsi: str) -> bool:
"""Return True if the given MMSI is in the permitted Chinese fishing vessel registry."""
return mmsi in self._permit_set
def get_vessel_info(self, mmsi: str) -> dict:
"""Return static vessel info dict for the given MMSI, or empty dict if not found."""
return self._static_info.get(mmsi, {})
# ------------------------------------------------------------------
# Properties
# ------------------------------------------------------------------
@property
def last_bucket(self) -> Optional[datetime]:
"""Return the latest time bucket seen across all merged incremental batches."""
return self._last_bucket
# ------------------------------------------------------------------
# Diagnostics
# ------------------------------------------------------------------
def stats(self) -> dict:
"""Return store statistics for health/status reporting."""
total_points = sum(len(v) for v in self._tracks.values())
chinese_count = sum(1 for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX))
# Rough memory estimate: each row ~200 bytes across columns
memory_mb = round((total_points * 200) / (1024 * 1024), 2)
return {
'vessels': len(self._tracks),
'points': total_points,
'memory_mb': memory_mb,
'last_bucket': self._last_bucket.isoformat() if self._last_bucket else None,
'targets': chinese_count,
'permitted': len(self._permit_set),
}
# Module-level singleton
vessel_store = VesselStore()

40
prediction/config.py Normal file
파일 보기

@ -0,0 +1,40 @@
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# snpdb (궤적 데이터 소스)
SNPDB_HOST: str = '211.208.115.83'
SNPDB_PORT: int = 5432
SNPDB_NAME: str = 'snpdb'
SNPDB_USER: str = 'snp'
SNPDB_PASSWORD: str
# kcgdb (분석 결과 저장)
KCGDB_HOST: str = '211.208.115.83'
KCGDB_PORT: int = 5432
KCGDB_NAME: str = 'kcgdb'
KCGDB_SCHEMA: str = 'kcg'
KCGDB_USER: str = 'kcg_app'
KCGDB_PASSWORD: str
# 스케줄러
SCHEDULER_INTERVAL_MIN: int = 5
# 인메모리 캐시
CACHE_WINDOW_HOURS: int = 24
INITIAL_LOAD_HOURS: int = 24
STATIC_INFO_REFRESH_MIN: int = 60
PERMIT_REFRESH_MIN: int = 30
# 파이프라인
TRAJECTORY_HOURS: int = 6
MMSI_PREFIX: str = '412'
MIN_TRAJ_POINTS: int = 100
# 로깅
LOG_LEVEL: str = 'INFO'
model_config = {'env_file': '.env', 'env_file_encoding': 'utf-8', 'extra': 'ignore'}
settings = Settings()

파일 보기

@ -0,0 +1 @@
{"points": [{"lat": 37.0, "lon": 124.0}, {"lat": 35.0, "lon": 129.0}]}

파일 보기

134
prediction/db/kcgdb.py Normal file
파일 보기

@ -0,0 +1,134 @@
import logging
from contextlib import contextmanager
from typing import TYPE_CHECKING, Optional
import psycopg2
from psycopg2 import pool
from psycopg2.extras import execute_values
from config import settings
if TYPE_CHECKING:
from models.result import AnalysisResult
logger = logging.getLogger(__name__)
_pool: Optional[pool.ThreadedConnectionPool] = None
def init_pool():
global _pool
_pool = pool.ThreadedConnectionPool(
minconn=1,
maxconn=3,
host=settings.KCGDB_HOST,
port=settings.KCGDB_PORT,
dbname=settings.KCGDB_NAME,
user=settings.KCGDB_USER,
password=settings.KCGDB_PASSWORD,
options=f'-c search_path={settings.KCGDB_SCHEMA},public',
)
logger.info('kcgdb connection pool initialized')
def close_pool():
global _pool
if _pool:
_pool.closeall()
_pool = None
logger.info('kcgdb connection pool closed')
@contextmanager
def get_conn():
conn = _pool.getconn()
try:
yield conn
except Exception:
conn.rollback()
raise
finally:
_pool.putconn(conn)
def check_health() -> bool:
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute('SELECT 1')
return True
except Exception as e:
logger.error('kcgdb health check failed: %s', e)
return False
def upsert_results(results: list['AnalysisResult']) -> int:
"""분석 결과를 vessel_analysis_results 테이블에 upsert."""
if not results:
return 0
insert_sql = """
INSERT INTO vessel_analysis_results (
mmsi, timestamp, vessel_type, confidence, fishing_pct,
cluster_id, season, zone, dist_to_baseline_nm, activity_state,
ucaf_score, ucft_score, is_dark, gap_duration_min,
spoofing_score, bd09_offset_m, speed_jump_count,
cluster_size, is_leader, fleet_role,
risk_score, risk_level, features, analyzed_at
) VALUES %s
ON CONFLICT (mmsi, timestamp) DO UPDATE SET
vessel_type = EXCLUDED.vessel_type,
confidence = EXCLUDED.confidence,
fishing_pct = EXCLUDED.fishing_pct,
cluster_id = EXCLUDED.cluster_id,
season = EXCLUDED.season,
zone = EXCLUDED.zone,
dist_to_baseline_nm = EXCLUDED.dist_to_baseline_nm,
activity_state = EXCLUDED.activity_state,
ucaf_score = EXCLUDED.ucaf_score,
ucft_score = EXCLUDED.ucft_score,
is_dark = EXCLUDED.is_dark,
gap_duration_min = EXCLUDED.gap_duration_min,
spoofing_score = EXCLUDED.spoofing_score,
bd09_offset_m = EXCLUDED.bd09_offset_m,
speed_jump_count = EXCLUDED.speed_jump_count,
cluster_size = EXCLUDED.cluster_size,
is_leader = EXCLUDED.is_leader,
fleet_role = EXCLUDED.fleet_role,
risk_score = EXCLUDED.risk_score,
risk_level = EXCLUDED.risk_level,
features = EXCLUDED.features,
analyzed_at = EXCLUDED.analyzed_at
"""
try:
with get_conn() as conn:
with conn.cursor() as cur:
tuples = [r.to_db_tuple() for r in results]
execute_values(cur, insert_sql, tuples, page_size=100)
conn.commit()
count = len(tuples)
logger.info('upserted %d analysis results', count)
return count
except Exception as e:
logger.error('failed to upsert results: %s', e)
return 0
def cleanup_old(hours: int = 48) -> int:
"""오래된 분석 결과 삭제."""
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
'DELETE FROM vessel_analysis_results WHERE analyzed_at < NOW() - (%s * INTERVAL \'1 hour\')',
(hours,),
)
deleted = cur.rowcount
conn.commit()
if deleted > 0:
logger.info('cleaned up %d old results (older than %dh)', deleted, hours)
return deleted
except Exception as e:
logger.error('failed to cleanup old results: %s', e)
return 0

187
prediction/db/snpdb.py Normal file
파일 보기

@ -0,0 +1,187 @@
import logging
from contextlib import contextmanager
from datetime import datetime
from typing import Optional
import pandas as pd
import psycopg2
from psycopg2 import pool
from config import settings
logger = logging.getLogger(__name__)
_pool: Optional[pool.ThreadedConnectionPool] = None
def init_pool():
global _pool
_pool = pool.ThreadedConnectionPool(
minconn=1,
maxconn=3,
host=settings.SNPDB_HOST,
port=settings.SNPDB_PORT,
dbname=settings.SNPDB_NAME,
user=settings.SNPDB_USER,
password=settings.SNPDB_PASSWORD,
)
logger.info('snpdb connection pool initialized')
def close_pool():
global _pool
if _pool:
_pool.closeall()
_pool = None
logger.info('snpdb connection pool closed')
@contextmanager
def get_conn():
conn = _pool.getconn()
try:
yield conn
finally:
_pool.putconn(conn)
def check_health() -> bool:
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute('SELECT 1')
return True
except Exception as e:
logger.error('snpdb health check failed: %s', e)
return False
def fetch_all_tracks(hours: int = 24) -> pd.DataFrame:
"""한국 해역 전 선박의 궤적 포인트를 조회한다.
LineStringM 지오메트리에서 개별 포인트를 추출하며,
한국 해역(124-132E, 32-39N) 최근 N시간 데이터를 반환한다.
"""
query = f"""
SELECT
t.mmsi,
to_timestamp(ST_M((dp).geom)) as timestamp,
ST_Y((dp).geom) as lat,
ST_X((dp).geom) as lon,
CASE
WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
END as raw_sog
FROM signal.t_vessel_tracks_5min t,
LATERAL ST_DumpPoints(t.track_geom) dp
WHERE t.time_bucket >= NOW() - INTERVAL '{hours} hours'
AND t.track_geom && ST_MakeEnvelope(124, 32, 132, 39, 4326)
ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
"""
try:
with get_conn() as conn:
df = pd.read_sql_query(query, conn)
logger.info(
'fetch_all_tracks: %d rows, %d vessels (last %dh)',
len(df),
df['mmsi'].nunique() if len(df) > 0 else 0,
hours,
)
return df
except Exception as e:
logger.error('fetch_all_tracks failed: %s', e)
return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'raw_sog'])
def fetch_incremental(last_bucket: datetime) -> pd.DataFrame:
"""last_bucket 이후의 신규 궤적 포인트를 조회한다.
스케줄러 증분 업데이트에 사용되며, time_bucket > last_bucket 조건으로
이미 처리한 버킷을 건너뛴다.
"""
query = """
SELECT
t.mmsi,
to_timestamp(ST_M((dp).geom)) as timestamp,
ST_Y((dp).geom) as lat,
ST_X((dp).geom) as lon,
CASE
WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
END as raw_sog
FROM signal.t_vessel_tracks_5min t,
LATERAL ST_DumpPoints(t.track_geom) dp
WHERE t.time_bucket > %s
AND t.track_geom && ST_MakeEnvelope(124, 32, 132, 39, 4326)
ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
"""
try:
with get_conn() as conn:
df = pd.read_sql_query(query, conn, params=(last_bucket,))
logger.info(
'fetch_incremental: %d rows, %d vessels (since %s)',
len(df),
df['mmsi'].nunique() if len(df) > 0 else 0,
last_bucket.isoformat(),
)
return df
except Exception as e:
logger.error('fetch_incremental failed: %s', e)
return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'raw_sog'])
def fetch_static_info(mmsi_list: list[str]) -> dict[str, dict]:
"""MMSI 목록에 해당하는 선박 정적 정보를 조회한다.
DISTINCT ON (mmsi) 최신 레코드만 반환한다.
"""
query = """
SELECT DISTINCT ON (mmsi) mmsi, name, vessel_type, length, width
FROM signal.t_vessel_static
WHERE mmsi = ANY(%s)
ORDER BY mmsi, time_bucket DESC
"""
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(query, (mmsi_list,))
rows = cur.fetchall()
result = {
row[0]: {
'name': row[1],
'vessel_type': row[2],
'length': row[3],
'width': row[4],
}
for row in rows
}
logger.info('fetch_static_info: %d vessels resolved', len(result))
return result
except Exception as e:
logger.error('fetch_static_info failed: %s', e)
return {}
def fetch_permit_mmsis() -> set[str]:
"""중국 허가어선 MMSI 목록을 조회한다.
signal.t_chnprmship_positions 테이블에서 DISTINCT mmsi를 반환한다.
"""
query = """
SELECT DISTINCT mmsi FROM signal.t_chnprmship_positions
"""
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
result = {row[0] for row in rows}
logger.info('fetch_permit_mmsis: %d permitted vessels', len(result))
return result
except Exception as e:
logger.error('fetch_permit_mmsis failed: %s', e)
return set()

25
prediction/env.example Normal file
파일 보기

@ -0,0 +1,25 @@
# snpdb (궤적 데이터 소스)
SNPDB_HOST=211.208.115.83
SNPDB_PORT=5432
SNPDB_NAME=snpdb
SNPDB_USER=snp
SNPDB_PASSWORD=snp#8932
# kcgdb (분석 결과 저장)
KCGDB_HOST=211.208.115.83
KCGDB_PORT=5432
KCGDB_NAME=kcgdb
KCGDB_SCHEMA=kcg
KCGDB_USER=kcg_app
KCGDB_PASSWORD=Kcg2026monitor
# 스케줄러
SCHEDULER_INTERVAL_MIN=5
# 파이프라인
TRAJECTORY_HOURS=6
MMSI_PREFIX=412
MIN_TRAJ_POINTS=100
# 로깅
LOG_LEVEL=INFO

파일 보기

@ -1,8 +1,66 @@
from fastapi import FastAPI
import logging
import sys
from contextlib import asynccontextmanager
app = FastAPI(title="KCG Prediction Service", version="0.1.0")
from fastapi import BackgroundTasks, FastAPI
from config import settings
from db import kcgdb, snpdb
from scheduler import get_last_run, run_analysis_cycle, start_scheduler, stop_scheduler
logging.basicConfig(
level=getattr(logging, settings.LOG_LEVEL, logging.INFO),
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
stream=sys.stdout,
)
logger = logging.getLogger(__name__)
@app.get("/health")
@asynccontextmanager
async def lifespan(application: FastAPI):
from cache.vessel_store import vessel_store
logger.info('starting KCG Prediction Service')
snpdb.init_pool()
kcgdb.init_pool()
# 인메모리 캐시 초기 로드 (24시간)
logger.info('loading initial vessel data (%dh)...', settings.INITIAL_LOAD_HOURS)
vessel_store.load_initial(settings.INITIAL_LOAD_HOURS)
logger.info('initial load complete: %s', vessel_store.stats())
start_scheduler()
yield
stop_scheduler()
snpdb.close_pool()
kcgdb.close_pool()
logger.info('KCG Prediction Service stopped')
app = FastAPI(
title='KCG Prediction Service',
version='2.0.0',
lifespan=lifespan,
)
@app.get('/health')
def health_check():
return {"status": "ok"}
from cache.vessel_store import vessel_store
return {
'status': 'ok',
'snpdb': snpdb.check_health(),
'kcgdb': kcgdb.check_health(),
'store': vessel_store.stats(),
}
@app.get('/api/v1/analysis/status')
def analysis_status():
return get_last_run()
@app.post('/api/v1/analysis/trigger')
def trigger_analysis(background_tasks: BackgroundTasks):
background_tasks.add_task(run_analysis_cycle)
return {'message': 'analysis cycle triggered'}

파일 보기

38
prediction/models/ais.py Normal file
파일 보기

@ -0,0 +1,38 @@
from dataclasses import dataclass, field
from typing import List, Dict
import pandas as pd
@dataclass
class AISPoint:
mmsi: str
ts: pd.Timestamp
lat: float
lon: float
sog: float
cog: float
state: str = 'UNKNOWN'
@dataclass
class VesselTrajectory:
mmsi: str
points: List[AISPoint] = field(default_factory=list)
vessel_type: str = 'UNKNOWN'
cluster_id: int = -1
season: str = 'UNKNOWN'
fishing_pct: float = 0.0
features: Dict = field(default_factory=dict)
@dataclass
class ClassificationResult:
mmsi: str
vessel_type: str
confidence: float
dominant_state: str
fishing_pct: float
cluster_id: int
season: str
feature_vector: Dict

파일 보기

@ -0,0 +1,84 @@
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
@dataclass
class AnalysisResult:
"""vessel_analysis_results 테이블 28컬럼 매핑."""
mmsi: str
timestamp: datetime
# 분류 결과
vessel_type: str = 'UNKNOWN'
confidence: float = 0.0
fishing_pct: float = 0.0
cluster_id: int = -1
season: str = 'UNKNOWN'
# ALGO 01: 위치
zone: str = 'EEZ_OR_BEYOND'
dist_to_baseline_nm: float = 999.0
# ALGO 02: 활동 상태
activity_state: str = 'UNKNOWN'
ucaf_score: float = 0.0
ucft_score: float = 0.0
# ALGO 03: 다크 베셀
is_dark: bool = False
gap_duration_min: int = 0
# ALGO 04: GPS 스푸핑
spoofing_score: float = 0.0
bd09_offset_m: float = 0.0
speed_jump_count: int = 0
# ALGO 05+06: 선단
cluster_size: int = 0
is_leader: bool = False
fleet_role: str = 'NOISE'
# ALGO 07: 위험도
risk_score: int = 0
risk_level: str = 'LOW'
# 특징 벡터
features: dict = field(default_factory=dict)
# 메타
analyzed_at: Optional[datetime] = None
def __post_init__(self):
if self.analyzed_at is None:
self.analyzed_at = datetime.now(timezone.utc)
def to_db_tuple(self) -> tuple:
import json
return (
self.mmsi,
self.timestamp,
self.vessel_type,
self.confidence,
self.fishing_pct,
self.cluster_id,
self.season,
self.zone,
self.dist_to_baseline_nm,
self.activity_state,
self.ucaf_score,
self.ucft_score,
self.is_dark,
self.gap_duration_min,
self.spoofing_score,
self.bd09_offset_m,
self.speed_jump_count,
self.cluster_size,
self.is_leader,
self.fleet_role,
self.risk_score,
self.risk_level,
json.dumps(self.features),
self.analyzed_at,
)

파일 보기

파일 보기

@ -0,0 +1,31 @@
import pandas as pd
from pipeline.constants import SOG_STATIONARY_MAX, SOG_FISHING_MAX
class BehaviorDetector:
"""
속도 기반 3단계 행동 분류 (Yan et al. 2022, Natale et al. 2015)
정박(STATIONARY) / 조업(FISHING) / 항행(SAILING)
"""
@staticmethod
def classify_point(sog: float) -> str:
if sog < SOG_STATIONARY_MAX:
return 'STATIONARY'
elif sog <= SOG_FISHING_MAX:
return 'FISHING'
else:
return 'SAILING'
def detect(self, df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
df['state'] = df['sog'].apply(self.classify_point)
return df
@staticmethod
def compute_fishing_ratio(df_vessel: pd.DataFrame) -> float:
total = len(df_vessel)
if total == 0:
return 0.0
fishing = (df_vessel['state'] == 'FISHING').sum()
return round(fishing / total * 100, 2)

파일 보기

@ -0,0 +1,100 @@
import pandas as pd
from typing import Dict, Tuple
class VesselTypeClassifier:
"""
Rule-based scoring classifier for fishing vessel types.
Scoring: for each feature in a type's profile, if the value falls within
the defined range a distance-based score is added (closer to the range
centre = higher score). Values outside the range incur a penalty.
Returns (vessel_type, confidence).
TRAWL trawling speed 2.54.5 kt, high COG variation
PURSE purse-seine speed 35 kt, circular COG pattern
LONGLINE longline speed 0.52 kt, low COG variation, long fishing runs
TRAP trap/pot speed ~0 kt, many stationary events, short range
"""
PROFILES: Dict[str, Dict[str, Tuple[float, float]]] = {
'TRAWL': {
'sog_fishing_mean': (2.5, 4.5),
'cog_change_mean': (0.15, 9.9),
'fishing_pct': (0.3, 0.7),
'fishing_run_mean': (5, 50),
'stationary_events': (0, 5),
},
'PURSE': {
'sog_fishing_mean': (3.0, 5.0),
'cog_circularity': (0.2, 1.0),
'fishing_pct': (0.1, 0.5),
'fishing_run_mean': (3, 30),
'stationary_events': (0, 3),
},
'LONGLINE': {
'sog_fishing_mean': (0.5, 2.5),
'cog_change_mean': (0.0, 0.15),
'fishing_pct': (0.4, 0.9),
'fishing_run_mean': (20, 999),
'stationary_events': (0, 10),
},
'TRAP': {
'sog_fishing_mean': (0.0, 2.0),
'stationary_pct': (0.2, 0.8),
'stationary_events': (5, 999),
'fishing_run_mean': (1, 10),
'total_distance_km': (0, 100),
},
}
def classify(self, features: Dict) -> Tuple[str, float]:
"""Classify a vessel from its feature dict.
Returns:
(vessel_type, confidence) where confidence is in [0, 1].
"""
if not features:
return 'UNKNOWN', 0.0
scores: Dict[str, float] = {}
for vtype, profile in self.PROFILES.items():
score = 0.0
matched = 0
for feat_name, (lo, hi) in profile.items():
val = features.get(feat_name)
if val is None:
continue
matched += 1
if lo <= val <= hi:
mid = (lo + hi) / 2
span = (hi - lo) / 2 if (hi - lo) > 0 else 1
score += max(0.0, 1 - abs(val - mid) / span)
else:
overshoot = min(abs(val - lo), abs(val - hi))
score -= min(0.5, overshoot / (hi - lo + 1e-9))
scores[vtype] = score / matched if matched > 0 else 0.0
best_type = max(scores, key=lambda k: scores[k])
total = sum(max(v, 0.0) for v in scores.values())
confidence = scores[best_type] / total if total > 0 else 0.0
return best_type, round(confidence, 3)
def get_season(ts: pd.Timestamp) -> str:
"""Return the Northern-Hemisphere season for a timestamp.
Reference: paper 12 seasonal activity analysis (Chinese EEZ).
Chinese fishing ban period: Yellow Sea / East China Sea MaySep,
South China Sea MayAug.
"""
m = ts.month
if m in [3, 4, 5]:
return 'SPRING'
elif m in [6, 7, 8]:
return 'SUMMER'
elif m in [9, 10, 11]:
return 'FALL'
else:
return 'WINTER'

파일 보기

@ -0,0 +1,101 @@
from collections import Counter
from typing import Dict, Optional
import numpy as np
import pandas as pd
from pipeline.constants import BIRCH_THRESHOLD, BIRCH_BRANCHING, MIN_CLUSTER_SIZE
class EnhancedBIRCHClusterer:
"""Trajectory clustering using sklearn Birch with a simple K-means fallback.
Based on the enhanced-BIRCH approach (Yan, Yang et al.):
1. Resample each trajectory to a fixed-length vector.
2. Build a BIRCH CF-tree for memory-efficient hierarchical clustering.
3. Small clusters (< MIN_CLUSTER_SIZE) are relabelled as noise (-1).
"""
def __init__(
self,
threshold: float = BIRCH_THRESHOLD,
branching: int = BIRCH_BRANCHING,
n_clusters: Optional[int] = None,
) -> None:
self.threshold = threshold
self.branching = branching
self.n_clusters = n_clusters
self._model = None
def _traj_to_vector(self, df_vessel: pd.DataFrame, n_points: int = 20) -> np.ndarray:
"""Convert a vessel trajectory DataFrame to a fixed-length vector.
Linearly samples n_points from the trajectory and interleaves lat/lon
values, then normalises to zero mean / unit variance.
"""
lats = df_vessel['lat'].values
lons = df_vessel['lon'].values
idx = np.linspace(0, len(lats) - 1, n_points).astype(int)
vec = np.concatenate([lats[idx], lons[idx]])
vec = (vec - vec.mean()) / (vec.std() + 1e-9)
return vec
def fit_predict(self, vessels: Dict[str, pd.DataFrame]) -> Dict[str, int]:
"""Cluster vessel trajectories.
Args:
vessels: mapping of mmsi -> resampled trajectory DataFrame.
Returns:
Mapping of mmsi -> cluster_id. Vessels in small clusters are
assigned cluster_id -1 (noise). Vessels with fewer than 20
points are excluded from the result.
"""
mmsi_list: list[str] = []
vectors: list[np.ndarray] = []
for mmsi, df_v in vessels.items():
if len(df_v) < 20:
continue
mmsi_list.append(mmsi)
vectors.append(self._traj_to_vector(df_v))
if len(vectors) < 3:
return {m: 0 for m in mmsi_list}
X = np.array(vectors)
try:
from sklearn.cluster import Birch
model = Birch(
threshold=self.threshold,
branching_factor=self.branching,
n_clusters=self.n_clusters,
)
labels = model.fit_predict(X)
self._model = model
except ImportError:
labels = self._simple_cluster(X)
cnt = Counter(labels)
labels = np.array([lbl if cnt[lbl] >= MIN_CLUSTER_SIZE else -1 for lbl in labels])
return dict(zip(mmsi_list, labels.tolist()))
@staticmethod
def _simple_cluster(X: np.ndarray, k: int = 5) -> np.ndarray:
"""Fallback K-means used when sklearn is unavailable."""
n = len(X)
k = min(k, n)
centers = X[np.random.choice(n, k, replace=False)]
labels = np.zeros(n, dtype=int)
for _ in range(20):
dists = np.array([[np.linalg.norm(x - c) for c in centers] for x in X])
labels = dists.argmin(axis=1)
new_centers = np.array(
[X[labels == i].mean(axis=0) if (labels == i).any() else centers[i] for i in range(k)]
)
if np.allclose(centers, new_centers, atol=1e-6):
break
centers = new_centers
return labels

파일 보기

@ -0,0 +1,26 @@
SOG_STATIONARY_MAX = 1.0
SOG_FISHING_MAX = 5.0
SOG_SAILING_MIN = 5.0
VESSEL_SOG_PROFILE = {
'TRAWL': {'min': 1.5, 'max': 4.5, 'mean': 2.8, 'cog_var': 'high'},
'PURSE': {'min': 2.0, 'max': 5.0, 'mean': 3.5, 'cog_var': 'circular'},
'LONGLINE': {'min': 0.5, 'max': 3.0, 'mean': 1.8, 'cog_var': 'low'},
'TRAP': {'min': 0.0, 'max': 2.0, 'mean': 0.8, 'cog_var': 'very_low'},
}
RESAMPLE_INTERVAL_MIN = 4
BIRCH_THRESHOLD = 0.35
BIRCH_BRANCHING = 50
MIN_CLUSTER_SIZE = 5
MMSI_DIGITS = 9
MAX_VESSEL_LENGTH = 300
MAX_SOG_KNOTS = 30.0
MIN_TRAJ_POINTS = 100
KR_BOUNDS = {
'lat_min': 32.0, 'lat_max': 39.0,
'lon_min': 124.0, 'lon_max': 132.0,
}

파일 보기

@ -0,0 +1,93 @@
import math
import numpy as np
import pandas as pd
from typing import Dict
class FeatureExtractor:
"""
어선 유형 분류를 위한 특징 벡터 추출
논문 12 (남중국해 어선 유형 식별) 기반 핵심 피처:
- 속도 통계 (mean, std, 분위수)
- 침로 변동성 (COG variance 선회 패턴)
- 조업 비율 조업 지속 시간
- 이동 거리 해역 커버리지
- 정박 빈도 (투망/양망 간격 추정)
"""
@staticmethod
def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""두 좌표 간 거리 (km)"""
R = 6371.0
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def extract(self, df_vessel: pd.DataFrame) -> Dict[str, float]:
if len(df_vessel) < 10:
return {}
sog = df_vessel['sog'].values
cog = df_vessel['cog'].values
states = df_vessel['state'].values
# Speed features
fishing_sog = sog[states == 'FISHING'] if (states == 'FISHING').any() else np.array([0])
feat: Dict[str, float] = {
'sog_mean': float(np.mean(sog)),
'sog_std': float(np.std(sog)),
'sog_fishing_mean': float(np.mean(fishing_sog)),
'sog_fishing_std': float(np.std(fishing_sog)),
'sog_q25': float(np.percentile(sog, 25)),
'sog_q75': float(np.percentile(sog, 75)),
}
# COG features (선망: 원형, 트롤: 직선왕복, 연승: 부드러운 곡선)
cog_diff = np.abs(np.diff(np.unwrap(np.radians(cog))))
feat['cog_change_mean'] = float(np.mean(cog_diff))
feat['cog_change_std'] = float(np.std(cog_diff))
feat['cog_circularity'] = float(np.sum(cog_diff > np.pi / 4) / len(cog_diff))
# State ratios
n = len(states)
feat['fishing_pct'] = float((states == 'FISHING').sum() / n)
feat['stationary_pct'] = float((states == 'STATIONARY').sum() / n)
feat['sailing_pct'] = float((states == 'SAILING').sum() / n)
# Stationary events (투망·양망 횟수 추정)
stationary_events = 0
prev = None
for s in states:
if s == 'STATIONARY' and prev != 'STATIONARY':
stationary_events += 1
prev = s
feat['stationary_events'] = float(stationary_events)
# Total distance (km)
lats = df_vessel['lat'].values
lons = df_vessel['lon'].values
total_dist = sum(
self.haversine(lats[i], lons[i], lats[i + 1], lons[i + 1])
for i in range(len(lats) - 1)
)
feat['total_distance_km'] = round(total_dist, 2)
# Coverage (바운딩 박스 면적 — 근사)
feat['coverage_deg2'] = round(float(np.ptp(lats)) * float(np.ptp(lons)), 4)
# Average fishing run length
fishing_runs = []
run = 0
for s in states:
if s == 'FISHING':
run += 1
elif run > 0:
fishing_runs.append(run)
run = 0
if run > 0:
fishing_runs.append(run)
feat['fishing_run_mean'] = float(np.mean(fishing_runs)) if fishing_runs else 0.0
return feat

파일 보기

@ -0,0 +1,95 @@
import logging
import pandas as pd
from pipeline.preprocessor import AISPreprocessor
from pipeline.behavior import BehaviorDetector
from pipeline.resampler import TrajectoryResampler
from pipeline.features import FeatureExtractor
from pipeline.classifier import VesselTypeClassifier, get_season
from pipeline.clusterer import EnhancedBIRCHClusterer
from pipeline.constants import RESAMPLE_INTERVAL_MIN
logger = logging.getLogger(__name__)
class ChineseFishingVesselPipeline:
"""7-step pipeline for classifying Chinese fishing vessel activity types.
Steps:
1. AIS preprocessing (Yan et al. 2022)
2. Behaviour-state detection (speed-based 3-class)
3. Trajectory resampling (Yan, Yang et al. 4-minute interval)
4. Feature vector extraction (paper 12)
5. Vessel-type classification (rule-based scoring)
6. Enhanced BIRCH trajectory clustering (Yan, Yang et al.)
7. Seasonal activity tagging (paper 12)
"""
def __init__(self) -> None:
self.preprocessor = AISPreprocessor()
self.detector = BehaviorDetector()
self.resampler = TrajectoryResampler(RESAMPLE_INTERVAL_MIN)
self.extractor = FeatureExtractor()
self.classifier = VesselTypeClassifier()
self.clusterer = EnhancedBIRCHClusterer()
def run(
self, df_raw: pd.DataFrame
) -> tuple[list[dict], dict[str, pd.DataFrame]]:
"""Run the 7-step pipeline.
Args:
df_raw: raw AIS DataFrame with columns mmsi, timestamp, lat, lon,
sog, cog.
Returns:
(results, vessel_dfs) where:
- results is a list of classification dicts, each containing:
mmsi, vessel_type, confidence, fishing_pct, cluster_id, season,
n_points, features.
- vessel_dfs is a mapping of mmsi -> resampled trajectory DataFrame.
"""
# Step 1: preprocess
df = self.preprocessor.run(df_raw)
if len(df) == 0:
logger.warning('pipeline: no rows after preprocessing')
return [], {}
# Step 2: behaviour detection
df = self.detector.detect(df)
# Steps 35: per-vessel processing
vessel_dfs: dict[str, pd.DataFrame] = {}
results: list[dict] = []
for mmsi, df_v in df.groupby('mmsi'):
df_resampled = self.resampler.resample(df_v)
vessel_dfs[mmsi] = df_resampled
features = self.extractor.extract(df_resampled)
vtype, confidence = self.classifier.classify(features)
fishing_pct = BehaviorDetector.compute_fishing_ratio(df_resampled)
season = get_season(df_v['timestamp'].iloc[len(df_v) // 2])
results.append({
'mmsi': mmsi,
'vessel_type': vtype,
'confidence': confidence,
'fishing_pct': fishing_pct,
'season': season,
'n_points': len(df_resampled),
'features': features,
})
# Step 6: BIRCH clustering
cluster_map = self.clusterer.fit_predict(vessel_dfs)
for r in results:
r['cluster_id'] = cluster_map.get(r['mmsi'], -1)
logger.info(
'pipeline complete: %d vessels, types=%s',
len(results),
{r['vessel_type'] for r in results},
)
return results, vessel_dfs

파일 보기

@ -0,0 +1,52 @@
import pandas as pd
from collections import defaultdict
from pipeline.constants import KR_BOUNDS, MAX_SOG_KNOTS, MIN_TRAJ_POINTS
class AISPreprocessor:
"""Delete-Supplement-Update (Yan et al. 2022)"""
def __init__(self):
self.stats = defaultdict(int)
def run(self, df: pd.DataFrame) -> pd.DataFrame:
original = len(df)
required = ['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog']
missing = [c for c in required if c not in df.columns]
if missing:
raise ValueError(f"필수 컬럼 누락: {missing}")
df = df.copy()
df['timestamp'] = pd.to_datetime(df['timestamp'])
valid_mmsi = df['mmsi'].astype(str).str.match(r'^\d{9}$')
df = df[valid_mmsi]
self.stats['invalid_mmsi'] += original - len(df)
df = df[(df['lat'].between(-90, 90)) & (df['lon'].between(-180, 180))]
df = df[
df['lat'].between(KR_BOUNDS['lat_min'], KR_BOUNDS['lat_max']) &
df['lon'].between(KR_BOUNDS['lon_min'], KR_BOUNDS['lon_max'])
]
df = df.sort_values(['mmsi', 'timestamp'])
df['sog'] = df.groupby('mmsi')['sog'].transform(
lambda x: x.where(
x.between(0, MAX_SOG_KNOTS),
x.rolling(3, center=True, min_periods=1).mean(),
)
)
df = df[(df['sog'] >= 0) & (df['sog'] <= MAX_SOG_KNOTS)]
counts = df.groupby('mmsi').size()
valid_mmsi_list = counts[counts >= MIN_TRAJ_POINTS].index
df = df[df['mmsi'].isin(valid_mmsi_list)]
df = df.drop_duplicates(subset=['mmsi', 'timestamp'])
self.stats['final_records'] = len(df)
self.stats['retention_pct'] = round(len(df) / max(original, 1) * 100, 2)
return df.reset_index(drop=True)

파일 보기

@ -0,0 +1,35 @@
import pandas as pd
from pipeline.constants import RESAMPLE_INTERVAL_MIN
from pipeline.behavior import BehaviorDetector
class TrajectoryResampler:
"""
불균등 AIS 수신 간격을 균등 시간 간격으로 보간
목적: BIRCH 군집화의 입력 벡터 정규화
방법: 선형 보간 (위도·경도·SOG·COG)
기준: 4 간격 (Shepperson et al. 2017)
"""
def __init__(self, interval_min: int = RESAMPLE_INTERVAL_MIN):
self.interval = pd.Timedelta(minutes=interval_min)
def resample(self, df_vessel: pd.DataFrame) -> pd.DataFrame:
df_vessel = df_vessel.sort_values('timestamp').copy()
if len(df_vessel) < 2:
return df_vessel
t_start = df_vessel['timestamp'].iloc[0]
t_end = df_vessel['timestamp'].iloc[-1]
new_times = pd.date_range(t_start, t_end, freq=self.interval)
df_vessel = df_vessel.set_index('timestamp')
df_vessel = df_vessel.reindex(df_vessel.index.union(new_times))
for col in ['lat', 'lon', 'sog', 'cog']:
if col in df_vessel.columns:
df_vessel[col] = df_vessel[col].interpolate(method='time')
df_vessel = df_vessel.loc[new_times].reset_index()
df_vessel.rename(columns={'index': 'timestamp'}, inplace=True)
df_vessel['state'] = df_vessel['sog'].apply(BehaviorDetector.classify_point)
return df_vessel

파일 보기

@ -1,2 +1,8 @@
fastapi==0.115.0
uvicorn==0.30.6
pydantic-settings>=2.0
psycopg2-binary>=2.9
numpy>=1.26
pandas>=2.2
scikit-learn>=1.5
apscheduler>=3.10

177
prediction/scheduler.py Normal file
파일 보기

@ -0,0 +1,177 @@
import logging
import time
from datetime import datetime, timezone
from typing import Optional
from apscheduler.schedulers.background import BackgroundScheduler
from config import settings
logger = logging.getLogger(__name__)
_scheduler: Optional[BackgroundScheduler] = None
_last_run: dict = {
'timestamp': None,
'duration_sec': 0,
'vessel_count': 0,
'upserted': 0,
'error': None,
}
def get_last_run() -> dict:
return _last_run.copy()
def run_analysis_cycle():
"""5분 주기 분석 사이클 — 인메모리 캐시 기반."""
from cache.vessel_store import vessel_store
from db import snpdb, kcgdb
from pipeline.orchestrator import ChineseFishingVesselPipeline
from algorithms.location import classify_zone
from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score
from algorithms.dark_vessel import is_dark_vessel
from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset
from algorithms.fleet import assign_fleet_roles
from algorithms.risk import compute_vessel_risk_score
from models.result import AnalysisResult
start = time.time()
_last_run['timestamp'] = datetime.now(timezone.utc).isoformat()
_last_run['error'] = None
try:
# 1. 증분 로드 + stale 제거
if vessel_store.last_bucket is None:
logger.warning('last_bucket is None, skipping incremental fetch (initial load not complete)')
df_new = None
else:
df_new = snpdb.fetch_incremental(vessel_store.last_bucket)
if df_new is not None and len(df_new) > 0:
vessel_store.merge_incremental(df_new)
vessel_store.evict_stale(settings.CACHE_WINDOW_HOURS)
# 정적정보 / 허가어선 주기적 갱신
vessel_store.refresh_static_info()
vessel_store.refresh_permit_registry()
# 2. 분석 대상 선별 (SOG/COG 계산 포함)
df_targets = vessel_store.select_analysis_targets()
if len(df_targets) == 0:
logger.info('no analysis targets, skipping cycle')
_last_run['vessel_count'] = 0
return
# 3. 7단계 파이프라인 실행
pipeline = ChineseFishingVesselPipeline()
classifications, vessel_dfs = pipeline.run(df_targets)
if not classifications:
logger.info('no vessels classified, skipping')
_last_run['vessel_count'] = 0
return
# 4. 선단 역할 분석
cluster_map = {c['mmsi']: c['cluster_id'] for c in classifications}
fleet_roles = assign_fleet_roles(vessel_dfs, cluster_map)
# 5. 선박별 추가 알고리즘 → AnalysisResult 생성
results = []
for c in classifications:
mmsi = c['mmsi']
df_v = vessel_dfs.get(mmsi)
if df_v is None or len(df_v) == 0:
continue
last_row = df_v.iloc[-1]
ts = last_row.get('timestamp')
zone_info = classify_zone(last_row['lat'], last_row['lon'])
gear_map = {'TRAWL': 'OT', 'PURSE': 'PS', 'LONGLINE': 'GN', 'TRAP': 'TRAP'}
gear = gear_map.get(c['vessel_type'], 'OT')
ucaf = compute_ucaf_score(df_v, gear)
ucft = compute_ucft_score(df_v)
dark, gap_min = is_dark_vessel(df_v)
spoof_score = compute_spoofing_score(df_v)
speed_jumps = count_speed_jumps(df_v)
bd09_offset = compute_bd09_offset(last_row['lat'], last_row['lon'])
fleet_info = fleet_roles.get(mmsi, {})
is_permitted = vessel_store.is_permitted(mmsi)
risk_score, risk_level = compute_vessel_risk_score(
mmsi, df_v, zone_info, is_permitted=is_permitted,
)
activity = 'UNKNOWN'
if 'state' in df_v.columns and len(df_v) > 0:
activity = df_v['state'].mode().iloc[0]
results.append(AnalysisResult(
mmsi=mmsi,
timestamp=ts,
vessel_type=c['vessel_type'],
confidence=c['confidence'],
fishing_pct=c['fishing_pct'],
cluster_id=c['cluster_id'],
season=c['season'],
zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
activity_state=activity,
ucaf_score=ucaf,
ucft_score=ucft,
is_dark=dark,
gap_duration_min=gap_min,
spoofing_score=spoof_score,
bd09_offset_m=bd09_offset,
speed_jump_count=speed_jumps,
cluster_size=fleet_info.get('cluster_size', 0),
is_leader=fleet_info.get('is_leader', False),
fleet_role=fleet_info.get('fleet_role', 'NOISE'),
risk_score=risk_score,
risk_level=risk_level,
features=c.get('features', {}),
))
# 6. 결과 저장
upserted = kcgdb.upsert_results(results)
kcgdb.cleanup_old(hours=48)
elapsed = round(time.time() - start, 2)
_last_run['duration_sec'] = elapsed
_last_run['vessel_count'] = len(results)
_last_run['upserted'] = upserted
logger.info(
'analysis cycle: %d vessels, %d upserted, %.2fs',
len(results), upserted, elapsed,
)
except Exception as e:
_last_run['error'] = str(e)
logger.exception('analysis cycle failed: %s', e)
def start_scheduler():
global _scheduler
_scheduler = BackgroundScheduler()
_scheduler.add_job(
run_analysis_cycle,
'interval',
minutes=settings.SCHEDULER_INTERVAL_MIN,
id='vessel_analysis',
max_instances=1,
replace_existing=True,
)
_scheduler.start()
logger.info('scheduler started (interval=%dm)', settings.SCHEDULER_INTERVAL_MIN)
def stop_scheduler():
global _scheduler
if _scheduler:
_scheduler.shutdown(wait=False)
_scheduler = None
logger.info('scheduler stopped')