Skip to content
返回

Python I/O 编程与多任务

AI 总结

本文详细介绍了Python的IO编程与多任务处理。IO编程包括同步/异步IO、文件读写、内存读写等,强调了上下文管理器的使用、文件定位与缓冲、异步IO的适用场景等。多任务部分讲解了进程、线程的概念及实现方式,包括多进程、多线程、进程池、线程锁等,还讨论了GIL锁对多线程的影响以及ThreadLocal的使用。此外,还涉及了os模块的使用、序列化方法(如pickle和json)以及数据存取方式(如CSV)。

The world is parallel. The world is distributed. —— Joe Armstrong

目录

I/O 编程

文件读写

使用 with 上下文管理器,自动关闭文件,避免资源泄漏。常见 mode

定位与缓冲

# 写文本文件(覆盖写)
with open("notes.txt", mode="w", encoding="utf-8") as f:
    f.write("第一行\n")
    print("第二行", file=f)   # print 也能写入文件

# 读文本文件
with open("notes.txt", mode="r", encoding="utf-8") as f:
    content = f.read()        # 读全部(大文件慎用)
    f.seek(0)
    lines = f.readlines()     # 读成行列表(含换行符)

# 逐行读取,省内存
with open("big.txt", "r", encoding="utf-8") as f:
    for line in f:            # 迭代器方式,自动分块
        process(line)
        
# 二进制复制文件
with open("in.jpg", "rb") as src, open("out.jpg", "wb") as dst:
    while chunk := src.read(8192):
        dst.write(chunk)
        
with open("data.bin", "rb") as f:
    f.seek(-4, 2)            # 定位到文件末尾前4字节
    print(f.read(4))

内存读写

io.StringIO / io.BytesIO 适合临时缓冲、测试用例、不落磁盘。

StringIO

BytesIO

from io import StringIO, BytesIO

buf = StringIO()

f.write('Hello')   # 写入5个字符
f.write(' ')       # 写入1个空格
f.write('Python!') # 写入7个字符

buf.seek(0)
print(buf.read())

# 逐行读取
# 用一段多行字符串初始化 StringIO
data = "第一行\n第二行\n第三行"
f = StringIO(data)

# 使用 while 循环逐行读取,直到读到空字符串为止
while True:
    line = f.readline()      # 读取一行内容
    if line == '':           # 如果读到空字符串,说明读完了
        break
    print(line.strip())      # strip() 去掉每行末尾的换行符

b = BytesIO()
b.write('你好,世界'.encode('utf-8'))  # encode('utf-8') 把字符串转为字节

b.write(b"\x00\x01\x02")
print(len(b.getvalue()))

异步 I/O

适合大量慢 I/O(网络/磁盘)而非重 CPU 的任务。文件 I/O 可用第三方 aiofiles,网络用 asyncio 自带。

# 异步网络示例:并发抓取多个网页的前 N 字节
import asyncio
import aiohttp

async def head(url):
    async with aiohttp.ClientSession() as s:
        async with s.get(url) as r:
            return await r.text()

async def main():
    urls = ["https://example.com", "https://httpbin.org/get"]
    texts = await asyncio.gather(*(head(u) for u in urls))
    for t in texts:
        print(len(t))

asyncio.run(main())

os 模块

os 是标准库里和操作系统交互的“瑞士军刀”,比如获取系统信息、操作文件和目录等。

系统与平台信息

import os  # 导入os模块

# os.name 属性可以告诉我们当前操作系统的类型
if os.name == 'nt':
    print("当前系统是 Windows")
elif os.name == 'posix':
    print("当前系统是 Linux/Unix 或 macOS")
else:
    print("未知操作系统")

环境变量

路径处理(os.path)

对应 pathlib(更现代推荐)
Path.cwd() / Path.resolve()Path(...).exists()p.is_dir()/is_file()
p.parent, p.name, p.stem, p.suffixp.with_suffix(".txt")p = Path(a) / b / c

目录与文件操作

更健壮替代:
os.makedirs(path, exist_ok=True) 递归建目录;
os.replace(src, dst) 原子替换更安全;
shutil.rmtree(dir) 删除非空目录(谨慎!)。

注意事项

序列化

把任意 Python 对象 ⇄ 二进制字节串(序列化/反序列化)。二进制的字节串可用来传输或存取,一般用 .pkl 文件存放被序列化后的二进制数据。

pickle

import pickle  # 导入pickle模块

# 创建一个字典对象
person = {'name': 'Alice', 'age': 25, 'score': 90}

# 使用pickle.dumps()将对象序列化为bytes
data_bytes = pickle.dumps(person)
print(data_bytes)  # 输出一串二进制内容,不是人能直接读懂的

data = {"user": "Alice", "scores": [95, 88, 76]}

# 保存到文件
with open("data.pkl", "wb") as f:                # 二进制写
    pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)

# 从文件读取
with open("data.pkl", "rb") as f:                # 二进制读
    obj = pickle.load(f)

# 直接得到/还原字节串(不落盘)
b = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
obj2 = pickle.loads(b)

常见数据存取

希望数据能被其他编程语言(如 JavaScript、Java、C# 等)读取,推荐使用 json 模块。JSON 是一种通用的数据交换格式,几乎所有编程语言都支持。

# JSON(文本、可读性好)
import json
data = {"user": "Alice", "score": 95}
with open("data.json", "w", encoding="utf-8") as f:
	# `json.dumps()` 返回的是字符串,可以直接保存或传输。
    json.dump(data, f, ensure_ascii=False, indent=2)

with open("data.json", "r", encoding="utf-8") as f:
    data2 = json.load(f)

# CSV
import csv
rows = [{"name":"A","age":20},{"name":"B","age":30}]
with open("people.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["name","age"])
    writer.writeheader()
    writer.writerows(rows)

JSON与Python类型的对应关系

JSON类型Python类型
对象 {}字典 dict
数组 []列表 list
字符串str
数字int/float
true/falseTrue/False
nullNone

序列化自定义对象

有时候我们会用自定义的类,比如 Student。如果直接用 json.dumps(),会报错,因为 JSON 不知道怎么处理自定义对象。我们可以通过传递 default 参数,告诉 JSON 如何把对象变成字典。

# 定义一个学生类
class Student:
    # 初始化方法,接收姓名、年龄和分数参数
    def __init__(self, name, age, score):
        self.name = name
        self.age = age
        self.score = score

stu = Student('Lily', 19, 88)

# 定义一个转换函数,把Student对象变成字典
def student_to_dict(obj):
    # 返回包含学生信息的字典
    return {
        'name': obj.name,
        'age': obj.age,
        'score': obj.score
    }

# 序列化时传入default参数
# 使用json.dumps将学生对象序列化为JSON字符串,指定转换函数
json_str = json.dumps(stu, default=student_to_dict)
print(json_str)  # 输出:{"name": "Lily", "age": 19, "score": 88}

默认情况下,json.dumps() 会把中文转换成 Unicode 编码。如果想让中文直接显示,可以加上 ensure_ascii=False

# ensure_ascii=False 可以让中文直接显示
json_str = json.dumps(data, ensure_ascii=False)
print(json_str)  # 输出:{"name": "小明", "age": 20}
-- 2025 年 9 月 30 日止 --

多任务

一边听音乐一边上网。这种“同时做多件事”的能力就叫多任务,多任务的实现依赖于进程线程这两个概念。单核CPU,操作系统会让每个任务轮流执行一点点时间,然后快速切换到下一个任务,因为切换速度很快,我们感觉所有任务都在同时进行。如果是多核CPU,操作系统可以让多个任务真正地在不同的核心上同时运行。

在Python中,实现多任务有三种常见方式:

  1. 多进程:开启多个进程,每个进程独立运行。
  2. 多线程:在一个进程内开启多个线程。
  3. 多进程+多线程:每个进程里再开多个线程(实际开发中较少用)。

进程

os.fork 仅限Unix/Linux

import os

print("当前进程ID:", os.getpid())
pid = os.fork()  # 创建子进程
if pid == 0:
    # 子进程返回0
    print("我是子进程,ID:", os.getpid(), "父进程ID:", os.getppid())
else:
    # 父进程返回子进程ID
    print("我创建了一个子进程,子进程ID:", pid)

multiprocessing 模块

from multiprocessing import Process
import os

# 子进程要执行的函数
def run_task(name):
    print('子进程运行中,名字:%s,进程ID:%s' % (name, os.getpid()))

if __name__ == '__main__':
    print('父进程ID:', os.getpid())
    p = Process(target=run_task, args=('任务1',))
    print('子进程即将启动')
    p.start()  # 启动子进程
    p.join()   # 等待子进程结束
    # 调用 `p.join()` 时,父进程会在这一行“阻塞”,等待子进程运行结束后,才会继续往下执行。
    print('子进程已结束')

Pool 进程池

# 导入进程池模块
from multiprocessing import Pool
import os, time, random

# 定义工作函数,接收任务ID作为参数
def worker(task_id):
    # 打印当前执行的任务ID和进程ID
    print('执行任务 %s (进程ID: %s)' % (task_id, os.getpid()))
    # 随机休眠0-2秒,模拟任务执行时间
    time.sleep(random.random() * 2)
    # 打印任务完成信息
    print('任务 %s 完成' % task_id)

# 主程序入口
if __name__ == '__main__':
    # 打印父进程的进程ID
    print('父进程ID:', os.getpid())
    # 创建进程池,最多同时运行3个进程
    # 进程池可以限制同时运行的进程数量
    pool = Pool(3)  # 最多同时运行3个进程
    # 循环创建5个任务
    for i in range(5):
        # 异步提交任务到进程池
        pool.apply_async(worker, args=(i,))
    # 关闭进程池,不再接受新任务
    pool.close()  # 关闭进程池,不再接受新任务
    # 等待所有子进程结束
    pool.join()   # 等待所有子进程结束
    # 打印所有任务完成信息
    print('所有任务完成')

subprocess 模块

subprocess模块,用于执行外部命令

import subprocess

# 打印提示信息,告知用户即将运行nslookup命令
print('运行nslookup命令')
# 使用subprocess.call()方法执行nslookup命令,查询www.python.org的DNS信息
# call()方法会等待命令执行完成并返回退出状态码
result = subprocess.call(['nslookup', 'www.python.org'])
# 打印命令的返回码,通常0表示成功,非0表示出错
print('命令返回码:', result)

Queue 队列

多进程之间可以通过队列(Queue)来交换数据,父进程创建 Queue,子进程通过 Queue 读写数据。

# 导入多进程模块中的Process类和Queue队列类
from multiprocessing import Process, Queue
# 导入操作系统接口模块和时间模块
import os, time

# 定义写入数据的函数,参数q是队列对象
def write_data(q):
    # 遍历列表中的每个值
    for value in ['A', 'B', 'C']:
        # 打印正在写入的值
        print('写入:', value)
        # 将值放入队列中
        q.put(value)
        # 暂停1秒
        time.sleep(1)

# 定义读取数据的函数,参数q是队列对象
def read_data(q):
    # 无限循环读取数据
    while True:
        # 从队列中获取数据,True表示阻塞等待
        value = q.get(True)
        # 打印读取到的值
        print('读取:', value)

# 主程序入口
if __name__ == '__main__':
    # 创建一个进程间通信的队列
    q = Queue()
    # 创建写入进程,目标函数是write_data,参数是队列q
    pw = Process(target=write_data, args=(q,))
    # 创建读取进程,目标函数是read_data,参数是队列q
    pr = Process(target=read_data, args=(q,))
    # 启动写入进程
    pw.start()
    # 启动读取进程
    pr.start()
    # 等待写入进程结束
    pw.join()
    # 强制终止读取进程(因为读进程是死循环,需要强制结束)
    pr.terminate()  # 读进程是死循环,强制结束

线程

Python 标准库自带了 threading 模块,可以很方便地实现多线程。我们只需要把要执行的代码写成一个函数,然后用 Thread 类来启动一个新线程。

threading 线程模块

# 导入线程模块,用于创建和管理线程
import threading
# 导入时间模块,用于线程休眠
import time

# 定义工作函数,作为线程的执行目标
def worker():
    # 打印当前线程开始执行的信息
    # current_thread().name 可以获取当前线程的名字,主线程默认叫 MainThread
    print('线程 %s 开始' % threading.current_thread().name)
    # 让当前线程休眠2秒,模拟耗时操作
    time.sleep(2)
    # 打印当前线程结束执行的信息
    print('线程 %s 结束' % threading.current_thread().name)

# 主程序入口
if __name__ == '__main__':
    # 创建第一个线程对象,指定目标函数为worker,线程名为Worker-1
    t1 = threading.Thread(target=worker, name='Worker-1')
    # 创建第二个线程对象,指定目标函数为worker,线程名为Worker-2
    t2 = threading.Thread(target=worker, name='Worker-2')
    # 启动第一个线程
    t1.start()
    # 启动第二个线程
    t2.start()
    # 等待第一个线程执行完毕
    t1.join()
    # 等待第二个线程执行完毕
    t2.join()
    # 所有线程都结束后,打印完成信息
    print('所有线程结束')

数据共享

多线程的最大特点是所有线程共享进程内的所有数据。这意味着多个线程可以同时修改同一个变量,这就可能导致数据混乱。

# 导入线程模块
import threading

# 定义共享变量,初始值为0
counter = 0

# 定义加法函数
def add():
    # 声明使用全局变量 counter
    global counter
    # 循环100000次,每次将counter加1
    for _ in range(100000):
        counter += 1

# 定义减法函数
def subtract():
    # 声明使用全局变量counter
    global counter
    # 循环100000次,每次将counter减1
    for _ in range(100000):
        counter -= 1

t1 = threading.Thread(target=add)
t2 = threading.Thread(target=subtract)

t1.start()
t2.start()
t1.join()
t2.join()

# 一个典型的交错是这样的(简化):
# 1. 线程 A 读到 counter = 5
# 2. 线程 B 也读到 counter = 5
# 3. A 计算并写回 6
# 4. B 计算并写回 4(把 A 的更新覆盖掉)
# 结果少加了一次,久而久之总和就偏离 0。
# 最终 counter 可能不为 0
print('最终counter的值:', counter)

Lock 锁

使用 with lock: 加锁后,同一时刻只有一个线程能修改共享数据,其他线程必须等待锁被释放。

import threading

counter = 0
lock = threading.Lock()

def add():
    global counter
    for _ in range(100000):
        with lock:            # 临界区
            counter += 1

def subtract():
    global counter
    for _ in range(100000):
        with lock:            # 临界区
            counter -= 1

t1 = threading.Thread(target=add)
t2 = threading.Thread(target=subtract)
t1.start(); t2.start()
t1.join(); t2.join()
print('最终counter的值:', counter)  # 现在应稳定为 0

with lock:

# 等价于

lock.acquire()  # 获取锁
try:
    ...  # 临界区
finally:
    lock.release()  # 释放锁

GIL 全局解释器锁

虽然 Python 支持多线程,但有一个“全局解释器锁”(GIL),导致同一时刻只有一个线程能执行 Python 字节码。

这意味着即使你有多核 CPU,Python 的多线程也无法真正利用多核并行,只能在 IO 密集型任务(如网络、磁盘读写)中提升效率。

多线程死循环只能用到一个 CPU 核心,CPU 密集型工作应使用多进程。

# 导入线程和多进程模块
import threading, multiprocessing

# 定义一个忙等待函数,用于占用CPU资源
def busy_loop():
    # 创建无限循环
    while True:
        pass  # 死循环,占用CPU

# 启动和CPU核心数一样多的线程
# 遍历CPU核心数量
for i in range(multiprocessing.cpu_count()):
    # 创建新线程,目标函数为busy_loop
    t = threading.Thread(target=busy_loop)
    # 启动线程
    # 会发现CPU占用率不会达到100%×核心数,说明多线程无法充分利用多核。
    t.start()

ThreadLocal

在多线程编程中,每个线程通常都需要保存属于自己的数据。

如果用局部变量,虽然安全,但每次函数调用都要一层层传递参数,非常麻烦。

如果用全局变量,又会导致不同线程之间数据混乱,必须加锁,代码复杂且容易出错。

ThreadLocal 既让每个线程拥有自己的“全局变量”,又不用手动传递参数,也不用加锁,可以看成线程专属的全局变量容器

ThreadLocal 的本质是为每个线程维护一个独立的“字典”,每个线程对 ThreadLocal 的属性操作只影响自己。

这样,所有函数都可以直接通过 ThreadLocal 访问线程专属的数据,无需层层传递参数,也不用担心数据被其他线程篡改。

# 导入线程模块
import threading

# 创建一个全局的ThreadLocal对象
local_data = threading.local()

# 定义处理数据的函数
def process_data():
    # 直接访问local_data的属性,相当于访问当前线程自己的数据
    value = local_data.value
    # 打印当前线程名称和读取到的值
    print('线程 %s 读取到 value = %s' % (threading.current_thread().name, value))

# 定义线程任务函数
def thread_task(val):
    # 给当前线程绑定一个value属性
    local_data.value = val
    # 调用处理数据函数
    process_data()

# 创建两个线程,分别绑定不同的数据
# 创建第一个线程,目标函数为thread_task,参数为'苹果',线程名为'线程A'
t1 = threading.Thread(target=thread_task, args=('苹果',), name='线程A')
# 创建第二个线程,目标函数为thread_task,参数为'香蕉',线程名为'线程B'
t2 = threading.Thread(target=thread_task, args=('香蕉',), name='线程B')

t1.start()
t2.start()
t1.join()
t2.join()


# 创建ThreadLocal对象
user_local = threading.local()

def show_user():
    # 直接读取当前线程的user属性
    print('当前线程:%s,用户:%s' % (threading.current_thread().name, user_local.user))

def login(user_name):
    # 给当前线程绑定user属性
    user_local.user = user_name
    show_user()

# 启动两个线程,分别绑定不同的用户
t1 = threading.Thread(target=login, args=('Alice',), name='登录线程1')
t2 = threading.Thread(target=login, args=('Bob',), name='登录线程2')
t1.start()
t2.start()
t1.join()
t2.join()
-- 2025 年 10 月 1 日止 --

配套代码:vsme/learn-python



上一篇
Python 错误处理与内置模块
下一篇
Python 面向对象