修改密码

请输入密码
请输入密码 请输入8-64长度密码 和 email 地址不相同 至少包括数字、大写字母、小写字母、半角符号中的 3 个
请输入密码
提交

修改昵称

当前昵称:
提交

申请证书

证书详情

Please complete this required field.

  • Ultipa Graph V4

Standalone

Please complete this required field.

Please complete this required field.

服务器的MAC地址

Please complete this required field.

Please complete this required field.

取消
申请
ID
产品
状态
核数
申请天数
审批时间
过期时间
MAC地址
申请理由
审核信息
关闭
基础信息
  • 用户昵称:
  • 手机号:
  • 公司名称:
  • 公司邮箱:
  • 地区:
  • 语言:
修改密码
申请证书

当前未申请证书.

申请证书
Certificate Issued at Valid until Serial No. File
Serial No. Valid until File

Not having one? Apply now! >>>

ProductName CreateTime ID Price File
ProductName CreateTime ID Price File

No Invoice

v5.0
搜索
    v5.0

      Spark Connector

      概述

      嬴图Spark Connector通过嬴图Java SDK连接嬴图和Apache Spark,用于在Spark环境中对嬴图数据库进行读取和写入。

      嬴图Spark Connector基于最新的Spark DataSource API,支持与Spark交互的各种语言,包括Scala、Python、Java和R。本手册以Scala为例进行说明,使用其他语言只需进行少量的语法调整。

      安装

      版本要求

      嬴图Spark Connector支持以下版本的嬴图和Spark:

      • 嬴图v4.x(v4.3及以上),单实例或集群均可
      • 使用Scala 2.12的Spark 2.4.8

      导入依赖

      在pom.xml文件中导入嬴图Spark Connector依赖:

      <dependencies>
        <dependency>
          <groupId>com.ultipa</groupId>
          <artifactId>ultipa-spark-connector</artifactId>
          <version>1.0.0</version>
        </dependency>
      </dependencies>
      

      读取

      你可以根据点schema、边schema或一个UQL查询语句从嬴图数据库读取数据为一个Spark DataFrame。

      Spark不支持嬴图所有的属性数据类型,详情请参考数据类型转换

      通过点Schema读

      读取指定schema中所有点的_id和自定义属性数据。

      示例:从Test图集读取所有Person点数据

      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      val spark = SparkSession.builder().getOrCreate()
      
      val df = spark.read.format("com.ultipa.spark.DataSource")
        .option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
        .option("auth.username","root")
        .option("auth.password","root")
        .option("graph","Test")
        .option("nodes","Person")
        .load()
      
      df.show()
      

      结果:

      _id name gender
      U001 Alice female
      U002 Bruce male
      U003 Joe male

      通过边Schema读

      读取指定schema中所有边的_from_to和自定义属性数据。

      示例:从Test图集读取所有Follows边数据

      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      val spark = SparkSession.builder().getOrCreate()
      
      val df = spark.read.format("com.ultipa.spark.DataSource")
        .option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
        .option("auth.username","root")
        .option("auth.password","root")
        .option("graph","Test")
        .option("edges","Follows")
        .load()
      
      df.show()
      

      结果:

      _from _to
      since
      level
      U001 U002 2019-12-15 12:10:09 1
      U003 U001 2021-1-20 09:15:02 2

      通过UQL读

      读取一个UQL语句返回的数据。该UQL语句必须包含RETURN子句,且返回的数据类型为ATTR或TABLE。不支持其他的返回值类型(如NODE、EDGE和PATH等)。什么是返回值类型

      示例:从Test图集读取一个UQL语句返回的数据

      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      val spark = SparkSession.builder().getOrCreate()
      
      val df = spark.read.format("com.ultipa.spark.DataSource")
        .option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
        .option("auth.username","root")
        .option("auth.password","root")
        .option("graph","Test")
        .option("query","find().nodes() as n return n.name, n.gender")
        .load()
      
      df.show()
      

      结果:

      n.name n.gender
      Alice female
      Bruce male
      Joe male

      写入

      你可以将一个Spark DataFrame作为点或边数据写入嬴图数据库的一个schema中。DataFrame中的每列映射为点或边的一个属性,列名就是属性名(点的_id以及边的_from_to属性除外),不存在的属性会在写入过程中自动创建。

      每个属性的数据类型由DataFrame每列的数据类型决定,详情请参考数据类型转换

      写入点Schema

      示例:将一个DataFrame写为Test图集中的Person点

      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      val spark = SparkSession.builder().getOrCreate()
      
      val data = Seq(("Alice", "Teacher", 25, 1.11), ("Bob", "Worker", 30, 2.22), ("Charlie", "Officer", 35, 3.33))
      
      val df = spark.createDataFrame(data).toDF("name", "job", "age", "income")
      df.show()
      
      df.write.format("com.ultipa.spark.DataSource")
        .option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
        .option("auth.username","root")
        .option("auth.password","root")
        .option("graph","Test")
        .option("nodes", "Person")
        .option("nodes.id", "name")
        .save()
      

      写入边Schema

      示例:将一个DataFrame写为Test图集中的RelatesTo边

      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      val spark = SparkSession.builder().getOrCreate()
      
      val data = Seq(("Alice", "Bob", "couple"), ("Bob", "Charlie", "couple"), ("Charlie", "Alice", "friend"))
      
      val df = spark.createDataFrame(data).toDF("from", "to", "type")
      df.show()
      
      df.write.format("com.ultipa.spark.DataSource")
        .option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
        .option("auth.username","root")
        .option("auth.password","root")
        .option("graph","Test")
        .option("edges", "RelatesTo")
        .option("edges.from", "from")
        .option("edges.to", "to")
        .save()
      

      配置

      Options

      在Spark API中,DataFrameReaderDataFrameWriter类都提供option()方法,用于指定读取或写入操作的各项参数。

      以下是嬴图Spark Connector支持的所有options:

      通用

      Option键
      默认值
      描述
      可选
      hosts 嬴图服务器或集群(英文逗号隔开)的IP地址或URL地址(不包括 "https://" 或 "http://")
      auth.username 用户名
      auth.password 密码
      graph default 要连接的图集名
      connection.timeout 15 请求超时时间(秒)
      connection.connect.timeout 2000 连接超时时间(毫秒),默认每个节点尝试三次
      connection.heartbeat 10000 所有实例的心跳时间(毫秒),0表示关闭心跳
      connection.max.recv.size 41943040 接收数据的最大字节数

      读取

      Option键
      默认值
      描述
      可选
      nodes 点schema名称
      edges 边schema名称
      query 读取数据的UQL查询语句

      写入

      Option键
      默认值
      描述
      可选
      nodes 点schema名称;如果指定的schema不存在,则在写入过程中自动创建
      nodes.id 作为点的_id属性的DataFrame中的一个列名
      edges 边schema名称;如果指定的schema不存在,则在写入过程中自动创建
      edges.from 作为边的_from属性的DataFrame中的一个列名
      edges.to 作为边的_to属性的DataFrame中的一个列名

      全局配置

      你可以在每次连接时设置options,或在Spark会话中进行全局配置来避免重复设置。为此,可以通过在config()方法实现,但注意此时需要在option键名称前加上ultipa.

      示例:对hostsauth.usernameauth.passwordconnection.timeout这几个option进行全局配置

      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      val spark = SparkSession.builder()
        .config("ultipa.hosts", "192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
        .config("ultipa.auth.username","root")
        .config("ultipa.auth.password","root")
        .config("ultipa.graph", "Test")
        .config("ultipa.connection.timeout", 600)
        .getOrCreate()
      
      val dfPerson = spark.read.format("com.ultipa.spark.DataSource")
        .option("nodes", "Person")
        .load()
      

      数据类型转换

      嬴图属性类型 Spark数据类型
      _id, _from, _to StringType
      _uuid, _from_uuid, _to_uuid LongType
      int32 IntegerType
      uint32 LongType
      int64 LongType
      uint64 StringType
      float FloatType
      double DoubleType
      decimal
      string StringType
      text
      datetime TimestampType
      timestamp TimestampType
      point
      blob BinaryType
      list
      set
      ignore NullType
      UNSET NullType
      _ StringType
      请完成以下信息后可下载此书
      *
      公司名称不能为空
      *
      公司邮箱必须填写
      *
      你的名字必须填写
      *
      你的电话必须填写
      *
      你的电话必须填写