2.Netty TCP服务器(TcpServer)

服务器 0

目录


Netty专栏目录(点击进入…)


Netty TCP服务器(TcpServer)

Reactor Netty提供了一个易于使用和配置的TcpServer。它隐藏Netty了创建TCP服务器所需的大部分功能并增加了Reactive Streams背压(Reactive Streams是具有无阻塞背压的异步流处理的标准)

启动和停止

import reactor.netty.DisposableServer;import reactor.netty.tcp.TcpServer;public class Application {	public static void main(String[] args) {		DisposableServer server =				TcpServer.create()		// 创建一个TcpServer 准备好配置的实例				         .bindNow();	// 以阻塞方式启动服务器并等待它完成初始化		server.onDispose()		      .block();	}}

启动和停止(主机和端口)

要在特定的host和上提供服务port,可以将以下配置应用到TCP服务器:

import reactor.netty.DisposableServer;import reactor.netty.tcp.TcpServer;public class Application {	public static void main(String[] args) {		DisposableServer server =				TcpServer.create()				         .host("localhost") 				         .port(8080)        				         .bindNow();		server.onDispose()		      .block();	}}
import reactor.core.publisher.Mono;import reactor.netty.DisposableServer;import reactor.netty.tcp.TcpServer;public class MultiAddressApplication {	public static void main(String[] args) {		TcpServer tcpServer = TcpServer.create();                  // 配置第一台TCP服务器主机、端口		DisposableServer server1 = tcpServer				.host("localhost") 				.port(8080)        				.bindNow();         // 配置第二台TCP服务器主机、端口		DisposableServer server2 = tcpServer				.host("0.0.0.0") 				.port(8081)      				.bindNow();		Mono.when(server1.onDispose(), server2.onDispose())				.block();	}}

急切初始化

默认情况下,TcpServer资源的初始化是按需进行的。这意味着bind operation吸收了初始化和加载所需的额外时间:

①:事件循环组
②:本机传输库(使用本机传输时)
③:用于安全性的本机库(在的情况下OpenSsl)

import reactor.netty.DisposableServer;import reactor.netty.tcp.TcpServer;public class Application {	public static void main(String[] args) {		TcpServer tcpServer =				TcpServer.create()				         .handle((inbound, outbound) -> inbound.receive().then());		tcpServer.warmup()  // 初始化并加载事件循环组、本机传输库和用于安全性的本机库		         .block();		DisposableServer server = tcpServer.bindNow();		server.onDispose()		      .block();	}	}

消费客户端数据

为了从连接的客户端接收数据,必须附加一个I/O 处理程序。I/O 处理程序可以访问NettyInbound以读取数据。

import reactor.netty.DisposableServer;import reactor.netty.tcp.TcpServer;public class Application {	public static void main(String[] args) {		DisposableServer server =				TcpServer.create()                          // 从连接的客户端接收数据				         .handle((inbound, outbound) -> inbound.receive().then()) 				         .bindNow();		server.onDispose()		      .block();	}	}

生命周期回调

TcpServer提供了以下生命周期回调以便扩展。

回调函数描述
doOnBind在服务器通道即将绑定时调用
doOnBound在绑定服务器通道时调用
doOnChannelInit初始化通道时调用
doOnConnection连接远程客户端时调用
doOnUnbound当服务器通道未绑定时调用

使用doOnConnection和doOnChannelInit回调:

import io.netty.handler.logging.LoggingHandler;import io.netty.handler.timeout.ReadTimeoutHandler;import reactor.netty.DisposableServer;import reactor.netty.tcp.TcpServer;import java.util.concurrent.TimeUnit;public class Application {	public static void main(String[] args) {		DisposableServer server =				TcpServer.create()                         // 当连接远程客户端时,Netty管道使用ReadTimeoutHandler进行扩展				         .doOnConnection(conn ->				             conn.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS)))                          // 初始化通道时,Netty管道使用LoggingHandler进行扩展				         .doOnChannelInit((observer, channel, remoteAddress) ->				             channel.pipeline()				                    .addFirst(new LoggingHandler("reactor.netty.examples")))				         .bindNow();		server.onDispose()		      .block();	}	}

TCP-level配置(三种配置)

(1)Setting Channel Options:设置通道参数选项

默认情况下,TCP服务器配置有以下选项:

Map<ChannelOption<?>, Boolean> childOptions = new HashMap<>(2);childOptions.put(ChannelOption.AUTO_READ, false);childOptions.put(ChannelOption.TCP_NODELAY, true);this.config = new TcpServerConfig(	Collections.singletonMap(ChannelOption.SO_REUSEADDR, true),	childOptions,	() -> new InetSocketAddress(DEFAULT_PORT));

如果需要其他选项或需要更改当前选项,可以应用以下配置:

import io.netty.channel.ChannelOption;import reactor.netty.DisposableServer;import reactor.netty.tcp.TcpServer;public class Application {	public static void main(String[] args) {		DisposableServer server =				TcpServer.create()				         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)				         .bindNow();		server.onDispose()		      .block();	}	}

(2)Wire Logger:连线日志记录

Reactor Netty提供连线日志记录,用于何时需要检查对等点之间的流量。

默认情况下,线路日志记录处于禁用状态。要启用它,必须将记录器reactor.netty.tcp.TcpServer级别设置为DEBUG并应用以下配置:

import reactor.netty.DisposableServer;import reactor.netty.tcp.TcpServer;public class Application {	public static void main(String[] args) {		DisposableServer server =				TcpServer.create()				         .wiretap(true) // 启用连线记录				         .bindNow();		server.onDispose()		      .block();	}	}

Wire Logger格式化程序
Reactor Netty支持3种不同的格式化程序:

连线日志格式化描述
AdvancedByteBufFormat#HEX_DUM同时记录事件和内容。内容将采用十六进制格式(默认)
AdvancedByteBufFormat#SIMPLE使用此格式启用连线记录时,仅记录事件
AdvancedByteBufFormat#TEXTUAL同时记录事件和内容。内容将采用纯文本格式

当需要更改默认格式化程序时,使用方式:

import io.netty.handler.logging.LogLevel;import reactor.netty.DisposableServer;import reactor.netty.tcp.TcpServer;import reactor.netty.transport.logging.AdvancedByteBufFormat;public class Application {	public static void main(String[] args) {		DisposableServer server =				TcpServer.create()                         // 启用连线记录, AdvancedByteBufFormat#TEXTUAL用于打印内容				         .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) 				         .bindNow();		server.onDispose()		      .block();	}	}
①:AdvancedByteBufFormat#HEX_DUM - 默认

使用此格式启用连线记录时,将同时记录事件和内容。内容将采用十六进制格式

reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTEREDreactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVEreactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B *          +-------------------------------------------------+ *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f | * +--------+-------------------------------------------------+----------------+ * |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World| * |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte| * |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl| * |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:| * |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de| * ... * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B *          +-------------------------------------------------+ *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f | * +--------+-------------------------------------------------+----------------+ * |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.| * |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:| * |00000020| 20 30 0d 0a 0d 0a                               | 0....          | * +--------+-------------------------------------------------+----------------+
②:AdvancedByteBufFormat#SIMPLE

使用此格式启用连线记录时,仅记录事件

reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTEREDreactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVEreactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145Breactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
③:AdvancedByteBufFormat#TEXTUAL

使用此格式启用连线记录时,将同时记录事件和内容。内容将采用纯文本格式。

reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTEREDreactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVEreactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1 * Content-Type: text/plain * user-agent: ReactorNetty/dev * ... * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK * content-length: 0

(3)Event Loop Group:事件循环组

默认情况下,TCP服务器使用“事件循环组”,其中工作线程的数量等于初始化时运行时可用的处理器数量(但最小值为4)。当需要不同的配置时,可以使用LoopResource#create方法之一。

默认配置Event Loop Group如下:

/***默认工作线程数,回退到可用处理器(但最小值为4) */public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";/** *默认选择器线程计数,回退到-1(无选择器线程) */public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";/***UDP的默认工作线程数,回退到可用处理器(但最小值为4) */public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";/***默认的静默期,保证不会发生对底层循环资源的处置,回退到2秒。*/public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";/***默认情况下,无论任务是否在静默期内提交,在处理底层资源之前等待的最长时间为15秒。 */public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";/***默认值是否首选本机传输(epoll、kqueue),回退在可用时是否首选*/public static final String NATIVE = "reactor.netty.native";

如果需要更改这些设置,可以应用以下配置:

import reactor.netty.DisposableServer;import reactor.netty.resources.LoopResources;import reactor.netty.tcp.TcpServer;public class Application {	public static void main(String[] args) {		LoopResources loop = LoopResources.create("event-loop", 1, 4, true);		DisposableServer server =				TcpServer.create()				         .runOn(loop)				         .bindNow();		server.onDispose()		      .block();	}	}

Option和childOption参数设置

Option和childOption参数设置(点击进入…)


SSL和TLS

当需要SSL或TLS时,可以应用下一个清单中显示的配置。默认情况下,如果OpenSSL可用,SslProvider.OPENSSL则使用provider作为提供者。否则SslProvider.JDK使用。可以通过SslContextBuilder或通过设置来切换提供程序“-Dio.netty.handler.ssl.noOpenSsl=true”

SSL(Secure Socket Layer,安全套接字层)

位于可靠的面向连接的网络层协议和应用层协议之间的一种协议层。SSL通过互相认证、使用数字签名确保完整性、使用加密确保私密性,以实现客户端和服务器之间的安全通讯。该协议由两层组成:①SSL记录协议、②SSL握手协议

TLS(Transport Layer Security,传输层安全协议)

用于两个应用程序之间提供保密性和数据完整性。该协议由两层组成:①TLS记录协议、②TLS握手协议

SSL是Netscape开发的专门用户保护Web通讯的,目前版本为3.0。最新版本的TLS 1.0是IETF(工程任务组)制定的一种新的协议,它建立在SSL 3.0协议规范之上,是SSL 3.0的后续版本。两者差别极小,可以理解为SSL 3.1,它是写入了RFC的。

import reactor.netty.DisposableServer;import reactor.netty.tcp.TcpServer;import reactor.netty.tcp.TcpSslContextSpec;import java.io.File;public class Application {	public static void main(String[] args) {		File cert = new File("certificate.crt");		File key = new File("private.key");		TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forServer(cert, key);		DisposableServer server =				TcpServer.create()				         .secure(spec -> spec.sslContext(tcpSslContextSpec))				         .bindNow();		server.onDispose()		      .block();	}	}

服务器域名
可以配置TCP多个SslContext映射到特定域的服务器。配置SNI映射时可以使用确切的域名或包含通配符的域名

使用包含通配符的域名:

public class Application {	public static void main(String[] args) throws Exception {		File defaultCert = new File("default_certificate.crt");		File defaultKey = new File("default_private.key");		File testDomainCert = new File("default_certificate.crt");		File testDomainKey = new File("default_private.key");		SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build();		SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();		DisposableServer server =				TcpServer.create()				         .secure(spec -> spec.sslContext(defaultSslContext)				                             .addSniMapping("*.test.com",				                                     testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext)))				         .bindNow();		server.onDispose()		      .block();	}	}

Metrics(监控指标)

TCP服务器支持与Micrometer,它公开了前缀为reactor.netty.tcp.server的所有指标

在应用程序中,通常会记录日志以便事后分析,在很多情况下是产生了问题之后,再去查看日志,是一种事后的静态分析。在很多时候,可能需要了解整个系统在当前,或者某一时刻运行的情况,比如一个系统后台服务,可能需要了解一些实时监控的数据
1、每秒钟的请求数是多少(TPS)?
2、平均每个请求处理的时间?
3、请求处理的最长耗时?
4、请求处理的响应的直方图?
5、请求处理正确响应率?
6、等待处理的请求队列长度?
7、查看整个系统的的CPU使用率、内存占用、jvm运行情况;以及系统运行出错率等等一系列的实时数据采集时,最简单的方法就是在系统的入口、出口和关键位置设置埋点,然后将采集到的信息发送到实时监控平台或者存入到缓存和DB中做进一步的分析和展示。

Metrics作为一款监控指标的度量类库,提供了许多工具帮助开发者来完成各项数据的监控。
Metrics提供5种基本的度量类型:Meters、Gauges、Counters、Histograms和Timers

启用该集成:

import reactor.netty.DisposableServer;import reactor.netty.tcp.TcpServer;public class Application {	public static void main(String[] args) {		DisposableServer server =				TcpServer.create()				         .metrics(true) // 启用与 Micrometer 的内置集成				         .bindNow();		server.onDispose()		      .block();	}	}

当需要TCP服务器指标与其他系统集成时,Micrometer或者想提供自己的集成Micrometer,可以提供自己的指标记录器,如下所示:

import reactor.netty.DisposableServer;import reactor.netty.channel.ChannelMetricsRecorder;import reactor.netty.tcp.TcpServer;import java.net.SocketAddress;import java.time.Duration;public class Application {	public static void main(String[] args) {		DisposableServer server =				TcpServer.create()						 // 启用 TCP 服务器指标并提供ChannelMetricsRecorder实现。				         .metrics(true, CustomChannelMetricsRecorder::new) 				         .bindNow();		server.onDispose()		      .block();	}	}

TCP服务器支持与Micrometer,它公开了前缀为reactor.netty.tcp.server的所有指标。

(1)汇总ConnectionProvider指标

metric nametype描述
reactor.netty.tcp.server.data.receivedDistributionSummary接收的数据量,以字节为单位
reactor.netty.tcp.server.data.sentDistributionSummary发送的数据量,以字节为单位
reactor.netty.tcp.server.errorsCounter发生的错误数
reactor.netty.tcp.server.tls.handshake.timeTimerTLS握手所花费的时间

(2)ByteBufAllocator指标

metric nametype描述
reactor.netty.bytebuf.allocator.used.heap.memoryGauge堆内存的字节数
reactor.netty.bytebuf.allocator.used.direct.memoryGauge直接内存的字节数
reactor.netty.bytebuf.allocator.used.heap.arenasGauge堆区域的数量(当PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.direct.arenasGauge直接竞技场的数量(当PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.threadlocal.cachesGauge线程本地缓存的数量(当PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.small.cache.sizeGauge小缓存的大小(当PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.normal.cache.sizeGauge正常缓存的大小(当PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.chunk.sizeGauge竞技场的块大小(当PooledByteBufAllocator)

(3)EventLoop指标:

metric nametype描述
reactor.netty.eventloop.pending.tasksGauge事件循环中待处理的任务数

Unix域套接字

TCP当使用本机传输时,客户端支持Unix域套接字(UDS)。

使用UDS支持:

import io.netty.channel.unix.DomainSocketAddress;import reactor.netty.DisposableServer;import reactor.netty.tcp.TcpServer;public class Application {	public static void main(String[] args) {		DisposableServer server =				TcpServer.create()                         // 指定DomainSocketAddress将被使用				         .bindAddress(() -> new DomainSocketAddress("/tmp/test.sock")) 				         .bindNow();		server.onDispose()		      .block();	}	}

也许您对下面的内容还感兴趣: