Java初始化大量数据到Neo4j中(一)

背景:我们项目第一次部署图数据库,要求我们把现有的业务数据以及关系上线第一时间初始化到Neo4j中。开发环境数据量已经百万级别。生成环境数据量更多。

我刚开始开发的时候,由于对Neo4j的了解并没有很多,第一想到的是用代码通用组装create语句进行创建节点以及关系。

业务说明:系统中有很多实体表,每个实体表中有自己的数据,不同实体有一张关系表进行维护。

我开发的思路是:1.先将所有的表中数据取出来做为节点 2.根据关系表将这个数据的关系查出来之后组装语句将数据添加到Neo4j中。

具体代码如下(Springboot项目版本2.2.5RELEASE):
pom.xml

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-neo4j</artifactId>
        </dependency>

配置文件进行下面配置:

spring:
   data:
      neo4j:
         uri: bolt://localhost:7687
         username: neo4j
         password: neo4j

使用Java代码组装CQL语句,用原生session进行
Neo4jConfig.java

@Configuration
public class Neo4jConfig {

    @Value("${spring.data.neo4j.uri}")
    private String uri;
    @Value("${spring.data.neo4j.username}")
    private String userName;
    @Value("${spring.data.neo4j.password}")
    private String password;

    @Bean
    public org.neo4j.ogm.config.Configuration getConfiguration() {
        org.neo4j.ogm.config.Configuration configuration = new org.neo4j.ogm.config.Configuration.
                Builder().uri(uri).connectionPoolSize(100).credentials(userName, password).withBasePackages("com.troy.keeper.desc.repository").build();
        return configuration;
    }

    @Bean
    public SessionFactory sessionFactory() {
        return new SessionFactory(getConfiguration());
    }

    @Bean("neo4jTransaction")
    public Neo4jTransactionManager neo4jTransactionManager(SessionFactory sessionFactory) {
        return new Neo4jTransactionManager(sessionFactory);
    }

接口入口Controller.java

@GetMapping("initDataToNeo4j")
        public void initDataToNeo4j() {
            service.initDataToNeo4j();
        }

Service.java

//节点数据,按照自己的实际业务添加,我这里对应的是所有表的数据,因为我业务中所有表结果基本一样,也即节点属性都一样。每个表的数据一个map,key是表名作为节点的标签
Map<String, List<NodeData>> nodeDataMap;
//关系数据,将每一个表数据的关系作为RelationData实体
List<RelationData> relationDatas;

//数据组装完成后,进行节点的创建
neo4jUtil.creatNode(nodeDataMap);

//进行关系绑定
neo4jUtil.bindRelation(relationDatas);

NodeData.java

private String id;//属性id
private String name;//属性名称
private String table;//作为节点标签

RelationData.java

//关系id
private String id;
//关系名称
private String relationName;
//因为我这里的关系跨实体,所以需要指定结束标签
private String endLableName;

//因为我这里的关系跨实体,所以需要指定开始标签
private String startLableName;

//开始节点的值
private String startValue;

//结束节点的值
private String endWhereValue;

Neo4jUtil.java

@Component
public class Neo4jUtil {
@Resource
private Session session;
/**
     * 删除标签下的节点(包括节点之间的关系)
     * @param lableName
     * @return
     */
    public Integer deleteByLable(String lableName) {
        if (StringUtils.isEmpty(lableName)) {
            return 0;
        }

        String cypherSql = String.format("MATCH (r:`%s`) DETACH DELETE r ", lableName);
        Result query = session.query(cypherSql, new HashMap<>(16));
        session.clear();
        return query.queryStatistics().getNodesDeleted();
    }

//创建节点
public  void creatNode(Map<String, List<NodeData>> nodeDataMap) {

        if (nodeDataMap == null) {
            return ;
        }

        for(String key:nodeDataMap.keySet()){
            List<NodeData> data= nodeDataMap.get(key);
            if (StringUtils.isEmpty(key)) {
                continue;
            }

            //表下没有数据的只创建一个没有属性的节点
            if (data== null || data.isEmpty()) {
                String sql =String.format("create (:`%s`)",key);
                session.query(sql, new HashMap<>(16));
                continue;
            }
            //因为是全量导入,可以先删除这个标签下的全部节点和关系,按照自己业务要求自行添加
            deleteByLable(key);
            for (NodeData nodeData:data) {
                //兼容中文和特殊符号
                String  labels = ":`" + String.join("`:`", key) + "`";;
                String id = nodeData.getId();
                String name = nodeData.getName();
                String property =  String.format("{id:'%s',name:'%s'} ",  id,name);

                String sql = String.format("create (%s%s)", labels, property);
                session.query(sql, new HashMap<>(16));

            }
        }
    
    }

//绑定关系
public void bindRelation( List<RelationData> relations) {
     if (relations== null) {
         return;
     }
      for (RelationData relation:relations) {
           String id = relation.getId();
            String relationName = relation.getRelationName();
          String startLableName = relation.getStartLableName();
          String endLableName = relation.getEndLableName();
          String startValue = relation.getStartValue();
          String endValue = relation.getEndValue();
    
          String property =  String.format("{id:'%s',name:'%s'} ", id,relationName);
          String cypherSql =  String.format("MATCH (n:`%s`),(m:`%s`) where n.id ='%s' and m.id= '%s' CREATE (n)-[r:%s%s]->(m)",
                  startLableName,endLableName,startValue ,endValue ,relationName,property) ;
          session.query(cypherSql, new HashMap<>(16));
      }
    }
}

之后执行controller接口,进行数据抽取和导入Neo4j,我开发的时候用的环境,大约有7w个节点,120w条关系。用本地Neo4j跑了两个多小时,连服务器部署的(跨地区)跑了8个小时。。。。

太慢了

后来查资料说是create适合数据量小的时候用,对于大量数据导入可以用neo4j-admin import ,接下来改造用neo4j-admin import ,参见Java初始化数据到Neo4j中(二)