fix: time_bucket 수집 안전 윈도우 도입 — incremental fetch 데이터 누락 방지

snpdb 5분 버킷 데이터가 적재 완료까지 ~12분 소요되는데,
기존 fetch_incremental이 상한 없이 미완성 버킷을 수집하여
_last_bucket이 조기 전진 → 뒤늦게 완성된 행 영구 누락.

- time_bucket.py 신규: safe_bucket(12분 지연) + backfill(3 bucket)
- snpdb.py: fetch_all_tracks/fetch_incremental에 safe 상한 + 백필 하한
- vessel_store.py: merge_incremental sort+keep='last', evict_stale time_bucket 우선
- config.py: SNPDB_SAFE_DELAY_MIN=12, SNPDB_BACKFILL_BUCKETS=3

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
htlee 2026-04-03 15:11:20 +09:00
부모 67523b475d
커밋 2534c9dbca
5개의 변경된 파일에 92개의 추가작업 그리고 16개의 삭제

파일 보기

@ -7,6 +7,7 @@
### 수정
- 1h 활성 판정을 parent_name 전체 합산 기준으로 변경 (서브클러스터 분리 후 개별 소수 문제 해결)
- vessel_store의 _last_bucket 타임존 오류 수정 (tz-naive KST → UTC 잘못 변환 → incremental fetch 0건)
- time_bucket 수집 안전 윈도우 도입 — safe_bucket(12분 지연) + 3 bucket 백필로 데이터 누락 방지
## [2026-04-01.2]

파일 보기

@ -164,10 +164,11 @@ class VesselStore:
mmsi_str = str(mmsi)
if mmsi_str in self._tracks:
combined = pd.concat([self._tracks[mmsi_str], group], ignore_index=True)
combined = combined.drop_duplicates(subset=['timestamp'])
combined = combined.sort_values(['timestamp', 'time_bucket'])
combined = combined.drop_duplicates(subset=['timestamp'], keep='last')
self._tracks[mmsi_str] = combined.reset_index(drop=True)
else:
self._tracks[mmsi_str] = group.reset_index(drop=True)
self._tracks[mmsi_str] = group.sort_values(['timestamp', 'time_bucket']).reset_index(drop=True)
if 'time_bucket' in group.columns and not group['time_bucket'].empty:
bucket_vals = pd.to_datetime(group['time_bucket'].dropna())
@ -191,6 +192,10 @@ class VesselStore:
"""Remove track points older than N hours and evict empty MMSI entries."""
import datetime as _dt
from time_bucket import compute_initial_window_start, compute_safe_bucket
safe_bucket = compute_safe_bucket()
cutoff_bucket = compute_initial_window_start(hours, safe_bucket)
now = datetime.now(timezone.utc)
cutoff_aware = now - _dt.timedelta(hours=hours)
cutoff_naive = cutoff_aware.replace(tzinfo=None)
@ -200,8 +205,11 @@ class VesselStore:
for mmsi in list(self._tracks.keys()):
df = self._tracks[mmsi]
if 'time_bucket' in df.columns and not df['time_bucket'].dropna().empty:
bucket_col = pd.to_datetime(df['time_bucket'], errors='coerce')
mask = bucket_col >= pd.Timestamp(cutoff_bucket)
else:
ts_col = df['timestamp']
# Handle tz-aware and tz-naive timestamps uniformly
if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None:
mask = ts_col >= pd.Timestamp(cutoff_aware)
else:
@ -215,10 +223,11 @@ class VesselStore:
after_total = sum(len(v) for v in self._tracks.values())
logger.info(
'eviction complete: removed %d points, evicted %d mmsis (threshold=%dh)',
'eviction complete: removed %d points, evicted %d mmsis (threshold=%dh, cutoff_bucket=%s)',
before_total - after_total,
len(evicted_mmsis),
hours,
cutoff_bucket,
)
def refresh_static_info(self) -> None:

파일 보기

@ -25,6 +25,8 @@ class Settings(BaseSettings):
INITIAL_LOAD_HOURS: int = 24
STATIC_INFO_REFRESH_MIN: int = 60
PERMIT_REFRESH_MIN: int = 30
SNPDB_SAFE_DELAY_MIN: int = 12
SNPDB_BACKFILL_BUCKETS: int = 3
# 파이프라인
TRAJECTORY_HOURS: int = 6

파일 보기

@ -8,6 +8,7 @@ import psycopg2
from psycopg2 import pool
from config import settings
from time_bucket import compute_incremental_window_start, compute_initial_window_start, compute_safe_bucket
logger = logging.getLogger(__name__)
@ -61,8 +62,12 @@ def fetch_all_tracks(hours: int = 24) -> pd.DataFrame:
LineStringM 지오메트리에서 개별 포인트를 추출하며,
한국 해역(122-132E, 31-39N) 최근 N시간 데이터를 반환한다.
safe_bucket(현재 - 12)까지만 조회하여 미완성 버킷을 제외한다.
"""
query = f"""
safe_bucket = compute_safe_bucket()
window_start = compute_initial_window_start(hours, safe_bucket)
query = """
SELECT
t.mmsi,
to_timestamp(ST_M((dp).geom)) as timestamp,
@ -75,18 +80,21 @@ def fetch_all_tracks(hours: int = 24) -> pd.DataFrame:
END as raw_sog
FROM signal.t_vessel_tracks_5min t,
LATERAL ST_DumpPoints(t.track_geom) dp
WHERE t.time_bucket >= NOW() - INTERVAL '{hours} hours'
WHERE t.time_bucket >= %s
AND t.time_bucket <= %s
AND t.track_geom && ST_MakeEnvelope(122, 31, 132, 39, 4326)
ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
"""
try:
with get_conn() as conn:
df = pd.read_sql_query(query, conn)
df = pd.read_sql_query(query, conn, params=(window_start, safe_bucket))
logger.info(
'fetch_all_tracks: %d rows, %d vessels (last %dh)',
'fetch_all_tracks: %d rows, %d vessels (window=%s..%s, last %dh safe)',
len(df),
df['mmsi'].nunique() if len(df) > 0 else 0,
window_start,
safe_bucket,
hours,
)
return df
@ -98,9 +106,20 @@ def fetch_all_tracks(hours: int = 24) -> pd.DataFrame:
def fetch_incremental(last_bucket: datetime) -> pd.DataFrame:
"""last_bucket 이후의 신규 궤적 포인트를 조회한다.
스케줄러 증분 업데이트에 사용되며, time_bucket > last_bucket 조건으로
이미 처리한 버킷을 건너뛴다.
safe_bucket(현재 - 12)까지만 조회하여 미완성 버킷을 제외하고,
from_bucket(last_bucket - 15)부터 재수집하여 지연 INSERT를 보상한다.
"""
safe_bucket = compute_safe_bucket()
from_bucket = compute_incremental_window_start(last_bucket)
if safe_bucket <= from_bucket:
logger.info(
'fetch_incremental skipped: safe_bucket=%s, from_bucket=%s, last_bucket=%s',
safe_bucket,
from_bucket,
last_bucket,
)
return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'raw_sog'])
query = """
SELECT
t.mmsi,
@ -115,17 +134,20 @@ def fetch_incremental(last_bucket: datetime) -> pd.DataFrame:
FROM signal.t_vessel_tracks_5min t,
LATERAL ST_DumpPoints(t.track_geom) dp
WHERE t.time_bucket > %s
AND t.time_bucket <= %s
AND t.track_geom && ST_MakeEnvelope(122, 31, 132, 39, 4326)
ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
"""
try:
with get_conn() as conn:
df = pd.read_sql_query(query, conn, params=(last_bucket,))
df = pd.read_sql_query(query, conn, params=(from_bucket, safe_bucket))
logger.info(
'fetch_incremental: %d rows, %d vessels (since %s)',
'fetch_incremental: %d rows, %d vessels (from %s, safe %s, last %s)',
len(df),
df['mmsi'].nunique() if len(df) > 0 else 0,
from_bucket.isoformat(),
safe_bucket.isoformat(),
last_bucket.isoformat(),
)
return df

42
prediction/time_bucket.py Normal file
파일 보기

@ -0,0 +1,42 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from config import settings
_KST = ZoneInfo('Asia/Seoul')
_BUCKET_MINUTES = 5


def normalize_bucket_kst(bucket: datetime) -> datetime:
    """Return *bucket* as tz-naive KST (Asia/Seoul) wall-clock time.

    Naive datetimes are assumed to already be KST and pass through
    unchanged; aware datetimes are converted to KST and stripped of tzinfo.
    """
    if bucket.tzinfo is not None:
        bucket = bucket.astimezone(_KST).replace(tzinfo=None)
    return bucket


def floor_bucket_kst(value: datetime, bucket_minutes: int = _BUCKET_MINUTES) -> datetime:
    """Floor *value* to the start of its bucket, returned tz-aware in KST.

    A naive *value* is interpreted as KST wall-clock time; an aware one is
    converted to KST first.
    """
    localized = value.replace(tzinfo=_KST) if value.tzinfo is None else value.astimezone(_KST)
    # m - m % b == (m // b) * b for non-negative minutes
    return localized.replace(
        minute=localized.minute - localized.minute % bucket_minutes,
        second=0,
        microsecond=0,
    )


def compute_safe_bucket(now: datetime | None = None) -> datetime:
    """Return the newest bucket (tz-naive KST) considered fully loaded.

    snpdb takes ~SNPDB_SAFE_DELAY_MIN minutes to finish writing a bucket,
    so the safe upper bound is "now minus that delay", floored to a bucket
    start. A naive *now* is interpreted as UTC.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    elif now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)
    delay = timedelta(minutes=settings.SNPDB_SAFE_DELAY_MIN)
    return floor_bucket_kst(now.astimezone(_KST) - delay).replace(tzinfo=None)


def compute_initial_window_start(hours: int, safe_bucket: datetime | None = None) -> datetime:
    """Return the lower bound (tz-naive KST) of an initial *hours*-long load
    window, anchored at *safe_bucket* (computed fresh when omitted)."""
    if safe_bucket is None:
        safe_bucket = compute_safe_bucket()
    return normalize_bucket_kst(safe_bucket) - timedelta(hours=hours)


def compute_incremental_window_start(last_bucket: datetime) -> datetime:
    """Return the re-fetch lower bound: *last_bucket* minus the backfill
    window (SNPDB_BACKFILL_BUCKETS buckets), as tz-naive KST. Re-reading a
    few already-seen buckets compensates for late INSERTs upstream."""
    backfill = timedelta(minutes=_BUCKET_MINUTES * settings.SNPDB_BACKFILL_BUCKETS)
    return normalize_bucket_kst(last_bucket) - backfill