博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flux OOM实例
阅读量:6148 次
发布时间:2019-06-21

本文共 4489 字,大约阅读时间需要 14 分钟。

本文主要研究下Flux的OOM产生的场景

FluxSink.OverflowStrategy

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxSink.java

/**	 * Enumeration for backpressure handling.	 */	enum OverflowStrategy {		/**		 * Completely ignore downstream backpressure requests.		 * 

* This may yield {@link IllegalStateException} when queues get full downstream. */ IGNORE, /** * Signal an {@link IllegalStateException} when the downstream can't keep up */ ERROR, /** * Drop the incoming signal if the downstream is not ready to receive it. */ DROP, /** * Downstream will get only the latest signals from upstream. */ LATEST, /** * Buffer all signals if the downstream can't keep up. *

* Warning! This does unbounded buffering and may lead to {@link OutOfMemoryError}. */ BUFFER }复制代码

可以看到BUFFER采用的是无界队列,可能产生OOM

实例

@Test    public void testFluxOOM() throws InterruptedException {        final Flux
flux = Flux.
create(fluxSink -> { //NOTE sink:class reactor.core.publisher.FluxCreate$SerializedSink LOGGER.info("sink:{}",fluxSink.getClass()); while (true) { fluxSink.next(ThreadLocalRandom.current().nextInt()); } }, FluxSink.OverflowStrategy.BUFFER) .publishOn(Schedulers.elastic(),Integer.MAX_VALUE); //NOTE 测试OOM //NOTE flux:class reactor.core.publisher.FluxCreate,prefetch:-1 LOGGER.info("flux:{},prefetch:{}",flux.getClass(),flux.getPrefetch()); flux.subscribe(e -> { LOGGER.info("subscribe:{}",e); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e1) { e1.printStackTrace(); } }); TimeUnit.MINUTES.sleep(20); }复制代码

jvm参数

-Xmx2160K -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -XX:+PrintGCDetails -Xloggc:/tmp/gc.log复制代码

注意这里使用了publishOn,另外prefetch参数设置为Integer.MAX_VALUE(默认为256),就是为了复现无界队列造成的OOM

输出

java.lang.OutOfMemoryError: GC overhead limit exceededDumping heap to /tmp/java_pid5295.hprof ...Heap dump file created [6410067 bytes in 0.149 secs]Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded	at java.util.jar.Manifest$FastInputStream.
(Manifest.java:332) at java.util.jar.Manifest$FastInputStream.
(Manifest.java:327) at java.util.jar.Manifest.read(Manifest.java:195) at java.util.jar.Manifest.
(Manifest.java:69) at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199) at java.util.jar.JarFile.getManifest(JarFile.java:180) at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944) at java.net.URLClassLoader.defineClass(URLClassLoader.java:450) at java.net.URLClassLoader.access$100(URLClassLoader.java:73)Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"Process finished with exit code 137复制代码

heap dump

使用MAT分析可以看到reactor.util.concurrent.SpscLinkedArrayQueue持有了很多未释放的数据,该队列由FluxCreate$BufferAsyncSink持有

static final class BufferAsyncSink
extends BaseSink
{ final Queue
queue; Throwable error; volatile boolean done; volatile int wip; @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater
WIP = AtomicIntegerFieldUpdater.newUpdater(BufferAsyncSink.class, "wip"); BufferAsyncSink(CoreSubscriber
actual, int capacityHint) { super(actual); this.queue = Queues.
unbounded(capacityHint).get(); } //...... } 复制代码

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/util/concurrent/Queues.java

/**	 * Returns an unbounded, linked-array-based Queue. Integer.max sized link will	 * return the default {@link #SMALL_BUFFER_SIZE} size.	 * @param linkSize the link size	 * @param 
the reified {@link Queue} generic type * @return an unbounded {@link Queue} {@link Supplier} */ @SuppressWarnings("unchecked") public static
Supplier
> unbounded(int linkSize) { if (linkSize == XS_BUFFER_SIZE) { return XS_UNBOUNDED; } else if (linkSize == Integer.MAX_VALUE || linkSize == SMALL_BUFFER_SIZE) { return unbounded(); } return () -> new SpscLinkedArrayQueue<>(linkSize); }复制代码

可以看到Queues的unbounded方法创建了一个无界队列SpscLinkedArrayQueue来缓冲数据

小结

使用Flux要注意OOM的问题,不过reactor的类库已经尽可能小心地避免这个问题,普通场景的api调用貌似没问题,自己个性化参数的时候要额外注意,本实例就是使用publishOn时特意指定prefetch为Integer.MAX_VALUE,才造成OOM

转载地址:http://ohmya.baihongyu.com/

你可能感兴趣的文章
C 函数sscanf()的用法
查看>>
python模块之hashlib: md5和sha算法
查看>>
解决ros建***能登录不能访问内网远程桌面的问题
查看>>
pfsense锁住自己
查看>>
vsftpd 相关总结
查看>>
售前工程师的成长---一个老员工的经验之谈
查看>>
Get到的优秀博客网址
查看>>
【Git入门之四】操作项目
查看>>
老男孩教育每日一题-第107天-简述你对***的理解,常见的有哪几种?
查看>>
Python学习--time
查看>>
在OSCHINA上的第一篇博文,以后好好学习吧
查看>>
luov之SMTP报错详解
查看>>
软件概要设计做什么,怎么做
查看>>
dwr
查看>>
java的特殊符号
查看>>
word2010中去掉红色波浪线的方法
查看>>
fabric上下文管理器(context mangers)
查看>>
JQuery-EasyUI Datagrid数据行鼠标悬停/离开事件(onMouseOver/onMouseOut)
查看>>
并发和并行的区别
查看>>
php小知识
查看>>