Skip to content

Commit

Permalink
concurrent load providers
Browse files Browse the repository at this point in the history
  • Loading branch information
wwqgtxx committed Sep 6, 2024
1 parent 0f2d4c1 commit cd68081
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 25 deletions.
15 changes: 0 additions & 15 deletions dns/dial.go

This file was deleted.

5 changes: 5 additions & 0 deletions hub/executor/concurrent_load_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !386 && !amd64 && !arm64 && !arm64be && !mipsle && !mips

package executor

const concurrentCount = 5
5 changes: 5 additions & 0 deletions hub/executor/concurrent_load_single.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build mips || mipsle

package executor

const concurrentCount = 1
7 changes: 7 additions & 0 deletions hub/executor/concurrent_load_unlimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//go:build 386 || amd64 || arm64 || arm64be

package executor

import "math"

const concurrentCount = math.MaxInt
27 changes: 19 additions & 8 deletions hub/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/netip"
"os"
"runtime"
"strconv"
"sync"

Expand Down Expand Up @@ -93,8 +94,11 @@ func ApplyConfig(cfg *config.Config, force bool) {
updateListeners(cfg.Listeners)
updateDNS(cfg.DNS, cfg.RuleProviders)
updateTun(cfg.General) // tun should not care "force"
loadProvider(cfg.RuleProviders, cfg.Providers)
loadProvider(cfg.Providers)
loadProvider(cfg.RuleProviders)
updateTunnels(cfg.Tunnels)

runtime.GC()
}

func GetGeneral() *config.General {
Expand Down Expand Up @@ -142,8 +146,8 @@ func GetGeneral() *config.General {
return general
}

func loadProvider(ruleProviders map[string]providerTypes.RuleProvider, proxyProviders map[string]providerTypes.ProxyProvider) {
load := func(pv providerTypes.Provider) {
func loadProvider[P providerTypes.Provider](providers map[string]P) {
load := func(pv P) {
if pv.VehicleType() == providerTypes.Compatible {
log.Infoln("Start initial compatible provider %s", pv.Name())
} else {
Expand All @@ -165,13 +169,20 @@ func loadProvider(ruleProviders map[string]providerTypes.RuleProvider, proxyProv
}
}

for _, proxyProvider := range proxyProviders {
load(proxyProvider)
// limit concurrent size
wg := sync.WaitGroup{}
ch := make(chan struct{}, concurrentCount)
for _, _provider := range providers {
_provider := _provider
wg.Add(1)
ch <- struct{}{}
go func() {
defer func() { <-ch; wg.Done() }()
load(_provider)
}()
}

for _, ruleProvider := range ruleProviders {
load(ruleProvider)
}
wg.Wait()
}

func updateListeners(listeners map[string]C.InboundListener) {
Expand Down
4 changes: 2 additions & 2 deletions tunnel/dns_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (d *DNSDialer) DialContext(ctx context.Context, network, addr string) (net.
var rule C.Rule
metadata := &C.Metadata{
NetWork: C.TCP,
Type: C.INNER,
Type: C.DNS,
}
err := metadata.SetRemoteAddress(addr) // tcp can resolve host by remote
if err != nil {
Expand Down Expand Up @@ -134,7 +134,7 @@ func (d *DNSDialer) ListenPacket(ctx context.Context, network, addr string) (net
opts := d.opts
metadata := &C.Metadata{
NetWork: C.UDP,
Type: C.INNER,
Type: C.DNS,
}
err := metadata.SetRemoteAddress(addr)
if err != nil {
Expand Down

0 comments on commit cd68081

Please sign in to comment.