senzing/stream-producer向队列填充记录,供stream-loader消费。
stream-producer.py Python脚本可读取不同格式的文件(JSON、CSV、Parquet、Avro)并将其发布到队列(RabbitMQ、Kafka、AWS SQS)。senzing/stream-producer Docker镜像作为封装,用于Docker部署形式(如docker-compose、kubernetes)。
要查看所有子命令,请运行:
console$ ./stream-producer.py --help usage: stream-producer.py [-h] {avro-to-kafka,avro-to-rabbitmq,avro-to-sqs,avro-to-sqs-batch,avro-to-stdout,csv-to-kafka,csv-to-rabbitmq,csv-to-sqs,csv-to-sqs-batch,csv-to-stdout,gzipped-json-to-kafka,gzipped-json-to-rabbitmq,gzipped-json-to-sqs,gzipped-json-to-sqs-batch,gzipped-json-to-stdout,json-to-kafka,json-to-rabbitmq,json-to-sqs,json-to-sqs-batch,json-to-stdout,parquet-to-kafka,parquet-to-rabbitmq,parquet-to-sqs,parquet-to-sqs-batch,parquet-to-stdout,websocket-to-kafka,websocket-to-rabbitmq,websocket-to-sqs,websocket-to-sqs-batch,websocket-to-stdout,sleep,version,docker-acceptance-test} ... 队列消息工具。更多信息,请参见 [***] 位置参数: {avro-to-kafka,avro-to-rabbitmq,avro-to-sqs,avro-to-sqs-batch,avro-to-stdout,csv-to-kafka,csv-to-rabbitmq,csv-to-sqs,csv-to-sqs-batch,csv-to-stdout,gzipped-json-to-kafka,gzipped-json-to-rabbitmq,gzipped-json-to-sqs,gzipped-json-to-sqs-batch,gzipped-json-to-stdout,json-to-kafka,json-to-rabbitmq,json-to-sqs,json-to-sqs-batch,json-to-stdout,parquet-to-kafka,parquet-to-rabbitmq,parquet-to-sqs,parquet-to-sqs-batch,parquet-to-stdout,websocket-to-kafka,websocket-to-rabbitmq,websocket-to-sqs,websocket-to-sqs-batch,websocket-to-stdout,sleep,version,docker-acceptance-test} 子命令(SENZING_SUBCOMMAND): avro-to-kafka 读取Avro文件并发送到Kafka avro-to-rabbitmq 读取Avro文件并发送到RabbitMQ avro-to-sqs 读取Avro文件并发送到AWS SQS avro-to-stdout 读取Avro文件并输出到STDOUT csv-to-kafka 读取CSV文件并发送到Kafka csv-to-rabbitmq 读取CSV文件并发送到RabbitMQ csv-to-sqs 读取CSV文件并发送到SQS csv-to-stdout 读取CSV文件并输出到STDOUT gzipped-json-to-kafka 读取gzip压缩的JSON文件并发送到Kafka gzipped-json-to-rabbitmq 读取gzip压缩的JSON文件并发送到RabbitMQ gzipped-json-to-sqs 读取gzip压缩的JSON文件并发送到AWS SQS gzipped-json-to-stdout 读取gzip压缩的JSON文件并输出到STDOUT json-to-kafka 读取JSON文件并发送到Kafka json-to-rabbitmq 读取JSON文件并发送到RabbitMQ json-to-sqs 读取JSON文件并发送到AWS SQS json-to-stdout 读取JSON文件并输出到STDOUT parquet-to-kafka 读取Parquet文件并发送到Kafka parquet-to-rabbitmq 读取Parquet文件并发送到RabbitMQ parquet-to-sqs 读取Parquet文件并发送到AWS SQS parquet-to-stdout 读取Parquet文件并输出到STDOUT sleep 仅休眠,用于Docker测试 version 打印程序版本 docker-acceptance-test 用于Docker验收测试 可选参数: -h, --help 显示此帮助消息并退出
在Senzing,我们致力于以“别让我思考”的风格创建GitHub文档。大多数情况下,指令是复制粘贴即可执行的。当需要额外思考时,会标记为“思考”图标 :thinking:。当需要自定义时,会标记为“铅笔”图标 :pencil2:。如果指令不清晰,请通过创建新的文档问题告诉我们需要改进的地方。现在开始演示...
运行Docker容器。以下命令将显示帮助信息。示例:
consoledocker run \ --rm \ senzing/stream-producer --help
更多使用示例,请参见Docker示例。
部署Stream Loader所需的支持服务。
指定放置工件的目录。示例:
consoleexport SENZING_VOLUME=~/my-senzing mkdir -p ${SENZING_VOLUME}
下载docker-compose.yaml文件。示例:
consolecurl -X GET \ --output ${SENZING_VOLUME}/docker-compose.yaml \ [***]
启动docker-compose栈。示例:
consoledocker-compose -f ${SENZING_VOLUME}/docker-compose.yaml up
:thinking: 继续之前需要完成以下任务。这些是“一次性任务”,可能已经完成。
安装Python依赖项。示例:
consolepip3 install -r [***]
获取stream-producer.py的本地副本。示例:
:pencil2: 指定下载文件的位置。示例:
consoleexport SENZING_DOWNLOAD_FILE=~/stream-producer.py
下载文件。示例:
consolecurl -X GET \ --output ${SENZING_DOWNLOAD_FILE} \ [***]
使文件可执行。示例:
consolechmod +x ${SENZING_DOWNLOAD_FILE}
:thinking: 替代方案:可通过克隆仓库中的说明下载整个git仓库。
运行命令。示例:
console${SENZING_DOWNLOAD_FILE} --help
更多使用示例,请参见CLI示例。
通过环境变量或命令行参数指定配置值。
stream-producer.py使用AWS SDK for Python (Boto3)访问AWS服务。可通过环境变量或~/.aws/config文件配置此库。
用于配置的环境变量示例:


manifest unknown 错误
TLS 证书验证失败
DNS 解析超时
410 错误:版本过低
402 错误:流量耗尽
身份认证失败错误
429 限流错误
凭证保存错误
来自真实用户的反馈,见证轩辕镜像的优质服务