Extract time intervals in a scala spark dataframe
I'm trying to extract combined data intervals based on a time series in Scala and Spark.
I have the following data in a DataFrame:
Id | State | StartTime | EndTime
---+-------+---------------------+--------------------
1 | R | 2019-01-01T03:00:00 | 2019-01-01T11:30:00
1 | R | 2019-01-01T11:30:00 | 2019-01-01T15:00:00
1 | R | 2019-01-01T15:00:00 | 2019-01-01T22:00:00
1 | W | 2019-01-01T22:00:00 | 2019-01-02T04:30:00
1 | W | 2019-01-02T04:30:00 | 2019-01-02T13:45:00
1 | R | 2019-01-02T13:45:00 | 2019-01-02T18:30:00
1 | R | 2019-01-02T18:30:00 | 2019-01-02T22:45:00
I need to collapse the data into time intervals based on the Id and State. The resulting data needs to look like this:
Id | State | StartTime | EndTime
---+-------+---------------------+--------------------
1 | R | 2019-01-01T03:00:00 | 2019-01-01T22:00:00
1 | W | 2019-01-01T22:00:00 | 2019-01-02T13:45:00
1 | R | 2019-01-02T13:45:00 | 2019-01-02T22:45:00
Note that the first three records have been grouped together because the equipment is contiguously in an R state from 2019-01-01T03:00:00 to 2019-01-01T22:00:00, then it switches to a W state for the next two records from 2019-01-01T22:00:00 to 2019-01-02T13:45:00, and then back to an R state for the last two records.
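(For reference, a minimal sketch that builds this sample data, assuming a SparkSession named spark; this snippet is illustrative and not part of the original question.)
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("intervals").master("local[*]").getOrCreate()
import spark.implicits._

// Sample rows as shown above; timestamps are kept as strings here and can be
// cast to timestamp later.
val df = Seq(
  (1, "R", "2019-01-01T03:00:00", "2019-01-01T11:30:00"),
  (1, "R", "2019-01-01T11:30:00", "2019-01-01T15:00:00"),
  (1, "R", "2019-01-01T15:00:00", "2019-01-01T22:00:00"),
  (1, "W", "2019-01-01T22:00:00", "2019-01-02T04:30:00"),
  (1, "W", "2019-01-02T04:30:00", "2019-01-02T13:45:00"),
  (1, "R", "2019-01-02T13:45:00", "2019-01-02T18:30:00"),
  (1, "R", "2019-01-02T18:30:00", "2019-01-02T22:45:00")
).toDF("Id", "State", "StartTime", "EndTime")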
scala apache-spark dataframe apache-spark-sql
Possible duplicate of Spark SQL window function with complex condition – user10465355, Mar 8 at 16:50
I looked at that question and it is a very different problem. – Jeff Hornby, Mar 8 at 17:01
In that case, could you edit your question and explain the desired logic in detail? Additionally, a reproducible example is always welcome in apache-spark. Thank you in advance. – user10465355, Mar 8 at 17:03
It turns out the solution is this: stackoverflow.com/questions/7420618/… translated to Spark. – Jeff Hornby, Mar 8 at 19:44
Hi @JeffHornby, did you manage to convert this to Spark code? – Alexandros Biratsis, Mar 15 at 8:07
2 Answers
Since I had a similar case recently, I would like to provide a complete solution. This part of the code (idSpec and idStateSpec are the window specifications defined in the full code further below):
val df2 = df
.select('Id,'State,'StartTime,'EndTime,
row_number().over(idSpec).as("idRowNumber"),
row_number().over(idStateSpec).as("idStateRowNumber"))
produces the following output:
+---+-----+-------------------+-------------------+-----------+----------------+
| Id|State| StartTime| EndTime|idRowNumber|idStateRowNumber|
+---+-----+-------------------+-------------------+-----------+----------------+
| 1| R|2019-01-01 03:00:00|2019-01-01 11:30:00| 1| 1|
| 1| R|2019-01-01 11:30:00|2019-01-01 15:00:00| 2| 2|
| 1| R|2019-01-01 15:00:00|2019-01-01 22:00:00| 3| 3|
| 1| W|2019-01-01 22:00:00|2019-01-02 04:30:00| 4| 1|
| 1| W|2019-01-02 04:30:00|2019-01-02 13:45:00| 5| 2|
| 1| R|2019-01-02 13:45:00|2019-01-02 18:30:00| 6| 4|
| 1| R|2019-01-02 18:30:00|2019-01-02 22:45:00| 7| 5|
+---+-----+-------------------+-------------------+-----------+----------------+
Notice that the difference between idRowNumber and idStateRowNumber is constant within each contiguous run of rows that share the same Id and State, so we can store that difference in a new column called Category and group by it to take the min StartTime and max EndTime of each group. The complete code looks like this:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, min, row_number}
import spark.implicits._

// Row numbers per Id and per (Id, State), both ordered by StartTime.
val idSpec = Window.partitionBy('Id).orderBy('StartTime)
val idStateSpec = Window.partitionBy('Id, 'State).orderBy('StartTime)

val df2 = df
  .select('Id, 'State, 'StartTime.cast("timestamp"), 'EndTime.cast("timestamp"),
    row_number().over(idSpec).as("idRowNumber"),
    row_number().over(idStateSpec).as("idStateRowNumber"))
  // The difference is constant within each contiguous run of the same state.
  .withColumn("Category", $"idRowNumber" - $"idStateRowNumber")
  .groupBy("Category", "Id", "State")
  .agg(min("StartTime").as("StartTime"), max("EndTime").as("EndTime"))
  .drop("Category")
And the output:
+---+-----+-------------------+-------------------+
| Id|State| StartTime| EndTime|
+---+-----+-------------------+-------------------+
| 1| R|2019-01-01 03:00:00|2019-01-01 22:00:00|
| 1| W|2019-01-01 22:00:00|2019-01-02 13:45:00|
| 1| R|2019-01-02 13:45:00|2019-01-02 22:45:00|
+---+-----+-------------------+-------------------+
So it turns out that the answer to this is the approach from Combine rows when the end time of one is the start time of another (Oracle), translated to Spark:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, min, row_number}
import spark.implicits._

val idSpec = Window.partitionBy('Id).orderBy('StartTime)
val idStateSpec = Window.partitionBy('Id, 'State).orderBy('StartTime)

val df2 = df
  .select('Id, 'State, 'StartTime, 'EndTime,
    row_number().over(idSpec).as("idRowNumber"),
    row_number().over(idStateSpec).as("idStateRowNumber"))
  // Grouping on the difference of the two row numbers isolates each
  // contiguous run of the same state.
  .groupBy('Id, 'State, 'idRowNumber - 'idStateRowNumber)
  .agg(min('StartTime).as("StartTime"), max('EndTime).as("EndTime"))
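(A small optional follow-up, not part of the original answer: the third groupBy expression surfaces as an auto-generated column name in df2, so a final select can keep just the four original columns.)
// Hypothetical cleanup: keep only the original columns and sort for readability.
val result = df2
  .select('Id, 'State, 'StartTime, 'EndTime)
  .orderBy('Id, 'StartTime)

result.show(false)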