در گذشته، توسعهدهندگان داتنت برای پیادهسازی این الگوی ساختاری به ابزارهایی مانند BlockingCollection یا ترکیب ConcurrentQueue با SemaphoreSlim متکی بودند. اگرچه این ابزارها کارآمد هستند، اما در سناریوهای مبتنی بر async/await عملکرد بهینهای ندارند و میتوانند باعث انسداد نخها (Thread Blocking) و اتلاف منابع سیستم شوند.
برای رفع این مشکل، مایکروسافت در داتنت کور ۲.۱ فضای نام System.Threading.Channels را معرفی کرد. کلاس Channel یک ساختار دادهای بسیار کارآمد، Thread-safe و کاملاً سازگار با معماری Async است که برای پیادهسازی خطوط لوله داده (Data Pipelines) با کارایی بالا (High-Performance) طراحی شده است.
در این مقاله تخصصی، به بررسی عمیق ساختار، نحوه کارکرد، انواع کانالها، استراتژیهای مدیریت ظرفیت و نکات پیشرفته عملکردی Channel میپردازیم.
پیش از ورود به جزئیات فنی، باید درک کنیم چرا ابزارهای قدیمی برای کارهای مدرن کافی نیستند. بیا نگاهی به تفاوت BlockingCollection و Channel بیندازیم:
کلاس Channel یک کلاس انتزاعی (Abstract) است که به عنوان یک کارخانه (Factory) برای ایجاد دو بخش اصلی عمل میکند:
ChannelWriter: وظیفه نوشتن یا تولید داده در کانال را بر عهده دارد.
ChannelReader: وظیفه خواندن یا مصرف داده از کانال را بر عهده دارد.
این جداسازی وظایف (Separation of Concerns) به شما اجازه میدهد که اصل کمترین دسترسی (Principle of Least Privilege) را رعایت کنید. به عنوان مثال، میتوانید بخش تولیدکننده سیستم را طوری طراحی کنید که فقط به ChannelWriter دسترسی داشته باشد و نتواند دادههای درون صف را بخواند یا دستکاری کند.
// ساخت یک کانال ساده
Channel channel = Channel.CreateUnbounded();
ChannelWriter writer = channel.Writer;
ChannelReader reader = channel.Reader;
مایکروسافت دو نوع پیادهسازی اصلی برای کانالها ارائه داده است که بسته به نیاز سیستم باید یکی از آنها را انتخاب کنید:
الف) کانالهای نامحدود (Unbounded Channels)
این کانالها هیچ محدودیتی در تعداد آیتمهای ذخیرهشده در صف ندارند. تا زمانی که حافظه رم سیستم اجازه دهد، تولیدکنندهها میتوانند در آن بنویسند.
روش ساخت: Channel.CreateUnbounded()
کاربرد: زمانی که سرعت مصرفکننده همیشه بیشتر یا مساوی تولیدکننده است و خطر پر شدن حافظه وجود ندارد.
ب) کانالهای محدود (Bounded Channels)
این کانالها ظرفیت مشخصی دارند. وقتی تعداد آیتمهای درون صف به سقف تعیینشده برسد، کانال واکنش مشخصی نشان میدهد (مثلاً تولیدکننده را منتظر میگذارد یا دادههای قدیمی را حذف میکند).
روش ساخت: Channel.CreateBounded(capacity)
کاربرد: در سیستمهای واقعی برای جلوگیری از پدیده OutOfMemoryException و مدیریت جریان دادهها (Backpressure).
وقتی ظرفیت یک BoundedChannel پر میشود، ما باید تعیین کنیم که سیستم چه رفتاری نشان دهد. این کار با استفاده از انوم BoundedChannelFullMode تنظیم میشود:
| استراتژی (FullMode) | رفتار سیستم |
| Wait | پیشفرض. تولیدکننده به صورت ناهمگام منتظر میماند تا جا خالی شود. |
| DropNewest | آیتم جدید جایگزین آخرین آیتم صف میشود (آیتم در حال ورود حذف میشود). |
| DropOldest | قدیمیترین آیتم موجود در صف حذف میشود تا جا برای آیتم جدید باز شود. |
| DropWrite | آیتم جدید بدون اینکه وارد صف شود، نادیده گرفته و حذف میشود. |
var options = new BoundedChannelOptions(capacity: 100)
{
FullMode = BoundedChannelFullMode.DropOldest
};
Channel orderChannel = Channel.CreateBounded(options);
بیایید یک مثال واقعی را بررسی کنیم. فرض کن یک سیستم پردازش سفارشات فروشگاهی داریم. یک متد وظیفه دریافت سفارشات از کاربران را دارد (Producer) و یک متد در پسزمینه سفارشات را پردازش و در دیتابیس ثبت میکند (Consumer).
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
public class OrderProcessor
{
private readonly Channel _orderChannel;
public OrderProcessor()
{
// ایجاد یک کانال محدود با ظرفیت 10 سفارش
var options = new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait,
SingleWriter = false, // چندین متد میتوانند سفارش ثبت کنند
SingleReader = true // فقط یک Worker سفارشات را پردازش میکند
};
_orderChannel = Channel.CreateBounded(options);
}
// Producer: سفارش جدید را در کانال قرار میدهد
public async ValueTask PublishOrderAsync(string orderId)
{
// منتظر میماند تا فضا خالی شود، سپس مینویسد
await _orderChannel.Writer.WriteAsync(orderId);
Console.WriteLine($"[Producer] Order {orderId} added to queue.");
}
public void CompletePublishing()
{
// اعلام پایان کار تولیدکنندهها
_orderChannel.Writer.Complete();
}
// Consumer: سفارشات را از کانال خوانده و پردازش میکند
public async Task StartProcessingAsync()
{
// خواندن دادهها به صورت استریم ناهمگام تا زمانی که Channel.Complete صدا زده شود
await foreach (var orderId in _orderChannel.Reader.ReadAllAsync())
{
Console.WriteLine($"[Consumer] Processing order: {orderId}...");
await Task.Delay(1000); // شبیهسازی کار سنگین یا ذخیره در دیتابیس
Console.WriteLine($"[Consumer] Order {orderId} processed successfully.");
}
Console.WriteLine("[Consumer] All orders processed. Channel is closed.");
}
}
نحوه اجرای کد بالا در متد Main:
public static async Task Main(string[] args)
{
var processor = new OrderProcessor();
// شروع به کار مصرفکننده در پسزمینه
Task consumerTask = processor.StartProcessingAsync();
// تولید چند سفارش به صورت همزمان
Task producers = Task.Run(async () =>
{
for (int i = 1; i <= 15; i++)
{
await processor.PublishOrderAsync($"ORD-{i}");
await Task.Delay(200); // فاصله زمانی بین ورود سفارشها
}
processor.CompletePublishing(); // اتمام تولید داده
});
// انتظار برای اتمام کار هر دو بخش
await Task.WhenAll(producers, consumerTask);
}
یکی از دلایل اصلی محبوبیت Channel در کدهای Infrastructure داتنت (مانند SignalR و Kestrel)، گزینههای بهینهسازی آن است. با استفاده از ChannelOptions میتوانید رفتارهای داخلی کانال را برای سناریوهای خود قفلگذاری و بهینهسازی کنید:
تنظیم SingleWriter و SingleReader
اگر مطمئن هستید که در برنامه شما فقط یک نخ یا وظیفه (Task) در کانال مینویسد، مقدار SingleWriter را برابر true قرار دهید. به همین ترتیب برای SingleReader.
var options = new UnboundedChannelOptions
{
SingleWriter = true, // بهینهسازی داخلی برای سناریوهای Single-Producer
SingleReader = true // بهینهسازی داخلی برای سناریوهای Single-Consumer
};
نکته تخصصی: وقتی این مقادیر را true میکنید، کامپایلر و محیط رانتایم داتنت نیازی به قفلهای سنگین (Lock-free synchronization overhead) برای مدیریت همزمانی ندارند و از الگوریتمهای بسیار سریعتر تکنخی در پشت صحنه استفاده میکنند. این کار سرعت تراکنشهای کانال را به شدت بالا میبرد.
بستن درست کانال برای جلوگیری از ایجاد بنبست (Deadlock) در برنامه حیاتی است.
|
ویژگی |
ConcurrentQueue |
BlockingCollection |
Channel |
|
پشتیبانی از Async/Await |
خیر |
خیر (باعث Block میشود) |
بله (کاملاً ناهمگام) |
|
کنترل Backpressure |
خیر |
بله |
بله (بسیار پیشرفته) |
|
تخصیص حافظه (Allocation) |
متوسط |
بالا |
بسیار کم (Highly Optimized) |
|
پشتیبانی از IAsyncEnumerable |
خیر |
خیر |
بله (ReadAllAsync) |
کلاس Channel ابزاری استاندارد، مدرن و فوقالعاده سریع برای پیادهسازی خطوط لوله داده و الگوی تولیدکننده-مصرفکننده در داتنت است. این کلاس با حذف هزینههای سنگین انسداد نخها و بهینهسازی تخصیص حافظه، به شما اجازه میدهد برنامههایی بنویسید که به راحتی مقیاسپذیر (Scalable) میشوند.
به عنوان یک قانون کلی در مهندسی نرمافزار داتنت: برای هرگونه پردازش داده ناهمگام و صفبندی داخلی بین کامپوننتها در داتنت مدرن، Channel انتخاب اول و بیرقیب شماست.
0 نظر
هنوز نظری برای این مقاله ثبت نشده است.