diff --git a/docs/RELEASE-NOTES.md b/docs/RELEASE-NOTES.md index f834229..334f20d 100644 --- a/docs/RELEASE-NOTES.md +++ b/docs/RELEASE-NOTES.md @@ -7,6 +7,7 @@ ### 수정 - 1h 활성 판정을 parent_name 전체 합산 기준으로 변경 (서브클러스터 분리 후 개별 소수 문제 해결) - vessel_store의 _last_bucket 타임존 오류 수정 (tz-naive KST → UTC 잘못 변환 → incremental fetch 0건) +- time_bucket 수집 안전 윈도우 도입 — safe_bucket(12분 지연) + 3 bucket 백필로 데이터 누락 방지 ## [2026-04-01.2] diff --git a/prediction/cache/vessel_store.py b/prediction/cache/vessel_store.py index 3cab5e5..a4089aa 100644 --- a/prediction/cache/vessel_store.py +++ b/prediction/cache/vessel_store.py @@ -164,10 +164,11 @@ class VesselStore: mmsi_str = str(mmsi) if mmsi_str in self._tracks: combined = pd.concat([self._tracks[mmsi_str], group], ignore_index=True) - combined = combined.drop_duplicates(subset=['timestamp']) + combined = combined.sort_values(['timestamp', 'time_bucket']) + combined = combined.drop_duplicates(subset=['timestamp'], keep='last') self._tracks[mmsi_str] = combined.reset_index(drop=True) else: - self._tracks[mmsi_str] = group.reset_index(drop=True) + self._tracks[mmsi_str] = group.sort_values(['timestamp', 'time_bucket']).reset_index(drop=True) if 'time_bucket' in group.columns and not group['time_bucket'].empty: bucket_vals = pd.to_datetime(group['time_bucket'].dropna()) @@ -191,6 +192,10 @@ class VesselStore: """Remove track points older than N hours and evict empty MMSI entries.""" import datetime as _dt + from time_bucket import compute_initial_window_start, compute_safe_bucket + + safe_bucket = compute_safe_bucket() + cutoff_bucket = compute_initial_window_start(hours, safe_bucket) now = datetime.now(timezone.utc) cutoff_aware = now - _dt.timedelta(hours=hours) cutoff_naive = cutoff_aware.replace(tzinfo=None) @@ -200,12 +205,15 @@ class VesselStore: for mmsi in list(self._tracks.keys()): df = self._tracks[mmsi] - ts_col = df['timestamp'] - # Handle tz-aware and tz-naive timestamps uniformly - if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None: - mask = ts_col >= pd.Timestamp(cutoff_aware) + if 'time_bucket' in df.columns and not df['time_bucket'].dropna().empty: + bucket_col = pd.to_datetime(df['time_bucket'], errors='coerce') + mask = bucket_col >= pd.Timestamp(cutoff_bucket) else: - mask = ts_col >= pd.Timestamp(cutoff_naive) + ts_col = df['timestamp'] + if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None: + mask = ts_col >= pd.Timestamp(cutoff_aware) + else: + mask = ts_col >= pd.Timestamp(cutoff_naive) filtered = df[mask].reset_index(drop=True) if filtered.empty: del self._tracks[mmsi] @@ -215,10 +223,11 @@ class VesselStore: after_total = sum(len(v) for v in self._tracks.values()) logger.info( - 'eviction complete: removed %d points, evicted %d mmsis (threshold=%dh)', + 'eviction complete: removed %d points, evicted %d mmsis (threshold=%dh, cutoff_bucket=%s)', before_total - after_total, len(evicted_mmsis), hours, + cutoff_bucket, ) def refresh_static_info(self) -> None: diff --git a/prediction/config.py b/prediction/config.py index 29d24cf..7d823c3 100644 --- a/prediction/config.py +++ b/prediction/config.py @@ -25,6 +25,8 @@ class Settings(BaseSettings): INITIAL_LOAD_HOURS: int = 24 STATIC_INFO_REFRESH_MIN: int = 60 PERMIT_REFRESH_MIN: int = 30 + SNPDB_SAFE_DELAY_MIN: int = 12 + SNPDB_BACKFILL_BUCKETS: int = 3 # 파이프라인 TRAJECTORY_HOURS: int = 6 diff --git a/prediction/db/snpdb.py b/prediction/db/snpdb.py index fbd5081..2048c58 100644 --- a/prediction/db/snpdb.py +++ b/prediction/db/snpdb.py @@ -8,6 +8,7 @@ import psycopg2 from psycopg2 import pool from config import settings +from time_bucket import compute_incremental_window_start, compute_initial_window_start, compute_safe_bucket logger = logging.getLogger(__name__) @@ -61,8 +62,12 @@ def fetch_all_tracks(hours: int = 24) -> pd.DataFrame: LineStringM 지오메트리에서 개별 포인트를 추출하며, 한국 해역(122-132E, 31-39N) 내 최근 N시간 데이터를 반환한다. + safe_bucket(현재 - 12분)까지만 조회하여 미완성 버킷을 제외한다. """ - query = f""" + safe_bucket = compute_safe_bucket() + window_start = compute_initial_window_start(hours, safe_bucket) + + query = """ SELECT t.mmsi, to_timestamp(ST_M((dp).geom)) as timestamp, @@ -75,18 +80,21 @@ def fetch_all_tracks(hours: int = 24) -> pd.DataFrame: END as raw_sog FROM signal.t_vessel_tracks_5min t, LATERAL ST_DumpPoints(t.track_geom) dp - WHERE t.time_bucket >= NOW() - INTERVAL '{hours} hours' + WHERE t.time_bucket >= %s + AND t.time_bucket <= %s AND t.track_geom && ST_MakeEnvelope(122, 31, 132, 39, 4326) ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom)) """ try: with get_conn() as conn: - df = pd.read_sql_query(query, conn) + df = pd.read_sql_query(query, conn, params=(window_start, safe_bucket)) logger.info( - 'fetch_all_tracks: %d rows, %d vessels (last %dh)', + 'fetch_all_tracks: %d rows, %d vessels (window=%s..%s, last %dh safe)', len(df), df['mmsi'].nunique() if len(df) > 0 else 0, + window_start, + safe_bucket, hours, ) return df @@ -98,9 +106,20 @@ def fetch_all_tracks(hours: int = 24) -> pd.DataFrame: def fetch_incremental(last_bucket: datetime) -> pd.DataFrame: """last_bucket 이후의 신규 궤적 포인트를 조회한다. - 스케줄러 증분 업데이트에 사용되며, time_bucket > last_bucket 조건으로 - 이미 처리한 버킷을 건너뛴다. + safe_bucket(현재 - 12분)까지만 조회하여 미완성 버킷을 제외하고, + from_bucket(last_bucket - 15분)부터 재수집하여 지연 INSERT를 보상한다. """ + safe_bucket = compute_safe_bucket() + from_bucket = compute_incremental_window_start(last_bucket) + if safe_bucket <= from_bucket: + logger.info( + 'fetch_incremental skipped: safe_bucket=%s, from_bucket=%s, last_bucket=%s', + safe_bucket, + from_bucket, + last_bucket, + ) + return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'raw_sog']) + query = """ SELECT t.mmsi, @@ -115,17 +134,20 @@ def fetch_incremental(last_bucket: datetime) -> pd.DataFrame: FROM signal.t_vessel_tracks_5min t, LATERAL ST_DumpPoints(t.track_geom) dp WHERE t.time_bucket > %s + AND t.time_bucket <= %s AND t.track_geom && ST_MakeEnvelope(122, 31, 132, 39, 4326) ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom)) """ try: with get_conn() as conn: - df = pd.read_sql_query(query, conn, params=(last_bucket,)) + df = pd.read_sql_query(query, conn, params=(from_bucket, safe_bucket)) logger.info( - 'fetch_incremental: %d rows, %d vessels (since %s)', + 'fetch_incremental: %d rows, %d vessels (from %s, safe %s, last %s)', len(df), df['mmsi'].nunique() if len(df) > 0 else 0, + from_bucket.isoformat(), + safe_bucket.isoformat(), last_bucket.isoformat(), ) return df diff --git a/prediction/time_bucket.py b/prediction/time_bucket.py new file mode 100644 index 0000000..2cc741d --- /dev/null +++ b/prediction/time_bucket.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from zoneinfo import ZoneInfo + +from config import settings + +_KST = ZoneInfo('Asia/Seoul') +_BUCKET_MINUTES = 5 + + +def normalize_bucket_kst(bucket: datetime) -> datetime: + if bucket.tzinfo is None: + return bucket + return bucket.astimezone(_KST).replace(tzinfo=None) + + +def floor_bucket_kst(value: datetime, bucket_minutes: int = _BUCKET_MINUTES) -> datetime: + if value.tzinfo is None: + localized = value.replace(tzinfo=_KST) + else: + localized = value.astimezone(_KST) + floored_minute = (localized.minute // bucket_minutes) * bucket_minutes + return localized.replace(minute=floored_minute, second=0, microsecond=0) + + +def compute_safe_bucket(now: datetime | None = None) -> datetime: + current = now or datetime.now(timezone.utc) + if current.tzinfo is None: + current = current.replace(tzinfo=timezone.utc) + safe_point = current.astimezone(_KST) - timedelta(minutes=settings.SNPDB_SAFE_DELAY_MIN) + return floor_bucket_kst(safe_point).replace(tzinfo=None) + + +def compute_initial_window_start(hours: int, safe_bucket: datetime | None = None) -> datetime: + anchor = normalize_bucket_kst(safe_bucket or compute_safe_bucket()) + return anchor - timedelta(hours=hours) + + +def compute_incremental_window_start(last_bucket: datetime) -> datetime: + normalized = normalize_bucket_kst(last_bucket) + return normalized - timedelta(minutes=settings.SNPDB_BACKFILL_BUCKETS * _BUCKET_MINUTES)