Flink1.5.4 exception: Corrupt stream, found tag: 105How to count unique words in a stream?Ordering of Records in StreamFlink Streaming: Data stream that gets controlled by control streamFlink CsvTableSource StreamingCombining low-latency streams with multiple meta-data streams in Flink (enrichment)Reuse of a Stream is a copy of stream or notUnit Testing Flink Streams with Multiple Event StreamsFlink : Memory ran out exceptionApache flink 1.52 Rowtime timestamp is nullHow to fix “ 'java.lang.NoClassDefFoundError: scala/Product$class' error while flatMapping DataStream to case class in Apache Flink ”

Is `x >> pure y` equivalent to `liftM (const y) x`

Go Pregnant or Go Home

Avoiding estate tax by giving multiple gifts

Where does the Z80 processor start executing from?

How did Arya survive the stabbing?

Tiptoe or tiphoof? Adjusting words to better fit fantasy races

Trouble understanding the speech of overseas colleagues

Is expanding the research of a group into machine learning as a PhD student risky?

Sequence of Tenses: Translating the subjunctive

Escape a backup date in a file name

Method to test if a number is a perfect power?

How to Reset Passwords on Multiple Websites Easily?

How can I kill an app using Terminal?

What is the best translation for "slot" in the context of multiplayer video games?

Lay out the Carpet

How to check is there any negative term in a large list?

Is there a problem with hiding "forgot password" until it's needed?

How easy is it to start Magic from scratch?

You cannot touch me, but I can touch you, who am I?

How do we know the LHC results are robust?

How did Doctor Strange see the winning outcome in Avengers: Infinity War?

India just shot down a satellite from the ground. At what altitude range is the resulting debris field?

How many times can American Tourist re-enter UK in same 6 month period?

How to pronounce the slash sign



Flink1.5.4 exception: Corrupt stream, found tag: 105


How to count unique words in a stream?Ordering of Records in StreamFlink Streaming: Data stream that gets controlled by control streamFlink CsvTableSource StreamingCombining low-latency streams with multiple meta-data streams in Flink (enrichment)Reuse of a Stream is a copy of stream or notUnit Testing Flink Streams with Multiple Event StreamsFlink : Memory ran out exceptionApache flink 1.52 Rowtime timestamp is nullHow to fix “ 'java.lang.NoClassDefFoundError: scala/Product$class' error while flatMapping DataStream to case class in Apache Flink ”













0















My program wants to join two streams without Flink Window.



I connect two streams and define a class A extends RichCoFlatMapFunction to handle them.
In class A, I use a Guava cache to hold all the data from flatmap1/2 method, and join them by a tag from streams.
Then Guava cache has a remove listener to collect joined&expired data to next Flink Function.



private synchronized void collect(ReqFeatures features) 
feaCollector.collect(features);



Each time at the beginning, it runs well, but a few hours later, it's always dead because of this exception.



java.io.IOException: Corrupt stream, found tag: 105
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:220)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748)


And sometimes there's another error log:



java.lang.IllegalStateException: When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:158)
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:186)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:551)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748)


If I use Flink Window Function instead, this exception doesn't occur.
Why does this exception occur, and how can I resolve it?










share|improve this question




























    0















    My program wants to join two streams without Flink Window.



    I connect two streams and define a class A extends RichCoFlatMapFunction to handle them.
    In class A, I use a Guava cache to hold all the data from flatmap1/2 method, and join them by a tag from streams.
    Then Guava cache has a remove listener to collect joined&expired data to next Flink Function.



    private synchronized void collect(ReqFeatures features) 
    feaCollector.collect(features);



    Each time at the beginning, it runs well, but a few hours later, it's always dead because of this exception.



    java.io.IOException: Corrupt stream, found tag: 105
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:220)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
    at java.lang.Thread.run(Thread.java:748)


    And sometimes there's another error log:



    java.lang.IllegalStateException: When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:158)
    at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:186)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:551)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
    at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
    at java.lang.Thread.run(Thread.java:748)


    If I use Flink Window Function instead, this exception doesn't occur.
    Why does this exception occur, and how can I resolve it?










    share|improve this question


























      0












      0








      0


      1






      My program wants to join two streams without Flink Window.



      I connect two streams and define a class A extends RichCoFlatMapFunction to handle them.
      In class A, I use a Guava cache to hold all the data from flatmap1/2 method, and join them by a tag from streams.
      Then Guava cache has a remove listener to collect joined&expired data to next Flink Function.



      private synchronized void collect(ReqFeatures features) 
      feaCollector.collect(features);



      Each time at the beginning, it runs well, but a few hours later, it's always dead because of this exception.



      java.io.IOException: Corrupt stream, found tag: 105
      at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:220)
      at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
      at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
      at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
      at java.lang.Thread.run(Thread.java:748)


      And sometimes there's another error log:



      java.lang.IllegalStateException: When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.
      at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
      at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:158)
      at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
      at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:186)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:551)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
      at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
      at java.lang.Thread.run(Thread.java:748)


      If I use Flink Window Function instead, this exception doesn't occur.
      Why does this exception occur, and how can I resolve it?










      share|improve this question
















      My program wants to join two streams without Flink Window.



      I connect two streams and define a class A extends RichCoFlatMapFunction to handle them.
      In class A, I use a Guava cache to hold all the data from flatmap1/2 method, and join them by a tag from streams.
      Then Guava cache has a remove listener to collect joined&expired data to next Flink Function.



      private synchronized void collect(ReqFeatures features) 
      feaCollector.collect(features);



      Each time at the beginning, it runs well, but a few hours later, it's always dead because of this exception.



      java.io.IOException: Corrupt stream, found tag: 105
      at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:220)
      at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
      at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
      at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
      at java.lang.Thread.run(Thread.java:748)


      And sometimes there's another error log:



      java.lang.IllegalStateException: When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.
      at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
      at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:158)
      at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
      at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:186)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:551)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
      at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
      at java.lang.Thread.run(Thread.java:748)


      If I use Flink Window Function instead, this exception doesn't occur.
      Why does this exception occur, and how can I resolve it?







      apache-flink flink-streaming






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Mar 7 at 21:22









      karel

      2,26862732




      2,26862732










      asked Mar 7 at 12:50









      JoeJoe

      12




      12






















          0






          active

          oldest

          votes











          Your Answer






          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "1"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader:
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          ,
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );













          draft saved

          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55044215%2fflink1-5-4-exception-corrupt-stream-found-tag-105%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown

























          0






          active

          oldest

          votes








          0






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes















          draft saved

          draft discarded
















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid


          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.

          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55044215%2fflink1-5-4-exception-corrupt-stream-found-tag-105%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          AWS Lex not identifying response if by a variable The 2019 Stack Overflow Developer Survey Results Are In Announcing the arrival of Valued Associate #679: Cesar Manara Planned maintenance scheduled April 17/18, 2019 at 00:00UTC (8:00pm US/Eastern) The Ask Question Wizard is Live! Data science time! April 2019 and salary with experienceEnforcing custom enumeration in AWS LEX for slot valuesHow to give response based on user response in Amazon Lex?Intercepting AWS Lambda Response to a AWS Lex QueryLex chat bot error: Reached second execution of fulfillment lambda on the same utteranceamazon lex showing invalid responseLambda response send back to Lex slot?Response card in Amazon lexAmazon Lex - Lambda response return HTML to botHow can I solve 424 (Failed Dependency) (python) obtained from Amazon lex?

          Алба-Юлія

          Захаров Федір Захарович