本站承诺永不接任何虚假欺骗、联盟广告、弹窗广告、病毒广告、诱导充值等影响用户体验的广告,广告屏蔽插件会影响本站部分功能,还请不要屏蔽本站广告,感谢支持!

当前位置:首页 / 正文

11143

python使用多线程threading模块长期循环运行内存泄漏问题解决

编程技术 | WangTwoThree | 2022-04-26 | 3 条评论 | 3 次点赞

threading 是 python 中一个内置的多线程库,关于模块这里不做过多介绍,本文主要记录一下遇到的问题及解决方法。

一直以来都是用如下方法执行多线程任务,正常情况下是没问题。

import threading, time

# 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
# 将锁内的代码串行化
lock = threading.Lock()
# 多线程 join 用
l = []


def do_something(i):
    '''
    # 上锁的例子
    :param i:
    :return:
    '''
    with lock:  # 上锁
        print(f"当前第 {i} 个任务正在执行")
    time.sleep(5)
    # 释放信号量,可用信号量加一
    threadmax.release()


def do_something_unlock(i):
    '''
    # 不上锁的例子
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(5)
    # 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    for i in range(1000):
        # 增加信号量,可用信号量减一
        threadmax.acquire()
        t = threading.Thread(target=do_something, args=(i,))
        t.start()
        l.append(t)
    for t in l:
        t.join()

    print('END')

但是工作中遇到一个需求,程序需要 24 小时运行,并且一直循环遍历某个列表并发去执行任务,所以就简单的在上边例子的基础上加了个 while 循环,如下:

import threading, time

# 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
# 多线程 join 用
l = []


def do_something(i):
    '''
    # 执行任务
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(5)
    # 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    while 1:
        for i in range(1000):
            # 增加信号量,可用信号量减一
            threadmax.acquire()
            t = threading.Thread(target=do_something, args=(i,))
            t.start()
            l.append(t)
        for t in l:
            t.join()
        print('end once')

运行几天之后发现问题,内存一直在增加,逐行测试之后,发现问题出在 L 这个变量上。

L 这个变量主要是用来存放需要 join 的任务的,在第一个 for 循环中,每个线程开始执行之后,都会存放到 L 这个列表里,用来支持第二个 for 循环中的 join 调用。

join 本质是用于堵塞当前主线程的类,其作用是阻止全部的线程执行完之前程序继续往下运行,直到被调用的线程全部执行完毕或者超时。

举个例子,do_something 这个函数需要执行一段时间,那么你多线程执行这个函数之后,如果没有 join,那么第一个 for 循环执行完之后就会接着往后执行,不会等待 do_something 这个函数执行完,也就是会直接执行执行 print('end once') 这行代码,代码示例如下:

import threading, time

# 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
# 多线程 join 用
l = []


def do_something(i):
    '''
    # 执行任务
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(2)
    print(f"当前第 {i} 个任务执行完成")
    # 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    for i in range(10):
        # 增加信号量,可用信号量减一
        threadmax.acquire()
        t = threading.Thread(target=do_something, args=(i,))
        t.start()
    print('end once')


# 控制台输出
当前第 0 个任务正在执行
当前第 1 个任务正在执行
当前第 2 个任务正在执行
当前第 3 个任务正在执行
当前第 4 个任务正在执行
当前第 5 个任务正在执行
当前第 6 个任务正在执行
当前第 7 个任务正在执行
当前第 8 个任务正在执行
当前第 9 个任务正在执行
end once

当前第 9 个任务执行完成
当前第 6 个任务执行完成当前第 7 个任务执行完成
当前第 5 个任务执行完成
当前第 4 个任务执行完成当前第 3 个任务执行完成当前第 2 个任务执行完成
当前第 8 个任务执行完成



当前第 0 个任务执行完成
当前第 1 个任务执行完成

Process finished with exit code 0

可以看到,print('end once') 这行代码并不是在最后执行的,如果想让 print('end once') 等到所有线程都运行完成后再执行,那就需要 join, 代码示例如下:

import threading, time

# 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
# 多线程 join 用
l = []


def do_something(i):
    '''
    # 执行任务
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(2)
    print(f"当前第 {i} 个任务执行完成")
    # 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    for i in range(10):
        # 增加信号量,可用信号量减一
        threadmax.acquire()
        t = threading.Thread(target=do_something, args=(i,))
        t.start()
        l.append(t)
    for t in l:
        t.join()
    print('end once')

# 控制台输出
当前第 0 个任务正在执行
当前第 1 个任务正在执行
当前第 2 个任务正在执行
当前第 3 个任务正在执行
当前第 4 个任务正在执行
当前第 5 个任务正在执行
当前第 6 个任务正在执行
当前第 7 个任务正在执行
当前第 8 个任务正在执行
当前第 9 个任务正在执行
当前第 7 个任务执行完成当前第 9 个任务执行完成当前第 2 个任务执行完成当前第 1 个任务执行完成
当前第 4 个任务执行完成当前第 6 个任务执行完成
当前第 3 个任务执行完成

当前第 5 个任务执行完成


当前第 0 个任务执行完成

当前第 8 个任务执行完成
end once

Process finished with exit code 0

这样就达到了我们的目的,这种写法一般情况下没有问题,但是如果在外边又套了一层 while,那就有问题了,因为 L 这个列表一直是在增加的,没有释放,所以导致内存一直增加。

找到问题根本就好说了,给他加一个释放

import threading, time

# 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
# 多线程 join 用
l = []


def do_something(i):
    '''
    # 执行任务
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(5)
    # 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    while 1:
        for i in range(1000):
            # 增加信号量,可用信号量减一
            threadmax.acquire()
            t = threading.Thread(target=do_something, args=(i,))
            t.start()
            l.append(t)
        for t in l:
            t.join()
            l.remove(t)
        print('end once')

加上 l.remove(t) 这一行,经测试内存稳定,如果不需要等待的话,也可以直接不 join。


猜你喜欢

已有 3 条评论

    • <?php echo $comments->author; ?>

      大大大大泡泡糖  评论于 [2022-09-23 00:12:05]  回复

      卧槽这么详细,多线程导致内存增长这个很头疼,终于看到好文了,感谢感谢

有话要说

tips:首次评论须经过审核才会显示,请不要重复提交
本页二维码

扫码手机打开

浏览TOP5
热门标签
点赞TOP5
最新评论
别人在看