Merge branch 'feature/signal-kind-code' into develop
This commit is contained in:
커밋
99b59f0ed5
149
sql/chnprmship-cache-diag.sql
Normal file
149
sql/chnprmship-cache-diag.sql
Normal file
@ -0,0 +1,149 @@
|
|||||||
|
-- ============================================================
|
||||||
|
-- ChnPrmShip 캐시 검증 진단 쿼리
|
||||||
|
-- 대상: t_std_snp_data.ais_target (일별 파티션)
|
||||||
|
-- 목적: 최근 2일 내 대상 MMSI별 최종위치 캐싱 검증
|
||||||
|
-- ============================================================
|
||||||
|
|
||||||
|
-- ============================================================
|
||||||
|
-- 0. 대상 MMSI 임시 테이블 생성
|
||||||
|
-- ============================================================
|
||||||
|
CREATE TEMP TABLE tmp_chn_mmsi (mmsi BIGINT PRIMARY KEY);
|
||||||
|
|
||||||
|
-- psql에서 실행:
|
||||||
|
-- \copy tmp_chn_mmsi(mmsi) FROM 'chnprmship-mmsi.txt'
|
||||||
|
|
||||||
|
|
||||||
|
-- ============================================================
|
||||||
|
-- 1. 기본 현황: 대상 MMSI 중 최근 2일 내 데이터 존재 여부
|
||||||
|
-- ============================================================
|
||||||
|
SELECT
|
||||||
|
(SELECT COUNT(*) FROM tmp_chn_mmsi) AS total_target_mmsi,
|
||||||
|
COUNT(DISTINCT a.mmsi) AS mmsi_with_data_2d,
|
||||||
|
(SELECT COUNT(*) FROM tmp_chn_mmsi) - COUNT(DISTINCT a.mmsi) AS mmsi_without_data_2d,
|
||||||
|
ROUND(COUNT(DISTINCT a.mmsi) * 100.0
|
||||||
|
/ NULLIF((SELECT COUNT(*) FROM tmp_chn_mmsi), 0), 1) AS hit_rate_pct
|
||||||
|
FROM t_std_snp_data.ais_target a
|
||||||
|
JOIN tmp_chn_mmsi t ON a.mmsi = t.mmsi
|
||||||
|
WHERE a.message_timestamp >= NOW() - INTERVAL '2 days';
|
||||||
|
|
||||||
|
|
||||||
|
-- ============================================================
|
||||||
|
-- 2. 워밍업 시뮬레이션: 최근 2일 내 MMSI별 최종위치
|
||||||
|
-- (수정 후 findLatestByMmsiIn 쿼리와 동일하게 동작)
|
||||||
|
-- ============================================================
|
||||||
|
SELECT COUNT(*) AS cached_count,
|
||||||
|
MIN(message_timestamp) AS oldest_cached,
|
||||||
|
MAX(message_timestamp) AS newest_cached,
|
||||||
|
NOW() - MAX(message_timestamp) AS newest_age
|
||||||
|
FROM (
|
||||||
|
SELECT DISTINCT ON (a.mmsi) a.mmsi, a.message_timestamp
|
||||||
|
FROM t_std_snp_data.ais_target a
|
||||||
|
JOIN tmp_chn_mmsi t ON a.mmsi = t.mmsi
|
||||||
|
WHERE a.message_timestamp >= NOW() - INTERVAL '2 days'
|
||||||
|
ORDER BY a.mmsi, a.message_timestamp DESC
|
||||||
|
) latest;
|
||||||
|
|
||||||
|
|
||||||
|
-- ============================================================
|
||||||
|
-- 3. MMSI별 최종위치 상세 (최근 2일 내, 최신순 상위 30건)
|
||||||
|
-- ============================================================
|
||||||
|
SELECT DISTINCT ON (a.mmsi)
|
||||||
|
a.mmsi,
|
||||||
|
a.message_timestamp,
|
||||||
|
a.name,
|
||||||
|
a.vessel_type,
|
||||||
|
a.lat,
|
||||||
|
a.lon,
|
||||||
|
a.sog,
|
||||||
|
a.cog,
|
||||||
|
a.heading,
|
||||||
|
NOW() - a.message_timestamp AS data_age
|
||||||
|
FROM t_std_snp_data.ais_target a
|
||||||
|
JOIN tmp_chn_mmsi t ON a.mmsi = t.mmsi
|
||||||
|
WHERE a.message_timestamp >= NOW() - INTERVAL '2 days'
|
||||||
|
ORDER BY a.mmsi, a.message_timestamp DESC
|
||||||
|
LIMIT 30;
|
||||||
|
|
||||||
|
|
||||||
|
-- ============================================================
|
||||||
|
-- 4. 데이터 없는 대상 MMSI (최근 2일 내 DB에 없는 선박)
|
||||||
|
-- ============================================================
|
||||||
|
SELECT t.mmsi AS missing_mmsi
|
||||||
|
FROM tmp_chn_mmsi t
|
||||||
|
LEFT JOIN (
|
||||||
|
SELECT DISTINCT mmsi
|
||||||
|
FROM t_std_snp_data.ais_target
|
||||||
|
WHERE mmsi IN (SELECT mmsi FROM tmp_chn_mmsi)
|
||||||
|
AND message_timestamp >= NOW() - INTERVAL '2 days'
|
||||||
|
) a ON t.mmsi = a.mmsi
|
||||||
|
WHERE a.mmsi IS NULL
|
||||||
|
ORDER BY t.mmsi;
|
||||||
|
|
||||||
|
|
||||||
|
-- ============================================================
|
||||||
|
-- 5. 시간대별 분포 (2일 기준 세부 확인)
|
||||||
|
-- ============================================================
|
||||||
|
SELECT
|
||||||
|
'6시간 이내' AS time_range,
|
||||||
|
COUNT(DISTINCT mmsi) AS distinct_mmsi
|
||||||
|
FROM t_std_snp_data.ais_target a
|
||||||
|
JOIN tmp_chn_mmsi t ON a.mmsi = t.mmsi
|
||||||
|
WHERE a.message_timestamp >= NOW() - INTERVAL '6 hours'
|
||||||
|
|
||||||
|
UNION ALL
|
||||||
|
SELECT '12시간 이내', COUNT(DISTINCT mmsi)
|
||||||
|
FROM t_std_snp_data.ais_target a
|
||||||
|
JOIN tmp_chn_mmsi t ON a.mmsi = t.mmsi
|
||||||
|
WHERE a.message_timestamp >= NOW() - INTERVAL '12 hours'
|
||||||
|
|
||||||
|
UNION ALL
|
||||||
|
SELECT '1일 이내', COUNT(DISTINCT mmsi)
|
||||||
|
FROM t_std_snp_data.ais_target a
|
||||||
|
JOIN tmp_chn_mmsi t ON a.mmsi = t.mmsi
|
||||||
|
WHERE a.message_timestamp >= NOW() - INTERVAL '1 day'
|
||||||
|
|
||||||
|
UNION ALL
|
||||||
|
SELECT '2일 이내', COUNT(DISTINCT mmsi)
|
||||||
|
FROM t_std_snp_data.ais_target a
|
||||||
|
JOIN tmp_chn_mmsi t ON a.mmsi = t.mmsi
|
||||||
|
WHERE a.message_timestamp >= NOW() - INTERVAL '2 days'
|
||||||
|
|
||||||
|
UNION ALL
|
||||||
|
SELECT '전체(무제한)', COUNT(DISTINCT mmsi)
|
||||||
|
FROM t_std_snp_data.ais_target a
|
||||||
|
JOIN tmp_chn_mmsi t ON a.mmsi = t.mmsi;
|
||||||
|
|
||||||
|
|
||||||
|
-- ============================================================
|
||||||
|
-- 6. 파티션별 대상 데이터 분포
|
||||||
|
-- ============================================================
|
||||||
|
SELECT
|
||||||
|
tableoid::regclass AS partition_name,
|
||||||
|
COUNT(*) AS row_count,
|
||||||
|
COUNT(DISTINCT mmsi) AS distinct_mmsi,
|
||||||
|
MIN(message_timestamp) AS min_ts,
|
||||||
|
MAX(message_timestamp) AS max_ts
|
||||||
|
FROM t_std_snp_data.ais_target a
|
||||||
|
JOIN tmp_chn_mmsi t ON a.mmsi = t.mmsi
|
||||||
|
GROUP BY tableoid::regclass
|
||||||
|
ORDER BY max_ts DESC;
|
||||||
|
|
||||||
|
|
||||||
|
-- ============================================================
|
||||||
|
-- 7. 전체 ais_target 파티션 현황
|
||||||
|
-- ============================================================
|
||||||
|
SELECT
|
||||||
|
c.relname AS partition_name,
|
||||||
|
pg_size_pretty(pg_relation_size(c.oid)) AS table_size,
|
||||||
|
s.n_live_tup AS estimated_rows
|
||||||
|
FROM pg_inherits i
|
||||||
|
JOIN pg_class c ON c.oid = i.inhrelid
|
||||||
|
JOIN pg_stat_user_tables s ON s.relid = c.oid
|
||||||
|
WHERE i.inhparent = 't_std_snp_data.ais_target'::regclass
|
||||||
|
ORDER BY c.relname DESC;
|
||||||
|
|
||||||
|
|
||||||
|
-- ============================================================
|
||||||
|
-- 정리
|
||||||
|
-- ============================================================
|
||||||
|
DROP TABLE IF EXISTS tmp_chn_mmsi;
|
||||||
@ -84,6 +84,14 @@ public class AisTargetEntity extends BaseEntity {
|
|||||||
private OffsetDateTime receivedDate;
|
private OffsetDateTime receivedDate;
|
||||||
private OffsetDateTime collectedAt; // 배치 수집 시점
|
private OffsetDateTime collectedAt; // 배치 수집 시점
|
||||||
|
|
||||||
|
// ========== 선종 분류 정보 ==========
|
||||||
|
/**
|
||||||
|
* MDA 범례코드 (signalKindCode)
|
||||||
|
* - vesselType + extraInfo 기반으로 치환
|
||||||
|
* - 예: "000020"(어선), "000023"(카고), "000027"(일반/기타)
|
||||||
|
*/
|
||||||
|
private String signalKindCode;
|
||||||
|
|
||||||
// ========== ClassType 분류 정보 ==========
|
// ========== ClassType 분류 정보 ==========
|
||||||
/**
|
/**
|
||||||
* 선박 클래스 타입
|
* 선박 클래스 타입
|
||||||
|
|||||||
@ -26,6 +26,14 @@ public interface AisTargetRepository {
|
|||||||
*/
|
*/
|
||||||
List<AisTargetEntity> findLatestByMmsiIn(List<Long> mmsiList);
|
List<AisTargetEntity> findLatestByMmsiIn(List<Long> mmsiList);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 여러 MMSI의 최신 위치 조회 (시간 범위 필터)
|
||||||
|
*
|
||||||
|
* @param mmsiList 대상 MMSI 목록
|
||||||
|
* @param since 이 시점 이후 데이터만 조회
|
||||||
|
*/
|
||||||
|
List<AisTargetEntity> findLatestByMmsiInSince(List<Long> mmsiList, OffsetDateTime since);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 시간 범위 내 특정 MMSI의 항적 조회
|
* 시간 범위 내 특정 MMSI의 항적 조회
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -130,7 +130,7 @@ public class AisTargetRepositoryImpl implements AisTargetRepository {
|
|||||||
private final RowMapper<AisTargetEntity> rowMapper = (rs, rowNum) -> AisTargetEntity.builder()
|
private final RowMapper<AisTargetEntity> rowMapper = (rs, rowNum) -> AisTargetEntity.builder()
|
||||||
.mmsi(rs.getLong("mmsi"))
|
.mmsi(rs.getLong("mmsi"))
|
||||||
.messageTimestamp(toOffsetDateTime(rs.getTimestamp("message_timestamp")))
|
.messageTimestamp(toOffsetDateTime(rs.getTimestamp("message_timestamp")))
|
||||||
.imo(rs.getObject("imo", Long.class))
|
.imo(toLong(rs, "imo"))
|
||||||
.name(rs.getString("name"))
|
.name(rs.getString("name"))
|
||||||
.callsign(rs.getString("callsign"))
|
.callsign(rs.getString("callsign"))
|
||||||
.vesselType(rs.getString("vessel_type"))
|
.vesselType(rs.getString("vessel_type"))
|
||||||
@ -140,45 +140,45 @@ public class AisTargetRepositoryImpl implements AisTargetRepository {
|
|||||||
.heading(rs.getObject("heading", Double.class))
|
.heading(rs.getObject("heading", Double.class))
|
||||||
.sog(rs.getObject("sog", Double.class))
|
.sog(rs.getObject("sog", Double.class))
|
||||||
.cog(rs.getObject("cog", Double.class))
|
.cog(rs.getObject("cog", Double.class))
|
||||||
.rot(rs.getObject("rot", Integer.class))
|
.rot(toInt(rs, "rot"))
|
||||||
.length(rs.getObject("length", Integer.class))
|
.length(toInt(rs, "length"))
|
||||||
.width(rs.getObject("width", Integer.class))
|
.width(toInt(rs, "width"))
|
||||||
.draught(rs.getObject("draught", Double.class))
|
.draught(rs.getObject("draught", Double.class))
|
||||||
.lengthBow(rs.getObject("length_bow", Integer.class))
|
.lengthBow(toInt(rs, "length_bow"))
|
||||||
.lengthStern(rs.getObject("length_stern", Integer.class))
|
.lengthStern(toInt(rs, "length_stern"))
|
||||||
.widthPort(rs.getObject("width_port", Integer.class))
|
.widthPort(toInt(rs, "width_port"))
|
||||||
.widthStarboard(rs.getObject("width_starboard", Integer.class))
|
.widthStarboard(toInt(rs, "width_starboard"))
|
||||||
.destination(rs.getString("destination"))
|
.destination(rs.getString("destination"))
|
||||||
.eta(toOffsetDateTime(rs.getTimestamp("eta")))
|
.eta(toOffsetDateTime(rs.getTimestamp("eta")))
|
||||||
.status(rs.getString("status"))
|
.status(rs.getString("status"))
|
||||||
.ageMinutes(rs.getObject("age_minutes", Double.class))
|
.ageMinutes(rs.getObject("age_minutes", Double.class))
|
||||||
.positionAccuracy(rs.getObject("position_accuracy", Integer.class))
|
.positionAccuracy(toInt(rs, "position_accuracy"))
|
||||||
.timestampUtc(rs.getObject("timestamp_utc", Integer.class))
|
.timestampUtc(toInt(rs, "timestamp_utc"))
|
||||||
.repeatIndicator(rs.getObject("repeat_indicator", Integer.class))
|
.repeatIndicator(toInt(rs, "repeat_indicator"))
|
||||||
.raimFlag(rs.getObject("raim_flag", Integer.class))
|
.raimFlag(toInt(rs, "raim_flag"))
|
||||||
.radioStatus(rs.getObject("radio_status", Integer.class))
|
.radioStatus(toInt(rs, "radio_status"))
|
||||||
.regional(rs.getObject("regional", Integer.class))
|
.regional(toInt(rs, "regional"))
|
||||||
.regional2(rs.getObject("regional2", Integer.class))
|
.regional2(toInt(rs, "regional2"))
|
||||||
.spare(rs.getObject("spare", Integer.class))
|
.spare(toInt(rs, "spare"))
|
||||||
.spare2(rs.getObject("spare2", Integer.class))
|
.spare2(toInt(rs, "spare2"))
|
||||||
.aisVersion(rs.getObject("ais_version", Integer.class))
|
.aisVersion(toInt(rs, "ais_version"))
|
||||||
.positionFixType(rs.getObject("position_fix_type", Integer.class))
|
.positionFixType(toInt(rs, "position_fix_type"))
|
||||||
.dte(rs.getObject("dte", Integer.class))
|
.dte(toInt(rs, "dte"))
|
||||||
.bandFlag(rs.getObject("band_flag", Integer.class))
|
.bandFlag(toInt(rs, "band_flag"))
|
||||||
.receivedDate(toOffsetDateTime(rs.getTimestamp("received_date")))
|
.receivedDate(toOffsetDateTime(rs.getTimestamp("received_date")))
|
||||||
.collectedAt(toOffsetDateTime(rs.getTimestamp("collected_at")))
|
.collectedAt(toOffsetDateTime(rs.getTimestamp("collected_at")))
|
||||||
.tonnesCargo(rs.getObject("tonnes_cargo", Integer.class))
|
.tonnesCargo(toInt(rs, "tonnes_cargo"))
|
||||||
.inSTS(rs.getObject("in_sts", Integer.class))
|
.inSTS(toInt(rs, "in_sts"))
|
||||||
.onBerth(rs.getObject("on_berth", Boolean.class))
|
.onBerth(rs.getObject("on_berth", Boolean.class))
|
||||||
.dwt(rs.getObject("dwt", Integer.class))
|
.dwt(toInt(rs, "dwt"))
|
||||||
.anomalous(rs.getString("anomalous"))
|
.anomalous(rs.getString("anomalous"))
|
||||||
.destinationPortID(rs.getObject("destination_port_id", Integer.class))
|
.destinationPortID(toInt(rs, "destination_port_id"))
|
||||||
.destinationTidied(rs.getString("destination_tidied"))
|
.destinationTidied(rs.getString("destination_tidied"))
|
||||||
.destinationUNLOCODE(rs.getString("destination_unlocode"))
|
.destinationUNLOCODE(rs.getString("destination_unlocode"))
|
||||||
.imoVerified(rs.getString("imo_verified"))
|
.imoVerified(rs.getString("imo_verified"))
|
||||||
.lastStaticUpdateReceived(toOffsetDateTime(rs.getTimestamp("last_static_update_received")))
|
.lastStaticUpdateReceived(toOffsetDateTime(rs.getTimestamp("last_static_update_received")))
|
||||||
.lpcCode(rs.getObject("lpc_code", Integer.class))
|
.lpcCode(toInt(rs, "lpc_code"))
|
||||||
.messageType(rs.getObject("message_type", Integer.class))
|
.messageType(toInt(rs, "message_type"))
|
||||||
.source(rs.getString("source"))
|
.source(rs.getString("source"))
|
||||||
.stationId(rs.getString("station_id"))
|
.stationId(rs.getString("station_id"))
|
||||||
.zoneId(rs.getObject("zone_id", Double.class))
|
.zoneId(rs.getObject("zone_id", Double.class))
|
||||||
@ -223,6 +223,24 @@ public class AisTargetRepositoryImpl implements AisTargetRepository {
|
|||||||
return jdbcTemplate.query(sql, rowMapper, (Object) mmsiArray);
|
return jdbcTemplate.query(sql, rowMapper, (Object) mmsiArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<AisTargetEntity> findLatestByMmsiInSince(List<Long> mmsiList, OffsetDateTime since) {
|
||||||
|
if (mmsiList == null || mmsiList.isEmpty()) {
|
||||||
|
return List.of();
|
||||||
|
}
|
||||||
|
|
||||||
|
String sql = """
|
||||||
|
SELECT DISTINCT ON (mmsi) *
|
||||||
|
FROM %s
|
||||||
|
WHERE mmsi = ANY(?)
|
||||||
|
AND message_timestamp >= ?
|
||||||
|
ORDER BY mmsi, message_timestamp DESC
|
||||||
|
""".formatted(tableName);
|
||||||
|
|
||||||
|
Long[] mmsiArray = mmsiList.toArray(new Long[0]);
|
||||||
|
return jdbcTemplate.query(sql, rowMapper, (Object) mmsiArray, toTimestamp(since));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<AisTargetEntity> findByMmsiAndTimeRange(Long mmsi, OffsetDateTime start, OffsetDateTime end) {
|
public List<AisTargetEntity> findByMmsiAndTimeRange(Long mmsi, OffsetDateTime start, OffsetDateTime end) {
|
||||||
String sql = """
|
String sql = """
|
||||||
@ -359,6 +377,23 @@ public class AisTargetRepositoryImpl implements AisTargetRepository {
|
|||||||
|
|
||||||
// ==================== Helper Methods ====================
|
// ==================== Helper Methods ====================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* int8(bigint) → Integer 안전 변환
|
||||||
|
* PostgreSQL JDBC 드라이버는 int8 → Integer 자동 변환을 지원하지 않아
|
||||||
|
* getObject("col", Integer.class) 사용 시 오류 발생. Number로 읽어서 변환.
|
||||||
|
*/
|
||||||
|
private Integer toInt(ResultSet rs, String column) throws SQLException {
|
||||||
|
Object val = rs.getObject(column);
|
||||||
|
if (val == null) return null;
|
||||||
|
return ((Number) val).intValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Long toLong(ResultSet rs, String column) throws SQLException {
|
||||||
|
Object val = rs.getObject(column);
|
||||||
|
if (val == null) return null;
|
||||||
|
return ((Number) val).longValue();
|
||||||
|
}
|
||||||
|
|
||||||
private Timestamp toTimestamp(OffsetDateTime odt) {
|
private Timestamp toTimestamp(OffsetDateTime odt) {
|
||||||
return odt != null ? Timestamp.from(odt.toInstant()) : null;
|
return odt != null ? Timestamp.from(odt.toInstant()) : null;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -3,7 +3,9 @@ package com.snp.batch.jobs.aistarget.batch.writer;
|
|||||||
import com.snp.batch.common.batch.writer.BaseWriter;
|
import com.snp.batch.common.batch.writer.BaseWriter;
|
||||||
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||||
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
|
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
|
||||||
|
import com.snp.batch.jobs.aistarget.chnprmship.ChnPrmShipCacheManager;
|
||||||
import com.snp.batch.jobs.aistarget.classifier.AisClassTypeClassifier;
|
import com.snp.batch.jobs.aistarget.classifier.AisClassTypeClassifier;
|
||||||
|
import com.snp.batch.jobs.aistarget.classifier.SignalKindCode;
|
||||||
import com.snp.batch.jobs.aistarget.kafka.AisTargetKafkaProducer;
|
import com.snp.batch.jobs.aistarget.kafka.AisTargetKafkaProducer;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.lang.Nullable;
|
import org.springframework.lang.Nullable;
|
||||||
@ -16,8 +18,10 @@ import java.util.List;
|
|||||||
*
|
*
|
||||||
* 동작:
|
* 동작:
|
||||||
* 1. ClassType 분류 (Core20 캐시 기반 A/B 분류)
|
* 1. ClassType 분류 (Core20 캐시 기반 A/B 분류)
|
||||||
* 2. 캐시에 최신 위치 정보 업데이트 (classType, core20Mmsi 포함)
|
* 2. SignalKindCode 치환 (vesselType + extraInfo → MDA 범례코드)
|
||||||
* 3. Kafka 토픽으로 AIS Target 정보 전송 (서브청크 분할, 활성화된 경우에만)
|
* 3. 캐시에 최신 위치 정보 업데이트 (classType, core20Mmsi, signalKindCode 포함)
|
||||||
|
* 4. ChnPrmShip 전용 캐시 업데이트 (대상 MMSI만 필터)
|
||||||
|
* 5. Kafka 토픽으로 AIS Target 정보 전송 (활성화된 경우에만)
|
||||||
*
|
*
|
||||||
* 참고:
|
* 참고:
|
||||||
* - DB 저장은 별도 Job(aisTargetDbSyncJob)에서 15분 주기로 수행
|
* - DB 저장은 별도 Job(aisTargetDbSyncJob)에서 15분 주기로 수행
|
||||||
@ -32,15 +36,18 @@ public class AisTargetDataWriter extends BaseWriter<AisTargetEntity> {
|
|||||||
private final AisClassTypeClassifier classTypeClassifier;
|
private final AisClassTypeClassifier classTypeClassifier;
|
||||||
@Nullable
|
@Nullable
|
||||||
private final AisTargetKafkaProducer kafkaProducer;
|
private final AisTargetKafkaProducer kafkaProducer;
|
||||||
|
private final ChnPrmShipCacheManager chnPrmShipCacheManager;
|
||||||
|
|
||||||
public AisTargetDataWriter(
|
public AisTargetDataWriter(
|
||||||
AisTargetCacheManager cacheManager,
|
AisTargetCacheManager cacheManager,
|
||||||
AisClassTypeClassifier classTypeClassifier,
|
AisClassTypeClassifier classTypeClassifier,
|
||||||
@Nullable AisTargetKafkaProducer kafkaProducer) {
|
@Nullable AisTargetKafkaProducer kafkaProducer,
|
||||||
|
ChnPrmShipCacheManager chnPrmShipCacheManager) {
|
||||||
super("AisTarget");
|
super("AisTarget");
|
||||||
this.cacheManager = cacheManager;
|
this.cacheManager = cacheManager;
|
||||||
this.classTypeClassifier = classTypeClassifier;
|
this.classTypeClassifier = classTypeClassifier;
|
||||||
this.kafkaProducer = kafkaProducer;
|
this.kafkaProducer = kafkaProducer;
|
||||||
|
this.chnPrmShipCacheManager = chnPrmShipCacheManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -51,13 +58,22 @@ public class AisTargetDataWriter extends BaseWriter<AisTargetEntity> {
|
|||||||
// - Core20 캐시의 IMO와 매칭하여 classType(A/B), core20Mmsi 설정
|
// - Core20 캐시의 IMO와 매칭하여 classType(A/B), core20Mmsi 설정
|
||||||
classTypeClassifier.classifyAll(items);
|
classTypeClassifier.classifyAll(items);
|
||||||
|
|
||||||
// 2. 캐시 업데이트 (classType, core20Mmsi 포함)
|
// 2. SignalKindCode 치환 (vesselType + extraInfo → MDA 범례코드)
|
||||||
|
items.forEach(item -> {
|
||||||
|
SignalKindCode kindCode = SignalKindCode.resolve(item.getVesselType(), item.getExtraInfo());
|
||||||
|
item.setSignalKindCode(kindCode.getCode());
|
||||||
|
});
|
||||||
|
|
||||||
|
// 3. 캐시 업데이트 (classType, core20Mmsi, signalKindCode 포함)
|
||||||
cacheManager.putAll(items);
|
cacheManager.putAll(items);
|
||||||
|
|
||||||
log.debug("AIS Target 캐시 업데이트 완료: {} 건 (캐시 크기: {})",
|
log.debug("AIS Target 캐시 업데이트 완료: {} 건 (캐시 크기: {})",
|
||||||
items.size(), cacheManager.size());
|
items.size(), cacheManager.size());
|
||||||
|
|
||||||
// 3. Kafka 전송 (kafkaProducer 빈이 존재하는 경우에만)
|
// 4. ChnPrmShip 전용 캐시 업데이트 (대상 MMSI만 필터)
|
||||||
|
chnPrmShipCacheManager.putIfTarget(items);
|
||||||
|
|
||||||
|
// 5. Kafka 전송 (kafkaProducer 빈이 존재하는 경우에만)
|
||||||
if (kafkaProducer == null) {
|
if (kafkaProducer == null) {
|
||||||
log.debug("AIS Kafka Producer 미등록 - topic 전송 스킵");
|
log.debug("AIS Kafka Producer 미등록 - topic 전송 스킵");
|
||||||
return;
|
return;
|
||||||
|
|||||||
@ -0,0 +1,131 @@
|
|||||||
|
package com.snp.batch.jobs.aistarget.chnprmship;
|
||||||
|
|
||||||
|
import com.github.benmanes.caffeine.cache.Cache;
|
||||||
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||||
|
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.OffsetDateTime;
|
||||||
|
import java.time.ZoneOffset;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 중국 허가선박 전용 캐시
|
||||||
|
*
|
||||||
|
* - 대상 MMSI(~1,400척)만 별도 관리
|
||||||
|
* - TTL: expireAfterWrite (마지막 put 이후 N일 경과 시 만료)
|
||||||
|
* - 순수 캐시 조회 전용 (DB fallback 없음)
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class ChnPrmShipCacheManager {
|
||||||
|
|
||||||
|
private final ChnPrmShipProperties properties;
|
||||||
|
private Cache<Long, AisTargetEntity> cache;
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init() {
|
||||||
|
this.cache = Caffeine.newBuilder()
|
||||||
|
.maximumSize(properties.getMaxSize())
|
||||||
|
.expireAfterWrite(properties.getTtlDays(), TimeUnit.DAYS)
|
||||||
|
.recordStats()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
log.info("ChnPrmShip 캐시 초기화 - TTL: {}일, 최대 크기: {}",
|
||||||
|
properties.getTtlDays(), properties.getMaxSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 대상 MMSI에 해당하는 항목만 필터링하여 캐시에 저장
|
||||||
|
*
|
||||||
|
* @param items 전체 AIS Target 데이터 (배치 수집 결과)
|
||||||
|
* @return 저장된 건수
|
||||||
|
*/
|
||||||
|
public int putIfTarget(List<AisTargetEntity> items) {
|
||||||
|
if (items == null || items.isEmpty()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int updated = 0;
|
||||||
|
for (AisTargetEntity item : items) {
|
||||||
|
if (!properties.isTarget(item.getMmsi())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
AisTargetEntity existing = cache.getIfPresent(item.getMmsi());
|
||||||
|
if (existing == null || isNewer(item, existing)) {
|
||||||
|
cache.put(item.getMmsi(), item);
|
||||||
|
updated++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (updated > 0) {
|
||||||
|
log.debug("ChnPrmShip 캐시 업데이트 - 입력: {}, 대상 저장: {}, 현재 크기: {}",
|
||||||
|
items.size(), updated, cache.estimatedSize());
|
||||||
|
}
|
||||||
|
return updated;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 시간 범위 내 캐시 데이터 조회
|
||||||
|
*
|
||||||
|
* @param minutes 조회 범위 (분)
|
||||||
|
* @return 시간 범위 내 데이터 목록
|
||||||
|
*/
|
||||||
|
public List<AisTargetEntity> getByTimeRange(int minutes) {
|
||||||
|
OffsetDateTime threshold = OffsetDateTime.now(ZoneOffset.UTC).minusMinutes(minutes);
|
||||||
|
|
||||||
|
return cache.asMap().values().stream()
|
||||||
|
.filter(entity -> entity.getMessageTimestamp() != null)
|
||||||
|
.filter(entity -> entity.getMessageTimestamp().isAfter(threshold))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 워밍업용 직접 저장 (시간 비교 없이 저장)
|
||||||
|
*/
|
||||||
|
public void putAll(List<AisTargetEntity> entities) {
|
||||||
|
if (entities == null || entities.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (AisTargetEntity entity : entities) {
|
||||||
|
if (entity != null && entity.getMmsi() != null) {
|
||||||
|
cache.put(entity.getMmsi(), entity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long size() {
|
||||||
|
return cache.estimatedSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> getStats() {
|
||||||
|
var stats = cache.stats();
|
||||||
|
return Map.of(
|
||||||
|
"estimatedSize", cache.estimatedSize(),
|
||||||
|
"maxSize", properties.getMaxSize(),
|
||||||
|
"ttlDays", properties.getTtlDays(),
|
||||||
|
"targetMmsiCount", properties.getMmsiSet().size(),
|
||||||
|
"hitCount", stats.hitCount(),
|
||||||
|
"missCount", stats.missCount(),
|
||||||
|
"hitRate", String.format("%.2f%%", stats.hitRate() * 100)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isNewer(AisTargetEntity candidate, AisTargetEntity existing) {
|
||||||
|
if (candidate.getMessageTimestamp() == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (existing.getMessageTimestamp() == null) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return candidate.getMessageTimestamp().isAfter(existing.getMessageTimestamp());
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,79 @@
|
|||||||
|
package com.snp.batch.jobs.aistarget.chnprmship;
|
||||||
|
|
||||||
|
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||||
|
import com.snp.batch.jobs.aistarget.batch.repository.AisTargetRepository;
|
||||||
|
import com.snp.batch.jobs.aistarget.classifier.SignalKindCode;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.boot.ApplicationArguments;
|
||||||
|
import org.springframework.boot.ApplicationRunner;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.OffsetDateTime;
|
||||||
|
import java.time.ZoneOffset;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 기동 시 ChnPrmShip 캐시 워밍업
|
||||||
|
*
|
||||||
|
* DB(ais_target)에서 대상 MMSI의 최근 데이터를 조회하여 캐시를 채운다.
|
||||||
|
* 이후 매 분 배치 수집에서 실시간 데이터가 캐시를 갱신한다.
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class ChnPrmShipCacheWarmer implements ApplicationRunner {
|
||||||
|
|
||||||
|
private static final int DB_QUERY_CHUNK_SIZE = 500;
|
||||||
|
|
||||||
|
private final ChnPrmShipProperties properties;
|
||||||
|
private final ChnPrmShipCacheManager cacheManager;
|
||||||
|
private final AisTargetRepository aisTargetRepository;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(ApplicationArguments args) {
|
||||||
|
if (!properties.isWarmupEnabled()) {
|
||||||
|
log.info("ChnPrmShip 캐시 워밍업 비활성화");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (properties.getMmsiSet().isEmpty()) {
|
||||||
|
log.warn("ChnPrmShip 대상 MMSI가 없어 워밍업을 건너뜁니다");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
OffsetDateTime since = OffsetDateTime.now(ZoneOffset.UTC)
|
||||||
|
.minusDays(properties.getWarmupDays());
|
||||||
|
|
||||||
|
log.info("ChnPrmShip 캐시 워밍업 시작 - 대상: {}건, 조회 범위: 최근 {}일 (since: {})",
|
||||||
|
properties.getMmsiSet().size(), properties.getWarmupDays(), since);
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
List<Long> mmsiList = new ArrayList<>(properties.getMmsiSet());
|
||||||
|
int totalLoaded = 0;
|
||||||
|
|
||||||
|
for (int i = 0; i < mmsiList.size(); i += DB_QUERY_CHUNK_SIZE) {
|
||||||
|
List<Long> chunk = mmsiList.subList(i,
|
||||||
|
Math.min(i + DB_QUERY_CHUNK_SIZE, mmsiList.size()));
|
||||||
|
|
||||||
|
List<AisTargetEntity> fromDb = aisTargetRepository.findLatestByMmsiInSince(chunk, since);
|
||||||
|
|
||||||
|
// signalKindCode 치환 (DB 데이터는 치환이 안 되어 있을 수 있음)
|
||||||
|
fromDb.forEach(entity -> {
|
||||||
|
if (entity.getSignalKindCode() == null) {
|
||||||
|
SignalKindCode kindCode = SignalKindCode.resolve(
|
||||||
|
entity.getVesselType(), entity.getExtraInfo());
|
||||||
|
entity.setSignalKindCode(kindCode.getCode());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
cacheManager.putAll(fromDb);
|
||||||
|
totalLoaded += fromDb.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
long elapsed = System.currentTimeMillis() - startTime;
|
||||||
|
log.info("ChnPrmShip 캐시 워밍업 완료 - 대상: {}, 로딩: {}건, 소요: {}ms",
|
||||||
|
properties.getMmsiSet().size(), totalLoaded, elapsed);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,82 @@
|
|||||||
|
package com.snp.batch.jobs.aistarget.chnprmship;
|
||||||
|
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
import org.springframework.core.io.DefaultResourceLoader;
|
||||||
|
import org.springframework.core.io.Resource;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 중국 허가선박(ChnPrmShip) 설정
|
||||||
|
*
|
||||||
|
* 대상 MMSI 목록을 리소스 파일에서 로딩하여 Set으로 보관한다.
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
@ConfigurationProperties(prefix = "app.batch.chnprmship")
|
||||||
|
public class ChnPrmShipProperties {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MMSI 목록 리소스 경로
|
||||||
|
*/
|
||||||
|
private String mmsiResourcePath = "classpath:chnprmship-mmsi.txt";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 캐시 TTL (일)
|
||||||
|
* - 마지막 put() 이후 이 기간이 지나면 만료
|
||||||
|
*/
|
||||||
|
private int ttlDays = 2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 최대 캐시 크기
|
||||||
|
*/
|
||||||
|
private int maxSize = 2000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 기동 시 DB 워밍업 활성화 여부
|
||||||
|
*/
|
||||||
|
private boolean warmupEnabled = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DB 워밍업 조회 범위 (일)
|
||||||
|
*/
|
||||||
|
private int warmupDays = 2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 로딩된 대상 MMSI 집합
|
||||||
|
*/
|
||||||
|
private Set<Long> mmsiSet = Collections.emptySet();
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init() {
|
||||||
|
try {
|
||||||
|
Resource resource = new DefaultResourceLoader().getResource(mmsiResourcePath);
|
||||||
|
try (BufferedReader reader = new BufferedReader(
|
||||||
|
new InputStreamReader(resource.getInputStream(), StandardCharsets.UTF_8))) {
|
||||||
|
mmsiSet = reader.lines()
|
||||||
|
.map(String::trim)
|
||||||
|
.filter(line -> !line.isEmpty() && !line.startsWith("#"))
|
||||||
|
.map(Long::parseLong)
|
||||||
|
.collect(Collectors.toUnmodifiableSet());
|
||||||
|
}
|
||||||
|
log.info("ChnPrmShip MMSI 로딩 완료 - {}건 (경로: {})", mmsiSet.size(), mmsiResourcePath);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("ChnPrmShip MMSI 로딩 실패 - 경로: {}, 오류: {}", mmsiResourcePath, e.getMessage());
|
||||||
|
mmsiSet = Collections.emptySet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTarget(Long mmsi) {
|
||||||
|
return mmsi != null && mmsiSet.contains(mmsi);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,118 @@
|
|||||||
|
package com.snp.batch.jobs.aistarget.classifier;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MDA 선종 범례코드
|
||||||
|
*
|
||||||
|
* GlobalAIS 원본 데이터의 vesselType + extraInfo를 기반으로
|
||||||
|
* MDA 범례코드(signalKindCode)로 치환한다.
|
||||||
|
*
|
||||||
|
* @see <a href="GLOBALAIS - MDA 선종 범례 치환표.pdf">치환 규칙표</a>
|
||||||
|
*/
|
||||||
|
@Getter
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public enum SignalKindCode {
|
||||||
|
|
||||||
|
FISHING("000020", "어선"),
|
||||||
|
KCGV("000021", "함정"),
|
||||||
|
FERRY("000022", "여객선"),
|
||||||
|
CARGO("000023", "카고"),
|
||||||
|
TANKER("000024", "탱커"),
|
||||||
|
GOV("000025", "관공선"),
|
||||||
|
DEFAULT("000027", "일반/기타선박"),
|
||||||
|
BUOY("000028", "부이/항로표지");
|
||||||
|
|
||||||
|
private final String code;
|
||||||
|
private final String koreanName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* GlobalAIS vesselType + extraInfo → MDA 범례코드 치환
|
||||||
|
*
|
||||||
|
* 치환 우선순위:
|
||||||
|
* 1. vesselType 단독 매칭 (Cargo, Tanker, Passenger, AtoN 등)
|
||||||
|
* 2. vesselType + extraInfo 조합 매칭 (Vessel + Fishing 등)
|
||||||
|
* 3. fallback → DEFAULT (000027)
|
||||||
|
*/
|
||||||
|
public static SignalKindCode resolve(String vesselType, String extraInfo) {
|
||||||
|
String vt = normalizeOrEmpty(vesselType);
|
||||||
|
String ei = normalizeOrEmpty(extraInfo);
|
||||||
|
|
||||||
|
// 1. vesselType 단독 매칭 (extraInfo 무관)
|
||||||
|
switch (vt) {
|
||||||
|
case "cargo":
|
||||||
|
return CARGO;
|
||||||
|
case "tanker":
|
||||||
|
return TANKER;
|
||||||
|
case "passenger":
|
||||||
|
return FERRY;
|
||||||
|
case "aton":
|
||||||
|
return BUOY;
|
||||||
|
case "law enforcement":
|
||||||
|
return GOV;
|
||||||
|
case "search and rescue":
|
||||||
|
return KCGV;
|
||||||
|
case "local vessel":
|
||||||
|
return FISHING;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// vesselType 그룹 매칭 (복합 선종명)
|
||||||
|
if (matchesAny(vt, "tug", "pilot boat", "tender", "anti pollution", "medical transport")) {
|
||||||
|
return GOV;
|
||||||
|
}
|
||||||
|
if (matchesAny(vt, "high speed craft", "wing in ground-effect")) {
|
||||||
|
return FERRY;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. "Vessel" + extraInfo 조합
|
||||||
|
if ("vessel".equals(vt)) {
|
||||||
|
return resolveVesselExtraInfo(ei);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. "N/A" + extraInfo 조합
|
||||||
|
if ("n/a".equals(vt)) {
|
||||||
|
if (ei.startsWith("hazardous cat")) {
|
||||||
|
return CARGO;
|
||||||
|
}
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. fallback
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SignalKindCode resolveVesselExtraInfo(String extraInfo) {
|
||||||
|
if ("fishing".equals(extraInfo)) {
|
||||||
|
return FISHING;
|
||||||
|
}
|
||||||
|
if ("military operations".equals(extraInfo)) {
|
||||||
|
return GOV;
|
||||||
|
}
|
||||||
|
if (matchesAny(extraInfo, "towing", "towing (large)", "dredging/underwater ops", "diving operations")) {
|
||||||
|
return GOV;
|
||||||
|
}
|
||||||
|
if (matchesAny(extraInfo, "pleasure craft", "sailing", "n/a")) {
|
||||||
|
return FISHING;
|
||||||
|
}
|
||||||
|
if (extraInfo.startsWith("hazardous cat")) {
|
||||||
|
return CARGO;
|
||||||
|
}
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean matchesAny(String value, String... candidates) {
|
||||||
|
for (String candidate : candidates) {
|
||||||
|
if (candidate.equals(value)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String normalizeOrEmpty(String value) {
|
||||||
|
return (value == null || value.isBlank()) ? "" : value.strip().toLowerCase();
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -27,11 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@ConditionalOnProperty(
|
|
||||||
name = "app.batch.ais-target.kafka.enabled",
|
|
||||||
havingValue = "true"
|
|
||||||
)
|
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
|
@ConditionalOnProperty(name = "app.batch.ais-target.kafka.enabled", havingValue = "true")
|
||||||
public class AisTargetKafkaProducer {
|
public class AisTargetKafkaProducer {
|
||||||
|
|
||||||
private final KafkaTemplate<String, String> kafkaTemplate;
|
private final KafkaTemplate<String, String> kafkaTemplate;
|
||||||
|
|||||||
@ -38,6 +38,42 @@ public class AisTargetController {
|
|||||||
|
|
||||||
private final AisTargetService aisTargetService;
|
private final AisTargetService aisTargetService;
|
||||||
|
|
||||||
|
// ==================== 중국 허가선박 전용 ====================
|
||||||
|
|
||||||
|
@Operation(
|
||||||
|
summary = "중국 허가선박 위치 조회",
|
||||||
|
description = """
|
||||||
|
중국 허가 어선(~1,400척) 전용 캐시에서 위치 정보를 조회합니다.
|
||||||
|
|
||||||
|
- 순수 캐시 조회 (DB fallback 없음)
|
||||||
|
- 캐시에 없으면 빈 배열 반환
|
||||||
|
- 응답 구조는 /search와 동일
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
@GetMapping("/chnprmship")
|
||||||
|
public ResponseEntity<ApiResponse<List<AisTargetResponseDto>>> getChnPrmShip(
|
||||||
|
@Parameter(description = "조회 범위 (분, 기본: 2880 = 2일)", example = "2880")
|
||||||
|
@RequestParam(defaultValue = "2880") Integer minutes) {
|
||||||
|
|
||||||
|
log.info("ChnPrmShip 조회 요청 - minutes: {}", minutes);
|
||||||
|
|
||||||
|
List<AisTargetResponseDto> result = aisTargetService.findChnPrmShip(minutes);
|
||||||
|
return ResponseEntity.ok(ApiResponse.success(
|
||||||
|
"ChnPrmShip 조회 완료: " + result.size() + " 건",
|
||||||
|
result
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Operation(
|
||||||
|
summary = "중국 허가선박 캐시 통계",
|
||||||
|
description = "중국 허가선박 전용 캐시의 현재 상태를 조회합니다"
|
||||||
|
)
|
||||||
|
@GetMapping("/chnprmship/stats")
|
||||||
|
public ResponseEntity<ApiResponse<Map<String, Object>>> getChnPrmShipStats() {
|
||||||
|
Map<String, Object> stats = aisTargetService.getChnPrmShipCacheStats();
|
||||||
|
return ResponseEntity.ok(ApiResponse.success(stats));
|
||||||
|
}
|
||||||
|
|
||||||
// ==================== 단건 조회 ====================
|
// ==================== 단건 조회 ====================
|
||||||
|
|
||||||
@Operation(
|
@Operation(
|
||||||
|
|||||||
@ -88,6 +88,21 @@ public class AisTargetResponseDto {
|
|||||||
@Schema(description = "데이터 소스", example = "cache", allowableValues = {"cache", "db"})
|
@Schema(description = "데이터 소스", example = "cache", allowableValues = {"cache", "db"})
|
||||||
private String source;
|
private String source;
|
||||||
|
|
||||||
|
// 선종 분류 정보
|
||||||
|
@Schema(description = """
|
||||||
|
MDA 범례코드 (선종 분류)
|
||||||
|
- 000020: 어선 (FISHING)
|
||||||
|
- 000021: 함정 (KCGV)
|
||||||
|
- 000022: 여객선 (FERRY)
|
||||||
|
- 000023: 카고 (CARGO)
|
||||||
|
- 000024: 탱커 (TANKER)
|
||||||
|
- 000025: 관공선 (GOV)
|
||||||
|
- 000027: 일반/기타선박 (DEFAULT)
|
||||||
|
- 000028: 부이/항로표지 (BUOY)
|
||||||
|
""",
|
||||||
|
example = "000023")
|
||||||
|
private String signalKindCode;
|
||||||
|
|
||||||
// ClassType 분류 정보
|
// ClassType 분류 정보
|
||||||
@Schema(description = """
|
@Schema(description = """
|
||||||
선박 클래스 타입
|
선박 클래스 타입
|
||||||
@ -134,6 +149,7 @@ public class AisTargetResponseDto {
|
|||||||
.messageTimestamp(entity.getMessageTimestamp())
|
.messageTimestamp(entity.getMessageTimestamp())
|
||||||
.receivedDate(entity.getReceivedDate())
|
.receivedDate(entity.getReceivedDate())
|
||||||
.source(source)
|
.source(source)
|
||||||
|
.signalKindCode(entity.getSignalKindCode())
|
||||||
.classType(entity.getClassType())
|
.classType(entity.getClassType())
|
||||||
.core20Mmsi(entity.getCore20Mmsi())
|
.core20Mmsi(entity.getCore20Mmsi())
|
||||||
.build();
|
.build();
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import com.snp.batch.jobs.aistarget.batch.repository.AisTargetRepository;
|
|||||||
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
|
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
|
||||||
import com.snp.batch.jobs.aistarget.cache.AisTargetFilterUtil;
|
import com.snp.batch.jobs.aistarget.cache.AisTargetFilterUtil;
|
||||||
import com.snp.batch.jobs.aistarget.cache.SpatialFilterUtil;
|
import com.snp.batch.jobs.aistarget.cache.SpatialFilterUtil;
|
||||||
|
import com.snp.batch.jobs.aistarget.chnprmship.ChnPrmShipCacheManager;
|
||||||
import com.snp.batch.jobs.aistarget.web.dto.AisTargetFilterRequest;
|
import com.snp.batch.jobs.aistarget.web.dto.AisTargetFilterRequest;
|
||||||
import com.snp.batch.jobs.aistarget.web.dto.AisTargetResponseDto;
|
import com.snp.batch.jobs.aistarget.web.dto.AisTargetResponseDto;
|
||||||
import com.snp.batch.jobs.aistarget.web.dto.AisTargetSearchRequest;
|
import com.snp.batch.jobs.aistarget.web.dto.AisTargetSearchRequest;
|
||||||
@ -38,6 +39,7 @@ public class AisTargetService {
|
|||||||
private final AisTargetCacheManager cacheManager;
|
private final AisTargetCacheManager cacheManager;
|
||||||
private final SpatialFilterUtil spatialFilterUtil;
|
private final SpatialFilterUtil spatialFilterUtil;
|
||||||
private final AisTargetFilterUtil filterUtil;
|
private final AisTargetFilterUtil filterUtil;
|
||||||
|
private final ChnPrmShipCacheManager chnPrmShipCacheManager;
|
||||||
|
|
||||||
private static final String SOURCE_CACHE = "cache";
|
private static final String SOURCE_CACHE = "cache";
|
||||||
private static final String SOURCE_DB = "db";
|
private static final String SOURCE_DB = "db";
|
||||||
@ -360,6 +362,36 @@ public class AisTargetService {
|
|||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ==================== 중국 허가선박 전용 조회 ====================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 중국 허가선박 전용 캐시 조회 (DB fallback 없음)
|
||||||
|
*
|
||||||
|
* @param minutes 조회 범위 (분)
|
||||||
|
* @return 시간 범위 내 대상 선박 목록
|
||||||
|
*/
|
||||||
|
public List<AisTargetResponseDto> findChnPrmShip(int minutes) {
|
||||||
|
log.debug("ChnPrmShip 조회 - minutes: {}", minutes);
|
||||||
|
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
List<AisTargetEntity> entities = chnPrmShipCacheManager.getByTimeRange(minutes);
|
||||||
|
|
||||||
|
long elapsed = System.currentTimeMillis() - startTime;
|
||||||
|
log.info("ChnPrmShip 조회 완료 - 결과: {} 건, 소요: {}ms", entities.size(), elapsed);
|
||||||
|
|
||||||
|
return entities.stream()
|
||||||
|
.map(e -> AisTargetResponseDto.from(e, SOURCE_CACHE))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ChnPrmShip 캐시 통계 조회
|
||||||
|
*/
|
||||||
|
public Map<String, Object> getChnPrmShipCacheStats() {
|
||||||
|
return chnPrmShipCacheManager.getStats();
|
||||||
|
}
|
||||||
|
|
||||||
// ==================== 캐시 관리 ====================
|
// ==================== 캐시 관리 ====================
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@ -3,7 +3,6 @@ package com.snp.batch.jobs.aistargetdbsync.batch.tasklet;
|
|||||||
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||||
import com.snp.batch.jobs.aistarget.batch.repository.AisTargetRepository;
|
import com.snp.batch.jobs.aistarget.batch.repository.AisTargetRepository;
|
||||||
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
|
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.batch.core.StepContribution;
|
import org.springframework.batch.core.StepContribution;
|
||||||
import org.springframework.batch.core.scope.context.ChunkContext;
|
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||||
@ -12,53 +11,69 @@ import org.springframework.batch.repeat.RepeatStatus;
|
|||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* AIS Target DB Sync Tasklet
|
* AIS Target DB Sync Tasklet
|
||||||
*
|
*
|
||||||
* 스케줄: 매 15분 (0 0/15 * * * ?)
|
|
||||||
*
|
|
||||||
* 동작:
|
* 동작:
|
||||||
* - Caffeine 캐시에서 최근 N분 이내 데이터 조회
|
* - Caffeine 캐시에서 마지막 성공 이후 ~ 현재까지의 데이터를 조회
|
||||||
* - MMSI별 최신 위치 1건씩 DB에 UPSERT
|
* - MMSI별 최신 위치 1건씩 DB에 UPSERT
|
||||||
* - 캐시의 모든 컬럼 정보를 그대로 DB에 저장
|
* - 캐시의 모든 컬럼 정보를 그대로 DB에 저장
|
||||||
*
|
*
|
||||||
|
* 시간 범위 결정 전략:
|
||||||
|
* - 첫 실행 또는 마지막 실행 정보 없음 → fallback(time-range-minutes) 사용
|
||||||
|
* - 이후 실행 → 마지막 성공 시각 기준으로 경과 시간 자동 계산
|
||||||
|
* - cron 주기를 변경해도 별도 설정 불필요 (자동 동기화)
|
||||||
|
*
|
||||||
* 참고:
|
* 참고:
|
||||||
* - 캐시에는 MMSI별 최신 데이터만 유지됨 (120분 TTL)
|
* - 캐시에는 MMSI별 최신 데이터만 유지됨 (120분 TTL)
|
||||||
* - DB 저장은 15분 주기로 수행하여 볼륨 절감
|
|
||||||
* - 기존 aisTargetImportJob은 캐시 업데이트만 수행
|
* - 기존 aisTargetImportJob은 캐시 업데이트만 수행
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class AisTargetDbSyncTasklet implements Tasklet {
|
public class AisTargetDbSyncTasklet implements Tasklet {
|
||||||
|
|
||||||
private final AisTargetCacheManager cacheManager;
|
private final AisTargetCacheManager cacheManager;
|
||||||
private final AisTargetRepository aisTargetRepository;
|
private final AisTargetRepository aisTargetRepository;
|
||||||
|
private final int fallbackMinutes;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* DB 동기화 시 조회할 캐시 데이터 시간 범위 (분)
|
* 마지막 성공 시각 (JVM 내 유지, 재기동 시 fallback 사용)
|
||||||
* 기본값: 15분 (스케줄 주기와 동일)
|
|
||||||
*/
|
*/
|
||||||
@Value("${app.batch.ais-target-db-sync.time-range-minutes:15}")
|
private final AtomicReference<Instant> lastSuccessTime = new AtomicReference<>();
|
||||||
private int timeRangeMinutes;
|
|
||||||
|
public AisTargetDbSyncTasklet(
|
||||||
|
AisTargetCacheManager cacheManager,
|
||||||
|
AisTargetRepository aisTargetRepository,
|
||||||
|
@Value("${app.batch.ais-target-db-sync.time-range-minutes:15}") int fallbackMinutes) {
|
||||||
|
this.cacheManager = cacheManager;
|
||||||
|
this.aisTargetRepository = aisTargetRepository;
|
||||||
|
this.fallbackMinutes = fallbackMinutes;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
|
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
|
||||||
|
Instant now = Instant.now();
|
||||||
|
int rangeMinutes = resolveRangeMinutes(now);
|
||||||
|
|
||||||
log.info("========================================");
|
log.info("========================================");
|
||||||
log.info("AIS Target DB Sync 시작");
|
log.info("AIS Target DB Sync 시작");
|
||||||
log.info("조회 범위: 최근 {}분", timeRangeMinutes);
|
log.info("조회 범위: 최근 {}분 (방식: {})", rangeMinutes,
|
||||||
|
lastSuccessTime.get() != null ? "마지막 성공 기준" : "fallback");
|
||||||
log.info("현재 캐시 크기: {}", cacheManager.size());
|
log.info("현재 캐시 크기: {}", cacheManager.size());
|
||||||
log.info("========================================");
|
log.info("========================================");
|
||||||
|
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
// 1. 캐시에서 최근 N분 이내 데이터 조회
|
// 1. 캐시에서 시간 범위 내 데이터 조회
|
||||||
List<AisTargetEntity> entities = cacheManager.getByTimeRange(timeRangeMinutes);
|
List<AisTargetEntity> entities = cacheManager.getByTimeRange(rangeMinutes);
|
||||||
|
|
||||||
if (entities.isEmpty()) {
|
if (entities.isEmpty()) {
|
||||||
log.warn("캐시에서 조회된 데이터가 없습니다 (범위: {}분)", timeRangeMinutes);
|
log.warn("캐시에서 조회된 데이터가 없습니다 (범위: {}분)", rangeMinutes);
|
||||||
|
lastSuccessTime.set(now);
|
||||||
return RepeatStatus.FINISHED;
|
return RepeatStatus.FINISHED;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -69,6 +84,9 @@ public class AisTargetDbSyncTasklet implements Tasklet {
|
|||||||
|
|
||||||
long elapsed = System.currentTimeMillis() - startTime;
|
long elapsed = System.currentTimeMillis() - startTime;
|
||||||
|
|
||||||
|
// 성공 시각 기록
|
||||||
|
lastSuccessTime.set(now);
|
||||||
|
|
||||||
log.info("========================================");
|
log.info("========================================");
|
||||||
log.info("AIS Target DB Sync 완료");
|
log.info("AIS Target DB Sync 완료");
|
||||||
log.info("저장 건수: {} 건", entities.size());
|
log.info("저장 건수: {} 건", entities.size());
|
||||||
@ -80,4 +98,24 @@ public class AisTargetDbSyncTasklet implements Tasklet {
|
|||||||
|
|
||||||
return RepeatStatus.FINISHED;
|
return RepeatStatus.FINISHED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final int MAX_RANGE_MINUTES = 60;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 조회 범위(분) 결정
|
||||||
|
* - 마지막 성공 시각이 있으면: 경과 시간 + 1분 버퍼 (최대 60분)
|
||||||
|
* - 없으면: fallback 값 사용
|
||||||
|
* - 오래 중단 후 재가동 시에도 최대 60분으로 제한하여 과부하 방지
|
||||||
|
*/
|
||||||
|
private int resolveRangeMinutes(Instant now) {
|
||||||
|
Instant last = lastSuccessTime.get();
|
||||||
|
if (last == null) {
|
||||||
|
return Math.min(fallbackMinutes, MAX_RANGE_MINUTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
long elapsedMinutes = java.time.Duration.between(last, now).toMinutes();
|
||||||
|
// 경과 시간 + 1분 버퍼 (겹침 허용, UPSERT이므로 중복 안전), 최대 60분
|
||||||
|
int range = (int) Math.max(elapsedMinutes + 1, 1);
|
||||||
|
return Math.min(range, MAX_RANGE_MINUTES);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -185,6 +185,14 @@ app:
|
|||||||
ttl-minutes: 120 # 캐시 TTL (분) - 2시간
|
ttl-minutes: 120 # 캐시 TTL (분) - 2시간
|
||||||
max-size: 300000 # 최대 캐시 크기 - 30만 건
|
max-size: 300000 # 최대 캐시 크기 - 30만 건
|
||||||
|
|
||||||
|
# 중국 허가선박 전용 캐시 설정
|
||||||
|
chnprmship:
|
||||||
|
mmsi-resource-path: classpath:chnprmship-mmsi.txt
|
||||||
|
ttl-days: 2
|
||||||
|
max-size: 2000
|
||||||
|
warmup-enabled: true
|
||||||
|
warmup-days: 2
|
||||||
|
|
||||||
# ClassType 분류 설정
|
# ClassType 분류 설정
|
||||||
class-type:
|
class-type:
|
||||||
refresh-hour: 4 # Core20 캐시 갱신 시간 (기본: 04시)
|
refresh-hour: 4 # Core20 캐시 갱신 시간 (기본: 04시)
|
||||||
|
|||||||
1402
src/main/resources/chnprmship-mmsi.txt
Normal file
1402
src/main/resources/chnprmship-mmsi.txt
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
불러오는 중...
Reference in New Issue
Block a user