


How are Dataflow bundles created after GroupBy/Combine?


















Setup:



read from pubsub -> window of 30s -> group by user -> combine -> write to cloud datastore



Problem:



I'm seeing DatastoreIO writer errors because objects with the same key are present in the same transaction.



Question:



  1. I want to understand how my pipeline combines results into bundles after a group by/combine operation. I would expect the bundle to be created for every window after the combine. But apparently, a bundle can contain more than 2 occurrences of the same user?


  2. Can re-execution (retries) of bundles cause this behavior?


  3. Is this bundling dependent on the runner?


  4. Is deduplication an option? If so, how would I best approach that?


Note that I'm not looking for a replacement for the Datastore writer at the end of the pipeline; I already know that we can use a different strategy. I'm merely trying to understand how the bundling happens.



























  • ah this is a very good question. TBH I don't know, but I'll do my best to get someone who does here.
    – Pablo, Mar 7 at 0:09

  • Much appreciated @pablo ! :)
    – Jonny5, Mar 7 at 7:16

  • sorry for the delay. Will try to get something tomorrow!
    – Pablo, Mar 8 at 1:07

  • okay I went asking around. I hope the answer is helpful.
    – Pablo, Mar 9 at 0:38















google-cloud-dataflow apache-beam
















asked Mar 6 at 15:24 by Jonny5







1 Answer














There are two answers to your question: one is specific to your use case, and the other is about bundling and windowing in streaming in general.




Specific to your pipeline



I am assuming that the 'key' for Datastore is the user ID? In that case, if you have events from the same user in more than one window, your GroupByKey or Combine operation will output one separate element for every user+window pair.
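To make the user+window point concrete, here is a minimal plain-Python sketch (not Beam code) of what a per-key, per-window combine produces. The event tuples, the `window_start` helper and the summing combine are assumptions made up for illustration; only the 30s window size comes from the question.

```python
from collections import defaultdict

WINDOW_SIZE = 30  # seconds, matching the 30s window in the question


def window_start(timestamp):
    """Start of the fixed 30s window that contains `timestamp`."""
    return timestamp - (timestamp % WINDOW_SIZE)


def combine_per_user_and_window(events):
    """Sum values per (user, window); a stand-in for GroupByKey + Combine."""
    totals = defaultdict(int)
    for user, timestamp, value in events:
        totals[(user, window_start(timestamp))] += value
    return dict(totals)


# Hypothetical events: (user, event time in seconds, value).
events = [
    ("alice", 5, 1),
    ("alice", 20, 2),
    ("alice", 40, 4),  # falls into the next 30s window
    ("bob", 10, 10),
]

results = combine_per_user_and_window(events)
for (user, start), total in sorted(results.items()):
    print(user, start, total)
```

In this toy run "alice" shows up twice in the combined output, once per window, which is the situation that could lead to two writes for the same user ID.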



So the question is: what are you trying to insert into Datastore?

  • An individual user's resulting aggregate over all time? In that case, you'd need to use a Global Window.

  • A user's resulting aggregate for every 30 seconds in time? Then you need to use the window as part of the key you use to insert into Datastore.

Does that help / make sense? Happy to help you design your pipeline to do what you want. Chat with me in the comments or via SO chat.
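As a rough illustration of the second option (using the window as part of the key), the plain-Python sketch below compares two key schemes on one batch of combined results. The `user#window_start` key format and all names here are hypothetical, not something DatastoreIO prescribes.

```python
from collections import Counter

# Hypothetical combined output: (user, window start in seconds, aggregate).
combined = [
    ("alice", 0, 3),
    ("alice", 30, 4),
    ("bob", 0, 10),
]


def key_by_user(user, window_start):
    """Key scheme that ignores the window."""
    return user


def key_by_user_and_window(user, window_start):
    """Key scheme that makes the window part of the key (assumed format)."""
    return f"{user}#{window_start}"


def colliding_keys(rows, key_fn):
    """Return the keys that occur more than once in a single batch of rows."""
    counts = Counter(key_fn(user, start) for user, start, _ in rows)
    return sorted(key for key, n in counts.items() if n > 1)


print(colliding_keys(combined, key_by_user))
print(colliding_keys(combined, key_by_user_and_window))
```

With the user-only key, "alice" collides inside the batch; with the window folded into the key, every row maps to its own entity.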




The larger question about bundling of data



Bundling strategies vary by runner. In Dataflow, you should consider the following two factors:

  • Every worker is assigned a key range. Elements for the same key will be processed by the same worker.

  • Windows are assigned to individual elements, but a bundle may contain elements from multiple windows. As an example, if the data freshness metric makes a big jump*, a number of windows may be triggered at once, and elements of the same key in different windows would be processed in the same bundle.

* When can data freshness jump suddenly? A stream with a single element that has a very old timestamp and is very slow to process may hold the watermark for a long time. Once this element is processed, the watermark may jump a lot, to the next oldest element (check out this lecture on watermarks ;)).
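The watermark jump can be sketched with a toy model in plain Python. This is a deliberate simplification and not how Dataflow actually estimates its watermark: here the watermark is assumed to be the oldest timestamp still in flight, and a window is assumed to fire as soon as the watermark reaches its end. All timestamps are made up.

```python
WINDOW_SIZE = 30  # seconds


def window_end(timestamp):
    """End of the fixed 30s window that contains `timestamp`."""
    return timestamp - (timestamp % WINDOW_SIZE) + WINDOW_SIZE


def windows_ready(buffered_timestamps, watermark):
    """Ends of buffered windows that the (toy) watermark has passed."""
    ends = {window_end(ts) for ts in buffered_timestamps}
    return sorted(end for end in ends if end <= watermark)


# Hypothetical event times (seconds) for a single user.
in_flight = [5, 100]      # the element at 5 is old and slow to process
buffered = [10, 40, 70]   # already grouped, waiting for their windows to fire

# While the slow element is in flight it holds the watermark at 5.
before = windows_ready(buffered, min(in_flight))

# Once it is processed, the watermark jumps to the next oldest element (100).
in_flight.remove(5)
buffered.append(5)
after = windows_ready(buffered, min(in_flight))

print(before)
print(after)
```

In this model nothing fires while the watermark is held, and then the windows ending at 30, 60 and 90 all become ready together, so results for the same user from three windows could end up in one bundle.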






answered Mar 9 at 0:38 by Pablo




























