
# altinity/clickhouse-sink-connector

The ClickHouse Sink Connector is a Kafka Connect-compatible connector designed for efficient data integration between Kafka and ClickHouse. It writes streaming data from Kafka topics into a ClickHouse database in real time, providing reliable delivery and flexible configuration options, and is well suited for building real-time data pipelines and analytics systems.
```bash
docker run -d \
  --name=clickhouse-sink-connector \
  -e BOOTSTRAP_SERVERS=kafka:9092 \
  -e CONNECT_GROUP_ID=clickhouse-sink \
  -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
  -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
  -e CLICKHOUSE_HOST=clickhouse \
  -e CLICKHOUSE_PORT=8123 \
  -e CLICKHOUSE_USER=default \
  -e CLICKHOUSE_PASSWORD= \
  -e CLICKHOUSE_DATABASE=default \
  altinity/clickhouse-sink-connector
```
```yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  clickhouse:
    image: yandex/clickhouse-server:latest
    ports:
      - "8123:8123"
    volumes:
      - clickhouse_data:/var/lib/clickhouse

  clickhouse-sink:
    image: altinity/clickhouse-sink-connector
    depends_on:
      - kafka
      - clickhouse
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_GROUP_ID: clickhouse-sink
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_OFFSET_STORAGE_FILE_FILENAME: /tmp/connect.offsets
      CLICKHOUSE_HOST: clickhouse
      CLICKHOUSE_PORT: 8123
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: ""
      CLICKHOUSE_DATABASE: default
      TOPICS: "user_events,page_views"
      TABLES: "user_events,page_views"

volumes:
  clickhouse_data:
```
Create a connector configuration file `clickhouse-sink.properties`:
```properties
name=clickhouse-sink-connector
connector.class=com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector
tasks.max=3

# Kafka configuration
topics=user_events,page_views

# ClickHouse configuration
clickhouse.host=clickhouse
clickhouse.port=8123
clickhouse.user=default
clickhouse.password=
clickhouse.database=default

# Topic-to-table mapping
topic-to-table.map=user_events:user_events_table,page_views:page_views_table

# Data format configuration
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# Batching configuration
batch.size=***
linger.ms=5000
```
Create the connector via the Kafka Connect REST API:
```bash
curl -X POST -H "Content-Type: application/json" \
  --data @clickhouse-sink.json \
  [***]
```
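The `clickhouse-sink.json` payload referenced above is not shown in the original text. A minimal sketch of what it might contain, mirroring the properties file (the `name`/`config` envelope is the standard Kafka Connect REST payload shape; the connector-specific keys are assumptions carried over from the properties example, so check the connector documentation for the authoritative list):

```shell
# Sketch of clickhouse-sink.json; connector-specific keys are assumptions
# mirrored from the properties example above.
cat > clickhouse-sink.json <<'EOF'
{
  "name": "clickhouse-sink-connector",
  "config": {
    "connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
    "tasks.max": "3",
    "topics": "user_events,page_views",
    "clickhouse.host": "clickhouse",
    "clickhouse.port": "8123",
    "clickhouse.user": "default",
    "clickhouse.password": "",
    "clickhouse.database": "default",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}
EOF

# Validate that the file is well-formed JSON before POSTing it.
python3 -m json.tool clickhouse-sink.json > /dev/null && echo "valid JSON"
```

Note that the REST payload expresses every config value as a string, which is why numeric and boolean settings are quoted.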
| Parameter | Description | Default |
|---|---|---|
| bootstrap.servers | Kafka broker addresses | localhost:9092 |
| clickhouse.host | ClickHouse host address | localhost |
| clickhouse.port | ClickHouse HTTP port | 8123 |
| clickhouse.user | ClickHouse username | default |
| clickhouse.password | ClickHouse password | empty string |
| clickhouse.database | Target database name | default |
| topics | Kafka topics to consume, comma-separated | none |
| tasks.max | Maximum number of tasks | 1 |
| batch.size | Batch size | 1000 |
| linger.ms | Batch wait time (ms) | 1000 |
| Parameter | Description | Default |
|---|---|---|
| topic-to-table.map | Topic-to-table mapping | topic name used as table name |
| auto.create.tables | Whether to create tables automatically | false |
| table.create.template | Table creation template | built-in template |
| clickhouse.socket.timeout | Connection timeout (ms) | 30000 |
| retry.max | Maximum number of retries | 3 |
| retry.backoff.ms | Retry backoff interval (ms) | 1000 |
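Since `auto.create.tables` defaults to `false`, the target tables must exist before the connector starts writing. A hedged sketch of a DDL file for the `user_events` topic (the column set, engine, and sort key here are hypothetical; match them to the JSON fields your producers actually emit), to be applied with `clickhouse-client`:

```shell
# Hypothetical DDL for the user_events target table; adjust the columns
# to match the actual message schema on the topic.
cat > user_events.sql <<'EOF'
CREATE TABLE IF NOT EXISTS default.user_events_table
(
    user_id    UInt64,
    event_type String,
    event_time DateTime
)
ENGINE = MergeTree
ORDER BY (user_id, event_time);
EOF

# Apply it against a running ClickHouse instance (shown for reference):
# clickhouse-client --host clickhouse --queries-file user_events.sql
```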
- Set `auto.create.tables=true` or create the required tables manually.
- Tune the `batch.size` and `linger.ms` parameters to optimize throughput.
- Increase `tasks.max` to take full advantage of parallel processing.
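Before deploying tuned settings, it can help to sanity-check that the batching keys are spelled correctly, since Kafka Connect silently ignores unknown properties. A small sketch (the file name and values below are illustrative, not recommendations from the connector docs):

```shell
# Write an illustrative tuning fragment; the values are examples only.
cat > tuning.properties <<'EOF'
tasks.max=3
batch.size=10000
linger.ms=5000
EOF

# Print each expected key=value pair so a misspelled key is easy to spot:
# a missing line here means the property would be silently ignored.
for key in tasks.max batch.size linger.ms; do
  grep "^${key}=" tuning.properties
done
```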
