[Deep Dive] OpenClaw Real-Time Reporting System Architecture: Kafka Event Streams + Flink Aggregation + ClickHouse Queries

A walkthrough of the OpenClaw real-time ad reporting architecture

Advertisers check delivery metrics every hour, and publishers reconcile revenue reports every day; both depend on a high-performance real-time data pipeline. This article walks through OpenClaw's reporting system end to end, from event generation to query.

1. Overall Data Flow of the Reporting System

# Data flow architecture
#
# Business events      Message queue    Stream compute      OLAP storage
# ───────────────      ─────────────    ──────────────      ────────────
# Ad impression ─┐
# Ad click      ─┼──► Kafka ────► Flink ────► ClickHouse
# Ad conversion ─┘   (real-time)  (real-time agg)  (query engine)
#                                   │
#                                   └──► MySQL  (minute-level pre-aggregation)
#
# Query paths:
# Real-time (minute granularity): ClickHouse
# Historical (day granularity):   MySQL pre-aggregated tables
# Offline analysis:               Hive/Spark
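
To make the three query paths concrete, here is a minimal routing sketch. The pick_store function and its one-day freshness threshold are illustrative assumptions, not OpenClaw's actual code:

from datetime import datetime, timedelta

# Hypothetical router mapping a report request to a backend
def pick_store(range_start: datetime, offline: bool = False) -> str:
    if offline:
        return 'hive'        # ad-hoc offline analysis via Hive/Spark
    if datetime.now() - range_start <= timedelta(days=1):
        return 'clickhouse'  # fresh, minute-granularity data
    return 'mysql'           # day-granularity pre-aggregated history

print(pick_store(datetime.now() - timedelta(hours=3)))  # -> 'clickhouse'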

2. Kafka Event Reporting Spec

# Impression event schema (defined in Avro; shown here as a JSON-style dict)
{
    'event_type': 'impression',
    'timestamp': 1744531200000,  # millisecond timestamp
    'ad_id': 'ad_12345',
    'campaign_id': 'camp_678',
    'publisher_id': 'pub_901',
    'unit_id': 'unit_234',
    'user_id': 'user_hash_xxx',  # anonymized
    'device_type': 'android',
    'geo': {'country': 'CN', 'province': 'Guangdong', 'city': 'Shenzhen'},
    'bid_price': 0.08,            # bid price
    'clearing_price': 0.05,       # clearing (settlement) price
    'request_id': 'req_abc123'
}
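
The user_id field is anonymized before an event ever reaches Kafka. Below is a minimal sketch of building a schema-conformant impression event; the salted SHA-256 scheme and USER_ID_SALT are illustrative assumptions, not OpenClaw's actual anonymization method:

import hashlib
import time

USER_ID_SALT = 'replace-with-a-secret-salt'  # hypothetical salt

def build_impression(raw_user_id, ad_id, campaign_id, request_id):
    # Hash the raw user id so no PII enters the pipeline
    digest = hashlib.sha256((USER_ID_SALT + raw_user_id).encode()).hexdigest()
    return {
        'event_type': 'impression',
        'timestamp': int(time.time() * 1000),   # millisecond timestamp
        'ad_id': ad_id,
        'campaign_id': campaign_id,
        'user_id': 'user_hash_' + digest[:16],  # anonymized id
        'request_id': request_id,
        # ... remaining schema fields (publisher_id, geo, prices, etc.)
    }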

# Python producer code
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    compression_type='lz4',  # compress batches to cut network traffic
    acks='all'               # success only after all in-sync replicas ack
)

def track_impression(event):
    producer.send('ad_events', event)
    # Note: don't flush() after every send; batched sends perform far better
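
Because send() is asynchronous, production code should also attach a failure callback and flush the buffer on shutdown so queued events are not lost. A minimal sketch using kafka-python's future API (the logging handler is our choice, not from the original code):

import logging

def _on_send_error(exc):
    # Surface delivery failures, e.g. to alerting or a local retry queue
    logging.error('event delivery failed: %s', exc)

def track_impression_safe(event):
    producer.send('ad_events', event).add_errback(_on_send_error)

# On graceful shutdown, drain buffered events before exiting
producer.flush()
producer.close()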

3. Flink Real-Time Aggregation Job

-- Flink SQL real-time aggregation (1-minute tumbling windows)

CREATE TABLE ad_events (
    event_type     STRING,
    ad_id          STRING,
    campaign_id    STRING,
    publisher_id   STRING,
    clearing_price DOUBLE,
    `timestamp`    BIGINT,
    event_time     AS TO_TIMESTAMP_LTZ(`timestamp`, 3),  -- derive event time from the ms epoch field
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'ad_events',
    'properties.bootstrap.servers' = 'kafka1:9092,kafka2:9092',
    'properties.group.id' = 'ad_stats_agg',
    'format' = 'json'
);

-- Aggregate ad metrics in 1-minute tumbling windows
-- (no time filter needed: the 5-second watermark already bounds late events)
INSERT INTO ad_stats_1min
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS stat_time,
    campaign_id,
    ad_id,
    SUM(CASE WHEN event_type = 'impression' THEN 1 ELSE 0 END) AS impressions,
    SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END)      AS clicks,
    SUM(CASE WHEN event_type = 'impression' THEN clearing_price ELSE 0 END) AS spend,
    ROUND(SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) * 100.0
          / NULLIF(SUM(CASE WHEN event_type = 'impression' THEN 1 ELSE 0 END), 0), 4) AS ctr
FROM ad_events
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), campaign_id, ad_id;
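
For completeness, a sketch of how these statements could be submitted from PyFlink. SOURCE_DDL, SINK_DDL, and AGG_INSERT are placeholders for the SQL above; the sink DDL for ad_stats_1min (e.g. a JDBC connector) is elided:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Streaming TableEnvironment; the SQL above runs unchanged through execute_sql()
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql(SOURCE_DDL)         # the CREATE TABLE ad_events statement
t_env.execute_sql(SINK_DDL)           # DDL for the ad_stats_1min sink (elided)
t_env.execute_sql(AGG_INSERT).wait()  # the tumbling-window INSERT above; wait() blocks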

4. ClickHouse Query Optimization

-- ClickHouse table DDL (MergeTree engine)
CREATE TABLE ad_stats_1min
(
    stat_time    DateTime,
    campaign_id  LowCardinality(String),  -- LowCardinality speeds up string columns
    ad_id        LowCardinality(String),
    impressions  UInt32,
    clicks       UInt32,
    spend        Float32,
    ctr          Float32
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(stat_time)    -- daily partitions make expiry cleanup easy
ORDER BY (campaign_id, stat_time)     -- sort key speeds up per-campaign queries
TTL toDate(stat_time) + INTERVAL 90 DAY;  -- rows auto-expire after 90 days
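
If aggregates are loaded from Python rather than a Flink sink connector, a minimal sketch with the clickhouse-driver package (the host name and sample row are illustrative):

from datetime import datetime
from clickhouse_driver import Client

client = Client(host='clickhouse-host')  # hypothetical host

# Each tuple matches the column order of ad_stats_1min
rows = [(datetime(2025, 4, 13, 8, 0), 'camp_678', 'ad_12345', 1000, 12, 50.0, 1.2)]
client.execute(
    'INSERT INTO ad_stats_1min '
    '(stat_time, campaign_id, ad_id, impressions, clicks, spend, ctr) VALUES',
    rows
)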

-- Campaign report for the past 24 hours
SELECT
    campaign_id,
    sum(impressions) AS total_impressions,
    sum(clicks)      AS total_clicks,
    sum(spend)       AS total_spend,
    round(sum(clicks) / nullIf(sum(impressions), 0) * 100, 2) AS avg_ctr,
    round(sum(spend) / nullIf(sum(clicks), 0), 3) AS avg_cpc
FROM ad_stats_1min
WHERE stat_time >= now() - INTERVAL 24 HOUR
  AND campaign_id = 'camp_678'
GROUP BY campaign_id;
-- typical query latency: <50ms over billions of rows
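
The same report can be pulled from Python with clickhouse-driver's parameter substitution (connection details are illustrative):

from clickhouse_driver import Client

client = Client(host='clickhouse-host')  # hypothetical host

report = client.execute(
    '''
    SELECT campaign_id,
           sum(impressions) AS total_impressions,
           sum(clicks)      AS total_clicks,
           sum(spend)       AS total_spend
    FROM ad_stats_1min
    WHERE stat_time >= now() - INTERVAL 24 HOUR
      AND campaign_id = %(cid)s
    GROUP BY campaign_id
    ''',
    {'cid': 'camp_678'}
)
print(report)  # list of (campaign_id, impressions, clicks, spend) tuples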

Summary: the golden path for real-time reporting is Kafka ingestion → Flink windowed aggregation → ClickHouse columnar queries. The key design principles: Kafka guarantees no event is lost, Flink handles out-of-order data with watermarks, and ClickHouse speeds up queries with partitioning plus a sort key.
