Javaでマルチスレッドを処理を安全に記述する方法
0. はじめに
Javaを使用した個人開発にて、Scalaで書かれたライブラリを呼び出す場面があったのですが、
その際にマルチスレッドでの実装をする必要があり、考える機会になったので共有します。
1. 前提
今回の記事では、以下を前提といたします。
ライブラリ側のプロクラムを修正することはできない。
2. 今回触れるパターン
今回は、以下3つのパターンについて扱います。
1. ローカル変数以外が使用されている (インスタンス変数、クラス変数はスレッドアンセーフ)
2. ライブラリの処理結果が標準出力 (System.out) である (System.out はstaticなフィールド)
3. ライブラリが単一のファイルやDBを使用しており、同時に2つ以上の処理を実行すると不整合が発生する
3. 解決策 (私の案)
3.1 パターン1とパターン2について
結論としては、「別プロセスで実行することで使用するメモリ空間を分ける」ことで解決しました。
例えば、以下のようなプログラムをマルチスレッドで呼びたいとします。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
/** * <pre> * 呼び出される側の処理. * パターン1 : ローカル変数以外が使用されていてスレッドセーフではない処理が含まれているケース. * パターン2 : 処理結果が標準出力 (System.out) に出力されるケース. * </pre> * * @author baseballyama * */ public class Callee1 { /** 文字列 */ private static String staticText; private String text; /** * 引数で渡された文字列をクラス内変数に設定しクラス内変数を返却します. * * @return クラス内変数文字列 */ public String execute(final String pText) { Callee1.staticText = pText; this.text = pText; try { // sleepで発生する Exception は今回に限っては無視することにします. Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { } return this.text; } } |
この処理は、スタティック変数とインスタンス変数を持っているため、
以下の処理では意図した動作をしません。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * <pre> * マルチスレッド処理を実行するクラスです. * * <pre> * * @author baseballyama * */ public class CallerNgPattern { /** * <pre> * メイン処理. * {@link Callee1#execute(String)} をマルチスレッドで呼び出します. * 誤ったパターンで実装されているため処理が正常に完了しません。 * </pre> * * @param args 標準入力 * @throws InterruptedException */ public static void main(String... args) throws InterruptedException { execute(); } /** * <pre> * メイン処理. * {@link Callee1#execute(String)} をマルチスレッドで呼び出します. * CallerクラスがStatic変数/インスタンス変数を使用しているため、 * 期待しない結果が返却されます。 * </pre> * * @throws InterruptedException */ private static void execute() throws InterruptedException { Callee1 callee1 = new Callee1(); ExecutorService es = Executors.newFixedThreadPool(10); try { for (int i = 0; i < 100; i++) { String text = String.valueOf(i); es.execute(() -> { String result = callee1.execute(text); if (!result.equals(text)) { System.err.println("異常が発生しました... [期待値 : " + text + ", 実際値 : " + result + "]"); System.exit(1); } else { System.out.println(result); } }); } } finally { es.shutdown(); es.awaitTermination(1, TimeUnit.MINUTES); } System.out.println("正常に処理が完了しました。"); } } |
そこで、実行プロセスごと分けることで、問題を解消しました。
その実装が以下です。読みやすいように2クラスに分けました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URL; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * <pre> * マルチスレッド処理を実行するクラスです. * * <pre> * * @author baseballyama * */ public class CallerOkPattern { /** * <pre> * メイン処理. * {@link Callee1#execute(String)} をマルチスレッドで呼び出します. * 正しいパターンで実装されているため処理が正常に完了します。 * </pre> * * @param args 標準入力 * @throws InterruptedException */ public static void main(String... args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); try { for (int i = 0; i < 100; i++) { final int fi = i; final String text = String.valueOf(fi); es.execute(() -> { String result = ""; try { result = execute(fi); } catch (IOException e) { System.err.println("異常が発生しました... [期待値 : " + text + ", 実際値 : " + result + "]"); System.exit(1); } if (!result.equals(text)) { System.err.println("異常が発生しました... [期待値 : " + text + ", 実際値 : " + result + "]"); System.exit(1); } else { System.out.println(result); } }); } } finally { es.shutdown(); es.awaitTermination(1, TimeUnit.MINUTES); } } /** * コマンド経由で {@link CallerOkPatternSub#main(String...)} を経由して {@link Callee1#execute(String)} * を呼び出します. * * @param i {@link Callee1#execute(String)} に渡す数値 * @return {@link Callee1#execute(String)} からの戻り値 * @throws IOException */ private static String execute(final int i) throws IOException { String callee1Filepath = getAbsolutePath(CallerOkPatternSub.class.getSimpleName()); File file = new File(callee1Filepath); String[] command = {"java", getNameWithoutExtension(file), String.valueOf(i)}; Runtime runtime = Runtime.getRuntime(); Process p = runtime.exec(command, null, file.getParentFile()); InputStream ism = p.getInputStream(); InputStreamReader reader = new InputStreamReader(ism, "Shift_JIS"); BufferedReader br = new BufferedReader(reader); String result = ""; String buf = ""; while ((buf = br.readLine()) != null) { result += buf; } return result; } /** * <pre> * 指定したJavaクラスが格納されているパスを取得します. * 注意 : この処理は今回の記事の本質と関係ないので、処理を省略しています。 * 具体的には、この処理はjarファイル内のファイルを検索する場合の処理が正しく実装されてません。 * </pre> * * @param className クラス名 * @return */ private static String getAbsolutePath(String className) { className = className + ".class"; URL url = CallerOkPattern.class.getResource(className); if (url == null) { throw new RuntimeException("存在しないクラス名が指定されました"); } String urlString = url.toString(); if (urlString.substring(0, 5).equals("file:")) { urlString = urlString.substring(6); } return urlString; } /** * 拡張子なしのファイル名を取得します. * * @param file ファイル * @return ファイル名 (拡張子なし) */ private static String getNameWithoutExtension(File file) { String fileName = file.getName(); int index = fileName.lastIndexOf('.'); if (index != -1) { return fileName.substring(0, index); } return ""; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
import java.io.IOException; /** * <pre> * マルチスレッド処理を実行するクラスです. * * <pre> * * @author baseballyama * */ public class CallerOkPatternSub { /** * <pre> * メイン処理. * Coller#execute をマルチスレッドで呼び出します. * </pre> * * @param args 標準入力 * @throws InterruptedException * @throws IOException */ public static void main(String... args) throws InterruptedException, IOException { Callee1 callee1 = new Callee1(); String text = args[0]; String result = callee1.execute(text); if (!result.equals(text)) { System.err.println("異常が発生しました... [期待値 : " + text + ", 実際値 : " + result + "]"); System.exit(1); } else { System.out.println(result); } } } |
実行プロセスごと分けることで、疑似的にスレッドセーフな実装になりました。
3.2 パターン3について
「synchronized ブロックを使用する」ことで解決しました。
例えば、以下のようなプログラムをマルチスレッドで呼びたいとします。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; /** * <pre> * 呼び出される側の処理. * パターン3 : 処理の中でファイルやDBを使用していることにより同時にnプロセスを実行できないケース. * </pre> * * @author baseballyama * */ public class Callee2 { /** 文字列保存ファイル名 */ private static final String TEMP_FILE_NAME = "DIu8tV85Rc.tmp"; /** * 引数で渡された文字列をファイルに保存しファイルの文字列を返却します. * * @return ファイル文字列 * @throws IOException * @throws FileNotFoundException */ public String execute(final String pText) throws FileNotFoundException, IOException { saveText(pText); try { // sleepで発生する Exception は今回に限っては無視することにします. Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { } return loadText(); } /** * ファイルの文字列を読み込みます. * * @return ファイル文字列 * @throws FileNotFoundException * @throws IOException */ private static String loadText() throws FileNotFoundException, IOException { StringBuilder builder = new StringBuilder(); try (BufferedReader reader = new BufferedReader(new FileReader(getTempFile()))) { String string = reader.readLine(); while (string != null) { builder.append(string); string = reader.readLine(); if (null != string) { builder.append(File.separator); } } } return builder.toString(); } /** * 文字列をファイルに保存します. (ファイルが既に存在する場合は上書きされます) * * @param text 保存対象文字列 * @throws IOException */ private static void saveText(final String text) throws IOException { FileWriter fw = new FileWriter(getTempFile()); try (PrintWriter pw = new PrintWriter(new BufferedWriter(fw))) { pw.write(text); } } /** * 一時ファイルを取得. * * @return 一時ファイル * @throws IOException */ private static File getTempFile() throws IOException { File tempFile = new File(System.getProperty("java.io.tmpdir"), TEMP_FILE_NAME); if (!tempFile.exists()) { tempFile.createNewFile(); } return tempFile; } } |
この処理は単一のファイルを使用しているため、
ExecutorService や パターン1,2 での解決方法では、
意図した動作をしません。
例として、ExecutorService を使用した誤った実装を記載します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * <pre> * マルチスレッド処理を実行するクラスです. * * <pre> * * @author baseballyama * */ public class CallerNgPattern { /** * <pre> * メイン処理. * {@link Callee1#execute(String)} をマルチスレッドで呼び出します. * 誤ったパターンで実装されているため処理が正常に完了しません。 * </pre> * * @param args 標準入力 * @throws InterruptedException */ public static void main(String... args) throws InterruptedException { execute(); } /** * <pre> * メイン処理. * {@link Callee1#execute(String)} をマルチスレッドで呼び出します. * Callerクラスが単一ファイルを使用しているため、 * 期待しない結果が返却されます。 * </pre> * * @throws InterruptedException */ private static void execute() throws InterruptedException { Callee2 callee2 = new Callee2(); ExecutorService es = Executors.newFixedThreadPool(10); try { for (int i = 0; i < 100; i++) { String text = String.valueOf(i); es.execute(() -> { String result = ""; // 今回はExceptionハンドリングが主題ではないため、 // Exceptionが発生した場合は再実行する仕様とします。 try { result = callee2.execute(text); } catch (IOException e) { System.out.println("実行の途中でファイルIOに失敗しました。再実行してください。[原因 : " + e.getMessage() + "]"); System.exit(1); } if (!result.equals(text)) { System.err.println("異常が発生しました... [期待値 : " + text + ", 実際値 : " + result + "]"); System.exit(1); } else { System.out.println(result); } }); } } finally { es.shutdown(); es.awaitTermination(1, TimeUnit.MINUTES); } System.out.println("正常に処理が完了しました。"); } } |
そこで、「synchronizedブロック」 を使用することでこの問題を解決しました。
但し、この方法は、性能が悪化しますので、利用に際しては性能要件をよく確認することをおすすめします。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * <pre> * マルチスレッド処理を実行するクラスです. * * <pre> * * @author baseballyama * */ public class CallerOkPattern { /** * <pre> * メイン処理. * {@link Callee1#execute(String)} をマルチスレッドで呼び出します. * 誤ったパターンで実装されているため処理が正常に完了しません。 * </pre> * * @param args 標準入力 * @throws InterruptedException */ public static void main(String... args) throws InterruptedException { execute(); } /** * <pre> * メイン処理. * {@link Callee1#execute(String)} をマルチスレッドで呼び出します. * synchronizedブロックを追加したことで期待した結果が返却されます。 * </pre> * * @throws InterruptedException */ private static void execute() throws InterruptedException { Callee2 callee2 = new Callee2(); ExecutorService es = Executors.newFixedThreadPool(10); try { for (int i = 0; i < 100; i++) { String text = String.valueOf(i); es.execute(() -> { String result = ""; // 今回はExceptionハンドリングが主題ではないため、 // Exceptionが発生した場合は再実行する仕様とします。 try { synchronized (callee2) { result = callee2.execute(text); } } catch (IOException e) { System.out.println("実行の途中でファイルIOに失敗しました。再実行してください。[原因 : " + e.getMessage() + "]"); System.exit(1); } if (!result.equals(text)) { System.err.println("異常が発生しました... [期待値 : " + text + ", 実際値 : " + result + "]"); System.exit(1); } else { System.out.println(result); } }); } } finally { es.shutdown(); es.awaitTermination(1, TimeUnit.MINUTES); } System.out.println("正常に処理が完了しました。"); } } |
これで、意図した挙動になりました。
何度も言いますが、synchronized ブロック内は1多重での実行となるため、
性能が悪化します。性能要件を確認の上対処方法を検討してください。
4.最後に - とても重要なこと
ライブラリを使用する側は、ライブラリ側がどのような実装になっているかを
完璧に把握することはとても難しいですし、時間もかかります。
そこで、「テスト」を実施して処理に問題がないを確認することが重要になってきます。
JUnitを使用した単体テストであれば、 ExecutorService などを使用したテストコードを書くべきです。
また、結合テストでは、ケースによると思いますが、
JUnit や Apache JMeter aを使用したテストコードを記述するべきです。