Controlled/manual error/recovery handling in stream-based applications



I am working on an application based on Apache Flink that uses Apache Kafka for input and output. This application may later be ported to Apache Spark, so I have added that tag as well; the question remains the same.



I have the requirement that all incoming messages received via Kafka must be processed in order and safely stored in a persistence layer (database), and that no message may get lost.



The streaming part of this application is rather small, as the main logic boils down to something like:



environment.addSource(consumer)   // 1) DataStream[Option[Elem]]
  .filter(_.isDefined)            // 2) discard unparsable messages
  .map(_.get)                     // 3) unwrap Option
  .map(InputEvent.fromXml(_))     // 4) convert from XML to internal representation
  .keyBy(_.id)                    // 5) assure in-order processing on logical-key level
  .map(new DBFunction)            // 6) database lookup, store of update and additional enrichment
  .map(InputEvent.toXml(_))       // 7) convert back to XML
  .addSink(producer)              // 8) attach kafka producer sink


Now, during this pipeline, several error situations could occur:



  • the database becomes unavailable (shutdown, tablespace full, ...)

  • changes cannot be stored because of logical errors (e.g. due to the column format)

  • the Kafka producer cannot send a message because the broker is unavailable

and probably other situations.



Now my question is: how can I ensure consistency as described above in those situations, when I would in fact have to do something like the following:



  1. Stream-Operator 6) detects a problem (DB unavailable)

  2. The DB-connection of the DBFunction object must be recovered, which might only succeed after some minutes

  3. This means that overall processing must be suspended, ideally for the whole pipeline, so that incoming messages are not loaded into memory

  4. Resume processing after database has been recovered. Processing must resume exactly with the message which encountered the problem at 1)

Now I know that there are at least two tools for failure handling:



  1. Kafka consumer offsets

  2. Apache Flink checkpoints

However, searching the docs, I fail to see how either of those could be used in the middle of stream processing from within a single operator.



So, what would be the recommended strategies for fine-grained error handling and recovery in a streaming application?










      apache-spark error-handling apache-kafka stream apache-flink






      asked Jan 30 at 16:16 by user826955






















          1 Answer
          A few points:



          The keyBy is not going to help ensure in-order processing. If anything, it may interleave events from different Kafka partitions (which may have been in-order within each partition), thereby creating out-of-orderness where it didn't previously exist. It's hard to comment more specifically about how you might guarantee in-order processing without understanding how many FlinkKafkaConsumer instances you intend to use, how many partitions each one will be consuming from, how the keys are distributed across the Kafka partitions, and why you think a keyBy is necessary -- but if you set things up correctly, preserving order may be achievable. reinterpretAsKeyedStream can be helpful here, but this feature is difficult to understand, and tricky to use correctly.
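To make the interleaving point concrete, here is a minimal sketch in plain Scala with no Flink dependency. It assumes a hypothetical `Event` type carrying a per-key sequence number and models the keyed operator as receiving an arbitrary merge of two partitions, each of which is internally in order. All names are illustrative and none of this is Flink API.

```scala
object KeyOrderSketch {
  // Hypothetical event: a logical key plus a per-key sequence number.
  final case class Event(id: String, seq: Int)

  // All ways to merge two sequences while keeping each one's internal order.
  // This models a downstream operator reading from two source subtasks.
  def interleavings[A](a: List[A], b: List[A]): List[List[A]] = (a, b) match {
    case (Nil, _) => List(b)
    case (_, Nil) => List(a)
    case (x :: xs, y :: ys) =>
      interleavings(xs, b).map(x :: _) ++ interleavings(a, ys).map(y :: _)
  }

  // True if, for every key, sequence numbers arrive in ascending order.
  def inOrderPerKey(events: List[Event]): Boolean =
    events.groupBy(_.id).values.forall { es =>
      val seqs = es.map(_.seq)
      seqs == seqs.sorted
    }

  // True if every possible merge of the two partitions keeps per-key order.
  def alwaysInOrder(p0: List[Event], p1: List[Event]): Boolean =
    interleavings(p0, p1).forall(inOrderPerKey)
}
```

Under this model, per-key order survives every merge when each key lives in exactly one partition, and can break as soon as events for the same key are spread over several partitions. That is why the distribution of keys across the Kafka partitions matters more than the keyBy itself.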



          You could use Flink's AsyncFunction to manage the connection to the external DB in a fault-tolerant, exactly-once manner.
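As a rough illustration of the kind of logic that could sit behind such a function, the sketch below retries a non-blocking lookup using only the Scala standard library. It is not Flink code: `retryAsync` and `lookup` are hypothetical names, and a real job would hand the resulting `Future` to the `ResultFuture` passed into `asyncInvoke`. The sketch retries immediately; a database that stays down for minutes would also need a delay between attempts.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object AsyncRetrySketch {
  // Re-issues an asynchronous lookup until it succeeds or `attempts` runs out.
  // `lookup` is a hypothetical stand-in for a non-blocking database client call.
  // Future.unit.flatMap also captures exceptions thrown synchronously by `lookup`.
  def retryAsync[A](attempts: Int)(lookup: () => Future[A])(
      implicit ec: ExecutionContext): Future[A] =
    Future.unit.flatMap(_ => lookup()).recoverWith {
      // Retry only while attempts remain; otherwise the last failure propagates.
      case NonFatal(_) if attempts > 1 => retryAsync(attempts - 1)(lookup)
    }
}
```

Because nothing blocks, the calling thread stays free while the lookup is pending. If result order matters, as in the question, the ordered variant of Flink's async I/O (`AsyncDataStream.orderedWait`) is the one to look at.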



          Flink doesn't support fine-grained recovery in a systematic way -- its checkpoints are global snapshots of the state of the entire distributed cluster, and are designed to be used during recovery as a monolithic, self-consistent snapshot. If your job fails, normally the only recourse is to restart from a checkpoint, which will involve rewinding the input queues (to the offsets stored in the checkpoint), replaying the events since those offsets, re-issuing the DB lookups (which the async function will do automatically), and using Kafka transactions to achieve end-to-end exactly-once semantics. However, in the case of embarrassingly parallel jobs, it is sometimes possible to take advantage of fine-grained recovery.
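The rewind-and-replay behaviour described above can be sketched without Flink as a small simulation. Everything here is an assumption made for illustration: the `Record` type, the in-memory `Map` standing in for the database, and the `failAt` crash switch. The sketch also swaps in idempotent keyed upserts for the database writes instead of the Kafka transactions mentioned above, since that is the simplest way to make replayed writes harmless.

```scala
object ReplaySketch {
  // Hypothetical record: Kafka-like offset, logical key, payload.
  final case class Record(offset: Long, id: String, value: String)

  // Processes records starting at offset `from`, upserting by key into `db`.
  // `failAt` simulates a crash just before the record at that offset is written.
  // Returns the updated store and whether the run reached the end of the log.
  def run(
      log: Vector[Record],
      from: Long,
      db: Map[String, String],
      failAt: Option[Long]): (Map[String, String], Boolean) = {
    val todo = log.dropWhile(_.offset < from)
    val (ok, rest) = failAt match {
      case Some(o) => todo.span(_.offset < o)
      case None    => (todo, Vector.empty[Record])
    }
    // Upsert is idempotent: writing the same key and value twice changes nothing.
    val updated = ok.foldLeft(db)((m, r) => m.updated(r.id, r.value))
    (updated, rest.isEmpty)
  }
}
```

A run that crashes at offset 4 after a checkpoint at offset 2, and is then restarted from offset 2 against the same store, writes the records at offsets 2 and 3 a second time but ends in the same state as a run that never failed.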






                answered Mar 6 at 18:40 by David Anderson




























