Python多线程编程

这是Python的多线程编程总结

相关的包

1
2
import concurrent.futures
import threading

使用方法

创建一个线程锁(当需要内部修改某个参数的时候需要)

1
lock = threading.Lock()

创建一个线程池,需要传入最大线程数max_workers,这个就会表示同时执行多少个线程

1
with concurrent.futures.ThreadPoolExecutor(max_workers=config.max_workers) as executor:

将作业提交到线程池中:可以新建一个Future 对象数组,实现异步操作的占位符,代表一个还未完成的计算。只有当futures数组里面的所有Future对象全部执行完成之后,才会下一步

1
2
3
4
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(cpu_intensive_task, task_id, data_sizes[task_id], shared_results, lock)
for task_id in task_ids]
concurrent.futures.wait(futures)

如果futures有返回值,就像如下的这个函数,那么也可以在结束之后来接收参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def cpu_intensive_task(task_id, data_size, shared_result, lock):
"""任务函数:执行大量的数学计算"""
# 执行一些计算密集型操作
total = 0
for i in range(data_size):
# 进行一些复杂的数学运算
total += math.sqrt(i) * math.sin(i) * math.cos(i)

# 模拟一些计算工作
result = total % 1000

# 使用锁来保护共享资源
if shared_result is not None and lock is not None:
with lock:
shared_result.append(f"Task {task_id}: {result}")

return f"Task {task_id}: {result}"

接收参数的方式如下,直接从外部接收结果:

1
2
3
4
5
6
7
8
# 使用线程池处理任务
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(cpu_intensive_task, task_id, data_sizes[task_id], None, None)
for task_id in task_ids]
concurrent.futures.wait(futures)

# 外部接收所有返回值
task_results = [future.result() for future in futures]

时间对比

对比三种方法:单线程、多线程(内部修改result)外部接收result,对比时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import threading
import concurrent.futures
import time
import math

# 模拟耗时计算任务(更真实的CPU密集型任务)
def cpu_intensive_task(task_id, data_size, shared_result, lock):
"""任务函数:执行大量的数学计算"""
# 执行一些计算密集型操作
total = 0
for i in range(data_size):
# 进行一些复杂的数学运算
total += math.sqrt(i) * math.sin(i) * math.cos(i)

# 模拟一些计算工作
result = total % 1000

# 使用锁来保护共享资源
if shared_result is not None and lock is not None:
with lock:
shared_result.append(f"Task {task_id}: {result}")

return f"Task {task_id}: {result}"

# 单线程模式
def single_thread_mode():
print("=== 单线程模式 ===")

# 准备数据 - 增加计算量
data_sizes = [100000, 150000, 120000, 80000] # 每个任务更大的计算量
task_ids = [0, 1, 2, 3]

start_time = time.perf_counter()

# 串行执行任务
results = []
for i, task_id in enumerate(task_ids):
result = cpu_intensive_task(task_id, data_sizes[i], None, None)
results.append(result)

end_time = time.perf_counter()
print("单线程执行结果:")
for result in results:
print(f" {result}")
elapsed_time = end_time - start_time
print(f"单线程执行时间: {elapsed_time:.6f} 秒")
return elapsed_time

# 模式1:内部修改result(通过共享列表)
def mode1_shared_result():
print("\n=== 模式1:内部修改共享result ===")

# 准备数据
data_sizes = [100000, 150000, 120000, 80000] # 每个任务更大的计算量
task_ids = [0, 1, 2, 3]
shared_results = [] # 共享的结果列表
lock = threading.Lock() # 线程锁

start_time = time.perf_counter()

# 使用线程池处理任务
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(cpu_intensive_task, task_id, data_sizes[task_id], shared_results, lock)
for task_id in task_ids]
concurrent.futures.wait(futures)

end_time = time.perf_counter()

# 打印结果
print("共享结果列表中的结果:")
for result in shared_results:
print(f" {result}")
elapsed_time = end_time - start_time
print(f"模式1执行时间: {elapsed_time:.6f} 秒")
return elapsed_time

# 模式2:外部接收返回值
def mode2_return_values():
print("\n=== 模式2:外部接收返回值 ===")

# 准备数据
data_sizes = [100000, 150000, 120000, 80000] # 每个任务更大的计算量
task_ids = [0, 1, 2, 3]

start_time = time.perf_counter()

# 使用线程池处理任务
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(cpu_intensive_task, task_id, data_sizes[task_id], None, None)
for task_id in task_ids]
concurrent.futures.wait(futures)

# 外部接收所有返回值
task_results = [future.result() for future in futures]

end_time = time.perf_counter()

print("从future.result()获取的返回值:")
for result in task_results:
print(f" {result}")
elapsed_time = end_time - start_time
print(f"模式2执行时间: {elapsed_time:.6f} 秒")
return elapsed_time

# 运行三种模式并对比时间
if __name__ == "__main__":
# 首先运行单线程模式
single_time = single_thread_mode()

# 然后运行多线程模式
mode1_time = mode1_shared_result()
mode2_time = mode2_return_values()

# 时间对比
print(f"\n=== 时间对比 ===")
print(f"单线程时间: {single_time:.6f} 秒")
print(f"模式1时间: {mode1_time:.6f} 秒")
print(f"模式2时间: {mode2_time:.6f} 秒")

if single_time > 0:
print(f"模式1比单线程快: {single_time/mode1_time:.2f} 倍")
print(f"模式2比单线程快: {single_time/mode2_time:.2f} 倍")
else:
print("时间测量异常")