Extract time intervals in a Scala Spark dataframe



I'm trying to extract combined data intervals based on a time series in Scala and Spark.



I have the following data in a dataframe:



Id | State | StartTime | EndTime
---+-------+---------------------+--------------------
1 | R | 2019-01-01T03:00:00 | 2019-01-01T11:30:00
1 | R | 2019-01-01T11:30:00 | 2019-01-01T15:00:00
1 | R | 2019-01-01T15:00:00 | 2019-01-01T22:00:00
1 | W | 2019-01-01T22:00:00 | 2019-01-02T04:30:00
1 | W | 2019-01-02T04:30:00 | 2019-01-02T13:45:00
1 | R | 2019-01-02T13:45:00 | 2019-01-02T18:30:00
1 | R | 2019-01-02T18:30:00 | 2019-01-02T22:45:00


I need to extract the data into time intervals based on the id and state. The resulting data needs to look like:



Id | State | StartTime | EndTime
---+-------+---------------------+--------------------
1 | R | 2019-01-01T03:00:00 | 2019-01-01T22:00:00
1 | W | 2019-01-01T22:00:00 | 2019-01-02T13:45:00
1 | R | 2019-01-02T13:45:00 | 2019-01-02T22:45:00


Note that the first three records have been grouped together because the equipment is contiguously in the R state from 2019-01-01T03:00:00 to 2019-01-01T22:00:00; it then switches to the W state for the next two records, from 2019-01-01T22:00:00 to 2019-01-02T13:45:00, and then goes back to the R state for the last two records.
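
For reference, one of the comments below asks for a reproducible example; a minimal sketch that builds this input DataFrame could look like the following (the local SparkSession setup and the string-typed time columns are assumptions, not part of the original question):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("intervals").getOrCreate()
import spark.implicits._

// Sample input; StartTime/EndTime are plain strings here and can be cast to timestamps later.
val df = Seq(
  (1, "R", "2019-01-01T03:00:00", "2019-01-01T11:30:00"),
  (1, "R", "2019-01-01T11:30:00", "2019-01-01T15:00:00"),
  (1, "R", "2019-01-01T15:00:00", "2019-01-01T22:00:00"),
  (1, "W", "2019-01-01T22:00:00", "2019-01-02T04:30:00"),
  (1, "W", "2019-01-02T04:30:00", "2019-01-02T13:45:00"),
  (1, "R", "2019-01-02T13:45:00", "2019-01-02T18:30:00"),
  (1, "R", "2019-01-02T18:30:00", "2019-01-02T22:45:00")
).toDF("Id", "State", "StartTime", "EndTime")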










scala apache-spark dataframe apache-spark-sql

asked Mar 8 at 16:34, edited Mar 8 at 17:27 – Jeff Hornby

  • Possible duplicate of Spark SQL window function with complex condition
    – user10465355, Mar 8 at 16:50

  • I looked at that question and it is a very different problem.
    – Jeff Hornby, Mar 8 at 17:01

  • In that case, could you edit your question and explain the desired logic in detail? Additionally, we always welcome a reproducible example in apache-spark. Thank you in advance.
    – user10465355, Mar 8 at 17:03

  • Turns out the solution is this: stackoverflow.com/questions/7420618/… translated to Spark.
    – Jeff Hornby, Mar 8 at 19:44

  • Hi @JeffHornby, did you manage to convert this to Spark code?
    – Alexandros Biratsis, Mar 15 at 8:07


2 Answers
Since I had a similar case recently, I would like to provide the complete solution for this one. This part of the code:



val df2 = df
  .select('Id, 'State, 'StartTime, 'EndTime,
    row_number().over(idSpec).as("idRowNumber"),
    row_number().over(idStateSpec).as("idStateRowNumber"))


has the following output:



+---+-----+-------------------+-------------------+-----------+----------------+
| Id|State|          StartTime|            EndTime|idRowNumber|idStateRowNumber|
+---+-----+-------------------+-------------------+-----------+----------------+
|  1|    R|2019-01-01 03:00:00|2019-01-01 11:30:00|          1|               1|
|  1|    R|2019-01-01 11:30:00|2019-01-01 15:00:00|          2|               2|
|  1|    R|2019-01-01 15:00:00|2019-01-01 22:00:00|          3|               3|
|  1|    W|2019-01-01 22:00:00|2019-01-02 04:30:00|          4|               1|
|  1|    W|2019-01-02 04:30:00|2019-01-02 13:45:00|          5|               2|
|  1|    R|2019-01-02 13:45:00|2019-01-02 18:30:00|          6|               4|
|  1|    R|2019-01-02 18:30:00|2019-01-02 22:45:00|          7|               5|
+---+-----+-------------------+-------------------+-----------+----------------+


Notice that the difference between idRowNumber and idStateRowNumber is constant within each contiguous run of rows with the same Id and State: the first three R rows get 1-1 = 2-2 = 3-3 = 0, the two W rows get 4-1 = 5-2 = 3, and the last two R rows get 6-4 = 7-5 = 2. We can therefore create a new column called Category holding this difference and group by it (together with Id and State) in order to get the min StartTime and max EndTime for each group. The complete code looks like this:



import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, min, max}
import spark.implicits._

val idSpec = Window.partitionBy('Id).orderBy('StartTime)
val idStateSpec = Window.partitionBy('Id, 'State).orderBy('StartTime)

val df2 = df
  .select('Id, 'State, 'StartTime.cast("timestamp"), 'EndTime.cast("timestamp"),
    row_number().over(idSpec).as("idRowNumber"),            // position within the Id partition
    row_number().over(idStateSpec).as("idStateRowNumber"))  // position within the (Id, State) partition
  .withColumn("Category", $"idRowNumber" - $"idStateRowNumber") // constant per contiguous run
  .groupBy("Category", "Id", "State")
  .agg(min("StartTime").as("StartTime"), max("EndTime").as("EndTime"))
  .drop("Category")


And the output:



+---+-----+-------------------+-------------------+
| Id|State|          StartTime|            EndTime|
+---+-----+-------------------+-------------------+
|  1|    R|2019-01-01 03:00:00|2019-01-01 22:00:00|
|  1|    W|2019-01-01 22:00:00|2019-01-02 13:45:00|
|  1|    R|2019-01-02 13:45:00|2019-01-02 22:45:00|
+---+-----+-------------------+-------------------+
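
For completeness, the same gaps-and-islands grouping can also be sketched in Spark SQL; the temporary view name "events" below is illustrative and not part of the original answer:

df.createOrReplaceTempView("events")

val merged = spark.sql("""
  SELECT Id, State,
         MIN(StartTime) AS StartTime,
         MAX(EndTime)   AS EndTime
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY Id ORDER BY StartTime)
             - ROW_NUMBER() OVER (PARTITION BY Id, State ORDER BY StartTime) AS Category
    FROM events
  ) t
  GROUP BY Id, State, Category
  ORDER BY Id, StartTime
""")

merged.show(false)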





answered Mar 16 at 9:36 – Alexandros Biratsis

    So it turns out that the answer to this is Combine rows when the end time of one is the start time of another (Oracle) translated to Spark.



import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, min, max, row_number}
import spark.implicits._

val idSpec = Window.partitionBy('Id).orderBy('StartTime)
val idStateSpec = Window.partitionBy('Id, 'State).orderBy('StartTime)

val df2 = df
  .select('Id, 'State, 'StartTime, 'EndTime,
    row_number().over(idSpec).as("idRowNumber"),
    row_number().over(idStateSpec).as("idStateRowNumber"))
  .groupBy('Id, 'State, 'idRowNumber - 'idStateRowNumber)  // the difference identifies each contiguous run
  .agg(min('StartTime).as("StartTime"), max('EndTime).as("EndTime"))
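
As a small usage note (assuming only the original four columns are wanted, which the answer does not state explicitly): the grouping expression remains as an extra column in df2, so a final select can drop it and restore the original column order:

val result = df2
  .select('Id, 'State, 'StartTime, 'EndTime)   // drop the helper (idRowNumber - idStateRowNumber) column
  .orderBy('Id, 'StartTime)

result.show(false)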





edited Mar 19 at 16:11, answered Mar 15 at 13:29 – Jeff Hornby
