Inside F#

Brian's thoughts on F# and .NET

An RSS Dashboard in F#, part three (RSS feeds)

Posted by Brian on February 6, 2010

Previous posts in this series:

Last time I covered IObservables and we created a useful ObservableSource class.  Today I’ll cover the next technology piece of the app: reading RSS feeds.  I’ll discuss the design considerations for polling a feed for updates and publishing feed items as IObservables, and then walk through one implementation.

RSS feeds

Many web sites with continually updating content publish RSS feeds (or Atom feeds).  These syndication feeds are just XML documents that are easy to read and parse, and they contain a list of recent items.  The .Net SyndicationFeed class (in the System.ServiceModel.Syndication namespace) makes it trivial to load a feed and access its data through a convenient object model.  In terms of the data I need for the dashboard application, each SyndicationItem in the feed may contain a Title (e.g. the title of a StackOverflow question), an Id (which uniquely identifies that question), a LastUpdatedTime (which says when this item’s data was most recently modified), and some Links (e.g. to the Uri of the question itself).  This provides the key data to be displayed in the UI, as well as a way to tell which items are new (never-before-seen Ids) or updated (a fresher LastUpdatedTime).

Given the Uri (web address) of a feed, it’s trivial to get a feed object:

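open System.ServiceModel.Syndication   // SyndicationFeed, SyndicationItem
open System.Xml                        // XmlReader

let feed = SyndicationFeed.Load(XmlReader.Create(uri))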

so the only challenge comes from how to present this information as an IObservable.
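To make that object model concrete, here is a small sketch that loads a feed from an in-memory string instead of a live URL and prints the fields the dashboard cares about.  The Atom document is made up for illustration.  The `#r "nuget: ..."` line assumes an F# script running on modern .NET, where the syndication classes ship as the System.ServiceModel.Syndication package.

```fsharp
// F# script (.fsx) on .NET 5+; on .NET Framework, reference System.ServiceModel.dll
// instead of pulling in the NuGet package.
#r "nuget: System.ServiceModel.Syndication"

open System
open System.IO
open System.ServiceModel.Syndication
open System.Xml

// A made-up, minimal Atom 1.0 document standing in for a real feed.
let sampleAtom = """<feed xmlns="http://www.w3.org/2005/Atom">
  <title>Sample feed</title>
  <id>urn:example:feed</id>
  <updated>2010-02-06T12:00:00Z</updated>
  <entry>
    <id>urn:example:q:1</id>
    <title>How do I parse a feed?</title>
    <updated>2010-02-06T12:00:00Z</updated>
    <link href="http://example.com/q/1" />
  </entry>
</feed>"""

// Same Load call as above, but reading from a string instead of a URL.
let feed = SyndicationFeed.Load(XmlReader.Create(new StringReader(sampleAtom)))

for item in feed.Items do
    printfn "Id:      %s" item.Id
    printfn "Title:   %s" item.Title.Text
    printfn "Updated: %O" item.LastUpdatedTime
    for link in item.Links do
        printfn "Link:    %O" link.Uri
```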

Some design considerations for presenting feeds as IObservables

When designing a reusable component to present feeds as IObservables, a number of considerations come into play.  For starters, syndication feeds are a pull technology, whereas IObservables are a push technology, so polling (pumping) must be used.  Various feed readers may poll a feed daily or hourly; a general-purpose component should make it easy to configure how often the feed is rechecked.

Polling creates a bursty event channel.  Suppose, for example, that we poll hourly.  If many new items appeared in the past hour, the IObservable will go an hour with no activity, then deliver a flurry of OnNext calls, then an hour of silence, then another flurry, and so on: the events are very bursty.  When you start receiving these events, it can be useful to know when the burst will end (perhaps you want to collect all the data and then update the UI once, rather than have the UI flicker as it updates each item individually).  Alternatively, some clients may prefer to have the items “batched” and just get one (relatively predictable) event each polling cycle that contains a list of all the items.

Also, some clients may only be interested in finding out about new items (e.g. new questions posted to a specific StackOverflow tag), whereas other clients may be interested in updates to existing items as well (e.g. a new answer posted to an existing question).

Finally, since the feeds are probably being read over the internet, there are issues of reliability.  Networks and web sites periodically go down.  Some clients may want notification that the feed could not be read (e.g. to present that information or diagnostics in a UI), whereas others may be happy to just ignore a failure to poll (treating it as though there were no new data this polling cycle) and just retry again after the next polling time period passes.

An implementation of a feed source

In light of the design considerations of the previous section, I elected to create two components.  First there is a FeedSource class that provides IObservables for various FeedEvents in a very granular and flexible fashion.  And then there is a transformer that batches up the granular/verbose events from that source into an IObservable that’s more tailored to my dashboard application.

(The design space here is large, and I don’t know that this design is optimal.  But it does create a nice opportunity to demonstrate using a ‘transform’ function that turns one IObservable into another, as I’ll show shortly.)

To start things off, there are three kinds of events that I want to surface that can happen over time.  I define a discriminated union:

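open System
open System.Collections.Generic
open System.ServiceModel.Syndication
open System.Xml

type FeedEvent =
    | Item of SyndicationItem        // a new or updated feed item
    | EndPoll                        // this polling cycle is finished
    | RetryError of exn              // downloading/parsing failed; will retry next cycle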

“Item” is an event that carries data about a new or updated item in the feed.  “EndPoll” is simply a notification that this polling cycle has ended, so no more items will arrive until the next time the feed is downloaded.  “RetryError” is sent when there was an error downloading or parsing the feed; it carries the exception that occurred (recall that “exn” is just an F# type alias for System.Exception).  This data type enables me to create an IObservable<FeedEvent> that addresses a number of the considerations from the previous section.
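On the consuming side, a client typically handles the three cases with a single match.  The sketch below uses a hypothetical `describe` helper (not part of the app) that turns each event into a log line; the sample item is made up.

```fsharp
#r "nuget: System.ServiceModel.Syndication"

open System
open System.ServiceModel.Syndication

type FeedEvent =
    | Item of SyndicationItem
    | EndPoll
    | RetryError of exn

// Hypothetical helper: one log line per kind of event.
let describe (ev: FeedEvent) =
    match ev with
    | Item item -> sprintf "item %s: %s" item.Id item.Title.Text
    | EndPoll -> "end of polling cycle"
    | RetryError e -> sprintf "poll failed (%s); will retry" e.Message

// A made-up item: title, content, alternate link, Id, LastUpdatedTime.
let sample =
    SyndicationItem("Some question", "body", Uri("http://example.com/q/1"),
                    "q1", DateTimeOffset(2010, 2, 6, 12, 0, 0, TimeSpan.Zero))

[ Item sample; EndPoll; RetryError (Exception("timeout")) ]
|> List.iter (describe >> printfn "%s")
```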

The next thing to do is to implement the actual machinery to poll the feed and publish these events.  Since some clients may be interested only in new items whereas others also want updates, I decided to expose two different IObservables on the FeedSource class.  The key data that must be maintained here is which feed items have already been seen and when they were last updated.  For this I use a simple Map<string,DateTimeOffset>, where the (string) key is the Id of an item, and the (DateTimeOffset) value is its LastUpdatedTime.  Thus each subsequent read of the feed can look up an item’s Id in the Map to determine whether the item is new (the key is not in the Map), or whether an existing item has been updated (the key is already in the Map, but the item’s time is newer than the stored value).
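The new/updated test just described comes down to a single Map lookup.  Here is a minimal sketch of it; the `Change` type and `classify` function are hypothetical and are not part of FeedSource, which publishes events directly instead.

```fsharp
open System

// Hypothetical result type for illustration only.
type Change =
    | New
    | Updated
    | Unchanged

// Classify an item against the Map of Id -> LastUpdatedTime seen so far.
let classify (seen: Map<string, DateTimeOffset>) (id: string) (updated: DateTimeOffset) : Change =
    match Map.tryFind id seen with
    | None -> New                                      // never-before-seen Id
    | Some previous when updated > previous -> Updated // known Id, fresher time
    | Some _ -> Unchanged                              // known Id, nothing newer

let t0 = DateTimeOffset(2010, 2, 6, 12, 0, 0, TimeSpan.Zero)
let seen = Map.ofList [ "q1", t0 ]

printfn "%A" (classify seen "q2" t0)                   // New
printfn "%A" (classify seen "q1" (t0.AddMinutes(5.0))) // Updated
printfn "%A" (classify seen "q1" t0)                   // Unchanged
```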

So here’s the class broken up into a few chunks, with explanation of each portion.  To start:

type FeedSource(uri : string, 
                pollingFrequency : TimeSpan, 
                itemState : IDictionary<string,DateTimeOffset>  // item.Id, item.LastUpdatedTime
                ) =
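    // Map of item Id -> LastUpdatedTime, seeded from the constructor argument
    let mutable seenItems = 
        let mutable m = Map.empty
        for KeyValue(k,v) in itemState do
            m <- Map.add k v m
        m
    let allUpdates = new ObservableSource<FeedEvent>()   // every new or updated item
    let justNew = new ObservableSource<FeedEvent>()      // only never-before-seen Ids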

The FeedSource class constructor takes in three bits of information – the Uri of the RSS feed, how often we want to poll, and an initial state for the Map.  (This may be over-engineered; I can imagine a client that saves out the state of already-seen items to disk, so the next time the app starts it can read them back in and then pass them into the third parameter of this constructor, but my app doesn’t do that.)  “seenItems” is a Map of item Ids to LastUpdatedTimes, and it’s initialized with the data passed into the constructor.  There are two ObservableSource objects (instances of the class from last time), “allUpdates” for information about every change and “justNew” for only new items that appear in the feed (that is, items whose Ids we have not previously encountered).
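For the save-to-disk scenario imagined above (which my app does not implement), a sketch of how the item state could round-trip might look like the following.  The `toMap`, `save`, and `load` helpers and the tab-separated line format are all hypothetical; `toMap` is simply the constructor’s mutable loop written as a pipeline.

```fsharp
open System
open System.Collections.Generic
open System.Globalization

// The constructor's mutable loop, written as a single pipeline.
let toMap (itemState: IDictionary<string, DateTimeOffset>) : Map<string, DateTimeOffset> =
    itemState |> Seq.map (fun (KeyValue(k, v)) -> k, v) |> Map.ofSeq

// Hypothetical on-disk format: one "Id<TAB>timestamp" line per item, with the
// timestamp in the round-trip ("o") format. Assumes Ids never contain a tab.
let save (state: IDictionary<string, DateTimeOffset>) : string[] =
    [| for KeyValue(id, t) in state -> id + "\t" + t.ToString("o", CultureInfo.InvariantCulture) |]

let load (lines: string[]) : IDictionary<string, DateTimeOffset> =
    lines
    |> Array.map (fun line ->
        let parts = line.Split('\t')
        parts.[0], DateTimeOffset.ParseExact(parts.[1], "o", CultureInfo.InvariantCulture))
    |> dict

// Usage: save the state, read it back, and compare.
let t0 = DateTimeOffset(2010, 2, 6, 12, 0, 0, TimeSpan.Zero)
let lines = save (dict [ "q1", t0 ])
let restored = load lines
printfn "%b" (toMap restored = Map.ofList [ "q1", t0 ])   // true
```

In that scenario, the ItemState snapshot would be passed to `save` at shutdown, and the result of `load` would become the third constructor argument on the next run.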

Next here’s a function to run each polling cycle:

    let Update() =
        try
            let feed = SyndicationFeed.Load(XmlReader.Create(uri))
            for item in feed.Items do
                match Map.tryFind item.Id seenItems with
                | Some lup -> 
                    if item.LastUpdatedTime > lup then
                        seenItems <- seenItems.Add(item.Id, item.LastUpdatedTime)
                        allUpdates.Next(Item item)
                | None ->
                    seenItems <- seenItems.Add(item.Id, item.LastUpdatedTime)
                    allUpdates.Next(Item item)
                    justNew.Next(Item item)
            allUpdates.Next(EndPoll)
            justNew.Next(EndPoll)
        with e ->
            allUpdates.Next(RetryError e)
            justNew.Next(RetryError e)

It’s pretty straightforward.  We download the feed and iterate over all its items.  If an item already exists in the “seenItems” Map but its LastUpdatedTime is newer, we record the new time in the Map and publish an Item event with the item to “allUpdates”.  (If the time is not newer, we can drop this data on the floor; there’s nothing more to do.)  Otherwise, if the item is not in the Map, it is totally new: we add it to the Map and publish it to both “allUpdates” and “justNew”.  Once we’ve processed all the items, we publish an EndPoll event to both sources.  If an exception occurs at any point (for example, the download fails or the XML can’t be parsed), we instead publish a RetryError event carrying the exception to both sources.
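One way to make this bookkeeping unit-testable is to separate it from the download and the publishing.  The sketch below is a hypothetical pure function, `diffPoll`, not what FeedSource does: items are simplified to (Id, LastUpdatedTime) pairs, and instead of calling Next() it returns the updated Map plus the Ids that would go to “allUpdates” and “justNew”.

```fsharp
open System

// Hypothetical pure version of Update()'s bookkeeping: given the Map of seen
// items and one poll's (Id, LastUpdatedTime) pairs, return the new Map plus the
// Ids that would go to "allUpdates" and to "justNew", in feed order.
let diffPoll (seen: Map<string, DateTimeOffset>) (items: (string * DateTimeOffset) list) =
    let step (seenSoFar: Map<string, DateTimeOffset>, allUpdates, justNew) (id, updated) =
        match Map.tryFind id seenSoFar with
        | Some previous when updated > previous ->      // updated item
            Map.add id updated seenSoFar, id :: allUpdates, justNew
        | Some _ ->                                     // unchanged: drop it
            seenSoFar, allUpdates, justNew
        | None ->                                       // brand-new item
            Map.add id updated seenSoFar, id :: allUpdates, id :: justNew
    let finalSeen, allUpdates, justNew = List.fold step (seen, [], []) items
    finalSeen, List.rev allUpdates, List.rev justNew

// Usage: q1 was updated, q2 is unchanged, q3 is new.
let t0 = DateTimeOffset(2010, 2, 6, 12, 0, 0, TimeSpan.Zero)
let seen0 = Map.ofList [ "q1", t0; "q2", t0 ]
let seen1, all, fresh = diffPoll seen0 [ "q1", t0.AddMinutes(1.0); "q2", t0; "q3", t0 ]
printfn "%A %A" all fresh   // ["q1"; "q3"] ["q3"]
```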

Next we need to run the polling cycle in a loop, sleeping for the specified TimeSpan between polls.  I’ve chosen to expose an explicit Start() method so that clients can subscribe before the events begin arriving.  (A Dispose() method, coming shortly, is the “stop”.)  We use an async loop to avoid blocking a thread while we sleep.  Async.Start runs this loop on the .Net thread pool, where it keeps polling until the source is disposed.

    let mutable started = false
    let mutable disposed = false
    // Start polling the feed and sending observations to subscribers.  
    // (The polling and OnXXX calls run in the thread pool.)
    // This method is distinct from the constructor so that clients can subscribe before calling Start, and thus
    // get the 'first' batch of data from the feed.
    member this.Start() = 
        if started then
            raise <| new InvalidOperationException("cannot start twice")
        started <- true
        let rec Loop() =
            async {
                Update()
                do! Async.Sleep(int pollingFrequency.TotalMilliseconds)
                if not disposed then
                    // 'return!' makes the recursive call a tail call, so a loop that
                    // runs indefinitely doesn't accumulate pending continuations
                    return! Loop()
            }
        Loop() |> Async.Start 

Finally, we expose the two IObservable properties and the rest of the class’s state, and make the class IDisposable so there is a way to stop the polling.

    member this.AllUpdates = allUpdates.AsObservable 
    member this.JustNewItems = justNew.AsObservable
    member this.ItemState = Map.toSeq seenItems |> dict  // return a read-only snapshot
    member this.IsStarted = started
    member this.IsDisposed = disposed
    member this.Uri = uri
    member this.PollingFrequency = pollingFrequency
    interface IDisposable with
        member this.Dispose() = 
            // Per the IDisposable guidelines, repeated Dispose calls are ignored rather than throwing
            if not disposed then
                disposed <- true
                allUpdates.Completed()
                justNew.Completed()

That’s it.  There’s little to this class other than running the Update() function in an async loop and exposing all the state/properties.
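An alternative to the `disposed` flag in Start() is to run the loop under a CancellationToken, so that stopping also interrupts a pending sleep.  This is a minimal sketch under that assumption: `startPolling` is a hypothetical standalone helper, and having FeedSource hold the CancellationTokenSource and cancel it from Dispose() is left as a design option, not what the class above does.

```fsharp
open System
open System.Threading

// Hypothetical helper (not part of FeedSource): run 'update' every
// 'pollingFrequency' until the returned CancellationTokenSource is cancelled.
let startPolling (pollingFrequency: TimeSpan) (update: unit -> unit) : CancellationTokenSource =
    let cts = new CancellationTokenSource()
    let rec loop () =
        async {
            update ()
            // Async.Sleep observes the cancellation token, so cancelling
            // abandons a pending sleep instead of waiting it out.
            do! Async.Sleep(int pollingFrequency.TotalMilliseconds)
            return! loop ()   // tail call: the loop does not build up continuations
        }
    Async.Start(loop (), cts.Token)
    cts

// Usage: poll every 50 ms for a short while, then stop.
let pollCount = ref 0
let poller = startPolling (TimeSpan.FromMilliseconds 50.0) (fun () -> pollCount.Value <- pollCount.Value + 1)
Thread.Sleep 300
poller.Cancel()
printfn "polled %d times before cancelling" pollCount.Value
```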

Here’s a short sample that demonstrates the FeedSource class being used.  This little program polls the StackOverflow feed of C# questions every minute:

let soCSharpFeed = "http://stackoverflow.com/feeds/tag?tagnames=c%23&sort=newest"

let Main() =
    use feedSrc = new FeedSource(soCSharpFeed, System.TimeSpan.FromMinutes(1.0), dict [])
    use au = feedSrc.AllUpdates.Subscribe(fun event ->
        match event with
        | Item i -> printfn "Update to '%s'" i.Title.Text 
        | _ -> ())
    use jn = feedSrc.JustNewItems.Subscribe(fun event ->
        match event with
        | Item i -> printfn "New item '%s'" i.Title.Text 
        | _ -> ())
    feedSrc.Start()
    System.Console.ReadKey() |> ignore

Main()    

When run, it immediately prints all the recent items as both JustNewItems and AllUpdates.  If you leave it running for a couple of minutes during a busy period on StackOverflow, you’ll see some new items or updates to already-seen items appear.  This snippet demonstrates how a typical client would use the API: new up a FeedSource, subscribe to the observations it cares about, ‘start’ the source, and then dispose everything when it’s done.
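Clients that only care about Item events can also filter before subscribing, rather than matching and ignoring the other cases in the handler.  FSharp.Core’s Observable module works on any IObservable, so a sketch with Observable.choose follows.  The `itemsOnly` helper is hypothetical, and an F# Event stands in for feedSrc.AllUpdates so the snippet runs without network access.

```fsharp
#r "nuget: System.ServiceModel.Syndication"

open System
open System.ServiceModel.Syndication

type FeedEvent =
    | Item of SyndicationItem
    | EndPoll
    | RetryError of exn

// Keep only the Item payloads; EndPoll and RetryError are filtered out.
let itemsOnly (src: IObservable<FeedEvent>) : IObservable<SyndicationItem> =
    src |> Observable.choose (function Item i -> Some i | _ -> None)

// Stand-in for feedSrc.AllUpdates: an F# Event's Publish is an IObservable.
let fake = Event<FeedEvent>()
let titles = ResizeArray<string>()
let sub = itemsOnly fake.Publish |> Observable.subscribe (fun i -> titles.Add(i.Title.Text))

// Made-up items for the demonstration.
let mk (id: string) (title: string) =
    SyndicationItem(title, "", Uri("http://example.com/" + id), id, DateTimeOffset.UtcNow)

fake.Trigger(Item (mk "q1" "First"))
fake.Trigger EndPoll
fake.Trigger(Item (mk "q2" "Second"))
sub.Dispose()
printfn "%A" (List.ofSeq titles)   // ["First"; "Second"]
```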

Implementing batching

As mentioned previously, the FeedSource class provides fine-grained observations about all the information in an RSS feed.  Some clients may want something simpler – for instance, receiving at most one event each polling cycle that describes the changes since the last time.  I’ve built a batch-transform function that serves this purpose.  It takes as input an IObservable<FeedEvent> (thus listening for every individual event), and returns a new IObservable<seq<SyndicationItem>> that fires at most once per polling cycle with the information about all the item events that were seen.  Here’s the code:

// Given that most feed sources will poll, and thus be quiet for a period of time, followed by a burst
// of OnNext() activity, followed by another quiet period... this method "batches" together all the items
// between polling events (or failures) so that just a single OnNext() with a collection of items happens
// with each polling cycle.
// Note that this effectively swallows error diagnostics (RetryError).
let Batch (src : IObservable<FeedEvent>) : IObservable<seq<SyndicationItem>> =
    let curList = ref []
    let result = new ObservableSource<seq<SyndicationItem>>()
    let SendBatch() =
        if not (!curList).IsEmpty then
            result.Next(List.rev !curList)   // items were prepended, so reverse to restore arrival order
            curList := []
    let disp = ref (null : IDisposable) 
    disp := src.Subscribe({new IObserver<_> with
        member this.OnNext(x) =
            match x with
            | Item x -> 
                curList := x :: !curList 
            | _ -> 
                SendBatch()
        member this.OnCompleted() = SendBatch(); result.Completed(); (!disp).Dispose()
        member this.OnError(e) = SendBatch(); result.Error(e); (!disp).Dispose() })
    result.AsObservable 

We keep a list in a mutable reference cell (curList) and use it to accumulate items as we encounter them; each time we get an OnNext with an Item event, we prepend that item to the list.  (I don’t use ‘ref’ very often; if you need a refresher, see this blog entry.)  Once the polling cycle ends, the original source sends an EndPoll (or possibly a RetryError) event, which goes to the second branch of the ‘match’ and calls SendBatch().  SendBatch() checks whether we’ve accumulated any items in the list; if so, it sends them all in a single OnNext() call and empties the list.  If there are no new items, it does nothing (no new observation is published on the resulting IObservable).  This repeats until we get an OnCompleted/OnError from the original source, at which point we send any as-yet-unsent items as a final batch, forward the completed/error notification, and dispose our subscription to the original source.
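The batching rule is easy to check in isolation if you model the event stream as a list.  This sketch uses a hypothetical `batchList` function (not the Batch function itself) that applies the same rules: accumulate items, emit a batch at each cycle-ending event, skip empty batches, and flush leftovers at the end.  A simplified `Ev` type stands in for FeedEvent so it runs without the syndication library.

```fsharp
// Hypothetical list-based model of the batching rule: 'tryItem' returns Some payload
// for item events and None for the events that end a polling cycle.
// Items are kept in arrival order; empty batches are skipped.
let batchList (tryItem: 'E -> 'T option) (events: 'E list) : 'T list list =
    // Put the current batch (if any) in front of the batches emitted so far.
    let flush current batches =
        if List.isEmpty current then batches else List.rev current :: batches
    let step (current, batches) ev =
        match tryItem ev with
        | Some x -> x :: current, batches          // accumulate, newest first
        | None -> [], flush current batches        // end of a cycle: emit the batch
    let current, batches = List.fold step ([], []) events
    // Running out of events plays the role of OnCompleted: emit what's left over.
    flush current batches |> List.rev

// A simplified stand-in for FeedEvent.
type Ev =
    | Item of string
    | EndPoll

let batches =
    batchList (function Item s -> Some s | EndPoll -> None)
              [ Item "a"; Item "b"; EndPoll; EndPoll; Item "c" ]

printfn "%A" batches   // [["a"; "b"]; ["c"]]
```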

Once again, let’s see what some client code may look like that uses the Batch() function.

open System.ServiceModel.Syndication 
let soCSharpFeed = "http://stackoverflow.com/feeds/tag?tagnames=c%23&sort=newest"

let Main() =
    use feedSrc = new FeedSource(soCSharpFeed, System.TimeSpan.FromMinutes(1.0), dict [])
    use au = Batch(feedSrc.AllUpdates).Subscribe(fun updates ->
        printfn "Saw the following updates..." 
        for item:SyndicationItem in updates do
            printfn " - %s" item.Title.Text)
    use jn = Batch(feedSrc.JustNewItems).Subscribe(fun news ->
        printfn "Saw the following new items..." 
        for item:SyndicationItem in news do
            printfn " - %s" item.Title.Text)
    feedSrc.Start()
    System.Console.ReadKey() |> ignore

Main()    

This is very similar to the last example, but now we call Batch() on the IObservable before subscribing.  Thus now we get a single event that has data about all the items this polling cycle, which we can iterate over inside the handler.

Summing up so far

In the previous blog entry and this one, I’ve covered a bit of territory.  I’ve introduced the IObservable interface, which is conceptually an event stream that fires OnNext() calls over time and may eventually end with an OnCompleted() or OnError() call.  I’ve implemented an ObservableSource class that provides a straightforward API for creating an IObservable that clients can subscribe to, and discussed the importance of threading contracts like ‘serializable’ (where, even if events come from arbitrary background threads, they never overlap).  I’ve built a polling RSS FeedSource class, which lets you specify a feed Uri and a polling frequency, and which publishes a couple of granular IObservables: one with all updates, and another with only not-previously-seen items.  And I’ve implemented a transform function that converts a bursty, granular FeedSource IObservable into a new IObservable that batches everything up to send just one event per polling cycle.

With these pieces in hand, we’re on our way towards implementing the dashboard app.

Next time

For the next entry in the series, we’ll look at speech synthesis.  Your computer will be speaking aloud to you in no time!  The .Net APIs here make this really, really easy, as you’ll soon see.
