ForkJoinTask任务编排测试

书中人 2023年11月22日 260次浏览

ForkJoinTask任务编排小demo

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import jodd.util.ThreadUtil;
import org.apache.poi.ss.formula.functions.T;

/**
 * Fork/join helper that applies a function to every element of a list by recursively
 * splitting the list in half and processing the pieces on a shared {@link ForkJoinPool}.
 *
 * <p>Created 2023/11/22.
 *
 * @author chengzb-b
 */
public interface ForkJoinTaskUtils {
    /** Pool shared by every {@link ForkJoinTaskImpl}; created with a parallelism of 4. */
    ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool(4);

    /**
     * Maps a list through a function using divide-and-conquer on {@link #FORK_JOIN_POOL}.
     *
     * @param <T> element type of the input list
     * @param <R> element type of the result list
     */
    class ForkJoinTaskImpl<T, R> {
        private static final Logger LOGGER = Logger.getLogger(ForkJoinTaskImpl.class.getName());

        /** Threshold used by {@link #exec(Function, List)} and when a {@code null} threshold is given. */
        private static final int DEFAULT_MAX_THRESHOLD = 5;

        /** Smallest usable threshold; anything lower could never reach the base case. */
        private static final int MIN_THRESHOLD = 1;

        /**
         * Applies {@code consumer} to every element of {@code t} with the default threshold of 5.
         *
         * @param consumer function applied to each element
         * @param t        input elements
         * @return the mapped values in input order, or {@code null} if the computation failed,
         *         was cancelled, or the calling thread was interrupted while waiting
         */
        public List<R> exec(Function<T, R> consumer, List<T> t) {
            return this.exec(consumer, t, DEFAULT_MAX_THRESHOLD);
        }

        /**
         * Applies {@code consumer} to every element of {@code t}. Lists longer than the threshold
         * are halved recursively; lists at or below it are processed sequentially by one task.
         *
         * @param consumer     function applied to each element
         * @param t            input elements
         * @param maxThreshold largest sub-list size processed sequentially; {@code null} selects
         *                     the default of 5 and values below 1 are treated as 1
         * @return the mapped values in input order, or {@code null} if the computation failed,
         *         was cancelled, or the calling thread was interrupted while waiting
         */
        public List<R> exec(Function<T, R> consumer, List<T> t, Integer maxThreshold) {
            // A threshold below 1 would split a one-element list into an empty half plus
            // itself forever, so clamp it; null falls back to the default.
            int threshold = (maxThreshold == null)
                    ? DEFAULT_MAX_THRESHOLD
                    : Math.max(MIN_THRESHOLD, maxThreshold);
            CalculatedRecursiveTask<T, R> root = new CalculatedRecursiveTask<>(consumer, t, threshold);
            ForkJoinTask<List<R>> submit = FORK_JOIN_POOL.submit(root);
            try {
                return submit.get();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of silently clearing it.
                Thread.currentThread().interrupt();
                LOGGER.log(Level.WARNING, "Interrupted while waiting for the fork/join result", e);
            } catch (ExecutionException | CancellationException e) {
                LOGGER.log(Level.SEVERE, "Fork/join computation failed", e);
            }
            // Failures are reported as null, matching the method's existing contract.
            return null;
        }

        /**
         * Recursive task that maps one slice of the input list.
         *
         * @param <T> element type of the input slice
         * @param <R> element type of the produced results
         */
        private static class CalculatedRecursiveTask<T, R> extends RecursiveTask<List<R>> {
            private final int maxThreshold;
            private final Function<T, R> consumer;
            private final List<T> t;

            /**
             * @param consumer     function applied to each element
             * @param t            slice of the input handled by this task
             * @param maxThreshold largest slice size processed without splitting; expected to be at least 1
             */
            CalculatedRecursiveTask(Function<T, R> consumer, List<T> t, int maxThreshold) {
                this.maxThreshold = maxThreshold;
                this.consumer = consumer;
                this.t = t;
            }

            @Override
            protected List<R> compute() {
                int size = t.size();
                if (size <= maxThreshold) {
                    // Base case: small enough to map sequentially in this task.
                    List<R> mapped = new ArrayList<>(size);
                    for (T item : t) {
                        mapped.add(consumer.apply(item));
                    }
                    return mapped;
                }

                int middle = size / 2;
                CalculatedRecursiveTask<T, R> left =
                        new CalculatedRecursiveTask<>(consumer, t.subList(0, middle), maxThreshold);
                CalculatedRecursiveTask<T, R> right =
                        new CalculatedRecursiveTask<>(consumer, t.subList(middle, size), maxThreshold);

                // Fork one half and compute the other in the current worker, so this
                // thread does useful work instead of only waiting on two joins.
                left.fork();
                List<R> rightResults = right.compute();
                List<R> leftResults = left.join();

                // Merge left-then-right so the output order matches the input order.
                List<R> merged = new ArrayList<>(size);
                merged.addAll(leftResults);
                merged.addAll(rightResults);
                return merged;
            }
        }
    }

    /**
     * Demo: multiplies the numbers 1..8 by 100 with a threshold of 2, sleeping one second per
     * element to simulate slow work, and prints the resulting list.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 8; i++) {
            list.add(i);
        }
        ForkJoinTaskImpl<Integer, Integer> forkJoinTask = new ForkJoinTaskImpl<>();
        Function<Integer, Integer> consumer = value -> {
            try {
                // Simulate a slow operation.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value * 100;
        };
        List<Integer> exec = forkJoinTask.exec(consumer, list, 2);
        System.out.println(exec);
    }
}