本节为您介绍如何使用Connection
对象的方法管理实例上的进程和当前图集上的任务。
每个示例主要展示如何使用所列方法。点击完整示例查看完整代码示例。
Process
所有UQL查询(使用write()
方法执行的算法除外)都作为进程运行。进程完成后,其结果会立刻返回给客户端,但不会被存储。
top()
获取实例中所有正在运行和正在停止的进程。
参数:
RequestConfig
(可选):配置请求。
返回值:
List[Top]
:实例中的全部进程列表。
# 获取实例中所有正在运行和正在终止的进程
processList = Conn.top()
for process in processList:
print("process_id: " + process.process_id)
print("process_uql: " + process.process_uql)
process_id: a_7_11229_2
process_uql: n({_uuid > 300}).e()[:3].n() as p RETURN p{*} LIMIT 500
kill()
终止实例中正在进行的进程。
参数:
str
:待终止的进程ID,设定为*
时终止全部进程。RequestConfig
(可选):配置请求。
返回值:
UltipaResponse
:请求的结果。
# 获取实例中所有正在运行和正在终止的进程并终止所有进程
processList = Conn.top()
for process in processList:
print("process_id:" + process.process_id)
print("process_uql:" + process.process_uql)
print("duration:" + process.duration)
print("status:" + process.status)
response = Conn.kill(processId = "*")
print(response.status.code)
process_id:a_4_11461_2
process_uql:n({_uuid > 300}).e()[:3].n() as p RETURN p{*} LIMIT 500
duration:2
status:RUNNING
0
任务
使用write()
方法执行的算法作为任务运行。这些任务存储在算法运行的图集里,直至任务被删除。
showTask()
获取当前图集的任务。
参数:
Union[int, str]
(可选):任务ID (int
) 或任务名称(str
)。TaskStatus
(可选):任务状态,如果同时设置了这个参数和第一个参数,则第一个参数优先。RequestConfig
(可选):配置请求。
返回值:
List[Task]
:图集中的全部任务列表。
requestConfig = RequestConfig(graphName="miniCircle")
# 在图集miniCircle上运行一个算法任务
response = Conn.uql("algo(degree).params({order: 'desc'}).write({file:{filename: 'degree_all'}})", requestConfig)
taskID = response.alias("_task").asTable().rows[0][0]
time.sleep(3)
# Retrieves the above task
tasksList = Conn.showTask(algoNameOrId=int(taskID), config=requestConfig)
#tasksList = Conn.showTask(algoNameOrId=str("degree"), status=TaskStatus.Done, config=requestConfig)
for task in tasksList:
print("Task ID:", task.task_info.task_id)
print("Server ID:", task.task_info.server_id)
print("Algo Name:", task.task_info.algo_name)
print("Task Status:", task.task_info.TASK_STATUS)
print("Task Params:", task.param)
print("Task Result:", task.result)
Task ID: 79686
Server ID: 1
Algo Name: degree
Task Status: 3
Task Params: {'order': 'desc'}
Task Result: {'total_degree': '1392.000000', 'avarage_degree': '4.578947', 'result_files': 'degree_all'}
clearTask()
从当前图集清除(删除)任务。状态为COMPUTING
和WRITING
的任务无法被清除。
参数:
Union[int, str]
(可选):任务ID (int
) 或任务名称(str
)。TaskStatus
(可选):任务状态;如果同时设置了这个参数和第一个参数,则第一个参数优先。RequestConfig
(可选):配置请求。
返回值:
UltipaResponse
:请求的结果。
requestConfig = RequestConfig(graphName="miniCircle")
# 在图集miniCircle上运行一个算法任务
response = Conn.uql("algo(degree).params({order: 'desc'}).write({file:{filename: 'degree_all'}})", requestConfig=requestConfig)
taskID = response.alias("_task").asTable().rows[0][0]
time.sleep(3)
# 清除以上任务
response1 = Conn.clearTask(algoNameOrId=int(taskID), config=requestConfig)
print(f"Task {taskID} cleared: {response1.status.code}")
Task 79687 cleared: 0
stopTask()
终止当前图集里状态为COMPUTING
的任务。
参数:
str
:待终止的任务ID;设定为*
时终止全部任务。RequestConfig
(可选):配置请求。
返回值:
UltipaResponse
:请求的结果。
requestConfig = RequestConfig(graphName="miniCircle")
# 在图集miniCircle上运行一个算法任务
response = Conn.uql("algo(louvain).params({phase1_loop_num: 20, min_modularity_increase: 0.001})."
"write({file:{filename_community_id: 'communityID'}})", requestConfig)
taskID = response.alias("_task").asTable().rows[0][0]
time.sleep(3)
# 终止以上任务并打印错误代码
response1 = Conn.stopTask(taskID, requestConfig)
print(f"Task {taskID} stopped: {response1.status.code}")
# 获取以上终止的任务
task = Conn.showTask(algoNameOrId=int(taskID), config=requestConfig)
print("Task ID:", task[0].task_info.task_id)
print("Server ID:", task[0].task_info.server_id)
print("Algo Name:", task[0].task_info.algo_name)
print("Task Params:", task[0].param)
print("Task Result:", task[0].result)
Task 79689 stopped: 0
Task ID: 79689
Server ID: 2
Algo Name: louvain
Task Params: {'phase1_loop_num': '20', 'min_modularity_increase': '0.001'}
Task Result: {}
完整示例
from ultipa import Connection, UltipaConfig, TaskStatus
from ultipa.configuration.RequestConfig import RequestConfig
import time
ultipaConfig = UltipaConfig()
# URI示例:ultipaConfig.hosts = ["mqj4zouys.us-east-1.cloud.ultipa.com:60010"]
ultipaConfig.hosts = ["192.168.1.85:60061","192.168.1.87:60061","192.168.1.88:60061"]
ultipaConfig.username = "<username>"
ultipaConfig.password = "<password>"
Conn = Connection.NewConnection(defaultConfig=ultipaConfig)
# 配置请求
requestConfig = RequestConfig(graphName="miniCircle")
# 在图集miniCircle上运行一个算法任务
response = Conn.uql("algo(degree).params({order: 'desc'}).write({file:{filename: 'degree_all'}})", requestConfig)
taskID = response.alias("_task").asTable().rows[0][0]
time.sleep(3)
# 获取以上任务
task = Conn.showTask(algoNameOrId=int(taskID), config=requestConfig)
print("Task ID:", task[0].task_info.task_id)
print("Server ID:", task[0].task_info.server_id)
print("Algo Name:", task[0].task_info.algo_name)
print("Task Params:", task[0].param)
print("Task Result:", task[0].result)