
滑动窗口中位数问题要求我们在一个给定整数数组 nums 中,维护一个大小为 k 的滑动窗口,并计算每个窗口内的中位数。双堆法是解决这类问题的经典策略:
通过维持这两个堆的平衡(例如,small 的大小等于 large 或比 large 大 1),我们可以高效地在 O(logK) 时间内添加元素和查找中位数。当 k 为奇数时,中位数通常是 small 堆的堆顶;当 k 为偶数时,中位数是 small 堆顶和 large 堆顶的平均值。
在处理滑动窗口问题时,除了添加新元素,还需要移除窗口左侧滑出的旧元素。原始解决方案通常会遇到“时间限制超出”(TLE)错误,尤其是在 k 值较大(例如 k=50000)且数组长度较大(例如 N=100000)的测试用例中。
以下是原始解决方案的关键代码片段及其性能问题:
import heapq
class Solution(object):
# ... (__init__, balance, addNum, findMedian 略) ...
def popNum(self, num):
# 尝试从堆中移除元素
if num > (self.small[0] * -1): # 判断元素在哪一个堆
self.large.remove(num) # 问题所在:list.remove()
heapq.heapify(self.large) # 问题所在:heapify()
else:
self.small.remove(num * -1) # 问题所在:list.remove()
heapq.heapify(self.small) # 问题所在:heapify()
self.balance() # 重新平衡堆
# ... (medianSlidingWindow 略) ...性能瓶颈分析:
因此,popNum 方法的单次操作时间复杂度为 O(K)。由于滑动窗口会进行 N-K+1 次操作,总的时间复杂度将达到 O(N K)。对于 N=100000, K=50000 这样的规模,`NK会达到5 * 10^9`,远超时间限制。
为了解决 popNum 的效率问题,我们需要一种能在 O(logK) 时间内移除元素的方法。有两种主要策略:
这种方法需要维护一个哈希表(字典),将每个元素的值映射到其在堆列表中的索引。当需要删除一个元素时,可以通过哈希表快速找到其索引,然后将其与堆中最后一个元素交换,移除最后一个元素,并通过“上浮”或“下沉”操作恢复堆属性。这种方法可以实现 O(logK) 的删除,但需要重写 heapq 的内部逻辑,实现起来较为复杂。
惰性删除是更常用且易于实现的优化策略。其核心思想是:不立即从堆中物理移除元素,而是对其进行“标记”,当这些标记元素到达堆顶时再进行处理。
具体实现步骤如下:
这种方法将删除操作的时间复杂度分摊到后续的 peek/pop 操作中,使得每次 add、remove 和 getMedian 的操作都能保持在 O(logK) 的时间复杂度。
为了实现惰性删除,我们需要对堆结构进行封装,并引入 lowindex 来追踪窗口的起始位置。
import heapq
# 辅助函数:将(值, 索引)元组的值部分取反,用于模拟最大堆
def negate(item):
return -item[0], item[1]
# 最小堆的封装类,支持惰性删除
class MinWindowHeap(object):
def __init__(self, conv=lambda x: x):
self.heap = []
self.conv = conv # 转换函数,用于处理最大堆的负值
self.lowindex = 0 # 窗口的下限索引,小于此索引的元素被视为已删除
def peek(self): # 获取堆顶元素,跳过已删除的元素
while self.heap:
item = self.conv(self.heap[0]) # 获取实际值
if item[1] >= self.lowindex: # 如果索引在窗口内,则有效
return item
# 否则,该元素已过期,物理移除
heapq.heappop(self.heap)
return None # 堆为空或只剩已删除元素
def push(self, item): # 添加元素
heapq.heappush(self.heap, self.conv(item))
def pop(self): # 弹出堆顶元素,跳过已删除的元素
item = self.peek() # 先通过peek找到有效元素
if item:
heapq.heappop(self.heap) # 然后物理移除
return item
# 最大堆的封装类,继承自MinWindowHeap,使用negate函数实现
class MaxWindowHeap(MinWindowHeap):
def __init__(self):
super(MaxWindowHeap, self).__init__(negate)
class Solution(object):
def rebalance(self, add):
"""
重新平衡两个堆的大小。
add > 0 表示向某个堆添加了元素,需要重新平衡。
add < 0 表示从某个堆移除了元素(逻辑上),需要重新平衡。
"""
self.balance += add # 维护实际有效元素的数量差
if abs(self.balance) < 2: # 如果平衡,无需操作
return
if self.balance > 1: # small堆元素过多
self.small.push(self.large.pop()) # 从large弹出,推入small
elif self.balance < -1: # large堆元素过多
self.large.push(self.small.pop()) # 从small弹出,推入large
self.balance = 0 # 重新平衡后,差值归零
def insert(self, item):
"""
向双堆结构中插入一个元素。
"""
# 判断新元素应该进入哪个堆
pivot = self.large.peek() # 优先看large堆顶(最小的大数)
islarge = not pivot or item[0] > pivot[0] # 如果large为空或新元素大于large堆顶,则进入large
heap = self.large if islarge else self.small
heap.push(item)
self.rebalance(1 if islarge else -1) # 更新平衡计数并尝试平衡
def remove(self, item):
"""
逻辑上移除一个元素(通过更新lowindex)。
"""
# 判断要移除的元素在哪个堆
pivot = self.large.peek()
islarge = pivot and item[0] >= pivot[0] # 如果large不为空且元素大于等于large堆顶,则在large中
# 关键步骤:更新两个堆的lowindex,标记所有索引小于item[1]+1的元素为已删除
self.large.lowindex = self.small.lowindex = item[1] + 1
self.rebalance(-1 if islarge else 1) # 更新平衡计数并尝试平衡
def getMedian(self):
"""
获取当前窗口的中位数。
"""
if self.balance == 0: # 两个堆大小相等
return (self.large.peek()[0] + self.small.peek()[0]) * 0.5
# 否则,大小不相等,中位数在较大的那个堆的堆顶
return self.large.peek()[0] if self.balance > 0 else self.small.peek()[0]
def medianSlidingWindow(self, nums, k):
"""
主函数:计算滑动窗口中位数。
:type nums: List[int]
:type k: int
:rtype: List[float]
"""
self.small = MaxWindowHeap() # 初始化最大堆
self.large = MinWindowHeap() # 初始化最小堆
self.balance = 0 # 初始化平衡计数
# 将原始数组转换为 (值, 索引) 元组列表
items = [(val, i) for i, val in enumerate(nums)]
# 初始化第一个窗口
for item in items[:k]:
self.insert(item)
result = [self.getMedian()] # 记录第一个窗口的中位数
# 滑动窗口
# olditem 是即将滑出窗口的元素
# item 是即将滑入窗口的元素
for olditem, item in zip(items, items[k:]):
self.remove(olditem) # 逻辑移除旧元素
self.insert(item) # 插入新元素
result.append(self.getMedian()) # 记录当前窗口的中位数
return result
经过惰性删除优化后,各项操作的时间复杂度如下:
因此,每个滑动窗口步骤(移除一个旧元素,添加一个新元素,计算中位数)的摊还时间复杂度为 O(logK)。 整个算法的总时间复杂度为 O(N logK),其中 N 是数组长度。对于 N=100000, K=50000 的情况,100000 * log(50000) 大约是 10^5 * 16,远小于 5 * 10^9,能够满足时间限制。
通过上述优化,我们可以高效地解决滑动窗口中位数问题,即使面对大规模数据也能满足性能要求。
以上就是优化滑动窗口中位数算法:双堆法的高效实现与性能瓶颈解决的详细内容,更多请关注php中文网其它相关文章!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号