196 242 3 881250949 186 302 3 891717742 22 377 1 878887116 244 51 2 880606923 166 346 1 886397596 298 474 4 884182806 115 265 2 881171488 253 465 5 891628467 305 451 3 886324817 6 86 3 883603013 62 257 2 879372434 286 1014 5 879781125 200 222 5 876042340 210 40 3 891035994 224 29 3 888104457 303 785 3 879485318 122 387 5 879270459 194 274 2 879539794 291 1042 4 874834944 234 1184 2 892079237 119 392 4 886176814 167 486 4 892738452 299 144 4 877881320 291 118 2 874833878 308 1 4 887736532 95 546 2 879196566 38 95 5 892430094 102 768 2 883748450 63 277 4 875747401 160 234 5 876861185 50 246 3 877052329 301 98 4 882075827 225 193 4 879539727 290 88 4 880731963 97 194 3 884238860 157 274 4 886890835 181 1081 1 878962623 278 603 5 891295330 276 796 1 874791932 7 32 4 891350932 10 16 4 877888877 284 304 4 885329322 201 979 2 884114233 276 564 3 874791805 287 327 5 875333916 246 201 5 884921594 242 1137 5 879741196 249 241 5 879641194 99 4 5 886519097 178 332 3 882823437 251 100 4 886271884 81 432 2 876535131 260 322 4 890618898 25 181 5 885853415 59 196 5 888205088 72 679 2 880037164
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.test</groupId> <artifactId>initiation-spark-java</artifactId> <version>1.0.0-SNAPSHOT</version> <repositories> <repository> <id>Apache Spark temp - Release Candidate repo</id> <url>https://repository.apache.org/content/repositories/orgapachespark-1080/</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>1.3.1</version> <!--<scope>provided</scope>--><!-- this part was left out in our project so that we can launch the project directly from Maven --> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>1.3.1</version> <!--<scope>provided</scope>--> </dependency> </dependencies> <build> <plugins> <!-- we want JDK 1.8 source and binary compatibility --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
package spark;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * One movie rating cast by one user, parsed from a line of {@code ratings.txt}.
 *
 * <p>Spark ships records between workers, so the class must be {@link Serializable}.
 * Fields stay {@code public} (callers read them directly, e.g. {@code rating.user})
 * but are now {@code final}: a record is immutable once parsed, and an explicit
 * {@code serialVersionUID} keeps the serialized form stable across recompiles.
 */
public class Rating implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Identifier of the user who cast the rating. */
    public final long user;
    /** Identifier of the rated movie. */
    public final long movie;
    /** Rating value (1-5 stars in the MovieLens-style data set). */
    public final int rating;
    /** Moment the rating was cast, in the system default time zone. */
    public final LocalDateTime timestamp;

    /**
     * @param user      user identifier
     * @param movie     movie identifier
     * @param rating    star rating given by the user
     * @param timestamp when the rating was cast
     */
    public Rating(long user, long movie, int rating, LocalDateTime timestamp) {
        this.user = user;
        this.movie = movie;
        this.rating = rating;
        this.timestamp = timestamp;
    }
}
package spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Comparator;
import java.util.Scanner;

/**
 * Computes the mean, min, max and number of ratings cast by user #200.
 */
public class Workshop1 {

    /** User whose ratings are analyzed (was repeated as a literal in every filter). */
    private static final long USER_ID = 200L;

    /**
     * Runs the local Spark job: parses {@code /ratings.txt} (tab-separated
     * {@code user movie rating epochSeconds} lines) and prints the aggregates.
     *
     * @throws URISyntaxException kept for signature compatibility with existing callers
     */
    public void run() throws URISyntaxException {
        SparkConf conf = new SparkConf().setAppName("Workshop").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String ratingsPath = Paths.get(getClass().getResource("/ratings.txt").getPath()).toString();

        JavaRDD<Rating> ratings = sc.textFile(ratingsPath)
                .map(line -> line.split("\\t"))
                .map(row -> new Rating(
                        Long.parseLong(row[0]),
                        Long.parseLong(row[1]),
                        Integer.parseInt(row[2]),
                        // BUG FIX: the fourth column is already in epoch *seconds*
                        // (e.g. 881250949 == Dec 1997). The original multiplied by
                        // 1000 and still called ofEpochSecond, pushing every
                        // timestamp thousands of years into the future.
                        LocalDateTime.ofInstant(
                                Instant.ofEpochSecond(Long.parseLong(row[3])),
                                ZoneId.systemDefault())
                ));

        // Four actions below share the same subset: filter once and cache it so
        // Spark does not re-read and re-parse the text file for each aggregate.
        JavaRDD<Rating> userRatings = ratings
                .filter(rating -> rating.user == USER_ID)
                .cache();

        double mean = userRatings
                .mapToDouble(rating -> rating.rating)
                .mean();

        double max = userRatings
                .mapToDouble(rating -> rating.rating)
                .max(Comparator.<Double>naturalOrder());

        double min = userRatings
                .mapToDouble(rating -> rating.rating)
                .min(Comparator.<Double>naturalOrder());

        // Kept as double so the printed output ("count: N.0") is unchanged.
        double count = userRatings.count();

        System.out.println("mean: " + mean);
        System.out.println("max: " + max);
        System.out.println("min: " + min);
        System.out.println("count: " + count);

        // Block until input arrives so the Spark web UI stays reachable
        // while the job's results are inspected.
        Scanner s = new Scanner(System.in);
        s.hasNextLine();
    }

    public static void main(String... args) throws URISyntaxException {
        new Workshop1().run();
    }
}