Skip to content

Commit

Permalink
pkg om telegrammen op te nemen te gelijketijd
Browse files Browse the repository at this point in the history
Signed-off-by: toby lorne <toby@toby.codes>
  • Loading branch information
tlwr committed Sep 1, 2024
1 parent bc2c5cb commit e05fed3
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 13 deletions.
44 changes: 31 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/tarm/serial"

"github.com/tlwr/smart-meter-exporter/pkg/p1parser"
"github.com/tlwr/smart-meter-exporter/pkg/p1telegramcollector"
)

var (
Expand Down Expand Up @@ -68,6 +69,10 @@ func main() {
log.Fatalf("kon niet serial device openen: %s", err)
}

collCtx, cancelColl := context.WithCancel(rootCtx)
coll := p1telegramcollector.New()
coll.Run(collCtx)

wg := sync.WaitGroup{}

serialCtx, cancelSerial := context.WithCancel(sigCtx)
Expand All @@ -92,22 +97,34 @@ func main() {
if n == 0 {
continue
}
if 600 > n || n > 1024 {
log.Printf("overslaan, waarschijnlijk iets mis met data van %d", n)
continue
}
tg, err := p1parser.Parse(buf)
if err != nil {
log.Printf("kon niet de telegram parsen: %s", err)
continue
}

huidigVerbruik.Set(tg.HuidigVerbruik)
totaalVerbruik.Set(tg.VerbruikTotaal)
huidigTeruglevering.Set(tg.HuidigTeruglevering)
totalTeruglevering.Set(tg.TeruggeleverdTotaal)
coll.Write(buf)
}
}
}()

wg.Add(1)
go func() {
for telegram := range coll.C {
if telegram == nil {
break
}

tg, err := p1parser.Parse(telegram.Bytes())
if err != nil {
log.Printf("kon niet de telegram parsen: %s", err)
continue
}

log.Printf("telegram %v", tg)

huidigVerbruik.Set(tg.HuidigVerbruik)
totaalVerbruik.Set(tg.VerbruikTotaal)
huidigTeruglevering.Set(tg.HuidigTeruglevering)
totalTeruglevering.Set(tg.TeruggeleverdTotaal)
}

wg.Done()
}()

promServer := http.Server{
Expand All @@ -134,6 +151,7 @@ func main() {
sigStop()
log.Println("signaal te stoppen ontvangen")

cancelColl()
cancelSerial()

psCtx, psCancel := context.WithTimeout(rootCtx, 15*time.Second)
Expand Down
74 changes: 74 additions & 0 deletions pkg/p1telegramcollector/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package p1telegramcollector

import (
"bufio"
"bytes"
"context"
"io"
"regexp"
"sync"
"time"
)

type Collector struct {
bmu sync.RWMutex
b *bytes.Buffer

r io.Reader
w io.Writer
s *bufio.Scanner

stop chan bool
C chan *bytes.Buffer
}

var (
telegramR = regexp.MustCompile("(?sm)/[^!]*[!][0-9A-Z]{4}")
)

func New() *Collector {
return &Collector{
stop: make(chan bool),
b: new(bytes.Buffer),
C: make(chan *bytes.Buffer),
}
}

func (c *Collector) Write(b []byte) (int, error) {
c.bmu.Lock()
defer c.bmu.Unlock()
return c.b.Write(b)
}

func (c *Collector) Run(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
c.C <- nil
close(c.C)
return
default:
}

func() {
c.bmu.RLock()
defer c.bmu.RUnlock()
defer time.Sleep(500 * time.Millisecond)

b := c.b.Bytes()
loc := telegramR.FindIndex(b)
if loc != nil {
// nieuwste telegram
telegram := b[loc[0]:loc[1]]

// ervolgend data
c.b = new(bytes.Buffer)
c.b.Write(b[loc[1]+1:])

c.C <- bytes.NewBuffer(telegram)
}
}()
}
}()
}

0 comments on commit e05fed3

Please sign in to comment.