Skip to content

【BUG】tag-1.16 kafka connector #1967

@13675139195

Description

@13675139195

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

【BUG1】

  1. 场景:kafka->postgreSQL 或其他
  2. 描述:kafka connector 在reader的时候 无法解析json格式的Column,会将整个json作为一个字段传给writer,不会根据Column的字段来拆分

【BUG2 Maybe】

  1. ElasticSearch7 connector 缺少SSL的keyStore证书传递,只有trustStore证书传递,会导致自签的SSL证书认证失败

What you expected to happen

kafka connector 在reader的时候 无法解析json格式的Column,会将整个json作为一个字段传给writer,不会根据Column的字段来拆分

How to reproduce

解决方式KafkaSourceFactory.createSource() 方法调用 new KafkaSyncConverter(),当codec为json时,缺少以下代码:
if (!CollectionUtils.isEmpty(kafkaConfig.getColumn())) {
List typeList =
kafkaConfig.getColumn().stream()
.map(FieldConfig::getType)
.collect(Collectors.toList());
this.toInternalConverters = new ArrayList<>();
for (TypeConfig s : typeList) {
toInternalConverters.add(
wrapIntoNullableInternalConverter(createInternalConverter(s.getType())));
}
}

Anything else

No response

Version

master

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions