OpenClaw Real-Time Ad Reporting System Architecture Explained
Advertisers check their delivery metrics every hour, and publishers reconcile revenue reports every day; both depend on a high-performance real-time data pipeline. This article walks through the OpenClaw reporting system's complete path, from event generation to query.
1. Overall Data Flow of the Reporting System
# Data-flow architecture
#
#  Business events       Message queue   Stream compute    OLAP storage
#  ───────────────       ─────────────   ──────────────    ────────────
#  Ad impressions ─┐
#  Ad clicks      ─┼──►  Kafka ────────► Flink ──────────► ClickHouse
#  Ad conversions ─┘     (real-time)     (real-time agg)   (query engine)
#                                          │
#                                          └──► MySQL (minute-level pre-aggregation)
#
# Query paths:
#   Real-time (minute granularity): ClickHouse
#   Historical (day granularity):   MySQL pre-aggregation tables
#   Offline analytics:              Hive/Spark
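With two serving paths, the query layer has to route each request by its time range. A minimal routing sketch in Python (the 48-hour cutoff and the backend names are illustrative assumptions, not OpenClaw internals):

from datetime import datetime, timedelta

# Hypothetical cutoff: fresh ranges are served from ClickHouse, older ranges
# from the MySQL pre-aggregation tables. The 48h value is illustrative only.
REALTIME_WINDOW = timedelta(hours=48)

def route_report_query(start_time: datetime) -> str:
    """Pick the storage backend for a report query by how far back it reaches."""
    if datetime.utcnow() - start_time <= REALTIME_WINDOW:
        return 'clickhouse'  # minute-granularity, real-time path
    return 'mysql'           # day-granularity, pre-aggregated history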
2. Kafka Event Reporting Spec
# Impression event schema (shown as JSON; the producer and Flink source below both serialize/parse JSON)
{
    'event_type': 'impression',
    'timestamp': 1744531200000,        # millisecond epoch timestamp
    'ad_id': 'ad_12345',
    'campaign_id': 'camp_678',
    'publisher_id': 'pub_901',
    'unit_id': 'unit_234',
    'user_id': 'user_hash_xxx',        # anonymized user hash
    'device_type': 'android',
    'geo': {'country': 'CN', 'province': '广东', 'city': '深圳'},
    'bid_price': 0.08,                 # bid price
    'clearing_price': 0.05,            # clearing (settlement) price
    'request_id': 'req_abc123'
}
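To keep every producer consistent with this schema, events can be built through a single helper that stamps the timestamp and request_id centrally. A sketch (field names follow the schema above; the uuid-based request_id generation is an assumption):

import time
import uuid

def build_impression_event(ad_id, campaign_id, publisher_id, unit_id,
                           user_hash, device_type, geo, bid_price, clearing_price):
    return {
        'event_type': 'impression',
        'timestamp': int(time.time() * 1000),  # ms epoch, as the schema requires
        'ad_id': ad_id,
        'campaign_id': campaign_id,
        'publisher_id': publisher_id,
        'unit_id': unit_id,
        'user_id': user_hash,                  # must already be anonymized upstream
        'device_type': device_type,
        'geo': geo,                            # e.g. {'country': 'CN', ...}
        'bid_price': bid_price,
        'clearing_price': clearing_price,
        'request_id': f'req_{uuid.uuid4().hex[:12]}',  # generation scheme is an assumption
    }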
# Python producer code
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    compression_type='lz4',  # compression cuts network transfer
    acks='all'               # success only after all replicas acknowledge
)

def track_impression(event):
    producer.send('ad_events', event)
    # Note: don't flush on every send; batched sends perform far better
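Even with acks='all', a send can still fail asynchronously (broker down, batch expired), so the "no lost events" guarantee also needs an error callback and a single flush at shutdown. A sketch using kafka-python's future API:

import atexit
import logging

log = logging.getLogger('ad_tracker')

def on_send_error(exc):
    # Invoked from the producer's background thread when a batch ultimately fails;
    # in production this could feed a retry queue or an alert.
    log.error('ad_events delivery failed: %s', exc)

def track_impression_safe(event):
    producer.send('ad_events', event).add_errback(on_send_error)

# Flush pending batches exactly once, when the process exits.
atexit.register(producer.flush)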
3. Flink Real-Time Aggregation Job
-- Flink SQL real-time aggregation (1-minute tumbling windows)
CREATE TABLE ad_events (
    event_type STRING,
    ad_id STRING,
    campaign_id STRING,
    publisher_id STRING,
    clearing_price DOUBLE,
    `timestamp` BIGINT,                              -- ms epoch from the event payload
    event_time AS TO_TIMESTAMP_LTZ(`timestamp`, 3), -- derived event-time column
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'ad_events',
    'properties.bootstrap.servers' = 'kafka1:9092,kafka2:9092',
    'properties.group.id' = 'ad_stats_job',         -- group name is illustrative
    'format' = 'json'
);
-- Aggregate ad stats into 1-minute tumbling windows
INSERT INTO ad_stats_1min
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS stat_time,
    campaign_id,
    ad_id,
    COUNT(*) FILTER (WHERE event_type = 'impression') AS impressions,
    COUNT(*) FILTER (WHERE event_type = 'click') AS clicks,
    SUM(CASE WHEN event_type = 'impression' THEN clearing_price ELSE 0 END) AS spend,
    ROUND(CAST(COUNT(*) FILTER (WHERE event_type = 'click') AS DOUBLE)
          / NULLIF(COUNT(*) FILTER (WHERE event_type = 'impression'), 0) * 100, 4) AS ctr
FROM ad_events
-- No wall-clock WHERE filter: the watermark already bounds out-of-order events
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), campaign_id, ad_id;
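The window semantics are easy to sanity-check offline: the sketch below reproduces the same per-minute rollup in plain Python over a list of event dicts (illustrative only; the production rollup runs in Flink):

from collections import defaultdict

def rollup_1min(events):
    """Rollup keyed by (window_start_ms, campaign_id, ad_id), like the Flink job."""
    stats = defaultdict(lambda: {'impressions': 0, 'clicks': 0, 'spend': 0.0})
    for e in events:
        window_start = e['timestamp'] // 60000 * 60000  # 1-minute tumbling window
        key = (window_start, e['campaign_id'], e['ad_id'])
        if e['event_type'] == 'impression':
            stats[key]['impressions'] += 1
            stats[key]['spend'] += e['clearing_price']  # spend = sum of clearing prices
        elif e['event_type'] == 'click':
            stats[key]['clicks'] += 1
    for s in stats.values():
        n = s['impressions']
        s['ctr'] = round(s['clicks'] / n * 100, 4) if n else None  # mirrors NULLIF
    return stats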
4. ClickHouse Query Optimization
-- ClickHouse table definition (MergeTree engine)
CREATE TABLE ad_stats_1min
(
    stat_time    DateTime,
    campaign_id  LowCardinality(String),  -- LowCardinality speeds up low-distinct string columns
    ad_id        LowCardinality(String),
    impressions  UInt32,
    clicks       UInt32,
    spend        Float32,
    ctr          Float32
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(stat_time)       -- daily partitions make expiry cleanup easy
ORDER BY (campaign_id, stat_time)        -- sort key accelerates per-campaign queries
TTL toDate(stat_time) + INTERVAL 90 DAY; -- rows expire automatically after 90 days
-- Campaign report for the past 24 hours
SELECT
    campaign_id,
    sum(impressions) AS total_impressions,
    sum(clicks) AS total_clicks,
    sum(spend) AS total_spend,
    round(sum(clicks) / sum(impressions) * 100, 2) AS avg_ctr,
    round(sum(spend) / sum(clicks), 3) AS avg_cpc
FROM ad_stats_1min
WHERE stat_time >= now() - INTERVAL 24 HOUR
  AND campaign_id = 'camp_678'
GROUP BY campaign_id;
-- Typical query latency: <50ms on billions of rows
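From application code, the same report can be fetched over ClickHouse's native protocol. A sketch assuming the third-party clickhouse-driver package and a reachable host named ch-host (both assumptions):

from clickhouse_driver import Client  # pip install clickhouse-driver

client = Client(host='ch-host')  # hostname is a placeholder

def campaign_report_24h(campaign_id):
    # Parameterized query using clickhouse-driver's %(name)s placeholders
    return client.execute(
        '''
        SELECT campaign_id,
               sum(impressions) AS total_impressions,
               sum(clicks)      AS total_clicks,
               sum(spend)       AS total_spend,
               round(sum(clicks) / sum(impressions) * 100, 2) AS avg_ctr
        FROM ad_stats_1min
        WHERE stat_time >= now() - INTERVAL 24 HOUR
          AND campaign_id = %(cid)s
        GROUP BY campaign_id
        ''',
        {'cid': campaign_id},
    )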
Summary: the golden path for real-time reporting is Kafka ingestion → Flink window aggregation → ClickHouse columnar queries. The key design principles: Kafka guarantees no event is lost, Flink handles out-of-order data with watermarks, and ClickHouse accelerates queries with partitioning plus a sort key.
