C# Task Parallel Library 심화 — 병렬 처리 패턴
Task Parallel Library(TPL)는 .NET에서 CPU 바운드 병렬 작업을 위한 핵심 라이브러리입니다. async/await가 I/O 바운드 비동기 작업을 담당한다면, TPL은 CPU 코어를 최대한 활용하는 데이터 병렬 처리에 특화되어 있습니다.
1. Parallel.For / Parallel.ForEach
섹션 제목: “1. Parallel.For / Parallel.ForEach”using System.Threading.Tasks;
// 기본 병렬 루프Parallel.For(0, 1000, i =>{ HeavyComputation(i);});
// 병렬화 옵션 제어var options = new ParallelOptions{ MaxDegreeOfParallelism = Environment.ProcessorCount / 2, CancellationToken = cts.Token};
Parallel.ForEach(dataList, options, item =>{ Process(item);});2. Parallel.ForEachAsync (.NET 6+)
섹션 제목: “2. Parallel.ForEachAsync (.NET 6+)”I/O 바운드 작업을 병렬로 실행할 때 사용합니다.
var urls = new[] { "https://a.com", "https://b.com", "https://c.com" };
await Parallel.ForEachAsync(urls, new ParallelOptions { MaxDegreeOfParallelism = 4 }, async (url, ct) => { var response = await httpClient.GetAsync(url, ct); await ProcessResponseAsync(response); });3. PLINQ — 병렬 LINQ
섹션 제목: “3. PLINQ — 병렬 LINQ”using System.Linq;
// 순서 무관 병렬 처리 (가장 빠름)var results = data .AsParallel() .Where(x => x.IsValid) .Select(x => Transform(x)) .ToList();
// 순서 보존 병렬 처리var ordered = data .AsParallel() .AsOrdered() .Select(x => Transform(x)) .ToList();
// 병렬화 정도 제어var limited = data .AsParallel() .WithDegreeOfParallelism(4) .Select(x => HeavyWork(x)) .ToList();4. TPL Dataflow — 파이프라인 처리
섹션 제목: “4. TPL Dataflow — 파이프라인 처리”using System.Threading.Tasks.Dataflow;
// 변환 블록 정의var downloader = new TransformBlock<string, byte[]>( async url => await httpClient.GetByteArrayAsync(url), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
var parser = new TransformBlock<byte[], ParsedData>( data => Parse(data), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
var saver = new ActionBlock<ParsedData>( async parsed => await db.SaveAsync(parsed));
// 파이프라인 연결downloader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });parser.LinkTo(saver, new DataflowLinkOptions { PropagateCompletion = true });
// 데이터 투입foreach (var url in urls) await downloader.SendAsync(url);
downloader.Complete();await saver.Completion;5. Task.WhenAll / Task.WhenAny
섹션 제목: “5. Task.WhenAll / Task.WhenAny”// 모든 작업 완료 대기var tasks = items.Select(item => ProcessAsync(item));var results = await Task.WhenAll(tasks);
// 가장 빠른 것 하나만 대기var fastest = await Task.WhenAny( FetchFromPrimaryAsync(), FetchFromFallbackAsync());
var data = await fastest; // 완료된 Task의 결과 추출6. 공유 상태 보호
섹션 제목: “6. 공유 상태 보호”// 잘못된 예 — 레이스 컨디션int count = 0;Parallel.For(0, 1000, _ => count++); // 결과 불정확
// 올바른 예 1 — Interlockedint count = 0;Parallel.For(0, 1000, _ => Interlocked.Increment(ref count));
// 올바른 예 2 — 로컬 합계 후 병합long total = 0;Parallel.For<long>(0, 1000, () => 0L, (i, _, local) => local + Compute(i), local => Interlocked.Add(ref total, local));7. CPU 바운드 vs I/O 바운드 선택 기준
섹션 제목: “7. CPU 바운드 vs I/O 바운드 선택 기준”| 상황 | 권장 |
|---|---|
| CPU 집약적 계산 (이미지, 암호화) | Parallel.For, PLINQ |
| 많은 I/O 동시 실행 | Parallel.ForEachAsync |
| 단계적 데이터 처리 | TPL Dataflow |
| 단순 비동기 I/O | async/await + Task.WhenAll |
8. 예외 처리 — AggregateException
섹션 제목: “8. 예외 처리 — AggregateException”try{ Parallel.For(0, 100, i => { if (i == 42) throw new InvalidOperationException($"항목 {i} 처리 실패"); Process(i); });}catch (AggregateException ae){ // 병렬 루프의 모든 예외가 AggregateException으로 묶임 foreach (var inner in ae.InnerExceptions) Console.WriteLine($"오류: {inner.Message}");
// 특정 예외만 처리 ae.Handle(ex => ex is InvalidOperationException);}
// Task.WhenAll도 동일try{ await Task.WhenAll(tasks);}catch{ // 첫 번째 예외만 다시 던짐 var errors = tasks .Where(t => t.IsFaulted) .Select(t => t.Exception!.InnerException!) .ToList();}9. Parallel.ForAsync (.NET 8 확장 패턴)
섹션 제목: “9. Parallel.ForAsync (.NET 8 확장 패턴)”// .NET 8+: Parallel.ForAsync — 비동기 인덱스 루프await Parallel.ForAsync(0, items.Length, new ParallelOptions { MaxDegreeOfParallelism = 4 }, async (i, ct) => { var result = await ProcessItemAsync(items[i], ct); results[i] = result; });10. Task.WhenEach (.NET 9)
섹션 제목: “10. Task.WhenEach (.NET 9)”// .NET 9: 완료 순서대로 결과 처리var tasks = urls.Select(url => FetchAsync(url)).ToList();
await foreach (var completed in Task.WhenEach(tasks)){ try { var result = await completed; Console.WriteLine($"완료: {result.Url}"); } catch (Exception ex) { Console.WriteLine($"실패: {ex.Message}"); }}11. 배치 처리 패턴
섹션 제목: “11. 배치 처리 패턴”대용량 항목을 청크 단위로 병렬 처리합니다.
using System.Buffers;
async Task ProcessInBatchesAsync<T>( IReadOnlyList<T> items, int batchSize, Func<IEnumerable<T>, CancellationToken, Task> processor, CancellationToken ct = default){ var batches = items .Chunk(batchSize) // .NET 6+: IEnumerable<T[]> .ToList();
await Parallel.ForEachAsync(batches, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ct }, async (batch, token) => await processor(batch, token));}
// 사용await ProcessInBatchesAsync( allOrders, batchSize: 100, processor: async (batch, ct) => { await db.BulkInsertAsync(batch, ct); });12. PLINQ 주의사항
섹션 제목: “12. PLINQ 주의사항”// ⚠️ PLINQ는 CPU 바운드에만 유효 — I/O 바운드에 사용하지 마세요// ❌ 잘못된 사용: I/O 바운드 작업var results = urls .AsParallel() .Select(url => httpClient.GetStringAsync(url).Result) // 데드락 위험 .ToList();
// ✅ 올바른 I/O 병렬: Parallel.ForEachAsync 사용await Parallel.ForEachAsync(urls, async (url, ct) =>{ var content = await httpClient.GetStringAsync(url); ProcessContent(content);});
// ⚠️ AsOrdered()는 병렬 이득 감소var ordered = data .AsParallel() .AsOrdered() // 순서 보장 비용 발생 .Select(Transform) .ToList();
// 작은 컬렉션(<1000개)은 순차 처리가 더 빠를 수 있음var threshold = data.Count > 1000 ? data.AsParallel().Select(Transform) : data.Select(Transform);TPL은 CPU 바운드 병렬 처리의 핵심 도구입니다. Parallel.For로 반복 작업을 분산하고, Dataflow로 생산자-소비자 파이프라인을 구성하며, PLINQ로 컬렉션 변환을 병렬화하세요. 공유 상태에는 반드시 Interlocked나 스레드 로컬 합산 패턴을 사용하세요. .NET 9의 Task.WhenEach로 완료 순서대로 결과를 처리하고, 대용량 처리에는 Chunk 기반 배치 패턴을 활용하세요.