ERROR Unable to process record SinkRecord

Hi, I’m investigating approaches to store my IoT data from multiple ‘things’. The measurements from the things are stored in Kafka, one topic per thing; within each topic the key is the measurement name (a string) and the value is the measurement itself, either an INT or a Float.

If I use a file sink I get a file of values with no problem, but I get issues with the MongoDB sink:

ERROR Unable to process record SinkRecord{kafkaOffset=14217, timestampType=CreateTime} ConnectRecord{topic='MySensors-81', kafkaPartition=0, key=81-0-17, keySchema=Schema{STRING}, value=1579, valueSchema=Schema{STRING}, timestamp=1614000439773, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData:110)
org.apache.kafka.connect.errors.DataException: Could not convert value 1579 into a BsonDocument.

How do I tell the connector that the valueSchema is INT32 or Float (some readings have decimals)?

Can you send your configuration for the sink, minus any authentication credentials? Instead of writing just the integer/float value, wrap the value in a JSON document and set the value.converter in the sink to something like:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

Alternatively, to get it working you could set the converter to string:

"value.converter": "org.apache.kafka.connect.storage.StringConverter"

By default the sink connector is looking for a BSON document, so you need to tell it to take whatever is on the Kafka topic and interpret it as a string, a JSON document, etc.
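To make the difference concrete, here is a minimal Python sketch. It is not the connector's code; the helper name `to_document` is hypothetical and only imitates the idea that a value must parse to a set of key/value pairs before it can be stored as a document.

```python
import json


def to_document(raw_value: str) -> dict:
    """Parse a message value and insist that it is a JSON object.

    Hypothetical helper: it mimics the requirement that the sink needs a
    document (key/value pairs), not a bare scalar such as 1579.
    """
    parsed = json.loads(raw_value)
    if not isinstance(parsed, dict):
        raise ValueError(f"Could not convert value {raw_value} into a document")
    return parsed


if __name__ == "__main__":
    # A wrapped value parses to a document.
    print(to_document('{"81-0-17": 1579}'))
    # A bare number is valid JSON but is not a document, so it is rejected.
    try:
        to_document("1579")
    except ValueError as err:
        print(err)
```

The bare value is still valid JSON, which is why the failure is about the shape of the value rather than its syntax.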

Note: You can write the message with an Avro or JSON schema as well, then use

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",

OK, I had another thought on this one. You could use a single message transform on the sink that takes that integer value and hoists it into a document with a key/value pair, as follows:

"transforms": "HoistField,Cast",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "temp",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "temp:float64",

This should take that integer and create something like:
{ "temp": 1712 }
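As a rough illustration of what those two transforms do to a record value, the Python sketch below imitates them with plain functions. It is only a model of the idea, not Kafka Connect code; the function names are hypothetical and the field name "temp" is taken from the settings above.

```python
def hoist_field(value, field):
    """Wrap a bare value in a single-field document (the idea behind HoistField)."""
    return {field: value}


def cast_field(document, field, target_type):
    """Return a copy of the document with one field converted (the idea behind Cast)."""
    converted = dict(document)
    converted[field] = target_type(document[field])
    return converted


if __name__ == "__main__":
    # Values arrive as strings when the StringConverter is used.
    for raw in ["1579", "628.08"]:
        print(cast_field(hoist_field(raw, "temp"), "temp", float))
```

Casting to a floating-point type covers both the whole-number and the decimal readings with a single setting.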

The example data below is a reading from a power meter sensor (thing) calculating watts via an LED pulse count … I have a topic per “thing” and about 50 things at the moment, and want to grow.

The data is being created in the Kafka topic through a webthings.io adapter, so I don’t have much control over it, but I was contemplating asking the developer if he could add a toggle for JSON in the value.

I have tried every combination of converters - even figuring out how to populate a Kafka schema to entertain the Avro converter, which resulted in an NPE.

Your second thought is interesting - I just have no clue how to implement it - or are these parameters for the properties file?

Given programming isn’t anywhere near the top of my skills list … it would be good to guide the form of the document somehow - although it is IoT, I was half expecting a pattern to be available … perhaps that pattern is JSON in the value field showing the data type of the reading - if so, I’ll put a request into the git repo for the webthings Kafka adapter - I have been assisting with testing the code.

I’m running Kafka Connect in standalone mode using:

bin/connect-standalone.sh config/connect-standalone.properties config/MongoSinkConnector.properties

The data in the topic looks like this:
~kafka/kafka/bin/kafka-console-consumer.sh --topic MySensors-81 --property print.key=true --property key.separator=: --from-beginning --bootstrap-server xxxxx.local:9092
81-0-17:1579
81-1-18:628.08
81-0-17:1650
81-1-18:628.09
81-0-17:1579

The connect-standalone.properties are:

bootstrap.servers=xxxxx.local:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/kafka/plugins

The MongoSinkConnector.properties

name=mongo-sink
topics=MySensors-81
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
debug=true
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
connection.uri=mongodb://localhost/
database=Sensors
collection=sink
max.num.retries=1
retries.defer.timeout=5000
confluent.topic.security.protocol="PLAINTEXT"
key.projection.type=none
key.projection.list=
value.projection.type=none
value.projection.list=
field.renamer.mapping=
field.renamer.regex=
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder

# Write configuration

delete.on.null.values=false
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
max.batch.size = 0
rate.limiting.timeout=0
rate.limiting.every.n=0

What would you like the data to look like when it is in MongoDB? To confirm, the data in your topic are strings and look like this, correct?

81-0-17:1579
81-1-18:628.08
81-0-17:1650
81-1-18:628.09
81-0-17:1579

Yes, that is the data - although the developer of the Producer has agreed to write it as JSON - I’ll see what it looks like in the next couple of days … https://github.com/tim-hellhake/kafka-bridge/issues/6#issuecomment-792336419

The current structure is :
topic: device id
message key: property name
message value: {[property name]: [property value]}.

The plan is to use MongoDB as the persistence for all telemetry - I’ll then be wanting to visualise the data and perform analytics - haven’t chosen any toolsets as yet for this.

The change has been made to the Producer - the data is now formatted as shown below, using this consumer command … does this help? Would I then use the JSON converter?

~kafka/kafka/bin/kafka-console-consumer.sh --topic MySensors-81 --property print.key=true --property key.separator=: --from-beginning --bootstrap-server xxxxx.local:9092

81-0-17:{"81-0-17":1304}
81-1-18:{"81-1-18":0}
81-0-17:{"81-0-17":5016}
81-1-18:{"81-1-18":958.96}
81-0-17:{"81-0-17":5039}
81-1-18:{"81-1-18":959.01}
81-0-17:{"81-0-17":4920}
81-1-18:{"81-1-18":959.05}
81-0-17:{"81-0-17":2595}
81-1-18:{"81-1-18":959.1}
81-0-17:{"81-0-17":5173}
81-1-18:{"81-1-18":959.15}
81-0-17:{"81-0-17":5358}
81-1-18:{"81-1-18":959.19}
81-0-17:{"81-0-17":4989}
81-1-18:{"81-1-18":959.24}
81-0-17:{"81-0-17":4941}
81-1-18:{"81-1-18":959.28}

Perhaps it might be easiest if the developer could represent the document like

{ "name": "81-0-17", "value": 1304 },
{ "name": "81-1-18", "value": 0 },
etc

If this can’t be done you have many other options. You could use transforms to parse the message and put it in a JSON format:
https://docs.confluent.io/platform/current/connect/transforms/overview.html

You can also write your own transform, or use KSQL to create a persistent query that transforms the data into a new topic containing a JSON document structure. You could also just take the existing message, insert it into MongoDB as a string, and have your application do the manipulation of the data.
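For the last option, where the application does the manipulation, a minimal Python sketch of the reshaping might look like this. It assumes each message value has the {[property name]: [property value]} form described earlier; the function name `to_name_value` is hypothetical.

```python
import json


def to_name_value(raw_value: str) -> list:
    """Turn {"<property name>": <value>} into a list of name/value documents."""
    message = json.loads(raw_value)
    return [{"name": name, "value": value} for name, value in message.items()]


if __name__ == "__main__":
    for raw in ['{"81-0-17":1304}', '{"81-1-18":958.96}']:
        print(to_name_value(raw))
```

Returning a list keeps the helper usable even if a message ever carries more than one property.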


Success using value.converter=org.apache.kafka.connect.json.JsonConverter

use Sensors
switched to db Sensors
show collections
sink
db.sink.find();
{ "_id" : ObjectId("604c54c6868c0042fcc9da14"), "81-0-17" : NumberLong(808) }
{ "_id" : ObjectId("604c54c6868c0042fcc9da15"), "81-1-18" : NumberLong(0) }
{ "_id" : ObjectId("604c54c6868c0042fcc9da16"), "81-0-17" : NumberLong(1043) }
{ "_id" : ObjectId("604c54c6868c0042fcc9da17"), "81-1-18" : 1056.89 }
{ "_id" : ObjectId("604c54c6868c0042fcc9da18"), "81-0-17" : NumberLong(1048) }
{ "_id" : ObjectId("604c54c6868c0042fcc9da19"), "81-1-18" : 1056.9 }
{ "_id" : ObjectId("604c54c6868c0042fcc9da1a"), "81-0-17" : NumberLong(1066) }
{ "_id" : ObjectId("604c54c6868c0042fcc9da1b"), "81-1-18" : 1056.91 }
{ "_id" : ObjectId("604c54c6868c0042fcc9da1c"), "81-0-17" : NumberLong(1024) }
{ "_id" : ObjectId("604c54c6868c0042fcc9da1d"), "81-1-18" : 1056.93 }
{ "_id" : ObjectId("604c54c6868c0042fcc9da1e"), "81-0-17" : NumberLong(1003) }
{ "_id" : ObjectId("604c54c6868c0042fcc9da1f"), "81-1-18" : 1056.94 }
{ "_id" : ObjectId("604c54c6868c0042fcc9da20"), "81-0-17" : NumberLong(1020) }
{ "_id" : ObjectId("604c54c6868c0042fcc9da21"), "81-1-18" : 1056.95 }
{ "_id" : ObjectId("604c54c6868c0042fcc9da22"), "81-0-17" : NumberLong(936) }
{ "_id" : ObjectId("604c54c6868c0042fcc9da23"), "81-1-18" : 1056.96 }
{ "_id" : ObjectId("604c54c6868c0042fcc9da24"), "81-0-17" : NumberLong(914) }
{ "_id" : ObjectId("604c54c6868c0042fcc9da25"), "81-1-18" : 1056.97 }
{ "_id" : ObjectId("604c54c6868c0042fcc9da26"), "81-0-17" : NumberLong(1065) }
{ "_id" : ObjectId("604c54c6868c0042fcc9da27"), "81-1-18" : 1056.98 }

Now that I have this working I can make schema design decisions about how I store the data. The key inherently stores a lot of information - “-”<“metric number”>-<“metric reading type, e.g. watts, temp in Celsius, humidity in % etc”>. The metric reading type is held in a schema store defined by webthings.io, so I could keep a copy as a lookup, but I would have to do the lookup every time … not so easy if trying to use a simple visualisation app … this is the whole database debate I guess … I think I’m leaning toward being readable … it’s not as if it’s someone’s name, which can change, and duplicates seem to be OK, nor am I worried about saving bytes on disk … a temperature reading for that date and time will always be that temperature reading for that moment in time …
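A small Python sketch of the "readable" option: split the key into its dash-separated parts and replace the reading-type code with a label from a local lookup. Everything here is an assumption for illustration. The function name, the output field names, and the sample mapping of "17" to "watts" are placeholders, and treating the first part as the thing number is inferred only from the sample keys matching the topic name.

```python
def describe_key(key: str, reading_types: dict) -> dict:
    """Split a key such as "81-0-17" into its three dash-separated parts.

    reading_types is a caller-supplied lookup from reading-type code to a
    readable label; unknown codes are kept as they are.
    """
    first, metric_number, reading_type = key.split("-")
    return {
        "thing": first,  # assumed to be the thing number, as in topic MySensors-81
        "metric": metric_number,
        "reading_type": reading_types.get(reading_type, reading_type),
    }


if __name__ == "__main__":
    lookup = {"17": "watts"}  # placeholder mapping, not taken from webthings.io
    print(describe_key("81-0-17", lookup))
    print(describe_key("81-1-18", lookup))
```

Storing the label with each reading costs some bytes per document but spares every visualisation tool from doing the lookup itself.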

Anyway I like your idea of transformations - it makes sense, so I’ll be researching that more…

Speaking of time - how do I get the reading date/time stamp into the document? It just occurred to me that it’s not in the values - next problem to solve … anyway I’ll close this - thank you for your help, Robert.
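One possible starting point for the timestamp question: each Kafka record already carries a timestamp in epoch milliseconds (the error log above shows timestamp=1614000439773), so it does not have to come from the value. The Python sketch below shows the conversion an application could do; the function name and the field name "ts" are hypothetical. On the connector side, Kafka Connect's InsertField transform has a timestamp.field option that may be worth checking in the transform documentation.

```python
from datetime import datetime, timezone


def add_timestamp(document: dict, kafka_timestamp_ms: int, field: str = "ts") -> dict:
    """Return a copy of the document with the record timestamp added as a UTC datetime."""
    # Integer arithmetic avoids floating-point rounding of the milliseconds.
    seconds, millis = divmod(kafka_timestamp_ms, 1000)
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=millis * 1000
    )
    stamped = dict(document)
    stamped[field] = moment
    return stamped


if __name__ == "__main__":
    print(add_timestamp({"81-0-17": 1579}, 1614000439773))
```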
