开发文章

Hadoop 提取KPI 进行海量Web日志分析

Web日志包含着网站最重要的信息,通过日志分析,我们可以知道网站的访问量,哪个网页访问人数最多,哪个网页最有价值等。一般中型的网站(10W的PV以上),每天会产生1G以上Web日志文件。大型或超大型的网站,可能每小时就会产生10G的数据量。

  • Web日志分析概述
  • 需求分析:KPI指标设计
  • 算法模型:Hadoop并行算法
  • 架构设计:日志KPI系统架构
  • 程序开发:MapReduce程序实现

1. Web日志分析概述

Web日志由Web服务器产生,可能是Nginx, Apache, Tomcat等。从Web日志中,我们可以获取网站每类页面的PV值(PageView,页面访问量)、独立IP数;稍微复杂一些的,可以计算得出用户所 检索的关键词排行榜、用户停留时间最高的页面等;更复杂的,构建广告点击模型、分析用户行为特征等等。

在Web日志中,每条日志通常代表着用户的一次访问行为,例如下面就是一条nginx日志:

复制内容到剪贴板
  1. 222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939  
  2.  "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1)  
  3.  AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"  

拆解为以下8个变量

  • remote_addr: 记录客户端的ip地址, 222.68.172.190
  • remote_user: 记录客户端用户名称, –
  • time_local: 记录访问时间与时区, [18/Sep/2013:06:49:57 +0000]
  • request: 记录请求的url与http协议, “GET /images/my.jpg HTTP/1.1”
  • status: 记录请求状态,成功是200, 200
  • body_bytes_sent: 记录发送给客户端文件主体内容大小, 19939
  • http_referer: 用来记录从那个页面链接访问过来的, “http://www.angularjs.cn/A00n”
  • http_user_agent: 记录客户浏览器的相关信息, “Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36”

注:要更多的信息,则要用其它手段去获取,通过js代码单独发送请求,使用cookies记录用户的访问信息。

利用这些日志信息,我们可以深入挖掘网站的秘密了。

少量数据的情况

少量数据的情况(10Mb,100Mb,10G),在单机处理尚能忍受的时候,我可以直接利用各种Unix/Linux工具,awk、grep、sort、join等都是日志分析的利器,再配合perl, Python,正则表达工,基本就可以解决所有的问题。

例如,我们想从上面提到的nginx日志中得到访问量最高前10个IP,实现很简单:

复制内容到剪贴板
  1. ~ cat access.log.10 | awk '{a[$1]++} END {for(b in a) print b"\t"a[b]}' | sort -k2 -r | head -n 10  
  2. 163.177.71.12   972  
  3. 101.226.68.137  972  
  4. 183.195.232.138 971  
  5. 50.116.27.194   97  
  6. 14.17.29.86     96  
  7. 61.135.216.104  94  
  8. 61.135.216.105  91  
  9. 61.186.190.41   9  
  10. 59.39.192.108   9  
  11. 220.181.51.212  9  

海量数据的情况

当数据量每天以10G、100G增长的时候,单机处理能力已经不能满足需求。我们就需要增加系统的复杂性,用计算机集群,存储阵列来解决。在Hadoop出现之前,海量数据存储,和海量日志分析都是非常困难的。只有少数一些公司,掌握着高效的并行计算,分步式计算,分步式存储的核心技术。

Hadoop的出现,大幅度的降低了海量数据处理的门槛,让小公司甚至是个人都能力,搞定海量数据。并且,Hadoop非常适用于日志分析系统。

2.需求分析:KPI指标设计

下面我们将从一个公司案例出发来全面的解释,如何用进行 海量Web日志分析,提取KPI数据 。

案例介绍

某电子商务网站,在线团购业务。每日PV数100w,独立IP数5w。用户通常在工作日上午10:00-12:00和下午15:00-18:00访 问量最大。日间主要是通过PC端浏览器访问,休息日及夜间通过移动设备访问较多。网站搜索浏量占整个网站的80%,PC用户不足1%的用户会消费,移动用 户有5%会消费。

通过简短的描述,我们可以粗略地看出,这家电商网站的经营状况,并认识到愿意消费的用户从哪里来,有哪些潜在的用户可以挖掘,网站是否存在倒闭风险等。

KPI指标设计

  • PV(PageView): 页面访问量统计
  • IP: 页面独立IP的访问量统计
  • Time: 用户每小时PV的统计
  • Source: 用户来源域名的统计
  • Browser: 用户的访问设备统计

从商业的角度,个人网站的特征与电商网站不太一样,没有转化率,同时跳出率也比较高。从技术的角度,同样都关注KPI指标设计。

3.算法模型:Hadoop并行算法

并行算法的设计:

PV(PageView): 页面访问量统计

Map过程{key:request,value:1}
Reduce过程{key:request,value:求和(sum)}

IP: 页面独立IP的访问量统计

Map: {key:request,value:remote_addr}
Reduce: {key:request,value:去重再求和(sum(unique))}

Time: 用户每小时PV的统计

Map: {key:time_local,value:1}
Reduce: {key:time_local,value:求和(sum)}

Source: 用户来源域名的统计

Map: {key:http_referer,value:1}
Reduce: {key:http_referer,value:求和(sum)}

Browser: 用户的访问设备统计

Map: {key:http_user_agent,value:1}
Reduce: {key:http_user_agent,value:求和(sum)}

4.架构设计:日志KPI系统架构

日志KPI系统架构.png

上图中,左边是Application业务系统,右边是Hadoop的HDFS, MapReduce。

1.日志是由业务系统产生的,我们可以设置web服务器每天产生一个新的目录,目录下面会产生多个日志文件,每个日志文件64M。
2.设置系统定时器CRON,夜间在0点后,向HDFS导入昨天的日志文件。
3.完成导入后,设置系统定时器,启动MapReduce程序,提取并计算统计指标。
4.完成计算后,设置系统定时器,从HDFS导出统计指标数据到数据库,方便以后的即使查询。

从HDFS导出统计指标数据到数据库.png

上面这幅图,我们可以看得更清楚,数据是如何流动的。蓝色背景的部分是在Hadoop中的,接下来我们的任务就是完成MapReduce的程序实现。

5.程序开发2:MapReduce程序实现

开发流程:

  • 对日志行的解析
  • Map函数实现
  • Reduce函数实现
  • 启动程序实现

    1). 对日志行的解析

新建文件:org.apache.hadoop.mr.kpi

对日志行的解析.jpg

整体代码

复制内容到剪贴板
  1. package org.apache.hadoop.mr.kpi;  
  2.   
  3. import java.text.ParseException;  
  4. import java.text.SimpleDateFormat;  
  5. import java.util.Date;  
  6. import java.util.HashSet;  
  7. import java.util.Locale;  
  8. import java.util.Set;  
  9.   
  10. public class KPI {  
  11.   
  12.     /** 
  13.      * 20160512 
  14.      * @author yue 
  15.      */  
  16.     private String remote_addr; //记录客户端的IP地址  
  17.     private String remote_user;   //记录客户端用户名称,忽略属性“-”  
  18.     private String time_local;  //记录访问时间与时区  
  19.     private String request;  //记录请求的URL和http协议  
  20.     private String status;  //记录请求状态,成功是200  
  21.     private String body_bytes_sent;  //记录发送给客户端文件主体内容大小  
  22.     private String http_referer; //用来记录从哪个页面链接访问过来的  
  23.     private String http_user_agent; //记录客户浏览器的相关信息  
  24.     private boolean valid = true ; //判断数据是否合法  
  25.   
  26.     private static KPI parser(String line){  
  27.         System.out.println(line);  
  28.         KPI kpi = new KPI();  
  29.         String[] arr = line.split(" ");  
  30.         if (arr.length>11){  
  31.             kpi.setRemote_addr(arr[0]);  
  32.             kpi.setRemote_user(arr[1]);  
  33.             kpi.setTime_local(arr[3].substring(1));  
  34.             kpi.setRequest(arr[6]);  
  35.             kpi.setStatus(arr[8]);  
  36.             kpi.setBody_bytes_sent(arr[9]);  
  37.             kpi.setHttp_referer(arr[10]);  
  38.   
  39.             if(arr.length>12){  
  40.                 kpi.setHttp_user_agent(arr[11] + " " + arr[12]);  
  41.             } else {  
  42.                 kpi.setHttp_user_agent(arr[11]);  
  43.             }  
  44.   
  45.             if(Integer.parseInt(kpi.getStatus()) >= 400){  
  46.                 //大于400,http錯誤  
  47.                 kpi.setValid(false);  
  48.             }  
  49.         }else{  
  50.             kpi.setValid(false);  
  51.         }  
  52.         return kpi;  
  53.     }  
  54.     /** 
  55.      * 按page的pv分类 
  56.      * pageview:页面访问量统计 
  57.      * @return 
  58.      */  
  59.     public static KPI filterPVs(String line){  
  60.         KPI kpi = parser(line);  
  61.         Set<String> pages = new HashSet<String>();  
  62.         pages.add("/about/");  
  63.         pages.add("/black-ip-clustor/");  
  64.         pages.add("/cassandra-clustor/");  
  65.         pages.add("/finance-rhive-repurchase/");  
  66.         pages.add("/hadoop-familiy-roadmap/");  
  67.         pages.add("/hadoop-hive-intro/");  
  68.         pages.add("/hadoop-zookeeper-intro/");  
  69.         pages.add("/hadoop-mahout-roadmap/");  
  70.   
  71.         if(!pages.contains(kpi.getRequest())){  
  72.             kpi.setValid(false);  
  73.         }  
  74.         return kpi;  
  75.     }  
  76.     /** 
  77.      * 按page的独立IP分类 
  78.      * @return 
  79.      */  
  80.     public static KPI filterIPs(String line){  
  81.         KPI kpi = parser(line);  
  82.         Set<String> pages = new HashSet<String>();  
  83.         pages.add("/about/");  
  84.         pages.add("/black-ip-clustor/");  
  85.         pages.add("/cassandra-clustor/");  
  86.         pages.add("/finance-rhive-repurchase/");  
  87.         pages.add("/hadoop-familiy-roadmap/");  
  88.         pages.add("/hadoop-hive-intro/");  
  89.         pages.add("/hadoop-zookeeper-intro/");  
  90.         pages.add("/hadoop-mahout-roadmap/");  
  91.         if (!pages.contains(kpi.getRequest())){  
  92.             kpi.setValid(false);  
  93.         }  
  94.         return kpi;  
  95.     }  
  96.     /** 
  97.      * PV按浏览器分类 
  98.      * @return 
  99.      */  
  100.     public static KPI filterBroswer(String line){  
  101.         return parser(line);  
  102.     }  
  103.   
  104.     /** 
  105.      * PV按小时分类 
  106.      * @return 
  107.      */  
  108.     public static KPI filterTime(String line){  
  109.         return parser(line);  
  110.     }  
  111.   
  112.     /** 
  113.      * Pv按访问域名分类 
  114.      * @return 
  115.      */  
  116.   
  117.     public static KPI filterDomain(String line){  
  118.         return parser(line);  
  119.     }  
  120.   
  121.   
  122.     public String getRemote_addr() {  
  123.         return remote_addr;  
  124.     }  
  125.   
  126.   
  127.     public void setRemote_addr(String remote_addr) {  
  128.         this.remote_addr = remote_addr;  
  129.     }  
  130.   
  131.   
  132.     public String getRemote_user() {  
  133.         return remote_user;  
  134.     }  
  135.   
  136.   
  137.     public void setRemote_user(String remote_user) {  
  138.         this.remote_user = remote_user;  
  139.     }  
  140.   
  141.   
  142.     public String getTime_local() {  
  143.         return time_local;  
  144.     }  
  145.   
  146.   
  147.     public Date getTime_local_Date() throws ParseException{  
  148.         SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.US);  
  149.         return df.parse(this.time_local);   
  150.     }  
  151.   
  152.   
  153.     public String getTime_local_Date_hour() throws ParseException{  
  154.         SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");  
  155.         return df.format(this.getTime_local_Date());  
  156.     }  
  157.   
  158.   
  159.     public void setTime_local(String time_local) {  
  160.         this.time_local = time_local;  
  161.     }  
  162.   
  163.   
  164.     public String getRequest() {  
  165.         return request;  
  166.     }  
  167.   
  168.   
  169.     public void setRequest(String request) {  
  170.         this.request = request;  
  171.     }  
  172.   
  173.   
  174.     public String getStatus() {  
  175.         return status;  
  176.     }  
  177.   
  178.   
  179.     public void setStatus(String status) {  
  180.         this.status = status;  
  181.     }  
  182.   
  183.   
  184.     public String getBody_bytes_sent() {  
  185.         return body_bytes_sent;  
  186.     }  
  187.   
  188.   
  189.     public void setBody_bytes_sent(String body_bytes_sent) {  
  190.         this.body_bytes_sent = body_bytes_sent;  
  191.     }  
  192.   
  193.   
  194.     public String getHttp_referer() {  
  195.         return http_referer;  
  196.     }  
  197.   
  198.   
  199.     public String getHttp_referer_domain(){  
  200.         if(http_referer.length()<8){  
  201.             return http_referer;  
  202.         }  
  203.   
  204.         String str = this.http_referer.replace("\\", "").replace("http://", "").replace("https://", "");  
  205.         return str.indexOf("/")>0?str.substring(0, str.indexOf("/")):str; 
  206.     } 
  207.  
  208.  
  209.     public void setHttp_referer(String http_referer) { 
  210.         this.http_referer = http_referer; 
  211.     } 
  212.  
  213.  
  214.     public String getHttp_user_agent() { 
  215.         return http_user_agent; 
  216.     } 
  217.  
  218.  
  219.     public void setHttp_user_agent(String http_user_agent) { 
  220.         this.http_user_agent = http_user_agent; 
  221.     } 
  222.  
  223.  
  224.     public boolean isValid() { 
  225.         return valid; 
  226.     } 
  227.  
  228.  
  229.     public void setValid(boolean valid) { 
  230.         this.valid = valid; 
  231.     } 
  232.  
  233.     @Override 
  234.     public String toString() { 
  235.         StringBuilder sb = new StringBuilder(); 
  236.         sb.append("valid:" + this.valid); 
  237.         sb.append("\nremote_addr:" + this.remote_addr); 
  238.         sb.append("\nremote_user:" + this.remote_user); 
  239.         sb.append("\ntime_local:" + this.time_local); 
  240.         sb.append("\nrequest:" + this.request); 
  241.         sb.append("\nstatus:" + this.status); 
  242.         sb.append("\nbody_bytes_sent:" + this.body_bytes_sent); 
  243.         sb.append("\nhttp_referer:" + this.http_referer); 
  244.         sb.append("\nhttp_user_agent:" + this.http_user_agent); 
  245.         return super.toString(); 
  246.     } 
  247.  
  248.  
  249.     public static void main(String[] args) { 
  250.         String line = "222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] \"GET /images/my.jpg HTTP/1.1\" 200 19939 \"http://www.angularjs.cn/A00n\" \"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36\"";  
  251.         System.out.println(line);  
  252.         KPI kpi = new KPI();  
  253.         String[] arr = line.split(" ");  
  254.   
  255.         kpi.setRemote_addr(arr[0]);  
  256.         kpi.setRemote_user(arr[1]);  
  257.         kpi.setTime_local(arr[3].substring(1));  
  258.         kpi.setRequest(arr[6]);  
  259.         kpi.setStatus(arr[8]);  
  260.         kpi.setBody_bytes_sent(arr[9]);  
  261.         kpi.setHttp_referer(arr[10]);  
  262.         kpi.setHttp_user_agent(arr[11] + " " + arr[12]);  
  263.         System.out.println(kpi);  
  264.   
  265.         try {  
  266.             SimpleDateFormat df = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss",Locale.US);  
  267.             System.out.println(df.format(kpi.getTime_local_Date()));  
  268.             System.out.println(kpi.getTime_local_Date_hour());  
  269.             System.out.println(kpi.getHttp_referer_domain());  
  270.         } catch (ParseException e) {  
  271.             e.printStackTrace();  
  272.         }  
  273.     }  
  274. }  

从日志文件中,取一行通过main函数写一个简单的解析测试。

控制台输出:

控制台输出.png

我们看到日志行,被正确的解析成了kpi对象的属性。我们把解析过程,单独封装成一个方法。

复制内容到剪贴板
  1. private static KPI parser(String line) {  
  2.       System.out.println(line);  
  3.       KPI kpi = new KPI();  
  4.       String[] arr = line.split(" ");  
  5.       if (arr.length > 11) {  
  6.           kpi.setRemote_addr(arr[0]);  
  7.           kpi.setRemote_user(arr[1]);  
  8.           kpi.setTime_local(arr[3].substring(1));  
  9.           kpi.setRequest(arr[6]);  
  10.           kpi.setStatus(arr[8]);  
  11.           kpi.setBody_bytes_sent(arr[9]);  
  12.           kpi.setHttp_referer(arr[10]);  
  13.   
  14.           if (arr.length > 12) {  
  15.               kpi.setHttp_user_agent(arr[11] + " " + arr[12]);  
  16.           } else {  
  17.               kpi.setHttp_user_agent(arr[11]);  
  18.           }  
  19.   
  20.           if (Integer.parseInt(kpi.getStatus()) >= 400) {// 大于400,HTTP错误  
  21.               kpi.setValid(false);  
  22.           }  
  23.       } else {  
  24.           kpi.setValid(false);  
  25.       }  
  26.       return kpi;  
  27.   }  

对map方法,reduce方法,启动方法,我们单独写一个类来实现

下面将分别介绍MapReduce的实现类:

  • PV:org.apache.hadoop.mr.kpi.KPIPV.java
  • IP: org.apache.hadoop.mr.kpi.KPIIP.java
  • Time: org.apache.hadoop.mr.kpi.KPITime.java
  • Browser: org.apache.hadoop.mr.kpi.KPIBrowser.java

1). PV:org.apache.hadoop.mr.kpi.KPIPV.Java

复制内容到剪贴板
  1. package org.apache.hadoop.mr.kpi;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.Iterator;  
  5.   
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.io.IntWritable;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapred.FileInputFormat;  
  10. import org.apache.hadoop.mapred.FileOutputFormat;  
  11. import org.apache.hadoop.mapred.JobClient;  
  12. import org.apache.hadoop.mapred.JobConf;  
  13. import org.apache.hadoop.mapred.MapReduceBase;  
  14. import org.apache.hadoop.mapred.Mapper;  
  15. import org.apache.hadoop.mapred.OutputCollector;  
  16. import org.apache.hadoop.mapred.Reducer;  
  17. import org.apache.hadoop.mapred.Reporter;  
  18. import org.apache.hadoop.mapred.TextInputFormat;  
  19. import org.apache.hadoop.mapred.TextOutputFormat;  
  20.   
  21. public class KPIPV {  
  22.   
  23.     /** 
  24.      * @author yue 
  25.      * 20160512 
  26.      */  
  27.     public static class KPIPVMapper extends MapReduceBase implements Mapper<Object ,Text ,Text,IntWritable>{  
  28.   
  29.         private IntWritable one = new IntWritable(1);  
  30.         private Text word = new Text();  
  31.         public void map(Object key, Text value,  
  32.                 OutputCollector<Text, IntWritable> output, Reporter reporter)  
  33.                 throws IOException {  
  34.             KPI kpi = KPI.filterPVs(value.toString());  
  35.             if(kpi.isValid()){  
  36.                 word.set(kpi.getRequest());  
  37.                 output.collect(word, one);  
  38.             }  
  39.         }  
  40.     }  
  41.     public static class KPIPVReducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable>{  
  42.   
  43.         private IntWritable result = new IntWritable();  
  44.   
  45.         public void reduce(Text key, Iterator<IntWritable> values,  
  46.                 OutputCollector<Text, IntWritable> output, Reporter reporter)  
  47.                 throws IOException {  
  48.             int sum = 0;  
  49.             while(values.hasNext()){  
  50.                 sum += values.next().get();  
  51.             }  
  52.             result.set(sum);  
  53.             output.collect(key, result);  
  54.         }  
  55.     }  
  56.   
  57.     public static void main(String[] args) throws Exception{  
  58.         String input =  "hdfs://192.168.37.134:9000/user/hdfs/log_kpi";  
  59.         String output = "hdfs://192.168.37.134:9000/user/hdfs/log_kpi/pv";  
  60.   
  61.         JobConf conf = new JobConf(KPIPV.class);  
  62.         conf.setJobName("KPIPV");  
  63.   
  64.         conf.setMapOutputKeyClass(Text.class);  
  65.         conf.setMapOutputValueClass(IntWritable.class);  
  66.   
  67.         conf.setOutputKeyClass(Text.class);  
  68.         conf.setOutputValueClass(IntWritable.class);  
  69.   
  70.         conf.setMapperClass(KPIPVMapper.class);  
  71.         conf.setCombinerClass(KPIPVReducer.class);  
  72.         conf.setReducerClass(KPIPVReducer.class);  
  73.   
  74.         conf.setInputFormat(TextInputFormat.class);  
  75.         conf.setOutputFormat(TextOutputFormat.class);  
  76.   
  77.         FileInputFormat.setInputPaths(conf, new Path(input));  
  78.         FileOutputFormat.setOutputPath(conf, new Path(output));  
  79.   
  80.         JobClient.runJob(conf);  
  81.         System.exit(0);  
  82.     }  
  83. }  

 

在程序中会调用KPI类的方法

KPI kpi = KPI.filterPVs(value.toString());

我们运行一下KPIPV.java
用hadoop命令查看HDFS文件

复制内容到剪贴板
  1. ~ hadoop fs -cat /user/hdfs/log_kpi/pv/part-00000  
  2.   
  3. /about  5  
  4. /black-ip-list/ 2  
  5. /cassandra-clustor/     3  
  6. /finance-rhive-repurchase/      13  
  7. /hadoop-family-roadmap/ 13  
  8. /hadoop-hive-intro/     14  
  9. /hadoop-mahout-roadmap/ 20  
  10. /hadoop-zookeeper-intro/        6  

2). IP: org.apache.hadoop.mr.kpi.KPIIP.java

复制内容到剪贴板
  1. package org.apache.hadoop.mr.kpi;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.HashSet;  
  5. import java.util.Iterator;  
  6. import java.util.Set;  
  7.   
  8. import org.apache.hadoop.fs.Path;  
  9. import org.apache.hadoop.io.Text;  
  10. import org.apache.hadoop.mapred.FileInputFormat;  
  11. import org.apache.hadoop.mapred.FileOutputFormat;  
  12. import org.apache.hadoop.mapred.JobClient;  
  13. import org.apache.hadoop.mapred.JobConf;  
  14. import org.apache.hadoop.mapred.MapReduceBase;  
  15. import org.apache.hadoop.mapred.Mapper;  
  16. import org.apache.hadoop.mapred.OutputCollector;  
  17. import org.apache.hadoop.mapred.Reducer;  
  18. import org.apache.hadoop.mapred.Reporter;  
  19. import org.apache.hadoop.mapred.TextInputFormat;  
  20. import org.apache.hadoop.mapred.TextOutputFormat;  
  21. import org.apache.hadoop.mr.kpi.KPIIP.KPIIPMapper.KPIIPReducer;  
  22.   
  23. public class KPIIP {  
  24.   
  25.     /** 
  26.      * @author yue 
  27.      * 20160512 
  28.      */  
  29.     public static class KPIIPMapper extends MapReduceBase implements Mapper<Object,Text,Text,Text>{  
  30.   
  31.         private Text word = new Text();  
  32.         private Text ips = new Text();  
  33.   
  34.         public void map(Object key, Text value,  
  35.                 OutputCollector<Text, Text> output, Reporter reporter)  
  36.                 throws IOException {  
  37.             KPI kpi = KPI.filterIPs(value.toString());  
  38.             if(kpi.isValid()){  
  39.                 word.set(kpi.getRequest());  
  40.                 ips.set(kpi.getRemote_addr());  
  41.                 output.collect(word, ips);  
  42.             }  
  43.         }  
  44.     public static class KPIIPReducer extends MapReduceBase implements Reducer<Text,Text,Text,Text>{  
  45.   
  46.         private Text result = new Text();  
  47.         private Set<String>count = new HashSet<String>();  
  48.   
  49.         public void reduce(Text key, Iterator<Text> values,  
  50.                 OutputCollector<Text, Text> output, Reporter reporter)  
  51.                 throws IOException {  
  52.             while(values.hasNext()){  
  53.                 count.add(values.next().toString());  
  54.             }  
  55.             result.set(String.valueOf(count.size()));  
  56.             output.collect(key, result);  
  57.         }  
  58.   
  59.     }  
  60.     }  
  61.     public static void main(String[] args) throws Exception{  
  62.         String input =  "hdfs://192.168.37.134:9000/user/hdfs/log_kpi";  
  63.         String output = "hdfs://192.168.37.134:9000/user/hdfs/log_kpi/ip";  
  64.   
  65.         JobConf conf = new JobConf(KPIIP.class);  
  66.         conf.setJobName("KPIIP");  
  67.   
  68.         conf.setMapOutputKeyClass(Text.class);  
  69.         conf.setMapOutputValueClass(Text.class);  
  70.   
  71.         conf.setOutputKeyClass(Text.class);  
  72.         conf.setOutputValueClass(Text.class);  
  73.   
  74.         conf.setMapperClass(KPIIPMapper.class);  
  75.         conf.setCombinerClass(KPIIPReducer.class);  
  76.         conf.setReducerClass(KPIIPReducer.class);  
  77.   
  78.         conf.setInputFormat(TextInputFormat.class);  
  79.         conf.setOutputFormat(TextOutputFormat.class);  
  80.   
  81.         FileInputFormat.setInputPaths(conf, new Path(input));  
  82.         FileOutputFormat.setOutputPath(conf, new Path(output));  
  83.   
  84.         JobClient.runJob(conf);  
  85.         System.exit(0);  
  86.   
  87.     }  
  88.   
  89. }  

3). Time: org.apache.hadoop.mr.kpi.KPITime.java

复制内容到剪贴板
  1. package org.apache.hadoop.mr.kpi;  
  2.   
  3. import java.io.IOException;  
  4. import java.text.ParseException;  
  5. import java.util.Iterator;  
  6.   
  7. import org.apache.hadoop.fs.Path;  
  8. import org.apache.hadoop.io.IntWritable;  
  9. import org.apache.hadoop.io.Text;  
  10. import org.apache.hadoop.mapred.FileInputFormat;  
  11. import org.apache.hadoop.mapred.FileOutputFormat;  
  12. import org.apache.hadoop.mapred.JobClient;  
  13. import org.apache.hadoop.mapred.JobConf;  
  14. import org.apache.hadoop.mapred.MapReduceBase;  
  15. import org.apache.hadoop.mapred.Mapper;  
  16. import org.apache.hadoop.mapred.OutputCollector;  
  17. import org.apache.hadoop.mapred.Reducer;  
  18. import org.apache.hadoop.mapred.Reporter;  
  19. import org.apache.hadoop.mapred.TextInputFormat;  
  20. import org.apache.hadoop.mapred.TextOutputFormat;  
  21.   
  22. public class KPITime {  
  23.   
  24.     /** 
  25.      * @author yue 20160512 
  26.      */  
  27.     public static class KPITimeMapper extends MapReduceBase implements  
  28.             Mapper<Object, Text, Text, IntWritable> {  
  29.   
  30.         private IntWritable one = new IntWritable(1);  
  31.         private Text word = new Text();  
  32.   
  33.         public void map(Object key, Text value,  
  34.                 OutputCollector<Text, IntWritable> output, Reporter reporter)  
  35.                 throws IOException {  
  36.             KPI kpi = KPI.filterTime(value.toString());  
  37.             if(kpi.isValid()){  
  38.                 try {  
  39.                     word.set(kpi.getTime_local_Date_hour());  
  40.                     output.collect(word, one);  
  41.                 } catch (ParseException e) {  
  42.                     e.printStackTrace();  
  43.                 }  
  44.             }  
  45.         }  
  46.     }  
  47.   
  48.     public static class KPITimeReducer extends MapReduceBase implements   
  49.             Reducer<Text,IntWritable,Text,IntWritable>{  
  50.   
  51.         private IntWritable result = new IntWritable();  
  52.   
  53.         public void reduce(Text key, Iterator<IntWritable> values,  
  54.                 OutputCollector<Text, IntWritable> output, Reporter reporter)  
  55.                 throws IOException {  
  56.             int sum = 0;  
  57.             while(values.hasNext()){  
  58.                 sum+=values.next().get();  
  59.             }  
  60.             result.set(sum);  
  61.             output.collect(key, result);  
  62.         }  
  63.     }  
  64.   
  65.     public static void main(String[] args) throws Exception{  
  66.          String input =  "hdfs://192.168.37.134:9000/user/hdfs/log_kpi";  
  67.             String output = "hdfs://192.168.37.134:9000/user/hdfs/log_kpi/time";  
  68.   
  69.             JobConf conf = new JobConf(KPITime.class);  
  70.             conf.setJobName("KPITime");  
  71.   
  72.             conf.setOutputKeyClass(Text.class);  
  73.             conf.setOutputValueClass(IntWritable.class);  
  74.   
  75.             conf.setMapperClass(KPITimeMapper.class);  
  76.             conf.setCombinerClass(KPITimeReducer.class);  
  77.             conf.setReducerClass(KPITimeReducer.class);  
  78.   
  79.             conf.setInputFormat(TextInputFormat.class);  
  80.             conf.setOutputFormat(TextOutputFormat.class);  
  81.   
  82.             FileInputFormat.setInputPaths(conf, new Path(input));  
  83.             FileOutputFormat.setOutputPath(conf, new Path(output));  
  84.   
  85.             JobClient.runJob(conf);  
  86.             System.exit(0);  
  87.     }  
  88.   
  89. }  

4). Browser: org.apache.hadoop.mr.kpi.KPIBrowser.java

复制内容到剪贴板
  1. package org.apache.hadoop.mr.kpi;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.Iterator;  
  5.   
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.io.IntWritable;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapred.FileInputFormat;  
  10. import org.apache.hadoop.mapred.FileOutputFormat;  
  11. import org.apache.hadoop.mapred.JobClient;  
  12. import org.apache.hadoop.mapred.JobConf;  
  13. import org.apache.hadoop.mapred.MapReduceBase;  
  14. import org.apache.hadoop.mapred.Mapper;  
  15. import org.apache.hadoop.mapred.OutputCollector;  
  16. import org.apache.hadoop.mapred.Reducer;  
  17. import org.apache.hadoop.mapred.Reporter;  
  18. import org.apache.hadoop.mapred.TextInputFormat;  
  19. import org.apache.hadoop.mapred.TextOutputFormat;  
  20.   
  21.   
  22. public class KPIBrowser {  
  23.   
  24.     /** 
  25.      * 20160512 
  26.      * @author yue 
  27.      */  
  28.         public static class KPIBrowserMapper extends MapReduceBase implements Mapper<Object,Text,Text,IntWritable>{  
  29.             private IntWritable one = new IntWritable(1);  
  30.             private Text word = new Text();  
  31.   
  32.             public void map(Object key,Text value,OutputCollector<Text,IntWritable> output , Reporter reporter) throws IOException{  
  33.                 KPI kpi = KPI.filterBroswer(value.toString());  
  34.                 if(kpi.isValid()){  
  35.                     word.set(kpi.getHttp_user_agent());  
  36.                     output.collect(word, one);  
  37.                 }  
  38.             }  
  39.         }  
  40.     public static class KPIBrowserReducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable>{  
  41.         private IntWritable result = new IntWritable();  
  42.   
  43.         public void reduce(Text key, Iterator<IntWritable> values,OutputCollector<Text, IntWritable> output, Reporter reporter)throws IOException {  
  44.             int sum = 0;  
  45.             while(values.hasNext()){  
  46.                 sum+= values.next().get();  
  47.             }  
  48.             result.set(sum);  
  49.             output.collect(key, result);  
  50.         }  
  51.   
  52.     }  
  53.     public static void main(String[] args) throws Exception{  
  54.         String input =  "hdfs://192.168.37.134:9000/user/hdfs/log_kpi";  
  55.         String output = "hdfs://192.168.37.134:9000/user/hdfs/log_kpi/browser";  
  56.   
  57.         JobConf conf = new JobConf(KPIBrowser.class);  
  58.         conf.setJobName("KPIBrowser");  
  59.   
  60.         conf.setOutputKeyClass(Text.class);  
  61.         conf.setOutputValueClass(IntWritable.class);  
  62.   
  63.         conf.setMapperClass(KPIBrowserMapper.class);  
  64.         conf.setCombinerClass(KPIBrowserReducer.class);  
  65.         conf.setReducerClass(KPIBrowserReducer.class);  
  66.   
  67.         conf.setInputFormat(TextInputFormat.class);  
  68.         conf.setOutputFormat(TextOutputFormat.class);  
  69.   
  70.         FileInputFormat.setInputPaths(conf, new Path(input));  
  71.         FileOutputFormat.setOutputPath(conf, new Path(output));  
  72.   
  73.         JobClient.runJob(conf);  
  74.         System.exit(0);  
  75.     }  
  76.   
  77. }  

 

感谢 lzxyzq 支持 磐实编程网 原文地址:
blog.csdn.net/lzxyzq/article/details/51388258

文章信息

发布时间:2016-05-14

作者:lzxyzq

发布者:aquwcw

浏览次数: