Neo4j-04.数据导入与对接工具

把外部数据写入 Neo4j 是项目落地的第一步。本文按数据规模在线/离线需求,梳理常见存储格式、导入方式与对接工具——从小文件 CSV 到亿级边 bulk import,再到 Python 驱动与 CDC 流式同步。

段末注释:ETL(extract, transform, load,抽取、转换、加载)指将源系统数据清洗后载入目标库的流程;CDC(change data capture,变更数据捕获)指捕获源库增量变更并同步到下游。

前置Neo4j-03.Cypher增删改查


一、导入方式选型

1
2
3
4
5
6
7
flowchart TD
A[数据源] --> B{数据量级?}
B -->|< 百万行| C[LOAD CSV / 驱动批写]
B -->|百万~十亿| D{可停机?}
D -->|是| E[neo4j-admin database import]
D -->|否| F[驱动 + IN TRANSACTIONS 分批]
B -->|持续增量| G[CDC / 定时 MERGE 任务]
方式 规模 在线 速度
LOAD CSV 小~中
官方驱动批写 小~中
neo4j-admin import 否(空库) 极快
APOC 中~高
Spark Connector 离线

二、常见数据格式

2.1 边列表 CSV(最常见)

edges.csv

1
2
3
src_id,dst_id,rel_type,score
P00533,P04626,INTERACTS_WITH,0.92
P00533,hsa04012,IN_PATHWAY,

nodes.csv(可选,节点属性单独维护):

1
2
3
id,label,name,organism
P00533,Protein,EGFR,Homo sapiens
P04626,Protein,ERBB2,Homo sapiens

2.2 单文件「长表」

1
2
3
start_label,start_key,end_label,end_key,relationship,prop_key,prop_value
Person,alice,Person,bob,KNOWS,since,2020
Person,bob,Company,acme,WORKS_AT,role,Engineer

2.3 JSON

1
2
3
4
{
"nodes": [{"labels": ["Person"], "properties": {"name": "Alice"}}],
"relationships": [{"start": 0, "end": 1, "type": "KNOWS", "properties": {}}]
}

2.4 GraphML / GML

图工具(yEd、Cytoscape、NetworkX)常用交换格式;通常经 NetworkXJava 工具 转为 CSV 再导入。

2.5 RDF / Turtle

语义网三元组;Neo4j 侧可用 Neosemantics (n10s) 插件导入 RDF,或 ETL 转为属性图。

2.6 SQL 关系库

外键表 → 边;实体表 → 节点。工具:自研脚本、SQL 导出 CSV、Debezium CDC。


三、LOAD CSV(在线中小规模)

3.1 准备文件

将 CSV 放入 Neo4j 的 import/ 目录(Docker 挂载 -v ./import:/var/lib/neo4j/import)。

3.2 导入节点

1
2
3
4
LOAD CSV WITH HEADERS FROM 'file:///nodes.csv' AS row
MERGE (n:Protein {uniprot: row.id})
SET n.name = row.name,
n.organism = row.organism;

3.3 导入关系

1
2
3
4
5
LOAD CSV WITH HEADERS FROM 'file:///edges.csv' AS row
MATCH (a:Protein {uniprot: row.src_id})
MATCH (b:Protein {uniprot: row.dst_id})
MERGE (a)-[r:INTERACTS_WITH]->(b)
SET r.score = toFloat(row.score);

3.4 远程 CSV

1
2
LOAD CSV WITH HEADERS FROM 'https://example.com/data.csv' AS row
...

需在 neo4j.conf 开启:

1
dbms.security.allow_csv_import_from_file_urls=true

3.5 注意事项

建议
大文件 USING PERIODIC COMMIT 1000(Neo4j 4.x)或 IN TRANSACTIONS(5.x)
类型转换 toInteger()toFloat()datetime()
空值 CASE WHEN row.score = '' THEN null ELSE toFloat(row.score) END
先导节点再导边 避免 MATCH 找不到端点

Neo4j 5 分批示例:

1
2
3
4
5
6
7
8
LOAD CSV WITH HEADERS FROM 'file:///edges.csv' AS row
CALL {
WITH row
MATCH (a:Protein {uniprot: row.src_id})
MATCH (b:Protein {uniprot: row.dst_id})
MERGE (a)-[r:INTERACTS_WITH]->(b)
SET r.score = toFloat(row.score)
} IN TRANSACTIONS OF 5000 ROWS;

四、neo4j-admin database import(离线海量)

适用于初始化空库、节点/边达千万~百亿级。速度比在线 LOAD CSV 快一个数量级以上。

4.1 准备 Neo4j 原生 CSV 格式

nodes.csv(每行一个节点,:ID:LABEL 为特殊列):

1
2
3
nodeId:ID, name,:LABEL
P00533,EGFR,Protein
P04626,ERBB2,Protein

relationships.csv

1
2
:START_ID,:END_ID,:TYPE,score:float
P00533,P04626,INTERACTS_WITH,0.92

4.2 执行导入(数据库停止)

1
2
3
4
5
6
7
8
9
10
neo4j stop

neo4j-admin database import full \
--nodes=Protein=nodes.csv \
--relationships=INTERACTS_WITH=relationships.csv \
--delimiter=, \
--array-delimiter=; \
neo4j

neo4j start

Docker 环境:

1
2
3
4
docker exec -it neo4j neo4j-admin database import full \
--nodes=/import/nodes.csv \
--relationships=/import/rels.csv \
neo4j

限制:目标库须为空或新建;导入期间库不可用。


五、APOC 扩展导入

APOC(Awesome Procedures on Cypher)提供大量过程函数,需安装插件(Docker 环境变量 NEO4J_PLUGINS='["apoc"]')。

5.1 从 JSON 文件

1
2
3
4
CALL apoc.load.json('file:///graph.json') YIELD value
UNWIND value.nodes AS node
CALL apoc.create.node(node.labels, node.properties) YIELD node AS n
RETURN count(n);

5.2 从 JDBC(关系库直读)

1
2
3
4
CALL apoc.load.jdbc('jdbc:postgresql://host:5432/mydb', 'SELECT id, name FROM users')
YIELD row
MERGE (u:User {id: row.id})
SET u.name = row.name;

5.3 周期性增量

1
2
3
4
5
CALL apoc.periodic.iterate(
"LOAD CSV WITH HEADERS FROM 'file:///delta.csv' AS row RETURN row",
"MATCH (a {id: row.src}) MATCH (b {id: row.dst}) MERGE (a)-[:LINK]->(b)",
{batchSize: 5000, parallel: false}
);

六、官方驱动(应用层对接)

6.1 Python:批量 MERGE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from neo4j import GraphDatabase

def import_edges(tx, batch):
tx.run("""
UNWIND $batch AS row
MERGE (a:Protein {uniprot: row.src})
MERGE (b:Protein {uniprot: row.dst})
MERGE (a)-[r:INTERACTS_WITH]->(b)
SET r.score = row.score
""", batch=batch)

batch = [{"src": "P00533", "dst": "P04626", "score": 0.92}, ...]
with driver.session() as session:
for i in range(0, len(batch), 1000):
session.execute_write(import_edges, batch[i:i+1000])

6.2 Java / Spring Data Neo4j

1
2
3
4
5
@Node
public class Protein {
@Id @Property("uniprot") private String uniprot;
private String name;
}

Repository 层 @Query 写 Cypher,或 Neo4jClient 执行原生语句。

6.3 JavaScript(Node)

1
2
3
4
const session = driver.session({ database: 'neo4j' });
await session.executeWrite(tx =>
tx.run('MERGE (p:Person {name: $name})', { name: 'Alice' })
);

七、Python 生态:pandas / NetworkX

7.1 pandas → 驱动

1
2
3
4
5
import pandas as pd

df = pd.read_csv("edges.csv")
rows = df.to_dict("records")
# 分批传入 UNWIND $batch(见上)

7.2 NetworkX → CSV → Neo4j

1
2
3
4
5
import networkx as nx
G = nx.read_edgelist("ppi.txt", nodetype=str)

nx.write_node_link_json(G, "graph.json") # 或导出边列表
# 再 LOAD CSV / APOC

7.3 neo4j-graphdatascience

Graph Data Science 库的 Python 客户端可读写投影图(见 Neo4j-07),适合算法流水线而非初始全量导入。


八、流式与 CDC 同步

工具 模式 说明
Kafka + Neo4j Connector 流式 官方/社区 Kafka Connect sink
Debezium CDC 捕获 PG/MySQL binlog → Kafka → Neo4j
Airbyte 批/增量 可视化 ETL,有 Neo4j destination
自研定时任务 读 watermark,MERGE 增量

典型架构:

1
2
3
4
5
6
flowchart LR
PG[(PostgreSQL)]
DBZ[Debezium]
KF[Kafka]
N4J[(Neo4j)]
PG --> DBZ --> KF --> N4J

增量 MERGE 关键:源表主键 = 图节点唯一键,保证幂等。


九、导出(迁出与备份前置)

1
2
3
4
5
6
7
8
9
// CSV 导出(APOC)
CALL apoc.export.csv.all('/export/full.csv', {})
YIELD file, nodes, relationships;

// 仅 Cypher 结果
CALL apoc.export.cypher.query(
"MATCH (n:Protein) RETURN n",
'/export/proteins.cypher', {}
);

Neo4j 5 原生:

1
neo4j-admin database dump neo4j --to-path=/backups/

详见 Neo4j-06.运维迁移与备份


十、导入 checklist

  1. 建模确认:节点唯一键、关系类型、属性类型
  2. 先建约束/索引,再导数据
  3. 先导节点,再导边
  4. 百万级以上评估 bulk importIN TRANSACTIONS
  5. 导入后 MATCH (n) RETURN count(n) 与源表对账
  6. 监控堆内存与事务日志空间

十一、小结

场景 推荐
学习 / 小样本 Browser + LOAD CSV
应用内写图 官方驱动 + UNWIND 批写
初始化大图 neo4j-admin import
异构源持续同步 CDC + MERGE 或 Airbyte
JSON / JDBC APOC
下一篇 内容
Neo4j-05.查询进阶与性能调优 路径、索引、PROFILE
Neo4j-06.运维迁移与备份 dump/restore
-------------本文结束感谢您的阅读-------------