当前位置:首页 > 云服务器 > 正文内容

combiner,国家给你分配的小哥哥的博客,企业云服务器

香港256IP千兆站群服务器BGP专线240元起! 华为云香港物理机精品线路全面上线![特价] 企业级CN2 GIA双程专线高速回国 T3机房 香港美国韩国海外独立物理服务器特价热销中!
package combiner;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int sum=0;for(IntWritable v:values){sum+=v.get();}context.write(key, new IntWritable(sum));}}
package combiner;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;/** * 1.继承mapper * 2.重写map * -------------------------------------------------------------- * mapper需要的四个泛型 * keyin:输入的键的类型——>这里指的是每一行起始字节偏移量long——>LongWritable * valuein:输入的值的类型——>折力值每一行的内容String——>Text * keyout:输出的键的类型——>这里指的是每个单词String——>Text * valueout:输出的值的类型——>折离职单词出现的次数int——>Intwritable * ---------------------------------------------------------------- * */public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {/** * 在map端获取每一行的内容  切分发送 * 参数:LongWritable key:输入的每一行的偏移量框架读取的 * Text text:输入的每一行的内容 * Context context:上下文对象 用于向reduce发送数据读取框架给的东西 *调用频率:一行调用一次,一个文本100行这个方法就会调用100次 */@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)throws IOException, InterruptedException {//获取每一行的内容  转换为stringString line = value.toString();//切分每一个单词 String[] words = line.split("\t");/** * 只能统计一行的  并不能统计以个文件的,全部发送到reduce端进行统计 *  * *///循环遍历每一个单词发送给reducefor(String word:words){Text k=new Text(word);IntWritable v=new IntWritable(1);//把单词发送给reducecontext.write(k,v);}}}
package combiner;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;/** * 1.继承reducer * 2.重写reduce * ------------------------------------------- * 参数四个泛型 * 输入:从map端多来的对应的map的输出类型 * keyin:reduce的输入的key的类型——>map输出的key的类型 * valuein:reduce的输入的value的类型——>map输出的value的类型 * 输出:最终的结果输出 * keyout:输出的key的类型 * valueout:输出的value的类型 * */public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {/** * 默认的分组 按照map输出的key进行分组 *  * Text text:一组中的任意的一个key * values:一组中相同的key对应的所有的value * Context context 上下文对象 向上承接map,向下输出结果  hdfs/本地 * 调用频率:一组调用一次 有多少组就调用多少次 * */@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {//循环遍历values求和int sum=0;for(IntWritable v:values){//将hadoop转换成javaIntWritable——>intsum+=v.get();}//写出//将java转换成hadoopIntWritable rv=new IntWritable(sum);context.write(key, rv);}}
package combiner;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class Driver {/** * 提交运行MapReduce的程序的 * Sring[] args 代表程序运行过程中接受的参数 * 按照参数的书序放在数组的下标中 * 第一个参数     数组下表为0的位置 * @throws IOException  * @throws InterruptedException  * @throws ClassNotFoundException  **/public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//指定代码的运行提交用户System.setProperty("HADOOP_USER_NAME", "hdp01");/**加载配置文件 * 优先加载jar包中的  hdfs-default.xml   mapred-default.xml ... * src下的   -site.xml   -default.xml * 代码中的 * 最后加载的最终生效 * */Configuration conf = new Configuration();//指定hdfs连接的urlconf.set("fs.defaultFS", "hdfs://hadoop1:9000");/** * 启动一个job * job概念;一个MapReduce程序  封装的map或reduce的相关配置项 *  * *///创建job对象Job job= Job.getInstance(conf);//job.setJarByClass(Driver.class);/** * 指定job对应的mapper类 * 指定job对应的reducer类 * */job.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);/** * 指定mapper输出的key,value * 指定reducer输出的key,value * */job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//指定添加Combiner组件job.setCombinerClass(WordCountReducer.class);/** * 指定输入、需要处理的文件、添加输入路径 *  * */FileInputFormat.addInputPath(job, new Path("hdfs://hadoop1:9000/in"));FileOutputFormat.setOutputPath(job, new Path("/out1"));//FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop1:9000/out1"));//打印提交job.waitForCompletion(true);}}

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_44405514/article/details/90319079

宝塔服务器面板,一键全能部署及管理,送你3188元礼包,点我领取

扫描二维码推送至手机访问。

版权声明:文章来源于互联网公开页面遵守互联网分享协议,若涉及侵权请联系客服处理。

本文链接:https://www.idchg.com/info/277179/

标签: combiner

发表评论

hkzhao

hkzhao

◎欢迎参与讨论,请在这里发表您的看法和观点。