快盘下载:好资源、好软件、快快下载吧!

快盘排行|快盘最新

当前位置:首页软件教程电脑软件教程 → 基于Netty模拟大量WebSocket客户端

基于Netty模拟大量WebSocket客户端

时间:2022-11-06 18:04:58人气:作者:快盘下载我要评论

基于Netty模拟大量WebSocket客户端

目录

  • 基于Netty模拟大量WebSocket客户端
    • 一、概述
    • 二、错误用法演示
      • 1、启动类
      • 2、WebSocket连接类
      • 3、IO数据处理类
    • 三、正确用法演示
      • 1、启动类
      • 2、WebSocket连接类
      • 3、IO数据处理类

一、概述

前段时间需要写一个模拟大量客户端的程序来对服务器做压力测试;选用了Netty做为通信框架;通信协议采用了WebSocket;根据官方下载的源码和Demo写完后发现连接几十个连接后就无法继续获取新的Socket通道了;后来经过各种尝试后发现是EventLoopGroup用法不正确导致的;正确用法是所有的Socket连接共用一个EventLoopGroup就可以正常模拟大量的客户端连接了。

二、错误用法演示

文中所有代码主要参照Netty官方源码中netty-netty-4.1.84.Final/example/src/main/java/io/netty/example/http/websocketx/client/

1、启动类

public class Simulator {
    public static void start() {
        String serverIp = ;127.0.0.1;;
        int serverPort = 8005;
        for (int i = 0; i < 10000; i;;) {
            WebSocketConnector client = new WebSocketConnector(serverIp,serverPort);
            client.doConnect();
        }
    }
}

2、WebSocket连接类

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.URI;

/**
 * WebSocket协议类型的模拟客户端连接器类
 *
 * ;author duyanjun
 * ;since 2022/10/13 杜燕军 新建
 */
;Slf4j
public class WebSocketConnector {
    // 服务器ip
    protected String serverIp;
    // 服务器通信端口
    protected int serverSocketPort;
    // 网络通道
    private Channel channel;

    /**
     * WebSocket协议类型的模拟客户端连接器构造方法
     *
     * ;param serverIp
     * ;param serverSocketPort
     */
    public WebSocketConnector(String serverIp,int serverSocketPort) {
        this.serverIp = serverIp;
        this.serverSocketPort = serverSocketPort;
    }

    public void doConnect() {
        try {
            String URL = ;ws://;; this.serverIp ; ;:; ; this.serverSocketPort ; ;/;;
            URI uri = new URI(URL);
            final WebSocketIoHandler handler =
                    new WebSocketIoHandler(
                            WebSocketClientHandshakerFactory.newHandshaker(
                                    uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    //.option(ChannelOption.TCP_nodeLAY, true)
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        ;Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加一个http的编解码器
                            pipeline.addLast(new HttpClientCodec());
                            // 添加一个用于支持大数据流的支持
                            pipeline.addLast(new ChunkedWriteHandler());
                            // 添加一个聚合器;这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
                            pipeline.addLast(new HttpObjectAggregator(1024 * 64));
                            pipeline.addLast(handler);
                        }
                    });
            try {
                synchronized (bootstrap) {
                    final ChannelFuture future = bootstrap.connect(this.serverIp, this.serverSocketPort).sync();
                    this.channel = future.channel();
                }
            } catch (InterruptedException e) {
                log.error(;连接服务失败.......................uri:; ; uri.toString(),e);
            }catch (Exception e) {
                log.error(;连接服务失败.......................uri:; ; uri.toString(),e);
            }
        } catch (Exception e) {
            log.error(;连接服务失败.......................;,e);
        } finally {
        }
    }

    public void disConnect() {
        this.channel.close();
    }
  • 问题代码主要在上述代码中 EventLoopGroup group = new NioEventLoopGroup() 这样使用后就导致每个WebSocket连接会创建一个新的EventLoopGroup对象,导致无法创建大量的客户端连接;
  • 正确用法应该是将EventLoopGroup提到WebSocketConnector类的上层;由WebSocketConnector对象的创建者维护一个公共的EventLoopGroup对象;所有WebSocketConnector对象共享一个EventLoopGroup对象;

3、IO数据处理类


import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * WebSocket协议类型的模拟客户端IO处理器类
 *
 * ;author duyanjun
 * ;since 2022/10/13 杜燕军 新建
 */
;Slf4j
public class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> {

    private final WebSocketClientHandshaker handShaker;

    private ChannelPromise handshakeFuture;

    public WebSocketIoHandler(WebSocketClientHandshaker handShaker) {
        this.handShaker = handShaker;
    }

    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    ;Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        handshakeFuture = ctx.newPromise();
    }

    ;Override
    public void channelActive(ChannelHandlerContext ctx) {
        handShaker.handshake(ctx.channel());
    }

    ;Override
    public void channelInactive(ChannelHandlerContext ctx) {
        ctx.close();
        try {
            super.channelInactive(ctx);
        } catch (Exception e) {
            log.error(;channelInactive 异常.;, e);
        }
        log.warn(;WebSocket链路与服务器连接已断开.;);
    }

    ;Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (!handShaker.isHandshakeComplete()) {
            try {
                handShaker.finishHandshake(ch, (FullHttpResponse) msg);
                handshakeFuture.setSuccess();
                log.info(;WebSocket握手成功;可以传输数据了.;);
                // 数据一定要封装成WebSocketFrame才能发达
                String data = ;Hello;;
                WebSocketFrame frame = new TextWebSocketFrame(data);
                ch.writeAndFlush(frame);
            } catch (WebSocketHandshakeException e) {
                log.warn(;WebSocket Client failed to connect;);
                handshakeFuture.setFailure(e);
            }
            return;
        }

        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException(
                    ;Unexpected FullHttpResponse (getStatus=; ; response.status() ;
                            ;, content=; ; response.content().toString(CharsetUtil.UTF_8) ; ;););
        }

        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            String s = textFrame.text();
            log.info(;WebSocket Client received message: ; ; s);
        } else if (frame instanceof PongWebSocketFrame) {
            log.info(;WebSocket Client received pong;);
        } else if (frame instanceof CloseWebSocketFrame) {
            log.info(;WebSocket Client received closing;);
            ch.close();
        }
    }

    ;Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error(;WebSocket链路由于发生异常,与服务器连接已断开.;, cause);
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
        super.exceptionCaught(ctx, cause);
    }

    ;Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            // 如果写通道处于空闲状态,就发送心跳命令
            if (IdleState.WRITER_IDLE.equals(event.state()) || IdleState.READER_IDLE.equals(event.state())) {
                // 发送心跳数据
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

三、正确用法演示

1、启动类

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

public class Simulator {
    public static void start() {
        String serverIp = ;127.0.0.1;;
        int serverPort = 8005;
        EventLoopGroup group = new NioEventLoopGroup();
        for (int i = 0; i < 10000; i;;) {
            WebSocketConnector client = new WebSocketConnector(serverIp,serverPort,group);
            client.doConnect();
        }
    }
}

2、WebSocket连接类

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.URI;

/**
 * WebSocket协议类型的模拟客户端连接器类
 *
 * ;author duyanjun
 * ;since 2022/10/13 杜燕军 新建
 */
;Slf4j
public class WebSocketConnector {
    // 服务器ip
    protected String serverIp;
    // 服务器通信端口
    protected int serverSocketPort;
    // 事件循环线程池
    protected EventLoopGroup group;
    // 网络通道
    private Channel channel;

    /**
     * WebSocket协议类型的模拟客户端连接器构造方法
     *
     * ;param serverIp
     * ;param serverSocketPort
     * ;param group
     */
    public WebSocketConnector(String serverIp,int serverSocketPort,EventLoopGroup group) {
        this.serverIp = serverIp;
        this.serverSocketPort = serverSocketPort;
        this.group = group;
    }

    public void doConnect() {
        try {
            String URL = ;ws://;; this.serverIp ; ;:; ; this.serverSocketPort ; ;/;;
            URI uri = new URI(URL);
            final WebSocketIoHandler handler =
                    new WebSocketIoHandler(
                            WebSocketClientHandshakerFactory.newHandshaker(
                                    uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    //.option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        ;Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加一个http的编解码器
                            pipeline.addLast(new HttpClientCodec());
                            // 添加一个用于支持大数据流的支持
                            pipeline.addLast(new ChunkedWriteHandler());
                            // 添加一个聚合器;这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
                            pipeline.addLast(new HttpObjectAggregator(1024 * 64));
                            pipeline.addLast(handler);
                        }
                    });
            try {
                synchronized (bootstrap) {
                    final ChannelFuture future = bootstrap.connect(this.serverIp, this.serverSocketPort).sync();
                    this.channel = future.channel();
                }
            } catch (InterruptedException e) {
                log.error(;连接服务失败.......................uri:; ; uri.toString(),e);
            }catch (Exception e) {
                log.error(;连接服务失败.......................uri:; ; uri.toString(),e);
            }
        } catch (Exception e) {
            log.error(;连接服务失败.......................;,e);
        } finally {
        }

    }

    public void disConnect() {
        this.channel.close();
    }
}

3、IO数据处理类

// Io数据处理类WebSocketIoHandler.java没有变化;使用二、错误用法演示->3、IO数据处理类中所示代码即可

网友评论

快盘下载暂未开通留言功能。

关于我们| 广告联络| 联系我们| 网站帮助| 免责声明| 软件发布

Copyright 2019-2029 【快快下载吧】 版权所有 快快下载吧 | 豫ICP备10006759号公安备案:41010502004165

声明: 快快下载吧上的所有软件和资料来源于互联网,仅供学习和研究使用,请测试后自行销毁,如有侵犯你版权的,请来信指出,本站将立即改正。