Build an HL7 Data Lake

Creating a queryable HL7 data lake on AWS.

Michael Sambol
Jul 11, 2023

Introduction

Health Level 7 (HL7) is a data format for exchanging healthcare information between disparate software systems. When you visit a clinic or doctor’s office, a series of HL7 messages are generated that capture the purpose of the visit. The data may be used by an electronic medical record (EMR) system or a health information organization (HIO), amongst others.

Today I’ll walk through a solution for ingesting HL7 messages and making them easily consumable in a data lake on Amazon Web Services (AWS). We’ll focus on HL7 version 2, specifically Admit, Discharge and Transfer (ADT) messages. We’ll use the hl7 Python package that converts HL7 messages into Python objects. You can find the code for my solution here.
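To make the message structure concrete, here is a minimal sketch that uses plain string splitting rather than the hl7 package. The sample message is invented, and reading source_facility from MSH-4 (sending facility) and transaction_date from MSH-7 (message date/time) is an assumption for illustration; the solution's code may take these values from other fields.

```python
# Hypothetical ADT^A01 message; HL7 v2 segments are separated by carriage returns.
SAMPLE_ADT = "\r".join([
    "MSH|^~\\&|EMR|GA|HIO|HIO|20221213083000||ADT^A01|MSG00001|P|2.5",
    "EVN|A01|20221213083000",
    "PID|1||12345^^^GA^MR||DOE^JANE||19800101|F",
    "PV1|1|I|ICU^101^A",
])


def split_segments(message: str) -> dict[str, list[str]]:
    """Map each segment ID (MSH, PID, ...) to its pipe-delimited fields.

    Only the first occurrence of a segment is kept, which is enough for a sketch.
    """
    segments: dict[str, list[str]] = {}
    for line in message.replace("\n", "\r").split("\r"):
        if line.strip():
            fields = line.split("|")
            segments.setdefault(fields[0], fields)
    return segments


def summarize_adt(message: str) -> dict[str, str]:
    """Pull a few header values out of an ADT message."""
    msh = split_segments(message)["MSH"]
    # In MSH the field separator itself counts as MSH-1, so MSH-n is msh[n - 1].
    # Assumes MSH-9 has at least two components (type^trigger).
    message_type, trigger_event = msh[8].split("^")[:2]
    timestamp = msh[6]  # MSH-7, formatted as YYYYMMDDHHMMSS in the sample
    return {
        "source_facility": msh[3],  # MSH-4 (assumed mapping)
        "message_type": message_type,
        "trigger_event": trigger_event,
        "transaction_date": f"{timestamp[0:4]}-{timestamp[4:6]}-{timestamp[6:8]}",
    }


summary = summarize_adt(SAMPLE_ADT)
```

A real parser also has to handle repetitions, escape sequences and custom delimiters, which is the work the hl7 package takes off your hands.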

Architecture

I chose to ingest HL7 messages via an Amazon Simple Storage Service (S3) bucket. If your data sources don’t support this, you can build an API with Amazon API Gateway or another similar technology. Once the HL7 messages land in S3, S3 event notifications are sent to Amazon Simple Notification Service (SNS). SNS then sends the S3 object name and related metadata (not the actual HL7 message contents) to Amazon Simple Queue Service (SQS), queuing the HL7 message for processing. AWS Lambda (Lambda) polls the SQS queue and reads the HL7 contents from S3, parsing the relevant HL7 data. Lambda writes the data as JavaScript Object Notation (JSON) to both S3 and Amazon Kinesis Data Firehose (Firehose). From there, Firehose transforms the JSON into Apache Parquet (Parquet) to make it efficiently consumable via Amazon Athena (Athena). Errors are handled gracefully by Lambda, as failures are sent to a dead-letter queue (DLQ) in SQS and can be reprocessed or discarded. Below is a diagram of the solution:

Tech Stack

Infrastructure and application code are deployed via AWS Cloud Development Kit (CDK). I prefer this toolkit because it allows me to write infrastructure as code, in this case TypeScript, which I chose because CDK itself is written in TypeScript. My application code is written in Python because it’s very readable and the hl7 Python package is excellent at converting HL7 messages into Python objects.

Interesting Tidbits and Gotchas

We won’t review all of the code, but here are the important sections.

1. Partitioning of data in S3

Partitioning is important because it limits the amount of data scanned by Athena, resulting in faster queries and reduced cost. Partitioning is largely dependent on data access patterns. I like to think about it in terms of the WHERE clause in my SQL statements, or what I'm filtering on. In this example, I partitioned by source_facility and transaction_date. As a result, the data in S3 is organized as follows:

s3://bucket/source_facility=GA/transaction_date=2022-12-13/
s3://bucket/source_facility=GA/transaction_date=2022-12-14/
s3://bucket/source_facility=GA/transaction_date=2022-12-15/
s3://bucket/source_facility=NE/transaction_date=2022-12-13/
s3://bucket/source_facility=NE/transaction_date=2022-12-14/
s3://bucket/source_facility=NE/transaction_date=2022-12-15/
s3://bucket/source_facility=SD/transaction_date=2022-12-13/
s3://bucket/source_facility=SD/transaction_date=2022-12-14/
s3://bucket/source_facility=SD/transaction_date=2022-12-15/

I could have also organized year/month/day in this way:

s3://bucket/source_facility=GA/year=2022/month=12/day=13/ 
s3://bucket/source_facility=GA/year=2022/month=12/day=14/
s3://bucket/source_facility=GA/year=2022/month=12/day=15/

But using transaction_date allows me to easily query across month and year boundaries:

SELECT * FROM hl7_messages
WHERE transaction_date >= date_parse('2022-11-30', '%Y-%m-%d')
AND transaction_date <= date_parse('2022-12-01', '%Y-%m-%d')
LIMIT 100;
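The same layout can be sketched in Python. The helper names here are hypothetical; the point is that a single date key turns a range crossing a month boundary into a plain sequence of prefixes, with no year/month/day arithmetic.

```python
from datetime import date, timedelta


def partition_prefix(source_facility: str, transaction_date: date) -> str:
    """Build the Hive-style prefix for one facility and one day."""
    return (
        f"source_facility={source_facility}/"
        f"transaction_date={transaction_date.isoformat()}/"
    )


def prefixes_for_range(source_facility: str, start: date, end: date) -> list[str]:
    """List the prefixes covering every day from start to end, inclusive."""
    days = (end - start).days
    return [
        partition_prefix(source_facility, start + timedelta(days=offset))
        for offset in range(days + 1)
    ]


# The range from the query above spans the November/December boundary.
november_to_december = prefixes_for_range("GA", date(2022, 11, 30), date(2022, 12, 1))
```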

2. Dynamic partitioning in Firehose

Above I described the partitioning layout to achieve optimal query performance. To land data in the correct partition (prefix) in S3, I used dynamic partitioning in Firehose. This feature uses jq to parse specific values out of the data. This is powerful because I can run historical and real-time data through the system and it will be correctly partitioned based on the transaction date in the message, not on processing date, and without any post-processing! Here’s a snippet from the CDK code to enable this:

{
  parameterName: 'MetadataExtractionQuery',
  parameterValue:
    '{source_facility: .partitions.source_facility, transaction_date: .partitions.transaction_date}',
}
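For the jq query to find anything, each record written to Firehose needs a partitions object carrying both keys. The sketch below builds such a record and then imitates, in plain Python, what the extraction and prefix substitution would produce. The function names are hypothetical, and the prefix template is an assumption modeled on the S3 layout shown earlier, written with Firehose's !{partitionKeyFromQuery:...} placeholder syntax.

```python
import json

# Assumed prefix template; the solution's actual template may differ.
PREFIX_TEMPLATE = (
    "source_facility=!{partitionKeyFromQuery:source_facility}/"
    "transaction_date=!{partitionKeyFromQuery:transaction_date}/"
)


def build_record(parsed: dict, source_facility: str, transaction_date: str) -> str:
    """Attach the partition keys to the parsed message and serialize it."""
    record = dict(parsed)
    record["partitions"] = {
        "source_facility": source_facility,
        "transaction_date": transaction_date,
    }
    return json.dumps(record)


def extract_partition_keys(raw_record: str) -> dict:
    """Python stand-in for the jq query in MetadataExtractionQuery."""
    partitions = json.loads(raw_record)["partitions"]
    return {
        "source_facility": partitions["source_facility"],
        "transaction_date": partitions["transaction_date"],
    }


def render_prefix(keys: dict) -> str:
    """Substitute the extracted keys into the prefix template."""
    prefix = PREFIX_TEMPLATE
    for name, value in keys.items():
        prefix = prefix.replace("!{partitionKeyFromQuery:" + name + "}", value)
    return prefix


raw = build_record({"patient_class": "I"}, "NE", "2022-12-14")
prefix = render_prefix(extract_partition_keys(raw))
```

Because the keys come from the record itself, a message dated last year lands in last year's prefix no matter when it is processed.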

It’s worth mentioning that reprocessing data requires additional care, as you may end up with duplicate records in Parquet files. There are a few ways around this: for example, you can clear the affected files beforehand or add a processed_date field to assist with uniqueness in your queries. If reprocessing is rare, you may not have to worry about this. The tradeoff to consider is your performance tolerance versus the additional code and maintenance.
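As a rough illustration of the processed_date approach, the sketch below keeps only the most recently processed copy of each message. The message_id field is a hypothetical unique key (for example, the HL7 message control ID), and the comparison assumes processed_date values share one ISO 8601 format so that string order matches time order.

```python
def latest_per_message(rows: list[dict], key: str = "message_id") -> list[dict]:
    """Keep the row with the greatest processed_date for each message key."""
    latest: dict[str, dict] = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row["processed_date"] > current["processed_date"]:
            latest[row[key]] = row
    return list(latest.values())


rows = [
    {"message_id": "MSG00001", "processed_date": "2023-07-10"},
    {"message_id": "MSG00002", "processed_date": "2023-07-10"},
    {"message_id": "MSG00001", "processed_date": "2023-07-11"},  # reprocessed copy
]
deduplicated = latest_per_message(rows)
```

In Athena the same idea is usually expressed with a window function that ranks rows per key by processed_date and keeps the first.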

3. Size of Parquet files

For this system, I’m less concerned about real-time than I am about efficient data querying (read a great blog about this here). As such, I chose to buffer the data in Firehose for up to 10 minutes or a file size of 128 MB, whichever occurs first. This allows for larger Parquet files in S3 and faster queries via Athena. The buffering works in conjunction with dynamic partitioning, as Firehose internally buffers records based on partition. Here’s the CDK code to buffer records:

bufferingHints: {
  intervalInSeconds: 600,
  sizeInMBs: 128,
}
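The "whichever occurs first" rule can be modeled in a few lines. This is a simplified illustration of the two thresholds, not Firehose's internal implementation; the names are hypothetical, and treating 1 MB as 1024 * 1024 bytes is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BufferingHints:
    interval_in_seconds: int = 600  # 10 minutes
    size_in_mbs: int = 128


def should_flush(
    buffered_bytes: int,
    seconds_buffered: float,
    hints: BufferingHints = BufferingHints(),
) -> bool:
    """Return True once either the size or the time threshold is reached."""
    size_reached = buffered_bytes >= hints.size_in_mbs * 1024 * 1024
    interval_reached = seconds_buffered >= hints.interval_in_seconds
    return size_reached or interval_reached
```

With low message volume the time limit is what triggers delivery, so a quiet partition produces small files regardless of the size setting.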

4. Writing data to Firehose and S3

There’s a subtle difference when writing to S3 versus Firehose. Firehose expects a string:

firehose.put_record(
    DeliveryStreamName=STREAM_NAME,
    Record={'Data': json.dumps(parsed_hl7)}
)

While S3 expects bytes:

s3_object.put(
    Body=json.dumps(parsed_hl7).encode('UTF-8')
)
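A small sketch of the two payload shapes, with the AWS calls left out so it runs anywhere. The helper names are hypothetical; they only wrap the serialization shown in the two snippets.

```python
import json


def firehose_record(parsed: dict) -> dict:
    """Build the Record argument for put_record, with Data as a JSON string."""
    return {"Data": json.dumps(parsed)}


def s3_body(parsed: dict) -> bytes:
    """Build the Body argument for an S3 put, as UTF-8 encoded bytes."""
    return json.dumps(parsed).encode("utf-8")


parsed_example = {"source_facility": "GA", "transaction_date": "2022-12-13"}
record = firehose_record(parsed_example)
body = s3_body(parsed_example)
```

Both payloads carry the same JSON; decoding the S3 bytes gives back exactly the string sent to Firehose.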

5. Deserialization by Firehose

How you write data into Firehose matters if you want to avoid data format conversion errors. Let’s look at one specific example. If you look at my schema, you’ll see I’m storing processed_date as type DATE:

{
  name: 'processed_date',
  type: glue.Schema.DATE,
}

I am using the OpenX JSON SerDe in Firehose to deserialize the data (string → object). For Firehose to properly convert my value to a date, I set processed_date in Python as follows:

parsed_hl7.processed_date = datetime.datetime.now().isoformat()
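For reference, here is what isoformat() produces for a datetime versus a date, using a fixed value so the output is repeatable. This only shows the string shapes; which one your schema and SerDe accept for a DATE column is worth verifying against your own delivery stream.

```python
import datetime

# Fixed moment instead of datetime.datetime.now(), so the strings are predictable.
moment = datetime.datetime(2023, 7, 11, 9, 30, 0)

timestamp_string = moment.isoformat()    # '2023-07-11T09:30:00'
date_string = moment.date().isoformat()  # '2023-07-11'
```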

6. Lambda batch size for polling SQS

When Lambda polls SQS, it can read messages in batches. To keep the demo code simple, I chose a batch size of 1. This is because additional logic is required to avoid reprocessing if you have a larger batch size and one of the messages fails. You can read more in the AWS docs here. If HL7 messages are ingested into S3 very quickly, you’d want to consider a larger batch size to increase performance and reduce cost.
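The additional logic for larger batches might look like the sketch below, which is not the demo's code. It assumes the SNS subscription does not use raw message delivery (so each SQS body is an SNS envelope whose Message field holds the S3 event) and that ReportBatchItemFailures is enabled on the event source mapping. The process_object function is a hypothetical placeholder for reading and parsing one HL7 file.

```python
import json
from urllib.parse import unquote_plus


def s3_locations(sqs_record: dict) -> list[tuple[str, str]]:
    """Unwrap SQS -> SNS -> S3 event and return (bucket, key) pairs."""
    sns_envelope = json.loads(sqs_record["body"])
    s3_event = json.loads(sns_envelope["Message"])
    return [
        # Object keys arrive URL-encoded in S3 event notifications.
        (item["s3"]["bucket"]["name"], unquote_plus(item["s3"]["object"]["key"]))
        for item in s3_event.get("Records", [])
    ]


def process_object(bucket: str, key: str) -> None:
    """Hypothetical placeholder: fail on anything that is not an .hl7 file."""
    if not key.endswith(".hl7"):
        raise ValueError(f"unexpected object: s3://{bucket}/{key}")


def handler(event: dict, context: object) -> dict:
    """Process every message and report only the failed ones back to SQS."""
    failures = []
    for record in event["Records"]:
        try:
            for bucket, key in s3_locations(record):
                process_object(bucket, key)
        except Exception:
            # Only this message returns to the queue (and eventually the DLQ).
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

Returning an empty batchItemFailures list tells Lambda the whole batch succeeded, so messages that were processed correctly are not retried alongside the one that failed.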

Viewing Data

Once the data flows through the system, you can easily query it in Athena. First load partitions:

MSCK REPAIR TABLE `hl7_data_lake_dev`;

Then select rows:

SELECT * FROM "hl7_data_lake_db_dev"."hl7_data_lake_dev" LIMIT 10;

You can automate partition loading with a scheduled Amazon EventBridge rule (daily, or more frequently if your partitions are more granular) that targets a Lambda function, which runs the MSCK REPAIR TABLE query in Athena.
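A minimal sketch of such a Lambda function follows. It is not part of the solution's code: the function names and the results location s3://example-athena-results/ are hypothetical, while the database and table names are the ones used in the queries above. The Athena client is passed in so the helper can be exercised without AWS access.

```python
def repair_partitions(athena_client, database: str, table: str, output_location: str) -> str:
    """Start MSCK REPAIR TABLE in Athena and return the query execution ID."""
    response = athena_client.start_query_execution(
        QueryString=f"MSCK REPAIR TABLE `{table}`",
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]


def handler(event: dict, context: object) -> str:
    """Entry point for the scheduled EventBridge rule."""
    import boto3  # imported here so repair_partitions can be tested without boto3

    return repair_partitions(
        boto3.client("athena"),
        database="hl7_data_lake_db_dev",
        table="hl7_data_lake_dev",
        output_location="s3://example-athena-results/",  # hypothetical bucket
    )
```

The call only starts the query; if you need to know whether the repair succeeded, poll the execution status with the returned ID.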

Conclusion

I hope this blog was informative. Drop me a note if you have questions or comments.
