Latest 0.8.0
Homepage https://github.com/couchdeveloper/TaskQueue
License Apache License, Version 2.0
Platforms osx 10.10, ios 9.0, tvos 10.0, watchos 3.0, requires ARC
Authors Andreas Grosam

TaskQueue

A TaskQueue is basically a FIFO queue where tasks can be enqueued for execution. The tasks will be executed concurrently up to an allowed maximum number.

A task is simply a non-throwing asynchronous function with a single parameter,
a completion handler that is called when the task has finished.

Features

  • Executes asynchronous, "non-blocking" tasks.
  • The maximum number of concurrently executing tasks can be set, even during the execution of tasks.
  • Provides a "barrier" task that serves as a synchronisation point, allowing
    you to "join" all previously enqueued tasks.
  • Task and TaskQueue can be used as a replacement for NSOperation and
    NSOperationQueue.

Description

With a TaskQueue we can control the maximum number of concurrent tasks that run "within" the task queue. To accomplish this, we enqueue tasks into the task queue. If the current number of running tasks is less than the maximum, the enqueued task is executed immediately. Otherwise it is delayed until enough previously enqueued tasks have completed.

At any time, we can enqueue further tasks, and the task queue still guarantees that the maximum number of running tasks is never exceeded. We can also change the maximum number of concurrent tasks at any time, and the task queue will adapt until the new constraint is fulfilled.
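The scheduling rule described above can be sketched in a few lines. This is a minimal illustration under our own assumptions, not the library's implementation: the names MiniTaskQueue, setMaxConcurrentTasks and observedPeak are hypothetical, and the task type is simplified to a function that receives a plain "done" callback.

```swift
import Foundation

// Hypothetical sketch for illustration; NOT the library's actual implementation.
final class MiniTaskQueue {
    // A task receives a `done` callback that it must call exactly once.
    typealias Task = (_ done: @escaping () -> Void) -> Void

    private let lock = NSLock()
    private var pending: [Task] = []   // FIFO backlog of tasks not yet started
    private var running = 0            // tasks started but not yet finished
    private var limit: Int

    init(maxConcurrentTasks: Int) { limit = maxConcurrentTasks }

    // A raised limit lets waiting tasks start right away; a lowered limit
    // takes effect as running tasks finish.
    func setMaxConcurrentTasks(_ newLimit: Int) {
        lock.lock(); limit = newLimit; lock.unlock()
        startEligibleTasks()
    }

    func enqueue(_ task: @escaping Task) {
        lock.lock(); pending.append(task); lock.unlock()
        startEligibleTasks()
    }

    // Starts waiting tasks in FIFO order while the limit allows it.
    private func startEligibleTasks() {
        while true {
            lock.lock()
            guard running < limit, !pending.isEmpty else { lock.unlock(); return }
            let task = pending.removeFirst()
            running += 1
            lock.unlock()
            DispatchQueue.global().async {
                task {
                    self.lock.lock(); self.running -= 1; self.lock.unlock()
                    self.startEligibleTasks()
                }
            }
        }
    }
}

// Runs `count` short tasks and returns the highest number seen running at once.
func observedPeak(limit: Int, count: Int) -> Int {
    let queue = MiniTaskQueue(maxConcurrentTasks: limit)
    let lock = NSLock()
    let allDone = DispatchGroup()
    var active = 0, peak = 0
    for _ in 0..<count {
        allDone.enter()
        queue.enqueue { done in
            lock.lock(); active += 1; peak = max(peak, active); lock.unlock()
            DispatchQueue.global().asyncAfter(deadline: .now() + 0.01) {
                lock.lock(); active -= 1; lock.unlock()
                done()
                allDone.leave()
            }
        }
    }
    allDone.wait()
    return peak
}
```

Calling observedPeak(limit: 1, count: 4) never observes more than one task running at a time, which is the serialising behaviour described in the Usage section.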

Installation

Carthage

Add

github "couchdeveloper/TaskQueue"

to your Cartfile.

In your source files, import the library as follows

import TaskQueue

CocoaPods

Add the following line to your Podfile:

pod 'cdTaskQueue'

In your source files, import the library as follows

import cdTaskQueue

SwiftPM

To use SwiftPM, add this to your Package.swift:

.Package(url: "https://github.com/couchdeveloper/TaskQueue.git", majorVersion: 0, minor: 8)

Usage

The typical usage scenario is as follows:

Suppose there are one or more asynchronous tasks and we want to execute them in
a controlled manner. In particular, we want to guarantee that no more than a set
limit of those tasks execute concurrently. Often, for example, we just want to
ensure that only one task is running at a time.

Furthermore, we want to be notified when all tasks of a certain set have been
completed and then take further actions, for example, based on the results,
enqueue further tasks.

So, what’s a task anyway?

A task is a Swift function or closure that executes asynchronously, returns
Void, and has a single parameter, the completion handler. The completion handler has a single parameter through which the eventual Result, computed by the underlying operation, is passed when the task completes.

We can use any type of "Result", for example a tuple (Value?, Error?) or
handier types such as Result<T> or Try<T>.

Canonical task function:

func task(completion: @escaping (R)->()) {
    ...
}

where R is for example: (T?, Error?) or Result<T> or (Data?, Response?, Error?) etc.
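As a concrete instance of the canonical form, here is a small task whose result type R is the tuple (String?, Error?). The function name greetingTask and its behaviour are made up for illustration; the sketch only relies on Foundation's DispatchQueue.

```swift
import Foundation

// Hypothetical task: produces a greeting asynchronously and reports it
// through the completion handler. It returns Void and does not throw.
func greetingTask(completion: @escaping (String?, Error?) -> ()) {
    DispatchQueue.global().async {
        // The work happens here; the result is passed on when it finishes.
        completion("Hello", nil)
    }
}
```

The function returns immediately; the value arrives later in the completion handler, on whatever queue the task chooses (here, a global queue).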

Now, create a task queue where we can enqueue a number of those tasks. We
can set the maximum number of concurrently executing tasks in the initializer:

 let taskQueue = TaskQueue(maxConcurrentTasks: 1)
 // Create 8 tasks and let them run:
 (0..<8).forEach { _ in
   taskQueue.enqueue(task: myTask) { (value: String?, error: Error?) in
      ...
   }
 }

Note that the start of a task is delayed until the current number of
running tasks is below the allowed maximum number of concurrent tasks.

In the above code, the asynchronous tasks are effectively serialized, since the
maximum number of concurrent tasks is set to 1.

Using a barrier

A barrier function allows us to create a synchronization point within the TaskQueue. When the TaskQueue encounters a barrier function, it delays the execution of the barrier function and any further tasks until all tasks enqueued before the barrier have been completed. At that point, the barrier function executes exclusively. Upon completion, the TaskQueue resumes its normal execution behavior.

 let taskQueue = TaskQueue(maxConcurrentTasks: 4)
 // Create 8 tasks and let them run (max 4 will run concurrently):
 (0..<8).forEach { _ in
   taskQueue.enqueue(task: myTask) { (value: String?, error: Error?) in
      ...
   }
 }
 taskQueue.enqueueBarrier {
   // This will execute exclusively on the task queue after all previously
   // enqueued tasks have been completed.
   print("All tasks finished")
 }

 // enqueue further tasks as you like
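For readers who know Grand Central Dispatch, the ordering guarantee resembles a GCD barrier on a concurrent dispatch queue. The sketch below uses only GCD, not TaskQueue, and works with synchronous work items rather than asynchronous tasks, so treat it as an analogy. The names EventLog and runBarrierDemo are made up for this example.

```swift
import Foundation

// Small thread-safe log so that concurrent work items can record their order.
final class EventLog {
    private let lock = NSLock()
    private var events: [String] = []
    func append(_ event: String) { lock.lock(); events.append(event); lock.unlock() }
    var snapshot: [String] { lock.lock(); defer { lock.unlock() }; return events }
}

// Submits four work items, then a barrier, then two more work items,
// and returns the order in which they actually ran.
func runBarrierDemo() -> [String] {
    let log = EventLog()
    let queue = DispatchQueue(label: "demo.concurrent", attributes: .concurrent)
    for _ in 0..<4 { queue.async { log.append("before") } }
    // Runs only after everything submitted earlier has finished, and runs alone.
    queue.async(flags: .barrier) { log.append("barrier") }
    for _ in 0..<2 { queue.async { log.append("after") } }
    // An empty synchronous barrier: returns once all work above is done.
    queue.sync(flags: .barrier) { }
    return log.snapshot
}
```

In the returned list, "barrier" always sits after the four "before" entries and ahead of the two "after" entries, which mirrors the "join" behaviour described for enqueueBarrier.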

Specify a Dispatch Queue Where to Start the Task

Ideally, a task is designed so that it does not matter on which thread it is
called, but in practice this is often not the case. For such cases, we can pass
a dispatch queue to the enqueue function; the task queue will then start the task on that queue.

If a queue is not specified, the task will be started on the global queue (DispatchQueue.global()).

taskQueue.enqueue(task: myTask, queue: DispatchQueue.main) { (result: Result<String>) in
    ...
}

Note that this affects only where the task is started. The task’s completion handler will be executed on whatever thread or dispatch queue the task chooses when it completes. TaskQueue offers no way to specify the execution context for the completion handler.
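If the completion handler has to run in a particular context, one possible workaround is to wrap the handler so that it hops onto the desired dispatch queue itself. The helper below is a sketch of that idea; the name delivering(on:_:) is our own and not part of TaskQueue. It only covers completion handlers with a single parameter, so a handler taking several parameters would need its own overload.

```swift
import Foundation

// Hypothetical helper (not part of TaskQueue): returns a handler that
// forwards its result to `handler` on the given dispatch queue.
func delivering<R>(on queue: DispatchQueue,
                   _ handler: @escaping (R) -> ()) -> (R) -> () {
    return { result in
        // Whatever thread the task completes on, hop to `queue` first.
        queue.async { handler(result) }
    }
}
```

The wrapped closure can then be passed wherever a single-parameter completion handler is expected, for example delivering(on: .main) { result in ... }.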

Constructing a Suitable Task Function from Any Other Asynchronous Function

The function signature for enqueue requires that we pass a task function which
has a single parameter completion and returns Void. The single parameter is
the completion handler: a function that takes a single value or a tuple as its
result and returns Void.

So, what if our asynchronous function does not have this signature, for example,
has additional parameters and even returns a result?

Take a look at this asynchronous function from URLSession:

dataTask(with url: URL,
  completionHandler: @escaping (Data?, URLResponse?, Error?) -> Swift.Void)
  -> URLSessionDataTask

Here, besides the completion handler, we have an additional parameter url, which is used to configure the task. The function also has a return value, the created URLSessionDataTask object.

In order to use this function with TaskQueue, we need to ensure that the task is
configured at the time we enqueue it, and that it has the right signature. We can
accomplish both requirements by applying currying to the given function.

The basic steps are as follows:

Given any asynchronous function with one or more additional parameters and possibly a return value:

func asyncFoo(param: T, completion: @escaping (Result)->()) -> U {
  ...
}

we transform it to:

func task(param: T) -> (_ completion: @escaping (Result) -> ()) -> () {
  return { completion in
    let u = asyncFoo(param: param) { result in
      completion(result)
    }
    // handle return value from asyncFoo, if any.
  }
}

That is, we transform the above function asyncFoo into another function whose parameters consist only of the configuring parameters and which returns a function having the single remaining parameter, the completion handler, that is:

((Result) -> ()) -> ().

The signature of this returned function must be valid for the task function
required by TaskQueue. "Result" can be a single parameter, e.g. Result<T> or
any tuple, e.g. (T?, Error?) or (T?, U?, Error?), etc.

Note that any return value from the original function (here asyncFoo) is
ignored by the task queue. If it matters, it should be handled by the
implementation of the task function.

You might want to examine this snippet a couple of times to get used to it ;)

Then use it as follows:

taskQueue.enqueue(task: task(param: "Param")) { result in
    // handle result
    ...
}

This ensures that the task is "configured" with the given parameters at the
time it is enqueued. Its execution, though, is delayed until the task
queue is ready to execute it.
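To try the transformation without a network call or the task queue itself, here is a self-contained sketch. The function asyncSquare stands in for "any asynchronous function with an extra parameter and a return value"; both asyncSquare and squareTask are invented for this example.

```swift
import Foundation

// Hypothetical asynchronous function: takes a configuring parameter and a
// completion handler, and returns a value (here, a pretend request identifier).
func asyncSquare(of n: Int, completion: @escaping (Int?, Error?) -> ()) -> Int {
    DispatchQueue.global().async { completion(n * n, nil) }
    return n
}

// Curried form: consumes the configuring parameter now and returns a task
// function whose only parameter is the completion handler.
func squareTask(of n: Int) -> (_ completion: @escaping (Int?, Error?) -> ()) -> () {
    return { completion in
        // The return value of asyncSquare is handled (here: discarded) inside
        // the task, because a task function itself returns Void.
        _ = asyncSquare(of: n) { value, error in
            completion(value, error)
        }
    }
}
```

Calling squareTask(of: 7) only configures the task; nothing runs until the returned function is invoked with a completion handler, which is what the task queue does when it is ready to start the task.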

Example

Here, we wrap a URLSessionTask executing a "GET" into a task function:

func get(_ url: URL) -> (_ completion: @escaping (Data?, URLResponse?, Error?) -> ()) -> () {
    return { completion in
        URLSession.shared.dataTask(with: url) { data, response, error in
            completion(data, response, error)
        }.resume()
    }
}

Then use it as follows:

let taskQueue = TaskQueue(maxConcurrentTasks: 4)
taskQueue.enqueue(task: get(url)) { data, response, error in
    // handle (data, response, error)
    ...
}

Given a list of URLs, enqueue them all at once and execute them with the
constraints set in the task queue:

let urls = [ ... ]
let taskQueue = TaskQueue(maxConcurrentTasks: 1) // serialize the tasks
urls.forEach {
  taskQueue.enqueue(task: get($0)) { data, response, error in
      // handle (data, response, error)
      ...
  }  
}

Latest podspec

{
    "name": "cdTaskQueue",
    "version": "0.8.0",
    "summary": "A TaskQueue controls the execution of asynchronous functions.",
    "license": "Apache License, Version 2.0",
    "homepage": "https://github.com/couchdeveloper/TaskQueue",
    "authors": {
        "Andreas Grosam": "[email protected]"
    },
    "source": {
        "git": "https://github.com/couchdeveloper/TaskQueue.git",
        "tag": "0.8.0"
    },
    "platforms": {
        "osx": "10.10",
        "ios": "9.0",
        "tvos": "10.0",
        "watchos": "3.0"
    },
    "source_files": "Sources/*.swift",
    "requires_arc": true,
    "pushed_with_swift_version": "3.2"
}
