Run script action
Custom script
Scripting enables a user to implement complex logic or complex transformations. Currenlty the runtime supports interpreted version of golang
. Descpite having the same syntax,it has limited access to external libraries and is less performant compare to normal compiled golang compiler. Golang is very simple yet powerfull language - https://golang.org/
User scrip is executed in Run script
node. Each scripting node keeps states of all internal variables untill flow is reloaded or flow engine is reastarted.
Main handler
Main message handler is invoked by runtime once the node receives message from previous node. Handler function implemtn the signature :
func Run(msg *model.Message,ctx *model.Context,params exec.ScriptParams) string {
return "ok"
}
Parameters :
msg *model.Message
- is a reference to trigger message object.
type Message struct {
AddressStr string
Address fimpgo.Address
Payload fimpgo.FimpMessage
ValPayload interface{} // the field is optional. Must be used to generate response if is set , if not set , Payload must be used instead
RawPayload []byte
Header map[string]string
CancelOp bool // if true , listening node should close all operations
RequestId int64
Origin string
}
type ScriptParams struct {
FlowId string
Mqtt *fimpgo.MqttTransport
Registry storage.RegistryStorage
Timeseries *timeseries.Connector
Settings map[string]model.Setting
Log *log.Entry
}
Return :
If function returns ok
the node follows successfull path in transitions to next node otherwise it follows error transition path.
List of supported libraries
Golang stdlib :
fmt
time
strconv
strings
math
sort
encoding/json
net
net/http
Extension libraries :
github.com/thingsplex/tpflow/model
github.com/thingsplex/tpflow/node/action/exec
github.com/thingsplex/tpflow/registry/storage
github.com/thingsplex/tpflow/connector/plugins/timeseries
github.com/futurehomeno/fimpgo
Time series API
Functions :
func (conn *Connector) QueryDataPoints(query string) (*Result,error)
func (conn *Connector) GetDataPoints(request *GetDataPointsRequest) (*Result,error)
func (conn *Connector) GetEnergyDataPoints(request *GetDataPointsRequest) (*Result,error)
func (conn *Connector) WriteDataPoints(request *WriteDataPointsRequest) error
Types :
type DataPointsFilter struct {
Tags map[string]string `json:"tags"`
Devices []string `json:"devices"`
Locations []string `json:"locations"`
DevTypes []string `json:"dev_types"`
}
type GetDataPointsRequest struct {
ProcID int `json:"proc_id"`
FieldName string `json:"field_name"`
DataFunction string `json:"data_function"`
TransformFunction string `json:"transform_function"`
MeasurementName string `json:"measurement_name"`
RelativeTime string `json:"relative_time"`
FromTime string `json:"from_time"` // 2021-10-24T00:00:00Z
ToTime string `json:"to_time"` // 2021-10-27T00:00:00Z
GroupByTime string `json:"group_by_time"`
GroupByTag string `json:"group_by_tag"`
FillType string `json:"fill_type"`
Filters DataPointsFilter `json:"filters"`
}
type WriteDataPointsRequest struct {
ProcID int `json:"proc_id"`
Bucket string `json:"bucket"` // data is stored with retention policy , if not set system will try to auto calculate based on measurement name
DataPoints []MDataPoint `json:"dp"`
}
type MDataPoint struct {
Name string `json:"name"` // name of the measurement
Tags map[string]string `json:"tags"`
Fields map[string]interface{} `json:"fields"`
TimeStamp int64 `json:"ts"` // if 0 , ecollector will set local time
}
type Result struct {
Series []Row
Err string `json:"error,omitempty"`
}
// Row represents a single row returned from the execution of a statement.
type Row struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Columns []string `json:"columns,omitempty"`
Values [][]interface{} `json:"values,omitempty"`
Partial bool `json:"partial,omitempty"`
}
Example JSON response :
[
{
"name": "sensor_temp.evt.sensor.report",
"tags": {
"location_id": "10"
},
"columns": [
"time",
"value"
],
"values": [
[
1635238800,
null
],
[
1635242400,
21.5
],
[
1635246000,
22.5
]
]
},
{
"name": "sensor_temp.evt.sensor.report",
"tags": {
"location_id": "11"
},
"columns": [
"time",
"value"
],
"values": [
[
1635238800,
22
],
[
1635242400,
22.600000381469727
],
[
1635246000,
22.5
]
]
},
{
"name": "sensor_temp.evt.sensor.report",
"tags": {
"location_id": "12"
},
"columns": [
"time",
"value"
],
"values": [
[
1635238800,
23.100000381469727
],
[
1635242400,
23.68
],
[
1635246000,
23.6
]
]
},
{
"name": "sensor_temp.evt.sensor.report",
"tags": {
"location_id": "13"
},
"columns": [
"time",
"value"
],
"values": [
[
1635238800,
20.17
],
[
1635242400,
20.17
],
[
1635246000,
20.17
]
]
},
{
"name": "sensor_temp.evt.sensor.report",
"tags": {
"location_id": "16"
},
"columns": [
"time",
"value"
],
"values": [
[
1635238800,
8.199999809265137
],
[
1635242400,
8.899999618530273
],
[
1635246000,
9.800000190734863
]
]
},
{
"name": "sensor_temp.evt.sensor.report",
"tags": {
"location_id": "7"
},
"columns": [
"time",
"value"
],
"values": [
[
1635238800,
3
],
[
1635242400,
22.3
],
[
1635246000,
22.3
]
]
},
{
"name": "sensor_temp.evt.sensor.report",
"tags": {
"location_id": "8"
},
"columns": [
"time",
"value"
],
"values": [
[
1635238800,
22.58
],
[
1635242400,
21.1200008392334
],
[
1635246000,
21.0699996948242
]
]
}
]
Examples
package ext
import "fmt"
import "github.com/thingsplex/tpflow/model"
import "github.com/thingsplex/tpflow/node/action/exec"
import "github.com/futurehomeno/fimpgo"
var counter int = 1
type AppLogic struct {
mqtt *fimpgo.MqttTransport
ctx *model.Context
}
func(al *AppLogic) sendMessage(msg string) {
fimpMsg := fimpgo.NewMessage("evt.script.test","test","string",msg,nil,nil,nil)
al.mqtt.PublishToTopic("pt:j1/mt:evt/rt:app/rn:testapp/ad:1",fimpMsg)
}
func(al *AppLogic) CheckHomeMode() {
hModeVal,err := al.ctx.GetVariable("fh.home.mode","global")
if err != nil {
return
}
hMode, ok := hModeVal.Value.(string)
if !ok {
return
}
if hMode == "home" {
al.sendMessage("home is set to home mode")
}
}
func Run(msg *model.Message,ctx *model.Context,params exec.ScriptParams) string {
appL := AppLogic{mqtt:params.Mqtt,ctx:ctx}
appL.CheckHomeMode()
counter++
r := fmt.Sprintf("Hello %d",counter)
ctx.SetVariable("hello_var","string",r,"",params.FlowId,true)
return "ok"
}
package ext
import "fmt"
import "github.com/thingsplex/tpflow/model"
import "github.com/thingsplex/tpflow/node/action/exec"
var counter int = 1
type TestMsg struct {
Name string
Counter int
}
type Response struct {
Result string
ErrorCode string
}
func Run(msg *model.Message,ctx *model.Context,params exec.ScriptParams) string {
var request TestMsg
msg.PayloadMsgToStruct(&request) // cast request object to variable
counter = request.Counter
// mutating trigger request variable
msg.ValPayload = Response {
Result:"ok",
}
r := fmt.Sprintf("Hello %d",counter)
params.Log.Info("New request , counter = ",counter)
ctx.SetVariable("test_var","string",r,"",params.FlowId,true)
ctx.SetVariable("request_var","object",msg.ValPayload,"",params.FlowId,true)
return "ok"
}
package ext
import "fmt"
import "github.com/thingsplex/tpflow/model"
import "github.com/thingsplex/tpflow/node/action/exec"
import "github.com/futurehomeno/fimpgo"
import "github.com/thingsplex/tpflow/connector/plugins/timeseries"
var counter int = 1
type AppLogic struct {
mqtt *fimpgo.MqttTransport
ctx *model.Context
}
func sendDataPoint(params exec.ScriptParams) {
dp := timeseries.MDataPoint{
Name : "test_data_point",
Tags : map[string]string {},
Fields : map[string]interface{} {"val":10},
}
dpReq := timeseries.WriteDataPointsRequest{
ProcID :1,
Bucket :"gen_default",
DataPoints :[]timeseries.MDataPoint{dp},
}
params.Timeseries.WriteDataPoints(&dpReq)
}
func Run(msg *model.Message,ctx *model.Context,params exec.ScriptParams) string {
// appL := AppLogic{mqtt:params.Mqtt,ctx:ctx}
tsReq := timeseries.GetDataPointsRequest{
ProcID :1,
FieldName :"value",
DataFunction :"last",
MeasurementName :"sensor_temp.evt.sensor.report",
RelativeTime :"12h",
GroupByTime :"1h",
GroupByTag :"location_id",
FillType :"previous"}
tsResult,err := params.Timeseries.GetDataPoints(&tsReq)
if err != nil {
params.Log.Error("Error :",err)
} else {
msg.ValPayload = tsResult.Series
}
sendDataPoint(params)
params.Log.Info("New request , counter = ",counter)
counter++
// r := fmt.Sprintf("Hello %d",counter)
// ctx.SetVariable("hello_var","string",r,"",params.FlowId,true)
return "ok"
}
Parsing HTTP request , requesting time series data and sending response :
package ext
import "fmt"
import "encoding/json"
import "github.com/thingsplex/tpflow/model"
import "github.com/thingsplex/tpflow/node/action/exec"
import "github.com/futurehomeno/fimpgo"
import "github.com/thingsplex/tpflow/connector/plugins/timeseries"
var counter int = 1
func Run(msg *model.Message,ctx *model.Context,params exec.ScriptParams) string {
tsReq := timeseries.GetDataPointsRequest{
ProcID :1,
FieldName :"value",
DataFunction :"last",
MeasurementName :"sensor_temp.evt.sensor.report",
RelativeTime :"3d",
GroupByTime :"1h",
GroupByTag :"location_id",
FillType :"previous"}
params.Log.Info("Input payload size = ",len(msg.RawPayload))
// Unmarshaling Flowframe variable int tsReq variable
json.Unmarshal(msg.RawPayload,&tsReq)
tsResult,err := params.Timeseries.GetDataPoints(&tsReq)
if err != nil {
params.Log.Error("Error :",err)
} else {
msg.ValPayload = tsResult.Series
}
params.Log.Info("New request , counter = ",counter)
counter++
// ctx.SetVariable("hello_var","string",r,"",params.FlowId,true)
return "ok"
}