Async WCF Today and Tomorrow – Blog – Stephen Cleary
juin
24
Posted by : 24 juin 2014
| On :http://www.kanazawa-net.ne.jp/~pmansato/parallel/parallel_taskfactory.htm
http://codereview.stackexchange.com/questions/44547/writing-highly-asynchronous-code
https://www.dropbox.com/s/f053j1pgw2qo9fz/AsynchronousProgramming.rar
http://chsakell.com/2014/01/11/asynchronous-programming-using-tasks/
http://slynetblog.blogspot.fr/2013/04/downloading-multiple-files-on-windows.html
SQL Server connection strings
https://www.connectionstrings.com/sql-server-2008/
WCF Asynchronous
http://robbincremers.me/2011/12/31/wcf-asynchronous-client-proxy-and-asynchronous-service-operations/
http://stackoverflow.com/questions/8040002/how-to-make-a-wcf-rest-method-entirely-asynchronous-with-the-task-parallel-libra
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AsynchronousProgramming
{
    /// <summary>
    /// Console sample that loads tables from the Chinook database through
    /// <see cref="Task{TResult}"/>-returning helpers and shows several ways of
    /// consuming the result. Only the first region of <see cref="Main"/> is
    /// active; the other regions are alternatives meant to be enabled one at a time.
    /// </summary>
    class Program
    {
        //const string connectionString = "Data source = localhost; Initial catalog = Chinook; Integrated security = SSPI;";
        const string connectionString = "Data Source=ARTAUDHOME\\SQLEXPRESS;Initial Catalog=Chinook;Integrated Security=True;Connect Timeout=15;Encrypt=False;TrustServerCertificate=False";

        /// <summary>
        /// Entry point: names the main thread, runs the active sample, then
        /// waits for a key press.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Thread.CurrentThread.Name = "MainThread";

            #region This will block the thread...
            // Reading .Result waits for the task, so the main thread is blocked
            // until the query (including its 5 second WAITFOR) has finished.
            DataSet dsArtists = GetArtistsAsync().Result;
            PrintTable(dsArtists, "Artists");
            #endregion

            #region This won't block the main thread
            //GetArtistsAsync().ContinueWith(task =>
            //{
            //    PrintTable(task.Result, "Artists");
            //});
            #endregion

            #region This won't block the thread and catches exceptions
            // Both continuations are attached to the SAME task so that the
            // fault handler observes the query whose result would be printed.
            //Task<DataSet> artists = GetArtistsAsync();
            //artists.ContinueWith(task =>
            //{
            //    PrintTable(task.Result, "Artists");
            //}, TaskContinuationOptions.NotOnFaulted);
            //artists.ContinueWith(task =>
            //{
            //    Console.WriteLine(task.Exception.InnerException.Message);
            //}, TaskContinuationOptions.OnlyOnFaulted);
            #endregion

            #region Task Composition - Multithreading
            //var watch = Stopwatch.StartNew();
            //Task<DataSet> artistsTask = GetArtistsAsync();
            //Task<DataSet> albumsTask = GetAlbumsAsync();
            //Task.Factory.ContinueWhenAll(new[] { artistsTask, albumsTask }, (tasks) =>
            //{
            //    foreach (var task in tasks)
            //    {
            //        if (task.Status == TaskStatus.RanToCompletion)
            //        {
            //            DataSet ds = task.Result;
            //            // Tables[0] was renamed to "Artists" or "Albums" by the loader.
            //            PrintTable(ds, ds.Tables[0].TableName);
            //        }
            //        else
            //        {
            //            Console.WriteLine("An error has occurred..");
            //            Console.WriteLine(task.Exception.InnerException.Message);
            //        }
            //        Console.WriteLine();
            //        Console.WriteLine("------------------------------------------------");
            //        Console.WriteLine();
            //    }
            //    watch.Stop();
            //    Console.WriteLine("Time elapsed: " + watch.ElapsedMilliseconds + " milliseconds");
            //});
            #endregion

            #region Asynchronous - No multithreading
            //GetCustomersAsync().ContinueWith((task) =>
            //{
            //    PrintTable(task.Result, "Customers");
            //});
            #endregion

            Console.WriteLine();
            Console.WriteLine("Thread: " + Thread.CurrentThread.Name);
            Console.WriteLine("Press any key...");
            Console.ReadKey();
        }

        /// <summary>
        /// Writes every row of the named table to the console, one row per
        /// line with a tab after each column value.
        /// </summary>
        /// <param name="ds">Data set containing the table.</param>
        /// <param name="tableName">Name of the table to print.</param>
        static void PrintTable(DataSet ds, string tableName)
        {
            DataTable table = ds.Tables[tableName];
            foreach (DataRow row in table.Rows)
            {
                foreach (DataColumn col in table.Columns)
                {
                    Console.Write(row[col] + "\t");
                }
                Console.WriteLine();
            }
        }

        /// <summary>
        /// Runs <paramref name="sqlSelect"/> against <see cref="connectionString"/>
        /// on the calling thread and returns the result as a data set whose
        /// first table is renamed to <paramref name="tableName"/>.
        /// </summary>
        /// <param name="sqlSelect">SQL text to execute.</param>
        /// <param name="tableName">Name assigned to the first result table.</param>
        /// <returns>The filled data set.</returns>
        static DataSet LoadTable(string sqlSelect, string tableName)
        {
            DataSet ds = new DataSet();
            // Both the connection and the adapter are IDisposable.
            using (SqlConnection con = new SqlConnection(connectionString))
            using (SqlDataAdapter da = new SqlDataAdapter(sqlSelect, con))
            {
                da.Fill(ds);
                ds.Tables[0].TableName = tableName;
            }
            return ds;
        }

        /// <summary>
        /// Starts a task that loads the first 10 rows of Artist (after a
        /// 5 second server-side delay) into a table named "Artists".
        /// </summary>
        /// <returns>A task producing the filled data set.</returns>
        static Task<DataSet> GetArtistsAsync()
        {
            return Task<DataSet>.Factory.StartNew(() =>
            {
                DataSet ds = LoadTable(@"WAITFOR DELAY '000:00:05' SELECT TOP 10 * FROM Artist", "Artists");
                Console.WriteLine("Thread: " + Thread.CurrentThread.Name);
                return ds;
            });
        }

        /// <summary>
        /// Starts a task that loads the first 10 rows of Album (after a
        /// 5 second server-side delay) into a table named "Albums".
        /// </summary>
        /// <returns>A task producing the filled data set.</returns>
        static Task<DataSet> GetAlbumsAsync()
        {
            return Task<DataSet>.Factory.StartNew(() =>
            {
                DataSet ds = LoadTable(@"WAITFOR DELAY '000:00:05' SELECT TOP 10 * FROM Album", "Albums");
                Console.WriteLine("Thread: " + Thread.CurrentThread.Name);
                return ds;
            });
        }

        /// <summary>
        /// Loads the first 10 rows of Customer into a table named "Customers"
        /// on the calling thread and hands the result back through a
        /// <see cref="TaskCompletionSource{TResult}"/>; no task is started, so
        /// the returned task is already finished when this method returns.
        /// </summary>
        /// <returns>
        /// A completed task holding the data set, or a faulted task carrying
        /// the exception raised while loading.
        /// </returns>
        static Task<DataSet> GetCustomersAsync()
        {
            var tcs = new TaskCompletionSource<DataSet>();
            try
            {
                DataSet ds = LoadTable(@"WAITFOR DELAY '000:00:05' SELECT TOP 10 * FROM Customer", "Customers");
                Console.WriteLine("Thread in GetCustomersAsync: " + Thread.CurrentThread.Name);
                tcs.SetResult(ds);
            }
            catch (Exception ex)
            {
                // Report failures through the task, like the StartNew-based
                // loaders do, instead of throwing at the call site.
                tcs.SetException(ex);
            }
            return tcs.Task;
        }
    }
}
juin
16
Posted by : 16 juin 2014
| On :sample code
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace ConsoleApplication2
{
    /// <summary>
    /// Console sample: a bounded producer/consumer queue built on
    /// <see cref="BlockingCollection{T}"/>, plus commented-out timing
    /// experiments for TPL, Parallel and PLINQ.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Runs 10 producers and 2 consumers over a shared bounded queue,
        /// prints every consumed item, then waits for a key press.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Runs the given action and prints its elapsed time followed by
            // the managed id of the thread it ran on. Stopwatch is used
            // rather than DateTime.Now because wall-clock time is coarse and
            // can jump when the system clock is adjusted.
            Action<Action> measure = (body) =>
            {
                var watch = Stopwatch.StartNew();
                body();
                watch.Stop();
                Console.WriteLine("{0} {1}", watch.Elapsed, Thread.CurrentThread.ManagedThreadId);
            };

            // CPU-bound job: an empty counting loop.
            Action calJob = () =>
            {
                for (int i = 0; i < 350000000; i++)
                {
                }
            };

            // Blocking job: sleeps for one second.
            Action ioJob = () => { Thread.Sleep(1000); };

            // TPL on top of thread
            /*
            measure(() =>
            {
                var tasks = Enumerable.Range(1, 10)
                    .Select(_ => Task.Factory.StartNew(() => measure(ioJob)))
                    .ToArray();
                Task.WaitAll(tasks); // Start 2 task in background
            });
            */

            //Parallel.For(1, 10, _ => { measure(ioJob); });

            //PLINQ on top TPL
            //use of multiple core
            //Enumerable.Range(1, 10).ToList().ForEach(_ => measure(ioJob));

            //no sort order
            //Enumerable.Range(1, 10).AsParallel().WithDegreeOfParallelism(10).ForAll(_ => measure(ioJob));

            //ThreadPool.SetMinThreads(5, 5);
            //ParallelEnumerable.Range(1, 10).WithDegreeOfParallelism(10).ForAll(_ => measure(calJob));

            //producer Xml reader => queue => consumer
            //var queue = new Queue<int>();
            // Concurrent queue bounded to 100 items; it is IDisposable.
            using (var queue = new BlockingCollection<int>(100))
            {
                // 10 producers, each adding 1..100 with a 100 ms pause after every item.
                var producers = Enumerable.Range(1, 10)
                    .Select(_ => Task.Factory.StartNew(() =>
                    {
                        foreach (int i in Enumerable.Range(1, 100))
                        {
                            queue.Add(i);
                            Thread.Sleep(100);
                        }
                    }))
                    .ToArray();

                // 2 consumers; GetConsumingEnumerable yields the next element
                // and removes it from the queue.
                var consumers = Enumerable.Range(1, 2)
                    .Select(_ => Task.Factory.StartNew(() =>
                    {
                        foreach (var item in queue.GetConsumingEnumerable())
                        {
                            Console.WriteLine(item);
                        }
                    }))
                    .ToArray();

                // Wait until every producer has finished...
                Task.WaitAll(producers);
                // ...then mark the queue complete so the consumers' loops end
                // once the remaining items have been taken.
                queue.CompleteAdding();
                Task.WaitAll(consumers);

                Console.ReadKey();
            }
        }
    }
}
juin
09
Posted by : 9 juin 2014
| On :
http://www.symbolsource.org/Public/Metadata/NuGet/Project/SignalR.Client/0.3.3/Release/.NETFramework,Version%3Dv4.0/SignalR.Client/SignalR.Client/SignalR/TaskAsyncHelper.cs?ImageName=SignalR.Client
http://dotnetinside.com/en/type/SignalR.Hosting.AspNet/TaskAsyncHelper/0.5.1.10822
Linq grouping
http://weblogs.asp.net/dixin/understanding-linq-to-objects-3-query-methods
Nice Helper code
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
namespace SignalR {
    /// <summary>
    /// Extension helpers for chaining work onto tasks: logging faults,
    /// running a successor only when the antecedent succeeded, and
    /// continuing after a whole set of tasks succeeded.
    /// </summary>
    internal static class TaskAsyncHelper {
        private static Task MakeEmpty() {
            return FromResult<object>(null);
        }

        /// <summary>
        /// Gets an already-completed task.
        /// </summary>
        public static Task Empty {
            get {
                // A fresh task is returned on every access; per the original
                // author's note, a shared instance could end up disposed.
                return MakeEmpty();
            }
        }

        /// <summary>
        /// Logs the exception of a faulted task via <see cref="Trace.TraceError(string, object[])"/>.
        /// </summary>
        /// <param name="task">Task to observe.</param>
        /// <returns>A task that mirrors <paramref name="task"/>.</returns>
        public static Task Catch(this Task task) {
            return task.ContinueWith(t => {
                if (t.IsFaulted) {
                    Trace.TraceError("SignalR exception thrown by Task: {0}", t.Exception);
                }
                return t;
            }).Unwrap();
        }

        /// <summary>
        /// Logs the exception of a faulted task via <see cref="Trace.TraceError(string, object[])"/>.
        /// </summary>
        /// <typeparam name="T">Result type of the task.</typeparam>
        /// <param name="task">Task to observe.</param>
        /// <returns>A task that mirrors <paramref name="task"/>.</returns>
        public static Task<T> Catch<T>(this Task<T> task) {
            return task.ContinueWith(t => {
                if (t.IsFaulted) {
                    Trace.TraceError("SignalR exception thrown by Task: {0}", t.Exception);
                }
                return t;
            }).Unwrap();
        }

        /// <summary>
        /// Runs <paramref name="successor"/> on a new task once
        /// <paramref name="task"/> has completed successfully.
        /// </summary>
        /// <param name="task">Antecedent task.</param>
        /// <param name="successor">Action invoked with the completed antecedent.</param>
        /// <returns>
        /// The successor's task, or a task mirroring the antecedent when it
        /// was canceled or faulted (the successor is then not run).
        /// </returns>
        public static Task Success(this Task task, Action<Task> successor) {
            return task.ContinueWith(_ => {
                if (task.IsCanceled || task.IsFaulted) {
                    return task;
                }
                return Task.Factory.StartNew(() => successor(task));
            }).Unwrap();
        }

        /// <summary>
        /// Runs <paramref name="successor"/> on a new task once
        /// <paramref name="task"/> has completed successfully.
        /// </summary>
        /// <typeparam name="TResult">Result type of the antecedent.</typeparam>
        /// <param name="task">Antecedent task.</param>
        /// <param name="successor">Action invoked with the completed antecedent.</param>
        /// <returns>
        /// The successor's task, or a task mirroring the antecedent when it
        /// was canceled or faulted (the successor is then not run).
        /// </returns>
        public static Task Success<TResult>(this Task<TResult> task, Action<Task<TResult>> successor) {
            return task.ContinueWith(_ => {
                if (task.IsCanceled || task.IsFaulted) {
                    return (Task)task;
                }
                return Task.Factory.StartNew(() => successor(task));
            }).Unwrap();
        }

        /// <summary>
        /// Computes a result from <paramref name="task"/> on a new task once
        /// it has completed successfully.
        /// </summary>
        /// <typeparam name="TResult">Result type of the successor.</typeparam>
        /// <param name="task">Antecedent task.</param>
        /// <param name="successor">Function invoked with the completed antecedent.</param>
        /// <returns>
        /// The successor's task; a faulted task carrying the antecedent's
        /// exceptions when it faulted; a canceled task when it was canceled.
        /// </returns>
        public static Task<TResult> Success<TResult>(this Task task, Func<Task, TResult> successor) {
            return task.ContinueWith(_ => {
                if (task.IsFaulted) {
                    return FromError<TResult>(task.Exception);
                }
                if (task.IsCanceled) {
                    return Cancelled<TResult>();
                }
                return Task.Factory.StartNew(() => successor(task));
            }).Unwrap();
        }

        /// <summary>
        /// Computes a result from <paramref name="task"/> on a new task once
        /// it has completed successfully.
        /// </summary>
        /// <typeparam name="T">Result type of the antecedent.</typeparam>
        /// <typeparam name="TResult">Result type of the successor.</typeparam>
        /// <param name="task">Antecedent task.</param>
        /// <param name="successor">Function invoked with the completed antecedent.</param>
        /// <returns>
        /// The successor's task; a faulted task carrying the antecedent's
        /// exceptions when it faulted; a canceled task when it was canceled.
        /// </returns>
        public static Task<TResult> Success<T, TResult>(this Task<T> task, Func<Task<T>, TResult> successor) {
            return task.ContinueWith(_ => {
                if (task.IsFaulted) {
                    return FromError<TResult>(task.Exception);
                }
                if (task.IsCanceled) {
                    return Cancelled<TResult>();
                }
                return Task.Factory.StartNew(() => successor(task));
            }).Unwrap();
        }

        /// <summary>
        /// Runs <paramref name="continuation"/> after all
        /// <paramref name="tasks"/> completed successfully.
        /// </summary>
        /// <param name="tasks">Tasks to wait for.</param>
        /// <param name="continuation">Action to run afterwards.</param>
        /// <returns>See <see cref="AllSucceeded(Task[], Action{Task[]})"/>.</returns>
        public static Task AllSucceeded(this Task[] tasks, Action continuation) {
            return AllSucceeded(tasks, _ => continuation());
        }

        /// <summary>
        /// Runs <paramref name="continuation"/> on a new task after all
        /// <paramref name="tasks"/> completed successfully.
        /// </summary>
        /// <param name="tasks">Tasks to wait for.</param>
        /// <param name="continuation">Action invoked with <paramref name="tasks"/>.</param>
        /// <returns>
        /// The continuation's task; a canceled task when any input task was
        /// canceled; otherwise a faulted task carrying the inner exceptions
        /// of every faulted input task. In both failure cases the
        /// continuation is not run.
        /// </returns>
        public static Task AllSucceeded(this Task[] tasks, Action<Task[]> continuation) {
            return Task.Factory.ContinueWhenAll(tasks, _ => {
                Task failure = FailureOf<object>(tasks);
                if (failure != null) {
                    return failure;
                }
                return Task.Factory.StartNew(() => continuation(tasks));
            }).Unwrap();
        }

        /// <summary>
        /// Computes a result on a new task after all <paramref name="tasks"/>
        /// completed successfully.
        /// </summary>
        /// <typeparam name="T">Result type of the continuation.</typeparam>
        /// <param name="tasks">Tasks to wait for.</param>
        /// <param name="continuation">Function producing the result.</param>
        /// <returns>
        /// The continuation's task; a canceled task when any input task was
        /// canceled; otherwise a faulted task carrying the inner exceptions
        /// of every faulted input task. In both failure cases the
        /// continuation is not run.
        /// </returns>
        public static Task<T> AllSucceeded<T>(this Task[] tasks, Func<T> continuation) {
            return Task.Factory.ContinueWhenAll(tasks, _ => {
                Task<T> failure = FailureOf<T>(tasks);
                if (failure != null) {
                    return failure;
                }
                return Task.Factory.StartNew(continuation);
            }).Unwrap();
        }

        /// <summary>
        /// Creates a task that has already completed with <paramref name="value"/>.
        /// </summary>
        /// <typeparam name="T">Result type.</typeparam>
        /// <param name="value">Result to expose.</param>
        /// <returns>The completed task.</returns>
        public static Task<T> FromResult<T>(T value) {
            var tcs = new TaskCompletionSource<T>();
            tcs.SetResult(value);
            return tcs.Task;
        }

        // Returns a canceled/faulted task describing why the set did not
        // succeed, or null when no task was canceled or faulted.
        // Cancellation takes precedence over faults.
        private static Task<T> FailureOf<T>(Task[] tasks) {
            if (tasks.Any(task => task.IsCanceled)) {
                return Cancelled<T>();
            }
            var allExceptions =
                tasks.Where(task => task.IsFaulted).SelectMany(task => task.Exception.InnerExceptions).ToList();
            if (allExceptions.Count > 0) {
                var tcs = new TaskCompletionSource<T>();
                tcs.SetException(allExceptions);
                return tcs.Task;
            }
            return null;
        }

        // Creates a faulted task. An AggregateException is unpacked into its
        // inner exceptions, because SetException wraps whatever it receives
        // in a new AggregateException and would otherwise nest the two.
        private static Task<T> FromError<T>(Exception e) {
            var tcs = new TaskCompletionSource<T>();
            var aggregate = e as AggregateException;
            if (aggregate != null && aggregate.InnerExceptions.Count > 0) {
                tcs.SetException(aggregate.InnerExceptions);
            } else {
                tcs.SetException(e);
            }
            return tcs.Task;
        }

        // Creates a task that is already in the canceled state.
        private static Task<T> Cancelled<T>() {
            var tcs = new TaskCompletionSource<T>();
            tcs.SetCanceled();
            return tcs.Task;
        }
    }
}
Resources
http://msdn.microsoft.com/en-us/library/ff963553.aspx
http://www.microsoft.com/en-us/download/confirmation.aspx?id=19222
Article
http://www.albahari.com/nutshell/cs4ch22.aspx
http://dotnetcodr.com/2014/03/18/continuation-tasks-in-net-tpl-many-tasks-continued-by-a-single-task/
Stack Overflow
http://stackoverflow.com/questions/15938254/task-results-into-a-single-list