Simplest low-cost forever-persistent storage method for sensor data

Fantastic ideas all around here and so great to learn what everyone is implementing.

After much searching and internal turmoil I found a simple “serverless” solution which fits my personal needs well.

My entire flow is:
(Device) -> (Gateway) -> (TTN) -> (TTN HTTP integration) -> (Google Cloud Function, Python) -> (Google BigQuery)

I have found this works quite well as a “serverless” option in the cloud. Within the Google Cloud Function I check each device against a whitelist and a blacklist, decode its payload and metadata, then save the resulting data as a row in a BigQuery table using a pre-defined schema.
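
To illustrate the "pre-defined schema" idea, here is a minimal sketch in plain Python. The column names and types are assumptions chosen for the example, not the actual production schema; the name/type/mode layout mirrors the JSON format BigQuery accepts for schema definitions. The helper `row_matches_schema` is hypothetical and only checks a row's shape before it would be inserted.

```python
# Hypothetical schema: column names and types are assumptions for illustration.
SENSOR_SCHEMA = [
    {"name": "dev_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "time", "type": "TIMESTAMP", "mode": "REQUIRED"},
    {"name": "sensorType", "type": "STRING", "mode": "REQUIRED"},
    {"name": "ambientTemperatureInC", "type": "FLOAT", "mode": "NULLABLE"},
]

# Python types this sketch accepts for each column type.
# TIMESTAMP values are assumed to arrive as ISO 8601 strings.
PYTHON_TYPES = {
    "STRING": (str,),
    "TIMESTAMP": (str,),
    "FLOAT": (int, float),
}


def row_matches_schema(row, schema=SENSOR_SCHEMA):
    """Return True if the tuple has one acceptable value per schema column."""
    if len(row) != len(schema):
        return False
    for value, column in zip(row, schema):
        if value is None:
            # Only NULLABLE columns may be left empty.
            if column["mode"] != "NULLABLE":
                return False
            continue
        if not isinstance(value, PYTHON_TYPES[column["type"]]):
            return False
    return True
```

Rejecting a malformed row before the insert keeps a bad payload from one device from producing a confusing insert error later on.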

The billing model for these services is quite consistent and cost-competitive. For anyone else with consistent sensor payloads and an attraction to “serverless” options, this may be of interest.

Thanks!
Michael

Using BigQuery as a data source for Google Data Studio would also be great!

Hi @pomplesiegel ,

Could you share the google function that you used?

Hi @tomascharad, here is a version of the function with our specific info / sensor stuff stripped out. I left in a temperature data point in addition to the TTN metadata.

Function in Python 3.9:

from flask import Response
from google.cloud import bigquery

bqClient = bigquery.Client() #persistent object as our big query client

#to simplify our strings
DATASET_PREFIX = "sensor-data-ttn.sensorDataset"

#devices to ignore completely
DEVICE_BLACKLIST = [
    "sensor-2" #this will turn off storage for this device
]

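#devices we store data for (whitelist), mapped to their destination table
#each table id here must also be a key in TABLE_ID_TO_TABLE_OBJECT_MAP below
DEVICE_TO_TABLE_ID_MAP = {
    "sensor-1": DATASET_PREFIX + ".internal_base"
}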


TABLE_ID_TO_TABLE_OBJECT_MAP = {
    DATASET_PREFIX + ".internal_base": bqClient.get_table(DATASET_PREFIX + '.internal_base')
}


def insertTupleRowDataIntoTable(table_id, arrayOfTupleRowData):
    # pass - if running locally

    #look up the table object based on its table_id
    #insert our array of tuple data into this table
    errors = bqClient.insert_rows(TABLE_ID_TO_TABLE_OBJECT_MAP[table_id], arrayOfTupleRowData)
    if errors: #insert_rows returns a list of per-row errors, empty on success
        raise Exception("Failed to insert rows into table '{}': {}".format(table_id, errors))

#For Canopy v4 and associated table schema, including enhanced air sensor fields to be null
#input JSON from a sensor and it will output the exact schema to send to SQL table
def convertSensorJSONIntoSchemaTuple(rawJSON):
    return [
        (rawJSON["end_device_ids"]["device_id"],  # dev-id
         rawJSON["uplink_message"]["rx_metadata"][0]["time"],  # time - for first gateway
         rawJSON["uplink_message"]["decoded_payload"]["sensorType"],  # sensorType
         rawJSON["uplink_message"]["decoded_payload"]["ambientTemperatureInC"]  # ambientTemperatureInC
         )
    ]

def httpHandler(request):
    try: #everything in a try block
        jsonData = request.get_json(silent=True) #turn request into json
        print("Raw JSON payload received: " + str(jsonData))

        #if the port is not "1", then we don't care about this message
        if jsonData["uplink_message"]["f_port"] != 1:
            print("Ignoring message with unexpected port #{}".format(jsonData["uplink_message"]["f_port"]))
            return 'Ignored port'

        #if this device is in our blacklist / ignore list
        if jsonData["end_device_ids"]["device_id"] in DEVICE_BLACKLIST:
            print("Ignoring device '{}' in blacklist".format( jsonData["end_device_ids"]["device_id"] ))
            return f'Ignored device'

        # Map dev_id to table it should go to. whitelist
        try:
            table_id = DEVICE_TO_TABLE_ID_MAP[ jsonData["end_device_ids"]["device_id"] ]
        except KeyError: #device is not in the whitelist
            print("dev_id {} not found in LUT. Ignoring".format( jsonData["end_device_ids"]["device_id"] ))
            return 'OK' #unknown devices are skipped rather than treated as errors
            # raise Exception("dev_id '{}' not found in LUT".format(jsonData["end_device_ids"]["device_id"]))

        #map sensor type to a converter function
        if jsonData["uplink_message"]["decoded_payload"]["sensorType"] == "mySensorType":
            tupleOfSensorData = convertSensorJSONIntoSchemaTuple(jsonData)
        #put other sensor type cases here
        else: #default case
            print("Unrecognized sensorType")
            return 'OK'  #unknown sensor types are skipped rather than treated as errors
            # raise Exception("Unrecognized sensorType")

        #if we're here, everything has gone very well
        #we recognize this sensor, we have its data formatted correctly for a table
        #and now it's time to store that data into a table

        print("Storing device '{}' data into table '{}': {}".format(
            jsonData["end_device_ids"]["device_id"], table_id, tupleOfSensorData))

        insertTupleRowDataIntoTable(table_id, tupleOfSensorData)

        return f'OK' #if everything goes ok

    except Exception as e: #if anything fails at all just return a 403
        print("Some kind of exception occurred: " + str(e) )
        return Response("403 Forbidden", status=403, mimetype='text/plain')
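
For anyone who wants to try the payload-to-row step locally without a BigQuery client, here is a minimal sketch. The sample payload is made up for illustration (all values are hypothetical) and contains only the keys the function above reads; a real TTN uplink carries many more fields. `convert_uplink_to_row` is a hypothetical stand-in that extracts the same fields as `convertSensorJSONIntoSchemaTuple`, returning a single tuple instead of a one-element list.

```python
# Hypothetical, trimmed-down TTN uplink body: only the keys used above.
SAMPLE_UPLINK = {
    "end_device_ids": {"device_id": "sensor-1"},
    "uplink_message": {
        "f_port": 1,
        "rx_metadata": [{"time": "2021-06-01T12:00:00Z"}],
        "decoded_payload": {
            "sensorType": "mySensorType",
            "ambientTemperatureInC": 21.5,
        },
    },
}


def convert_uplink_to_row(uplink):
    """Extract the stored fields from an uplink as one row tuple."""
    message = uplink["uplink_message"]
    payload = message["decoded_payload"]
    return (
        uplink["end_device_ids"]["device_id"],
        message["rx_metadata"][0]["time"],  # time reported by the first gateway
        payload["sensorType"],
        payload["ambientTemperatureInC"],
    )


row = convert_uplink_to_row(SAMPLE_UPLINK)
print(row)
```

Running this against a payload copied from the function's logs is a quick way to check that the decoded fields line up with the table columns before deploying.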