2012/10/26

Hadoop + HiveからExcelへの帳票出力 (How to fetch data from Hadoop via Hive)


今回はHadoopおよびHiveを用いて抽出したデータを、Apache POIを使用してExcelシート上に出力します。

HiveはHiveQLというSQLに近い言語で開発が可能であり、JDBCドライバも提供されています。
過去の記事(Apache POI によるエクセルファイルの出力 その1)のソースコードを流用することで、極めて簡単にHadoopとExcelを連携させることができます。

Figure 1: Overall image

【バージョン】
HadoopおよびHiveはそれぞれ現時点で入手可能な最新の安定版を使用します。
  • Hadoop: Release 1.0.4 (12 October, 2012)
  • Hive:   Release 0.9.0 (30 April, 2012)
※Hive 0.9.0から、Hadoop1.xでの動作をサポートした様です。

また、HiveのJDBCドライバ(${HIVE_HOME}/lib/hive-jdbc-0.9.0.jar)がHadoop 1.xのAPIと不整合を起こしている様ですので、Hadoop 0.1xのhadoop-core-0.19.1.jarも併せて使用します。


【前準備】
サンプルプログラムのプロジェクトに、$HIVE_HOME/lib 配下のjarファイル、およびhadoop-core-0.19.1.jarを参照設定します。
※HiveのJDBCドライバ(hive-jdbc-0.9.0.jar)の依存するjarを特定していない為、今回は全てのjarを参照する事としています。

【実装】
ソースコードの変更は、DBコネクション生成に関わる部分のみです。


List 1: Main logic


package util;

import java.io.*;
import java.util.Iterator;
import java.sql.*;
import org.apache.poi.xssf.usermodel.*;
import org.apache.poi.ss.usermodel.*;


public class Ora2Excel {

  // db objects
  private Connection con;
  private Statement  stm;
  
  // poi objects
  private Workbook wbk;
  
  public Ora2Excel() throws SQLException {
    DriverManager.registerDriver(new org.apache.hadoop.hive.jdbc.HiveDriver());
  }

  public void openDb(String userId, String password, String connString) throws SQLException {
    con = DriverManager.getConnection(connString, userId, password);
    stm = con.createStatement();
  }
  
  public void closeDb() throws SQLException {
    stm.close();
    con.close();
  }
  
  public void openBook(String fileName) throws IOException {
    wbk = new XSSFWorkbook(fileName);
  }
  
  public void saveBook(String fileName) throws IOException {
    FileOutputStream out = new FileOutputStream(fileName);
    wbk.write(out);
    out.close();
  }
  
  public void closeBook() {
    wbk = null;
  }
  
  public void extract(String sheetName, String sql) throws SQLException {
    Sheet wsh = wbk.getSheet(sheetName);
    ResultSet rst = stm.executeQuery(sql);
    int colCount = rst.getMetaData().getColumnCount();
    
    // determine the start position: search "$start"
    int rowStart = 0;
    int colStart = 0;
    Iterator<Row> iRow = wsh.rowIterator();
    while (iRow.hasNext() && rowStart + colStart == 0) {
      Row currentRow = (Row) iRow.next();
      Iterator<Cell> iCol = currentRow.cellIterator();
      while (iCol.hasNext()) {
        Cell currentCell = (Cell) iCol.next();
        if (currentCell.getCellType() == Cell.CELL_TYPE_STRING  && currentCell.getStringCellValue().trim().equalsIgnoreCase("$start")) {
          rowStart = currentCell.getRowIndex();
          colStart = currentCell.getColumnIndex();
          break;
        }
      }
    }
    
    // get "template row"
    Row templateRow = wsh.getRow(rowStart);
    
    // set cell values
    int idxRow = rowStart;
    while (rst.next()) {
      wsh.shiftRows(idxRow, wsh.getLastRowNum()+1, 1);
      
      Row r = wsh.createRow(idxRow);
      for (int idxCol = templateRow.getFirstCellNum(); idxCol < templateRow.getLastCellNum(); idxCol++) {
        Cell c = r.createCell(idxCol);
        
        if (idxCol >= colStart && idxCol - colStart < colCount) {
          int idxDbCol = idxCol-colStart + 1;
          switch(rst.getMetaData().getColumnType(idxDbCol)){
          case Types.NUMERIC:
            c.setCellValue(rst.getDouble(idxDbCol));
            break;
          case Types.DATE:
              c.setCellValue(rst.getDate(idxDbCol));
            break;
          case Types.TIMESTAMP:
              c.setCellValue(rst.getDate(idxDbCol));
            break;
          default:
            c.setCellValue(rst.getString(idxDbCol));
          }
        } else if (templateRow.getCell(idxCol).getCellType() == Cell.CELL_TYPE_FORMULA){
          c.setCellFormula(templateRow.getCell(idxCol).getCellFormula());
        } else if (templateRow.getCell(idxCol).getCellType() == Cell.CELL_TYPE_NUMERIC){
          c.setCellValue(templateRow.getCell(idxCol).getNumericCellValue());
        } else if (templateRow.getCell(idxCol).getCellType() == Cell.CELL_TYPE_STRING){
          c.setCellValue(templateRow.getCell(idxCol).getStringCellValue());
        } 
        c.setCellStyle(templateRow.getCell(idxCol).getCellStyle());
        
      }
      idxRow++;
    }
    rst.close();
    
    // remove the template row.
    wsh.removeRow(templateRow);
    wsh.shiftRows(idxRow,  wsh.getLastRowNum()+1, -1);
    
    // calculate formula cells
    XSSFFormulaEvaluator.evaluateAllFormulaCells((XSSFWorkbook) wbk);
  }
}

【テスト用ソース(呼び出し部)】
上記クラスを呼び出す側の例を以下に示します。


List 2: Calling Main Logic


import util.Ora2Excel;

public class Test1 {

  public static void main(String[] args) throws Exception {
    Ora2Excel o2x = new Ora2Excel();
    
    o2x.openDb("", "", "jdbc:hive://HiveServer:10000/default");
    
    o2x.openBook("c:\\template.xlsx");
    o2x.extract("Sheet1", "select deptno, empno, ename, job, mgr, hiredate, sal, comm from emp order by deptno, empno");
    o2x.saveBook("c:\\result.xlsx");
    o2x.closeBook();
    
    o2x.closeDb();
  }
}



HiveのJDBCドライバはまだ開発途上ではありますが、Hadoopから帳票を出力する際には極めて効率的なツールであると言えます。

参考URL:
https://cwiki.apache.org/Hive/hiveclient.html


[Summary]
Hive provides JDBC driver.  List 1 shows how to get data from Hadoop via Hive.

The version of Hadoop and Hive are:
  • Hadoop: Release 1.0.4 (12 October, 2012)
  • Hive:   Release 0.9.0 (30 April, 2012)
${HIVE_HOME}/lib/hive-jdbc-0.9.0.jar needs other jar files under ${HIVE_HOME}/lib.
Since it needs org/apache/hadoop/io/Writable that is not included in Hive 0.9.0, this time I bring hadoop-core-0.19.1.jar from another project.
To run the source code, you have to add these jar files above as External JAR.

Please see Export to Excel file via Apache POI, Part 1 for your reference.

2012/10/11

update文と副問い合わせ (UPDATE with sub query)


見逃されがちなUPDATE文の構文について説明します。


以下のSQLは、同一の表 sales に副問合せで3回アクセスしています。

List 1:

update customers cst
   set last_purchase_date = (select max(time_id) from sales where cust_id = cst.cust_id),
       total_amount       = (select sum(amount_sold) from sales where cust_id = cst.cust_id),
       annual_amount      = (select sum(case when time_id > to_date('20000101','yyyymmdd') then amount_sold else 0 end) from sales where cust_id = cst.cust_id)
 where cust_id = 2


call     count       cpu    elapsed       disk      query    current        rows
------- ------  -------- ---------- ---------- ---------- ----------  ----------
Parse        1      0.00       0.00          0          0          0           0
Execute      1      0.00       0.00          0        389          2           1
Fetch        0      0.00       0.00          0          0          0           0
------- ------  -------- ---------- ---------- ---------- ----------  ----------
total        2      0.00       0.00          0        389          2           1


Rows (1st) Rows (avg) Rows (max)  Row Source Operation
---------- ---------- ----------  ---------------------------------------------------
         0          0          0  UPDATE  CUSTOMERS (cr=389 pr=0 pw=0 time=2019 us)
         1          1          1   INDEX UNIQUE SCAN CUSTOMERS_PK (cr=2 pr=0 pw=0 time=20 us cost=1 size=40 card=1)(object id 76070)
         1          1          1   SORT AGGREGATE (cr=129 pr=0 pw=0 time=854 us)
       176        176        176    TABLE ACCESS BY GLOBAL INDEX ROWID SALES PARTITION: ROW LOCATION ROW LOCATION (cr=129 pr=0 pw=0 time=829 us cost=90 size=1690 card=130)
       176        176        176     INDEX RANGE SCAN SALES_CUST_IX (cr=3 pr=0 pw=0 time=21 us cost=3 size=0 card=130)(object id 76319)
         1          1          1   SORT AGGREGATE (cr=129 pr=0 pw=0 time=421 us)
       176        176        176    TABLE ACCESS BY GLOBAL INDEX ROWID SALES PARTITION: ROW LOCATION ROW LOCATION (cr=129 pr=0 pw=0 time=282 us cost=90 size=1300 card=130)
       176        176        176     INDEX RANGE SCAN SALES_CUST_IX (cr=3 pr=0 pw=0 time=10 us cost=3 size=0 card=130)(object id 76319)
         1          1          1   SORT AGGREGATE (cr=129 pr=0 pw=0 time=567 us)
       176        176        176    TABLE ACCESS BY GLOBAL INDEX ROWID SALES PARTITION: ROW LOCATION ROW LOCATION (cr=129 pr=0 pw=0 time=550 us cost=90 size=1300 card=130)
       176        176        176     INDEX RANGE SCAN SALES_CUST_IX (cr=3 pr=0 pw=0 time=192 us cost=3 size=0 card=130)(object id 76319)



この様なupdate文については、以下の構文への変換が効果的です。

List 2:

update customers cst
   set (last_purchase_date, total_amount, annual_amount)
       =
       (select max(time_id),
               sum(amount_sold),
               sum(case when time_id > to_date('20000101','yyyymmdd') then amount_sold else 0 end)
          from sales
         where cust_id = cst.cust_id)
 where cust_id = 2


call     count       cpu    elapsed       disk      query    current        rows
------- ------  -------- ---------- ---------- ---------- ----------  ----------
Parse        1      0.00       0.00          0          0          0           0
Execute      1      0.00       0.00          0        131          2           1
Fetch        0      0.00       0.00          0          0          0           0
------- ------  -------- ---------- ---------- ---------- ----------  ----------
total        2      0.00       0.00          0        131          2           1



Rows (1st) Rows (avg) Rows (max)  Row Source Operation
---------- ---------- ----------  ---------------------------------------------------
         0          0          0  UPDATE  CUSTOMERS (cr=131 pr=0 pw=0 time=672 us)
         1          1          1   INDEX UNIQUE SCAN CUSTOMERS_PK (cr=2 pr=0 pw=0 time=11 us cost=1 size=40 card=1)(object id 76070)
         1          1          1   SORT AGGREGATE (cr=129 pr=0 pw=0 time=604 us)
       176        176        176    TABLE ACCESS BY GLOBAL INDEX ROWID SALES PARTITION: ROW LOCATION ROW LOCATION (cr=129 pr=0 pw=0 time=725 us cost=90 size=2340 card=130)
       176        176        176     INDEX RANGE SCAN SALES_CUST_IX (cr=3 pr=0 pw=0 time=12 us cost=3 size=0 card=130)(object id 76319)


実行計画からも、処理が改善された事が分かります。
sales 表への読み込みが3回→1回に減ったことで、論理読み込みも低減しています。

最近の潮流ではExadata等、足回り(H/W)の強化でボトルネックを解消する事が多くなっていますが、ミッションクリティカルなシステムでは、この様に処理そのものの改善で対応する事が先決です。


[summary]
List 1 shows bad UPDATE statement.  It accesses sales table three times.
To reduce consistent read access, it should be written like List 2.