ナビゲーションをスキップ

4.xユーザーガイド

このページはGitHub Wikiページから自動生成されていることをご存知ですか? ここでご自身で改善できます!

序文

問題

今日では、汎用アプリケーションやライブラリを使用して相互に通信しています。たとえば、Webサーバーから情報を取得したり、Webサービスを介してリモートプロシージャコールを呼び出したりするために、HTTPクライアントライブラリをよく使用します。しかし、汎用プロトコルやその実装は、スケーラビリティが十分でない場合があります。これは、巨大なファイル、電子メールメッセージ、金融情報やマルチプレイヤーゲームデータなどのほぼリアルタイムのメッセージを交換するために、汎用HTTPサーバーを使用しないのと同じです。必要なのは、特定の目的に特化した高度に最適化されたプロトコル実装です。たとえば、AJAXベースのチャットアプリケーション、メディアストリーミング、または大規模ファイル転送に最適化されたHTTPサーバーを実装したい場合があります。ニーズに合わせて正確に調整されたまったく新しいプロトコルを設計および実装することさえできます。避けられないもう1つのケースは、古いシステムとの相互運用性を確保するために、レガシーの独自プロトコルを扱わなければならない場合です。この場合、重要なのは、結果として得られるアプリケーションの安定性とパフォーマンスを犠牲にすることなく、そのプロトコルをどれだけ迅速に実装できるかです。

解決策

Nettyプロジェクトは、保守性の高い高性能でスケーラブルなプロトコルサーバーとクライアントを迅速に開発するための、非同期イベント駆動型ネットワークアプリケーションフレームワークとツールを提供するための取り組みです。

言い換えれば、Nettyは、プロトコルサーバーやクライアントなどのネットワークアプリケーションの迅速かつ簡単な開発を可能にするNIOクライアントサーバーフレームワークです。TCPやUDPソケットサーバーの開発など、ネットワークプログラミングを大幅に簡素化および効率化します。

「迅速かつ簡単」とは、結果として得られるアプリケーションが保守性やパフォーマンスの問題に悩まされるという意味ではありません。Nettyは、FTP、SMTP、HTTP、およびさまざまなバイナリおよびテキストベースのレガシープロトコルなど、多くのプロトコルの実装から得られた経験に基づいて慎重に設計されています。その結果、Nettyは、妥協することなく、開発の容易さ、パフォーマンス、安定性、柔軟性を実現する方法を見つけることに成功しました。

一部のユーザーは、同じ利点があると主張する他のネットワークアプリケーションフレームワークをすでに知っているかもしれません。Nettyがそれらとどう違うのか疑問に思うかもしれません。答えは、それが基づいている哲学です。Nettyは、APIと実装の両方の面で、初日から最も快適なエクスペリエンスを提供するように設計されています。具体的なものではありませんが、このガイドを読んでNettyを試してみると、この哲学があなたの人生をはるかに楽にすることに気付くでしょう。

はじめに

この章では、簡単な例を使用してNettyのコアとなる構成要素を概観し、すぐに使い始めることができます。この章の終わりには、Netty上にクライアントとサーバーをすぐに記述できるようになります。

何かを学ぶ際にトップダウン方式を好む場合は、第2章アーキテクチャの概要から始めて、ここに戻ってくることをお勧めします。

始める前に

この章の例を実行するための最小要件は、Nettyの最新バージョンとJDK 1.6以上です。Nettyの最新バージョンは、プロジェクトのダウンロードページから入手できます。適切なバージョンのJDKをダウンロードするには、ご希望のJDKベンダーのWebサイトを参照してください。

読むにつれて、この章で紹介されているクラスについてさらに質問があるかもしれません。詳細を知りたい場合は、いつでもAPIリファレンスを参照してください。このドキュメントのすべてのクラス名は、便宜上、オンラインAPIリファレンスにリンクされています。また、Nettyプロジェクトコミュニティに連絡して、誤った情報、文法エラーやタイプミスがある場合、ドキュメントの改善に役立つ良いアイデアがある場合はお知らせください。

破棄サーバーの作成

世界で最も単純なプロトコルは「Hello、World!」ではなく、DISCARDです。受信したデータを応答なしに破棄するプロトコルです。

DISCARDプロトコルを実装するには、受信したすべてのデータを無視するだけです。Nettyによって生成されたI/Oイベントを処理するハンドラの実装から langsung始めましょう。

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  1. DiscardServerHandlerは、ChannelInboundHandlerの実装であるChannelInboundHandlerAdapterを拡張します。ChannelInboundHandlerは、オーバーライドできるさまざまなイベントハンドラーメソッドを提供します。今のところ、ハンドラーインターフェースを自分で実装するのではなく、ChannelInboundHandlerAdapterを拡張するだけで十分です。
  2. ここでは、channelRead()イベントハンドラーメソッドをオーバーライドします。このメソッドは、クライアントから新しいデータが受信されるたびに、受信したメッセージと共に呼び出されます。この例では、受信したメッセージのタイプはByteBufです。
  3. DISCARDプロトコルを実装するには、ハンドラーは受信したメッセージを無視する必要があります。ByteBufは、release()メソッドを介して明示的に解放する必要がある参照カウントオブジェクトです。ハンドラーに渡された参照カウントオブジェクトを解放するのはハンドラーの責任であることに注意してください。通常、channelRead()ハンドラーメソッドは次のように実装されます
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  1. exceptionCaught()イベントハンドラーメソッドは、I/Oエラーが原因でNettyによって、またはイベントの処理中にスローされた例外が原因でハンドラー実装によって例外が発生した場合に、Throwableと共に呼び出されます。ほとんどの場合、キャッチされた例外はログに記録され、関連付けられたチャネルはここで閉じられる必要がありますが、このメソッドの実装は、例外的な状況に対処する方法によって異なる場合があります。たとえば、接続を閉じる前に、エラーコードを含む応答メッセージを送信する場合があります。

ここまでは順調です。DISCARDサーバーの半分を実装しました。残っているのは、DiscardServerHandlerでサーバーを起動するmain()メソッドを作成することです。

package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroupは、I/O操作を処理するマルチスレッドイベントループです。Nettyは、さまざまな種類のトランスポート用にさまざまなEventLoopGroup実装を提供しています。この例ではサーバーサイドアプリケーションを実装しているため、2つのNioEventLoopGroupが使用されます。最初のものは、多くの場合「ボス」と呼ばれ、着信接続を受け入れます。2つ目は、多くの場合「ワーカー」と呼ばれ、ボスが接続を受け入れて、受け入れられた接続をワーカーに登録すると、受け入れられた接続のトラフィックを処理します。いくつのスレッドが使用され、それらがどのように作成されたChannelにマップされるかは、EventLoopGroup実装によって異なり、コンストラクターを介して設定できる場合もあります。
  2. ServerBootstrapは、サーバーを設定するヘルパークラスです。Channelを使用してサーバーを langsung設定できます。ただし、これは面倒なプロセスであり、ほとんどの場合、これを行う必要はありません。
  3. ここでは、着信接続を受け入れるための新しいChannelをインスタンス化するために使用されるNioServerSocketChannelクラスを使用することを指定します。
  4. ここで指定されたハンドラーは、新しく受け入れられたChannelによって常に評価されます。ChannelInitializerは、ユーザーが新しいChannelを設定するのに役立つように設計された特別なハンドラーです。ネットワークアプリケーションを実装するために、DiscardServerHandlerなどのハンドラーを追加して、新しいChannelChannelPipelineを設定することが最も多いでしょう。アプリケーションが複雑になるにつれて、パイプラインにさらにハンドラーを追加し、最終的にこの匿名クラスをトップレベルクラスに抽出する可能性が高くなります。
  5. Channel実装に固有のパラメーターも設定できます。TCP/IPサーバーを作成しているため、tcpNoDelaykeepAliveなどのソケットオプションを設定できます。サポートされているChannelOptionの概要については、ChannelOptionと特定のChannelConfig実装のapidocsを参照してください。
  6. option()childOption()に気付きましたか? option()は、着信接続を受け入れるNioServerSocketChannel用です。 childOption()は、親ServerChannel(この場合はNioSocketChannel)によって受け入れられたChannel用です。
  7. これで準備が整いました。残っているのは、ポートにバインドしてサーバーを起動することです。ここでは、マシンのすべてのNIC(ネットワークインターフェースカード)のポート8080にバインドします。これで、bind()メソッドを必要な回数だけ(異なるバインドアドレスで)呼び出すことができます。

おめでとうございます! Netty上に最初のサーバーを構築しました。

受信データの確認

最初のサーバーを作成したので、実際に機能するかどうかをテストする必要があります。テストする最も簡単な方法は、_telnet_コマンドを使用することです。たとえば、コマンドラインにtelnet localhost 8080と入力して、何かを入力できます。

しかし、サーバーは正常に機能していると断言できますか?破棄サーバーであるため、実際にはわかりません。まったく応答がありません。実際に機能していることを証明するために、受信した内容を出力するようにサーバーを変更してみましょう。

データが受信されるたびにchannelRead()メソッドが呼び出されることは既に分かっています。DiscardServerHandlerchannelRead()メソッドにいくつかのコードを追加してみましょう。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
  1. この非効率的なループは、実際にはSystem.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))のように簡略化できます。
  2. 代わりに、ここでin.release()を実行することもできます。

もう一度 *telnet* コマンドを実行すると、サーバーが受信した内容を出力するのがわかります。

破棄サーバーの完全なソースコードは、配布物のio.netty.example.discardパッケージにあります。

エコーサーバーの作成

これまで、私たちは全く応答せずにデータを使用してきました。しかし、サーバーは通常、リクエストに応答することになっています。ECHOプロトコルを実装することで、クライアントに応答メッセージを書き込む方法を学びましょう。このプロトコルでは、受信したデータがそのまま送り返されます。

前のセクションで実装した破棄サーバーとの唯一の違いは、受信したデータを出力する代わりに、受信したデータをそのまま送り返すことです。したがって、channelRead()メソッドを修正すれば十分です。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }
  1. ChannelHandlerContextオブジェクトは、さまざまなI/Oイベントと操作をトリガーできるさまざまな操作を提供します。ここでは、受信したメッセージをそのまま書き込むためにwrite(Object)を呼び出します。DISCARDの例とは異なり、受信したメッセージを解放していないことに注意してください。これは、Nettyがネットワークに書き出されたときに自動的に解放するためです。
  2. ctx.write(Object)は、メッセージをネットワークに書き出しません。内部的にバッファリングされ、その後ctx.flush()によってネットワークにフラッシュされます。簡潔にするために、ctx.writeAndFlush(msg)を呼び出すこともできます。

もう一度 *telnet* コマンドを実行すると、サーバーが送信した内容をそのまま送り返すのがわかります。

エコーサーバーの完全なソースコードは、配布物のio.netty.example.echoパッケージにあります。

タイムサーバーの作成

このセクションで実装するプロトコルは、TIMEプロトコルです。これは、リクエストを受信せずに32ビット整数を格納したメッセージを送信し、メッセージの送信が完了すると接続を閉じるという点で、前の例とは異なります。この例では、メッセージを作成して送信する方法、および完了時に接続を閉じる方法を学びます。

受信したデータはすべて無視し、接続が確立されるとすぐにメッセージを送信するため、今回はchannelRead()メソッドを使用できません。代わりに、channelActive()メソッドをオーバーライドする必要があります。以下は実装です。

package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 説明したように、channelActive()メソッドは、接続が確立され、トラフィックを生成する準備ができたときに呼び出されます。このメソッドで現在の時刻を表す32ビット整数を書き込みましょう。

  2. 新しいメッセージを送信するには、メッセージを格納する新しいバッファを割り当てる必要があります。32ビット整数を書き込むため、容量が少なくとも4バイトのByteBufが必要です。ChannelHandlerContext.alloc()を介して現在のByteBufAllocatorを取得し、新しいバッファを割り当てます。

  3. 通常どおり、作成したメッセージを書き込みます。

    しかし、待ってください、フリップはどこにありますか?NIOでメッセージを送信する前に、java.nio.ByteBuffer.flip()を呼び出していませんでしたか?ByteBufには、読み取り操作用と書き込み操作用の2つのポインタがあるため、そのようなメソッドはありません。ByteBufに何かを書き込むとライターインデックスが増加しますが、リーダーインデックスは変更されません。リーダーインデックスとライターインデックスは、それぞれメッセージの開始位置と終了位置を表します。

    対照的に、NIOバッファは、フリップメソッドを呼び出さずにメッセージの内容の開始位置と終了位置を明確に把握する方法を提供していません。バッファをフリップするのを忘れると、何も送信されないか、間違ったデータが送信されるため、問題が発生します。Nettyでは、操作タイプごとに異なるポインタがあるため、このようなエラーは発生しません。慣れてくると、人生がはるかに楽になることがわかります。混乱のない人生です!

    もう1つ注意すべき点は、ChannelHandlerContext.write()(およびwriteAndFlush())メソッドがChannelFutureを返すことです。ChannelFutureは、まだ発生していないI/O操作を表します。つまり、Nettyではすべての操作が非同期であるため、要求された操作はまだ実行されていない可能性があります。たとえば、次のコードは、メッセージが送信される前に接続を閉じる可能性があります。

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

    したがって、write()メソッドによって返されたChannelFutureが完了した後、書き込み操作が完了したときにリスナーに通知するclose()メソッドを呼び出す必要があります。close()もすぐに接続を閉じない可能性があり、ChannelFutureを返すことに注意してください。

  4. では、書き込みリクエストが完了したときにどのように通知を受け取りますか?これは、返されたChannelFutureChannelFutureListenerを追加するのと同じくらい簡単です。ここでは、操作が完了したときにChannelを閉じる新しい匿名のChannelFutureListenerを作成しました。

    あるいは、定義済みのリスナーを使用してコードを簡略化することもできます。

    f.addListener(ChannelFutureListener.CLOSE);

タイムサーバーが期待どおりに動作するかどうかをテストするには、UNIXのrdateコマンドを使用できます。

$ rdate -o <port> -p <host>

ここで、<port>main()メソッドで指定したポート番号、<host>は通常localhostです。

タイムクライアントの作成

DISCARDサーバーやECHOサーバーとは異なり、TIMEプロトコルにはクライアントが必要です。これは、人間が32ビットのバイナリデータをカレンダーの日付に変換できないためです。このセクションでは、サーバーが正しく動作することを確認する方法と、Nettyを使用してクライアントを作成する方法について説明します。

Nettyにおけるサーバーとクライアントの最大かつ唯一の違いは、異なるBootstrapChannelの実装が使用されることです。次のコードをご覧ください。

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. Bootstrapは、クライアント側チャネルやコネクションレスチャネルなど、サーバー以外のチャネル用であることを除いて、ServerBootstrapに似ています。
  2. 1つのEventLoopGroupのみを指定すると、ボスグループとワーカーグループの両方として使用されます。ただし、ボスのワーカーはクライアント側では使用されません。
  3. NioServerSocketChannelの代わりに、NioSocketChannelがクライアント側のChannelを作成するために使用されています。
  4. クライアント側のSocketChannelには親がないため、ServerBootstrapで行ったように、ここではchildOption()を使用しないことに注意してください。
  5. bind()メソッドの代わりにconnect()メソッドを呼び出す必要があります。

ご覧のとおり、サーバー側のコードとそれほど変わりません。ChannelHandlerの実装はどうでしょうか?サーバーから32ビット整数を受信し、人間が読める形式に変換し、変換された時刻を出力し、接続を閉じる必要があります。

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. TCP/IPでは、Nettyはピアから送信されたデータをByteBufに読み込みます。

非常にシンプルに見え、サーバー側の例と何ら変わりないように見えます。ただし、このハンドラーは、IndexOutOfBoundsExceptionを発生させて動作を拒否することがあります。次のセクションでは、これが発生する理由について説明します。

ストリームベースのトランスポートの処理

ソケットバッファの小さな注意点

TCP/IPなどのストリームベースのトランスポートでは、受信データはソケット受信バッファに格納されます。残念ながら、ストリームベースのトランスポートのバッファは、パケットのキューではなく、バイトのキューです。つまり、2つのメッセージを2つの独立したパケットとして送信した場合でも、オペレーティングシステムはそれらを2つのメッセージとしてではなく、単なるバイトの束として扱います。したがって、読み取った内容がリモートピアが書き込んだ内容と完全に一致するという保証はありません。たとえば、オペレーティングシステムのTCP/IPスタックが3つのパケットを受信したとしましょう。

Three packets received as they were sent

ストリームベースのプロトコルのこの一般的な特性のため、アプリケーションでは次の断片化された形式で読み取られる可能性が高くなります。

Three packets split and merged into four buffers

したがって、サーバー側かクライアント側かに関係なく、受信側は、受信したデータを、アプリケーションロジックで簡単に理解できる1つ以上の意味のあるフレームにデフラグする必要があります。上記の例の場合、受信データは次のようにフレーム化する必要があります。

Four buffers defragged into three

最初の解決策

それでは、TIMEクライアントの例に戻りましょう。ここでも同じ問題があります。32ビット整数は非常に少量のデータであり、頻繁に断片化される可能性は低いです。ただし、問題は断片化される可能性があり、トラフィックが増加するにつれて断片化の可能性が高くなることです。

単純な解決策は、内部累積バッファを作成し、4バイトすべてが内部バッファに受信されるまで待つことです。以下は、問題を修正した変更済みのTimeClientHandler実装です。

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelHandlerには、2つのライフサイクルリスナーメソッドがあります。handlerAdded()handlerRemoved()です。長時間ブロックしない限り、任意の(初期化解除)タスクを実行できます。
  2. まず、受信したすべてのデータをbufに累積する必要があります。
  3. 次に、ハンドラーは、bufに十分なデータ(この例では4バイト)があるかどうかを確認し、実際のビジネスロジックに進みます。そうでない場合、Nettyはより多くのデータが到着したときにchannelRead()メソッドを再び呼び出し、最終的に4バイトすべてが累積されます。

2番目の解決策

最初の解決策はTIMEクライアントの問題を解決しましたが、変更されたハンドラーはそれほどきれいには見えません。可変長フィールドなどの複数のフィールドで構成される、より複雑なプロトコルを想像してみてください。ChannelInboundHandlerの実装は、すぐに保守できなくなります。

お気づきかもしれませんが、ChannelPipelineに複数のChannelHandlerを追加できるため、1つのモノリシックなChannelHandlerを複数のモジュール式のものに分割して、アプリケーションの複雑さを軽減できます。たとえば、TimeClientHandlerを2つのハンドラーに分割できます。

  • 断片化の問題を処理するTimeDecoder、および
  • 初期のシンプルなバージョンのTimeClientHandler

幸いなことに、Nettyは、すぐに使用できる最初のものを書くのに役立つ拡張可能なクラスを提供しています。

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoderByteToMessageDecoder)は、ChannelInboundHandlerの実装であり、断片化問題への対処を容易にします。
  2. ByteToMessageDecoderByteToMessageDecoder)は、新しいデータを受信するたびに、内部的に保持されている累積バッファを使用してdecode()メソッドを呼び出します。
  3. 累積バッファに十分なデータがない場合、decode()outに何も追加しないことを決定できます。 ByteToMessageDecoderByteToMessageDecoder)は、より多くのデータを受信したときにdecode()を再度呼び出します。
  4. decode()outにオブジェクトを追加する場合、デコーダーがメッセージを正常にデコードしたことを意味します。 ByteToMessageDecoderByteToMessageDecoder)は、累積バッファの読み取り部分を破棄します。 複数のメッセージをデコードする必要がないことに注意してください。 ByteToMessageDecoderByteToMessageDecoder)は、outに何も追加されなくなるまでdecode()メソッドを呼び出し続けます。

ChannelPipelineに挿入する別のハンドラーができたので、TimeClientChannelInitializer実装を変更する必要があります。

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

冒険好きな方は、デコーダーをさらに簡素化するReplayingDecoderを試してみることもできます。 詳細については、APIリファレンスを参照してください。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

さらに、Nettyは、ほとんどのプロトコルを非常に簡単に実装できるすぐに使えるデコーダーを提供しており、モノリシックで保守不可能なハンドラー実装にならないようにするのに役立ちます。 詳細な例については、以下のパッケージを参照してください。

ByteBufではなくPOJOを使う

これまでにレビューしたすべての例では、プロトコルメッセージの主要なデータ構造としてByteBufを使用していました。 このセクションでは、TIMEプロトコルのクライアントとサーバーの例を改善して、ByteBufの代わりにPOJOを使用するようにします。

ChannelHandlerでPOJOを使用する利点は明らかです。 ByteBufから情報を抽出するコードをハンドラーから分離することで、ハンドラーの保守性と再利用性が向上します。 TIMEクライアントとサーバーの例では、32ビット整数を1つだけ読み取りますが、ByteBufを直接使用しても大きな問題はありません。 ただし、実際の世界のプロトコルを実装する際には、分離が必要になることがわかります。

まず、UnixTimeという新しい型を定義しましょう。

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

これで、ByteBufの代わりにUnixTimeを生成するようにTimeDecoderを修正できます。

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

更新されたデコーダーを使用すると、TimeClientHandlerByteBufを使用しなくなります。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

ずっとシンプルでエレガントですよね? 同じテクニックをサーバー側にも適用できます。 今度は最初にTimeServerHandlerを更新してみましょう。

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

これで、足りないのはエンコーダーだけです。エンコーダーは、UnixTimeByteBufに再変換するChannelOutboundHandlerの実装です。 メッセージをエンコードするときにパケットの断片化とアセンブリを処理する必要がないため、デコーダーを作成するよりもはるかに簡単です。

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}
  1. この1行には、非常に重要なことがいくつかあります。

    まず、元のChannelPromiseをそのまま渡すため、エンコードされたデータが実際にネットワークに書き込まれたときに、Nettyはそれを成功または失敗としてマークします。

    次に、ctx.flush()を呼び出しませんでした。 flush()操作をオーバーライドすることを目的とした、別のハンドラーメソッドvoid flush(ChannelHandlerContext ctx)があります。

さらに簡素化するために、MessageToByteEncoderを利用できます。

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

残された最後のタスクは、サーバー側のChannelPipelineTimeServerHandlerの前にTimeEncoderを挿入することであり、これは簡単な演習として残されています。

アプリケーションのシャットダウン

Nettyアプリケーションのシャットダウンは、通常、shutdownGracefully()を介して作成したすべてのEventLoopGroupをシャットダウンするのと同じくらい簡単です。 Futureが返され、EventLoopGroupが完全に終了し、グループに属するすべてのChannelが閉じられたときに通知されます。

まとめ

この章では、Nettyを使用して完全に機能するネットワークアプリケーションを作成する方法を実演しながら、Nettyの概要を説明しました。

Nettyの詳細については、今後の章で説明します。 また、io.netty.exampleパッケージのNettyの例を確認することをお勧めします。

コミュニティは、皆様からのご質問やアイデアを常に歓迎しており、皆様を支援し、フィードバックに基づいてNettyとそのドキュメントを改善し続けています。

最終取得日:2024年7月19日