`
jiangwenfeng762
  • 浏览: 286155 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

mina源码分析——bind

阅读更多

关于Mina

mina是开源的NIO框架,其project地址:

http://mina.apache.org/mina-project/features.html

想快速了解mina就看user guide:

http://mina.apache.org/mina-project/userguide/user-guide-toc.html

mina给我的感觉:干净、利落的抽象,非常容易上手,使用mina你只需要写不需要超过10行code就可以搭建一个TCP服务器,就像mina自身带的例子:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.example.gettingstarted.timeserver.TimeServerHandler;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class MinaTimeServer {

	/**
	 * @param args
	 */
	private static final int PORT = 9123;
	public static void main(String[] args) throws IOException {
		IoAcceptor acceptor = new NioSocketAcceptor();
		acceptor.getFilterChain().addLast( "logger", new LoggingFilter());
        acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName( "UTF-8" ))));
        acceptor.setHandler(new TimeServerHandler());
        acceptor.getSessionConfig().setReadBufferSize(2048);
        acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10);
		acceptor.bind(new InetSocketAddress(PORT) );
	}
}

 

Mina源码分析之bind

mina好用只是它把复杂很好的进行了分解和隐藏,要想真正掌握它,还是要深入了解源码,所以从bind方法开始,bind方法是interface IoAcceptor声明的方法,先看看它的javadoc,了解bind方法的职责:

 

   /**
     * Binds to the default local address(es) and start to accept incoming
     * connections.
     *
     * @throws IOException if failed to bind
     */
    void bind() throws IOException;

    /**
     * Binds to the specified local address and start to accept incoming
     * connections.
     *
     * @param localAddress The SocketAddress to bind to
     *  
     * @throws IOException if failed to bind
     */
    void bind(SocketAddress localAddress) throws IOException;

    /**
     * Binds to the specified local addresses and start to accept incoming
     * connections. If no address is given, bind on the default local address.
     * 
     * @param firstLocalAddresses The first address to bind to 
     * @param addresses The SocketAddresses to bind to 
     *
     * @throws IOException if failed to bind
     */
    void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException;

    /**
     * Binds to the specified local addresses and start to accept incoming
     * connections. If no address is given, bind on the default local address.
     * 
     * @param addresses The SocketAddresses to bind to 
     *
     * @throws IOException if failed to bind
     */
    void bind(SocketAddress... addresses) throws IOException;

    /**
     * Binds to the specified local addresses and start to accept incoming
     * connections.
     *
     * @throws IOException if failed to bind
     */
    void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException;

 bind方法职责

1.绑定到本地ip地址(如果没有指定就绑定到默认ip)

2.开始接受外面进来的请求。

通俗地说,一个Tcp的或则UDP的IO Server调用了bind方法后,就可以开始干活了,就可以接受请求了。顺便说一句,mina对IO的两端(Server,Client)都有很好的抽象和封装,IO Server由IoAcceptor代表,IO Client由IoConnector代表,而它们又都是IoService的直接子接口。关于mina的整体架构还是去看user guide,非常清晰。

 

明白了bind方法的职责,不妨先不要往下看,而是结合java NIO api 思考一下其大概的实现细节,然后再去翻code来验证你的推测。

对于一个基于NIO的server来说,要想能够开始接受外面的请求,有几件事必须做:

1.open一个SelectableChannel,SelectableChannel代表了可以支持非阻塞IO操作的channel

2.open一个Selector,Selector通过select方法来监控所有可以进行IO操作的SelectableChannel

3.完成SelectableChannel在Selector上的注册,这样Selector方可监控他。而类SelectionKey则代表一个成功的注册。

当然对于一个框架来说,肯定会做的更多,但以上3点是必须的,那接下来就去翻code吧

 

首先看bind方法所在的类层次:

bind方法所在的class位置

从类图中可以看出,真正处理bind逻辑的是bindInternal方法,该方法在AbstractIoAcceptor抽象类中声明,在AbstractPollingIoAcceptor类中实现。正如javadoc的说明,AbstractPollingIoAcceptor是A base class for implementing transport using a polling strategy. 

 

 bindInternal方法

下面是bindInternal方法的code:

 

protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
        // Create a bind request as a Future operation. When the selector
        // have handled the registration, it will signal this future.
        AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);

        // adds the Registration request to the queue for the Workers
        // to handle
        registerQueue.add(request);

        // creates the Acceptor instance and has the local
        // executor kick it off.
        startupAcceptor();

        // As we just started the acceptor, we have to unblock the select()
        // in order to process the bind request we just have added to the
        // registerQueue.
        try {
            lock.acquire();

            // Wait a bit to give a chance to the Acceptor thread to do the select()
            Thread.sleep(10);
            wakeup();
        } finally {
            lock.release();
        }

        // Now, we wait until this request is completed.
        request.awaitUninterruptibly();

        if (request.getException() != null) {
            throw request.getException();
        }

        // Update the local addresses.
        // setLocalAddresses() shouldn't be called from the worker thread
        // because of deadlock.
        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();

        for (H handle : boundHandles.values()) {
            newLocalAddresses.add(localAddress(handle));
        }

        return newLocalAddresses;
    }

 

1.创建一个bind请求:

AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);

当前你可以把AcceptorOperationFuture简单理解为支持异步处理和事件通知机制的一个request,后续再详细分析它。

 

2.把请求加到队列中:

 registerQueue.add(request);

registerQueue保存所有新注册的请求,这些请求在registerHandles方法中被消费掉。registerHandles方法又被Acceptor.run方法调用。新注册成功的连接都被保存在:

 private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());

 

3.创建Acceptor类,并且异步执行该类的run方法。

startupAcceptor();

Acceptor是一个内部类,实现了Runnable方法。它负责接收和处理从客户端进来的bind和unbind请求。下面会重点分析这个类。

 

4.等待10毫秒,然后调用wakeup方法来确保selector的select方法被唤醒。

try {

            lock.acquire();

 

            // Wait a bit to give a chance to the Acceptor thread to do the select()

            Thread.sleep(10);

            wakeup();

        } finally {

            lock.release();

        }

lock是一个信号量:

private final Semaphore lock = new Semaphore(1);由于可用值是1,因此相当于排他锁。

 这里要重点解释一下为什么要sleep这10毫秒:要搞清楚这点,首先要看Acceptor.run方法的实现(下面介绍),该方法调用了

int selected = select();

而select方法是abstract的:

    /**
     * Check for acceptable connections, interrupt when at least a server is ready for accepting.
     * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}
     * @return The number of sockets having got incoming client
     * @throws Exception any exception thrown by the underlying systems calls
     */
    protected abstract int select() throws Exception;

 大家能够猜到其子类:NioSocketAcceptor的实现就是调用Selector.select方法:

    @Override
    protected int select() throws Exception {
        return selector.select();
    }

 而select方法在没有io事件也没有调用selector.wakeup方法时是阻塞的,反之就会被唤醒。所以才有了上面sleep10毫秒后调用wakeup方法(也是abstract方法,子类NioSocketAcceptor实现就是调用Selector.wakeUp),来唤醒select。但由于Accecptor是在独立线程运行的,因而sleep一会以保证那个线程已经启动,而不会错过唤醒select.

 

5.然后就等待请求被处理完毕,并判断是否有异常,没有异常就把绑定的本地地址返回。

 

Acceptor

Acceptor是一个内部类,实现了Runnable方法。它负责接收和处理从客户端进来的bind和unbind请求 

接下来重点看Acceptor的run方法:

        public void run() {
            assert (acceptorRef.get() == this);

            int nHandles = 0;

            // Release the lock
            lock.release();

            while (selectable) {
                try {
                    // Detect if we have some keys ready to be processed
                    // The select() will be woke up if some new connection
                    // have occurred, or if the selector has been explicitly
                    // woke up
                    int selected = select();

                    // this actually sets the selector to OP_ACCEPT,
                    // and binds to the port on which this class will
                    // listen on
                    nHandles += registerHandles();

                    // 这里推出while循环的逻辑 省略。。。

                    if (selected > 0) {
                        // We have some connection request, let's process
                        // them here.
                        processHandles(selectedHandles());
                    }

                    // check to see if any cancellation request has been made.
                    nHandles -= unregisterHandles();
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    break;
                } catch (Throwable e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            // Cleanup all the processors, and shutdown the acceptor.
            // .....
            }
        }

 run方法重点是while循环:

1.首先调用select方法来看是否有新io请求进来(由于Acceptor只被bind和unbind调用,因此此时的请求只可能是bind请求和unbind请求)

2.调用registerHandles处理bind请求,该方法返回新处理的bind请求数目,如果=0,则推出循环,Acceptor的处理结束。处理bind请求的大概过程是:

1)从registerQueue队列中拿出bind请求:AcceptorOperationFuture(之前放进去的)

2)从AcceptorOperationFuture中拿出要bind的SocketAddress(可能不止一个);

3)对于每一个SocketAddress,调用open方法来建立ServerSocketChannel通道。open方法是abstract,其子类NioSocketAcceptor给出了实现,没错正如我们之前设想的那样,NioSocketAcceptor.open主要就是完成ServerSocketChannel的注册,源码如下:

    @Override
    protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
        // Creates the listening ServerSocket
        ServerSocketChannel channel = ServerSocketChannel.open();

        boolean success = false;

        try {
            // This is a non blocking socket channel
            channel.configureBlocking(false);

            // Configure the server socket,
            ServerSocket socket = channel.socket();

            // Set the reuseAddress flag accordingly with the setting
            socket.setReuseAddress(isReuseAddress());

            // and bind.
            socket.bind(localAddress, getBacklog());

            // Register the channel within the selector for ACCEPT event
            channel.register(selector, SelectionKey.OP_ACCEPT);
            success = true;
        } finally {
            if (!success) {
                close(channel);
            }
        }
        return channel;
    }

 4)把第3)建立好的ServerSocketChannel放到boundHandles中。boundHandles是一个线程安全的map:

 

private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());

5)返回新注册的size。

 

3. 如果select方法返回值>0,就调用processHandles(selectedHandles());来处理。处理过程大致如下:

1)从selector中拿出有io发生的ServerSocketChannel集合;这个过程是在子类NioSocketAcceptor .selectedHandles方法中发生的。

2)对于每一个ServerSocketChannel,调用accept方法来建立会话:IoSession,IoSession是mina中非常重要的概念,以后再分析。accept方法也是abstract的,其子类NioSocketAccepor给出了实现:

    @Override
    protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {

        SelectionKey key = handle.keyFor(selector);

        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
            return null;
        }

        // accept the connection from the client
        SocketChannel ch = handle.accept();

        if (ch == null) {
            return null;
        }

        return new NioSocketSession(this, processor, ch);
    }

3)初始化session:initSession(session, null, null);

4)保存session到IoProcessor

 

4.处理unbind的逻辑:

nHandles -= unregisterHandles();

前面说过Acceptor的用户是bind和unbind,所以run方法里自然会有处理unbind的逻辑,不了解这一点看到这段code可能会有些困惑。

unbind的逻辑就很简单了,大致过程如下:

1)从cancelQueue中取出unbind的请求:AcceptorOperationFuture

2)从AcceptorOperationFuture中取出SocketAddress对象(可能不止一个)

3)对于每一个SocketAddress,把它从boundHandles中删除

4)对于每一个SocketAddress,调用close方法来关闭它,close方法也是abstract方法,子类NioSocketAcceptor给出了实现:

    @Override
    protected void close(ServerSocketChannel handle) throws Exception {
        SelectionKey key = handle.keyFor(selector);

        if (key != null) {
            key.cancel();
        }

        handle.close();
    }

 5)调用AcceptorOperationFuture.setDone来做事件通知

 

AbstractPollingIoAcceptor.bindInternal

->startupAcceptor//启动Acceptor线程

-->Acceptor.run

--->registerHandles

主要逻辑:完成ServerSocketChannel在selector中的注册,感兴趣的事件是: SelectionKey.OP_ACCEPT,注册后ServerSocketChannel就可以在指定的ip和port上监听新连接。

1)从registerQueue中取出头元素,registerQueue保存待注册的请求

AcceptorOperationFuture future = registerQueue.poll();

1)创建ServerSocketChannel:ServerSocketChannel.open();

2)把ServerSocket绑定到指定的SocketAddress,SocketAddress来自

List<SocketAddress> localAddresses = future.getLocalAddresses();

3)把ServerSocketChannel注册到selector:channel.register(selector, SelectionKey.OP_ACCEPT);

4)返回channel并保存在boundHandles中

--->processHandles

主要逻辑:处理ServerSocketChannel上的新连接请求,把新连接请求构建新的IoSession并初始化,然后把IoSession实例交给IoProcessor来管理和处理io操作。

1)调用NioSocketAcceptor.accept方法返回NioSocketSession实例。构建NioSocketSession需要SocketChannel、IoProcessor等参数。

2)调用initSession方法

3)把当前session加到IoProcessor中:void add(S session)

--->unregisterHandles

 

主要逻辑:响应doUnbind方法,关闭ServerSocketChannel

  • 大小: 16.7 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics