副问题[/!--empirenews.page--]
媒介
近期接到一个使命,必要改革现有从mysql往Elasticsearch导入数据MTE(mysqlToEs)小器材,因为之前回收单线程导入,千亿数据必要两周阁下的时刻才气导入完成,导入服从很是低。以是楼主花了3天的时刻,操作java线程池框架Executors中的FixedThreadPool线程池重写了MTE导入器材,单台处事器导入服从进步十几倍(公道调解线程数据,服从更高)。
要害技能栈
- Elasticsearch
- jdbc
- ExecutorServiceThread
- sql
器材声名
maven依靠
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>${mysql.version}</version>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.version}</version>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>transport</artifactId>
- <version>${elasticsearch.version}</version>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>${lombok.version}</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>${fastjson.version}</version>
- </dependency>
java线程池配置
默认线程池巨细为21个,可调解。个中POR为处理赏罚流程已办数据线程池,ROR为处理赏罚流程已阅数据线程池。
- private static int THREADS = 21;
- public static ExecutorService POR = Executors.newFixedThreadPool(THREADS);
- public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS);
界说已办出产者线程/已阅出产者线程:ZlPendProducer/ZlReadProducer
- public class ZlPendProducer implements Runnable {
- ...
- @Override
- public void run() {
- System.out.println(threadName + "::启动...");
- for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++)
- try {
- ....
- int size = 1000;
- for (int i = 0; i < count; i += size) {
- if (i + size > count) {
- //浸染为size最后没有100条数据则剩余几条newList中就装几条
- size = count - i;
- }
- String sql = "select * from " + tableName + " limit " + i + ", " + size;
- System.out.println(tableName + "::sql::" + sql);
- rs = statement.executeQuery(sql);
- List<HistPendingEntity> lst = new ArrayList<>();
- while (rs.next()) {
- HistPendingEntity p = PendUtils.getHistPendingEntity(rs);
- lst.add(p);
- }
- MteExecutor.POR.submit(new ZlPendConsumer(lst));
- Thread.sleep(2000);
- }
- ....
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- public class ZlReadProducer implements Runnable {
- ...已阅出产者处理赏罚逻辑同已办出产者
- }
界说已办斲丧者线程/已阅出产者线程:ZlPendConsumer/ZlReadConsumer
- public class ZlPendConsumer implements Runnable {
- private String threadName;
- private List<HistPendingEntity> lst;
- public ZlPendConsumer(List<HistPendingEntity> lst) {
- this.lst = lst;
- }
- @Override
- public void run() {
- ...
- lst.forEach(v -> {
- try {
- String json = new Gson().toJson(v);
- EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null);
- Const.COUNTER.LD_P.incrementAndGet();
- } catch (Exception e) {
- e.printStackTrace();
- System.out.println("err::PendingId::" + v.getPendingId());
- }
- });
- ...
- }
- }
- public class ZlReadConsumer implements Runnable {
- //已阅斲丧者处理赏罚逻辑同已办斲丧者
- }
界说导入Elasticsearch数据监控线程:Monitor
(编辑:河北网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|