[深度]OpenClaw流量分配引擎:Pacing算法、频控原理与预算控制实战

阿里云推广

广告流量分配引擎深度解析

Pacing(节奏控制)和频控(Frequency Capping)是广告系统最重要但又最容易被忽视的两个模块。做不好Pacing,预算在早上就烧光;做不好频控,用户被骚扰到卸载App。

一、Pacing:如何让预算均匀消耗?

假设广告主日预算1000元,如果没有Pacing控制,系统会在流量高峰把预算全部消耗,导致下午流量段完全没有曝光。

# 理想 vs 不理想的预算消耗曲线
#
# 不理想(无Pacing):
# 消耗率%  100|████████
#              |        ░░░░░░░░░░░░  <- 预算08:00就耗尽
#              0  6  12  18  24  时
#
# 理想(有Pacing):
# 消耗率%  100|                    ████
#              |  ██████████████████
#              0  6  12  18  24  时
#              均匀消耗,覆盖全天流量

二、Standard Pacing 算法实现

import redis, time

r = redis.Redis(decode_responses=True)

class StandardPacing:
    """
    标准Pacing算法: 令牌桶 + 实时预算追踪
    """
    def __init__(self, campaign_id, daily_budget, tz_offset=8):
        self.cid = campaign_id
        self.budget = daily_budget
        self.tz_offset = tz_offset

    def get_target_spend_ratio(self):
        """计算当前时刻应消耗预算的比例"""
        now = time.time() + self.tz_offset * 3600
        seconds_elapsed = now % 86400  # 当天已过秒数
        return seconds_elapsed / 86400  # 0.0 ~ 1.0

    def should_serve(self):
        """判断当前请求是否应该展示广告"""
        # 已消耗预算
        spent = float(r.get(f'campaign:{self.cid}:spent') or 0)
        # 理论应消耗预算
        target = self.budget * self.get_target_spend_ratio()

        if spent < target * 0.9:   # 消耗偏慢,提高投放概率
            throttle_rate = 0.95
        elif spent > target * 1.1: # 消耗偏快,降低投放概率
            throttle_rate = 0.3
        else:
            throttle_rate = 0.8    # 正常

        import random
        return random.random() < throttle_rate

    def record_spend(self, cost):
        """记录一次消费"""
        pipe = r.pipeline()
        pipe.incrbyfloat(f'campaign:{self.cid}:spent', cost)
        # 次日0点过期
        pipe.expireat(f'campaign:{self.cid}:spent', self._next_midnight())
        pipe.execute()

    def _next_midnight(self):
        """下一个0点的Unix时间戳"""
        now = time.time() + self.tz_offset * 3600
        elapsed = now % 86400
        return int(time.time() + (86400 - elapsed))

# 使用
pacing = StandardPacing('campaign_123', daily_budget=1000.0)
if pacing.should_serve():
    # 展示广告
    pacing.record_spend(0.05)  # 记录CPC消费

三、频控(Frequency Capping):保护用户体验

class FrequencyController:
    """
    多维度频控: 每广告 + 每Campaign + 全局
    """
    def __init__(self):
        self.r = redis.Redis(decode_responses=True)

    def check_and_increment(self, user_id, ad_id, campaign_id,
                            ad_cap=3, campaign_cap=10, global_cap=20):
        """
        三层频控检查 (24小时窗口)
        Returns: True=允许展示, False=超频拦截
        """
        pipe = self.r.pipeline(watch=True)
        try:
            # 使用滑动窗口 - Sorted Set存时间戳
            now = time.time()
            window = 86400  # 24小时
            cutoff = now - window

            # 单广告频控
            ad_key = f'fc:ad:{user_id}:{ad_id}'
            ad_count = self.r.zcount(ad_key, cutoff, now)
            if ad_count >= ad_cap:
                return False, 'ad_cap_exceeded'

            # Campaign级频控
            camp_key = f'fc:camp:{user_id}:{campaign_id}'
            camp_count = self.r.zcount(camp_key, cutoff, now)
            if camp_count >= campaign_cap:
                return False, 'campaign_cap_exceeded'

            # 全局频控
            global_key = f'fc:global:{user_id}'
            global_count = self.r.zcount(global_key, cutoff, now)
            if global_count >= global_cap:
                return False, 'global_cap_exceeded'

            # 通过检查,更新计数
            pipe.multi()
            for key in [ad_key, camp_key, global_key]:
                pipe.zadd(key, {str(now): now})
                pipe.zremrangebyscore(key, 0, cutoff)  # 清理过期
                pipe.expire(key, window + 60)
            pipe.execute()
            return True, 'ok'

        except redis.WatchError:
            return True, 'watch_error_allow'  # 竞态下放行,容忍小误差

四、Pacing效果监控指标

指标 计算公式 健康范围 异常处理
消耗平滑度 实际消耗/理论消耗 0.85 ~ 1.15 超出范围触发Pacing调整
预算利用率 当日消耗/日预算 >0.95 <0.8说明流量不足或底价过高
频控命中率 被频控拦截/总请求 5% ~ 20% >30%说明广告太少或用户质量差
超时率 超100ms的请求 <1% >5%需要优化竞价引擎性能
# Prometheus监控指标上报
from prometheus_client import Counter, Histogram

ad_requests_total = Counter('openclaw_ad_requests_total',
    'Total ad requests', ['app_id', 'unit_type'])

ad_latency = Histogram('openclaw_ad_latency_ms',
    'Ad request latency', buckets=[10, 30, 50, 80, 100, 200, 500])

pacing_throttle = Counter('openclaw_pacing_throttle_total',
    'Pacing throttle count', ['campaign_id', 'reason'])

# 在竞价引擎中调用
with ad_latency.time():
    result = bidding_engine.process(request)
ad_requests_total.labels(app_id=request.app_id, unit_type='banner').inc()

总结:Pacing和频控是广告系统商业模式的保障——Pacing保证广告主预算效率最大化,频控保证用户体验不被广告轰炸。两者都需要高性能的Redis支撑,毫秒级响应是基本要求。

发表评论