In this blog post I want to show, by example, how to use asynchronous workflows, parallel LINQ extensions, and message-processing agents in F#. Let’s create a simple tool that downloads stock quotes from Google Finance.

Google Finance

Objectives

  • It should work asynchronously
    The executing thread shouldn’t be blocked during the download, so the operation can run even on a UI thread without freezing it
  • Stock quotes should be downloaded in parallel
    But no more than 5 download streams at any particular moment in time
  • Downloaded quotes should be parsed in parallel
    In order to utilize the available hardware resources efficiently
  • Log actions & exceptions
    In this example we will log events by using printfn

Implementation

open System
open System.Net
open System.Web
open Microsoft.FSharp.Collections

/// One row of historical price data for a single stock symbol
type Quote =
    {
        /// Ticker symbol the quote was requested for
        Symbol : string
        /// Date of the quote
        Date : DateTime
        /// Opening price
        Open : float
        /// Highest price
        High : float
        /// Lowest price
        Low : float
        /// Closing price
        Close : float
        /// Traded volume
        Volume : int64
    }

/// Stock prices download manager.
///
/// Downloads historical quotes as CSV, parses them in parallel and funnels
/// all download requests through a single pool of worker agents so that at
/// most `maxParallelDownloads` downloads run at the same time.
type QuotesDownloader() =

    // en-US CultureInfo used for data parsing
    let ci = System.Globalization.CultureInfo.GetCultureInfo("en-US")

    // Upper bound on simultaneously running downloads
    let maxParallelDownloads = 5

    /// Parse a single CSV data line (Date,Open,High,Low,Close,Volume) into a Quote
    let parseQuote (symbol : string) (line : string) =
        let values = line.Split [|','|]
        {
            Symbol  = symbol
            Date    = DateTime.Parse (values.[0], ci)
            Open    = Double.Parse   (values.[1], ci)
            High    = Double.Parse   (values.[2], ci)
            Low     = Double.Parse   (values.[3], ci)
            Close   = Double.Parse   (values.[4], ci)
            Volume  = Int64.Parse    (values.[5], ci)
        }

    /// Download and parse quotes for a given symbol.
    /// Returns Some quotes on success; logs the error and returns None when
    /// either the download or the parsing fails.
    let downloadQuotesAsync (symbol : string) = async {
        try
            let tickerUri = Uri(@"http://www.google.com/finance/historical?q=" + HttpUtility.UrlEncode(symbol) + "&output=csv")
            use webClient = new WebClient()
            let! csv = webClient.AsyncDownloadString(tickerUri)
            try
                let quotes =
                    csv.Split([|'\n'|], StringSplitOptions.RemoveEmptyEntries)
                    |> Seq.skip 1   // skip header
                    |> PSeq.map (parseQuote symbol)
                    // PSeq pipelines are lazy: force evaluation here so that
                    // parsing really happens inside this try block and a bad
                    // line is reported below instead of surfacing later in
                    // whoever enumerates the result.
                    |> PSeq.toArray
                printfn "Downloaded a list of quotes for %s symbol." symbol
                return Some(PSeq.ofArray quotes)
            with
                ex ->
                    printfn "Failed to parse quotes for %s symbol. %s" symbol ex.Message
                    return None
        with
            ex ->
                printfn "Failed to download quotes for %s symbol. %s" symbol ex.Message
                return None }

    /// Start a dispatcher agent that owns `n` worker agents, each running
    /// the body `f`, and forwards incoming messages to them round-robin.
    let worker (n : int) (f : MailboxProcessor<string> -> Async<unit>) =
        // With n < 1 the dispatcher would crash on its first message
        // (empty worker array / modulo by zero), so reject it up front.
        if n < 1 then invalidArg "n" "The number of workers must be at least 1."
        MailboxProcessor<string>.Start(fun inbox ->
            let workers =
                Array.init n (fun _ -> MailboxProcessor<string>.Start(f))
            let rec loop i = async {
                let! msg = inbox.Receive()
                workers.[i].Post(msg)
                return! loop ((i + 1) % n)
            }
            loop 0
        )

    // The download pool is created once (on first use) and then shared.
    // Building it inside the `agent` property getter would spawn a fresh
    // dispatcher with its own workers on every access, which defeats the
    // concurrency limit.
    let agent =
        lazy (
            worker maxParallelDownloads (fun inbox ->
                let rec loop() = async {
                    let! symbol = inbox.Receive()
                    // Each worker handles one symbol at a time; the result is
                    // only logged by downloadQuotesAsync, so it is discarded here.
                    do! downloadQuotesAsync symbol |> Async.Ignore
                    return! loop()
                }
                loop()
            ))

    /// Download quotes for a given symbol
    member this.downloadQuotesAsync (symbol : string) = downloadQuotesAsync symbol

    /// Start a round-robin dispatcher over `n` worker agents running `f`
    member this.worker n f = worker n f

    /// The shared download agent; post a symbol to it to queue a download
    member this.agent = agent.Value

    /// Download quotes for a list of symbols
    member this.downloadQuotes (symbols : string list) =
        let dispatcher = this.agent
        for symbol in symbols do
            dispatcher.Post symbol

// Run it: queue a download for every symbol in the sample list
let quotesDownloader = QuotesDownloader()

[ "MSFT"; "GOOG"; "AAPL"; "T"; "CVX"; "XOM"; "GE";
  "HBC"; "IBM"; "JNJ"; "JPM"; "PG"; "WMT"; "MMM";
  "ABT"; "ACN"; "MO"; "AMZN"; "AXP"; "APA"; "BLK";
  "CAT"; "CVX"; "CSCO"; "CVS"; "DELL"; "DVN"; "FDX" ]
|> quotesDownloader.downloadQuotes

See also

Download Source Code