zoukankan      html  css  js  c++  java
  • rpc框架之 thrift连接池实现

    接前一篇rpc框架之HA/负载均衡构架设计 继续,写了一个简单的thrift 连接池:

    先做点准备工作:

    package yjmyzz;
    
    public class ServerInfo {
    
        public String getHost() {
            return host;
        }
    
        public void setHost(String host) {
            this.host = host;
        }
    
        public int getPort() {
            return port;
        }
    
        public void setPort(int port) {
            this.port = port;
        }
    
        private String host;
        private int port;
    
        public ServerInfo(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        public String toString() {
            return "host:" + host + ",port:" + port;
        }
    }
    

    上面这个类,用来封装服务端的基本信息,主机名+端口号,连接时需要用到。

    package yjmyzz;
    
    import org.apache.thrift.transport.TTransport;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class TransfortWrapper {
    
        private TTransport transport;
    
        /**
         * 是否正忙
         */
        private boolean isBusy = false;
    
        /**
         * 是否已经挂
         */
        private boolean isDead = false;
    
        /**
         * 最后使用时间
         */
        private Date lastUseTime;
    
        /**
         * 服务端Server主机名或IP
         */
        private String host;
    
        /**
         * 服务端Port
         */
        private int port;
    
        public TransfortWrapper(TTransport transport, String host, int port, boolean isOpen) {
            this.lastUseTime = new Date();
            this.transport = transport;
            this.host = host;
            this.port = port;
            if (isOpen) {
                try {
                    transport.open();
                } catch (Exception e) {
                    //e.printStackTrace();
                    System.err.println(host + ":" + port + " " + e.getMessage());
                    isDead = true;
                }
            }
        }
    
        public TransfortWrapper(TTransport transport, String host, int port) {
            this(transport, host, port, false);
        }
    
    
        public boolean isBusy() {
            return isBusy;
        }
    
        public void setIsBusy(boolean isBusy) {
            this.isBusy = isBusy;
        }
    
        public boolean isDead() {
            return isDead;
        }
    
        public void setIsDead(boolean isDead) {
            this.isDead = isDead;
        }
    
        public TTransport getTransport() {
            return transport;
        }
    
        public void setTransport(TTransport transport) {
            this.transport = transport;
        }
    
        /**
         * 当前transport是否可用
         *
         * @return
         */
        public boolean isAvailable() {
            return !isBusy && !isDead && transport.isOpen();
        }
    
        public Date getLastUseTime() {
            return lastUseTime;
        }
    
        public void setLastUseTime(Date lastUseTime) {
            this.lastUseTime = lastUseTime;
        }
    
        public String getHost() {
            return host;
        }
    
        public int getPort() {
            return port;
        }
    
        public String toString() {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return "hashCode:" + hashCode() + "," +
                    host + ":" + port + ",isBusy:" + isBusy + ",isDead:" + isDead + ",isOpen:" +
                    transport.isOpen() + ",isAvailable:" + isAvailable() + ",lastUseTime:" + format.format(lastUseTime);
        }
    }
    

    这是对TTransport的封装,主要增加了一些辅助信息,直接看代码注释即可。

    下面才是连接池的主要内容:

    package yjmyzz;
    
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Thrift连接池
     *
     * @author : 菩提树下的杨过(http://yjmyzz.cnblogs.com/)
     * @version : 0.1 BETA
     * @since : 2015-09-27(中秋)
     */
    public class ThriftTransportPool {
    
        Semaphore access = null;
        TransfortWrapper[] pool = null;
        int poolSize = 1;//连接池大小
        int minSize = 1;//池中保持激活状态的最少连接个数
        int maxIdleSecond = 300;//最大空闲时间(秒),超过该时间的空闲时间的连接将被关闭
        int checkInvervalSecond = 60;//每隔多少秒,检测一次空闲连接(默认60秒)
        List<ServerInfo> serverInfos;
        boolean allowCheck = true;
        Thread checkTread = null;
    
        public int getCheckInvervalSecond() {
            return checkInvervalSecond;
        }
    
        public void setCheckInvervalSecond(int checkInvervalSecond) {
            this.checkInvervalSecond = checkInvervalSecond;
        }
    
    
        /**
         * 连接池构造函数
         *
         * @param poolSize            连接池大小
         * @param minSize             池中保持激活的最少连接数
         * @param maxIdleSecond       单个连接最大空闲时间,超过此值的连接将被断开
         * @param checkInvervalSecond 每隔多少秒检查一次空闲连接
         * @param serverList          服务器列表
         */
        public ThriftTransportPool(int poolSize, int minSize, int maxIdleSecond, int checkInvervalSecond, List<ServerInfo> serverList) {
            if (poolSize <= 0) {
                poolSize = 1;
            }
            if (minSize > poolSize) {
                minSize = poolSize;
            }
            if (minSize <= 0) {
                minSize = 0;
            }
            this.maxIdleSecond = maxIdleSecond;
            this.minSize = minSize;
            this.poolSize = poolSize;
            this.serverInfos = serverList;
            this.allowCheck = true;
            this.checkInvervalSecond = checkInvervalSecond;
            init();
            check();
        }
    
        /**
         * 连接池构造函数(默认最大空闲时间300秒)
         *
         * @param poolSize   连接池大小
         * @param minSize    池中保持激活的最少连接数
         * @param serverList 服务器列表
         */
        public ThriftTransportPool(int poolSize, int minSize, List<ServerInfo> serverList) {
            this(poolSize, minSize, 300, 60, serverList);
        }
    
    
        public ThriftTransportPool(int poolSize, List<ServerInfo> serverList) {
            this(poolSize, 1, 300, 60, serverList);
        }
    
        public ThriftTransportPool(List<ServerInfo> serverList) {
            this(serverList.size(), 1, 300, 60, serverList);
        }
    
    
        /**
         * 检查空闲连接
         */
        private void check() {
            checkTread =
                    new Thread(new Runnable() {
                        public void run() {
                            while (allowCheck) {
                                //System.out.println("--------------");
                                System.out.println("开始检测空闲连接...");
                                for (int i = 0; i < pool.length; i++) {
                                    //if (pool[i] == null) {
                                    //    System.out.println("pool[" + i + "]为null");
                                    //}
                                    //if (pool[i].getTransport() == null) {
                                    //    System.out.println("pool[" + i + "].getTransport()为null");
                                    //}
                                    if (pool[i].isAvailable() && pool[i].getLastUseTime() != null) {
                                        long idleTime = new Date().getTime() - pool[i].getLastUseTime().getTime();
                                        //超过空闲阀值的连接,主动断开,以减少资源消耗
                                        if (idleTime > maxIdleSecond * 1000) {
                                            if (getActiveCount() > minSize) {
                                                pool[i].getTransport().close();
                                                pool[i].setIsBusy(false);
                                                System.out.println(pool[i].hashCode() + "," + pool[i].getHost() + ":" + pool[i].getPort() + " 超过空闲时间阀值被断开!");
                                            }
                                        }
                                    }
                                }
                                System.out.println("当前活动连接数:" + getActiveCount());
                                try {
                                    Thread.sleep(checkInvervalSecond * 1000);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    });
            checkTread.start();
        }
    
        /**
         * 连接池初始化
         */
        private void init() {
            access = new Semaphore(poolSize);
            pool = new TransfortWrapper[poolSize];
    
            for (int i = 0; i < pool.length; i++) {
                int j = i % serverInfos.size();
                TSocket socket = new TSocket(serverInfos.get(j).getHost(),
                        serverInfos.get(j).getPort());
                if (i < minSize) {
                    pool[i] = new TransfortWrapper(socket, serverInfos.get(j).getHost(), serverInfos.get(j).getPort(), true);
                } else {
                    pool[i] = new TransfortWrapper(socket, serverInfos.get(j).getHost(), serverInfos.get(j).getPort());
                }
            }
        }
    
    
        /**
         * 从池中取一个可用连接
         * @return
         */
        public TTransport get() {
            try {
                if (access.tryAcquire(3, TimeUnit.SECONDS)) {
                    synchronized (this) {
                        for (int i = 0; i < pool.length; i++) {
                            if (pool[i].isAvailable()) {
                                pool[i].setIsBusy(true);
                                pool[i].setLastUseTime(new Date());
                                return pool[i].getTransport();
                            }
                        }
                        //尝试激活更多连接
                        for (int i = 0; i < pool.length; i++) {
                            if (!pool[i].isBusy() && !pool[i].isDead()
                                    && !pool[i].getTransport().isOpen()) {
                                try {
                                    pool[i].getTransport().open();
                                    pool[i].setIsBusy(true);
                                    pool[i].setLastUseTime(new Date());
                                    return pool[i].getTransport();
                                } catch (Exception e) {
                                    //e.printStackTrace();
                                    System.err.println(pool[i].getHost() + ":" + pool[i].getPort() + " " + e.getMessage());
                                    pool[i].setIsDead(true);
                                }
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException("can not get available client");
    
            }
            throw new RuntimeException("all client is too busy");
        }
    
        /**
         * 客户端调用完成后,必须手动调用此方法,将TTransport恢复为可用状态
         *
         * @param client
         */
        public void release(TTransport client) {
            boolean released = false;
            synchronized (this) {
                for (int i = 0; i < pool.length; i++) {
                    if (client == pool[i].getTransport() && pool[i].isBusy()) {
                        pool[i].setIsBusy(false);
                        released = true;
                        break;
                    }
                }
            }
            if (released) {
                access.release();
            }
        }
    
    
        public void destory() {
            if (pool != null) {
                for (int i = 0; i < pool.length; i++) {
                    pool[i].getTransport().close();
                }
            }
            allowCheck = false;
            checkTread = null;
            System.out.print("连接池被销毁!");
        }
    
        /**
         * 获取当前已经激活的连接数
         *
         * @return
         */
        public int getActiveCount() {
            int result = 0;
            for (int i = 0; i < pool.length; i++) {
                if (!pool[i].isDead() && pool[i].getTransport().isOpen()) {
                    result += 1;
                }
            }
            return result;
        }
    
        /**
         * 获取当前繁忙状态的连接数
         *
         * @return
         */
        public int getBusyCount() {
            int result = 0;
            for (int i = 0; i < pool.length; i++) {
                if (!pool[i].isDead() && pool[i].isBusy()) {
                    result += 1;
                }
            }
            return result;
        }
    
        /**
         * 获取当前已"挂"掉连接数
         *
         * @return
         */
        public int getDeadCount() {
            int result = 0;
            for (int i = 0; i < pool.length; i++) {
                if (pool[i].isDead()) {
                    result += 1;
                }
            }
            return result;
        }
    
        public String toString() {
            return "poolsize:" + pool.length +
                    ",minSize:" + minSize +
                    ",maxIdleSecond:" + maxIdleSecond +
                    ",checkInvervalSecond:" + checkInvervalSecond +
                    ",active:" + getActiveCount() +
                    ",busy:" + getBusyCount() +
                    ",dead:" + getDeadCount();
        }
    
        public String getWrapperInfo(TTransport client) {
            for (int i = 0; i < pool.length; i++) {
                if (pool[i].getTransport() == client) {
                    return pool[i].toString();
                }
            }
            return "";
        }
    }
    

    主要思路:

    1.构造器里,传入 连接池大小,最小连接数,连接最大空闲时间,空间连接检测时间间隔,服务端列表等基本信息

    2.然后调用init方法进行初始化,初始化时把pool[]数组填满,不过在填充的时候,要根据minsize决定激活多少连接(换句话讲,连接实例都都建好了,只是连不连的问题),另外初始化的时候,还要考虑到某个服务器宕机的可能,如果服务端挂了,将对应的实例设置为isDead=true的状态

    3.新开一个线程定时检查是否有空闲连接,如果空闲时间太长,主动断开,以节省开销。

    4.get()方法从数组中捞一个可用的连接出来,取的时候要考虑到唤醒"沉睡"连接的情况,即如果当前池中只有2个活动连接,这时又来了请求,没有活动连接了,要从池中把断开的连接叫醒一个。

    5.要控制并发控制,多个线程同时调用get()想从池中取可用连接时,可用Semaphore+Lock的机制来加以控制,可参考上一篇内容。

    测试:

    package yjmyzz;
    
    import org.apache.thrift.transport.TSocket;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;
    
    
    public class PoolTest {
    
        public static void main(String[] args) throws Exception {
    
            //初始化一个连接池(poolsize=15,minsize=1,maxIdleSecond=5,checkInvervalSecond=10)
            final ThriftTransportPool pool = new ThriftTransportPool(15, 1, 5, 10, getServers());
    
    
            //模拟客户端调用
            createClients(pool);
    
            //等候清理空闲连接
            Thread.sleep(30000);
    
            //再模拟一批客户端,验证连接是否会重新增加
            createClients(pool);
    
            System.out.println("输入任意键退出...");
            System.in.read();
            //销毁连接池
            pool.destory();
    
        }
    
    
        private static void createClients(final ThriftTransportPool pool) throws Exception {
    
            //模拟5个client端
            int clientCount = 5;
    
            Thread thread[] = new Thread[clientCount];
            FutureTask<String> task[] = new FutureTask[clientCount];
    
            for (int i = 0; i < clientCount; i++) {
                task[i] = new FutureTask<String>(new Callable<String>() {
                    public String call() throws Exception {
                        TSocket scoket = (TSocket) pool.get();//从池中取一个可用连接
                        //模拟调用RPC会持续一段时间
                        System.out.println(Thread.currentThread().getName() + " => " + pool.getWrapperInfo(scoket));
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        pool.release(scoket);//记得每次用完,要将连接释放(恢复可用状态)
                        return Thread.currentThread().getName() + " done.";
                    }
                });
                thread[i] = new Thread(task[i], "Thread" + i);
            }
    
            //启用所有client线程
            for (int i = 0; i < clientCount; i++) {
                thread[i].start();
                Thread.sleep(10);
            }
    
            System.out.println("--------------");
    
            //等待所有client调用完成
            for (int i = 0; i < clientCount; i++) {
                System.out.println(task[i].get());
                System.out.println(pool);
                System.out.println("--------------");
                thread[i] = null;
            }
        }
    
        private static List<ServerInfo> getServers() {
            List<ServerInfo> servers = new ArrayList<ServerInfo>();
            servers.add(new ServerInfo("localhost", 9000));
            servers.add(new ServerInfo("localhost", 9001));
            servers.add(new ServerInfo("localhost", 1002));//这一个故意写错的,模拟服务器挂了,连接不上的情景
            return servers;
        }
    }
    

    输出:

    ****************************
    开始检测空闲连接...
    当前活动连接数:1
    Thread1 => hashCode:919192718,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:09:59
    Thread0 => hashCode:1510835162,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:09:59
    localhost:1002 java.net.ConnectException: Connection refused
    Thread2 => hashCode:1466719669,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:00
    Thread3 => hashCode:2080503518,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:00
    localhost:1002 java.net.ConnectException: Connection refused
    Thread4 => hashCode:411724643,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:00
    --------------
    Thread0 done.
    poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:3,dead:2
    --------------
    Thread1 done.
    poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:3,dead:2
    --------------
    Thread2 done.
    poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:2,dead:2
    --------------
    Thread3 done.
    poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:1,dead:2
    --------------
    Thread4 done.
    poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:0,dead:2
    --------------
    开始检测空闲连接...
    1510835162,localhost:9000 超过空闲时间阀值被断开!
    919192718,localhost:9001 超过空闲时间阀值被断开!
    1466719669,localhost:9000 超过空闲时间阀值被断开!
    2080503518,localhost:9001 超过空闲时间阀值被断开!
    当前活动连接数:1
    开始检测空闲连接...
    当前活动连接数:1
    开始检测空闲连接...
    当前活动连接数:1
    Thread0 => hashCode:411724643,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
    Thread1 => hashCode:1510835162,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
    Thread2 => hashCode:919192718,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
    Thread3 => hashCode:1466719669,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
    Thread4 => hashCode:2080503518,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
    --------------
    Thread0 done.
    poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:4,dead:2
    --------------
    Thread1 done.
    poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:3,dead:2
    --------------
    Thread2 done.
    poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:2,dead:2
    --------------
    Thread3 done.
    poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:1,dead:2
    --------------
    Thread4 done.
    poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:0,dead:2
    --------------
    输入任意键退出...
    q
    连接池被销毁!

    ***********************

    注意上面高亮颜色的部分,2080503518 连接创建后,后来被check方法主动检测到空闲断开,然后第二轮调用时,又重新激活。411724643 则幸免于难,一直战斗到最后。另外由于故意写错了一个server地址,池中始终有二个dead的实例。

    值得改进的地方:

    主要是公平性的问题,在初始化的时候,如果服务器有3台,而指定的连接池大小为4,目前的做法是,用4对3取模,所以第1、4个连接实例都是连接到服务器1,get取可用连接的时候也有类似情况,是按pool数组从前向后遍历的,捞到第1个可用的连接就完事了,这样永远是排在List前面的服务器压力会大一些,这样有点不太符合负载"均衡"的本意。

    不过,这个问题也很好解决,有一个很简单有效的技巧,实际应用中,服务器列表是从zk上取回来的,取回来后,先对数组做随机排序,这样整体看来下,多个连接池总体的连接分布情况就比较平均了。

  • 相关阅读:
    query and join operation after sharding
    Windows Phone中的几种集合控件
    什么是SOPA SOPA的危害
    自动刷新人人网API session_key方法
    Windows Phone XNA创建简单局域网游戏
    static 修饰MySqlConnection引发的异常
    $Dsu$ $on$ $Tree$ 复习
    $Noip$前的小总结哦
    $NOIP2018$ 暴踩全场计划实施方案
    $NOIP2018$ 爆踩全场记
  • 原文地址:https://www.cnblogs.com/yjmyzz/p/thrift-client-pool-demo.html
Copyright © 2011-2022 走看看