matthew
2 years ago
10 changed files with 242 additions and 72 deletions
@ -0,0 +1,14 @@ |
|||||||
|
import influxdb from "./modules/influxdb" |
||||||
|
|
||||||
|
import db from "../db/index" |
||||||
|
|
||||||
|
const main = async () => { |
||||||
|
const allDomainsCount = await db.nftDomain.count() |
||||||
|
const availableDomainsCount = await db.nftDomain.count({ where: { available: true } }) |
||||||
|
influxdb.writeSitesCount({ |
||||||
|
all: allDomainsCount, |
||||||
|
available: availableDomainsCount, |
||||||
|
}) |
||||||
|
} |
||||||
|
|
||||||
|
export default main |
@ -1,31 +1,34 @@ |
|||||||
import dotenv from "dotenv" |
import dotenv from "dotenv" |
||||||
import path from "path" |
import path from "path" |
||||||
dotenv.config({ path: path.resolve(__dirname, "../.env.local") }) |
dotenv.config({ path: path.resolve(__dirname, "../.env.local") }) |
||||||
import domainWatcher from './domain-watcher' |
import domainWatcher from "./domain-watcher" |
||||||
import parser from './parser' |
import parser from "./parser" |
||||||
|
import influx from "./influx" |
||||||
|
|
||||||
const run = async()=>{ |
const run = async()=>{ |
||||||
console.log('Start domain watcher') |
console.log('Start domain watcher') |
||||||
console.time('watcher') |
console.time('watcher') |
||||||
await domainWatcher(); |
await domainWatcher(); |
||||||
console.timeEnd('watcher') |
console.timeEnd('watcher') |
||||||
|
influx() |
||||||
console.log('Start parser'); |
console.log('Start parser'); |
||||||
console.time('watcher'); |
console.time('watcher'); |
||||||
await parser(); |
await parser(); |
||||||
console.timeEnd('watcher'); |
console.timeEnd('watcher'); |
||||||
|
|
||||||
} |
} |
||||||
|
|
||||||
const second = 1000; |
const second = 1000 |
||||||
const minute= 60 * second; |
const minute = 60 * second |
||||||
const hour = 60 * minute;
|
const hour = 60 * minute |
||||||
|
|
||||||
run(); |
run() |
||||||
|
|
||||||
setInterval(() => { |
setInterval(() => { |
||||||
console.log(new Date(), 'Health check'); |
console.log(new Date(), "Health check") |
||||||
}, hour) |
}, hour) |
||||||
|
|
||||||
setInterval(() => { |
setInterval(() => { |
||||||
console.log(new Date(), 'Cron parse start') |
console.log(new Date(), "Cron parse start") |
||||||
run() |
run() |
||||||
}, 3 * hour) |
}, 3 * hour) |
@ -0,0 +1,5 @@ |
|||||||
|
export const influxToken = process.env.INFLUX_TOKEN as string; |
||||||
|
export const influxOrg = process.env.INFLUX_ORG as string |
||||||
|
export const influxBucket = process.env.INFLUX_BUCKET as string |
||||||
|
export const influxPointName = 'count'; |
||||||
|
export const influxHost = 'Searching'; |
@ -0,0 +1,46 @@ |
|||||||
|
import { fluxDuration } from "@influxdata/influxdb-client"; |
||||||
|
import { influxBucket, influxHost, influxPointName } from "./constants"; |
||||||
|
import { InfluxField, InfluxPeriod } from "./types"; |
||||||
|
|
||||||
|
export const influxQuery = (field: InfluxField, fetchPeriod: InfluxPeriod) =>{ |
||||||
|
let start; |
||||||
|
let period; |
||||||
|
|
||||||
|
switch(fetchPeriod){
|
||||||
|
case InfluxPeriod.H: |
||||||
|
start ='-1h' |
||||||
|
period = '6m' |
||||||
|
case InfluxPeriod.D: |
||||||
|
start ='-1d' |
||||||
|
period = '144m' |
||||||
|
case InfluxPeriod.W: |
||||||
|
start ='-1w' |
||||||
|
period = '1008m' |
||||||
|
case InfluxPeriod.M: |
||||||
|
start ='-1mo' |
||||||
|
period = '3d' |
||||||
|
case InfluxPeriod.Y: |
||||||
|
start ='-1y' |
||||||
|
period = '36d' |
||||||
|
case InfluxPeriod.tenminute: |
||||||
|
start ='10m' |
||||||
|
period = '1m' |
||||||
|
} |
||||||
|
|
||||||
|
const influxPeriod = fluxDuration(period); |
||||||
|
const influxStart = fluxDuration(start); |
||||||
|
return `from(bucket: "${influxBucket}")
|
||||||
|
|> range(start: ${start}, stop: now()) |
||||||
|
|> filter(fn: (r) => r["host"] == "${influxHost}") |
||||||
|
|> filter(fn: (r) => r["_field"] == "${field}") |
||||||
|
|> filter(fn: (r) => r["_measurement"] == "${influxPointName}") |
||||||
|
|> aggregateWindow(every: ${period}, fn: mean, createEmpty: false) |
||||||
|
|> yield(name: "mean")` |
||||||
|
}
|
||||||
|
|
||||||
|
export const processInfluxResult = (res:unknown[]) => { |
||||||
|
return res.map(i=>({ |
||||||
|
value: i._value, |
||||||
|
time: i._time |
||||||
|
})) |
||||||
|
} |
@ -0,0 +1,52 @@ |
|||||||
|
import { fluxDuration, InfluxDB } from "@influxdata/influxdb-client" |
||||||
|
import { Point } from "@influxdata/influxdb-client" |
||||||
|
import { influxBucket, influxHost, influxOrg, influxPointName, influxToken } from "./constants" |
||||||
|
import { influxQuery, processInfluxResult } from "./helpers"; |
||||||
|
import { InfluxField, InfluxPeriod } from "./types" |
||||||
|
|
||||||
|
interface WriteSitesCounteParams { |
||||||
|
all: number; |
||||||
|
available: number; |
||||||
|
} |
||||||
|
|
||||||
|
interface QueryParams { |
||||||
|
field: InfluxField; |
||||||
|
period: InfluxPeriod; |
||||||
|
} |
||||||
|
|
||||||
|
class InfluxDb { |
||||||
|
private client: InfluxDB |
||||||
|
constructor() { |
||||||
|
this.client = new InfluxDB({ url: process.env.INFLUX_URL as string, token: influxToken }) |
||||||
|
} |
||||||
|
getWriteApi() { |
||||||
|
const writeApi = this.client.getWriteApi(influxOrg, influxBucket) |
||||||
|
writeApi.useDefaultTags({ host: influxHost }) |
||||||
|
return writeApi |
||||||
|
} |
||||||
|
writeSitesCount({all,available}:WriteSitesCounteParams){ |
||||||
|
const writeApi = this.getWriteApi() |
||||||
|
const pointAll = new Point(influxPointName).intField(InfluxField.ALL_SITES,all); |
||||||
|
const pointAvailable = new Point(influxPointName).intField(InfluxField.AVAILABLE_SITES,available); |
||||||
|
writeApi.writePoint(pointAll) |
||||||
|
writeApi.writePoint(pointAvailable) |
||||||
|
writeApi.close() |
||||||
|
} |
||||||
|
async query({field,period}:QueryParams){ |
||||||
|
const queryApi = this.client.getQueryApi(influxOrg) |
||||||
|
|
||||||
|
const query = influxQuery(field, period); |
||||||
|
const result = await queryApi.collectRows(query) |
||||||
|
console.log(processInfluxResult(result)); |
||||||
|
} |
||||||
|
getAllSiteCounts(period:InfluxPeriod){ |
||||||
|
|
||||||
|
this.query({ |
||||||
|
field:InfluxField.ALL_SITES, |
||||||
|
period |
||||||
|
}) |
||||||
|
|
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
export default new InfluxDb() |
@ -0,0 +1,13 @@ |
|||||||
|
export enum InfluxPeriod { |
||||||
|
H='H', |
||||||
|
D='D', |
||||||
|
W='W', |
||||||
|
M='M', |
||||||
|
Y='Y', |
||||||
|
tenminute='tenminute', |
||||||
|
} |
||||||
|
|
||||||
|
export enum InfluxField { |
||||||
|
ALL_SITES = 'ALL_SITES', |
||||||
|
AVAILABLE_SITES = 'AVAILABLE_SITES' |
||||||
|
} |
Loading…
Reference in new issue