Architecture of a Real World Haskell Application part II

OK, it has taken some time since the last post, but I am really busy at the moment.

Last time we looked at the general structure of the application in terms of threads and model/interface separation. For this post, I want to write a little bit about the protocol handling and parsing and how it evolved. While this is not the most central part of the application, it is the oldest, and therefore I think it is a good place to show some historical development.

Ok, so the tool began as a command line application which could only receive telecommands and send back correct responses (which in itself is not as simple as it sounds). Also, at that point in time I had more or less just started learning Haskell while my main language was C++, so of course the first solution was a lot like C++. So let’s see how it goes.

A protocol parser

The transport protocol used is the NCTRS protocol, which is quite simple to handle. It is a message based protocol with different headers for different content. It is quite standard in the MCS (mission control system) domain (at least in the ESA context). The obvious thing is to model the NCTRS messages as data structures. For serialisation to/from the socket, the first take was with the cereal library. This worked out quite well, but later I had to change to binary because I also needed bitwise encoding/decoding, and there is binary-bits which does that.

So basically it boils down to defining suitable data structures and instances of the Binary class for them. At least that was the first step. So I sat down with the NCTRS Interface Control Document and defined some structures:

data NcduHeaderType =
  NCDU_TC_PKT_TYPE
  | NCDU_PKT_RES_TYPE
  | NCDU_DIR_TYPE
  | NCDU_TC_CLTU_TYPE
  | NCDU_CLTU_RES_TYPE
  | NCDU_TC_FRAME_TYPE
  | NCDU_FRAME_RES_TYPE
  ...
  deriving (Ord, Eq, Show, Read, Enum)
 
data NcduTcHeader = NcduTcHeader {
  ncduPktSize :: Word32,
  ncduType :: NcduHeaderType,
  ncduScID :: Word16
} deriving (Show, Read)

The NCTRS has a general header, which is always the same. It contains the size of the message (including the header) in the first 32-bit word, then the type of the message, which determines the further structure, and the spacecraft ID. We also derive at least Show and Read instances automatically, so that we can print the content for debugging and also generate a message from a String, which sometimes comes in quite handy. For the header type we also have Eq and Ord so that we can compare values, and also Enum, although this is practically not needed since we do a direct transformation from the constructors to a number in the protocol.

Depending on the header type, various additional header information can be present. Basically there are three main types: packets, frames and CLTUs, which represent different layers of the protocol stack, plus their responses (a packet is contained in segments, which are contained in frames, which are encoded as CLTUs). The mission control system can decide to skip some layers in the protocol stack (CLTU is the lowest, packet is the highest), of course with the precondition that the functionality of a skipped layer is then not available. The simplest is the packet layer, where the header looks like this:

data NcduTcPktHeader = NcduTcPktHeader {
  ncduTcPktTransType :: NcduHeaderTransType,
  ncduTcPktSeqCnt :: Word32,
  ncduTcPktVcID :: Word8,
  ncduTcPktMapID :: Word8
} deriving (Show, Read)
 
 
data NcduHeaderTransType =
  NCDU_TC_AD_SERTYPE
  | NCDU_TC_BD_SERTYPE
  deriving (Ord, Eq, Show, Read)

So we have a transfer type (AD or BD mode), a sequence count, a virtual channel ID and a Multiplexer Access Point ID. Most of this is uninteresting, so just a short explanation of AD and BD: AD is a sequence controlled mode where a state machine is used to check that the commands arrive in the right sequence and are acknowledged in the first stage by the spacecraft (there are multiple stages of command acknowledgement, this is the first one on-board). BD mode is kind of an emergency mode for when the spacecraft is in a bit of chaos, so no real sequence is checked; these are fire-and-forget commands which are executed immediately to (hopefully) get the spacecraft back into a controllable mode. This is important, as AD mode involves more complex machinery (though the on-board part is simpler than the mission control part).

CLTU and frame headers are more complicated and contain more fields, but use the same principle, so I omitted them.

To stack them together we can do something like this:

data NcduTcDu = NcduTcDuHeaderPkt {
    ncduTcHdr :: NcduTcHeader,
    ncduTcPktHdr :: NcduTcPktHeader,
    ncduTcData :: B.ByteString}
  | NcduTcDuHeaderCltu {
    ncduTcHdr :: NcduTcHeader,
    ncduTcCltuHdr :: NcduTcCltuHeader,
    ncduTcCltuData :: B.ByteString}
 deriving (Show)

So we have a complete Data Unit for TCs with the general header, the specific header and the data part, which is a ByteString and has to be interpreted by the next layer. I left out the types for TM (the direction simulator -> MCS) and also the responses because they are quite similar.

As we need full control over the binary format, we have to write our own instances for the Binary class by hand (some examples):

instance Binary NcduTcHeader where
  put x = do
    put (ncduPktSize x)
    put (convertType.ncduType $ x)
    put (ncduScID x)
  get = do
    s <- getWord32be
    t <- getWord16be
    scid <- getWord16be
    return $ NcduTcHeader s (convertToType t) scid
 
instance Binary NcduHeaderTransType where
 put NCDU_TC_AD_SERTYPE = put (0 :: Word8)
 put NCDU_TC_BD_SERTYPE = put (1 :: Word8)
 get = do
   val <- get :: Get Word8
   return $ case val of
     0 -> NCDU_TC_AD_SERTYPE
     1 -> NCDU_TC_BD_SERTYPE
     _ -> NCDU_TC_BD_SERTYPE
 
instance Binary NcduTcPktHeader where
 put x = do
   put (ncduTcPktTransType x)
   put (ncduTcPktSeqCnt x)
   put (ncduTcPktVcID x)
   put (ncduTcPktMapID x)
 get = do
   transType <- get :: Get NcduHeaderTransType
   seqCnt <- get :: Get Word32
   vcID <- get :: Get Word8
   mapID <- get :: Get Word8
   return $ NcduTcPktHeader transType seqCnt vcID mapID

This is just necessary boilerplate. Automatically derived Binary instances might have saved some of it, but in other parts we need more fine-grained control over how the data type is serialized.

Also, I don’t find this too bad to write; in C++, depending on the serialization library used, it is often a lot more wordy. Another important point is that the format is now fixed, so if the binary maintainers choose to change the serialisation underneath, we won’t be affected. For fixed binary protocols, this is unavoidable.
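To make the point about a fixed wire format a bit more tangible, here is a minimal, self-contained sketch of a round trip through a simplified general header. It is not the real code: the Header type is a stand-in for NcduTcHeader, the type field is kept as a raw Word16 (the actual convertType mapping is not shown in this post), and the values are made up. Every field is written with an explicit big-endian primitive, so the byte layout does not depend on any default instance.

```haskell
import Data.Binary.Get (Get, getWord16be, getWord32be, runGetOrFail)
import Data.Binary.Put (Put, putWord16be, putWord32be, runPut)
import qualified Data.ByteString.Lazy as BL
import Data.Word (Word16, Word32)

-- Simplified stand-in for NcduTcHeader (an assumption for this sketch).
data Header = Header
  { hdrSize :: Word32   -- size of the whole message, including the header
  , hdrType :: Word16   -- raw message type, not mapped to a constructor here
  , hdrScID :: Word16   -- spacecraft ID
  } deriving (Eq, Show)

-- Each field is written with an explicit big-endian primitive.
putHeader :: Header -> Put
putHeader h = do
  putWord32be (hdrSize h)
  putWord16be (hdrType h)
  putWord16be (hdrScID h)

getHeader :: Get Header
getHeader = Header <$> getWord32be <*> getWord16be <*> getWord16be

-- Decode, returning the error message instead of throwing.
decodeHeader :: BL.ByteString -> Either String Header
decodeHeader bs = case runGetOrFail getHeader bs of
  Left (_, _, err) -> Left err
  Right (_, _, h)  -> Right h

main :: IO ()
main = do
  let hdr   = Header 20 3 0x1234          -- made-up values
      bytes = runPut (putHeader hdr)
  print (BL.unpack bytes)                 -- [0,0,0,20,0,3,18,52]
  print (decodeHeader bytes == Right hdr) -- True
```

The header occupies exactly 8 bytes on the wire, and decoding the encoded bytes gives back the original value.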

Ok, so we have some structures and the de/serialisation in place; we now need a function to read a message from the socket and decode it. The first attempt, which stems from long years of low level C and C++ programming, looked like this:

receiveTC :: Socket -> IO (Maybe NcduTcDu)
receiveTC handle = do
  msg <- recvAll handle (fromIntegral hdrLen)
 
  case B.null msg of
    True -> return Nothing
    False -> do
      let dec = decodeOrFail msg :: Either (B.ByteString, ByteOffset, String) (B.ByteString, ByteOffset, NcduTcHeader)
      logStr nctrsArea $ "Decoded NCTRS header: " ++ show dec
      case dec of
        Left (_,_,err) -> throwIO (DecodeError err)
        Right (_,_,hdr) -> do
          let len = msgCalcPayloadLen hdr
              ncduLen = ncduPktSize hdr
 
          if ncduLen <= 0
            then do
              logStr lAreaAlways "Received illegal Pkt with length=0, ignoring"
              return Nothing
            else do
              pl <- recvAll handle $ fromIntegral len
 
              case B.null pl of
                True -> return Nothing
                False -> do
                  let dec = decodeOrFail $ msg `B.append` pl :: Either (B.ByteString, ByteOffset, String) (B.ByteString, ByteOffset, NcduTcDu)
                  logStr nctrsArea $ "NCDU: " ++ show dec
                  case dec of
                    Left (_,_,err) -> throwIO (DecodeError err)
                    Right (_,_,pkt) -> do
                      logStr nctrsArea $ "Received Pkt: " ++ dumpPkt pkt
                      return $ Just pkt
 
 
recvAll :: Socket -> Int64 -> IO B.ByteString
recvAll handle n = do
  msg <- recv handle n
  case B.null msg of
    True -> do
      logStr nctrsArea "EOF"
      return msg
    False -> do
      let len = B.length msg
      logStr nctrsArea $ "Read bytes: " ++ show len
      if len < n
        then do
          rest <- recvAll handle (n - len)
          return (msg `B.append` rest)
        else do
          logStr nctrsArea $ "Received: " ++ hexDumpString msg
          return $! msg

Yeah! This is really like old-fashioned C-code! Haskellers may forgive me, but this was really the first version I wrote (as I was quite early in the Haskell learning stage). This code is bad in more than one regard, so let’s just quickly go over it and then see if we can come up with something better.

First, there is the recvAll function. Its sole purpose is to read exactly the given number of bytes from the socket. The reason is that the recv call can return fewer bytes than requested. At first I didn’t have this function and then suddenly got decoding errors when the data rate went up, until I realized this.
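The short-read behaviour is easy to reproduce without a network. The following sketch is only an illustration under assumptions: FakeSocket and fakeRecv are hypothetical stand-ins for a real socket and recv, and recvExactly is a variant of recvAll that reports a connection closed in the middle of a message as Nothing instead of returning a truncated bytestring.

```haskell
import qualified Data.ByteString as B
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Hypothetical stand-in for a socket: a queue of chunks as the network stack
-- might deliver them. An empty queue plays the role of a closed connection.
type FakeSocket = IORef [B.ByteString]

-- Behaves like recv: returns at most n bytes, possibly fewer, empty on EOF.
fakeRecv :: FakeSocket -> Int -> IO B.ByteString
fakeRecv ref n = do
  chunks <- readIORef ref
  case chunks of
    []       -> return B.empty
    (c : cs) -> do
      let (now, later) = B.splitAt n c
      writeIORef ref (if B.null later then cs else later : cs)
      return now

-- Keep reading until exactly n bytes have arrived. Nothing means the peer
-- closed the connection before the requested amount was complete.
recvExactly :: (Int -> IO B.ByteString) -> Int -> IO (Maybe B.ByteString)
recvExactly recvFn = go []
  where
    go acc 0 = return (Just (B.concat (reverse acc)))
    go acc remaining = do
      chunk <- recvFn remaining
      if B.null chunk
        then return Nothing
        else go (chunk : acc) (remaining - B.length chunk)

main :: IO ()
main = do
  sock   <- newIORef [B.pack [1, 2, 3], B.pack [4, 5], B.pack [6, 7, 8, 9]]
  first  <- recvExactly (fakeRecv sock) 8
  second <- recvExactly (fakeRecv sock) 8
  print (fmap B.unpack first)   -- Just [1,2,3,4,5,6,7,8]
  print (fmap B.unpack second)  -- Nothing
```

The first call needs three reads to collect its 8 bytes; the second one finds only a single byte left before the fake connection closes.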

So receiveTC first loads hdrLen bytes, which is the length of the NcduTcHeader. recv returns an empty bytestring when the socket has been closed, so in this case we return Nothing. Otherwise we decode the header and, in case of a decode error, throw an exception. This is one of the bad things: in one case we simply return Nothing, in the other we throw an exception. This is really bad ™. But anyway, the exception also terminates the reading thread, so that for the mission control system the connection goes down, which in case of a decoding error is ok.

Then the remaining length is calculated by subtracting the length of the header from the complete message length, and that many bytes are read from the socket. If all is well, the bytestring containing the header and the rest just read are concatenated and decoded, so that the complete NcduTcDu is available. This is another shortcoming, as the primary header is decoded twice. Then, after some possible error case handling, the TC is returned.
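One detail of this length calculation is worth a small sketch. The size field is a Word32, which is unsigned, so subtracting the header length from a bogus size that is smaller than the header wraps around to a huge number. The code below assumes that hdrLen is 8 bytes (which is what the get of NcduTcHeader above reads: 4 + 2 + 2); payloadLen is a hypothetical checked variant and not the msgCalcPayloadLen of the real code.

```haskell
import Data.Word (Word32)

-- Assumed size of the general header on the wire: a 32-bit size, a 16-bit
-- type and a 16-bit spacecraft ID.
hdrLen :: Word32
hdrLen = 8

-- Hypothetical checked length calculation: reject sizes that cannot even
-- hold the header instead of letting the subtraction wrap around.
payloadLen :: Word32 -> Maybe Word32
payloadLen total
  | total < hdrLen = Nothing
  | otherwise      = Just (total - hdrLen)

main :: IO ()
main = do
  print (payloadLen 20)  -- Just 12
  print (payloadLen 4)   -- Nothing
  print (4 - hdrLen)     -- 4294967292, the wrap-around the check avoids
```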

I think it is easy to see that this works, but it is inconsistent in the error handling and inefficient, and the decoding of the primary header is redundant.

Ok, time to improve this. The problems of this code are just a symptom of the interleaving of reading and decoding the messages. One tool which handles this better is attoparsec, a fast parser combinator library for ByteStrings. To use it, the get implementations of the Binary instances for the NCTRS data structures are replaced with attoparsec parsers:

ncduTcParser = do
  hdr <- ncduTcHeaderParser
 
  let len = msgCalcPayloadLen hdr
 
  case (ncduType hdr) of
    NCDU_TC_CLTU_TYPE -> do
      chdr <- ncduTcCltuHeaderParser
      payl <- A.take $ fromIntegral (len - cltuSecHdrLen)
      return $ NcduTcDuHeaderCltu hdr chdr (B.fromStrict payl)
    NCDU_CLTU_RES_TYPE -> do
      chdr <- ncduTcCltuResponseParser
      return $ NcduTcDuHeaderCltuResp hdr chdr
    NCDU_TC_FRAME_TYPE -> do
      chdr <- ncduTcCltuHeaderParser
      payl <- A.take $ fromIntegral (len - cltuSecHdrLen)
      return $ NcduTcDuHeaderCltu hdr chdr (B.fromStrict payl)
    _ -> do
      phdr <- ncduTcPktHeaderParser
      payl <- A.take $ fromIntegral (len - pktHdrLen)
      return $ NcduTcDuHeaderPkt hdr phdr (B.fromStrict payl)
 
ncduTcHeaderParser = do
  len <- anyWord32be
  typ <- convertToType <$> anyWord16be
  scid <- anyWord16be
  return $ NcduTcHeader len typ scid
 
ncduTcPktHeaderParser = do
  transType <- ncduTransTypeParser
  seqCnt <- anyWord32be
  vcID <- anyWord8
  mapID <- anyWord8
  return $ NcduTcPktHeader transType seqCnt vcID mapID

This is more or less a direct translation from the Binary instances to attoparsec. This piece also uses attoparsec-binary helper functions.

Ok, as a first try, the receiveTC function can now be implemented as:

receiveTC :: Socket -> IO (Maybe NcduTcDu)
receiveTC sock = do
  result <- parseWith (N.recv sock 4096) ncduTcParser BS.empty
  case result of
    A.Done _ r -> return $! Just r
    A.Fail _ _ _ -> return Nothing
    A.Partial f -> return Nothing

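And guess what? It works! And with a lot fewer lines in the receiving function. Still, it is not correct: as far as I can tell, parseWith keeps asking for more input by itself, so the Partial branch is never actually reached, but whatever input is left over in Done is thrown away, even though it may already belong to the next message. That is bad, but for a proof of concept it is ok.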
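What a correct loop has to do is feed chunks to an incremental parser and carry the unconsumed input over to the next message. The sketch below shows that pattern, but with two loud assumptions: it uses the incremental decoder of binary (runGetIncremental) instead of attoparsec, simply to stay within packages that ship with GHC, and it uses a toy framing (16-bit length followed by the payload) rather than NCTRS. The chunk list stands in for successive recv results.

```haskell
import Data.Binary.Get (Decoder (..), Get, getByteString, getWord16be,
                        runGetIncremental)
import qualified Data.ByteString as B

-- Toy framing, not NCTRS: a 16-bit big-endian length followed by that many
-- payload bytes.
getMessage :: Get B.ByteString
getMessage = getWord16be >>= getByteString . fromIntegral

-- Decode all messages from a sequence of chunks. Input left over after a
-- complete message is fed to the next decoder instead of being dropped.
decodeAll :: [B.ByteString] -> Either String [B.ByteString]
decodeAll chunks = start (filter (not . B.null) chunks)
  where
    start [] = Right []
    start cs = go (runGetIncremental getMessage) cs

    go (Done rest _ msg) cs = (msg :) <$> start (prepend rest cs)
    go (Fail _ _ err)    _  = Left err
    go (Partial k) (c : cs) = go (k (Just c)) cs
    go (Partial k) []       = go (k Nothing) []  -- end of input mid-message

    prepend rest cs = if B.null rest then cs else rest : cs

main :: IO ()
main = do
  -- Two messages ([97,98] and [99,100,101]) split at awkward chunk borders.
  let chunks = [B.pack [0, 2, 97], B.pack [98, 0, 3, 99], B.pack [100, 101]]
  print (fmap (map B.unpack) (decodeAll chunks))  -- Right [[97,98],[99,100,101]]
  -- A stream that ends in the middle of a message is reported as an error.
  print (either (const True) (const False) (decodeAll [B.pack [0, 2, 97]]))  -- True
```

The second chunk contains the end of the first message and the beginning of the second one, which is exactly the situation where discarding the leftover loses data.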

But we can also go a step further and let the looping and handling of partials be handled by a streaming library. And this is where we change to conduits. Conduits stream data through a pipeline which we can put together from individual pieces. So it would be cool if we had a socket component which streams its data to a component which can use attoparsec, which in turn streams its data to a component which does further processing. Fortunately, these components are available. We can use the conduit-extra library, which contains conduits for networking and attoparsec (and also binary if we want to use it instead). Cool, so let’s write the whole receive loop in conduits:

receiveLoop sock handler = do
 result <- sourceSocket sock $= conduitParserEither ncduTcParser $$ sink
 return result
 where
   sink :: Sink (Either ParseError (PositionRange, NcduTcDu)) IO ()
   sink = awaitForever $ \tc -> do
     case tc of
       Left err -> liftIO $ logStr lAreaAlways $ show err
       Right (_, tc) -> liftIO $ handler tc

Ok, we have the pipeline as the first line in the function: it uses a sourceSocket on the given socket which streams the data into a conduitParserEither of the attoparsec conduit, which simply uses the ncduTcParser we defined earlier, and finally puts it into a sink. The sink itself is very simple: it uses conduit’s awaitForever function to get/wait for data from upstream, and in case it gets a valid TC from the attoparsec parser, it just calls the given handler function with it. In case of an error it logs the error; as far as I can tell, conduitParserEither stops after yielding the first parse error, so the loop ends at that point rather than continuing.

Ok, so how does it fit into the bigger picture? If you remember from the last post the TC thread which receives the TCs and forwards them to the next layers of processing, we can now simplify that to:

readSocket :: Socket -> TCChannel -> Simulation ()
readSocket handle tcChan = do
  action handle tcChan
 
  where
    action handle tcChan = do
      res <- liftIO $ receiveLoop handle (handler tcChan)
      return ()
      where
        handler tcChan pkt = liftIO $ atomically $ writeTChan tcChan $ TCDU pkt

So it just calls receiveLoop with a handler that packs the TC into a data constructor, which tells the processing to handle this NCTRS TC Data Unit (instead of doing other things like terminating…), and puts it into the TChan for the TC Processor. That’s it!
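The hand-over through the channel can be tried out in isolation. The following is only a sketch under assumptions: the TCMessage type with a Terminate constructor is hypothetical (the real message type is not shown here), a String stands in for the NcduTcDu, and processor plays the role of the TC Processor thread.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (TChan, atomically, newTChanIO, readTChan,
                               writeTChan)

-- Hypothetical message type: either a data unit to process or a request to
-- stop. A String stands in for the real NcduTcDu.
data TCMessage = TCDU String | Terminate

-- Consumer loop in the role of the TC Processor: collect data units until
-- Terminate arrives.
processor :: TChan TCMessage -> IO [String]
processor chan = go []
  where
    go acc = do
      msg <- atomically (readTChan chan)
      case msg of
        TCDU pkt  -> go (pkt : acc)
        Terminate -> return (reverse acc)

main :: IO ()
main = do
  chan <- newTChanIO
  done <- newEmptyMVar
  _ <- forkIO (processor chan >>= putMVar done)
  -- In the role of the handler passed to receiveLoop:
  mapM_ (atomically . writeTChan chan . TCDU) ["tc-1", "tc-2"]
  atomically (writeTChan chan Terminate)
  takeMVar done >>= print  -- ["tc-1","tc-2"]
```

The receiving side never needs to know who consumes the data units; it only writes into the channel, which keeps the socket handling and the processing decoupled.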

The functions are shorter and (once you have got through the concepts) quite readable. I think this streaming approach, with the different ways of combining components, is a cool thing. If I imagine implementing this in C++, I get some template headaches and would have to write much more.

That’s it for this time, nothing really ground breaking here, but maybe it is useful for someone.

Also, there are some other projects coming up, which will then really use Haskell applications inside the operational context of mission control and not just testing, so the next challenges are on the way. Let’s see how it works out then…

 

 
