[ Shadowed @ 05.01.2015. 14:25 ] @
Postavka slicna kao u temi http://www.elitesecurity.org/t481125 ali tema druga (donekle slicna).

Code (csharp):

class Program
  {
    private static event Action<string> GotLine;
 
    static void Main(string[] args)
    {
      Thread t = new Thread(WatchForData);
      t.Start();
      Console.ReadLine();
    }

    static void WatchForData()
    {
      WebRequest request = WebRequest.Create("http://www.elitesecurity.org");
      HttpWebResponse response = (HttpWebResponse)request.GetResponse();
      Stream dataStream = response.GetResponseStream();
      StreamReader reader = new StreamReader(dataStream);
      while (!reader.EndOfStream)
      {
        string line = reader.ReadLine();
        GotLine(line);
      }
    }
  }
 


E sad, u ovom kodu, event handler-i ce blokirati thread u kojem se izvrsava WatchForData. Ideja je da se eventi okidaju asinhrono. Sad, ja znam vise nacina da se to uradi (svako okidanje u novom thread-u, queue poziva i poseban thread koji ih izvrsava itd.), ali me zanima sta biste vi preporucili.

Meni se najvise svidja ideja da postoji samo jedan dodatni thread u okviru kojeg bi se izvrsavali svi eventi.

[Ovu poruku je menjao Shadowed dana 05.01.2015. u 15:50 GMT+1]
[ mmix @ 05.01.2015. 14:38 ] @
koristi await/async, cisto da se ne maltretiras. postoji GetResponseAsync koji vraca awaiable, i problem resen

Posto nema sanse da web request bude odmah dostupan, otici ce u I/O completion pool, i kad se vrati iz tog threada ces zvati event. Ako bas hoces da oslobodis completion port tog posla, mozes da pokrenes novi task sa stream readerom ciji ce jedini posao biti da ispoziva handler...

[ mmix @ 05.01.2015. 14:47 ] @
Code (csharp):

    class Program
    {
        private static event Action<string> GotLine;

        static void Main(string[] args)
        {
            GotLine = s => Console.WriteLine(s);
            WatchForData();
            Console.ReadLine();
        }

        static async void WatchForData()
        {
            WebRequest request = WebRequest.Create("http://www.elitesecurity.org");
            HttpWebResponse response = (HttpWebResponse) await request.GetResponseAsync();
            Stream dataStream = response.GetResponseStream();
            StreamReader reader = new StreamReader(dataStream);
            while (!reader.EndOfStream)
            {
                string line = reader.ReadLine();
                GotLine(line);
            }
        }
    }
 
[ mmix @ 05.01.2015. 15:01 ] @
a ovo mozes npr ako hoces da napravis closure u izdvojeni thread i oslobodis thread pool Mozes naravno i sa Task.Run(), sto ce prabaciti posao u drugu pool kaetgroiju (iz IO u work)

Code (csharp):

        static async void WatchForData()
        {
            WebRequest request = WebRequest.Create("http://www.elitesecurity.org");
            HttpWebResponse response = (HttpWebResponse) await request.GetResponseAsync();
            Stream dataStream = response.GetResponseStream();
            StreamReader reader = new StreamReader(dataStream);

            new Thread(() =>
            {
                while (!reader.EndOfStream)
                {
                    string line = reader.ReadLine();
                    GotLine(line);
                }
            }).Start();
        }
 
[ mmix @ 05.01.2015. 15:21 ] @
Ovo naravno sve dok se igramo u konzoli U WPF apliakciji npr, default SyncContext je WPF dispatcher, sto znaci da ce kod iza await da bude ubacen u dispatcher queue sa Normal prioritetom i izvrsice se u main UI threadu, u tom slucaju ili moras da koristis new Thread ili Task.Run ili da u awitable prvo pozoves ConfigureAwaite(false) sto ce odvojiti continuation u state masini od trenutnog konteksta.

Code (csharp):

HttpWebResponse response = (HttpWebResponse) await request.GetResponseAsync().ConfigureAwait(false)
 
[ Shadowed @ 05.01.2015. 15:31 ] @
Ovo zapravo moze biti korisno. Ako koristim u WPF aplikaciji mogu da stavim da mi bude konfigurabilno pa da po volji ide na UI thread-u i da ne moram da radim provere i prebacivanje na njega.
[ dusans @ 05.01.2015. 15:58 ] @
U kom od ovih primera GotLine() ne blokira reader-a?

Ja sam prvi post shvatio kao problem implementacije publisher-subscriber pattern-a
u mutithreaded okruženju.
Dakle, publišer nešto petlja u svom thread-u i treba da okine event-e
subscriber-ima (u stilu fire-and-forget) i da nastavi normalno dalje sa radom
bez da čeka na samu obradu eventa u subscriberima (tj. da ne bude blokiran).

Znam da u wpf-u i winforms postoje mehanizmi preko kojih ovo može da se
dobije, npr. Dispatcher.BeginInvoke() ali me isto tako interesuje da li
postoji neki bolji/lepši mehanizam koji rešava ovaj problem nezavisno
od platforme.

[ Shadowed @ 05.01.2015. 16:50 ] @
Citat:
dusans:
Dakle, publišer nešto petlja u svom thread-u i treba da okine event-e
subscriber-ima (u stilu fire-and-forget) i da nastavi normalno dalje sa radom
bez da čeka na samu obradu eventa u subscriberima (tj. da ne bude blokiran).


Upravo. Web server vraca beskonacni fajl i salje po liniju s vremena na vreme (par puta u sekundi) i ja hocu da okinem event svaki put ali ne zelim da se nadgledanje stream-a blokira.
[ mmix @ 05.01.2015. 17:19 ] @
Pa, pazi, u osnovi drugi primer sa closure to resava. mozda ovaj primer nije dobar, ali generalno ako podignes Task.Run ili Thread za svaki event ne postoji garancija da ce biti pozvani u redosledu u kojem su stringovi stizali iz readera. Sto se tice tvog pitanja, ako ne koristis async/await koji je zapocet iz dispatcher threada, ne postoji drugi nacin sem BeginInvoke ili "prljave igre" sa kontekstima



Code (csharp):

    public partial class MainWindow : Window
    {
        public MainWindow()
        {
            InitializeComponent();
        }

        private static event Action<string> GotLine;

        private void Button_Click(object sender, RoutedEventArgs e)
        {
            GotLine = s => System.Diagnostics.Debug.WriteLine(s);  // bacamo u debug log
            WatchForData();   /// dispatcher thread
        }

        async void WatchForData()
        {
            var syncContext = SynchronizationContext.Current;  // vidi posle ;)
            // 1. ovde smo jos uvek u dispatcher threadu
            WebRequest request = WebRequest.Create("http://www.elitesecurity.org");
            // 2. jos smo u dispatcher threadu, sledeci poziv kreira state machine za getresponseasync i ulazi u await, posto task nije gotov prabcuje task u pool sa novim kontekstom i izlazi iz state masine (nazad u button handler)
            HttpWebResponse response = (HttpWebResponse)await request.GetResponseAsync().ConfigureAwait(false);
            // 3. u ovoj tacki vec nismo u dispatch threadu (zbog configure await), sad smo u thread-u 1, recimo. state masina nas je ubacila u ovu tacku
            // radimo nesto sto zelimo van UI thread-a
            Stream dataStream = response.GetResponseStream();
            var reader = new StreamReader(dataStream);
            string line = null;
            while (!reader.EndOfStream)
            {
                line = reader.ReadLine();
                GotLine(line);
            }
            // 4. prljavi trik ;)
            SynchronizationContext.SetSynchronizationContext(syncContext);
            await Task.Yield();
            // voila, ponovo smo u UI threadu ;) mozemo da prckamo po UI elementima, a sve vreme od tacke 2 UI thread je bio slobodan.
            Label1.Content = line;
            return;
        }

    }
 



[Ovu poruku je menjao mmix dana 05.01.2015. u 19:00 GMT+1]
[ mmix @ 05.01.2015. 17:36 ] @
Citat:
Shadowed:
Upravo. Web server vraca beskonacni fajl i salje po liniju s vremena na vreme (par puta u sekundi) i ja hocu da okinem event svaki put ali ne zelim da se nadgledanje stream-a blokira.


Onda si izabrao pogresan objekat za tu namenu. GetResponseAsync ne znaci pacijalno pozivanje cllbacka kako podaci dolaze, to je samo async wrapper oko blocking metode GetResponse. Sto znaci da se tvoj await nikad nece desiti dok se kompletan sadrzaj ne prenese (zatvori konekcija i slicno).


[ Shadowed @ 05.01.2015. 20:28 ] @
Pa, nisam ni koristio GetResponseAsync Ja sam dao primer koji blokira (koji treba resiti).
Evo jednog resenja:
Code (csharp):

class Program
  {
    private static event Action<string> GotLine;
 
    static void Main(string[] args)
    {
      Thread t = new Thread(WatchForData);
      t.Start();
      Console.ReadLine();
    }

    static void WatchForData()
    {
      WebRequest request = WebRequest.Create("http://www.elitesecurity.org");
      HttpWebResponse response = (HttpWebResponse)request.GetResponse();
      Stream dataStream = response.GetResponseStream();
      StreamReader reader = new StreamReader(dataStream);
      while (!reader.EndOfStream)
      {
        string line = reader.ReadLine();
        new Thread(() => GotLine(line)).Start();
      }
    }
  }


Nevolja sa ovim je sto kreira gomilu thread-ova. Kazem nevolja jer moze da bude problem a i ne mora.

Evo jos jednog:
Code (csharp):

class Program
     {
          private static event Action<string> GotLine;
          private static ConcurrentQueue<string> Lines = new ConcurrentQueue<string>();

          static void Main(string[] args)
          {
               GotLine += (s) => { Console.WriteLine(Thread.CurrentThread.ManagedThreadId); Thread.Sleep(1000); };
               new Thread(WatchForData).Start();
               new Thread(WatchLines).Start();
               Console.ReadLine();
          }

          static void WatchForData()
          {
               WebRequest request = WebRequest.Create("http://www.elitesecurity.org");
               HttpWebResponse response = (HttpWebResponse)request.GetResponse();
               Stream dataStream = response.GetResponseStream();
               StreamReader reader = new StreamReader(dataStream);
               while (!reader.EndOfStream)
               {
                    string line = reader.ReadLine();
                    Lines.Enqueue(line);
               }
          }

          static void WatchLines()
          {
               string line;
               while(true)
               {
                    if (Lines.Count > 0)
                    {
                         while(!Lines.TryDequeue(out line))
                         {
                              GotLine(line);
                         }
                    }
               }
          }
     }


Ovde imamo samo jedan dodatni thread. Nezgodna strana je sto ce ovaj zauzeti jedno jezgro CPU-a nepotrebno. Mada se moze dodati jedan else { Thread.Sleep(10); } ili neka slicna, dovoljno mala, vrednost. Zanemaricemo ove while(true), tu bi svakako isao neki flag za zaustavljanje, ali da ne komplikujem.

Mene cisto zanima ima li nekih drugih predloga ili poboljsanja za ova resenja.
[ mmix @ 05.01.2015. 21:05 ] @
Ok, nego generalno WebRequest objekat nema podrsku za parcijalno ucitavanje

A sto se tice resenja, prvo moras da kazes da li je poredak stringova vazan. Ako jeste onda moraju da se seriajlizuju, koje god resenje da izaberes.
Busy waiting u svakom slucaju nije dobro resenje. Umesto ConqurentQueue<T> koristi BlockingCollection<T>, njegov Take blokira thread ako je lista prazna.
Ako poredak nije vazan, to sto tebi stigne 2-3 linije u sekundi je marginalno za broj nagomilanih threadova.
[ Shadowed @ 05.01.2015. 22:16 ] @
Bitan je poredak. Sami podaci (bi trebalo da) imaju timestamp, ali bih hteo da imam i low-level evente za svaku liniju, bez obzira na njihov sadrzaj.
[ 30yo @ 06.01.2015. 00:38 ] @
mislim da bi ti bilo bolje da naucis reactive extensions nego da sam pises nesto low level
[ mmix @ 06.01.2015. 07:13 ] @
Citat:
30yo: mislim da bi ti bilo bolje da naucis reactive extensions nego da sam pises nesto low level


To mozda i nije tako losa ideja. Vec neko vreme mi se to vuce na TODO listi, na papiru definitivno izgleda zanimljivo. Kakva su tvoja iskustva sa Rx?

Btw, zanimljivo mi je kako je upotreba .NET biblioteke postala "low-level" Dostizemo nove dimenzije apstrakcije
[ 30yo @ 06.01.2015. 15:53 ] @
tesko da ja mogu neku analizu rx-a da napisem, samo po malo ucim

koliko sam shadow-a razumeo ovo je neka simulacija onoga sto mu treba

Code:
using System.Reactive.Linq;
using System.Windows.Threading;
...
WebRequest request = WebRequest.Create("http://www.elitesecurity.org");
HttpWebResponse response = (HttpWebResponse)request.GetResponse();
Stream dataStream = response.GetResponseStream();
StreamReader reader = new StreamReader(dataStream);
Observable.Interval(TimeSpan.FromSeconds(0.25)).ObserveOnDispatcher(DispatcherPriority.Background).Subscribe(i => listBox.Items.Add(reader.ReadLine()));


potrebni su rx main i rx wpf helpers nuget paketi

[Ovu poruku je menjao 30yo dana 06.01.2015. u 18:01 GMT+1]

[Ovu poruku je menjao 30yo dana 06.01.2015. u 18:12 GMT+1]
[ mmix @ 06.01.2015. 17:01 ] @
Da znas da hocu Hvala

Samo, ovo je copyrighted knjiga i nije dzabe i kacenje je protiv pravilnika, tako da cu obrisati tu poruku cim odem da skuvam sebi kafu...

Koga interesuje prava knjiga, evo link: Concurrency in C# Cookbook