如何在Python中关闭线程?

14 浏览
0 Comments

如何在Python中关闭线程?

我有一些未完成的线程问题。

我认为队列命令.join()只是关闭队列,并不会关闭使用它的线程。

在我的脚本中,我需要检查280k个域名,并为每个域名获取其MX记录列表,如果有的话,获取服务器的IPv6地址。

我使用了线程,感谢它们,脚本运行速度提高了很多。但是有个问题,尽管队列有join()方法,但活动线程的数量仍在增长,直到出现错误,提示无法创建新线程(操作系统的限制?)。

当我从数据库中检索新域名时,如何在每个For循环结束后终止/关闭/停止/重置线程?

线程类定义...

class MX_getAAAA_thread(threading.Thread):
    def __init__(self,queue,id_domain):
        threading.Thread.__init__(self)
        self.queue = queue
        self.id_domain = id_domain
    def run(self):
        while True:
            self.mx = self.queue.get()
            res = dns.resolver.Resolver()
            res.lifetime = 1.5
            res.timeout = 0.5
            try:
                answers = res.query(self.mx,'AAAA')
                ip_mx = str(answers[0])
            except:
                ip_mx = "N/A"
            lock.acquire()
            sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()
            print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)
            lock.release()
            self.queue.task_done()

使用的线程类...

(主要的For循环不在这里,这只是它的一部分)

try:
    answers = resolver.query(domain, 'MX')
    qMX = Queue.Queue()
    for i in range(len(answers)):
        t = MX_getAAAA_thread(qMX,id_domain)
        t.setDaemon(True)
        threads.append(t)
        t.start()
    for mx in answers:
        qMX.put(mx.exchange)
    qMX.join()
except NoAnswer as e:
    print "MX - Error: No Answer"
except Timeout as etime:
    print "MX - Error: dns.exception.Timeout"
print "end of script"

我尝试在队列完成后使用以下代码:

for thread in threads:
            thread.join()

但是thread.join()永远不会停止等待,尽管没有必要等待,因为当queue.join()执行时,线程没有要执行的任务。

0
0 Comments

问题的原因是线程没有从运行循环中退出,导致join方法一直阻塞。此外,没有设置超时的Queue.get方法也会一直阻塞。

解决方法是在循环中设置一个停止标志,当标志被设置时,跳出循环或从run方法返回。可以使用passive approach的方法,即在MX_getAAAA_thread线程中设置一个停止标志,并在循环的不同点检查其值。如果被设置了,就跳出循环或从run方法返回。

以下是代码示例:


import threading
import queue
stop_flag = threading.Event()  # 创建一个事件对象作为停止标志
class MyThread(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        while not stop_flag.is_set():
            try:
                item = self.queue.get(timeout=1)  # 设置超时时间,避免无限阻塞
                # 处理item的逻辑
            except queue.Empty:
                pass  # 队列为空时的处理逻辑
    def stop(self):
        stop_flag.set()  # 设置停止标志
# 创建队列和线程
my_queue = queue.Queue()
my_thread = MyThread(my_queue)
# 启动线程
my_thread.start()
# 执行一些操作...
# 停止线程
my_thread.stop()
# 等待线程结束
my_thread.join()

0
0 Comments

问题的原因是,在处理线程中的无限循环时,如何优雅地关闭线程。解决方法是在循环中添加一个可以从外部控制的条件,通过改变条件的值来终止线程。

在这种情况下,可以将循环条件设置为一个可以从外部控制的属性keepRunning,并在每次循环检查条件时判断该属性的值。当keepRunning为False时,线程将会优雅地终止。

另一种解决方法是重用线程。可以通过在队列中提供一个唯一标识符id_domain来让线程独立于每次迭代,从而实现线程的重用。当线程从队列中取出任务时,可以通过id_domain标识符来判断任务的唯一性,从而实现线程的独立和重用。

具体的代码实现如下:

import threading
import Queue
class MX_getAAAA_thread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
    def run(self):
        self.keepRunning = True
        while self.keepRunning:
            id_domain, mx = self.queue.get()
            # do stuff
qMX = Queue.Queue()
threads = []
for i in range(MAX_THREAD_COUNT):
    t = MX_getAAAA_thread(qMX)
    t.daemon = True
    threads.append(t)
    t.start()
for id_domain in enumerateIdDomains():
    answers = resolver.query(id_domain, 'MX')
    for mx in answers:
        qMX.put((id_domain, mx.exchange)) # insert a tuple
qMX.join()
for thread in threads:
    thread.keepRunning = False

以上是问题的解决方法,通过添加一个可以从外部控制的条件或者重用线程来优雅地关闭线程。

0
0 Comments

如何在Python中关闭线程?

在这段代码中,我们不明白为什么需要一个队列。因为在你的设计中,每个线程只处理一个任务。你应该能够在创建线程时将任务传递给它。这样你就不需要队列,并且可以摆脱while循环。

首先,我们可以在线程的初始化方法中传递任务:

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, id_domain, mx):
        threading.Thread.__init__(self)
        self.id_domain = id_domain
        self.mx = mx

然后我们可以摆脱run方法中的while循环:

def run(self):
    res = dns.resolver.Resolver()
    res.lifetime = 1.5
    res.timeout = 0.5
    try:
        answers = res.query(self.mx,'AAAA')
        ip_mx = str(answers[0])
    except:
        ip_mx = "N/A"
    with lock:
        sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
        try:
            cursor.execute(sql)
            db.commit()
        except:
            db.rollback()
        print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)

为每个任务创建一个线程:

for mx in answers:
    t = MX_getAAAA_thread(qMX, id_domain, mx)
    t.setDaemon(True)
    threads.append(t)
    t.start()

然后将它们合并:

for thread in threads:
    thread.join()

不确定我是否喜欢这个解决方案更多。为什么要创建这么多线程?这仍然非常低效。但是这个答案解决了他永无止境的线程问题。当然,创建一个有限的线程池并通过队列发送任务会更高效。

“当我不知道需要多少个线程时,创建有限的线程池是必要的吗?”这是一个很好的想法,但是当我确定一个域名只需要不超过50个线程时,我认为最好根据需求进行创建。

为每个动作创建一个单独的线程通常不是一个好主意。如果这样做,线程的创建和销毁将带来额外的开销,并且会严重影响性能。此外,一个处理器不能同时高效地处理任意数量的线程,而上下文切换的代价也很高,所以最好将线程数量保持在最低限度。另外,由于全局解释器锁的存在,Python在多线程方面表现不佳,所以如果可能的话,最好避免使用它们。

尽管如此,由于你依赖网络I/O,所以使用线程来保持资源繁忙是有意义的。在这里,你应该改变对需求的理解;如果一个域名需要你进行50次查询,这并不意味着你需要50个线程。这只是意味着你有50个独立的任务需要执行,并且可以将它们分配给其他线程。选择正确的线程数量是困难的,但你肯定应该选择一个最大数量。

0