返回首页

SaaS产品从零到一:3天搭建可收费的AI产品

产品开发变现 — 从零到一完整指南

🎯 课程目标:用 + React 全栈技术,从零搭建一个可盈利的 SaaS 产品,涵盖商业模式、技术实现、定价策略和获客方法。完成本课程后,你将拥有一个完整的 SaaS Starter Kit,可直接用于产品化。


第一章:理解 SaaS 商业模式

1.1 什么是 SaaS

SaaS(Software as a Service,软件即服务)是一种通过互联网交付软件的商业模式。用户无需购买和安装软件,而是通过订阅的方式按需使用。你每天使用的 Notion、Slack、Figma 都是 SaaS 产品。

SaaS 的核心优势:

  • 经常性收入:用户按月/年付费,收入可预测
  • 规模化边际成本低:服务 1 个用户和服务 10000 个用户,基础设施成本增长远低于收入增长
  • 持续迭代:可以不断更新产品,用户自动获得最新版本
  • 数据驱动:可以精确追踪用户行为,优化产品和转化

1.2 核心财务指标

理解以下四个指标,是做 SaaS 的基础:

MRR(Monthly Recurring ,月经常性收入)

MRR 是你每个月可以预期的固定收入。计算方式:

MRR = 付费用户数 × 平均每用户月付费

例如:你有 100 个付费用户,每人每月付 29 元,MRR = 2900 元。

ARR(Annual Recurring Revenue,年经常性收入)

ARR = MRR × 12

ARR = 2900 × 12 = 34800 元/年。

LTV(Lifetime Value,用户生命周期价值)

LTV 表示一个用户在整个付费周期内为你贡献的总收入:

LTV = ARPU × 平均用户生命周期(月)
     = ARPU / 月流失率

例如:ARPU = 50 元/月,月流失率 = 5%,则 LTV = 50 / 0.05 = 1000 元。

CAC(Customer Cost,获客成本)

CAC = 总营销和销售支出 / 新增付费用户数

例如:你花了 5000 元做广告,获得了 50 个付费用户,CAC = 100 元。

黄金法则:LTV / CAC ≥ 3

如果 LTV / CAC 小于 3,说明你的获客成本太高,需要优化。如果大于 3,说明你有空间投入更多资金获客。

1.3 SaaS 增长飞轮

获客 → 激活 → 留存 → 变现 → 推荐 → 获客(循环)

关键指标:

  • Activation Rate:注册后完成核心动作的用户比例
  • Churn Rate(流失率):每月取消订阅的用户比例
  • Net Revenue Retention(净收入留存率):考虑升级和降级后的收入变化
  • Payback Period(回收期):收回 CAC 所需的月数

第二章:技术栈选择

2.1 为什么选 Python + React + PostgreSQL

技术 理由
前端 React + Next.js 生态最大、组件库丰富、SEO 友好
后端 Python FastAPI 异步高性能、自动 文档、类型安全
数据库 PostgreSQL 可靠、支持 JSON、扩展性强
缓存 Redis 会话管理、限流、队列
部署 Docker + 标准化部署、易于扩展
支付 Stripe / 支付宝 国内外覆盖

2.2 项目目录结构

saas-starter/
├── backend/
│   ├── app/
│   │   ├── __init__.py
│   │   ├── main.py              # FastAPI 入口
│   │   ├── config.py            # 配置管理
│   │   ├── .py          # 数据库连接
│   │   ├── models/              # SQLAlchemy 模型
│   │   │   ├── __init__.py
│   │   │   ├── user.py
│   │   │   └── subscription.py
│   │   ├── schemas/             # Pydantic 模型
│   │   │   ├── __init__.py
│   │   │   ├── user.py
│   │   │   └── subscription.py
│   │   ├── api/                 # API 路由
│   │   │   ├── __init__.py
│   │   │   ├── auth.py
│   │   │   ├── users.py
│   │   │   ├── billing.py
│   │   │   └── dashboard.py
│   │   ├── services/            # 业务逻辑
│   │   │   ├── __init__.py
│   │   │   ├── auth_service.py
│   │   │   └── billing_service.py
│   │   └── middleware/          # 中间件
│   │       ├── __init__.py
│   │       └── rate_limit.py
│   ├── alembic/                 # 数据库迁移
│   ├── requirements.txt
│   ├── Dockerfile
│   └── .env.example
├── frontend/
│   ├── src/
│   │   ├── components/
│   │   ├── pages/
│   │   ├── hooks/
│   │   └── lib/
│   ├── package.json
│   └── Dockerfile
├── docker-compose.yml
└── README.md

第三章:MVP 快速搭建 — 3天出原型

3.1 Day 1:后端核心

数据库模型(models/user.py)

# backend/app/models/user.py
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Boolean, DateTime, Enum as SAEnum
from sqlalchemy.dialects.postgresql import UUID
from app.database import Base
import enum


class SubscriptionTier(str, enum.Enum):
     = "free"
    PRO = "pro"
     = "enterprise"


class User(Base):
    __tablename__ = "users"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    email = Column(String(255), unique=True, nullable=False, index=True)
    hashed_password = Column(String(255), nullable=False)
    full_name = Column(String(255))
    is_active = Column(Boolean, default=True)
    is_verified = Column(Boolean, default=False)
    tier = Column(SAEnum(SubscriptionTier), default=SubscriptionTier.FREE)
    api_key = Column(String(64), unique=True, index=True)
    stripe_customer_id = Column(String(255))
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def __repr__(self):
        return f"<User {self.email}>"

数据库模型(models/subscription.py)

# backend/app/models/subscription.py
import uuid
from datetime import datetime
from sqlalchemy import Column, String, DateTime, ForeignKey, Integer, Boolean
from sqlalchemy.dialects.postgresql import UUID
from app.database import Base


class Subscription(Base):
    __tablename__ = "subscriptions"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
    stripe_subscription_id = Column(String(255), unique=True)
    stripe_price_id = Column(String(255))
    status = Column(String(50), default="active")  # active, canceled, past_due
    current_period_start = Column(DateTime)
    current_period_end = Column(DateTime)
    cancel_at_period_end = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)


class UsageRecord(Base):
    __tablename__ = "usage_records"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
    endpoint = Column(String(255))
    tokens_used = Column(Integer, default=0)
    cost_cents = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)

数据库连接(database.py)

# backend/app/database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.config import settings

engine = create_engine(
    settings.DATABASE_URL,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


def get_db():
    """FastAPI 依赖注入:获取数据库会话"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

配置管理(config.py)

# backend/app/config.py
from pydantic_settings import BaseSettings
from typing import Optional


class Settings(BaseSettings):
    # 应用
    APP_NAME: str = "SaaS Starter"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False
    SECRET_KEY: str = "change-me-in-production-use-openssl-rand-hex-32"

    # 数据库
    DATABASE_URL: str = "postgresql://postgres:postgres@localhost:5432/saas_db"

    # Redis
    REDIS_URL: str = "redis://localhost:6379/0"

    # JWT
    JWT_SECRET: str = "change-me-in-production"
    JWT_ALGORITHM: str = "HS256"
    JWT_EXPIRE_MINUTES: int = 30

    # Stripe
    STRIPE_SECRET_KEY: Optional[str] = None
    STRIPE_WEBHOOK_SECRET: Optional[str] = None
    STRIPE_PRO_PRICE_ID: Optional[str] = None
    STRIPE_ENTERPRISE_PRICE_ID: Optional[str] = None

    # 限流
    RATE_LIMIT_FREE: int = 100      # 免费用户每天 100 次
    RATE_LIMIT_PRO: int = 10000     # Pro 用户每天 10000 次
    RATE_LIMIT_ENTERPRISE: int = -1  # 企业用户不限

    class Config:
        env_file = ".env"


settings = Settings()

FastAPI 主入口(main.py)

# backend/app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api import auth, users, billing, dashboard
from app.config import settings
from app.database import engine, Base

# 创建数据库表
Base.metadata.create_all(bind=engine)

app = FastAPI(
    title=settings.APP_NAME,
    version=settings.APP_VERSION,
    description="SaaS Starter Kit - 完整的 SaaS 后端模板",
)

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 注册路由
app.include_router(auth., prefix="/api/auth", tags=["认证"])
app.include_router(users.router, prefix="/api/users", tags=["用户"])
app.include_router(billing.router, prefix="/api/billing", tags=["订阅计费"])
app.include_router(dashboard.router, prefix="/api/dashboard", tags=["仪表盘"])


@app.get("/")
def root():
    return {
        "name": settings.APP_NAME,
        "version": settings.APP_VERSION,
        "docs": "/docs",
    }


@app.get("/health")
def health():
    return {"status": "ok"}

3.2 认证系统(核心)

Pydantic 模型(schemas/user.py)

# backend/app/schemas/user.py
from pydantic import BaseModel, EmailStr
from typing import Optional
from uuid import UUID
from datetime import datetime


class UserCreate(BaseModel):
    email: EmailStr
    password: str
    full_name: Optional[str] = None


class UserLogin(BaseModel):
    email: EmailStr
    password: str


class UserResponse(BaseModel):
    id: UUID
    email: str
    full_name: Optional[str]
    is_active: bool
    tier: str
    api_key: Optional[str]
    created_at: datetime

    class Config:
        from_attributes = True


class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"
    user: UserResponse


class PasswordReset(BaseModel):
    email: EmailStr


class PasswordResetConfirm(BaseModel):
    token: str
    new_password: str

认证服务(services/auth_service.py)

# backend/app/services/auth_service.py
import secrets
from datetime import datetime, timedelta
from typing import Optional
from uuid import UUID

from jose import JWTError, jwt
from passlib. import CryptContext
from sqlalchemy.orm import Session

from app.config import settings
from app.models.user import User
from app.schemas.user import UserCreate

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)


def create_access_token(: dict, expires_delta: Optional[timedelta] = None) -> str:
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.JWT_EXPIRE_MINUTES))
    to_encode.({"exp": expire})
    return jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)


def decode_token(token: str) -> Optional[dict]:
    try:
        payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
        return payload
    except JWTError:
        return None


def generate_api_key() -> str:
    """生成唯一的 API Key"""
    return f"sk_{secrets.token_hex(24)}"


def create_user(db: Session, user_data: UserCreate) -> User:
    """创建新用户"""
    # 检查邮箱是否已注册
    existing = db.query(User).filter(User.email == user_data.email).first()
    if existing:
        raise ValueError("该邮箱已被注册")

    user = User(
        email=user_data.email,
        hashed_password=get_password_hash(user_data.password),
        full_name=user_data.full_name,
        api_key=generate_api_key(),
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
    """验证用户登录"""
    user = db.query(User).filter(User.email == email).first()
    if not user or not verify_password(password, user.hashed_password):
        return None
    return user


def get_user_by_id(db: Session, user_id: UUID) -> Optional[User]:
    return db.query(User).filter(User.id == user_id).first()


def get_user_by_api_key(db: Session, api_key: str) -> Optional[User]:
    return db.query(User).filter(User.api_key == api_key).first()

认证路由(api/auth.py)

# backend/app/api/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi. import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session

from app.database import get_db
from app.schemas.user import UserCreate, UserLogin, UserResponse, Token
from app.services.auth_service import (
    create_user,
    authenticate_user,
    create_access_token,
    decode_token,
    get_user_by_id,
)
from uuid import UUID

router = APIRouter()
security = HTTPBearer()


def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """从 JWT Token 获取当前用户"""
    payload = decode_token(credentials.credentials)
    if not payload:
        raise HTTPException(status_code=401, detail="无效的认证凭据")

    user_id = payload.get("sub")
    if not user_id:
        raise HTTPException(status_code=401, detail="无效的 Token")

    user = get_user_by_id(db, UUID(user_id))
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="用户不存在或已禁用")

    return user


@router.post("/register", response_model=Token, status_code=201)
def register(user_data: UserCreate, db: Session = Depends(get_db)):
    """用户注册"""
    try:
        user = create_user(db, user_data)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    access_token = create_access_token(data={"sub": str(user.id)})
    return Token(
        access_token=access_token,
        user=UserResponse.model_validate(user),
    )


@router.post("/login", response_model=Token)
def login(login_data: UserLogin, db: Session = Depends(get_db)):
    """用户登录"""
    user = authenticate_user(db, login_data.email, login_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="邮箱或密码错误")

    access_token = create_access_token(data={"sub": str(user.id)})
    return Token(
        access_token=access_token,
        user=UserResponse.model_validate(user),
    )


@router.get("/me", response_model=UserResponse)
def get_me(current_user=Depends(get_current_user)):
    """获取当前用户信息"""
    return UserResponse.model_validate(current_user)

3.3 Day 2:订阅计费系统

Pydantic 模型(schemas/subscription.py)

# backend/app/schemas/subscription.py
from pydantic import BaseModel
from typing import Optional
from uuid import UUID
from datetime import datetime


class CreateCheckout(BaseModel):
    price_id: str           # Stripe Price ID
    success_url: str = "https://yourdomain.com/billing/success"
    cancel_url: str = "https://yourdomain.com/billing/cancel"


class SubscriptionResponse(BaseModel):
    id: UUID
    status: str
    current_period_end: Optional[datetime]
    cancel_at_period_end: bool

    class Config:
        from_attributes = True


class CreatePortalSession(BaseModel):
    return_url: str = "https://yourdomain.com/dashboard"


class UsageResponse(BaseModel):
    total_calls: int
    total_tokens: int
    total_cost_cents: int
    period_start: datetime
    period_end: datetime

计费服务(services/billing_service.py)

# backend/app/services/billing_service.py
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy.orm import Session
from sqlalchemy import func

from app.config import settings
from app.models.user import User, SubscriptionTier
from app.models.subscription import Subscription, UsageRecord


class BillingService:
    """订阅计费服务"""

    # 定价配置
    PLANS = {
        "free": {
            "name": "Free",
            "price_monthly": 0,
            "api_calls": 100,
            "features": ["基础 API 访问", "社区支持"],
        },
        "pro": {
            "name": "Pro",
            "price_monthly": 29,
            "api_calls": 10000,
            "features": ["全部 API 访问", "优先支持", "高级分析"],
        },
        "enterprise": {
            "name": "Enterprise",
            "price_monthly": 99,
            "api_calls": -1,  # 无限制
            "features": ["全部 API 访问", "专属支持", "SLA 保障", "自定义集成"],
        },
    }

    @staticmethod
    def get_user_usage(db: Session, user_id, days: int = 30) -> dict:
        """获取用户最近 N 天的使用情况"""
        since = datetime.utcnow() - timedelta(days=days)
        result = (
            db.query(
                func.count(UsageRecord.id).label("total_calls"),
                func.coalesce(func.sum(UsageRecord.tokens_used), 0).label("total_tokens"),
                func.coalesce(func.sum(UsageRecord.cost_cents), 0).label("total_cost"),
            )
            .filter(UsageRecord.user_id == user_id, UsageRecord.created_at >= since)
            .first()
        )
        return {
            "total_calls": result.total_calls or 0,
            "total_tokens": int(result.total_tokens or 0),
            "total_cost_cents": int(result.total_cost or 0),
            "period_start": since,
            "period_end": datetime.utcnow(),
        }

    @staticmethod
    def check_rate_limit(db: Session, user: User) -> bool:
        """检查用户是否超出 API 调用限额"""
        plan = BillingService.PLANS.get(user.tier.value, BillingService.PLANS["free"])
        if plan["api_calls"] == -1:
            return True  # 无限制

        usage = BillingService.get_user_usage(db, user.id, days=1)
        return usage["total_calls"] < plan["api_calls"]

    @staticmethod
    def record_usage(db: Session, user_id, endpoint: str, : int = 0, cost: int = 0):
        """记录 API 使用"""
        record = UsageRecord(
            user_id=user_id,
            endpoint=endpoint,
            tokens_used=tokens,
            cost_cents=cost,
        )
        db.add(record)
        db.commit()

    @staticmethod
    def create_stripe_checkout(user: User, price_id: str):
        """创建 Stripe 支付会话"""
        if not settings.STRIPE_SECRET_KEY:
            raise ValueError("Stripe 未配置")

        import stripe
        stripe.api_key = settings.STRIPE_SECRET_KEY

        # 获取或创建 Stripe 客户
        if not user.stripe_customer_id:
            customer = stripe.Customer.create(
                email=user.email,
                metadata={"user_id": str(user.id)},
            )
            user.stripe_customer_id = customer.id
        else:
            customer = stripe.Customer.retrieve(user.stripe_customer_id)

        # 创建 Checkout Session
        session = stripe.checkout.Session.create(
            customer=customer.id,
            payment_method_types=["card"],
            line_items=[{"price": price_id, "quantity": 1}],
            mode="subscription",
            success_url="https://yourdomain.com/billing/success?session_id={CHECKOUT_SESSION_ID}",
            cancel_url="https://yourdomain.com/billing/cancel",
            metadata={"user_id": str(user.id)},
        )
        return session

    @staticmethod
    def create_portal_session(user: User, return_url: str):
        """创建 Stripe 客户门户(管理订阅)"""
        import stripe
        stripe.api_key = settings.STRIPE_SECRET_KEY

        session = stripe.billing_portal.Session.create(
            customer=user.stripe_customer_id,
            return_url=return_url,
        )
        return session


billing_service = BillingService()

计费路由(api/billing.py)

# backend/app/api/billing.py
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy.orm import Session

from app.database import get_db
from app.api.auth import get_current_user
from app.schemas.subscription import (
    CreateCheckout,
    SubscriptionResponse,
    CreatePortalSession,
    UsageResponse,
)
from app.services.billing_service import billing_service

router = APIRouter()


@router.get("/plans")
def get_plans():
    """获取所有定价方案"""
    return billing_service.PLANS


@router.post("/checkout")
def create_checkout(
    data: CreateCheckout,
    current_user=Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """创建 Stripe 支付会话"""
    try:
        session = billing_service.create_stripe_checkout(current_user, data.price_id)
        return {"checkout_url": session.url, "session_id": session.id}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@router.post("/portal")
def create_portal(
    data: CreatePortalSession,
    current_user=Depends(get_current_user),
):
    """创建客户门户(管理订阅)"""
    try:
        session = billing_service.create_portal_session(current_user, data.return_url)
        return {"portal_url": session.url}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@router.get("/usage", response_model=UsageResponse)
def get_usage(
    current_user=Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """获取当前用户的 API 使用情况"""
    usage = billing_service.get_user_usage(db, current_user.id)
    return UsageResponse(**usage)


@router.post("/webhook")
async def stripe_webhook(request: Request, db: Session = Depends(get_db)):
    """Stripe Webhook 回调"""
    import stripe
    from app.config import settings
    from app.models.user import User, SubscriptionTier
    from app.models.subscription import Subscription

    stripe.api_key = settings.STRIPE_SECRET_KEY
    payload = await request.body()
    sig_header = request.headers.get("stripe-signature")

    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, settings.STRIPE_WEBHOOK_SECRET
        )
    except (ValueError, stripe.error.SignatureVerificationError):
        raise HTTPException(status_code=400, detail="Webhook 验证失败")

    if event["type"] == "checkout.session.completed":
        session = event["data"]["object"]
        user_id = session["metadata"]["user_id"]
        user = db.query(User).filter(User.id == user_id).first()
        if user:
            user.stripe_customer_id = session["customer"]
            db.commit()

    elif event["type"] == "customer.subscription.":
        sub_data = event["data"]["object"]
        customer_id = sub_data["customer"]
        user = db.query(User).filter(User.stripe_customer_id == customer_id).first()
        if user:
            # 根据 Price ID 确定套餐
            price_id = sub_data["items"]["data"][0]["price"]["id"]
            if price_id == settings.STRIPE_PRO_PRICE_ID:
                user.tier = SubscriptionTier.PRO
            elif price_id == settings.STRIPE_ENTERPRISE_PRICE_ID:
                user.tier = SubscriptionTier.ENTERPRISE
            db.commit()

    elif event["type"] == "customer.subscription.deleted":
        sub_data = event["data"]["object"]
        customer_id = sub_data["customer"]
        user = db.query(User).filter(User.stripe_customer_id == customer_id).first()
        if user:
            user.tier = SubscriptionTier.FREE
            db.commit()

    return {"status": "ok"}

3.4 Day 3:仪表盘和前端

仪表盘路由(api/dashboard.py)

# backend/app/api/dashboard.py
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from sqlalchemy import func
from datetime import datetime, timedelta

from app.database import get_db
from app.api.auth import get_current_user
from app.models.user import User
from app.models.subscription import UsageRecord
from app.services.billing_service import billing_service

router = APIRouter()


@router.get("/stats")
def get_dashboard_stats(
    current_user=Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """仪表盘统计数据"""
    # 本月使用情况
    usage_30d = billing_service.get_user_usage(db, current_user.id, days=30)
    # 今日使用情况
    usage_1d = billing_service.get_user_usage(db, current_user.id, days=1)
    # 最近 7 天每日调用量
    daily_stats = []
    for i in range(6, -1, -1):
        day_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=i)
        day_end = day_start + timedelta(days=1)
        count = (
            db.query(func.count(UsageRecord.id))
            .filter(
                UsageRecord.user_id == current_user.id,
                UsageRecord.created_at >= day_start,
                UsageRecord.created_at < day_end,
            )
            .scalar()
        )
        daily_stats.append({"date": day_start.strftime("%Y-%m-%d"), "calls": count or 0})

    plan = billing_service.PLANS.get(current_user.tier.value, billing_service.PLANS["free"])

    return {
        "user": {
            "email": current_user.email,
            "tier": current_user.tier.value,
            "api_key": current_user.api_key,
        },
        "plan": plan,
        "usage": {
            "today": usage_1d,
            "month": usage_30d,
            "daily_chart": daily_stats,
        },
    }

前端核心页面(React)

// frontend/src/pages/Dashboard.jsx
import React, { useState, useEffect } from 'react';
import { BarChart, Bar, XAxis, YAxis, CartesianGrid, Tooltip, ResponsiveContainer } from 'recharts';

const API_BASE = process.env.REACT_APP_API_URL || 'http://localhost:8000';

function Dashboard() {
  const [stats, setStats] = useState(null);
  const [loading, setLoading] = useState(true);

  useEffect(() => {
    fetchStats();
  }, []);

  const fetchStats = async () => {
    try {
      const token = localStorage.getItem('access_token');
      const res = await fetch(`${API_BASE}/api/dashboard/stats`, {
        headers: { Authorization: `Bearer ${token}` },
      });
      if (res.ok) {
        const data = await res.json();
        setStats(data);
      }
    } catch (err) {
      console.error('获取仪表盘数据失败:', err);
    } finally {
      setLoading(false);
    }
  };

  if (loading) return <div className="p-8 text-center">加载中...</div>;
  if (!stats) return <div className="p-8 text-center">数据加载失败</div>;

  const { user, plan, usage } = stats;
  const usagePercent = plan.api_calls > 0
    ? Math.round((usage.month.total_calls / plan.api_calls) * 100)
    : 0;

  return (
    <div className="max-w-6xl mx-auto p-6">
      <h1 className="text-3xl font-bold mb-8">仪表盘</h1>

      {/* 用户信息卡片 */}
      <div className="grid grid-cols-1 md:grid-cols-3 gap-6 mb-8">
        <div className="bg-white rounded-lg shadow p-6">
          <h3 className="text-sm text-gray-500 mb-1">当前套餐</h3>
          <p className="text-2xl font-bold text-blue-600">{plan.name}</p>
          <p className="text-sm text-gray-400 mt-1">
            {plan.api_calls > 0 ? `${plan.api_calls.toLocaleString()} 次/天` : '无限制'}
          </p>
        </div>
        <div className="bg-white rounded-lg shadow p-6">
          <h3 className="text-sm text-gray-500 mb-1">今日 API 调用</h3>
          <p className="text-2xl font-bold">{usage.today.total_calls.toLocaleString()}</p>
        </div>
        <div className="bg-white rounded-lg shadow p-6">
          <h3 className="text-sm text-gray-500 mb-1">本月 API 调用</h3>
          <p className="text-2xl font-bold">{usage.month.total_calls.toLocaleString()}</p>
          <div className="w-full bg-gray-200 rounded-full h-2 mt-2">
            <div
              className="bg-blue-600 h-2 rounded-full"
              style={{ width: `${Math.min(usagePercent, 100)}%` }}
            />
          </div>
          <p className="text-xs text-gray-400 mt-1">{usagePercent}% 已使用</p>
        </div>
      </div>

      {/* API Key */}
      <div className="bg-white rounded-lg shadow p-6 mb-8">
        <h3 className="font-semibold mb-3">你的 API Key</h3>
        <div className="flex items-center gap-2">
          <code className="bg-gray-100 px-4 py-2 rounded flex-1 text-sm">
            {user.api_key}
          </code>
          <button
            onClick={() => navigator.clipboard.writeText(user.api_key)}
            className="px-4 py-2 bg-blue-600 text-white rounded hover:bg-blue-700"
          >
            复制
          </button>
        </div>
      </div>

      {/* 调用趋势图 */}
      <div className="bg-white rounded-lg shadow p-6">
        <h3 className="font-semibold mb-4">最近 7 天调用趋势</h3>
        <ResponsiveContainer width="100%" height={300}>
          <BarChart data={usage.daily_chart}>
            <CartesianGrid strokeDasharray="3 3" />
            <XAxis dataKey="date" />
            <YAxis />
            <Tooltip />
            <Bar dataKey="calls" fill="#3B82F6" radius={[4, 4, 0, 0]} />
          </BarChart>
        </ResponsiveContainer>
      </div>
    </div>
  );
}

export default Dashboard;

前端登录页面

// frontend/src/pages/Login.jsx
import React, { useState } from 'react';

const API_BASE = process.env.REACT_APP_API_URL || 'http://localhost:8000';

function Login({ onLogin }) {
  const [mode, setMode] = useState('login'); // login | register
  const [email, setEmail] = useState('');
  const [password, setPassword] = useState('');
  const [fullName, setFullName] = useState('');
  const [error, setError] = useState('');
  const [loading, setLoading] = useState(false);

  const handleSubmit = async (e) => {
    e.preventDefault();
    setError('');
    setLoading(true);

    try {
      const endpoint = mode === 'login' ? '/api/auth/login' : '/api/auth/register';
      const body = mode === 'login'
        ? { email, password }
        : { email, password, full_name: fullName };

      const res = await fetch(`${API_BASE}${endpoint}`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(body),
      });

      const data = await res.json();
      if (!res.ok) {
        throw  Error(data.detail || '请求失败');
      }

      localStorage.setItem('access_token', data.access_token);
      onLogin(data.user);
    } catch (err) {
      setError(err.message);
    } finally {
      setLoading(false);
    }
  };

  return (
    <div className="min-h-screen flex items-center justify-center bg-gray-50">
      <div className="bg-white rounded-lg shadow-lg p-8 w-full max-w-md">
        <h2 className="text-2xl font-bold text-center mb-6">
          {mode === 'login' ? '登录' : '注册'}
        </h2>

        {error && (
          <div className="bg-red-50 text-red-600 p-3 rounded mb-4 text-sm">{error}</div>
        )}

        <form onSubmit={handleSubmit} className="-y-4">
          {mode === 'register' && (
            <div>
              <label className="block text-sm font-medium text-gray-700 mb-1">姓名</label>
              <input
                type="text"
                value={fullName}
                onChange={(e) => setFullName(e.target.value)}
                className="w-full border rounded-lg px-3 py-2"
                placeholder="你的姓名"
              />
            </div>
          )}
          <div>
            <label className="block text-sm font-medium text-gray-700 mb-1">邮箱</label>
            <input
              type="email"
              value={email}
              onChange={(e) => setEmail(e.target.value)}
              className="w-full border rounded-lg px-3 py-2"
              placeholder="[email protected]"
              required
            />
          </div>
          <div>
            <label className="block text-sm font-medium text-gray-700 mb-1">密码</label>
            <input
              type="password"
              value={password}
              onChange={(e) => setPassword(e.target.value)}
              className="w-full border rounded-lg px-3 py-2"
              placeholder="至少 8 位"
              minLength={8}
              required
            />
          </div>
          <button
            type="submit"
            disabled={loading}
            className="w-full bg-blue-600 text-white py-2 rounded-lg hover:bg-blue-700 disabled:opacity-50"
          >
            {loading ? '处理中...' : mode === 'login' ? '登录' : '注册'}
          </button>
        </form>

        <p className="text-center text-sm text-gray-500 mt-4">
          {mode === 'login' ? '没有账号?' : '已有账号?'}
          <button
            onClick={() => setMode(mode === 'login' ? 'register' : 'login')}
            className="text-blue-600 -1 hover:underline"
          >
            {mode === 'login' ? '立即注册' : '去登录'}
          </button>
        </p>
      </div>
    </div>
  );
}

export default Login;

第四章:定价策略

4.1 免费增值(Freemium)

提供一个免费的基础版本吸引用户,高级功能收费。适用于工具类产品。

适用场景:开发者工具、API 服务、 应用

关键:免费版要足够好用,但要让用户看到升级的价值。

4.2 分层定价(Tiered

设置 2-4 个套餐,满足不同规模用户的需求:

套餐 价格 目标用户 API 调用
Free $0 个人开发者、试用 100/天
Pro $29/月 小团队、创业公司 10,000/天
Enterprise $99/月 大公司 无限制

4.3 按量计费(Usage-Based Pricing)

用户按实际使用量付费。适合 API、云计算等场景。

费用 = 调用次数 × 单价

混合模式:基础月费 + 超出部分按量计费。


第五章:获客渠道

5.1 内容营销

  • 写技术博客(SEO 友好)
  • 发布教程视频(/B站)
  • 在技术社区(V2EX/掘金/知乎)分享

5.2 SEO 优化

  • 针对长尾关键词优化
  • 产品文档自动生成 SEO 友好的页面
  • 写"最佳实践"和"对比"类文章

5.3 社群运营

  • 建立微信/Telegram 社群
  • 在 Discord/Slack 建立用户社区
  • 定期举办线上分享会

5.4 冷推(Cold Outreach)

  • 找到目标用户的邮箱
  • 个性化邮件模板
  • 提供免费试用或折扣

第六章:部署和运维

6.1 Docker 部署

# docker-compose.yml
version: "3.8"

services:
  db:
    : postgres:15
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: saas_db
    volumes:
      - pgdata:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  backend:
    build: ./backend
    ports:
      - "8000:8000"
    environment:
      DATABASE_URL: postgresql://postgres:postgres@db:5432/saas_db
      REDIS_URL: redis://redis:6379/0
    depends_on:
      - db
      - redis
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000

  frontend:
    build: ./frontend
    ports:
      - "3000:3000"
    environment:
      REACT_APP_API_URL: http://backend:8000

volumes:
  pgdata:

6.2 限流中间件

# backend/app/middleware/rate_limit.py
import time
from fastapi import Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
import redis
from app.config import settings

redis_client = redis.from_url(settings.REDIS_URL)


class RateLimitMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # 只限流 API 路径
        if not request.url.path.startswith("/api/"):
            return await call_next(request)

        # 从请求中获取用户标识(API Key 或 IP)
        api_key = request.headers.get("X-API-Key")
        if not api_key:
            auth_header = request.headers.get("Authorization", "")
            if auth_header.startswith("Bearer "):
                api_key = auth_header[7:]
            else:
                api_key = request.client.host  # 降级到 IP 限流

        key = f"rate_limit:{api_key}:{int(time.time() // 86400)}"
        current = redis_client.get(key)
        current = int(current) if current else 0

        if current >= settings.RATE_LIMIT_PRO:  # 默认上限
            raise HTTPException(
                status_code=429,
                detail="API 调用次数已达上限,请升级套餐或明天再试",
            )

        pipe = redis_client.pipeline()
        pipe.incr(key)
        pipe.expire(key, 86400)
        pipe.execute()

        response = await call_next(request)
        response.headers["X-RateLimit-Remaining"] = str(settings.RATE_LIMIT_PRO - current - 1)
        return response

6.3 requirements.txt

# backend/requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
alembic==1.13.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
pydantic[email]==2.5.2
pydantic-settings==2.1.0
redis==5.0.1
stripe==7.8.0
python-multipart==0.0.6
httpx==0.25.2

总结

本课程带你从零完成了 SaaS 产品的完整开发流程:

  1. 商业模式:理解了 MRR/ARR/LTV/CAC 核心指标
  2. 技术架构:FastAPI + React + PostgreSQL 全栈方案
  3. MVP 实现:3 天搭建了包含认证、计费、仪表盘的完整后端
  4. 定价策略:免费增值 + 分层定价的混合模式
  5. 获客方法:内容营销、SEO、社群、冷推四大渠道
  6. 部署运维:Docker 容器化部署、Redis 限流

下一步行动:

  • 把代码 clone 下来,填入你的 Stripe Key,跑起来
  • 选一个你熟悉领域的痛点,做 MVP
  • 3 天后发布到 Product Hunt / V2EX 获取第一批用户

评论