求助: Java 多线程如何终止其它线程

Alextrasza · 2024-9-25 10:29:10 · 315 次点击
主线程去调用 10 个子线程查询任务(返回 true/false), 在回调中统计结果,  
我想在得到两个 true 时主线程立即返回 true, 不管其他的子线程时已提交到线程池还是未提交到线程池, 都不需要了

我尝试了`CompletableFuture#cancel`, 和`Executor#shutDownNow`方法, 都没达到效果

```java
ThreadPoolExecutor executor = newFixedThreadFool(10);
List<CompletableFuture<Boolean> list = ... //10 个任务, 都提交到 executor
AtomicInteger i = 0;
list.forEach(cf-
        cf.thenAccept(b->
                if(b && i.incrementAndGet >= 2 ){
                        //让主线程停止阻塞立即返回, 尝试下面两个方案都不行
                        executor.shutDownNow();
                        list.forEach(f->f.cancel(true));
                }
))
// 希望通过某种方式让此处停止阻塞
CompletableFuture.allOf(list.toArray).join()

return i.get()>=2;
```
目前的方案是把后续流程放在了子线程回调中, 加锁处理, 主线程不要返回值了, 应该是可行的,  
但是还是想请问大家, 原来的"主线程提前返回"的想法能否实现?
举报· 315 次点击
登录 注册 站外分享
22 条回复  
thetbw 小成 2024-9-25 10:46:40
在任何耗时的方法前,循环中间都判断一下 Thread.currentThread().isInterrupted(),如果为 true ,就 return ,或者抛出中断异常。就可以达到线程终止的目的了。
如果要终止其他线程,就调用目标线程的 Thread.interrupt() 方法,线程池在退出时会自动给你调用这个方法,你只需要做好第一步判断中断就行了。想下 Thread.sleep()
Hito 小成 2024-9-25 10:48:14
CountDownLatch
latch 设置 2 ,子线程中得到 true 时 latch.countDown()。主线程中 latch.await() 。
murenx 初学 2024-9-25 10:55:28
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class SumCalculator {

    private static final int TARGET = 1000; // 目标值,计算从 0 到目标值的累加和
    private static final int THREAD_COUNT = 7; // 线程数量,用于并发计算
    private static final AtomicBoolean finished = new AtomicBoolean(false); // 用于标记是否完成计算
    private static volatile int result = 0; // 用于存储累加结果

    public static void main(String[] args) {
        CompletableFuture<Void>[] futures = new CompletableFuture[THREAD_COUNT]; // 创建 CompletableFuture 数组

        for (int i = 0; i < THREAD_COUNT; i++) {
            futures = CompletableFuture.runAsync(() -> { // 提交异步任务
                int localSum = 0; // 用于存储局部累加结果
                for (int j = 0; j <= TARGET; j++) { // 循环从 0 到目标值
                    if (finished.get()) {
                        return; // 如果计算已完成,则退出
                    }
                    localSum += j; // 累加当前值到局部累加结果
                }
                if (finished.compareAndSet(false, true)) {
                    result = localSum; // 存储计算结果
                    System.out.println("计算完成,结果为:" + result); // 打印计算结果
                }
            });
        }

        CompletableFuture.anyOf(futures).thenRun(() -> { // 当任意一个任务完成时
            finished.set(true); // 标记计算完成
            for (CompletableFuture<Void> future : futures) {
                future.cancel(true); // 取消其他未完成的任务
            }
        });

        try {
            CompletableFuture.allOf(futures).join(); // 等待所有任务完成
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


from kimi
git00ll 初学 2024-9-25 10:58:16
向子线程发送中断,一般 IO 这种的都会抛中断异常。
如果没有堵塞代码你还需要增加 Thread.currentThread().isInterrupted()来判断当前中断标志位
GloryJie 初学 2024-9-25 11:00:40
做过类似的,是多个任务有一个满足就提前返回,可以参考下

public static <T> T supplyBatchPredicateTask(List<Callable<T>> taskList, Predicate<T> predicate, long timeout, Executor executor) {
        CountDownLatch predicateCountDown = new CountDownLatch(1);
        AtomicReference<T> successReference = new AtomicReference<>();
        AtomicReference<T> failReference = new AtomicReference<>();

        List<CompletableFuture<T>> futureList = new ArrayList<>(taskList.size());

        for (Callable<T> task : taskList) {
            CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
                try {
                    T result = task.call();
                    if (predicate.test(result)) {
                        successReference.set(result);
                        predicateCountDown.countDown();
                        return result;
                    }
                    failReference.set(result);
                } catch (Exception exception) {
                    log.warn("act=supplyBatchPredicateTask 任务执行失败", exception);
                }
                return null;
            }, executor);
            futureList.add(future);
        }

        //全失败(不仅仅是任务失败,还有是断言失败)的情況下进行唤醒
        CompletableFuture.allOf(futureList.toArray(new CompletableFuture[] {})).whenComplete((result, throwable) -> {
            predicateCountDown.countDown();
        });

        try {
            predicateCountDown.await(timeout, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // ignore
            log.warn("act=supplyBatchPredicateTask 等待任务执行结果中断", e);
        }
        return successReference.get() != null ? successReference.get() : failReference.get();
    }
awalkingman 小成 2024-9-25 14:06:36
@Hito 这个看起来可以。正常都是 latch 个数等于线程数,子线程都结束了主线程继续往下走。如果 latch 少于开启的子线数,主线程应该可以提前往下走了。剩下的子线程如果能正常结束就正常结束,不能结束就当泄露了。
Rickkkkkkk 小成 2024-9-25 14:12:04
做不到的,你起一个线程,然后里面写个死循环,除了重启没有任何办法可以终止这个死循环。

除非主动代码里去检查某个标记位
MoYi123 小成 2024-9-25 14:32:34
用 tgkill 给线程发信号, 当然这不是一个好的做法.
Richared 小成 2024-9-25 14:48:17
这不就是线程间通信么,首先得通信,代码里检查,保证线程安全,正确通信不就能做到了么
123下一页
返回顶部