spark1.2.1实现读取hbase的数据后怎么实现实时查询

2024-12-25 17:21:24
推荐回答(1个)
回答1:

WordCountHbaseReaderMapper类继承了TableMapper抽象类,TableMapper类专门用于完成MapReduce中Map过程与Hbase表之间的操作。此时的map(ImmutableBytesWritablekey,Resultvalue,Contextcontext)方法,第一个参数key为Hbase表的rowkey主键,第二个参数value为key主键对应的记录集合,此处的map核心实现是遍历key主键对应的记录集合value,将其组合成一条记录通过contentx.write(key,value)填充到键值对中。详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.javapublicstaticclassWordCountHbaseReaderMapperextendsTableMapper{@Overrideprotectedvoidmap(ImmutableBytesWritablekey,Resultvalue,Contextcontext)throwsIOException,InterruptedException{StringBuffersb=newStringBuffer("");for(Entryentry:value.getFamilyMap("content".getBytes()).entrySet()){Stringstr=newString(entry.getValue());//将字节数组转换为String类型if(str!=null){sb.append(newString(entry.getKey()));sb.append(":");sb.append(str);}context.write(newText(key.get()),newText(newString(sb)));}}}3、Reducer函数实现此处的WordCountHbaseReaderReduce实现了直接输出Map输出的键值对,没有对其做任何处理。详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.javapublicstaticclassWordCountHbaseReaderReduceextendsReducer{privateTextresult=newText();@Overrideprotectedvoidreduce(Textkey,Iterablevalues,Contextcontext)throwsIOException,InterruptedException{for(Textval:values){result.set(val);context.write(key,result);}}}4、驱动函数实现与WordCount的驱动类不同,在Job配置的时候没有配置job.setMapperClass(),而是用以下方法执行Mapper类:TableMapReduceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class,Text.class,Text.class,job);该方法指明了在执行job的Map过程时,数据输入源是hbase的tablename表,通过扫描读入对象scan对表进行全表扫描,为Map过程提供数据源输入,通过WordCountHbaseReaderMapper.class执行Map过程,Map过程的输出key/value类型是Text.class与Text.class,最后一个参数是作业对象。特别注意:这里声明的是一个最简单的扫描读入对象scan,进行表扫描读取数据,其中scan可以配置参数,这里为了例子简单不再详述,用户可自行尝试。详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.javapublicstaticvoidmain(String[]args)throwsException{Stringtablename="wordcount";Configurationconf=HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum","Master");String[]otherArgs=newGenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length!=1){System.err.println("Usage:WordCountHbaseReader");System.exit(2);}Jobjob=newJob(conf,"WordCountHbaseReader");job.setJarByClass(WordCountHbaseReader.class);//设置任务数据的输出路径;FileOutputFormat.setOutputPath(job,newPath(otherArgs[0]));job.setReducerClass(WordCountHbaseReaderReduce.class);Scanscan=newScan();TableMapReduceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class,Text.class,Text.class,job);//调用job.waitForCompletion(true)执行任务,执行成功后退出;System.exit(job.waitForCompletion(true)?0:1);}5、部署运行1)启动Hadoop集群和Hbase服务[hadoop@K-Master~]$start-dfs.sh#启动hadoopHDFS文件管理系统[hadoop@K-Master~]$start-mapred.sh#启动hadoopMapReduce分布式计算服务[hadoop@K-Master~]$start-hbase.sh#启动Hbase[hadoop@K-Master~]$jps#查看进程22003HMaster10611SecondaryNameNode22226Jps21938HQuorumPeer10709JobTracker22154HRegionServer20277Main10432NameNode