作者

InterSystems中国
文章 Claire Zheng · 十一月 11 7m read

OMOP探索之旅——基于GCP医疗API的实时FHIR®到OMOP数据转换(RealTymus)

实时 FHIR® 到 OMOP 转换

OMOP 之旅的这一部分,我们在尝试挑战Scylla 之前先反思一下,InterSystems OMOP 转换是建立在批量 FHIR 导出作为源有效载荷的基础上的,我们是多么幸运。 这开启了 InterSystems OMOP 转换与多个 FHIR® 供应商的放手互操作性,这次是与谷歌云医疗 API 的互操作性

谷歌云医疗 API FHIR® (Google Cloud Healthcare API FHIR®)导出

GCP FHIR® 数据库支持通过命令行界面(CLI)或应用程序编程接口(API)进行FHIR数据的批量导入/导出,其前提条件很简单,且相关文档极为详尽,因此我们就不让模型再针对这方面进行赘述训练了,如有兴趣,我们会附上相关链接。 本段标题中更有价值的是bulk FHIR export (批量FHIR导出)标准本身的实现。

谷歌实施 FHIR® 导出的重要区别在于:通过 Pub/Sub 发送资源变更通知,以及指定增量导出的能力。

实时?

是的!我想我会死在这把剑上的。 这不仅是我的说唱手柄,而且其机制绝对可以支持一个很好的技术论证,可以说...

"当一个新的组织被创建到 FHIR 时,我们会对其进行转换,并将其添加到 InterSystems OMOP CDM 中,与 Care_site/location 一样"。

演练

本文试图简明扼要地概括发布/子通知如何与云功能相结合,将这两种解决方案粘合在一起,并在细粒度级别上实现 OMOP 的自动摄取。

第一步:将 InterSystems OMOP 与 AWS Bucket 连接起来

这个步骤在本社区的帖子中越来越多地重复出现,因此我将以最快的速度介绍这些步骤。

  • 购买 AWS S3 存储桶
  • 启动 InterSystems OMOP,添加桶配置
  • 从 InterSystems OMOP 部署中弹出策略
  • 将策略应用到 AWS S3 存储桶

 

我不知道,这些步骤和图像在我的脑海中似乎更好用,但也许不是。 这里有文档,这里有一个更深入的方法,可以在本系列中用更好的示例来解决这个问题。

第二步:在谷歌云医疗 API 中添加发布/订阅(Pub/Sub)目标

如前所述,实现这一功能的基础是数据存储中的资源变更通知这一超级强大的功能。 您可以在对话框中的设置中找到此选项,也可以在配置后使用。 我通常喜欢选中这两个选项,以便在通知中提供尽可能多的数据。 例如,在删除时,您可以在通知中包含已删除的资源,这对于 EMPI 解决方案来说非常重要。

第三步:云功能 ⭐

云功能需要投入工作,其 SOW 看起来有点像这样。

在创建方法上监听组织类型的 FHIR 资源更改发布/子通知,并从事件触发时起增量导出数据存储。 由于导出功能仅支持 GCS 目标,因此读入创建的导出并创建 fhir 导出压缩文件,将 ndjson 文件压缩到压缩文件根目录,然后将创建的压缩文件推送到 aws bucket。

再次说明第二个让它特别出色的功能,那就是从特定日期和时间导出的功能,这意味着我们不需要导出整个数据集。 为此,我们将使用收到事件的时间,再加上一分钟左右,希望导出、导入和转换的步骤更少,当然也更及时。

 

realtimefhir2omop.py

import os, io, json, base64, time, zipfile, datetime
import requests, boto3
from google.cloud import storage
from google.auth.transport.requests import Request
import google.auth
from google.auth.transport.requests import AuthorizedSession
import base64
import functions_framework
import pathlib
import textwrap
import json
from datetime import datetime, timedelta, timezone



# Config
PROJECT_ID = "pidtoo-fhir"
LOCATION = "us-east4"
DATASET_ID = "isc"
FHIR_STORE_ID = "fhir-omop"
GCS_EXPORT_BUCKET = "fhir-export-bucket"
AWS_BUCKET = "intersystems-fhir2omop"
AWS_REGION = "us-east-2"# Trigger FHIR exportdeftrigger_incremental_export(export_time_iso):
    client = storage.Client()
    bucket = client.bucket("fhir-export-bucket")

    blobs = bucket.list_blobs()
    for blob in blobs:
        print(f"Deleting: {blob.name}")
        blob.delete()
    
    credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)

    export_uri = f"gs://{GCS_EXPORT_BUCKET}/fhir-export-{int(time.time())}/"
    export_uri = f"gs://{GCS_EXPORT_BUCKET}/"
    url = (
        f"https://healthcare.googleapis.com/v1/projects/{PROJECT_ID}/locations/{LOCATION}/"f"datasets/{DATASET_ID}/fhirStores/{FHIR_STORE_ID}:export"
    )

    body = {
        "gcsDestination": {"uriPrefix": export_uri},
        "since": export_time_iso
    }

    response = authed_session.post(url, json=body)
    print(f"Export response: {response.status_code} - {response.text}")
    return export_uri if response.ok elseNone# Poll GCS for export resultsdefwait_for_ndjson_files(export_uri_prefix):
    client = storage.Client()
    bucket_name = export_uri_prefix.split("/")[2]
    prefix = "/".join(export_uri_prefix.split("/")[3:])
    print(bucket_name)
    print(prefix)

    bucket = client.bucket(bucket_name)
    for _ in range(20):  # Wait up to ~5 mins
        blobs = list(bucket.list_blobs(prefix=prefix))
        if any(blob.name.endswith("Organization") for blob in blobs):
            return [blob for blob in blobs if blob.name.endswith("Organization")]
        time.sleep(5)
    raise TimeoutError("Export files did not appear in GCS within timeout window")

# Zip .ndjsons into flat ZIP filedefcreate_zip_from_blobs(blobs, zip_path):
    client = storage.Client()
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for blob in blobs:
            data = blob.download_as_bytes()
            fname = os.path.basename(blob.name)
            zipf.writestr(fname + ".ndjson", data)

# Upload ZIP to AWS S3defupload_to_s3(zip_path, s3_key):
    s3 = boto3.client('s3', region_name=AWS_REGION)
    s3.upload_file(zip_path, AWS_BUCKET, "from_gcp_to_omop" + s3_key)
    print(f"Uploaded {zip_path} to s3://{AWS_BUCKET}/from_gcp_to_omop/{s3_key}")


#@functions_framework.cloud_event#def mit_grandhack(cloud_event):# Print out the data from Pub/Sub, to prove that it worked#    print(base64.b64decode(cloud_event.data["message"]["data"]))#    question = base64.b64decode(cloud_event.data["message"]["data"]).decode()@functions_framework.cloud_eventdefreceive_pubsub(cloud_event):#envelope = request.get_json()
    print(cloud_event)
    data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
    data = cloud_event.data
    print(data)
    print(type(data))
    ifnot data:
        return"No data", 400#payload = data # json.loads(data)#method = payload.get("protoPayload", {}).get("methodName", "")
    method = data['message']['attributes']['action']
    #resource_name = payload.get("protoPayload", {}).get("resourceName", "")
    resource_name = data['message']['attributes']['resourceType']
    #timestamp = payload.get("timestamp", "")
    timestamp = data['message']['publishTime']
    # Input datetime string# Parse the string to a datetime object
    dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)

    # Subtract 5 minutes
    five_minutes_ago = dt - timedelta(minutes=5)

    # Convert back to ISO 8601 string format with 'Z'
    timestamp = five_minutes_ago.isoformat().replace('+00:00', 'Z')

    print(method)
    print(resource_name)
    print(timestamp)

    if"CreateResource"in method and"Organization"in resource_name:
        print(f"New Organization detected at {timestamp}")
        export_uri = trigger_incremental_export(timestamp)
        ifnot export_uri:
            return"Export failed", 500
        blobs = wait_for_ndjson_files(export_uri)
        zip_file_path = "/tmp/fhir_export.zip"
        create_zip_from_blobs(blobs, zip_file_path)
        s3_key = f"/export-{int(time.time())}.zip"
        upload_to_s3(zip_file_path, s3_key)
        return"Exported and uploaded", 200return"No relevant event", 204

第四步:现在正在发生什么?

为了分清发生了什么,让我们通过每一点的截图来检查实时处理情况。

创建 FHIR Organization

发布/订阅(Pub/Sub)事件已发布

 

发布/订阅 FHIR 事件

{'attributes': {'specversion': '1.0', 'id': '13999883936448345', 'source': '//pubsub.googleapis.com/projects/pidtoo-fhir/topics/fhir-omop-topic', 'type': 'google.cloud.pubsub.topic.v1.messagePublished', 'datacontenttype': 'application/json', 'time': '2025-05-13T20:13:20.339Z'}, 'data': {'message': {'attributes': {'action': 'CreateResource', 'lastUpdatedTime': 'Tue, 13 May 2025 20:13:20 UTC', 'payloadType': 'FullResource', 'resourceType': 'Organization', 'storeName': 'projects/pidtoo-fhir/locations/us-east4/datasets/isc/fhirStores/fhir-omop', 'versionId': 'MTc0NzE2NzIwMDEwNzczODAwMA'}, 'data': 'ewogICJhZGRyZXNzIjogWwogICAgewogICAgICAiY2l0eSI6IC', 'messageId': '13999883936448345', 'message_id': '13999883936448345', 'publishTime': '2025-05-13T20:13:20.339Z', 'publish_time': '2025-05-13T20:13:20.339Z'}, 'subscription': 'projects/pidtoo-fhir/subscriptions/eventarc-us-east4-fhir2omop-trigger-sub-855'}}

云功能从订阅中接收资源事件

云功能导出 FHIR 存储 GCS

云功能从 GCS 创建 ZIP 并推送至 AWS

InterSystems OMOP 将 FHIR 转换为 OMOP

组织可在 CDM 中作为护理站点使用

FHIR 资源是何时转换到 CDM 的?
YARN | Now. You're looking at now. Everything that happens now is happening  now. | Spaceballs (1987) | Video gifs by quotes | 1606b976 | 紗

第四步:验证乐趣

OBS 的趣味性和音频的不确定性(注意:视频来源YouTube,请移步至原帖查看)

总结

去年在MIT Grand Hack 上也做过类似的事情,使用的是相同的设计模式,但中间使用了 Questionairre/Response 资源和 Gemini。
Gemini FHIR Agent MIT Grand Hack