Python 基础课:生成器、异步编程与综合项目
学习目标
今天结束后,你能够:
- 理解迭代器和生成器的概念
- 使用 yield 创建生成器函数
- 实现流式输出(类似 ChatGPT)
- 理解同步和异步的区别
- 使用 async/await 编写异步代码
- 使用 asyncio 进行并发编程
- 完成一个完整的 AI 对话系统项目
迭代器与生成器
迭代器(Iterator)
迭代器是可以被 for 循环遍历的对象。
可迭代对象:
# 这些都是可迭代对象
numbers = [1, 2, 3]
text = "Hello"
items = {"a": 1, "b": 2}
for x in numbers:
print(x)
迭代器协议:
class Counter:
"""计数器迭代器"""
def __init__(self, max_count):
self.max_count = max_count
self.current = 0
def __iter__(self):
return self
def __next__(self):
if self.current >= self.max_count:
raise StopIteration
self.current += 1
return self.current
# 使用
counter = Counter(5)
for num in counter:
print(num) # 1 2 3 4 5
生成器(Generator)
生成器是一种特殊的迭代器,用 yield 关键字创建。
基本语法:
def count_up_to(max_count):
"""生成器函数"""
count = 1
while count <= max_count:
yield count # 暂停并返回值
count += 1
# 使用
for num in count_up_to(5):
print(num) # 1 2 3 4 5
yield vs return:
- return:结束函数,返回值
- yield:暂停函数,返回值,下次调用继续执行
生成器表达式:
# 列表推导式(一次性生成所有元素)
squares = [x**2 for x in range(1000000)] # 占用大量内存
# 生成器表达式(按需生成)
squares = (x**2 for x in range(1000000)) # 几乎不占内存
# 使用
for square in squares:
if square > 100:
break
print(square)
为什么使用生成器
内存效率:
# 不好:一次性加载所有行
def read_file_bad(filename):
with open(filename, 'r') as f:
return f.readlines() # 整个文件加载到内存
# 好:按需读取
def read_file_good(filename):
with open(filename, 'r') as f:
for line in f:
yield line.strip()
# 处理大文件
for line in read_file_good("large_file.txt"):
process(line) # 一次只处理一行
AI 场景示例
def token_stream(text, chunk_size=10):
"""模拟流式输出"""
words = text.split()
for i in range(0, len(words), chunk_size):
chunk = " ".join(words[i:i+chunk_size])
yield chunk
# 使用
text = "这是一段很长的文本内容,模拟 AI 的流式输出效果"
for chunk in token_stream(text, chunk_size=3):
print(chunk)
# 模拟延迟
import time
time.sleep(0.5)
输出(每0.5秒输出一部分):
这是一段很长的
文本内容,模拟 AI
的流式输出效果
课堂练习
文本分块生成器
实现一个生成器,将长文本按指定字符数分块。
参考答案:
def chunk_text(text, chunk_size=100):
"""将文本分块"""
for i in range(0, len(text), chunk_size):
yield text[i:i+chunk_size]
# 测试
long_text = "Python " * 50
for i, chunk in enumerate(chunk_text(long_text, 20), 1):
print(f"块 {i}: {chunk}")
流式输出
流式输出是 AI 应用的常见需求,让用户看到内容逐步生成。
模拟流式输出
import time
def stream_response(text, delay=0.05):
"""模拟流式输出"""
for char in text:
yield char
time.sleep(delay)
# 使用
response = "这是 AI 的回复内容"
print("AI: ", end="", flush=True)
for char in stream_response(response):
print(char, end="", flush=True)
print() # 换行
流式 API 响应处理
import requests
def stream_chat_response(prompt):
"""流式调用 AI API(模拟)"""
# 实际场景中,这里会调用 OpenAI stream API
response_text = "这是一段 AI 生成的回复内容,会逐字输出。"
for word in response_text.split():
yield word + " "
time.sleep(0.1)
# 使用
print("AI: ", end="", flush=True)
for chunk in stream_chat_response("你好"):
print(chunk, end="", flush=True)
print()
AI 场景示例
class StreamingChat:
"""流式对话类"""
def __init__(self, model="gpt-3.5-turbo"):
self.model = model
def chat(self, message):
"""发送消息并流式返回"""
# 模拟 API 调用
response = f"收到您的消息:{message}。这是一段流式输出的回复内容。"
for word in response.split():
yield word + " "
def chat_with_collect(self, message):
"""流式输出并收集完整响应"""
full_response = []
for chunk in self.chat(message):
full_response.append(chunk)
yield chunk
# 可以在这里保存完整响应
complete = "".join(full_response)
return complete
# 使用
chat = StreamingChat()
print("用户: 你好")
print("AI: ", end="", flush=True)
for chunk in chat.chat("你好"):
print(chunk, end="", flush=True)
time.sleep(0.05)
print()
异步编程基础
同步 vs 异步
同步(Synchronous):
- 一件事做完再做下一件
- 等待 I/O 时阻塞
异步(Asynchronous):
- 多件事并发进行
- 等待 I/O 时可以做其他事
示例对比:
import time
# 同步:总共需要 6 秒
def sync_task():
print("任务1开始")
time.sleep(2)
print("任务1完成")
print("任务2开始")
time.sleep(2)
print("任务2完成")
print("任务3开始")
time.sleep(2)
print("任务3完成")
# 异步:总共需要 2 秒(三个任务并发)
import asyncio
async def async_task(name, delay):
print(f"{name}开始")
await asyncio.sleep(delay)
print(f"{name}完成")
async def async_main():
await asyncio.gather(
async_task("任务1", 2),
async_task("任务2", 2),
async_task("任务3", 2)
)
# 运行
asyncio.run(async_main())
async/await 语法
定义异步函数:
async def fetch_data():
"""异步函数"""
await asyncio.sleep(1) # 模拟 I/O 操作
return "数据"
调用异步函数:
# 错误:不能直接调用
result = fetch_data() # 返回 coroutine 对象
# 正确:使用 await
async def main():
result = await fetch_data()
print(result)
asyncio.run(main())
asyncio 基础
并发执行多个任务:
import asyncio
async def fetch_user(user_id):
"""获取用户信息"""
await asyncio.sleep(1) # 模拟 API 调用
return {"id": user_id, "name": f"User{user_id}"}
async def main():
# 并发执行
users = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3)
)
print(users)
asyncio.run(main())
使用 create_task:
async def main():
# 创建任务
task1 = asyncio.create_task(fetch_user(1))
task2 = asyncio.create_task(fetch_user(2))
# 等待完成
user1 = await task1
user2 = await task2
print(user1, user2)
asyncio.run(main())
AI 场景示例
import asyncio
import aiohttp # 异步 HTTP 库
class AsyncAIClient:
"""异步 AI 客户端"""
def __init__(self, api_key):
self.api_key = api_key
self.base_url = "https://api.openai.com/v1"
async def chat(self, message, model="gpt-3.5-turbo"):
"""异步发送消息"""
# 模拟 API 调用
await asyncio.sleep(1)
return f"回复: {message}"
async def batch_chat(self, messages):
"""批量并发发送消息"""
tasks = [self.chat(msg) for msg in messages]
return await asyncio.gather(*tasks)
async def main():
client = AsyncAIClient("sk-xxx")
# 并发发送多个消息
messages = ["你好", "天气如何", "再见"]
responses = await client.batch_chat(messages)
for msg, resp in zip(messages, responses):
print(f"Q: {msg}")
print(f"A: {resp}")
print()
asyncio.run(main())
异步流式输出
结合生成器和异步,实现异步流式输出。
异步生成器
import asyncio
async def async_stream(text, delay=0.1):
"""异步流式输出"""
for word in text.split():
await asyncio.sleep(delay)
yield word + " "
async def main():
text = "这是异步流式输出的内容"
print("输出: ", end="", flush=True)
async for chunk in async_stream(text, 0.2):
print(chunk, end="", flush=True)
print()
asyncio.run(main())
AI 异步流式响应
import asyncio
class AsyncStreamingChat:
"""异步流式对话"""
async def chat_stream(self, message):
"""流式返回响应"""
response = f"收到消息:{message}。这是流式回复。"
for word in response.split():
await asyncio.sleep(0.1) # 模拟延迟
yield word + " "
async def parallel_chat(self, messages):
"""并发处理多个对话"""
async def collect_stream(message):
chunks = []
async for chunk in self.chat_stream(message):
chunks.append(chunk)
return "".join(chunks)
tasks = [collect_stream(msg) for msg in messages]
return await asyncio.gather(*tasks)
# 使用
async def main():
chat = AsyncStreamingChat()
# 流式输出单个响应
print("单个对话:")
async for chunk in chat.chat_stream("你好"):
print(chunk, end="", flush=True)
print("\n")
# 并发处理多个对话
print("并发对话:")
messages = ["早上好", "天气如何"]
responses = await chat.parallel_chat(messages)
for msg, resp in zip(messages, responses):
print(f"Q: {msg}\nA: {resp}\n")
asyncio.run(main())
综合项目:AI 对话系统
综合前面学习的所有知识,构建一个完整的 AI 对话系统。
项目需求
功能:
- 支持多轮对话
- 消息历史管理
- 流式输出
- 配置管理
- 日志记录
- 数据持久化
- 异常处理
项目结构:
ai_chat_system/
├── config.py # 配置管理
├── models.py # 数据模型
├── chat.py # 对话逻辑
├── storage.py # 数据存储
├── utils.py # 工具函数
├── main.py # 主程序
├── .env # 环境变量
└── requirements.txt # 依赖
数据模型(models.py)
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
class Message(BaseModel):
"""消息模型"""
role: str = Field(..., pattern="^(system|user|assistant)$")
content: str = Field(..., min_length=1)
timestamp: datetime = Field(default_factory=datetime.now)
def token_count(self) -> int:
return len(self.content) // 4
class ChatConfig(BaseModel):
"""对话配置"""
model: str = "gpt-3.5-turbo"
temperature: float = Field(0.7, ge=0, le=2)
max_tokens: int = Field(2000, gt=0)
stream: bool = True
class ChatSession(BaseModel):
"""对话会话"""
session_id: str
messages: List[Message] = []
config: ChatConfig
created_at: datetime = Field(default_factory=datetime.now)
def add_message(self, role: str, content: str):
msg = Message(role=role, content=content)
self.messages.append(msg)
def total_tokens(self) -> int:
return sum(msg.token_count() for msg in self.messages)
配置管理(config.py)
import os
from dotenv import load_dotenv
from pydantic import BaseModel
load_dotenv()
class AppConfig(BaseModel):
"""应用配置"""
api_key: str = os.getenv("OPENAI_API_KEY", "")
model: str = os.getenv("MODEL", "gpt-3.5-turbo")
temperature: float = float(os.getenv("TEMPERATURE", "0.7"))
max_tokens: int = int(os.getenv("MAX_TOKENS", "2000"))
data_dir: str = "data"
log_file: str = "app.log"
def validate(self) -> bool:
if not self.api_key:
print("错误: API Key 未设置")
return False
return True
config = AppConfig()
数据存储(storage.py)
import json
from pathlib import Path
from typing import Optional
from models import ChatSession
class Storage:
"""数据存储"""
def __init__(self, data_dir: str = "data"):
self.data_dir = Path(data_dir)
self.data_dir.mkdir(exist_ok=True)
def save_session(self, session: ChatSession):
"""保存会话"""
filename = self.data_dir / f"{session.session_id}.json"
with open(filename, "w", encoding="utf-8") as f:
json.dump(session.model_dump(), f, ensure_ascii=False, indent=2, default=str)
def load_session(self, session_id: str) -> Optional[ChatSession]:
"""加载会话"""
filename = self.data_dir / f"{session_id}.json"
try:
with open(filename, "r", encoding="utf-8") as f:
data = json.load(f)
return ChatSession(**data)
except FileNotFoundError:
return None
def list_sessions(self) -> list:
"""列出所有会话"""
return [f.stem for f in self.data_dir.glob("*.json")]
对话逻辑(chat.py)
import time
from typing import Generator
from models import ChatSession, ChatConfig
from config import config
class ChatClient:
"""对话客户端"""
def __init__(self, chat_config: ChatConfig):
self.config = chat_config
def send_message(self, session: ChatSession, message: str) -> str:
"""发送消息(同步)"""
session.add_message("user", message)
# 模拟 API 调用
response = f"这是对 '{message}' 的回复"
session.add_message("assistant", response)
return response
def send_message_stream(self, session: ChatSession, message: str) -> Generator[str, None, None]:
"""发送消息(流式)"""
session.add_message("user", message)
# 模拟流式响应
response = f"这是对 '{message}' 的流式回复内容"
full_response = []
for word in response.split():
chunk = word + " "
full_response.append(chunk)
yield chunk
time.sleep(0.1)
# 保存完整响应
session.add_message("assistant", "".join(full_response).strip())
工具函数(utils.py)
import logging
from datetime import datetime
def setup_logging(log_file: str = "app.log"):
"""配置日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler()
]
)
def generate_session_id() -> str:
"""生成会话ID"""
return datetime.now().strftime("%Y%m%d_%H%M%S")
主程序(main.py)
import logging
from models import ChatSession, ChatConfig
from chat import ChatClient
from storage import Storage
from utils import setup_logging, generate_session_id
from config import config
def main():
"""主程序"""
# 配置日志
setup_logging(config.log_file)
# 验证配置
if not config.validate():
return
# 初始化
storage = Storage(config.data_dir)
chat_config = ChatConfig(
model=config.model,
temperature=config.temperature,
max_tokens=config.max_tokens
)
client = ChatClient(chat_config)
# 创建会话
session_id = generate_session_id()
session = ChatSession(session_id=session_id, config=chat_config)
session.add_message("system", "你是一个Python助手")
logging.info(f"会话开始: {session_id}")
print(f"=== AI 对话系统 ===")
print(f"会话ID: {session_id}")
print("输入 'exit' 退出,'save' 保存会话\n")
# 对话循环
try:
while True:
user_input = input("你: ").strip()
if not user_input:
continue
if user_input.lower() == "exit":
break
if user_input.lower() == "save":
storage.save_session(session)
print("会话已保存\n")
continue
# 流式输出
print("AI: ", end="", flush=True)
for chunk in client.send_message_stream(session, user_input):
print(chunk, end="", flush=True)
print("\n")
logging.info(f"用户: {user_input}")
logging.info(f"Token 数: {session.total_tokens()}")
except KeyboardInterrupt:
print("\n\n对话中断")
finally:
# 保存会话
storage.save_session(session)
logging.info(f"会话结束,总 tokens: {session.total_tokens()}")
print(f"\n会话已保存: {session_id}")
print(f"总消息数: {len(session.messages)}")
print(f"总 tokens: {session.total_tokens()}")
if __name__ == "__main__":
main()
requirements.txt
pydantic>=2.0.0
python-dotenv>=1.0.0
requests>=2.31.0
运行项目
# 安装依赖
pip install -r requirements.txt
# 创建 .env 文件
echo "OPENAI_API_KEY=sk-your-key-here" > .env
# 运行
python main.py
项目扩展建议
1. 真实 API 集成
import requests
class RealChatClient(ChatClient):
"""真实 API 客户端"""
def send_message_stream(self, session: ChatSession, message: str):
session.add_message("user", message)
url = "https://api.openai.com/v1/chat/completions"
headers = {
"Authorization": f"Bearer {config.api_key}",
"Content-Type": "application/json"
}
data = {
"model": self.config.model,
"messages": [msg.model_dump() for msg in session.messages],
"temperature": self.config.temperature,
"stream": True
}
response = requests.post(url, headers=headers, json=data, stream=True)
full_response = []
for line in response.iter_lines():
if line:
# 解析 SSE 格式
chunk = parse_sse(line)
if chunk:
full_response.append(chunk)
yield chunk
session.add_message("assistant", "".join(full_response))
2. 异步版本
import asyncio
import aiohttp
class AsyncChatClient:
"""异步对话客户端"""
async def send_message_stream(self, session: ChatSession, message: str):
session.add_message("user", message)
async with aiohttp.ClientSession() as http_session:
async with http_session.post(url, headers=headers, json=data) as response:
full_response = []
async for line in response.content:
chunk = parse_sse(line)
if chunk:
full_response.append(chunk)
yield chunk
session.add_message("assistant", "".join(full_response))
3. 多模型支持
class MultiModelClient:
"""多模型客户端"""
def __init__(self):
self.clients = {
"openai": OpenAIClient(),
"claude": ClaudeClient(),
"gemini": GeminiClient()
}
def send_message(self, provider: str, message: str):
client = self.clients.get(provider)
if not client:
raise ValueError(f"不支持的模型: {provider}")
return client.send_message(message)
今日总结
核心知识点回顾
1. 迭代器
__iter__和__next__方法- 实现自定义迭代器
2. 生成器
- yield 关键字
- 生成器表达式
- 内存效率高
3. 流式输出
- 逐步返回结果
- 提升用户体验
4. 异步编程
- async/await 语法
- asyncio 模块
- 并发执行任务
5. 异步生成器
- async def + yield
- async for 循环
6. 综合项目
- 模块化设计
- 数据模型(Pydantic)
- 配置管理
- 数据持久化
常见问题
Q:生成器和列表推导式的区别? A:列表推导式一次生成所有元素,生成器按需生成,节省内存。
Q:什么时候用异步? A:I/O 密集型操作(网络请求、文件读写、数据库查询)。
Q:async def 和 def 的区别? A:async def 定义异步函数,必须用 await 调用;def 定义同步函数。
Q:如何选择同步还是异步? A:计算密集型用同步;I/O 密集型用异步;简单脚本用同步。
Q:综合项目如何组织代码? A:按功能分模块(models, config, storage, chat, utils, main)。
重点注意事项
-
生成器只能遍历一次:
gen = (x for x in range(5)) list(gen) # [0, 1, 2, 3, 4] list(gen) # [] 已耗尽 -
异步函数必须在异步上下文调用:
# 错误 result = await async_func() # 在同步函数中不能用 await # 正确 async def main(): result = await async_func() asyncio.run(main()) -
不要混用同步和异步 I/O:
# 错误:异步函数中用同步 I/O async def bad(): time.sleep(1) # 阻塞整个事件循环 # 正确 async def good(): await asyncio.sleep(1) -
项目配置不要硬编码:使用 .env 文件和环境变量
-
数据模型用 Pydantic:自动验证和类型检查
课程总结
6天学习路径:
- Day1:环境、语法、变量、字符串、条件
- Day2:循环、列表、字典、元组、集合
- Day3:函数、模块、lambda、JSON
- Day4:文件、异常、日志、CSV
- Day5:OOP、类型提示、HTTP、环境变量
- Day6:生成器、异步、综合项目
后续学习方向:
- AI 框架:LangChain、LlamaIndex
- Web 开发:FastAPI、Streamlit
- 数据处理:Pandas、NumPy
- 深度学习:PyTorch、TensorFlow
- Agent 开发:AutoGPT、AgentGPT
恭喜你完成 Python 基础课程,现在可以开始 AI 智能体开发了!