
pg-bifrost 是基于 PostgreSQL 逻辑解码 的工具,用于捕获数据库事件流(包括创建、插入、更新、删除等操作),并将其写入 Amazon Kinesis Data Streams 或 RabbitMQ。其模块化设计支持扩展至 S3、DynamoDB 等其他存储或非 AWS 目标,适用于实时数据同步、数据流处理和变更捕获场景。
可通过以下方式获取 pg-bifrost:
bash[Slava pg-bifrost.git]$ export AWS_SECRET_ACCESS_KEY=secretaccesskey [Slava pg-bifrost.git]$ export AWS_ACCESS_KEY_ID=accesskeyid [Slava pg-bifrost.git]$ export AWS_REGION=us-east-1 [Slava pg-bifrost.git]$ pg-bifrost --dbname mydb --host 127.0.0.1 replicate --create-slot kinesis --kinesis-stream dbstream
sqlCREATE TABLE customers (id serial primary key, first_name text, last_name text); INSERT INTO customers (first_name, last_name) VALUES ('Hello', 'World'); INSERT INTO customers (first_name, last_name) VALUES ('Goodbye', 'World'); UPDATE customers SET last_name = 'Friends' where first_name = 'Hello'; DELETE FROM customers WHERE first_name = 'Goodbye';
json{"time":"1970-01-01T00:00:01Z","lsn":"0/1510A58","table":"public.customers","operation":"INSERT","columns":{"first_name":{"new":{"q":"true","t":"text","v":"Hello"}},"id":{"new":{"q":"false","t":"integer","v":"1"}},"last_name":{"new":{"q":"true","t":"text","v":"World"}}}} {"time":"1970-01-01T00:00:01Z","lsn":"0/1510B60","table":"public.customers","operation":"INSERT","columns":{"first_name":{"new":{"q":"true","t":"text","v":"Goodbye"}},"id":{"new":{"q":"false","t":"integer","v":"2"}},"last_name":{"new":{"q":"true","t":"text","v":"World"}}}} {"time":"1970-01-01T00:00:01Z","lsn":"0/1510C20","table":"public.customers","operation":"UPDATE","columns":{"first_name":{"new":{"q":"true","t":"text","v":"Hello"}},"id":{"new":{"q":"false","t":"integer","v":"1"}},"last_name":{"new":{"q":"true","t":"text","v":"Friends"}}}} {"time":"1970-01-01T00:00:01Z","lsn":"0/1510CA8","table":"public.customers","operation":"DELETE","columns":{"id":{"old":{"q":"false","t":"integer","v":"2"}}}}
USAGE: pg-bifrost [global options] command [command options] [arguments...] COMMANDS: create, c 创建复制槽 drop, d 删除复制槽 replicate, r 启动逻辑复制 help, h 显示命令列表或单个命令帮助 GLOBAL OPTIONS: --config value bifrost YAML 配置文件路径(默认:"config.yaml") --slot value PostgreSQL 复制槽名称(默认:"pg_bifrost") [$REPLICATION_SLOT] --user value PostgreSQL 复制用户(默认:"replication") [$PGUSER] --password value PostgreSQL 复制用户密码 [$PGPASSWORD] --host value PostgreSQL 连接主机(默认:"127.0.0.1") [$PGHOST] --port value PostgreSQL 连接端口(默认:"5432") [$PGPORT] --dbname value PostgreSQL 数据库名称(默认:"postgres") [$PGDATABASE] --help, -h 显示帮助 --version, -v 打印版本信息
NAME: pg-bifrost replicate - 启动逻辑复制 USAGE: pg-bifrost replicate command [command options] [arguments...] COMMANDS: stdout 复制到标准输出 kinesis 复制到 Kinesis rabbitmq 复制到 RabbitMQ OPTIONS: --create-slot, -s 复制前若复制槽不存在则创建 [$CREATE_SLOT] --workers value 传输工作线程数(默认:1) [$WORKERS] --client-buffer-size value 从 PostgreSQL 缓冲的消息数(默认:10000) [$CLIENT_BUFFER_SIZE] --batch-flush-update-age value 批处理等待新消息的毫秒数(收到新消息时重置计时器,每秒评估一次)(默认:500) [$BATCH_FLUSH_UPDATE_AGE] --batch-flush-max-age value 批处理最大等待毫秒数(覆盖 batch-flush-update-age,每秒评估一次)(默认:1000) [$BATCH_FLUSH_MAX_AGE] --batch-queue-depth value 每个工作线程可排队的批处理数(默认:2) [$BATCH_QUEUE_DEPTH] --batcher-memory-soft-limit value 批处理消息的最大内存使用量(仅在 batch-flush-timeout 时评估,单位:字节)(默认:104857600) [$BATCHER_MEMORY_SOFT_LIMIT] --batcher-routing-method value 批处理路由至工作线程的方式:"round-robin"(默认,轮询)或 "partition"(分区,需严格有序时使用)(默认:"round-robin") [$BATCHER_ROUTING_METHOD] --partition-method value 消息分区方式:"none"(默认,无分区)、"tablename"(按表名)、"transaction"(按事务,单事务一批)、"transaction-bucket"(事务哈希分区)(默认:"none") [$PARTITION_METHOD] --batcher-partition-count value partition-method="transaction-bucket" 时的分区桶数量(默认:1) [$PARTITION_COUNT] --whitelist value 包含的表白名单(其他表将被排除) [$WHITELIST] --blacklist value 排除的表黑名单(其他表将被包含) [$BLACKLIST] --help, -h 显示帮助
pg-bifrost 支持通过命令行参数、环境变量或配置文件进行配置,详细选项见 --help。
| 配置变量 | 描述 |
|---|---|
| workers | 推送数据至 Kinesis 的工作线程数 |
| client-buffer-size | 从 PostgreSQL 缓冲的消息数 |
| batch-flush-timeout | 批处理等待新消息的毫秒数 |
| batch-queue-depth | 每个工作线程可排队的批处理数 |
| max-memory-bytes | 批处理消息的最大内存使用量 |
支持表级别的包含/排除过滤:
分区配置决定消息如何分组和路由,确保数据有序性和吞吐量平衡:
配置选项
| 配置变量 | 描述 |
|---|---|
| partition-method | 消息分区方式: - none(默认):无分区- tablename:按表名分区- transaction:单事务一批(小事务多时慎用)- transaction-bucket:事务哈希分区(同事务进入同一分区,可包含其他事务) |
| batcher-routing-method | 批处理路由方式: - round-robin(默认):轮询路由至工作线程- partition:按分区路由(需严格有序时使用) |
策略速查表
| 策略目标 | partition-method | batcher-routing-method |
|---|---|---|
| 默认(消息无序分布至分片) | none | round-robin |
| 同事务消息至同一分片(无序) | transaction-bucket | round-robin |
| 同事务消息至同一分片(有序) | transaction-bucket | partition |
| 同表消息至同一分片(无序) | tablename | round-robin |
| 同表消息至同一分片(有序) | tablename | partition |
非安全问题请在 GitHub Issues 提交。
通过以下方式帮助改进项目:
集成测试需 docker 和 docker-compose,步骤如下:
bash# 拉取 bats 子模块 git submodule sync git submodule update --init # 构建 Docker 镜像 make docker_build # 运行集成测试 make itests
本工具受 https://github.com/nickelser/pg_kinesis 启发。
pg-bifrost 基于 Apache License (Version 2.0) 开源,详见 LICENSE。
您可以使用以下命令拉取该镜像。请将 <标签> 替换为具体的标签版本。如需查看所有可用标签版本,请访问 标签列表页面。



探索更多轩辕镜像的使用方法,找到最适合您系统的配置方式
通过 Docker 登录认证访问私有仓库
无需登录使用专属域名
Kubernetes 集群配置 Containerd
K3s 轻量级 Kubernetes 镜像加速
VS Code Dev Containers 配置
Podman 容器引擎配置
HPC 科学计算容器配置
ghcr、Quay、nvcr 等镜像仓库
Harbor Proxy Repository 对接专属域名
Portainer Registries 加速拉取
Nexus3 Docker Proxy 内网缓存
需要其他帮助?请查看我们的 常见问题Docker 镜像访问常见问题解答 或 提交工单
docker search 限制
站内搜不到镜像
离线 save/load
插件要用 plugin install
WSL 拉取慢
安全与 digest
新手拉取配置
镜像合规机制
manifest unknown
no matching manifest(架构)
invalid tar header(解压)
TLS 证书失败
DNS 超时
域名连通性排查
410 Gone 排查
402 与流量用尽
401 认证失败
429 限流
D-Bus 凭证提示
413 与超大单层
来自真实用户的反馈,见证轩辕镜像的优质服务