
Support for array of objects #11

Open
ArturKT opened this issue Jun 13, 2023 · 4 comments
Labels
enhancement New feature or request

Comments


ArturKT commented Jun 13, 2023

Any plan to support ELEMENT encryption for an array of objects?
Example data:

{
  "_id":"gabd9a39-9856-38b6-b983-94513f746f34",
  "currency":"ABC",
  "fields":[
    {
      "fieldOne":"someValueOne",
      "fieldTwo":"someValueTwo"
    },
    {
      "fieldOne":"anotherValueOne",
      "fieldTwo":"anotherValueTwo"
    }
  ],
  "nested":{
    "value": 123,
    "valueString":"abc"
  }
}

I would like to encrypt all `fields[*].fieldOne`. I have tried that by setting:

        "transforms.cipher.field_config": "[{\"name\":\"fields\"},{\"name\":\"fields.fieldOne\"}]",
        "transforms.cipher.field_mode": "ELEMENT"

But that fails with the following exception:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:395)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:361)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:829)
Caused by: java.util.NoSuchElementException: no default type mapping found for type STRUCT (optional true) and cipher mode ENCRYPT
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.TypeSchemaMapper.lambda$getSchemaForPrimitiveType$0(TypeSchemaMapper.java:63)
    at java.util.Optional.orElseThrow(Optional.java:408)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.TypeSchemaMapper.getSchemaForPrimitiveType(TypeSchemaMapper.java:62)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptArraySchema(SchemaRewriter.java:110)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptField(SchemaRewriter.java:87)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptSchema(SchemaRewriter.java:74)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.processWithSchema(CipherField.java:184)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.apply(CipherField.java:165)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 13 more
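For context, the root cause ("no default type mapping found for type STRUCT") suggests that ELEMENT mode can currently only derive target schemas for primitive element types. A hypothetical config that should presumably be accepted, assuming a field `tags` holding a plain array of strings (not part of the payload above), would be:

        "transforms.cipher.field_config": "[{\"name\":\"tags\"}]",
        "transforms.cipher.field_mode": "ELEMENT"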

ArturKT commented Jun 13, 2023

A similar use case exists for decryption. Since there is no way to encrypt a single field of an object inside an array, I used field_mode OBJECT and wanted to decrypt the value in the sink connector.

Payload to encrypt:

{
  "_id":"gabd9a39-9856-38b6-b983-94513f746f34",
  "currency":"ABC",
  "fields":[
    {
      "fieldOne":"someValueOne",
      "fieldTwo":"someValueTwo"
    },
    {
      "fieldOne":"anotherValueOne",
      "fieldTwo":"anotherValueTwo"
    }
  ],
  "nested":{
    "value": 123,
    "valueString":"abc"
  }
}

Config:

(...)
        "transforms.cipher.field_config": "[{\"name\":\"fields\"}]",
        "transforms.cipher.field_mode": "OBJECT"

That works just fine; the output looks something like:

"fields":"5QMBG3qEIVeGriTdTLKO6QWECoOjAi1qcJX(...)"

But then I want to decrypt the value in the sink connector, so I use:

        "transforms.cipher.field_config": "[{\"name\":\"fields\", \"schema\": {\"type\":\"ARRAY\",\"valueSchema\":{\"type\":\"STRUCT\",\"fields\":[{\"type\":\"STRING\",\"optional\":true,\"field\":\"fieldOne\"},{\"type\":\"STRING\",\"optional\":true,\"field\":\"fieldTwo\"}],\"optional\":true,\"name\":\"fields\"},\"optional\":true,\"name\":\"fields\",\"field\":\"fields\"}}]",
        "transforms.cipher.field_mode": "OBJECT"

That fails with an error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in the error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:546)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:499)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: expected primitive value type for array elements but found STRUCT
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.extractAndAdaptArraySchemaFromConfig(SchemaRewriter.java:189)
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptArraySchema(SchemaRewriter.java:118)
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptField(SchemaRewriter.java:87)
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptSchema(SchemaRewriter.java:74)
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.processWithSchema(CipherField.java:184)
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.apply(CipherField.java:165)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
        ... 15 more

I have debugged the source code and found a TODO at the relevant spot:
https://github.com/hpgrahsl/kryptonite-for-kafka/blob/master/connect-transform-kryptonite/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/SchemaRewriter.java#L187

@hpgrahsl

Hi there. Thanks for reaching out and opening this issue. The answer to this one needs some context and explanation:

  1. If your records come with a schema, processing objects within arrays is currently not supported.

  2. What could work for you is switching to schemaless records; then it should be possible to encrypt a map within an array. The only limitation for now is that, by default, either all fields of a map are encrypted or none, meaning you cannot select only a single field of a map within an array for encryption.

Hope this clarifies why you see the behaviour you posted above.
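To illustrate point 2 (a sketch, assuming schemaless JSON records; the ciphertext strings below are made up): with field_mode ELEMENT, each map inside the array would be encrypted as a whole, so the payload from above would come out roughly as:

  "fields":[
    "5QMBG3qEIVeGriTdTLKO6QWE(...)",
    "9XKfT1pRUVcHslUeUMLP7RXF(...)"
  ]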

@ArturKT

ArturKT commented Jun 14, 2023

Thanks @hpgrahsl, I will try with the schemaless setup.

hpgrahsl added the enhancement label Jun 14, 2023
@hpgrahsl

@ArturKT did it work for you to go with a schemaless approach for your enc/dec scenario?
