How to know the name of Kafka consumer group that streaming query uses for Kafka data source?


I am consuming data from a Kafka topic through Spark Structured Streaming; the topic has 3 partitions. As Spark Structured Streaming does not allow you to explicitly provide a group.id and assigns some random id to the consumer, I tried to check the consumer group ids using the Kafka command below:



./kafka-consumer-groups.sh --bootstrap-server kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092 --list

Output:
spark-kafka-source-054e8dac-bea9-46e8-9374-8298daafcd23--1587684247-driver-0
spark-kafka-source-756c08e8-6a84-447c-8326-5af1ef0412f5-209267112-driver-0
spark-kafka-source-9528b191-4322-4334-923d-8c1500ef8194-2006218471-driver-0


Below are my questions:



1) Why does it create 3 consumer groups? Is it because of 3 partitions?



2) Is there any way I can get these consumer group names in the Spark application?



3) Even though my Spark application was still running, after some time these group names didn't show up in the consumer groups list. Is this because all the data was consumed by the Spark application and there was no more data in that Kafka topic?



4) If my assumption in point 3 is right, will a new consumer group id be created if new data arrives, or will the name of the consumer group remain the same?



Below is my read stream:



val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  // .option("assign"," ""+topic+"":[0]")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 60000)
  .load()


I have 3 writeStreams in the application, as below:



val df = inputDf.selectExpr("CAST(value AS STRING)", "CAST(topic AS STRING)",
  "CAST(partition AS INT)", "CAST(offset AS INT)", "CAST(timestamp AS STRING)")
val df1 = inputDf.selectExpr("CAST(partition AS INT)", "CAST(offset AS INT)",
  "CAST(timestamp AS STRING)")

// First stream
val checkpoint_loc1 = "/warehouse/test_duplicate/download/chk1"
df1.agg(min("offset"), max("offset"))
  .writeStream
  .foreach(writer)                 // writer: a ForeachWriter defined elsewhere
  .outputMode("complete")
  .option("checkpointLocation", checkpoint_loc1)
  .start()

val result = df.select(
  df1("result").getItem("_1").as("col1"),    // the "result" column comes from
  df1("result").getItem("_2").as("col2"),    // transformations not shown here
  df1("result").getItem("_5").as("eventdate"))
val distDates = result.select(result("eventdate")).distinct

// Second stream
val checkpoint_loc2 = "/warehouse/test_duplicate/download/chk2"
distDates.writeStream
  .foreach(writer1)                // writer1: another ForeachWriter defined elsewhere
  .option("checkpointLocation", checkpoint_loc2)
  .start()

// Third stream
val kafkaOutput = result.writeStream
  .outputMode("append")
  .format("orc")
  .option("path", data_dir)                        // data_dir defined elsewhere
  .option("checkpointLocation", checkpoint_loc3)   // checkpoint_loc3 defined elsewhere
  .start()


The streaming query is used only once in the code and there are no joins.



Execution plan:



== Parsed Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

== Analyzed Logical Plan ==
key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

== Optimized Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

== Physical Plan ==
StreamingRelation kafka, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]









apache-spark apache-kafka spark-structured-streaming

asked Feb 28 at 11:27 by codergirl, edited Mar 8 at 14:26


1 Answer

1) Why does it create 3 consumer groups? Is it because of 3 partitions?

Certainly not. It is just a coincidence. You seem to have run the application 3 times already, and the topic has 3 partitions.

Let's start over to back this up.

I deleted all consumer groups to make sure that we start afresh.

$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0

$ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
Deletion of requested consumer groups ('spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0') was successful.

$ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
Deletion of requested consumer groups ('spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0') was successful.

$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
// nothing got printed out

I created a topic with 5 partitions.

$ ./bin/kafka-topics.sh --create --zookeeper :2181 --topic jacek-five-partitions --partitions 5 --replication-factor 1
Created topic "jacek-five-partitions".

$ ./bin/kafka-topics.sh --describe --zookeeper :2181 --topic jacek-five-partitions
Topic:jacek-five-partitions PartitionCount:5 ReplicationFactor:1 Configs:
Topic: jacek-five-partitions Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: jacek-five-partitions Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: jacek-five-partitions Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: jacek-five-partitions Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: jacek-five-partitions Partition: 4 Leader: 0 Replicas: 0 Isr: 0

The code I use is as follows:



import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object SparkApp extends App {

  val spark = SparkSession.builder.master("local[*]").getOrCreate()
  import spark.implicits._
  val q = spark
    .readStream
    .format("kafka")
    .option("startingoffsets", "latest")
    .option("subscribe", "jacek-five-partitions")
    .option("kafka.bootstrap.servers", ":9092")
    .load
    .select($"value" cast "string")
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start
  q.awaitTermination()
}



When I run the above Spark Structured Streaming application, just one consumer group is created.

$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
spark-kafka-source-380da653-c829-45db-859f-09aa9b37784d-338965656-driver-0

And that makes sense, since the Spark processing should use as many Kafka consumers as there are partitions, but regardless of the number of consumers there should be only one consumer group (otherwise each consumer would consume all the records and there would be duplicates).
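
Note that this holds per streaming query: every writeStream...start() creates a separate query with its own Kafka source instance and hence its own group (the comment thread below reaches the same conclusion). A minimal sketch, assuming the same local broker and topic, that starts two queries from one readStream and therefore registers two spark-kafka-source-* groups:

import org.apache.spark.sql.SparkSession

object TwoQueriesApp extends App {
  val spark = SparkSession.builder.master("local[*]").getOrCreate()
  import spark.implicits._

  val input = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", ":9092")
    .option("subscribe", "jacek-five-partitions")
    .load
    .select($"value" cast "string")

  // Each start() creates an independent streaming query with its own
  // (here temporary) checkpoint and its own spark-kafka-source-* group.
  input.writeStream.format("console").start()
  input.writeStream.format("console").start()

  spark.streams.awaitAnyTermination()
}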





2) Is there any way I can get these consumer group names in the Spark application?

There is no public API for this, so the answer is no.

You could, however, "hack" Spark and go below the public API, down to the internal Kafka consumer, which uses this line:

val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

Or, to be precise, even this line:

val kafkaOffsetReader = new KafkaOffsetReader(
  strategy(caseInsensitiveParams),
  kafkaParamsForDriver(specifiedKafkaParams),
  parameters,
  driverGroupIdPrefix = s"$uniqueGroupId-driver")

Just find the KafkaMicroBatchReader for the Kafka data source and request from it the KafkaOffsetReader, which knows the groupId. That seems doable.
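
Alternatively, staying outside Spark's internals, you could at least enumerate the groups that follow the naming pattern above using Kafka's AdminClient. A minimal sketch (assuming a Kafka 2.0+ client on the classpath; the bootstrap address is a placeholder). Note that it lists all matching groups on the cluster and cannot tell you which one belongs to which query:

import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import scala.collection.JavaConverters._

object ListSparkKafkaGroups extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder; use your brokers
  val admin = AdminClient.create(props)

  // Groups created by the Kafka source follow the
  // spark-kafka-source-<UUID>-<hash>-driver-0 pattern shown above.
  admin.listConsumerGroups().all().get().asScala
    .map(_.groupId())
    .filter(_.startsWith("spark-kafka-source-"))
    .foreach(println)

  admin.close()
}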





3) Even though my Spark application was still running, after some time these group names didn't show up in the consumer groups list. Is this because all the data was consumed by the Spark application and there was no more data in that Kafka topic?

Could that be related to KIP-211: Revise Expiration Semantics of Consumer Group Offsets, which says:

The offset of a topic partition within a consumer group expires when the expiration timestamp associated with that partition is reached. This expiration timestamp is usually affected by the broker config offsets.retention.minutes, unless user overrides that default and uses a custom retention.
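
If offset expiration is the suspect, you could check what retention your brokers actually apply. A minimal sketch (assuming a Kafka 2.0+ AdminClient; the bootstrap address and broker id "0" are placeholders) that reads offsets.retention.minutes:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.common.config.ConfigResource

object ShowOffsetsRetention extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder; use your brokers
  val admin = AdminClient.create(props)

  // Read the broker config the KIP quote above refers to.
  val broker = new ConfigResource(ConfigResource.Type.BROKER, "0") // placeholder broker id
  val configs = admin.describeConfigs(Collections.singleton(broker)).all().get()
  Option(configs.get(broker).get("offsets.retention.minutes"))
    .map(_.value())
    .foreach(println)

  admin.close()
}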






4) Will it create a new consumer group id if new data arrives, or will the name of the consumer group remain the same?

It will remain the same.

Moreover, a consumer group cannot be deleted while at least one consumer from the group is active.






answered Mar 7 at 13:15 by Jacek Laskowski
• Thanks a lot Jacek! I figured out the reason for multiple consumer groups: my application has a single readStream but 3 writeStreams, each having a separate checkpoint dir. When I commented out 2 of the writeStreams, it created just one consumer group. I was under the impression that one readStream would create a single consumer and that the writeStreams had nothing to do with it. I noticed one more thing related to Kafka partitions, which I have posted separately here: stackoverflow.com/questions/55022999/…

– codergirl, Mar 7 at 14:14











• "3 writeStreams each having a separate checkpoint dir" creates consumer groups? Why would it? Consumer groups are for consumers (not producers). I'd have to review that part of the code more closely.

– Jacek Laskowski, Mar 8 at 9:18











• Can you please share your email id where I can share my code?

– codergirl, Mar 8 at 9:58











• Paste the code into the question.

– Jacek Laskowski, Mar 8 at 13:41











• I've updated the code in the question.

– codergirl, Mar 11 at 6:08










          Your Answer






          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "1"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader:
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          ,
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );













          draft saved

          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54924666%2fhow-to-know-the-name-of-kafka-consumer-group-that-streaming-query-uses-for-kafka%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          0















          1) Why does it create 3 consumer groups? Is it because of 3 partitions?




          Certainly not. It is just a coincidence. You seem to have run the application 3 times already and the topic has 3 partitions.



          Let's start over to back it up.



          I deleted all consumer groups to make sure that we start afresh.



          $ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
          spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
          spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0

          $ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
          Deletion of requested consumer groups ('spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0') was successful.

          $ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
          Deletion of requested consumer groups ('spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0') was successful.

          $ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
          // nothing got printed out


          I created a topic with 5 partitions.



          $ ./bin/kafka-topics.sh --create --zookeeper :2181 --topic jacek-five-partitions --partitions 5 --replication-factor 1
          Created topic "jacek-five-partitions".

          $ ./bin/kafka-topics.sh --describe --zookeeper :2181 --topic jacek-five-partitions
          Topic:jacek-five-partitions PartitionCount:5 ReplicationFactor:1 Configs:
          Topic: jacek-five-partitions Partition: 0 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 1 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 2 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 3 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 4 Leader: 0 Replicas: 0 Isr: 0


          The code I use is as follows:



          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.streaming.Trigger

          object SparkApp extends App

          val spark = SparkSession.builder.master("local[*]").getOrCreate()
          import spark.implicits._
          val q = spark
          .readStream
          .format("kafka")
          .option("startingoffsets", "latest")
          .option("subscribe", "jacek-five-partitions")
          .option("kafka.bootstrap.servers", ":9092")
          .load
          .select($"value" cast "string")
          .writeStream
          .format("console")
          .trigger(Trigger.ProcessingTime("30 seconds"))
          .start
          q.awaitTermination()



          When I run the above Spark Structured Streaming application, I have just one consumer group created.



          $ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
          spark-kafka-source-380da653-c829-45db-859f-09aa9b37784d-338965656-driver-0


          And that makes sense since all the Spark processing should use as many Kafka consumers as there are partitions, but regardless of the number of consumers, there should only be one consumer group (or the Kafka consumers would consume all records and there would be duplicates).





          2) Is there any way I can get these consumer group names in spark application?




          There is no public API for this so the answer is no.



          You could however "hack" Spark and go below the public API up to the internal Kafka consumer that uses this line:



          val uniqueGroupId = s"spark-kafka-source-$UUID.randomUUID-$metadataPath.hashCode"


          Or even this line to be precise:



          val kafkaOffsetReader = new KafkaOffsetReader(
          strategy(caseInsensitiveParams),
          kafkaParamsForDriver(specifiedKafkaParams),
          parameters,
          driverGroupIdPrefix = s"$uniqueGroupId-driver")


          Just find the KafkaMicroBatchReader for the Kafka data source, request it for the KafkaOffsetReader that knows groupId. That seems doable.





          Even though my spark application was still running, after some time these group names didn't show up in consumer groups list. Is this because all the data was consumed by spark application and there was no more data in that kafka topic?




          Could that be related to KIP-211: Revise Expiration Semantics of Consumer Group Offsets which says:




          The offset of a topic partition within a consumer group expires when the expiration timestamp associated with that partition is reached. This expiration timestamp is usually affected by the broker config offsets.retention.minutes, unless user overrides that default and uses a custom retention.






          4) will it create new consumer group id if new data arrives or the name of consumer group will remain same?




          Will remain same.



          Moreover, the consumer group must not be deleted when at least one consumer from the group is active.






          share|improve this answer























          • Thanks a lot Jacek! I figured out the reason for multiple consumer groups, my application has single readstream but 3 writestreams each having separate checkpoint dir. When i commented 2 writestreams it was creating just one consumer group. I was under impression that one readstream will create single consumer and writestream has nothing to do with it. I noticed one more thing related to kakfa partition which I have posted separately here stackoverflow.com/questions/55022999/…

            – codergirl
            Mar 7 at 14:14











          • "3 writestreams each having separate checkpoint dir." creates any consumer groups? Why would it? Consumer groups are for consumers (not producers). I'd have to review that part of code closer.

            – Jacek Laskowski
            Mar 8 at 9:18











          • can you please share your email id where I can share my code?

            – codergirl
            Mar 8 at 9:58











          • Paste the code to the question.

            – Jacek Laskowski
            Mar 8 at 13:41











          • I've updated the code in question

            – codergirl
            Mar 11 at 6:08















          0















          1) Why does it create 3 consumer groups? Is it because of 3 partitions?




          Certainly not. It is just a coincidence. You seem to have run the application 3 times already and the topic has 3 partitions.



          Let's start over to back it up.



          I deleted all consumer groups to make sure that we start afresh.



          $ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
          spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
          spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0

          $ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
          Deletion of requested consumer groups ('spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0') was successful.

          $ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
          Deletion of requested consumer groups ('spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0') was successful.

          $ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
          // nothing got printed out


          I created a topic with 5 partitions.



          $ ./bin/kafka-topics.sh --create --zookeeper :2181 --topic jacek-five-partitions --partitions 5 --replication-factor 1
          Created topic "jacek-five-partitions".

          $ ./bin/kafka-topics.sh --describe --zookeeper :2181 --topic jacek-five-partitions
          Topic:jacek-five-partitions PartitionCount:5 ReplicationFactor:1 Configs:
          Topic: jacek-five-partitions Partition: 0 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 1 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 2 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 3 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 4 Leader: 0 Replicas: 0 Isr: 0


          The code I use is as follows:



          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.streaming.Trigger

          object SparkApp extends App

          val spark = SparkSession.builder.master("local[*]").getOrCreate()
          import spark.implicits._
          val q = spark
          .readStream
          .format("kafka")
          .option("startingoffsets", "latest")
          .option("subscribe", "jacek-five-partitions")
          .option("kafka.bootstrap.servers", ":9092")
          .load
          .select($"value" cast "string")
          .writeStream
          .format("console")
          .trigger(Trigger.ProcessingTime("30 seconds"))
          .start
          q.awaitTermination()



          When I run the above Spark Structured Streaming application, I have just one consumer group created.



          $ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
          spark-kafka-source-380da653-c829-45db-859f-09aa9b37784d-338965656-driver-0


          And that makes sense since all the Spark processing should use as many Kafka consumers as there are partitions, but regardless of the number of consumers, there should only be one consumer group (or the Kafka consumers would consume all records and there would be duplicates).





          2) Is there any way I can get these consumer group names in spark application?




          There is no public API for this so the answer is no.



          You could however "hack" Spark and go below the public API up to the internal Kafka consumer that uses this line:



          val uniqueGroupId = s"spark-kafka-source-$UUID.randomUUID-$metadataPath.hashCode"


          Or even this line to be precise:



          val kafkaOffsetReader = new KafkaOffsetReader(
          strategy(caseInsensitiveParams),
          kafkaParamsForDriver(specifiedKafkaParams),
          parameters,
          driverGroupIdPrefix = s"$uniqueGroupId-driver")


          Just find the KafkaMicroBatchReader for the Kafka data source, request it for the KafkaOffsetReader that knows groupId. That seems doable.





          Even though my spark application was still running, after some time these group names didn't show up in consumer groups list. Is this because all the data was consumed by spark application and there was no more data in that kafka topic?




          Could that be related to KIP-211: Revise Expiration Semantics of Consumer Group Offsets which says:




          The offset of a topic partition within a consumer group expires when the expiration timestamp associated with that partition is reached. This expiration timestamp is usually affected by the broker config offsets.retention.minutes, unless user overrides that default and uses a custom retention.






          4) will it create new consumer group id if new data arrives or the name of consumer group will remain same?




          Will remain same.



          Moreover, the consumer group must not be deleted when at least one consumer from the group is active.






          share|improve this answer























          • Thanks a lot Jacek! I figured out the reason for multiple consumer groups, my application has single readstream but 3 writestreams each having separate checkpoint dir. When i commented 2 writestreams it was creating just one consumer group. I was under impression that one readstream will create single consumer and writestream has nothing to do with it. I noticed one more thing related to kakfa partition which I have posted separately here stackoverflow.com/questions/55022999/…

            – codergirl
            Mar 7 at 14:14











          • "3 writestreams each having separate checkpoint dir." creates any consumer groups? Why would it? Consumer groups are for consumers (not producers). I'd have to review that part of code closer.

            – Jacek Laskowski
            Mar 8 at 9:18











          • can you please share your email id where I can share my code?

            – codergirl
            Mar 8 at 9:58











          • Paste the code to the question.

            – Jacek Laskowski
            Mar 8 at 13:41











          • I've updated the code in question

            – codergirl
            Mar 11 at 6:08













          0












          0








          0








          1) Why does it create 3 consumer groups? Is it because of 3 partitions?




          Certainly not. It is just a coincidence. You seem to have run the application 3 times already and the topic has 3 partitions.



          Let's start over to back it up.



          I deleted all consumer groups to make sure that we start afresh.



          $ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
          spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
          spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0

          $ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
          Deletion of requested consumer groups ('spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0') was successful.

          $ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
          Deletion of requested consumer groups ('spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0') was successful.

          $ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
          // nothing got printed out


          I created a topic with 5 partitions.



          $ ./bin/kafka-topics.sh --create --zookeeper :2181 --topic jacek-five-partitions --partitions 5 --replication-factor 1
          Created topic "jacek-five-partitions".

          $ ./bin/kafka-topics.sh --describe --zookeeper :2181 --topic jacek-five-partitions
          Topic:jacek-five-partitions PartitionCount:5 ReplicationFactor:1 Configs:
          Topic: jacek-five-partitions Partition: 0 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 1 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 2 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 3 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 4 Leader: 0 Replicas: 0 Isr: 0


          The code I use is as follows:



          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.streaming.Trigger

          object SparkApp extends App

          val spark = SparkSession.builder.master("local[*]").getOrCreate()
          import spark.implicits._
          val q = spark
          .readStream
          .format("kafka")
          .option("startingoffsets", "latest")
          .option("subscribe", "jacek-five-partitions")
          .option("kafka.bootstrap.servers", ":9092")
          .load
          .select($"value" cast "string")
          .writeStream
          .format("console")
          .trigger(Trigger.ProcessingTime("30 seconds"))
          .start
          q.awaitTermination()



          When I run the above Spark Structured Streaming application, I have just one consumer group created.



          $ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
          spark-kafka-source-380da653-c829-45db-859f-09aa9b37784d-338965656-driver-0


          And that makes sense since all the Spark processing should use as many Kafka consumers as there are partitions, but regardless of the number of consumers, there should only be one consumer group (or the Kafka consumers would consume all records and there would be duplicates).





          2) Is there any way I can get these consumer group names in spark application?




          There is no public API for this so the answer is no.



          You could however "hack" Spark and go below the public API up to the internal Kafka consumer that uses this line:



          val uniqueGroupId = s"spark-kafka-source-$UUID.randomUUID-$metadataPath.hashCode"


          Or even this line to be precise:



          val kafkaOffsetReader = new KafkaOffsetReader(
          strategy(caseInsensitiveParams),
          kafkaParamsForDriver(specifiedKafkaParams),
          parameters,
          driverGroupIdPrefix = s"$uniqueGroupId-driver")


          Just find the KafkaMicroBatchReader for the Kafka data source, request it for the KafkaOffsetReader that knows groupId. That seems doable.





          Even though my spark application was still running, after some time these group names didn't show up in consumer groups list. Is this because all the data was consumed by spark application and there was no more data in that kafka topic?




          Could that be related to KIP-211: Revise Expiration Semantics of Consumer Group Offsets which says:




          The offset of a topic partition within a consumer group expires when the expiration timestamp associated with that partition is reached. This expiration timestamp is usually affected by the broker config offsets.retention.minutes, unless user overrides that default and uses a custom retention.






          4) will it create new consumer group id if new data arrives or the name of consumer group will remain same?




          Will remain same.



          Moreover, the consumer group must not be deleted when at least one consumer from the group is active.






          share|improve this answer














          1) Why does it create 3 consumer groups? Is it because of 3 partitions?




          Certainly not. It is just a coincidence. You seem to have run the application 3 times already and the topic has 3 partitions.



          Let's start over to back it up.



          I deleted all consumer groups to make sure that we start afresh.



          $ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
          spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
          spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0

          $ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
          Deletion of requested consumer groups ('spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0') was successful.

          $ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
          Deletion of requested consumer groups ('spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0') was successful.

          $ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
          // nothing got printed out


          I created a topic with 5 partitions.



          $ ./bin/kafka-topics.sh --create --zookeeper :2181 --topic jacek-five-partitions --partitions 5 --replication-factor 1
          Created topic "jacek-five-partitions".

          $ ./bin/kafka-topics.sh --describe --zookeeper :2181 --topic jacek-five-partitions
          Topic:jacek-five-partitions PartitionCount:5 ReplicationFactor:1 Configs:
          Topic: jacek-five-partitions Partition: 0 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 1 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 2 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 3 Leader: 0 Replicas: 0 Isr: 0
          Topic: jacek-five-partitions Partition: 4 Leader: 0 Replicas: 0 Isr: 0


          The code I use is as follows:



          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.streaming.Trigger

          object SparkApp extends App

          val spark = SparkSession.builder.master("local[*]").getOrCreate()
          import spark.implicits._
          val q = spark
          .readStream
          .format("kafka")
          .option("startingoffsets", "latest")
          .option("subscribe", "jacek-five-partitions")
          .option("kafka.bootstrap.servers", ":9092")
          .load
          .select($"value" cast "string")
          .writeStream
          .format("console")
          .trigger(Trigger.ProcessingTime("30 seconds"))
          .start
          q.awaitTermination()



          When I run the above Spark Structured Streaming application, I have just one consumer group created.



          $ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
          spark-kafka-source-380da653-c829-45db-859f-09aa9b37784d-338965656-driver-0


          And that makes sense since all the Spark processing should use as many Kafka consumers as there are partitions, but regardless of the number of consumers, there should only be one consumer group (or the Kafka consumers would consume all records and there would be duplicates).





          2) Is there any way I can get these consumer group names in spark application?




          There is no public API for this so the answer is no.



          You could however "hack" Spark and go below the public API up to the internal Kafka consumer that uses this line:



          val uniqueGroupId = s"spark-kafka-source-$UUID.randomUUID-$metadataPath.hashCode"


Or, to be precise, this line:



val kafkaOffsetReader = new KafkaOffsetReader(
  strategy(caseInsensitiveParams),
  kafkaParamsForDriver(specifiedKafkaParams),
  parameters,
  driverGroupIdPrefix = s"$uniqueGroupId-driver")


Just find the KafkaMicroBatchReader for the Kafka data source and request it for the KafkaOffsetReader, which knows the groupId. That seems doable.
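
Alternatively, if the goal is only to see the group names rather than to read them out of Spark's internals, you can ask Kafka itself. Below is a minimal sketch (not a Spark API) that uses Kafka's AdminClient to list consumer groups and keeps those with the spark-kafka-source- prefix. The bootstrap address is an assumption carried over from the shell examples above, and the prefix is a Spark implementation detail that may change between versions:

import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import scala.collection.JavaConverters._

object ListSparkConsumerGroups extends App {
  // Assumption: the broker listens on localhost:9092, as in the shell examples above.
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")

  val admin = AdminClient.create(props)
  try {
    // Spark's internal group ids start with "spark-kafka-source-" (see uniqueGroupId above).
    // That prefix is an implementation detail and may change between Spark versions.
    admin.listConsumerGroups().all().get().asScala
      .map(_.groupId())
      .filter(_.startsWith("spark-kafka-source-"))
      .foreach(println)
  } finally {
    admin.close()
  }
}

Note that AdminClient.listConsumerGroups requires Kafka 2.0+ clients (it came with KIP-222), and it only reports the groups as Kafka sees them at that moment.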





3) Even though my Spark application was still running, after some time these group names didn't show up in the consumer groups list. Is this because all the data was consumed by the Spark application and there was no more data in that Kafka topic?




Could that be related to KIP-211: Revise Expiration Semantics of Consumer Group Offsets, which says:




          The offset of a topic partition within a consumer group expires when the expiration timestamp associated with that partition is reached. This expiration timestamp is usually affected by the broker config offsets.retention.minutes, unless user overrides that default and uses a custom retention.
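
In other words, once the group becomes empty (or idle long enough), its committed offsets can expire and the group drops out of the --list output even while the query keeps running. If you control the broker and want offsets kept longer, you can raise the retention window. A sketch of the relevant broker setting, assuming you can edit the broker's server.properties (the default moved from 1440 minutes to 10080 minutes in Kafka 2.0):

# server.properties (broker config)
# Keep committed offsets for 7 days after the group becomes empty or idle.
offsets.retention.minutes=10080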






4) Will it create a new consumer group id if new data arrives, or will the name of the consumer group remain the same?




It will remain the same for the lifetime of the streaming query; the group id is generated once, when the query starts (note the UUID.randomUUID in the code above, which also means a restarted query gets a new group id).



Moreover, a consumer group cannot be deleted while at least one consumer from the group is active.







answered Mar 7 at 13:15 by Jacek Laskowski
• Thanks a lot Jacek! I figured out the reason for multiple consumer groups: my application has a single readStream but 3 writeStreams, each with a separate checkpoint dir. When I commented out 2 of the writeStreams, only one consumer group was created. I was under the impression that one readStream would create a single consumer and that the writeStreams had nothing to do with it. I noticed one more thing related to Kafka partitions, which I have posted separately here: stackoverflow.com/questions/55022999/…
  – codergirl
  Mar 7 at 14:14

• "3 writeStreams each having separate checkpoint dir" creates any consumer groups? Why would it? Consumer groups are for consumers (not producers). I'd have to review that part of the code closer.
  – Jacek Laskowski
  Mar 8 at 9:18

• Can you please share your email id where I can share my code?
  – codergirl
  Mar 8 at 9:58

• Paste the code into the question.
  – Jacek Laskowski
  Mar 8 at 13:41

• I've updated the code in the question.
  – codergirl
  Mar 11 at 6:08
















