Series Article

Python 基础课 Day6:生成器、异步编程与综合项目

Python 基础课:生成器、异步编程与综合项目

学习目标

今天结束后,你能够:

  • 理解迭代器和生成器的概念
  • 使用 yield 创建生成器函数
  • 实现流式输出(类似 ChatGPT)
  • 理解同步和异步的区别
  • 使用 async/await 编写异步代码
  • 使用 asyncio 进行并发编程
  • 完成一个完整的 AI 对话系统项目

迭代器与生成器

迭代器(Iterator)

迭代器是可以被 for 循环遍历的对象。

可迭代对象:

# 这些都是可迭代对象
numbers = [1, 2, 3]
text = "Hello"
items = {"a": 1, "b": 2}

for x in numbers:
    print(x)

迭代器协议:

class Counter:
    """计数器迭代器"""
    
    def __init__(self, max_count):
        self.max_count = max_count
        self.current = 0
    
    def __iter__(self):
        return self
    
    def __next__(self):
        if self.current >= self.max_count:
            raise StopIteration
        self.current += 1
        return self.current

# 使用
counter = Counter(5)
for num in counter:
    print(num)  # 1 2 3 4 5

生成器(Generator)

生成器是一种特殊的迭代器,用 yield 关键字创建。

基本语法:

def count_up_to(max_count):
    """生成器函数"""
    count = 1
    while count <= max_count:
        yield count  # 暂停并返回值
        count += 1

# 使用
for num in count_up_to(5):
    print(num)  # 1 2 3 4 5

yield vs return:

  • return:结束函数,返回值
  • yield:暂停函数,返回值,下次调用继续执行

生成器表达式:

# 列表推导式(一次性生成所有元素)
squares = [x**2 for x in range(1000000)]  # 占用大量内存

# 生成器表达式(按需生成)
squares = (x**2 for x in range(1000000))  # 几乎不占内存

# 使用
for square in squares:
    if square > 100:
        break
    print(square)

为什么使用生成器

内存效率:

# 不好:一次性加载所有行
def read_file_bad(filename):
    with open(filename, 'r') as f:
        return f.readlines()  # 整个文件加载到内存

# 好:按需读取
def read_file_good(filename):
    with open(filename, 'r') as f:
        for line in f:
            yield line.strip()

# 处理大文件
for line in read_file_good("large_file.txt"):
    process(line)  # 一次只处理一行

AI 场景示例

def token_stream(text, chunk_size=10):
    """模拟流式输出"""
    words = text.split()
    for i in range(0, len(words), chunk_size):
        chunk = " ".join(words[i:i+chunk_size])
        yield chunk

# 使用
text = "这是一段很长的文本内容,模拟 AI 的流式输出效果"
for chunk in token_stream(text, chunk_size=3):
    print(chunk)
    # 模拟延迟
    import time
    time.sleep(0.5)

输出(每0.5秒输出一部分):

这是一段很长的
文本内容,模拟 AI
的流式输出效果

课堂练习

文本分块生成器

实现一个生成器,将长文本按指定字符数分块。

参考答案:

def chunk_text(text, chunk_size=100):
    """将文本分块"""
    for i in range(0, len(text), chunk_size):
        yield text[i:i+chunk_size]

# 测试
long_text = "Python " * 50
for i, chunk in enumerate(chunk_text(long_text, 20), 1):
    print(f"块 {i}: {chunk}")

流式输出

流式输出是 AI 应用的常见需求,让用户看到内容逐步生成。

模拟流式输出

import time

def stream_response(text, delay=0.05):
    """模拟流式输出"""
    for char in text:
        yield char
        time.sleep(delay)

# 使用
response = "这是 AI 的回复内容"
print("AI: ", end="", flush=True)
for char in stream_response(response):
    print(char, end="", flush=True)
print()  # 换行

流式 API 响应处理

import requests

def stream_chat_response(prompt):
    """流式调用 AI API(模拟)"""
    # 实际场景中,这里会调用 OpenAI stream API
    response_text = "这是一段 AI 生成的回复内容,会逐字输出。"
    
    for word in response_text.split():
        yield word + " "
        time.sleep(0.1)

# 使用
print("AI: ", end="", flush=True)
for chunk in stream_chat_response("你好"):
    print(chunk, end="", flush=True)
print()

AI 场景示例

class StreamingChat:
    """流式对话类"""
    
    def __init__(self, model="gpt-3.5-turbo"):
        self.model = model
    
    def chat(self, message):
        """发送消息并流式返回"""
        # 模拟 API 调用
        response = f"收到您的消息:{message}。这是一段流式输出的回复内容。"
        
        for word in response.split():
            yield word + " "
    
    def chat_with_collect(self, message):
        """流式输出并收集完整响应"""
        full_response = []
        
        for chunk in self.chat(message):
            full_response.append(chunk)
            yield chunk
        
        # 可以在这里保存完整响应
        complete = "".join(full_response)
        return complete

# 使用
chat = StreamingChat()
print("用户: 你好")
print("AI: ", end="", flush=True)

for chunk in chat.chat("你好"):
    print(chunk, end="", flush=True)
    time.sleep(0.05)
print()

异步编程基础

同步 vs 异步

同步(Synchronous):

  • 一件事做完再做下一件
  • 等待 I/O 时阻塞

异步(Asynchronous):

  • 多件事并发进行
  • 等待 I/O 时可以做其他事

示例对比:

import time

# 同步:总共需要 6 秒
def sync_task():
    print("任务1开始")
    time.sleep(2)
    print("任务1完成")
    
    print("任务2开始")
    time.sleep(2)
    print("任务2完成")
    
    print("任务3开始")
    time.sleep(2)
    print("任务3完成")

# 异步:总共需要 2 秒(三个任务并发)
import asyncio

async def async_task(name, delay):
    print(f"{name}开始")
    await asyncio.sleep(delay)
    print(f"{name}完成")

async def async_main():
    await asyncio.gather(
        async_task("任务1", 2),
        async_task("任务2", 2),
        async_task("任务3", 2)
    )

# 运行
asyncio.run(async_main())

async/await 语法

定义异步函数:

async def fetch_data():
    """异步函数"""
    await asyncio.sleep(1)  # 模拟 I/O 操作
    return "数据"

调用异步函数:

# 错误:不能直接调用
result = fetch_data()  # 返回 coroutine 对象

# 正确:使用 await
async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())

asyncio 基础

并发执行多个任务:

import asyncio

async def fetch_user(user_id):
    """获取用户信息"""
    await asyncio.sleep(1)  # 模拟 API 调用
    return {"id": user_id, "name": f"User{user_id}"}

async def main():
    # 并发执行
    users = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3)
    )
    print(users)

asyncio.run(main())

使用 create_task:

async def main():
    # 创建任务
    task1 = asyncio.create_task(fetch_user(1))
    task2 = asyncio.create_task(fetch_user(2))
    
    # 等待完成
    user1 = await task1
    user2 = await task2
    
    print(user1, user2)

asyncio.run(main())

AI 场景示例

import asyncio
import aiohttp  # 异步 HTTP 库

class AsyncAIClient:
    """异步 AI 客户端"""
    
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1"
    
    async def chat(self, message, model="gpt-3.5-turbo"):
        """异步发送消息"""
        # 模拟 API 调用
        await asyncio.sleep(1)
        return f"回复: {message}"
    
    async def batch_chat(self, messages):
        """批量并发发送消息"""
        tasks = [self.chat(msg) for msg in messages]
        return await asyncio.gather(*tasks)

async def main():
    client = AsyncAIClient("sk-xxx")
    
    # 并发发送多个消息
    messages = ["你好", "天气如何", "再见"]
    responses = await client.batch_chat(messages)
    
    for msg, resp in zip(messages, responses):
        print(f"Q: {msg}")
        print(f"A: {resp}")
        print()

asyncio.run(main())

异步流式输出

结合生成器和异步,实现异步流式输出。

异步生成器

import asyncio

async def async_stream(text, delay=0.1):
    """异步流式输出"""
    for word in text.split():
        await asyncio.sleep(delay)
        yield word + " "

async def main():
    text = "这是异步流式输出的内容"
    print("输出: ", end="", flush=True)
    
    async for chunk in async_stream(text, 0.2):
        print(chunk, end="", flush=True)
    print()

asyncio.run(main())

AI 异步流式响应

import asyncio

class AsyncStreamingChat:
    """异步流式对话"""
    
    async def chat_stream(self, message):
        """流式返回响应"""
        response = f"收到消息:{message}。这是流式回复。"
        
        for word in response.split():
            await asyncio.sleep(0.1)  # 模拟延迟
            yield word + " "
    
    async def parallel_chat(self, messages):
        """并发处理多个对话"""
        async def collect_stream(message):
            chunks = []
            async for chunk in self.chat_stream(message):
                chunks.append(chunk)
            return "".join(chunks)
        
        tasks = [collect_stream(msg) for msg in messages]
        return await asyncio.gather(*tasks)

# 使用
async def main():
    chat = AsyncStreamingChat()
    
    # 流式输出单个响应
    print("单个对话:")
    async for chunk in chat.chat_stream("你好"):
        print(chunk, end="", flush=True)
    print("\n")
    
    # 并发处理多个对话
    print("并发对话:")
    messages = ["早上好", "天气如何"]
    responses = await chat.parallel_chat(messages)
    for msg, resp in zip(messages, responses):
        print(f"Q: {msg}\nA: {resp}\n")

asyncio.run(main())

综合项目:AI 对话系统

综合前面学习的所有知识,构建一个完整的 AI 对话系统。

项目需求

功能:

  1. 支持多轮对话
  2. 消息历史管理
  3. 流式输出
  4. 配置管理
  5. 日志记录
  6. 数据持久化
  7. 异常处理

项目结构:

ai_chat_system/
├── config.py          # 配置管理
├── models.py          # 数据模型
├── chat.py            # 对话逻辑
├── storage.py         # 数据存储
├── utils.py           # 工具函数
├── main.py            # 主程序
├── .env               # 环境变量
└── requirements.txt   # 依赖

数据模型(models.py)

from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime

class Message(BaseModel):
    """消息模型"""
    role: str = Field(..., pattern="^(system|user|assistant)$")
    content: str = Field(..., min_length=1)
    timestamp: datetime = Field(default_factory=datetime.now)
    
    def token_count(self) -> int:
        return len(self.content) // 4

class ChatConfig(BaseModel):
    """对话配置"""
    model: str = "gpt-3.5-turbo"
    temperature: float = Field(0.7, ge=0, le=2)
    max_tokens: int = Field(2000, gt=0)
    stream: bool = True

class ChatSession(BaseModel):
    """对话会话"""
    session_id: str
    messages: List[Message] = []
    config: ChatConfig
    created_at: datetime = Field(default_factory=datetime.now)
    
    def add_message(self, role: str, content: str):
        msg = Message(role=role, content=content)
        self.messages.append(msg)
    
    def total_tokens(self) -> int:
        return sum(msg.token_count() for msg in self.messages)

配置管理(config.py)

import os
from dotenv import load_dotenv
from pydantic import BaseModel

load_dotenv()

class AppConfig(BaseModel):
    """应用配置"""
    api_key: str = os.getenv("OPENAI_API_KEY", "")
    model: str = os.getenv("MODEL", "gpt-3.5-turbo")
    temperature: float = float(os.getenv("TEMPERATURE", "0.7"))
    max_tokens: int = int(os.getenv("MAX_TOKENS", "2000"))
    
    data_dir: str = "data"
    log_file: str = "app.log"
    
    def validate(self) -> bool:
        if not self.api_key:
            print("错误: API Key 未设置")
            return False
        return True

config = AppConfig()

数据存储(storage.py)

import json
from pathlib import Path
from typing import Optional
from models import ChatSession

class Storage:
    """数据存储"""
    
    def __init__(self, data_dir: str = "data"):
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(exist_ok=True)
    
    def save_session(self, session: ChatSession):
        """保存会话"""
        filename = self.data_dir / f"{session.session_id}.json"
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(session.model_dump(), f, ensure_ascii=False, indent=2, default=str)
    
    def load_session(self, session_id: str) -> Optional[ChatSession]:
        """加载会话"""
        filename = self.data_dir / f"{session_id}.json"
        try:
            with open(filename, "r", encoding="utf-8") as f:
                data = json.load(f)
            return ChatSession(**data)
        except FileNotFoundError:
            return None
    
    def list_sessions(self) -> list:
        """列出所有会话"""
        return [f.stem for f in self.data_dir.glob("*.json")]

对话逻辑(chat.py)

import time
from typing import Generator
from models import ChatSession, ChatConfig
from config import config

class ChatClient:
    """对话客户端"""
    
    def __init__(self, chat_config: ChatConfig):
        self.config = chat_config
    
    def send_message(self, session: ChatSession, message: str) -> str:
        """发送消息(同步)"""
        session.add_message("user", message)
        
        # 模拟 API 调用
        response = f"这是对 '{message}' 的回复"
        session.add_message("assistant", response)
        
        return response
    
    def send_message_stream(self, session: ChatSession, message: str) -> Generator[str, None, None]:
        """发送消息(流式)"""
        session.add_message("user", message)
        
        # 模拟流式响应
        response = f"这是对 '{message}' 的流式回复内容"
        full_response = []
        
        for word in response.split():
            chunk = word + " "
            full_response.append(chunk)
            yield chunk
            time.sleep(0.1)
        
        # 保存完整响应
        session.add_message("assistant", "".join(full_response).strip())

工具函数(utils.py)

import logging
from datetime import datetime

def setup_logging(log_file: str = "app.log"):
    """配置日志"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler()
        ]
    )

def generate_session_id() -> str:
    """生成会话ID"""
    return datetime.now().strftime("%Y%m%d_%H%M%S")

主程序(main.py)

import logging
from models import ChatSession, ChatConfig
from chat import ChatClient
from storage import Storage
from utils import setup_logging, generate_session_id
from config import config

def main():
    """主程序"""
    # 配置日志
    setup_logging(config.log_file)
    
    # 验证配置
    if not config.validate():
        return
    
    # 初始化
    storage = Storage(config.data_dir)
    chat_config = ChatConfig(
        model=config.model,
        temperature=config.temperature,
        max_tokens=config.max_tokens
    )
    client = ChatClient(chat_config)
    
    # 创建会话
    session_id = generate_session_id()
    session = ChatSession(session_id=session_id, config=chat_config)
    session.add_message("system", "你是一个Python助手")
    
    logging.info(f"会话开始: {session_id}")
    print(f"=== AI 对话系统 ===")
    print(f"会话ID: {session_id}")
    print("输入 'exit' 退出,'save' 保存会话\n")
    
    # 对话循环
    try:
        while True:
            user_input = input("你: ").strip()
            
            if not user_input:
                continue
            
            if user_input.lower() == "exit":
                break
            
            if user_input.lower() == "save":
                storage.save_session(session)
                print("会话已保存\n")
                continue
            
            # 流式输出
            print("AI: ", end="", flush=True)
            for chunk in client.send_message_stream(session, user_input):
                print(chunk, end="", flush=True)
            print("\n")
            
            logging.info(f"用户: {user_input}")
            logging.info(f"Token 数: {session.total_tokens()}")
    
    except KeyboardInterrupt:
        print("\n\n对话中断")
    
    finally:
        # 保存会话
        storage.save_session(session)
        logging.info(f"会话结束,总 tokens: {session.total_tokens()}")
        print(f"\n会话已保存: {session_id}")
        print(f"总消息数: {len(session.messages)}")
        print(f"总 tokens: {session.total_tokens()}")

if __name__ == "__main__":
    main()

requirements.txt

pydantic>=2.0.0
python-dotenv>=1.0.0
requests>=2.31.0

运行项目

# 安装依赖
pip install -r requirements.txt

# 创建 .env 文件
echo "OPENAI_API_KEY=sk-your-key-here" > .env

# 运行
python main.py

项目扩展建议

1. 真实 API 集成

import requests

class RealChatClient(ChatClient):
    """真实 API 客户端"""
    
    def send_message_stream(self, session: ChatSession, message: str):
        session.add_message("user", message)
        
        url = "https://api.openai.com/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {config.api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": self.config.model,
            "messages": [msg.model_dump() for msg in session.messages],
            "temperature": self.config.temperature,
            "stream": True
        }
        
        response = requests.post(url, headers=headers, json=data, stream=True)
        full_response = []
        
        for line in response.iter_lines():
            if line:
                # 解析 SSE 格式
                chunk = parse_sse(line)
                if chunk:
                    full_response.append(chunk)
                    yield chunk
        
        session.add_message("assistant", "".join(full_response))

2. 异步版本

import asyncio
import aiohttp

class AsyncChatClient:
    """异步对话客户端"""
    
    async def send_message_stream(self, session: ChatSession, message: str):
        session.add_message("user", message)
        
        async with aiohttp.ClientSession() as http_session:
            async with http_session.post(url, headers=headers, json=data) as response:
                full_response = []
                
                async for line in response.content:
                    chunk = parse_sse(line)
                    if chunk:
                        full_response.append(chunk)
                        yield chunk
                
                session.add_message("assistant", "".join(full_response))

3. 多模型支持

class MultiModelClient:
    """多模型客户端"""
    
    def __init__(self):
        self.clients = {
            "openai": OpenAIClient(),
            "claude": ClaudeClient(),
            "gemini": GeminiClient()
        }
    
    def send_message(self, provider: str, message: str):
        client = self.clients.get(provider)
        if not client:
            raise ValueError(f"不支持的模型: {provider}")
        return client.send_message(message)

今日总结

核心知识点回顾

1. 迭代器

  • __iter____next__ 方法
  • 实现自定义迭代器

2. 生成器

  • yield 关键字
  • 生成器表达式
  • 内存效率高

3. 流式输出

  • 逐步返回结果
  • 提升用户体验

4. 异步编程

  • async/await 语法
  • asyncio 模块
  • 并发执行任务

5. 异步生成器

  • async def + yield
  • async for 循环

6. 综合项目

  • 模块化设计
  • 数据模型(Pydantic)
  • 配置管理
  • 数据持久化

常见问题

Q:生成器和列表推导式的区别? A:列表推导式一次生成所有元素,生成器按需生成,节省内存。

Q:什么时候用异步? A:I/O 密集型操作(网络请求、文件读写、数据库查询)。

Q:async def 和 def 的区别? A:async def 定义异步函数,必须用 await 调用;def 定义同步函数。

Q:如何选择同步还是异步? A:计算密集型用同步;I/O 密集型用异步;简单脚本用同步。

Q:综合项目如何组织代码? A:按功能分模块(models, config, storage, chat, utils, main)。

重点注意事项

  1. 生成器只能遍历一次

    gen = (x for x in range(5))
    list(gen)  # [0, 1, 2, 3, 4]
    list(gen)  # [] 已耗尽
  2. 异步函数必须在异步上下文调用

    # 错误
    result = await async_func()  # 在同步函数中不能用 await
    
    # 正确
    async def main():
        result = await async_func()
    asyncio.run(main())
  3. 不要混用同步和异步 I/O

    # 错误:异步函数中用同步 I/O
    async def bad():
        time.sleep(1)  # 阻塞整个事件循环
    
    # 正确
    async def good():
        await asyncio.sleep(1)
  4. 项目配置不要硬编码:使用 .env 文件和环境变量

  5. 数据模型用 Pydantic:自动验证和类型检查

课程总结

6天学习路径:

  • Day1:环境、语法、变量、字符串、条件
  • Day2:循环、列表、字典、元组、集合
  • Day3:函数、模块、lambda、JSON
  • Day4:文件、异常、日志、CSV
  • Day5:OOP、类型提示、HTTP、环境变量
  • Day6:生成器、异步、综合项目

后续学习方向:

  1. AI 框架:LangChain、LlamaIndex
  2. Web 开发:FastAPI、Streamlit
  3. 数据处理:Pandas、NumPy
  4. 深度学习:PyTorch、TensorFlow
  5. Agent 开发:AutoGPT、AgentGPT

恭喜你完成 Python 基础课程,现在可以开始 AI 智能体开发了!