
When Parallel Meets DI: Spring Parallel Data Aggregation Best Practices

2021-08-21 12:13:19 Programmer plum

        @Async
        public Future<List<Post>> getPosts(Long userId) {
            // ... something
        }
    }

    class FollowServiceImpl implements FollowService {
        @Async
        public Future<List<User>> getFollowers(Long userId) {
            // ... something
        }
    }


Fetch the three pieces of user data in parallel and aggregate them. The code is as follows:


     

    public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
        // Start all three asynchronous calls before waiting on any of them.
        Future<User> userFuture = userService.get(userId);
        Future<List<Post>> postsFuture = postService.getPosts(userId);
        Future<List<User>> followersFuture = followService.getFollowers(userId);

        User user = whileGet(userFuture);
        user.setFollowers(whileGet(followersFuture));
        user.setPosts(whileGet(postsFuture));
        return user;
    }

    private <T> T whileGet(Future<T> future) throws ExecutionException, InterruptedException {
        // Spin (busy-wait) until the asynchronous result is ready.
        while (true) {
            if (future.isDone()) {
                break;
            }
        }
        return future.get();
    }


Here we spin to get the asynchronous data. Of course, you can also do as before: pass a latch (CountDownLatch) into the Service and let the calling thread wait on it.
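As a rough sketch of the latch-based alternative just mentioned, the calling thread can park on a CountDownLatch instead of spinning. The class and method names here (`LatchAggregationSketch`, `fetchAll`) are hypothetical, and plain `Supplier`s stand in for the three services.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;

class LatchAggregationSketch {

    /** Runs every supplier on a pool thread and blocks until all of them have finished. */
    static <T> List<T> fetchAll(List<Supplier<T>> suppliers) {
        int n = suppliers.size();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, n));
        CountDownLatch latch = new CountDownLatch(n);
        // One slot per task, so results keep the order of the input list.
        AtomicReferenceArray<T> slots = new AtomicReferenceArray<>(n);
        try {
            for (int i = 0; i < n; i++) {
                final int index = i;
                pool.execute(() -> {
                    try {
                        slots.set(index, suppliers.get(index).get());
                    } finally {
                        latch.countDown(); // always release the waiter, even on failure
                    }
                });
            }
            latch.await(); // the calling thread parks here instead of spinning
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        List<T> results = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            results.add(slots.get(i)); // null if the corresponding supplier failed
        }
        return results;
    }

    public static void main(String[] args) {
        List<Supplier<String>> suppliers = new ArrayList<>();
        suppliers.add(() -> "user");
        suppliers.add(() -> "posts");
        suppliers.add(() -> "followers");
        System.out.println(fetchAll(suppliers)); // prints "[user, posts, followers]"
    }
}
```

Compared with the spin loop, the waiting thread does not burn CPU while the tasks run, but the timeout, exception, and ThreadLocal concerns are still left to the caller.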

# Combining parallelism with DI (dependency injection)

Both of the approaches above do get the job done. But first of all, they are not intuitive, and they do not address the problems of asynchrony mentioned earlier: when timeouts, exceptions, or ThreadLocal come into play, the code may not work the way you expect. Is there a simpler, more convenient, and more reliable way?

**Imagine this: whenever you need some data, it is fetched automatically and in parallel and handed to you as method parameters. Wouldn't that be convenient? Like this:**


     

    @Component
    public class UserAggregate {
        @DataProvider("userWithPosts")
        public User userWithPosts(
                @DataConsumer("user") User user,
                @DataConsumer("posts") List<Post> posts,
                @DataConsumer("followers") List<User> followers) {
            user.setPosts(posts);
            user.setFollowers(followers);
            return user;
        }
    }


**Here, @DataConsumer declares the id of the data you want fetched asynchronously, and @DataProvider declares that this method provides data under the id userWithPosts.**

**Or perhaps you don't want to write such an aggregate class because you have no need to reuse it, and you would rather create an "anonymous provider" directly. In that case, you can make a call like this anywhere to get the result:**


     

    User user = dataBeanAggregateQueryFacade.get(
            Collections.singletonMap("userId", 1L),
            new Function3<User, List<Post>, List<User>, User>() {
                @Override
                public User apply(@DataConsumer("user") User user,
                                  @DataConsumer("posts") List<Post> posts,
                                  @DataConsumer("followers") List<User> followers) {
                    user.setPosts(posts);
                    user.setFollowers(followers);
                    return user;
                }
            });
    Assert.notNull(user, "user not null");
    Assert.notNull(user.getPosts(), "user posts not null");


Here, Function3 takes 4 generic parameters: the last one, User, is the return type, and the first 3 correspond to the 3 parameter types of the apply method. The project predefines Function2 through Function5, supporting up to 5 parameters. If you need more, you can write your own functional interface that extends the MultipleArgumentsFunction interface.

Clearly:

*    Each @DataConsumer corresponds to exactly one @DataProvider.
*    One @DataProvider may be consumed by multiple @DataConsumers.
*    One @DataProvider may depend on multiple @DataProviders through multiple @DataConsumers.

**There is now a project that implements all of the above. Just add a few annotations to your methods, and you can quickly turn your call tree into a parallel one.**
**Project address:** https://github.com/lvyahui8/spring-boot-data-aggregator

**You don't need to care how the underlying layer works. Only when you have custom requirements do you need to look at a few configuration parameters to extend its capabilities.**

# Implementation principle

![ When Parallel I met DI—Spring Parallel data aggregation best practices ](https://s2.51cto.com/images/20210820/1629400962347350.jpg)

1.  **When Spring starts, scan the application for @DataProvider and @DataConsumer annotations. Analyze and record the dependencies (a directed graph that is not necessarily connected), and record the mapping between each @DataProvider and its Spring Bean.**
2.  **When a query is made, take the dependency tree out of the recorded dependencies and, using a thread pool and a latch (CountDownLatch), recursively and asynchronously invoke the Bean methods of the child nodes. Once the results are available, inject them into the current node as input parameters (roughly breadth-first, but because of the parallelism the order in which nodes are visited is not deterministic).**
3.  **Before the recursive call starts, a map is passed in to hold the query parameters. Method parameters without a @DataConsumer annotation take their values from this map.**
4.  The @DataProvider and @DataConsumer annotations support some parameters for controlling timeouts, exception handling, whether results are cached idempotently, and so on.
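Steps 2 and 3 can be illustrated with a small, self-contained sketch. It is not the project's code: `DependencyResolverSketch`, `register`, and `resolve` are hypothetical names, providers are plain functions rather than annotated Bean methods, and it assumes every provider returns a non-null value and that the graph has no cycles.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class DependencyResolverSketch {

    /** A provider: the ids it consumes plus a function from resolved inputs to its output. */
    static final class Provider {
        final List<String> dependsOn;
        final Function<Map<String, Object>, Object> body;

        Provider(List<String> dependsOn, Function<Map<String, Object>, Object> body) {
            this.dependsOn = dependsOn;
            this.body = body;
        }
    }

    private final Map<String, Provider> providers = new HashMap<>();
    // A cached pool grows on demand, so nested waits cannot starve child tasks of threads.
    private final ExecutorService pool = Executors.newCachedThreadPool();

    void register(String id, List<String> dependsOn, Function<Map<String, Object>, Object> body) {
        providers.put(id, new Provider(dependsOn, body));
    }

    /** Resolves all dependencies of {@code id} in parallel, then invokes the provider itself. */
    Object resolve(String id, Map<String, Object> queryParams) {
        Provider provider = providers.get(id);
        if (provider == null) {
            throw new IllegalArgumentException("no provider registered for id: " + id);
        }
        // Inputs start as the query parameters; dependency results are added under their ids.
        Map<String, Object> inputs = new ConcurrentHashMap<>(queryParams);
        CountDownLatch latch = new CountDownLatch(provider.dependsOn.size());
        for (String dep : provider.dependsOn) {
            pool.execute(() -> {
                try {
                    inputs.put(dep, resolve(dep, queryParams)); // recursive, asynchronous call
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await(); // wait until every child node has produced its result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while resolving " + id, e);
        }
        return provider.body.apply(inputs);
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        DependencyResolverSketch resolver = new DependencyResolverSketch();
        resolver.register("user", Collections.emptyList(), in -> "user#" + in.get("userId"));
        resolver.register("posts", Collections.emptyList(), in -> Arrays.asList("post-1", "post-2"));
        resolver.register("followers", Collections.emptyList(), in -> Arrays.asList("alice", "bob"));
        resolver.register("userWithPosts", Arrays.asList("user", "posts", "followers"),
                in -> in.get("user") + " posts=" + in.get("posts") + " followers=" + in.get("followers"));

        Map<String, Object> params = new HashMap<>();
        params.put("userId", 1L);
        // prints "user#1 posts=[post-1, post-2] followers=[alice, bob]"
        System.out.println(resolver.resolve("userWithPosts", params));
        resolver.shutdown();
    }
}
```

The sketch leaves out everything the annotations' parameters control (timeouts, exception handling, caching), which the following sections discuss.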

# How to solve the new problems introduced by parallelism / asynchrony

# How to control timeouts?

The @DataProvider annotation supports a timeout parameter for controlling the timeout. It is implemented through the latch's timed wait method:


     

java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)
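A minimal sketch of how the timed `await` behaves: it returns `true` if the latch reached zero in time and `false` if the timeout elapsed first. The helper names (`LatchTimeoutSketch`, `runWithTimeout`, `sleeper`) are hypothetical and only serve the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchTimeoutSketch {

    /** Returns true if the task finished within the timeout, false otherwise. */
    static boolean runWithTimeout(Runnable task, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                task.run();
            } finally {
                done.countDown();
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive for a task that timed out
        worker.start();
        try {
            // Timed wait: gives up after timeoutMillis instead of blocking forever.
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** A task that simply sleeps for the given number of milliseconds. */
    static Runnable sleeper(long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(sleeper(10), 5000)); // prints "true"
        System.out.println(runWithTimeout(sleeper(2000), 50)); // prints "false"
    }
}
```

Note that a timed-out task keeps running in the background; the timeout only bounds how long the caller waits.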


# How to handle exceptions?

There are two ways to handle an exception: swallow it or throw it upward.

The @DataConsumer annotation supports an exceptionProcessingMethod parameter, which expresses how this consumer wants to handle exceptions thrown by the provider.

Global configuration is also supported. The global setting has lower priority than the consumer-level setting.
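The two strategies and the priority rule can be sketched as follows. The enum `Policy`, its constants, and the `consume` helper are hypothetical; the project's actual values for exceptionProcessingMethod may be named differently.

```java
import java.util.function.Supplier;

class ExceptionPolicySketch {

    /** Hypothetical policy names; the project's actual enum constants may differ. */
    enum Policy { SWALLOW, THROW }

    /**
     * Calls the provider and applies the effective policy if it fails.
     * A non-null consumer-level policy overrides the global one.
     */
    static <T> T consume(Supplier<T> provider, Policy consumerPolicy, Policy globalPolicy) {
        Policy effective = consumerPolicy != null ? consumerPolicy : globalPolicy;
        try {
            return provider.get();
        } catch (RuntimeException e) {
            if (effective == Policy.SWALLOW) {
                return null; // swallow: the consumer simply receives no data
            }
            throw e; // throw upward: the caller of the whole query sees the failure
        }
    }

    public static void main(String[] args) {
        Supplier<String> failing = () -> {
            throw new IllegalStateException("provider failed");
        };
        // The consumer-level SWALLOW wins over the global THROW.
        System.out.println(consume(failing, Policy.SWALLOW, Policy.THROW)); // prints "null"
        try {
            // No consumer-level setting, so the global THROW applies.
            consume(failing, null, Policy.THROW);
        } catch (IllegalStateException e) {
            System.out.println("propagated: " + e.getMessage()); // prints "propagated: provider failed"
        }
    }
}
```

Swallowing means the consuming method must be prepared to receive a null argument.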

# What if there are circular dependencies?

During Spring Bean initialization, Bean creation and Bean property assignment are two separate steps, so the so-called "early reference" can be used to resolve circular dependencies.

But if the circularly dependent Beans declare their dependencies as constructor arguments, the circular dependency cannot be resolved.

Likewise, we inject dependent data asynchronously through method parameters, and since the method parameters cannot change, there is no way to break the cycle. Circular dependencies must therefore be prohibited.

So the question becomes how to prohibit circular dependencies, or in other words, how to detect cycles in a directed graph that is not necessarily connected. There are two approaches:

*    DFS with coloring: before visiting a node, first mark its status as "visiting", then recursively visit its child nodes; when the recursion completes, mark the node as "visited". **If, during the DFS recursion, a node marked "visiting" is reached again, the graph contains a cycle.**
*    Topological sort: arrange the nodes of a directed graph into a sequence; if no node with a higher index points to a node with a lower index, the graph has a topological ordering. It is implemented by repeatedly removing a node whose in-degree is 0 and decrementing the in-degree of the nodes it points to, until all nodes are removed. **Clearly, if a directed graph contains a cycle, the in-degree of the nodes on the cycle can never reach 0, so those nodes can never be removed. Therefore, if nodes remain and none of them has in-degree 0, there must be a cycle.**
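For comparison, the topological-sort approach in the second bullet can be sketched with in-degree counting (Kahn's algorithm). This is an illustration only: the class name `TopologicalCycleCheck` is hypothetical, and the graph uses the same adjacency-map shape (`Map<String, Set<String>>`) as the project's DFS check.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class TopologicalCycleCheck {

    /** Returns true if the directed graph contains a cycle. */
    static boolean hasCycle(Map<String, Set<String>> graphAdjMap) {
        // Count incoming edges for every node, including nodes that only appear as targets.
        Map<String, Integer> inDegree = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : graphAdjMap.entrySet()) {
            inDegree.putIfAbsent(entry.getKey(), 0);
            for (String target : entry.getValue()) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        // Start with all nodes that nothing points to.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }
        int removed = 0;
        while (!ready.isEmpty()) {
            String node = ready.poll();
            removed++;
            // "Remove" the node by decrementing the in-degree of each node it points to.
            for (String target : graphAdjMap.getOrDefault(node, Collections.emptySet())) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        // Nodes on a cycle never reach in-degree 0, so they are never removed.
        return removed < inDegree.size();
    }

    public static void main(String[] args) {
        Map<String, Set<String>> acyclic = new HashMap<>();
        acyclic.put("userWithPosts", new HashSet<>(Arrays.asList("user", "posts")));
        acyclic.put("posts", new HashSet<>(Arrays.asList("user")));
        System.out.println(hasCycle(acyclic)); // prints "false"

        Map<String, Set<String>> cyclic = new HashMap<>();
        cyclic.put("a", new HashSet<>(Arrays.asList("b")));
        cyclic.put("b", new HashSet<>(Arrays.asList("a")));
        System.out.println(hasCycle(cyclic)); // prints "true"
    }
}
```

Unlike the DFS version, this variant is iterative, but it only reports that a cycle exists; it does not name the nodes on it.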

Here we use an adjacency list plus DFS coloring to check for cycles:


     

    private void checkCycle(Map<String, Set<String>> graphAdjMap) {
        // Visit status per node: absent = unvisited, 1 = visiting, 2 = visited.
        Map<String, Integer> visitStatusMap = new HashMap<>(graphAdjMap.size() * 2);
        for (Map.Entry<String, Set<String>> item : graphAdjMap.entrySet()) {
            if (visitStatusMap.containsKey(item.getKey())) {
                continue;
            }
            dfs(graphAdjMap, visitStatusMap, item.getKey());
        }
    }

    private void dfs(Map<String, Set<String>> graphAdjMap, Map<String, Integer> visitStatusMap, String node) {
        if (visitStatusMap.containsKey(node)) {
            if (visitStatusMap.get(node) == 1) {
                // Reached a node that is still being visited: the graph has a cycle.
                List<String> relatedNodes = new ArrayList<>();
                for (Map.Entry<String, Integer> item : visitStatusMap.entrySet()) {
                    if (item.getValue() == 1) {
                        relatedNodes.add(item.getKey());
                    }
                }
                throw new IllegalStateException("There are loops in the dependency graph. Related nodes:" + StringUtils.join(relatedNodes));
            }
            return;
        }
        visitStatusMap.put(node, 1);
        log.info("visited:{}", node);
        for (String relateNode : graphAdjMap.get(node)) {
            dfs(graphAdjMap, visitStatusMap, relateNode);
        }
        visitStatusMap.put(node, 2);
    }


# How to deal with ThreadLocal?

Many frameworks use ThreadLocal to implement a Context that holds data shared within a single request, and Spring is no exception.

As is well known, a ThreadLocal is really an entry point into a special Map held by the Thread. A ThreadLocal can only access the current Thread's data (its own copy); once you cross threads, you cannot get at the data in another thread's ThreadLocalMap.
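The effect is easy to reproduce. In this small sketch (the class and field names are made up for illustration), a value set in the main thread is invisible to a second thread reading the same ThreadLocal:

```java
class ThreadLocalVisibilitySketch {

    static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    /** Reads TENANT from a freshly started thread and returns what that thread saw. */
    static String readFromOtherThread() {
        String[] seen = new String[1];
        Thread other = new Thread(() -> seen[0] = TENANT.get());
        other.start();
        try {
            other.join(); // join() guarantees the write to seen[0] is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        TENANT.set("tenant-42");
        System.out.println(TENANT.get());          // prints "tenant-42"
        System.out.println(readFromOtherThread()); // prints "null"
    }
}
```

This is exactly what happens to request-scoped context when a provider method runs on a pool thread instead of the request thread.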

# Solution

![ When Parallel I met DI—Spring Parallel data aggregation best practices ](https://s2.51cto.com/images/20210820/1629400963818484.jpg)

As shown in the figure:

1.  Before the current thread submits an asynchronous task, "bind" the current thread's ThreadLocal data to the task instance.
2.  When the task starts, take the data out of the task instance and restore it into the ThreadLocal of the current asynchronous thread.
3.  When the task ends, clean up the ThreadLocal of the current asynchronous thread.

Here, we first define an interface to describe these 3 actions:


     

    public interface AsyncQueryTaskWrapper {
        /**
         * Executed before the task is submitted. This method runs in the thread that submits the task.
         */
        void beforeSubmit();

        /**
         * Executed before the task starts. This method runs in the asynchronous thread.
         * @param taskFrom the thread that submitted the task
         */
        void beforeExecute(Thread taskFrom);

        /**
         * Executed when the task has finished. This method runs in the asynchronous thread.
         * Note: this method is executed no matter what exception the user's method throws.
         * @param taskFrom the thread that submitted the task
         */
        void afterExecute(Thread taskFrom);
    }


For the 3 actions we defined to take effect, we need to override the java.util.concurrent.Callable#call method.


     

    public abstract class AsyncQueryTask<T> implements Callable<T> {
        Thread taskFromThread;
        AsyncQueryTaskWrapper asyncQueryTaskWrapper;

        public AsyncQueryTask(Thread taskFromThread, AsyncQueryTaskWrapper asyncQueryTaskWrapper) {
            this.taskFromThread = taskFromThread;
            this.asyncQueryTaskWrapper = asyncQueryTaskWrapper;
        }

        @Override
        public T call() throws Exception {
            try {
                if (asyncQueryTaskWrapper != null) {
                    asyncQueryTaskWrapper.beforeExecute(taskFromThread);
                }
                return execute();
            } finally {
                // Runs even if execute() throws, so the cleanup hook is never skipped.
                if (asyncQueryTaskWrapper != null) {
                    asyncQueryTaskWrapper.afterExecute(taskFromThread);
                }
            }
        }

        /**
         * When submitting a task, the business code implements this method instead of call().
         *
         * @return the task result
         * @throws Exception if the task fails
         */
        public abstract T execute() throws Exception;
    }


Next, when submitting tasks to the thread pool, we no longer submit an anonymous Callable instance directly but an AsyncQueryTask instance, and we trigger taskWrapper.beforeSubmit() before submission.


     

    AsyncQueryTaskWrapper taskWrapper = new CustomAsyncQueryTaskWrapper();
    // Perform the pre-submission action in the submitting thread.
    taskWrapper.beforeSubmit();
    Future<?> future = executorService.submit(new AsyncQueryTask<Object>(Thread.currentThread(), taskWrapper) {
        @Override
        public Object execute() throws Exception {
            try {
                // something to do
                return null; // placeholder so the snippet compiles; return the real result here
            } finally {
                stopDownLatch.countDown();
            }
        }
    });


# What do you need to do?

You only need to define a class that implements this interface and add the class to the configuration file.


     

    @Slf4j
    public class CustomAsyncQueryTaskWrapper implements AsyncQueryTaskWrapper {
        /**
         * Data "bound" to the task instance
         */
        private Long tenantId;
        private User user;

        @Override
        public void beforeSubmit() {
            /* Before submitting the task, copy the ThreadLocal data from the current thread into the task */
            log.info("asyncTask beforeSubmit. threadName: {}", Thread.currentThread().getName());
            this.tenantId = RequestContext.getTenantId();
            this.user = ExampleAppContext.getUser();
        }

        @Override
        public void beforeExecute(Thread taskFrom) {
            /* After submission and before execution, restore the data into the asynchronous thread's ThreadLocal (Context) */
            log.info("asyncTask beforeExecute. threadName: {}, taskFrom: {}", Thread.currentThread().getName(), taskFrom.getName());
            RequestContext.setTenantId(tenantId);
            ExampleAppContext.setLoggedUser(user);
        }

        @Override
        public void afterExecute(Thread taskFrom) {
            /* When the task completes, clean up the asynchronous thread's ThreadLocal (Context) */
            log.info("asyncTask afterExecute. threadName: {}, taskFrom: {}", Thread.currentThread().getName(), taskFrom.getName());
            RequestContext.removeTenantId();
            ExampleAppContext.remove();
        }
    }


Add the configuration to make the TaskWrapper take effect.


     

io.github.lvyahui8.spring.task-wrapper-class=io.github.lvyahui8.spring.example.wrapper.CustomAsyncQueryTaskWrapper


# How to monitor every asynchronous call?

# Solution

We divide a query into the following lifecycle stages:

*    The query task is first submitted (querySubmitted)
*    Before a Provider node starts executing (queryBefore)
*    After a Provider node finishes executing (queryAfter)
*    The whole query is complete (queryFinished)
*    An exception occurs during the query (exceptionHandle)
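One way to picture these stages is as a listener interface whose methods are called at each point. In the sketch below only the five hook names come from the list above; the parameter lists, the `Listener` and `RecordingListener` types, and the choice to fire queryFinished even after a failure are all assumptions made for illustration, not the project's actual interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class QueryLifecycleSketch {

    /** Hook names follow the lifecycle list; the parameter lists are assumptions. */
    interface Listener {
        void querySubmitted(String queryId);
        void queryBefore(String providerId);
        void queryAfter(String providerId, Object result);
        void queryFinished(String queryId);
        void exceptionHandle(String providerId, Exception e);
    }

    /** Records the name of every hook that fires, in order. */
    static final class RecordingListener implements Listener {
        final List<String> events = new ArrayList<>();

        @Override public void querySubmitted(String queryId) { events.add("querySubmitted"); }
        @Override public void queryBefore(String providerId) { events.add("queryBefore"); }
        @Override public void queryAfter(String providerId, Object result) { events.add("queryAfter"); }
        @Override public void queryFinished(String queryId) { events.add("queryFinished"); }
        @Override public void exceptionHandle(String providerId, Exception e) { events.add("exceptionHandle"); }
    }

    /** Runs a single provider synchronously, firing the hooks around it. */
    static Object runQuery(String queryId, String providerId, Callable<Object> provider, Listener listener) {
        listener.querySubmitted(queryId);
        Object result = null;
        try {
            listener.queryBefore(providerId);
            result = provider.call();
            listener.queryAfter(providerId, result);
        } catch (Exception e) {
            listener.exceptionHandle(providerId, e);
        } finally {
            listener.queryFinished(queryId);
        }
        return result;
    }

    public static void main(String[] args) {
        RecordingListener ok = new RecordingListener();
        runQuery("q1", "user", () -> "user#1", ok);
        System.out.println(ok.events); // prints "[querySubmitted, queryBefore, queryAfter, queryFinished]"

        RecordingListener failed = new RecordingListener();
        runQuery("q2", "posts", () -> { throw new Exception("boom"); }, failed);
        System.out.println(failed.events); // prints "[querySubmitted, queryBefore, exceptionHandle, queryFinished]"
    }
}
```

A listener like this is a natural place to record per-provider timings or emit metrics without touching the provider methods themselves.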





     

copyright notice
author[Programmer plum],Please bring the original link to reprint, thank you.
https://en.qdmana.com/2021/08/20210821121313378e.html
