Algorithm: 703. Kth Largest Element in a Stream (LeetCode)

Quiz

Can you optimize the following code?

from typing import List


class KthLargest:
    def __init__(self, k: int, nums: List[int]):
        self._k = k
        self._arr = nums

    def add(self, val: int) -> int:
        self._arr.append(val)
        self._arr = sorted(self._arr)
        return self._arr[-1 * self._k]


# Your KthLargest object will be instantiated and called as such:
# obj = KthLargest(k, nums)
# param_1 = obj.add(val)

Answer

Complexity analysis

O(N log N) time, O(N) additional space.

Python's sorted() uses Powersort since Python 3.11, which runs in O(N log N) time and can require O(N) extra space in the worst case. Source: https://en.wikipedia.org/wiki/Powersort

Answer

O(N log K) time, O(K) additional space, where N is the total number of values processed.

from typing import List


class MinHeap:
    def __init__(self, nums: list[int], max_size: int):
        self._heap = list(
            nums
        )  # create an copy for the mutable object. See why it's needed: https://blog.tsugumisys.com/posts/20251218-c9Wnt.html
        self._max_size = max_size
        self._heapify()
        self._ensure_heap_size()

    def _ensure_heap_size(self):
        while len(self._heap) > self._max_size:
            _ = self.pop()

    def _heapify(
        self,
    ):  # O(M) where M is the max size of this heap. See: https://blog.tsugumisys.com/posts/20251230-ayeKD.html
        for i in range(len(self._heap) // 2, -1, -1):
            print("shiftdown:", i)
            self._shift_down(i)

    def _shift_down(self, idx: int):
        n = len(self._heap)
        while idx < len(self._heap):
            left = 2 * idx + 1
            right = left + 1
            smallest = idx
            if left < n and self._heap[left] < self._heap[smallest]:
                smallest = left
            if right < n and self._heap[right] < self._heap[smallest]:
                smallest = right
            if smallest == idx:
                break
            self._heap[idx], self._heap[smallest] = (
                self._heap[smallest],
                self._heap[idx],
            )
            idx = smallest

    def _bubble_up(self, idx: int):
        while idx > 0:
            parent = (idx - 1) // 2
            if self._heap[idx] > self._heap[parent]:
                break
            self._heap[idx], self._heap[parent] = self._heap[parent], self._heap[idx]
            idx = parent

    def add(self, val: int):
        self._heap.append(val)
        self._bubble_up(len(self._heap) - 1)
        self._ensure_heap_size()

    def pop(self) -> int:
        if len(self._heap) == 0:
            raise KeyError("no data found")
        self._heap[0], self._heap[-1] = self._heap[-1], self._heap[0]
        poped = self._heap.pop()
        self._shift_down(0)
        return poped

    def peek(self) -> int:
        if len(self._heap) == 0:
            raise KeyError("no data found")
        return self._heap[0]


class KthLargest:
    def __init__(self, k: int, nums: List[int]):
        self._heap = MinHeap(nums, k)

    def add(self, val: int) -> int:
        self._heap.add(val)
        return self._heap.peek()


# Your KthLargest object will be instantiated and called as such:
# obj = KthLargest(k, nums)
# param_1 = obj.add(val)

We can use a min-heap to track the k-th largest value. Keep only the k largest numbers; the root is the current k-th largest. When a new value arrives, push it and, if the heap exceeds size k, pop the minimum.

This heap always stores the k largest values, so the root is the k-th largest at any moment.

Building the initial heap takes O(K) time, and each insert (and possible pop) takes O(log K). For more detail on heap operation complexity, see https://blog.tsugumisys.com/posts/20251230-ayeKD.html