
fix: schema not cached in spark/flink loader #334

Merged 4 commits into apache:master on Sep 20, 2022

Conversation

@haohao0103 (Contributor) commented Sep 16, 2022

Fix #333: initialize the SchemaCache when it is created, and after querying the schema, put the latest schema into the SchemaCache.

I have read the CLA Document and I hereby sign the CLA


    @@ -46,6 +46,7 @@ public SchemaCache(HugeClient client) {
         this.propertyKeys = new HashMap<>();
         this.vertexLabels = new HashMap<>();
         this.edgeLabels = new HashMap<>();
    +    updateAll();
@imbajin (Member) commented on this line, Sep 19, 2022

Why do we need to refresh the cache right when we first create it? (Can you give an example?)

@haohao0103 (Contributor, Author) replied:

HugeGraphSparkLoader does not get the real schema metadata when the SchemaCache is first created.
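To illustrate the problem, here is a minimal, self-contained Java sketch. The class and method names only mirror the real SchemaCache; this is a simplified model under assumed names, not the HugeGraph implementation. The cache's maps start empty, so unless it is populated eagerly at construction time (the updateAll() call added in this PR) or lazily after each schema query, every lookup misses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model of the bug: a schema cache that starts empty and is never
// populated returns nothing until it is refreshed.
public class SchemaCacheSketch {
    private final Map<String, String> vertexLabels = new HashMap<>();
    private final Function<String, String> remoteLookup; // stands in for HugeClient

    SchemaCacheSketch(Function<String, String> remoteLookup, boolean eagerInit) {
        this.remoteLookup = remoteLookup;
        if (eagerInit) {
            updateAll(); // the fix: populate the cache at construction time
        }
    }

    // In this sketch, "updateAll" just preloads one known label from the server.
    void updateAll() {
        this.vertexLabels.put("person", this.remoteLookup.apply("person"));
    }

    // Lazy variant of the fix: cache the schema right after querying it.
    String vertexLabel(String name) {
        return this.vertexLabels.computeIfAbsent(name, this.remoteLookup);
    }

    boolean cached(String name) {
        return this.vertexLabels.containsKey(name);
    }

    public static void main(String[] args) {
        Function<String, String> server = n -> "schema-of-" + n;
        SchemaCacheSketch buggy = new SchemaCacheSketch(server, false);
        System.out.println(buggy.cached("person"));   // false: empty until queried
        SchemaCacheSketch fixed = new SchemaCacheSketch(server, true);
        System.out.println(fixed.cached("person"));   // true: populated eagerly
    }
}
```

Running main prints false then true: the uninitialized cache knows nothing about the schema until it is refreshed, which is exactly what the Spark loader observed.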

@imbajin (Member) commented Sep 19, 2022

Two small questions:

  1. Why does this only affect spark-loader for now?
  2. Should we flush the cache periodically?

codecov bot commented Sep 19, 2022

Codecov Report

Merging #334 (d75f263) into master (9d34f4a) will decrease coverage by 0.04%.
The diff coverage is 57.14%.

❗ Current head d75f263 differs from pull request most recent head d09a1f7. Consider uploading reports for the commit d09a1f7 to get more accurate results

@@             Coverage Diff              @@
##             master     #334      +/-   ##
============================================
- Coverage     67.49%   67.45%   -0.05%     
- Complexity      877      878       +1     
============================================
  Files            86       86              
  Lines          4024     4028       +4     
  Branches        475      477       +2     
============================================
+ Hits           2716     2717       +1     
- Misses         1104     1106       +2     
- Partials        204      205       +1     
Impacted Files Coverage Δ
...u/hugegraph/loader/spark/HugeGraphSparkLoader.java 0.00% <0.00%> (ø)
...gegraph/loader/reader/file/OrcFileLineFetcher.java 87.71% <66.66%> (-3.03%) ⬇️


@haohao0103 (Contributor, Author) commented:

> 1. Why does this only affect spark-loader for now?
> 2. Should we flush the cache periodically?

1. In HugeGraphLoader, the load() method calls this.context.updateSchemaCache() to refresh the metadata, so spark-loader is the only one affected.
2. We could either initialize the cache when it is first created, or update it after the first query; I think we only need to keep one of the two.

@simon824 (Member) commented:

Hi @haohao0103, thanks for your contribution. I think it may be better to call updateSchemaCache right after new LoadContext, just like HugeGraphLoader does. What do you think? @imbajin @haohao0103
https://github.com/apache/incubator-hugegraph-toolchain/blob/master/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java#L120

    private LoadContext initPartition(
            LoadOptions loadOptions, InputStruct struct) {
        LoadContext context = new LoadContext(loadOptions);
        for (VertexMapping vertexMapping : struct.vertices()) {
            this.builders.put(
                    new VertexBuilder(context, struct, vertexMapping),
                    new ArrayList<>());
        }
        for (EdgeMapping edgeMapping : struct.edges()) {
            this.builders.put(new EdgeBuilder(context, struct, edgeMapping),
                              new ArrayList<>());
        }
        context.updateSchemaCache();
        return context;
    }

@haohao0103 (Contributor, Author) commented Sep 19, 2022

> I think it may be better to call updateSchemaCache right after new LoadContext, just like HugeGraphLoader does (see the initPartition snippet above).

I think putting it there would work; that is where we fixed it ourselves at first. But initPartition is a private method of HugeGraphSparkLoader, so a LoadContext could only be created and initialized correctly by calling initPartition, which feels a bit off to me. Just my humble opinion.

@simon824 (Member) commented:

> But initPartition is a private method of HugeGraphSparkLoader, so a LoadContext could only be created and initialized correctly by calling initPartition, which feels a bit off to me.

I don't quite follow; which part feels inappropriate? Shouldn't a LoadContext only need to be created when a partition is initialized?

@haohao0103 (Contributor, Author) commented:

> I don't quite follow; which part feels inappropriate? Shouldn't a LoadContext only need to be created when a partition is initialized?

What I mean is that this would restrict LoadContext to being initialized correctly only inside HugeGraphSparkLoader, which limits extensibility. When we developed our bulk-load code, we had to write a new class that uses LoadContext, and it could not call the private initPartition method; when constructing a LoadContext in another class, we had to call updateSchemaCache() explicitly ourselves before the LoadContext was usable. That is my understanding; I am not an expert in code design, so please take it just as a reference.

@simon824 (Member) commented:

@haohao0103 I see what you mean now. But HugeGraphLoader also has a createSchema step that creates the schema; SparkLoader does not have that logic yet, although it needs it too, and loading the schema cache after creating the schema looks more reasonable. Besides, if necessary we could simply make initPartition public, or define another, more generic method.
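The ordering being suggested could be sketched like this. Everything here is hypothetical for illustration: LoaderInitSketch, initContext, and the step names are assumptions, not the actual toolchain API. The idea is a public, loader-agnostic init step that creates the schema first and only then loads it into the cache, so any loader (Spark, Flink, or a custom bulk-load class) can reuse it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the suggested ordering: create the schema on the
// server first, then refresh the local schema cache, in one reusable step.
public class LoaderInitSketch {
    final List<String> steps = new ArrayList<>();

    void createSchema() { this.steps.add("createSchema"); }

    void updateSchemaCache() { this.steps.add("updateSchemaCache"); }

    // A public, loader-agnostic entry point instead of a private initPartition.
    public void initContext() {
        createSchema();      // make sure the schema exists server-side first
        updateSchemaCache(); // only then cache it, so lookups see real metadata
    }

    public static void main(String[] args) {
        LoaderInitSketch init = new LoaderInitSketch();
        init.initContext();
        System.out.println(init.steps); // [createSchema, updateSchemaCache]
    }
}
```

Making the entry point public addresses the extensibility concern above: an external bulk-load class could call it without re-implementing the create-then-cache order.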

@haohao0103 (Contributor, Author) commented:

Understood, thanks!

@haohao0103 (Contributor, Author) commented:

Do I need to open a new PR?

@simon824 (Member) commented:

@haohao0103 You can push the new code directly to this PR.

@haohao0103 (Contributor, Author) commented:

The code has been pushed.

simon824
simon824 previously approved these changes Sep 20, 2022
imbajin
imbajin previously approved these changes Sep 20, 2022
@imbajin (Member) left a comment:

Remember to also check whether flink-loader needs the same fix.

If it needs to be public, feel free to change it.

@haohao0103 (Contributor, Author) commented:

> Remember to also check whether flink-loader needs the same fix.

Yes, it needs the fix: initBuilders() needs to call context.updateSchemaCache().

@haohao0103 (Contributor, Author) commented:

> Remember to also check whether flink-loader needs the same fix.

It needs the change: initBuilders() needs to call context.updateSchemaCache(). The current flink-loader code looks like this:

    private Map<ElementBuilder, List> initBuilders() {
        LoadContext loadContext = new LoadContext(this.loadOptions);
        Map<ElementBuilder, List> builders = new HashMap<>();
        for (VertexMapping vertexMapping : this.struct.vertices()) {
            builders.put(new VertexBuilder(loadContext, this.struct, vertexMapping),
                         new ArrayList<>());
        }
        for (EdgeMapping edgeMapping : this.struct.edges()) {
            builders.put(new EdgeBuilder(loadContext, this.struct, edgeMapping),
                         new ArrayList<>());
        }
        // TODO
        // loadContext.updateSchemaCache();
        return builders;
    }

@haohao0103 (Contributor, Author) commented:

Can I keep pushing code to this PR?

@haohao0103 haohao0103 dismissed stale reviews from imbajin and simon824 via d09a1f7 September 20, 2022 06:01
@haohao0103 (Contributor, Author) commented:

@imbajin @simon824 The flink-loader code has been fixed as well; please review. Thanks!

@haohao0103 haohao0103 changed the title Schema cache optimize SchemaCache optimize Sep 20, 2022
@imbajin imbajin changed the title SchemaCache optimize fix: schema not cached in spark/flink loader Sep 20, 2022
@imbajin imbajin merged commit 6d82b15 into apache:master Sep 20, 2022