Can a Kafka Streams client programmatically determine its consumer lag?
I am working on a service that uses the Kafka Streams API, and I am wondering if there is a way to determine how far behind my service is in consuming records. I want to be able to query the consumer lag.
Here is some background on what I am trying to achieve. My service uses the Streams API: it listens on an input topic, does some processing that involves state, and outputs a record on an output topic.
I want to handle the scenario where my service crashes and then comes back online a few hours later. During that time, a huge backlog of records will have accumulated on the input topic.
Once it comes back online, the service will start consuming all the accumulated records from the input topic and will also output a lot of records on the output topic.
I want to be able to detect that my service has a large consumer lag and stall its output in that case. That is, I want my service to consume all the accumulated input records until it has caught up with near real time, and only then start outputting messages.
The best approach I have found so far is to hook up a ConsumerInterceptor. Its onConsume() method is called each time records are read:
ConsumerRecords<K,V> onConsume(ConsumerRecords<K,V> records)
From the ConsumerRecords, I can then get the timestamps of the records. If a timestamp is too far behind the current time, I would stall the output of messages.
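With the interceptor approach, the stall decision itself reduces to a timestamp comparison. A minimal sketch of that check (the class name, method name, and the idea of passing in a lag threshold are illustrative assumptions, not from the post):

```java
import java.util.List;

// Hypothetical helper sketching the timestamp-based lag check described
// above: given the timestamps of a consumed batch, decide whether the
// service is too far behind "now" and should stall its output.
public class LagCheck {
    // Returns true if any record in the batch is older than maxLagMs
    // relative to nowMs, i.e. the service has not yet caught up.
    public static boolean shouldStall(List<Long> recordTimestampsMs,
                                      long nowMs, long maxLagMs) {
        for (long ts : recordTimestampsMs) {
            if (nowMs - ts > maxLagMs) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        // A record 10 minutes old against a 5-minute threshold: stall.
        System.out.println(shouldStall(List.of(now - 600_000L), now, 300_000L));
        // A record 1 second old: caught up, keep emitting.
        System.out.println(shouldStall(List.of(now - 1_000L), now, 300_000L));
    }
}
```

In an actual interceptor, the timestamps would come from iterating the ConsumerRecords passed to onConsume().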
Rather than relying on the timestamps in the records, it would be better if I could somehow query the consumer lag directly.
Maybe I cannot query the consumer lag because doing so goes against the principles Kafka is designed around. If anyone has suggestions, or advice on how I should approach this problem in general, please let me know.
As a side note, my service is not using the higher-level Kafka Streams DSL; it uses the lower-level Processor API.
Thank you for your time.
Tags: apache-kafka, apache-kafka-streams
AdminClient#listConsumerGroupOffsets(...) could be your friend. – Matthias J. Sax, Mar 9 at 6:57
@MatthiasJ.Sax thank you for your reply; I will look into the AdminClient. I noticed that whenever I create my KafkaStreams with the constructor KafkaStreams(Topology topology, java.util.Properties props), there is already an AdminClient being created within KafkaStreams, because I can see the AdminClientConfig being printed in my logs. Is there a way for me to reuse that already-available AdminClient, or do I need to create another AdminClient instance? – RichardT, Mar 11 at 16:18
The AdminClient you see is internal to KafkaStreams and you cannot access it -- thus you will need to create your own instance. – Matthias J. Sax, Mar 11 at 20:17
asked Mar 8 at 23:13 by RichardT