概要

ScalaからHBaseのテーブル追加、ファミリー追加、検索、行追加などをしてみる。

@CretedDate 2012/03/24
@Versions CDH3u3, HBase0.90.4

前提条件

1. HBaseが起動していること

2. 以下のjarがクラスパスに入っていること(バージョンは適宜読み替え)
・hbase-0.90.4-cdh3u3.jar
・hadoop-0.20.2-cdh3u3.jar
・commons-logging-1.0.4.jar
・log4j-1.2.15.jar

利用するクラス

主に利用するクラスは以下の2つ

1. HBaseAdmin
http://hbase.apache.org/docs/current/api/org/apache/hadoop/hbase/client/HBaseAdmin.html

2. HTable
http://hbase.apache.org/docs/current/api/org/apache/hadoop/hbase/client/HTable.html

HBaseAdminはテーブルの生成、削除、Disable、Enable、Familyの追加、テーブル一覧の取得、サーバの状態確認や終了処理など、管理っぽい処理を行う。SQLで言うと、CREATEやDROP、ALTER TABLEあたり。

HTableは1行検索、範囲検索、行の追加、削除などを行う。SQLで言うと、SELECT、INSERT、DELETEあたり。

HBaseAdminを使ってみる

まずはHBaseAdminの初期化。hbase-site.xmlを読み込む場合は下記のような感じ。

//  Configurationにhbase-site.xmlのパスを指定する
val conf = HBaseConfiguration.create()
conf.addResource( new Path( "conf/hbase-site.xml" ) )

val admin = new HBaseAdmin(conf)

リモートのHBaseに繋ぐ場合は、下記のような感じ。

// Configurationに対象サーバを指定
val conf = HBaseConfiguration.create()
conf.set( "hbase.zookeeper.quorum", "nezumi" );
conf.set( "hbase.zookeeper.property.clientPort", "2181" );

val admin = new HBaseAdmin( conf )

テーブルを作成する。tableNameというテーブル名で、2つのFamilyを持つテーブルを生成してみる。

val descriptor = new HTableDescriptor( "tableName" )
descriptor.addFamily( new HColumnDescriptor( "family1" ) )
descriptor.addFamily( new HColumnDescriptor( "family2" ) )
admin.createTable( descriptor )

テーブルを削除する。削除はdeleteTableで行う。けど、その前にdisableTableでテーブル操作を無効状態にしておかないとエラーになる。

admin.disableTable( "tableName" )
admin.deleteTable( "tableName" )

disableTableでテーブルを無効(takes it off-line)にする

admin.disableTable( "tableName" )

enableTableで無効になったテーブルを有効(on-line)にする。

admin.enableTable( "tableName" )

listTablesでテーブルの一覧を取得する。戻り値はCREATEでも使った、HTableDescriptorというテーブルの情報を保持するクラスの配列で返る。

for( descriptor <- admin.listTables )
  println( descriptor.getNameAsString )

テーブルを再作成してみる。getTableDescriptorでテーブル情報を取得し、それを利用してテーブルを削除後に再作成。

val descriptor = admin.getTableDescriptor( Bytes.toBytes( "tableName" ) )
admin.disableTable( "tableName" )
admin.deleteTable( "tableName" )
admin.createTable( descriptor )

tableExistsでテーブルの存在チェック。存在すればtrueが、存在しなければfalseが返る。

hAdmin.tableExists("tableName")

addColumnカラム追加をする。追加する際はテーブルのdisableが必要。

val column = new HColumnDescriptor( "family3" )
admin.disableTable( "tableName" )
admin.addColumn( "tableName", column )
admin.enableTable( "tableName" )

deleteColumnカラムの削除。削除する際はテーブルのdisableが必要。

admin.disableTable( "tableName" )
admin.deleteColumn( "tableName", "family3" )
admin.enableTable( "tableName" )

この他にもサーバのshutdownとか、flushとかcompactなんかも実行できる。でも、compactとmajorCompactの差がイマイチわかってない。いつかちゃんと勉強する。

Byte配列への変換

HBaseに値を渡す時はByteの配列に変換してから渡す。変換にはorg.apache.hadoop.hbase.util.Bytesを使う。

// Bytes.toBytesでArray[Byte]に変換される
val bytes = Bytes.toBytes( "abc" )
bytes foreach println
  //=> 97\n98\n99

// 元に戻す時はBytes.toString
println( Bytes.toString( bytes ) )
  //=> abc

String以外にも、BigDecimal, Char, Array[Char], Double, Float, Int, Long, Shortの変換ができる。
http://hbase.apache.org/docs/current/api/org/apache/hadoop/hbase/util/Bytes.html

毎度変換するのは面倒なので、下記のような暗黙の型変換を定義しておくとそこはかとなく便利。

implicit def boolean2hbaseBytes( value: Boolean ) = Bytes.toBytes( value )
implicit def double2hbaseBytes( value: Double ) = Bytes.toBytes( value )
implicit def float2hbaseBytes( value: Float ) = Bytes.toBytes( value )
implicit def int2hbaseBytes( value: Int ) = Bytes.toBytes( value )
implicit def long2hbaseBytes( value: Long ) = Bytes.toBytes( value )
implicit def short2hbaseBytes( value: Short ) = Bytes.toBytes( value )
implicit def string2hbaseBytes( value: String ) = Bytes.toBytes( value )

implicit def hbaseBytes2boolean( value: Array[Byte] ) = Bytes.toBoolean( value )
implicit def hbaseBytes2double( value: Array[Byte] ) = Bytes.toDouble( value )
implicit def hbaseBytes2float( value: Array[Byte] ) = Bytes.toFloat( value )
implicit def hbaseBytes2int( value: Array[Byte] ) = Bytes.toInt( value )
implicit def hbaseBytes2long( value: Array[Byte] ) = Bytes.toLong( value )
implicit def hbaseBytes2short( value: Array[Byte] ) = Bytes.toShort( value )
implicit def hbaseBytes2string( value: Array[Byte] ) = Bytes.toString( value )

HTableを使ってみる

HBaseAdminと同じくHBaseConfigurationを引数に指定する。

val conf = HBaseConfiguration.create()
val table = new HTable( conf, "tableName" )

データの登録にはPutクラスとputメソッドを使う。

val put = new Put( Bytes.toBytes( "row" ) )
put.add( Bytes.toBytes( "family1" ), Bytes.toBytes( "qualifier" ), Bytes.toBytes( "value" ) )
table.put( put )

checkAndPutを使うと、指定したカラムが指定値の場合のみ、Putが実行される。指定した値でなかった場合はPutは実行されずfalseが返る。

val put = new Put(Bytes.toBytes("row"))
put.add(Bytes.toBytes("family1"), Bytes.toBytes("qualifierB"), Bytes.toBytes("valueB"))

// カラム(row/family1/qualifier)の値がvalueの場合のみ、putが実行される
table.checkAndPut( Byts.toBytes( "row" ), Byts.toBytes( "family1" ), Byts.toBytes( "qualifier" ), 
    Byts.toBytes( "value" ), put)

指定したカラムのデータを取得する場合は、Getクラスとgetメソッドを使う。

val result = table.get( new Get( Bytes.toBytes( "row" ) ) )
val value = result.getColumnLatest( Bytes.toBytes( "family1" ), Bytes.toBytes( "qualifier" ) )
println( Bytes.toString( value.getValue() ) )
  //=> value

incrementColumnValueで指定カラムのインクリメントができる。カラムの内容はLong型である必要があるらしい。データが存在しない場合は新規にレコードが登録される。下記の例では値が3インクリメントされる。

table.incrementColumnValue( Bytes.toBytes( "row" ), 
    Bytes.toBytes( "family1" ), Bytes.toBytes( "qualifier" ), 3L )

existsで行の存在チェックを行う。指定したRowが存在すればtrue、存在しなければfalse。

table.exists(new Get(Bytes.toBytes("row")))

deleteで指定したRowやカラムを削除する。

// Row全体を削除
val delete = new Delete( Bytes.toBytes( "row" ) )
table.delete( delete )

// 指定カラムを削除
val delete = new Delete( Bytes.toBytes( "row" ) )
delete.deleteFamily( Bytes.toBytes( "family" ) )
table.delete( delete )

scanを使うと範囲検索ができる。startRow〜endRowを範囲に指定する。startRowは指定した値自体が範囲に入る。endRowは指定した値自体は範囲に入らない。つまりstartに10、endに15を指定すると、10,11,12,13,14が範囲になる。

val scan = new Scan( Bytes.toBytes( "startRow" ), Bytes.toBytes( "endRow" )
val resultScanner = table.getScanner( scan )
for ( result <- resultScanner ) {
    for ( keyValue <- result.raw ) {
        println( Bytes.toString( result.getRow ) )
        println( Bytes.toString( keyValue.getFamily ) )
        println( Bytes.toString( keyValue.getQualifier ) )
        println( Bytes.toString( keyValue.getValue ) )
    }
}

flushCommitsかcloseを実行するとPutを実行した内容が反映される。autoCommitがtrueになってる場合は、自動的に実行される。

// コミットする
table.flushCommits

// ソースを見る限り、closeもflushCommitsと同じ処理
table.close

この他にもgetTableNameとか、setAutoFlushなどのメソッドもある。