Optimizing Scatter
I have a large data problem. The specific problem isn't super important, but I've solved it with dask. Now I have two problems.



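from dask import distributed
import numpy as np


def apply_function(i, hat_matrix, y_row, weight_matrix):
    # Hypothetical placeholder: the real per-row computation isn't shown here
    return float(np.dot(hat_matrix[i, :] * weight_matrix[i], y_row))


if __name__ == "__main__":  # needed with processes=True on spawn-based platforms
    # memory_limit=0 disables the per-worker memory limit entirely
    local_cluster = distributed.LocalCluster(n_workers=20, processes=True, memory_limit=0)
    dask_client = distributed.Client(local_cluster)

    hat_matrix = np.random.rand(1000, 25000)
    weight_matrix = np.random.rand(1000)
    Y = np.random.rand(1000, 25000)

    # Put one copy of each shared array on every worker
    [scatter_hat] = dask_client.scatter([hat_matrix], broadcast=True)
    [scatter_weight] = dask_client.scatter([weight_matrix], broadcast=True)

    # Send Y row by row; the scattered futures are resolved on the workers
    futures = [dask_client.submit(apply_function, i, scatter_hat, Y[i, :], scatter_weight)
               for i in range(Y.shape[0])]

    results = dask_client.gather(futures)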


I can split up Y (which is good, because I don't have enough memory to load it all at once), but all of my workers need hat_matrix. Scattering hat_matrix and then sending Y row-wise works great, except that hat_matrix and Y are both... large. That by itself is fine: I have enough memory provisioned to deal with it. But I can't find any way to allow for short memory spikes (which occur during deserialization), so if I set a memory limit, the nanny kills all my workers. Then all my new workers. And so on and so forth. So I have three questions:



1. Is there a way to set a memory limit that permits spikes as serialized data comes in and is unpacked? If I have 64 GB of memory to drive 20 processes, I'd like to set a memory limit of, say, 2.8 GB per process. When I scatter 2 GB of data, there's a spike to ~4 GB per process during deserialization, and the nanny kills everything.

2. Is there a way to stagger scattering to minimize the transient memory spike?

3. Is there a convenient way to scatter data via disk instead of through TCP, or do I have to write that myself? (As a corollary: is there a convenient way to load a memory-mapped dask array from a serialized file in all of my workers?)
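For question 1, a back-of-the-envelope sketch of the memory budget may help. It is a simplified model, not how dask actually accounts for memory: it assumes every worker ends up holding exactly one copy of the scattered payload, nothing else uses RAM, and deserializing temporarily multiplies the payload by a spike factor (about 2x, per the 2 GB to ~4 GB observation above). The helper names are hypothetical.

```python
import math


def peak_per_worker_gb(payload_gb, spike_factor):
    """Transient peak on one worker while it receives and unpacks the payload."""
    return payload_gb * spike_factor


def max_concurrent_spikes(total_gb, n_workers, payload_gb, spike_factor):
    """How many workers could be deserializing at the same moment.

    Simplified model (an assumption): every worker holds one copy of the
    payload, nothing else uses memory, and a worker that is deserializing
    needs an extra (spike_factor - 1) * payload_gb on top of its copy.
    """
    spare_gb = total_gb - n_workers * payload_gb
    extra_gb = (spike_factor - 1) * payload_gb
    if spare_gb < 0:
        return 0  # not even the steady state fits
    if extra_gb <= 0:
        return n_workers  # no spike at all
    return min(n_workers, math.floor(spare_gb / extra_gb))


if __name__ == "__main__":
    # Numbers from the question: 64 GB, 20 workers, 2 GB scattered, ~2x spike
    print(peak_per_worker_gb(2, 2))             # 4
    print(64 / 20)                              # 3.2
    print(max_concurrent_spikes(64, 20, 2, 2))  # 12
```

Under these assumptions each worker's peak (4 GB) exceeds both the 2.8 GB limit and an even 3.2 GB share of 64 GB, yet the machine as a whole could absorb about 12 simultaneous spikes (8 at the 2.5x factor mentioned in the comments). That suggests a per-process limit is the wrong granularity for this kind of spike.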
      python dask
asked Mar 7 at 1:24 by CJR (edited Mar 10 at 13:55)
1 Answer
          Is there a way to set a memory limit that permits spikes as serialized data comes in and is unpacked?




          In general, deserialization runs arbitrary code, so dask can't really control what happens. In practice, though, your matrices aren't that large compared to the memory available on typical modern hardware, so I'm surprised that you're running into an issue. Dask is decently careful with NumPy arrays; I wouldn't expect it to use much more memory than the size of the array.
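One knob that may help with the nanny restarts, offered as an untested suggestion: the worker memory thresholds are configurable fractions of memory_limit. As far as I know, setting distributed.worker.memory.terminate to False keeps the limit in force for spilling and pausing while stopping the nanny from restarting a worker that briefly exceeds it. The config has to be set before the cluster is created. A genuine out-of-memory condition is still handled by the operating system, so this only helps if the spikes actually fit in physical RAM.

```python
import dask

# Worker memory thresholds are fractions of each worker's memory_limit:
#   target    -> start spilling managed data to disk
#   spill     -> spill based on total process memory
#   pause     -> stop starting new tasks
#   terminate -> the nanny restarts the worker
# Leave the first three alone, but never restart a worker for a short spike.
# This must run before the LocalCluster / workers are created.
dask.config.set({"distributed.worker.memory.terminate": False})
```

After that, the cluster in the question could be created with a real limit, e.g. `distributed.LocalCluster(n_workers=20, processes=True, memory_limit="2.8GB")`.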




          Is there a way to stagger scattering to minimize the transient memory spike?




          Scattering currently goes through a broadcast tree: your client sends to a few workers, then they send to a few more workers, and so on. By default the branching factor is only two, so I would be surprised to see a huge blowup from this.
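To see why the tree itself shouldn't multiply memory use, here is a simplified model of it. This is an assumption based on the description above, not dask's actual implementation, and the function names are hypothetical. Every worker still receives exactly one copy, so the per-worker spike is one payload's worth; what the tree changes is how many workers are unpacking at the same time.

```python
def broadcast_rounds(n_workers, branching_factor=2):
    """Simplified model of a tree broadcast (an assumption, not dask's code).

    The client delivers the data to one worker; then, in every round, each
    worker that already has a copy forwards it to up to `branching_factor`
    workers that don't. Returns the number of holders after each round.
    """
    if branching_factor < 1:
        raise ValueError("branching_factor must be at least 1")
    holders = 1
    history = [holders]
    while holders < n_workers:
        holders += min(n_workers - holders, branching_factor * holders)
        history.append(holders)
    return history


def receivers_per_round(history):
    """Workers deserializing in the same round, i.e. spiking together."""
    return [after - before for before, after in zip(history, history[1:])]


if __name__ == "__main__":
    rounds = broadcast_rounds(20)
    print(rounds)                       # [1, 3, 9, 20]
    print(receivers_per_round(rounds))  # [2, 6, 11]
```

With 20 workers and a branching factor of two, the model has up to 11 workers receiving in the final round, and on a single 64 GB machine it is that simultaneous total the host has to absorb. A smaller branching factor spreads the transfers over more rounds. `Client.replicate` accepts a `branching_factor` argument, so scattering to a single worker (without `broadcast=True`) and then calling `dask_client.replicate([future], branching_factor=1)` may be one way to stagger the copies; this is untested here.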




          Is there a convenient way to scatter data by disk instead of through TCP or do I have to custom write that? (As a corollary: Is there a convenient way to load a memory-mapped dask array from a serialized file in all of my workers?)




          Perhaps you could use a memory-mapped NumPy array of some sort rather than an in-memory NumPy array?
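A minimal sketch of the memory-mapped approach, assuming the file lives on a filesystem every worker can read (local disk for a LocalCluster, or a shared/network filesystem across nodes). Instead of scattering hat_matrix, it is written once with `np.save`, and each task opens it with `np.load(..., mmap_mode="r")`, so only the path travels over TCP. `apply_function` here is a hypothetical stand-in for the question's function, and the file location is made up.

```python
import functools
import os
import tempfile

import numpy as np


def save_shared(array, path):
    """Write the array once to a .npy file that every worker can read."""
    np.save(path, array)
    return path


@functools.lru_cache(maxsize=None)
def load_shared(path):
    """Map the file read-only; cached so each worker process maps it once."""
    return np.load(path, mmap_mode="r")


def apply_function(i, hat_path, y_row, weight_matrix):
    # Hypothetical stand-in for the question's apply_function: it receives
    # a file path instead of a scattered array and reads only row i.
    hat_matrix = load_shared(hat_path)
    return float(np.dot(hat_matrix[i, :] * weight_matrix[i], y_row))


if __name__ == "__main__":
    hat = np.random.rand(100, 250)
    weights = np.random.rand(100)
    y_row = np.random.rand(250)
    hat_path = save_shared(hat, os.path.join(tempfile.mkdtemp(), "hat_matrix.npy"))
    print(apply_function(3, hat_path, y_row, weights))
```

In the question's loop this would become `dask_client.submit(apply_function, i, hat_path, Y[i, :], scatter_weight)`. Pages are read from disk on demand, and on a single machine the operating system's page cache can be shared by all worker processes; how much memory that saves in practice depends on the access pattern.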
          • The example data sizes are to get the example code to work in a test environment; the real-world data is larger (I've edited the question to be more specific). I usually need to set a memory limit that's 2.5x the data I'm scattering or turn the memory limit off entirely, neither of which is ideal.

            – CJR
            Mar 10 at 14:03











          • I'd assumed that trying to send a memory-mapped array to a bunch of processes on physically separated nodes would not work, but I'll give it a try.

            – CJR
            Mar 10 at 14:04
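On the multi-node concern: a memory-mapped array is only a view of a local file, so shipping the array object itself doesn't avoid the transfer. The sketch below uses plain pickle as a stand-in for dask's serialization (dask has its own NumPy serializers, so exact sizes will differ) to show that serializing a memmap copies all of its bytes, while the path is only a few dozen bytes. Sending the path therefore only works if every node can open that path, for example on a shared or network filesystem. `pickled_size` is a hypothetical helper.

```python
import os
import pickle
import tempfile

import numpy as np


def pickled_size(obj):
    """Bytes needed to ship obj to another process with pickle."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "hat_matrix.npy")
    np.save(path, np.random.rand(1000, 2500))  # about 20 MB of float64
    mm = np.load(path, mmap_mode="r")
    print(pickled_size(path))  # tiny: just the string
    print(pickled_size(mm))    # at least mm.nbytes: the data itself is copied
```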
answered Mar 10 at 0:05 by MRocklin