LeetCode: 703. Kth Largest Element in a Stream

Summary

Design a class that continuously returns the k-th largest value in a stream.

Approach

Use a min-heap.

Keep only the largest k elements seen so far:

  • If the heap size is less than k, push the new value.
  • If the heap size is k and the new value is larger than the root, replace the root.
  • Otherwise, ignore the new value.

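Once at least k values have been seen, the root of the heap is always the k-th largest value: the heap holds exactly the k largest values, and its root is the smallest of them.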
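To make the invariant concrete, here is a minimal sketch that replays the three rules above on a small stream and records which values are kept after each step. The helper name kth_largest_trace is hypothetical and used only for illustration; the input is the sample from the problem statement with k = 3.

```python
import heapq


def kth_largest_trace(k, values):
    """Yield (value, sorted copy of the kept values) after each step."""
    heap = []
    for v in values:
        if len(heap) < k:
            heapq.heappush(heap, v)      # still filling up to k values
        elif v > heap[0]:
            heapq.heapreplace(heap, v)   # evict the smallest of the top k
        # otherwise v cannot be among the k largest, so it is ignored
        yield v, sorted(heap)


for v, kept in kth_largest_trace(3, [4, 5, 8, 2, 3, 5, 10, 9, 4]):
    print(v, kept)
# The last line printed is: 4 [8, 9, 10]
```

The first entry of each kept list is the heap root, so after the whole stream the 3rd largest value is 8.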

Function

Initialization:

  • heap: min-heap of the values kept so far.
  • k: target rank.

When we get a new value:

  • If len(heap) < k, push the value.
  • Otherwise (len(heap) == k):
    • If the value is larger than the current root, replace the root with it.
    • Else, skip it.
  • Return the heap root.

Complexity

Time Complexity:

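  • Initialization: O(N + (N-K)logN) = O(NlogN) in the worst case. heapify costs O(N), and each of the N-K pops that shrink the heap to size k costs O(logN).
  • add: O(logK), because the heap never holds more than k elements once initialization has finished.

Space Complexity:

  • O(K) for the heap after it has been trimmed to k elements. During initialization the first version holds all N values in the heap.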

Code

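import heapq
from typing import List


class KthLargest:
    def __init__(self, k: int, nums: List[int]):
        self._k = k
        self._heap = list(nums)  # copy so the caller's list is not reordered
        heapq.heapify(self._heap)  # O(N)
        # Drop the smallest values until only the k largest remain.
        while len(self._heap) > k:
            heapq.heappop(self._heap)  # O(logN) per pop

    def add(self, val: int) -> int:
        if len(self._heap) < self._k:
            heapq.heappush(self._heap, val)
        elif self._heap[0] < val:
            # val belongs in the top k: it replaces the current root.
            heapq.heapreplace(self._heap, val)
        return self._heap[0] if self._heap else None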

Optimization

We can reduce initialization to O(NlogK) by starting from an empty heap and calling add for each value. The heap never grows beyond k elements, so each of the N calls costs O(logK).

import heapq
from typing import List


class KthLargest:
    def __init__(self, k: int, nums: List[int]):
        self._heap = []
        self._k = k
        # Feed the initial values through add, O(logK) each.
        for n in nums:
            self.add(n)

    def add(self, val: int) -> int:
        if len(self._heap) < self._k:
            heapq.heappush(self._heap, val)
        elif self._heap[0] < val:
            heapq.heapreplace(self._heap, val)
        return self._heap[0] if self._heap else None
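
As a sanity check, the sketch below runs the sample from the problem statement and defines a sort-based reference to compare against. KthLargestCompact and brute_force_kth are hypothetical names used only for this illustration. The class is a variant of the optimized version, not a replacement for it: it swaps the explicit root comparison for heapq.heappushpop, which should behave the same way, and it assumes k >= 1.

```python
import heapq


class KthLargestCompact:
    """Illustrative variant of the optimized class (assumes k >= 1)."""

    def __init__(self, k, nums):
        self._k = k
        self._heap = []
        for n in nums:
            self.add(n)

    def add(self, val):
        if len(self._heap) < self._k:
            heapq.heappush(self._heap, val)
        else:
            # Pushes val and pops the smallest item in one step;
            # the heap is left unchanged when val <= the current root.
            heapq.heappushpop(self._heap, val)
        return self._heap[0]


def brute_force_kth(values, k):
    """Reference answer by full sort (assumes len(values) >= k)."""
    return sorted(values, reverse=True)[k - 1]


# Sample from the problem statement: k = 3, nums = [4, 5, 8, 2]
stream = KthLargestCompact(3, [4, 5, 8, 2])
print([stream.add(v) for v in (3, 5, 10, 9, 4)])  # [4, 5, 5, 8, 8]
```

The brute-force helper costs O(NlogN) per query, so it is only useful for testing the heap version on small inputs.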