トップ «前の日(11-11) 最新 次の日(11-13)» 追記

Masa's blog

検索キーワード:

2010年11月12日 Java儂(わし)的解釈によるメモ(データベース更新処理編)

_ Java儂(わし)的解釈によるメモ(データベース更新処理編)

マッチング処理編に続くデータベース更新編。

最初は軽いサンプルのつもりが、みるみる無駄に凝り出すCOBOLERオヤジがここに...。

SQLException.getSQLState()デッドロックを判断してrollbackした後に、commit済みのトランザクションは読み捨ててそこから自動的に再開みたいな芸当をしているが、DBMS非依存かどうかはチト微妙。

今回はスマートにまとまるかと思ったが結局ドロドロしてしまった。まぁ、ちょっとしたデータベースがらみ処理のスケルトンにできれば吉とするか。

動作検証はMicrosoft SQL Server 2008 Express Editionで行ったが、他のDBMSでもほんの一部の手直しで動くはず(?)。

TextFile.java

マッチング処理編で定義していたクラスと同じ物。

class TextFile { ... 省略 ... }

MasterRecord.java

マッチング処理編で定義していたクラスと同じ物。後述のテーブルクラスの親クラスとなる。

class MasterRecord { ... 省略 ... }

Database.java

JDBC経由でデータベースにアクセスする際に楽できるように作ったクラス。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
//
// データベースクラス
//
public class Database {
	Connection conn;
	String driver;
	String url;
	String user;
	String password;
	long commit_interval;
	long commit_count;
	int isolation_level;
	//
	// コンストラクタ
	//
	Database(String driver, String url, String user, String password) {
		this.conn = null;
		this.driver = driver;
		this.url = url;
		this.user = user;
		this.password = password;

		commit_interval = 1;
		commit_count = 0;
		isolation_level = -1;
	}
	//
	// データベース接続
	//
	void open() throws Exception {
		//
		// JDBCドライバーのロード
		//
		Class.forName(driver).newInstance();
		//
		// データベースへの接続
		//
		conn = DriverManager.getConnection(url, user, password);
		//
		// トランザクションレベル設定(ACIDのIを完全保証)
		//
		setIsolationSerializable();
		//
		// 自動コミット停止
		//
		setAutoCommitOff();
	}
	//
	// データベース切断
	//
	void close() throws Exception {
		conn.close();
	}
	//
	// データベースハンドラー取得
	//
	Connection getHandle(){
		return conn;
	}
	//
	// トランザクションレベル設定
	//
	//   矛盾の発生しないレベル
	//
	//     TRANSACTION_SERIALIZABLE
	//
	//   その他のレベル(何らかの矛盾が発生する可能性有り)
	//
	//     TRANSACTION_REPEATABLE_READ
	//     TRANSACTION_READ_COMMITTED(JDBCの既定値)
	//     TRANSACTION_READ_UNCOMMITTED
	//
	//   対応するSQL命令の例
	//
	//     SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
	//
	//   このコマンドの有効範囲はDBMSによってトランザクション単位であったり、
	//   接続(セッション)単位であったりまちまち。なので、小さい範囲の方のトラ
	//   ンザクションごとに発行するのが確実
	//
	//   多くのDBMS(MySQL, PostgreSQL, Oracle, Microsoft SQL Server)で確実に
	//   サポートされているのは
	//
	//     SERIALIZABLE
	//     READ COMMITTED
	//
	void setIsolation() throws Exception {
		conn.setTransactionIsolation(isolation_level);
	}
	//
	// トランザクションレベル設定(ACIDのIを完全保証)
	//
	void setIsolationSerializable() throws Exception {
		isolation_level = conn.TRANSACTION_SERIALIZABLE;
		setIsolation();
	}
	//
	// トランザクションレベル設定(phantom-read, non-repeatable-readが発生の可能性有り)
	//
	void setIsolationReadCommitted() throws Exception {
		isolation_level = conn.TRANSACTION_READ_COMMITTED;
		setIsolation();
	}
	//
	// 自動コミット開始
	//
	void setAutoCommitOn() throws Exception {
		conn.setAutoCommit(true);
	}
	//
	// 自動コミット停止
	//
	void setAutoCommitOff() throws Exception {
		conn.setAutoCommit(false);
	}
	//
	// コミット間隔設定
	//
	void setCommitInterval(long interval){
		commit_interval = interval;
	}
	//
	// コミット間隔リセット
	//
	void resetCommitInterval(){
		commit_interval = 1;
	}
	//
	// コミットカウンタリセット
	//
	void resetCommitCount(){
		commit_count = 0;
	}
	//
	// コミット
	//
	boolean commit() throws Exception {
		commit_count++;
		if ((commit_count % commit_interval) == 0){
			conn.commit();
			setIsolation();
			return true;
		}else{
			return false;
		}
	}
	//
	// ロールバック
	//
	void rollback() throws Exception {
		conn.rollback();
		setIsolation();
	}
	//
	// デッドロックの判定
	//
	//   SQLSTATEはSQL99にて定義されているとはいうもののDMBS方言もあるかも...
	//
	boolean is_deadlock(SQLException e){
		return is_deadlock(e.getSQLState());
	}
	boolean is_deadlock(String sqlstate){
		if ("40001".equals(sqlstate)){
			return true;
		}else{
			return false;
		}
	}
	//
	// 2重キーの判定
	//
	//   SQLSTATEはSQL99にて定義されているとはいうもののDMBS方言もあるかも...
	//
	//   最近(2014.03.xx)に気づいた事
       //     PostgreSQLでは2重キーを発生させた場合rollbackしないといけない :(
	//
	boolean is_dupkey(SQLException e){
		return is_dupkey(e.getSQLState());
	}
	boolean is_dupkey(String sqlstate){
		if ("23000".equals(sqlstate)){
			return true;
		}else{
			return false;
		}
	}
}

MasterTable.java

テーブルクラス定義。MasterRecordクラスを継承。

テーブルに関するアクセス処理は、基本的にこのクラス内のメソッドに閉じ込める。

import java.sql.Statement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
//
// テーブルクラス
//
//   業務テーブルのレイアウト毎に色々と定義されるであろうクラス
//
class MasterTable extends MasterRecord {
	PreparedStatement psDt;
	PreparedStatement psCt;
	PreparedStatement psDi;
	PreparedStatement psCi;
	PreparedStatement psIns;
	PreparedStatement psDel;
	PreparedStatement psUpd;
	PreparedStatement psSelId;
	ResultSet rsSelId;
	//
	// コンストラクタ(生成時の初期化処理)
	//
	MasterTable(Database db) throws Exception {
		psDt = db.getHandle().prepareStatement("drop table t_master");
		psCt = db.getHandle().prepareStatement(
			"create table t_master (" +
				"id INT PRIMARY KEY," +
				"name VARCHAR(50)," +
				"job VARCHAR(50)" +
			")"
		);
		psDi = db.getHandle().prepareStatement("drop index i_name on t_master");
		psCi = db.getHandle().prepareStatement(
			"create index i_name on t_master(" +
				"name" +
			")"
		);
		psIns = db.getHandle().prepareStatement("insert into t_master (id, name, job) values (?, ?, ?)");
		psDel = db.getHandle().prepareStatement("delete from t_master where id = ?");
		psUpd = db.getHandle().prepareStatement("update t_master set id = ?, name = ?, job = ? where id = ?");
		psSelId = db.getHandle().prepareStatement("select * from t_master where id = ?");
	}
	//
	// テーブル作成
	//
	int create() throws Exception {
		return psCt.executeUpdate();
	}
	//
	// テーブル削除
	//
	int drop() throws Exception {
		return psDt.executeUpdate();
	}
	//
	// インデックス作成
	//
	int createIndex() throws Exception {
		return psCi.executeUpdate();
	}
	//
	// インデックス削除
	//
	int dropIndex() throws Exception {
		return psDi.executeUpdate();
	}
	//
	// 追加
	//
	int insert() throws Exception {
		psIns.setInt(1, id);
		psIns.setString(2, name);
		psIns.setString(3, job);
		return psIns.executeUpdate();
	}
	//
	// 削除
	//
	int delete() throws Exception {
		psDel.setInt(1, id);
		return psDel.executeUpdate();
	}
	//
	// 更新
	//
	int update() throws Exception {
		psUpd.setInt(1, id);
		psUpd.setString(2, name);
		psUpd.setString(3, job);
		psUpd.setInt(4, id);
		return psUpd.executeUpdate();
	}
	//
	// 選択の条件設定(id)
	//
	ResultSet selectId(int id) throws Exception {
		psSelId.setInt(1, id);
		rsSelId = psSelId.executeQuery();
		return rsSelId;
	}
	//
	// 選択の実行(id)
	//
	boolean getSelectId() throws Exception {
		if (rsSelId.next()){
			setId(rsSelId.getInt("id"));
			setName(rsSelId.getString("name"));
			setJob(rsSelId.getString("job"));
			return true;
		}else{
			return false;
		}
	}
}

TableTest.java

//
// マスターテーブルをトランザクションファイルで更新する処理(JDBC)
//
//      環境変数
//              TRANFILENAME
//              COMMITFILENAME
//              ENCODING
//
//      トランザクション(入力) TAB区切りファイル
//              int     kbn     0:削除, 1:追加, 2:修正
//              int     id
//              String  name
//              String  job
//
//      コミット記録ファイル(出力) TAB区切りファイル
//              long    count
//              int     kbn
//              int     id
//              String  name
//              String  job
//
//      マスターテーブル(Microsoft SQL Server)
//              INT          id
//              VARCHAR(50)  name
//              VARCHAR(50)  job
//
import java.io.*;
import java.sql.*;

//
// 定数クラス
//
class Constant {
	// メッセージの先頭に付加するためのプログラム名称
	String PGMNAME = "TableTest";

	// JDBC関係のパラメータ
	String DRIVER = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
	String URL = "jdbc:sqlserver://192.168.0.1:1433;databaseName=testdb";
	String USER = "sa";
	String PASSWORD = "system";

	// トランザクションの処理区分
        int KBN_DELETE = 0;
        int KBN_INSERT = 1;
        int KBN_UPDATE = 2;

	// 実コミット間隔、途中経過表示間隔
        int COMMIT_INTERVAL = 10;
        int SHOW_STATUS_INTERVAL = 10;
}
//
// トランザクションレコードクラス
//
class MasterRecordTran extends MasterRecord {
	int kbn;	// 更新区分

	MasterRecordTran() {
		kbn = 0;
	}

	void initialize() {
		super.initialize();
		kbn = 0;
	}

	String set(String record) {
		if (record != null){
			String[] array = record.split("\t", 2);
			kbn = Integer.valueOf(array[0]);
			super.set(array[1]);
		}
		return record;
	}

	String get() {
		return String.format("%d\t%s", kbn, super.get());
	}

	int getKbn() {
		return kbn;
	}

	void setKbn(int kbn) {
		this.kbn = kbn;
	}
}
//
// コミットレコードクラス
//
class CommitRecord extends MasterRecordTran {
	long count;	// コミット時のトランザクション件数

	CommitRecord() {
		count = 0;
	}

	void initialize(){
		super.initialize();
		count = 0;
	}

	String set(String record) {
		if (record != null){
			String[] array = record.split("\t", 2);
			count = Long.valueOf(array[0]);
			super.set(array[1]);
		}
		return record;
	}

	String get() {
		return String.format("%d\t%s", count, super.get());
	}

	long getCount() {
		return count;
	}

	void setCount(long conut) {
		this.count = count;
	}
}
//
// スイッチクラス
//
class Switch {
	boolean state;
	Switch(){
		state = false;
	}
	Switch(boolean state){
		this.state = state;
	}
	void on(){
		state = true;
	}
	void off(){
		state = false;
	}
	boolean is_on(){
		return (state == true);
	}
	boolean is_off(){
		return (state == false);
	}
}
//
// 引数パラメータクラス
//
class ExtParam {
	String tranFileName = "TableTest_tran.txt";	// トランザクションファイル名
	String commitFileName = "TableTest_commit.txt";	// コミットファイル名
	String encoding = "EUC-JP";	// エンコーディング名

	void getFromEnviron() {
		String env;

		if ((env = System.getenv("TRANFILENAME")) != null){
			tranFileName = env;
		}
		if ((env = System.getenv("COMMITFILENAME")) != null){
			commitFileName = env;
		}
		if ((env = System.getenv("ENCODING")) != null){
			encoding = env;
		}
	}

	String getTranFileName() {
		return tranFileName;
	}

	String getCommitFileName() {
		return commitFileName;
	}

	String getEncoding() {
		return encoding;
	}
}
//
// メインクラス
//
class TableTest {
	//--------------------------------------------------
	// 定数オブジェクト
	//--------------------------------------------------
	Constant cs = new Constant();

	//--------------------------------------------------
	// 引数オブジェクト
	//--------------------------------------------------
	ExtParam ep = new ExtParam();

	//--------------------------------------------------
	// 入出力ファイル
	//--------------------------------------------------
	//
	// トランザクションファイル&レコードオブジェクト
	//
	TextFile tranFile = new TextFile();
	MasterRecordTran tranRec = new MasterRecordTran();
	//
	// コミット記録ファイル&レコードオブジェクト
	//
	TextFile commitFile = new TextFile();
	CommitRecord commitRec = new CommitRecord();

	//--------------------------------------------------
	// データベース
	//--------------------------------------------------
	//
	// データベースオブジェクト
	//
	Database db = new Database(cs.DRIVER, cs.URL, cs.USER, cs.PASSWORD);
	//
	// マスターテーブルオブジェクト
	//
	MasterTable mt = null;

	//--------------------------------------------------
	// 各種スイッチ
	//--------------------------------------------------
	//
	// 処理終了スイッチ
	//
	Switch endSw = new Switch();
	//
	// アボートスイッチ
	//
	Switch abortSw = new Switch();
	//
	// デッドロックスイッチ
	//
	Switch deadlockSw = new Switch();

	//==================================================
	// メイン処理
	//==================================================
	public static void main(String[] args) throws Exception {
		TableTest m = new TableTest();
		try {
			m.openProc();		// オープン処理
			m.initProc();		// 初期処理
		}catch (Exception e){
			m.trapException(e);	// 一般例外処理
		}

 		while (m.endSw.is_off() && m.abortSw.is_off()){
			try {
				m.mainProc();	// 主処理
				if (m.deadlockSw.is_on()){
					m.retryDeadlock();
				}
			}catch (Exception e){
				m.trapException(e);	// 一般例外処理
			}
		}

		try {
			m.endProc();		// 終了処理
		}catch (Exception e){
			m.trapException(e);	// 一般例外処理
		}

		m.closeProc();		// クローズ処理
	}
	//==================================================
	// オープン処理
	//==================================================
	void openProc() throws Exception {
		// 環境変数取得
		ep.getFromEnviron();

		// データベースオープン
		db.open();
		mt = new MasterTable(db);

		// ファイルオープン
		tranFile.open(ep.getTranFileName(), "r", ep.getEncoding());
	}
	//==================================================
	// 初期処理
	//==================================================
	void initProc() throws Exception {
		System.out.println(cs.PGMNAME + " : マスターテーブル更新処理 開始");
//
//		db.setAutoCommitOn();
//		try {
//			mt.dropIndex();
//		}catch (Exception e){
//			System.err.println(cs.PGMNAME + " : インデックスを削除出来ません。このエラーは無視して処理を続行します。 : " +e);
//		}
//		try {
//			mt.drop();
//		}catch (Exception e){
//			System.err.println(cs.PGMNAME + " : テーブルを削除出来ません。このエラーは無視して処理を続行します。 : " +e);
//		}
//		mt.create();
//		mt.createIndex();
//		db.setAutoCommitOff();
//

		// コミット間隔設定
		db.setCommitInterval(cs.COMMIT_INTERVAL);

		// トランザクションの1件目読み込み
		readTran();
		showStatus();
	}
	//==================================================
	// 主処理
	//==================================================
	void mainProc() throws Exception {
		if (tranRec.getKbn() == cs.KBN_DELETE){
			// 削除処理
			doDelete();
		}else if (tranRec.getKbn() == cs.KBN_INSERT){
			// 追加処理
			doInsert();
		}else if (tranRec.getKbn() == cs.KBN_UPDATE){
			// 更新処理
			doUpdate();
		}else{
			System.err.println(cs.PGMNAME + " : 処理区分が不正なので処理できません : " + tranRec.get());
		}

		// コミット処理
		if (db.commit()){
			writeCommit();
		}

		// 次トランザクション読み込み
		readTran();
		showStatus();
	}
	//==================================================
	// 終了処理
	//==================================================
	void endProc() throws Exception {
		System.out.println(cs.PGMNAME + " : マスターテーブル更新処理 " + (abortSw.is_on() ? "異常" : "正常") + "終了");
	}
	//==================================================
	// クローズ処理
	//==================================================
	void closeProc() throws Exception {
		// 最終のロールバック or コミット
		if (abortSw.is_on()){
			db.rollback();
		}else{
			db.resetCommitInterval();
			db.commit();
			deleteCommit();
		}

		// データベースクローズ
		db.close();

		// ファイルクローズ
		tranFile.close();
	}
	//==================================================
	// デッドロックリトライ処理
	//==================================================
	void retryDeadlock() throws Exception {
		System.err.println(cs.PGMNAME + " : " + tranFile.getReadCount() + "件目のトランザクションでデッドロックが発生しました");
		db.rollback();
		db.resetCommitCount();
		reReadTran();
		deadlockSw.off();
		System.err.println(cs.PGMNAME + " : " + tranFile.getReadCount() + "件目のトランザクションからリトライします");
	}
	//==================================================
	// Exceptionハンドリング処理
	//==================================================
	void trapException(Exception e) throws Exception {
		System.err.println(cs.PGMNAME + " : 例外エラーが発生したので強制終了します : " + e);
		abortSw.on();
	}
	//--------------------------------------------------
	// トランザクション読み込み
	//--------------------------------------------------
	void readTran() throws Exception {
		if (tranRec.set(tranFile.read()) == null){
			endSw.on();
		}
	}
	//--------------------------------------------------
	// 途中経過(トランザクション件数)表示
	//--------------------------------------------------
	void showStatus() throws Exception {
		if (endSw.is_off()){
			if ((tranFile.getReadCount() % cs.SHOW_STATUS_INTERVAL) == 0){
				System.out.println(cs.PGMNAME + " : トランザクション読み込み件数 = " + tranFile.getReadCount());
			}
		}
	}
	//--------------------------------------------------
	// トランザクション読み込み直し
	//--------------------------------------------------
	void reReadTran() throws Exception {
		long max_count;
		long count;

		// コッミットファイルからコミット済みトランザクション件数取得
		readCommit();
		max_count = commitRec.getCount();

		// トランザクションファイル読み飛ばし
		tranFile.close();
		tranFile.open(ep.getTranFileName(), "r", ep.getEncoding());
		for (count = 0; count < max_count; count++){
			readTran();
		}
		readTran();
	}
	//--------------------------------------------------
	// コミット時のトランザクション(最終の1件)を記録
	//--------------------------------------------------
	void writeCommit() throws Exception {
		commitFile.open(ep.getCommitFileName(), "w", ep.getEncoding());
		commitRec.set(Long.toString(tranFile.getReadCount()) + "\t" + tranRec.get());
		commitFile.write(commitRec.get());
		commitFile.flush();
		commitFile.close();
	}
	//--------------------------------------------------
	// コミットファイル読み込み
	//--------------------------------------------------
	void readCommit() throws Exception {
		try {
			commitFile.open(ep.getCommitFileName(), "r", ep.getEncoding());
			if (commitRec.set(commitFile.read()) == null){
				commitRec.initialize();
			}
			commitFile.close();
		}catch (FileNotFoundException e){
			commitRec.initialize();
		}
	}
	//--------------------------------------------------
	// コミットファイル削除
	//--------------------------------------------------
	void deleteCommit() throws Exception {
		File f = new File(ep.getCommitFileName());
		f.delete();
	}
	//--------------------------------------------------
	// 削除処理
	//--------------------------------------------------
	void doDelete() throws Exception {
		try {
			mt.setId(tranRec.getId());
			if (mt.delete() == 0){
				System.err.println(cs.PGMNAME + " : 削除データが存在しないので削除できません : " + tranRec.get());
			}
		}catch (SQLException e){
			if (db.is_deadlock(e)){
				deadlockSw.on();
			}else{
				System.err.println(cs.PGMNAME + " : 削除処理でSQL例外エラーが発生したので強制終了します : " + e + " : " + e.getSQLState());
				abortSw.on();
			}
		}
	}
	//--------------------------------------------------
	// 追加処理
	//--------------------------------------------------
	void doInsert() throws Exception {
		try {
			mt.setId(tranRec.getId());
			mt.setName(tranRec.getName());
			mt.setJob(tranRec.getJob());
			if (mt.insert() == 0){
				System.err.println(cs.PGMNAME + " : 追加できません : " + tranRec.get());
			}
		}catch (SQLException e){
			if (db.is_deadlock(e)){
				deadlockSw.on();
			}else if (db.is_dupkey(e)){
				System.err.println(cs.PGMNAME + " : 既にデータが存在するので追加できません : " + tranRec.get());
			}else{
				System.err.println(cs.PGMNAME + " : 追加処理でSQL例外エラーが発生したので強制終了します : " + e + " : " + e.getSQLState());
				abortSw.on();
			}
		}
	}
	//--------------------------------------------------
	// 更新処理
	//--------------------------------------------------
	void doUpdate() throws Exception {
		try {
			mt.setId(tranRec.getId());
			mt.setName(tranRec.getName());
			mt.setJob(tranRec.getJob());
			if (mt.update() == 0){
				System.err.println(cs.PGMNAME + " : 更新データが存在しないので更新できません : " + tranRec.get());
			}
		}catch (SQLException e){
			if (db.is_deadlock(e)){
				deadlockSw.on();
			}else{
				System.err.println(cs.PGMNAME + " : 更新処理でSQL例外エラーが発生したので強制終了します : " + e + " : " + e.getSQLState());
				abortSw.on();
			}
		}
	}
}