本文介绍如何将Kafka主题中的数据导入嬴图数据库。

以下步骤均在Windows系统上PowerShell中演示。
生成配置文件
打开终端程序,导航至ultipa-importer
所在文件夹。执行以下命令,选择kafka
,为导入Kafka主题生成样本配置文件:
./ultipa-importer --sample

执行命令后,会在ultipa-importer
所在目录下生成配置文件import.sample.kafka.yml
。如果目录下已有该文件,数据将被覆盖。
修改配置文件
根据实际使用场景修改import.sample.kafka.yml
文件。该文件包含以下部分:
mode
:设置为kafka
。kafka
:配置Kafka主机地址或URI以建立连接。server
:提供嬴图服务器信息,并为数据导入指定目标图(新图或已有图)。nodeConfig
:定义点schema,其中每个schema对应一个主题。所有列按序依次映射为点属性。edgeConfig
:定义边schema,其中每个schema对应一个主题。所有列按序依次映射为边属性。settings
:为数据导入设置全局参数和偏好。
# 导入模式:csv/json/jsonl/rdf/graphml/bigQuery/sql/kafka/neo4j/salesforce
mode: kafka
# Kafka主机配置
kafka:
# 主机IP/URI和端口
host: "192.168.1.23:4567"
# 嬴图服务器配置
server:
# 主机IP/URI和端口;若为集群,使用英文逗号分隔
host: "10.11.22.33:1234"
username: "admin"
password: "admin12345"
# 目标图(新图或已有图)
graphset: "myGraph"
# 若上图为新图,指定图所在分片
shards: "1,2,3"
# 若上图为新图,指定分片分区算法(Crc32/Crc64WE/Crc64XZ/CityHash64)
partitionBy: "Crc32"
# TLS加密证书文件路径
crt: ""
# 点配置
nodeConfig:
# 指定schema
- schema: "Customer"
# 指定待消费的主题
topic: "customer"
# offset: 指定在Kafka主题分区中开始消费消息的位置
# 支持以下选项:
## - newest:从最新信息开始消费
## - oldest(默认):从最早的消息开始消费
## - index:从特定offset开始消费
## - time:从特定时间戳开始消费。格式为:yyyy-mm-dd hh:mm:ss(本地时间)或yyyy-mm-dd hh:mm:ss -7000(带有时区偏移)
# 对于大型Kafka主题,使用newest、oldest或特定index要比使用timestamp更加高效
offset: oldest
# properties:将Kafka中的消息映射为属性;所有列按序依次映射
## name: 属性名
## new_name:属性名;默认为上述名称
## type:属性类型;可设置为_id,_from,_to或其他嬴图属性,如int64,float,string等:设置为_ignore时可跳过导入该列
## prefix:为属性值增加前缀;仅适用于_id,_from和_to类型
properties:
- name: cust_no
type: _id
prefix:
- name: name
type: string
- name: level
type: int32
- schema: "Merchant"
topic: "merchant"
offset: oldest
properties:
- name: merch_no
type: _id
- name: name
type: string
- name: type
type: string
# 边配置
edgeConfig:
- schema: "Transfers"
topic: "transaction"
offset: oldest
properties:
- name: trans_no
type: string
- name: cust_no
type: _from
- name: merch_no
type: _to
- name: time
type: datetime
# 全局设置
settings:
# 日志文件路径
logPath: "./logs"
# 每批次导入的点或边的数量
batchSize: 10000
# 插入模式:insert/overwrite/upsert
importMode: insert
# 插入边时,自动创建缺失端点
createNodeIfNotExist: false
# 报错时停止数据导入
stopWhenError: false
# 设置为true时,自动创建新的图、schema和属性
yes: true
# 最大线程数
threads: 32
# RPC最大消息传输量(单位:MB)
maxPacketSize: 40
# 时间戳对应时区
# 默认时区:"+0200"
# 时间戳单位,支持毫秒(ms)或秒(s)
timestampUnit: s
执行导入
使用--config
标志指定配置文件,执行数据导入:
./ultipa-importer --config import.sample.kafka.yml
