ナビゲーションスキップ

5.xユーザーガイド

このページはGithub Wikiページから自動生成されていることをご存知ですか? こちらから自分で改善できます。

サードパーティ翻訳

はじめに

問題点

今日では、相互に通信するために汎用アプリケーションやライブラリを使用しています。たとえば、Webサーバーから情報を取得し、Webサービスを介してリモートプロシージャコールを呼び出すために、HTTPクライアントライブラリを頻繁に使用します。

しかし、汎用プロトコルまたはその実装は、場合によってはスケール性が非常に悪くなることがあります。巨大なファイル、電子メールメッセージ、金融情報やマルチプレイヤーゲームデータなどのほぼリアルタイムのメッセージを交換するために、汎用HTTPサーバーを使用しないのと同じです。必要なのは、特定の目的に特化した、高度に最適化されたプロトコル実装です。たとえば、AJAXベースのチャットアプリケーション、メディアストリーミング、または大規模ファイル転送に最適化されたHTTPサーバーを実装したい場合があります。必要に応じて、まったく新しいプロトコルを設計および実装することもできます。

もう1つの避けられないケースは、古いシステムとの相互運用性を確保するために、レガシーの独自プロトコルを処理しなければならない場合です。この場合重要なのは、結果として得られるアプリケーションの安定性とパフォーマンスを犠牲にすることなく、そのプロトコルをどれだけ迅速に実装できるかです。

解決策

Nettyプロジェクトは、保守可能な高性能・高スケーラビリティのプロトコルサーバーとクライアントを迅速に開発するための、非同期イベント駆動型ネットワークアプリケーションフレームワークとツールを提供する取り組みです。

言い換えれば、Nettyは、プロトコルサーバーやクライアントなどのネットワークアプリケーションの迅速かつ容易な開発を可能にするNIOクライアントサーバーフレームワークです。TCPやUDPソケットサーバーの開発などのネットワークプログラミングを大幅に簡素化し、効率化します。

「迅速かつ容易」とは、結果として得られるアプリケーションが保守性またはパフォーマンスの問題に悩まされるという意味ではありません。Nettyは、FTP、SMTP、HTTP、およびさまざまなバイナリとテキストベースのレガシープロトコルなどの多くのプロトコルの実装から得られた経験を基に、慎重に設計されています。その結果、Nettyは、妥協することなく、開発の容易性、パフォーマンス、安定性、柔軟性を達成する方法を見つけることに成功しました。

一部のユーザーは、すでに同じ利点を主張する他のネットワークアプリケーションフレームワークを見つけている可能性があり、Nettyをそれらと何が異なるのか疑問に思うかもしれません。答えは、それが構築されている哲学です。Nettyは、APIと実装の両方において、最初から最高の快適なエクスペリエンスを提供するように設計されています。これは具体的なものではありませんが、このガイドを読み、Nettyを試す際に、この哲学があなたの生活をはるかに楽にすることに気づくでしょう。

はじめに

この章では、Nettyのコア構造を簡単な例で説明し、迅速に開始できるようにします。この章の最後には、Netty上でクライアントとサーバーをすぐに記述できるようになります。

何かを学習する際にトップダウンアプローチを好む場合は、第2章「アーキテクチャの概要」から始めて、ここに戻ってくることができます。

始める前に

この章で紹介されている例を実行するための最小要件は2つだけです。最新のNettyバージョンとJDK 1.6以上。最新のNettyバージョンはプロジェクトのダウンロードページで入手できます。JDKの適切なバージョンをダウンロードするには、お好みのJDKベンダーのWebサイトを参照してください。

読み進めていくうちに、この章で紹介されているクラスについてさらに質問があるかもしれません。それらについて詳しく知りたい場合は、いつでもAPIリファレンスを参照してください。このドキュメント内のすべてのクラス名は、便宜上オンラインAPIリファレンスにリンクされています。また、Nettyプロジェクトコミュニティに連絡して、不正確な情報、文法やタイプミス、ドキュメントを改善するための良いアイデアがあればお知らせください。

Discardサーバーの記述

世界で最も単純なプロトコルは「Hello、World!」ではなくDISCARDです。これは、応答なしに受信したデータをすべて破棄するプロトコルです。

DISCARDプロトコルを実装するには、受信したデータをすべて無視するだけで済みます。Nettyによって生成されたI/Oイベントを処理するハンドラーの実装から始めましょう。

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  1. DiscardServerHandlerChannelHandlerAdapterを拡張します。これはChannelHandlerの実装です。ChannelHandlerは、オーバーライドできるさまざまなイベントハンドラーメソッドを提供します。現時点では、自分でハンドラーインターフェースを実装するよりも、ChannelHandlerAdapterを拡張するだけで十分です。
  2. ここでは、channelRead()イベントハンドラーメソッドをオーバーライドします。このメソッドは、クライアントから新しいデータが受信されるたびに、受信したメッセージで呼び出されます。この例では、受信したメッセージの型はByteBufです。
  3. DISCARDプロトコルを実装するために、ハンドラーは受信したメッセージを無視する必要があります。ByteBufは、release()メソッドを介して明示的に解放する必要がある参照カウントオブジェクトです。ハンドラーに渡された参照カウントオブジェクトを解放することは、ハンドラーの責任であることに注意してください。通常、channelRead()ハンドラーメソッドは次のように実装されます。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  1. exceptionCaught()イベントハンドラーメソッドは、I/OエラーによってNettyによって、またはイベントの処理中にスローされた例外によってハンドラーの実装によって例外が発生した場合に、Throwableで呼び出されます。ほとんどの場合、キャッチされた例外はログに記録され、関連付けられたチャネルはここで閉じられますが、例外的な状況に対処するために何を行うかによって、このメソッドの実装は異なる場合があります。たとえば、接続を閉じる前に、エラーコードを含む応答メッセージを送信する場合があります。

今のところ順調です。DISCARDサーバーの前半を実装しました。残りは、DiscardServerHandlerでサーバーを開始するmain()メソッドを記述することです。

package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroupは、I/O操作を処理するマルチスレッドイベントループです。Nettyは、さまざまな種類のトランスポートに対して、さまざまなEventLoopGroup実装を提供します。この例ではサーバー側のアプリケーションを実装しているので、2つのNioEventLoopGroupが使用されます。最初のものは、多くの場合「ボス」と呼ばれ、着信接続を受け入れます。2番目のものは、多くの場合「ワーカー」と呼ばれ、ボスが接続を受け入れて、受け入れられた接続をワーカーに登録すると、受け入れられた接続のトラフィックを処理します。使用されるスレッド数と、作成されたChannelへのマッピング方法は、EventLoopGroupの実装に依存し、コンストラクターを介して構成できる場合もあります。
  2. ServerBootstrapは、サーバーを設定するヘルパークラスです。Channelを使用して直接サーバーを設定できます。ただし、これは面倒なプロセスであり、ほとんどの場合、実行する必要はないことに注意してください。
  3. ここでは、着信接続を受け入れる新しいChannelをインスタンス化するのに使用されるNioServerSocketChannelクラスを使用するように指定します。
  4. ここで指定されたハンドラーは、新しく受け入れられたChannelによって常に評価されます。ChannelInitializerは、新しいChannelの構成をユーザーが支援することを目的とした特別なハンドラーです。ネットワークアプリケーションを実装するために、DiscardServerHandlerなどのハンドラーをいくつか追加して、新しいChannelChannelPipelineを構成したい可能性が最も高いです。アプリケーションが複雑になるにつれて、パイプラインにさらに多くのハンドラーを追加し、この無名クラスを最終的にトップレベルクラスに抽出する可能性が高くなります。
  5. Channel実装に固有のパラメーターを設定することもできます。TCP/IPサーバーを作成しているので、tcpNoDelaykeepAliveなどのソケットオプションを設定できます。ChannelOptionと特定のChannelConfig実装のapidocsを参照して、サポートされているChannelOptionの概要を確認してください。
  6. option()childOption()に気づきましたか?option()は、着信接続を受け入れるNioServerSocketChannel用です。childOption()は、親ServerChannel(この場合はNioServerSocketChannel)によって受け入れられたChannel用です。
  7. 準備ができました。残りは、ポートにバインドしてサーバーを起動することです。ここでは、マシンのすべてのNIC(ネットワークインターフェースカード)のポート8080にバインドします。必要に応じて、bind()メソッドを何度でも呼び出すことができます(異なるバインドアドレスを使用)。

おめでとうございます!Netty上で最初のサーバーを完成させました。

受信データの確認

最初のサーバーを作成したので、それが実際に機能するかどうかをテストする必要があります。それをテストする最も簡単な方法は、telnetコマンドを使用することです。たとえば、コマンドラインにtelnet localhost 8080と入力し、何かを入力できます。

しかし、サーバーが正常に動作していると断言できますか?これは破棄サーバーなので、実際にはそうではありません。まったく応答がありません。それが本当に機能することを証明するために、受信したものを印刷するようにサーバーを変更しましょう。

データ受信時にchannelRead()メソッドが呼び出されることは既に知っています。DiscardServerHandlerchannelRead()メソッドにいくつかのコードを追加してみましょう。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
  1. この非効率的なループは、実際にはSystem.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))に簡略化できます。
  2. あるいは、ここでin.release()を実行することもできます。

もう一度telnetコマンドを実行すると、サーバーが受信した内容を出力するのを確認できます。

ディスカードサーバーの完全なソースコードは、配布パッケージのio.netty.example.discardにあります。

エコーサーバーの作成

これまでのところ、一切応答せずにデータを使用してきました。しかし、サーバーは通常、要求に応答する必要があります。受信したデータをすべて送り返すECHOプロトコルを実装することで、クライアントにレスポンスメッセージを書き込む方法を学びましょう。

前のセクションで実装したディスカードサーバーとの違いは、受信したデータをコンソールに出力する代わりに、受信したデータを戻すことです。そのため、channelRead()メソッドを変更するだけで十分です。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }
  1. ChannelHandlerContextオブジェクトは、さまざまなI/Oイベントや操作をトリガーできる様々な操作を提供します。ここでは、write(Object)を呼び出して、受信したメッセージをそのまま書き込みます。DISCARDの例とは異なり、受信したメッセージを解放しなかったことに注意してください。これは、Nettyがワイヤに書き出されたときに自動的に解放するためです。
  2. ctx.write(Object)は、メッセージをワイヤに書き込みません。内部的にバッファリングされ、ctx.flush()によってワイヤにフラッシュされます。あるいは、簡潔にするためにctx.writeAndFlush(msg)を呼び出すこともできます。

もう一度telnetコマンドを実行すると、サーバーが送信したものを送り返すのが確認できます。

エコーサーバーの完全なソースコードは、配布パッケージのio.netty.example.echoにあります。

タイムサーバーの作成

このセクションで実装するプロトコルはTIMEプロトコルです。これは、32ビット整数を格納したメッセージをリクエストを受信せずに送信し、メッセージの送信後に接続を閉じるところが、これまでの例とは異なります。この例では、メッセージの構築と送信、および完了時の接続の閉じ方を学習します。

接続が確立されるとすぐにメッセージを送信しますが、受信したデータは無視するため、今回はchannelRead()メソッドを使用できません。代わりに、channelActive()メソッドをオーバーライドする必要があります。実装は以下のとおりです。

package io.netty.example.time;

public class TimeServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 説明したように、channelActive()メソッドは、接続が確立され、トラフィックの生成準備ができたときに呼び出されます。このメソッドで、現在時刻を表す32ビット整数を書き込みましょう。

  2. 新しいメッセージを送信するには、メッセージを含む新しいバッファを割り当てる必要があります。32ビット整数を書き込むため、容量が少なくとも4バイトのByteBufが必要です。ChannelHandlerContext.alloc()を介して現在のByteBufAllocatorを取得し、新しいバッファを割り当てます。

  3. いつものように、構築されたメッセージを書き込みます。

    しかし、待ってください。「flip」はどこにありますか?NIOでメッセージを送信する前にjava.nio.ByteBuffer.flip()を呼び出す必要はありませんでしたか?ByteBufには、読み取り操作用と書き込み操作用の2つのポインタがあるため、そのようなメソッドはありません。ByteBufに何かを書き込むと、ライターインデックスが増加しますが、リーダーインデックスは変わりません。リーダーインデックスとライターインデックスは、それぞれメッセージの開始位置と終了位置を表します。

    これに対し、NIOバッファは、flipメソッドを呼び出さずにメッセージの内容の開始位置と終了位置を明確に把握するクリーンな方法を提供しません。バッファのflipを忘れると、何も送信されないか、不正確なデータが送信されるため、問題が発生します。Nettyでは、異なる操作の種類に対して異なるポインタがあるため、このようなエラーは発生しません。慣れてくると、非常に簡単になります—翻弄されることのない生活です!

    もう1つのポイントは、ChannelHandlerContext.write()(およびwriteAndFlush())メソッドがChannelFutureを返すことです。ChannelFutureは、まだ発生していないI/O操作を表します。つまり、Nettyではすべての操作が非同期であるため、要求された操作がまだ実行されていない可能性があります。たとえば、次のコードは、メッセージが送信される前に接続を閉じる可能性があります。

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

    したがって、write()メソッドによって返されたChannelFutureが完了した後にclose()メソッドを呼び出す必要があります。これは、書き込み操作が完了したときにリスナーに通知します。close()も接続をすぐに閉じるとは限らず、ChannelFutureを返すことに注意してください。

  4. では、書き込み要求が完了したときにどのように通知を受け取るかというと、返されたChannelFutureChannelFutureListenerを追加するだけです。ここでは、操作が完了したときにChannelを閉じる新しい匿名のChannelFutureListenerを作成しました。

    あるいは、事前に定義されたリスナーを使用してコードを簡素化することもできます。

    f.addListener(ChannelFutureListener.CLOSE);

タイムサーバーが期待通りに動作するかどうかをテストするには、UNIXのrdateコマンドを使用できます。

$ rdate -o <port> -p <host>

ここで、<port>main()メソッドで指定したポート番号、<host>は通常localhostです。

タイムクライアントの作成

DISCARDおよびECHOサーバーとは異なり、TIMEプロトコルにはクライアントが必要です。これは、人間が32ビットのバイナリデータをカレンダーの日付に変換できないためです。このセクションでは、サーバーが正しく動作することを確認する方法と、Nettyを使用してクライアントを作成する方法について説明します。

Nettyにおけるサーバーとクライアントの最大かつ唯一の違いは、異なるBootstrapChannelの実装が使用されることです。以下のコードを見てください。

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. BootstrapServerBootstrapに似ていますが、クライアント側またはコネクションレスチャネルなど、非サーバーチャネル用です。
  2. EventLoopGroupを1つだけ指定すると、ボスグループとワーカーグループの両方として使用されます。ただし、クライアント側ではボスワーカーは使用されません。
  3. NioServerSocketChannelの代わりに、NioSocketChannelを使用して、クライアント側のChannelを作成しています。
  4. クライアント側のSocketChannelには親がないため、ServerBootstrapで行ったように、ここではchildOption()を使用していません。
  5. bind()メソッドの代わりにconnect()メソッドを呼び出す必要があります。

ご覧のとおり、サーバー側のコードとそれほど違いはありません。ChannelHandlerの実装はどうでしょうか?サーバーから32ビット整数を受信し、人間が読み取れる形式に変換して、変換された時刻を出力し、接続を閉じる必要があります。

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. TCP/IPでは、Nettyはピアから送信されたデータを[ByteBuf]に読み取ります。

非常にシンプルに見え、サーバー側の例と何ら変わりません。ただし、このハンドラーは場合によってはIndexOutOfBoundsExceptionを発生させて動作を拒否します。その理由については、次のセクションで説明します。

ストリームベースのトランスポートの処理

ソケットバッファの小さな注意点

TCP/IPなどのストリームベースのトランスポートでは、受信したデータはソケット受信バッファに格納されます。残念ながら、ストリームベースのトランスポートのバッファはパケットのキューではなくバイトのキューです。つまり、2つの独立したパケットとして2つのメッセージを送信した場合でも、オペレーティングシステムはそれらを2つのメッセージとしてではなく、単なるバイトの塊として扱います。したがって、読み取られたものがリモートピアが書いたものと正確に一致するという保証はありません。たとえば、オペレーティングシステムのTCP/IPスタックが3つのパケットを受信したと仮定しましょう。

Three packets received as they were sent

ストリームベースのプロトコルのこの一般的な特性により、アプリケーションで次の断片化された形式でそれらを読み取る可能性が高くなります。

Three packets split and merged into four buffers

したがって、サーバー側でもクライアント側でも、受信側は受信データを、アプリケーションロジックで簡単に理解できる1つ以上の意味のあるフレームにデフラグメントする必要があります。上記の例では、受信データは次のようにフレーム化される必要があります。

Four buffers defragged into three

最初の解決策

それでは、TIMEクライアントの例に戻りましょう。ここでは同じ問題があります。32ビット整数は非常に少量のデータであり、頻繁に断片化される可能性は低いです。しかし、問題は、断片化される可能性があり、トラフィックが増加するにつれて断片化の可能性が高まることです。

単純な解決策は、内部累積バッファを作成し、すべての4バイトが内部バッファに受信されるまで待機することです。問題は、内部累積バッファを作成し、すべての4バイトが内部バッファに受信されるまで待機することです。以下は、問題を修正した修正済みのTimeClientHandler実装です。

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelHandlerには、handlerAdded()handlerRemoved()という2つのライフサイクルリスナーメソッドがあります。長時間ブロックしない限り、任意の(非)初期化タスクを実行できます。
  2. まず、受信したすべてのデータをbufに累積する必要があります。
  3. 次に、ハンドラーはbufに十分なデータ(この例では4バイト)があるかどうかを確認し、実際のビジネスロジックに進みます。そうでない場合、Nettyはさらにデータが到着するとchannelRead()メソッドを再度呼び出し、最終的にすべての4バイトが累積されます。

2番目の解決策

最初の解決策でTIMEクライアントの問題は解決されましたが、修正されたハンドラーはそれほどクリーンではありません。可変長のフィールドなど、複数のフィールドで構成されるより複雑なプロトコルを想像してみてください。ChannelHandlerの実装は、すぐに保守できなくなります。

気づいたかもしれませんが、ChannelPipelineに複数のChannelHandlerを追加できます。そのため、1つの巨大なChannelHandlerを複数のモジュール式のものに分割して、アプリケーションの複雑さを軽減できます。たとえば、TimeClientHandlerを2つのハンドラーに分割できます。

  • 断片化の問題を処理するTimeDecoderと、
  • TimeClientHandlerの最初の単純なバージョン。

幸いにも、Nettyは最初のものをすぐに作成するのに役立つ拡張可能なクラスを提供しています。

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoder は、ChannelHandler の実装であり、フラグメンテーションの問題を容易に処理できるようにします。
  2. ByteToMessageDecoder は、新しいデータを受信するたびに、内部的に維持されている累積バッファを使用して decode() メソッドを呼び出します。
  3. decode() は、累積バッファに十分なデータがない場合、out に何も追加しないことを決定できます。ByteToMessageDecoder は、さらにデータが受信されると、再び decode() を呼び出します。
  4. decode()out にオブジェクトを追加した場合、デコーダがメッセージを正常にデコードしたことを意味します。ByteToMessageDecoder は、累積バッファの読み取られた部分を破棄します。複数のメッセージをデコードする必要がないことに注意してください。ByteToMessageDecoder は、out に何も追加しなくなるまで、decode() メソッドを呼び出し続けます。

これで、ChannelPipeline に挿入する別のハンドラができたので、TimeClientChannelInitializer 実装を変更する必要があります。

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

冒険好きな方は、デコーダをさらに簡素化する ReplayingDecoder を試してみるのも良いでしょう。ただし、詳細についてはAPIリファレンスを参照する必要があります。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

さらに、Nettyはすぐに使えるデコーダを提供しており、ほとんどのプロトコルを非常に簡単に実装でき、巨大で保守が難しいハンドラ実装になることを防ぐことができます。より詳細な例については、以下のパッケージを参照してください。

ByteBuf ではなくPOJOを使用する

これまで見てきたすべての例では、ByteBuf をプロトコルメッセージの主要なデータ構造として使用していました。このセクションでは、TIME プロトコルのクライアントとサーバーの例を改善し、ByteBuf の代わりにPOJOを使用します。

ChannelHandler でPOJOを使用する利点は明らかです。ByteBuf から情報を抽出するコードをハンドラから分離することで、ハンドラはより保守可能で再利用可能になります。TIME クライアントとサーバーの例では、32ビット整数を1つだけ読み取るので、ByteBuf を直接使用しても大きな問題はありません。しかし、現実世界のプロトコルを実装する際には、この分離が必要になることがわかります。

まず、UnixTime という新しい型を定義しましょう。

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

これで、ByteBuf の代わりに UnixTime を生成するように TimeDecoder を修正できます。

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

デコーダを更新したので、TimeClientHandler はもう ByteBuf を使用しません。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

はるかにシンプルでエレガントですね?同じテクニックをサーバー側にも適用できます。今回はまず TimeServerHandler を更新しましょう。

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

これで、残っているのはエンコーダだけです。これは、UnixTimeByteBuf に変換する ChannelHandler の実装です。メッセージをエンコードする際にはパケットのフラグメンテーションとアセンブリを処理する必要がないため、デコーダを書くよりもはるかに簡単です。

package io.netty.example.time;

public class TimeEncoder extends ChannelHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int) m.value());
        ctx.write(encoded, promise); // (1)
    }
}
  1. このハンドラメソッドには、いくつかの重要な点があります。

    まず、元の ChannelPromise をそのまま渡すことで、エンコードされたデータが実際にワイヤーに書き出されたときに、Netty が成功または失敗としてマークします。

    次に、ctx.flush() を呼び出しませんでした。flush() 操作をオーバーライドすることを目的とした、別のハンドラメソッド void flush(ChannelHandlerContext ctx) があります。

さらに簡素化するために、MessageToByteEncoder を使用できます。

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int) msg.value());
    }
}

残りのタスクは、サーバー側で TimeServerHandler の前に ChannelPipelineTimeEncoder を挿入することです。これは簡単な練習問題として残しておきます。

アプリケーションのシャットダウン

Nettyアプリケーションのシャットダウンは、通常、shutdownGracefully() を介して作成したすべての EventLoopGroup をシャットダウンするだけという簡単な操作です。EventLoopGroup が完全に終了し、グループに属するすべての Channel が閉じられたときに通知する Future を返します。

概要

この章では、Nettyの概要と、Netty上に完全に動作するネットワークアプリケーションを作成する方法のデモを行いました。

今後の章で、Nettyに関するより詳細な情報を取り上げます。io.netty.example パッケージにあるNettyの例を確認することをお勧めします。

また、コミュニティ は常に皆様からのご質問やアイデアをお待ちしており、皆様を支援し、皆様からのフィードバックに基づいてNettyとそのドキュメントを改善し続けています。

最終取得日: 2024年7月19日