流式大数据计算实践(5)—-HBase使用&SpringBoot集成

一、前言

1、上文中我们搭建好了一套HBase集群环境,这一文我们学习一下HBase的基本操作和客户端API的使用

二、shell操作

先通过命令进入HBase的命令行操作

/work/soft/hbase-1.2.2/bin/hbase shell

1、建表

create 'test', 'cf'

(1)以上命令是建立一个test表,里面有一个列族cf

(2)与RDS不同,HBase的列不是必须的,当向列族中插入一个单元格数据时,才有了列

2、查看所有表

list

3、查看表属性

describe 'test'

4、增加列族

alter 'test', 'cf2'

5、插入数据

put 'test', 'row1', 'cf:name', 'jack'

(1)命令解释:向test表中的row1行插入列族cf,列名name的数据jack

6、查询行数据

scan 'test', {STARTROW => 'row3'}
scan 'test', {ENDROW => 'row4'}

(1)命令解释:查找test表中rowkey大于等于row3的数据

(2)命令解释:查找test表中rowkey小于row4的数据(不包括row4)

7、查询单元格数据

get 'test', 'row7', 'cf:name'

8、删除数据

delete 'test', 'row4', 'cf:name'

(1)命令解释:删除test表中row4行的cf:name列的单元格数据

三、客户端API

正常开发中操作HBase多数情况下通过客户端API操作,我们这里使用Java来操作,jdk要求至少1.7以上,编译器我这里用的是IntelliJ IDEA

(1)新建一个maven工程

(2)打开pom文件,引入HBase的依赖

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.1.1</version>
</dependency>

(3)将HBase的相关配置文件引入到我们的maven项目中,拷贝HBase目录下的hbase-site.xml和Hadoop目录下的core-site.xml,将两个文件复制到src/main/resources目录下

(4)记得将前文中虚拟机的IP和hostname映射配置到写代码这台机器的hosts文件中(比如win7的hosts目录为C:\Windows\System32\drivers\etc)

(5)新建一个类,编写CRUD的示例代码,下面代码用了jdk1.7的一个语法糖:try-with-resources,在try()里面声明的对象,会自动帮你调用对象的close方法来关闭对象,不用手动调用close(),非常方便

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.net.URISyntaxException;

public class HelloHBase
{
    public static void main(String[] args) throws URISyntaxException
    {
        // 加载HBase的配置
        Configuration configuration = HBaseConfiguration.create();

        // 读取配置文件
        configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
        configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));

        try (// 创建一个HBase连接
             Connection connection = ConnectionFactory.createConnection(configuration);
             // 获得执行操作的管理接口
             Admin admin = connection.getAdmin();)
        {
            // 新建一个表名为mytable的表
            TableName tableName = TableName.valueOf("mytable");
            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);

            // 新建一个列族名为mycf的列族
            HColumnDescriptor mycf = new HColumnDescriptor("mycf");
            // 将列族添加到表中
            tableDescriptor.addFamily(mycf);
            // 执行建表操作
            createOrOverwrite(admin, tableDescriptor);

            // 设置列族的压缩方式为GZ
            mycf.setCompactionCompressionType(Compression.Algorithm.GZ);
            // 设置最大版本数量(ALL_VERSIONS实际上就是Integer.MAX_VALUE)
            mycf.setMaxVersions(HConstants.ALL_VERSIONS);
            // 列族更新到表中
            tableDescriptor.modifyFamily(mycf);
            // 执行更新操作
            admin.modifyTable(tableName, tableDescriptor);

            // 新增一个列族
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("newcf");
            hColumnDescriptor.setCompactionCompressionType(Compression.Algorithm.GZ);
            hColumnDescriptor.setMaxVersions(HConstants.ALL_VERSIONS);
            // 执行新增操作
            admin.addColumnFamily(tableName, hColumnDescriptor);

            // 获取表对象
            Table table = connection.getTable(tableName);

            // 创建一个put请求,用于添加数据或者更新数据
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name"), Bytes.toBytes("jack"));
            table.put(put);

            // 创建一个append请求,用于在数据后面添加内容
            Append append = new Append(Bytes.toBytes("row1"));
            append.add(Bytes.toBytes("mycf"), Bytes.toBytes("name"), Bytes.toBytes("son"));
            table.append(append);

            // 创建一个long类型的列
            Put put1 = new Put(Bytes.toBytes("row2"));
            put1.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("age"), Bytes.toBytes(6L));
            // 创建一个增值请求,将值增加10L
            Increment increment = new Increment(Bytes.toBytes("row2"));
            increment.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("age"), 10L);
            table.increment(increment);

            // 创建一个查询请求,查询一行数据
            Get get = new Get(Bytes.toBytes("row1"));
            // 由于HBase的一行可能非常大,所以限定要取出的列族
            get.addFamily(Bytes.toBytes("mycf"));
            // 创建一个结果请求
            Result result = table.get(get);
            // 从查询结果中取出name列,然后打印(这里默认取最新版本的值,如果要取其他版本要使用Cell对象)
            byte[] name = result.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name"));
            System.out.println(Bytes.toString(name));

            // 创建一个扫描请求,查询多行数据
            Scan scan = new Scan(Bytes.toBytes("row1"));
            // 设置扫描器的缓存数量,遍历数据时不用发多次请求,默认100,适当的缓存可以提高性能
            scan.setCaching(150);
            // 创建扫描结果,这个时候不会真正从HBase查询数据,下面的遍历才是去查询
            ResultScanner resultScanner = table.getScanner(scan);
            for (Result r : resultScanner)
            {
                String data = Bytes.toString(r.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name")));
                System.out.println(data);
            }
            // 使用完毕关闭
            resultScanner.close();

            // 创建一个删除请求
            Delete delete = new Delete(Bytes.toBytes("row2"));
            // 可以自定义一些筛选条件
            delete.addFamily(Bytes.toBytes("age"));
            table.delete(delete);

            // 停用表
            admin.disableTable(tableName);
            // 删除列族
            admin.deleteColumnFamily(tableName, "mycf".getBytes());
            // 删除表
            admin.deleteTable(tableName);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        System.out.println("ok");
    }

    public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException
    {
        // 获取table名
        TableName tableName = table.getTableName();
        // 判断table是否存在,如果存在则先停用并删除
        if (admin.tableExists(tableName))
        {
            // 停用表
            admin.disableTable(tableName);
            // 删除表
            admin.deleteTable(tableName);
        }
        // 创建表
        admin.createTable(table);
    }
}

 四、API的高级用法

上一章介绍了API的基本使用方法,这一章总结一些高级用法

1、过滤器:通过get或者scan查找数据时,经常需要加入一些条件来查找

(1)值过滤器:相当于传统sql的where column like ‘%jack%’,但是会对所有的列都做过滤,如果需要对单个列过滤,可以使用SingleColumnValueFilter,如果需要查询值相等的过滤器,可以使用BinaryComparator

CompareFilter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("jack"));
scan.setFilter(filter);

(2)分页过滤器:相当于传统sql的limit,但是不能指定起始页码,所以需要自己记录最后一个row key,并通过scan.setStartRow()设置,在做分页时有个小技巧,如果你通过scan.setStartRow()设置最后一个row key时,下一页的数据依然会包含上一页的最后一个数据,所以你可以将最后一个row key的末尾加一个0,就可以不包含最后一个数据了,因为row key是按照字典顺序排序的

Filter filter1 = new PageFilter(10L);
scan.setFilter(filter1);

(3)过滤器列表:用于组合多个过滤器,实现复杂一些的查询场景,注意这个过滤器列表是有顺序的,FilterList的第一个参数用来指定多个条件的连接方式(and、or),MUST_PASS_ALL相当于and连接,MUST_PASS_ONE相当于or连接

FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(filter);
filterList.addFilter(filter1);
scan.setFilter(filterList);

(4)还有一些其他的过滤器,使用方法大同小异,比如行键过滤器、列过滤器、单元格过滤器,甚至可以自定义过滤器,其他高级用法可以等用到再看

五、SpringBoot集成

1、后台我们用SpringCloud的微服务搭建,本章是用SpringBoot集成HBase环境,SpringBoot项目的搭建非常简单

2、首先将HBase的相关配置文件引入到我们已经搭建好的的SpringBoot项目中,拷贝HBase目录下的hbase-site.xml和Hadoop目录下的core-site.xml、hdfs-site.xml,将三个文件复制到src/main/resources目录下

3、编写一个Java配置文件来集成HBase环境

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.hadoop.hbase.HbaseTemplate;

import java.net.URISyntaxException;

@Configuration
public class HBaseConfig
{
    @Bean
    public HbaseTemplate hbaseTemplate()
    {
        HbaseTemplate hbaseTemplate = new HbaseTemplate();
        hbaseTemplate.setConfiguration(getConfiguration());
        hbaseTemplate.setAutoFlush(true);
        return hbaseTemplate;
    }

    private org.apache.hadoop.conf.Configuration getConfiguration()
    {
        try
        {
            org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
            configuration.addResource(new Path(ClassLoader.getSystemResource("hdfs-site.xml").toURI()));
            configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));
            configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
            return configuration;
        }
        catch (URISyntaxException e)
        {
            e.printStackTrace();
        }
        return null;
    }
}

4、上面的配置中将HBase操作的对象注入到Spring中,所以当我们需要操作HBase时,直接使用HbaseTemplate即可,下例中是将一条数据插入到HBase中,可以看出HbaseTemplate高度封装了CRUD,使用起来更加简单方便

@Service
public class HBaseService implements IHBaseService
{
    @Autowired
    private HbaseTemplate hbaseTemplate;

    @Override
    public void saveDeviceHeartbeat(String uuid, JSONObject heartObject)
    {
        hbaseTemplate.put("mytable", "row1", "mycf", "uuid", Bytes.toBytes(uuid));
    }
}