diff --git a/gui/templates/rebalances_table.html b/gui/templates/rebalances_table.html index 57abffc9..e086e96c 100644 --- a/gui/templates/rebalances_table.html +++ b/gui/templates/rebalances_table.html @@ -32,7 +32,7 @@

{% if status %}Successful{% else %}Last{% endif %} {{title}}

function formatDuration(duration){ return `${duration} minute${duration === 1 ? 's':''}` } - statuses = {0: 'Pending', 1: 'In-Flight', 2: 'Successful', 3: 'Timeout', 4: 'No-Route', 5: 'Error', 6: 'Incorrect-Payment-Details', 7: 'Insufficient-Balance', 400: 'Rebalancer-Request-Failed', 406: 'No-Sources', 408: 'Rebalancer-Request-Timeout', 499: 'Cancelled'} + statuses = {0: 'Pending', 1: 'In-Flight', 2: 'Successful', 3: 'Timeout', 4: 'No-Route', 5: 'Error', 6: 'Incorrect-Payment-Details', 7: 'Insufficient-Balance', 400: 'Rebalancer-Request-Failed', 406: 'No-Sources', 407: 'AR-Not-Required', 408: 'Rebalancer-Request-Timeout', 499: 'Cancelled'} transformations = { 'requested': rebalance => ({innerHTML: formatDate(rebalance.requested), title: adjustTZ(rebalance.requested)}), 'start': rebalance => ({innerHTML: formatDate(rebalance.start), title: rebalance.start ? adjustTZ(rebalance.start) : '---'}), diff --git a/rebalancer.py b/rebalancer.py index 7773295a..d76770dd 100644 --- a/rebalancer.py +++ b/rebalancer.py @@ -38,7 +38,8 @@ def inbound_cans_len(inbound_cans): async def run_rebalancer(rebalance, worker): try: #Reduce potential rebalance value in percent out to avoid going below AR-OUT-Target - auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) + #Only considering local balance for outbound_cans. This is conservative and avoids channel depleting beyond ar-out target if all out going htlcs settle. For inbound, pending inbound is considerred in remote balance. + auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) outbound_cans = await get_out_cans(rebalance, auto_rebalance_channels) if len(outbound_cans) == 0 and rebalance.manual == False: print(f"{datetime.now().strftime('%c')} : No outbound_cans") @@ -48,7 +49,19 @@ async def run_rebalancer(rebalance, worker): await save_record(rebalance) return None elif str(outbound_cans).replace('\'', '') != rebalance.outgoing_chan_ids and rebalance.manual == False: + #print (f"{datetime.now().strftime('%c')} : Outbound chans are changed, replacing old {rebalance.outgoing_chan_ids=} with new {outbound_cans=}") rebalance.outgoing_chan_ids = str(outbound_cans).replace('\'', '') + + #Check if rebalance still required or if things changed while waiting.. + inbound_cans = auto_rebalance_channels.filter(remote_pubkey=rebalance.last_hop_pubkey).filter(auto_rebalance=True, inbound_can__gte=1) + if await inbound_cans_len(inbound_cans) == 0 and rebalance.manual == False: + print (f"{datetime.now().strftime('%c')} : AR Not Required ..") + rebalance.status = 407 + rebalance.start = datetime.now() + rebalance.stop = datetime.now() + await save_record(rebalance) + return None + rebalance.start = datetime.now() try: #Open connection with lnd via grpc @@ -69,7 +82,7 @@ async def run_rebalancer(rebalance, worker): #SUCCESSFUL rebalance.status = 2 rebalance.fees_paid = payment_response.fee_msat/1000 - successful_out = payment_response.htlcs[0].route.hops[0].pub_key + successful_out_htlcs = payment_response.htlcs elif payment_response.status == 3: #FAILURE if payment_response.failure_reason == 1: @@ -103,13 +116,13 @@ async def run_rebalancer(rebalance, worker): inc=1.21 dec=2 if rebalance.status ==2: - await update_channels(stub, rebalance.last_hop_pubkey, successful_out) + await update_channels(stub, rebalance.last_hop_pubkey, successful_out_htlcs) #Reduce potential rebalance value in percent out to avoid going below AR-OUT-Target - auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value*inc)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) + auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')-rebalance.value*inc)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) inbound_cans = auto_rebalance_channels.filter(remote_pubkey=rebalance.last_hop_pubkey).filter(auto_rebalance=True, inbound_can__gte=1) - outbound_cans = await get_out_cans(rebalance, auto_rebalance_channels) - if await inbound_cans_len(inbound_cans) > 0 and len(outbound_cans) > 0: - next_rebalance = Rebalancer(value=int(rebalance.value*inc), fee_limit=round(rebalance.fee_limit*inc, 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=original_alias, duration=1) + if await inbound_cans_len(inbound_cans) > 0: + #Set with blank outgoing_chan_ids which are selected at run time. + next_rebalance = Rebalancer(value=int(rebalance.value*inc), fee_limit=round(rebalance.fee_limit*inc, 3), outgoing_chan_ids=' ', last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=original_alias, duration=1) await save_record(next_rebalance) print(f"{datetime.now().strftime('%c')} : RapidFire up {next_rebalance.target_alias=} {next_rebalance.value=} {rebalance.value=}") else: @@ -126,8 +139,8 @@ async def run_rebalancer(rebalance, worker): next_value = rebalance.value/dec inbound_cans = auto_rebalance_channels.filter(remote_pubkey=rebalance.last_hop_pubkey).filter(auto_rebalance=True, inbound_can__gte=1) - if await inbound_cans_len(inbound_cans) > 0 and len(outbound_cans) > 0: - next_rebalance = Rebalancer(value=int(next_value), fee_limit=round(rebalance.fee_limit/(rebalance.value/next_value), 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=original_alias, duration=1) + if await inbound_cans_len(inbound_cans) > 0: + next_rebalance = Rebalancer(value=int(next_value), fee_limit=round(rebalance.fee_limit/(rebalance.value/next_value), 3), outgoing_chan_ids=' ', last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=original_alias, duration=1) await save_record(next_rebalance) print(f"{datetime.now().strftime('%c')} : RapidFire Down {next_rebalance.target_alias=} {next_rebalance.value=} {rebalance.value=}") else: @@ -161,20 +174,59 @@ def estimate_liquidity( payment ): return estimated_liquidity @sync_to_async -def update_channels(stub, incoming_channel, outgoing_channel): +def update_channels(stub, incoming_channel, outgoing_channel_htlcs): try: # Incoming channel update - channel = stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(incoming_channel))).channels[0] - db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0] - db_channel.local_balance = channel.local_balance - db_channel.remote_balance = channel.remote_balance - db_channel.save() - # Outgoing channel update - channel = stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(outgoing_channel))).channels[0] - db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0] - db_channel.local_balance = channel.local_balance - db_channel.remote_balance = channel.remote_balance - db_channel.save() + for channel in stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(incoming_channel))).channels: + pending_in = 0 + pending_out = 0 + if len(channel.pending_htlcs) > 0: + for htlc in channel.pending_htlcs: + if htlc.incoming == True: + pending_in += htlc.amount + else: + pending_out += htlc.amount + db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0] + db_channel.pending_outbound = pending_out + db_channel.pending_inbound = pending_in + db_channel.local_balance = channel.local_balance + db_channel.remote_balance = channel.remote_balance + db_channel.save() + print (f"{datetime.now().strftime('%c')} : Incoming Channel Update {channel.chan_id=} {int((channel.local_balance+pending_out)/channel.capacity*100)}% {channel.local_balance=} {pending_out=} {channel.remote_balance=} {pending_in=} {incoming_channel=}") + # Outgoing channel update. It can be potentially MPP with multuple HTLCs (some success some failed) and multiple outgoing channels. + processed_channels = [] + for htlc_attempt in outgoing_channel_htlcs: + if htlc_attempt.status == 1: + outgoing_channel = htlc_attempt.route.hops[0].pub_key + outgoing_chan_id = htlc_attempt.route.hops[0].chan_id + #MPP can pass via same outgoing channel at times, skip when already processed. + if outgoing_chan_id in processed_channels: + #Already processed, continue + #print (f"{datetime.now().strftime('%c')} : Skipping .... {channel.chan_id=} in {processed_channels=}") + continue + processed_channels.append(outgoing_chan_id) + #Take care of multiple channels for same pub key + for channel in stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(outgoing_channel))).channels: + pending_in = 0 + pending_out = 0 + if channel.chan_id == outgoing_chan_id: + if len(channel.pending_htlcs) > 0: + for htlc in channel.pending_htlcs: + if htlc.incoming == True: + pending_in += htlc.amount + else: + pending_out += htlc.amount + db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0] + db_channel.pending_outbound = pending_out + db_channel.pending_inbound = pending_in + db_channel.local_balance = channel.local_balance + db_channel.remote_balance = channel.remote_balance + db_channel.save() + print (f"{datetime.now().strftime('%c')} : Outgoing Channel Update {channel.chan_id=} {int((channel.local_balance)/channel.capacity*100)}% {channel.local_balance=} {pending_out=} {channel.remote_balance=} {pending_in=} {outgoing_channel=}") + #else: + #print (f"{datetime.now().strftime('%c')} : chan id different {channel.chan_id=} {outgoing_chan_id=}") + #else: + #print (f"{datetime.now().strftime('%c')} : Failed HTLC {htlc_attempt.status=} {htlc_attempt.route.hops[0].pub_key=}") except Exception as e: print(f"{datetime.now().strftime('%c')} : Error updating channel balances: {str(e)}") @@ -190,21 +242,17 @@ def auto_schedule() -> List[Rebalancer]: enabled = 0 if enabled == 0: return [] - - auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound'))*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) + auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance'))*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) if len(auto_rebalance_channels) == 0: return [] - if not LocalSettings.objects.filter(key='AR-Outbound%').exists(): LocalSettings(key='AR-Outbound%', value='75').save() if not LocalSettings.objects.filter(key='AR-Inbound%').exists(): LocalSettings(key='AR-Inbound%', value='100').save() - outbound_cans = list(auto_rebalance_channels.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).values_list('chan_id', flat=True)) already_scheduled = Rebalancer.objects.exclude(last_hop_pubkey='').filter(status=0).values_list('last_hop_pubkey') inbound_cans = auto_rebalance_channels.filter(auto_rebalance=True, inbound_can__gte=1).exclude(remote_pubkey__in=already_scheduled).order_by('-inbound_can') - if len(inbound_cans) == 0 or len(outbound_cans) == 0: + if len(inbound_cans) == 0: return [] - if LocalSettings.objects.filter(key='AR-MaxFeeRate').exists(): max_fee_rate = int(LocalSettings.objects.filter(key='AR-MaxFeeRate')[0].value) else: @@ -231,7 +279,6 @@ def auto_schedule() -> List[Rebalancer]: target_fee = round(target_fee_rate*target_value*0.000001, 3) if target_fee_rate <= max_fee_rate else round(max_fee_rate*target_value*0.000001, 3) if target_fee == 0: continue - if LocalSettings.objects.filter(key='AR-Time').exists(): target_time = int(LocalSettings.objects.filter(key='AR-Time')[0].value) else: @@ -243,11 +290,10 @@ def auto_schedule() -> List[Rebalancer]: if not (last_rebalance.status == 2 or (last_rebalance.status > 2 and (int((datetime.now() - last_rebalance.stop).total_seconds() / 60) > wait_period)) or (last_rebalance.status == 1 and ((int((datetime.now() - last_rebalance.start).total_seconds() / 60) - last_rebalance.duration) > wait_period))): continue print(f"{datetime.now().strftime('%c')} : Creating Auto Rebalance Request for: {target.chan_id}") - print(f"{datetime.now().strftime('%c')} : Request routing through: {outbound_cans}") print(f"{datetime.now().strftime('%c')} : {target_value} / {target.ar_amt_target}") print(f"{datetime.now().strftime('%c')} : {target_fee}") print(f"{datetime.now().strftime('%c')} : {target_time}") - new_rebalance = Rebalancer(value=target_value, fee_limit=target_fee, outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=target.remote_pubkey, target_alias=target.alias, duration=target_time) + new_rebalance = Rebalancer(value=target_value, fee_limit=target_fee, outgoing_chan_ids=' ', last_hop_pubkey=target.remote_pubkey, target_alias=target.alias, duration=target_time) new_rebalance.save() to_schedule.append(new_rebalance) return to_schedule @@ -270,6 +316,7 @@ def auto_enable(): apdays = 7 if enabled == 1: lookup_channels=Channels.objects.filter(is_active=True, is_open=True, private=False) + #Channel balance is not considerred changed until settled (pending balance is added to respective side as if not pending). This is to avoid flip flops enable/disable. channels = lookup_channels.values('remote_pubkey').annotate(outbound_percent=((Sum('local_balance')+Sum('pending_outbound'))*1000)/Sum('capacity')).annotate(inbound_percent=((Sum('remote_balance')+Sum('pending_inbound'))*1000)/Sum('capacity')).order_by() filter_day = datetime.now() - timedelta(days=apdays) forwards = Forwards.objects.filter(forward_date__gte=filter_day)