【.Net C#】Task をもっと手軽に使いたい

.Net には、マルチスレッドの処理を使いやすくした Task クラスがありますが、使いやすいとは言え、かなりの知見が必要になります。
System.Threading.Tasks 名前空間には、クラスだけでも以下の種類があります。

.Net 8.0 の System.Threading.Tasks 名前空間に含まれるクラスの一部
※タップまたはクリックすると大きい画像で表示します。

.Net のような、あらゆるシチュエーションで使えるように構成された API では、多種多様な機能を用意するのは正しいと思います。
ただ、スレッドプールの恩恵をつまみ食いしたいようなときには、学習コストが高すぎます。
そこで、学習コストをほとんど必要としないマルチスレッド用クラスを作りました。

Notion で公開しています。

dead-library/common/TaskRack.cs をビルドするには、同ライブラリの ID.cs が必要です。
上記 Notion で説明しています。

本稿では、サンプルコードを多めに使って、Notion より具体的な説明を行います。

Task を更に抽象化して、よりイメージしやすく

Task をラップしたクラス設計を行うにあたり、そのまま「タスク」という名前を使うと、.Net の Task と混同してしまうので、別の概念を導入しました。
上記の画像のように、3つのクラスで構成されています。

  1. IChip
  2. Board
  3. Rack

これによって、.Net のタスクのことを考えなくても済むようにしている(つもり)…。

ワークフローのイメージ

チップ(IChip)をボード(Board)に装着することによって、そのボードで行う処理が決まる。
※この状態ではボードに電源を入れることができない。

ボードを棚(Rack)に収納することで通電し、電源を入れることができるようになる。

棚に収納したボードの電源を入れることで、チップの処理が行われる。

次項で具体的にどのようなコードを書く必要があるのかを例示します。

それぞれのクラスの役割
IChip<T>

架空の半導体をイメージしたクラスで、チップと呼びます。
チップは並列処理する際の実際の処理を行います。

定義は以下

public interface IChip<T> {
  int Run(object args);
  T Data { get; set; }
}

インターフェースなので、継承先のクラスで Run メソッドを実装します。
継承先の Run メソッドの中に、処理を書きます。

class TestChip : IChip<int> {
  public int Run(object args) {
    this.Data = (int)args;
    /* 処理を書く */
    return 0;
  }
  public int Data { get; set; } = 0;
}

Run メソッドの引数は object 型ですが、実際の型は T なので、Run メソッド内で T にキャストできます。
このインターフェースを継承したクラスを Board<T> クラスのコンストラクタに渡すことで、ボードにチップを装着することになります。

Board<T>

チップを装着する架空の回路基板をイメージしたクラスで、ボードと呼びます。
ボードは電源の ON/OFF を行うことができ、チップの処理状態を State プロパティーで返すことができます。

ボードの電源を入れるには、棚に収納する必要があります。
簡単な使い方をするだけなら、ボードを new して棚に収納したら終わりです。
棚にボードを収納したら、以降は棚からボードにアクセスするので、TaskRack.cs のほとんどは棚の使い方を調べるだけで済みます。

//チップをボードに装着
var board = new Board<int>(new TestChip());

//チップを装着したボードを棚に収納
ulong board_id = rack.Insert(board);

//棚に収納したボードの電源を入れ、チップの処理が行われるようになる
rack.PowerOn(board_id, 999)

上記のサンプルコードのように、棚に収納したボードにアクセスするには、Rack<T>.Insert メソッドが返したボードの ID を使います。

//ボードの状態を取得する
Board<int> board = rack.GetBoard(board_id);
Board<int>.Status state = board.State;
if (state == Board<int>.Status.Completed) {
  //チップの処理が正常終了したら、棚からボードを取り出す
  rack.Eject(board_id);
}

上記のサンプルコードでは、チップの処理が完了したときに棚に収納したボードを取り出していますが、必ずしもそうする必要はありません。
再度 Rack<T>.PowerOn メソッドを実行することで、異なる引数を与えてチップの処理を行うことができます。
そうすることで、Task(ワーカースレッド)を再利用することができます。

Rack<T>

複数のボードを収納できる架空のコンピューターラックをイメージしたクラスで、棚と呼びます。

棚のコンストラクタに指定した数のボードを Insert メソッドによって収納することができます。
その数を超えて収納しようとした場合、失敗します。

//5個までボードを収納できる棚を生成する
var rack = new Rack<ArgsType>(5);

異なる型のボードは収納できません。

var board = Board<int>(new Chip());
var rack = new Rack<ArgsType>(5);

// <T> が違うので収納できない
rack.Insert(board);

棚の空きを調べたいとき、棚に収納しているボードの数を Count プロパティーで調べることができ、棚に空きがあるかどうかを HasSpace プロパティーで調べることができます。

System.Console.WriteLine("rack used={0}/{1}", rack.Count, rack.MaxNumber);
if (!rack.HasSpace) {
  /* 棚に空きがない */
  return;
}

ボードを棚に収納することで、PowerOn メソッドを使ってボードに電源を入れることができます。

//棚にボードを収納する
ulong board_id = rack.Insert(board);
if (Rack<ArgsType>.IsErrorID(board_id)) {
  /* 失敗 */
  return;
}

var args = ArgsType();
if (!rack.PowerOn(board_id, args)) {
  /* 電源を入れるのに失敗 */
  return;
}

棚に収納したボードに装着しているチップの処理が終わったかどうか?を調べる方法は二通りあります。

  1. 全てのボードの処理が成功したかどうかを調べる。
  2. 特定のボードの処理が成功したかどうかを調べる。
1. 全てのボードの処理が成功したかどうかを調べる。
//全てのボードの処理が完了するまで待つ
while (!rack.IsAllCompleted) {
  System.Threading.Tasks.Task.Delay(100);
}
//全てのボードの処理が完了したら、ここに来る

チップの処理が失敗したり、エラーで中断したりしなければ、上記の方法でチップの処理の終了待ちを行うのが楽です。

2. 特定のボードの処理が成功したかどうかを調べる。
//board_id で指定したボードの状態を調べる
Board<ArgsType>.Status state = Board<ArgsType>.Status.None;
while (state != Board<ArgsType>.Status.Completed) {
  if (state == Board<ArgsType>.Status.Error) {
    /* エラー発生 */
    break;
  }
  else if (state == Board<ArgsType>.Status.Canceled) {
    /* Cancel で中断した */
    break;
  }
  var board = rack.GetBoard(board_id);
  if (board == null) {
    /* ボードの取得に失敗 */
    break;
  }
  state = board.State;
}
//board_id で指定したボードの処理が何らかの原因で終了したら、ここに来る
//state の値に応じた後処理を行うことができる

個別のボードの状態は、上記の方法で調べることができます。
board_id は Rack<T>.Insert メソッドでボードを収納したときに返された個々のボードの ID です。

チップの処理を中断する

チップの処理が完了する前に中断するには、Rack<T>.PowerOff メソッドを実行します。
全てのボードに対して中断するには、Rack<T>.PowerOffAll メソッドを実行します。
上記のメソッドは棚に収納したボードはそのまま残ります。

チップの処理を中断して、棚から破棄したい場合は Rack<T>.Destroy メソッドを実行します。
Destroy メソッドは棚に収納している全てのボードに対して行われます。
個別に破棄したい場合は Rack<T>.Eject メソッドを使用します。

チップの処理が終わったときにコールバックするサンプル

TaskRack.cs には、チップの処理が終了したときにコールバックする機能がありません。
終了時のコールバックについては非推奨です。

これは自前で実装する必要があり、以下にそのサンプルコードを示します。

struct Args {
  public Func<Board<Args>.Status, int> onFinished;
}
class TestChip : IChip<Args> {
  public int Run(object args) {
    this.Data = (Args)args;
    /* チップの処理 */
    this.Data.onFinished?.Invoke(Board<Args>.Status.Completed);
    return 0;
  }
  public int Data { get; set; } = new Args();
}
int OnChipFinished(Board<Args>.Status result) {
  System.Console.WriteLine("result={0}", result);
  return 0;
}

// どこかのメソッド内
void SomeMethod() {
  var int board_number = 5;
  var rack = new Rack<Args>(board_number);
  for (byte i = 0; i < board_number; i++) {
    var args = new Args() { onFinished = OnChipFinished };
    var board = new Board<Args>(new TestChip());
    ulong id = rack.Insert(board);
    if (!rack.PowerOn(id, args)) {
      throw new System.InvalidOperationException();
    }
  }
  while (!rack.IsAllCompleted) {
    System.Threading.Tasks.Task.Delay(100);
  }
}

このコールバックは Task のワーカースレッド(メインスレッド外)から呼ばれるので注意が必要です。
例えば、コールバック先がシングルスレッドでの処理を想定している場合に問題になります。
シングルスレッドで処理しないと問題が出るのに、別スレッドからの呼び出しが割り込むことになるためです。

このような状況では、スレッドセーフ対応が必要になります。
コールバック先では、呼び出されたかどうかをスレッドセーフな Queue に Push(Enqueue) しておき、それ以外の処理は行わないようにします。
シングルスレッドな処理を行っている場所で Queue から Pop(Dequeue) できた場合に限り、そこで本来コールバック先で行う予定だった処理を行います。
コールバックのリクエストをスレッドセーフな Queue にためておき、安全なタイミングで Queue から取り出して処理するということです。
コールバック先のメソッドはマルチスレッドで処理されますが、Queue をスレッドセーフにしておけば、それ以外には影響しないため安全です。

こういった知見が必要になるため、終了時のコールバックは TaskRack.cs では推奨しません。

TaskRack.cs で並列処理を行う際に注意すること

並列処理なので、以下の状況では排他制御が必要になります。

  • スレッドセーフではないメソッドをチップ内で呼び出す。
  • チップが静的フィールドや静的プロパティーを実装する。
  • チップ内で他のクラスの静的フィールドの値を書き換える。
  • チップ内で他のクラスの静的プロパティーにアクセスする。
    ※get が純粋なフィールド値を返すのではなく、様々な値を使って計算した結果を返すような場合は get も対象になる場合がある。
  • 複数のチップで同じファイルや GPU など、1つだけ存在するものに同時にアクセスする。
  • などなど…。

TaskRack.cs では、上記の排他制御が必要になるような使い方は推奨しません。
並列処理をシンプルに行うための仕組みなので、TaskRack.cs が提供する機能では足りなくなる可能性が高いです。
その場合、直接 Task にアクセスする必要が出てきます。
直接 Task にアクセスするなら、TaskRack.cs は使わずに、直接 Task を使うべきです。

排他制御にもいくつかの種類があり、状況に合わせて適切な方法を選択する必要があります。
学習コストが高い上に、並列処理で問題が起きたときの修正コストが高くなりがちです。

TaskRack.cs 側で上記を制限するのは、おそらく不可能なので、ユーザー側で対応する必要があります。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です