当前位置首页 > Apache知识

apache-common-pool2

阅读次数:216 次  来源:admin  发布时间:

对象的创建和销毁在一定程度上会消耗系统的资源,虽然jvm的性能在近几年已经得到了很大的提高,对于多数对象来说,没有必要利用对象池技术来进行对象的创建和管理。但是对于有些对象来说,其创建的代价还是比较昂贵的,比如线程、tcp连接、rpc连接、数据库连接等对象,因此对象池技术还是有其存在的意义。

Apache-commons-pool-1.6提供的对象池主要有两种:

一种是带Key的对象池,这种带Key的对象池是把相同的池对象放在同一个池中,也就是说有多少个key就有多少个池; 另一种是不带Key的对象池,这种对象池是把生产完全一致的对象放在同一个池中,但是有时候,单用对池内所有对象一视同仁的对象池,并不能解决的问题。例如:对于一组某些参数设置不同的同类对象——比如一堆指向不同地址的 java.net.URL对象或者一批代表不同语句的java.sql.PreparedStatement对象,用这样的方法池化,就有可能取出不合用的对象。

1、对象池:

1)对象池接口介绍:

如果让我们去设计一个对象池接口,会给用户提供哪些核心的方法呢?

orrowObject(),returnObject()是两个核心方法,一个是’借’,一个是’还’。那么我们有可能需要对一个已经借到的对象置为失效(比如当我们的远程连接关闭或产生异常,这个连接不可用需要失效掉),invalidateObject()也是必不可少的。对象池刚刚创建的时候,我们可能需要预热一部分对象,而不是采用懒加载模式以避免系统启动时候的抖动,因此addObject()提供给用户,以进行对象池的预热。有创建就有销毁,clear()和close()就是用来清空对象池(觉得叫purge()可能更好一点)。除此之外,我们可能还需要一些简单的统计,比如getNumIdle()获得空闲对象个数和getNumActive()获得活动对象(被借出对象)的个数。如下表:

apache-common-pool2

2)在commons-pool中有两类对象池接口(带key和不带key),一个是ObjectPool,另一个是KeyedObjectPool;此外,为了方便他们分别还对应了ObjectPoolFactory、KeyedObjectPoolFactory两个接口(这两个接口在功能上和他们都一样,只是使用形式上不一样)

3)对象池空间划分:

一个对象存储到对象池中,其位置不是一成不变的。空间的划分可以分为两种,一种是物理空间划分,一种是逻辑空间划分。不同的实现可能采用不同的技术手段,Commons Pool实际上采用了逻辑划分。如下图所示:apache-common-pool2

从整体上来讲,可以将空间分为池外空间和池内空间,池外空间是指被’出借’的对象所在的空间(逻辑空间)。池内空间进一步可以划分为idle空间,abandon空间和invalid空间。idle空间就是空闲对象所在的空间,空闲对象之间是有一定的组织结构的(详见后文)。abandon空间又被称作放逐空间,用于放逐被出借超时的对象。invalid空间其实就是对象的垃圾场,这些对象将不会在被使用,而是等待被gc处理掉。

4)对象池的放逐与驱逐策略:

下面我们会多次提到驱逐(eviction)和放逐(abandon),这两个概念是对象池设计的核心。

先来看驱逐,我们知道对象池的一个重要的特性就是伸缩性,所谓伸缩性是指对象池能够根据当前池中空闲对象的数量(maxIdle和minIdle配置)自动进行调整,进而避免内存的浪费。自动伸缩,这是驱逐所需要达到的目标,他是如何实现的呢?实际上在对象池内部,我们可以维护一个驱逐定时器(EvictionTimer),由timeBetweenEvictionRunsMillis参数对定时器的间隔加以控制,每次达到驱逐时间后,我们就选定一批对象(由numTestsPerEvictionRun参数进行控制)进行驱逐测试,这个测试可以采用策略模式,比如Commons Pool的DefaultEvictionPolicy,代码如下:

apache-common-pool2

对于符合驱逐条件的对象,将会被对象池无情的驱逐出空闲空间,并丢弃到invalid空间。之后对象池还需要保证内部空闲对象数量需要至少达到minIdle的控制要求。

我们在看来放逐,对象出借时间太长(由removeAbandonedTimeout控制),我们就把他们称作流浪对象,这些对象很有可能是那些用完不还的坏蛋们的杰作,也有可能是对象使用者出现了什么突发状况,比如网络连接超时时间设置长于放逐时间。总之,被放逐的对象是不允许再次回归到对象池中的,他们会被搁置到abandon空间,进而进入invalid空间再被gc掉以完成他们的使命。放逐由removeAbandoned()方法实现,分为标记过程和放逐过程,代码实现并不难,有兴趣的可以直接翻翻源代码。

驱逐是由内而外将对象驱逐出境,放逐则是由外而内,将对象流放。他们一内一外,正是整个对象池形成闭环的核心要素。

5)对象池有效性探测:

用过数据库连接池的同学可能对类似testOnBorrow的配置比较熟悉。除了testOnBorrow,对象池还提供了testOnCreate, testOnReturn, testWhileIdle,其中testWhileIdle是当对象处于空闲状态的时候所进行的测试,当测试通过则继续留在对象池中,如果失效,则弃置到invalid空间。所谓testOnBorrow其实就是当对象出借前进行测试,测试什么?当然是有效性测试,在测试之前我们需要调用factory.activateObject()以激活对象,在调用factory.validateObject(p)对准备出借的对象做有有效性检查,如果这个对象无效则可能有抛出异常的行为,或者返回空对象,这全看具体实现了。testOnCreate表示当对象创建之后,再进行有效性测试,这并不适用于频繁创建和销毁对象的对象池,他与testOnBorrow的行为类似。testOnReturn是在对象还回到池子之前锁进行的测试,与出借的测试不同,testOnReturn无论是测试成功还是失败,我们都需要保证池子中的对象数量是符合配置要求的()ensureIdle()方法就是做这个事情),并且如果测试失败了,我们可以直接swallow这个异常,因为用户根本不需要关心池子的状态。

6)对象池的常见配置一览:

apache-common-pool2

2、池化对象:

1)池化对象接口:(被池化的对象需要实现该接口)

池化对象就是对象池中所管理的基本单元。我们可以思考一下,如果直接将我们的原始对象放到对象池中是否可以?答案当然是可以,但是不好,因为如果那样做,我们的对象池就退化成了容器Collection了,之所以需要将原始对象wrapper成池对象,是因为我们需要提供额外的管理功能,比如生命周期管理。commons pool采用了PooledObject<T&gt;接口和KeyedPooledObject<T>接口用于表达池对象,它主要抽象了池对象的状态管理和一些诸如状态变迁时所产生的统计指标,这些指标可以配合对象池做更精准的管理操作。

2)池化对象状态:

说到对池对象的管理,最重要的当属对状态的管理。对于状态管理,我们熟知的模型就是状态机模型了。池对象当然也有一套自己的状态机,我们先来看看commons pool所定义的池对象都有哪些状态:

apache-common-pool2

这里只需知道:放逐( ABANDONED)指的是不在对象池中的对象超时流放;驱逐( EVICTION)指的是空闲对象超时销毁;VALIDATION是有效性校验,主要校验空闲对象的有效性。注意与驱逐和放逐之间的区别。我们通过一张图来看看状态之间的变迁。

apache-common-pool2

我们看到上图的’圆圈’表示的就是池对象,其中中间的英文简写是其对应的状态。虚线外框则表示瞬时状态。比如RETURNING和ABANDONED。这里我们省略了VALIDATION_RETURN_TO_HEAD,VALIDATION_PREALLOCATED,EVICTION_RETURN_TO_HEAD,因为这对于我们理解池对象状态变迁并没有太多帮助。针对上图,我们重点关注四个方面:

IDLE->ALLOCATED 即上图的borrow操作,除了需要将状态置为已分配,我们还需要考虑如果对象池耗尽了怎么办?是继续阻塞还是直接异常退出?如果阻塞是阻塞多久?

ALLOCATED->IDLE 即上图的return操作,我们需要考虑的是,如果池对象还回到对象池,此时对象池空闲数已经达到上界或该对象已经无效,我们是否需要进行特殊处理?

IDLE->EVICTION 与 ALLOCATED->ABANDONED 请参考后文

IDLE->VALIDATION 是testWhileIdle的有效性测试所需要经历的状态变迁,他是指每隔一段时间对池中所有的idle对象进行有效性检查,以排除那些已经失效的对象。失效的对象将会弃置到invalid空间。

3)池化对象生命周期控制:

只搞清楚了池化对象的状态和状态转移是不够的,我们还应该能够对池对象生命周期施加影响。Commons Pool通过PooledObjectFactory<T>接口和KeyedPooledObjectFactory<T>对对象生命周期进行控制。该接口有如下方法:

apache-common-pool2

我们需要注意,池对象必须经过创建(makeObject())和初始化过程(activateObject())后才能够被我们使用。我们看一看这些方法能够影响哪些状态变迁。

4)池对象组织结构:

池中的对象,并不是杂乱无章的,他们得有一定的组织结构。不同的组织结构可能会从整体影响对象池的使用。Apache Commons提供了两种组织结构,其一是有界阻塞双端队列(LinkedBlockingDeque),其二是key桶。

apache-common-pool2

有界阻塞队列能够提供阻塞特性,当池中对象exhausted后,新申请对象的线程将会阻塞,这是典型的生产者/消费者模型,通过这种双端的阻塞队列,我们能够实现池对象的lifo或fifo。如下代码:

apache-common-pool2

因为是带有阻塞性质的队列,我们能够通过fairness参数控制线程获得锁的公平性,这里我们可以参考AQS实现,不说了。下面我们再来看一看key桶的数据结构:

apache-common-pool2

从上图我们可以看出,每一个key对应一个的双端阻塞队列ObjectDeque,ObjectDeque实际上就是包装了LinkedBlockingDeque,采用这种结构我们能够对池对象进行一定的划分,从而更加灵活的使用对象池。Commons Pool采用了KeyedObjectPool<K,V>用以表示采用这种数据结构的对象池。当我们borrow和return的时候,都需要指定对应的key空间。

Refer:

https://blog.csdn.net/liang_love_java/article/details/50510753

https://blog.csdn.net/liuxiao723846/article/details/78881040

Demo:

package test.misc.pool;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

import java.util.Calendar;

public class CalendarFactory extends BasePooledObjectFactory<Calendar> {

    @Override
    public Calendar create() throws Exception {
        return Calendar.getInstance();
    }

    @Override
    public PooledObject<Calendar> wrap(Calendar cal) {
        return new DefaultPooledObject<>(cal);
    }

    @Override
    public void destroyObject(PooledObject<Calendar> p) throws Exception {
        super.destroyObject(p);
        //System.out.println("### destroyObject() ###");
    }

}

CalendarFactory

package test.misc.pool;

import com.google.common.base.MoreObjects;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionTestFactory extends BaseKeyedPooledObjectFactory<String, ConnectionTest> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionTestFactory.class);

    @Override
    public ConnectionTest create(String key) throws Exception {
        return new ConnectionTest(key);
    }

    @Override
    public PooledObject<ConnectionTest> wrap(ConnectionTest value) {
        return new DefaultPooledObject<>(value);
    }

    public static void main(String[] args) {
        GenericKeyedObjectPoolConfig conf = new GenericKeyedObjectPoolConfig();
        conf.setMaxTotal(20);
        conf.setMaxIdlePerKey(10);
        KeyedObjectPool<String, ConnectionTest> objectPool = new GenericKeyedObjectPool<String, ConnectionTest>(new ConnectionTestFactory(), conf);
        try {
            //添加对象到池,重复的不会重复入池
//            objectPool.addObject("1");
//            objectPool.addObject("2");
//            objectPool.addObject("3");
//
//            objectPool.addObject("1");

            // 获得对应key的对象
            ConnectionTest connectionTest1 = objectPool.borrowObject("1");
            LOGGER.info("【borrowObject】connectionTest1 = {}", connectionTest1);
            ConnectionTest connectionTest1a = objectPool.borrowObject("1");
            LOGGER.info("【borrowObject】connectionTest1a = {}", connectionTest1a);
            LOGGER.info("connectionTest1a.equals(connectionTest1) = {}", connectionTest1a==connectionTest1);

            // 释放对象
            objectPool.returnObject("1", connectionTest1);

            //清除所有的对象
            objectPool.clear();
        } catch (Exception e) {
            LOGGER.error("Error", e);
        }
    }
}

class ConnectionTest {
    private String key;
    public ConnectionTest(String key) {
        this.key = key;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("key", key)
                .toString();
    }
}

ConnectionTestFactory

package test.misc.pool;

import com.google.common.collect.Lists;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Calendar;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ObjectPoolDemo {
    private static final Logger LOGGER = LoggerFactory.getLogger(ObjectPoolDemo.class);
    private static final ExecutorService threadPool = Executors.newCachedThreadPool();

    public static void main(String[] args){
        testObjectPool();
//        testKeyedObjectPool();
    }

    private static void testObjectPool() {
        GenericObjectPoolConfig conf = new GenericObjectPoolConfig();
        conf.setMaxTotal(20);
        conf.setMaxIdle(10);
        conf.setMaxWaitMillis(2000L);
        //conf.setBlockWhenExhausted(false);    //default=true
        ObjectPool<Calendar> objectPool = new GenericObjectPool<>(new CalendarFactory(), conf);
        try {
            //add(objectPool);
            //basic(objectPool);
            testMultiThread(objectPool);

        } catch (Exception e) {
            LOGGER.error("Error", e);
        } finally {
            objectPool.close();
        }
    }

    private static void testKeyedObjectPool() {
        GenericKeyedObjectPoolConfig conf = new GenericKeyedObjectPoolConfig();
        conf.setMaxTotalPerKey(20);
        conf.setMaxIdlePerKey(10);
        KeyedObjectPool<String, ConnectionTest> objectPool = new GenericKeyedObjectPool<>(new ConnectionTestFactory(), conf);
        try {
            //add4Keyed(objectPool);
            basic4Keyed(objectPool);

            // 获得对应key的对象
//            ConnectionTest connectionTest1 = objectPool.borrowObject("1");
//            LOGGER.info("【borrowObject】connectionTest1 = {}", connectionTest1);
//            ConnectionTest connectionTest1a = objectPool.borrowObject("1");
//            LOGGER.info("【borrowObject】connectionTest1a = {}", connectionTest1a);
//            LOGGER.info("connectionTest1a.equals(connectionTest1) = {}", connectionTest1a==connectionTest1);
//
//            // 释放对象
//            objectPool.returnObject("1", connectionTest1);
//
//            //清除所有的对象
//            objectPool.clear();
        } catch (Exception e) {
            LOGGER.error("Error", e);
        } finally {
            objectPool.close();
        }
    }



    private static void add(ObjectPool<Calendar> pool) throws Exception {
        for(int i=0; i<11; i++) {
            pool.addObject();
        }
        LOGGER.info("[1]active={}", pool.getNumActive());
        LOGGER.info("[2]idle={}", pool.getNumIdle());
    }

    private static void basic(ObjectPool<Calendar> pool) throws Exception {
        LOGGER.info("[1]active={}, idle={}", pool.getNumActive(), pool.getNumIdle());
        Calendar cal = pool.borrowObject();
        LOGGER.info("cal != null: {}", (cal != null));
        LOGGER.info("[1]active={}, idle={}", pool.getNumActive(), pool.getNumIdle());
        pool.returnObject(cal);
        LOGGER.info("[1]active={}, idle={}", pool.getNumActive(), pool.getNumIdle());

        Calendar cal2 = pool.borrowObject();
        LOGGER.info("cal2 == cal: {}", (cal2 == cal));
        LOGGER.info("[1]active={}, idle={}", pool.getNumActive(), pool.getNumIdle());
        pool.returnObject(cal2);
        LOGGER.info("[1]active={}, idle={}", pool.getNumActive(), pool.getNumIdle());
    }

    private static void add4Keyed(KeyedObjectPool<String, ConnectionTest> pool) throws Exception {
        //添加对象到池,重复的不会重复入池
        pool.addObject("1");
        pool.addObject("2");
        pool.addObject("3");

        for(int i=0; i<11; i++) {
            pool.addObject("1");
        }
        LOGGER.info("[1]active={}", pool.getNumActive("1"));
        LOGGER.info("[2]active={}", pool.getNumActive("2"));
        LOGGER.info("[3]active={}", pool.getNumActive("3"));
        LOGGER.info("[4]active={}", pool.getNumActive("4"));
    }

    private static void basic4Keyed(KeyedObjectPool<String, ConnectionTest> pool) throws Exception {
        String key = "1";
        LOGGER.info("[1]active={}", pool.getNumActive(key));
        ConnectionTest ct = pool.borrowObject(key);
        LOGGER.info("ct != null: {}", (ct != null));
        LOGGER.info("[1]active={}", pool.getNumActive(key));

        pool.returnObject(key, ct);
        LOGGER.info("[1]active={}", pool.getNumActive(key));
    }

    private static void testMultiThread(final ObjectPool<Calendar> objectPool) {
        System.out.println("testMultiThread(): Begin -------------");
        List<Future<Long>> futureList = Lists.newLinkedList();
        for(int i=0; i<21; i++) {
            futureList.add(threadPool.submit(() -> {
                if(objectPool.getNumActive() >= 20) {
                    System.out.println("Notice: Pool exhausted ....");
                }
                Calendar cal = objectPool.borrowObject();
                long mill = cal.getTimeInMillis();
                TimeUnit.SECONDS.sleep(1);
                objectPool.returnObject(cal);
                return mill;
            }));
        }

        futureList.forEach(future -> {
            try {
                LOGGER.info("Thread=[{}]TimeMill={}", Thread.currentThread().getName(), future.get());
            } catch (Exception e) {
                LOGGER.error("", e);
            }
        });
        LOGGER.info("[1]active={}, idle={}", objectPool.getNumActive(), objectPool.getNumIdle());
        System.out.println("testMultiThread(): End -------------");
        threadPool.shutdownNow();
    }


}

ObjectPoolDemo

上一篇:Ubuntu上安装nginx并配置反向代理Node.js端口
下一篇:VMware虚拟机安装Ubuntu系统英文改中文的方法