iptvChannel/controller/controller.go
Senis 70628dd938
Update ZTEG and HWTC API implementations and logging changes
This commit updates ZTEG and HWTC implementation with refreshed logic around handling channels and EPGs data. It also includes minor changes in logging information for clarity. Deleted 'config.example.yaml' file. Additionally, implemented new test cases to validate these changes.
2024-01-21 12:23:39 +08:00

187 lines
3.8 KiB
Go

package controller
import (
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
"github.com/thank243/iptvChannel/api"
"github.com/thank243/iptvChannel/api/hwtc"
"github.com/thank243/iptvChannel/api/zteg"
"github.com/thank243/iptvChannel/app/server"
"github.com/thank243/iptvChannel/config"
)
func New(c *config.Config) (*Controller, error) {
// set log level
level, err := log.ParseLevel(c.LogLevel)
if err != nil {
log.Panic(err)
}
log.SetLevel(level)
if level == log.DebugLevel || level == log.TraceLevel {
log.SetReportCaller(true)
}
// set provide mode
c.Mode = strings.ToUpper(c.Mode)
switch c.Mode {
case "UDPXY":
if c.UdpxyHost == "" {
return nil, errors.New("udpxy host is null")
}
case "IGMP", "URL":
default:
return nil, fmt.Errorf("unsupported mode: %s", c.Mode)
}
ctrl := &Controller{
conf: c,
server: server.New(c),
cron: cron.New(),
maxConcurrent: c.MaxConcurrent,
}
// set api provider
switch strings.ToLower(c.Api.Provider) {
case "hwtc":
ctrl.cli = hwtc.New(c)
case "zteg":
ctrl.cli = zteg.New(c)
default:
return nil, fmt.Errorf("unsupported mode: %s", c.Api.Provider)
}
// check max concurrent
if c.MaxConcurrent > 16 {
ctrl.maxConcurrent = 16
}
// set cron job skip if still running
if _, err := ctrl.cron.AddJob(c.Cron, cron.NewChain(cron.SkipIfStillRunning(cron.DefaultLogger)).Then(ctrl)); err != nil {
return nil, err
}
return ctrl, nil
}
func (c *Controller) Start() error {
fmt.Printf("%s\nLogLevel: %s, MaxConcurrent: %d, Mode: %s, Provider: %s\n",
config.GetVersion(), c.conf.LogLevel, c.maxConcurrent, strings.ToUpper(c.conf.Mode), c.conf.Api.Provider)
log.Info("Starting service..")
log.Info("Fetch Channels and EPGs data on initial startup")
c.Run()
time.Sleep(time.Second)
// start cron job
c.cron.Start()
// start http server
if err := c.server.Echo.Start(c.conf.Address); err != nil {
return err
}
return nil
}
func (c *Controller) Stop() error {
log.Info("Closing service..")
if err := c.server.Echo.Shutdown(c.cron.Stop()); err != nil {
return err
}
return nil
}
// Run fetch EPGs and Channels
func (c *Controller) Run() {
if err := c.fetchChannels(); err != nil {
log.Error(err)
}
if err := c.fetchEPGs(); err != nil {
log.Error(err)
}
}
func (c *Controller) fetchChannels() error {
log.Info("Fetch Channels")
channels, err := c.cli.GetChannels()
if err != nil {
return err
}
c.server.Channels.Store(&channels)
log.Infof("Get channels: %d", len(channels))
return nil
}
func (c *Controller) fetchEPGs() error {
log.Info("Fetch EPGs")
if c.server.Channels.Load() == nil {
log.Info("Channels is null, fetch channels first")
if err := c.fetchChannels(); err != nil {
return err
}
}
channels := *c.server.Channels.Load()
var epgChan = make(chan api.Epg)
var wg sync.WaitGroup
sem := make(chan bool, c.maxConcurrent) // This is used to limit the number of goroutines to maxConcurrent
for i := range channels {
wg.Add(1)
go func(i int) {
defer func() {
<-sem // leave semaphore
wg.Done()
}()
sem <- true // enter semaphore, will block if there are maxConcurrent tasks running already
ch := channels[i]
logger := log.WithFields(log.Fields{
"ChannelId": ch.ChannelID,
"ChannelName": ch.ChannelName,
})
logger.Debug("start get EPGs")
epgs, err := c.cli.GetEPGs(ch.ChannelID)
if err != nil {
logger.Error(err)
return
}
for i := range epgs {
epgChan <- epgs[i]
}
}(i)
}
// Close the channel after all work has been done
go func() {
wg.Wait()
close(epgChan)
}()
// Consume results from the channel and append to slice
var esSlice []api.Epg
for e := range epgChan {
esSlice = append(esSlice, e)
}
c.server.EPGs.Store(&esSlice)
log.Infof("Get EPGs: %d", len(esSlice))
return nil
}