Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebalance to prevent channel going below AR-OUT-Target[2] #312

Open
wants to merge 3 commits into
base: v1.9.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gui/templates/rebalances_table.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ <h2>{% if status %}Successful{% else %}Last{% endif %} {{title}}</h2>
function formatDuration(duration){
return `${duration} minute${duration === 1 ? 's':''}`
}
statuses = {0: 'Pending', 1: 'In-Flight', 2: 'Successful', 3: 'Timeout', 4: 'No-Route', 5: 'Error', 6: 'Incorrect-Payment-Details', 7: 'Insufficient-Balance', 400: 'Rebalancer-Request-Failed', 406: 'No-Sources', 408: 'Rebalancer-Request-Timeout', 499: 'Cancelled'}
statuses = {0: 'Pending', 1: 'In-Flight', 2: 'Successful', 3: 'Timeout', 4: 'No-Route', 5: 'Error', 6: 'Incorrect-Payment-Details', 7: 'Insufficient-Balance', 400: 'Rebalancer-Request-Failed', 406: 'No-Sources', 407: 'AR-Not-Required', 408: 'Rebalancer-Request-Timeout', 499: 'Cancelled'}
transformations = {
'requested': rebalance => ({innerHTML: formatDate(rebalance.requested), title: adjustTZ(rebalance.requested)}),
'start': rebalance => ({innerHTML: formatDate(rebalance.start), title: rebalance.start ? adjustTZ(rebalance.start) : '---'}),
Expand Down
107 changes: 77 additions & 30 deletions rebalancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def inbound_cans_len(inbound_cans):
async def run_rebalancer(rebalance, worker):
try:
#Reduce potential rebalance value in percent out to avoid going below AR-OUT-Target
auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target'))
#Only considering local balance for outbound_cans. This is conservative and avoids channel depleting beyond ar-out target if all out going htlcs settle. For inbound, pending inbound is considerred in remote balance.
auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target'))
outbound_cans = await get_out_cans(rebalance, auto_rebalance_channels)
if len(outbound_cans) == 0 and rebalance.manual == False:
print(f"{datetime.now().strftime('%c')} : No outbound_cans")
Expand All @@ -48,7 +49,19 @@ async def run_rebalancer(rebalance, worker):
await save_record(rebalance)
return None
elif str(outbound_cans).replace('\'', '') != rebalance.outgoing_chan_ids and rebalance.manual == False:
#print (f"{datetime.now().strftime('%c')} : Outbound chans are changed, replacing old {rebalance.outgoing_chan_ids=} with new {outbound_cans=}")
rebalance.outgoing_chan_ids = str(outbound_cans).replace('\'', '')

#Check if rebalance still required or if things changed while waiting..
inbound_cans = auto_rebalance_channels.filter(remote_pubkey=rebalance.last_hop_pubkey).filter(auto_rebalance=True, inbound_can__gte=1)
if await inbound_cans_len(inbound_cans) == 0 and rebalance.manual == False:
print (f"{datetime.now().strftime('%c')} : AR Not Required ..")
rebalance.status = 407
rebalance.start = datetime.now()
rebalance.stop = datetime.now()
await save_record(rebalance)
return None

rebalance.start = datetime.now()
try:
#Open connection with lnd via grpc
Expand All @@ -69,7 +82,7 @@ async def run_rebalancer(rebalance, worker):
#SUCCESSFUL
rebalance.status = 2
rebalance.fees_paid = payment_response.fee_msat/1000
successful_out = payment_response.htlcs[0].route.hops[0].pub_key
successful_out_htlcs = payment_response.htlcs
elif payment_response.status == 3:
#FAILURE
if payment_response.failure_reason == 1:
Expand Down Expand Up @@ -103,13 +116,13 @@ async def run_rebalancer(rebalance, worker):
inc=1.21
dec=2
if rebalance.status ==2:
await update_channels(stub, rebalance.last_hop_pubkey, successful_out)
await update_channels(stub, rebalance.last_hop_pubkey, successful_out_htlcs)
#Reduce potential rebalance value in percent out to avoid going below AR-OUT-Target
auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value*inc)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target'))
auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')-rebalance.value*inc)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target'))
inbound_cans = auto_rebalance_channels.filter(remote_pubkey=rebalance.last_hop_pubkey).filter(auto_rebalance=True, inbound_can__gte=1)
outbound_cans = await get_out_cans(rebalance, auto_rebalance_channels)
if await inbound_cans_len(inbound_cans) > 0 and len(outbound_cans) > 0:
next_rebalance = Rebalancer(value=int(rebalance.value*inc), fee_limit=round(rebalance.fee_limit*inc, 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=original_alias, duration=1)
if await inbound_cans_len(inbound_cans) > 0:
#Set with blank outgoing_chan_ids which are selected at run time.
next_rebalance = Rebalancer(value=int(rebalance.value*inc), fee_limit=round(rebalance.fee_limit*inc, 3), outgoing_chan_ids=' ', last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=original_alias, duration=1)
await save_record(next_rebalance)
print(f"{datetime.now().strftime('%c')} : RapidFire up {next_rebalance.target_alias=} {next_rebalance.value=} {rebalance.value=}")
else:
Expand All @@ -126,8 +139,8 @@ async def run_rebalancer(rebalance, worker):
next_value = rebalance.value/dec

inbound_cans = auto_rebalance_channels.filter(remote_pubkey=rebalance.last_hop_pubkey).filter(auto_rebalance=True, inbound_can__gte=1)
if await inbound_cans_len(inbound_cans) > 0 and len(outbound_cans) > 0:
next_rebalance = Rebalancer(value=int(next_value), fee_limit=round(rebalance.fee_limit/(rebalance.value/next_value), 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=original_alias, duration=1)
if await inbound_cans_len(inbound_cans) > 0:
next_rebalance = Rebalancer(value=int(next_value), fee_limit=round(rebalance.fee_limit/(rebalance.value/next_value), 3), outgoing_chan_ids=' ', last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=original_alias, duration=1)
await save_record(next_rebalance)
print(f"{datetime.now().strftime('%c')} : RapidFire Down {next_rebalance.target_alias=} {next_rebalance.value=} {rebalance.value=}")
else:
Expand Down Expand Up @@ -161,20 +174,59 @@ def estimate_liquidity( payment ):
return estimated_liquidity

@sync_to_async
def update_channels(stub, incoming_channel, outgoing_channel):
def update_channels(stub, incoming_channel, outgoing_channel_htlcs):
try:
# Incoming channel update
channel = stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(incoming_channel))).channels[0]
db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0]
db_channel.local_balance = channel.local_balance
db_channel.remote_balance = channel.remote_balance
db_channel.save()
# Outgoing channel update
channel = stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(outgoing_channel))).channels[0]
db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0]
db_channel.local_balance = channel.local_balance
db_channel.remote_balance = channel.remote_balance
db_channel.save()
for channel in stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(incoming_channel))).channels:
pending_in = 0
pending_out = 0
if len(channel.pending_htlcs) > 0:
for htlc in channel.pending_htlcs:
if htlc.incoming == True:
pending_in += htlc.amount
else:
pending_out += htlc.amount
db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0]
db_channel.pending_outbound = pending_out
db_channel.pending_inbound = pending_in
db_channel.local_balance = channel.local_balance
db_channel.remote_balance = channel.remote_balance
db_channel.save()
print (f"{datetime.now().strftime('%c')} : Incoming Channel Update {channel.chan_id=} {int((channel.local_balance+pending_out)/channel.capacity*100)}% {channel.local_balance=} {pending_out=} {channel.remote_balance=} {pending_in=} {incoming_channel=}")
# Outgoing channel update. It can be potentially MPP with multuple HTLCs (some success some failed) and multiple outgoing channels.
processed_channels = []
for htlc_attempt in outgoing_channel_htlcs:
if htlc_attempt.status == 1:
outgoing_channel = htlc_attempt.route.hops[0].pub_key
outgoing_chan_id = htlc_attempt.route.hops[0].chan_id
#MPP can pass via same outgoing channel at times, skip when already processed.
if outgoing_chan_id in processed_channels:
#Already processed, continue
#print (f"{datetime.now().strftime('%c')} : Skipping .... {channel.chan_id=} in {processed_channels=}")
continue
processed_channels.append(outgoing_chan_id)
#Take care of multiple channels for same pub key
for channel in stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(outgoing_channel))).channels:
pending_in = 0
pending_out = 0
if channel.chan_id == outgoing_chan_id:
if len(channel.pending_htlcs) > 0:
for htlc in channel.pending_htlcs:
if htlc.incoming == True:
pending_in += htlc.amount
else:
pending_out += htlc.amount
db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0]
db_channel.pending_outbound = pending_out
db_channel.pending_inbound = pending_in
db_channel.local_balance = channel.local_balance
db_channel.remote_balance = channel.remote_balance
db_channel.save()
print (f"{datetime.now().strftime('%c')} : Outgoing Channel Update {channel.chan_id=} {int((channel.local_balance)/channel.capacity*100)}% {channel.local_balance=} {pending_out=} {channel.remote_balance=} {pending_in=} {outgoing_channel=}")
#else:
#print (f"{datetime.now().strftime('%c')} : chan id different {channel.chan_id=} {outgoing_chan_id=}")
#else:
#print (f"{datetime.now().strftime('%c')} : Failed HTLC {htlc_attempt.status=} {htlc_attempt.route.hops[0].pub_key=}")
except Exception as e:
print(f"{datetime.now().strftime('%c')} : Error updating channel balances: {str(e)}")

Expand All @@ -190,21 +242,17 @@ def auto_schedule() -> List[Rebalancer]:
enabled = 0
if enabled == 0:
return []

auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound'))*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target'))
auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance'))*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target'))
if len(auto_rebalance_channels) == 0:
return []

if not LocalSettings.objects.filter(key='AR-Outbound%').exists():
LocalSettings(key='AR-Outbound%', value='75').save()
if not LocalSettings.objects.filter(key='AR-Inbound%').exists():
LocalSettings(key='AR-Inbound%', value='100').save()
outbound_cans = list(auto_rebalance_channels.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).values_list('chan_id', flat=True))
already_scheduled = Rebalancer.objects.exclude(last_hop_pubkey='').filter(status=0).values_list('last_hop_pubkey')
inbound_cans = auto_rebalance_channels.filter(auto_rebalance=True, inbound_can__gte=1).exclude(remote_pubkey__in=already_scheduled).order_by('-inbound_can')
if len(inbound_cans) == 0 or len(outbound_cans) == 0:
if len(inbound_cans) == 0:
return []

if LocalSettings.objects.filter(key='AR-MaxFeeRate').exists():
max_fee_rate = int(LocalSettings.objects.filter(key='AR-MaxFeeRate')[0].value)
else:
Expand All @@ -231,7 +279,6 @@ def auto_schedule() -> List[Rebalancer]:
target_fee = round(target_fee_rate*target_value*0.000001, 3) if target_fee_rate <= max_fee_rate else round(max_fee_rate*target_value*0.000001, 3)
if target_fee == 0:
continue

if LocalSettings.objects.filter(key='AR-Time').exists():
target_time = int(LocalSettings.objects.filter(key='AR-Time')[0].value)
else:
Expand All @@ -243,11 +290,10 @@ def auto_schedule() -> List[Rebalancer]:
if not (last_rebalance.status == 2 or (last_rebalance.status > 2 and (int((datetime.now() - last_rebalance.stop).total_seconds() / 60) > wait_period)) or (last_rebalance.status == 1 and ((int((datetime.now() - last_rebalance.start).total_seconds() / 60) - last_rebalance.duration) > wait_period))):
continue
print(f"{datetime.now().strftime('%c')} : Creating Auto Rebalance Request for: {target.chan_id}")
print(f"{datetime.now().strftime('%c')} : Request routing through: {outbound_cans}")
print(f"{datetime.now().strftime('%c')} : {target_value} / {target.ar_amt_target}")
print(f"{datetime.now().strftime('%c')} : {target_fee}")
print(f"{datetime.now().strftime('%c')} : {target_time}")
new_rebalance = Rebalancer(value=target_value, fee_limit=target_fee, outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=target.remote_pubkey, target_alias=target.alias, duration=target_time)
new_rebalance = Rebalancer(value=target_value, fee_limit=target_fee, outgoing_chan_ids=' ', last_hop_pubkey=target.remote_pubkey, target_alias=target.alias, duration=target_time)
new_rebalance.save()
to_schedule.append(new_rebalance)
return to_schedule
Expand All @@ -270,6 +316,7 @@ def auto_enable():
apdays = 7
if enabled == 1:
lookup_channels=Channels.objects.filter(is_active=True, is_open=True, private=False)
#Channel balance is not considerred changed until settled (pending balance is added to respective side as if not pending). This is to avoid flip flops enable/disable.
channels = lookup_channels.values('remote_pubkey').annotate(outbound_percent=((Sum('local_balance')+Sum('pending_outbound'))*1000)/Sum('capacity')).annotate(inbound_percent=((Sum('remote_balance')+Sum('pending_inbound'))*1000)/Sum('capacity')).order_by()
filter_day = datetime.now() - timedelta(days=apdays)
forwards = Forwards.objects.filter(forward_date__gte=filter_day)
Expand Down