AsyncSequence in Swift

Apple introduced the AsyncSequence in Swift 5.5. This is one of the major features released in this Swift version to improve the Swift concurrency. In today's post, we are going to take a look at what AsyncSequence are and how you can make your own custom async sequence in the Swift app.

The async sequence is a new mechanism that allows us to iterate over a series of async values over time. The async sequence will suspend over each element during the iteration and resume until the operator produces a value or throws an error.

They're just like regular sequences, but with few differences. Each element in the iteration is delivered asynchronously. The sequence ends when the last value is sent signaling completion or the operator throws an error.

While async iterator is running, they will keep asynchronously returning specified values until an error is thrown and at the end, they are terminated by calling finishAPI with a nil error.

AsyncSequence has many applications where we need to download a huge chunk of data such as a CSV file or a text file or cases where real-time data APIs return one line of JSON data for the periodic UI updates.

Let's learn how to use AsyncSequence API by taking look at each of its applications.

  1. Reading a remote or local long CSV or the text file

AsyncSequence API allows us to read the long CSV or the text files that may be stored on the remote server or might be available locally. The line-wise data becomes available as its being read. Since each line is presented asynchronously, it won't block the UI thread.

First, let's take a look at how to read a CSV file on the remote server.


func loadEarthquakeData() async throws {
    guard let significantEarthquakeDataURL = URL(string: "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.csv") else {
        return
    }
    
    for try await significantEarthquakeData in significantEarthquakeDataURL.lines.dropFirst() {
        print(significantEarthquakeData)
    }
}

In the above example, we have provided a direct URL of a remote CSV file. In the for-loop, we read the line-by-line CSV data (Ignoring the first row that represents header data) and print it. Fortunately, being asyncSequence, we don't need to wait until the whole file is read, but it prints the line as an async call returns the current line. Once all the lines are read, the for-loop terminates.

Since this is an async function, we need to wrap it in the Task closure. loadEarthquakeData can also throw an error since the code to access lines throws, we are going to wrap the function call in a do-catch block.


Task {
    do {
        try await loadEarthquakeData()
    } catch {
        print(error.localizedDescription)
    }
}

If we run the above code, it will print the CSV file data line by line until all the lines are printed.

Reading Data from a Local Text File

Now, let's take a look at how we can use AsyncSequence to read the line-by-line data from a local text file.


func readTextFile() async throws {
    if let fileURL = Bundle.main.url(forResource: "test_data", withExtension: "txt") {
        for try await line in fileURL.lines {
            print(line)
        }
    }
}

In the above example, I have a locally stored text file named test_data. First, I will get the URL of this file relative to the app's main bundle and call lines API on the URL. The lines API will asynchronously return the stream of lines in the file in for-loop and iteration continues until there are no more lines to read from the text file.

We can execute it in the same way we did by calling loadEarthquakeData async function.


Task {
    do {
        try await readTextFile()
    } catch {
        print(error.localizedDescription)
    }
}

The above code prints each line in the text file line-by-line.

2. Showing a loading state during the remote data download

There is another AsyncSequence related API that gives us incremental data as it's being downloaded. We can asynchronously iterate over a stream of bytes and print partial bytes until the entire data is downloaded.

We can use this application to show the percentage of downloaded data from the remote server to improve the user experience and predictability of download operation.


func showProgress() async throws {
    let urlString = "https://photographylife.com/wp-content/uploads/2016/06/Mass.jpg"
    let urlRequest = URLRequest(url: URL(string: urlString)!)
    let (bytes, response) = try await URLSession.shared.bytes(for: urlRequest)

    guard let httpResponseCode = (response as? HTTPURLResponse)?.statusCode, (200..<300) ~= httpResponseCode else {
        throw WebServiceError.badResponse
    }
    
    let expectedLength = response.expectedContentLength
    var data = Data()
    
    for try await byte in bytes {
        data.append(byte)
        let currentLength = data.count
        print((Double(currentLength)/Double(expectedLength)) * 100)
    }
}

Let's go through this code step-by-step to understand what's going on here.

First, we have a URL string that refers to remote resources such as images or PDF file. We create the URLRequest object with it to be able to access bytes using bytes API on URLSession object.

Next, we have an error handling. If the server response code is other than in the range of [200-299], we will throw an error.

On the next line, we use expectedContentLength property on URLResponse object to get the expected length of response. Next, we asynchronously iterate over bytes to get byte one by one until no more bytes are left. While doing that, we also append it to the newly created data object. Once all the bytes are over, we get the full Data object pointed by the given URL.

Next, let's all this function in the async context to proceed with the download progress.


Task {
    do {
        try await showProgress()
    } catch {
        print(error.localizedDescription)
    }
}

Running it will also show the download progress in terms of percentage,


0.0004296381587427069
0.004726019746169776
7.4370365278362565
18.515256451016953
50.29258358610378
...
...
.....
98.24707631232975
100.0

To make things more interesting, let's show this progress on UI.


for try await byte in bytes {
    data.append(byte)
    let currentLength = data.count
    let progress = (Double(currentLength)/Double(expectedLength)) * 100
    Task {
        await showDownloadProgress(progress: progress)
    }
}

....
.....
..

@MainActor func showDownloadProgress(progress: Double) async {
    downloadProgressLabel.text = "\(progress)"
}
💡
Please note how @MainActor annotation for showDownloadProgress. When bytes are downloading, the execution occurs on the background thread. In order to update the UI, we need to switch to the main thread. @MainActor annotation allows us to make sure we're on the main thread while running a code that update the UIKit label
0:00
/

3. Making a custom async sequence

The usage of the async sequence isn't limited to iOS APIs. We can also make the async sequence out of our own async code.

For example, consider a case where we want to keep querying a particular URL for the UUID and put the polling code into async for loop using AsyncSequence API. The code will run continuously until the first character from the response string is equal to "a" and the async loop will break.

For this example, we will query https://httpbin.org/uuid which will return the UUID in the following format,


{
  "uuid": "efb1c4c5-364a-49b5-a301-be66999a9b1f"
}
This UUID is dynamic and will keep changing between the calls

We will first write an async function loadUUID which will query the above URL and return the Codable object UUID back which encapsulates the uuid as a string.  


struct UUID: Decodable {
    let uuid: String
}

enum WebServiceError: Error {
    case badResponse
}

func loadUUID() async throws -> UUID {
    let urlString = "https://httpbin.org/uuid"
    let (data, response) = try await URLSession.shared.data(from: URL(string: urlString)!)
    return try JSONDecoder().decode(UUID.self, from: data)
}

Next, we will write another function dependent on loadUUID to asynchronously load the UUID object. This function returns an object of type AsyncThrowingStream which encodes data to be sent and an error in case something goes wrong while downloading the UUID object from the server.

💡
We will use AsyncThrowingStream to continuously send the data as a stream until it either throws an error or manually signals the completion.

 


func customAsyncStreamForUUIDs() -> AsyncThrowingStream<UUID, Error> {
    return AsyncThrowingStream { continuation in
    
    }
}

The stream gives us a parameter continuation that we use to signal once the data is ready thus completing the contract waiting with await keyword.  We can also use it to signal completion or the presence of error in the stream.

Let's write the rest of the code before getting to its explanation.


func customAsyncStreamForUUIDs() -> AsyncThrowingStream<UUID, Error> {
    return AsyncThrowingStream { continuation in
        Task {
            while (true) {
                do {
                    let uuid = try await loadUUID()
                    continuation.yield(uuid)
                    if Array(uuid.uuid)[0] == "a" {
                        continuation.finish(throwing: nil)
                        break
                    }
                } catch {
                    continuation.finish(throwing: error)
                    break
                }
            }
        }
    }
}

In the above example, we are querying the loadUUID to get the UUID object back. Since we want this process to continue indefinitely, we put it inside a while loop.

Since loadUUID()might throw an error, we enclosed it inside a do-catch block and due to its asynchronous nature, we enclosed the whole code block as an async Task object.

We will asynchronously continue getting UUID objects from this block. If loadUUID throws an error, we will terminate by calling finish with a thrown error. We also check if the first character of the uuid string in the incoming response is "a". If this condition is satisfied, we call to finish with nil error and stop the stream.

For-loop Over Asynchronous Stream of UUIDs

Now that our asynchronous stream of UUIDs is ready, let's write a code to asynchronously consume it in the for-loop.

We will treat customAsyncStreamForUUIDs() as a normal async stream and iterate over it using for and try-await keywords.


for try await currentUUID in customAsyncStreamForUUIDs() {
    print("Current UUID is \(currentUUID.uuid)")
}

Prints,


Current UUID is 341754de-d453-432c-b33f-942046030f46
Current UUID is e8e97675-10b7-49bd-a92d-0d27ebcfd919
Current UUID is 215a663b-16fb-473c-9514-5d29a7865aa3
Current UUID is fdd0968c-c696-4389-9534-ab0f0d13d0a6
Current UUID is e987257b-dc91-46da-b7ac-9462681e024e
Current UUID is a7b7b8d1-3f37-47d8-9b10-443d1b4335c4
As you can see, it stopped right when the uuid string in the response starts with letter "a"

You can convert any stream of async code implemented using other techniques into the async stream with the use of AsyncThrowingStream API. With it, you can asynchronously manipulate values from any async stream.

Summary

So, in today's post, we saw what are async streams and how to use them. We also learned how to build your own async stream to give a steady stream of async values over time without involving delegates and closures. Async streams aren't limited to examples we saw here. You can use async streams with a lot more existing APIs or create your custom async stream too. I don't have enough space to list all of them, but here's the snapshot from the WWDC video.

AsyncSequence is a powerful tool for dealing with more than one async value. You don't need a piece of new knowledge to know how to use async streams. If you know how to use for-loops, you already know how to handle async streams too.

This is all for today. If there are other applications of async streams you know that we didn't handle or have further questions on this topic, please let me know. As always, if you have any  comments, criticism or feedback  to share about this article, don't forget to get in touch on Twitter @jayeshkawli.

References

Async sequences, streams, and Combine
Download Progress With Awaited Network Tasks
Meet AsyncSequence (WWDC 2021)