4.xユーザーガイド
今日では、汎用アプリケーションやライブラリを使用して相互に通信しています。たとえば、Webサーバーから情報を取得したり、Webサービスを介してリモートプロシージャコールを呼び出したりするために、HTTPクライアントライブラリをよく使用します。しかし、汎用プロトコルやその実装は、スケーラビリティが十分でない場合があります。これは、巨大なファイル、電子メールメッセージ、金融情報やマルチプレイヤーゲームデータなどのほぼリアルタイムのメッセージを交換するために、汎用HTTPサーバーを使用しないのと同じです。必要なのは、特定の目的に特化した高度に最適化されたプロトコル実装です。たとえば、AJAXベースのチャットアプリケーション、メディアストリーミング、または大規模ファイル転送に最適化されたHTTPサーバーを実装したい場合があります。ニーズに合わせて正確に調整されたまったく新しいプロトコルを設計および実装することさえできます。避けられないもう1つのケースは、古いシステムとの相互運用性を確保するために、レガシーの独自プロトコルを扱わなければならない場合です。この場合、重要なのは、結果として得られるアプリケーションの安定性とパフォーマンスを犠牲にすることなく、そのプロトコルをどれだけ迅速に実装できるかです。
Nettyプロジェクトは、保守性の高い高性能でスケーラブルなプロトコルサーバーとクライアントを迅速に開発するための、非同期イベント駆動型ネットワークアプリケーションフレームワークとツールを提供するための取り組みです。
言い換えれば、Nettyは、プロトコルサーバーやクライアントなどのネットワークアプリケーションの迅速かつ簡単な開発を可能にするNIOクライアントサーバーフレームワークです。TCPやUDPソケットサーバーの開発など、ネットワークプログラミングを大幅に簡素化および効率化します。
「迅速かつ簡単」とは、結果として得られるアプリケーションが保守性やパフォーマンスの問題に悩まされるという意味ではありません。Nettyは、FTP、SMTP、HTTP、およびさまざまなバイナリおよびテキストベースのレガシープロトコルなど、多くのプロトコルの実装から得られた経験に基づいて慎重に設計されています。その結果、Nettyは、妥協することなく、開発の容易さ、パフォーマンス、安定性、柔軟性を実現する方法を見つけることに成功しました。
一部のユーザーは、同じ利点があると主張する他のネットワークアプリケーションフレームワークをすでに知っているかもしれません。Nettyがそれらとどう違うのか疑問に思うかもしれません。答えは、それが基づいている哲学です。Nettyは、APIと実装の両方の面で、初日から最も快適なエクスペリエンスを提供するように設計されています。具体的なものではありませんが、このガイドを読んでNettyを試してみると、この哲学があなたの人生をはるかに楽にすることに気付くでしょう。
この章では、簡単な例を使用してNettyのコアとなる構成要素を概観し、すぐに使い始めることができます。この章の終わりには、Netty上にクライアントとサーバーをすぐに記述できるようになります。
何かを学ぶ際にトップダウン方式を好む場合は、第2章アーキテクチャの概要から始めて、ここに戻ってくることをお勧めします。
この章の例を実行するための最小要件は、Nettyの最新バージョンとJDK 1.6以上です。Nettyの最新バージョンは、プロジェクトのダウンロードページから入手できます。適切なバージョンのJDKをダウンロードするには、ご希望のJDKベンダーのWebサイトを参照してください。
読むにつれて、この章で紹介されているクラスについてさらに質問があるかもしれません。詳細を知りたい場合は、いつでもAPIリファレンスを参照してください。このドキュメントのすべてのクラス名は、便宜上、オンラインAPIリファレンスにリンクされています。また、Nettyプロジェクトコミュニティに連絡して、誤った情報、文法エラーやタイプミスがある場合、ドキュメントの改善に役立つ良いアイデアがある場合はお知らせください。
世界で最も単純なプロトコルは「Hello、World!」ではなく、DISCARD
です。受信したデータを応答なしに破棄するプロトコルです。
DISCARD
プロトコルを実装するには、受信したすべてのデータを無視するだけです。Nettyによって生成されたI/Oイベントを処理するハンドラの実装から langsung始めましょう。
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
-
DiscardServerHandler
は、ChannelInboundHandler
の実装であるChannelInboundHandlerAdapter
を拡張します。ChannelInboundHandler
は、オーバーライドできるさまざまなイベントハンドラーメソッドを提供します。今のところ、ハンドラーインターフェースを自分で実装するのではなく、ChannelInboundHandlerAdapter
を拡張するだけで十分です。 - ここでは、
channelRead()
イベントハンドラーメソッドをオーバーライドします。このメソッドは、クライアントから新しいデータが受信されるたびに、受信したメッセージと共に呼び出されます。この例では、受信したメッセージのタイプはByteBuf
です。 DISCARD
プロトコルを実装するには、ハンドラーは受信したメッセージを無視する必要があります。ByteBuf
は、release()
メソッドを介して明示的に解放する必要がある参照カウントオブジェクトです。ハンドラーに渡された参照カウントオブジェクトを解放するのはハンドラーの責任であることに注意してください。通常、channelRead()
ハンドラーメソッドは次のように実装されます
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
exceptionCaught()
イベントハンドラーメソッドは、I/Oエラーが原因でNettyによって、またはイベントの処理中にスローされた例外が原因でハンドラー実装によって例外が発生した場合に、Throwableと共に呼び出されます。ほとんどの場合、キャッチされた例外はログに記録され、関連付けられたチャネルはここで閉じられる必要がありますが、このメソッドの実装は、例外的な状況に対処する方法によって異なる場合があります。たとえば、接続を閉じる前に、エラーコードを含む応答メッセージを送信する場合があります。
ここまでは順調です。DISCARD
サーバーの半分を実装しました。残っているのは、DiscardServerHandler
でサーバーを起動するmain()
メソッドを作成することです。
package io.netty.example.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
}
-
NioEventLoopGroup
は、I/O操作を処理するマルチスレッドイベントループです。Nettyは、さまざまな種類のトランスポート用にさまざまなEventLoopGroup
実装を提供しています。この例ではサーバーサイドアプリケーションを実装しているため、2つのNioEventLoopGroup
が使用されます。最初のものは、多くの場合「ボス」と呼ばれ、着信接続を受け入れます。2つ目は、多くの場合「ワーカー」と呼ばれ、ボスが接続を受け入れて、受け入れられた接続をワーカーに登録すると、受け入れられた接続のトラフィックを処理します。いくつのスレッドが使用され、それらがどのように作成されたChannel
にマップされるかは、EventLoopGroup
実装によって異なり、コンストラクターを介して設定できる場合もあります。 -
ServerBootstrap
は、サーバーを設定するヘルパークラスです。Channel
を使用してサーバーを langsung設定できます。ただし、これは面倒なプロセスであり、ほとんどの場合、これを行う必要はありません。 - ここでは、着信接続を受け入れるための新しい
Channel
をインスタンス化するために使用されるNioServerSocketChannel
クラスを使用することを指定します。 - ここで指定されたハンドラーは、新しく受け入れられた
Channel
によって常に評価されます。ChannelInitializer
は、ユーザーが新しいChannel
を設定するのに役立つように設計された特別なハンドラーです。ネットワークアプリケーションを実装するために、DiscardServerHandler
などのハンドラーを追加して、新しいChannel
のChannelPipeline
を設定することが最も多いでしょう。アプリケーションが複雑になるにつれて、パイプラインにさらにハンドラーを追加し、最終的にこの匿名クラスをトップレベルクラスに抽出する可能性が高くなります。 Channel
実装に固有のパラメーターも設定できます。TCP/IPサーバーを作成しているため、tcpNoDelay
やkeepAlive
などのソケットオプションを設定できます。サポートされているChannelOption
の概要については、ChannelOption
と特定のChannelConfig
実装のapidocsを参照してください。option()
とchildOption()
に気付きましたか?option()
は、着信接続を受け入れるNioServerSocketChannel
用です。childOption()
は、親ServerChannel
(この場合はNioSocketChannel
)によって受け入れられたChannel
用です。- これで準備が整いました。残っているのは、ポートにバインドしてサーバーを起動することです。ここでは、マシンのすべてのNIC(ネットワークインターフェースカード)のポート
8080
にバインドします。これで、bind()
メソッドを必要な回数だけ(異なるバインドアドレスで)呼び出すことができます。
おめでとうございます! Netty上に最初のサーバーを構築しました。
最初のサーバーを作成したので、実際に機能するかどうかをテストする必要があります。テストする最も簡単な方法は、_telnet_コマンドを使用することです。たとえば、コマンドラインにtelnet localhost 8080
と入力して、何かを入力できます。
しかし、サーバーは正常に機能していると断言できますか?破棄サーバーであるため、実際にはわかりません。まったく応答がありません。実際に機能していることを証明するために、受信した内容を出力するようにサーバーを変更してみましょう。
データが受信されるたびにchannelRead()
メソッドが呼び出されることは既に分かっています。DiscardServerHandler
のchannelRead()
メソッドにいくつかのコードを追加してみましょう。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
- この非効率的なループは、実際には
System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
のように簡略化できます。 - 代わりに、ここで
in.release()
を実行することもできます。
もう一度 *telnet* コマンドを実行すると、サーバーが受信した内容を出力するのがわかります。
破棄サーバーの完全なソースコードは、配布物のio.netty.example.discard
パッケージにあります。
これまで、私たちは全く応答せずにデータを使用してきました。しかし、サーバーは通常、リクエストに応答することになっています。ECHO
プロトコルを実装することで、クライアントに応答メッセージを書き込む方法を学びましょう。このプロトコルでは、受信したデータがそのまま送り返されます。
前のセクションで実装した破棄サーバーとの唯一の違いは、受信したデータを出力する代わりに、受信したデータをそのまま送り返すことです。したがって、channelRead()
メソッドを修正すれば十分です。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
ChannelHandlerContext
オブジェクトは、さまざまなI/Oイベントと操作をトリガーできるさまざまな操作を提供します。ここでは、受信したメッセージをそのまま書き込むためにwrite(Object)
を呼び出します。DISCARD
の例とは異なり、受信したメッセージを解放していないことに注意してください。これは、Nettyがネットワークに書き出されたときに自動的に解放するためです。-
ctx.write(Object)
は、メッセージをネットワークに書き出しません。内部的にバッファリングされ、その後ctx.flush()
によってネットワークにフラッシュされます。簡潔にするために、ctx.writeAndFlush(msg)
を呼び出すこともできます。
もう一度 *telnet* コマンドを実行すると、サーバーが送信した内容をそのまま送り返すのがわかります。
エコーサーバーの完全なソースコードは、配布物のio.netty.example.echo
パッケージにあります。
このセクションで実装するプロトコルは、TIME
プロトコルです。これは、リクエストを受信せずに32ビット整数を格納したメッセージを送信し、メッセージの送信が完了すると接続を閉じるという点で、前の例とは異なります。この例では、メッセージを作成して送信する方法、および完了時に接続を閉じる方法を学びます。
受信したデータはすべて無視し、接続が確立されるとすぐにメッセージを送信するため、今回はchannelRead()
メソッドを使用できません。代わりに、channelActive()
メソッドをオーバーライドする必要があります。以下は実装です。
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
-
説明したように、
channelActive()
メソッドは、接続が確立され、トラフィックを生成する準備ができたときに呼び出されます。このメソッドで現在の時刻を表す32ビット整数を書き込みましょう。 -
新しいメッセージを送信するには、メッセージを格納する新しいバッファを割り当てる必要があります。32ビット整数を書き込むため、容量が少なくとも4バイトの
ByteBuf
が必要です。ChannelHandlerContext.alloc()
を介して現在のByteBufAllocator
を取得し、新しいバッファを割り当てます。 -
通常どおり、作成したメッセージを書き込みます。
しかし、待ってください、フリップはどこにありますか?NIOでメッセージを送信する前に、
java.nio.ByteBuffer.flip()
を呼び出していませんでしたか?ByteBuf
には、読み取り操作用と書き込み操作用の2つのポインタがあるため、そのようなメソッドはありません。ByteBuf
に何かを書き込むとライターインデックスが増加しますが、リーダーインデックスは変更されません。リーダーインデックスとライターインデックスは、それぞれメッセージの開始位置と終了位置を表します。対照的に、NIOバッファは、フリップメソッドを呼び出さずにメッセージの内容の開始位置と終了位置を明確に把握する方法を提供していません。バッファをフリップするのを忘れると、何も送信されないか、間違ったデータが送信されるため、問題が発生します。Nettyでは、操作タイプごとに異なるポインタがあるため、このようなエラーは発生しません。慣れてくると、人生がはるかに楽になることがわかります。混乱のない人生です!
もう1つ注意すべき点は、
ChannelHandlerContext.write()
(およびwriteAndFlush()
)メソッドがChannelFuture
を返すことです。ChannelFuture
は、まだ発生していないI/O操作を表します。つまり、Nettyではすべての操作が非同期であるため、要求された操作はまだ実行されていない可能性があります。たとえば、次のコードは、メッセージが送信される前に接続を閉じる可能性があります。Channel ch = ...; ch.writeAndFlush(message); ch.close();
したがって、
write()
メソッドによって返されたChannelFuture
が完了した後、書き込み操作が完了したときにリスナーに通知するclose()
メソッドを呼び出す必要があります。close()
もすぐに接続を閉じない可能性があり、ChannelFuture
を返すことに注意してください。 -
では、書き込みリクエストが完了したときにどのように通知を受け取りますか?これは、返された
ChannelFuture
にChannelFutureListener
を追加するのと同じくらい簡単です。ここでは、操作が完了したときにChannel
を閉じる新しい匿名のChannelFutureListener
を作成しました。あるいは、定義済みのリスナーを使用してコードを簡略化することもできます。
f.addListener(ChannelFutureListener.CLOSE);
タイムサーバーが期待どおりに動作するかどうかをテストするには、UNIXのrdate
コマンドを使用できます。
$ rdate -o <port> -p <host>
ここで、<port>
はmain()
メソッドで指定したポート番号、<host>
は通常localhost
です。
DISCARD
サーバーやECHO
サーバーとは異なり、TIME
プロトコルにはクライアントが必要です。これは、人間が32ビットのバイナリデータをカレンダーの日付に変換できないためです。このセクションでは、サーバーが正しく動作することを確認する方法と、Nettyを使用してクライアントを作成する方法について説明します。
Nettyにおけるサーバーとクライアントの最大かつ唯一の違いは、異なるBootstrap
とChannel
の実装が使用されることです。次のコードをご覧ください。
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
-
Bootstrap
は、クライアント側チャネルやコネクションレスチャネルなど、サーバー以外のチャネル用であることを除いて、ServerBootstrap
に似ています。 - 1つの
EventLoopGroup
のみを指定すると、ボスグループとワーカーグループの両方として使用されます。ただし、ボスのワーカーはクライアント側では使用されません。 NioServerSocketChannel
の代わりに、NioSocketChannel
がクライアント側のChannel
を作成するために使用されています。- クライアント側の
SocketChannel
には親がないため、ServerBootstrap
で行ったように、ここではchildOption()
を使用しないことに注意してください。 bind()
メソッドの代わりにconnect()
メソッドを呼び出す必要があります。
ご覧のとおり、サーバー側のコードとそれほど変わりません。ChannelHandler
の実装はどうでしょうか?サーバーから32ビット整数を受信し、人間が読める形式に変換し、変換された時刻を出力し、接続を閉じる必要があります。
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
- TCP/IPでは、Nettyはピアから送信されたデータを
ByteBuf
に読み込みます。
非常にシンプルに見え、サーバー側の例と何ら変わりないように見えます。ただし、このハンドラーは、IndexOutOfBoundsException
を発生させて動作を拒否することがあります。次のセクションでは、これが発生する理由について説明します。
TCP/IPなどのストリームベースのトランスポートでは、受信データはソケット受信バッファに格納されます。残念ながら、ストリームベースのトランスポートのバッファは、パケットのキューではなく、バイトのキューです。つまり、2つのメッセージを2つの独立したパケットとして送信した場合でも、オペレーティングシステムはそれらを2つのメッセージとしてではなく、単なるバイトの束として扱います。したがって、読み取った内容がリモートピアが書き込んだ内容と完全に一致するという保証はありません。たとえば、オペレーティングシステムのTCP/IPスタックが3つのパケットを受信したとしましょう。
ストリームベースのプロトコルのこの一般的な特性のため、アプリケーションでは次の断片化された形式で読み取られる可能性が高くなります。
したがって、サーバー側かクライアント側かに関係なく、受信側は、受信したデータを、アプリケーションロジックで簡単に理解できる1つ以上の意味のあるフレームにデフラグする必要があります。上記の例の場合、受信データは次のようにフレーム化する必要があります。
それでは、TIME
クライアントの例に戻りましょう。ここでも同じ問題があります。32ビット整数は非常に少量のデータであり、頻繁に断片化される可能性は低いです。ただし、問題は断片化される可能性があり、トラフィックが増加するにつれて断片化の可能性が高くなることです。
単純な解決策は、内部累積バッファを作成し、4バイトすべてが内部バッファに受信されるまで待つことです。以下は、問題を修正した変更済みのTimeClientHandler
実装です。
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
ChannelHandler
には、2つのライフサイクルリスナーメソッドがあります。handlerAdded()
とhandlerRemoved()
です。長時間ブロックしない限り、任意の(初期化解除)タスクを実行できます。- まず、受信したすべてのデータを
buf
に累積する必要があります。 - 次に、ハンドラーは、
buf
に十分なデータ(この例では4バイト)があるかどうかを確認し、実際のビジネスロジックに進みます。そうでない場合、Nettyはより多くのデータが到着したときにchannelRead()
メソッドを再び呼び出し、最終的に4バイトすべてが累積されます。
最初の解決策はTIME
クライアントの問題を解決しましたが、変更されたハンドラーはそれほどきれいには見えません。可変長フィールドなどの複数のフィールドで構成される、より複雑なプロトコルを想像してみてください。ChannelInboundHandler
の実装は、すぐに保守できなくなります。
お気づきかもしれませんが、ChannelPipeline
に複数のChannelHandler
を追加できるため、1つのモノリシックなChannelHandler
を複数のモジュール式のものに分割して、アプリケーションの複雑さを軽減できます。たとえば、TimeClientHandler
を2つのハンドラーに分割できます。
-
断片化の問題を処理する
TimeDecoder
、および - 初期のシンプルなバージョンの
TimeClientHandler
。
幸いなことに、Nettyは、すぐに使用できる最初のものを書くのに役立つ拡張可能なクラスを提供しています。
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
-
ByteToMessageDecoder
(ByteToMessageDecoder
)は、ChannelInboundHandler
の実装であり、断片化問題への対処を容易にします。 -
ByteToMessageDecoder
(ByteToMessageDecoder
)は、新しいデータを受信するたびに、内部的に保持されている累積バッファを使用してdecode()
メソッドを呼び出します。 -
累積バッファに十分なデータがない場合、
decode()
はout
に何も追加しないことを決定できます。ByteToMessageDecoder
(ByteToMessageDecoder
)は、より多くのデータを受信したときにdecode()
を再度呼び出します。 decode()
がout
にオブジェクトを追加する場合、デコーダーがメッセージを正常にデコードしたことを意味します。ByteToMessageDecoder
(ByteToMessageDecoder
)は、累積バッファの読み取り部分を破棄します。 複数のメッセージをデコードする必要がないことに注意してください。ByteToMessageDecoder
(ByteToMessageDecoder
)は、out
に何も追加されなくなるまでdecode()
メソッドを呼び出し続けます。
ChannelPipeline
に挿入する別のハンドラーができたので、TimeClient
のChannelInitializer
実装を変更する必要があります。
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
冒険好きな方は、デコーダーをさらに簡素化するReplayingDecoder
を試してみることもできます。 詳細については、APIリファレンスを参照してください。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
さらに、Nettyは、ほとんどのプロトコルを非常に簡単に実装できるすぐに使えるデコーダーを提供しており、モノリシックで保守不可能なハンドラー実装にならないようにするのに役立ちます。 詳細な例については、以下のパッケージを参照してください。
-
バイナリプロトコルについては
io.netty.example.factorial
、 -
テキスト行ベースのプロトコルについては
io.netty.example.telnet
。
これまでにレビューしたすべての例では、プロトコルメッセージの主要なデータ構造としてByteBuf
を使用していました。 このセクションでは、TIME
プロトコルのクライアントとサーバーの例を改善して、ByteBuf
の代わりにPOJOを使用するようにします。
ChannelHandler
でPOJOを使用する利点は明らかです。 ByteBuf
から情報を抽出するコードをハンドラーから分離することで、ハンドラーの保守性と再利用性が向上します。 TIME
クライアントとサーバーの例では、32ビット整数を1つだけ読み取りますが、ByteBuf
を直接使用しても大きな問題はありません。 ただし、実際の世界のプロトコルを実装する際には、分離が必要になることがわかります。
まず、UnixTime
という新しい型を定義しましょう。
package io.netty.example.time;
import java.util.Date;
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
これで、ByteBuf
の代わりにUnixTime
を生成するようにTimeDecoder
を修正できます。
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}
更新されたデコーダーを使用すると、TimeClientHandler
はByteBuf
を使用しなくなります。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
ずっとシンプルでエレガントですよね? 同じテクニックをサーバー側にも適用できます。 今度は最初にTimeServerHandler
を更新してみましょう。
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
これで、足りないのはエンコーダーだけです。エンコーダーは、UnixTime
をByteBuf
に再変換するChannelOutboundHandler
の実装です。 メッセージをエンコードするときにパケットの断片化とアセンブリを処理する必要がないため、デコーダーを作成するよりもはるかに簡単です。
package io.netty.example.time;
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}
-
この1行には、非常に重要なことがいくつかあります。
まず、元の
ChannelPromise
をそのまま渡すため、エンコードされたデータが実際にネットワークに書き込まれたときに、Nettyはそれを成功または失敗としてマークします。次に、
ctx.flush()
を呼び出しませんでした。flush()
操作をオーバーライドすることを目的とした、別のハンドラーメソッドvoid flush(ChannelHandlerContext ctx)
があります。
さらに簡素化するために、MessageToByteEncoder
を利用できます。
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
残された最後のタスクは、サーバー側のChannelPipeline
にTimeServerHandler
の前にTimeEncoder
を挿入することであり、これは簡単な演習として残されています。
Nettyアプリケーションのシャットダウンは、通常、shutdownGracefully()
を介して作成したすべてのEventLoopGroup
をシャットダウンするのと同じくらい簡単です。 Future
が返され、EventLoopGroup
が完全に終了し、グループに属するすべてのChannel
が閉じられたときに通知されます。
この章では、Nettyを使用して完全に機能するネットワークアプリケーションを作成する方法を実演しながら、Nettyの概要を説明しました。
Nettyの詳細については、今後の章で説明します。 また、io.netty.example
パッケージのNettyの例を確認することをお勧めします。
コミュニティは、皆様からのご質問やアイデアを常に歓迎しており、皆様を支援し、フィードバックに基づいてNettyとそのドキュメントを改善し続けています。