콘텐츠로 이동

C# Task Parallel Library 심화 — 병렬 처리 패턴

Task Parallel Library(TPL)는 .NET에서 CPU 바운드 병렬 작업을 위한 핵심 라이브러리입니다. async/await가 I/O 바운드 비동기 작업을 담당한다면, TPL은 CPU 코어를 최대한 활용하는 데이터 병렬 처리에 특화되어 있습니다.


using System.Threading.Tasks;
// 기본 병렬 루프
Parallel.For(0, 1000, i =>
{
HeavyComputation(i);
});
// 병렬화 옵션 제어
var options = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount / 2,
CancellationToken = cts.Token
};
Parallel.ForEach(dataList, options, item =>
{
Process(item);
});

I/O 바운드 작업을 병렬로 실행할 때 사용합니다.

var urls = new[] { "https://a.com", "https://b.com", "https://c.com" };
await Parallel.ForEachAsync(urls,
new ParallelOptions { MaxDegreeOfParallelism = 4 },
async (url, ct) =>
{
var response = await httpClient.GetAsync(url, ct);
await ProcessResponseAsync(response);
});

using System.Linq;
// 순서 무관 병렬 처리 (가장 빠름)
var results = data
.AsParallel()
.Where(x => x.IsValid)
.Select(x => Transform(x))
.ToList();
// 순서 보존 병렬 처리
var ordered = data
.AsParallel()
.AsOrdered()
.Select(x => Transform(x))
.ToList();
// 병렬화 정도 제어
var limited = data
.AsParallel()
.WithDegreeOfParallelism(4)
.Select(x => HeavyWork(x))
.ToList();

4. TPL Dataflow — 파이프라인 처리

섹션 제목: “4. TPL Dataflow — 파이프라인 처리”
using System.Threading.Tasks.Dataflow;
// 변환 블록 정의
var downloader = new TransformBlock<string, byte[]>(
async url => await httpClient.GetByteArrayAsync(url),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
var parser = new TransformBlock<byte[], ParsedData>(
data => Parse(data),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
var saver = new ActionBlock<ParsedData>(
async parsed => await db.SaveAsync(parsed));
// 파이프라인 연결
downloader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });
parser.LinkTo(saver, new DataflowLinkOptions { PropagateCompletion = true });
// 데이터 투입
foreach (var url in urls)
await downloader.SendAsync(url);
downloader.Complete();
await saver.Completion;

// 모든 작업 완료 대기
var tasks = items.Select(item => ProcessAsync(item));
var results = await Task.WhenAll(tasks);
// 가장 빠른 것 하나만 대기
var fastest = await Task.WhenAny(
FetchFromPrimaryAsync(),
FetchFromFallbackAsync());
var data = await fastest; // 완료된 Task의 결과 추출

// 잘못된 예 — 레이스 컨디션
int count = 0;
Parallel.For(0, 1000, _ => count++); // 결과 불정확
// 올바른 예 1 — Interlocked
int count = 0;
Parallel.For(0, 1000, _ => Interlocked.Increment(ref count));
// 올바른 예 2 — 로컬 합계 후 병합
long total = 0;
Parallel.For<long>(0, 1000,
() => 0L,
(i, _, local) => local + Compute(i),
local => Interlocked.Add(ref total, local));

7. CPU 바운드 vs I/O 바운드 선택 기준

섹션 제목: “7. CPU 바운드 vs I/O 바운드 선택 기준”
상황권장
CPU 집약적 계산 (이미지, 암호화)Parallel.For, PLINQ
많은 I/O 동시 실행Parallel.ForEachAsync
단계적 데이터 처리TPL Dataflow
단순 비동기 I/Oasync/await + Task.WhenAll

try
{
Parallel.For(0, 100, i =>
{
if (i == 42) throw new InvalidOperationException($"항목 {i} 처리 실패");
Process(i);
});
}
catch (AggregateException ae)
{
// 병렬 루프의 모든 예외가 AggregateException으로 묶임
foreach (var inner in ae.InnerExceptions)
Console.WriteLine($"오류: {inner.Message}");
// 특정 예외만 처리
ae.Handle(ex => ex is InvalidOperationException);
}
// Task.WhenAll도 동일
try
{
await Task.WhenAll(tasks);
}
catch
{
// 첫 번째 예외만 다시 던짐
var errors = tasks
.Where(t => t.IsFaulted)
.Select(t => t.Exception!.InnerException!)
.ToList();
}

9. Parallel.ForAsync (.NET 8 확장 패턴)

섹션 제목: “9. Parallel.ForAsync (.NET 8 확장 패턴)”
// .NET 8+: Parallel.ForAsync — 비동기 인덱스 루프
await Parallel.ForAsync(0, items.Length,
new ParallelOptions { MaxDegreeOfParallelism = 4 },
async (i, ct) =>
{
var result = await ProcessItemAsync(items[i], ct);
results[i] = result;
});

// .NET 9: 완료 순서대로 결과 처리
var tasks = urls.Select(url => FetchAsync(url)).ToList();
await foreach (var completed in Task.WhenEach(tasks))
{
try
{
var result = await completed;
Console.WriteLine($"완료: {result.Url}");
}
catch (Exception ex)
{
Console.WriteLine($"실패: {ex.Message}");
}
}

대용량 항목을 청크 단위로 병렬 처리합니다.

using System.Buffers;
async Task ProcessInBatchesAsync<T>(
IReadOnlyList<T> items,
int batchSize,
Func<IEnumerable<T>, CancellationToken, Task> processor,
CancellationToken ct = default)
{
var batches = items
.Chunk(batchSize) // .NET 6+: IEnumerable<T[]>
.ToList();
await Parallel.ForEachAsync(batches,
new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = ct
},
async (batch, token) => await processor(batch, token));
}
// 사용
await ProcessInBatchesAsync(
allOrders,
batchSize: 100,
processor: async (batch, ct) =>
{
await db.BulkInsertAsync(batch, ct);
});

// ⚠️ PLINQ는 CPU 바운드에만 유효 — I/O 바운드에 사용하지 마세요
// ❌ 잘못된 사용: I/O 바운드 작업
var results = urls
.AsParallel()
.Select(url => httpClient.GetStringAsync(url).Result) // 데드락 위험
.ToList();
// ✅ 올바른 I/O 병렬: Parallel.ForEachAsync 사용
await Parallel.ForEachAsync(urls, async (url, ct) =>
{
var content = await httpClient.GetStringAsync(url);
ProcessContent(content);
});
// ⚠️ AsOrdered()는 병렬 이득 감소
var ordered = data
.AsParallel()
.AsOrdered() // 순서 보장 비용 발생
.Select(Transform)
.ToList();
// 작은 컬렉션(<1000개)은 순차 처리가 더 빠를 수 있음
var threshold = data.Count > 1000
? data.AsParallel().Select(Transform)
: data.Select(Transform);

TPL은 CPU 바운드 병렬 처리의 핵심 도구입니다. Parallel.For로 반복 작업을 분산하고, Dataflow로 생산자-소비자 파이프라인을 구성하며, PLINQ로 컬렉션 변환을 병렬화하세요. 공유 상태에는 반드시 Interlocked나 스레드 로컬 합산 패턴을 사용하세요. .NET 9의 Task.WhenEach로 완료 순서대로 결과를 처리하고, 대용량 처리에는 Chunk 기반 배치 패턴을 활용하세요.