Can a Kafka Streams client programmatically determine its consumer lag?



I am working on a service that uses the Kafka Streams API. I am wondering if there is a way to determine how far behind my service is in consuming records. In other words, I want to be able to query its consumer lag.



Here is some background on what I am trying to achieve. My service uses the Streams API: it listens on an input topic, does some stateful processing, and outputs a record on an output topic.



I want to handle the scenario where my service crashes and then comes back online a few hours later. During that time, a huge backlog of records will have accumulated on the input topic.



Once it comes back online, the service will start consuming all the accumulated records from the input topic and also output a lot of records on the output topic.



I want to be able to detect that my service has a large consumer lag and, if so, stall its output. That is, my service should consume all the accumulated input records until it has caught up to near real time, and only then start outputting messages.



The best approach I have found so far is to hook up a ConsumerInterceptor.



The ConsumerInterceptor.onConsume() method is called each time records are read:



 ConsumerRecords<K,V> onConsume(ConsumerRecords<K,V> records)


From the ConsumerRecords, I can get the timestamps of the records. If a timestamp is too far behind the current time, I would stall the output of messages.
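A minimal sketch of this interceptor idea, under stated assumptions: the class name, the one-minute threshold, and the shared `CAUGHT_UP` flag (which the processor code would have to check before forwarding output) are all hypothetical, not part of any Kafka API.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical interceptor: marks the application as caught up when the
// newest record timestamp in a batch is close enough to wall-clock time.
public class LagGateInterceptor<K, V> implements ConsumerInterceptor<K, V> {

    // Shared flag the Processor API code could check before emitting output.
    public static final AtomicBoolean CAUGHT_UP = new AtomicBoolean(false);

    private static final long MAX_LAG_MS = 60_000L; // assumption: 1 minute

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        long newest = Long.MIN_VALUE;
        for (ConsumerRecord<K, V> record : records) {
            newest = Math.max(newest, record.timestamp());
        }
        if (newest != Long.MIN_VALUE) {
            CAUGHT_UP.set(System.currentTimeMillis() - newest <= MAX_LAG_MS);
        }
        return records; // an interceptor must return the records it received
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

With Kafka Streams, the interceptor can be registered on the embedded consumer via the `consumer.` prefix, e.g. `props.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), LagGateInterceptor.class.getName())`.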



Rather than relying on the timestamps embedded in the records, it would be better if I could query the consumer lag directly.



Maybe I cannot query the consumer lag because that goes against what Kafka is designed for. If anyone has suggestions on how I should approach this problem in general, please let me know.



As a side note, my service does not use the higher-level Kafka Streams DSL; it uses the lower-level Processor API.



Thank you for your time.










Comments:

  • AdminClient#listConsumerGroupOffsets(...) could be your friend. – Matthias J. Sax, Mar 9 at 6:57

  • @MatthiasJ.Sax Thank you for your reply; I will look into the AdminClient. I noticed that whenever I create my KafkaStreams instance with the constructor KafkaStreams(Topology topology, java.util.Properties props), an AdminClient is already created inside KafkaStreams, because I can see the AdminClientConfig being printed in my logs. Is there a way to reuse that AdminClient, or do I need to create another instance? – RichardT, Mar 11 at 16:18

  • The AdminClient you see is internal to KafkaStreams and you cannot access it -- thus you will need to create your own instance. – Matthias J. Sax, Mar 11 at 20:17
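Following the AdminClient suggestion above, here is a sketch of computing total lag for the Streams application (in Kafka Streams, the `application.id` is also the consumer group id). Assumptions: the class name is hypothetical, and log-end offsets are fetched with a throwaway KafkaConsumer, since `AdminClient#listOffsets` only appeared in later Kafka versions.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LagChecker {

    // Total lag (in messages) for the given group id, summed over all
    // partitions the group has committed offsets for.
    public static long totalLag(String bootstrapServers, String groupId) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets per partition for the consumer group.
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(groupId)
                     .partitionsToOffsetAndMetadata()
                     .get();

            // Log-end offsets via a short-lived consumer.
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());

                long lag = 0;
                for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
                    lag += endOffsets.get(e.getKey()) - e.getValue().offset();
                }
                return lag;
            }
        }
    }
}
```

The service could poll `totalLag(...)` periodically and hold back output until the value drops below a threshold.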

















apache-kafka apache-kafka-streams






asked Mar 8 at 23:13









RichardT
756



