Storage Reference
1. 목적
이 문서는 Hospital-at-Home demo에서 사용하는 최소 PostgreSQL/Redis 저장소 경계를 설명합니다.
저장소는 product architecture의 중심이 아니라 outbound adapter입니다. Domain과 usecase는 PostgreSQL, Redis, table name, key name을 알지 않습니다. patient-risk-python-api app의 outbound persistence adapter가 SignalPacket, ReviewQueueItem, MonitoringSnapshot domain object를 SQLAlchemy ORM record 또는 Redis JSON document로 변환합니다. python-persistence package는 SQLAlchemy/Redis foundation만 제공합니다.
1-1. 저장소 선택 기준
| Store | Role | Data |
|---|---|---|
| PostgreSQL | 재현성과 조회가 필요한 장기 상태 | signal packet, risk assessment, review queue item, alarm audit event |
| Redis | 빠른 갱신이 필요한 runtime state | latest monitoring snapshot, alarm acknowledge/mute state |
이 구분은 성능 최적화만을 위한 것이 아닙니다. SaMD 흐름에서는 나중에 audit trail로 되짚어야 하는 입력과, 화면을 빠르게 갱신하기 위한 최신 상태를 분리해야 합니다.
1-2. 1단계 범위
1단계 storage는 local demo와 adapter boundary 검증용입니다.
- PostgreSQL schema는 API app의 Alembic migration으로 준비합니다.
Base.metadata.create_all()은 local/demo convenience로만 남기고 기본값은 끕니다.- Redis key는 demo prefix 아래 latest snapshot을 저장합니다.
python-persistence는 SQLAlchemy engine/session/pool, Redis client/pool/retry/readiness foundation만 제공합니다.- patient-risk table, Redis key, TTL, mapper, repository는 API app의 outbound persistence adapter가 결정합니다.
- backup, retention, PHI partition, encryption policy는 CD/infra repo의 책임으로 둡니다.
Alembic은 backup 도구가 아니라 schema migration/versioning 도구입니다. Alembic project와 revision은 patient-risk table ownership을 가진 patient-risk-python-api app에 두고, python-persistence는 SQLAlchemy Base, engine/session, Unit of Work foundation만 제공합니다.
2. PostgreSQL
PostgreSQL은 재현성과 조회가 필요한 데이터를 보관합니다. 현재 demo는 조회에 필요한 최소 column과 JSONB payload를 함께 저장해 schema evolution을 단순하게 보여줍니다.
Schema의 현재 source of truth는 app-owned Alembic revision입니다. ORM record는 mapper와 repository 구현을 위한 application persistence record입니다.
apps/patient-risk-python-api/src/patient_risk_python_api/adapters/outbound/persistence/models.py
apps/patient-risk-python-api/migrations/versions/0001_patient_risk_baseline.py
Repository adapter는 local/demo에서 PATIENT_RISK_AUTO_CREATE_SCHEMA=true일 때만 Base.metadata.create_all()로 schema를 준비할 수 있습니다. 기본값은 false입니다. Domain과 usecase는 table, index, JSON/JSONB column을 알지 않습니다.
2-1. patient_signal_packets
Signal packet ingestion 원본을 저장합니다.
create table if not exists patient_signal_packets (
packet_id text primary key,
site_id text not null,
device_id text not null,
demo_subject_id text not null,
timestamp_ms bigint not null,
payload jsonb not null,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create index if not exists patient_signal_packets_subject_time_idx
on patient_signal_packets (demo_subject_id, timestamp_ms desc);
packet_id는 demo에서 site_id:device_id:timestamp_ms 형태로 만듭니다. 실제 제품에서는 device sequence, gateway id, ingest idempotency key를 더 명확히 설계해야 합니다.
Repository port는 audit trail 검증을 위해 latest_for_subject(demo_subject_id) 조회를 제공합니다. 이 조회는 storage-backed demo와 integration test에서 signal packet이 단순히 insert된 것이 아니라 domain object로 복원 가능한지 확인하는 최소 경계입니다.
2-2. patient_risk_assessments
Risk assessment audit record를 저장합니다.
create table if not exists patient_risk_assessments (
assessment_key text primary key,
encounter_id text not null,
model_name text not null,
horizon text not null,
risk_level text not null,
clinician_review_required boolean not null,
payload jsonb not null,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create index if not exists patient_risk_assessments_encounter_idx
on patient_risk_assessments (encounter_id, updated_at desc);
assessment_key는 demo에서 encounter_id:model_name:horizon 형태로 만듭니다. 실제 제품에서는 inference request id, model artifact digest, input packet reference, reviewer attribution을 별도 column/event로 확장해야 합니다.
2-3. patient_review_queue
Clinician review queue item을 저장합니다.
create table if not exists patient_review_queue (
review_item_id text primary key,
demo_subject_id text not null,
priority text not null,
last_updated_ms bigint not null,
payload jsonb not null,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create index if not exists patient_review_queue_priority_time_idx
on patient_review_queue (
priority,
last_updated_ms desc
);
Review queue list는 priority와 last_updated_ms 기준으로 정렬합니다. 이 table은 clinician workflow projection이고, risk assessment의 source of truth는 patient_risk_assessments입니다. 이 정렬은 demo UI에서 가장 중요한 review item을 먼저 보여주기 위한 최소 정책입니다.
2-4. patient_alarm_audit_events
Alarm acknowledge/mute 같은 clinician interaction event를 append-only 형태로 저장합니다.
create table if not exists patient_alarm_audit_events (
event_id text primary key,
alarm_id text not null,
demo_subject_id text not null,
action text not null,
actor_ref text not null,
occurred_at_ms bigint not null,
payload jsonb not null,
created_at timestamptz not null default now()
);
create index if not exists patient_alarm_audit_events_alarm_time_idx
on patient_alarm_audit_events (alarm_id, occurred_at_ms desc);
Redis snapshot은 현재 alarm state projection입니다. 반면 patient_alarm_audit_events는 누가 어떤 action을 했고 alarm state가 어떻게 바뀌었는지 남기는 audit trail입니다. Demo에서는 actor_ref를 단순 문자열로 전달하지만, 실제 제품에서는 인증 주체, 역할, 병원/병동 context, reason code, client timestamp와 server timestamp를 분리해야 합니다.
2-5. patient_monitoring_snapshot_projections
Redis latest monitoring snapshot은 runtime projection입니다. SQL write가 성공한 뒤 Redis write가 실패하면 PostgreSQL에는 raw/result state가 남고 Redis snapshot은 stale일 수 있습니다. 이 상태를 관찰 가능하게 만들기 위해 app은 SQL UoW 안에 projection intent를 저장합니다.
create table if not exists patient_monitoring_snapshot_projections (
projection_id text primary key,
demo_subject_id text not null,
snapshot_key text not null,
latest_key text not null,
payload_fingerprint text not null,
payload jsonb not null,
status text not null,
attempt_count integer not null,
last_error text,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
status는 pending, published, failed 중 하나입니다. API app은 signal ingestion SQL UoW에서 projection intent를 pending으로 저장한 뒤 Redis write를 시도합니다. Redis write가 성공하면 published, 실패하면 failed와 last_error를 기록하고 예외를 유지합니다. 이 구조는 이후 worker가 failed 또는 오래된 pending projection을 재시도할 수 있는 기반입니다.
이 table은 event bus가 아닙니다. Redis projection을 한 번 더 써도 downstream consumer는 payload_fingerprint 같은 idempotency key를 기준으로 중복 event를 만들지 않아야 합니다.
2-6. Migration 운영 기준
현재 baseline은 Alembic revision 0001_patient_risk_baseline입니다. 제품 단계에서는 Base.metadata.create_all()에 의존하지 않고 Alembic migration을 release pipeline 또는 migration job에서 실행합니다.
전환할 때 지켜야 하는 기준은 아래입니다.
- 현재 API app의 SQLAlchemy ORM metadata와 Alembic revision이 같은 schema를 표현해야 합니다.
- Alembic revision id와 schema baseline metadata를 release note 또는 migration table에서 추적합니다.
- runtime app startup에서 destructive DDL을 실행하지 않습니다.
create table if not exists,create index if not exists처럼 idempotent한 demo bootstrap과 versioned migration을 구분합니다.- migration revision은 domain/usecase가 아니라 storage adapter 또는 delivery layer의 책임입니다.
- schema 변경 PR은 repository adapter integration test와
make demo/check/storage로 확인합니다.
전환 트리거는 아래처럼 둡니다.
| Trigger | Why migration tool is needed |
|---|---|
| column rename/drop 또는 type 변경 | idempotent create table if not exists로는 기존 데이터 변환을 표현할 수 없습니다. |
| backfill이나 data correction 필요 | schema 변경과 data migration 순서를 version으로 추적해야 합니다. |
| 운영 환경에서 rolling deploy 필요 | app startup DDL과 request serving lifecycle을 분리해야 합니다. |
| 여러 service가 같은 table을 읽음 | migration ownership과 compatibility window가 필요합니다. |
| PHI/retention/access-control policy 적용 | schema, index, retention job, permission 변경을 review 가능한 revision으로 남겨야 합니다. |
Schema를 변경할 때는 SQLAlchemy ORM metadata와 Alembic revision이 같은 table/index를 만든다는 test를 둡니다. repository adapter integration test가 migration 적용 DB와 demo bootstrap DB에서 같은 repository behavior를 보이는지 확인합니다. App startup의 auto_create_schema 기본값은 꺼져 있고 local/demo에서만 명시적으로 켭니다.
현재 unit test는 domain dataclass와 ORM/Redis document mapper가 분리되어 있는지, repository adapter가 domain object를 복원 가능한지 확인하는 방향으로 둡니다. 이 test는 migration tool을 대체하지 않지만, demo baseline이 domain model을 table shape에 종속시키는 것을 막는 작은 guard입니다.
3. Redis
Redis는 latest monitoring snapshot과 alarm interaction state를 저장합니다.
3-1. Key shape
기본 prefix는 patient-risk입니다.
patient-risk:monitoring-snapshots:{demo_subject_id}
patient-risk:monitoring-snapshots:latest
첫 번째 key는 subject별 monitoring snapshot JSON payload를 저장합니다. 두 번째 key는 가장 최근 snapshot의 demo_subject_id를 저장합니다.
3-2. Alarm state
Alarm acknowledge/mute는 MonitoringSnapshot 안의 alarm state를 갱신한 뒤 같은 Redis snapshot key에 다시 저장합니다.
Redis에는 latest state만 둡니다. Audit event는 PostgreSQL의 patient_alarm_audit_events에 append합니다. 이 분리는 UI가 빠르게 현재 상태를 읽으면서도, clinical action 이력은 재현 가능한 장기 저장소에 남기기 위한 최소 제품 경계입니다.
API app은 같은 이력을 GET /alarms/{alarm_id}/audit-events로 노출합니다. 이 endpoint는 storage adapter의 append-only audit log를 clinician workflow와 운영 smoke에서 확인할 수 있게 하는 read-only adapter입니다. 응답은 previous와 current state를 함께 담아 acknowledge/mute action이 어떤 alarm state transition을 만들었는지 추적합니다.
4. 실행 방법
4-1. Connection, pool, retry
python-persistence는 SQLAlchemy와 Redis의 연결 lifecycle을 generic foundation으로 제공합니다.
| Concern | 위치 | 설명 |
|---|---|---|
| SQLAlchemy engine/pool | python-persistence |
pool_pre_ping, pool size, overflow, timeout, recycle 같은 연결 정책을 config로 표현합니다. |
| SQLAlchemy session | python-persistence |
session factory, commit/rollback/close boundary, transaction retry helper를 제공합니다. |
| SQLAlchemy ORM record | API app | patient-risk table과 column은 app outbound persistence adapter가 정의합니다. |
| Redis connection pool | python-persistence |
max connections, socket timeout, keepalive, health-check interval을 config로 표현합니다. |
| Redis retry/backoff/jitter | python-persistence |
redis-py retry/backoff를 config로 만들되, command idempotency 판단은 app adapter가 맡습니다. |
| Redis key/payload | API app | patient-risk key prefix, segment, TTL, JSON document version은 app outbound persistence adapter가 정의합니다. |
이 구분은 python-persistence가 너무 얇은 helper가 되지 않게 하면서도, 특정 제품의 table/key naming까지 가져가지 않게 하기 위한 경계입니다.
4-2. Readiness 확인
Python API의 /healthz는 process liveness만 확인하고, /readyz는 storage dependency readiness를 확인합니다. postgres-redis backend에서는 PostgreSQL과 Redis check가 모두 통과해야 ready입니다. 실패 시에도 configured check를 모두 실행해 PostgreSQL과 Redis 중 어느 dependency가 동시에 실패했는지 한 응답에서 볼 수 있게 합니다.
실패 응답은 어느 check까지 통과했고 어느 dependency에서 실패했는지 구분합니다.
{
"status": "not-ready",
"serviceName": "patient-risk-python-api",
"storageBackend": "postgres-redis",
"failedCheck": "redis",
"failureType": "TimeoutError",
"failedChecks": [
{ "name": "redis", "failureType": "TimeoutError" }
],
"dependencySummary": {
"total": 2,
"ready": 1,
"failed": 1
},
"durationMs": 1004,
"checks": [
{ "name": "postgres", "status": "ready", "durationMs": 3 },
{ "name": "redis", "status": "failed", "failureType": "TimeoutError", "durationMs": 1001 }
]
}
이 response는 operator나 Docker Compose healthcheck가 dependency 문제를 liveness 문제와 구분하기 위한 최소 관측성입니다. Top-level durationMs는 readiness check 전체 시간이고, 각 check의 durationMs는 특정 dependency가 느려지는 상황을 구분하기 위한 adapter-level signal입니다. dependencySummary는 configured dependency 중 몇 개가 ready/failed인지 바로 판단하게 해 줍니다. 구체적인 exception message는 response에 넣지 않습니다.
4-3. Local container 실행
Local storage container를 올립니다.
make storage/up
Schema migration을 적용합니다.
cd apps/patient-risk-python-api
DATABASE_URL=postgresql+psycopg://patient_risk:patient_risk@127.0.0.1:55432/patient_risk \
uv run --managed-python --package patient-risk-python-api alembic upgrade head
Storage-backed end-to-end smoke를 실행합니다. 이 명령은 필요한 container를 직접 띄우고 완료 후 정리합니다.
make demo/check/storage
PostgreSQL container 초기화가 느린 local Docker나 self-hosted runner에서는 storage wait timeout을 늘릴 수 있습니다. 기본값은 90초입니다.
STORAGE_WAIT_TIMEOUT_SECONDS=120 make demo/check/storage
저장소를 계속 띄워 둔 채 adapter integration test를 직접 실행했다면 정리합니다.
make storage/down
4-4. 선택 실행 테스트
PostgreSQL/Redis adapter integration test는 DATABASE_URL, REDIS_URL이 있을 때만 실제 저장소에 접근합니다. 환경변수가 없으면 skip됩니다.
DATABASE_URL=postgresql+psycopg://patient_risk:patient_risk@127.0.0.1:55432/patient_risk \
REDIS_URL=redis://127.0.0.1:56379/0 \
uv run --managed-python --package patient-risk-python-core \
pytest packages/patient-risk-python-core/tests/integration/test_storage_repositories.py
이 테스트는 SignalPacket ingestion usecase를 실행하고, PostgreSQL signal packet/risk assessment/review queue/alarm audit event 조회와 Redis latest monitoring snapshot, alarm acknowledge/mute flow를 확인합니다.