Skip to content

Median Heap

Published: at 02:11 AM

Author: jtara1

Introduction

In this article, we look at a secret technique to more efficiently calculate the median on an array of numbers concatenated over time.

Outline:

Requirements

Given an array of numbers, return its median. Given additional numbers concatenated with the array, return the new median.

That is, we can calculate:

Median of [a]      
Median of [a, b]   
Median of [a, b, c]
...
Median of [a, ..., z]

Simple Running Median

Median

def median(numbers):
    numbers.sort()
    n = len(numbers)
    if n % 2 == 0:
        return (numbers[n // 2 - 1] + numbers[n // 2]) / 2
    else:
        return float(numbers[n // 2])


assert median([10, 2, 450]) == 10.0
assert median([4, 1, 3, 2]) == 2.5

We convert to float for a consistent return type.

Simple Running Median Implementation

Given a new segment of the same dataset, calculate the median of the entire dataset we have so far.

def median(numbers):
    numbers.sort()
    n = len(numbers)
    if n % 2 == 0:
        return (numbers[n // 2 - 1] + numbers[n // 2]) / 2
    else:
        return float(numbers[n // 2])


data = []
medians = []


def running_median(numbers):
    global data, medians
    if not numbers:
        return None if not medians else medians[-1]

    for number in numbers:
        data.append(number)
        medians.append(median(data))

    return medians[-1]


assert running_median([51, 4, 11]) == 11.0
# after calling, data would contain [1, 2, 3, 4, 11, 51]
assert running_median([1, 2, 3]) == 3.5

We simply return the median of all known data while still meeting the requirement of calculating the median at each point in the array. While globals are used, we could refactor this to actually encapsulate.

Big-O Runtime

let
  n = len(data)
  m = len(numbers)
  Python collection sort call is in O(x * log(x))
in
  O(m * n * log(n))

To visualize some graphed references, we can peek at this

simple nss arch
Figure 1.1: Graphed common runtime equations

Here we can think about uses where sizes of n and m change and how it’s reflected.

Median Heap: Min Heap and Max Heap

What data structure gives us the minimal value in O(1) time? A heap. What if we put the smaller values in a max heap and the bigger values in a min heap? We want our data to look like this to work:

graph TD subgraph "Max Heap (Lower Half)" A[3] --> B[1] A --> C[2] end subgraph "Min Heap (Upper Half)" F[4] --> G[51] F --> H[11] end

We can take (root1 + root2) / 2 to get the median since they are the same size. We’ll derive later which root we should take to get the median if the heaps aren’t the same size.

Let’s call this special data structure the median heap.

Balancing Heaps

A single standalone heap is a balanced tree after insertion because of the heap data structure algorithm. I state this to clarify we’re balance the values stored between our min heap and max heap to represent our source data in a way such that a root or roots represent the median.

That is, if we push values [1, 2, 3] to data, which heap does each value go in?

Or, suppose we have balanced heaps where all nodes have node.value less than 50 and we push values [101, 102, 103], we can’t just insert all of these into the min heap because our 2 heaps would not define the median then.

Let’s determine which heap to insert into and how to balance them. The Insertion and Balancing Logic When we’re processing a stream of numbers to calculate the running median, we need rules for where to place each new number and how to maintain the balance.

Initial Insertion Decision

The simplest approach is to always insert new elements into one heap first (let’s say the max heap containing smaller values), and then rebalance if needed.

Heaps Property Enforcement

After initial insertion, we need to ensure the fundamental property: every element in the max heap must be smaller than every element in the min heap. If we inserted the number into the max heap and it’s larger than the smallest number in the min heap, we need to:

  1. Remove the largest element from the max heap
  2. Insert this element into the min heap

Size Balancing

For calculating the median efficiently, we want to maintain a specific size relationship between the heaps:

Either they have exactly the same number of elements (for an even total) Or the max heap has exactly one more element than the min heap (for an odd total)

If the heaps become unbalanced after insertion:

Finding the Median

Once properly balanced:

Example with [1, 2, 3, 4, 11, 51]

Let’s see how the algorithm processes these numbers step by step:

Insert 1: Goes into max heap -> max heap [1], min heap []
Median = 1

Insert 2: Goes into max heap -> max heap [1, 2], min heap []
Rebalance due to len(min) < len(max) - 1: move 2 to min heap -> max heap [1], min heap [2]
Median = (1 + 2) / 2 = 1.5

Insert 3: Goes into max heap -> max heap [1, 3], min heap [2]
3 > 2, so swap largest from max to min -> max heap [1], min heap [2, 3]
Rebalance due to len(min) > len(max): move 2 to max heap -> max heap [1, 2], min heap [3]
Median = (2 + 3) / 2 = 2.5

Insert 4: Goes into max heap -> max heap [1, 2, 4], min heap [3]
4 > 3, so swap largest from max to min -> max heap [1, 2], min heap [3, 4]
Median = (2 + 3) / 2 = 2.5

Insert 11: Goes into max heap -> max heap [1, 2, 11], min heap [3, 4]
11 > 3, so swap largest from max to min -> max heap [1, 2], min heap [3, 4, 11]
Rebalance due to len(min) > len(max): move 3 to max heap -> max heap [1, 2, 3], min heap [4, 11]
Median = (3 + 4) / 2 = 3.5
Insert 51: Goes into max heap -> max heap [1, 2, 3, 51], min heap [4, 11]
51 > 4, so swap largest from max to min -> max heap [1, 2, 3], min heap [4, 11, 51]
Median = (3 + 4) / 2 = 3.5

The elegance of this approach is that at each step, we’re maintaining the exact structure needed to calculate the median in constant time, regardless of how many numbers we’ve processed so far.

Median Heap Implementation

import heapq


class MedianHeap:
    def __init__(self):
        self.max_heap = []  # Lower half
        self.min_heap = []  # Upper half

    def __repr__(self):
        max_heap_values = [-x for x in self.max_heap]
        min_heap_values = list(self.min_heap)

        current_median = self.median() if self.max_heap or self.min_heap else "No data"

        return f"MedianFinder(max_heap={sorted(max_heap_values, reverse=True)}, "
               f"min_heap={sorted(min_heap_values)}, "
               f"median={current_median})"

    def add_number(self, num):
        # Add to max_heap first. Use negative numbers to simulate max heap
        heapq.heappush(self.max_heap, -num)

        # Balance: ensure max_heap's maximum is <= min_heap's minimum
        if self.max_heap and self.min_heap and -self.max_heap[0] > self.min_heap[0]:
            max_val = -heapq.heappop(self.max_heap)
            heapq.heappush(self.min_heap, max_val)

        # Ensure size property: max_heap has equal or one more element than min_heap
        if len(self.max_heap) > len(self.min_heap) + 1:
            # Move one element from max_heap to min_heap
            max_val = -heapq.heappop(self.max_heap)
            heapq.heappush(self.min_heap, max_val)
        elif len(self.max_heap) < len(self.min_heap):
            # Move one element from min_heap to max_heap
            min_val = heapq.heappop(self.min_heap)
            heapq.heappush(self.max_heap, -min_val)

    def median(self):
        if len(self.max_heap) > len(self.min_heap):
            return -self.max_heap[0]
        else:
            return (-self.max_heap[0] + self.min_heap[0]) / 2

    def add_numbers(self, numbers):
        for num in numbers:
            self.add_number(num)
        return self.median()


heap = MedianHeap()
heap.add_numbers([51, 4, 11])
print(heap)
assert heap.median() == 11.0

heap.add_numbers([1, 2, 3])
print(heap)
assert heap.median() == 3.5

running it sends this to stdout

MedianFinder(max_heap=[11, 4], min_heap=[51], median=11)
MedianFinder(max_heap=[3, 2, 1], min_heap=[4, 11, 51], median=3.5)

Checking the code, we got rid of the need for a sort call, as balancing between the heaps gives us what we’re looking for from the simple running median’s sort.

Big-O Runtime

let
  m = number of add_number calls
  n = length of all data
  heappush is in O(log(n))
  c = number of heappush per add_number = O(2)
  heappop is in O(1)
  heap[0] is in O(1)
in
  calls to add_number are in O(m * log(n)) 

Load Test

Calling each insert 100 times, inserting 1,000,000 numbers, we get

Simple Median Time: 0.316197 seconds
Median Heap Time: 0.017000 seconds

We could go on to plot runtimes for different inputs or profile the code, but this is just a quick check to see if it’s faster at all.

Conclusion

We start with the intuitive algorithm, determine its runtime, then explore a clever alternative that’s more efficient.

The median heap is especially useful if you need to calculate the median at various points throughout your data or if you have large amounts of data. It provides more value for:

  1. Streaming data applications where you continually receive new values
  2. Sliding window analytics where you both add new elements and remove old ones
  3. Real-time monitoring systems where latency matters

I find it hard to find a reason to not use median heap. Throw it in a library that has one purpose then call it day. Do note, in the real world you’d use a well-established library.