How to know the name of Kafka consumer group that streaming query uses for Kafka data source?
I am consuming data from a Kafka topic through Spark Structured Streaming; the topic has 3 partitions. Since Spark Structured Streaming does not allow you to explicitly provide a group.id and assigns a random ID to the consumer, I tried to check the consumer group IDs using the Kafka command below:
./kafka-consumer-groups.sh --bootstrap-server kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092 --list
Output:
spark-kafka-source-054e8dac-bea9-46e8-9374-8298daafcd23--1587684247-driver-0
spark-kafka-source-756c08e8-6a84-447c-8326-5af1ef0412f5-209267112-driver-0
spark-kafka-source-9528b191-4322-4334-923d-8c1500ef8194-2006218471-driver-0
Below are my questions:
1) Why does it create 3 consumer groups? Is it because of 3 partitions?
2) Is there any way I can get these consumer group names in the Spark application?
3) Even though my Spark application was still running, after some time these group names no longer showed up in the consumer groups list. Is this because all the data had been consumed by the Spark application and there was no more data in the Kafka topic?
4) If my assumption in point 3 is right, will a new consumer group ID be created when new data arrives, or will the name of the consumer group remain the same?
Below is my read stream:
val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  // .option("assign", s"""{"$topic":[0]}""")  // alternative: read a single partition
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 60000)
  .load()
I have 3 write streams in the application, as below:
val df = inputDf.selectExpr("CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS INT)", "CAST(offset AS INT)", "CAST(timestamp AS STRING)")
val df1 = inputDf.selectExpr("CAST(partition AS INT)", "CAST(offset AS INT)", "CAST(timestamp AS STRING)")

// First stream
val checkpoint_loc1 = "/warehouse/test_duplicate/download/chk1"
df1.agg(min("offset"), max("offset"))
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .option("checkpointLocation", checkpoint_loc1)
  .start()

val result = df.select(
  df1("result").getItem("_1").as("col1"),
  df1("result").getItem("_2").as("col2"),
  df1("result").getItem("_5").as("eventdate"))
val distDates = result.select(result("eventdate")).distinct

// Second stream
val checkpoint_loc2 = "/warehouse/test_duplicate/download/chk2"
distDates.writeStream
  .foreach(writer1)
  .option("checkpointLocation", checkpoint_loc2)
  .start()

// Third stream
val kafkaOutput = result.writeStream
  .outputMode("append")
  .format("orc")
  .option("path", data_dir)
  .option("checkpointLocation", checkpoint_loc3)
  .start()
The streaming query is used only once in the code and there are no joins.
Execution plan
== Parsed Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Analyzed Logical Plan ==
key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Optimized Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Physical Plan ==
StreamingRelation kafka, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
apache-spark apache-kafka spark-structured-streaming
asked Feb 28 at 11:27 by codergirl (edited Mar 8 at 14:26)
1 Answer
1) Why does it create 3 consumer groups? Is it because of 3 partitions?
Certainly not. It is just a coincidence. You seem to have run the application 3 times already and the topic has 3 partitions.
Let me start from scratch to demonstrate.
I deleted all consumer groups to make sure that we start afresh.
$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
$ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
Deletion of requested consumer groups ('spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0') was successful.
$ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
Deletion of requested consumer groups ('spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0') was successful.
$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
// nothing got printed out
I created a topic with 5 partitions.
$ ./bin/kafka-topics.sh --create --zookeeper :2181 --topic jacek-five-partitions --partitions 5 --replication-factor 1
Created topic "jacek-five-partitions".
$ ./bin/kafka-topics.sh --describe --zookeeper :2181 --topic jacek-five-partitions
Topic:jacek-five-partitions PartitionCount:5 ReplicationFactor:1 Configs:
Topic: jacek-five-partitions Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: jacek-five-partitions Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: jacek-five-partitions Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: jacek-five-partitions Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: jacek-five-partitions Partition: 4 Leader: 0 Replicas: 0 Isr: 0
The code I use is as follows:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object SparkApp extends App {
  val spark = SparkSession.builder.master("local[*]").getOrCreate()
  import spark.implicits._
  val q = spark
    .readStream
    .format("kafka")
    .option("startingoffsets", "latest")
    .option("subscribe", "jacek-five-partitions")
    .option("kafka.bootstrap.servers", ":9092")
    .load
    .select($"value" cast "string")
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start
  q.awaitTermination()
}
When I run the above Spark Structured Streaming application, I have just one consumer group created.
$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
spark-kafka-source-380da653-c829-45db-859f-09aa9b37784d-338965656-driver-0
And that makes sense: Spark should use as many Kafka consumers as there are partitions, but regardless of the number of consumers there should be only one consumer group (otherwise each consumer would independently consume all records and you would get duplicates).
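To see this from the Kafka side while the query runs, you can describe the group and check the per-partition member assignment (the group name here is just the one from my run; substitute your own):
$ ./bin/kafka-consumer-groups.sh --describe --bootstrap-server :9092 --group spark-kafka-source-380da653-c829-45db-859f-09aa9b37784d-338965656-driver-0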
2) Is there any way I can get these consumer group names in the Spark application?
There is no public API for this, so the answer is no.
You could, however, "hack" Spark and go below the public API, down to the internal Kafka consumer, which uses this line:
val uniqueGroupId = s"spark-kafka-source-$UUID.randomUUID-$metadataPath.hashCode"
Or even this line to be precise:
val kafkaOffsetReader = new KafkaOffsetReader(
  strategy(caseInsensitiveParams),
  kafkaParamsForDriver(specifiedKafkaParams),
  parameters,
  driverGroupIdPrefix = s"$uniqueGroupId-driver")
Just find the KafkaMicroBatchReader for the Kafka data source and request its KafkaOffsetReader, which knows the groupId. That seems doable.
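A less invasive workaround — a sketch only, not Spark's public API — is to list the consumer groups from within the application using Kafka's AdminClient and keep those carrying the spark-kafka-source- prefix that the uniqueGroupId line above generates (the broker address is an assumption; use your own):
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

object SparkKafkaGroups {
  // Lists all consumer groups on the cluster and keeps those that match
  // the prefix Spark uses for its internal Kafka source consumers.
  def sparkConsumerGroups(bootstrapServers: String): Seq[String] = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    val admin = AdminClient.create(props)
    try {
      admin.listConsumerGroups().all().get().asScala
        .map(_.groupId())
        .filter(_.startsWith("spark-kafka-source-"))
        .toSeq
    } finally admin.close()
  }
}

// Example usage (hypothetical broker list):
// SparkKafkaGroups.sparkConsumerGroups("kfk01.sboxdc.com:9092")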
3) Even though my Spark application was still running, after some time these group names no longer showed up in the consumer groups list. Is this because all the data had been consumed by the Spark application and there was no more data in the Kafka topic?
Could that be related to KIP-211: Revise Expiration Semantics of Consumer Group Offsets, which says:
The offset of a topic partition within a consumer group expires when the expiration timestamp associated with that partition is reached. This expiration timestamp is usually affected by the broker config offsets.retention.minutes, unless user overrides that default and uses a custom retention.
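In other words, an idle group's committed offsets (and with them the group's listing) can disappear once the retention window passes. The knob is a broker setting in server.properties; as far as I recall, Kafka 2.0 raised the default from 1440 (one day) to 10080 (seven days):
offsets.retention.minutes=10080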
4) Will it create a new consumer group ID if new data arrives, or will the name of the consumer group remain the same?
It will remain the same.
Moreover, a consumer group cannot be deleted while at least one consumer from the group is active.
answered Mar 7 at 13:15 by Jacek Laskowski
Thanks a lot Jacek! I figured out the reason for the multiple consumer groups: my application has a single readStream but 3 writeStreams, each with a separate checkpoint dir. When I commented out 2 of the writeStreams, only one consumer group was created. I was under the impression that one readStream would create a single consumer and that the writeStreams had nothing to do with it. I noticed one more thing related to Kafka partitions, which I have posted separately here: stackoverflow.com/questions/55022999/…
– codergirl
Mar 7 at 14:14
"3 writestreams each having separate checkpoint dir." creates any consumer groups? Why would it? Consumer groups are for consumers (not producers). I'd have to review that part of code closer.
– Jacek Laskowski
Mar 8 at 9:18
Can you please share your email ID so I can send you my code?
– codergirl
Mar 8 at 9:58
Paste the code into the question.
– Jacek Laskowski
Mar 8 at 13:41
I've updated the code in the question.
– codergirl
Mar 11 at 6:08
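For completeness, the behaviour described in the first comment above is easy to reproduce: each start() call creates an independent streaming query with its own Kafka source instance, and hence its own consumer group. A minimal sketch, assuming a local broker and an existing topic (both names are placeholders):
import org.apache.spark.sql.SparkSession

object TwoQueries extends App {
  val spark = SparkSession.builder.master("local[*]").getOrCreate()

  val input = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", ":9092") // assumption: local broker
    .option("subscribe", "some-topic")          // assumption: topic exists
    .load

  // Two sinks over the same readStream: each start() spawns a separate
  // streaming query, each with its own Kafka source (and consumer group).
  val q1 = input.writeStream.format("console")
    .option("checkpointLocation", "/tmp/chk1").start()
  val q2 = input.writeStream.format("console")
    .option("checkpointLocation", "/tmp/chk2").start()

  spark.streams.awaitAnyTermination()
}

// Listing the groups while both queries run should now show two
// spark-kafka-source-...-driver-0 entries.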