How to create a JavaPairRDD from a collection

Java Code Examples for org.apache.spark.api.java.JavaPairRDD
The following are top voted examples for showing how to use
org.apache.spark.api.java.JavaPairRDD. These examples are extracted from open source projects.
You can vote up the examples you like, and your votes will be used in our system to produce
more good examples.
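
Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of the two usual ways to build a JavaPairRDD from a plain Java collection, which is the question posed in the title: parallelizePairs on a list of Tuple2 values, or mapToPair on an existing JavaRDD.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class JavaPairRDDFromCollection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaPairRDDFromCollection").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Option 1: parallelize a list of Tuple2 values directly into a JavaPairRDD
        List<Tuple2<String, Integer>> pairs = Arrays.asList(
            new Tuple2<String, Integer>("a", 1),
            new Tuple2<String, Integer>("b", 2));
        JavaPairRDD<String, Integer> fromPairs = sc.parallelizePairs(pairs);

        // Option 2: start from a JavaRDD and map each element to a (key, value) pair
        JavaRDD<String> words = sc.parallelize(Arrays.asList("a", "b", "a"));
        JavaPairRDD<String, Integer> fromMapToPair = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String w) {
                    return new Tuple2<String, Integer>(w, 1);
                }
            });

        System.out.println(fromPairs.collect());
        System.out.println(fromMapToPair.collect());
        sc.close();
    }
}

Both patterns show up repeatedly in the examples that follow.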
Example 1
public JavaRDD<Tuple2<String, Integer>> run(final JavaADAMContext ac, final JavaRDD<AlignmentRecord> recs, final String args) {
    JavaRDD<String> contigNames = recs.map(new Function<AlignmentRecord, String>() {
        public String call(final AlignmentRecord rec) {
            return rec.getReadMapped() ? rec.getReadName() : "unmapped";
        }
    });

    JavaPairRDD<String, Integer> counts = contigNames.mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(final String readName) {
            return new Tuple2<String, Integer>(readName, Integer.valueOf(1));
        }
    });

    JavaPairRDD<String, Integer> reducedCounts = counts.reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(final Integer value0, final Integer value1) {
            return Integer.valueOf(value0.intValue() + value1.intValue());
        }
    });

    // seems like there should be a more direct way
    return JavaRDD.fromRDD(reducedCounts.rdd(), null);
}
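
About the "seems like there should be a more direct way" comment above: since a JavaPairRDD is already an RDD of Tuple2 values, one alternative (a sketch, not the project's own code) is an identity map, because JavaPairRDD.map(...) returns a JavaRDD and avoids dropping down to the Scala RDD API with a null ClassTag:

// Sketch of an alternative to JavaRDD.fromRDD(reducedCounts.rdd(), null)
JavaRDD<Tuple2<String, Integer>> asJavaRDD =
    reducedCounts.map(new Function<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        public Tuple2<String, Integer> call(Tuple2<String, Integer> t) {
            return t; // identity mapping; only the static type changes
        }
    });
return asJavaRDD;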
Example 2
public static void main(String[] args) {
    String inputFileName = "samples/big.txt";
    String outputDirName = "output";

    SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("local");
    JavaSparkContext context = new JavaSparkContext(conf);

    JavaRDD<String> file = context.textFile(inputFileName);
    JavaRDD<String> words = file.flatMap(WORDS_EXTRACTOR);
    JavaPairRDD<String, Integer> pairs = words.mapToPair(WORDS_MAPPER);
    JavaPairRDD<String, Integer> counter = pairs.reduceByKey(WORDS_REDUCER);

    counter.saveAsTextFile(outputDirName);
}
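
The constants WORDS_EXTRACTOR, WORDS_MAPPER and WORDS_REDUCER are defined elsewhere in the class and are not part of the snippet. One plausible set of definitions, consistent with the Spark 1.x API used throughout these examples, would be:

// Hypothetical definitions; the original constants are not shown in the snippet.
private static final FlatMapFunction<String, String> WORDS_EXTRACTOR =
    new FlatMapFunction<String, String>() {
        public Iterable<String> call(String s) {
            return Arrays.asList(s.split(" "));       // split each line into words
        }
    };

private static final PairFunction<String, String, Integer> WORDS_MAPPER =
    new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String w) {
            return new Tuple2<String, Integer>(w, 1); // emit (word, 1)
        }
    };

private static final Function2<Integer, Integer, Integer> WORDS_REDUCER =
    new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) {
            return a + b;                             // sum the per-word counts
        }
    };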
Example 3
public void run(@DataIn(name="example_events", type=ExampleEvent.class) View<ExampleEvent> input,
                @DataOut(name="odd_users", type=ExampleEvent.class) View<ExampleEvent> output) throws IOException {
    Job job = Job.getInstance(getJobContext().getHadoopConf());
    DatasetKeyInputFormat.configure(job).readFrom(input);
    DatasetKeyOutputFormat.configure(job).writeTo(output);

    JavaPairRDD<ExampleEvent, Void> inputData = getJobContext()
        .getSparkContext()
        .newAPIHadoopRDD(job.getConfiguration(), DatasetKeyInputFormat.class,
            ExampleEvent.class, Void.class);

    JavaPairRDD<ExampleEvent, Void> filteredData = inputData.filter(new KeepOddUsers());

    filteredData.saveAsNewAPIHadoopDataset(job.getConfiguration());
}
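
KeepOddUsers is not shown in the snippet. Because filter on a JavaPairRDD takes a Function<Tuple2<K, V>, Boolean>, a possible shape for it looks like the following sketch (the getUserId() accessor is an assumption, not taken from the project):

// Hypothetical sketch; the real KeepOddUsers class is not part of the snippet.
static class KeepOddUsers implements Function<Tuple2<ExampleEvent, Void>, Boolean> {
    public Boolean call(Tuple2<ExampleEvent, Void> record) {
        // keep only events whose user id is odd (getUserId() is an assumed accessor)
        return record._1.getUserId() % 2 == 1;
    }
}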
Example 4
/**
 * @param in
 */
public static JavaPairRDD<MatrixIndexes, Double> sumCellsByKey( JavaPairRDD<MatrixIndexes, Double> in )
{
    //sum of blocks per key, w/o exploitation of corrections
    return in.reduceByKey(
        new SumDoubleCellsFunction());
}
Example 5
public static void main(String[] args) throws Exception {
    // create a spark context object
    JavaSparkContext ctx = createJavaSparkContext();

    List<Tuple2<String,String>> listR = new ArrayList<Tuple2<String,String>>();
    listR.add(new Tuple2<String,String>("a1", "a2"));
    listR.add(new Tuple2<String,String>("b1", "b2"));
    listR.add(new Tuple2<String,String>("c1", "c2"));

    List<Tuple2<String,String>> listS = new ArrayList<Tuple2<String,String>>();
    listS.add(new Tuple2<String,String>("d1", "d2"));
    listS.add(new Tuple2<String,String>("e1", "e2"));
    listS.add(new Tuple2<String,String>("f1", "f2"));
    listS.add(new Tuple2<String,String>("g1", "g2"));

    // create two pair RDDs from the in-memory lists
    JavaPairRDD<String,String> R = ctx.parallelizePairs(listR);
    JavaPairRDD<String,String> S = ctx.parallelizePairs(listS);

    // <U> JavaPairRDD<T,U> cartesian(JavaRDDLike<U,?> other)
    // Return the Cartesian product of this RDD and another one,
    // that is, the RDD of all pairs of elements (a, b)
    // where a is in this and b is in other.
    JavaPairRDD<Tuple2<String,String>, Tuple2<String,String>> cart = R.cartesian(S);

    // save the result
    cart.saveAsTextFile("/output/z");

    ctx.close();
    System.exit(0);
}
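
For a quick sanity check the result can also be collected and printed before saving; with 3 elements in R and 4 in S, the Cartesian product holds 3 x 4 = 12 pairs:

// Collect and print the 12 pairs for inspection (suitable for small RDDs only).
for (Tuple2<Tuple2<String, String>, Tuple2<String, String>> pair : cart.collect()) {
    System.out.println(pair._1 + " x " + pair._2);
}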
Example 6
private static JavaPairRDD<Integer,Iterable<Rating>> predictAll(
        MatrixFactorizationModel mfModel,
        JavaRDD<Rating> data,
        JavaPairRDD<Integer,Integer> userProducts) {
    @SuppressWarnings("unchecked")
    RDD<Tuple2<Object,Object>> userProductsRDD =
        (RDD<Tuple2<Object,Object>>) (RDD<?>) userProducts.rdd();
    return data.wrapRDD(mfModel.predict(userProductsRDD)).groupBy(new Function<Rating,Integer>() {
        public Integer call(Rating r) {
            return r.user();
        }
    });
}
Example 7
public JavaRDD<Tuple2<String, Integer>> run(final JavaADAMContext ac, final JavaRDD<AlignmentRecord> recs, final String args) {
    JavaRDD<String> contigNames = recs.map(new Function<AlignmentRecord, String>() {
        public String call(final AlignmentRecord rec) {
            return rec.getReadMapped() ? rec.getContig().getContigName() : "unmapped";
        }
    });

    JavaPairRDD<String, Integer> counts = contigNames.mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(final String contigName) {
            return new Tuple2<String, Integer>(contigName, Integer.valueOf(1));
        }
    });

    JavaPairRDD<String, Integer> reducedCounts = counts.reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(final Integer value0, final Integer value1) {
            return Integer.valueOf(value0.intValue() + value1.intValue());
        }
    });

    // seems like there should be a more direct way
    return JavaRDD.fromRDD(reducedCounts.rdd(), null);
}
Example 8
public int run(SparkConf conf, CommandLine cli) throws Exception {
    String zkHost = cli.getOptionValue("zkHost", "localhost:9983");
    String collection = cli.getOptionValue("collection", "collection1");
    String queryStr = cli.getOptionValue("query", "*:*");

    JavaSparkContext jsc = new JavaSparkContext(conf);

    SolrRDD solrRDD = new SolrRDD(zkHost, collection);
    final SolrQuery solrQuery = SolrRDD.toQuery(queryStr);
    JavaRDD<SolrDocument> solrJavaRDD = solrRDD.query(jsc.sc(), solrQuery);

    JavaRDD<String> words = solrJavaRDD.flatMap(new FlatMapFunction<SolrDocument, String>() {
        public Iterable<String> call(SolrDocument doc) {
            Object tweet_s = doc.get("text_t");
            String str = tweet_s != null ? tweet_s.toString() : "";
            str = str.toLowerCase().replaceAll("[.,!?\n]", " ").trim();
            return Arrays.asList(str.split(" "));
        }
    });

    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    });

    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });

    for (Tuple2<?,?> tuple : counts.top(20, new WordCountSorter())) {
        System.out.println(tuple._1() + ": " + tuple._2());
    }

    // Now use schema information in Solr to build a queryable SchemaRDD
    SQLContext sqlContext = new SQLContext(jsc);

    // Pro Tip: SolrRDD will figure out the schema if you don't supply a list of field names in your query
    Map<String, String> options = new HashMap<String, String>();
    options.put("zkhost", zkHost);
    options.put("collection", collection);
    options.put("query", queryStr);

    DataFrame df = sqlContext.read().format("solr").options(options).load();
    long numEchos = df.filter(df.col("type_s").equalTo("echo")).count();
    System.out.println("numEchos >> " + numEchos);

    jsc.stop();

    return 0;
}
Example 9
/**
 * @param in
 */
public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in )
{
    //sum of blocks per key, w/o exploitation of correction blocks
    return in.reduceByKey(
        new SumMultiBlockFunction());
}
Example 10
public int wordCount(DataStore<String, WebPage> inStore,
        DataStore<String, TokenDatum> outStore) throws IOException {
    //Spark engine initialization
    GoraSparkEngine<String, WebPage> goraSparkEngine = new GoraSparkEngine<>(String.class,
        WebPage.class);

    SparkConf sparkConf = new SparkConf().setAppName(
        "Gora Spark Word Count Application").setMaster("local");

    Class[] c = new Class[1];
    c[0] = inStore.getPersistentClass();
    sparkConf.registerKryoClasses(c);

    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    JavaPairRDD<String, WebPage> goraRDD = goraSparkEngine.initialize(sc, inStore);

    long count = goraRDD.count();
    log.info("Total Web page count: {}", count);

    JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(mapFunc);

    JavaPairRDD<String, Long> reducedGoraRdd = JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc);

    //Print output for debug purpose
    log.info("SparkWordCount debug purpose TokenDatum print starts:");
    Map<String, Long> tokenDatumMap = reducedGoraRdd.collectAsMap();
    for (String key : tokenDatumMap.keySet()) {
        log.info(tokenDatumMap.get(key).toString());
    }
    log.info("SparkWordCount debug purpose TokenDatum print ends:");

    //write output to datastore
    Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore);
    reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);

    return 1;
}
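
mapFunc and redFunc are fields defined elsewhere in the class and are not included in the snippet. Judging only from the RDD types above, they would look roughly like the following sketch (the key emitted by mapFunc is an assumption):

// Hypothetical sketches inferred from the RDD types, not copied from the project.
private static Function<WebPage, Tuple2<String, Long>> mapFunc =
    new Function<WebPage, Tuple2<String, Long>>() {
        public Tuple2<String, Long> call(WebPage webPage) {
            // emit one (key, 1) pair per page; the choice of key is an assumption
            return new Tuple2<String, Long>(webPage.getUrl().toString(), 1L);
        }
    };

private static Function2<Long, Long, Long> redFunc =
    new Function2<Long, Long, Long>() {
        public Long call(Long a, Long b) {
            return a + b; // sum counts per key
        }
    };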
Example 11
public void run(@DataIn(name="source.users") View<GenericRecord> input,
                @DataOut(name="target.users") View<GenericRecord> output) throws IOException {
    Job job = Job.getInstance();
    DatasetKeyInputFormat.configure(job).readFrom(input);
    DatasetKeyOutputFormat.configure(job).writeTo(output);

    @SuppressWarnings("unchecked")
    JavaPairRDD<GenericData.Record, Void> inputData = getJobContext()
        .getSparkContext()
        .newAPIHadoopRDD(job.getConfiguration(), DatasetKeyInputFormat.class,
            GenericData.Record.class, Void.class);

    inputData.saveAsNewAPIHadoopDataset(job.getConfiguration());
}
Example 12
/**
 * Save the contents of the given RDD to the given view.
 *
 * @param rdd
 * @param uri
 */
public static void save(JavaPairRDD rdd, String uri,
        Configuration conf) {
    // Copy configuration to avoid side effects for the caller.
    Configuration outputConf = new Configuration(conf);
    try {
        Job job = Job.getInstance(outputConf);
        DatasetKeyOutputFormat.configure(job).writeTo(uri);

        // Save non-empty RDDs.
        if (!rdd.isEmpty())
            rdd.saveAsNewAPIHadoopDataset(job.getConfiguration());
    } catch (IOException e) {
        throw new AppException(e);
    }
}
Example 13
public SparkClusterResult doCluster(DataSet ds) {
    // SparkDataSet needs to be passed in
    SparkDataSet rdd = (SparkDataSet) ds;

    // cache dataset in memory
    rdd.getRDD().cache();

    distFunc = new DistanceFunction(this.typeDefs);
    ClusterFactory clusterFactory = new ClusterFactory(this.typeDefs, this.onlineUpdate);

    log.info("Starting threshold clusterer with threshold {}", threshold);

    // TODO look at using a reduce function
    // Idea is the first step is a map<Instance, List<Instance>> that converts each instance to a single "cluster"
    // second step is a reduce where input is a List<Instances> and produces a List<Instances>
    // this step would merge clusters within threshold

    JavaPairRDD<String, Instance> instances = rdd.getRDD();
    instances.cache();

    // convert each instance into a singleton cluster
    JavaRDD<Map<String, Instance>> singletons = rdd.getRDD().map( new InstanceToClusterFunction(clusterFactory) );
    //singletons.cache();

    log.info("Generated initial singleton clusters");

    // merge clusters together
    Map<String, Instance> clusters = singletons.reduce( new AggregateClusterFunction(distFunc, threshold) );

    log.info("Merging clusters completed with {} clusters", clusters.size());

    // find the best cluster for each instance
    JavaPairRDD<String, Instance> bestCluster = instances.mapToPair( new BestClusterFunction(distFunc, clusters) );

    log.info("Output results");

    if (clusters != null && centroidsPath != null) rdd.getContext().parallelize(new ArrayList<Instance>(clusters.values())).saveAsTextFile(centroidsPath);
    if (bestCluster != null && clustersPath != null) bestCluster.saveAsTextFile(clustersPath);

    log.info("Threshold clusterer completed");

    // return the cluster membership rdd
    return new SparkClusterResult(bestCluster);
}
Example 14
public static void main(String[] args) throws Exception {
    // STEP-1: handle input parameters
    if (args.length != 2) {
        System.err.println("Usage: Top10UsingTakeOrdered <input-path> <topN>");
        System.exit(1);
    }
    System.out.println("args[0]: <input-path>=" + args[0]);
    System.out.println("args[1]: <topN>=" + args[1]);
    final String inputPath = args[0];
    final int N = Integer.parseInt(args[1]);

    // STEP-2: create a Java Spark Context object
    JavaSparkContext ctx = SparkUtil.createJavaSparkContext();

    // STEP-3: create an RDD from input
    //    input record format:
    //        <string-key><,><integer-value-count>
    JavaRDD<String> lines = ctx.textFile(inputPath, 1);
    lines.saveAsTextFile("/output/1");

    // STEP-4: partition RDD
    // public JavaRDD<T> coalesce(int numPartitions)
    // Return a new RDD that is reduced into numPartitions partitions.
    JavaRDD<String> rdd = lines.coalesce(9);

    // STEP-5: map input(T) into (K,V) pairs
    // PairFunction<T, K, V>
    // T => Tuple2<K, V>
    JavaPairRDD<String,Integer> kv = rdd.mapToPair(new PairFunction<String,String,Integer>() {
        public Tuple2<String,Integer> call(String s) {
            String[] tokens = s.split(","); // url,789
            return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
        }
    });
    kv.saveAsTextFile("/output/2");

    // STEP-6: reduce frequent K's
    JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2; // sum the counts for the same key
        }
    });
    uniqueKeys.saveAsTextFile("/output/3");

    // STEP-7: find final top-N by calling takeOrdered()
    List<Tuple2<String, Integer>> topNResult = uniqueKeys.takeOrdered(N, MyTupleComparator.INSTANCE);

    // STEP-8: emit final top-N
    for (Tuple2<String, Integer> entry : topNResult) {
        System.out.println(entry._2 + "--" + entry._1);
    }

    System.exit(0);
}
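
MyTupleComparator is not included in the snippet. takeOrdered(N, comparator) returns the N smallest elements according to the comparator, and the comparator must be Serializable to be shipped to executors; a plausible implementation that makes the call return the N largest counts is:

// Hypothetical sketch; the real MyTupleComparator is not shown in the snippet.
static class MyTupleComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
    static final MyTupleComparator INSTANCE = new MyTupleComparator();
    public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
        // order by descending count, so takeOrdered(N, ...) yields the top-N keys
        return -t1._2.compareTo(t2._2);
    }
}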
Example 15
public AnalysisResultFuture run() {
    final AnalysisJob analysisJob = _sparkJobContext.getAnalysisJob();
    final Datastore datastore = analysisJob.getDatastore();

    final JavaRDD<InputRow> inputRowsRDD = openSourceDatastore(datastore);

    final JavaPairRDD<String, NamedAnalyzerResult> namedAnalyzerResultsRDD;
    if (_sparkJobContext.getAnalysisJobBuilder().isDistributable()) {
        logger.info("Running the job in distributed mode");

        // TODO: We have yet to get more experience with this setting - do a
        // benchmark of what works best, true or false.
        final boolean preservePartitions = true;

        final JavaRDD<Tuple2<String, NamedAnalyzerResult>> processedTuplesRdd = inputRowsRDD
            .mapPartitionsWithIndex(new RowProcessingFunction(_sparkJobContext), preservePartitions);

        final JavaPairRDD<String, NamedAnalyzerResult> partialNamedAnalyzerResultsRDD = processedTuplesRdd
            .mapPartitionsToPair(new TuplesToTuplesFunction<String, NamedAnalyzerResult>(), preservePartitions);

        namedAnalyzerResultsRDD = partialNamedAnalyzerResultsRDD.reduceByKey(new AnalyzerResultReduceFunction(
            _sparkJobContext));
    } else {
        logger.warn("Running the job in non-distributed mode");
        JavaRDD<InputRow> coalescedInputRowsRDD = inputRowsRDD.coalesce(1);
        namedAnalyzerResultsRDD = coalescedInputRowsRDD.mapPartitionsToPair(new RowProcessingFunction(
            _sparkJobContext));
    }

    final JavaPairRDD<String, AnalyzerResult> finalAnalyzerResultsRDD = namedAnalyzerResultsRDD
        .mapValues(new ExtractAnalyzerResultFunction());

    // log analyzer results
    final List<Tuple2<String, AnalyzerResult>> results = finalAnalyzerResultsRDD.collect();
    logger.info("Finished! Number of AnalyzerResult objects: {}", results.size());
    for (Tuple2<String, AnalyzerResult> analyzerResultTuple : results) {
        final String key = analyzerResultTuple._1;
        final AnalyzerResult result = analyzerResultTuple._2;
        logger.info("AnalyzerResult: " + key + "->" + result);
    }

    // log accumulators
    final Map<String, Accumulator<Integer>> accumulators = _sparkJobContext.getAccumulators();
    for (Entry<String, Accumulator<Integer>> entry : accumulators.entrySet()) {
        final String name = entry.getKey();
        final Accumulator<Integer> accumulator = entry.getValue();
        logger.info("Accumulator: {} -> {}", name, accumulator.value());
    }

    return new SparkAnalysisResultFuture(results);
}
Example 16
public static void main(String[] args) throws Exception {
    printArguments(args);
    if (args.length != 2) {
        System.err.println("Usage: CharCount <input> <output>");
        System.exit(1);
    }

    // handle input parameters
    final String inputPath = args[0];
    final String outputPath = args[1];

    // create a context object, which is used
    // as a factory for creating new RDDs
    JavaSparkContext context = new JavaSparkContext();

    // read input and create the first RDD
    JavaRDD<String> lines = context.textFile(inputPath);

    JavaPairRDD<Character,Long> chars = lines.flatMapToPair(new PairFlatMapFunction<String, Character, Long>() {
        public Iterable<Tuple2<Character,Long>> call(String s) {
            if ((s == null) || (s.length() == 0)) {
                return Collections.emptyList();
            }
            String[] words = s.split(" ");
            List<Tuple2<Character,Long>> list = new ArrayList<Tuple2<Character,Long>>();
            for (String word : words) {
                char[] arr = word.toLowerCase().toCharArray();
                for (char c : arr) {
                    list.add(new Tuple2<Character, Long>(c, 1l));
                }
            }
            return list;
        }
    });

    // find the total count for each unique char
    JavaPairRDD<Character, Long> counts =
        chars.reduceByKey(new Function2<Long, Long, Long>() {
            public Long call(Long i1, Long i2) {
                return i1 + i2;
            }
        });

    // sort and save the final output
    counts.sortByKey().saveAsTextFile(outputPath);

    // close the context and we are done
    context.close();
    System.exit(0);
}
Example 17
public static void main(String[] args) throws Exception {
    if (args.length != 2) {
        System.err.println("Usage: SparkCharCount <input> <output>");
        System.exit(1);
    }

    // handle input parameters
    final String inputPath = args[0];
    final String outputPath = args[1];

    // create a context object, which is used
    // as a factory for creating new RDDs
    JavaSparkContext context = new JavaSparkContext();

    // read input and create the first RDD
    JavaRDD<String> lines = context.textFile(inputPath, 1);

    JavaPairRDD<Character,Long> chars = lines.flatMapToPair(new PairFlatMapFunction<String, Character, Long>() {
        public Iterable<Tuple2<Character,Long>> call(String s) {
            if ((s == null) || (s.length() == 0)) {
                return Collections.emptyList();
            }
            Map<Character,Long> map = new HashMap<Character,Long>();
            String[] words = s.split(" ");
            for (String word : words) {
                char[] arr = word.toLowerCase().toCharArray();
                for (char c : arr) {
                    Long count = map.get(c);
                    if (count == null) {
                        map.put(c, 1l);
                    } else {
                        map.put(c, count + 1);
                    }
                }
            }
            return toListOfKeyValuePairs(map);
        }
    });

    // find the total count for each unique char
    JavaPairRDD<Character, Long> counts =
        chars.reduceByKey(new Function2<Long, Long, Long>() {
            public Long call(Long i1, Long i2) {
                return i1 + i2;
            }
        });

    // sort and save the final output
    counts.sortByKey().saveAsTextFile(outputPath);

    // close the context and we are done
    context.close();
    System.exit(0);
}
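
The helper toListOfKeyValuePairs(map) called above is not part of the snippet; a straightforward version consistent with the types used here would be:

// Hypothetical sketch; the original helper is not shown in the snippet.
static List<Tuple2<Character, Long>> toListOfKeyValuePairs(Map<Character, Long> map) {
    List<Tuple2<Character, Long>> list = new ArrayList<Tuple2<Character, Long>>(map.size());
    for (Map.Entry<Character, Long> entry : map.entrySet()) {
        list.add(new Tuple2<Character, Long>(entry.getKey(), entry.getValue()));
    }
    return list;
}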
Example 18
public int run(String[] args) throws Exception {
    DataStore<Long, Pageview> inStore;
    DataStore<String, MetricDatum> outStore;
    Configuration hadoopConf = new Configuration();
    if (args.length > 0) {
        String dataStoreClass = args[0];
        inStore = DataStoreFactory.getDataStore(
            dataStoreClass, Long.class, Pageview.class, hadoopConf);
        if (args.length > 1) {
            dataStoreClass = args[1];
        }
        outStore = DataStoreFactory.getDataStore(
            dataStoreClass, String.class, MetricDatum.class, hadoopConf);
    } else {
        inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, hadoopConf);
        outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, hadoopConf);
    }

    //Spark engine initialization
    GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class,
        Pageview.class);

    SparkConf sparkConf = new SparkConf().setAppName(
        "Gora Spark Integration Application").setMaster("local");

    Class[] c = new Class[1];
    c[0] = inStore.getPersistentClass();
    sparkConf.registerKryoClasses(c);

    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore);

    long count = goraRDD.count();
    log.info("Total Log Count: {}", count);

    JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD
        .values().map(mapFunc);

    JavaPairRDD<String, MetricDatum> reducedGoraRdd = JavaPairRDD
        .fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc).mapToPair(metricFunc);

    log.info("MetricDatum count: {}", reducedGoraRdd.count());

    //Print output for debug purpose
    Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap();
    for (String key : metricDatumMap.keySet()) {
        System.out.println(key);
    }

    //write output to datastore
    Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore);
    reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);

    inStore.close();
    outStore.close();

    log.info("Log completed with success");

    return 1;
}
Example 19
public static void main(String[] args) throws Exception {
    // STEP-1: handle input parameters
    if (args.length != 3) {
        System.err.println("Usage: <N> <input-path> <output-path>");
        System.exit(1);
    }

    // if a word.length < N, that word will be ignored
    final int N = Integer.parseInt(args[0]);
    System.out.println("args[0]: N=" + N);

    // identify I/O paths
    String inputPath = args[1];
    String outputPath = args[2];
    System.out.println("args[1]: <input-path>=" + inputPath);
    System.out.println("args[2]: <output-path>=" + outputPath);

    // STEP-2: create an instance of JavaSparkContext
    JavaSparkContext ctx = new JavaSparkContext();

    // STEP-3: create an RDD for input
    //    input record format:
    //        word1 word2 word3 ...
    JavaRDD<String> lines = ctx.textFile(inputPath, 1);

    // STEP-4: create (K, V) pairs from input
    //    K = sorted(word)
    //    V = word
    JavaPairRDD<String, String> rdd = lines.flatMapToPair(
        new PairFlatMapFunction<String, String, String>() {
            public Iterable<Tuple2<String, String>> call(String line) {
                if ((line == null) || (line.length() < N)) {
                    return Collections.EMPTY_LIST;
                }
                String[] words = StringUtils.split(line);
                if (words == null) {
                    return Collections.EMPTY_LIST;
                }
                List<Tuple2<String, String>> results = new ArrayList<Tuple2<String, String>>();
                for (String word : words) {
                    if (word.length() < N) {
                        // ignore strings with less than size N
                        continue;
                    }
                    if (word.matches(".*[,.;]$")) {
                        // remove the special char from the end
                        word = word.substring(0, word.length() - 1);
                    }
                    if (word.length() < N) {
                        // ignore strings with less than size N
                        continue;
                    }
                    char[] wordChars = word.toCharArray();
                    Arrays.sort(wordChars);
                    String sortedWord = new String(wordChars);
                    results.add(new Tuple2<String, String>(sortedWord, word));
                }
                return results;
            }
        });

    // STEP-5: create anagrams
    JavaPairRDD<String, Iterable<String>> anagrams = rdd.groupByKey();

    // STEP-6: save output
    anagrams.saveAsTextFile(outputPath);

    // STEP-7: done
    ctx.close();
    System.exit(0);
}
