Importing Data Doesn’t Have to Be Hard: Transferring Structured Data at Scale with csvtool

by mb | 2020/05/20


Fortunately for all of us, ingesting and modeling data doesn’t have to be a painstaking, manual process. Rather than transcribing information from a spreadsheet into individual nodes by hand, we can use Synapse’s csvtool to bulk-ingest structured data into the cortex.

Using csvtool to Load Data into a Cortex

To see how csvtool works, let’s imagine that we’ve received the following DNS A records from a partner and want to ingest them into our cortex. This file (let’s call it ingest.csv) contains selected DNS resolution information for several previously published APT28 domains. The header row shows that the data is divided into four columns, titled fqdn, ip, first-seen, and last-seen, which identify the domain, the IP address it resolved to, and the time window within which the resolution was observed.

fqdn,ip,first-seen,last-seen
theguardiannews.org,12.131.129.89,2016/09/20 13:10:12.000,2016/09/20 13:10:12.001
theguardiannews.org,5.135.183.154,2015/12/11 10:57:12.000,2015/12/14 10:59:15.000
theguardiannews.org,31.210.118.89,2015/12/18 09:25:42.000,2016/12/09 10:59:08.000
theguardiannews.org,32.210.118.89,2015/12/17 00:00:00.000,2015/12/17 18:17:55.000
nato-news.com,185.82.202.174,2015/10/10 00:00:00.000,2016/09/28 05:30:53.000
bbc-news.org,207.180.214.158,2019/04/15 07:16:09.364,2019/04/24 07:31:37.271
bbc-news.org,192.64.119.28,2019/04/18 19:04:01.000,2019/04/19 07:31:27.360
bbc-news.org,217.160.182.197,2015/05/03 10:31:14.000,2015/05/03 10:31:14.001

But before we ingest this CSV file, we’ll first need to create a file with instructions for how Storm should handle the data. Given that the data represents DNS A records, we’ll want Storm to turn each row into an inet:dns:a node, which means that our query file (we’ll call it ingest.storm) will contain the following instructions:

// Loop over the $rows which are provided by csvtool
for ($fqdn, $ipv4, $first, $last) in $rows {
    // Add an inet:dns:a node and update its .seen time window
    [ inet:dns:a = ( $fqdn, $ipv4 ) .seen=( $first, $last ) ]
}
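
If we also want to record where the data came from, the same loop can tag each node as it’s created. Here’s a minimal sketch, assuming a hypothetical #rep.partner tag of our own choosing:

// Same ingest loop, but also apply our hypothetical #rep.partner
// tag to each node so the data's origin travels with it
for ($fqdn, $ipv4, $first, $last) in $rows {
    [ inet:dns:a = ( $fqdn, $ipv4 ) .seen=( $first, $last ) +#rep.partner ]
}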

Once we’ve written our query file, we can first ingest our CSV file into a temporary test cortex (using the --test option) so that we can catch and address any errors before uploading the data to our production cortex. Those who prefer to live dangerously may skip this step. The command below also uses the --csv-header option to tell csvtool to skip the header row when reading the CSV file (see the Synapse Documentation for the full list of supported options):

python -m synapse.tools.csvtool --csv-header --cli --test ingest.storm ingest.csv
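
Because we passed --cli, csvtool drops us into an interactive prompt connected to the temporary test cortex once the ingest completes. From there, a quick count makes a handy sanity check (a sketch; it should report one node per CSV row):

cli> storm inet:dns:a | count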

Our CSV file uploads to the test cortex without throwing any errors, and a quick query confirms that Storm correctly formatted the data into inet:dns:a nodes. Now, we’re ready to upload to our production cortex, which, in the example command below, is listening on tcp://localhost:4444/.

python -m synapse.tools.csvtool --csv-header --cli --cortex tcp://localhost:4444/ ingest.storm ingest.csv

Our CSV file uploads without any errors, and just like that, the data is in the cortex.

cli> storm inet:dns:a:fqdn=theguardiannews.org inet:dns:a:fqdn=nato-news.com inet:dns:a:fqdn=bbc-news.org
Executing query at 2020/05/14 15:59:59.825
inet:dns:a=('theguardiannews.org', '12.131.129.89')
        .created = 2020/05/14 15:59:38.843
        .seen = ('2016/09/20 13:10:12.000', '2016/09/20 13:10:12.001')
        :fqdn = theguardiannews.org
        :ipv4 = 12.131.129.89
inet:dns:a=('theguardiannews.org', '5.135.183.154')
        .created = 2020/05/14 15:59:38.848
        .seen = ('2015/12/11 10:57:12.000', '2015/12/14 10:59:15.000')
        :fqdn = theguardiannews.org
        :ipv4 = 5.135.183.154
inet:dns:a=('theguardiannews.org', '31.210.118.89')
        .created = 2020/05/14 15:59:38.849
        .seen = ('2015/12/18 09:25:42.000', '2016/12/09 10:59:08.000')
        :fqdn = theguardiannews.org
        :ipv4 = 31.210.118.89
inet:dns:a=('theguardiannews.org', '32.210.118.89')
        .created = 2020/05/14 15:59:38.851
        .seen = ('2015/12/17 00:00:00.000', '2015/12/17 18:17:55.000')
        :fqdn = theguardiannews.org
        :ipv4 = 32.210.118.89
inet:dns:a=('nato-news.com', '185.82.202.174')
        .created = 2020/05/14 15:59:38.853
        .seen = ('2015/10/10 00:00:00.000', '2016/09/28 05:30:53.000')
        :fqdn = nato-news.com
        :ipv4 = 185.82.202.174
inet:dns:a=('bbc-news.org', '207.180.214.158')
        .created = 2020/05/14 15:59:38.857
        .seen = ('2019/04/15 07:16:09.364', '2019/04/24 07:31:37.271')
        :fqdn = bbc-news.org
        :ipv4 = 207.180.214.158
inet:dns:a=('bbc-news.org', '192.64.119.28')
        .created = 2020/05/14 15:59:38.860
        .seen = ('2019/04/18 19:04:01.000', '2019/04/19 07:31:27.360')
        :fqdn = bbc-news.org
        :ipv4 = 192.64.119.28
inet:dns:a=('bbc-news.org', '217.160.182.197')
        .created = 2020/05/14 15:59:38.861
        .seen = ('2015/05/03 10:31:14.000', '2015/05/03 10:31:14.001')
        :fqdn = bbc-news.org
        :ipv4 = 217.160.182.197
complete. 8 nodes in 64 ms (125/sec).

Using csvtool to Export Data

While we all know that integrated structured data sources are best, there are times when you just need a good old spreadsheet. Let’s say we need to share some of our data with a partner, which means exporting nodes from our cortex into a structured format that the partner can use. To do this, we can once again turn to csvtool, this time exporting selected nodes as individual rows with their associated properties divided into columns.

We’ll need to write another query file (we’ll call this one export.storm) instructing Storm how to export our data, similar to what we did when ingesting our CSV file into the cortex. If we use those same DNS A records as an example, the contents of our query file will appear as follows:

// Lift the DNS A records we want to export
inet:dns:a:fqdn=theguardiannews.org
inet:dns:a:fqdn=nato-news.com
inet:dns:a:fqdn=bbc-news.org

// Unpack the .seen time window into human-readable timestamps
($first, $last) = $node.repr(".seen")

// Emit one CSV row per node
$lib.csv.emit(:fqdn, $node.repr(ipv4), $first, $last)
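
Since the export file is just a Storm query, we can also narrow what gets emitted before calling $lib.csv.emit. As a sketch, the variant below exports only the bbc-news.org resolutions whose observation window overlaps 2019, using Storm’s interval filter syntax:

// Sketch: lift only the bbc-news.org records observed during 2019
inet:dns:a:fqdn=bbc-news.org +.seen@=2019

($first, $last) = $node.repr(".seen")
$lib.csv.emit(:fqdn, $node.repr(ipv4), $first, $last)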

Either way, we can then export the data selected by our query file to a CSV file by running the following command:

python -m synapse.tools.csvtool --debug --export --cortex tcp://localhost:4444/ export.storm export.csv

The resulting CSV file is formatted so that each inet:dns:a node becomes an individual row, with the :fqdn, :ipv4, and .seen properties split into separate columns, as seen below:

theguardiannews.org,12.131.129.89,2016/09/20 13:10:12.000,2016/09/20 13:10:12.001
theguardiannews.org,5.135.183.154,2015/12/11 10:57:12.000,2015/12/14 10:59:15.000
theguardiannews.org,31.210.118.89,2015/12/18 09:25:42.000,2016/12/09 10:59:08.000
theguardiannews.org,32.210.118.89,2015/12/17 00:00:00.000,2015/12/17 18:17:55.000
nato-news.com,185.82.202.174,2015/10/10 00:00:00.000,2016/09/28 05:30:53.000
bbc-news.org,207.180.214.158,2019/04/15 07:16:09.364,2019/04/24 07:31:37.271
bbc-news.org,192.64.119.28,2019/04/18 19:04:01.000,2019/04/19 07:31:27.360
bbc-news.org,217.160.182.197,2015/05/03 10:31:14.000,2015/05/03 10:31:14.001
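
One thing to note: the export has no header row, because our query never emitted one. If the partner expects headers, we can emit a single header row at the top of the query, since a statement placed before the first lift runs once, with no nodes yet in the pipeline. A minimal sketch:

// Emit a single header row before any nodes enter the pipeline
$lib.csv.emit("fqdn", "ip", "first-seen", "last-seen")

// ...then lift and emit the data rows as before
inet:dns:a:fqdn=theguardiannews.org
inet:dns:a:fqdn=nato-news.com
inet:dns:a:fqdn=bbc-news.org

($first, $last) = $node.repr(".seen")
$lib.csv.emit(:fqdn, $node.repr(ipv4), $first, $last)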

Ingest all the Things!

As you can see, csvtool is a convenient way both to bulk-ingest structured data into the cortex as nodes and to neatly export nodes when necessary, all while preserving the structure defined by the Synapse data model.

It’s worth noting that csvtool ingests and exports data incrementally, executing the Storm query against one batch of rows at a time rather than loading the entire file at once. As a result, it’s practical to transfer large amounts of data while continuing to work on other tasks, since the transfer proceeds without monopolizing the cortex or otherwise disrupting normal operations.

For more information on how to use csvtool, see the Synapse User Guide.