博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊Elasticsearch的CircuitBreaker
阅读量:5876 次
发布时间:2019-06-19

本文共 14758 字,大约阅读时间需要 49 分钟。

本文主要研究一下Elasticsearch的CircuitBreaker

CircuitBreaker

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java

/** * Interface for an object that can be incremented, breaking after some * configured limit has been reached. */public interface CircuitBreaker {    /**     * The parent breaker is a sum of all the following breakers combined. With     * this we allow a single breaker to have a significant amount of memory     * available while still having a "total" limit for all breakers. Note that     * it's not a "real" breaker in that it cannot be added to or subtracted     * from by itself.     */    String PARENT = "parent";    /**     * The fielddata breaker tracks data used for fielddata (on fields) as well     * as the id cached used for parent/child queries.     */    String FIELDDATA = "fielddata";    /**     * The request breaker tracks memory used for particular requests. This     * includes allocations for things like the cardinality aggregation, and     * accounting for the number of buckets used in an aggregation request.     * Generally the amounts added to this breaker are released after a request     * is finished.     */    String REQUEST = "request";    /**     * The in-flight request breaker tracks bytes allocated for reading and     * writing requests on the network layer.     */    String IN_FLIGHT_REQUESTS = "in_flight_requests";    /**     * The accounting breaker tracks things held in memory that is independent     * of the request lifecycle. This includes memory used by Lucene for     * segments.     */    String ACCOUNTING = "accounting";    enum Type {        // A regular or ChildMemoryCircuitBreaker        MEMORY,        // A special parent-type for the hierarchy breaker service        PARENT,        // A breaker where every action is a noop, it never breaks        NOOP;        public static Type parseValue(String value) {            switch(value.toLowerCase(Locale.ROOT)) {                case "noop":                    return Type.NOOP;                case "parent":                    return Type.PARENT;                case "memory":                    return Type.MEMORY;                default:                    throw new IllegalArgumentException("No CircuitBreaker with type: " + value);            }        }    }    enum Durability {        // The condition that tripped the circuit breaker fixes itself eventually.        TRANSIENT,        // The condition that tripped the circuit breaker requires manual intervention.        PERMANENT    }    /**     * Trip the circuit breaker     * @param fieldName name of the field responsible for tripping the breaker     * @param bytesNeeded bytes asked for but unable to be allocated     */    void circuitBreak(String fieldName, long bytesNeeded);    /**     * add bytes to the breaker and maybe trip     * @param bytes number of bytes to add     * @param label string label describing the bytes being added     * @return the number of "used" bytes for the circuit breaker     */    double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException;    /**     * Adjust the circuit breaker without tripping     */    long addWithoutBreaking(long bytes);    /**     * @return the currently used bytes the breaker is tracking     */    long getUsed();    /**     * @return maximum number of bytes the circuit breaker can track before tripping     */    long getLimit();    /**     * @return overhead of circuit breaker     */    double getOverhead();    /**     * @return the number of times the circuit breaker has been tripped     */    long getTrippedCount();    /**     * @return the name of the breaker     */    String getName();    /**     * @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).     */    Durability getDurability();}复制代码
  • CircuitBreaker定义了Type、Durability枚举;它还定义了circuitBreak、addEstimateBytesAndMaybeBreak、addWithoutBreaking、getUsed、getLimit、getOverhead、getTrippedCount等方法;它有两个实现类分别是NoopCircuitBreaker、ChildMemoryCircuitBreaker

NoopCircuitBreaker

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java

public class NoopCircuitBreaker implements CircuitBreaker {    public static final int LIMIT = -1;    private final String name;    public NoopCircuitBreaker(String name) {        this.name = name;    }    @Override    public void circuitBreak(String fieldName, long bytesNeeded) {        // noop    }    @Override    public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {        return 0;    }    @Override    public long addWithoutBreaking(long bytes) {        return 0;    }    @Override    public long getUsed() {        return 0;    }    @Override    public long getLimit() {        return LIMIT;    }    @Override    public double getOverhead() {        return 0;    }    @Override    public long getTrippedCount() {        return 0;    }    @Override    public String getName() {        return this.name;    }    @Override    public Durability getDurability() {        return Durability.PERMANENT;    }}复制代码
  • NoopCircuitBreaker实现了CircuitBreaker接口,它不做任何操作

ChildMemoryCircuitBreaker

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java

public class ChildMemoryCircuitBreaker implements CircuitBreaker {    private final long memoryBytesLimit;    private final double overheadConstant;    private final Durability durability;    private final AtomicLong used;    private final AtomicLong trippedCount;    private final Logger logger;    private final HierarchyCircuitBreakerService parent;    private final String name;    /**     * Create a circuit breaker that will break if the number of estimated     * bytes grows above the limit. All estimations will be multiplied by     * the given overheadConstant. This breaker starts with 0 bytes used.     * @param settings settings to configure this breaker     * @param parent parent circuit breaker service to delegate tripped breakers to     * @param name the name of the breaker     */    public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger,                                     HierarchyCircuitBreakerService parent, String name) {        this(settings, null, logger, parent, name);    }    /**     * Create a circuit breaker that will break if the number of estimated     * bytes grows above the limit. All estimations will be multiplied by     * the given overheadConstant. Uses the given oldBreaker to initialize     * the starting offset.     * @param settings settings to configure this breaker     * @param parent parent circuit breaker service to delegate tripped breakers to     * @param name the name of the breaker     * @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset)     */    public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker,                                     Logger logger, HierarchyCircuitBreakerService parent, String name) {        this.name = name;        this.memoryBytesLimit = settings.getLimit();        this.overheadConstant = settings.getOverhead();        this.durability = settings.getDurability();        if (oldBreaker == null) {            this.used = new AtomicLong(0);            this.trippedCount = new AtomicLong(0);        } else {            this.used = oldBreaker.used;            this.trippedCount = oldBreaker.trippedCount;        }        this.logger = logger;        if (logger.isTraceEnabled()) {            logger.trace("creating ChildCircuitBreaker with settings {}", settings);        }        this.parent = parent;    }    /**     * Method used to trip the breaker, delegates to the parent to determine     * whether to trip the breaker or not     */    @Override    public void circuitBreak(String fieldName, long bytesNeeded) {        this.trippedCount.incrementAndGet();        final String message = "[" + this.name + "] Data too large, data for [" + fieldName + "]" +                " would be [" + bytesNeeded + "/" + new ByteSizeValue(bytesNeeded) + "]" +                ", which is larger than the limit of [" +                memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]";        logger.debug("{}", message);        throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit, durability);    }    /**     * Add a number of bytes, tripping the circuit breaker if the aggregated     * estimates are above the limit. Automatically trips the breaker if the     * memory limit is set to 0. Will never trip the breaker if the limit is     * set < 0, but can still be used to aggregate estimations.     * @param bytes number of bytes to add to the breaker     * @return number of "used" bytes so far     */    @Override    public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {        // short-circuit on no data allowed, immediately throwing an exception        if (memoryBytesLimit == 0) {            circuitBreak(label, bytes);        }        long newUsed;        // If there is no limit (-1), we can optimize a bit by using        // .addAndGet() instead of looping (because we don't have to check a        // limit), which makes the RamAccountingTermsEnum case faster.        if (this.memoryBytesLimit == -1) {            newUsed = noLimit(bytes, label);        } else {            newUsed = limit(bytes, label);        }        // Additionally, we need to check that we haven't exceeded the parent's limit        try {            parent.checkParentLimit((long) (bytes * overheadConstant), label);        } catch (CircuitBreakingException e) {            // If the parent breaker is tripped, this breaker has to be            // adjusted back down because the allocation is "blocked" but the            // breaker has already been incremented            this.addWithoutBreaking(-bytes);            throw e;        }        return newUsed;    }    private long noLimit(long bytes, String label) {        long newUsed;        newUsed = this.used.addAndGet(bytes);        if (logger.isTraceEnabled()) {            logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",                this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed));        }        return newUsed;    }    private long limit(long bytes, String label) {        long newUsed;// Otherwise, check the addition and commit the addition, looping if        // there are conflicts. May result in additional logging, but it's        // trace logging and shouldn't be counted on for additions.        long currentUsed;        do {            currentUsed = this.used.get();            newUsed = currentUsed + bytes;            long newUsedWithOverhead = (long) (newUsed * overheadConstant);            if (logger.isTraceEnabled()) {                logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",                        this.name,                        new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed),                        memoryBytesLimit, new ByteSizeValue(memoryBytesLimit),                        newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead));            }            if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) {                logger.warn("[{}] New used memory {} [{}] for data of [{}] would be larger than configured breaker: {} [{}], breaking",                        this.name,                        newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label,                        memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));                circuitBreak(label, newUsedWithOverhead);            }            // Attempt to set the new used value, but make sure it hasn't changed            // underneath us, if it has, keep trying until we are able to set it        } while (!this.used.compareAndSet(currentUsed, newUsed));        return newUsed;    }    /**     * Add an exact number of bytes, not checking for tripping the     * circuit breaker. This bypasses the overheadConstant multiplication.     *     * Also does not check with the parent breaker to see if the parent limit     * has been exceeded.     *     * @param bytes number of bytes to add to the breaker     * @return number of "used" bytes so far     */    @Override    public long addWithoutBreaking(long bytes) {        long u = used.addAndGet(bytes);        if (logger.isTraceEnabled()) {            logger.trace("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u);        }        assert u >= 0 : "Used bytes: [" + u + "] must be >= 0";        return u;    }    /**     * @return the number of aggregated "used" bytes so far     */    @Override    public long getUsed() {        return this.used.get();    }    /**     * @return the number of bytes that can be added before the breaker trips     */    @Override    public long getLimit() {        return this.memoryBytesLimit;    }    /**     * @return the constant multiplier the breaker uses for aggregations     */    @Override    public double getOverhead() {        return this.overheadConstant;    }    /**     * @return the number of times the breaker has been tripped     */    @Override    public long getTrippedCount() {        return this.trippedCount.get();    }    /**     * @return the name of the breaker     */    @Override    public String getName() {        return this.name;    }    /**     * @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).     */    @Override    public Durability getDurability() {        return this.durability;    }}复制代码
  • ChildMemoryCircuitBreaker实现了CircuitBreaker接口;其circuitBreak方法会抛出CircuitBreakingException
  • addEstimateBytesAndMaybeBreak方法首先判断memoryBytesLimit,如果为0,则执行circuitBreak方法;如果为-1则调用noLimit,否则调用limit计算newUsed,没有抛出异常的话,则最后执行 parent.checkParentLimit方法
  • noLimit方法直接执行this.used.addAndGet(bytes);limit方法首先计算newUsed,然后根据overheadConstant得出newUsedWithOverhead,如果newUsedWithOverhead大于memoryBytesLimit则执行circuitBreak方法,否则将newUsed更新到this.used中

小结

  • CircuitBreaker定义了Type、Durability枚举;它还定义了circuitBreak、addEstimateBytesAndMaybeBreak、addWithoutBreaking、getUsed、getLimit、getOverhead、getTrippedCount等方法;它有两个实现类分别是NoopCircuitBreaker、ChildMemoryCircuitBreaker
  • NoopCircuitBreaker实现了CircuitBreaker接口,它不做任何操作
  • ChildMemoryCircuitBreaker实现了CircuitBreaker接口;其circuitBreak方法会抛出CircuitBreakingException;addEstimateBytesAndMaybeBreak方法则先判断newUsed是否超出memoryBytesLimit,超出则执行circuitBreak方法,最后执行parent.checkParentLimit方法

doc

转载于:https://juejin.im/post/5ce2c6e6f265da1b6a346708

你可能感兴趣的文章
如何将lotus 通讯簿导入到outlook 2003中
查看>>
WinForm 应用程序中开启新的进程及控制
查看>>
前端工程师的职业发展路线在哪?
查看>>
IOS 内存警告 Memory warning level
查看>>
[转]PAC Manager: Ubuntu 上强大的 SSH 帐号管理工具,可取代 SecureCRT_Miracle_百度空间...
查看>>
顺序容器 (2)string类型操作
查看>>
转载:我最近的研究成果(IGeometry.Project and IGeometry.SpatialReference)
查看>>
提示框
查看>>
HDOJ1233 畅通工程之一(最小生成树-Kruscal)
查看>>
14Spring_AOP编程(AspectJ)_环绕通知
查看>>
PHP之打开文件
查看>>
iOS - OC SQLite 数据库存储
查看>>
PHP-mysqllib和mysqlnd
查看>>
Redis常用命令
查看>>
NeHe OpenGL教程 第三十五课:播放AVI
查看>>
Linux下ping命令、traceroute命令、tracert命令的使用
查看>>
js replace,正则截取字符串内容
查看>>
socket
查看>>
Highcharts使用表格数据绘制图表
查看>>
Thinkphp5笔记三:创建基类
查看>>