Scala: Process dataframe while value in column meets condition


I have to process a huge dataframe, downloading files from a service based on the dataframe's id column. The download logic and all the status changes are already written, but I am not sure of the best way to build a loop around them. I run this on Databricks, which is why I need to process the work in chunks.



The dataframe has a "status" column, which can hold the following values:




"todo", "processing", "failed", "succeeded"




In the while loop I want to perform the following tasks:



while (there are rows with status "todo") {
  - get the first 10 rows whose status is "todo" (DONE)
  - start processing them: update status to "processing" (DONE)
  - download the files (call a UDF), update status to "succeeded" or "failed"
    (DONE, not shown in the code here)
}



I would like to run this until every row's status is something other than "todo". The problem is that the while loop never finishes, because the dataframe itself is never updated. The result needs to be assigned to a new dataframe, but then how do I feed that new dataframe back into the loop?



My code right now:



while (statusDoc.where("status == 'todo'").count > 0) {
  val todoDF = statusDoc.filter("status == 'todo'")

  val processingDF = todoDF.limit(10)
    .withColumn("status", when(col("status") === "todo", "processing")
      .otherwise(col("status")))

  // the result of this join is never assigned to anything, so statusDoc never changes
  statusDoc.join(processingDF, Seq("id"), "outer")
    .select($"id",
      statusDoc("fileUrl"),
      coalesce(processingDF("status"), statusDoc("status")).alias("status"))
}




The join should go like this:



val update = statusDoc.join(processingDF, Seq("id"), "outer")
  .select($"id", statusDoc("fileUrl"),
    coalesce(processingDF("status"), statusDoc("status")).alias("status"))


This new update dataframe should then be used for the next round of the loop.
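Putting it together, the loop I am trying to build looks roughly like this (a sketch; the last step is the part I cannot figure out):

while (statusDoc.where("status == 'todo'").count > 0) {
  val processingDF = statusDoc.filter("status == 'todo'").limit(10)
    .withColumn("status", when(col("status") === "todo", "processing")
      .otherwise(col("status")))

  val update = statusDoc.join(processingDF, Seq("id"), "outer")
    .select($"id", statusDoc("fileUrl"),
      coalesce(processingDF("status"), statusDoc("status")).alias("status"))

  // ??? how do I make the next iteration work on `update` instead of statusDoc?
}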










scala apache-spark dataframe while-loop do-while

asked Mar 7 at 9:04 by Eva (edited Mar 7 at 11:42)






















1 Answer
































One thing to remember here is that Spark DataFrames are immutable, because they are distributed. If you could modify one in place, you would have no guarantee that the modification was properly propagated across the network of executors, nor that a given portion of the data had not already been used somewhere else (on another node, for example).



What you can do, though, is add another column with the updated values and then drop the old column:



val update = statusDoc
  .withColumnRenamed("status", "status_doc")
  .join(processingDF, Seq("id"), "outer")
  .withColumn("updated_status",
    udf((stnew: String, stold: String) => if (stnew != null) stnew else stold)
      .apply(col("status"), col("status_doc")))
  .drop("status_doc", "status")
  .withColumnRenamed("updated_status", "status")
  .select("id", "fileUrl", "status")


Then make sure you replace statusDoc with the update DataFrame at the end of each iteration. Do not forget to declare the DataFrame as a var instead of a val; I'm surprised your IDE has not complained yet.
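In sketch form (initialStatusDoc below is just a placeholder for however the dataframe is first loaded):

import org.apache.spark.sql.DataFrame

var statusDoc: DataFrame = initialStatusDoc  // var, not val, so it can be rebound

while (statusDoc.where("status == 'todo'").count > 0) {
  // ... build processingDF and `update` as shown above ...
  statusDoc = update  // the next check of the loop condition sees the new statuses
}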



Also, I'm sure you can find a way of distributing the problem so that you avoid the while loop entirely; I can help you do that, but I would need a clearer description of your issue. With a while loop you do not use the full capabilities of your cluster, because the loop runs only on the driver and handles just 10 rows at a time. You could most likely apply everything you need to the whole DataFrame in a single map operation.
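For example, a rough sketch of the single-pass idea (downloadFile below is a hypothetical stand-in for your download logic, not code from the question):

import org.apache.spark.sql.functions.{col, udf}

// hypothetical helper: download one file and report the resulting status
def downloadFile(id: String, fileUrl: String): String =
  try {
    // ... actual download call goes here ...
    "succeeded"
  } catch {
    case _: Exception => "failed"
  }

val downloadUdf = udf(downloadFile _)

// one distributed pass over the whole dataframe; no driver-side loop needed
val done = statusDoc.withColumn("status", downloadUdf(col("id"), col("fileUrl")))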






answered Mar 7 at 13:50 by belka (edited Mar 8 at 21:48 by marc_s)

























• The while loop is required because of memory problems from the operations part:

    var k = statusDoc.where("status == 'todo'").count
    while (k > 0) {
      val todoDF = statusDoc.filter("status == 'todo'")
      val processingDF = todoDF.limit(10000)
        .withColumn("status", when(col("status") === "todo", "processing")
          .otherwise(col("status")))
      statusDoc = statusDoc.join(processingDF, Seq("id"), "outer")
        .select($"id", statusDoc("fileUrl"),
          coalesce(processingDF("status"), statusDoc("status")).alias("status"))
      k -= 10000
    }

  – Eva, Mar 7 at 14:36












• Did you try a single map operation instead of the while loop? If the cache is full, it should evict already-processed rows (as long as you don't try to cache the input dataset).

  – belka, Mar 7 at 17:10











• @Eva try a single map with a UDF that applies the same treatment you have right now. Also, it depends where your OOM error occurs: if it is in the executors, your data is probably badly repartitioned; if it is on the master node, then something needs to be pushed down to the executors.

  – belka, Mar 7 at 17:12









