001package org.opengion.plugin.daemon;
002
003import java.io.File;
004import java.util.Date;
005import java.util.Locale;                                                                                        // 7.2.9.4 (2020/11/20)
006
007import javax.jms.JMSException;
008import javax.jms.Message;
009import javax.jms.MessageListener;
010import javax.jms.TextMessage;
011
012// import org.opengion.fukurou.util.BizUtil;
013import org.opengion.fukurou.business.BizUtil;
014import org.opengion.fukurou.queue.QueueInfo;
015import org.opengion.fukurou.queue.QueueReceive;
016import org.opengion.fukurou.queue.QueueReceiveFactory;
017import org.opengion.fukurou.util.HybsTimerTask;
018import org.opengion.fukurou.util.StringUtil;
019import org.opengion.hayabusa.common.HybsSystem;
020import org.opengion.hayabusa.common.HybsSystemException;
021import org.opengion.hayabusa.queue.DBAccessQueue;
022
023/**
024 * メッセージキュー受信 メッセージキューの受信処理を行います。
025 *
026 * @og.group メッセージ連携
027 *
028 * @og.rev 5.10.15.2 (2019/09/20) 新規作成
029 *
030 * @version 5.0
031 * @author oota
032 * @since JDK7
033 *
034 */
035public class Daemon_QueueReceive extends HybsTimerTask {
036        private int loopCnt ;
037        private QueueReceive queueReceive ;
038
039        private static final int LOOP_COUNTER = 24;
040        private static final char FPSC = File.pathSeparatorChar ;                                       // 7.2.9.4 (2020/11/20) システムに依存するパス区切り文字
041
042        private final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys("CLOUD_SQS_ACCESS_KEY");
043        private final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys("CLOUD_SQS_SECRET_KEY");
044        private final String MQ_QUEUE_TYPE;
045        private final String MQ_QURUE_SERVER_URL = HybsSystem.sys("MQ_QUEUE_SERVER_URL");
046        private final String MQ_QUEUE_RECEIVE_LISTENER =HybsSystem.sys("MQ_QUEUE_RECEIVE_LISTENER");
047
048        private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
049        private final String USER_ID   = "CYYYYY";
050        private final String PG_ID;
051        private final String DMN_NAME  = "QueueReceiveDMN";
052        private final DBAccessQueue dbAccessQueue;
053
054        private final String REAL_PATH = HybsSystem.sys("REAL_PATH");                           // 7.2.9.4 (2020/11/20)
055
056        /**
057         * コンストラクター
058         * 初期処理を行います。
059         *
060         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する
061         */
062        public Daemon_QueueReceive() {
063                super();
064
065                // パラメータの設定
066                // 7.2.9.4 (2020/11/20) PMD:Avoid if (x != y) ..; else ..;
067                if(StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) {
068                        throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい");
069                }else {
070//                      MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase();
071                        MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN );    // 7.2.9.4 (2020/11/20)
072                        PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10);
073                }
074
075                dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
076
077//              // パラメータの設定
078//              if(!StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) {
079////                    MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase();
080//                      MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN );    // 7.2.9.4 (2020/11/20)
081//                      PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10);
082//              }else {
083//                      throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい");
084//              }
085//
086//              dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
087        }
088
089        /**
090         * 初期処理 MQサーバに接続します。
091         *
092         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する
093         */
094        @Override
095        public void initDaemon() {
096                // 開始ログO
097                final StringBuilder errMsg = new StringBuilder();
098                if (MQ_QUEUE_TYPE == null) {
099                        errMsg.append("MQ_QUEUE_TYPE");
100                }
101                if (MQ_QURUE_SERVER_URL == null) {
102                        errMsg.append(" MQ_QUEUE_SERVER_URL");
103                }
104
105                if (errMsg.length() > 0) {
106                        errMsg.append(" キュータイプを特定するために、左記のシステムリソースを登録して下さい。");
107                        throw new HybsSystemException(errMsg.toString());
108                }
109
110//              final String queueType = MQ_QUEUE_TYPE.toUpperCase();
111                final String queueType = MQ_QUEUE_TYPE.toUpperCase( Locale.JAPAN );     // 7.2.9.4 (2020/11/20)
112
113                // 開始ログ
114                System.out.println("MQキュータイプ:" + queueType);
115                System.out.println("MQサーバーURL:" + MQ_QURUE_SERVER_URL);
116
117                queueReceive = QueueReceiveFactory.newQueueReceive(queueType);
118
119                queueReceive.connect(MQ_QURUE_SERVER_URL, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
120        }
121
122        /**
123         * 開始処理 タイマータスクのデーモン処理の開始ポイントです。
124         */
125        @Override
126        protected void startDaemon() {
127                if (loopCnt % LOOP_COUNTER == 0) {
128                        loopCnt = 1;
129                        System.out.println();
130                        System.out.print(toString() + " " + new Date() + " ");
131                } else {
132                        // 対象 キュー名(グループ名)とbizlogic名の取得処理
133                        final String[][] ge67vals = dbAccessQueue.setlectGE67();
134                        // キュー情報登録チェック
135                        if (ge67vals.length == 0) {
136                                final String errMsg = "GE67にキュー情報が登録されていません。";
137                                throw new RuntimeException(errMsg);
138                        }
139                        // MQとSQSで処理を分岐
140                        // MQ:指定キューIDからキューメッセージを取得
141                        // SQS:キューメッセージを取得してからキューID(グループID)を取得
142                        switch (MQ_QUEUE_TYPE) {
143                                case "MQ":
144                                        processMq(ge67vals);
145                                        break;
146                                case "SQS":
147                                        processSqs(ge67vals);
148                                        break;
149                                default:
150                                        final String errMsg = "リソース(MQ_QUEUE_TYPE)の値が不正です。:" + MQ_QUEUE_TYPE;
151                                        throw new RuntimeException(errMsg);
152                        }
153
154                        loopCnt++;
155                }
156        }
157
158        /**
159         * MQ用の処理
160         * GE67に登録されているキューIDの、
161         * メッセージキューを取得して処理を行います。
162         *
163         * @param ge67vals GE67の配列データ
164         */
165        private void processMq(final String[][] ge67vals) {
166                boolean listenerMode = false;
167
168                if("true".equals(MQ_QUEUE_RECEIVE_LISTENER)) {
169                        listenerMode = true;
170                }
171
172                if(listenerMode) {
173                        // リスナーの初期化
174                        queueReceive.closeListener();
175                }
176
177                // ge67のキューリスト分繰り返します
178                for (int row = 0; row < ge67vals.length; row++) {
179                        final String queueId = ge67vals[row][0];
180                        final String bizLogicId = ge67vals[row][1];
181
182                        if(listenerMode) {
183                                // リスナーを設定して、動的な受信処理(MQ専用)
184                                final QueueReceiveListener listener = new QueueReceiveListener(queueId, bizLogicId);
185                                queueReceive.setListener(queueId, listener);
186                        }else {
187                                // 1件の受信処理
188                                final QueueInfo queueInfo = queueReceive.receive(queueId);
189                                if (queueInfo != null) {
190                                        processMessage(queueId, bizLogicId, queueInfo.getMessage());
191                                        // 1件処理を行ったら処理を終了します。
192                                        break;
193                                }
194                        }
195                }
196        }
197
198        /**
199         * SQS用の処理
200         * SQSはグループIDを指定して、キューを取得することはできず、
201         * 任意のキューを1つ取得してから、
202         * 判定処理を行います。
203         * GE67に登録されていないグループIDのキューが取得された場合は、
204         * GE68にエラーレコードを登録します。
205         *
206         * @param ge67vals GE67の配列データ
207         */
208        private void processSqs(final String[][] ge67vals) {
209                // 下記はSQSの場合(キューを1件取得して処理)
210                final QueueInfo queueInfo = queueReceive.receive(null);
211
212                // キューが未取得の場合
213                if(queueInfo == null) {
214                        return;
215                }
216
217                // 受信したキューを処理
218                final String groupId = queueInfo.getSqsFifoGroupId();
219                Boolean existsFlg = false;
220                // valsにグループIDのレコードが存在するか検索
221                for (int row = 0; row < ge67vals.length; row++) {
222                        final String queueId = ge67vals[row][0];
223
224                        if (groupId != null && groupId.equals(queueId)) {
225                                // 該当レコードあり
226                                final String bizLogicId = ge67vals[row][1];
227                                processMessage(queueId, bizLogicId, queueInfo.getMessage());
228
229                                existsFlg = true;
230                                break;
231                        }
232                }
233
234                if (!existsFlg) {
235                        // 該当groupIdの未登録エラー
236                        // 処理番号生成
237                        final String syoriNo = dbAccessQueue.generateSyoriNo();
238                        dbAccessQueue.insertGE68(groupId, syoriNo, null, queueInfo.getMessage());
239                        dbAccessQueue.updateGE68Error(syoriNo, "SQSキューに設定されているグループIDが、GE67に未登録です。");
240                }
241        }
242
243        /**
244         * キャンセル処理
245         * タイマータスクのデーモン処理の終了ポイントです。
246         *
247         * @return キャンセルできれば、true
248         */
249        @Override
250        public boolean cancel() {
251                if (queueReceive != null) {
252                        queueReceive.close();
253                }
254
255                return super.cancel();
256        }
257
258        /**
259         * メッセージの処理
260         *  受信したメッセージをbizLogicに渡して、
261         *  処理を実行します。
262         *
263         * @param queueId キューID
264         * @param bizLogicId ビズロジックID
265         * @param msgText 受信メッセージ
266         */
267        private void processMessage(final String queueId, final String bizLogicId, final String msgText) {
268                String syoriNo = "";
269                try {
270                        // 処理番号生成
271                        syoriNo = dbAccessQueue.generateSyoriNo();
272
273                        // 管理テーブル登録
274                        dbAccessQueue.insertGE68(queueId, syoriNo, bizLogicId, msgText);
275
276                        // bizLogicの処理を実行
277                        callActBizLogic(SYSTEM_ID, bizLogicId, msgText);
278
279                        // 管理テーブル更新(完了)
280                        dbAccessQueue.updateGE68(syoriNo, DBAccessQueue.FGKAN_END);
281
282                } catch (Throwable te) {
283                        // bizLogicでのエラーはログの未出力して、処理を継続します。
284                        // bizLogicのエラー情報はCauseに格納されているため、Causeから取得します。
285                        String errMessage = null;
286                        if (te.getCause() != null) {
287                                // causeが設定されている場合のエラー情報
288                                errMessage = te.getCause().getMessage();
289                        } else {
290                                // causeが未設定の場合のエラー情報
291                                errMessage = te.getMessage();
292                        }
293                        System.out.println(errMessage);
294                        try {
295                                // エラーテーブルに登録
296                                dbAccessQueue.updateGE68Error(syoriNo, errMessage);
297                        } catch (Exception e) {
298                                // ここでのエラーはスルーします。
299                                System.out.println("管理テーブル登録エラー:" + e.getMessage());
300                        }
301                }
302        }
303
304        /**
305         * bizLogic処理の呼び出し
306         * 必要なパス情報をリソースから取得して、
307         * BizUtil.actBizLogicにパス情報を渡すことで、
308         * bizLogicの処理を行います。
309         *
310         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している
311         *
312         * @param systemId  システムID
313         * @param logicName ロジックファイル名
314         * @param msgText   メッセージ
315         * @throws Throwable エラー情報
316         */
317        private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable {
318                // 対象 クラスパスの生成
319                // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。
320                // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。
321                // bizLogicTag.javaのコードを移植
322                final String classDir = REAL_PATH + HybsSystem.sys( "BIZLOGIC_CLASS_PATH" );            // bizの下のパス
323                final String webIinf  = REAL_PATH + "WEB-INF" + File.separator ;
324
325                final StringBuilder sb = new StringBuilder().append('.').append(FPSC);
326
327                final File lib = new File( webIinf + "lib");
328                final File[] libFiles = lib.listFiles();
329                if( libFiles != null ) {
330                        // 7.2.9.4 (2020/11/20) PMD:This for loop can be replaced by a foreach loop
331                        for( final File file : libFiles ) {
332                                sb.append( file.getAbsolutePath() ).append(FPSC);
333                        }
334//                      for (int i = 0; i < libFiles.length; i++) {
335//                              sb.append( libFiles[i].getAbsolutePath() ).append(FPSC);
336//                      }
337                }
338
339                // 上記で生成したクラスパスをclassPathに格納
340                final String classPath =
341                        sb.append( webIinf ).append( "classes" ).append(FPSC)
342                          .append( classDir ).append(FPSC)              // bizの下のパス
343                          .toString();
344
345                // ソースパス情報の生成
346                final String  srcDir        = REAL_PATH + HybsSystem.sys( "BIZLOGIC_SRC_PATH" );
347                final boolean isAutoCompile = HybsSystem.sysBool( "BIZLOGIC_AUTO_COMPILE" );
348                final boolean isHotDeploy   = HybsSystem.sysBool( "BIZLOGIC_HOT_DEPLOY" );
349
350                // bizLogicに渡すパラメータ
351                final String[] keys = new String[] { "message" };
352                final String[] vals = new String[] { msgText };
353
354                // bizLogic処理の実行
355                BizUtil.actBizLogic( srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals );
356        }
357
358//      7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している
359//      private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable {
360//              // 対象 クラスパスの生成
361//              // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。
362//              // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。
363//              // bizLogicTag.javaのコードを移植
364//              final StringBuilder sb = new StringBuilder();
365//              sb.append('.').append(File.pathSeparatorChar);
366//              final File lib = new File(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "lib");
367//              final File[] libFiles = lib.listFiles();
368//              for (int i = 0; i < libFiles.length; i++) {
369//                      sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar);
370//              }
371//              sb.append(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "classes").append(File.pathSeparatorChar);
372//              // bizの下のパス
373//              sb.append(HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH")).append(File.pathSeparatorChar);
374//              // 上記で生成したクラスパスをclassPathに格納
375//              final String classPath = sb.toString();
376//
377//              // ソースパス情報の生成
378//              final String srcDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_SRC_PATH");
379//              final String classDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH");
380//              final boolean isAutoCompile = HybsSystem.sysBool("BIZLOGIC_AUTO_COMPILE");
381//              final boolean isHotDeploy = HybsSystem.sysBool("BIZLOGIC_HOT_DEPLOY");
382//
383//              // bizLogicに渡すパラメータ
384//              final String[] keys = new String[] { "message" };
385//              final String[] vals = new String[] { msgText };
386//
387//              // bizLogic処理の実行
388//              BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals);
389//      }
390
391        /**
392         * 受信処理リスナー用のインナークラス
393         * QueueReceiveリスナークラス リスナー用のクラスです。
394         *  MQに設定することで、メッセージが受信されると、
395         * onMessageメソッドが実行されます。
396         *
397         * @og.rev 7.2.9.4 (2020/11/20) private final 追加
398         */
399//      class QueueReceiveListener implements MessageListener {
400        private final class QueueReceiveListener implements MessageListener {
401//              private String queueId = "";
402//              private String bizLogicId = "";
403                private final String queueId ;
404                private final String bizLogicId ;
405
406                /**
407                 * コンストラクター 初期処理を行います。
408                 *
409                 * @param quId  キューID
410                 * @param bizId ビズロジックID
411                 */
412                public QueueReceiveListener(final String quId, final String bizId) {
413                        queueId    = quId;
414                        bizLogicId = bizId;
415                }
416
417                /**
418                 * メッセージ受信処理 MQサーバにメッセージが受信されると、 メソッドの処理が行われます。
419                 *
420                 * @param message 受信メッセージ
421                 */
422                @Override
423                public void onMessage(final Message message) {
424                        // 要求番号 : ここでは使用していません。
425                        final String ykno = "";
426
427                        // メッセージ受信
428                        final TextMessage msg = (TextMessage) message;
429                        String msgText = "";
430
431                        try {
432                                // キューサーバのメッセージを取得
433                                msgText = msg.getText();
434
435                                // メーッセージの受信応答を返します。
436                                msg.acknowledge();
437
438                                processMessage(queueId, bizLogicId, msgText);
439
440                        } catch (JMSException jmse) {
441                                try {
442                                        // 管理テーブル更新
443                                        // 管理テーブル更新(エラー)
444                                        dbAccessQueue.updateGE68(ykno, DBAccessQueue.FGKAN_ERROR);
445                                } catch (Exception e) {
446                                        // ここでのエラーはスルーします。
447                                        System.out.println("管理テーブル登録エラー:" + e.getMessage());
448                                }
449
450                                throw new HybsSystemException("bizLogicの処理中にエラーが発生しました。" + jmse.getMessage());
451                        }
452                }
453        }
454}