feat: concurrent processing

closes #14
This commit is contained in:
Felipe M 2022-01-27 00:20:31 +01:00
parent c03232381c
commit 61f6851e33
Signed by: fmartingr
GPG Key ID: 716BC147715E716F
4 changed files with 48 additions and 36 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"flag"
"log"
"os"
"github.com/fmartingr/games-screenshot-manager/internal/models"
"github.com/fmartingr/games-screenshot-manager/pkg/processor"
@ -30,18 +31,22 @@ func Start() {
registry.Register(steam.Name, steam.NewSteamProvider())
registry.Register(retroarch.Name, retroarch.NewRetroArchProvider())
flagSet := flag.NewFlagSet("gsm", flag.ExitOnError)
options := models.Options{
OutputPath: *flag.String("output-path", defaultOutputPath, "The destination path of the screenshots"),
DownloadCovers: *flag.Bool("download-covers", defaultDownloadCovers, "use to enable the download of covers (if the provider supports it)"),
DryRun: *flag.Bool("dry-run", defaultDryRun, "Use to disable write actions on filesystem"),
ProcessBufferSize: 0, // Unbuffered for now
}
var providerName = flag.String("provider", defaultProvider, "steam")
providerOptions := models.ProviderOptions{
InputPath: *flag.String("input-path", defaultInputPath, "Input path for the provider that requires it"),
ProcessBufferSize: 32,
}
flag.Parse()
flagSet.StringVar(&options.OutputPath, "output-path", defaultOutputPath, "The destination path of the screenshots")
flagSet.BoolVar(&options.DownloadCovers, "download-covers", defaultDownloadCovers, "use to enable the download of covers (if the provider supports it)")
flagSet.BoolVar(&options.DryRun, "dry-run", defaultDryRun, "Use to disable write actions on filesystem")
flagSet.IntVar(&options.WorkersNum, "workers-num", 2, "Number of workers to use to process games")
var providerName = flagSet.String("provider", defaultProvider, "steam")
providerOptions := models.ProviderOptions{}
flagSet.StringVar(&providerOptions.InputPath, "input-path", defaultInputPath, "Input path for the provider that requires it")
flagSet.Parse(os.Args[1:])
provider, err := registry.Get(*providerName)
if err != nil {
@ -54,10 +59,9 @@ func Start() {
return
}
ctx, cancel := context.WithCancel(context.Background())
processor := processor.NewProcessor(options)
if len(games) > 0 {
ctx, cancel := context.WithCancel(context.Background())
processor := processor.NewProcessor(options)
processor.Start(ctx)
for _, g := range games {
@ -65,7 +69,6 @@ func Start() {
}
processor.Wait()
cancel()
}
cancel()
}

View File

@ -0,0 +1,9 @@
package models
type Options struct {
OutputPath string
DryRun bool
DownloadCovers bool
ProcessBufferSize int
WorkersNum int
}

View File

@ -1,12 +1,5 @@
package models
type Options struct {
OutputPath string
DryRun bool
DownloadCovers bool
ProcessBufferSize int
}
type ProviderOptions struct {
InputPath string
}

View File

@ -15,27 +15,30 @@ import (
)
type Processor struct {
input chan *models.Game
games chan *models.Game
options models.Options
wg sync.WaitGroup
wg *sync.WaitGroup
}
func (p *Processor) Start(ctx context.Context) {
go p.process(ctx)
for i := 0; i < p.options.WorkersNum; i++ {
go p.process(ctx)
}
}
func (p *Processor) Process(game *models.Game) {
p.input <- game
p.wg.Add(1)
p.games <- game
}
func (p *Processor) process(ctx context.Context) {
log.Println("Started worker process")
for {
select {
case <-ctx.Done():
return
case game := <-p.input:
p.wg.Add(1)
case game := <-p.games:
if err := p.processGame(game); err != nil {
log.Printf("[err] %s", err)
}
@ -47,18 +50,12 @@ func (p *Processor) Wait() {
p.wg.Wait()
}
func NewProcessor(options models.Options) *Processor {
return &Processor{
input: make(chan *models.Game, options.ProcessBufferSize),
options: options,
wg: sync.WaitGroup{},
}
}
// TODO: Reduce into smaller functions
func (p *Processor) processGame(game *models.Game) (err error) {
defer p.wg.Done()
log.Printf("Processing game: %s", game.Name)
// Do not continue if there's no screenshots
if len(game.Screenshots) == 0 {
return
@ -118,10 +115,20 @@ func (p *Processor) processGame(game *models.Game) (err error) {
if p.options.DryRun {
log.Println(filepath.Base(screenshot.Path), " -> ", strings.Replace(destinationPath, helpers.ExpandUser(p.options.OutputPath), "", 1))
} else {
helpers.CopyFile(screenshot.Path, destinationPath)
if _, err := helpers.CopyFile(screenshot.Path, destinationPath); err != nil {
log.Printf("[error] error during operation: %s", err)
}
}
}
}
return nil
}
func NewProcessor(options models.Options) *Processor {
return &Processor{
games: make(chan *models.Game, options.ProcessBufferSize),
options: options,
wg: &sync.WaitGroup{},
}
}