The Burrows-Wheeler Transform (BWT) is a block-sorting data compression algorithm that acts as a preconditioner to dramatically improve the compression ratios of subsequent compression phases in many cases. This algorithm is the core of the bzip2 data compression utility that is ubiquitous on Unix and, in particular, is often much more effective than zip and gzip.

The core of the BWT algorithm is easily written in F#:

let cmp (str: _ array) i j =

let rec cmp i j =

if i=str.Length then 1 else

if j=str.Length then -1 else

let c = compare str.[i] str.[j] in

if c<>0 then c else

cmp (i+1) (j+1)

cmp i j

let bwt (str: byte array) =

let n = str.Length

let a = Array.init n (fun i -> i)

Array.sortInPlaceWith (cmp str) a

Array.init n (fun i -> str.[(a.[i] + n - 1) % n])

However, this implementation is very slow and, in particular, is many times slower than the extremely heavily optimized C implementation found in the bzip2 program. The main cause of this performance degradation is the use of a higher-order sortInPlaceWith function that compares array elements using a closure derived from the cmp function. Closure invocation is substantially more expensive than ordinary function invocation.

Fortunately, this is easily solved by writing a generic higher-order sort function in F# and marking it inline to ensure that it will be inlined, resulting in specialization over both the array element type and the comparison function. Incredibly, this simple change creates an F# program that runs at a similar speed to the industrial-strength bzip2 program:

open System.Threading let inline sort cmp (a: _ array) = let inline swap i j = let t = a.[i] a.[i] <- a.[j] a.[j] <- t let rec qsort l u = if l < u then swap l ((l + u) / 2) let mutable m = l for i=l+1 to u do if cmp a.[i] a.[l] < 0 then m <- m + 1 swap m i swap l m if u-l > 1000 then let m = m let f = Tasks.Future.Create(fun () -> qsort l (m-1)) qsort (m+1) u f.Value else qsort l (m-1) qsort (m+1) u qsort 0 (a.Length-1) let inline cmp (str: _ array) i j = let rec cmp i j = if i=str.Length then 1 else if j=str.Length then -1 else let c = compare str.[i] str.[j] in if c<>0 then c else cmp (i+1) (j+1) cmp i j let bwt (str: byte array) = let n = str.Length let a = Array.init n (fun i -> i) sort (fun i j -> cmp str i j) a Array.init n (fun i -> str.[(a.[i] + n - 1) % n])

The ability to inline higher-order functions in this way, to perform partial specialization, is one of the main advantages of the F# programming language over the previous generations of languages including OCaml. In fact, a direct port of this program to OCaml is 150× slower and, even after considerable effort, we were only able to make the OCaml 2.8× slower than F# and C. This is our best attempt so far:

open Bigarray type int_array = (int, int_elt, c_layout) Array1.t type byte_array = (int, int8_unsigned_elt, c_layout) Array1.t exception Cmp of int let cmp (str: byte_array) i j = let n = Array1.dim str in let i = ref i and j = ref j in try while true do if !i = n then raise(Cmp 1) else if !j = n then raise(Cmp(-1)) else let si = str.{!i} and sj = str.{!j} in if si < sj then raise(Cmp(-1)) else if si > sj then raise(Cmp 1) else begin incr i; incr j end done; 0 with Cmp c -> c let swap (a: int_array) i j = let t = a.{i} in a.{i} <- a.{j}; a.{j} <- t let invoke (f : 'a -> 'b) x : unit -> 'b = let input, output = Unix.pipe() in match Unix.fork() with | -1 -> (let v = f x in fun () -> v) | 0 -> Unix.close input; let output = Unix.out_channel_of_descr output in Marshal.to_channel output (try `Res(f x) with e -> `Exn e) []; close_out output; exit 0 | pid -> Unix.close output; let input = Unix.in_channel_of_descr input in fun () -> let v = Marshal.from_channel input in ignore (Unix.waitpid [] pid); close_in input; match v with | `Res x -> x | `Exn e -> raise e;; let sort str (a: int_array) = let rec qsort l u = if l < u then begin swap a l ((l + u) / 2); let m = ref l in for i=l+1 to u do if cmp str a.{i} a.{l} < 0 then begin incr m; swap a !m i end done; swap a l !m; if u - l > 30000 then begin let f () = qsort l (!m - 1); Array1.sub a l (!m-l-1) in let a' = invoke f () in qsort (!m + 1) u; let a' = a'() in for i=l to !m-2 do a.{i} <- a'.{i - l} done end else begin qsort l (!m - 1); qsort (!m + 1) u end end in ignore(qsort 0 (Array1.dim a - 1)) let () = let file = try Sys.argv.(1) with _ -> "input.txt" in let desc = Unix.openfile file [] 777 in let str = Array1.map_file desc int8_unsigned c_layout false (-1) in let n = Array1.dim str in let a = Array1.create int c_layout n in for i=0 to n-1 do a.{i} <- i done; sort str a; for i=0 to n-1 do print_char(Char.chr str.{(a.{i} + n - 1) mod n}) done; Unix.close desc