本站承诺永不接任何虚假欺骗、联盟广告、弹窗广告、病毒广告、诱导充值等影响用户体验的广告,广告屏蔽插件会影响本站部分功能,还请不要屏蔽本站广告,感谢支持!

当前位置:首页 / 正文

2022-04-26 | 编程技术 | 14286 次阅读 | 3 条评论 | 3 次点赞 | 繁体

threading 是 python 中一个内置的多线程库,关于模块这里不做过多介绍,本文主要记录一下遇到的问题及解决方法。

一直以来都是用如下方法执行多线程任务,正常情况下是没问题。

import threading, time

# 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
# 将锁内的代码串行化
lock = threading.Lock()
# 多线程 join 用
l = []


def do_something(i):
    '''
    # 上锁的例子
    :param i:
    :return:
    '''
    with lock:  # 上锁
        print(f"当前第 {i} 个任务正在执行")
    time.sleep(5)
    # 释放信号量,可用信号量加一
    threadmax.release()


def do_something_unlock(i):
    '''
    # 不上锁的例子
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(5)
    # 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    for i in range(1000):
        # 增加信号量,可用信号量减一
        threadmax.acquire()
        t = threading.Thread(target=do_something, args=(i,))
        t.start()
        l.append(t)
    for t in l:
        t.join()

    print('END')

但是工作中遇到一个需求,程序需要 24 小时运行,并且一直循环遍历某个列表并发去执行任务,所以就简单的在上边例子的基础上加了个 while 循环,如下:

import threading, time

# 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
# 多线程 join 用
l = []


def do_something(i):
    '''
    # 执行任务
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(5)
    # 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    while 1:
        for i in range(1000):
            # 增加信号量,可用信号量减一
            threadmax.acquire()
            t = threading.Thread(target=do_something, args=(i,))
            t.start()
            l.append(t)
        for t in l:
            t.join()
        print('end once')

运行几天之后发现问题,内存一直在增加,逐行测试之后,发现问题出在 L 这个变量上。

L 这个变量主要是用来存放需要 join 的任务的,在第一个 for 循环中,每个线程开始执行之后,都会存放到 L 这个列表里,用来支持第二个 for 循环中的 join 调用。

join 本质是用于堵塞当前主线程的类,其作用是阻止全部的线程执行完之前程序继续往下运行,直到被调用的线程全部执行完毕或者超时。

举个例子,do_something 这个函数需要执行一段时间,那么你多线程执行这个函数之后,如果没有 join,那么第一个 for 循环执行完之后就会接着往后执行,不会等待 do_something 这个函数执行完,也就是会直接执行执行 print('end once') 这行代码,代码示例如下:

import threading, time

# 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
# 多线程 join 用
l = []


def do_something(i):
    '''
    # 执行任务
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(2)
    print(f"当前第 {i} 个任务执行完成")
    # 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    for i in range(10):
        # 增加信号量,可用信号量减一
        threadmax.acquire()
        t = threading.Thread(target=do_something, args=(i,))
        t.start()
    print('end once')


# 控制台输出
当前第 0 个任务正在执行
当前第 1 个任务正在执行
当前第 2 个任务正在执行
当前第 3 个任务正在执行
当前第 4 个任务正在执行
当前第 5 个任务正在执行
当前第 6 个任务正在执行
当前第 7 个任务正在执行
当前第 8 个任务正在执行
当前第 9 个任务正在执行
end once

当前第 9 个任务执行完成
当前第 6 个任务执行完成当前第 7 个任务执行完成
当前第 5 个任务执行完成
当前第 4 个任务执行完成当前第 3 个任务执行完成当前第 2 个任务执行完成
当前第 8 个任务执行完成



当前第 0 个任务执行完成
当前第 1 个任务执行完成

Process finished with exit code 0

可以看到,print('end once') 这行代码并不是在最后执行的,如果想让 print('end once') 等到所有线程都运行完成后再执行,那就需要 join, 代码示例如下:

import threading, time

# 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
# 多线程 join 用
l = []


def do_something(i):
    '''
    # 执行任务
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(2)
    print(f"当前第 {i} 个任务执行完成")
    # 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    for i in range(10):
        # 增加信号量,可用信号量减一
        threadmax.acquire()
        t = threading.Thread(target=do_something, args=(i,))
        t.start()
        l.append(t)
    for t in l:
        t.join()
    print('end once')

# 控制台输出
当前第 0 个任务正在执行
当前第 1 个任务正在执行
当前第 2 个任务正在执行
当前第 3 个任务正在执行
当前第 4 个任务正在执行
当前第 5 个任务正在执行
当前第 6 个任务正在执行
当前第 7 个任务正在执行
当前第 8 个任务正在执行
当前第 9 个任务正在执行
当前第 7 个任务执行完成当前第 9 个任务执行完成当前第 2 个任务执行完成当前第 1 个任务执行完成
当前第 4 个任务执行完成当前第 6 个任务执行完成
当前第 3 个任务执行完成

当前第 5 个任务执行完成


当前第 0 个任务执行完成

当前第 8 个任务执行完成
end once

Process finished with exit code 0

这样就达到了我们的目的,这种写法一般情况下没有问题,但是如果在外边又套了一层 while,那就有问题了,因为 L 这个列表一直是在增加的,没有释放,所以导致内存一直增加。

找到问题根本就好说了,给他加一个释放

import threading, time

# 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
# 多线程 join 用
l = []


def do_something(i):
    '''
    # 执行任务
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(5)
    # 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    while 1:
        for i in range(1000):
            # 增加信号量,可用信号量减一
            threadmax.acquire()
            t = threading.Thread(target=do_something, args=(i,))
            t.start()
            l.append(t)
        for t in l:
            t.join()
            l.remove(t)
        print('end once')

加上 l.remove(t) 这一行,经测试内存稳定,如果不需要等待的话,也可以直接不 join。

标签: python异常异步教程

猜你喜欢
使用 Vercel + Supabase 零成本部署 Umami
早些年用过友盟、51.la、百度统计、Google Analytics,各有各的优缺点,百度统计目前只允许备案网站使用,友盟和 51.la 体验效果不喜欢,Google Analytics 访问...
[Go]包依赖管理工具go mod使用详解
go module 是 Go 语言从 1.11 版本之后官方推出的版本管理工具,并且从 Go 1.13 版本开始,go module 成为了 Go 语言默认的依赖管理工具。Modules 官方定...
薅京东羊毛必备抓取Cookies教程
本文只介绍如何利用安卓手机浏览器获取京东 cookie 教程,具体为什么要获取 cookie 以及如何薅羊毛请查看:闲置服务器薅京东的羊毛—青龙面板部署与京东签到第一步,下载工具去各大应用商店搜...
Lazysizes.js图片懒加载的使用
lazysizes 是一种快速(无垃圾),对 SEO 友好且可自动初始化的 lazyloader,用于图像(包括响应图像 picture/ srcset),iframe,脚本/小部件等。它还通过...
typecho使用文件缓存加快打开速度
typecho 是一个伪静态的博客系统,如果不使用缓存,每次打开页面都会查询数据库,访问人数多了以后服务器压力倍增。但是,typecho 是一个 php 的程序,我们可以利用 php 将实时页面...
javascript | 原生JS多语言切换简单实现
由于项目需要实现一个前端的多语言切换,不想因为一个简单的功能就引入 jQuery,所以经过 google 编程大法摸索出一个原生 JS 就可以实现的多语言前端切换的小例子,仅供参考。<bo...
ssh-chat- SSH命令行下聊天摸鱼服务
ssh-chat 是一个使用 Go 语言编写的定制 SSH 服务器,当你连到该服务器时就会进入聊天模式,就好像以前的终端 BBS 系统一样。 官方 Github: https://github.
白嫖移动,联通,电信手机短信通知
无论在生活中或者工作中,对于一些比较紧急的事情,可能需要发送个通知! 比如:自建的服务器突然宕机,如何自动发短信通知运维主管? 后台服务日志大量报错如何第一时间发短信通知码农geigei?

已有 3 条评论

    DanielHef 2022-10-06 21:31回复

    Essay competition _

    大大大大泡泡糖 2022-09-23 00:12回复

    卧槽这么详细,多线程导致内存增长这个很头疼,终于看到好文了,感谢感谢

      WangTwoThree管理2022-09-25 15:33回复

      很开心能帮助到大家

(首次提交评论需审核通过才会显示,请勿重复提交)