並行性

並行性と並列性はコンピュータサイエンスにおいて極めて重要なトピックであり、現在では産業界でもホットトピックです。 コンピュータはどんどん多くのコアを持つようになってきていますが、多くのプログラマはまだそれを十分に使いこなす準備ができていません。

Rustのメモリ安全性の機能は、Rustの並行性の話においても適用されます。 Rustプログラムは並行であっても、メモリ安全でなければならず、データ競合を起こさないのです。 Rustの型システムはこの問題を扱うことができ、並行なコードをコンパイル時に確かめるための強力な方法を与えます。

Rustが備えている並行性の機能について語る前に、理解しておくべき重要なことがあります。 それは、Rustは十分にローレベルであるため、その大部分は、言語によってではなく、標準ライブラリによって提供されるということです。 これは、もしRustの並行性の扱い方に気に入らないところがあれば、代わりの方法を実装できるということを意味します。 mio はこの原則を行動で示している実例です。

背景: SendSync

並行性を確かめるのは難しいことです。 Rustには、コードを確かめるのを支援する強力で静的な型システムがあります。 そしてRustは、並行になりうるコードの理解を助ける2つのトレイトを提供します。

Send

最初に取り上げるトレイトは Send です。 型 TSend を実装していた場合、 この型のものはスレッド間で安全に受け渡しされる所有権を持てることを意味します。

これはある種の制約を強制させる際に重要です。 例えば、もし2つのスレッドをつなぐチャネルがあり、そのチャネルを通じてデータを別のスレッドに送れるようにしたいとします。 このときには、その型について Send が実装されているかを確かめます。

逆に、スレッドセーフでない FFI でライブラリを包んでいて、 Send を実装したくなかったとします。 このときコンパイラは、そのライブラリが現在のスレッドの外にいかないよう強制することを支援してくれるでしょう。

Sync

2つ目のトレイトは Sync といいます。 型 TSync を実装していた場合、この型のものは共有された参照を通じて複数スレッドから並行に使われたとしても、必ずメモリ安全であることを意味します。 そのため、 interior mutability を持たない型はもともと Sync であるといえます。 そのような型としては、 u8 などの単純なプリミティブ型やそれらを含む合成型などがあります。

スレッドをまたいで参照を共有するために、Rustは Arc<T> というラッパ型を提供しています。 TSendSync の両方を実装している時かつその時に限り、 Arc<T>SendSync を実装します。 例えば、型 Arc<RefCell<U>> のオブジェクトをスレッドをまたいで受け渡すことはできません。 なぜなら、 RefCellSync を実装していないため、 Arc<RefCell<U>>Send を実装しないためです。

これらの2つのトレイトのおかげで、コードの並行性に関する性質を強く保証するのに型システムを使うことができます。 ただ、それがどうしてかということを示す前に、まずどうやって並行なRustプログラムをつくるかということを学ぶ必要があります!

スレッド

Rustの標準ライブラリはスレッドのためのライブラリを提供しており、それによりRustのコードを並列に走らせることができます。 これが std::thread を使う基本的な例です。

use std::thread; fn main() { thread::spawn(|| { println!("Hello from a thread!"); }); }
use std::thread;

fn main() {
    thread::spawn(|| {
        println!("Hello from a thread!");
    });
}

thread::spawn() というメソッドは クロージャ を受け取り、それを新たなスレッドで実行します。 そして、元のスレッドにハンドルを返します。 このハンドルは、子スレッドが終了するのを待機しその結果を取り出すのに使うことが出来ます。

use std::thread; fn main() { let handle = thread::spawn(|| { "Hello from a thread!" }); println!("{}", handle.join().unwrap()); }
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        "Hello from a thread!"
    });

    println!("{}", handle.join().unwrap());
}

多くの言語はスレッドを実行できますが、それはひどく危険です。 shared mutable stateによって引き起こされるエラーをいかに防ぐかを丸々あつかった本もあります。 Rustはこれについて型システムによって、コンパイル時にデータ競合を防ぐことで支援します。 それでは、実際にどうやってスレッド間での共有を行うかについて話しましょう。

訳注: "shared mutable state" は 「共有されたミュータブルな状態」という意味ですが、定型句として、訳さずそのまま使用しています。

安全な Shared Mutable State

Rustの型システムのおかげで、「安全な shared mutable state」という嘘のようにきこえる概念があらわれます。 shared mutable state がとてもとても悪いものであるということについて、多くのプログラマの意見は一致しています。

このようなことを言った人がいます。

Shared mutable state is the root of all evil. Most languages attempt to deal with this problem through the 'mutable' part, but Rust deals with it by solving the 'shared' part.

訳: shared mutable state は諸悪の根源だ。 多くの言語は mutable の部分を通じてこの問題に対処しようとしている。 しかし、Rustは shared の部分を解決することで対処する。

ポインタの誤った使用の防止には 所有権のシステム が役立ちますが、このシステムはデータ競合を排除する際にも同様に一役買います。 データ競合は、並行性のバグの中で最悪なものの一つです。

例として、多くの言語で起こるようなデータ競合を含んだRustプログラムをあげます。 これは、コンパイルが通りません。

use std::thread; use std::time::Duration; fn main() { let mut data = vec![1, 2, 3]; for i in 0..3 { thread::spawn(move || { data[i] += 1; }); } thread::sleep(Duration::from_millis(50)); }
use std::thread;
use std::time::Duration;

fn main() {
    let mut data = vec![1, 2, 3];

    for i in 0..3 {
        thread::spawn(move || {
            data[i] += 1;
        });
    }

    thread::sleep(Duration::from_millis(50));
}

以下のようなエラーがでます。

8:17 error: capture of moved value: `data`
        data[i] += 1;
        ^~~~

Rustはこれが安全でないだろうと知っているのです! もし、各スレッドに data への参照があり、スレッドごとにその参照の所有権があるとしたら、3人の所有者がいることになってしまうのです!

そのため、1つの値に対して2つ以上の参照を持てるようにして、スレッド間で共有できるような型が必要です。 そして、その型は Sync を実装していなければなりません。

Rustの標準アトミック参照カウント型である Arc<T> を使いましょう。 これは複数の参照間で値の所有権を同時に共有できるように、値を特別な実行時の管理用データでくるむものです。

その管理用データには、値への参照がいくつ存在しているかというカウントが記録されています。 すなわち名前の「参照カウント」の部分にあたります。

「アトミック」という部分は Arc<T> が複数スレッドから安全にアクセスできることを意味しています。 このためにコンパイラは、内部のカウントの更新には、データ競合が起こりえない分割不能な操作が用いられることを保証します。

use std::thread; use std::sync::Arc; use std::time::Duration; fn main() { let mut data = Arc::new(vec![1, 2, 3]); for i in 0..3 { let data = data.clone(); thread::spawn(move || { data[i] += 1; }); } thread::sleep(Duration::from_millis(50)); }
use std::thread;
use std::sync::Arc;
use std::time::Duration;

fn main() {
    let mut data = Arc::new(vec![1, 2, 3]);

    for i in 0..3 {
        let data = data.clone();
        thread::spawn(move || {
            data[i] += 1;
        });
    }

    thread::sleep(Duration::from_millis(50));
}

ここで Arc<T> について clone() を呼んで、内部のカウントを増やしています。 そして、このハンドルは新たなスレッドに移動されます。

そうすると... まだ、エラーがでます。

<anon>:11:24 error: cannot borrow immutable borrowed content as mutable
<anon>:11                    data[i] += 1;
                             ^~~~

Arc<T> はスレッドをまたいだ共有を安全にするために、その中身に対してもう一つの仮定をおいています。 それは、中身が Sync であるという仮定です。 この仮定は値がイミュータブルであるときは真になりますが、今回は値を変化できるようにしたいです。 そのため、借用チェッカに対し、我々は自分たちが何をやっているかを知っています、と説得するための何かが必要になります。

共有された値を安全に変更できるようにするための型が必要そうです。 例えば、どの時点においても、同時に一つのスレッドのなかでしか値は変更できないということを保証できる型です。

そのためには、 Mutex<T> 型を使うことができます!

これが動くバージョンです。

use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; fn main() { let data = Arc::new(Mutex::new(vec![1, 2, 3])); for i in 0..3 { let data = data.clone(); thread::spawn(move || { let mut data = data.lock().unwrap(); data[i] += 1; }); } thread::sleep(Duration::from_millis(50)); }
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let data = Arc::new(Mutex::new(vec![1, 2, 3]));

    for i in 0..3 {
        let data = data.clone();
        thread::spawn(move || {
            let mut data = data.lock().unwrap();
            data[i] += 1;
        });
    }

    thread::sleep(Duration::from_millis(50));
}

i の値はクロージャへ束縛(コピー)されるだけで、スレッド間で共有されるわけではないことに注意してください。

また、Mutexlock メソッドは以下のシグネチャを持つことにも注意してください。

fn main() { fn lock(&self) -> LockResult<MutexGuard<T>> }
fn lock(&self) -> LockResult<MutexGuard<T>>

そして、 SendMutexGuard<T> に対して実装されていないため、ガードはスレッドの境界をまたげず、ロックの獲得と解放のスレッドローカル性が保証されています。

それでは、スレッドの中身をさらに詳しく見ていきましょう。

use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; fn main() { let data = Arc::new(Mutex::new(vec![1, 2, 3])); for i in 0..3 { let data = data.clone(); thread::spawn(move || { let mut data = data.lock().unwrap(); data[i] += 1; }); } thread::sleep(Duration::from_millis(50)); }
thread::spawn(move || {
    let mut data = data.lock().unwrap();
    data[i] += 1;
});

まず、 lock() を呼び、mutex のロックを獲得します。 これは失敗するかもしれないため、Result<T, E> が返されます。 そして、今回は単なる例なので、データへの参照を得るためにそれを unwrap() します。 実際のコードでは、ここでもっとちゃんとしたエラーハンドリングをするでしょう。 そうしたら、ロックを持っているので、自由に値を変更できます。

最後の部分で、スレッドが実行されている間、短いタイマーで待機しています。 しかし、これはよろしくないです。 というのも、ちょうどよい待機時間を選んでいた可能性より、必要以上に長い時間待ってしまっていたり、十分に待っていなかったりする可能性の方が高いからです。 適切な待ち時間というのは、プログラムを実行した際に、実際に計算が終わるまでどれだけの時間がかかったかに依存します。

タイマーに代わるより良い選択肢は、Rust標準ライブラリによって提供されている、スレッドがお互いに同期するためのメカニズムを用いることです。 それでは、そのようなものの一つについて話しましょう。 チャネルです。

チャネル

このコードが、適当な時間を待つ代わりに、同期のためにチャネルを使ったバージョンです。

use std::sync::{Arc, Mutex}; use std::thread; use std::sync::mpsc; fn main() { let data = Arc::new(Mutex::new(0)); let (tx, rx) = mpsc::channel(); for _ in 0..10 { let (data, tx) = (data.clone(), tx.clone()); thread::spawn(move || { let mut data = data.lock().unwrap(); *data += 1; tx.send(()).unwrap(); }); } for _ in 0..10 { rx.recv().unwrap(); } }
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;

fn main() {
    let data = Arc::new(Mutex::new(0));

    let (tx, rx) = mpsc::channel();

    for _ in 0..10 {
        let (data, tx) = (data.clone(), tx.clone());

        thread::spawn(move || {
            let mut data = data.lock().unwrap();
            *data += 1;

            tx.send(()).unwrap();
        });
    }

    for _ in 0..10 {
        rx.recv().unwrap();
    }
}

mpsc::channel() メソッドを使って、新たなチャネルを生成しています。 そして、ただの () をチャネルを通じて単に send し、それが10個戻ってくるのを待機します。

このチャネルはただシグナルを送っているだけですが、 Send であるデータならばなんでもこのチャネルを通じて送れます!

use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); for i in 0..10 { let tx = tx.clone(); thread::spawn(move || { let answer = i * i; tx.send(answer).unwrap(); }); } for _ in 0..10 { println!("{}", rx.recv().unwrap()); } }
use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    for i in 0..10 {
        let tx = tx.clone();

        thread::spawn(move || {
            let answer = i * i;

            tx.send(answer).unwrap();
        });
    }

    for _ in 0..10 {
        println!("{}", rx.recv().unwrap());
    }
}

ここでは、10個のスレッドを生成し、それぞれに数値 ( spawn() したときの i ) の2乗を計算させ、その答えをチャネルを通じて send() で送り返させています。

パニック

panic! は現在実行中のスレッドをクラッシュさせます。 Rustのスレッドは独立させるための単純なメカニズムとして使うことができます。

fn main() { use std::thread; let handle = thread::spawn(move || { panic!("oops!"); }); let result = handle.join(); assert!(result.is_err()); }
use std::thread;

let handle = thread::spawn(move || {
    panic!("oops!");
});

let result = handle.join();

assert!(result.is_err());

Thread.join()Result を返し、これによってスレッドがパニックしたかどうかをチェックできます。