Scala : Unable to send message to Kafka (hosted on remote server)2019 Community Moderator ElectionConfluent Maven repository not working?Kafka Producer 0.9 performance issue with small messagesApache Spark: Getting a InstanceAlreadyExistsException when running the Kafka producerhow to send avro schema ONLY once in kafkaMicrosoft AvroRecord doesn't contain the value deserialized through AvroApache kafka exactly once implementation not sending messagesWhy does kafka producer throw java.lang.noclassdeffounderror: kafka-producerHow to send message to Kafka with Avro serializer and schema registryKafka: Error serializing Avro message with Schema Registrycan not use kafka for pythonHow to produce Avro message in kafka topic from apache nifi and then read it using kafka streams?
Why doesn't the EU now just force the UK to choose between referendum and no-deal?
Is it true that real estate prices mainly go up?
What is the greatest age difference between a married couple in Tanach?
My story is written in English, but is set in my home country. What language should I use for the dialogue?
2D counterpart of std::array in C++17
Humanity loses the vast majority of its technology, information, and population in the year 2122. How long does it take to rebuild itself?
Why is "das Weib" grammatically neuter?
Schematic conventions for different supply rails
Ban on all campaign finance?
Pinhole Camera with Instant Film
Bash replace string at multiple places in a file from command line
Co-worker team leader wants to inject his friend's awful software into our development. What should I say to our common boss?
Do I need life insurance if I can cover my own funeral costs?
Using "wallow" verb with object
Replacing Windows 7 security updates with anti-virus?
Making a sword in the stone, in a medieval world without magic
Make a transparent 448*448 image
Theorems like the Lovász Local Lemma?
My adviser wants to be the first author
Happy pi day, everyone!
Can anyone tell me why this program fails?
How do anti-virus programs start at Windows boot?
Is it possible that AIC = BIC?
An Accountant Seeks the Help of a Mathematician
Scala : Unable to send message to Kafka (hosted on remote server)
2019 Community Moderator ElectionConfluent Maven repository not working?Kafka Producer 0.9 performance issue with small messagesApache Spark: Getting a InstanceAlreadyExistsException when running the Kafka producerhow to send avro schema ONLY once in kafkaMicrosoft AvroRecord doesn't contain the value deserialized through AvroApache kafka exactly once implementation not sending messagesWhy does kafka producer throw java.lang.noclassdeffounderror: kafka-producerHow to send message to Kafka with Avro serializer and schema registryKafka: Error serializing Avro message with Schema Registrycan not use kafka for pythonHow to produce Avro message in kafka topic from apache nifi and then read it using kafka streams?
I am using Scala 2.12 and have required libraries to convert the message to Avro (need to convert) and kafka clients too.
I am running the code on the Linux host (dev) where other application (Apache NiFi) is running and able to create KafkaProducer and publish the message to remote Kafka.
Since it is dev for now, the protocol is PLAINTEXT.
E.g. of KafkaProducer config in Nifi.
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
Also, NiFi starts with java option to use JAAS file, whose contents are:
KafkaClient
com.sun.security.auth.module.Krb5LoginModule required
principal="myUserName@myRealm"
useKeyTab=true
client=true
keyTab="/path/myfile.keytab"
serviceName="kafka";
;
Also the krb5.conf file is available which is used.
Using above config, NiFi is able to create KafkaProducer and send messages across.
Now, I am using the same with Scala code. Simple Class which uses following build.sbt and code, to send the message.
build.sbt:
// https://mvnrepository.com/artifact/org.apache.avro/avro
libraryDependencies += "org.apache.avro" % "avro" % "1.8.1"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"
libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.4"
fork in run := true
javaOptions += "-Djava.security.auth.login.config=/path/to/jaas/kafka-jaas.conf"
javaOptions += "-Djava.security.krb5.conf=/path/to/krb/krb5.conf"
My code to send message. Removed unwanted lines for brevity. Please note testing of creating the data to Avro is running fine. The same message when given to NiFi, it is being able to publish correctly to the topic. What is not running, is the publish to kafka using Scala.
Code:
package example
import java.io.ByteArrayOutputStream
import java.util
import java.io.File
import java.util.Properties, UUID
import org.apache.avro.Schema.Parser
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.io.DecoderFactory, EncoderFactory
import org.apache.kafka.clients.producer.KafkaProducer, ProducerConfig, ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.io.Source
import scala.io.StdIn
object Hello extends Greeting with App
// case classes for creating avro record
// This part works fine.
val schemaFile = "/path/Schema.avsc"
val schema = new Schema.Parser().parse(new File(schemaFile))
val reader = new GenericDatumReader[GenericRecord](schema)
val avroRecord = new GenericData.Record(schema)
// populate correctly the record.
// works fine.
val brokers = "server1.domain:9096,server2.domain:9096,server3.domain:9096"
val topic = "myTopic"
private def configuration: Properties =
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("security.protocol", "PLAINTEXT")
props.put("sasl.kerberos.service.name", "kafka")
props.put("acks", "all")
props.put("retries","0")
props
val producer = new KafkaProducer[String, Array[Byte]](configuration)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder)
encoder.flush()
out.close()
val serializedBytes: Array[Byte] = out.toByteArray()
val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
producer.send(recordToSend)
trait Greeting
lazy val greeting: String = "hello"
When I run it on sbt command line:
sbt clean
sbt compile
sbt run
I get the following error/output. Nothing published.
Output:
-bash-4.2$ sbt run
[warn] Executing in batch mode.
[warn] For better performance, hit [ENTER] to switch to interactive mode, or
[warn] consider launching sbt without any commands, or explicitly passing 'shell'
[info] Loading project definition from /path/Scala/hello-world/project
[info] Set current project to hello-world (in build file:/path/Scala/hello-world/)
[info] Running example.Hello
[info] hello
[info]
[error] 9 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error] acks = 1
[error] batch.size = 16384
[error] bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error] buffer.memory = 33554432
[error] client.dns.lookup = default
[error] client.id =
[error] compression.type = none
[error] connections.max.idle.ms = 540000
[error] delivery.timeout.ms = 120000
[error] enable.idempotence = false
[error] interceptor.classes = []
[error] key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error] linger.ms = 0
[error] max.block.ms = 60000
[error] max.in.flight.requests.per.connection = 5
[error] max.request.size = 1048576
[error] metadata.max.age.ms = 300000
[error] metric.reporters = []
[error] metrics.num.samples = 2
[error] metrics.recording.level = INFO
[error] metrics.sample.window.ms = 30000
[error] partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error] receive.buffer.bytes = 32768
[error] reconnect.backoff.max.ms = 1000
[error] reconnect.backoff.ms = 50
[error] request.timeout.ms = 30000
[error] retries = 0
[error] retry.backoff.ms = 100
[error] sasl.client.callback.handler.class = null
[error] sasl.jaas.config = null
[error] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error] sasl.kerberos.min.time.before.relogin = 60000
[error] sasl.kerberos.service.name = kafka
[error] sasl.kerberos.ticket.renew.jitter = 0.05
[error] sasl.kerberos.ticket.renew.window.factor = 0.8
[error] sasl.login.callback.handler.class = null
[error] sasl.login.class = null
[error] sasl.login.refresh.buffer.seconds = 300
[error] sasl.login.refresh.min.period.seconds = 60
[error] sasl.login.refresh.window.factor = 0.8
[error] sasl.login.refresh.window.jitter = 0.05
[error] sasl.mechanism = GSSAPI
[error] security.protocol = PLAINTEXT
[error] send.buffer.bytes = 131072
[error] ssl.cipher.suites = null
[error] ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error] ssl.endpoint.identification.algorithm =
[error] ssl.key.password = null
[error] ssl.keymanager.algorithm = SunX509
[error] ssl.keystore.location = null
[error] ssl.keystore.password = null
[error] ssl.keystore.type = JKS
[error] ssl.protocol = TLS
[error] ssl.provider = null
[error] ssl.secure.random.implementation = null
[error] ssl.trustmanager.algorithm = PKIX
[error] ssl.truststore.location = null
[error] ssl.truststore.password = null
[error] ssl.truststore.type = JKS
[error] transaction.timeout.ms = 60000
[error] transactional.id = null
[error] value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 248 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[success] Total time: 1 s, completed Mar 6, 2019 1:38:14 PM
I am sure, it has to do something with security or kerberos. But other apps are able to push message, not with my scala code.
UPDATE:
Based on the response from @tgrez , I tried to block with Future get.
//producer.send(recordToSend)
val metaF: Future[RecordMetadata] = producer.send(recordToSend)
val meta = metaF.get() //blocking
val msgLog =
s"""
|offset = $meta.offset()
|partition = $meta.partition()
|topic = $meta.topic()
""".stripMargin
println(msgLog)
producer.close()
However still I am similar error.
[error] 10 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error] acks = 1
[error] batch.size = 16384
[error] bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error] buffer.memory = 33554432
[error] client.dns.lookup = default
[error] client.id =
[error] compression.type = none
[error] connections.max.idle.ms = 540000
[error] delivery.timeout.ms = 120000
[error] enable.idempotence = false
[error] interceptor.classes = []
[error] key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error] linger.ms = 0
[error] max.block.ms = 60000
[error] max.in.flight.requests.per.connection = 5
[error] max.request.size = 1048576
[error] metadata.max.age.ms = 300000
[error] metric.reporters = []
[error] metrics.num.samples = 2
[error] metrics.recording.level = INFO
[error] metrics.sample.window.ms = 30000
[error] partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error] receive.buffer.bytes = 32768
[error] reconnect.backoff.max.ms = 1000
[error] reconnect.backoff.ms = 50
[error] request.timeout.ms = 30000
[error] retries = 0
[error] retry.backoff.ms = 100
[error] sasl.client.callback.handler.class = null
[error] sasl.jaas.config = null
[error] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error] sasl.kerberos.min.time.before.relogin = 60000
[error] sasl.kerberos.service.name = kafka
[error] sasl.kerberos.ticket.renew.jitter = 0.05
[error] sasl.kerberos.ticket.renew.window.factor = 0.8
[error] sasl.login.callback.handler.class = null
[error] sasl.login.class = null
[error] sasl.login.refresh.buffer.seconds = 300
[error] sasl.login.refresh.min.period.seconds = 60
[error] sasl.login.refresh.window.factor = 0.8
[error] sasl.login.refresh.window.jitter = 0.05
[error] sasl.mechanism = GSSAPI
[error] security.protocol = PLAINTEXT
[error] send.buffer.bytes = 131072
[error] ssl.cipher.suites = null
[error] ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error] ssl.endpoint.identification.algorithm =
[error] ssl.key.password = null
[error] ssl.keymanager.algorithm = SunX509
[error] ssl.keystore.location = null
[error] ssl.keystore.password = null
[error] ssl.keystore.type = JKS
[error] ssl.protocol = TLS
[error] ssl.provider = null
[error] ssl.secure.random.implementation = null
[error] ssl.trustmanager.algorithm = PKIX
[error] ssl.truststore.location = null
[error] ssl.truststore.password = null
[error] ssl.truststore.type = JKS
[error] transaction.timeout.ms = 60000
[error] transactional.id = null
[error] value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 249 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[info]
[info] offset = 8
[info] partition = 1
[info] topic = myTopic
[info]
[error] 323 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[success] Total time: 1 s, completed Mar 6, 2019 3:26:53 PM
Anything I am missing here?
UPDATE 2:
As mentioned below, I changed my code. however it is not working either. I realized something is wrong in serialization.
I already have avroRecord in the GenericData.Record format. Can't I use the same to publish the data to Kafka? Why I have to use the Array of Bytes or any other serializer for the same?
Only example I found is to use io.confluent avro serializer. But I am unable to use that as sbt or maven is failing to download it now. Infact the URL: http://packages.confluent.io/maven/ is not working. Somehow I downloaded the jars and using it as external libraries.
Changed to code to:
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
val producer = new KafkaProducer[String, GenericData.Record](configuration)
val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)
Now it is working fine.
However, I am still looking for any other serializer class (which is available in Maven) to send the message as GenericData instead of Array of Bytes.
UPDATE 3:
As suggested by user @KZapagol, I tried to use the same and getting following error.
Schema : (It is complex, so need help if I am correctly transforming the data)
"type": "record","name": "MyPnl","doc": "This schema contains the metadata fields wrapped in a header field which follows the official schema.","fields": ["name":"header","type":"type":"record","name":"header","fields":["name":"messageId","type":"string","name":"businessId","type":"string","name":"batchId","type":"string","name":"sourceSystem","type":"string","name":"secondarySourceSystem","type":[ "null", "string" ],"name":"sourceSystemCreationTimestamp","type":"long","logicalType": "timestamp-millis","name":"sentBy","type":"string","name":"sentTo","type":"string","name":"messageType","type":"string","name":"schemaVersion","type":"string","name":"processing","type":"string","name":"recordOffset","type":[ "null", "string" ]],"name":"pnlData","type":"type":"record","name":"pnlData","fields":["name":"pnlHeader","type":"type":"record","name":"pnlData","namespace":"pnlHeader","fields":["name":"granularity","type":"string","name":"pnlType","type":"string","name":"pnlSubType","type":"string","name":"businessDate","type":"string","logicalType": "date","name":"bookId","type":"string","name":"bookDescription","type":"string","name":"pnlStatus","type":"string"],"name":"pnlBreakDown","type":"type":"array","items":"type":"record","name":"pnlData","namespace":"pnlBreakDown","fields":["name":"category","type":[ "null", "string" ],"name":"subCategory","type":[ "null", "string" ],"name":"riskCategory","type":[ "null", "string" ],"name":"pnlCurrency","type":"string","name":"pnlDetails", "type":"type":"array","items": "type":"record","name":"pnlData","namespace":"pnlDetails","fields":["name":"pnlLocalAmount","type":"double","name":"pnlCDEAmount","type":"double"]]]]
I have corresponding case classes for above. (Please suggest if I have missed anything here?)
case class MessageHeader( messageId: String,
businessId: String,
batchId: String,
sourceSystem: String,
secondarySourceSystem: String,
sourceSystemCreationTimestamp: Long,
sentBy: String,
sentTo: String,
messageType: String,
schemaVersion: String,
processing: String,
recordOffset: String
)
case class PnlHeader ( granularity: String,
pnlType: String,
pnlSubType: String,
businessDate: String,
bookId: String,
bookDescription: String,
pnlStatus: String
)
case class PnlDetails ( pnlLocalAmount: Double,
pnlCDEAmount: Double
)
case class PnlBreakdown ( category: String,
subCategory: String,
riskCategory: String,
pnlCurrency: String,
pnlDetails: List[PnlDetails]
)
case class PnlData ( pnlHeader: PnlHeader, pnlBreakdown: List[PnlBreakdown] )
case class PnlRecord (header: MessageHeader, pnlData: PnlData )
I have modeled my data in above PnlRecord format. I have list of such records.
From list of such records, I iterate and try to publish it to Kafka.
// Create Producer
val producer = new KafkaProducer[String, Array[Byte]](properties)
// This filename is file where above schema is saved.
val avroJsonSchema = Source.fromFile(new File(schemaFileName)).getLines.mkString
val avroMessage = new AvroMessage(avroJsonSchema)
val avroRecord = new Record(avroMessage.schema)
// recordListToSend is of type: List[PnlRecord]
for (record <- recordListToSend)
avroRecord.put("header", record.header)
avroRecord.put("pnlData", record.pnlData)
//logger.info(s"Record: $avroRecordn")
avroMessage.gdw.write(avroRecord, EncoderFactory.get().binaryEncoder(avroMessage.baos, null))
avroMessage.dfw.append(avroRecord)
avroMessage.dfw.close()
val bytes = avroMessage.baos.toByteArray
// send data
producer.send(new ProducerRecord[String, Array[Byte]](topic, bytes), new ProducerCallback)
//flush data
producer.flush()
//flush and close producer
producer.close()
AvroMessage class (as suggested by user)
import java.io.ByteArrayOutputStream
import org.apache.avro
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.generic.GenericDatumWriter, GenericRecord
class AvroMessage(avroJsonSchema: String)
val parser = new Schema.Parser()
val schema = parser.parse(avroJsonSchema)
val baos = new ByteArrayOutputStream()
val gdw = new GenericDatumWriter[GenericRecord](schema)
val dfw = new avro.file.DataFileWriter[GenericRecord](gdw)
val compressionLevel = 5
dfw.setCodec(CodecFactory.deflateCodec(compressionLevel))
dfw.create(schema, baos)
I am getting the below error:
2019-03-13 16:00:09.855 [application-akka.actor.default-dispatcher-11] ERROR controllers.SAController.$anonfun$publishToSA$2(34) - com.domain.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
java.lang.ClassCastException: ca.domain.my.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
at org.apache.avro.generic.GenericData.getField(GenericData.java:697)
at org.apache.avro.generic.GenericData.getField(GenericData.java:712)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:164)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at ca.domain.my.sa.dao.myPnlDao$.$anonfun$publishAvroToKafka$1(myPnlDao.scala:95)
Is my original case classes are right as per the schema?
My MessageHeader case class is shown above.
My Schema is shown above (updated).
My Record:
Record: "header": Header(my_20190313180602_00000011,my_BookLevel_Daily_Regular_20181130_EMERGINGTRS,11_20181130_8259,my,null,65162584,my,SA,PnLMessage,test,RealTime,null), "pnlData": PnlData(PnlHeader(BookLevel,Daily,Regular,2018-11-30,8259,EMERGINGTRS,Locked),List(PnlBreakdown(null,null,null,eur,List(PnlDetails(0.0,0.0022547507286072))), PnlBreakdown(null,null,null,jpy,List(PnlDetails(0.0,0.0))), PnlBreakdown(null,null,null,usd,List(PnlDetails(0.19000003399301,0.642328574985149))), PnlBreakdown(null,null,null,brl,List(PnlDetails(2.65281414613128E-8,2.4107750505209E-5))), PnlBreakdown(null,null,null,gbp,List(PnlDetails(0.0,-5.05781173706088E-5))), PnlBreakdown(null,null,null,cad,List(PnlDetails(145.399999991953,145.399999991953)))))
scala apache-kafka sbt kafka-producer-api
add a comment |
I am using Scala 2.12 and have required libraries to convert the message to Avro (need to convert) and kafka clients too.
I am running the code on the Linux host (dev) where other application (Apache NiFi) is running and able to create KafkaProducer and publish the message to remote Kafka.
Since it is dev for now, the protocol is PLAINTEXT.
E.g. of KafkaProducer config in Nifi.
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
Also, NiFi starts with java option to use JAAS file, whose contents are:
KafkaClient
com.sun.security.auth.module.Krb5LoginModule required
principal="myUserName@myRealm"
useKeyTab=true
client=true
keyTab="/path/myfile.keytab"
serviceName="kafka";
;
Also the krb5.conf file is available which is used.
Using above config, NiFi is able to create KafkaProducer and send messages across.
Now, I am using the same with Scala code. Simple Class which uses following build.sbt and code, to send the message.
build.sbt:
// https://mvnrepository.com/artifact/org.apache.avro/avro
libraryDependencies += "org.apache.avro" % "avro" % "1.8.1"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"
libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.4"
fork in run := true
javaOptions += "-Djava.security.auth.login.config=/path/to/jaas/kafka-jaas.conf"
javaOptions += "-Djava.security.krb5.conf=/path/to/krb/krb5.conf"
My code to send message. Removed unwanted lines for brevity. Please note testing of creating the data to Avro is running fine. The same message when given to NiFi, it is being able to publish correctly to the topic. What is not running, is the publish to kafka using Scala.
Code:
package example
import java.io.ByteArrayOutputStream
import java.util
import java.io.File
import java.util.Properties, UUID
import org.apache.avro.Schema.Parser
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.io.DecoderFactory, EncoderFactory
import org.apache.kafka.clients.producer.KafkaProducer, ProducerConfig, ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.io.Source
import scala.io.StdIn
object Hello extends Greeting with App
// case classes for creating avro record
// This part works fine.
val schemaFile = "/path/Schema.avsc"
val schema = new Schema.Parser().parse(new File(schemaFile))
val reader = new GenericDatumReader[GenericRecord](schema)
val avroRecord = new GenericData.Record(schema)
// populate correctly the record.
// works fine.
val brokers = "server1.domain:9096,server2.domain:9096,server3.domain:9096"
val topic = "myTopic"
private def configuration: Properties =
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("security.protocol", "PLAINTEXT")
props.put("sasl.kerberos.service.name", "kafka")
props.put("acks", "all")
props.put("retries","0")
props
val producer = new KafkaProducer[String, Array[Byte]](configuration)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder)
encoder.flush()
out.close()
val serializedBytes: Array[Byte] = out.toByteArray()
val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
producer.send(recordToSend)
trait Greeting
lazy val greeting: String = "hello"
When I run it on sbt command line:
sbt clean
sbt compile
sbt run
I get the following error/output. Nothing published.
Output:
-bash-4.2$ sbt run
[warn] Executing in batch mode.
[warn] For better performance, hit [ENTER] to switch to interactive mode, or
[warn] consider launching sbt without any commands, or explicitly passing 'shell'
[info] Loading project definition from /path/Scala/hello-world/project
[info] Set current project to hello-world (in build file:/path/Scala/hello-world/)
[info] Running example.Hello
[info] hello
[info]
[error] 9 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error] acks = 1
[error] batch.size = 16384
[error] bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error] buffer.memory = 33554432
[error] client.dns.lookup = default
[error] client.id =
[error] compression.type = none
[error] connections.max.idle.ms = 540000
[error] delivery.timeout.ms = 120000
[error] enable.idempotence = false
[error] interceptor.classes = []
[error] key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error] linger.ms = 0
[error] max.block.ms = 60000
[error] max.in.flight.requests.per.connection = 5
[error] max.request.size = 1048576
[error] metadata.max.age.ms = 300000
[error] metric.reporters = []
[error] metrics.num.samples = 2
[error] metrics.recording.level = INFO
[error] metrics.sample.window.ms = 30000
[error] partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error] receive.buffer.bytes = 32768
[error] reconnect.backoff.max.ms = 1000
[error] reconnect.backoff.ms = 50
[error] request.timeout.ms = 30000
[error] retries = 0
[error] retry.backoff.ms = 100
[error] sasl.client.callback.handler.class = null
[error] sasl.jaas.config = null
[error] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error] sasl.kerberos.min.time.before.relogin = 60000
[error] sasl.kerberos.service.name = kafka
[error] sasl.kerberos.ticket.renew.jitter = 0.05
[error] sasl.kerberos.ticket.renew.window.factor = 0.8
[error] sasl.login.callback.handler.class = null
[error] sasl.login.class = null
[error] sasl.login.refresh.buffer.seconds = 300
[error] sasl.login.refresh.min.period.seconds = 60
[error] sasl.login.refresh.window.factor = 0.8
[error] sasl.login.refresh.window.jitter = 0.05
[error] sasl.mechanism = GSSAPI
[error] security.protocol = PLAINTEXT
[error] send.buffer.bytes = 131072
[error] ssl.cipher.suites = null
[error] ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error] ssl.endpoint.identification.algorithm =
[error] ssl.key.password = null
[error] ssl.keymanager.algorithm = SunX509
[error] ssl.keystore.location = null
[error] ssl.keystore.password = null
[error] ssl.keystore.type = JKS
[error] ssl.protocol = TLS
[error] ssl.provider = null
[error] ssl.secure.random.implementation = null
[error] ssl.trustmanager.algorithm = PKIX
[error] ssl.truststore.location = null
[error] ssl.truststore.password = null
[error] ssl.truststore.type = JKS
[error] transaction.timeout.ms = 60000
[error] transactional.id = null
[error] value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 248 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[success] Total time: 1 s, completed Mar 6, 2019 1:38:14 PM
I am sure, it has to do something with security or kerberos. But other apps are able to push message, not with my scala code.
UPDATE:
Based on the response from @tgrez , I tried to block with Future get.
//producer.send(recordToSend)
val metaF: Future[RecordMetadata] = producer.send(recordToSend)
val meta = metaF.get() //blocking
val msgLog =
s"""
|offset = $meta.offset()
|partition = $meta.partition()
|topic = $meta.topic()
""".stripMargin
println(msgLog)
producer.close()
However still I am similar error.
[error] 10 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error] acks = 1
[error] batch.size = 16384
[error] bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error] buffer.memory = 33554432
[error] client.dns.lookup = default
[error] client.id =
[error] compression.type = none
[error] connections.max.idle.ms = 540000
[error] delivery.timeout.ms = 120000
[error] enable.idempotence = false
[error] interceptor.classes = []
[error] key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error] linger.ms = 0
[error] max.block.ms = 60000
[error] max.in.flight.requests.per.connection = 5
[error] max.request.size = 1048576
[error] metadata.max.age.ms = 300000
[error] metric.reporters = []
[error] metrics.num.samples = 2
[error] metrics.recording.level = INFO
[error] metrics.sample.window.ms = 30000
[error] partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error] receive.buffer.bytes = 32768
[error] reconnect.backoff.max.ms = 1000
[error] reconnect.backoff.ms = 50
[error] request.timeout.ms = 30000
[error] retries = 0
[error] retry.backoff.ms = 100
[error] sasl.client.callback.handler.class = null
[error] sasl.jaas.config = null
[error] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error] sasl.kerberos.min.time.before.relogin = 60000
[error] sasl.kerberos.service.name = kafka
[error] sasl.kerberos.ticket.renew.jitter = 0.05
[error] sasl.kerberos.ticket.renew.window.factor = 0.8
[error] sasl.login.callback.handler.class = null
[error] sasl.login.class = null
[error] sasl.login.refresh.buffer.seconds = 300
[error] sasl.login.refresh.min.period.seconds = 60
[error] sasl.login.refresh.window.factor = 0.8
[error] sasl.login.refresh.window.jitter = 0.05
[error] sasl.mechanism = GSSAPI
[error] security.protocol = PLAINTEXT
[error] send.buffer.bytes = 131072
[error] ssl.cipher.suites = null
[error] ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error] ssl.endpoint.identification.algorithm =
[error] ssl.key.password = null
[error] ssl.keymanager.algorithm = SunX509
[error] ssl.keystore.location = null
[error] ssl.keystore.password = null
[error] ssl.keystore.type = JKS
[error] ssl.protocol = TLS
[error] ssl.provider = null
[error] ssl.secure.random.implementation = null
[error] ssl.trustmanager.algorithm = PKIX
[error] ssl.truststore.location = null
[error] ssl.truststore.password = null
[error] ssl.truststore.type = JKS
[error] transaction.timeout.ms = 60000
[error] transactional.id = null
[error] value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 249 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[info]
[info] offset = 8
[info] partition = 1
[info] topic = myTopic
[info]
[error] 323 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[success] Total time: 1 s, completed Mar 6, 2019 3:26:53 PM
Anything I am missing here?
UPDATE 2:
As mentioned below, I changed my code. however it is not working either. I realized something is wrong in serialization.
I already have avroRecord in the GenericData.Record format. Can't I use the same to publish the data to Kafka? Why I have to use the Array of Bytes or any other serializer for the same?
Only example I found is to use io.confluent avro serializer. But I am unable to use that as sbt or maven is failing to download it now. Infact the URL: http://packages.confluent.io/maven/ is not working. Somehow I downloaded the jars and using it as external libraries.
Changed to code to:
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
val producer = new KafkaProducer[String, GenericData.Record](configuration)
val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)
Now it is working fine.
However, I am still looking for any other serializer class (which is available in Maven) to send the message as GenericData instead of Array of Bytes.
UPDATE 3:
As suggested by user @KZapagol, I tried to use the same and getting following error.
Schema : (It is complex, so need help if I am correctly transforming the data)
"type": "record","name": "MyPnl","doc": "This schema contains the metadata fields wrapped in a header field which follows the official schema.","fields": ["name":"header","type":"type":"record","name":"header","fields":["name":"messageId","type":"string","name":"businessId","type":"string","name":"batchId","type":"string","name":"sourceSystem","type":"string","name":"secondarySourceSystem","type":[ "null", "string" ],"name":"sourceSystemCreationTimestamp","type":"long","logicalType": "timestamp-millis","name":"sentBy","type":"string","name":"sentTo","type":"string","name":"messageType","type":"string","name":"schemaVersion","type":"string","name":"processing","type":"string","name":"recordOffset","type":[ "null", "string" ]],"name":"pnlData","type":"type":"record","name":"pnlData","fields":["name":"pnlHeader","type":"type":"record","name":"pnlData","namespace":"pnlHeader","fields":["name":"granularity","type":"string","name":"pnlType","type":"string","name":"pnlSubType","type":"string","name":"businessDate","type":"string","logicalType": "date","name":"bookId","type":"string","name":"bookDescription","type":"string","name":"pnlStatus","type":"string"],"name":"pnlBreakDown","type":"type":"array","items":"type":"record","name":"pnlData","namespace":"pnlBreakDown","fields":["name":"category","type":[ "null", "string" ],"name":"subCategory","type":[ "null", "string" ],"name":"riskCategory","type":[ "null", "string" ],"name":"pnlCurrency","type":"string","name":"pnlDetails", "type":"type":"array","items": "type":"record","name":"pnlData","namespace":"pnlDetails","fields":["name":"pnlLocalAmount","type":"double","name":"pnlCDEAmount","type":"double"]]]]
I have corresponding case classes for above. (Please suggest if I have missed anything here?)
case class MessageHeader( messageId: String,
businessId: String,
batchId: String,
sourceSystem: String,
secondarySourceSystem: String,
sourceSystemCreationTimestamp: Long,
sentBy: String,
sentTo: String,
messageType: String,
schemaVersion: String,
processing: String,
recordOffset: String
)
case class PnlHeader ( granularity: String,
pnlType: String,
pnlSubType: String,
businessDate: String,
bookId: String,
bookDescription: String,
pnlStatus: String
)
case class PnlDetails ( pnlLocalAmount: Double,
pnlCDEAmount: Double
)
case class PnlBreakdown ( category: String,
subCategory: String,
riskCategory: String,
pnlCurrency: String,
pnlDetails: List[PnlDetails]
)
case class PnlData ( pnlHeader: PnlHeader, pnlBreakdown: List[PnlBreakdown] )
case class PnlRecord (header: MessageHeader, pnlData: PnlData )
I have modeled my data in above PnlRecord format. I have list of such records.
From list of such records, I iterate and try to publish it to Kafka.
// Create Producer
val producer = new KafkaProducer[String, Array[Byte]](properties)
// This filename is file where above schema is saved.
val avroJsonSchema = Source.fromFile(new File(schemaFileName)).getLines.mkString
val avroMessage = new AvroMessage(avroJsonSchema)
val avroRecord = new Record(avroMessage.schema)
// recordListToSend is of type: List[PnlRecord]
for (record <- recordListToSend)
avroRecord.put("header", record.header)
avroRecord.put("pnlData", record.pnlData)
//logger.info(s"Record: $avroRecordn")
avroMessage.gdw.write(avroRecord, EncoderFactory.get().binaryEncoder(avroMessage.baos, null))
avroMessage.dfw.append(avroRecord)
avroMessage.dfw.close()
val bytes = avroMessage.baos.toByteArray
// send data
producer.send(new ProducerRecord[String, Array[Byte]](topic, bytes), new ProducerCallback)
//flush data
producer.flush()
//flush and close producer
producer.close()
AvroMessage class (as suggested by user)
import java.io.ByteArrayOutputStream
import org.apache.avro
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.generic.GenericDatumWriter, GenericRecord
class AvroMessage(avroJsonSchema: String)
val parser = new Schema.Parser()
val schema = parser.parse(avroJsonSchema)
val baos = new ByteArrayOutputStream()
val gdw = new GenericDatumWriter[GenericRecord](schema)
val dfw = new avro.file.DataFileWriter[GenericRecord](gdw)
val compressionLevel = 5
dfw.setCodec(CodecFactory.deflateCodec(compressionLevel))
dfw.create(schema, baos)
I am getting the below error:
2019-03-13 16:00:09.855 [application-akka.actor.default-dispatcher-11] ERROR controllers.SAController.$anonfun$publishToSA$2(34) - com.domain.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
java.lang.ClassCastException: ca.domain.my.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
at org.apache.avro.generic.GenericData.getField(GenericData.java:697)
at org.apache.avro.generic.GenericData.getField(GenericData.java:712)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:164)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at ca.domain.my.sa.dao.myPnlDao$.$anonfun$publishAvroToKafka$1(myPnlDao.scala:95)
Is my original case classes are right as per the schema?
My MessageHeader case class is shown above.
My Schema is shown above (updated).
My Record:
Record: "header": Header(my_20190313180602_00000011,my_BookLevel_Daily_Regular_20181130_EMERGINGTRS,11_20181130_8259,my,null,65162584,my,SA,PnLMessage,test,RealTime,null), "pnlData": PnlData(PnlHeader(BookLevel,Daily,Regular,2018-11-30,8259,EMERGINGTRS,Locked),List(PnlBreakdown(null,null,null,eur,List(PnlDetails(0.0,0.0022547507286072))), PnlBreakdown(null,null,null,jpy,List(PnlDetails(0.0,0.0))), PnlBreakdown(null,null,null,usd,List(PnlDetails(0.19000003399301,0.642328574985149))), PnlBreakdown(null,null,null,brl,List(PnlDetails(2.65281414613128E-8,2.4107750505209E-5))), PnlBreakdown(null,null,null,gbp,List(PnlDetails(0.0,-5.05781173706088E-5))), PnlBreakdown(null,null,null,cad,List(PnlDetails(145.399999991953,145.399999991953)))))
scala apache-kafka sbt kafka-producer-api
confluent repo is available in maven, but is not browsable in browser, see: stackoverflow.com/questions/43488853/…
– tgrez
Mar 8 at 14:34
add a comment |
I am using Scala 2.12 and have required libraries to convert the message to Avro (need to convert) and kafka clients too.
I am running the code on the Linux host (dev) where other application (Apache NiFi) is running and able to create KafkaProducer and publish the message to remote Kafka.
Since it is dev for now, the protocol is PLAINTEXT.
E.g. of KafkaProducer config in Nifi.
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
Also, NiFi starts with java option to use JAAS file, whose contents are:
KafkaClient
com.sun.security.auth.module.Krb5LoginModule required
principal="myUserName@myRealm"
useKeyTab=true
client=true
keyTab="/path/myfile.keytab"
serviceName="kafka";
;
Also the krb5.conf file is available which is used.
Using above config, NiFi is able to create KafkaProducer and send messages across.
Now, I am using the same with Scala code. Simple Class which uses following build.sbt and code, to send the message.
build.sbt:
// https://mvnrepository.com/artifact/org.apache.avro/avro
libraryDependencies += "org.apache.avro" % "avro" % "1.8.1"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"
libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.4"
fork in run := true
javaOptions += "-Djava.security.auth.login.config=/path/to/jaas/kafka-jaas.conf"
javaOptions += "-Djava.security.krb5.conf=/path/to/krb/krb5.conf"
My code to send message. Removed unwanted lines for brevity. Please note testing of creating the data to Avro is running fine. The same message when given to NiFi, it is being able to publish correctly to the topic. What is not running, is the publish to kafka using Scala.
Code:
package example
import java.io.ByteArrayOutputStream
import java.util
import java.io.File
import java.util.Properties, UUID
import org.apache.avro.Schema.Parser
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.io.DecoderFactory, EncoderFactory
import org.apache.kafka.clients.producer.KafkaProducer, ProducerConfig, ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.io.Source
import scala.io.StdIn
object Hello extends Greeting with App
// case classes for creating avro record
// This part works fine.
val schemaFile = "/path/Schema.avsc"
val schema = new Schema.Parser().parse(new File(schemaFile))
val reader = new GenericDatumReader[GenericRecord](schema)
val avroRecord = new GenericData.Record(schema)
// populate correctly the record.
// works fine.
val brokers = "server1.domain:9096,server2.domain:9096,server3.domain:9096"
val topic = "myTopic"
private def configuration: Properties =
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("security.protocol", "PLAINTEXT")
props.put("sasl.kerberos.service.name", "kafka")
props.put("acks", "all")
props.put("retries","0")
props
val producer = new KafkaProducer[String, Array[Byte]](configuration)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder)
encoder.flush()
out.close()
val serializedBytes: Array[Byte] = out.toByteArray()
val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
producer.send(recordToSend)
trait Greeting
lazy val greeting: String = "hello"
When I run it on sbt command line:
sbt clean
sbt compile
sbt run
I get the following error/output. Nothing published.
Output:
-bash-4.2$ sbt run
[warn] Executing in batch mode.
[warn] For better performance, hit [ENTER] to switch to interactive mode, or
[warn] consider launching sbt without any commands, or explicitly passing 'shell'
[info] Loading project definition from /path/Scala/hello-world/project
[info] Set current project to hello-world (in build file:/path/Scala/hello-world/)
[info] Running example.Hello
[info] hello
[info]
[error] 9 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error] acks = 1
[error] batch.size = 16384
[error] bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error] buffer.memory = 33554432
[error] client.dns.lookup = default
[error] client.id =
[error] compression.type = none
[error] connections.max.idle.ms = 540000
[error] delivery.timeout.ms = 120000
[error] enable.idempotence = false
[error] interceptor.classes = []
[error] key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error] linger.ms = 0
[error] max.block.ms = 60000
[error] max.in.flight.requests.per.connection = 5
[error] max.request.size = 1048576
[error] metadata.max.age.ms = 300000
[error] metric.reporters = []
[error] metrics.num.samples = 2
[error] metrics.recording.level = INFO
[error] metrics.sample.window.ms = 30000
[error] partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error] receive.buffer.bytes = 32768
[error] reconnect.backoff.max.ms = 1000
[error] reconnect.backoff.ms = 50
[error] request.timeout.ms = 30000
[error] retries = 0
[error] retry.backoff.ms = 100
[error] sasl.client.callback.handler.class = null
[error] sasl.jaas.config = null
[error] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error] sasl.kerberos.min.time.before.relogin = 60000
[error] sasl.kerberos.service.name = kafka
[error] sasl.kerberos.ticket.renew.jitter = 0.05
[error] sasl.kerberos.ticket.renew.window.factor = 0.8
[error] sasl.login.callback.handler.class = null
[error] sasl.login.class = null
[error] sasl.login.refresh.buffer.seconds = 300
[error] sasl.login.refresh.min.period.seconds = 60
[error] sasl.login.refresh.window.factor = 0.8
[error] sasl.login.refresh.window.jitter = 0.05
[error] sasl.mechanism = GSSAPI
[error] security.protocol = PLAINTEXT
[error] send.buffer.bytes = 131072
[error] ssl.cipher.suites = null
[error] ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error] ssl.endpoint.identification.algorithm =
[error] ssl.key.password = null
[error] ssl.keymanager.algorithm = SunX509
[error] ssl.keystore.location = null
[error] ssl.keystore.password = null
[error] ssl.keystore.type = JKS
[error] ssl.protocol = TLS
[error] ssl.provider = null
[error] ssl.secure.random.implementation = null
[error] ssl.trustmanager.algorithm = PKIX
[error] ssl.truststore.location = null
[error] ssl.truststore.password = null
[error] ssl.truststore.type = JKS
[error] transaction.timeout.ms = 60000
[error] transactional.id = null
[error] value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 248 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[success] Total time: 1 s, completed Mar 6, 2019 1:38:14 PM
I am sure, it has to do something with security or kerberos. But other apps are able to push message, not with my scala code.
UPDATE:
Based on the response from @tgrez , I tried to block with Future get.
//producer.send(recordToSend)
val metaF: Future[RecordMetadata] = producer.send(recordToSend)
val meta = metaF.get() //blocking
val msgLog =
s"""
|offset = $meta.offset()
|partition = $meta.partition()
|topic = $meta.topic()
""".stripMargin
println(msgLog)
producer.close()
However still I am similar error.
[error] 10 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error] acks = 1
[error] batch.size = 16384
[error] bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error] buffer.memory = 33554432
[error] client.dns.lookup = default
[error] client.id =
[error] compression.type = none
[error] connections.max.idle.ms = 540000
[error] delivery.timeout.ms = 120000
[error] enable.idempotence = false
[error] interceptor.classes = []
[error] key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error] linger.ms = 0
[error] max.block.ms = 60000
[error] max.in.flight.requests.per.connection = 5
[error] max.request.size = 1048576
[error] metadata.max.age.ms = 300000
[error] metric.reporters = []
[error] metrics.num.samples = 2
[error] metrics.recording.level = INFO
[error] metrics.sample.window.ms = 30000
[error] partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error] receive.buffer.bytes = 32768
[error] reconnect.backoff.max.ms = 1000
[error] reconnect.backoff.ms = 50
[error] request.timeout.ms = 30000
[error] retries = 0
[error] retry.backoff.ms = 100
[error] sasl.client.callback.handler.class = null
[error] sasl.jaas.config = null
[error] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error] sasl.kerberos.min.time.before.relogin = 60000
[error] sasl.kerberos.service.name = kafka
[error] sasl.kerberos.ticket.renew.jitter = 0.05
[error] sasl.kerberos.ticket.renew.window.factor = 0.8
[error] sasl.login.callback.handler.class = null
[error] sasl.login.class = null
[error] sasl.login.refresh.buffer.seconds = 300
[error] sasl.login.refresh.min.period.seconds = 60
[error] sasl.login.refresh.window.factor = 0.8
[error] sasl.login.refresh.window.jitter = 0.05
[error] sasl.mechanism = GSSAPI
[error] security.protocol = PLAINTEXT
[error] send.buffer.bytes = 131072
[error] ssl.cipher.suites = null
[error] ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error] ssl.endpoint.identification.algorithm =
[error] ssl.key.password = null
[error] ssl.keymanager.algorithm = SunX509
[error] ssl.keystore.location = null
[error] ssl.keystore.password = null
[error] ssl.keystore.type = JKS
[error] ssl.protocol = TLS
[error] ssl.provider = null
[error] ssl.secure.random.implementation = null
[error] ssl.trustmanager.algorithm = PKIX
[error] ssl.truststore.location = null
[error] ssl.truststore.password = null
[error] ssl.truststore.type = JKS
[error] transaction.timeout.ms = 60000
[error] transactional.id = null
[error] value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 249 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[info]
[info] offset = 8
[info] partition = 1
[info] topic = myTopic
[info]
[error] 323 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[success] Total time: 1 s, completed Mar 6, 2019 3:26:53 PM
Anything I am missing here?
UPDATE 2:
As mentioned below, I changed my code. however it is not working either. I realized something is wrong in serialization.
I already have avroRecord in the GenericData.Record format. Can't I use the same to publish the data to Kafka? Why I have to use the Array of Bytes or any other serializer for the same?
Only example I found is to use io.confluent avro serializer. But I am unable to use that as sbt or maven is failing to download it now. Infact the URL: http://packages.confluent.io/maven/ is not working. Somehow I downloaded the jars and using it as external libraries.
Changed to code to:
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
val producer = new KafkaProducer[String, GenericData.Record](configuration)
val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)
Now it is working fine.
However, I am still looking for any other serializer class (which is available in Maven) to send the message as GenericData instead of Array of Bytes.
UPDATE 3:
As suggested by user @KZapagol, I tried to use the same and getting following error.
Schema : (It is complex, so need help if I am correctly transforming the data)
"type": "record","name": "MyPnl","doc": "This schema contains the metadata fields wrapped in a header field which follows the official schema.","fields": ["name":"header","type":"type":"record","name":"header","fields":["name":"messageId","type":"string","name":"businessId","type":"string","name":"batchId","type":"string","name":"sourceSystem","type":"string","name":"secondarySourceSystem","type":[ "null", "string" ],"name":"sourceSystemCreationTimestamp","type":"long","logicalType": "timestamp-millis","name":"sentBy","type":"string","name":"sentTo","type":"string","name":"messageType","type":"string","name":"schemaVersion","type":"string","name":"processing","type":"string","name":"recordOffset","type":[ "null", "string" ]],"name":"pnlData","type":"type":"record","name":"pnlData","fields":["name":"pnlHeader","type":"type":"record","name":"pnlData","namespace":"pnlHeader","fields":["name":"granularity","type":"string","name":"pnlType","type":"string","name":"pnlSubType","type":"string","name":"businessDate","type":"string","logicalType": "date","name":"bookId","type":"string","name":"bookDescription","type":"string","name":"pnlStatus","type":"string"],"name":"pnlBreakDown","type":"type":"array","items":"type":"record","name":"pnlData","namespace":"pnlBreakDown","fields":["name":"category","type":[ "null", "string" ],"name":"subCategory","type":[ "null", "string" ],"name":"riskCategory","type":[ "null", "string" ],"name":"pnlCurrency","type":"string","name":"pnlDetails", "type":"type":"array","items": "type":"record","name":"pnlData","namespace":"pnlDetails","fields":["name":"pnlLocalAmount","type":"double","name":"pnlCDEAmount","type":"double"]]]]
I have corresponding case classes for above. (Please suggest if I have missed anything here?)
case class MessageHeader( messageId: String,
businessId: String,
batchId: String,
sourceSystem: String,
secondarySourceSystem: String,
sourceSystemCreationTimestamp: Long,
sentBy: String,
sentTo: String,
messageType: String,
schemaVersion: String,
processing: String,
recordOffset: String
)
case class PnlHeader ( granularity: String,
pnlType: String,
pnlSubType: String,
businessDate: String,
bookId: String,
bookDescription: String,
pnlStatus: String
)
case class PnlDetails ( pnlLocalAmount: Double,
pnlCDEAmount: Double
)
case class PnlBreakdown ( category: String,
subCategory: String,
riskCategory: String,
pnlCurrency: String,
pnlDetails: List[PnlDetails]
)
case class PnlData ( pnlHeader: PnlHeader, pnlBreakdown: List[PnlBreakdown] )
case class PnlRecord (header: MessageHeader, pnlData: PnlData )
I have modeled my data in above PnlRecord format. I have list of such records.
From list of such records, I iterate and try to publish it to Kafka.
// Create Producer
val producer = new KafkaProducer[String, Array[Byte]](properties)
// This filename is file where above schema is saved.
val avroJsonSchema = Source.fromFile(new File(schemaFileName)).getLines.mkString
val avroMessage = new AvroMessage(avroJsonSchema)
val avroRecord = new Record(avroMessage.schema)
// recordListToSend is of type: List[PnlRecord]
for (record <- recordListToSend)
avroRecord.put("header", record.header)
avroRecord.put("pnlData", record.pnlData)
//logger.info(s"Record: $avroRecordn")
avroMessage.gdw.write(avroRecord, EncoderFactory.get().binaryEncoder(avroMessage.baos, null))
avroMessage.dfw.append(avroRecord)
avroMessage.dfw.close()
val bytes = avroMessage.baos.toByteArray
// send data
producer.send(new ProducerRecord[String, Array[Byte]](topic, bytes), new ProducerCallback)
//flush data
producer.flush()
//flush and close producer
producer.close()
AvroMessage class (as suggested by user)
import java.io.ByteArrayOutputStream
import org.apache.avro
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.generic.GenericDatumWriter, GenericRecord
class AvroMessage(avroJsonSchema: String)
val parser = new Schema.Parser()
val schema = parser.parse(avroJsonSchema)
val baos = new ByteArrayOutputStream()
val gdw = new GenericDatumWriter[GenericRecord](schema)
val dfw = new avro.file.DataFileWriter[GenericRecord](gdw)
val compressionLevel = 5
dfw.setCodec(CodecFactory.deflateCodec(compressionLevel))
dfw.create(schema, baos)
I am getting the below error:
2019-03-13 16:00:09.855 [application-akka.actor.default-dispatcher-11] ERROR controllers.SAController.$anonfun$publishToSA$2(34) - com.domain.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
java.lang.ClassCastException: ca.domain.my.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
at org.apache.avro.generic.GenericData.getField(GenericData.java:697)
at org.apache.avro.generic.GenericData.getField(GenericData.java:712)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:164)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at ca.domain.my.sa.dao.myPnlDao$.$anonfun$publishAvroToKafka$1(myPnlDao.scala:95)
Is my original case classes are right as per the schema?
My MessageHeader case class is shown above.
My Schema is shown above (updated).
My Record:
Record: "header": Header(my_20190313180602_00000011,my_BookLevel_Daily_Regular_20181130_EMERGINGTRS,11_20181130_8259,my,null,65162584,my,SA,PnLMessage,test,RealTime,null), "pnlData": PnlData(PnlHeader(BookLevel,Daily,Regular,2018-11-30,8259,EMERGINGTRS,Locked),List(PnlBreakdown(null,null,null,eur,List(PnlDetails(0.0,0.0022547507286072))), PnlBreakdown(null,null,null,jpy,List(PnlDetails(0.0,0.0))), PnlBreakdown(null,null,null,usd,List(PnlDetails(0.19000003399301,0.642328574985149))), PnlBreakdown(null,null,null,brl,List(PnlDetails(2.65281414613128E-8,2.4107750505209E-5))), PnlBreakdown(null,null,null,gbp,List(PnlDetails(0.0,-5.05781173706088E-5))), PnlBreakdown(null,null,null,cad,List(PnlDetails(145.399999991953,145.399999991953)))))
scala apache-kafka sbt kafka-producer-api
I am using Scala 2.12 and have required libraries to convert the message to Avro (need to convert) and kafka clients too.
I am running the code on the Linux host (dev) where other application (Apache NiFi) is running and able to create KafkaProducer and publish the message to remote Kafka.
Since it is dev for now, the protocol is PLAINTEXT.
E.g. of KafkaProducer config in Nifi.
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
Also, NiFi starts with java option to use JAAS file, whose contents are:
KafkaClient
com.sun.security.auth.module.Krb5LoginModule required
principal="myUserName@myRealm"
useKeyTab=true
client=true
keyTab="/path/myfile.keytab"
serviceName="kafka";
;
Also the krb5.conf file is available which is used.
Using above config, NiFi is able to create KafkaProducer and send messages across.
Now, I am using the same with Scala code. Simple Class which uses following build.sbt and code, to send the message.
build.sbt:
// https://mvnrepository.com/artifact/org.apache.avro/avro
libraryDependencies += "org.apache.avro" % "avro" % "1.8.1"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"
libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.4"
fork in run := true
javaOptions += "-Djava.security.auth.login.config=/path/to/jaas/kafka-jaas.conf"
javaOptions += "-Djava.security.krb5.conf=/path/to/krb/krb5.conf"
My code to send message. Removed unwanted lines for brevity. Please note testing of creating the data to Avro is running fine. The same message when given to NiFi, it is being able to publish correctly to the topic. What is not running, is the publish to kafka using Scala.
Code:
package example
import java.io.ByteArrayOutputStream
import java.util
import java.io.File
import java.util.Properties, UUID
import org.apache.avro.Schema.Parser
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.io.DecoderFactory, EncoderFactory
import org.apache.kafka.clients.producer.KafkaProducer, ProducerConfig, ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.io.Source
import scala.io.StdIn
object Hello extends Greeting with App
// case classes for creating avro record
// This part works fine.
val schemaFile = "/path/Schema.avsc"
val schema = new Schema.Parser().parse(new File(schemaFile))
val reader = new GenericDatumReader[GenericRecord](schema)
val avroRecord = new GenericData.Record(schema)
// populate correctly the record.
// works fine.
val brokers = "server1.domain:9096,server2.domain:9096,server3.domain:9096"
val topic = "myTopic"
private def configuration: Properties =
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("security.protocol", "PLAINTEXT")
props.put("sasl.kerberos.service.name", "kafka")
props.put("acks", "all")
props.put("retries","0")
props
val producer = new KafkaProducer[String, Array[Byte]](configuration)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder)
encoder.flush()
out.close()
val serializedBytes: Array[Byte] = out.toByteArray()
val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
producer.send(recordToSend)
trait Greeting
lazy val greeting: String = "hello"
When I run it on sbt command line:
sbt clean
sbt compile
sbt run
I get the following error/output. Nothing published.
Output:
-bash-4.2$ sbt run
[warn] Executing in batch mode.
[warn] For better performance, hit [ENTER] to switch to interactive mode, or
[warn] consider launching sbt without any commands, or explicitly passing 'shell'
[info] Loading project definition from /path/Scala/hello-world/project
[info] Set current project to hello-world (in build file:/path/Scala/hello-world/)
[info] Running example.Hello
[info] hello
[info]
[error] 9 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error] acks = 1
[error] batch.size = 16384
[error] bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error] buffer.memory = 33554432
[error] client.dns.lookup = default
[error] client.id =
[error] compression.type = none
[error] connections.max.idle.ms = 540000
[error] delivery.timeout.ms = 120000
[error] enable.idempotence = false
[error] interceptor.classes = []
[error] key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error] linger.ms = 0
[error] max.block.ms = 60000
[error] max.in.flight.requests.per.connection = 5
[error] max.request.size = 1048576
[error] metadata.max.age.ms = 300000
[error] metric.reporters = []
[error] metrics.num.samples = 2
[error] metrics.recording.level = INFO
[error] metrics.sample.window.ms = 30000
[error] partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error] receive.buffer.bytes = 32768
[error] reconnect.backoff.max.ms = 1000
[error] reconnect.backoff.ms = 50
[error] request.timeout.ms = 30000
[error] retries = 0
[error] retry.backoff.ms = 100
[error] sasl.client.callback.handler.class = null
[error] sasl.jaas.config = null
[error] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error] sasl.kerberos.min.time.before.relogin = 60000
[error] sasl.kerberos.service.name = kafka
[error] sasl.kerberos.ticket.renew.jitter = 0.05
[error] sasl.kerberos.ticket.renew.window.factor = 0.8
[error] sasl.login.callback.handler.class = null
[error] sasl.login.class = null
[error] sasl.login.refresh.buffer.seconds = 300
[error] sasl.login.refresh.min.period.seconds = 60
[error] sasl.login.refresh.window.factor = 0.8
[error] sasl.login.refresh.window.jitter = 0.05
[error] sasl.mechanism = GSSAPI
[error] security.protocol = PLAINTEXT
[error] send.buffer.bytes = 131072
[error] ssl.cipher.suites = null
[error] ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error] ssl.endpoint.identification.algorithm =
[error] ssl.key.password = null
[error] ssl.keymanager.algorithm = SunX509
[error] ssl.keystore.location = null
[error] ssl.keystore.password = null
[error] ssl.keystore.type = JKS
[error] ssl.protocol = TLS
[error] ssl.provider = null
[error] ssl.secure.random.implementation = null
[error] ssl.trustmanager.algorithm = PKIX
[error] ssl.truststore.location = null
[error] ssl.truststore.password = null
[error] ssl.truststore.type = JKS
[error] transaction.timeout.ms = 60000
[error] transactional.id = null
[error] value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 248 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[success] Total time: 1 s, completed Mar 6, 2019 1:38:14 PM
I am sure, it has to do something with security or kerberos. But other apps are able to push message, not with my scala code.
UPDATE:
Based on the response from @tgrez , I tried to block with Future get.
//producer.send(recordToSend)
val metaF: Future[RecordMetadata] = producer.send(recordToSend)
val meta = metaF.get() //blocking
val msgLog =
s"""
|offset = $meta.offset()
|partition = $meta.partition()
|topic = $meta.topic()
""".stripMargin
println(msgLog)
producer.close()
However still I am similar error.
[error] 10 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error] acks = 1
[error] batch.size = 16384
[error] bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error] buffer.memory = 33554432
[error] client.dns.lookup = default
[error] client.id =
[error] compression.type = none
[error] connections.max.idle.ms = 540000
[error] delivery.timeout.ms = 120000
[error] enable.idempotence = false
[error] interceptor.classes = []
[error] key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error] linger.ms = 0
[error] max.block.ms = 60000
[error] max.in.flight.requests.per.connection = 5
[error] max.request.size = 1048576
[error] metadata.max.age.ms = 300000
[error] metric.reporters = []
[error] metrics.num.samples = 2
[error] metrics.recording.level = INFO
[error] metrics.sample.window.ms = 30000
[error] partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error] receive.buffer.bytes = 32768
[error] reconnect.backoff.max.ms = 1000
[error] reconnect.backoff.ms = 50
[error] request.timeout.ms = 30000
[error] retries = 0
[error] retry.backoff.ms = 100
[error] sasl.client.callback.handler.class = null
[error] sasl.jaas.config = null
[error] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error] sasl.kerberos.min.time.before.relogin = 60000
[error] sasl.kerberos.service.name = kafka
[error] sasl.kerberos.ticket.renew.jitter = 0.05
[error] sasl.kerberos.ticket.renew.window.factor = 0.8
[error] sasl.login.callback.handler.class = null
[error] sasl.login.class = null
[error] sasl.login.refresh.buffer.seconds = 300
[error] sasl.login.refresh.min.period.seconds = 60
[error] sasl.login.refresh.window.factor = 0.8
[error] sasl.login.refresh.window.jitter = 0.05
[error] sasl.mechanism = GSSAPI
[error] security.protocol = PLAINTEXT
[error] send.buffer.bytes = 131072
[error] ssl.cipher.suites = null
[error] ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error] ssl.endpoint.identification.algorithm =
[error] ssl.key.password = null
[error] ssl.keymanager.algorithm = SunX509
[error] ssl.keystore.location = null
[error] ssl.keystore.password = null
[error] ssl.keystore.type = JKS
[error] ssl.protocol = TLS
[error] ssl.provider = null
[error] ssl.secure.random.implementation = null
[error] ssl.trustmanager.algorithm = PKIX
[error] ssl.truststore.location = null
[error] ssl.truststore.password = null
[error] ssl.truststore.type = JKS
[error] transaction.timeout.ms = 60000
[error] transactional.id = null
[error] value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 249 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[info]
[info] offset = 8
[info] partition = 1
[info] topic = myTopic
[info]
[error] 323 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[success] Total time: 1 s, completed Mar 6, 2019 3:26:53 PM
Anything I am missing here?
UPDATE 2:
As mentioned below, I changed my code. however it is not working either. I realized something is wrong in serialization.
I already have avroRecord in the GenericData.Record format. Can't I use the same to publish the data to Kafka? Why I have to use the Array of Bytes or any other serializer for the same?
Only example I found is to use io.confluent avro serializer. But I am unable to use that as sbt or maven is failing to download it now. Infact the URL: http://packages.confluent.io/maven/ is not working. Somehow I downloaded the jars and using it as external libraries.
Changed to code to:
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
val producer = new KafkaProducer[String, GenericData.Record](configuration)
val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)
Now it is working fine.
However, I am still looking for any other serializer class (which is available in Maven) to send the message as GenericData instead of Array of Bytes.
UPDATE 3:
As suggested by user @KZapagol, I tried to use the same and getting following error.
Schema : (It is complex, so need help if I am correctly transforming the data)
"type": "record","name": "MyPnl","doc": "This schema contains the metadata fields wrapped in a header field which follows the official schema.","fields": ["name":"header","type":"type":"record","name":"header","fields":["name":"messageId","type":"string","name":"businessId","type":"string","name":"batchId","type":"string","name":"sourceSystem","type":"string","name":"secondarySourceSystem","type":[ "null", "string" ],"name":"sourceSystemCreationTimestamp","type":"long","logicalType": "timestamp-millis","name":"sentBy","type":"string","name":"sentTo","type":"string","name":"messageType","type":"string","name":"schemaVersion","type":"string","name":"processing","type":"string","name":"recordOffset","type":[ "null", "string" ]],"name":"pnlData","type":"type":"record","name":"pnlData","fields":["name":"pnlHeader","type":"type":"record","name":"pnlData","namespace":"pnlHeader","fields":["name":"granularity","type":"string","name":"pnlType","type":"string","name":"pnlSubType","type":"string","name":"businessDate","type":"string","logicalType": "date","name":"bookId","type":"string","name":"bookDescription","type":"string","name":"pnlStatus","type":"string"],"name":"pnlBreakDown","type":"type":"array","items":"type":"record","name":"pnlData","namespace":"pnlBreakDown","fields":["name":"category","type":[ "null", "string" ],"name":"subCategory","type":[ "null", "string" ],"name":"riskCategory","type":[ "null", "string" ],"name":"pnlCurrency","type":"string","name":"pnlDetails", "type":"type":"array","items": "type":"record","name":"pnlData","namespace":"pnlDetails","fields":["name":"pnlLocalAmount","type":"double","name":"pnlCDEAmount","type":"double"]]]]
I have corresponding case classes for above. (Please suggest if I have missed anything here?)
case class MessageHeader( messageId: String,
businessId: String,
batchId: String,
sourceSystem: String,
secondarySourceSystem: String,
sourceSystemCreationTimestamp: Long,
sentBy: String,
sentTo: String,
messageType: String,
schemaVersion: String,
processing: String,
recordOffset: String
)
case class PnlHeader ( granularity: String,
pnlType: String,
pnlSubType: String,
businessDate: String,
bookId: String,
bookDescription: String,
pnlStatus: String
)
case class PnlDetails ( pnlLocalAmount: Double,
pnlCDEAmount: Double
)
case class PnlBreakdown ( category: String,
subCategory: String,
riskCategory: String,
pnlCurrency: String,
pnlDetails: List[PnlDetails]
)
case class PnlData ( pnlHeader: PnlHeader, pnlBreakdown: List[PnlBreakdown] )
case class PnlRecord (header: MessageHeader, pnlData: PnlData )
I have modeled my data in above PnlRecord format. I have list of such records.
From list of such records, I iterate and try to publish it to Kafka.
// Create Producer
val producer = new KafkaProducer[String, Array[Byte]](properties)
// This filename is file where above schema is saved.
val avroJsonSchema = Source.fromFile(new File(schemaFileName)).getLines.mkString
val avroMessage = new AvroMessage(avroJsonSchema)
val avroRecord = new Record(avroMessage.schema)
// recordListToSend is of type: List[PnlRecord]
for (record <- recordListToSend)
avroRecord.put("header", record.header)
avroRecord.put("pnlData", record.pnlData)
//logger.info(s"Record: $avroRecordn")
avroMessage.gdw.write(avroRecord, EncoderFactory.get().binaryEncoder(avroMessage.baos, null))
avroMessage.dfw.append(avroRecord)
avroMessage.dfw.close()
val bytes = avroMessage.baos.toByteArray
// send data
producer.send(new ProducerRecord[String, Array[Byte]](topic, bytes), new ProducerCallback)
//flush data
producer.flush()
//flush and close producer
producer.close()
AvroMessage class (as suggested by user)
import java.io.ByteArrayOutputStream
import org.apache.avro
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.generic.GenericDatumWriter, GenericRecord
class AvroMessage(avroJsonSchema: String)
val parser = new Schema.Parser()
val schema = parser.parse(avroJsonSchema)
val baos = new ByteArrayOutputStream()
val gdw = new GenericDatumWriter[GenericRecord](schema)
val dfw = new avro.file.DataFileWriter[GenericRecord](gdw)
val compressionLevel = 5
dfw.setCodec(CodecFactory.deflateCodec(compressionLevel))
dfw.create(schema, baos)
I am getting the below error:
2019-03-13 16:00:09.855 [application-akka.actor.default-dispatcher-11] ERROR controllers.SAController.$anonfun$publishToSA$2(34) - com.domain.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
java.lang.ClassCastException: ca.domain.my.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
at org.apache.avro.generic.GenericData.getField(GenericData.java:697)
at org.apache.avro.generic.GenericData.getField(GenericData.java:712)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:164)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at ca.domain.my.sa.dao.myPnlDao$.$anonfun$publishAvroToKafka$1(myPnlDao.scala:95)
Is my original case classes are right as per the schema?
My MessageHeader case class is shown above.
My Schema is shown above (updated).
My Record:
Record: "header": Header(my_20190313180602_00000011,my_BookLevel_Daily_Regular_20181130_EMERGINGTRS,11_20181130_8259,my,null,65162584,my,SA,PnLMessage,test,RealTime,null), "pnlData": PnlData(PnlHeader(BookLevel,Daily,Regular,2018-11-30,8259,EMERGINGTRS,Locked),List(PnlBreakdown(null,null,null,eur,List(PnlDetails(0.0,0.0022547507286072))), PnlBreakdown(null,null,null,jpy,List(PnlDetails(0.0,0.0))), PnlBreakdown(null,null,null,usd,List(PnlDetails(0.19000003399301,0.642328574985149))), PnlBreakdown(null,null,null,brl,List(PnlDetails(2.65281414613128E-8,2.4107750505209E-5))), PnlBreakdown(null,null,null,gbp,List(PnlDetails(0.0,-5.05781173706088E-5))), PnlBreakdown(null,null,null,cad,List(PnlDetails(145.399999991953,145.399999991953)))))
scala apache-kafka sbt kafka-producer-api
scala apache-kafka sbt kafka-producer-api
edited yesterday
Mihir
asked Mar 6 at 18:44
MihirMihir
369
369
confluent repo is available in maven, but is not browsable in browser, see: stackoverflow.com/questions/43488853/…
– tgrez
Mar 8 at 14:34
add a comment |
confluent repo is available in maven, but is not browsable in browser, see: stackoverflow.com/questions/43488853/…
– tgrez
Mar 8 at 14:34
confluent repo is available in maven, but is not browsable in browser, see: stackoverflow.com/questions/43488853/…
– tgrez
Mar 8 at 14:34
confluent repo is available in maven, but is not browsable in browser, see: stackoverflow.com/questions/43488853/…
– tgrez
Mar 8 at 14:34
add a comment |
2 Answers
2
active
oldest
votes
Please update your code as below and try once. It looks like you have not closed the output stream,encoder and producer properly.
val producer = new KafkaProducer[String, Array[Byte]](configuration)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder)
val serializedBytes: Array[Byte] = out.toByteArray()
encoder.flush()
out.close()
val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
producer.send(recordToSend,new ProducerCallback)
//flush data
producer.flush()
//flush and close producer
producer.close()
class ProducerCallback(implicit logger: Logger) extends Callback
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
//executes every time a record is successfully sent or exception thrown
Option(metadata) match
case Some(_) =>
logger.info("Received new metadata. n" +
"Topic: " + metadata.topic() + "n" +
"Partition: " + metadata.partition() + "n" +
"Offset: " + metadata.offset() + "n" +
"Timestamp: " + metadata.timestamp() + "n" +
"Checksum: " + metadata.checksum())
case None => ;
Option(exception) match
case Some(_) =>
logger.error("Exception thrown during processing of record... " + exception)
throw exception
case None => ;
Please refer link https://github.com/Zapagol/apache-kafka/tree/master/src/main/scala/com/org/apache for more kafka producer and consumer examples. Hope it will help!
Update
I have added KafkaProducer example for Avroschema input. Please refer https://github.com/Zapagol/apache-kafka/blob/master/src/main/scala/com/org/apache/producers/ProducerForAvroschema.scala .
I have used apache avro jar and sample avsc file as below. Please modify schema file according to your requirement.And I am able to produce record successfully.
"type": "record",
"name": "employee",
"fields": [
"name": "name", "type": "string",
"name": "id", "type": "int",
"name": "mobileNumber", "type": ["string", "null"],
"name": "salary", "type": ["int", "null"]
]
Thanks for the input. I tried the same, however it was still not able to send the message. I wanted to check if my serialization is happening correctly or not. I tried to download the io.confluent JARs and used "io.confluent.kafka.serializers.KafkaAvroSerializer" as my Value seriliazer. Basically my record:val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)For this, only serializer I found is from io.confluent. Is there any other way to use the above? i.e.I have avro record (GenericRecord) and send the same format to Kafka? No bytesArray.
– Mihir
Mar 8 at 3:25
If you have avsc file which defines schema for your data then you can useorg.apache.avro.generic.GenericDataandorg.apache.avro.generic.GenericData.Recordor else you can useavro4gjar.
– KZapagol
Mar 8 at 5:17
Hi @KZapagol, thanks. Yes, I have avsc file.My avro record is created as per that.val avroRecord = new GenericData.Record(schema)I wanted to know after this, when we send the data to Kafka, what should be the Value serializer (props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")). This works but I have issues with the library in maven (see my details above). If I useorg.apache.kafka.common.serialization.ByteArraySerializer, this is where I think something is wrong. Basically, since I have data converted in avro, can I sent it directly?
– Mihir
Mar 8 at 13:33
Hi @Mihir I have created Producer example with avro schema input. I have provided link above. Please update avsc file according to your requirement.This would be the only change you need to make i believe.
– KZapagol
Mar 8 at 14:33
Hi @KZapagol: Thanks for the input, sorry to come back late, was working on some other things. I am updating my original question with the issue I am facing. I tried to use your example.
– Mihir
yesterday
|
show 2 more comments
It could be simpler than it seems. The send method is asynchronous, it returns a Future<RecordMetadata>. Your example exits before the message is actually sent.
Kafka producer is batching messages in the background, so to ensure the messages are sent you should either block with e.g. Future.get (this means waiting for broker to respond with metadata) or ensure buffers are flushed with kafkaProducer.flush().
In tests I recommend to block on Future.
Thanks for the response. I am pretty new to Scala, so let me search about the Future.
– Mihir
Mar 6 at 19:33
I tried the Future. See update in the original question, doesn't work.
– Mihir
Mar 6 at 20:37
Was the message delivered to the Kafka topic? It should be now.
– tgrez
Mar 7 at 10:23
As per my latest updates (see original questions), blocking with Future is definitely helping. However the issue is not resolved, as messages were not delivered to Kafka. I had to change the serializtion (see UPDATE2 above) and then only it works. So not sure what is the issue in serializing the avro record.
– Mihir
Mar 8 at 3:39
I usually implemented my own KafkaAvroSerializer in such cases, maybe you can try to change SpecificDatumWriter to GenericDatumWriter in your code? see also: sderosiaux.com/articles/2017/03/02/…
– tgrez
Mar 8 at 10:31
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55030128%2fscala-unable-to-send-message-to-kafka-hosted-on-remote-server%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
Please update your code as below and try once. It looks like you have not closed the output stream,encoder and producer properly.
val producer = new KafkaProducer[String, Array[Byte]](configuration)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder)
val serializedBytes: Array[Byte] = out.toByteArray()
encoder.flush()
out.close()
val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
producer.send(recordToSend,new ProducerCallback)
//flush data
producer.flush()
//flush and close producer
producer.close()
class ProducerCallback(implicit logger: Logger) extends Callback
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
//executes every time a record is successfully sent or exception thrown
Option(metadata) match
case Some(_) =>
logger.info("Received new metadata. n" +
"Topic: " + metadata.topic() + "n" +
"Partition: " + metadata.partition() + "n" +
"Offset: " + metadata.offset() + "n" +
"Timestamp: " + metadata.timestamp() + "n" +
"Checksum: " + metadata.checksum())
case None => ;
Option(exception) match
case Some(_) =>
logger.error("Exception thrown during processing of record... " + exception)
throw exception
case None => ;
Please refer link https://github.com/Zapagol/apache-kafka/tree/master/src/main/scala/com/org/apache for more kafka producer and consumer examples. Hope it will help!
Update
I have added KafkaProducer example for Avroschema input. Please refer https://github.com/Zapagol/apache-kafka/blob/master/src/main/scala/com/org/apache/producers/ProducerForAvroschema.scala .
I have used apache avro jar and sample avsc file as below. Please modify schema file according to your requirement.And I am able to produce record successfully.
"type": "record",
"name": "employee",
"fields": [
"name": "name", "type": "string",
"name": "id", "type": "int",
"name": "mobileNumber", "type": ["string", "null"],
"name": "salary", "type": ["int", "null"]
]
Thanks for the input. I tried the same, however it was still not able to send the message. I wanted to check if my serialization is happening correctly or not. I tried to download the io.confluent JARs and used "io.confluent.kafka.serializers.KafkaAvroSerializer" as my Value seriliazer. Basically my record:val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)For this, only serializer I found is from io.confluent. Is there any other way to use the above? i.e.I have avro record (GenericRecord) and send the same format to Kafka? No bytesArray.
– Mihir
Mar 8 at 3:25
If you have avsc file which defines schema for your data then you can useorg.apache.avro.generic.GenericDataandorg.apache.avro.generic.GenericData.Recordor else you can useavro4gjar.
– KZapagol
Mar 8 at 5:17
Hi @KZapagol, thanks. Yes, I have avsc file.My avro record is created as per that.val avroRecord = new GenericData.Record(schema)I wanted to know after this, when we send the data to Kafka, what should be the Value serializer (props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")). This works but I have issues with the library in maven (see my details above). If I useorg.apache.kafka.common.serialization.ByteArraySerializer, this is where I think something is wrong. Basically, since I have data converted in avro, can I sent it directly?
– Mihir
Mar 8 at 13:33
Hi @Mihir I have created Producer example with avro schema input. I have provided link above. Please update avsc file according to your requirement.This would be the only change you need to make i believe.
– KZapagol
Mar 8 at 14:33
Hi @KZapagol: Thanks for the input, sorry to come back late, was working on some other things. I am updating my original question with the issue I am facing. I tried to use your example.
– Mihir
yesterday
|
show 2 more comments
Please update your code as below and try once. It looks like you have not closed the output stream,encoder and producer properly.
val producer = new KafkaProducer[String, Array[Byte]](configuration)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder)
val serializedBytes: Array[Byte] = out.toByteArray()
encoder.flush()
out.close()
val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
producer.send(recordToSend,new ProducerCallback)
//flush data
producer.flush()
//flush and close producer
producer.close()
class ProducerCallback(implicit logger: Logger) extends Callback
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
//executes every time a record is successfully sent or exception thrown
Option(metadata) match
case Some(_) =>
logger.info("Received new metadata. n" +
"Topic: " + metadata.topic() + "n" +
"Partition: " + metadata.partition() + "n" +
"Offset: " + metadata.offset() + "n" +
"Timestamp: " + metadata.timestamp() + "n" +
"Checksum: " + metadata.checksum())
case None => ;
Option(exception) match
case Some(_) =>
logger.error("Exception thrown during processing of record... " + exception)
throw exception
case None => ;
Please refer link https://github.com/Zapagol/apache-kafka/tree/master/src/main/scala/com/org/apache for more kafka producer and consumer examples. Hope it will help!
Update
I have added KafkaProducer example for Avroschema input. Please refer https://github.com/Zapagol/apache-kafka/blob/master/src/main/scala/com/org/apache/producers/ProducerForAvroschema.scala .
I have used apache avro jar and sample avsc file as below. Please modify schema file according to your requirement.And I am able to produce record successfully.
"type": "record",
"name": "employee",
"fields": [
"name": "name", "type": "string",
"name": "id", "type": "int",
"name": "mobileNumber", "type": ["string", "null"],
"name": "salary", "type": ["int", "null"]
]
Thanks for the input. I tried the same, however it was still not able to send the message. I wanted to check if my serialization is happening correctly or not. I tried to download the io.confluent JARs and used "io.confluent.kafka.serializers.KafkaAvroSerializer" as my Value seriliazer. Basically my record:val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)For this, only serializer I found is from io.confluent. Is there any other way to use the above? i.e.I have avro record (GenericRecord) and send the same format to Kafka? No bytesArray.
– Mihir
Mar 8 at 3:25
If you have avsc file which defines schema for your data then you can useorg.apache.avro.generic.GenericDataandorg.apache.avro.generic.GenericData.Recordor else you can useavro4gjar.
– KZapagol
Mar 8 at 5:17
Hi @KZapagol, thanks. Yes, I have avsc file.My avro record is created as per that.val avroRecord = new GenericData.Record(schema)I wanted to know after this, when we send the data to Kafka, what should be the Value serializer (props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")). This works but I have issues with the library in maven (see my details above). If I useorg.apache.kafka.common.serialization.ByteArraySerializer, this is where I think something is wrong. Basically, since I have data converted in avro, can I sent it directly?
– Mihir
Mar 8 at 13:33
Hi @Mihir I have created Producer example with avro schema input. I have provided link above. Please update avsc file according to your requirement.This would be the only change you need to make i believe.
– KZapagol
Mar 8 at 14:33
Hi @KZapagol: Thanks for the input, sorry to come back late, was working on some other things. I am updating my original question with the issue I am facing. I tried to use your example.
– Mihir
yesterday
|
show 2 more comments
Please update your code as below and try once. It looks like you have not closed the output stream,encoder and producer properly.
val producer = new KafkaProducer[String, Array[Byte]](configuration)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder)
val serializedBytes: Array[Byte] = out.toByteArray()
encoder.flush()
out.close()
val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
producer.send(recordToSend,new ProducerCallback)
//flush data
producer.flush()
//flush and close producer
producer.close()
class ProducerCallback(implicit logger: Logger) extends Callback
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
//executes every time a record is successfully sent or exception thrown
Option(metadata) match
case Some(_) =>
logger.info("Received new metadata. n" +
"Topic: " + metadata.topic() + "n" +
"Partition: " + metadata.partition() + "n" +
"Offset: " + metadata.offset() + "n" +
"Timestamp: " + metadata.timestamp() + "n" +
"Checksum: " + metadata.checksum())
case None => ;
Option(exception) match
case Some(_) =>
logger.error("Exception thrown during processing of record... " + exception)
throw exception
case None => ;
Please refer link https://github.com/Zapagol/apache-kafka/tree/master/src/main/scala/com/org/apache for more kafka producer and consumer examples. Hope it will help!
Update
I have added KafkaProducer example for Avroschema input. Please refer https://github.com/Zapagol/apache-kafka/blob/master/src/main/scala/com/org/apache/producers/ProducerForAvroschema.scala .
I have used apache avro jar and sample avsc file as below. Please modify schema file according to your requirement.And I am able to produce record successfully.
"type": "record",
"name": "employee",
"fields": [
"name": "name", "type": "string",
"name": "id", "type": "int",
"name": "mobileNumber", "type": ["string", "null"],
"name": "salary", "type": ["int", "null"]
]
Please update your code as below and try once. It looks like you have not closed the output stream,encoder and producer properly.
val producer = new KafkaProducer[String, Array[Byte]](configuration)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder)
val serializedBytes: Array[Byte] = out.toByteArray()
encoder.flush()
out.close()
val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
producer.send(recordToSend,new ProducerCallback)
//flush data
producer.flush()
//flush and close producer
producer.close()
class ProducerCallback(implicit logger: Logger) extends Callback
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
//executes every time a record is successfully sent or exception thrown
Option(metadata) match
case Some(_) =>
logger.info("Received new metadata. n" +
"Topic: " + metadata.topic() + "n" +
"Partition: " + metadata.partition() + "n" +
"Offset: " + metadata.offset() + "n" +
"Timestamp: " + metadata.timestamp() + "n" +
"Checksum: " + metadata.checksum())
case None => ;
Option(exception) match
case Some(_) =>
logger.error("Exception thrown during processing of record... " + exception)
throw exception
case None => ;
Please refer link https://github.com/Zapagol/apache-kafka/tree/master/src/main/scala/com/org/apache for more kafka producer and consumer examples. Hope it will help!
Update
I have added KafkaProducer example for Avroschema input. Please refer https://github.com/Zapagol/apache-kafka/blob/master/src/main/scala/com/org/apache/producers/ProducerForAvroschema.scala .
I have used apache avro jar and sample avsc file as below. Please modify schema file according to your requirement.And I am able to produce record successfully.
"type": "record",
"name": "employee",
"fields": [
"name": "name", "type": "string",
"name": "id", "type": "int",
"name": "mobileNumber", "type": ["string", "null"],
"name": "salary", "type": ["int", "null"]
]
edited Mar 8 at 14:29
answered Mar 7 at 12:28
KZapagolKZapagol
2666
2666
Thanks for the input. I tried the same, however it was still not able to send the message. I wanted to check if my serialization is happening correctly or not. I tried to download the io.confluent JARs and used "io.confluent.kafka.serializers.KafkaAvroSerializer" as my Value seriliazer. Basically my record:val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)For this, only serializer I found is from io.confluent. Is there any other way to use the above? i.e.I have avro record (GenericRecord) and send the same format to Kafka? No bytesArray.
– Mihir
Mar 8 at 3:25
If you have avsc file which defines schema for your data then you can useorg.apache.avro.generic.GenericDataandorg.apache.avro.generic.GenericData.Recordor else you can useavro4gjar.
– KZapagol
Mar 8 at 5:17
Hi @KZapagol, thanks. Yes, I have avsc file.My avro record is created as per that.val avroRecord = new GenericData.Record(schema)I wanted to know after this, when we send the data to Kafka, what should be the Value serializer (props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")). This works but I have issues with the library in maven (see my details above). If I useorg.apache.kafka.common.serialization.ByteArraySerializer, this is where I think something is wrong. Basically, since I have data converted in avro, can I sent it directly?
– Mihir
Mar 8 at 13:33
Hi @Mihir I have created Producer example with avro schema input. I have provided link above. Please update avsc file according to your requirement.This would be the only change you need to make i believe.
– KZapagol
Mar 8 at 14:33
Hi @KZapagol: Thanks for the input, sorry to come back late, was working on some other things. I am updating my original question with the issue I am facing. I tried to use your example.
– Mihir
yesterday
|
show 2 more comments
Thanks for the input. I tried the same, however it was still not able to send the message. I wanted to check if my serialization is happening correctly or not. I tried to download the io.confluent JARs and used "io.confluent.kafka.serializers.KafkaAvroSerializer" as my Value seriliazer. Basically my record:val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)For this, only serializer I found is from io.confluent. Is there any other way to use the above? i.e.I have avro record (GenericRecord) and send the same format to Kafka? No bytesArray.
– Mihir
Mar 8 at 3:25
If you have avsc file which defines schema for your data then you can useorg.apache.avro.generic.GenericDataandorg.apache.avro.generic.GenericData.Recordor else you can useavro4gjar.
– KZapagol
Mar 8 at 5:17
Hi @KZapagol, thanks. Yes, I have avsc file.My avro record is created as per that.val avroRecord = new GenericData.Record(schema)I wanted to know after this, when we send the data to Kafka, what should be the Value serializer (props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")). This works but I have issues with the library in maven (see my details above). If I useorg.apache.kafka.common.serialization.ByteArraySerializer, this is where I think something is wrong. Basically, since I have data converted in avro, can I sent it directly?
– Mihir
Mar 8 at 13:33
Hi @Mihir I have created Producer example with avro schema input. I have provided link above. Please update avsc file according to your requirement.This would be the only change you need to make i believe.
– KZapagol
Mar 8 at 14:33
Hi @KZapagol: Thanks for the input, sorry to come back late, was working on some other things. I am updating my original question with the issue I am facing. I tried to use your example.
– Mihir
yesterday
Thanks for the input. I tried the same, however it was still not able to send the message. I wanted to check if my serialization is happening correctly or not. I tried to download the io.confluent JARs and used "io.confluent.kafka.serializers.KafkaAvroSerializer" as my Value seriliazer. Basically my record:
val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord) For this, only serializer I found is from io.confluent. Is there any other way to use the above? i.e.I have avro record (GenericRecord) and send the same format to Kafka? No bytesArray.– Mihir
Mar 8 at 3:25
Thanks for the input. I tried the same, however it was still not able to send the message. I wanted to check if my serialization is happening correctly or not. I tried to download the io.confluent JARs and used "io.confluent.kafka.serializers.KafkaAvroSerializer" as my Value seriliazer. Basically my record:
val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord) For this, only serializer I found is from io.confluent. Is there any other way to use the above? i.e.I have avro record (GenericRecord) and send the same format to Kafka? No bytesArray.– Mihir
Mar 8 at 3:25
If you have avsc file which defines schema for your data then you can use
org.apache.avro.generic.GenericData and org.apache.avro.generic.GenericData.Record or else you can use avro4g jar.– KZapagol
Mar 8 at 5:17
If you have avsc file which defines schema for your data then you can use
org.apache.avro.generic.GenericData and org.apache.avro.generic.GenericData.Record or else you can use avro4g jar.– KZapagol
Mar 8 at 5:17
Hi @KZapagol, thanks. Yes, I have avsc file.My avro record is created as per that.
val avroRecord = new GenericData.Record(schema) I wanted to know after this, when we send the data to Kafka, what should be the Value serializer (props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")). This works but I have issues with the library in maven (see my details above). If I use org.apache.kafka.common.serialization.ByteArraySerializer, this is where I think something is wrong. Basically, since I have data converted in avro, can I sent it directly?– Mihir
Mar 8 at 13:33
Hi @KZapagol, thanks. Yes, I have avsc file.My avro record is created as per that.
val avroRecord = new GenericData.Record(schema) I wanted to know after this, when we send the data to Kafka, what should be the Value serializer (props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")). This works but I have issues with the library in maven (see my details above). If I use org.apache.kafka.common.serialization.ByteArraySerializer, this is where I think something is wrong. Basically, since I have data converted in avro, can I sent it directly?– Mihir
Mar 8 at 13:33
Hi @Mihir I have created Producer example with avro schema input. I have provided link above. Please update avsc file according to your requirement.This would be the only change you need to make i believe.
– KZapagol
Mar 8 at 14:33
Hi @Mihir I have created Producer example with avro schema input. I have provided link above. Please update avsc file according to your requirement.This would be the only change you need to make i believe.
– KZapagol
Mar 8 at 14:33
Hi @KZapagol: Thanks for the input, sorry to come back late, was working on some other things. I am updating my original question with the issue I am facing. I tried to use your example.
– Mihir
yesterday
Hi @KZapagol: Thanks for the input, sorry to come back late, was working on some other things. I am updating my original question with the issue I am facing. I tried to use your example.
– Mihir
yesterday
|
show 2 more comments
It could be simpler than it seems. The send method is asynchronous, it returns a Future<RecordMetadata>. Your example exits before the message is actually sent.
Kafka producer is batching messages in the background, so to ensure the messages are sent you should either block with e.g. Future.get (this means waiting for broker to respond with metadata) or ensure buffers are flushed with kafkaProducer.flush().
In tests I recommend to block on Future.
Thanks for the response. I am pretty new to Scala, so let me search about the Future.
– Mihir
Mar 6 at 19:33
I tried the Future. See update in the original question, doesn't work.
– Mihir
Mar 6 at 20:37
Was the message delivered to the Kafka topic? It should be now.
– tgrez
Mar 7 at 10:23
As per my latest updates (see original questions), blocking with Future is definitely helping. However the issue is not resolved, as messages were not delivered to Kafka. I had to change the serializtion (see UPDATE2 above) and then only it works. So not sure what is the issue in serializing the avro record.
– Mihir
Mar 8 at 3:39
I usually implemented my own KafkaAvroSerializer in such cases, maybe you can try to change SpecificDatumWriter to GenericDatumWriter in your code? see also: sderosiaux.com/articles/2017/03/02/…
– tgrez
Mar 8 at 10:31
add a comment |
It could be simpler than it seems. The send method is asynchronous, it returns a Future<RecordMetadata>. Your example exits before the message is actually sent.
Kafka producer is batching messages in the background, so to ensure the messages are sent you should either block with e.g. Future.get (this means waiting for broker to respond with metadata) or ensure buffers are flushed with kafkaProducer.flush().
In tests I recommend to block on Future.
Thanks for the response. I am pretty new to Scala, so let me search about the Future.
– Mihir
Mar 6 at 19:33
I tried the Future. See update in the original question, doesn't work.
– Mihir
Mar 6 at 20:37
Was the message delivered to the Kafka topic? It should be now.
– tgrez
Mar 7 at 10:23
As per my latest updates (see original questions), blocking with Future is definitely helping. However the issue is not resolved, as messages were not delivered to Kafka. I had to change the serializtion (see UPDATE2 above) and then only it works. So not sure what is the issue in serializing the avro record.
– Mihir
Mar 8 at 3:39
I usually implemented my own KafkaAvroSerializer in such cases, maybe you can try to change SpecificDatumWriter to GenericDatumWriter in your code? see also: sderosiaux.com/articles/2017/03/02/…
– tgrez
Mar 8 at 10:31
add a comment |
It could be simpler than it seems. The send method is asynchronous, it returns a Future<RecordMetadata>. Your example exits before the message is actually sent.
Kafka producer is batching messages in the background, so to ensure the messages are sent you should either block with e.g. Future.get (this means waiting for broker to respond with metadata) or ensure buffers are flushed with kafkaProducer.flush().
In tests I recommend to block on Future.
It could be simpler than it seems. The send method is asynchronous, it returns a Future<RecordMetadata>. Your example exits before the message is actually sent.
Kafka producer is batching messages in the background, so to ensure the messages are sent you should either block with e.g. Future.get (this means waiting for broker to respond with metadata) or ensure buffers are flushed with kafkaProducer.flush().
In tests I recommend to block on Future.
answered Mar 6 at 19:08
tgreztgrez
1918
1918
Thanks for the response. I am pretty new to Scala, so let me search about the Future.
– Mihir
Mar 6 at 19:33
I tried the Future. See update in the original question, doesn't work.
– Mihir
Mar 6 at 20:37
Was the message delivered to the Kafka topic? It should be now.
– tgrez
Mar 7 at 10:23
As per my latest updates (see original questions), blocking with Future is definitely helping. However the issue is not resolved, as messages were not delivered to Kafka. I had to change the serializtion (see UPDATE2 above) and then only it works. So not sure what is the issue in serializing the avro record.
– Mihir
Mar 8 at 3:39
I usually implemented my own KafkaAvroSerializer in such cases, maybe you can try to change SpecificDatumWriter to GenericDatumWriter in your code? see also: sderosiaux.com/articles/2017/03/02/…
– tgrez
Mar 8 at 10:31
add a comment |
Thanks for the response. I am pretty new to Scala, so let me search about the Future.
– Mihir
Mar 6 at 19:33
I tried the Future. See update in the original question, doesn't work.
– Mihir
Mar 6 at 20:37
Was the message delivered to the Kafka topic? It should be now.
– tgrez
Mar 7 at 10:23
As per my latest updates (see original questions), blocking with Future is definitely helping. However the issue is not resolved, as messages were not delivered to Kafka. I had to change the serializtion (see UPDATE2 above) and then only it works. So not sure what is the issue in serializing the avro record.
– Mihir
Mar 8 at 3:39
I usually implemented my own KafkaAvroSerializer in such cases, maybe you can try to change SpecificDatumWriter to GenericDatumWriter in your code? see also: sderosiaux.com/articles/2017/03/02/…
– tgrez
Mar 8 at 10:31
Thanks for the response. I am pretty new to Scala, so let me search about the Future.
– Mihir
Mar 6 at 19:33
Thanks for the response. I am pretty new to Scala, so let me search about the Future.
– Mihir
Mar 6 at 19:33
I tried the Future. See update in the original question, doesn't work.
– Mihir
Mar 6 at 20:37
I tried the Future. See update in the original question, doesn't work.
– Mihir
Mar 6 at 20:37
Was the message delivered to the Kafka topic? It should be now.
– tgrez
Mar 7 at 10:23
Was the message delivered to the Kafka topic? It should be now.
– tgrez
Mar 7 at 10:23
As per my latest updates (see original questions), blocking with Future is definitely helping. However the issue is not resolved, as messages were not delivered to Kafka. I had to change the serializtion (see UPDATE2 above) and then only it works. So not sure what is the issue in serializing the avro record.
– Mihir
Mar 8 at 3:39
As per my latest updates (see original questions), blocking with Future is definitely helping. However the issue is not resolved, as messages were not delivered to Kafka. I had to change the serializtion (see UPDATE2 above) and then only it works. So not sure what is the issue in serializing the avro record.
– Mihir
Mar 8 at 3:39
I usually implemented my own KafkaAvroSerializer in such cases, maybe you can try to change SpecificDatumWriter to GenericDatumWriter in your code? see also: sderosiaux.com/articles/2017/03/02/…
– tgrez
Mar 8 at 10:31
I usually implemented my own KafkaAvroSerializer in such cases, maybe you can try to change SpecificDatumWriter to GenericDatumWriter in your code? see also: sderosiaux.com/articles/2017/03/02/…
– tgrez
Mar 8 at 10:31
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55030128%2fscala-unable-to-send-message-to-kafka-hosted-on-remote-server%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
confluent repo is available in maven, but is not browsable in browser, see: stackoverflow.com/questions/43488853/…
– tgrez
Mar 8 at 14:34