diff --git a/build.gradle b/build.gradle index a56be97afa1..f5b144657f3 100644 --- a/build.gradle +++ b/build.gradle @@ -39,7 +39,6 @@ subprojects { compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25' compile group: 'org.slf4j', name: 'jcl-over-slf4j', version: '1.7.25' compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.9' - compile group: 'com.google.guava', name: 'guava', version: '30.1-jre' compile "com.google.code.findbugs:jsr305:3.0.0" compile group: 'org.springframework', name: 'spring-context', version: '5.3.18' compile group: 'org.springframework', name: 'spring-tx', version: '5.3.18' @@ -68,12 +67,6 @@ subprojects { preserveFileTimestamps = false reproducibleFileOrder = true } - - configurations.all { - resolutionStrategy { - force group: 'com.google.guava', name: 'guava', version: '30.1-jre' - } - } } task copyToParent(type: Copy) { diff --git a/chainbase/src/main/java/org/tron/core/ChainBaseManager.java b/chainbase/src/main/java/org/tron/core/ChainBaseManager.java index e43d442534a..d148021f6c4 100644 --- a/chainbase/src/main/java/org/tron/core/ChainBaseManager.java +++ b/chainbase/src/main/java/org/tron/core/ChainBaseManager.java @@ -244,6 +244,10 @@ public class ChainBaseManager { @Setter private long lowestBlockNum = -1; // except num = 0. + @Getter + @Setter + private long latestSaveBlockTime; + // for test only public List getWitnesses() { return witnessScheduleStore.getActiveWitnesses(); @@ -381,6 +385,7 @@ private void init() { this.lowestBlockNum = this.blockIndexStore.getLimitNumber(1, 1).stream() .map(BlockId::getNum).findFirst().orElse(0L); this.nodeType = getLowestBlockNum() > 1 ? NodeType.LITE : NodeType.FULL; + this.latestSaveBlockTime = System.currentTimeMillis(); } public void shutdown() { diff --git a/chainbase/src/main/java/org/tron/core/capsule/TransactionCapsule.java b/chainbase/src/main/java/org/tron/core/capsule/TransactionCapsule.java index c0a0fa4d88c..2b5e8b98596 100755 --- a/chainbase/src/main/java/org/tron/core/capsule/TransactionCapsule.java +++ b/chainbase/src/main/java/org/tron/core/capsule/TransactionCapsule.java @@ -59,6 +59,7 @@ import org.tron.core.exception.P2pException; import org.tron.core.exception.PermissionException; import org.tron.core.exception.SignatureFormatException; +import org.tron.core.exception.TransactionExpirationException; import org.tron.core.exception.ValidateSignatureException; import org.tron.core.store.AccountStore; import org.tron.core.store.DynamicPropertiesStore; @@ -869,4 +870,12 @@ public void removeRedundantRet() { this.transaction = transactionBuilder.build(); } } + + public void checkExpiration(long nextSlotTime) throws TransactionExpirationException { + if (getExpiration() < nextSlotTime) { + throw new TransactionExpirationException(String.format( + "Transaction expiration time is %d, but next slot time is %d", + getExpiration(), nextSlotTime)); + } + } } diff --git a/common/build.gradle b/common/build.gradle index 6c1545e5d13..8c651e48455 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -46,7 +46,7 @@ dependencies { compile 'org.aspectj:aspectjrt:1.8.13' compile 'org.aspectj:aspectjweaver:1.8.13' compile 'org.aspectj:aspectjtools:1.8.13' - compile group: 'io.github.tronprotocol', name: 'libp2p', version: '2.2.1',{ + compile group: 'io.github.tronprotocol', name: 'libp2p', version: '2.2.4',{ exclude group: 'io.grpc', module: 'grpc-context' exclude group: 'io.grpc', module: 'grpc-core' exclude group: 'io.grpc', module: 'grpc-netty' diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index 22159063333..62ed12d856c 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -333,6 +333,9 @@ public class CommonParameter { public boolean isOpenFullTcpDisconnect; @Getter @Setter + public int inactiveThreshold; + @Getter + @Setter public boolean nodeDetectEnable; @Getter @Setter diff --git a/common/src/main/java/org/tron/core/Constant.java b/common/src/main/java/org/tron/core/Constant.java index 0e634d3ef7d..da3b2b1becc 100644 --- a/common/src/main/java/org/tron/core/Constant.java +++ b/common/src/main/java/org/tron/core/Constant.java @@ -198,6 +198,8 @@ public class Constant { public static final String NODE_IS_OPEN_FULL_TCP_DISCONNECT = "node.isOpenFullTcpDisconnect"; + public static final String NODE_INACTIVE_THRESHOLD = "node.inactiveThreshold"; + public static final String NODE_DETECT_ENABLE = "node.nodeDetectEnable"; public static final String NODE_MAX_TRANSACTION_PENDING_SIZE = "node.maxTransactionPendingSize"; diff --git a/framework/build.gradle b/framework/build.gradle index 8c4fbfc4583..a297faed7a9 100644 --- a/framework/build.gradle +++ b/framework/build.gradle @@ -1,4 +1,5 @@ plugins { + id "org.gradle.test-retry" version "1.5.9" id "org.sonarqube" version "2.6" id "com.gorylenko.gradle-git-properties" version "2.4.1" } @@ -113,6 +114,10 @@ run { } test { + retry { + maxRetries = 5 + maxFailures = 20 + } testLogging { exceptionFormat = 'full' } diff --git a/framework/src/main/java/org/tron/core/Wallet.java b/framework/src/main/java/org/tron/core/Wallet.java index 9d7eb75df1b..769274e8f2a 100755 --- a/framework/src/main/java/org/tron/core/Wallet.java +++ b/framework/src/main/java/org/tron/core/Wallet.java @@ -549,6 +549,7 @@ public GrpcAPI.Return broadcastTransaction(Transaction signedTransaction) { throw new ContractValidateException(ActuatorConstant.CONTRACT_NOT_EXIST); } TransactionMessage message = new TransactionMessage(trx.getInstance().toByteArray()); + trx.checkExpiration(tronNetDelegate.getNextBlockSlotTime()); dbManager.pushTransaction(trx); int num = tronNetService.fastBroadcastTransaction(message); if (num == 0 && minEffectiveConnection != 0) { diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 422efefaed8..00142733f74 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -173,6 +173,7 @@ public static void clearParam() { PARAMETER.receiveTcpMinDataLength = 2048; PARAMETER.isOpenFullTcpDisconnect = false; PARAMETER.nodeDetectEnable = false; + PARAMETER.inactiveThreshold = 600; PARAMETER.supportConstant = false; PARAMETER.debug = false; PARAMETER.minTimeRatio = 0.0; @@ -845,6 +846,12 @@ public static void setParam(final String[] args, final String confFileName) { PARAMETER.nodeDetectEnable = config.hasPath(Constant.NODE_DETECT_ENABLE) && config.getBoolean(Constant.NODE_DETECT_ENABLE); + PARAMETER.inactiveThreshold = config.hasPath(Constant.NODE_INACTIVE_THRESHOLD) + ? config.getInt(Constant.NODE_INACTIVE_THRESHOLD) : 600; + if (PARAMETER.inactiveThreshold < 1) { + PARAMETER.inactiveThreshold = 1; + } + PARAMETER.maxTransactionPendingSize = config.hasPath(Constant.NODE_MAX_TRANSACTION_PENDING_SIZE) ? config.getInt(Constant.NODE_MAX_TRANSACTION_PENDING_SIZE) : 2000; diff --git a/framework/src/main/java/org/tron/core/db/Manager.java b/framework/src/main/java/org/tron/core/db/Manager.java index ef2f5c81124..43e5838d1d5 100644 --- a/framework/src/main/java/org/tron/core/db/Manager.java +++ b/framework/src/main/java/org/tron/core/db/Manager.java @@ -1384,6 +1384,7 @@ public void updateDynamicProperties(BlockCapsule block) { (chainBaseManager.getDynamicPropertiesStore().getLatestBlockHeaderNumber() - chainBaseManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum() + 1)); + chainBaseManager.setLatestSaveBlockTime(System.currentTimeMillis()); Metrics.gaugeSet(MetricKeys.Gauge.HEADER_HEIGHT, block.getNum()); Metrics.gaugeSet(MetricKeys.Gauge.HEADER_TIME, block.getTimeStamp()); } @@ -1568,6 +1569,7 @@ public BlockCapsule generateBlock(Miner miner, long blockTime, long timeout) { List toBePacked = new ArrayList<>(); long currentSize = blockCapsule.getInstance().getSerializedSize(); boolean isSort = Args.getInstance().isOpenTransactionSort(); + int[] logSize = new int[] {pendingTransactions.size(), rePushTransactions.size(), 0, 0}; while (pendingTransactions.size() > 0 || rePushTransactions.size() > 0) { boolean fromPending = false; TransactionCapsule trx; @@ -1643,6 +1645,11 @@ public BlockCapsule generateBlock(Miner miner, long blockTime, long timeout) { tmpSession.merge(); toBePacked.add(trx); currentSize += trxPackSize; + if (fromPending) { + logSize[2] += 1; + } else { + logSize[3] += 1; + } } catch (Exception e) { logger.warn("Process trx {} failed when generating block {}, {}.", trx.getTransactionId(), blockCapsule.getNum(), e.getMessage()); @@ -1659,11 +1666,14 @@ public BlockCapsule generateBlock(Miner miner, long blockTime, long timeout) { BlockCapsule capsule = new BlockCapsule(blockCapsule.getInstance()); capsule.generatedByMyself = true; Metrics.histogramObserve(timer); - logger.info("Generate block {} success, trxs:{}, pendingCount: {}, rePushCount: {}," - + " postponedCount: {}, blockSize: {} B", - capsule.getNum(), capsule.getTransactions().size(), - pendingTransactions.size(), rePushTransactions.size(), postponedTrxCount, - capsule.getSerializedSize()); + logger.info("Generate block {} success, trxs:{}, before pendingCount: {}, rePushCount: {}, " + + "from pending: {}, rePush: {}, after pendingCount: {}, rePushCount: {}, " + + "postponedCount: {}, blockSize: {} B", + capsule.getNum(), capsule.getTransactions().size(), + logSize[0], logSize[1], logSize[2], logSize[3], + pendingTransactions.size(), rePushTransactions.size(), postponedTrxCount, + capsule.getSerializedSize()); + return capsule; } diff --git a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java index 7518b1347a7..795c90b4edd 100644 --- a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java +++ b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java @@ -19,6 +19,7 @@ import org.tron.core.net.message.PbftMessageFactory; import org.tron.core.net.message.TronMessage; import org.tron.core.net.message.TronMessageFactory; +import org.tron.core.net.message.adv.FetchInvDataMessage; import org.tron.core.net.message.adv.InventoryMessage; import org.tron.core.net.message.base.DisconnectMessage; import org.tron.core.net.message.handshake.HelloMessage; @@ -38,7 +39,7 @@ import org.tron.p2p.P2pEventHandler; import org.tron.p2p.connection.Channel; import org.tron.protos.Protocol; -import org.tron.protos.Protocol.ReasonCode; +import org.tron.protos.Protocol.Inventory.InventoryType; @Slf4j(topic = "net") @Component @@ -205,6 +206,7 @@ private void processMessage(PeerConnection peer, byte[] data) { default: throw new P2pException(P2pException.TypeEnum.NO_SUCH_MESSAGE, msg.getType().toString()); } + updateLastInteractiveTime(peer, msg); } catch (Exception e) { processException(peer, msg, e); } finally { @@ -220,6 +222,27 @@ private void processMessage(PeerConnection peer, byte[] data) { } } + private void updateLastInteractiveTime(PeerConnection peer, TronMessage msg) { + MessageTypes type = msg.getType(); + + boolean flag = false; + switch (type) { + case SYNC_BLOCK_CHAIN: + case BLOCK_CHAIN_INVENTORY: + case BLOCK: + flag = true; + break; + case FETCH_INV_DATA: + flag = ((FetchInvDataMessage) msg).getInventoryType().equals(InventoryType.BLOCK); + break; + default: + break; + } + if (flag) { + peer.setLastInteractiveTime(System.currentTimeMillis()); + } + } + private void processException(PeerConnection peer, TronMessage msg, Exception ex) { Protocol.ReasonCode code; diff --git a/framework/src/main/java/org/tron/core/net/TronNetDelegate.java b/framework/src/main/java/org/tron/core/net/TronNetDelegate.java index cd9c8f01d8b..a6f9812a2d7 100644 --- a/framework/src/main/java/org/tron/core/net/TronNetDelegate.java +++ b/framework/src/main/java/org/tron/core/net/TronNetDelegate.java @@ -380,4 +380,12 @@ public boolean isBlockUnsolidified() { return headNum - solidNum >= maxUnsolidifiedBlocks; } + public long getNextBlockSlotTime() { + long slotCount = 1; + if (chainBaseManager.getDynamicPropertiesStore().getStateFlag() == 1) { + slotCount += chainBaseManager.getDynamicPropertiesStore().getMaintenanceSkipSlots(); + } + return chainBaseManager.getDynamicPropertiesStore().getLatestBlockHeaderTimestamp() + + slotCount * BLOCK_PRODUCED_INTERVAL; + } } diff --git a/framework/src/main/java/org/tron/core/net/TronNetService.java b/framework/src/main/java/org/tron/core/net/TronNetService.java index 03becf5d4e9..5b99f94c0db 100644 --- a/framework/src/main/java/org/tron/core/net/TronNetService.java +++ b/framework/src/main/java/org/tron/core/net/TronNetService.java @@ -22,6 +22,7 @@ import org.tron.core.net.peer.PeerStatusCheck; import org.tron.core.net.service.adv.AdvService; import org.tron.core.net.service.effective.EffectiveCheckService; +import org.tron.core.net.service.effective.ResilienceService; import org.tron.core.net.service.fetchblock.FetchBlockService; import org.tron.core.net.service.nodepersist.NodePersistService; import org.tron.core.net.service.relay.RelayService; @@ -50,6 +51,9 @@ public class TronNetService { @Autowired private PeerStatusCheck peerStatusCheck; + @Autowired + private ResilienceService resilienceService; + @Autowired private TransactionsMsgHandler transactionsMsgHandler; @@ -88,6 +92,7 @@ public void start() { advService.init(); syncService.init(); peerStatusCheck.init(); + resilienceService.init(); transactionsMsgHandler.init(); fetchBlockService.init(); nodePersistService.init(); @@ -110,6 +115,7 @@ public void close() { nodePersistService.close(); advService.close(); syncService.close(); + resilienceService.close(); peerStatusCheck.close(); transactionsMsgHandler.close(); fetchBlockService.close(); @@ -177,7 +183,7 @@ private P2pConfig updateConfig(P2pConfig config) { config.setMaxConnectionsWithSameIp(parameter.getMaxConnectionsWithSameIp()); config.setPort(parameter.getNodeListenPort()); config.setNetworkId(parameter.getNodeP2pVersion()); - config.setDisconnectionPolicyEnable(parameter.isOpenFullTcpDisconnect()); + config.setDisconnectionPolicyEnable(false); config.setNodeDetectEnable(parameter.isNodeDetectEnable()); config.setDiscoverEnable(parameter.isNodeDiscoveryEnable()); if (StringUtils.isEmpty(config.getIp()) && hasIpv4Stack(NetUtil.getAllLocalAddress())) { diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/BlockMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/BlockMsgHandler.java index 926ed1a01ca..dc886517476 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/BlockMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/BlockMsgHandler.java @@ -151,6 +151,13 @@ private void processBlock(PeerConnection peer, BlockCapsule block) throws P2pExc try { tronNetDelegate.processBlock(block, false); witnessProductBlockService.validWitnessProductTwoBlock(block); + + Item item = new Item(blockId, InventoryType.BLOCK); + tronNetDelegate.getActivePeer().forEach(p -> { + if (p.getAdvInvReceive().getIfPresent(item) != null) { + p.setBlockBothHave(blockId); + } + }); } catch (Exception e) { logger.warn("Process adv block {} from peer {} failed. reason: {}", blockId, peer.getInetAddress(), e.getMessage()); diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java index 5e797c084b3..5415ea435e3 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java @@ -164,6 +164,10 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr throw new P2pException(TypeEnum.BAD_MESSAGE, "minBlockNum: " + minBlockNum + ", blockNum: " + blockNum); } + if (blockNum > peer.getLastSyncBlockId().getNum()) { + throw new P2pException(TypeEnum.BAD_MESSAGE, + "maxBlockNum: " + peer.getLastSyncBlockId().getNum() + ", blockNum: " + blockNum); + } if (peer.getSyncBlockIdCache().getIfPresent(hash) != null) { throw new P2pException(TypeEnum.BAD_MESSAGE, new BlockId(hash).getString() + " is exist"); diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java index a8ad8d0ec73..e8783b25e95 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java @@ -39,6 +39,9 @@ public void processMessage(PeerConnection peer, TronMessage msg) { Item item = new Item(id, type); peer.getAdvInvReceive().put(item, System.currentTimeMillis()); advService.addInv(item); + if (type.equals(InventoryType.BLOCK) && peer.getAdvInvSpread().getIfPresent(item) == null) { + peer.setLastInteractiveTime(System.currentTimeMillis()); + } } } diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java index 958ebfe5561..f575253c50c 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java @@ -33,7 +33,6 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep peer.disconnect(Protocol.ReasonCode.BAD_PROTOCOL); return; } - long remainNum = 0; List summaryChainIds = syncBlockChainMessage.getBlockIds(); diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java index 665381b31a8..0c58fe08cd6 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java @@ -13,6 +13,7 @@ import org.tron.core.config.args.Args; import org.tron.core.exception.P2pException; import org.tron.core.exception.P2pException.TypeEnum; +import org.tron.core.exception.TransactionExpirationException; import org.tron.core.net.TronNetDelegate; import org.tron.core.net.message.TronMessage; import org.tron.core.net.message.adv.TransactionMessage; @@ -128,6 +129,7 @@ private void handleTransaction(PeerConnection peer, TransactionMessage trx) { } try { + trx.getTransactionCapsule().checkExpiration(tronNetDelegate.getNextBlockSlotTime()); tronNetDelegate.pushTransaction(trx.getTransactionCapsule()); advService.broadcast(trx); } catch (P2pException e) { @@ -137,6 +139,9 @@ private void handleTransaction(PeerConnection peer, TransactionMessage trx) { peer.setBadPeer(true); peer.disconnect(ReasonCode.BAD_TX); } + } catch (TransactionExpirationException e) { + logger.warn("{}. trx: {}, peer: {}", + e.getMessage(), trx.getMessageId(), peer.getInetAddress()); } catch (Exception e) { logger.error("Trx {} from peer {} process failed", trx.getMessageId(), peer.getInetAddress(), e); diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java index 4fac50b82c7..2e08e105bed 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java @@ -79,6 +79,10 @@ public class PeerConnection { @Setter private ByteString address; + @Getter + @Setter + private volatile long lastInteractiveTime; + @Getter @Setter private TronState tronState = TronState.INIT; @@ -159,6 +163,7 @@ public void setChannel(Channel channel) { this.isRelayPeer = true; } this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress()); + lastInteractiveTime = System.currentTimeMillis(); } public void setBlockBothHave(BlockId blockId) { @@ -167,7 +172,11 @@ public void setBlockBothHave(BlockId blockId) { } public boolean isIdle() { - return advInvRequest.isEmpty() && syncBlockRequested.isEmpty() && syncChainRequested == null; + return advInvRequest.isEmpty() && isSyncIdle(); + } + + public boolean isSyncIdle() { + return syncBlockRequested.isEmpty() && syncChainRequested == null; } public void sendMessage(Message message) { @@ -221,11 +230,13 @@ public String log() { + "syncBlockRequestedSize:%d\n" + "remainNum:%d\n" + "syncChainRequested:%d\n" + + "inactiveSeconds:%d\n" + "blockInProcess:%d\n", channel.getInetSocketAddress(), (now - channel.getStartTime()) / Constant.ONE_THOUSAND, channel.getAvgLatency(), - fastForwardBlock != null ? fastForwardBlock.getNum() : blockBothHave.getNum(), + fastForwardBlock != null ? fastForwardBlock.getNum() : String.format("%d [%ds]", + blockBothHave.getNum(), (now - blockBothHaveUpdateTime) / Constant.ONE_THOUSAND), isNeedSyncFromPeer(), isNeedSyncFromUs(), syncBlockToFetch.size(), @@ -234,6 +245,7 @@ public String log() { remainNum, requested == null ? 0 : (now - requested.getValue()) / Constant.ONE_THOUSAND, + (now - lastInteractiveTime) / Constant.ONE_THOUSAND, syncBlockInProcess.size()); } diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerManager.java b/framework/src/main/java/org/tron/core/net/peer/PeerManager.java index 537f2083691..f564b90f3ed 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerManager.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerManager.java @@ -94,7 +94,11 @@ private static void remove(PeerConnection peerConnection) { } public static synchronized void sortPeers() { - peers.sort(Comparator.comparingDouble(c -> c.getChannel().getAvgLatency())); + try { + peers.sort(Comparator.comparingLong(c -> c.getChannel().getAvgLatency())); + } catch (Exception e) { + logger.warn("Sort peers failed. {}", e.getMessage()); + } } public static PeerConnection getPeerConnection(Channel channel) { diff --git a/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java b/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java new file mode 100644 index 00000000000..a63fd4bf0a9 --- /dev/null +++ b/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java @@ -0,0 +1,262 @@ +package org.tron.core.net.service.effective; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.tron.common.es.ExecutorServiceManager; +import org.tron.common.parameter.CommonParameter; +import org.tron.core.ChainBaseManager; +import org.tron.core.config.args.Args; +import org.tron.core.net.TronNetDelegate; +import org.tron.core.net.peer.PeerConnection; +import org.tron.protos.Protocol.ReasonCode; + +@Slf4j(topic = "net") +@Component +public class ResilienceService { + + private static final long inactiveThreshold = + CommonParameter.getInstance().getInactiveThreshold() * 1000L; + public static final long blockNotChangeThreshold = 60 * 1000L; + + //when node is isolated, retention percent peers will not be disconnected + public static final double retentionPercent = 0.8; + private static final int initialDelay = 300; + public static final int minBroadcastPeerSize = 3; + private static final String esName = "resilience-service"; + private final ScheduledExecutorService executor = ExecutorServiceManager + .newSingleThreadScheduledExecutor(esName); + + @Autowired + private TronNetDelegate tronNetDelegate; + + @Autowired + private ChainBaseManager chainBaseManager; + + public void init() { + if (Args.getInstance().isOpenFullTcpDisconnect) { + executor.scheduleWithFixedDelay(() -> { + try { + disconnectRandom(); + } catch (Exception e) { + logger.error("DisconnectRandom node failed", e); + } + }, initialDelay, 30, TimeUnit.SECONDS); + } else { + logger.info("OpenFullTcpDisconnect is disabled"); + } + + executor.scheduleWithFixedDelay(() -> { + try { + disconnectLan(); + } catch (Exception e) { + logger.error("DisconnectLan node failed", e); + } + }, initialDelay, 10, TimeUnit.SECONDS); + + executor.scheduleWithFixedDelay(() -> { + try { + disconnectIsolated2(); + } catch (Exception e) { + logger.error("DisconnectIsolated node failed", e); + } + }, initialDelay, 30, TimeUnit.SECONDS); + } + + private void disconnectRandom() { + int peerSize = tronNetDelegate.getActivePeer().size(); + if (peerSize < CommonParameter.getInstance().getMaxConnections()) { + return; + } + List peers = tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.getChannel().isTrustPeer()) + .filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer()) + .collect(Collectors.toList()); + + if (peers.size() >= minBroadcastPeerSize) { + long now = System.currentTimeMillis(); + Map weights = new HashMap<>(); + peers.forEach(peer -> { + int weight = (int) Math.ceil((double) (now - peer.getLastInteractiveTime()) / 500); + weights.put(peer, Math.max(weight, 1)); + }); + WeightedRandom weightedRandom = new WeightedRandom(weights); + PeerConnection one = (PeerConnection) weightedRandom.next(); + disconnectFromPeer(one, ReasonCode.RANDOM_ELIMINATION, DisconnectCause.RANDOM_ELIMINATION); + return; + } + + int needSyncFromPeerCount = (int) tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.getChannel().isTrustPeer()) + .filter(PeerConnection::isNeedSyncFromPeer) + .count(); + if (needSyncFromPeerCount >= 2) { + peers = tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.getChannel().isTrustPeer()) + .filter(peer -> peer.isNeedSyncFromUs() || peer.isNeedSyncFromPeer()) + .collect(Collectors.toList()); + } else { + peers = tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.getChannel().isTrustPeer()) + .filter(PeerConnection::isNeedSyncFromUs) + .collect(Collectors.toList()); + } + if (!peers.isEmpty()) { + int index = new Random().nextInt(peers.size()); + disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION, + DisconnectCause.RANDOM_ELIMINATION); + } + } + + + private void disconnectLan() { + if (!isLanNode()) { + return; + } + // disconnect from the node that has keep inactive for more than inactiveThreshold + // and its lastActiveTime is smallest + int peerSize = tronNetDelegate.getActivePeer().size(); + if (peerSize >= CommonParameter.getInstance().getMinConnections()) { + long now = System.currentTimeMillis(); + List peers = tronNetDelegate.getActivePeer().stream() + .filter(peer -> now - peer.getLastInteractiveTime() >= inactiveThreshold) + .filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs()) + .filter(peer -> !peer.getChannel().isTrustPeer()) + .collect(Collectors.toList()); + Optional one = getEarliestPeer(peers); + one.ifPresent( + peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL, DisconnectCause.LAN_NODE)); + } + } + + private void disconnectIsolated2() { + if (!isIsolateLand2()) { + return; + } + logger.warn("Node is isolated, try to disconnect from peers"); + + //disconnect from the node whose lastActiveTime is smallest + int activePeerSize = (int) tronNetDelegate.getActivePeer().stream() + .filter(peer -> peer.getChannel().isActive()) + .count(); + if (activePeerSize >= CommonParameter.getInstance().getMinActiveConnections()) { + List peers = tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.getChannel().isTrustPeer()) + .filter(peer -> peer.getChannel().isActive()) + .collect(Collectors.toList()); + + Optional one = getEarliestPeer(peers); + one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL, + DisconnectCause.ISOLATE2_ACTIVE)); + } + + //disconnect from some passive nodes, make sure retention nodes' num <= 0.8 * maxConnection, + //so new peers can come in + int peerSize = tronNetDelegate.getActivePeer().size(); + int threshold = (int) (CommonParameter.getInstance().getMaxConnections() * retentionPercent); + if (peerSize > threshold) { + int disconnectSize = peerSize - threshold; + List peers = tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.getChannel().isTrustPeer()) + .filter(peer -> !peer.getChannel().isActive()) + .collect(Collectors.toList()); + try { + peers.sort(Comparator.comparing(PeerConnection::getLastInteractiveTime, Long::compareTo)); + } catch (Exception e) { + logger.warn("Sort disconnectIsolated2 peers failed: {}", e.getMessage()); + return; + } + int candidateSize = peers.size(); + if (peers.size() > disconnectSize) { + peers = peers.subList(0, disconnectSize); + } + logger.info("All peer Size:{}, plan size:{}, candidate size:{}, real size:{}", peerSize, + disconnectSize, candidateSize, peers.size()); + peers.forEach(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL, + DisconnectCause.ISOLATE2_PASSIVE)); + } + } + + private Optional getEarliestPeer(List pees) { + Optional one = Optional.empty(); + try { + one = pees.stream() + .min(Comparator.comparing(PeerConnection::getLastInteractiveTime, Long::compareTo)); + } catch (Exception e) { + logger.warn("Get earliest peer failed: {}", e.getMessage()); + } + return one; + } + + private boolean isLanNode() { + int peerSize = tronNetDelegate.getActivePeer().size(); + int activePeerSize = (int) tronNetDelegate.getActivePeer().stream() + .filter(peer -> peer.getChannel().isActive()) + .count(); + return peerSize >= CommonParameter.getInstance().getMinActiveConnections() + && peerSize == activePeerSize; + } + + private boolean isIsolateLand2() { + int advPeerCount = (int) tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs()) + .count(); + long diff = System.currentTimeMillis() - chainBaseManager.getLatestSaveBlockTime(); + return advPeerCount >= 1 && diff >= blockNotChangeThreshold; + } + + private void disconnectFromPeer(PeerConnection peer, ReasonCode reasonCode, + DisconnectCause cause) { + int inactiveSeconds = (int) ((System.currentTimeMillis() - peer.getLastInteractiveTime()) + / 1000); + logger.info("Disconnect from peer {}, inactive seconds {}, cause: {}", + peer.getInetSocketAddress(), inactiveSeconds, cause); + peer.disconnect(reasonCode); + } + + private enum DisconnectCause { + RANDOM_ELIMINATION, + LAN_NODE, + ISOLATE2_ACTIVE, + ISOLATE2_PASSIVE, + } + + static class WeightedRandom { + + private final Map weights; + private final Random random; + + public WeightedRandom(Map weights) { + this.weights = weights; + this.random = new Random(); + } + + public Object next() { + int totalWeight = 0; + for (int weight : weights.values()) { + totalWeight += weight; + } + int randomNum = random.nextInt(totalWeight); + for (Object key : weights.keySet()) { + randomNum -= weights.get(key); + if (randomNum < 0) { + return key; + } + } + throw new IllegalStateException("Sum of weights should not be negative."); + } + } + + public void close() { + ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName); + } +} diff --git a/framework/src/main/java/org/tron/core/net/service/statistics/TronStatsManager.java b/framework/src/main/java/org/tron/core/net/service/statistics/TronStatsManager.java index caac3f7f325..99e12834ced 100644 --- a/framework/src/main/java/org/tron/core/net/service/statistics/TronStatsManager.java +++ b/framework/src/main/java/org/tron/core/net/service/statistics/TronStatsManager.java @@ -63,11 +63,19 @@ private void work() { Metrics.histogramObserve(MetricKeys.Histogram.TCP_BYTES, stats.getTcpInSize() - TCP_TRAFFIC_IN, MetricLabels.Histogram.TRAFFIC_IN); + MetricsUtil.meterMark(MetricsKey.NET_TCP_OUT_TRAFFIC, stats.getTcpOutSize() - TCP_TRAFFIC_OUT); + Metrics.histogramObserve(MetricKeys.Histogram.TCP_BYTES, + stats.getTcpOutSize() - TCP_TRAFFIC_OUT, + MetricLabels.Histogram.TRAFFIC_OUT); + + MetricsUtil.meterMark(MetricsKey.NET_UDP_IN_TRAFFIC, + stats.getUdpInSize() - UDP_TRAFFIC_IN); Metrics.histogramObserve(MetricKeys.Histogram.UDP_BYTES, stats.getUdpInSize() - UDP_TRAFFIC_IN, MetricLabels.Histogram.TRAFFIC_IN); + MetricsUtil.meterMark(MetricsKey.NET_UDP_OUT_TRAFFIC, stats.getUdpOutSize() - UDP_TRAFFIC_OUT); Metrics.histogramObserve(MetricKeys.Histogram.UDP_BYTES, diff --git a/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java b/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java index 9453700df0d..8c673977962 100644 --- a/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java +++ b/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java @@ -134,7 +134,7 @@ public void processBlock(PeerConnection peer, BlockMessage blockMessage) { blockJustReceived.put(blockMessage, peer); } handleFlag = true; - if (peer.isIdle()) { + if (peer.isSyncIdle()) { if (peer.getRemainNum() > 0 && peer.getSyncBlockToFetch().size() <= syncFetchBatchNum) { syncNext(peer); @@ -226,7 +226,7 @@ private BlockId getBlockIdByNum(long num) throws P2pException { private void startFetchSyncBlock() { HashMap> send = new HashMap<>(); tronNetDelegate.getActivePeer().stream() - .filter(peer -> peer.isNeedSyncFromPeer() && peer.isIdle()) + .filter(peer -> peer.isNeedSyncFromPeer() && peer.isSyncIdle()) .filter(peer -> peer.isFetchAble()) .forEach(peer -> { if (!send.containsKey(peer)) { diff --git a/framework/src/main/java/org/tron/core/services/filter/HttpInterceptor.java b/framework/src/main/java/org/tron/core/services/filter/HttpInterceptor.java index 2cce8272dd5..8b43cfef642 100644 --- a/framework/src/main/java/org/tron/core/services/filter/HttpInterceptor.java +++ b/framework/src/main/java/org/tron/core/services/filter/HttpInterceptor.java @@ -18,7 +18,6 @@ @Slf4j(topic = "httpInterceptor") public class HttpInterceptor implements Filter { - private String endpoint; private final int HTTP_SUCCESS = 200; private final int HTTP_BAD_REQUEST = 400; private final int HTTP_NOT_ACCEPTABLE = 406; @@ -29,6 +28,7 @@ public void init(FilterConfig filterConfig) { @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) { + String endpoint = MetricLabels.UNDEFINED; try { if (!(request instanceof HttpServletRequest)) { chain.doFilter(request, response); diff --git a/framework/src/main/java/org/tron/program/Version.java b/framework/src/main/java/org/tron/program/Version.java index eb21aa292f7..601f555a234 100644 --- a/framework/src/main/java/org/tron/program/Version.java +++ b/framework/src/main/java/org/tron/program/Version.java @@ -4,7 +4,7 @@ public class Version { public static final String VERSION_NAME = "GreatVoyage-v4.7.4-44-g8720e06a6"; public static final String VERSION_CODE = "18306"; - private static final String VERSION = "4.7.5"; + private static final String VERSION = "4.7.6"; public static String getVersion() { return VERSION; diff --git a/framework/src/main/resources/config-localtest.conf b/framework/src/main/resources/config-localtest.conf index f1ac104c9ed..50e7539c1d0 100644 --- a/framework/src/main/resources/config-localtest.conf +++ b/framework/src/main/resources/config-localtest.conf @@ -99,6 +99,7 @@ node { # check the peer data transfer ,disconnect factor isOpenFullTcpDisconnect = true + inactiveThreshold = 600 //seconds p2p { version = 333 # 11111: mainnet; 20180622: testnet diff --git a/framework/src/main/resources/config.conf b/framework/src/main/resources/config.conf index 78427c30f87..f9fc2dd673d 100644 --- a/framework/src/main/resources/config.conf +++ b/framework/src/main/resources/config.conf @@ -180,6 +180,7 @@ node { minParticipationRate = 15 isOpenFullTcpDisconnect = false + inactiveThreshold = 600 //seconds p2p { version = 11111 # 11111: mainnet; 20180622: testnet diff --git a/framework/src/test/java/org/tron/core/capsule/utils/MerkleTreeTest.java b/framework/src/test/java/org/tron/core/capsule/utils/MerkleTreeTest.java index 3662fb524b8..910b1adba67 100644 --- a/framework/src/test/java/org/tron/core/capsule/utils/MerkleTreeTest.java +++ b/framework/src/test/java/org/tron/core/capsule/utils/MerkleTreeTest.java @@ -8,6 +8,7 @@ import java.util.stream.IntStream; import lombok.extern.slf4j.Slf4j; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -185,6 +186,7 @@ public void testAnyHashNum() { } @Test + @Ignore public void testConcurrent() { Sha256Hash root1 = Sha256Hash.wrap( ByteString.fromHex("6cb38b4f493db8bacf26123cd4253bbfc530c708b97b3747e782f64097c3c482")); diff --git a/framework/src/test/java/org/tron/core/db/TransactionExpireTest.java b/framework/src/test/java/org/tron/core/db/TransactionExpireTest.java index 5243405b7e6..5193f4128cd 100644 --- a/framework/src/test/java/org/tron/core/db/TransactionExpireTest.java +++ b/framework/src/test/java/org/tron/core/db/TransactionExpireTest.java @@ -50,19 +50,6 @@ public void init() throws IOException { context = new TronApplicationContext(DefaultConfig.class); wallet = context.getBean(Wallet.class); dbManager = context.getBean(Manager.class); - - blockCapsule = new BlockCapsule( - 1, - Sha256Hash.wrap(ByteString.copyFrom( - ByteArray.fromHexString( - "0304f784e4e7bae517bcab94c3e0c9214fb4ac7ff9d7d5a937d1f40031f87b81"))), - 1, - ByteString.copyFromUtf8("testAddress")); - dbManager.getDynamicPropertiesStore().saveLatestBlockHeaderNumber(blockCapsule.getNum()); - dbManager.getDynamicPropertiesStore() - .saveLatestBlockHeaderTimestamp(blockCapsule.getTimeStamp()); - dbManager.updateRecentBlock(blockCapsule); - initLocalWitness(); } private void initLocalWitness() { @@ -81,6 +68,19 @@ public void removeDb() { @Test public void testExpireTransaction() { + blockCapsule = new BlockCapsule( + 1, + Sha256Hash.wrap(ByteString.copyFrom( + ByteArray.fromHexString( + "0304f784e4e7bae517bcab94c3e0c9214fb4ac7ff9d7d5a937d1f40031f87b81"))), + 1, + ByteString.copyFromUtf8("testAddress")); + dbManager.getDynamicPropertiesStore().saveLatestBlockHeaderNumber(blockCapsule.getNum()); + dbManager.getDynamicPropertiesStore() + .saveLatestBlockHeaderTimestamp(blockCapsule.getTimeStamp()); + dbManager.updateRecentBlock(blockCapsule); + initLocalWitness(); + TransferContract transferContract = TransferContract.newBuilder() .setAmount(1L) .setOwnerAddress(ByteString.copyFrom(Args.getLocalWitnesses() @@ -101,8 +101,61 @@ public void testExpireTransaction() { Assert.assertEquals(response_code.TRANSACTION_EXPIRATION_ERROR, result.getCode()); } + @Test + public void testExpireTransactionNew() { + blockCapsule = new BlockCapsule( + 1, + Sha256Hash.wrap(ByteString.copyFrom( + ByteArray.fromHexString( + "0304f784e4e7bae517bcab94c3e0c9214fb4ac7ff9d7d5a937d1f40031f87b81"))), + System.currentTimeMillis(), + ByteString.copyFromUtf8("testAddress")); + dbManager.getDynamicPropertiesStore().saveLatestBlockHeaderNumber(blockCapsule.getNum()); + dbManager.getDynamicPropertiesStore() + .saveLatestBlockHeaderTimestamp(blockCapsule.getTimeStamp()); + dbManager.updateRecentBlock(blockCapsule); + initLocalWitness(); + byte[] address = Args.getLocalWitnesses() + .getWitnessAccountAddress(CommonParameter.getInstance().isECKeyCryptoEngine()); + ByteString addressByte = ByteString.copyFrom(address); + AccountCapsule accountCapsule = + new AccountCapsule(Protocol.Account.newBuilder().setAddress(addressByte).build()); + accountCapsule.setBalance(1000_000_000L); + dbManager.getChainBaseManager().getAccountStore() + .put(accountCapsule.createDbKey(), accountCapsule); + + TransferContract transferContract = TransferContract.newBuilder() + .setAmount(1L) + .setOwnerAddress(addressByte) + .setToAddress(ByteString.copyFrom(ByteArray.fromHexString( + (Wallet.getAddressPreFixString() + "A389132D6639FBDA4FBC8B659264E6B7C90DB086")))) + .build(); + TransactionCapsule transactionCapsule = + new TransactionCapsule(transferContract, ContractType.TransferContract); + transactionCapsule.setReference(blockCapsule.getNum(), blockCapsule.getBlockId().getBytes()); + + transactionCapsule.setExpiration(System.currentTimeMillis()); + transactionCapsule.sign(ByteArray.fromHexString(Args.getLocalWitnesses().getPrivateKey())); + + GrpcAPI.Return result = wallet.broadcastTransaction(transactionCapsule.getInstance()); + Assert.assertEquals(response_code.TRANSACTION_EXPIRATION_ERROR, result.getCode()); + } + @Test public void testTransactionApprovedList() { + blockCapsule = new BlockCapsule( + 1, + Sha256Hash.wrap(ByteString.copyFrom( + ByteArray.fromHexString( + "0304f784e4e7bae517bcab94c3e0c9214fb4ac7ff9d7d5a937d1f40031f87b81"))), + 1, + ByteString.copyFromUtf8("testAddress")); + dbManager.getDynamicPropertiesStore().saveLatestBlockHeaderNumber(blockCapsule.getNum()); + dbManager.getDynamicPropertiesStore() + .saveLatestBlockHeaderTimestamp(blockCapsule.getTimeStamp()); + dbManager.updateRecentBlock(blockCapsule); + initLocalWitness(); + byte[] address = Args.getLocalWitnesses() .getWitnessAccountAddress(CommonParameter.getInstance().isECKeyCryptoEngine()); TransferContract transferContract = TransferContract.newBuilder() diff --git a/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java b/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java index 0008ec315d5..e0c816a537a 100644 --- a/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java +++ b/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java @@ -12,10 +12,13 @@ import org.tron.common.utils.Sha256Hash; import org.tron.core.Constant; import org.tron.core.config.args.Args; +import org.tron.core.net.message.TronMessage; +import org.tron.core.net.message.adv.FetchInvDataMessage; import org.tron.core.net.message.adv.InventoryMessage; import org.tron.core.net.peer.PeerConnection; import org.tron.core.net.service.statistics.PeerStatistics; import org.tron.protos.Protocol; +import org.tron.protos.Protocol.Inventory.InventoryType; public class P2pEventHandlerImplTest { @@ -108,4 +111,22 @@ public void testProcessInventoryMessage() throws Exception { Assert.assertEquals(300, count); } + + @Test + public void testUpdateLastInteractiveTime() throws Exception { + String[] a = new String[0]; + Args.setParam(a, Constant.TESTNET_CONF); + + PeerConnection peer = new PeerConnection(); + P2pEventHandlerImpl p2pEventHandler = new P2pEventHandlerImpl(); + + Method method = p2pEventHandler.getClass() + .getDeclaredMethod("updateLastInteractiveTime", PeerConnection.class, TronMessage.class); + method.setAccessible(true); + + long t1 = System.currentTimeMillis(); + FetchInvDataMessage message = new FetchInvDataMessage(new ArrayList<>(), InventoryType.BLOCK); + method.invoke(p2pEventHandler, peer, message); + Assert.assertTrue(peer.getLastInteractiveTime() >= t1); + } } diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/BlockMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/BlockMsgHandlerTest.java index 8154d01aded..48e7d730520 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/BlockMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/BlockMsgHandlerTest.java @@ -1,26 +1,33 @@ package org.tron.core.net.messagehandler; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; - import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.List; import javax.annotation.Resource; - import lombok.extern.slf4j.Slf4j; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import org.tron.common.BaseTest; +import org.tron.common.utils.ByteArray; import org.tron.common.utils.Sha256Hash; import org.tron.core.Constant; import org.tron.core.capsule.BlockCapsule; +import org.tron.core.capsule.BlockCapsule.BlockId; import org.tron.core.config.Parameter; import org.tron.core.config.args.Args; import org.tron.core.exception.P2pException; +import org.tron.core.net.TronNetDelegate; import org.tron.core.net.message.adv.BlockMessage; import org.tron.core.net.peer.Item; import org.tron.core.net.peer.PeerConnection; @@ -41,9 +48,8 @@ public class BlockMsgHandlerTest extends BaseTest { */ @BeforeClass public static void init() { - Args.setParam(new String[]{"--output-directory", dbPath(), "--debug"}, + Args.setParam(new String[] {"--output-directory", dbPath(), "--debug"}, Constant.TEST_CONF); - } @Before @@ -123,4 +129,42 @@ public void testProcessMessage() { logger.error("error", e); } } + + @Test + public void testProcessBlock() { + TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class); + + try { + Field field = handler.getClass().getDeclaredField("tronNetDelegate"); + field.setAccessible(true); + field.set(handler, tronNetDelegate); + + BlockCapsule blockCapsule0 = new BlockCapsule(1, + Sha256Hash.wrap(ByteString + .copyFrom(ByteArray + .fromHexString( + "9938a342238077182498b464ac0292229938a342238077182498b464ac029222"))), + 1234, + ByteString.copyFrom("1234567".getBytes())); + + peer.getAdvInvReceive() + .put(new Item(blockCapsule0.getBlockId(), InventoryType.BLOCK), + System.currentTimeMillis()); + + Mockito.doReturn(true).when(tronNetDelegate).validBlock(any(BlockCapsule.class)); + Mockito.doReturn(true).when(tronNetDelegate).containBlock(any(BlockId.class)); + Mockito.doReturn(blockCapsule0.getBlockId()).when(tronNetDelegate).getHeadBlockId(); + Mockito.doNothing().when(tronNetDelegate).processBlock(any(BlockCapsule.class), anyBoolean()); + List peers = new ArrayList<>(); + peers.add(peer); + Mockito.doReturn(peers).when(tronNetDelegate).getActivePeer(); + + Method method = handler.getClass() + .getDeclaredMethod("processBlock", PeerConnection.class, BlockCapsule.class); + method.setAccessible(true); + method.invoke(handler, peer, blockCapsule0); + } catch (Exception e) { + Assert.fail(); + } + } } diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java index 404d275276a..5fd6d6725ba 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java @@ -21,8 +21,6 @@ import org.tron.core.net.service.adv.AdvService; import org.tron.protos.Protocol; - - public class FetchInvDataMsgHandlerTest { @Test @@ -62,4 +60,37 @@ public void testProcessMessage() throws Exception { new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK)); Assert.assertNotNull(syncBlockIdCache.getIfPresent(blockId)); } + + @Test + public void testSyncFetchCheck() { + BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000L); + List blockIds = new LinkedList<>(); + blockIds.add(blockId); + FetchInvDataMessage msg = + new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK); + + PeerConnection peer = Mockito.mock(PeerConnection.class); + Mockito.when(peer.isNeedSyncFromUs()).thenReturn(true); + Cache advInvSpread = CacheBuilder.newBuilder().maximumSize(100) + .expireAfterWrite(1, TimeUnit.HOURS).recordStats().build(); + Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread); + + FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler(); + + try { + Mockito.when(peer.getLastSyncBlockId()) + .thenReturn(new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 1000L)); + fetchInvDataMsgHandler.processMessage(peer, msg); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), "maxBlockNum: 1000, blockNum: 10000"); + } + + try { + Mockito.when(peer.getLastSyncBlockId()) + .thenReturn(new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 20000L)); + fetchInvDataMsgHandler.processMessage(peer, msg); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), "minBlockNum: 16000, blockNum: 10000"); + } + } } diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java index 47ffde30ac3..e1865543443 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java @@ -6,13 +6,18 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +import lombok.Getter; import org.joda.time.DateTime; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; import org.tron.common.BaseTest; +import org.tron.common.runtime.TvmTestUtils; import org.tron.common.utils.ByteArray; import org.tron.core.Constant; import org.tron.core.config.args.Args; @@ -75,10 +80,48 @@ public void testProcessMessage() { transactionsMsgHandler.processMessage(peer, new TransactionsMessage(transactionList)); Assert.assertNull(advInvRequest.get(item)); //Thread.sleep(10); + transactionsMsgHandler.close(); + BlockingQueue smartContractQueue = + new LinkedBlockingQueue(2); + smartContractQueue.offer(new TrxEvent(null, null)); + smartContractQueue.offer(new TrxEvent(null, null)); + Field field1 = TransactionsMsgHandler.class.getDeclaredField("smartContractQueue"); + field1.setAccessible(true); + field1.set(transactionsMsgHandler, smartContractQueue); + Protocol.Transaction trx1 = TvmTestUtils.generateTriggerSmartContractAndGetTransaction( + ByteArray.fromHexString("121212a9cf"), + ByteArray.fromHexString("121212a9cf"), + ByteArray.fromHexString("123456"), + 100, 100000000, 0, 0); + Map advInvRequest1 = new ConcurrentHashMap<>(); + Item item1 = new Item(new TransactionMessage(trx1).getMessageId(), + Protocol.Inventory.InventoryType.TRX); + advInvRequest1.put(item1, 0L); + Mockito.when(peer.getAdvInvRequest()).thenReturn(advInvRequest1); + List transactionList1 = new ArrayList<>(); + transactionList1.add(trx1); + transactionsMsgHandler.processMessage(peer, new TransactionsMessage(transactionList1)); + Assert.assertNull(advInvRequest.get(item1)); } catch (Exception e) { Assert.fail(); } finally { transactionsMsgHandler.close(); } } + + class TrxEvent { + + @Getter + private PeerConnection peer; + @Getter + private TransactionMessage msg; + @Getter + private long time; + + public TrxEvent(PeerConnection peer, TransactionMessage msg) { + this.peer = peer; + this.msg = msg; + this.time = System.currentTimeMillis(); + } + } } diff --git a/framework/src/test/java/org/tron/core/net/peer/PeerConnectionTest.java b/framework/src/test/java/org/tron/core/net/peer/PeerConnectionTest.java index 9db5230ed45..5a67cd8f609 100644 --- a/framework/src/test/java/org/tron/core/net/peer/PeerConnectionTest.java +++ b/framework/src/test/java/org/tron/core/net/peer/PeerConnectionTest.java @@ -98,6 +98,36 @@ public void testIsIdle() { Assert.assertTrue(!f); } + @Test + public void testIsSyncIdle() { + PeerConnection peerConnection = new PeerConnection(); + boolean f = peerConnection.isSyncIdle(); + Assert.assertTrue(f); + + Item item = new Item(Sha256Hash.ZERO_HASH, Protocol.Inventory.InventoryType.TRX); + Long time = System.currentTimeMillis(); + peerConnection.getAdvInvRequest().put(item, time); + f = peerConnection.isSyncIdle(); + Assert.assertTrue(f); + + peerConnection.getAdvInvRequest().clear(); + f = peerConnection.isSyncIdle(); + Assert.assertTrue(f); + + BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(); + peerConnection.getSyncBlockRequested().put(blockId, time); + f = peerConnection.isSyncIdle(); + Assert.assertTrue(!f); + + peerConnection.getSyncBlockRequested().clear(); + f = peerConnection.isSyncIdle(); + Assert.assertTrue(f); + + peerConnection.setSyncChainRequested(new Pair<>(new LinkedList<>(), time)); + f = peerConnection.isSyncIdle(); + Assert.assertTrue(!f); + } + @Test public void testOnConnect() { PeerConnection peerConnection = new PeerConnection(); diff --git a/framework/src/test/java/org/tron/core/net/peer/PeerManagerTest.java b/framework/src/test/java/org/tron/core/net/peer/PeerManagerTest.java index a6151da6d1c..b8c03de1029 100644 --- a/framework/src/test/java/org/tron/core/net/peer/PeerManagerTest.java +++ b/framework/src/test/java/org/tron/core/net/peer/PeerManagerTest.java @@ -135,4 +135,32 @@ public void testGetPeers() throws Exception { Assert.assertEquals(2, peers.size()); } + @Test + public void testSortPeers() throws Exception { + PeerConnection p1 = new PeerConnection(); + PeerConnection p2 = new PeerConnection(); + + List peers = new ArrayList<>(); + peers.add(p1); + peers.add(p2); + + Field field = PeerManager.class.getDeclaredField("peers"); + field.setAccessible(true); + field.set(PeerManager.class, Collections.synchronizedList(peers)); + + PeerManager.sortPeers(); + + Channel c1 = new Channel(); + c1.updateAvgLatency(100000L); + ReflectUtils.setFieldValue(p1, "channel", c1); + + Channel c2 = new Channel(); + c2.updateAvgLatency(1000L); + ReflectUtils.setFieldValue(p2, "channel", c2); + + PeerManager.sortPeers(); + + Assert.assertEquals(PeerManager.getPeers().get(0), p2); + } + } diff --git a/framework/src/test/java/org/tron/core/net/peer/PeerStatusCheckTest.java b/framework/src/test/java/org/tron/core/net/peer/PeerStatusCheckTest.java new file mode 100644 index 00000000000..53e678c7ca4 --- /dev/null +++ b/framework/src/test/java/org/tron/core/net/peer/PeerStatusCheckTest.java @@ -0,0 +1,73 @@ +package org.tron.core.net.peer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; + +import io.netty.channel.ChannelHandlerContext; +import java.io.IOException; +import java.net.InetSocketAddress; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.tron.common.application.TronApplicationContext; +import org.tron.common.utils.ReflectUtils; +import org.tron.core.Constant; +import org.tron.core.capsule.BlockCapsule.BlockId; +import org.tron.core.config.DefaultConfig; +import org.tron.core.config.Parameter.NetConstants; +import org.tron.core.config.args.Args; +import org.tron.p2p.connection.Channel; + + +public class PeerStatusCheckTest { + + protected TronApplicationContext context; + private PeerStatusCheck service; + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Before + public void init() throws IOException { + Args.setParam(new String[] {"--output-directory", + temporaryFolder.newFolder().toString(), "--debug"}, Constant.TEST_CONF); + context = new TronApplicationContext(DefaultConfig.class); + service = context.getBean(PeerStatusCheck.class); + } + + /** + * destroy. + */ + @After + public void destroy() { + Args.clearParam(); + context.destroy(); + } + + @Test + public void testCheck() { + int maxConnection = 30; + Assert.assertEquals(maxConnection, Args.getInstance().getMaxConnections()); + Assert.assertEquals(0, PeerManager.getPeers().size()); + + for (int i = 0; i < maxConnection; i++) { + InetSocketAddress inetSocketAddress = new InetSocketAddress("201.0.0." + i, 10001); + Channel c1 = spy(Channel.class); + ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress); + ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress()); + ReflectUtils.setFieldValue(c1, "ctx", spy(ChannelHandlerContext.class)); + Mockito.doNothing().when(c1).send((byte[]) any()); + + PeerManager.add(context, c1); + } + + PeerManager.getPeers().get(0).getSyncBlockRequested() + .put(new BlockId(), System.currentTimeMillis() - NetConstants.SYNC_TIME_OUT - 1000); + ReflectUtils.invokeMethod(service, "statusCheck"); + + Assert.assertEquals(maxConnection - 1L, PeerManager.getPeers().size()); + } +} diff --git a/framework/src/test/java/org/tron/core/net/services/AdvServiceTest.java b/framework/src/test/java/org/tron/core/net/services/AdvServiceTest.java index d79b54f6f45..c4ac4b06c43 100644 --- a/framework/src/test/java/org/tron/core/net/services/AdvServiceTest.java +++ b/framework/src/test/java/org/tron/core/net/services/AdvServiceTest.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.net.InetSocketAddress; + import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -105,12 +106,28 @@ private void testBroadcast() { } private void testTrxBroadcast() { - Protocol.Transaction trx = Protocol.Transaction.newBuilder().build(); + Protocol.Transaction trx = Protocol.Transaction.newBuilder() + .setRawData( + Protocol.Transaction.raw.newBuilder() + .setRefBlockNum(1) + .setExpiration(System.currentTimeMillis() + 3000).build()).build(); CommonParameter.getInstance().setValidContractProtoThreadNum(1); TransactionMessage msg = new TransactionMessage(trx); service.broadcast(msg); Item item = new Item(msg.getMessageId(), InventoryType.TRX); Assert.assertNotNull(service.getMessage(item)); + + Protocol.Transaction expiredTrx = Protocol.Transaction.newBuilder() + .setRawData( + Protocol.Transaction.raw.newBuilder() + .setRefBlockNum(1) + .setExpiration(System.currentTimeMillis() - 1).build()) + .build(); + CommonParameter.getInstance().setValidContractProtoThreadNum(1); + TransactionMessage msg1 = new TransactionMessage(expiredTrx); + service.broadcast(msg); + Item item1 = new Item(msg1.getMessageId(), InventoryType.TRX); + Assert.assertNull(service.getMessage(item1)); } } diff --git a/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java b/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java new file mode 100644 index 00000000000..ce723e5c991 --- /dev/null +++ b/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java @@ -0,0 +1,205 @@ +package org.tron.core.net.services; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; + +import io.netty.channel.ChannelHandlerContext; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.tron.common.application.TronApplicationContext; +import org.tron.common.utils.ReflectUtils; +import org.tron.core.ChainBaseManager; +import org.tron.core.Constant; +import org.tron.core.config.DefaultConfig; +import org.tron.core.config.args.Args; +import org.tron.core.net.peer.PeerConnection; +import org.tron.core.net.peer.PeerManager; +import org.tron.core.net.service.effective.ResilienceService; +import org.tron.p2p.connection.Channel; + +public class ResilienceServiceTest { + + protected TronApplicationContext context; + private ResilienceService service; + private ChainBaseManager chainBaseManager; + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Before + public void init() throws IOException { + Args.setParam(new String[] {"--output-directory", + temporaryFolder.newFolder().toString(), "--debug"}, Constant.TEST_CONF); + context = new TronApplicationContext(DefaultConfig.class); + chainBaseManager = context.getBean(ChainBaseManager.class); + service = context.getBean(ResilienceService.class); + } + + @Test + public void testDisconnectRandom() { + int maxConnection = 30; + Assert.assertEquals(maxConnection, Args.getInstance().getMaxConnections()); + clearPeers(); + Assert.assertEquals(0, PeerManager.getPeers().size()); + + for (int i = 0; i < maxConnection + 1; i++) { + InetSocketAddress inetSocketAddress = new InetSocketAddress("201.0.0." + i, 10001); + Channel c1 = spy(Channel.class); + ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress); + ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress()); + ReflectUtils.setFieldValue(c1, "ctx", spy(ChannelHandlerContext.class)); + Mockito.doNothing().when(c1).send((byte[]) any()); + + PeerManager.add(context, c1); + } + for (PeerConnection peer : PeerManager.getPeers() + .subList(0, ResilienceService.minBroadcastPeerSize)) { + peer.setNeedSyncFromPeer(false); + peer.setNeedSyncFromUs(false); + peer.setLastInteractiveTime(System.currentTimeMillis() - 1000); + } + for (PeerConnection peer : PeerManager.getPeers() + .subList(ResilienceService.minBroadcastPeerSize, maxConnection + 1)) { + peer.setNeedSyncFromPeer(false); + peer.setNeedSyncFromUs(true); + } + int size1 = (int) PeerManager.getPeers().stream() + .filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer()) + .count(); + Assert.assertEquals(ResilienceService.minBroadcastPeerSize, size1); + Assert.assertEquals(maxConnection + 1, PeerManager.getPeers().size()); + + //disconnect from broadcasting peer + ReflectUtils.invokeMethod(service, "disconnectRandom"); + size1 = (int) PeerManager.getPeers().stream() + .filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer()) + .count(); + Assert.assertEquals(ResilienceService.minBroadcastPeerSize - 1, size1); + Assert.assertEquals(maxConnection, PeerManager.getPeers().size()); + + //disconnect from syncing peer + ReflectUtils.invokeMethod(service, "disconnectRandom"); + size1 = (int) PeerManager.getPeers().stream() + .filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer()) + .count(); + Assert.assertEquals(ResilienceService.minBroadcastPeerSize - 1, size1); + Assert.assertEquals(maxConnection - 1, PeerManager.getPeers().size()); + } + + @Test + public void testDisconnectLan() { + int minConnection = 8; + Assert.assertEquals(minConnection, Args.getInstance().getMinConnections()); + clearPeers(); + Assert.assertEquals(0, PeerManager.getPeers().size()); + + for (int i = 0; i < 9; i++) { + InetSocketAddress inetSocketAddress = new InetSocketAddress("201.0.0." + i, 10001); + Channel c1 = spy(Channel.class); + ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress); + ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress()); + ReflectUtils.setFieldValue(c1, "isActive", true); + ReflectUtils.setFieldValue(c1, "ctx", spy(ChannelHandlerContext.class)); + Mockito.doNothing().when(c1).send((byte[]) any()); + + PeerManager.add(context, c1); + } + for (PeerConnection peer : PeerManager.getPeers()) { + peer.setNeedSyncFromPeer(false); + peer.setNeedSyncFromUs(false); + } + Assert.assertEquals(9, PeerManager.getPeers().size()); + + boolean isLan = ReflectUtils.invokeMethod(service, "isLanNode"); + Assert.assertTrue(isLan); + + PeerConnection p1 = PeerManager.getPeers().get(1); + InetSocketAddress address1 = p1.getChannel().getInetSocketAddress(); + p1.setLastInteractiveTime( + System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 1000); + PeerConnection p2 = PeerManager.getPeers().get(2); + InetSocketAddress address2 = p2.getChannel().getInetSocketAddress(); + p2.setLastInteractiveTime( + System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 2000); + + ReflectUtils.invokeMethod(service, "disconnectLan"); + Assert.assertEquals(8, PeerManager.getPeers().size()); + Set addressSet = new HashSet<>(); + PeerManager.getPeers() + .forEach(p -> addressSet.add(p.getChannel().getInetSocketAddress())); + Assert.assertTrue(addressSet.contains(address1)); + Assert.assertFalse(addressSet.contains(address2)); + + ReflectUtils.invokeMethod(service, "disconnectLan"); + Assert.assertEquals(7, PeerManager.getPeers().size()); + addressSet.clear(); + PeerManager.getPeers() + .forEach(p -> addressSet.add(p.getChannel().getInetSocketAddress())); + Assert.assertFalse(addressSet.contains(address1)); + + ReflectUtils.invokeMethod(service, "disconnectLan"); + Assert.assertEquals(7, PeerManager.getPeers().size()); + } + + @Test + public void testDisconnectIsolated2() { + int maxConnection = 30; + Assert.assertEquals(maxConnection, Args.getInstance().getMaxConnections()); + clearPeers(); + Assert.assertEquals(0, PeerManager.getPeers().size()); + + int addSize = (int) (maxConnection * ResilienceService.retentionPercent) + 2; //26 + for (int i = 0; i < addSize; i++) { + InetSocketAddress inetSocketAddress = new InetSocketAddress("201.0.0." + i, 10001); + Channel c1 = spy(Channel.class); + ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress); + ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress()); + // 1 ~ 3 is active, 4 ~ 26 is not active + ReflectUtils.setFieldValue(c1, "isActive", i <= 2); + ReflectUtils.setFieldValue(c1, "ctx", spy(ChannelHandlerContext.class)); + Mockito.doNothing().when(c1).send((byte[]) any()); + + PeerManager.add(context, c1); + } + PeerManager.getPeers().get(10).setNeedSyncFromUs(false); + PeerManager.getPeers().get(10).setNeedSyncFromPeer(false); + chainBaseManager.setLatestSaveBlockTime( + System.currentTimeMillis() - ResilienceService.blockNotChangeThreshold - 100L); + boolean isIsolated = ReflectUtils.invokeMethod(service, "isIsolateLand2"); + Assert.assertTrue(isIsolated); + + ReflectUtils.invokeMethod(service, "disconnectIsolated2"); + int activeNodeSize = (int) PeerManager.getPeers().stream() + .filter(p -> p.getChannel().isActive()) + .count(); + int passiveSize = (int) PeerManager.getPeers().stream() + .filter(p -> !p.getChannel().isActive()) + .count(); + Assert.assertEquals(2, activeNodeSize); + Assert.assertEquals((int) (maxConnection * ResilienceService.retentionPercent), + activeNodeSize + passiveSize); + Assert.assertEquals((int) (maxConnection * ResilienceService.retentionPercent), + PeerManager.getPeers().size()); + } + + private void clearPeers() { + for (PeerConnection p : PeerManager.getPeers()) { + PeerManager.remove(p.getChannel()); + } + } + + @After + public void destroy() { + Args.clearParam(); + context.destroy(); + } +} \ No newline at end of file diff --git a/framework/src/test/java/org/tron/core/net/services/TronStatsManagerTest.java b/framework/src/test/java/org/tron/core/net/services/TronStatsManagerTest.java index 369955f4e7f..a940a14d392 100644 --- a/framework/src/test/java/org/tron/core/net/services/TronStatsManagerTest.java +++ b/framework/src/test/java/org/tron/core/net/services/TronStatsManagerTest.java @@ -1,5 +1,7 @@ package org.tron.core.net.services; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -24,4 +26,38 @@ public void testOnDisconnect() { Assert.assertEquals(Protocol.ReasonCode.UNKNOWN, statistics.getDisconnectReason()); } + @Test + public void testWork() throws Exception { + TronStatsManager manager = new TronStatsManager(); + Field field1 = manager.getClass().getDeclaredField("TCP_TRAFFIC_IN"); + field1.setAccessible(true); + field1.set(manager, 1L); + + Field field2 = manager.getClass().getDeclaredField("TCP_TRAFFIC_OUT"); + field2.setAccessible(true); + field2.set(manager, 1L); + + Field field3 = manager.getClass().getDeclaredField("UDP_TRAFFIC_IN"); + field3.setAccessible(true); + field3.set(manager, 1L); + + Field field4 = manager.getClass().getDeclaredField("UDP_TRAFFIC_OUT"); + field4.setAccessible(true); + field4.set(manager, 1L); + + Assert.assertEquals(field1.get(manager), 1L); + Assert.assertEquals(field2.get(manager), 1L); + Assert.assertEquals(field3.get(manager), 1L); + Assert.assertEquals(field4.get(manager), 1L); + + Method method = manager.getClass().getDeclaredMethod("work"); + method.setAccessible(true); + method.invoke(manager); + + Assert.assertEquals(field1.get(manager), 0L); + Assert.assertEquals(field2.get(manager), 0L); + Assert.assertEquals(field3.get(manager), 0L); + Assert.assertEquals(field4.get(manager), 0L); + } + } diff --git a/framework/src/test/resources/args-test.conf b/framework/src/test/resources/args-test.conf index 91913dfe32e..cf5d0b8d718 100644 --- a/framework/src/test/resources/args-test.conf +++ b/framework/src/test/resources/args-test.conf @@ -92,6 +92,7 @@ node { maxConnections = 30 minConnections = 8 minActiveConnections = 3 + inactiveThreshold = 600 //seconds p2p { version = 43 # 43: testnet; 101: debug diff --git a/framework/src/test/resources/config-localtest.conf b/framework/src/test/resources/config-localtest.conf index d7f573fe90e..1d7ae09af7c 100644 --- a/framework/src/test/resources/config-localtest.conf +++ b/framework/src/test/resources/config-localtest.conf @@ -96,6 +96,7 @@ node { # check the peer data transfer ,disconnect factor isOpenFullTcpDisconnect = true + inactiveThreshold = 600 //seconds p2p { version = 333 # 11111: mainnet; 20180622: testnet diff --git a/framework/src/test/resources/config-test.conf b/framework/src/test/resources/config-test.conf index db24bb2a8a0..62337f02fc5 100644 --- a/framework/src/test/resources/config-test.conf +++ b/framework/src/test/resources/config-test.conf @@ -100,6 +100,7 @@ node { # nodeId = e437a4836b77ad9d9ffe73ee782ef2614e6d8370fcf62191a6e488276e23717147073a7ce0b444d485fff5a0c34c4577251a7a990cf80d8542e21b95aa8c5e6c # } ] + inactiveThreshold = 600 //seconds p2p { version = 43 # 43: testnet; 101: debug diff --git a/plugins/src/test/java/org/tron/plugins/utils/ByteArrayTest.java b/plugins/src/test/java/org/tron/plugins/utils/ByteArrayTest.java new file mode 100644 index 00000000000..300c983db3a --- /dev/null +++ b/plugins/src/test/java/org/tron/plugins/utils/ByteArrayTest.java @@ -0,0 +1,44 @@ +package org.tron.plugins.utils; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Assert; +import org.junit.Test; + +@Slf4j +public class ByteArrayTest { + + @Test + public void testToStrToInt() { + String test = "abc"; + byte[] testBytes = test.getBytes(); + Assert.assertEquals(test, ByteArray.toStr(testBytes)); + + int i = 5; + Assert.assertEquals(ByteArray.toInt(ByteArray.fromInt(i)), 5); + } + + @Test + public void testFromHexString() { + Assert.assertArrayEquals(ByteArray.EMPTY_BYTE_ARRAY, ByteArray.fromHexString(null)); + + Assert.assertArrayEquals(ByteArray.fromHexString("12"), ByteArray.fromHexString("0x12")); + + Assert.assertArrayEquals(ByteArray.fromHexString("0x2"), ByteArray.fromHexString("0x02")); + } + + @Test + public void testCompareUnsigned() { + byte[] a = new byte[] {1, 2}; + Assert.assertEquals(0, ByteArray.compareUnsigned(a, a)); + Assert.assertEquals(-1, ByteArray.compareUnsigned(null, a)); + Assert.assertEquals(1, ByteArray.compareUnsigned(a, null)); + + byte[] b = new byte[] {1, 3}; + Assert.assertEquals(-1, ByteArray.compareUnsigned(a, b)); + Assert.assertEquals(1, ByteArray.compareUnsigned(b, a)); + + byte[] c = new byte[] {1, 2, 3}; + Assert.assertEquals(-1, ByteArray.compareUnsigned(a, c)); + Assert.assertEquals(1, ByteArray.compareUnsigned(c, a)); + } +} diff --git a/protocol/build.gradle b/protocol/build.gradle index 922d6d19859..d88687227e8 100644 --- a/protocol/build.gradle +++ b/protocol/build.gradle @@ -1,6 +1,6 @@ apply plugin: 'com.google.protobuf' -def protobufVersion = '3.21.12' +def protobufVersion = '3.25.5' def grpcVersion = '1.52.1' dependencies {