In a previous post, we saw how to install R, RStudio Server, and R packages on an AWS EC2 Red Hat cluster for use with the Hortonworks Data Platform (HDP 2.4) Hadoop distribution. Now, let’s use SparkR for data munging.
To connect to a Spark cluster from within RStudio, we have to set the SPARK_HOME environment variable. We can also specify additional libraries that we want to use. I am using Spark-CSV, a library for parsing and querying CSV data with Apache Spark, for Spark SQL and DataFrames.
Sys.setenv(SPARK_HOME="/usr/hdp/current/spark-client/",
'SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.11:1.4.0" "sparkr-shell"')
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"),.libPaths()))
When loading and attaching a new package in R, it is possible to have a name conflict, where a function from one package masks a function of the same name from another. Since part of SparkR is modeled on the dplyr package, certain functions in SparkR share names with those in dplyr. Depending on the load order of the two packages, some functions from the package loaded first are masked by those in the package loaded later. In such cases, prefix the call with the package name, for instance, SparkR::cume_dist(x) or dplyr::cume_dist(x). Let’s load SparkR after the other packages so that its functions don’t get masked.
library(dplyr)
Attaching package: 'dplyr'
The following objects are masked from 'package:stats':
filter, lag
The following objects are masked from 'package:base':
intersect, setdiff, setequal, union
library(ggplot2)
library(magrittr)
library(SparkR)
Attaching package: 'SparkR'
The following objects are masked from 'package:dplyr':
arrange, between, collect, contains, count, cume_dist,
dense_rank, desc, distinct, explain, filter, first, group_by,
intersect, lag, last, lead, mutate, n, n_distinct, ntile,
percent_rank, rename, row_number, sample_frac, select, sql,
summarize
The following objects are masked from 'package:stats':
cov, filter, lag, na.omit, predict, sd, var
The following objects are masked from 'package:base':
colnames, colnames<-, endsWith, intersect, rank, rbind,
sample, startsWith, subset, summary, table, transform
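The masking behaviour described above can be reproduced in plain R without Spark. The following is a minimal sketch: the local `filter` function is a made-up stand-in for a package function with a clashing name, and it is removed at the end so it does not interfere with the rest of the session.

```r
# A function named `filter` defined in the global environment masks
# stats::filter, just as a package attached later masks one attached earlier.
filter <- function(x, ...) "my filter"   # hypothetical stand-in

mine <- filter(1:5)                      # the unqualified call hits the new definition

# The package prefix still reaches the original function (a moving average here).
smoothed <- stats::filter(1:5, rep(1 / 3, 3))

print(mine)
print(class(smoothed))

rm(filter)                               # remove the stand-in again
```

The same `package::function` form works for any of the masked names listed in the messages above.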
The entry point into SparkR is the SparkContext, which connects our R program to a Spark cluster. We can create a SparkContext using sparkR.init. Further, to work with DataFrames we need a SQLContext, which can be created from the SparkContext.
sc <- SparkR::sparkR.init()
Launching java with spark-submit command /usr/hdp/current/spark-client//bin/spark-submit "--packages" "com.databricks:spark-csv_2.11:1.4.0" "sparkr-shell" /tmp/RtmppEk4US/backend_portcec6f3fc42
sqlContext <- sparkRSQL.init(sc)
Spark DataFrames support operations such as filtering, grouping, aggregation, and summary statistics. These operations take advantage of multiple cores and machines, and thus can scale to larger data than standalone R.
I am using the flights data. I downloaded the data from here and ingested it into the Hadoop Distributed File System (HDFS). You can read my blog post on data ingestion to HDFS here.
Let’s load the flights CSV file using ‘read.df’. Note, we are using the spark-csv library.
flights <- read.df(sqlContext, "hdfs:///tmp/nycflights13.csv", "com.databricks.spark.csv", header="true")
Let’s see the class, dimensions, and the first few records of flights. Most of the functions below (class, str, head, dim) are available in both base R and SparkR.
class(flights) # It is a DataFrame from the SparkR package
[1] "DataFrame"
attr(,"package")
[1] "SparkR"
printSchema(flights) # Prints out the schema in tree format
root
|-- year: string (nullable = true)
|-- month: string (nullable = true)
|-- day: string (nullable = true)
|-- dep_time: string (nullable = true)
|-- dep_delay: string (nullable = true)
|-- arr_time: string (nullable = true)
|-- arr_delay: string (nullable = true)
|-- carrier: string (nullable = true)
|-- tailnum: string (nullable = true)
|-- flight: string (nullable = true)
|-- origin: string (nullable = true)
|-- dest: string (nullable = true)
|-- air_time: string (nullable = true)
|-- distance: string (nullable = true)
|-- hour: string (nullable = true)
|-- minute: string (nullable = true)
str(flights)
'DataFrame': 16 variables:
$ year : chr "2013" "2013" "2013" "2013" "2013" "2013"
$ month : chr "1" "1" "1" "1" "1" "1"
$ day : chr "1" "1" "1" "1" "1" "1"
$ dep_time : chr "517" "533" "542" "544" "554" "554"
$ dep_delay: chr "2" "4" "2" "-1" "-6" "-4"
$ arr_time : chr "830" "850" "923" "1004" "812" "740"
$ arr_delay: chr "11" "20" "33" "-18" "-25" "12"
$ carrier : chr "UA" "UA" "AA" "B6" "DL" "UA"
$ tailnum : chr "N14228" "N24211" "N619AA" "N804JB" "N668DN" "N39463"
$ flight : chr "1545" "1714" "1141" "725" "461" "1696"
$ origin : chr "EWR" "LGA" "JFK" "JFK" "LGA" "EWR"
$ dest : chr "IAH" "IAH" "MIA" "BQN" "ATL" "ORD"
$ air_time : chr "227" "227" "160" "183" "116" "150"
$ distance : chr "1400" "1416" "1089" "1576" "762" "719"
$ hour : chr "5" "5" "5" "5" "5" "5"
$ minute : chr "17" "33" "42" "44" "54" "54"
head(flights,num=10) # Return the first NUM rows of a DataFrame as a data.frame.
year month day dep_time dep_delay arr_time arr_delay carrier tailnum
1 2013 1 1 517 2 830 11 UA N14228
2 2013 1 1 533 4 850 20 UA N24211
3 2013 1 1 542 2 923 33 AA N619AA
4 2013 1 1 544 -1 1004 -18 B6 N804JB
5 2013 1 1 554 -6 812 -25 DL N668DN
6 2013 1 1 554 -4 740 12 UA N39463
7 2013 1 1 555 -5 913 19 B6 N516JB
8 2013 1 1 557 -3 709 -14 EV N829AS
9 2013 1 1 557 -3 838 -8 B6 N593JB
10 2013 1 1 558 -2 753 8 AA N3ALAA
flight origin dest air_time distance hour minute
1 1545 EWR IAH 227 1400 5 17
2 1714 LGA IAH 227 1416 5 33
3 1141 JFK MIA 160 1089 5 42
4 725 JFK BQN 183 1576 5 44
5 461 LGA ATL 116 762 5 54
6 1696 EWR ORD 150 719 5 54
7 507 EWR FLL 158 1065 5 55
8 5708 LGA IAD 53 229 5 57
9 79 JFK MCO 140 944 5 57
10 301 LGA ORD 138 733 5 58
# If NUM is NULL, then head() returns the first 6 rows
dim(flights) # Returns the dimensions (number of rows and columns) of a DataFrame
[1] 336776 16
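The schema printed above shows every column as a string, because the CSV was read without type information. A minimal sketch of adding typed copies of two columns with cast follows. It assumes the flights DataFrame from above is still available, and the new column names (distance_num, dep_delay_num) and the flights_typed object are made up for illustration.

```r
library(SparkR)

# Assumes `flights` is the SparkR DataFrame loaded above (all columns are strings).
# Add integer versions of two columns under new, illustrative names.
flights_typed <- withColumn(flights, "distance_num",
                            cast(flights$distance, "integer"))
flights_typed <- withColumn(flights_typed, "dep_delay_num",
                            cast(flights_typed$dep_delay, "integer"))

# The two new columns should now be listed with an integer type.
printSchema(flights_typed)
```

The original string columns are left untouched, so the rest of the post runs unchanged against flights.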
We can also use showDF to print the first numRows rows of a DataFrame; numRows defaults to 20.
showDF(flights,numRows = 25, truncate = FALSE)
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2013|1 |1 |517 |2 |830 |11 |UA |N14228 |1545 |EWR |IAH |227 |1400 |5 |17 |
|2013|1 |1 |533 |4 |850 |20 |UA |N24211 |1714 |LGA |IAH |227 |1416 |5 |33 |
|2013|1 |1 |542 |2 |923 |33 |AA |N619AA |1141 |JFK |MIA |160 |1089 |5 |42 |
|2013|1 |1 |544 |-1 |1004 |-18 |B6 |N804JB |725 |JFK |BQN |183 |1576 |5 |44 |
|2013|1 |1 |554 |-6 |812 |-25 |DL |N668DN |461 |LGA |ATL |116 |762 |5 |54 |
|2013|1 |1 |554 |-4 |740 |12 |UA |N39463 |1696 |EWR |ORD |150 |719 |5 |54 |
|2013|1 |1 |555 |-5 |913 |19 |B6 |N516JB |507 |EWR |FLL |158 |1065 |5 |55 |
|2013|1 |1 |557 |-3 |709 |-14 |EV |N829AS |5708 |LGA |IAD |53 |229 |5 |57 |
|2013|1 |1 |557 |-3 |838 |-8 |B6 |N593JB |79 |JFK |MCO |140 |944 |5 |57 |
|2013|1 |1 |558 |-2 |753 |8 |AA |N3ALAA |301 |LGA |ORD |138 |733 |5 |58 |
|2013|1 |1 |558 |-2 |849 |-2 |B6 |N793JB |49 |JFK |PBI |149 |1028 |5 |58 |
|2013|1 |1 |558 |-2 |853 |-3 |B6 |N657JB |71 |JFK |TPA |158 |1005 |5 |58 |
|2013|1 |1 |558 |-2 |924 |7 |UA |N29129 |194 |JFK |LAX |345 |2475 |5 |58 |
|2013|1 |1 |558 |-2 |923 |-14 |UA |N53441 |1124 |EWR |SFO |361 |2565 |5 |58 |
|2013|1 |1 |559 |-1 |941 |31 |AA |N3DUAA |707 |LGA |DFW |257 |1389 |5 |59 |
|2013|1 |1 |559 |0 |702 |-4 |B6 |N708JB |1806 |JFK |BOS |44 |187 |5 |59 |
|2013|1 |1 |559 |-1 |854 |-8 |UA |N76515 |1187 |EWR |LAS |337 |2227 |5 |59 |
|2013|1 |1 |600 |0 |851 |-7 |B6 |N595JB |371 |LGA |FLL |152 |1076 |6 |0 |
|2013|1 |1 |600 |0 |837 |12 |MQ |N542MQ |4650 |LGA |ATL |134 |762 |6 |0 |
|2013|1 |1 |601 |1 |844 |-6 |B6 |N644JB |343 |EWR |PBI |147 |1023 |6 |1 |
|2013|1 |1 |602 |-8 |812 |-8 |DL |N971DL |1919 |LGA |MSP |170 |1020 |6 |2 |
|2013|1 |1 |602 |-3 |821 |16 |MQ |N730MQ |4401 |LGA |DTW |105 |502 |6 |2 |
|2013|1 |1 |606 |-4 |858 |-12 |AA |N633AA |1895 |EWR |MIA |152 |1085 |6 |6 |
|2013|1 |1 |606 |-4 |837 |-8 |DL |N3739P |1743 |JFK |ATL |128 |760 |6 |6 |
|2013|1 |1 |607 |0 |858 |-17 |UA |N53442 |1077 |EWR |MIA |157 |1085 |6 |7 |
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 25 rows
The SparkR function filter returns the rows of a DataFrame that satisfy a given condition. Note: we are using the pipe operator (%>%) from the magrittr package. We can use collect or as.data.frame to get an R data.frame from a Spark DataFrame.
filter(flights, flights$month == 1 & flights$day == 1) %>% head(num = 7) # select flights on January 1st and return the first 7
year month day dep_time dep_delay arr_time arr_delay carrier tailnum
1 2013 1 1 517 2 830 11 UA N14228
2 2013 1 1 533 4 850 20 UA N24211
3 2013 1 1 542 2 923 33 AA N619AA
4 2013 1 1 544 -1 1004 -18 B6 N804JB
5 2013 1 1 554 -6 812 -25 DL N668DN
6 2013 1 1 554 -4 740 12 UA N39463
7 2013 1 1 555 -5 913 19 B6 N516JB
flight origin dest air_time distance hour minute
1 1545 EWR IAH 227 1400 5 17
2 1714 LGA IAH 227 1416 5 33
3 1141 JFK MIA 160 1089 5 42
4 725 JFK BQN 183 1576 5 44
5 461 LGA ATL 116 762 5 54
6 1696 EWR ORD 150 719 5 54
7 507 EWR FLL 158 1065 5 55
filter(flights, flights$month == 1 | flights$month == 2) %>% head() # select flights in January or February
year month day dep_time dep_delay arr_time arr_delay carrier tailnum
1 2013 1 1 517 2 830 11 UA N14228
2 2013 1 1 533 4 850 20 UA N24211
3 2013 1 1 542 2 923 33 AA N619AA
4 2013 1 1 544 -1 1004 -18 B6 N804JB
5 2013 1 1 554 -6 812 -25 DL N668DN
6 2013 1 1 554 -4 740 12 UA N39463
flight origin dest air_time distance hour minute
1 1545 EWR IAH 227 1400 5 17
2 1714 LGA IAH 227 1416 5 33
3 1141 JFK MIA 160 1089 5 42
4 725 JFK BQN 183 1576 5 44
5 461 LGA ATL 116 762 5 54
6 1696 EWR ORD 150 719 5 54
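As a sketch of the two points above, the condition passed to filter can be either a Column expression or a SQL-style string, and collect brings the result back as an ordinary R data.frame. This assumes the flights DataFrame from above; the object names jan1, jan1_sql and jan1_local are made up for illustration.

```r
library(SparkR)

# Assumes `flights` is the SparkR DataFrame loaded above.
# Column expression: both conditions must hold, so use & rather than |.
jan1 <- filter(flights, flights$month == 1 & flights$day == 1)

# The same condition written as a SQL-style string.
jan1_sql <- filter(flights, "month = 1 AND day = 1")

# collect() pulls every row of the result to the driver as an R data.frame,
# so it is best used on results known to be small.
jan1_local <- collect(jan1)
class(jan1_local)
```

Once the data is local, any regular R or dplyr function can be applied to jan1_local.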
We can use arrange to sort a DataFrame by the specified column(s).
arrange(flights, flights$year, flights$month, flights$day) %>% showDF(numRows = 15, truncate = FALSE)
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2013|1 |1 |517 |2 |830 |11 |UA |N14228 |1545 |EWR |IAH |227 |1400 |5 |17 |
|2013|1 |1 |533 |4 |850 |20 |UA |N24211 |1714 |LGA |IAH |227 |1416 |5 |33 |
|2013|1 |1 |542 |2 |923 |33 |AA |N619AA |1141 |JFK |MIA |160 |1089 |5 |42 |
|2013|1 |1 |544 |-1 |1004 |-18 |B6 |N804JB |725 |JFK |BQN |183 |1576 |5 |44 |
|2013|1 |1 |554 |-6 |812 |-25 |DL |N668DN |461 |LGA |ATL |116 |762 |5 |54 |
|2013|1 |1 |554 |-4 |740 |12 |UA |N39463 |1696 |EWR |ORD |150 |719 |5 |54 |
|2013|1 |1 |555 |-5 |913 |19 |B6 |N516JB |507 |EWR |FLL |158 |1065 |5 |55 |
|2013|1 |1 |557 |-3 |709 |-14 |EV |N829AS |5708 |LGA |IAD |53 |229 |5 |57 |
|2013|1 |1 |557 |-3 |838 |-8 |B6 |N593JB |79 |JFK |MCO |140 |944 |5 |57 |
|2013|1 |1 |558 |-2 |753 |8 |AA |N3ALAA |301 |LGA |ORD |138 |733 |5 |58 |
|2013|1 |1 |558 |-2 |849 |-2 |B6 |N793JB |49 |JFK |PBI |149 |1028 |5 |58 |
|2013|1 |1 |558 |-2 |853 |-3 |B6 |N657JB |71 |JFK |TPA |158 |1005 |5 |58 |
|2013|1 |1 |558 |-2 |924 |7 |UA |N29129 |194 |JFK |LAX |345 |2475 |5 |58 |
|2013|1 |1 |558 |-2 |923 |-14 |UA |N53441 |1124 |EWR |SFO |361 |2565 |5 |58 |
|2013|1 |1 |559 |-1 |941 |31 |AA |N3DUAA |707 |LGA |DFW |257 |1389 |5 |59 |
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 15 rows
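# Do we want ascending or descending? Note that month is a string column, so desc() sorts it lexicographically and "9" comes before "12".
arrange(flights, flights$year, desc(flights$month), asc(flights$day)) %>% showDF(numRows = 15, truncate = FALSE)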
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2013|9 |1 |9 |10 |343 |3 |B6 |N663JB |839 |JFK |BQN |196 |1576 |0 |9 |
|2013|9 |1 |600 |-10 |824 |-31 |AA |N3JTAA |705 |LGA |DFW |185 |1389 |6 |0 |
|2013|9 |1 |117 |152 |218 |139 |B6 |N216JB |1816 |JFK |SYR |44 |209 |1 |17 |
|2013|9 |1 |508 |-8 |717 |-43 |UA |N57869 |1545 |EWR |IAH |170 |1400 |5 |8 |
|2013|9 |1 |537 |-8 |849 |-6 |AA |N613AA |701 |JFK |MIA |148 |1089 |5 |37 |
|2013|9 |1 |537 |-8 |906 |-15 |B6 |N658JB |939 |JFK |BQN |191 |1576 |5 |37 |
|2013|9 |1 |549 |-11 |815 |-35 |UA |N27213 |1115 |EWR |TPA |133 |997 |5 |49 |
|2013|9 |1 |552 |-8 |843 |-22 |AA |N3KFAA |1895 |LGA |MIA |149 |1096 |5 |52 |
|2013|9 |1 |553 |-7 |809 |-25 |B6 |N784JB |27 |EWR |MCO |117 |937 |5 |53 |
|2013|9 |1 |554 |-6 |700 |-16 |EV |N14180 |6167 |LGA |IAD |39 |229 |5 |54 |
|2013|9 |1 |554 |-6 |803 |-20 |DL |N909DE |461 |LGA |ATL |105 |762 |5 |54 |
|2013|9 |1 |554 |-6 |657 |-18 |MQ |N516MQ |3267 |EWR |ORD |104 |719 |5 |54 |
|2013|9 |1 |555 |-5 |835 |-16 |B6 |N639JB |371 |LGA |FLL |139 |1076 |5 |55 |
|2013|9 |1 |557 |-3 |706 |-10 |EV |N909EV |5716 |JFK |IAD |43 |228 |5 |57 |
|2013|9 |1 |557 |-3 |718 |-22 |WN |N553WN |726 |LGA |STL |125 |888 |5 |57 |
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 15 rows
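The descending sort above starts with month 9 because the columns are strings. To sort by numeric value instead, one option is to cast inside arrange. This is only a sketch that assumes the flights DataFrame from above; the object name top is made up for illustration.

```r
library(magrittr)
library(SparkR)

# Assumes `flights` is the SparkR DataFrame loaded above (string columns).
# Casting inside arrange() changes only the sort key, not the stored columns.
top <- arrange(flights,
               desc(cast(flights$month, "integer")),
               asc(cast(flights$day, "integer"))) %>%
  head(num = 5)

# With a numeric sort key, December flights should come first.
top[, c("year", "month", "day", "carrier", "flight")]
```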
select selects a set of columns with names or Column expressions.
select(flights, flights$year, flights$month, flights$day) %>% showDF(numRows = 15, truncate = FALSE)
+----+-----+---+
|year|month|day|
+----+-----+---+
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
|2013|1 |1 |
+----+-----+---+
only showing top 15 rows
# select(flights, c("year", "month", "day")) also gives the same result
distinct returns a new DataFrame containing the distinct rows in this DataFrame.
distinct(select(flights, flights$tailnum)) %>% dim() # See how many distinct 'tailnum' we have
[1] 4044 1
withColumn returns a new DataFrame with the specified column added.
withColumn(flights,'gain', flights$arr_delay - flights$dep_delay)%>%
select(c('year','month','day','gain'))%>%
showDF(numRows =8,truncate = FALSE)
+----+-----+---+-----+
|year|month|day|gain |
+----+-----+---+-----+
|2013|1 |1 |9.0 |
|2013|1 |1 |16.0 |
|2013|1 |1 |31.0 |
|2013|1 |1 |-17.0|
|2013|1 |1 |-19.0|
|2013|1 |1 |16.0 |
|2013|1 |1 |24.0 |
|2013|1 |1 |-11.0|
+----+-----+---+-----+
only showing top 8 rows
sample returns a sampled subset of a DataFrame, with or without replacement, using a random seed.
collect(sample(flights, FALSE, 0.00005)) # Sample without replacement
year month day dep_time dep_delay arr_time arr_delay carrier tailnum
1 2013 1 14 642 -3 858 12 US N565UW
2 2013 1 25 1204 -1 1313 -12 WN N291WN
3 2013 10 15 1758 -2 2017 -13 DL N337NB
4 2013 12 14 2035 125 132 223 DL N3758Y
5 2013 12 28 1733 3 2055 0 AA N344AA
6 2013 3 5 1932 3 2129 -1 EV N12167
7 2013 4 15 734 14 1004 29 WN N340LV
8 2013 4 17 855 -5 1017 -8 US N750UW
9 2013 4 23 754 -1 924 14 MQ N804MQ
10 2013 4 27 1603 -2 1908 -16 DL N365NB
11 2013 5 2 1558 -1 1804 -10 MQ N659MQ
12 2013 5 16 1650 -9 1835 -31 9E N8847A
13 2013 6 7 1117 64 1320 65 EV N16183
14 2013 6 25 1257 6 1448 -2 DL N369NB
15 2013 7 26 1147 -13 1250 -28 EV N27962
16 2013 9 3 2007 -6 2222 -32 EV N17159
17 2013 9 11 NA NA NA NA UA
18 2013 9 14 943 -2 1048 -27 WN N769SW
19 2013 9 19 1237 -3 1525 -20 WN N715SW
flight origin dest air_time distance hour minute
1 926 EWR CLT 89 529 6 42
2 644 EWR MDW 114 711 12 4
3 926 EWR ATL 109 746 17 58
4 438 JFK SAN 320 2446 20 35
5 2351 JFK MIA 165 1089 17 33
6 4663 EWR CHS 97 628 19 32
7 35 EWR DEN 252 1605 7 34
8 2167 LGA DCA 41 214 8 55
9 4418 JFK DCA 61 213 7 54
10 1433 LGA RSW 158 1080 16 3
11 3985 JFK CVG 85 589 15 58
12 3881 EWR CVG 85 569 16 50
13 4255 EWR CHS 101 628 11 17
14 1131 LGA DTW 78 502 12 57
15 3838 EWR ROC 42 246 11 47
16 4141 EWR OKC 172 1325 20 7
17 708 EWR ORD NA 719 NA NA
18 1123 LGA MDW 102 725 9 43
19 42 LGA HOU 195 1428 12 37
collect(sample(flights, TRUE, 0.00005)) # Sample with replacement
year month day dep_time dep_delay arr_time arr_delay carrier tailnum
1 2013 10 30 730 -15 1031 -12 B6 N630JB
2 2013 10 30 1506 -9 1808 -2 AA N587AA
3 2013 11 1 640 0 838 0 EV N14203
4 2013 12 19 1828 -2 2030 -17 EV N751EV
5 2013 12 23 1023 24 1252 24 9E N8501F
6 2013 2 5 852 -7 1136 -9 DL N642DL
7 2013 2 7 1326 -4 1611 6 DL N37700
8 2013 2 24 723 -9 1022 -16 UA N429UA
9 2013 4 2 631 1 747 -2 EV N16918
10 2013 4 14 849 -6 1035 -34 DL N766NC
11 2013 4 19 1811 31 1950 15 WN N785SW
12 2013 4 26 1830 118 2020 129 EV N13161
13 2013 6 14 2228 63 44 45 UA N440UA
14 2013 6 30 835 0 951 -14 EV N13913
15 2013 9 1 1827 39 2058 23 B6 N638JB
16 2013 9 3 1451 -5 1645 8 9E N8800G
flight origin dest air_time distance hour minute
1 1717 LGA TPA 155 1010 7 30
2 320 EWR DFW 204 1372 15 6
3 4166 EWR GSP 96 594 6 40
4 5268 LGA CLT 87 544 18 28
5 3518 LGA TYS 120 647 10 23
6 1747 LGA ATL 114 762 8 52
7 2043 JFK ATL 135 760 13 26
8 395 LGA IAH 214 1416 7 23
9 4533 EWR BUF 54 282 6 31
10 2239 EWR DTW 87 488 8 49
11 1596 EWR STL 135 872 18 11
12 4702 EWR GSO 69 445 18 30
13 407 EWR IAH 179 1400 22 28
14 5824 EWR RIC 52 277 8 35
15 299 LGA MCO 128 950 18 27
16 3868 JFK ORF 51 290 14 51
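The two calls above let Spark pick the seed, so each call draws different rows. sample also accepts a seed as a fourth argument. The sketch below assumes the flights DataFrame from above and uses an arbitrary seed value; with the same seed and unchanged input data, the two samples would be expected to match.

```r
library(SparkR)

# Assumes `flights` is the SparkR DataFrame loaded above.
seed <- 42  # arbitrary value, chosen only for illustration

s1 <- collect(sample(flights, FALSE, 0.00005, seed))
s2 <- collect(sample(flights, FALSE, 0.00005, seed))

# Compare the two samples drawn with the same seed.
nrow(s1)
all.equal(s1, s2)
```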
Count the number of records for each group:
count(groupBy(flights, "carrier")) %>% collect() %>%
dplyr::mutate(carrier = factor(carrier, levels = carrier[order(count, decreasing = TRUE)])) %>%
ggplot(aes(x = carrier, y = count)) + geom_bar(stat = "identity", fill = "skyblue")
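Group the flights by carrier and aggregate. Note that distance is a string column, so the maximum and minimum below are lexicographic rather than numeric (which is why some minimums look larger than the maximums):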
agg(group_by(flights, flights$carrier),
count = n(flights$carrier), maximum =max(flights$distance),
minimum =min(flights$distance),mean =mean(flights$distance),
sample_standard_deviation=stddev_samp(flights$distance)
)%>%showDF()
+-------+-----+-------+-------+------------------+-------------------------+
|carrier|count|maximum|minimum| mean|sample_standard_deviation|
+-------+-----+-------+-------+------------------+-------------------------+
| AA|32729| 944| 1005|1340.2359986556264| 637.736152529242|
| HA| 342| 4983| 4983| 4983.0| 0.0|
| AS| 714| 2402| 2402| 2402.0| 0.0|
| B6|54635| 997| 1005| 1068.621524663677| 703.705659773323|
| UA|58665| 997| 1008|1529.1148725816074| 798.8046815611141|
| US|20536| 96| 17| 553.4562719127387| 583.8225238535786|
| OO| 32| 733| 1008| 500.8125| 206.1722482967626|
| VX| 5162| 2586| 2248|2499.4821774506004| 88.0488899750204|
| WN|12275| 888| 1167| 996.269083503055| 410.42960818312923|
| DL|48110| 963| 1005|1236.9012055705675| 660.1724143675561|
| EV|54173| 997| 1008| 562.9917301977| 287.48815905858027|
| F9| 685| 1620| 1620| 1620.0| 1.137426313741963...|
| 9E|18460| 964| 1005| 530.235752979415| 321.79870716218636|
| YV| 601| 96| 229| 375.0332778702163| 159.71957370363523|
| FL| 3260| 762| 397| 664.8294478527607| 160.8880361106722|
| MQ|26397| 888| 1005| 569.5327120506118| 226.2287070772906|
+-------+-----+-------+-------+------------------+-------------------------+
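In the table above, max and min compare distance as text, so for AA the "maximum" 944 beats 1005 simply because "9" sorts after "1". A sketch of the same aggregation on a numeric cast of the column follows. It assumes the flights DataFrame from above; distance_num and by_carrier are names made up for illustration.

```r
library(magrittr)
library(SparkR)

# Assumes `flights` is the SparkR DataFrame loaded above, with distance as a string.
distance_num <- cast(flights$distance, "integer")

by_carrier <- agg(group_by(flights, flights$carrier),
                  maximum = max(distance_num),
                  minimum = min(distance_num),
                  mean = mean(distance_num)) %>%
  collect()

# With numeric values, minimum can no longer exceed maximum.
by_carrier
```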
Save as CSV:
sampled <- sample(flights, FALSE, 0.01)
write.df(sampled, "I_sampled_it", "com.databricks.spark.csv", mode = "overwrite")
Save as Parquet:
write.df(sampled, "I_sampled_it", "parquet", "overwrite")
We can also save the contents of a DataFrame as a Parquet file, preserving the schema, using write.parquet. Files written out with this method can be read back in as a DataFrame using read.parquet().
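What if we want to save the contents of a DataFrame as a JSON file? We can use write.json. The target path must not already exist, so we write to a new directory here.
write.json(sampled, "I_sampled_it_json")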
Note: files written out with this method can be read back in as a DataFrame using read.json().
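To check that the schema survives a round trip, a file can be written and read straight back. This is a sketch that assumes the sqlContext and sampled objects from above. The path I_sampled_it_roundtrip is hypothetical, and read.df with the parquet source is used here in place of the shorter read.parquet mentioned earlier.

```r
library(SparkR)

# Assumes `sqlContext` and `sampled` from above are still available.
demo_path <- "I_sampled_it_roundtrip"   # hypothetical path, used only for this check

# Write the sample as Parquet, replacing any earlier copy at this path.
write.df(sampled, demo_path, "parquet", "overwrite")

# Read it back and confirm the column names and types are preserved.
restored <- read.df(sqlContext, demo_path, "parquet")
printSchema(restored)
```

This needs to run before sparkR.stop() is called, since the read goes through the live SQLContext.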
sparkR.stop()