博客 / 詳情

返回

由於鎖超時讓我發現了parallelStream並行流的關於線程上下文的一個坑

就我之前因為在處理jpa持久化對象上下文
(文:https://segmentfault.com/a/1190000043581830)
時,parallelStream並行流給我的印象就是會讀不到父線程的上下文的,所以應該在父線程裏的事務和在parallelStream裏的事務應該是區分的,而不是共用同一個事務的,然而今天因為一個鎖超時的問題,發現並沒有那麼簡單,下面我們一步一步來驗證。

首先説下我鎖超時的場景:具體的業務我不講了,就説下偽代碼

@PostMapping("/saveUser")
@Transactional
public void saveUser(@RequestBody List<Complex> list) {
    list.parallelStream().forEach(complex->{
        Integer appId = complex.getAppId();
        Integer userId = complex.getUserId();
        GeneratedKeyHolder keyHolder = new GeneratedKeyHolder();
        String sql = "insert ignore into open_app_user (app_id, open_id, user_status, creator, modifier, create_time, modify_time, status, version) values ("+appId+","+userId+",0,1,1,now(),now(),1,1)";
        int id = jdbcTemplate.update(con -> con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS), keyHolder);  
    });
    //todo 業務邏輯...
}

這裏我有個批量保存的邏輯,需要先保存一箇中間表open_app_user表(該表app_id和open_id是聯合唯一鍵)獲得id,拿到用户的open_app_user_id後再進行其他業務邏輯,這裏按我原來的理解是雖然我在controller的方法上加了@Transactional註解,但是parallelStream裏的事務應該都是獨立的,不會是同一個事務,所以即使有數據重複,第一個線程插入後,第二個線程也只會插入失敗(不會報錯,因為我加了ignore),所以即使並行也不會有問題的,然而卻發生了鎖超時的問題。

查看鎖超時以及定位的操作可以看我前面的文章,通過查找mysql的

select * from information_schema.INNODB_TRX;
select * from performance_schema.data_lock_waits;
select * from performance_schema.data_locks;

定位到了這裏,然而我也百思不得其解,為啥會鎖超時呢,這裏應該都是馬上執行就馬上釋放了啊,難道是其中的事務沒有提交?

因為現在都是spring的聲明式事務管理,spring是在有@Transactional註解的情況下,執行完了才提交事務,在沒有@Transactional註解的情況下,每個方法都差不多可以理解成原子,比如我上面的jdbcTemplate.update()這個方法就是一個事務,執行完了就直接提交事務了。

因為spring是把事務上下文放在ThreadLocal裏了,主要是用TransactionSynchronizationManager這個類來管理,所以我寫了一個demo來進行驗證

@GetMapping("/get")
@Transactional
public String get() {
    List<Complex> list = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        list.add(new Complex(1, 1));
    }
    list.parallelStream().forEach(complex->{
        Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
        System.err.println("count:"+resourceMap.size());
        Integer appId = complex.getAppId();
        Integer userId = complex.getUserId();
        String sql = "insert ignore into open_app_user (app_id, open_id, user_status, creator, modifier, create_time, modify_time, status, version) values ("+appId+","+userId+",0,1,1,now(),now(),1,1)";
        int update = jdbcTemplate.update(sql);
    });
    return "hello, world! ";
}

有趣的事情發生了,我在註釋掉@Transactional註解時,代碼裏resourceMap.size()返回的內容是竟然不一樣,因為我的list有10條記錄,差不多就是10個並行,然而我的輸出卻是:

count:1
count:0
count:0
count:0
count:0
count:0
count:0
count:0
count:0
count:0

沒有註釋掉@Transactional註解時,輸出是:

count:2
count:0
count:0
count:0
count:0
count:0
count:0
count:0
count:0
count:0

並且還會出現鎖超時的現象,奇怪的地方就是為啥我用的parallelStream會有線程上下文裏的值,我並沒有做什麼操作,而且10個並行裏只有一個(這裏並不是説明固定只有一次,下面會説明)獲得了線程上下文的信息,我又進一步測試,偽代碼改成:

@GetMapping("/get")
public void get() {
    List<Complex> list = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        list.add(new Complex(1, 1));
    }
    ThreadLocal local = new ThreadLocal();
    local.set("parent_set_value");
    list.parallelStream().forEach(complex->{
        System.err.println(local.get());
    });
}

結果如我所料,輸出為:

parent_set_value
null
null
null
null
null
null
null
null
null

使用parallelStream並不完全都是另開了線程,其中有一個是屬於主線程的,可以使用System.err.println(Thread.currentThread().getName());查看當前線程的名稱,我發現parallelStream會把當前主線程也作為一個執行線程去執行任務

後面我再去了解了一下parallelStream的實現,在這個方法上的註解裏第一句話有個單詞是possibly,是“可能”返回並行流,原來參與並行處理的線程有主線程以及ForkJoinPool中的worker線程,所以parallelStream是有兩種情況的,一是可能只一個線程併發執行,二是多個線程並行執行,而我這裏導致鎖超時,就是因為用到了主線程,所以在並行插入的時候,有個處理有事務上下文,導致一直沒有提交事務(@Transactional註釋方法的方法沒有跑完,這裏也不可能跑完),所以其他線程的插入就一直等待這個,產生了鎖超時報錯

user avatar lankerens 頭像 snower 頭像 xiangrikui_61e5519df2291 頭像 longbig 頭像 zoux 頭像 smart_doc 頭像 iot_full_stack 頭像 u_16213664 頭像 onekbitdaohang 頭像 anan_5ca066790c21a 頭像 zz_641473ad470bc 頭像 satoken 頭像
12 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.