这里是普通文章模块栏目内容页
宜人贷蜂巢API网关技术解密之Netty使用实践

宜人贷蜂巢团队,由Michael创立于2013年,通过使用互联网科技手段助力金融生态和谐健康发展。自成立起一直致力于多维度数据闭环平台建设。目前团队规模超过百人,涵盖征信、电商、金融、社交、五险一金和保险等用户授信数据的抓取解析业务,辅以先进的数据分析、挖掘和机器学习等技术对用户信用级别、欺诈风险进行预测评定,全面对外输出金融反欺诈、社交图谱、自动化模型定制等服务或产品。

目前宜人贷蜂巢基于用户授权数据实时抓取解析技术,并结合顶尖大数据技术,快速迭代和自主的创新,已形成了强大而领先的聚合和输出能力。

为了适应完成宜人贷蜂巢强大的服务输出能力,蜂巢设计开发了自己的API网关系统,集中实现了鉴权、加解密、路由、限流等功能,使各业务抓取团队关注其核心抓取和分析工作,而API网关系统更专注于安全、流量、路由等问题,从而更好的保障蜂巢服务系统的质量。今天带着大家解密API网关的Netty线程池技术实践细节。

API网关作为宜人贷蜂巢数据开放平台的统一入口,所有的客户端及消费端通过统一的API来使用各类抓取服务。从面向对象设计的角度看,它与外观模式类似,包装各类不同的实现细节,对外表现出统一的调用形式。

本文首先,简要地介绍API网关的项目框架,其次对比BIO和NIO的特点,再引入Netty作为项目的基础框架,然后介绍Netty线程池的原理,最后深入Netty线程池的初始化、ServerBootstrap的初始化与启动及channel与线程池的绑定过程,让读者了解Netty在承载高并发访问的设计路思。

一、项目框架

API网关项目框架

图1 - API网关项目框架

图中描绘了API网关系统的处理流程,以及与服务注册发现、日志分析、报警系统、各类爬虫的关系。其中API网关系统接收请求,对请求进行编解码、鉴权、限流、加解密,再基于Eureka服务注册发现模块,将请求发送到有效的服务节点上;网关及抓取系统的日志,会被收集到elk平台中,做业务分析及报警处理。

二、BIO vs NIO

API网关承载数倍于爬虫的流量,提升 服务器 的并发处理能力、缩短系统的响应时间,通信模型的选择是至关重要的,是选择BIO,还是NIO?

1. Streamvs Buffer & 阻塞 vs 非阻塞

BIO是面向流的,io的读写,每次只能处理一个或者多个bytes,如果数据没有读写完成,线程将一直等待于此,而不能暂时跳过io或者等待io读写完成异步通知,线程滞留在io读写上,不能充分利用机器有限的线程资源,造成server的吞吐量较低,见图2。而NIO与此不同,面向Buffer,线程不需要滞留在io读写上,采用操作系统的epoll模式,在io数据准备好了,才由线程来处理,见图3。

 BIO 从流中读取数据

图2 – BIO 从流中读取数据

宜人贷蜂巢API网关技术解密之Netty使用实践

图3 – NIO 从Buffer中读取数据

2. Selectors

NIO的selector使一个线程可以监控多个channel的读写,多个channel注册到一个selector上,这个selector可以监测到各个channel的数据准备情况,从而使用有限的线程资源处理更多的连接,见图4。所以可以这样说,NIO极大的提升了服务器接受并发请求的能力,而服务器性能还是要取决于业务处理时间和业务线程池模型。

NIO 单一线程管理多个连接

图4 – NIO 单一线程管理多个连接

而BIO采用的是request-per-thread模式,用一个线程负责接收TCP连接请求,并建立链路,然后将请求dispatch给负责业务逻辑处理的线程,见图5。一旦访问量过多,就会造成机器的线程资源紧张,造成请求延迟,甚至服务宕机。

宜人贷蜂巢API网关技术解密之Netty使用实践

图5 – BIO 一连接一线程

对比JDK NIO与诸多NIO框架后,鉴于Netty优雅的设计、易用的API、优越的性能、安全性支持、API网关使用Netty作为通信模型,实现了基础框架的搭建。

三、线程池

考虑到API网关的高并发访问需求,线程池设计,见图6。

宜人贷蜂巢API网关技术解密之Netty使用实践

图6 – API网关线程池设计

#p#分页标题#e#

Netty的线程池理念有点像ForkJoinPool,不是一个线程大池子并发等待一条任务队列,而是每条线程都有一个任务队列。而且Netty的线程,并不只是简单的阻塞地拉取任务,而是在每个循环中做三件事情:

先SelectKeys()处理NIO的事件

然后获取本线程的定时任务,放到本线程的任务队列里

最后执行其他线程提交给本线程的任务

每个循环里处理NIO事件与其他任务的时间消耗比例,还能通过ioRatio变量来控制,默认是各占50%。可见,Netty的线程根本没有阻塞等待任务的清闲日子,所以也不使用有锁的BlockingQueue来做任务队列了,而是使用无锁的MpscLinkedQueue(Mpsc 是Multiple Producer, Single Consumer的缩写)

四、NioEventLoopGroup初始化

下面分析下Netty线程池NioEventLoopGroup的设计与实现细节,NioEventLoopGroup的类层次关系见图7

NioEvenrLoopGroup类层次关系

图7 –NioEvenrLoopGroup类层次关系

其创建过程——方法调用,见下图

NioEvenrLoopGroup创建调用关系

图8 –NioEvenrLoopGroup创建调用关系

宜人贷蜂巢API网关技术解密之Netty使用实践

NioEvenrLoopGroup的创建,具体执行过程是执行类MultithreadEventExecutorGroup的构造方法:

/**  

 * Create a new instance.  

 *  

 * @param nThreads          the number of threads that will be used by this instance.  

 * @param executor          the Executor to use, or {@code null} if the default should be used.  

 * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.  

 * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call  

 */  

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,  

                                        EventExecutorChooserFactory chooserFactory, Object... args) {  

    if (nThreads <= 0) {  

        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));  

    }  

    if (executor == null) {  

        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());  

    }  

    children = new EventExecutor[nThreads];  

    for (int i = 0; i < nThreads; i ++) {  

        boolean success = false;  

        try {  

            children[i] = newChild(executor, args);  

            success = true;  

        } catch (Exception e) {   

#p#分页标题#e#

            throw new IllegalStateException("failed to create a child event loop", e);  

        } finally {  

            if (!success) {  

                for (int j = 0; j < i; j ++) {  

                    children[j].shutdownGracefully();  

                }  

                for (int j = 0; j < i; j ++) { 

                     EventExecutor e = children[j]; 

                     try { 

                         while (!e.isTerminated()) {  

                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);  

                        }  

                    } catch (InterruptedException interrupted) {  

                        // Let the caller handle the interruption.  

                        Thread.currentThread().interrupt();  

                        break;  

                    }  

                }  

            }  

        }  

    }  

    chooser = chooserFactory.newChooser(children);  

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {  

        @Override  

        public void operationComplete(Future<Object> future) throws Exception {  

            if (terminatedChildren.incrementAndGet() == children.length) {  

                terminationFuture.setSuccess(null);  

            }  

        }  

    };  

    for (EventExecutor e: children) {  

        e.terminationFuture().addListener(terminationListener);  

    }  

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);  

    Collections.addAll(childrenSet, children);  

    readonlyChildren = Collections.unmodifiableSet(childrenSet);  

其中,创建细节见下:

线程池中的线程数nThreads必须大于0;

如果executor为null,创建默认executor,executor用于创建线程(newChild方法使用executor对象);

依次创建线程池中的每一个线程即NioEventLoop,如果其中有一个创建失败,将关闭之前创建的所有线程;

chooser为线程池选择器,用来选择下一个EventExecutor,可以理解为,用来选择一个线程来执行task;

chooser的创建细节,见下

DefaultEventExecutorChooserFactory根据线程数创建具体的EventExecutorChooser,线程数如果等于2^n,可使用按位与替代取模运算,节省cpu的计算资源,见源码

@SuppressWarnings("unchecked")  

@Override  

public EventExecutorChooser newChooser(EventExecutor[] executors) {  

    if (isPowerOfTwo(executors.length)) {  

        return new PowerOfTowEventExecutorChooser(executors);  

    } else {  

        return new GenericEventExecutorChooser(executors);  

    }  

}   

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {  

        private final AtomicInteger idx = new AtomicInteger();  

        private final EventExecutor[] executors;   

 

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {  

            this.executors = executors;  

        }   

 

        @Override  

        public EventExecutor next() {  

            return executors[idx.getAndIncrement() & executors.length - 1];  

        }  

    }   

 

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {  

        private final AtomicInteger idx = new AtomicInteger();  

        private final EventExecutor[] executors;   

 

        GenericEventExecutorChooser(EventExecutor[] executors) {  

            this.executors = executors;  

        }   

 

        @Override  

        public EventExecutor next() {  

            return executors[Math.abs(idx.getAndIncrement() % executors.length)];  

        }  

    } 

newChild(executor, args)的创建细节,见下

MultithreadEventExecutorGroup的newChild方法是一个抽象方法,故使用NioEventLoopGroup的newChild方法,即调用NioEventLoop的构造函数

@Override  

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {  

        return new NioEventLoop(this, executor, (SelectorProvider) args[0], 

            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);  

    } 

在这里先看下NioEventLoop的类层次关系

宜人贷蜂巢API网关技术解密之Netty使用实践

#p#分页标题#e#

NioEventLoop的继承关系比较复杂,在AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能, 即我们可以通过调用一个 NioEventLoop 实例的 schedule 方法来运行一些定时任务. 而在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 通过它, 我们可以调用一个NioEventLoop 实例的 execute 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行.

#p#分页标题#e#

通常来说, NioEventLoop 肩负着两种任务, 第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括调用 select 等待就绪的 IO 事件、读写数据与数据的处理等; 而第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的。

具体的构造过程,见下

宜人贷蜂巢API网关技术解密之Netty使用实践

宜人贷蜂巢API网关技术解密之Netty使用实践

宜人贷蜂巢API网关技术解密之Netty使用实践

创建任务队列tailTasks(内部为有界的LinkedBlockingQueue)

宜人贷蜂巢API网关技术解密之Netty使用实践

创建线程的任务队列taskQueue(内部为有界的LinkedBlockingQueue),以及任务过多防止系统宕机的拒绝策略rejectedHandler

其中tailTasks和taskQueue均是任务队列,而优先级不同,taskQueue的优先级高于tailTasks,定时任务的优先级高于taskQueue。

五、ServerBootstrap初始化及启动

了解了Netty线程池NioEvenrLoopGroup的创建过程后,下面看下API网关服务ServerBootstrap的是如何使用线程池引入服务中,为高并发访问服务的。

API网关ServerBootstrap初始化及启动代码,见下

serverBootstrap = new ServerBootstrap();  

bossGroup = new NioEventLoopGroup(config.getBossGroupThreads());  

workerGroup = new NioEventLoopGroup(config.getWorkerGroupThreads());   

 

serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)  

        .option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())  

        .option(ChannelOption.SO_BACKLOG, config.getBacklogSize())  

        .option(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive())  

        // Memory pooled  

        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)  

        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)  

        .childHandler(channelInitializer);    

 

ChannelFuture future = serverBootstrap.bind(config.getPort()).sync();  

log.info("API-gateway started on port: {}", config.getPort());  

future.channel().closeFuture().sync(); 

API网关系统使用netty自带的线程池,共有三组线程池,分别为bossGroup、workerGroup和executorGroup(使用在channelInitializer中,本文暂不作介绍)。其中,bossGroup用于接收客户端的TCP连接,workerGroup用于处理I/O、执行系统task和定时任务,executorGroup用于处理网关业务加解密、限流、路由,及将请求转发给后端的抓取服务等业务操作。

六、Channel与线程池的绑定

ServerBootstrap初始化后,通过调用bind(port)方法启动Server,bind的调用链如下

AbstractBootstrap.bind ->AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister 

其中,ChannelFuture regFuture = config().group().register(channel);中的group()方法返回bossGroup,而channel在serverBootstrap的初始化过程指定channel为NioServerSocketChannel.class,至此将NioServerSocketChannel与bossGroup绑定到一起,bossGroup负责客户端连接的建立。那么NioSocketChannel是如何与workerGroup绑定到一起的?

调用链AbstractBootstrap.initAndRegister -> AbstractBootstrap. init-> ServerBootstrap.init ->ServerBootstrapAcceptor.ServerBootstrapAcceptor ->ServerBootstrapAcceptor.channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {  

    final Channel child = (Channel) msg;  

    child.pipeline().addLast(childHandler);  

    for (Entry<ChannelOption<?>, Object> e: childOptions) {  

        try {  

#p#分页标题#e#

            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {  

                logger.warn("Unknown channel option: " + e);  

            }  

        } catch (Throwable t) {  

            logger.warn("Failed to set a channel option: " + child, t); 

        }  

    }  

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {  

        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());  

    } 

 

    try {  

        childGroup.register(child).addListener(new ChannelFutureListener() {  

            @Override  

            public void operationComplete(ChannelFuture future) throws Exception {  

                if (!future.isSuccess()) { 

                     forceClose(child, future.cause());  

                }  

            }  

        });  

    } catch (Throwable t) {  

        forceClose(child, t);  

    }  

其中,childGroup.register(child)就是将NioSocketChannel与workderGroup绑定到一起,那又是什么触发了ServerBootstrapAcceptor的channelRead方法?

其实当一个 client 连接到 server 时, Java 底层的 NIO ServerSocketChannel 会有一个SelectionKey.OP_ACCEPT 就绪事件, 接着就会调用到 NioServerSocketChannel.doReadMessages方法

@Override  

protected int doReadMessages(List<Object> buf) throws Exception {  

    SocketChannel ch = javaChannel().accept();  

    try {  

        if (ch != null) {  

            buf.add(new NioSocketChannel(this, ch));  

            return 1;  

        }  

    } catch (Throwable t) {          … 

 

    }  

    return 0;  

javaChannel().accept() 会获取到客户端新连接的SocketChannel,实例化为一个 NioSocketChannel, 并且传入 NioServerSocketChannel 对象(即 this), 由此可知, 我们创建的这个NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 实例 。

接下来就经由 Netty 的 ChannelPipeline 机制, 将读取事件逐级发送到各个 handler 中, 于是就会触发前面我们提到的 ServerBootstrapAcceptor.channelRead 方法啦。

#p#分页标题#e#

至此,分析了Netty线程池的初始化、ServerBootstrap的启动及channel与线程池的绑定过程,能够看出Netty中线程池的优雅设计,使用不同的线程池负责连接的建立、IO读写等,为API网关项目的高并发访问提供了技术基础。

七、总结

至此,对API网关技术的Netty实践分享就到这里,各位如果对中间的各个环节有什么疑问和建议,欢迎大家指正,我们一起讨论,共同学习提高。

参考:

https://segmentfault.com/a/1190000007403873

https://segmentfault.com/a/1190000007283053

【本文是51CTO专栏机构宜信技术学院的原创文章,微信公众号“宜信技术学院( id: CE_TECH)”】

戳这里,看该作者更多好文

收藏
0
有帮助
0
没帮助
0