关于Hadoop平台,网上有很多的资料,但是比较零碎,为了方便大家对这个平台有着充分的了解,笔者在此系统的介绍一下这个平台。
1、什么是Hadoop?
(1)Hadoop是一个开源的框架,可编写和运行分布式应用处理大规模数据,是专为离线和大规模数据分析而设计的,并不适合那种对几个记录随机读写的在线事务处理模式。Hadoop=HDFS(文件系统,数据存储技术相关)+ Mapreduce(数据处理),Hadoop的数据来源可以是任何形式,在处理半结构化和非结构化数据上与关系型数据库相比有更好的性能,具有更灵活的处理能力,不管任何数据形式最终会转化为key/value,key/value是基本数据单元。用函数式变成Mapreduce代替SQL,SQL是查询语句,而Mapreduce则是使用脚本和代码,而对于适用于关系型数据库,习惯SQL的Hadoop有开源工具hive代替。
(2)Hadoop就是一个分布式计算的解决方案.
hadoop能做什么?
hadoop擅长日志分析,facebook就用Hive来进行日志分析,2009年时facebook就有非编程人员的30%的人使用HiveQL进行数据分析;淘宝搜索中的自定义筛选也使用的Hive;利用Pig还可以做高级的数据处理,包括Twitter、LinkedIn上用于发现您可能认识的人,可以实现类似Amazon.com的协同过滤的推荐效果。淘宝的商品推荐也是!在Yahoo!的40%的Hadoop作业是用pig运行的,包括垃圾邮件的识别和过滤,还有用户特征建模。(2012年8月25新更新,天猫的推荐系统是hive,少量尝试mahout!)
下面举例说明:
设想一下这样的应用场景. 我有一个100M 的数据库备份的sql 文件.我现在想在不导入到数据库的情况下直接用grep操作通过正则过滤出我想要的内容。例如:某个表中 含有相同关键字的记录那么有几种方式,一种是直接用linux的命令 grep 还有一种就是通过编程来读取文件,然后对每行数据进行正则匹配得到结果好了 现在是100M 的数据库备份.上述两种方法都可以轻松应对.
那么如果是1G , 1T 甚至 1PB 的数据呢 ,上面2种方法还能行得通吗? 答案是不能.毕竟单台服务器的性能总有其上限.那么对于这种 超大数据文件怎么得到我们想要的结果呢?
有种方法 就是分布式计算, 分布式计算的核心就在于 利用分布式算法把运行在单台机器上的程序扩展到多台机器上并行运行.从而使数据处理能力成倍增加.但是这种分布式计算一般对编程人员要求很高,而且对服务器也有要求.导致了成本变得非常高.
Haddop 就是为了解决这个问题诞生的.Haddop 可以很轻易的把 很多linux的廉价pc 组成 分布式结点,然后编程人员也不需要知道分布式算法之类,只需要根据mapreduce的规则定义好接口方法,剩下的就交给Haddop. 它会自动把相关的计算分布到各个结点上去,然后得出结果.
例如上述的例子 : Hadoop 要做的事 首先把 1PB的数据文件导入到 HDFS中, 然后编程人员定义好 map和reduce, 也就是把文件的行定义为key,每行的内容定义为value , 然后进行正则匹配,匹配成功则把结果 通过reduce聚合起来返回.Hadoop 就会把这个程序分布到N 个结点去并行的操作.
那么原本可能需要计算好几天,在有了足够多的结点之后就可以把时间缩小到几小时之内.
这也就是所谓的 大数据云计算了.如果还是不懂的话再举个简单的例子
比如1亿个1 相加 得出计算结果, 我们很轻易知道结果是 1亿.但是计算机不知道.那么单台计算机处理的方式做一个一亿次的循环每次结果+1
那么分布式的处理方式则变成 我用 1万台 计算机,每个计算机只需要计算 1万个 1 相加 然后再有一台计算机把 1万台计算机得到的结果再相加
从而得到最后的结果.
理论上讲, 计算速度就提高了 1万倍. 当然上面可能是一个不恰当的例子.但所谓分布式,大数据,云计算 大抵也就是这么回事了。
2、基本工作原理
Hadoop核心
Hadoop的核心就是HDFS和MapReduce,而两者只是理论基础,不是具体可使用的高级应用,Hadoop旗下有很多经典子项目,比如HBase、Hive等,这些都是基于HDFS和MapReduce发展出来的。要想了解Hadoop,就必须知道HDFS和MapReduce是什么。
HDFS
HDFS(Hadoop Distributed File System,Hadoop分布式文件系统),它是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,适合那些有着超大数据集(large data set)的应用程序。
HDFS的设计特点是:
1、大数据文件,非常适合上T级别的大文件或者一堆大数据文件的存储,如果文件只有几个G甚至更小就没啥意思了。
2、文件分块存储,HDFS会将一个完整的大文件平均分块存储到不同计算器上,它的意义在于读取文件时可以同时从多个主机取不同区块的文件,多主机读取比单主机读取效率要高得多得都。
3、流式数据访问,一次写入多次读写,这种模式跟传统文件不同,它不支持动态改变文件内容,而是要求让文件一次写入就不做变化,要变化也只能在文件末添加内容。
4、廉价硬件,HDFS可以应用在普通PC机上,这种机制能够让给一些公司用几十台廉价的计算机就可以撑起一个大数据集群。
5、硬件故障,HDFS认为所有计算机都可能会出问题,为了防止某个主机失效读取不到该主机的块文件,它将同一个文件块副本分配到其它某几个主机上,如果其中一台主机失效,可以迅速找另一块副本取文件。
HDFS的关键元素:
Block:将一个文件进行分块,通常是64M。
NameNode:保存整个文件系统的目录信息、文件信息及分块信息,这是由唯一一台主机专门保存,当然这台主机如果出错,NameNode就失效了。在Hadoop2.*开始支持activity-standy模式----如果主NameNode失效,启动备用主机运行NameNode。
DataNode:分布在廉价的计算机上,用于存储Block块文件。
MapReduce
通俗说MapReduce是一套从海量·源数据提取分析元素最后返回结果集的编程模型,将文件分布式存储到硬盘是第一步,而从海量数据中提取分析我们需要的内容就是MapReduce做的事了。
下面以一个计算海量数据最大值为例:一个银行有上亿储户,银行希望找到存储金额最高的金额是多少,按照传统的计算方式,我们会这样:
JAVA代码
Long moneys[] ...
Long max = 0L;
for(int i=0;i<moneys.length;i++){
if(moneys[i]>max){
max = moneys[i];
}
}
如果计算的数组长度少的话,这样实现是不会有问题的,还是面对海量数据的时候就会有问题。
MapReduce会这样做:首先数字是分布存储在不同块中的,以某几个块为一个Map,计算出Map中最大的值,然后将每个Map中的最大值做Reduce操作,Reduce再取最大值给用户。
MapReduce的基本原理就是:将大的数据分析分成小块逐个分析,最后再将提取出来的数据汇总分析,最终获得我们想要的内容。当然怎么分块分析,怎么做Reduce操作非常复杂,Hadoop已经提供了数据分析的实现,我们只需要编写简单的需求命令即可达成我们想要的数据。
总结
总的来说Hadoop适合应用于大数据存储和大数据分析的应用,适合于服务器几千台到几万台的集群运行,支持PB级的存储容量。
Hadoop典型应用有:搜索、日志处理、推荐系统、数据分析、视频图像分析、数据保存等。
但要知道,Hadoop的使用范围远小于SQL或Python之类的脚本语言,所以不要盲目使用Hadoop,看完这篇试读文章,我知道Hadoop不适用于我们的项目。不过Hadoop作为大数据的热门词,我觉得一个狂热的编程爱好者值得去学习了解,或许你下一个归宿就需要Hadoop人才,不是吗。
3、下载&安装
Hadoop基于Java的,因此,必须先装JDK,如何安装,自行查找教程。
1)下载
http://hadoop.Apache.org/ hadoop官网
点击download找到最新版软件进行下载:https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.1.2/hadoop-3.1.2.tar.gz
上面的连接可能版本不全,最全的版本在这https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/,清华大学的下载源。
选择镜像站点进行下载,大概300多M这样。
解压到某一路径,不含中文以及特殊符号:如 D:1_Program_File,解压过程可能报某些错:
无视之。
三、配置环境变量
添加HADOOP_HOME配置:自己安装hadoop路径,我的是D:hadoop-3.0.3
在Path中添加如下:自己安装hadoop路径/bin,如:D:/hadoop-3.0.3/bin
四、hadoop需要jdk支持,jdk路径不能有空格,如有空格,可以这样,如:”D:Program Files"Javajdk1.8.0_25
五、hadoop路径下创建data用于数据存储,再在data下创建datanode目录和namenode目录
六、hadoop配置
四个hadoop路径/etc/hadoop/core-site.xml,etc/hadoop/mapred-site.xml,etc/hadoop/hdfs-site.xml,etc/hadoop/yarn-site.xml
1.core-site.xml
<configuration> <property> <name>fs.default.name</name> <value>hdfs://localhost:9000</value> </property> </configuration> myeclipse上配置hadoop时,localhost需写成自己的IP
2.mapred-site.xml
<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
3.hdfs-site.xml(先创建路径data/snn、data/namenode、data/datanode)
<configuration> <!-- 这个参数设置为1,因为是单机版hadoop --> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>dfs.permissions</name> <value>false</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>/D:/1_Program_File/hadoop-3.0.2/data/namenode</value> </property> <property> <name>fs.checkpoint.dir</name> <value>/D:/1_Program_File/hadoop-3.0.2/data/snn</value> </property> <property> <name>fs.checkpoint.edits.dir</name> <value>/D:/1_Program_File/hadoop-3.0.2/data/snn</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>/D:/1_Program_File/hadoop-3.0.2/data/datanode</value> </property> </configuration>
4.yarn-site.xml
<configuration> <!-- Site specific YARN configuration properties --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.nodemanager.auxservices.mapreduce.shuffle.class</name> <value>org.apache.hadoop.mapred.ShuffleHandler</value> </property> </configuration>
七、修改D:/hadoop-3.0.3/etc/hadoop/hadoop-env.cmd配置,找到"set JAVA_HOME=%JAVA_HOME%"替换为"set JAVA_HOME="D:Program Files"Javajdk1.8.0_25"
八、winutils中对应的hadoop版本中的bin替换自己hadoop安装目录下的bin
找到对应的版本下的bin替换hadoop中的bin,配置完成!
九、启动服务
1.cmd中,D:hadoop-3.0.3bin> hdfs namenode -format
执行后,data下的namenode和datanode下会有current等文件,我当时安装的是hadoop3.1.1,用的winutils中的hadoop3.0.0,datanode总是没有启动没有数据,换成hadoop3.0.3,使用wintuils的hadoop3.0.0后,就可以了。
2.D:hadoop-3.0.3sbin启动start-all.cmd服务,会看到
Hadoop Namenode
Hadoop datanode
YARN Resourc Manager
如果报错:
2019-08-03 11:54:23,239 ERROR namenode.NameNode: Failed to start namenode. java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$windows.a ccess0(Ljava/lang/String;I)Z at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method) at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:6 06) at org.apache.hadoop.fs.FileUtil.canWrite(FileUtil.java:971) at org.apache.hadoop.hdfs.server.common.Storage$StorageDirectory.analyze Storage(Storage.java:614) at org.apache.hadoop.hdfs.server.common.Storage$StorageDirectory.analyze Storage(Storage.java:574)
1、系统环境变量配置HADOOP_HOME ,并且添加进path 变量里;
2、HADOOP_HOMEbin 里是否有hadoop.dll 和 winutils.exe 这两个文件
3、C: windowsSystem32 里是否有hadoop.dll 文件 ,记得重启电脑噢!!!
如果出现其他错误比如version %1等,请确认winutils的是32位还是64位版本,是否与你电脑匹配。
十、HDFS应用
1、通过http://127.0.0.1:8088/即可查看集群所有节点状态:
2、访问http://localhost:9870/即可查看文件管理页面:
a.进入文件系统
b.创建目录
c.上传成功
注:在之前的版本中文件管理的端口是50070,在3.0.0中替换为了9870端口
d.使用hadoop命令进行文件操作
mkdir命令创建目录:hadoop fs -mkdir hdfs://ip:9000/user
put命令上传文件:hadoop fs -put D:/a.txt hdfs://ip:9000/user/
ls命令查看指定目录文件列表:hadoop fs -ls hdfs://ip:9000/user/
4、第一个程序
值得注意的是,配置的时候,需要给Hadoop权限才能正确执行。最简单的办法就是讲hadoop以及其目录下所有文件都归在一个组中。
chown -R hadoop:hadoop hadoop文件夹
就可以了。
配置完成之后,我们我们还需要什么?
1.需要在HDFS中保存有文件。
2.需要一个程序jar包,我们前面说过,JobTracker接收jar包就会分解job为mapTask和reduceTask。mapTask会读取HDFS中的文件来执行。
我们来看目标。
我们输入两个文件,file1和file2。交给hadoop执行之后,会返回file1和file2文件中的单词的计数。
我们说过,hadoop返回的是<key,value>的键值对的形式。
所以结果如下:也就是把单词以及单词的个数返回
school 1
hello 3
world 2
...
所以我们首先创建两个文件:
file1和file2。
随便填点东西在里面,文件中的内容是用来计数。单词之间用空格分隔,当然这是不一定的,如何区分单词是在后面jar包中的map程序中分辨的。
我们写好了这两个文件之后,要将文件提交到HDFS中。如何提交呢?
提交之前,首先要确保hadoop已经运行起来了,查看jps可以看到hadoop的进程。
首先我们在hadoop的HDFS中创建一个文件夹。打开cmd,输入
hdfs dfs -mkdir /test
这样就可以在HDFS根目录下创建一个input_wordcount的文件夹。
其实Hadoop的HDFS命令行非常接近Shell,只需要使用hdfs dfs -后面写上shell命令就可以对应执操作HDFS文件系统了。例如:
hdfs dfs -ls /
查看根目录下的文件。
创建文件夹之后,我们就可以提交我们写的两个file文件。
hdfs dfs -put input/* /test
如果报错:
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /input/file1.txt._COPYING_ could only be written to 0 of the 1 minReplication nodes. There a 0 datanode(s) running and no node(s) are excluded in this operation. at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2099) at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:287) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2658) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:866) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:550) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
使用命令查看报告:hadoop dfsadmin -report
发现磁盘都是空的,这个问题一般是由于使用hadoop namenode -format 格式化多次,导致spaceID不一致造成的,这语句可不能随便执行的,解决方法如下[1]:
stop-all.cmd hdfs namenode -format start-all.cmd
发现还是无法解决,然后看到日志窗口有如下异常:
java.io.IOException: Incompatible clusterIDs in D:tmphadoop-Administratordfs data: namenode clusterID = CID-636ec898-037d-4196-b096-3f53e7d172fb; datanode cl usterID = CID-9031b022-9e2e-4e46-9b46-855159d45f53 at org.apache.hadoop.hdfs.server.datanode.DataStorage.doTransition(DataS torage.java:719) at org.apache.hadoop.hdfs.server.datanode.DataStorage.loadStorageDirecto ry(DataStorage.java:284) at org.apache.hadoop.hdfs.server.datanode.DataStorage.loadDataStorage(Da taStorage.java:397) at org.apache.hadoop.hdfs.server.datanode.DataStorage.addStorageLocation
于是删掉 D:tmphadoop-Administrator下面的文件,重新执行方法[1]。问题还是没解决,查看data目录下面,竟然没有datanode文件夹!最后发现etc下面的文件没有按照上述流程配置好,将没配置好的xml文件按照上述流程配置好之后,清空data目录以及D:tmphadoop-Administrator,然后重新执行方法[1],问题解决:
这里我两个file文件都放在test目录下,所以直接使用正则表达式都提交上去即可,提交到根目录文件夹下。然后我们查看根目录,查看是否提交完成。
D:1_Program_Filehadoop-3.0.2sbin>hdfs dfs -ls / The filename, directory name, or volume label syntax is incorrect. Found 3 items -rw-r--r-- 1 Administrator supergroup 23 2019-08-03 13:08 /file1.txt -rw-r--r-- 1 Administrator supergroup 28 2019-08-03 13:08 /file2.txt drwxr-xr-x - Administrator supergroup 0 2019-08-03 13:10 /test
提交成功了。第一个要求完成了,接下来我们就需要一个程序jar包。
打开IDE或者myeclipse工具。创建一个java程序,我在这里创建一个maven项目。
首先我们需要导入依赖包:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hadoop.demo</groupId> <artifactId>HadoopDemo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>war</packaging> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.6.0</version> </dependency> </dependencies> <!-- fastjson --> </project>
然后我们创建一个WordCount类。
在这个类里,首先我们要创建一个Map方法,需要继承MApper类:
public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { private final IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer token = new StringTokenizer(line); while (token.hasMoreTokens()) { word.set(token.nextToken()); context.write(word, one); } } }
Mapper<LongWritable, Text, Text, IntWritable>是什么意思呢?
前面两个类参数是输入,后面两个是输出。
也就是WordCOuntMap方法接收LongWritable,Text的参数,返回<Text, IntWriatable>键值对。
需要重写map方法,可以看到Context对象即为返回结果,内部其实是<Text, IntWriatable>键值对。
这里需要注意的是,value的值,value默认是一行数据,你文件中有多少行,map函数就会被调用多少次。
这我们就看懂了吧,首先拿到一行的数据,使用StringTokenizer根据空格分割字符串,得到token。遍历token并写入context中返回即可。
然后我们需要编写reduce方法:同样的,reduce方法继承reduce类。
public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } }
wordCountReduce方法接收<Text, IntWritable>键值对,将键值对组合起来,结果写入另外一个键值对中,返回即可。
其中最重要是重写reduce方法,同样的context也是返回的结果。
这里需要注意的是,reduce方法是什么时候调用的呢?是在所有mapTask都被执行完成之后,reduceTask启动了才调用。
所有reduce方法中接收到的是所有map返回的参数。所以我们简单的求和写入context中就可以了。
最后我们编写main方法作为入口,调用两个函数。
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(WordCount.class);
job.setJobName("wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WordCountMap.class);
job.setReducerClass(WordCountReduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
这里我们主要是告诉JobTracker,告诉他去调用什么就可以了。
类都编写好了之后`,我们需要的是jar包,所以我们将程序打包为jar包。
拿到jar包之后,我们需要将jar包作为作业提交给Hadoop执行。怎么做呢?
hadoop jar WordCount.jar WordCount input_wordcount output_wordcount
hadoop jar WordCount.jar WordCount这里提交jar包,并且告诉主类在哪。
后面两个都是我们自定义的参数了。会在main中获取到,即输入参数为input_wordcount。输出参数为output_wordcount
执行完成之后可以看到。
hdfs dfs -ls
Found 2 items
drwxr-xr-x - haoye supergroup 0 2017-05-06 20:34 input_wordcount
drwxr-xr-x - haoye supergroup 0 2017-05-06 20:40 output_wordcount
hdfs dfs -ls output_wordcount
Found 2 items
-rw-r--r-- 3 haoye supergroup 0 2017-05-06 20:40 output_wordcount/_SUCCESS
-rw-r--r-- 3 haoye supergroup 83 2017-05-06 20:40 output_wordcount/part-r-00000
其中part-r-00000为结果文件。
我们可以查看它的内容
hdfs dfs -cat output_wordcount/part-r-00000
api 1
file 3
free 2
hadoop 7
hello 3
home 1
java 2
new 2
school 1
system 1
world 2
得到结果了吧。
对于hadoop来说,执行任务需要操作HDFS,需要job对应的jar包。而jar包中需要编写mapTask和ReduceTask对应的方法。交给jobTracker执行就可以了。十分的方便。