Library/Streams

From HaskellWiki

Introduction

Streams: the extensible I/O library

I (Bulat Ziganshin) developed a new I/O library in 2006 that IMHO is so sharp that it can eventually replace the current I/O facilities based on using Handles. The main advantage of the new library is its strong modular design using typeclasses. The library consists of small independent modules, each implementing one type of stream (file, memory buffer, pipe) or one part of common stream functionality (buffering, Char encoding, locking). 3rd-party libs can easily add new stream types and new common functionality. Other benefits of the new library include support for streams functioning in any monad, Hugs and GHC compatibility, high speed and an easy migration path from the existing I/O library.

The Streams library is heavily based on the HVIO module written by John Goerzen. I especially want to thank John for his clever design and implementation. Really, I just renamed HVIO to Stream and presented this as my own work. :) Further development direction was inspired by the "New I/O library" written by Simon Marlow.

---

More recent developments (as of 2013-04) have focused on Iteratee I/O; in particular, io-streams is similar in its focus on I/O and on replacing file handles.

Simple Streams

The key concept of the lib is the Stream class, whose interface mimics the familiar interface for Handles, just with "h" replaced with "v" in function names:

 class (Monad m) => Stream m h where
    vPutStrLn :: h -> String -> m ()
    vGetContents :: h -> m String
    vIsEOF :: h -> m Bool
    vClose :: h -> m ()
    ....................

This means that you already know how to use any stream! The Stream interface currently has 8 implementations: a Handle itself, raw files, pipes, memory buffers and string buffers. Future plans include support for memory-mapped files, sockets, circular memory buffers for interprocess communication and UArray-based streams.

By themselves, these Stream implementations are rather simple. Basically, to implement a new Stream type, it's enough to provide vPutBuf/vGetBuf operations, or even vGetChar/vPutChar. The latter way, although inefficient, allows us to implement streams that can work in any monad. StringReader and StringBuffer streams use this to provide string-based Stream class implementations both for IO and ST monads. Yes, you can use the full power of Stream operations inside the ST monad!
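To make the "any monad" point concrete, here is a minimal sketch that does not use the library itself. MiniStream is a hypothetical, cut-down stand-in for the real Stream class (two operations only), and the StringReader shown here is an assumed STRef-based implementation, not the library's actual code. It shows a generic operation written once against the class and then run inside ST:

```haskell
{-# LANGUAGE MultiParamTypeClasses, FunctionalDependencies, FlexibleInstances #-}

import Control.Monad.ST (ST, runST)
import Data.STRef (STRef, newSTRef, readSTRef, writeSTRef)

-- Hypothetical stand-in for the Stream class: a stream type h
-- determines the monad m in which it operates (MPTC + FD).
class Monad m => MiniStream m h | h -> m where
  vGetChar :: h -> m Char
  vIsEOF   :: h -> m Bool

-- A generic operation, written once in terms of the class methods.
-- It is strict: it reads the whole stream before returning.
vGetContentsStrict :: MiniStream m h => h -> m String
vGetContentsStrict h = do
  eof <- vIsEOF h
  if eof
    then return []
    else do
      c  <- vGetChar h
      cs <- vGetContentsStrict h
      return (c : cs)

-- Assumed implementation: the unread part of the string sits in an STRef.
newtype StringReader s = StringReader (STRef s String)

newStringReader :: String -> ST s (StringReader s)
newStringReader str = StringReader <$> newSTRef str

instance MiniStream (ST s) (StringReader s) where
  vGetChar (StringReader ref) = do
    str <- readSTRef ref
    case str of
      []       -> error "vGetChar: end of stream"
      (c : cs) -> writeSTRef ref cs >> return c
  vIsEOF (StringReader ref) = null <$> readSTRef ref

-- Pure function that uses stream operations internally, via ST.
countLines :: String -> Int
countLines input = runST (do
  h    <- newStringReader input
  text <- vGetContentsStrict h
  return (length (lines text)))
```

Because the instance lives in ST, countLines is an ordinary pure function even though its body is written in the stream style.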

Layers of functionality

All additional functionality is implemented via Stream Transformers, which are just parameterized Streams, whose parameters also implement the Stream interface. This allows you to apply any number of stream transformers to the raw stream and then use the result as an ordinary Stream. For example:

          h <- openRawFD "test" WriteMode
                   >>= bufferBlockStream
                   >>= withEncoding utf8
                   >>= withLocking

This code creates a new FD, which represents a raw file, and then adds to this Stream buffering, Char encoding and locking functionality. The result type of "h" is something like this:

          WithLocking (WithEncoding (BufferedBlockStream FD))

The complete type, as well as all the intermediate types, implements the Stream interface. Each transformer intercepts operations corresponding to its nature, and passes the rest through. For example, the encoding transformer intercepts only vGetChar/vPutChar operations and translates them to the sequences of vGetByte/vPutByte calls of the lower-level stream. The locking transformer just wraps any operation in the locking wrapper.

We can trace, for example, the execution of a "vPutBuf" operation on the above-constructed Stream. First, the locking transformer acquires a lock and then passes this call to the next level. Then the encoding transformer does nothing and passes this call to the next level. The buffering transformer flushes the current buffer and passes the call further. Finally, FD itself performs the operation after all these preparations and, on the returning path, the locking transformer releases its lock.

As another example, the "vPutChar" call on this Stream is transformed (after locking) into several "vPutByte" calls by the encoding transformer, and these bytes go to the buffer in the buffering transformer, with or without a subsequent call to the FD's "vPutBuf".
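The interception idea can be sketched in a few lines. Everything below is a simplified stand-in and not the library's code: MiniStream is a hypothetical IO-only class with just two operations, ByteLog plays the role of the raw stream, and WithUtf8 plays the role of an encoding transformer that intercepts vPutChar and passes vPutByte through unchanged:

```haskell
import Data.Bits (shiftR, (.&.), (.|.))
import Data.Char (ord)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Data.Word (Word8)

-- Hypothetical, IO-only stand-in for the Stream class.
class MiniStream h where
  vPutByte :: h -> Word8 -> IO ()
  vPutChar :: h -> Char -> IO ()

-- A "raw" stream that records every byte it receives (newest first).
newtype ByteLog = ByteLog (IORef [Word8])

instance MiniStream ByteLog where
  vPutByte (ByteLog ref) b = modifyIORef' ref (b :)
  -- Without an encoding layer a Char becomes a single byte
  -- (code points above 255 are truncated in this sketch).
  vPutChar h c = vPutByte h (fromIntegral (ord c))

-- A transformer: a stream parameterized by another stream.
newtype WithUtf8 h = WithUtf8 h

instance MiniStream h => MiniStream (WithUtf8 h) where
  vPutByte (WithUtf8 h)   = vPutByte h                   -- passed through
  vPutChar (WithUtf8 h) c = mapM_ (vPutByte h) (utf8 c)  -- intercepted

-- UTF-8 encoding of a single Char.
utf8 :: Char -> [Word8]
utf8 c
  | n < 0x80    = [fromIntegral n]
  | n < 0x800   = [0xC0 .|. hi 6, cont 0]
  | n < 0x10000 = [0xE0 .|. hi 12, cont 6, cont 0]
  | otherwise   = [0xF0 .|. hi 18, cont 12, cont 6, cont 0]
  where
    n      = ord c
    hi s   = fromIntegral (n `shiftR` s)
    cont s = 0x80 .|. fromIntegral ((n `shiftR` s) .&. 0x3F)

-- Write a string through the stack; return the bytes that reached the bottom.
encodeViaStack :: String -> IO [Word8]
encodeViaStack str = do
  ref <- newIORef []
  let h = WithUtf8 (ByteLog ref)   -- type: WithUtf8 ByteLog
  mapM_ (vPutChar h) str
  reverse <$> readIORef ref
```

The stacked value is itself an instance of the class, so further wrappers could be applied on top of it in the same way.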

Modularity

As you can see, stream transformers really are independent of each other. This allows you to use them on any stream and in any combination (but you should apply them in the proper order - buffering, then Char encoding, then locking). As a result, you can apply to the stream only the transformers that you really need. If you don't use the stream in multiple threads, you don't need to apply the locking transformer. If you don't use any encodings other than Latin-1 -- or don't use text I/O at all -- you don't need an encoding transformer. Moreover, you may not even need to know anything about the UserData transformer until you actually need to use it :)

Both streams and stream transformers can be implemented by 3rd-party libraries. Streams and transformers from arbitrary libraries will seamlessly work together as long as they properly implement the Stream interface. My future plans include implementation of an on-the-fly (de)compression transformer and I will be happy to see 3rd-party transformers that intercept vGetBuf/vPutBuf calls and use select(), kqueue() and other methods to overlap I/O operations.

Speed

A quick comment about speed: it's fast enough -- 10-50 MB/s (depending on the type of operation) on a 1 GHz CPU. The Handle operations, for comparison, show speeds of 1-10 MB/s on the same computer. But that doesn't mean that each and every operation in the new library is 10 times faster. Strict I/O (including vGetChar/vPutChar) is a LOT faster. I included a demonstration of this fascinating speed as "Examples/wc.hs". If you need really high speed, don't forget to increase the buffer size with "vSetBuffering".

On the other hand, lazy I/O (including any operations that receive or return strings) shows only a modest speedup. This is limited by Haskell/GHC itself and I can't do much to get around these limits. Instead, I plan to provide support for I/O using packed strings. This will allow writing I/O-intensive Haskell programs that are as fast as their C counterparts.

Other sources of slowness include use of the locking transformer (if you need it, try using "lock" around speed-critical algorithms) and the complex class structure, which might be avoided by using "forall" types (I'm not sure; Simon Marlow can enlighten us on this topic).

The library includes benchmarking code in the file "Examples/StreamsBenchmark.hs".


Overview of Stream transformers

Buffering

There are three buffering transformers. Each buffering transformer implements support for vGetByte, vPutChar, vGetContents and other byte- and text-oriented operations for the streams, which by themselves support only vGetBuf/vPutBuf (or vReceiveBuf/vSendBuf) operations.

The first transformer can be applied to any stream supporting vGetBuf/vPutBuf. This is applied by the operation "bufferBlockStream". The well-known vSetBuffering/vGetBuffering operations are intercepted by this transformer and used to control buffer size. At this moment, only BlockBuffering is implemented, while LineBuffering and NoBuffering are only in the planning stages.
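As an illustration of what block buffering buys you, here is a toy model written without the library. All names in it (RawSink, Buffered, bufferBlock, putByte, flush) are hypothetical, and lists stand in for the raw memory buffers a real implementation would use. Byte-level writes accumulate in the buffer and reach the lower-level stream only as whole blocks:

```haskell
import Control.Monad (unless, when)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef)
import Data.Word (Word8)

-- Hypothetical raw stream: it accepts only whole blocks and counts
-- how many block writes it has received.
data RawSink = RawSink
  { blockWrites :: IORef Int
  , written     :: IORef [Word8]
  }

newRawSink :: IO RawSink
newRawSink = RawSink <$> newIORef 0 <*> newIORef []

rawPutBlock :: RawSink -> [Word8] -> IO ()
rawPutBlock sink block = do
  modifyIORef' (blockWrites sink) (+ 1)
  modifyIORef' (written sink) (++ block)

-- Toy buffering layer on top of a RawSink.
data Buffered = Buffered
  { lower    :: RawSink
  , capacity :: Int
  , pending  :: IORef [Word8]   -- bytes not yet flushed, newest first
  }

bufferBlock :: Int -> RawSink -> IO Buffered
bufferBlock size sink = Buffered sink size <$> newIORef []

-- Byte-oriented write: touches the lower level only when the buffer is full.
putByte :: Buffered -> Word8 -> IO ()
putByte h b = do
  modifyIORef' (pending h) (b :)
  buf <- readIORef (pending h)
  when (length buf >= capacity h) (flush h)

-- Push any pending bytes down as one block.
flush :: Buffered -> IO ()
flush h = do
  buf <- readIORef (pending h)
  unless (null buf) $ do
    rawPutBlock (lower h) (reverse buf)
    writeIORef (pending h) []

-- Ten single-byte writes through a 4-byte buffer.
demo :: IO (Int, [Word8])
demo = do
  sink <- newRawSink
  h    <- bufferBlock 4 sink
  mapM_ (putByte h) [1 .. 10]
  flush h                        -- what closing the stream would do
  n  <- readIORef (blockWrites sink)
  bs <- readIORef (written sink)
  return (n, bs)
```

In this model the ten byte writes turn into three block writes (4 + 4 + 2 bytes), which is why a larger buffer means fewer calls to the underlying stream.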

Two other transformers can be applied to streams that implement vReceiveBuf/vSendBuf operations -- that is, streams whose data reside in memory, including in-memory streams and memory-mapped files. In these cases, the buffering transformer doesn't need to allocate a buffer itself, it just requests from the underlying stream the address and size of the next available portion of data. Nevertheless, the final result is the same -- we get support for all byte- and text-oriented I/O operations. The "bufferMemoryStream" operation can be applied to any memory-based stream to add buffering to it. The "bufferMemoryStreamUnchecked" operation (which implements the third buffering transformer) can be used instead, if you can guarantee that I/O operations can't overflow the used buffer.

Encoding

The Char encoding transformer allows you to encode each Char written to the stream as a sequence of bytes, implementing UTF and other encodings. This transformer can be applied to any stream implementing vGetByte/vPutByte operations and in return it implements vGetChar/vPutChar and all other text-oriented operations. This transformer can be applied to a stream with the "withEncoding encoding" operation, where `encoding` may be `latin1`, `utf8` or any other encoding that you (or a 3rd-party lib) implement. Look at the "Data.CharEncoding" module to see how to implement new encodings. Encoding of streams created with the "withEncoding" operation can be queried with "vGetEncoding". See examples of their usage in the file "Examples/CharEncoding.hs".

Locking

The locking transformer ensures that the stream is properly shared by several threads. You already know enough about its basic usage -- "withLocking" applies this transformer to the stream and all the required locking is performed automagically. You can also use "lock" operations to acquire the lock explicitly during multiple operations:

  lock h $ \h -> do
    savedpos <- vTell h
    vSeek h AbsoluteSeek 100
    vPutStr h ":-)"
    vSeek h AbsoluteSeek savedpos

See the file "Examples/Locking.hs" for examples of using the locking transformer.
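One plausible way to model an explicit "lock" is to keep the underlying stream inside an MVar and hand the unlocked stream to the callback, so that operations inside the block do not try to take the lock again. The sketch below makes exactly that assumption and does not use the library; Log, putRaw, Locked, addLocking and putLocked are hypothetical names, and "lock" here is a stand-in with the same calling shape as in the example above:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
  (MVar, newEmptyMVar, newMVar, putMVar, takeMVar, withMVar)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Toy unlocked stream: appends text to a shared string.
newtype Log = Log (IORef String)

putRaw :: Log -> String -> IO ()
putRaw (Log ref) s = modifyIORef' ref (++ s)

-- Assumed locking wrapper: the underlying stream lives inside an MVar.
newtype Locked h = Locked (MVar h)

addLocking :: h -> IO (Locked h)
addLocking h = Locked <$> newMVar h

-- Hold the lock for a whole sequence of operations; the callback
-- receives the underlying (unlocked) stream.
lock :: Locked h -> (h -> IO a) -> IO a
lock (Locked mv) = withMVar mv

-- A single operation is simply a one-step locked block.
putLocked :: Locked Log -> String -> IO ()
putLocked h s = lock h (\raw -> putRaw raw s)

-- Two threads each write a three-part record under one lock.
demo :: IO String
demo = do
  ref  <- newIORef ""
  h    <- addLocking (Log ref)
  done <- newEmptyMVar
  let worker name = do
        lock h $ \raw -> do
          putRaw raw "<"
          putRaw raw name
          putRaw raw ">"
        putMVar done ()
  _ <- forkIO (worker "a")
  _ <- forkIO (worker "b")
  takeMVar done
  takeMVar done
  readIORef ref
```

Whichever thread runs first, the records never interleave: the result is either "<a><b>" or "<b><a>". Had each worker used three separate putLocked calls instead, the parts of the two records could have been mixed.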

Attaching user data

This transformer allows you to attach arbitrary data to any Stream. It does nothing extraordinary except that the stream with attached data is the proper Stream, again. See an example of its usage in the file "Examples/UserData.hs".

Overview of Stream types

Handle (legacy way to access files/sockets)

"Handle" is an instance of the Stream class, with a straightforward implementation. You can use the Char encoding transformer with Handles. Although Handles implement buffering and locking by themselves, you may also be interested in applying these transformers to the Handle type. This has benefits -- "bufferBlockStream" works faster than internal Handle buffering, and the locking transformer enables the use of a "lock" operation to create a lock around a sequence of operations. Moreover, the locking transformer should be used to ensure proper multi-threading operation of Handle with added encoding or buffering facilities.

FD (new way to access files)

The new method of using files, independent of the existing I/O library, is implemented with the FD type. FD is just an Int representing a POSIX file descriptor, and the FD type implements only the basic Stream I/O operations - vGetBuf and vPutBuf. So, to create a full-featured FD-based stream, you need to apply a buffering transformer. Therefore, the library defines two ways to open files with FD - openRawFD/openRawBinaryFD just create an FD, while openFD/openBinaryFD create an FD and immediately apply a buffering transformer (bufferBlockStream) to it. In most cases you will use the latter operations. Both pairs mimic the arguments and behaviour of the well-known Handle operations openFile/openBinaryFile, so you already know how to use them. Other transformers may then be applied as you need. So, the above-mentioned example can be abbreviated to:

          h <- openFD "test" WriteMode
                   >>= withEncoding utf8
                   >>= withLocking

Thus, to switch from the existing I/O library to using Streams, you need only to replace "h" with "v" in the names of Handle operations, and replace openFile/openBinaryFile calls with openFD/openBinaryFD while adding the "withLocking" transformer to files used in multiple threads. That's all!

For example, the following code:

  h <- openFile "test" ReadMode
  text <- hGetContents h
  hClose h

should be translated to:

  h <- openFD "test" ReadMode
         --  >>= withLocking  -- needed only for multi-threaded usage
  text <- vGetContents h
  vClose h


The file "Examples/FD.hs" demonstrates FD usage.


In order to work with stdin/stdout/stderr via FDs, you should open them in the same way:

  stdinStream  <- bufferBlockStream fdStdIn
                      >>= withEncoding utf8    -- optional, required only for using non-Latin1 encoding
                      >>= withLocking          -- optional, required only to use this Stream in concurrent Haskell threads

  stdoutStream <- bufferBlockStream fdStdOut
                      >>= withEncoding utf8    -- see above
                      >>= withLocking          -- ...

  stderrStream <- bufferBlockStream fdStdErr
                      >>= withEncoding utf8    -- ...
                      >>= withLocking          -- ...

Please note that Streams currently supports only block buffering; there is no support for line buffering or for unbuffered operation.

MemBuf (memory-resident stream)

MemBuf is a stream type that keeps its contents in a memory buffer. There are two types of MemBufs you can create - you can either open an existing memory buffer with "openMemBuf ptr size" or create a new one with "createMemBuf initsize". A MemBuf opened by "openMemBuf" will never be resized or moved in memory, and will not be freed by "vClose". A MemBuf created by "createMemBuf" will grow as needed, can be manually resized by the "vSetFileSize" operation, and is automatically freed by "vClose".

Actually, raw MemBufs are created by the "createRawMemBuf" and "openRawMemBuf" operations, while createMemBuf/openMemBuf incorporate an additional "bufferMemoryStream" call (as you should remember, buffering adds vGetChar, vPutStr and other text- and byte-I/O operations on top of vReceiveBuf and vSendBuf). You can also apply Char encoding and locking transformers to these streams. The "saveToFile" and "readFromFile" operations provide an easy way to save/restore buffer contents in a file.

File "Examples/MemBuf.hs" demonstrates the usage of MemBuf.

FunctionsMemoryStream

This Stream type allows implementation of arbitrary streams, just by providing three functions that implement the vReceiveBuf, vSendBuf and cleanup operations. It seems that this Stream type is of interest only for my own program and is otherwise useful only as an example of creating 3rd-party Stream types. It is named "FunctionsMemoryStream"; see the sources if you are interested.

StringReader & StringBuffer (String-based streams)

The four remaining Stream types were part of the HVIO module and I copied their description from there:

In addition to Handle, there are several pre-defined stream types for your use. 'StringReader' is a particularly interesting one. At creation time, you pass it a String. Its contents are read lazily whenever a read call is made. It can be used, therefore, to implement filters (simply initialize it with the result from, say, a map over hGetContents from another Stream object), codecs, and simple I/O testing. Because it is lazy, it need not hold the entire string in memory. You can create a 'StringReader' with a call to 'newStringReader'.

'StringBuffer' is a similar type, but with a different purpose. It provides a full interface like Handle (it supports read, write and seek operations). However, it maintains an in-memory buffer with the contents of the file, rather than an actual on-disk file. You can access the entire contents of this buffer at any time. This can be quite useful for testing I/O code, or for cases where existing APIs use I/O, but you prefer a String representation. Note however that this stream type is very inefficient. You can create a 'StringBuffer' with a call to 'newStringBuffer'.

One significant improvement over the original HVIO library is that 'StringReader' and 'StringBuffer' can work not only in IO, but also in the ST monad.

Pipes (passing data between Haskell threads)

Finally, there are pipes. These pipes are analogous to the Unix pipes that are available from System.Posix, but don't require Unix and work only in Haskell. When you create a pipe, you actually get two Stream objects: a 'PipeReader' and a 'PipeWriter'. You must use the 'PipeWriter' in one thread and the 'PipeReader' in another thread. Data that's written to the 'PipeWriter' will then be available for reading with the 'PipeReader'. The pipes are implemented completely with existing Haskell threading primitives, and require no special operating system support. Unlike Unix pipes, these pipes cannot be used across a fork(). Also unlike Unix pipes, these pipes are portable and interact well with Haskell threads. A new pipe can be created with a call to 'newHVIOPipe'.
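For intuition only, here is a minimal sketch of a thread-to-thread pipe built from standard concurrency primitives. It is not the library's implementation: it assumes a Chan of Maybe Char (with Nothing marking the end of the stream), and the names newPipe, pipePutStr, pipeClose and pipeGetContents are hypothetical:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

-- Both ends share one channel; Nothing signals "writer closed".
newtype PipeWriter = PipeWriter (Chan (Maybe Char))
newtype PipeReader = PipeReader (Chan (Maybe Char))

newPipe :: IO (PipeReader, PipeWriter)
newPipe = do
  ch <- newChan
  return (PipeReader ch, PipeWriter ch)

pipePutStr :: PipeWriter -> String -> IO ()
pipePutStr (PipeWriter ch) = mapM_ (writeChan ch . Just)

pipeClose :: PipeWriter -> IO ()
pipeClose (PipeWriter ch) = writeChan ch Nothing

-- Strict read: blocks until the writer closes its end of the pipe.
pipeGetContents :: PipeReader -> IO String
pipeGetContents (PipeReader ch) = go
  where
    go = do
      m <- readChan ch
      case m of
        Nothing -> return []
        Just c  -> (c :) <$> go

-- The writer runs in its own thread; the reader runs in the calling thread.
demo :: IO String
demo = do
  (r, w) <- newPipe
  _ <- forkIO (pipePutStr w "hello, pipe" >> pipeClose w)
  pipeGetContents r
```

As with the real pipes, the two ends are meant to be used from different threads: the reader simply blocks until data or the end-of-stream marker arrives.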


Additional details

Support for GHC, Hugs and other compilers

The library is compatible with GHC 6.4.


The library fully supports Hugs 2003-2006, but

1) support for FD and MMFile is temporarily disabled because I don't know how to build DLLs

2) Hugs 2003 doesn't include support for "instance Bits Word" and vGetBuf/vPutBuf, so you need to add these implementations manually or delete the lines that use it (look for "2003" in the sources)

3) WinHugs doesn't support preprocessing, so I included the MakeHugs.cmd script to preprocess source files using cpphs


The main disadvantage of the library is that it supports only Hugs and GHC, because it uses extensions to the type class system (namely, MPTC+FD). I think that it can be made H98-compatible at the cost of excluding support for non-IO monads. I will try to make such a stripped version for other compilers if people are interested.

Downloading and installation

To get Streams 0.1.7, you can download one of

 http://files.pupeno.com/software/streams/Streams-0.1.7.tar.bz2
 http://files.pupeno.com/software/streams/Streams-0.1.7.tar.gz

or you can get it from its repository by running:

 darcs get --tag=0.1.7 http://software.pupeno.com/Streams-0.1 Streams-0.1.7

You can also download and keep track of the 0.1 branch, which is supposed to remain stable and only get bug-fixes by running

 darcs get http://software.pupeno.com/Streams-0.1/

and then run 'darcs pull' inside it to get further changes.

To get the latest unstable and fluctuating version, the development version, run:

 darcs get http://software.pupeno.com/Streams/

Note: as of this moment, while the project is being darcsified you are not going to find anything useful there, but we expect that to change.

Preferably, you should send patches to the code to Bulat.Ziganshin@gmail.com and patches to other parts of the library to Pupeno. Documentation may be edited right at the project homepage, which remains http://haskell.org/haskellwiki/Library/Streams

Thanks to Jeremy Shaw, the library is now cabalized. To install it, run the command:

 make install

Directory "Examples" contains examples of using the library.

Stage of development

The library is currently at the beta stage. It contains a number of known minor problems and an unknown number of yet-to-be-discovered bugs. It is not properly documented, doesn't include QuickCheck tests, and not all "h*" operations have their "v*" equivalents yet. If anyone wants to join this effort in order to help fix these oddities and prepare the lib for inclusion in the standard libraries suite, I would be really happy. :) I will also be happy (although much less ;) to see bug reports and suggestions about its interface and internal organization. It's just a first public version, so we still can change everything here!

In particular, this wiki page is the official library documentation. Please continue to improve it and add more information about using the library. Feel free to ask me about library usage via email: Bulat.Ziganshin@gmail.com

Changelog

User-visible improvements made in Streams library since version 0.1 (6 Feb 2006)

0.1a (6 Feb 2006)
- Fixed bug: System.MMFile was uncompilable on non-Windows systems
0.1b (9 Feb 2006)
- Fixed bug: very slow WithLocking.vGetLine
- Fixed bug: System.FD was also uncompilable on non-Windows systems
0.1c (12 Feb 2006)
- Fixed bug: System.FD modified one more time to reach Unix compatibility
0.1d (13 Feb 2006)
- Fixed bug: BufferedBlockStream.vGetLine caused exception
* CharEncoding transformer was made faster, but vSetEncoding is no longer supported
0.1e (8 Jun 2006)
- Fixed bug: "openFD name WriteMode" didn't truncate files on unixes
* Full library now released under BSD3 license, thanks to John Goerzen
+ Now cabalized, thanks to Jeremy Shaw
0.1.6 (Oct 14 2006)
* Added compatibility with just released GHC 6.6
0.1.7 (Nov 24 2006)
* true support for GHC 6.6
* support for files larger than 4 GB on Windows (see the FD5gb.hs example)
* files are now open in shared mode on all systems
* haddock'ized internal docs
* ready to be included in any unix packaging system