SeamFramework.orgCommunity Documentation

第22章 非同期性とメッセージング

22.1. Seam でのメッセージング
22.1.1. 設定
22.1.2. メッセージ送信
22.1.3. メッセージ駆動型 Bean を使用したメッセージの受信
22.1.4. クライアントでのメッセージの受信
22.2. 非同期性
22.2.1. 非同期メソッド
22.2.2. Quartz ディスパッチャを使った非同期メソッド
22.2.3. 非同期イベント
22.2.4. 非同期の呼び出しによる例外を処理する

Seam は、とても簡単に web 要求から非同期に処理を実行させることができます。普通、Java EE で非同期を行うことを考えたときに、JMS を使うことを想定するでしょう。これは確かに、Seam の場合でも、その命題に対する一つのアプローチで、厳格でよく定義されたサービス要件を持っているのであれば正しい方法でしょう。Seam は Seam コンポーネントを使って、とても簡単に JMS メッセージの送受信を行うことができます。

しかし多くの使用事例ではJMSは過剰です。 Seamはシンプルな非同期メソッドとイベント機能を ディスパッチャ の選択に応じてレイヤ化します。

この章では、まず最初に Seam がどのように JMS を単純化するかをガイドし、それから、単純な非同期メソッドとイベント手法の使い方について説明します。

Seam は Seam コンポーネントの JMS メッセージの送受信を容易にしています。メッセージ発行者とメッセージ受信者の両方が Seam コンポーネントになることができます。

まず最初に、queue と topic メッセージ発行者を設定し、それから、メッセージ交換の行い方を示した例を見てください。

EJB3 メッセージ駆動型 Bean を利用してメッセージ処理が可能です。 メッセージ駆動型 Bean は Seam コンポーネントとすることも可能です。 この場合、イベントまたはアプリケーションスコープの Seam コンポーネントのインジェクトが可能です。支払い受信のサンプルを示します。支払い処理を委譲しています。

最初に、メッセージの受け取りのために、メッセージ駆動型 Bean を作成します。

@MessageDriven(activationConfig = {

    @ActivationConfigProperty(
        propertyName = "destinationType",
        propertyValue = "javax.jms.Queue"
    ),
    @ActivationConfigProperty(
        propertyName = "destination",
        propertyValue = "queue/paymentQueue"
    )
})
@Name("paymentReceiver")
public class PaymentReceiver implements MessageListener
{
   @Logger private Log log;
   @In(create = true) private PaymentProcessor paymentProcessor;
    
   @Override
   public void onMessage(Message message)
   {
      try
      {
         paymentProcessor.processPayment((Payment) ((ObjectMessage) message).getObject());
      } 
      catch (JMSException ex)
      {
         log.error("Message payload did not contain a Payment object", ex);
      } 
   }
}

それから、受信者が、payment の処理を委譲している Seam コンポーネントを実装します。

@Name("paymentProcessor")

public class PaymentProcessor
{
   @In private EntityManager entityManager;
   public void processPayment(Payment payment)
   {
      // perhaps do something more fancy
      entityManager.persist(payment);
   }
}

もし、メッセージ駆動型 Bean のトランザクション処理を実行したいのであれば、XA データソースで動かしていることを確認すべきです。そうでなければ、データベーストランザクションがコミットされたが、後続の処理でメッセージの障害が発生した場合に、データベースの変更をロールバックすることができないでしょう。

非同期のイベントやメソッドのコールは基礎となるディスパッチャのメカニズムと同等のサービスが期待されます。 ScheduledThreadPoolExecutor をベースとするデフォルトのディスパッチャは効率的な働きをしますが、 永続非同期のタスクに対応していないためタスクが実際に実行されるかは保証されません。 EJB 3.0 に対応する環境で作業している場合は次の行を components.xml に追加します。


<async:timer-service-dispatcher/>

これを行うと非同期のタスクがコンテナの EJB タイマーサービスによって処理されるようになります。 タイマーサービスについて十分な知識がなくても心配する必要はありません。 Seam で非同期メソッドを使用する場合は直接タイマーサービスを操作する必要はありません。 理解しておくべき重要な部分とは、 適切に実装されている EJB 3.0 には永続タイマーを使用するオプションがあり、 これによりタスクが最終的には処理されることを保証します。

別の方法としては、 オープンソースの Quartz ライブラリを使って非同期メソッドを管理する方法です。この場合、 EAR に Quartz ライブラリ JAR (lib ディレクトリにある) を同梱してから application.xml で Java モジュールとして宣言する必要があります。 Quartz ディスパッチャはクラスパスに Quartz プロパティファイルを追加すると設定可能になります。 seam.quartz.properties という名前にしてください。 また、 次の行を components.xml に追加して Quartz ディスパッチャをインストールする必要があります。


<async:quartz-dispatcher/>

デフォルトの ScheduledThreadPoolExecutor の Seam API、 EJB3 Timer、 Quartz Scheduler の大部分は同じになります。 components.xml に 1 行追加するだけでプラグアンドプレイが可能です。

最も単純なかたちでは、非同期呼び出しは、メソッド呼び出しを呼び出し側に対して非同期に (異なるスレッドで) 処理させるだけです。 私たちは通常、クライアントに即座に応答を返し、重い仕事をバックグラウンドで処理させたい場合に、非同期呼び出しを使います。 このパターンは、クライアントが処理結果をサーバへ自動的にポーリングできるような、AJAXを使用するアプリケーションでとても効果的です。

EJBコンポーネントでは、ローカルインタフェースをアノテートしてメソッドが非同期に処理されるよう指定します。

@Local

public interface PaymentHandler
{
    @Asynchronous
    public void processPayment(Payment payment);
}

(JavaBean コンポーネントでは、 望むならコンポーネントの実装クラスをアノテートすることができます)

非同期性の使用はbeanクラスに透過的です。

@Stateless

@Name("paymentHandler")
public class PaymentHandlerBean implements PaymentHandler
{
    public void processPayment(Payment payment)
    {
        //do some work!
    }
}

そしてクライアントに対しても透過的です。

@Stateful

@Name("paymentAction")
public class CreatePaymentAction
{
    @In(create=true) PaymentHandler paymentHandler;
    @In Bill bill;
    
    public String pay()
    {
        paymentHandler.processPayment( new Payment(bill) );
        return "success";
    }
}

非同期メソッドは完全に新規のイベントコンテキストで処理され、 呼び出し側のセッションまたは対話コンテキストの状態にはアクセスできません。 しかしビジネスプロセスコンテキストは伝播されます

非同期メソッド呼び出しは@Duration@Expiration@IntervalDurationアノテーションを使って、 後の実行のためにスケジューリングできます。

@Local

public interface PaymentHandler
{
    @Asynchronous
    public void processScheduledPayment(Payment payment, @Expiration Date date);
    @Asynchronous
    public void processRecurringPayment(Payment payment, 
                                        @Expiration Date date, 
                                        @IntervalDuration Long interval)'
}
@Stateful

@Name("paymentAction")
public class CreatePaymentAction
{
    @In(create=true) PaymentHandler paymentHandler;
    @In Bill bill;
    
    public String schedulePayment()
    {
        paymentHandler.processScheduledPayment( new Payment(bill), bill.getDueDate() );
        return "success";
    }
    public String scheduleRecurringPayment()
    {
        paymentHandler.processRecurringPayment( new Payment(bill), bill.getDueDate(), 
                                                ONE_MONTH );
        return "success";
    }
}

クライアント、 サーバーいずれも呼び出しに関連する Timer オブジェクトにアクセスすることができます。 以下に示す Timer オブジェクトは EJB3 ディスパッチャを使用する場合は EJB 3 のタイマーになります。 デフォルトの ScheduledThreadPoolExecutor の場合、 返されるオブジェクトは JDK からの Future になります。 Quartz ディスパッチャの場合は QuartzTriggerHandle を返します。 これについては次のセクションで説明していきます。

@Local

public interface PaymentHandler
{
    @Asynchronous
    public Timer processScheduledPayment(Payment payment, @Expiration Date date);
}
@Stateless

@Name("paymentHandler")
public class PaymentHandlerBean implements PaymentHandler
{
    @In Timer timer;
    
    public Timer processScheduledPayment(Payment payment, @Expiration Date date)
    {
        //do some work!
        
        return timer; //note that return value is completely ignored
    }
}
@Stateful

@Name("paymentAction")
public class CreatePaymentAction
{
    @In(create=true) PaymentHandler paymentHandler;
    @In Bill bill;
    
    public String schedulePayment()
    {
        Timer timer = paymentHandler.processScheduledPayment( new Payment(bill), 
                                                              bill.getDueDate() );
        return "success";
    }
}

非同期メソッドは呼び出し側に他のどんな値も返すことができません。

Quartz ディスパッチャ (インストール方法については前述の説明を参照) では上記のような @Asynchronous@Duration@Expiration@IntervalDuration のアノテーションが使用できます。 ただし、 強力な追加機能があります。 Quartz ディスパッチャは 3 種類の新しいアノテーションに対応します。

@FinalExpiration アノテーションは反復タスクの終了日を指定します。 QuartzTriggerHandle のインジェクトが可能なので注意してください。



        @In QuartzTriggerHandle timer;
        
    // Defines the method in the "processor" component
    @Asynchronous
    public QuartzTriggerHandle schedulePayment(@Expiration Date when, 
                                 @IntervalDuration Long interval,
                                 @FinalExpiration Date endDate, 
                                 Payment payment) 
    { 
        // do the repeating or long running task until endDate
    }
    
    ... ...
    
    // Schedule the task in the business logic processing code
    // Starts now, repeats every hour, and ends on May 10th, 2010
    Calendar cal = Calendar.getInstance ();
    cal.set (2010, Calendar.MAY, 10);
    processor.schedulePayment(new Date(), 60*60*1000, cal.getTime(), payment);

このメソッドは QuartzTriggerHandle を返すので注意してください。 後でこれを使用してスケジューラの停止、 一時停止、 再開などを行うことができます。 QuartzTriggerHandle オブジェクトはシリアライズ可能であるため、 長期間に渡りこれを維持する必要がある場合はデータベースに保存することができます。

QuartzTriggerHandle handle =

         processor.schedulePayment(payment.getPaymentDate(), 
                                   payment.getPaymentCron(), 
                                   payment);
        payment.setQuartzTriggerHandle( handle );
        // Save payment to DB
        
        // later ...
        
        // Retrieve payment from DB
        // Cancel the remaining scheduled tasks
        payment.getQuartzTriggerHandle().cancel();

@IntervalCron アノテーションはタスクのスケジュールを行うのに Unix cron ジョブ構文をサポートします。 たとえば、 次の非同期メソッドは 3 月の毎水曜日の 2:10pm と 2:44pm に実行されます。



    // Define the method
    @Asynchronous
    public QuartzTriggerHandle schedulePayment(@Expiration Date when, 
                                 @IntervalCron String cron, 
                                 Payment payment) 
    { 
        // do the repeating or long running task
    }
    
    ... ...
    
    // Schedule the task in the business logic processing code
    QuartzTriggerHandle handle = 
      processor.schedulePayment(new Date(), "0 10,44 14 ? 3 WED", payment);

@IntervalBusinessDay アノテーションは「第X日営業日」という状況の呼び出しに対応します。 たとえば、 次の非同期メソッドは毎月第2営業日の 14:00 に実行されます。 デフォルトでは 2010 年まですべての週末とアメリカ合衆国の祝日を営業日から除外します。



    // Define the method
    @Asynchronous
    public QuartzTriggerHandle schedulePayment(@Expiration Date when, 
                                 @IntervalBusinessDay NthBusinessDay nth, 
                                 Payment payment) 
    { 
        // do the repeating or long running task
    }
    
    ... ...
    
    // Schedule the task in the business logic processing code
    QuartzTriggerHandle handle = 
      processor.schedulePayment(new Date(), 
          new NthBusinessDay(2, "14:00", WEEKLY), payment);

NthBusinessDay オブジェクトには呼び出しトリガーの設定が含まれます。 additionalHolidays プロパティを使って祝日を追加指定することができます (例、 会社固有の休み、 アメリカ合衆国以外の祝日など)。



public class NthBusinessDay implements Serializable
{
      int n;
      String fireAtTime;
      List <Date
> additionalHolidays;
      BusinessDayIntervalType interval;
      boolean excludeWeekends;
      boolean excludeUsFederalHolidays;
      public enum BusinessDayIntervalType { WEEKLY, MONTHLY, YEARLY }
      public NthBusinessDay ()
      {
        n = 1;
        fireAtTime = "12:00";
        additionalHolidays = new ArrayList <Date
> ();
        interval = BusinessDayIntervalType.WEEKLY;
        excludeWeekends = true;
        excludeUsFederalHolidays = true;
      }     
      ... ...
}

@IntervalDuration@IntervalCron@IntervalNthBusinessDay のアノテーションは互いに矛盾します。 同じメソッド内で使用されると RuntimeException がスローされます。

各非同期ディスパッチャは例外がそれを通じて伝播される場合はそれぞれ異なった動作をします。 たとえば、 java.util.concurrent ディスパッチャは繰り返すコールの実行がこれ以上起きないよう一時停止し、 EJB3 タイマーサービスがその例外を吸収します。 したがって伝播する例外はディスパッチャに到達する前にすべて Seam によって非同期のコールからキャッチされます。

デフォルトでは、 非同期実行から伝播する例外はすべてエラーレベルでキャッチされログ記録されます。 org.jboss.seam.async.asynchronousExceptionHandler コンポーネントを上書きするとこの動作をグローバルにカスタマイズすることができます。

@Scope(ScopeType.STATELESS)

@Name("org.jboss.seam.async.asynchronousExceptionHandler")
public class MyAsynchronousExceptionHandler extends AsynchronousExceptionHandler { 
   @Logger Log log;
   
   @In Future timer;
   
   @Override
   public void handleException(Exception exception) {
      log.debug(exception);
      timer.cancel(false);
   }
   
}

たとえば この java.util.concurrent ディスパッチャを使用している例では、 その制御オブジェクトをインジェクトして、例外が発生すると今後の呼び出しすべてを取り消しています。

また、 コンポーネント上にメソッド public void handleAsynchronousException(Exception exception); を実装することで個別にコンポーネントのこの動作を変更することができます。 たとえば、

   public void handleAsynchronousException(Exception exception) {

      log.fatal(exception);
   }