一种基于二分法的变步长批量处理算法

时间:2024-03-16 10:07:41

1、前言

  变步长批量处理算法,在实现某些功能时,非常需要。如数据传输和数据导入时,使用可变步长的批量处理算法,可以极大地提高系统的性能,。

1.1、数据传输

  在不稳定的网络环境下,传输失败的几率提高,大的数据块可能会传输失败,如果分为小的数据块,可以传输成功,但由于传输开销,传输效率低。因此希望在网络好的时候,传输大的数据块或高分辨率的图像;网络差的时候,传输小的数据块或对图像进行降质处理,调低图像分辨率,提高压缩比等。因此,可变概念,可以提升服务功能的质量,提升系统的可用性。

1.2、数据导入

  数据导入,特别是ETL,大量数据导入,效率非常重要。对于大多数数据库,如Myql,批量新增(如100条记录)和单记录新增的时间消耗相差无几,但处理能力有百倍之差。

  曾经,笔者使用逐条数据insert,300万条记录导入,化了半个小时,简直无法忍受,于是,后来改为使用100条批量insert,但由于数据中存在这种那种的异常数据,经常出现一条异常数据导致成批(100条)的数据导入失败,这样,一次导入数据中可能有几条坏数据,导致几百上千条数据没有入库成功,于是,再修改代码,针对这些没入库成功的几百上千条记录里,逐条导入,检测出具体的坏数据。整个过程不堪回首。

  因此,数据导入需要可变步长算法,这样可用极大提升数据导入的处理能力。

  另外,还有一种Excel数据导入,如规定按记录的编码(字符串类型,如身份证号、手机号、订单编号等,唯一键字段)作为记录的特征字段,但表格数据中有新增的,还有修改的,即如果为新的编码值,为新增记录,如果数据库中编码值已存在,则需要修改记录。这种导入,如果采用固定批量值导入,新增的失败率是很高的,如果批量一旦失败就改为逐条导入的策略,也是效率不高,可变步长算法可非常高效地解决此问题。

1.3、其它应用场景

  对这种可变的需求,具体很大的通用性,如与搜索相关的,也可以用可变步长算法来提升处理性能。

2、算法原理

2.1、算法概述

  可变步长算法,不是个新鲜的概念,区别在于变化的依据和策略,如最大似然估计、梯度下降等,以及算法复杂度和是否简单易用。

  本算法可以归于简单的决策树,有贪婪算法的因子,计算量很少。接口调用可以内嵌方法(类似C++的指针函数)来执行批量处理工作和单条数据修正处理工作。二分法是非常经典的算法,本算法的核心还是二分法,也可以说依据下列公式而开发:

\[1 + 2 + 4 + ... + 2^n = 2^{n+1}-1 \]

  使用QoS(Quality of Service,服务等级)的概念,将处理等级与处理能力(批量值)建立联系,QoS等级对应的批量值为\(2^0\)\(2^n\)的一个连续序列,如:[128,64,32,16,8,4,2,1],上限为\(2^n\),这里n=7,下限为约定为1。等级0对应128,等级7对应1,等级值即为等级数组的下标。

  具体算法如下:

  1. 对于长度为n的输入数据列表,类型为泛型类型T,即数据为List。另外提供2个T类型的列表参数,为批量处理成功的数据列表和修正处理成功的数据列表,便于外部进一步处理(如相关数据一致性处理)。
  2. 算法返回处理异常记录的日志列表,字符串类型。使用者可以调整日志的格式和内容,以便定位异常数据的具体位置。
  3. 设置一个布尔型的异常标志值bError,用于记录之前是否发生了批量处理的异常,初值为false,该异常标志值在单条记录的修正处理后被复位为false。
  4. 设置一个数据列表下标锚点anchorIdx,表示当前正在处理的数据位置,初始为0。
  5. 设置一个当前等级值levelIdx,初值随意,这里设置初值为0,即从最高等级开始。
  6. 如果bError为false,即当前无异常,按照当前等级对应的批量值进行批量处理,如果成功,则提升一个等级(如果已为等级上限,则维持),并更新锚点anchorIdx到新的位置;如果处理失败,则设置bError为true,锚点anchorIdx不变,如当前等级不为等级下限,则下降一个等级;如果已是等级下限,则按照单条数据的修正处理方法进行处理,如果修正成功,则加入修正数据列表中,如果修正失败,则加入异常日志列表中,不管修正成功与否,修正处理后,anchorIdx均加1,且设置bError为false,表示异常数据已给检测出并按照规则进行处置了。
  7. 如果bError为true,即当前处于异常状态,按照当前等级对应的批量值进行批量处理,如果成功,则下降一个等级(如果已为等级下限,则维持),并更新锚点anchorIdx到新的位置;如果失败,且不为等级下限值,则锚点anchorIdx不变,下降一个等级;如果失败,且当前等级为等级下限值,则按照单条数据的修正处理方法进行处理,如果修正成功,则加入修正数据列表中,如果修正失败,则加入异常日志列表中,不管修正成功与否,修正处理后,anchorIdx均加1,且设置bError为false。
  8. 结束条件,anchorIdx到达n,即所有数据被处理完毕。

  这里有两个外部方法,批量处理方法和单条修正方法,这是一个非常抽象的接口方法,具体对数据进行怎么处理,由外部根据需要自行确定。后面提供的单元测试代码,给出批量数据类型转换的例子。

  当bError为true时,批量处理成功,仍然要降级,是因为前面等级发生异常时,数据范围包括了后面等级批量值之和的范围:

\[2^{n+1}>2^n+...+2+1 \]

  当等级批量值\(2^{n+1}\)批量尝试发生异常时,设置bError为true,然后如果\(2^n\)批量处理成功,意味着后面\(2^n\)批量必然失败,因此必须降级,才可能避免失败;当然如果前\(2^n\)批量处理失败,意味着异常数据至少在这些数据中存在,也需要降级尝试,才可能避免失败。

2.2、算法代码

  算法代码使用java语言,很容易转成python或其它语言,代码在github上:https://github.com/alabo1999/framework_algoset/commit/dacefcbc9bf2ad95c05ab7671a12054178e7f90e 包含一个算法类文件BatchProcess.java和单元测试文件BatchProcessTest.java,其中BatchProcess.java只有一个方法,该方法的接口形式如下:

	/**
	 * @methodName		: varStepBatchProcess
	 * @description		: 可变步长的批量处理算法
	 * @param <T>		: 泛型类型
	 * @param object	: 提供batchProcMethod和singleProcMethod方法的类对象
	 * @param dataRowList		: 待处理的T类型对象数据列表
	 * @param normalList		: 正常处理的对象列表
	 * @param correctList		: 修改处理的对象列表
	 * @param batchProcMethod	: 正常批量处理的方法
	 * @param singleProcMethod	: 单条修正处理的方法
	 * @param debugLevel		: 调试信息输出设置,bit0-输出修正处理信息,bit1-输出详细步骤,bit2:输出尝试次数
	 * @return					: 处理过程产生的异常日志列表
	 */
	public static <T> List<String> varStepBatchProcess(
			Object object,
			List<T> dataRowList,
			List<T> normalList,
			List<T> correctList,			
			Method batchProcMethod,
			Method singleProcMethod,
			int debugLevel);

2.3、算法测试

  单元测试文件BatchProcessTest.java,给出了算法测试,代码如下:

package com.abc.example.service;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;

import com.abc.example.common.impexp.BatchProcess;

/**
 * @className	: BatchProcessTest
 * @description	: 批量处理测试类
 * @summary		:
 * @history		:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2022/01/20	1.0.0		sheng.zheng		初版
 *
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
public class BatchProcessTest {
	
	@Test
	// 可变步长的批量处理算法测试
	public void varStepBatchProcessTest() {
		
		// 构造待处理的数据,数据类型为String
		List<String> dataRowList = new ArrayList<String>();
		int idx = 0;
		for (int i = 0; i < 1000; i++) {
			String str = "";
			if (i % 129 == 0) {
				str = i + ".1";
				idx ++;
				if (idx % 2 == 0) {
					str += "abc";
				}
			}else {
				str = i + "";
			}
			dataRowList.add(str);
		}
		
		System.out.println(dataRowList);
		
		// 调用算法
		Method method1 = getMethodByName(this,"batchProcMethod");
		Method method2 = getMethodByName(this,"singleProcMethod");
		// 用于存放正常批量处理的数据
		List<String> normalList = new ArrayList<String>();
		// 用于存放修正处理的数据
		List<String> correctList = new ArrayList<String>();
		// 调用算法
		List<String> errorList = BatchProcess.varStepBatchProcess(this, dataRowList, 
				normalList, correctList, method1, method2,0x05);
		// 打印errorList
		System.out.println("errorList: " + errorList.toString());
		// 打印correctList
		System.out.println("correctList: " + correctList.toString());
		
	}
	
	// 构造批量处理的方法
	// 将列表中字符串,批量转为整型,被反射调用,必须是public的
	public void batchProcMethod(List<String> subDataList) {
		for (String item : subDataList) {
			Integer.valueOf(item);
		}
	}
	
	// 构造单记录处理的方法,被反射调用,必须是public的
	public String singleProcMethod(Exception e,String item) {
		String errInfo = "";
		try {
			Double.valueOf(item).intValue();
		}catch(Exception ex) {
			errInfo = ex.getMessage();
		}
		
		return errInfo;
	}
	
	// 根据方法名称获取方法对象
	private Method getMethodByName(Object object,String methodName) {
		Class<?> class1 = object.getClass();
		Method retItem = null;
		Method[] methods = class1.getMethods();
		for (int i = 0; i < methods.length; i++) {
			Method item = methods[i];
			// System.out.println(item.getName());
			if (item.getName() == methodName) {
				retItem = item;
				break;
			}
		}
		return retItem;
	}	

}

  构造了1000个字符串的列表,这里使用String来作为数据类型,除了下标为129的整数倍之外的值=i+""的字符串,其它的取值,如果为129的偶数倍,则为i+"0.1"的字符串;如果为129的奇数倍,则为i+”0.1abc“。

  批量处理方法batchProcMethod,实现对当前批量数据的类型转换,将字符串转为整数,当数据列表包含.1或.1abc的异常数据时,Integer.valueOf(item);会抛出异常,导致该批量处理失败;

  单数据修正方法singleProcMethod,实现了处理"0.1"之类的double型字符串转换成,但针对".1abc"后缀的数据仍然处理失败。

  执行测试代码,输出结果如下:

single tryNum = 9, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 0,bError=true
0.1: can be fixed.
single tryNum = 24, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 129,bError=true
129.1abc: can not be fixed.
single tryNum = 39, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 258,bError=true
258.1: can be fixed.
single tryNum = 54, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 387,bError=true
387.1abc: can not be fixed.
single tryNum = 69, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 516,bError=true
516.1: can be fixed.
single tryNum = 84, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 645,bError=true
645.1abc: can not be fixed.
single tryNum = 99, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 774,bError=true
774.1: can be fixed.
single tryNum = 114, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 903,bError=true
903.1abc: can not be fixed.
tryNum: 120
errorList: [For input string: "129.1abc", For input string: "387.1abc", For input string: "645.1abc", For input string: "903.1abc"]
correctList: [0.1, 258.1, 516.1, 774.1]

  tryNum: 120,表示这个算法总共尝试了120次处理尝试,包括批量处理和修正处理。需要想要看每一次处理尝试的详细信息,将debugLevel设置为7,可以看到详细的输出(摘录部分):

batch  tryNum = 1, levelIdx = 0,levelNum = 128,batchNum = 128,anchorIdx = 0,bError=false
java.lang.reflect.InvocationTargetException
batch  tryNum = 2, levelIdx = 1,levelNum = 64,batchNum = 64,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 3, levelIdx = 2,levelNum = 32,batchNum = 32,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 4, levelIdx = 3,levelNum = 16,batchNum = 16,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 5, levelIdx = 4,levelNum = 8,batchNum = 8,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 6, levelIdx = 5,levelNum = 4,batchNum = 4,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 7, levelIdx = 6,levelNum = 2,batchNum = 2,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 8, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
single tryNum = 9, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 0,bError=true
0.1: can be fixed.
batch  tryNum = 10, levelIdx = 6,levelNum = 2,batchNum = 2,anchorIdx = 1,bError=false
batch  tryNum = 11, levelIdx = 5,levelNum = 4,batchNum = 4,anchorIdx = 3,bError=false
batch  tryNum = 12, levelIdx = 4,levelNum = 8,batchNum = 8,anchorIdx = 7,bError=false
batch  tryNum = 13, levelIdx = 3,levelNum = 16,batchNum = 16,anchorIdx = 15,bError=false
batch  tryNum = 14, levelIdx = 2,levelNum = 32,batchNum = 32,anchorIdx = 31,bError=false
batch  tryNum = 15, levelIdx = 1,levelNum = 64,batchNum = 64,anchorIdx = 63,bError=false
batch  tryNum = 16, levelIdx = 0,levelNum = 128,batchNum = 128,anchorIdx = 127,bError=false
java.lang.reflect.InvocationTargetException
batch  tryNum = 17, levelIdx = 1,levelNum = 64,batchNum = 64,anchorIdx = 127,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 18, levelIdx = 2,levelNum = 32,batchNum = 32,anchorIdx = 127,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 19, levelIdx = 3,levelNum = 16,batchNum = 16,anchorIdx = 127,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 20, levelIdx = 4,levelNum = 8,batchNum = 8,anchorIdx = 127,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 21, levelIdx = 5,levelNum = 4,batchNum = 4,anchorIdx = 127,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 22, levelIdx = 6,levelNum = 2,batchNum = 2,anchorIdx = 127,bError=true
batch  tryNum = 23, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 129,bError=true
java.lang.reflect.InvocationTargetException
single tryNum = 24, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 129,bError=true
129.1abc: can not be fixed.

3、算法在数据导入时的应用

  批量处理方法batchProcMethod,实现insertItems语句即可,注意,不要使用try/catch,否则拦截了异常,算法会认为批量处理成功。

  单数据修正方法singleProcMethod,实现updateItems语句即可,注意由于此时实现按特征字段的值,修改记录的导入字段的值,实际上只修改了一条记录,必须使用选择性字段更新方法,避免未导入字段被item对象的默认值所覆盖。

  大致形式如下:

	public <T> void batchProcMethod(List<T> subDataList) {
		xxxDao.insertItems(subDataList);
	}
	
	// 构造单记录处理的方法,被反射调用,必须是public的
	public <T> String singleProcMethod(Exception e,T item) {
		String errInfo = "";
		if (e instanceof DuplicateKeyException) {
			// 如果唯一键重复,则update
			// 根据导入字段列表,抽取相关字段
			// String[] importFieldNames = new String[]{"fieldname1","fieldname2",...,"fieldnamen"};
			// Map<String,Object> map = itemToMap(item,importFieldNames);
			xxxDao.updateItems(map)
		}else{
            errInfo = e.getMessage();
        }
		return errInfo;
	}

  其中itemToMap方法,可使用反射方法,编写为静态方法,供所有实体类使用。

4、算法应用注意事项

  算法代码在github上,公开状态,使用了apache license 2.0,如需使用,请按照许可证要求即可。