Python Docs
이 문서는 Hospital-at-Home 예제에서 Python이 맡는 signal ingestion, clinical workflow API, risk assessment orchestration 책임을 설명합니다. Python은 HTTP/MQTT 같은 inbound protocol과 Rust preprocessing adapter를 조립하지만, raw signal heavy preprocessing이나 frontend rendering은 담당하지 않습니다.
Python 문서는 MkDocs와 mkdocstrings를 기준으로 관리합니다.
uv run --managed-python mkdocs serve
uv run --managed-python mkdocs build
Python app/package는 docstring을 API reference의 source of truth로 두고, 사용법과 운영 문서는 Markdown으로 작성합니다.
1. Hospital-at-Home 구현 기준
이 시나리오에서 Python은 AI/ML orchestration, clinical workflow API, signal ingestion usecase를 담당합니다. 1차 demo는 HTTP POST /signal-packets를 기본 ingestion path로 사용하고, grouped path는 POST /signal-frame-sets로 제공합니다. Rust preprocessing은 API app의 outbound adapter 뒤에 둡니다. 고주파 waveform과 20 bed 이상 동시 처리를 고려할 때 app runtime은 여러 SignalFrame을 SignalFrameSet으로 묶을 수 있어야 하지만, core/domain public contract에는 Batch라는 이름을 사용하지 않습니다. 1단계에서는 실제 ML training pipeline 없이 deterministic rule hybrid로 구현합니다.
Python 경계도 repo 전체의 app/package 원칙을 따릅니다. Core package는 clinical workflow의 제품 의미와 subject별 결과 contract를 만들고, API app은 inbound protocol, grouping policy, Rust preprocessing 호출, subject별 fan-out, transaction, persistence ordering, readiness, observability를 조율합니다.
1-1. 책임
Python이 담당합니다.
- HTTP signal packet ingestion adapter for local demo
- Rust preprocessing package 호출
- clinical context repository
- demo model metadata와 threshold policy
- risk assessment usecase
- alarm policy와 alarm state 생성
- monitoring snapshot API
- in-memory review queue repository와 API
- HTTP API request/response contract
- model version, policy version, request id audit context
Python이 담당하지 않습니다.
- raw signal heavy preprocessing 구현
- firmware packet 생성
- frontend view rendering
- Kubernetes reconcile logic
- 실제 모델 학습 pipeline
1-2. Signal ingestion
Device 또는 simulator에서 backend로 들어오는 1차 demo telemetry는 HTTP POST /signal-packets를 기본값으로 둡니다. MQTT는 domain이 아니라 inbound adapter이므로, 붙이더라도 같은 API app orchestration을 호출해야 합니다. 현재 코드는 broker client 없이 adapters/inbound/mqtt message handler를 제공하고, 이 handler는 topic과 payload를 domain input으로 변환한 뒤 HTTP route와 같은 workflow를 호출합니다.
현재 1차 흐름은 아래와 같습니다.
FastAPI POST /signal-packets
-> Pydantic request model validation
-> API app workflow
-> SignalGroupingPolicy(size 1 for HTTP packet path)
-> Rust preprocessing outbound adapter
-> core subject result builder
-> alarm policy
-> SignalIngestionResult
-> app persistence orchestration and HTTP response
Grouped signal path는 같은 원칙을 따릅니다.
HTTP / MQTT / relay input
-> API app adapter
-> SignalFrameSet
-> Rust preprocessing call
-> SignalProcessingRunResult
-> for each item: core clinical result builder
-> app persistence/projection fan-out
batch_size는 Python core usecase 인자가 아닙니다. API app은 SignalGroupingPolicy로 maxItems, maxLatencyMs, maxPointsPerSet, maxWindowMs를 관리하고, core는 이미 만들어진 SignalFrameSet 또는 item별 preprocessed result를 처리합니다. 단건 처리는 크기 1인 set으로 표현합니다.
고주파 waveform 본문은 Python domain object list로 계속 들고 다니지 않습니다. 1차 demo와 contract smoke에서는 channel payload에 inline samples를 둘 수 있지만, 제품형 경로에서는 payloadRef를 사용해 binary segment location, encoding, byte length, checksum, sample offset을 넘깁니다. Python API app은 이 envelope를 검증하고 Rust preprocessing adapter에 전달하며, Python core는 이미 계산된 PreprocessedSignal과 clinical context만 처리합니다.
FastAPI와 Pydantic은 inbound adapter의 도구입니다. Pydantic model은 JSON payload를 검증하고 domain input으로 변환하는 데까지만 사용합니다. patient-risk-python-core는 dataclass와 port/protocol 중심으로 유지해 web framework, ASGI server, validation library에 결합되지 않게 합니다.
초기 topic은 아래를 기준으로 둡니다.
patient-risk/{site_id}/{device_id}/signal-packets/v1
patient-risk/{site_id}/{ward_id}/signal-frame-sets/v1
MQTT를 붙일 때의 broker URL, credential, QoS, reconnect policy는 app config와 adapter 책임입니다. Domain과 usecase는 HTTP와 MQTT를 모두 몰라야 합니다.
1-3. Rust-backed Python package 사용
Rust preprocessing은 PyO3 + maturin 기반 Python wheel로 제공합니다. Python API app은 RustSignalPreprocessor outbound adapter를 통해 Rust Python package를 Python 함수처럼 호출합니다. 현재 기본 runtime은 Rust Python package이며, local/CI runtime에는 patient_signal_python_rust module이 설치되어 있어야 합니다. 현재 구현은 단건 packet path를 먼저 제공하지만, 목표 contract는 SignalFrameSet 단위 호출입니다.
patient-risk-python-api
-> patient-signal-python-rust
-> patient-signal-rust-core
-> patient-risk-python-core
Python risk usecase는 Rust preprocessing 결과와 clinical context를 결합합니다.
PreprocessedSignal item
+ ClinicalContext
+ RiskPolicy
+ ModelMetadata
-> RiskAssessment
초기 모델은 deterministic rule hybrid를 사용합니다. 중요한 것은 모델 크기가 아니라, demo model metadata, threshold policy, evidence, risk horizon이 응답과 audit context에 남는 것입니다.
Rust preprocessor는 Rust core가 정의한 feature contract를 유지해야 합니다. 예를 들어 missing ratio는 invalid physiological sample뿐 아니라 dropped sample과 sequence gap을 반영하고, artifact score는 motion burden, low contact, sequence disorder, clipped/saturated/excessive motion flag를 함께 고려합니다. Python API app은 같은 계산을 재구현하지 않고 Rust Python package import, payload mapping, audit context propagation을 검증합니다.
20 bed 이상을 처리하는 경로에서는 partial failure가 정상 케이스입니다. Rust preprocessing 결과는 전체 run status와 item status를 분리해야 하며, Python app은 성공한 item만 clinical workflow로 fan-out하고 실패/skip item은 audit 가능한 error result로 남깁니다. 한 frame의 품질 문제나 payload 누락 때문에 같은 set 안의 다른 subject 결과까지 실패시키지 않습니다.
Rust 구현은 feature 값뿐 아니라 provenance도 반환합니다. PreprocessedSignal.preprocessing_version은 Python API의 auditContext.preprocessingVersion에 사용합니다. 따라서 실행 결과가 UI와 API contract에서는 rust-signal-preprocessor-<version>으로 드러나야 합니다.
Preprocessing 기준이 흔들리지 않게 shared signal fixture를 사용합니다. Rust core integration test와 API app의 Python Rust package binding test는 같은 fixtures/signals/*.csv를 읽고 normal, low SpO2 trend, motion artifact, sensor dropout 패턴을 같은 방향으로 해석해야 합니다. Python core의 일반 workflow test는 fixed PreprocessedSignal을 입력으로 넣어 clinical result builder만 검증합니다.
1-4. Policy와 review queue
1단계 Python 예제의 기본값은 in-memory repository입니다. 다만 제품에 가까운 흐름을 보여주기 위해 repository port 뒤에 최소 PostgreSQL/Redis adapter를 둡니다. 기본 demo와 CI는 PATIENT_RISK_STORAGE_BACKEND=memory로 빠르게 돌고, storage 흐름을 확인할 때는 PATIENT_RISK_STORAGE_BACKEND=postgres-redis, DATABASE_URL, REDIS_URL을 함께 설정합니다.
PostgreSQL/Redis client는 core package의 기본 dependency가 아닙니다. python-persistence package가 SQLAlchemy/Redis foundation을 제공하고, patient-risk 전용 table/key/mapper/repository는 API app의 outbound persistence adapter에 둡니다. Domain/usecase만 소비하는 환경은 storage driver를 설치하지 않아도 import 가능해야 합니다.
권장 파일은 아래와 같습니다.
packages/patient-risk-python-core/
resources/
demo-model-metadata.json
demo-risk-policy.yaml
demo-risk-policy.yaml은 최소한 아래 값을 포함합니다.
- SpO2 drop threshold
- heart rate trend threshold
- motion/artifact quality threshold
- risk level mapping
- policy version
Risk assessment 결과는 core result builder가 SignalIngestionResult로 반환하고, API app workflow가 raw packet, assessment, review item, monitoring snapshot projection intent를 저장합니다. Redis latest monitoring snapshot은 app persistence adapter가 idempotent projection으로 갱신합니다. Web app은 GET /monitoring-snapshot과 GET /review-queue로 조회합니다. Python core는 risk score, evidence, data quality, alarm state를 조합해 frontend가 재계산 없이 표시할 수 있는 MonitoringSnapshot을 만듭니다.
저장소 책임은 아래처럼 나눕니다.
| Store | Responsibility |
|---|---|
| PostgreSQL | SignalPacket, RiskAssessment, ReviewQueueItem, AlarmAuditEvent, projection intent처럼 audit trail과 조회가 필요한 state |
| Redis | latest MonitoringSnapshot, alarm acknowledge/mute처럼 빠른 조회가 필요한 runtime projection |
이 구분은 성능 때문만이 아닙니다. 장기 보관과 재현성이 필요한 데이터는 PostgreSQL에 두고, 화면을 빠르게 갱신하기 위한 최신 상태는 Redis에 둬서 결합을 줄입니다.
API app은 Alembic baseline migration을 소유합니다. python-persistence는 SQLAlchemy Base, engine/session, Unit of Work foundation만 제공하고, patient-risk table과 migration revision은 patient-risk-python-api에 둡니다. Base.metadata.create_all()은 local/demo convenience로만 남기며 기본값은 꺼져 있습니다.
Storage backend는 API lifecycle과도 분리합니다. /healthz는 process liveness만 표현하고, /readyz는 StorageHealthCheck port를 통해 PostgreSQL/Redis adapter가 실제로 준비되었는지 확인합니다. 이렇게 나누면 Kubernetes나 Docker Compose가 app process 생존과 dependency readiness를 서로 다른 신호로 볼 수 있습니다.
1-5. HTTP API
Python API는 clinician UI와 integration test가 사용할 계약을 제공합니다.
현재 app 구현은 FastAPI ASGI app을 composition root에서 생성하고, uvicorn으로 실행합니다. API test는 fastapi.testclient.TestClient로 route, presenter, error shape를 검증합니다. 이 테스트는 network socket이나 process lifecycle을 검증하지 않고, inbound adapter가 package usecase를 올바르게 호출하는지를 확인합니다.
권장 endpoint는 아래와 같습니다.
GET /healthz
POST /signal-packets
POST /risk-assessments
GET /monitoring-snapshot
GET /review-queue
GET /review-queue/{assessment_id}
POST /alarms/{alarm_id}/ack
POST /alarms/{alarm_id}/mute
/signal-packets는 CI와 local demo를 위한 1차 ingestion path입니다. MQTT adapter는 같은 ingestion usecase를 호출하는 2차 확장으로 둡니다. 즉 HTTP와 MQTT가 서로 다른 business flow를 만들면 안 됩니다.
/monitoring-snapshot은 TypeScript runtime의 기본 화면 계약입니다. Backend는 waveform window, numeric measurement status, risk summary, alarm state, data quality, audit context를 한 응답으로 제공합니다. Frontend는 이 결과를 view model로 변환하고, alarm acknowledge/mute action만 API로 돌려보냅니다.
1-6. Test strategy
| Test | Scope |
|---|---|
| Unit | risk policy, alarm policy, evidence builder, clinical context rule |
| Integration | MQTT/HTTP adapter -> usecase -> repository |
| Rust binding | Python input -> Rust Python package -> preprocessed result |
| API contract | HTTP request/response schema, monitoring snapshot, alarm action, error shape |
현재 구현은 Rust preprocessor를 기본값으로 사용합니다. Python API app bootstrap은 runtime에 설치된 patient_signal_python_rust.preprocess_signal_packet(payload)와 patient_signal_python_rust.preprocess_signal_frame_set(payload)를 호출하는 outbound adapter를 조립합니다. patient-signal-python-rust는 PyO3/maturin package로 제공하며, 기본 Python package job과 분리된 Rust Python package 검증 job에서 build/test합니다. Rust package 검증은 API app adapter integration test뿐 아니라 FastAPI POST /signal-packets route가 Rust preprocessing version을 audit context에 노출하는지도 확인합니다. POST /signal-frame-sets route는 processing run status와 item status를 반환하고 successful item만 clinical workflow로 fan-out합니다.
1-7. Clean Architecture 기준
Python에서 Clean Architecture의 핵심은 FastAPI, Pydantic, PostgreSQL, Redis, Rust Python package 호출을 risk domain 안으로 끌고 들어오지 않는 것입니다.
patient-risk-python-core는 clinical workflow의 제품 의미, result contract, model/policy port를 소유합니다. 조회와 alarm state 변경에 필요한 repository capability는 core가 표현할 수 있지만, preprocessing adapter, signal ingestion의 저장 순서, transaction, outbox, readiness 같은 side effect 정책은 API app이 소유합니다. patient-risk-python-api는 FastAPI route, runtime config, health/readiness, adapter composition, preprocessing orchestration, persistence orchestration을 담당합니다.
현재 core package는 domain을 bounded context 단위로 나눕니다.
patient_risk_python_core/
domain/
risk.py RiskPolicy, ModelObservation, RiskAssessment
signal.py SignalPacket, SignalSample, PreprocessedSignal shape
monitoring.py MonitoringSnapshot, ReviewQueueItem projection rule
alarm.py AlarmState, acknowledge/mute domain transition
adapters/outbound/
memory/ local demo용 in-memory repository port implementations
reference/ PHI 없는 reference clinical context provider
application/ baseline scorer, static policy
ports/risk.py repository/model ports
ports/runtime.py Clock, event id generation ports
usecases.py ingestion, assessment, alarm action workflow
python_persistence/
domain/ persistence config/readiness/reliability language
ports/ SQL/key-value/messaging/reliability/health ports
adapters/outbound/ SQLAlchemy and Redis foundations
patient_risk_python_api/adapters/outbound/persistence/
models.py SQLAlchemy ORM records, not domain models
mappers.py pure dataclass <-> ORM/Redis documents
projections.py Redis projection writer and projection status
repositories.py core repository port implementations
keys.py patient-risk Redis key policy
Domain은 flat *_domain.py 파일이 아니라 domain/ subpackage로 둡니다. Risk, signal, monitoring, alarm 규칙을 같은 package 안에 모아 DDD language를 보존하고, app/runtime composition이 필요할 때만 bootstrap.py를 직접 사용합니다.
Core package의 adapters/outbound에는 reusable demo/reference adapter만 둡니다. In-memory repository는 adapters/outbound/memory, PHI 없는 demo clinical context는 adapters/outbound/reference, Rust preprocessing package 호출과 baseline risk scoring은 application/으로 분리합니다. 이 이름 구분이 중요합니다. core adapter라고 해서 SQLAlchemy, Redis, OpenTelemetry exporter 같은 runtime infrastructure adapter까지 core가 소유하면 안 됩니다. 그런 table, key, schema, retry 정책과 signal ingestion 저장 순서는 app/persistence layer가 가져야 합니다.
FastAPI route
-> Pydantic request validation
-> API app usecase
-> core usecase
-> API app Rust preprocessing adapter
-> repository ports
-> response presenter
단단하게 고정할 것은 RiskAssessment, MonitoringSnapshot, AlarmState, ReviewQueueItem, audit context의 의미입니다. 느슨하게 둘 것은 HTTP/MQTT inbound adapter, SQLAlchemy table shape, Redis key/document shape, Rust Python package payload shape입니다. SQLAlchemy ORM record와 Redis JSON document는 domain dataclass가 아니라 adapter record입니다.
1-8. DDD 기준
Python bounded context는 risk assessment와 clinical workflow입니다. Python은 device quality evidence와 preprocessing feature를 받아 clinical review에 필요한 판단 결과를 만듭니다.
| Term | Meaning |
|---|---|
RiskAssessment |
policy/model metadata와 evidence를 포함한 risk 판단 결과 |
AlarmState |
frontend가 재계산하지 않는 alarm severity/action state |
ReviewQueueItem |
의료진이 검토해야 하는 작업 단위 |
MonitoringSnapshot |
UI가 표시하는 현재 backend snapshot |
AuditContext |
model/policy/preprocessing/storage provenance |
Pydantic model이나 SQL table 이름이 이 domain language를 대체하면 안 됩니다. Pydantic은 protocol validation, SQL은 persistence detail입니다.
1-9. TDD 기준
Python TDD는 clinical rule을 app server보다 먼저 고정합니다.
- shared fixture와 policy example을 먼저 둡니다.
- risk policy, evidence builder, alarm policy unit test를 작성합니다.
- fixed
PreprocessedSignal과 fake repository로 clinical result builder integration test를 작성합니다. - PostgreSQL/Redis adapter integration test로 persistence contract를 확인합니다.
- FastAPI route test로 HTTP shape와 presenter를 검증합니다.
make demo/check와make demo/check/storage로 runtime composition을 확인합니다.
이 순서를 따르면 storage나 Rust extension을 바꿔도 clinical rule 회귀를 빠르게 잡을 수 있습니다.
1-10. Observability 기준
OpenTelemetry는 app/runtime boundary에서 시작합니다. patient-risk-python-core는 trace provider, meter provider, Loki, Prometheus, collector endpoint를 몰라야 합니다. Python API app은 ports/observability.py와 adapters/inbound/http/observability.py에 request logging과 이후 tracing/metrics middleware를 추가할 자리를 둡니다.
권장 경계는 아래와 같습니다.
| Signal | Primary owner | Notes |
|---|---|---|
| Log | app adapter/process | JSON event로 남기고 Loki가 수집합니다. PHI와 secret은 제외합니다. |
| Metric | app runtime + adapter | request count/latency, readiness check latency, storage failure count를 우선 둡니다. |
| Trace | app runtime + outbound adapter | HTTP request span에서 시작하고 storage/preprocessor call을 child span으로 둡니다. |
| Domain evidence | core package | model version, policy version, preprocessing version은 trace가 아니라 audit context입니다. |
Metric과 tracing은 infra와 함께 봐야 합니다. Collector endpoint, sampling, resource attributes, service name, Kubernetes labels, Prometheus scrape 또는 OTLP export 방식은 app config와 deployment manifest가 같이 바뀌는 영역입니다. 따라서 이번 예제에서는 OpenTelemetry dependency를 바로 넣지 않고, API app 내부에 observability 확장 지점만 둡니다.
2. Package Registry
Python dependency download는 root pyproject.toml의 uv index 설정을 사용합니다.
[[tool.uv.index]]
name = "tirosh-pypi"
url = "https://nexus.internal.tirosh.ai/repository/pypi/simple/"
default = true
authenticate = "always"
credential은 repository에 커밋하지 않고 환경 변수로 주입합니다.
export UV_INDEX_TIROSH_PYPI_USERNAME=github-actions
export UV_INDEX_TIROSH_PYPI_PASSWORD="$NEXUS_PASSWORD"
package publish는 Nexus PyPI guide hosted repository로 보냅니다.
UV_PUBLISH_URL="$NEXUS_PYPI_PUBLISH_URL" \
UV_PUBLISH_USERNAME="$NEXUS_USERNAME" \
UV_PUBLISH_PASSWORD="$NEXUS_PASSWORD" \
uv publish dist/python/*
3. Package Tests
uv sync --managed-python --locked
uv run --managed-python --package patient-risk-python-core pytest packages/patient-risk-python-core/tests/unit -m unit
uv run --managed-python --package patient-risk-python-core pytest packages/patient-risk-python-core/tests/integration -m integration
uv build --package patient-risk-python-core --out-dir dist/python