package wordCount;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class wordcount { public static class MapTask extends Mapper<LongWritable, Text,Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split(","); for (String word : words) { context.write(new Text(word),new IntWritable(1)); } } } public static class ReduceTask extends Reducer<Text, IntWritable,Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count=0; for (IntWritable value : values) { count++; } context.write(key,new IntWritable(count)); } } public static void main(String[] args) throws Exception { System.setProperty("HADOOP_USER_NAME","root"); Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://192.168.120.110:9000"); Job job = Job.getInstance(conf);
job.setMapperClass(wordcount.MapTask.class); job.setReducerClass(wordcount.ReduceTask.class); job.setJarByClass(wordcount.class);
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
String outPath="/bigdata/output/wordcount";
FileSystem fileSystem = FileSystem.get(conf); if(fileSystem.exists(new Path(outPath))){ fileSystem.delete(new Path(outPath),true); }
FileInputFormat.addInputPath(job,new Path("/bigdata/input/wc.txt")); FileOutputFormat.setOutputPath(job,new Path("/bigdata/output/wordcount"));
boolean b = job.waitForCompletion(true); System.out.println(b?"代码没毛病!!!":"出BUG,赶快看一下!!!"); } }
|