Saturday, May 28, 2016

Node JS streams

Streams in NodeJS

Streams in NodeJS are not as complicated as people perceive. Although they can easily go from simple to complex in the blink of an eye.

Streams help you to picture complicated business into its simplified version with a couple of lines of code. With all complications abstracted in a module so to say. This promotes code simplicity, re-usability and in the process more value for code. If you are willing to introspect code with and without streams you'll certainly observe 2 or 3 lines of code would have saved some 30 - 40 lines of code, especially for people who build APIs and SDKs

Lets start small and a meager activity. Copy a file from one location/directory to another. You'd ask why such a mundane activity.

fs.createReadStream(sourceFile).pipe(fs.createWriteStream(targetFile));

Where is the validation of the source file and destination path etc.? One step at a time. This is a simple file copy operation. If you encapsulate this line within a try-catch block, a lot of the issues that you are thinking about are handled inherently without any effort. Now, you can let your imagination go wild and build on this solution to copy directories, drives or more. Our next step is something that will be more useful from a day to day perspective

More than copying a file

Downloading file from a remote location. Something like say you want to download a s/w from a remote location over ftp/http/https etc. Automating such pieces would make it easier for us. Say for e.g You need to download a huge file from a remote location.

// Download a remote file and stream it straight to disk.
var fs = require('fs');            // was missing: fs.createWriteStream below needs it
var request = require('request');
var path = require('path');

// Remote file to fetch, and the local Downloads directory to save it under.
var REMOTE_LINK = "https://download.sublimetext.com/sublime_text_3_build_3103_x64.tar.bz2"
var LOCAL_DOWNLOAD_PATH = path.join(process.env.HOME, "Downloads");
// Re-use the remote file's base name for the local copy.
var downloadFile = path.join(LOCAL_DOWNLOAD_PATH, path.basename(REMOTE_LINK));

// The HTTP response body is piped chunk-by-chunk to the file — the whole
// download never sits in memory at once.
request(REMOTE_LINK).pipe(fs.createWriteStream(downloadFile));

We can do some variations on this. You can associate events to let you know when streaming is done like the following and more.

// Same download, but with progress/outcome events attached to the stream.
request
 .get(REMOTE_LINK)
 .on('response', function(response) {
  // 'response' is an http.IncomingMessage; it has no 'length' property.
  // The payload size (when the server sends one) is in the Content-Length header.
  console.log(response.headers['content-length']);
 })
 .on('error', function(err) {
  console.log('ERROR: Failed to download file: ' + err);
 })
 .on('end', function() {
  console.log('Completed file download successfully');
 })
 .pipe(fs.createWriteStream(downloadFile));

Above code is not only methodical, but serves as a documentation, with clear logs on each step. Even a novice developer will be able to follow with ease.

What we downloaded was a bz2 file, what if we need to download and unzip too in one shot?
That will be an exercise for you. What I can share though is: check npm packages for 'untar', 'gunzip' etc.

Let's look into a related example. How to unzip a .tar.gz file and get the .tar. Here we go

// Decompress a gzip file: read stream -> gunzip transform -> write stream.
// zlib.createGunzip() is a Transform stream, so it slots between the two pipes.
// (Assumes 'fs' and 'zlib' have been required and the paths are defined.)
fs.createReadStream(sourceFile)
 .pipe(zlib.createGunzip())
 .pipe(fs.createWriteStream(unzippedFile));

Alright, we unzipped a file, but we still just have a .tar file. How about extracting a tar after we unzip it

var tar = require('tar-fs');
var fs = require('fs');

// Usage: node untar.js <tarFile> <destinationPath>
if(process.argv.length < 4) {
    console.log('ERROR: Please pass on TAR file name followed by location to extract the file, to continue');
    process.exit(1);   // non-zero exit code signals failure to the shell
}

const SOURCE_FILE = process.argv[2];
var DEST_PATH = process.argv[3];

// Extract every entry of a tar archive under targetPath.
// NOTE: a try/catch around .pipe() only catches synchronous errors. Stream
// failures (missing file, corrupt archive, full disk) arrive asynchronously
// as 'error' events, so we must subscribe to them explicitly on each stream.
var untarFile = function(sourceFile, targetPath) {
    fs.createReadStream(sourceFile)
        .on('error', function(err) {
            console.log('ERROR: Unable to read ' + sourceFile + ': ' + err.message);
            process.exit(1);
        })
        .pipe(tar.extract(targetPath))
        .on('error', function(err) {
            console.log('ERROR: Extraction failed: ' + err.message);
            process.exit(1);
        });
}

untarFile(SOURCE_FILE, DEST_PATH);

Transformation solutions

So far we saw how to write NodeJS-stream code and download, unzip or untar a file. How about a little more serious activity relevant to translation/transformation of data. Say, we get a stream of data and we wish to convert it to upper case, then pass it along for further processing. Although this does not look like a day-to-day business problem, it helps to showcase the potential of NodeJS streams. I'd consider this to be a seed solution for your business problems.

CapitalizingStream.js

var inherits = require('util').inherits;
var Transform = require('stream').Transform;

module.exports = CapitalizingTransformStream;

// A Transform stream that upper-cases everything flowing through it.
// Accepts the usual stream options and forwards them to the base class.
function CapitalizingTransformStream(options) {
    Transform.call(this, options);
}

inherits(CapitalizingTransformStream, Transform);

// Core transform hook: in non-object mode chunks arrive as Buffers with
// encoding 'buffer', so stringify those before upper-casing.
CapitalizingTransformStream.prototype._transform = function(chunk, encoding, callback) {
    var text = (encoding === 'buffer') ? chunk.toString() : chunk;
    callback(null, text.toUpperCase());
};

ConvertCase.js

var net = require('net');
var CapitalizingStream = require('./CapitalizingStream');

// Wire one client socket through the upper-casing transform: everything the
// client sends comes back capitalized.
function handleConnection(conn) {
    // Identify the peer for log messages.
    var remoteAddress = conn.remoteAddress + ':' + conn.remotePort;
    console.log('New client connection from %s', remoteAddress);

    var onServiceError = function(err) {
        console.log('ERROR: %s', err.message);
    };

    var onConnectionClose = function() {
        console.log('Connection closed on :%s', remoteAddress);
    };

    var onConnectionError = function(err) {
        console.log('Connection %s error %s', remoteAddress, err.message);
    };

    // handle connection to service
    var service = new CapitalizingStream();
    service.once('error', onServiceError);

    conn.once('close', onConnectionClose);
    conn.on('error', onConnectionError);

    // socket in -> transform -> socket out
    conn.pipe(service).pipe(conn);
}

// TCP server on port 9000; each new connection is handed to handleConnection.
// (Passing the handler to createServer registers it as the 'connection' listener.)
var server = net.createServer(handleConnection);
server.listen(9000, function() {
    console.log('Server listening to %j', server.address());
});

Though the above solution may not have much industrial value, here is something you can try as an exercise to check your stream skills
  • Convert CSV to HTML. Pass this CSV contents from a remote location
  • Accept content, convert to HTML
  • Send it back

More automation with NodeJS

  • Downloading files in clusters for setup - sort of containers for download. Such solution helps people do setups for OS, their dev environment etc. Something like dockers, in a much much smaller scale
  • Download your favorite/bookmarks from youtube. Persist links in a store, periodically download it and convert mp3 to say mp4 or other formats
  • Reverse Proxy - Make application performance better through reverse proxy. Build gateway engines.
  • Build easy cache - Addition to above approach. 

NodeJS-Mongodb Persistent store

Multi-Tier applications

When we build web applications one of the most essential pieces is services. Gone are the days we used to build single-tier applications. We now have at a minimum 3 tiers to build on applications.

  • Tier-1 is UI
  • Tier-2 is services
  • Tier-3 is DB

What we will do in coming days

Real-time applications have a lot more. There will be at least 6-tiers which includes security, availability and performance tiers. For our case, we don't need that. But later we will see 



  • How to improve speed with NGINX on NodeJS solutions (performance) 
  • How to implement caching solutions with Redis (Performance) 
  • How to use RabbitMQ with NodeJS (security & scalability) 
  • How to do authentication with NodeJS (security) 


The 3 tiers are not merely logical layers, but in most occasions physical too. Running in different physical locations, systems, servers makes the application more manageable & testable.

So far in NodeJS samples we built, none of them help us persist data permanently. Our objective now is to connect NodeJS with MongoDB in the process persist data.

Install MongoDB

If you have not installed MongoDB, here's how you can do it
http://techkrishnan.blogspot.com/2016/05/installing-mongodb-osx.html

Install needed packages

We start in a less modular fashion to be conspicuous. As we expand we will have introduce other design patterns.

In addition to installing the 'mongodb' client, I also installed 'querystring' from npm

npm init
npm install querystring --save
npm install mongodb --save

Let's get started

Code to connect to MongoDB is below. Lets take a moment to dissect & understand what we got.

We connect to MongoDB using 'mongodb' package. It is through this client utility we connect to data store. We then define URI to connect to datastore. Connect method takes URI and callback as parameters. Always check for 'error' in callback, if error is NULL we return, else we proceed with the returned object - DB handle.

DB handle gives us baton to query for collection "contacts". MongoDB is a document DB, we do not have schema defined on it. Simple JSON object defined can be added into collection. Every documents in collection can be completely different. It is important on each step we check to see if we have right handle or received error. If it is an error, we return.

We have got code for OPTIONS method, we will talk about it in coming sessions

With the collection handle, next step is to check for GET and request path. If the path is what we are looking for we list the contents. MongoDB as asserted earlier is a document DB, which stores JSON collections. For us to interact over web, we need to convert this JSON objects into text (we could alternatively have 'content-type' set to 'application/javascript') for the sake of simplicity. Invoke find method on collection. If we receive error, we stop, else we return list of objects.

We move towards our next objective to insert data. We check to see if request method is POST and url is for 'contacts'. POST method passes data in BODY. We need to tune into HTTP events to access data. Once we have data, we convert it to 'utf-8' type string for processing, remember we sent data in RAW format from POSTMAN. Convert the string data received into JSON. If there are issues in conversion process, application may crash. To avoid such fallibilities we encompass conversion in try-catch block. We now have data that can be inserted into collection. This is done by invoking insert method. Again check with callback method for errors, if not, return the object which conveys how many objects were inserted.

How do we test this?

I suggest you install 'PostMan REST client' from Chrome/Firefox/Safari extension.

List contents


  • Open POSTMAN, 
  • choose GET method and set URL to be 'http://localhost:3000/contacts'. 
  • Tap on "SEND" method. 
  • If 'contacts' collection has data, we will see all of them . If there is no data, nothing comes back. 

Add/Insert content


  • We test POST method. Set method to 'POST', URL to be 'http://localhost:3000/contacts'
  • To pass data, select 'BODY' and ensure 'raw' option is selected with JSON (application/json) format. Add following data 
  • {"firstName":"someName1","lastName":"someName2","emailId":"someName@outlook.com","phoneNo":"XXX-XXX-3244"}
  • Tap on send. 


Now go back to GET, you should see inserted document(s).

/**
 * Created by krishnansriramrama on 5/27/16.
 *
 * Minimal contacts REST service backed by MongoDB.
 * Endpoints: GET /contacts (list all), POST /contacts (insert one),
 * OPTIONS (CORS pre-flight).
 */
var http = require('http');
var util = require('util');
var querystring = require('querystring');
var client = require('mongodb').MongoClient;

// Allow an externally-provisioned DB (e.g. MongoLab) to override the local one.
var uri = process.env.MONGOLAB_URI || 'mongodb://@127.0.0.1:27017/krishnan';

client.connect(uri, function(error, db) {
    if(error) {
        return console.error(error);
    }

    var collection = db.collection('contacts');
    var app = http.createServer(function(request, response) {
        var origin = (request.headers.origin || '*');
        if(request.method === 'OPTIONS') {
            // CORS pre-flight: advertise the verbs and headers we accept.
            // Status codes are numbers, not strings.
            response.writeHead(204, 'No Content', {
                'Access-Control-Allow-Origin': origin,
                'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
                'Access-Control-Allow-Headers': 'content-type, accept',
                'Access-Control-Max-Age': 10,
                'Content-Length': 0
            });
            response.end();
        } else if(request.method === 'GET' && (request.url === '/contacts' || request.url === '/contacts/')) {
            // List every document in the collection as a JSON array.
            collection.find().toArray(function(error, results) {
                if(error) {
                    return console.error(error);
                }
                var body = JSON.stringify(results);
                response.writeHead(200, {
                    'Access-Control-Allow-Origin': origin,
                    'Content-Type': 'text/plain',
                    // Buffer.byteLength, not body.length: the two differ when
                    // the JSON contains multi-byte (non-ASCII) characters.
                    'Content-Length': Buffer.byteLength(body)
                });
                console.log('List of objects returned from DB');
                console.dir(results);
                response.end(body);
            });
        } else if(request.method === 'POST' && (request.url === '/contacts' || request.url === '/contacts/')) {
            // The request body may arrive split across several 'data' events.
            // Collect every chunk and only parse/insert once on 'end' —
            // parsing inside the 'data' handler fails on partial JSON and can
            // insert the same payload more than once.
            var chunks = [];
            request.on('data', function(data) {
                chunks.push(data);
            });
            request.on('end', function() {
                console.log('Received Data');
                var raw = Buffer.concat(chunks).toString('utf-8');
                console.log(raw);
                var data;
                try {
                    data = JSON.parse(raw);
                } catch(error) {
                    // Malformed JSON: log it and answer the client instead of
                    // leaving the request hanging with no response.
                    console.error(error);
                    response.writeHead(400, { 'Access-Control-Allow-Origin': origin });
                    return response.end('Invalid JSON payload');
                }
                collection.insert(data, {safe:true}, function(error, obj) {
                    if(error) {
                        return console.error(error);
                    }
                    console.log('Object is saved');
                    var body = JSON.stringify(obj);
                    console.log(body);
                    response.writeHead(200, {
                        'Access-Control-Allow-Origin': origin,
                        'Content-Type': 'text/plain',
                        'Content-Length': Buffer.byteLength(body)
                    });
                    response.end(body);
                });
            });
        } else {
            response.end('Supported endpoints: GET /contacts, POST /contacts');
        }
    });
    var port = process.env.PORT || 3000;
    app.listen(port, function() {
        console.log('Server running in port - ' + port);
    });
});