The world is parallel. The world is distributed. —— Joe Armstrong
目录
I/O 编程
- I/O(Input/Output):程序与外界交换数据的过程,常见对象有文件、网络、内存、终端等。
- 同步/阻塞:
read()/write()调用会等待操作完成再返回。 - 异步/非阻塞:发出请求后不等待完成,通常配合事件循环(如
asyncio)。
文件读写
使用 with 上下文管理器,自动关闭文件,避免资源泄漏。常见 mode:
"r"读;"w"写(清空);"a"追加;"+"读写;- 二进制加
b:如"rb","wb",常用于图片、视频、压缩包等; - 推荐总是显式写
encoding="utf-8"(文本文件)。
定位与缓冲
f.tell():当前位置(字节偏移)f.seek(offset, whence):移动位置,whence:0文件头、1当前位置、2末尾- 缓冲:I/O 性能关键。
open(..., buffering=8192)可自定义;使用较大块读写更快。
# 写文本文件(覆盖写)
with open("notes.txt", mode="w", encoding="utf-8") as f:
f.write("第一行\n")
print("第二行", file=f) # print 也能写入文件
# 读文本文件
with open("notes.txt", mode="r", encoding="utf-8") as f:
content = f.read() # 读全部(大文件慎用)
f.seek(0)
lines = f.readlines() # 读成行列表(含换行符)
# 逐行读取,省内存
with open("big.txt", "r", encoding="utf-8") as f:
for line in f: # 迭代器方式,自动分块
process(line)
# 二进制复制文件
with open("in.jpg", "rb") as src, open("out.jpg", "wb") as dst:
while chunk := src.read(8192):
dst.write(chunk)
with open("data.bin", "rb") as f:
f.seek(-4, 2) # 定位到文件末尾前4字节
print(f.read(4))内存读写
io.StringIO / io.BytesIO 适合临时缓冲、测试用例、不落磁盘。
StringIO
StringIO()创建了一个内存中的“文件”对象。write()方法可以像写文件一样写入字符串。getvalue()方法可以一次性获取所有写入的内容。
BytesIO
BytesIO()创建了一个内存中的二进制“文件”。- 写入前要用
encode()把字符串转成字节。 getvalue()得到的是 bytes 类型的数据。
from io import StringIO, BytesIO
buf = StringIO()
f.write('Hello') # 写入5个字符
f.write(' ') # 写入1个空格
f.write('Python!') # 写入7个字符
buf.seek(0)
print(buf.read())
# 逐行读取
# 用一段多行字符串初始化 StringIO
data = "第一行\n第二行\n第三行"
f = StringIO(data)
# 使用 while 循环逐行读取,直到读到空字符串为止
while True:
line = f.readline() # 读取一行内容
if line == '': # 如果读到空字符串,说明读完了
break
print(line.strip()) # strip() 去掉每行末尾的换行符
b = BytesIO()
b.write('你好,世界'.encode('utf-8')) # encode('utf-8') 把字符串转为字节
b.write(b"\x00\x01\x02")
print(len(b.getvalue()))异步 I/O
适合大量慢 I/O(网络/磁盘)而非重 CPU 的任务。文件 I/O 可用第三方 aiofiles,网络用 asyncio 自带。
# 异步网络示例:并发抓取多个网页的前 N 字节
import asyncio
import aiohttp
async def head(url):
async with aiohttp.ClientSession() as s:
async with s.get(url) as r:
return await r.text()
async def main():
urls = ["https://example.com", "https://httpbin.org/get"]
texts = await asyncio.gather(*(head(u) for u in urls))
for t in texts:
print(len(t))
asyncio.run(main())os 模块
os 是标准库里和操作系统交互的“瑞士军刀”,比如获取系统信息、操作文件和目录等。
系统与平台信息
os.name:返回平台标识字符串:"nt"(Windows)、"posix"(Linux/macOS)。os.uname():仅类 Unix 可用,返回系统信息(sysname/nodename/release/version/machine)。platform.system()(建议补充):返回更直观的平台名:"Windows"|"Linux"|"Darwin"。
import os # 导入os模块
# os.name 属性可以告诉我们当前操作系统的类型
if os.name == 'nt':
print("当前系统是 Windows")
elif os.name == 'posix':
print("当前系统是 Linux/Unix 或 macOS")
else:
print("未知操作系统")环境变量
os.environ:环境变量的“映射”(字典风格)。os.environ.get(key, default=None):取值并可给默认。
路径处理(os.path)
os.path.abspath(path):转绝对路径。os.path.join(*parts):拼路径(跨平台,别手写斜杠)。os.path.exists(path):路径是否存在。os.path.isdir(path)/os.path.isfile(path):是否为目录/文件。os.path.split(path):拆成(目录, 文件名)。os.path.splitext(path):拆成(主文件名, 扩展名)。
对应
pathlib(更现代推荐)Path.cwd()/Path.resolve();Path(...).exists();p.is_dir()/is_file();p.parent, p.name, p.stem, p.suffix;p.with_suffix(".txt");p = Path(a) / b / c
目录与文件操作
os.mkdir(path):建单层目录(存在会报错)。os.rmdir(path):删空目录(非空会报错)。os.rename(src, dst):重命名/移动(同卷更快;目标存在时在不同平台行为不同)。os.remove(path):删文件。
更健壮替代:
os.makedirs(path, exist_ok=True)递归建目录;os.replace(src, dst)原子替换更安全;shutil.rmtree(dir)删除非空目录(谨慎!)。
os.listdir(path="."):列出名称(不含类型/元数据)。- 组合筛选:
- 目录列表:
[n for n in os.listdir(".") if os.path.isdir(n)] - 所有
.py文件:[n for n in os.listdir(".") if os.path.isfile(n) and os.path.splitext(n)[1]==".py"]
- 目录列表:
- 性能更好:
os.scandir(path)(带is_file()、is_dir()、stat(),避免多次系统调用)。
注意事项
os.uname()在 Windows 上不存在:先用hasattr(os, "uname")判断。os.rmdir()只能删空目录;要删目录树请用shutil.rmtree。- 路径拼接请用
os.path.join或pathlib.Path,避免硬编码/或\。 - 操作前建议先判断存在性;批处理里要考虑异常捕获(权限、占用、锁定等)。
- 环境变量
PATH在不同系统分隔符不同:Windows 用;,类 Unix 用:。
序列化
把任意 Python 对象 ⇄ 二进制字节串(序列化/反序列化)。二进制的字节串可用来传输或存取,一般用 .pkl 文件存放被序列化后的二进制数据。
pickle
- 只能在 Python 之间使用,不能和其他语言直接交换数据。
- 不同 Python 版本之间有时也不兼容。
- 适合保存临时数据,不建议用于重要或长期保存的数据。
import pickle # 导入pickle模块
# 创建一个字典对象
person = {'name': 'Alice', 'age': 25, 'score': 90}
# 使用pickle.dumps()将对象序列化为bytes
data_bytes = pickle.dumps(person)
print(data_bytes) # 输出一串二进制内容,不是人能直接读懂的
data = {"user": "Alice", "scores": [95, 88, 76]}
# 保存到文件
with open("data.pkl", "wb") as f: # 二进制写
pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
# 从文件读取
with open("data.pkl", "rb") as f: # 二进制读
obj = pickle.load(f)
# 直接得到/还原字节串(不落盘)
b = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
obj2 = pickle.loads(b)常见数据存取
希望数据能被其他编程语言(如 JavaScript、Java、C# 等)读取,推荐使用 json 模块。JSON 是一种通用的数据交换格式,几乎所有编程语言都支持。
# JSON(文本、可读性好)
import json
data = {"user": "Alice", "score": 95}
with open("data.json", "w", encoding="utf-8") as f:
# `json.dumps()` 返回的是字符串,可以直接保存或传输。
json.dump(data, f, ensure_ascii=False, indent=2)
with open("data.json", "r", encoding="utf-8") as f:
data2 = json.load(f)
# CSV
import csv
rows = [{"name":"A","age":20},{"name":"B","age":30}]
with open("people.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=["name","age"])
writer.writeheader()
writer.writerows(rows)JSON与Python类型的对应关系
| JSON类型 | Python类型 |
|---|---|
| 对象 {} | 字典 dict |
| 数组 [] | 列表 list |
| 字符串 | str |
| 数字 | int/float |
| true/false | True/False |
| null | None |
序列化自定义对象
有时候我们会用自定义的类,比如 Student。如果直接用 json.dumps(),会报错,因为 JSON 不知道怎么处理自定义对象。我们可以通过传递 default 参数,告诉 JSON 如何把对象变成字典。
# 定义一个学生类
class Student:
# 初始化方法,接收姓名、年龄和分数参数
def __init__(self, name, age, score):
self.name = name
self.age = age
self.score = score
stu = Student('Lily', 19, 88)
# 定义一个转换函数,把Student对象变成字典
def student_to_dict(obj):
# 返回包含学生信息的字典
return {
'name': obj.name,
'age': obj.age,
'score': obj.score
}
# 序列化时传入default参数
# 使用json.dumps将学生对象序列化为JSON字符串,指定转换函数
json_str = json.dumps(stu, default=student_to_dict)
print(json_str) # 输出:{"name": "Lily", "age": 19, "score": 88}默认情况下,json.dumps() 会把中文转换成 Unicode 编码。如果想让中文直接显示,可以加上 ensure_ascii=False。
# ensure_ascii=False 可以让中文直接显示
json_str = json.dumps(data, ensure_ascii=False)
print(json_str) # 输出:{"name": "小明", "age": 20}多任务
一边听音乐一边上网。这种“同时做多件事”的能力就叫多任务,多任务的实现依赖于进程和线程这两个概念。单核CPU,操作系统会让每个任务轮流执行一点点时间,然后快速切换到下一个任务,因为切换速度很快,我们感觉所有任务都在同时进行。如果是多核CPU,操作系统可以让多个任务真正地在不同的核心上同时运行。
- 进程(Process):可以理解为正在运行的一个程序,比如你打开了两个记事本,就有两个进程。
- 线程(Thread):是进程内部可以“同时”做的多件小事,比如一个Word进程可以同时打字、检查拼写、保存文档等,每件小事就是一个线程。
- 一个进程可以包含多个线程。
- 线程是进程中的“最小执行单元”。
在Python中,实现多任务有三种常见方式:
- 多进程:开启多个进程,每个进程独立运行。
- 多线程:在一个进程内开启多个线程。
- 多进程+多线程:每个进程里再开多个线程(实际开发中较少用)。
进程
os.fork 仅限Unix/Linux
import os
print("当前进程ID:", os.getpid())
pid = os.fork() # 创建子进程
if pid == 0:
# 子进程返回0
print("我是子进程,ID:", os.getpid(), "父进程ID:", os.getppid())
else:
# 父进程返回子进程ID
print("我创建了一个子进程,子进程ID:", pid)multiprocessing 模块
from multiprocessing import Process
import os
# 子进程要执行的函数
def run_task(name):
print('子进程运行中,名字:%s,进程ID:%s' % (name, os.getpid()))
if __name__ == '__main__':
print('父进程ID:', os.getpid())
p = Process(target=run_task, args=('任务1',))
print('子进程即将启动')
p.start() # 启动子进程
p.join() # 等待子进程结束
# 调用 `p.join()` 时,父进程会在这一行“阻塞”,等待子进程运行结束后,才会继续往下执行。
print('子进程已结束')Pool 进程池
# 导入进程池模块
from multiprocessing import Pool
import os, time, random
# 定义工作函数,接收任务ID作为参数
def worker(task_id):
# 打印当前执行的任务ID和进程ID
print('执行任务 %s (进程ID: %s)' % (task_id, os.getpid()))
# 随机休眠0-2秒,模拟任务执行时间
time.sleep(random.random() * 2)
# 打印任务完成信息
print('任务 %s 完成' % task_id)
# 主程序入口
if __name__ == '__main__':
# 打印父进程的进程ID
print('父进程ID:', os.getpid())
# 创建进程池,最多同时运行3个进程
# 进程池可以限制同时运行的进程数量
pool = Pool(3) # 最多同时运行3个进程
# 循环创建5个任务
for i in range(5):
# 异步提交任务到进程池
pool.apply_async(worker, args=(i,))
# 关闭进程池,不再接受新任务
pool.close() # 关闭进程池,不再接受新任务
# 等待所有子进程结束
pool.join() # 等待所有子进程结束
# 打印所有任务完成信息
print('所有任务完成')subprocess 模块
subprocess模块,用于执行外部命令
import subprocess
# 打印提示信息,告知用户即将运行nslookup命令
print('运行nslookup命令')
# 使用subprocess.call()方法执行nslookup命令,查询www.python.org的DNS信息
# call()方法会等待命令执行完成并返回退出状态码
result = subprocess.call(['nslookup', 'www.python.org'])
# 打印命令的返回码,通常0表示成功,非0表示出错
print('命令返回码:', result)Queue 队列
多进程之间可以通过队列(Queue)来交换数据,父进程创建 Queue,子进程通过 Queue 读写数据。
# 导入多进程模块中的Process类和Queue队列类
from multiprocessing import Process, Queue
# 导入操作系统接口模块和时间模块
import os, time
# 定义写入数据的函数,参数q是队列对象
def write_data(q):
# 遍历列表中的每个值
for value in ['A', 'B', 'C']:
# 打印正在写入的值
print('写入:', value)
# 将值放入队列中
q.put(value)
# 暂停1秒
time.sleep(1)
# 定义读取数据的函数,参数q是队列对象
def read_data(q):
# 无限循环读取数据
while True:
# 从队列中获取数据,True表示阻塞等待
value = q.get(True)
# 打印读取到的值
print('读取:', value)
# 主程序入口
if __name__ == '__main__':
# 创建一个进程间通信的队列
q = Queue()
# 创建写入进程,目标函数是write_data,参数是队列q
pw = Process(target=write_data, args=(q,))
# 创建读取进程,目标函数是read_data,参数是队列q
pr = Process(target=read_data, args=(q,))
# 启动写入进程
pw.start()
# 启动读取进程
pr.start()
# 等待写入进程结束
pw.join()
# 强制终止读取进程(因为读进程是死循环,需要强制结束)
pr.terminate() # 读进程是死循环,强制结束线程
Python 标准库自带了 threading 模块,可以很方便地实现多线程。我们只需要把要执行的代码写成一个函数,然后用 Thread 类来启动一个新线程。
threading 线程模块
# 导入线程模块,用于创建和管理线程
import threading
# 导入时间模块,用于线程休眠
import time
# 定义工作函数,作为线程的执行目标
def worker():
# 打印当前线程开始执行的信息
# current_thread().name 可以获取当前线程的名字,主线程默认叫 MainThread
print('线程 %s 开始' % threading.current_thread().name)
# 让当前线程休眠2秒,模拟耗时操作
time.sleep(2)
# 打印当前线程结束执行的信息
print('线程 %s 结束' % threading.current_thread().name)
# 主程序入口
if __name__ == '__main__':
# 创建第一个线程对象,指定目标函数为worker,线程名为Worker-1
t1 = threading.Thread(target=worker, name='Worker-1')
# 创建第二个线程对象,指定目标函数为worker,线程名为Worker-2
t2 = threading.Thread(target=worker, name='Worker-2')
# 启动第一个线程
t1.start()
# 启动第二个线程
t2.start()
# 等待第一个线程执行完毕
t1.join()
# 等待第二个线程执行完毕
t2.join()
# 所有线程都结束后,打印完成信息
print('所有线程结束')数据共享
多线程的最大特点是所有线程共享进程内的所有数据。这意味着多个线程可以同时修改同一个变量,这就可能导致数据混乱。
# 导入线程模块
import threading
# 定义共享变量,初始值为0
counter = 0
# 定义加法函数
def add():
# 声明使用全局变量 counter
global counter
# 循环100000次,每次将counter加1
for _ in range(100000):
counter += 1
# 定义减法函数
def subtract():
# 声明使用全局变量counter
global counter
# 循环100000次,每次将counter减1
for _ in range(100000):
counter -= 1
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=subtract)
t1.start()
t2.start()
t1.join()
t2.join()
# 一个典型的交错是这样的(简化):
# 1. 线程 A 读到 counter = 5
# 2. 线程 B 也读到 counter = 5
# 3. A 计算并写回 6
# 4. B 计算并写回 4(把 A 的更新覆盖掉)
# 结果少加了一次,久而久之总和就偏离 0。
# 最终 counter 可能不为 0
print('最终counter的值:', counter)Lock 锁
使用 with lock: 加锁后,同一时刻只有一个线程能修改共享数据,其他线程必须等待锁被释放。
import threading
counter = 0
lock = threading.Lock()
def add():
global counter
for _ in range(100000):
with lock: # 临界区
counter += 1
def subtract():
global counter
for _ in range(100000):
with lock: # 临界区
counter -= 1
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=subtract)
t1.start(); t2.start()
t1.join(); t2.join()
print('最终counter的值:', counter) # 现在应稳定为 0
with lock:
# 等价于
lock.acquire() # 获取锁
try:
... # 临界区
finally:
lock.release() # 释放锁GIL 全局解释器锁
虽然 Python 支持多线程,但有一个“全局解释器锁”(GIL),导致同一时刻只有一个线程能执行 Python 字节码。
这意味着即使你有多核 CPU,Python 的多线程也无法真正利用多核并行,只能在 IO 密集型任务(如网络、磁盘读写)中提升效率。
多线程死循环只能用到一个 CPU 核心,CPU 密集型工作应使用多进程。
# 导入线程和多进程模块
import threading, multiprocessing
# 定义一个忙等待函数,用于占用CPU资源
def busy_loop():
# 创建无限循环
while True:
pass # 死循环,占用CPU
# 启动和CPU核心数一样多的线程
# 遍历CPU核心数量
for i in range(multiprocessing.cpu_count()):
# 创建新线程,目标函数为busy_loop
t = threading.Thread(target=busy_loop)
# 启动线程
# 会发现CPU占用率不会达到100%×核心数,说明多线程无法充分利用多核。
t.start()ThreadLocal
在多线程编程中,每个线程通常都需要保存属于自己的数据。
如果用局部变量,虽然安全,但每次函数调用都要一层层传递参数,非常麻烦。
如果用全局变量,又会导致不同线程之间数据混乱,必须加锁,代码复杂且容易出错。
ThreadLocal 既让每个线程拥有自己的“全局变量”,又不用手动传递参数,也不用加锁,可以看成线程专属的全局变量容器。
- ThreadLocal 是 Python
threading模块提供的一个特殊对象。 - 可以让每个线程都拥有一份独立的数据副本,互不干扰。
- 每个线程只能访问自己的那一份数据。
- 不需要加锁,线程安全。
- 可以为 ThreadLocal 绑定任意属性,如 user、db、request 等。
ThreadLocal 的本质是为每个线程维护一个独立的“字典”,每个线程对 ThreadLocal 的属性操作只影响自己。
这样,所有函数都可以直接通过 ThreadLocal 访问线程专属的数据,无需层层传递参数,也不用担心数据被其他线程篡改。
# 导入线程模块
import threading
# 创建一个全局的ThreadLocal对象
local_data = threading.local()
# 定义处理数据的函数
def process_data():
# 直接访问local_data的属性,相当于访问当前线程自己的数据
value = local_data.value
# 打印当前线程名称和读取到的值
print('线程 %s 读取到 value = %s' % (threading.current_thread().name, value))
# 定义线程任务函数
def thread_task(val):
# 给当前线程绑定一个value属性
local_data.value = val
# 调用处理数据函数
process_data()
# 创建两个线程,分别绑定不同的数据
# 创建第一个线程,目标函数为thread_task,参数为'苹果',线程名为'线程A'
t1 = threading.Thread(target=thread_task, args=('苹果',), name='线程A')
# 创建第二个线程,目标函数为thread_task,参数为'香蕉',线程名为'线程B'
t2 = threading.Thread(target=thread_task, args=('香蕉',), name='线程B')
t1.start()
t2.start()
t1.join()
t2.join()
# 创建ThreadLocal对象
user_local = threading.local()
def show_user():
# 直接读取当前线程的user属性
print('当前线程:%s,用户:%s' % (threading.current_thread().name, user_local.user))
def login(user_name):
# 给当前线程绑定user属性
user_local.user = user_name
show_user()
# 启动两个线程,分别绑定不同的用户
t1 = threading.Thread(target=login, args=('Alice',), name='登录线程1')
t2 = threading.Thread(target=login, args=('Bob',), name='登录线程2')
t1.start()
t2.start()
t1.join()
t2.join()配套代码:vsme/learn-python