5.xユーザーガイド
今日では、相互に通信するために汎用アプリケーションやライブラリを使用しています。たとえば、Webサーバーから情報を取得し、Webサービスを介してリモートプロシージャコールを呼び出すために、HTTPクライアントライブラリを頻繁に使用します。
しかし、汎用プロトコルまたはその実装は、場合によってはスケール性が非常に悪くなることがあります。巨大なファイル、電子メールメッセージ、金融情報やマルチプレイヤーゲームデータなどのほぼリアルタイムのメッセージを交換するために、汎用HTTPサーバーを使用しないのと同じです。必要なのは、特定の目的に特化した、高度に最適化されたプロトコル実装です。たとえば、AJAXベースのチャットアプリケーション、メディアストリーミング、または大規模ファイル転送に最適化されたHTTPサーバーを実装したい場合があります。必要に応じて、まったく新しいプロトコルを設計および実装することもできます。
もう1つの避けられないケースは、古いシステムとの相互運用性を確保するために、レガシーの独自プロトコルを処理しなければならない場合です。この場合重要なのは、結果として得られるアプリケーションの安定性とパフォーマンスを犠牲にすることなく、そのプロトコルをどれだけ迅速に実装できるかです。
Nettyプロジェクトは、保守可能な高性能・高スケーラビリティのプロトコルサーバーとクライアントを迅速に開発するための、非同期イベント駆動型ネットワークアプリケーションフレームワークとツールを提供する取り組みです。
言い換えれば、Nettyは、プロトコルサーバーやクライアントなどのネットワークアプリケーションの迅速かつ容易な開発を可能にするNIOクライアントサーバーフレームワークです。TCPやUDPソケットサーバーの開発などのネットワークプログラミングを大幅に簡素化し、効率化します。
「迅速かつ容易」とは、結果として得られるアプリケーションが保守性またはパフォーマンスの問題に悩まされるという意味ではありません。Nettyは、FTP、SMTP、HTTP、およびさまざまなバイナリとテキストベースのレガシープロトコルなどの多くのプロトコルの実装から得られた経験を基に、慎重に設計されています。その結果、Nettyは、妥協することなく、開発の容易性、パフォーマンス、安定性、柔軟性を達成する方法を見つけることに成功しました。
一部のユーザーは、すでに同じ利点を主張する他のネットワークアプリケーションフレームワークを見つけている可能性があり、Nettyをそれらと何が異なるのか疑問に思うかもしれません。答えは、それが構築されている哲学です。Nettyは、APIと実装の両方において、最初から最高の快適なエクスペリエンスを提供するように設計されています。これは具体的なものではありませんが、このガイドを読み、Nettyを試す際に、この哲学があなたの生活をはるかに楽にすることに気づくでしょう。
この章では、Nettyのコア構造を簡単な例で説明し、迅速に開始できるようにします。この章の最後には、Netty上でクライアントとサーバーをすぐに記述できるようになります。
何かを学習する際にトップダウンアプローチを好む場合は、第2章「アーキテクチャの概要」から始めて、ここに戻ってくることができます。
この章で紹介されている例を実行するための最小要件は2つだけです。最新のNettyバージョンとJDK 1.6以上。最新のNettyバージョンはプロジェクトのダウンロードページで入手できます。JDKの適切なバージョンをダウンロードするには、お好みのJDKベンダーのWebサイトを参照してください。
読み進めていくうちに、この章で紹介されているクラスについてさらに質問があるかもしれません。それらについて詳しく知りたい場合は、いつでもAPIリファレンスを参照してください。このドキュメント内のすべてのクラス名は、便宜上オンラインAPIリファレンスにリンクされています。また、Nettyプロジェクトコミュニティに連絡して、不正確な情報、文法やタイプミス、ドキュメントを改善するための良いアイデアがあればお知らせください。
世界で最も単純なプロトコルは「Hello、World!」ではなくDISCARD
です。これは、応答なしに受信したデータをすべて破棄するプロトコルです。
DISCARD
プロトコルを実装するには、受信したデータをすべて無視するだけで済みます。Nettyによって生成されたI/Oイベントを処理するハンドラーの実装から始めましょう。
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
-
DiscardServerHandler
はChannelHandlerAdapter
を拡張します。これはChannelHandler
の実装です。ChannelHandler
は、オーバーライドできるさまざまなイベントハンドラーメソッドを提供します。現時点では、自分でハンドラーインターフェースを実装するよりも、ChannelHandlerAdapter
を拡張するだけで十分です。 - ここでは、
channelRead()
イベントハンドラーメソッドをオーバーライドします。このメソッドは、クライアントから新しいデータが受信されるたびに、受信したメッセージで呼び出されます。この例では、受信したメッセージの型はByteBuf
です。 DISCARD
プロトコルを実装するために、ハンドラーは受信したメッセージを無視する必要があります。ByteBuf
は、release()
メソッドを介して明示的に解放する必要がある参照カウントオブジェクトです。ハンドラーに渡された参照カウントオブジェクトを解放することは、ハンドラーの責任であることに注意してください。通常、channelRead()
ハンドラーメソッドは次のように実装されます。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
exceptionCaught()
イベントハンドラーメソッドは、I/OエラーによってNettyによって、またはイベントの処理中にスローされた例外によってハンドラーの実装によって例外が発生した場合に、Throwable
で呼び出されます。ほとんどの場合、キャッチされた例外はログに記録され、関連付けられたチャネルはここで閉じられますが、例外的な状況に対処するために何を行うかによって、このメソッドの実装は異なる場合があります。たとえば、接続を閉じる前に、エラーコードを含む応答メッセージを送信する場合があります。
今のところ順調です。DISCARD
サーバーの前半を実装しました。残りは、DiscardServerHandler
でサーバーを開始するmain()
メソッドを記述することです。
package io.netty.example.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
-
NioEventLoopGroup
は、I/O操作を処理するマルチスレッドイベントループです。Nettyは、さまざまな種類のトランスポートに対して、さまざまなEventLoopGroup
実装を提供します。この例ではサーバー側のアプリケーションを実装しているので、2つのNioEventLoopGroup
が使用されます。最初のものは、多くの場合「ボス」と呼ばれ、着信接続を受け入れます。2番目のものは、多くの場合「ワーカー」と呼ばれ、ボスが接続を受け入れて、受け入れられた接続をワーカーに登録すると、受け入れられた接続のトラフィックを処理します。使用されるスレッド数と、作成されたChannel
へのマッピング方法は、EventLoopGroup
の実装に依存し、コンストラクターを介して構成できる場合もあります。 -
ServerBootstrap
は、サーバーを設定するヘルパークラスです。Channel
を使用して直接サーバーを設定できます。ただし、これは面倒なプロセスであり、ほとんどの場合、実行する必要はないことに注意してください。 - ここでは、着信接続を受け入れる新しい
Channel
をインスタンス化するのに使用されるNioServerSocketChannel
クラスを使用するように指定します。 - ここで指定されたハンドラーは、新しく受け入れられた
Channel
によって常に評価されます。ChannelInitializer
は、新しいChannel
の構成をユーザーが支援することを目的とした特別なハンドラーです。ネットワークアプリケーションを実装するために、DiscardServerHandler
などのハンドラーをいくつか追加して、新しいChannel
のChannelPipeline
を構成したい可能性が最も高いです。アプリケーションが複雑になるにつれて、パイプラインにさらに多くのハンドラーを追加し、この無名クラスを最終的にトップレベルクラスに抽出する可能性が高くなります。 Channel
実装に固有のパラメーターを設定することもできます。TCP/IPサーバーを作成しているので、tcpNoDelay
やkeepAlive
などのソケットオプションを設定できます。ChannelOption
と特定のChannelConfig
実装のapidocsを参照して、サポートされているChannelOption
の概要を確認してください。option()
とchildOption()
に気づきましたか?option()
は、着信接続を受け入れるNioServerSocketChannel
用です。childOption()
は、親ServerChannel
(この場合はNioServerSocketChannel
)によって受け入れられたChannel
用です。- 準備ができました。残りは、ポートにバインドしてサーバーを起動することです。ここでは、マシンのすべてのNIC(ネットワークインターフェースカード)のポート
8080
にバインドします。必要に応じて、bind()
メソッドを何度でも呼び出すことができます(異なるバインドアドレスを使用)。
おめでとうございます!Netty上で最初のサーバーを完成させました。
最初のサーバーを作成したので、それが実際に機能するかどうかをテストする必要があります。それをテストする最も簡単な方法は、telnetコマンドを使用することです。たとえば、コマンドラインにtelnet localhost 8080
と入力し、何かを入力できます。
しかし、サーバーが正常に動作していると断言できますか?これは破棄サーバーなので、実際にはそうではありません。まったく応答がありません。それが本当に機能することを証明するために、受信したものを印刷するようにサーバーを変更しましょう。
データ受信時にchannelRead()
メソッドが呼び出されることは既に知っています。DiscardServerHandler
のchannelRead()
メソッドにいくつかのコードを追加してみましょう。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
- この非効率的なループは、実際には
System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
に簡略化できます。 - あるいは、ここで
in.release()
を実行することもできます。
もう一度telnetコマンドを実行すると、サーバーが受信した内容を出力するのを確認できます。
ディスカードサーバーの完全なソースコードは、配布パッケージのio.netty.example.discard
にあります。
これまでのところ、一切応答せずにデータを使用してきました。しかし、サーバーは通常、要求に応答する必要があります。受信したデータをすべて送り返すECHO
プロトコルを実装することで、クライアントにレスポンスメッセージを書き込む方法を学びましょう。
前のセクションで実装したディスカードサーバーとの違いは、受信したデータをコンソールに出力する代わりに、受信したデータを戻すことです。そのため、channelRead()
メソッドを変更するだけで十分です。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
ChannelHandlerContext
オブジェクトは、さまざまなI/Oイベントや操作をトリガーできる様々な操作を提供します。ここでは、write(Object)
を呼び出して、受信したメッセージをそのまま書き込みます。DISCARD
の例とは異なり、受信したメッセージを解放しなかったことに注意してください。これは、Nettyがワイヤに書き出されたときに自動的に解放するためです。-
ctx.write(Object)
は、メッセージをワイヤに書き込みません。内部的にバッファリングされ、ctx.flush()
によってワイヤにフラッシュされます。あるいは、簡潔にするためにctx.writeAndFlush(msg)
を呼び出すこともできます。
もう一度telnetコマンドを実行すると、サーバーが送信したものを送り返すのが確認できます。
エコーサーバーの完全なソースコードは、配布パッケージのio.netty.example.echo
にあります。
このセクションで実装するプロトコルはTIME
プロトコルです。これは、32ビット整数を格納したメッセージをリクエストを受信せずに送信し、メッセージの送信後に接続を閉じるところが、これまでの例とは異なります。この例では、メッセージの構築と送信、および完了時の接続の閉じ方を学習します。
接続が確立されるとすぐにメッセージを送信しますが、受信したデータは無視するため、今回はchannelRead()
メソッドを使用できません。代わりに、channelActive()
メソッドをオーバーライドする必要があります。実装は以下のとおりです。
package io.netty.example.time;
public class TimeServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
-
説明したように、
channelActive()
メソッドは、接続が確立され、トラフィックの生成準備ができたときに呼び出されます。このメソッドで、現在時刻を表す32ビット整数を書き込みましょう。 -
新しいメッセージを送信するには、メッセージを含む新しいバッファを割り当てる必要があります。32ビット整数を書き込むため、容量が少なくとも4バイトの
ByteBuf
が必要です。ChannelHandlerContext.alloc()
を介して現在のByteBufAllocator
を取得し、新しいバッファを割り当てます。 -
いつものように、構築されたメッセージを書き込みます。
しかし、待ってください。「flip」はどこにありますか?NIOでメッセージを送信する前に
java.nio.ByteBuffer.flip()
を呼び出す必要はありませんでしたか?ByteBuf
には、読み取り操作用と書き込み操作用の2つのポインタがあるため、そのようなメソッドはありません。ByteBuf
に何かを書き込むと、ライターインデックスが増加しますが、リーダーインデックスは変わりません。リーダーインデックスとライターインデックスは、それぞれメッセージの開始位置と終了位置を表します。これに対し、NIOバッファは、flipメソッドを呼び出さずにメッセージの内容の開始位置と終了位置を明確に把握するクリーンな方法を提供しません。バッファのflipを忘れると、何も送信されないか、不正確なデータが送信されるため、問題が発生します。Nettyでは、異なる操作の種類に対して異なるポインタがあるため、このようなエラーは発生しません。慣れてくると、非常に簡単になります—翻弄されることのない生活です!
もう1つのポイントは、
ChannelHandlerContext.write()
(およびwriteAndFlush()
)メソッドがChannelFuture
を返すことです。ChannelFuture
は、まだ発生していないI/O操作を表します。つまり、Nettyではすべての操作が非同期であるため、要求された操作がまだ実行されていない可能性があります。たとえば、次のコードは、メッセージが送信される前に接続を閉じる可能性があります。Channel ch = ...; ch.writeAndFlush(message); ch.close();
したがって、
write()
メソッドによって返されたChannelFuture
が完了した後にclose()
メソッドを呼び出す必要があります。これは、書き込み操作が完了したときにリスナーに通知します。close()
も接続をすぐに閉じるとは限らず、ChannelFuture
を返すことに注意してください。 -
では、書き込み要求が完了したときにどのように通知を受け取るかというと、返された
ChannelFuture
にChannelFutureListener
を追加するだけです。ここでは、操作が完了したときにChannel
を閉じる新しい匿名のChannelFutureListener
を作成しました。あるいは、事前に定義されたリスナーを使用してコードを簡素化することもできます。
f.addListener(ChannelFutureListener.CLOSE);
タイムサーバーが期待通りに動作するかどうかをテストするには、UNIXのrdate
コマンドを使用できます。
$ rdate -o <port> -p <host>
ここで、<port>
はmain()
メソッドで指定したポート番号、<host>
は通常localhost
です。
DISCARD
およびECHO
サーバーとは異なり、TIME
プロトコルにはクライアントが必要です。これは、人間が32ビットのバイナリデータをカレンダーの日付に変換できないためです。このセクションでは、サーバーが正しく動作することを確認する方法と、Nettyを使用してクライアントを作成する方法について説明します。
Nettyにおけるサーバーとクライアントの最大かつ唯一の違いは、異なるBootstrap
とChannel
の実装が使用されることです。以下のコードを見てください。
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
-
Bootstrap
はServerBootstrap
に似ていますが、クライアント側またはコネクションレスチャネルなど、非サーバーチャネル用です。 EventLoopGroup
を1つだけ指定すると、ボスグループとワーカーグループの両方として使用されます。ただし、クライアント側ではボスワーカーは使用されません。NioServerSocketChannel
の代わりに、NioSocketChannel
を使用して、クライアント側のChannel
を作成しています。- クライアント側の
SocketChannel
には親がないため、ServerBootstrap
で行ったように、ここではchildOption()
を使用していません。 bind()
メソッドの代わりにconnect()
メソッドを呼び出す必要があります。
ご覧のとおり、サーバー側のコードとそれほど違いはありません。ChannelHandler
の実装はどうでしょうか?サーバーから32ビット整数を受信し、人間が読み取れる形式に変換して、変換された時刻を出力し、接続を閉じる必要があります。
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
- TCP/IPでは、Nettyはピアから送信されたデータを[
ByteBuf
]に読み取ります。
非常にシンプルに見え、サーバー側の例と何ら変わりません。ただし、このハンドラーは場合によってはIndexOutOfBoundsException
を発生させて動作を拒否します。その理由については、次のセクションで説明します。
TCP/IPなどのストリームベースのトランスポートでは、受信したデータはソケット受信バッファに格納されます。残念ながら、ストリームベースのトランスポートのバッファはパケットのキューではなくバイトのキューです。つまり、2つの独立したパケットとして2つのメッセージを送信した場合でも、オペレーティングシステムはそれらを2つのメッセージとしてではなく、単なるバイトの塊として扱います。したがって、読み取られたものがリモートピアが書いたものと正確に一致するという保証はありません。たとえば、オペレーティングシステムのTCP/IPスタックが3つのパケットを受信したと仮定しましょう。
ストリームベースのプロトコルのこの一般的な特性により、アプリケーションで次の断片化された形式でそれらを読み取る可能性が高くなります。
したがって、サーバー側でもクライアント側でも、受信側は受信データを、アプリケーションロジックで簡単に理解できる1つ以上の意味のあるフレームにデフラグメントする必要があります。上記の例では、受信データは次のようにフレーム化される必要があります。
それでは、TIME
クライアントの例に戻りましょう。ここでは同じ問題があります。32ビット整数は非常に少量のデータであり、頻繁に断片化される可能性は低いです。しかし、問題は、断片化される可能性があり、トラフィックが増加するにつれて断片化の可能性が高まることです。
単純な解決策は、内部累積バッファを作成し、すべての4バイトが内部バッファに受信されるまで待機することです。問題は、内部累積バッファを作成し、すべての4バイトが内部バッファに受信されるまで待機することです。以下は、問題を修正した修正済みのTimeClientHandler
実装です。
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
ChannelHandler
には、handlerAdded()
とhandlerRemoved()
という2つのライフサイクルリスナーメソッドがあります。長時間ブロックしない限り、任意の(非)初期化タスクを実行できます。- まず、受信したすべてのデータを
buf
に累積する必要があります。 - 次に、ハンドラーは
buf
に十分なデータ(この例では4バイト)があるかどうかを確認し、実際のビジネスロジックに進みます。そうでない場合、Nettyはさらにデータが到着するとchannelRead()
メソッドを再度呼び出し、最終的にすべての4バイトが累積されます。
最初の解決策でTIME
クライアントの問題は解決されましたが、修正されたハンドラーはそれほどクリーンではありません。可変長のフィールドなど、複数のフィールドで構成されるより複雑なプロトコルを想像してみてください。ChannelHandler
の実装は、すぐに保守できなくなります。
気づいたかもしれませんが、ChannelPipeline
に複数のChannelHandler
を追加できます。そのため、1つの巨大なChannelHandler
を複数のモジュール式のものに分割して、アプリケーションの複雑さを軽減できます。たとえば、TimeClientHandler
を2つのハンドラーに分割できます。
-
断片化の問題を処理する
TimeDecoder
と、 TimeClientHandler
の最初の単純なバージョン。
幸いにも、Nettyは最初のものをすぐに作成するのに役立つ拡張可能なクラスを提供しています。
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
-
ByteToMessageDecoder
は、ChannelHandler
の実装であり、フラグメンテーションの問題を容易に処理できるようにします。 -
ByteToMessageDecoder
は、新しいデータを受信するたびに、内部的に維持されている累積バッファを使用してdecode()
メソッドを呼び出します。 -
decode()
は、累積バッファに十分なデータがない場合、out
に何も追加しないことを決定できます。ByteToMessageDecoder
は、さらにデータが受信されると、再びdecode()
を呼び出します。 decode()
がout
にオブジェクトを追加した場合、デコーダがメッセージを正常にデコードしたことを意味します。ByteToMessageDecoder
は、累積バッファの読み取られた部分を破棄します。複数のメッセージをデコードする必要がないことに注意してください。ByteToMessageDecoder
は、out
に何も追加しなくなるまで、decode()
メソッドを呼び出し続けます。
これで、ChannelPipeline
に挿入する別のハンドラができたので、TimeClient
の ChannelInitializer
実装を変更する必要があります。
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
冒険好きな方は、デコーダをさらに簡素化する ReplayingDecoder
を試してみるのも良いでしょう。ただし、詳細についてはAPIリファレンスを参照する必要があります。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
さらに、Nettyはすぐに使えるデコーダを提供しており、ほとんどのプロトコルを非常に簡単に実装でき、巨大で保守が難しいハンドラ実装になることを防ぐことができます。より詳細な例については、以下のパッケージを参照してください。
-
バイナリプロトコルには
io.netty.example.factorial
、 -
テキスト行ベースのプロトコルには
io.netty.example.telnet
。
これまで見てきたすべての例では、ByteBuf
をプロトコルメッセージの主要なデータ構造として使用していました。このセクションでは、TIME
プロトコルのクライアントとサーバーの例を改善し、ByteBuf
の代わりにPOJOを使用します。
ChannelHandler
でPOJOを使用する利点は明らかです。ByteBuf
から情報を抽出するコードをハンドラから分離することで、ハンドラはより保守可能で再利用可能になります。TIME
クライアントとサーバーの例では、32ビット整数を1つだけ読み取るので、ByteBuf
を直接使用しても大きな問題はありません。しかし、現実世界のプロトコルを実装する際には、この分離が必要になることがわかります。
まず、UnixTime
という新しい型を定義しましょう。
package io.netty.example.time;
import java.util.Date;
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
これで、ByteBuf
の代わりに UnixTime
を生成するように TimeDecoder
を修正できます。
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}
デコーダを更新したので、TimeClientHandler
はもう ByteBuf
を使用しません。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
はるかにシンプルでエレガントですね?同じテクニックをサーバー側にも適用できます。今回はまず TimeServerHandler
を更新しましょう。
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
これで、残っているのはエンコーダだけです。これは、UnixTime
を ByteBuf
に変換する ChannelHandler
の実装です。メッセージをエンコードする際にはパケットのフラグメンテーションとアセンブリを処理する必要がないため、デコーダを書くよりもはるかに簡単です。
package io.netty.example.time;
public class TimeEncoder extends ChannelHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int) m.value());
ctx.write(encoded, promise); // (1)
}
}
-
このハンドラメソッドには、いくつかの重要な点があります。
まず、元の
ChannelPromise
をそのまま渡すことで、エンコードされたデータが実際にワイヤーに書き出されたときに、Netty が成功または失敗としてマークします。次に、
ctx.flush()
を呼び出しませんでした。flush()
操作をオーバーライドすることを目的とした、別のハンドラメソッドvoid flush(ChannelHandlerContext ctx)
があります。
さらに簡素化するために、MessageToByteEncoder
を使用できます。
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int) msg.value());
}
}
残りのタスクは、サーバー側で TimeServerHandler
の前に ChannelPipeline
に TimeEncoder
を挿入することです。これは簡単な練習問題として残しておきます。
Nettyアプリケーションのシャットダウンは、通常、shutdownGracefully()
を介して作成したすべての EventLoopGroup
をシャットダウンするだけという簡単な操作です。EventLoopGroup
が完全に終了し、グループに属するすべての Channel
が閉じられたときに通知する Future
を返します。
この章では、Nettyの概要と、Netty上に完全に動作するネットワークアプリケーションを作成する方法のデモを行いました。
今後の章で、Nettyに関するより詳細な情報を取り上げます。io.netty.example
パッケージにあるNettyの例を確認することをお勧めします。
また、コミュニティ は常に皆様からのご質問やアイデアをお待ちしており、皆様を支援し、皆様からのフィードバックに基づいてNettyとそのドキュメントを改善し続けています。