Note: All code examples in this post are licensed under the GPL version 2 (or later). My code is a derivative work of OpenCaster.

Playing out MPEG transport streams is a fairly complex affair. At the office, we have our own in-house solution comprising SI generators, a software multiplexer and DSMCC carousel playout tools etc. covering the four major broadcast television platforms in the UK. In an effort to understand how all the moving parts slot together, I've been reading through some of the code and exploring some tools from the open source world as well. One open source tool, OpenCaster, has caught my attention for its simple, old-school software architecture.

OpenCaster is distributed as a collection of small utilities written in C. I haven't dug into what they all do yet, but the idea is pretty simple. Each tool has one job and does it well. Communication between tools occurs via pipes, typically named FIFOs.

As an example, let's say you want to broadcast a looped audio/video/data transport stream from a file on disk. We create a named pipe to pump the looped packets into and use a tool named tsloop to read packets from the source file, write them into the pipe, and loop when it runs out of input.

# given sample.ts
mkfifo loop.ts
tsloop sample.ts > loop.ts &

We might want to correct the PCR timestamp in the transport stream, so we use another tool to read from our pipe, correct the PCR, then write out to another pipe.

mkfifo corrected-pcr.ts
tsstamp loop.ts 13271000 > corrected-pcr.ts &

If we assume that we have done enough for the stream to be correctly decoded by a set top box (or VLC Media Player), we can now hook up a third tool to broadcast the PCR-corrected looped transport stream over a supported connection. OpenCaster supplies a few different broadcast tools so I'll just rip an example from their user manual. This tool sends the stream out over a supported RF card.

tsrfsend myfirstfifo.ts -mt OFDM -mC QAM16 -mG 1/4 -mc 2/3 -mf 578

Now we have seen enough of the OpenCaster toolset to get the gist of how the software architecture works. Each tool filters a stream read from a file (really a named pipe), performs some modification, then writes it out for the next step in the chain to deal with. This is a perfect example of the Unix philosophy of building small, composable tools. The only thing that makes OpenCaster's architecture unusual is that the tools filter MPEG transport stream packets rather than text strings, but the result is much the same.

Enter Go

For a while now I've had a fascination with Google's Go programming language. It's like C but easier to write, more type-safe, more modular, and in some ways even simpler, although it does make some of C's more (potentially) dangerous features much harder to use.

I thought it might be a fun little challenge to reimplement some of OpenCaster in Go and see how the Go version differed from the C original. I have started with tsloop, a relatively simple application for looping a file-based transport stream into a pipe.

OpenCaster's tsloop program is just a great big main function in C with no headers. It uses a couple of standard and POSIX library functions but is otherwise self-contained. While this may be efficient, I find it annoying to read through a giant wall of code with various duplicated bits that could be factored out easily.

I started by writing a straight port of the C code, changing to Go idioms where it made sense but keeping true to the original for the most part.

package main

import (
  "bytes"
  "encoding/binary"
  "flag"
  "fmt"
  "os"
  "syscall"
)

const (
  // size in bytes of one MPEG transport stream packet
  tsPacketSize = 188
  // number of entries in a ccTable; PIDs are masked to 13 bits (0x1fff)
  // before they are used as an index, so every PID fits
  maxPid       = 8192
)

// PID table for the continuity counter of the TS packets
// Entries hold a 4-bit counter (0x0-0xf); the value 0x10 is used as a
// "nothing recorded" marker.
type ccTable [maxPid]byte

var (
  // per PID: the continuity counter most recently stamped on an output packet
  pidCcTable      ccTable
  // per PID: the continuity counter last seen on an input packet, used to
  // recognise a packet that is repeated once
  previousCcTable ccTable
  // index (as passed to flag.Arg) of the file currently being read
  openFile        = 0
)

// main opens the first transport stream file named on the command line and
// hands it to processFile, which streams packets to standard output and moves
// through the remaining files itself.
func main() {
  flag.Parse()
  if flag.NArg() < 1 {
    fmt.Fprintf(os.Stderr,
      "Usage: 'tsloop filename1.ts filename2.ts ... filenameN.ts'\n")
    os.Exit(1)
  }
  // Unlike C's argv, flag.Arg does not count the program name: the first
  // file is flag.Arg(0), which is where openFile already points.
  tsFile, err := os.Open(flag.Arg(openFile))
  if err != nil {
    fmt.Fprintf(os.Stderr,
      "tsloop: unable to open file %s\n",
      flag.Arg(openFile))
    os.Exit(1)
  }
  processFile(tsFile)
}

// initTable marks every entry of table as "nothing recorded" by storing 0x10,
// a value a 4-bit continuity counter can never hold.
// The table is taken by pointer: ccTable is an array, so passing it by value
// would fill a copy and leave the caller's table untouched.
func initTable(table *ccTable) {
  for i := range table {
    table[i] = 0x10
  }
}

// processFile copies transport stream packets from tsFile to standard output,
// restamping the continuity counter of each packet so that it keeps counting
// up when the input loops. When a file runs out of complete packets the next
// file named on the command line is opened, wrapping round to the first one
// after the last. The function never returns; it exits the process on error.
func processFile(tsFile *os.File) {
  // initialize tables
  initTable(&pidCcTable)
  initTable(&previousCcTable)
  packet := make([]byte, tsPacketSize)
  // main loop
  for {
    // read packets
    count, err := tsFile.Read(packet)
    // handle short packet
    if count < tsPacketSize {
      tsFile.Close()
      // flag.Arg is zero-based, so the valid indices are 0..NArg()-1
      openFile = (openFile + 1) % flag.NArg()
      initTable(&previousCcTable)
      tsFile, err = os.Open(flag.Arg(openFile))
      if err != nil {
        fmt.Fprintf(os.Stderr,
          "Can't open file %s\n",
          flag.Arg(openFile))
        os.Exit(1)
      }
      // plain assignment: ":=" here would declare new variables that
      // shadow the ones above
      count, err = tsFile.Read(packet)
      if count < tsPacketSize || err != nil {
        fmt.Fprintf(os.Stderr,
          "No complete packet in file %s\n",
          flag.Arg(openFile))
        os.Exit(1)
      }
    }

    // check packet cc
    // The PID is the low 13 bits of header bytes 1 and 2, most significant
    // byte first. Decoding it as big-endian gives the same answer on every
    // host; the "bytes" and "syscall" imports are no longer needed for this
    // and can be dropped from the import list.
    pid := binary.BigEndian.Uint16(packet[1:3]) & 0x1fff
    if pid < maxPid {
      if pidCcTable[pid] == 0x10 {
        fmt.Fprintf(os.Stderr,
          "new pid entry %d\n", pid)
        // new stream to track cc
        pidCcTable[pid] = packet[3] & 0x0f
        previousCcTable[pid] = packet[3] & 0x0f
      } else {
        adaptationField := (packet[3] & 0x30) >> 4
        if adaptationField == 0x0 || adaptationField == 0x2 {
          // reserved, no increment
          packet[3] = (pidCcTable[pid] | (packet[3] & 0xf0))
        } else if (adaptationField == 0x1 || adaptationField == 0x3) &&
            ((packet[3] & 0x0f) == previousCcTable[pid]) {
          // double packet accepted only once
          packet[3] = (pidCcTable[pid] | (packet[3] & 0xf0))
          previousCcTable[pid] = 0x10
        } else {
          // increase the cc and restamp
          previousCcTable[pid] =
            packet[3] & 0x0f
          pidCcTable[pid] =
            (pidCcTable[pid] + 1) % 0x10
          packet[3] =
            (pidCcTable[pid] | (packet[3] & 0xf0))
        }
      }
    }

    // write packet
    if _, err := os.Stdout.Write(packet); err != nil {
      fmt.Fprintf(os.Stderr, "tsloop: write failed: %v\n", err)
      os.Exit(1)
    }
  }
}

After I had the basic code structure compiling, I sought to simplify by refactoring distinct pieces of behaviour to separate functions, and generally making it easier to read and maintain.

package main

import (
  "bytes"
  "encoding/binary"
  "flag"
  "fmt"
  "io"
  "os"
  "syscall"
)

const (
  // size in bytes of one MPEG transport stream packet
  tsPacketSize = 188
  // number of entries in a ccTable; PIDs are masked to 13 bits (0x1fff)
  // before they are used as an index, so every PID fits
  maxPid       = 8192
)

// PID table for the continuity counter of the TS packets.
// Entries hold a 4-bit counter (0x0-0xf); the value 0x10 is used as a
// "nothing recorded" marker.
type ccTable [maxPid]byte

var (
  // per PID: the continuity counter most recently stamped on an output packet
  pidCcTable      ccTable
  // per PID: the continuity counter last seen on an input packet, used to
  // recognise a packet that is repeated once
  previousCcTable ccTable
  // index (as passed to flag.Arg) of the file currently being read
  openFile        = 0
  // the one packet buffer shared by readPacket, checkPacketCc and the
  // write to standard output in main
  packet          = make([]byte, tsPacketSize)
)

// main opens the first file named on the command line, then loops forever:
// read one packet into the shared buffer, restamp its continuity counter and
// write it to standard output.
func main() {
  flag.Parse()
  if flag.NArg() < 1 {
    fmt.Fprintf(os.Stderr,
      "Usage: 'tsloop filename1.ts filename2.ts ... filenameN.ts'\n")
    os.Exit(1)
  }
  tsFile, err := os.Open(flag.Arg(openFile))
  if err != nil {
    // report the name that was actually tried
    fmt.Fprintf(os.Stderr,
      "tsloop: unable to open file %s\n", flag.Arg(openFile))
    os.Exit(1)
  }
  initTable(&pidCcTable)
  initTable(&previousCcTable)
  for {
    // readPacket may have moved on to another file; keep hold of the file
    // it is now reading from so the next call continues there
    tsFile = readPacket(tsFile)
    checkPacketCc()
    if _, err := os.Stdout.Write(packet); err != nil {
      fmt.Fprintf(os.Stderr, "tsloop: write failed: %v\n", err)
      os.Exit(1)
    }
  }
}

// initialize a cc table: every entry is set to 0x10, the "nothing recorded"
// marker. The table is taken by pointer because ccTable is an array; passed
// by value, only a copy would be filled.
func initTable(table *ccTable) {
  for i := range table {
    table[i] = 0x10
  }
}

// read a packet from the file(s) provided into the shared packet buffer
// loops onto the next available file on EOF
// Returns the file the packet came from, which is tsFile itself unless the
// end of that file was reached; pass the result to the next call.
func readPacket(tsFile *os.File) *os.File {
  // io.ReadFull only succeeds when a whole packet was read
  if _, err := io.ReadFull(tsFile, packet); err == nil {
    return tsFile
  }
  // end of file (or a trailing partial packet): move to the next file
  tsFile.Close()
  // flag.Arg is zero-based, so the valid indices are 0..NArg()-1
  openFile = (openFile + 1) % flag.NArg()
  initTable(&previousCcTable)
  next, err := os.Open(flag.Arg(openFile))
  if err != nil {
    fmt.Fprintf(os.Stderr,
      "Can't open file %s\n", flag.Arg(openFile))
    os.Exit(1)
  }
  if _, err := io.ReadFull(next, packet); err != nil {
    fmt.Fprintf(os.Stderr,
      "No complete packet in file %s\n", flag.Arg(openFile))
    os.Exit(1)
  }
  return next
}

// extractPid returns the 13-bit PID held in the two header bytes s.
// The two bytes are read most significant byte first and the top three bits
// of the result, which are not part of the PID, are masked off.
// A slice shorter than two bytes yields 0.
// Example:
//     pid := extractPid(packet[1:3])
func extractPid(s []byte) uint16 {
  if len(s) < 2 {
    return 0
  }
  // Decode as big-endian directly. Reading little-endian and then swapping
  // with syscall.Ntohs only gives this result on a little-endian host, and
  // the syscall package does not provide Ntohs on every platform. The
  // "bytes" and "syscall" imports are not needed by this function any more.
  return binary.BigEndian.Uint16(s) & 0x1fff
}

// checkPacketCc restamps the continuity counter (the low nibble of header
// byte 3) of the packet held in the shared packet buffer.
// The first packet seen on a PID is left as it is and its counter recorded;
// after that each packet receives the counter tracked in pidCcTable, which
// is advanced except for the cases noted below.
func checkPacketCc() {
  pid := extractPid(packet[1:3])
  if pid >= maxPid {
    return
  }
  seenCc := packet[3] & 0x0f
  upper := packet[3] & 0xf0

  if pidCcTable[pid] == 0x10 {
    fmt.Fprintf(os.Stderr,
      "new pid entry %d\n", pid)
    // start tracking this stream from the counter it arrived with
    pidCcTable[pid] = seenCc
    previousCcTable[pid] = seenCc
    return
  }

  adaptationField := (packet[3] & 0x30) >> 4
  switch {
  case adaptationField == 0x0 || adaptationField == 0x2:
    // reserved, no increment: stamp the current counter as it stands
    packet[3] = pidCcTable[pid] | upper
  case (adaptationField == 0x1 || adaptationField == 0x3) &&
      seenCc == previousCcTable[pid]:
    // same counter as the previous input packet: a double packet, which is
    // accepted only once, so forget the previous value afterwards
    packet[3] = pidCcTable[pid] | upper
    previousCcTable[pid] = 0x10
  default:
    // ordinary packet: remember what came in, advance and restamp
    previousCcTable[pid] = seenCc
    pidCcTable[pid] = (pidCcTable[pid] + 1) % 0x10
    packet[3] = pidCcTable[pid] | upper
  }
}

After some very basic testing, I started to look at how the architecture might be improved. Specifically, I looked at how I might turn a collection of these filter applications into a reusable library of functionality, where a chain of filters could be built up in one process by reading a config file and assembling all the required pieces for broadcast. Obviously, I don't have all the pieces yet, but it often pays to think about architecture early.

Go has native support for communicating sequential processes (CSP): lightweight threads known as goroutines communicate over channels, which can be used much like the OpenCaster tools use FIFOs. An MPEG packet is an array (or slice) of 188 bytes. We can easily send a slice of bytes down a channel, so this communication mechanism should suffice nicely.

In the tsloop application, we can fire up a goroutine to read packets from a file (and loop at the end), sending each packet synchronously down a channel. We can then fire up another goroutine to read from that channel, perform some filtering, and send the resulting packet down another channel. The result of the final channel can be fed into another filter or written to standard output for redirection to a file or named pipe as before. Converting the architecture to send data over channels in this way, we end up with something like this:

package main

// ... imports, declarations etc.

// main opens the first file named on the command line and starts the three
// pipeline stages (read, restamp, write) as goroutines connected by
// unbuffered channels, then waits for each of them to signal on quit.
func main() {
  flag.Parse()
  if flag.NArg() < 1 {
    fmt.Fprintf(os.Stderr,
      "Usage: 'tsloop filename1.ts filename2.ts ... filenameN.ts'\n")
    os.Exit(1)
  }
  tsFile, err := os.Open(flag.Arg(openFile))
  if err != nil {
    // report the name that was actually tried
    fmt.Fprintf(os.Stderr,
      "tsloop: unable to open file %s\n", flag.Arg(openFile))
    os.Exit(1)
  }
  // Fill both tables with the 0x10 "nothing recorded" marker in place. They
  // are arrays, so handing them by value to a helper would only fill a copy.
  for i := range pidCcTable {
    pidCcTable[i] = 0x10
    previousCcTable[i] = 0x10
  }
  input := make(chan []byte)
  output := make(chan []byte)
  quit := make(chan bool)
  nGoroutines := 3
  go readPackets(tsFile, input, quit)
  go checkPacketsCc(input, output, quit)
  go writePackets(output, quit)
  for i := 0; i < nGoroutines; i++ {
    <-quit
  }
}

// ... functions elided to avoid duplication

// read packets from the file(s) provided and send each one down input
// loops onto the next available file on EOF
func readPackets(tsFile *os.File, input chan []byte, quit chan bool) {
  for {
    // A fresh buffer for every packet: once a slice has been sent, the
    // downstream goroutines may still be restamping or writing it while the
    // next packet is read here, so the buffer must not be reused.
    packet := make([]byte, tsPacketSize)
    // content of readPacket(*os.File), above
    input <- packet
  }
  quit <- true
}

// checkPacketsCc receives packets from input, restamps each one and sends it
// on to output. It returns, after signalling on quit, once input has been
// closed and drained; output is left open.
func checkPacketsCc(input chan []byte, output chan []byte, quit chan bool) {
  // ranging over the channel ends the loop when input is closed, so the
  // quit signal below can actually be reached
  for packet := range input {
    // contents of checkPacketCc(), above
    output <- packet
  }
  quit <- true
}

// writePackets writes every packet received on c to standard output. It
// returns, after signalling on quit, once c has been closed and drained.
// A failed write is reported on standard error and ends the process.
func writePackets(c chan []byte, quit chan bool) {
  // ranging over the channel ends the loop when c is closed, so the quit
  // signal below can actually be reached
  for packet := range c {
    if _, err := os.Stdout.Write(packet); err != nil {
      fmt.Fprintf(os.Stderr, "tsloop: write failed: %v\n", err)
      os.Exit(1)
    }
  }
  quit <- true
}

In porting an existing application to a new language, we have had the opportunity to practice refactoring and to think more in-depth about software architecture. The Unix philosophy of simple, composable tools is alive and well, but we can make the idea work harder for us by composing the tools at a finer level of granularity and exposing their components via library calls. Getting the API design right requires further refactoring. Here is a starting point:

// src/gomux/util/tsloop.go
package util

import (
  "bytes"
  "encoding/binary"
  "fmt"
  "io"
  "os"
  "syscall"
)

const (
  // TsPacketSize is the size in bytes of one MPEG transport stream packet.
  TsPacketSize = 188
  // MaxPid is the number of entries in a CcTable. PIDs are masked to 13
  // bits (0x1fff) before they are used as an index, so every PID fits.
  MaxPid       = 8192
)

// PID table for the continuity counter of the TS packets
// Entries hold a 4-bit counter (0x0-0xf); the value 0x10 is used as a
// "nothing recorded" marker.
type CcTable [MaxPid]byte

var (
  // per PID: the continuity counter last seen on an input packet.
  // Package-level because both ReadPackets and CheckPacketsCc touch it;
  // access is not synchronised.
  previousCcTable CcTable
)

// ReadPackets reads transport stream packets and sends each one down input.
// loops onto the next available file on EOF, returning to the first file in
// filenames after the last.
//
// tsFile is the file to start reading from and openFile its index in
// filenames. Every packet sent is a newly allocated slice of TsPacketSize
// bytes, so receivers may keep or modify it. The function does not return
// in normal operation: it exits the process if a file cannot be opened or
// holds no complete packet.
func ReadPackets(filenames []string, openFile int, tsFile *os.File,
    input chan []byte, quit chan bool) {
  for {
    // A fresh buffer for every packet: once sent, the slice is in the hands
    // of the receiving goroutine and must not be overwritten here.
    packet := make([]byte, TsPacketSize)
    // io.ReadFull only succeeds when a whole packet was read
    if _, err := io.ReadFull(tsFile, packet); err != nil {
      // end of file (or a trailing partial packet): move to the next file
      tsFile.Close()
      if len(filenames) == 0 {
        fmt.Fprintf(os.Stderr, "No file to loop onto\n")
        os.Exit(1)
      }
      // valid indices are 0..len(filenames)-1
      openFile = (openFile + 1) % len(filenames)
      // Reset the table in place; CcTable is an array, so handing it by
      // value to a helper would only reset a copy.
      // NOTE(review): CheckPacketsCc uses previousCcTable as well and
      // normally runs in another goroutine; this reset is unsynchronised.
      for i := range previousCcTable {
        previousCcTable[i] = 0x10
      }
      tsFile, err = os.Open(filenames[openFile])
      if err != nil {
        fmt.Fprintf(os.Stderr,
          "Can't open file %s\n", filenames[openFile])
        os.Exit(1)
      }
      if _, err = io.ReadFull(tsFile, packet); err != nil {
        fmt.Fprintf(os.Stderr,
          "No complete packet in file %s\n", filenames[openFile])
        os.Exit(1)
      }
    }
    input <- packet
  }
  quit <- true
}

// InitCcTable stores 0x10, the "nothing recorded" marker, in every entry of
// table.
// NOTE(review): CcTable is an array type and table is received by value, so
// the entries written here belong to a copy; the table the caller passed in
// is not modified. Resetting a caller's table needs a *CcTable parameter,
// which would change this function's signature.
func InitCcTable(table CcTable) {
  for i := 0; i < len(table); i++ {
    table[i] = 0x10
  }
}

// ExtractPid returns the 13-bit PID held in the two header bytes s.
// The two bytes are read most significant byte first and the top three bits
// of the result, which are not part of the PID, are masked off.
// A slice shorter than two bytes yields 0.
// Example:
//     pid := ExtractPid(packet[1:3])
func ExtractPid(s []byte) uint16 {
  if len(s) < 2 {
    return 0
  }
  // Decode as big-endian directly. Reading little-endian and then swapping
  // with syscall.Ntohs only gives this result on a little-endian host, and
  // the syscall package does not provide Ntohs on every platform. The
  // "bytes" and "syscall" imports are not needed by this function any more.
  return binary.BigEndian.Uint16(s) & 0x1fff
}

// CheckPacketsCc receives packets from input, restamps the continuity
// counter (the low nibble of header byte 3) of each one and sends it on to
// output. The first packet seen on a PID is passed through untouched and its
// counter recorded; later packets receive the tracked counter, which is
// advanced for every packet except the cases noted in the body.
//
// Packets shorter than four bytes are forwarded unchanged. The function
// returns, after signalling on quit, once input has been closed and drained;
// output is left open.
func CheckPacketsCc(input chan []byte, output chan []byte, quit chan bool) {
  var pidCcTable CcTable
  // Fill both tables with the 0x10 "nothing recorded" marker in place. They
  // are arrays, so handing them by value to a helper would only fill a copy.
  // NOTE(review): previousCcTable is package-level and is also used by
  // ReadPackets, normally from another goroutine, without synchronisation.
  for i := range pidCcTable {
    pidCcTable[i] = 0x10
    previousCcTable[i] = 0x10
  }
  // ranging over the channel ends the loop when input is closed, so the
  // quit signal below can actually be reached
  for packet := range input {
    if len(packet) >= 4 {
      if pid := ExtractPid(packet[1:3]); pid < MaxPid {
        seenCc := packet[3] & 0x0f
        upper := packet[3] & 0xf0
        adaptationField := (packet[3] & 0x30) >> 4
        switch {
        case pidCcTable[pid] == 0x10:
          fmt.Fprintf(os.Stderr,
            "new pid entry %d\n", pid)
          // new stream to track cc
          pidCcTable[pid] = seenCc
          previousCcTable[pid] = seenCc
        case adaptationField == 0x0 || adaptationField == 0x2:
          // reserved, no increment
          packet[3] = pidCcTable[pid] | upper
        case (adaptationField == 0x1 || adaptationField == 0x3) &&
            seenCc == previousCcTable[pid]:
          // double packet accepted only once
          packet[3] = pidCcTable[pid] | upper
          previousCcTable[pid] = 0x10
        default:
          // increase the cc and restamp
          previousCcTable[pid] = seenCc
          pidCcTable[pid] = (pidCcTable[pid] + 1) % 0x10
          packet[3] = pidCcTable[pid] | upper
        }
      }
    }
    output <- packet
  }
  quit <- true
}

The library's last function, WritePackets, is shown next, followed by the main program that calls the library:

// WritePackets writes every packet received on c to standard out. It
// returns, after signalling on quit, once c has been closed and drained.
// A failed write is reported on standard error and ends the process.
func WritePackets(c chan []byte, quit chan bool) {
  // ranging over the channel ends the loop when c is closed, so the quit
  // signal below can actually be reached
  for packet := range c {
    if _, err := os.Stdout.Write(packet); err != nil {
      fmt.Fprintf(os.Stderr, "Can't write packet: %v\n", err)
      os.Exit(1)
    }
  }
  quit <- true
}

// src/tsloop/tsloop.go
package main

import (
  "flag"
  "fmt"
  "gomux/util"
  "os"
)

// main opens the first file named on the command line and runs the three
// library stages (read, restamp, write) as goroutines connected by
// unbuffered channels, then waits for each of them to signal on quit.
func main() {
  flag.Parse()
  if flag.NArg() < 1 {
    fmt.Fprintf(os.Stderr,
      "Usage: 'tsloop filename1.ts filename2.ts ... filenameN.ts'\n")
    os.Exit(1)
  }
  tsFile, err := os.Open(flag.Arg(0))
  if err != nil {
    // report the name that was actually tried: flag.Arg is zero-based
    fmt.Fprintf(os.Stderr,
      "tsloop: unable to open file %s\n", flag.Arg(0))
    os.Exit(1)
  }
  input := make(chan []byte)
  output := make(chan []byte)
  quit := make(chan bool)
  nGoroutines := 3
  // tsFile is flag.Args()[0], hence the starting index 0
  go util.ReadPackets(flag.Args(), 0, tsFile, input, quit)
  go util.CheckPacketsCc(input, output, quit)
  go util.WritePackets(output, quit)
  for i := 0; i < nGoroutines; i++ {
    <-quit
  }
}