Optimizing Scatter
I have a large data problem. The specific problem isn't super important, but I've solved it with dask. Now I have two problems.
from dask import distributed
import numpy as np

# memory_limit=0 disables the per-worker memory limit entirely
local_cluster = distributed.LocalCluster(n_workers=20, processes=True, memory_limit=0)
dask_client = distributed.Client(local_cluster)

hat_matrix = np.random.rand(1000, 25000)
weight_matrix = np.random.rand(1000)
Y = np.random.rand(1000, 25000)

# Broadcast the shared inputs so that every worker holds a copy
[scatter_hat] = dask_client.scatter([hat_matrix], broadcast=True)
[scatter_weight] = dask_client.scatter([weight_matrix], broadcast=True)

# apply_function is the per-row computation (defined elsewhere, not shown)
futures = [dask_client.submit(apply_function, i, scatter_hat, Y[i, :], scatter_weight)
           for i in range(Y.shape[0])]
results = dask_client.gather(futures)
I can split up Y (which is good, because I don't have enough memory to load it all at once), but all of my workers need hat_matrix. Scattering hat_matrix and then sending Y row-wise works great. Except hat_matrix and Y are both... large, which is fine; I have enough memory provisioned to deal with it. But I can't find any way to allow for the short memory spikes that occur during deserialization, so if I set a memory limit the nanny kills all my workers. Then all my new workers. And so on and so forth. So I have three questions:
Is there a way to set a memory limit that permits spikes as serialized data comes in and is unpacked? If I have 64 GB of memory to drive 20 processes, I'd like to set a memory limit of, say, 2.8GB per process. When I scatter 2GB of data, there's a spike to ~4GB per process for deserializing, and the nanny kills everything.
Is there a way to stagger scattering to minimize the transient memory spike?
Is there a convenient way to scatter data by disk instead of through TCP or do I have to custom write that?
(As a corollary: Is there a convenient way to load a memory-mapped dask array from a serialized file in all of my workers?)
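To make the numbers in the question concrete, here is a small arithmetic sketch. It assumes float64 arrays and decimal gigabytes (both assumptions, not stated in the question); the helper names are made up for illustration. It shows that the test matrices are about 200 MB each, that 64 GB across 20 processes leaves 3.2 GB per process, and that a transient 4 GB peak per process only fits if fewer processes run at once.

```python
GB = 10**9  # assumption: decimal gigabytes


def array_nbytes(shape, itemsize=8):
    """Bytes needed for a dense array of this shape (itemsize=8 assumes float64)."""
    count = 1
    for dim in shape:
        count *= dim
    return count * itemsize


def max_workers(total_memory, peak_per_worker):
    """How many workers fit if each one may briefly need peak_per_worker bytes."""
    return total_memory // peak_per_worker


# Size of hat_matrix (and of Y) in the example code: 1000 x 25000 float64 values.
hat_bytes = array_nbytes((1000, 25000))

# Memory available per process with 64 GB shared by 20 workers.
budget_per_worker = 64 * GB / 20

# The question reports a ~4 GB transient peak when scattering 2 GB of data.
workers_that_fit = max_workers(64 * GB, 4 * GB)

print(hat_bytes, budget_per_worker, workers_that_fit)
```

Under these assumptions the 2.8 GB limit in the question sits below the reported 4 GB peak, which is consistent with the nanny restarting workers during the scatter.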
python dask
edited Mar 10 at 13:55
asked Mar 7 at 1:24 by CJR
1 Answer
Is there a way to set a memory limit that permits spikes as serialized data comes in and is unpacked?
In general, deserialization runs arbitrary code, so dask can't really control what happens. In practice, though, your matrices aren't that large compared to the memory typically available on modern hardware, so I'm surprised that you're running into an issue. Dask is decently careful with NumPy arrays; I wouldn't expect it to use much more memory than the size of the array.
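One knob that may help here, although it is not mentioned in this answer, is the set of worker memory thresholds in dask's configuration. The sketch below assumes the `distributed.worker.memory.*` keys are available in the installed version and that setting `terminate` to `False` stops the nanny from restarting workers while keeping a memory limit for spilling and pausing. The fraction values and the helper name `apply_worker_memory_config` are illustrative choices, not recommendations from the answer.

```python
# Fractions of the per-worker memory_limit; the specific values are assumptions.
worker_memory_config = {
    "distributed.worker.memory.target": 0.60,     # start spilling to disk
    "distributed.worker.memory.spill": 0.70,      # spill based on process memory
    "distributed.worker.memory.pause": 0.80,      # stop starting new tasks
    "distributed.worker.memory.terminate": False,  # do not let the nanny kill workers
}


def apply_worker_memory_config(config):
    """Apply the settings if dask is installed; return whether they were applied."""
    try:
        import dask
    except ImportError:
        return False
    dask.config.set(config)
    return True


applied = apply_worker_memory_config(worker_memory_config)
print("config applied:", applied)
```

This would need to run before the cluster is created, so that the nanny processes pick up the values. With termination disabled, a real memory leak would no longer be caught by the nanny, so it trades safety for tolerance of short spikes.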
Is there a way to stagger scattering to minimize the transient memory spike?
Scattering currently goes through a broadcast tree. Your client sends to a few workers, then they send to a few more workers, and so on. By default the branching factor here is only two, so I would be surprised to see a huge blowup here.
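As a rough illustration of the broadcast tree described above, the following sketch counts how many workers receive the data in each round. It is a simplified model, not dask's actual implementation: it assumes one worker holds the data initially and that every holder forwards to `branching_factor` new workers per round. The function name is hypothetical.

```python
def broadcast_schedule(n_workers, branching_factor=2, initial_holders=1):
    """Return the number of workers that newly receive the data in each round."""
    holders = initial_holders
    receivers_per_round = []
    while holders < n_workers:
        # Every current holder sends to branching_factor workers that lack the data.
        new = min(holders * branching_factor, n_workers - holders)
        receivers_per_round.append(new)
        holders += new
    return receivers_per_round


schedule = broadcast_schedule(20)
print(schedule)       # workers receiving (and deserializing) per round
print(len(schedule))  # number of rounds until all 20 workers hold the data
```

In this model the last round is the widest, so on a single machine the transient deserialization spikes of many workers can still overlap even though each sender only talks to two receivers.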
Is there a convenient way to scatter data by disk instead of through TCP or do I have to custom write that? (As a corollary: Is there a convenient way to load a memory-mapped dask array from a serialized file in all of my workers?)
Perhaps you could use a memory-mapped NumPy array of some sort rather than an in-memory NumPy array?
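A minimal sketch of the memory-mapped approach, assuming every worker can read the same file path (a local disk for a `LocalCluster`, or a shared filesystem for separate nodes). The function names and the computation inside `apply_with_mmap` are stand-ins, since the question does not show `apply_function`. The array is written once with `np.save`, and each task opens it with `np.load(..., mmap_mode="r")` instead of receiving a scattered copy.

```python
import os
import tempfile

import numpy as np


def save_for_workers(array, path):
    """Write the array once to a .npy file that all workers can read."""
    np.save(path, array)
    return path


def apply_with_mmap(hat_path, y_row, weights):
    """Stand-in for apply_function: opens hat_matrix as a read-only memory map."""
    hat = np.load(hat_path, mmap_mode="r")  # pages are read lazily from disk
    return float(weights @ (hat @ y_row))   # hypothetical computation


# Small demo arrays; the real hat_matrix would be much larger.
rng = np.random.default_rng(0)
hat_matrix = rng.random((10, 50))
weight_matrix = rng.random(10)
y_row = rng.random(50)

hat_path = save_for_workers(hat_matrix, os.path.join(tempfile.mkdtemp(), "hat_matrix.npy"))
result = apply_with_mmap(hat_path, y_row, weight_matrix)
expected = float(weight_matrix @ (hat_matrix @ y_row))
print(np.isclose(result, expected))
```

With dask, the path string would be passed in place of the scattered future, for example `dask_client.submit(apply_with_mmap, hat_path, Y[i, :], scatter_weight)`, so only the small per-row inputs travel over TCP.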
answered Mar 10 at 0:05 by MRocklin
The example data sizes are to get the example code to work in a test environment; the real-world data is larger (I've edited the question to be more specific). I usually need to set a memory limit that's 2.5x the data I'm scattering or turn the memory limit off entirely, neither of which is ideal.
– CJR
Mar 10 at 14:03
I'd assumed that trying to send a memory-mapped array to a bunch of processes on physically separated nodes would not work, but I'll give it a try.
– CJR
Mar 10 at 14:04
