JUC并发编程 进程 一个程序,wechat.exe。每个进程都有自己独立的一块内存空间
线程 进程中的一个执行任务,同类的多个线程共享进程的堆区和方法区,每个线程有独立的PC,虚拟机栈和本地方法栈
java是不能开启线程的,Thread的start()方法通过调用本地方法private native void start0();
线程优先级 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static final int MIN_PRIORITY = 1 ;public static final int NORM_PRIORITY = 5 ;public static final int MAX_PRIORITY = 10 ;public final void setPriority (int newPriority) { ThreadGroup g; checkAccess(); if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) { throw new IllegalArgumentException (); } if ((g = getThreadGroup()) != null ) { if (newPriority > g.getMaxPriority()) { newPriority = g.getMaxPriority(); } setPriority0(priority = newPriority); } }
并发 CPU一核,快速交替执行多个线程 操作同一个资源
并行 CPU多核同时执行多个线程
synchronized和Lock的区别 1、synchronized是Java内置关键字,Lock是类
可重入锁:synchronized, ReentrantLock,同一线程可以多次获取同一把锁
生产者消费者问题 synchronized锁版本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class Pc { private int food; public synchronized void producer () throws InterruptedException { while (food != 0 ) { this .wait(); } food ++; System.out.println(Thread.currentThread().getName() + "-->" + food); this .notifyAll(); } public synchronized void consumer () throws InterruptedException { while (food == 0 ) { this .wait(); } food --; System.out.println(Thread.currentThread().getName() + "-->" + food); this .notifyAll(); } }
Lock锁版本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class Pc { private int food; Lock lock = new ReentrantLock (); Condition condition = lock.newCondition(); public void producer () throws InterruptedException { lock.lock(); try { while (food != 0 ) { condition.await(); } food ++; System.out.println(Thread.currentThread().getName() + "-->" + food); condition.signalAll(); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { lock.unlock(); } } public void consumer () throws InterruptedException { lock.lock(); try { while (food == 0 ) { condition.await(); } food --; System.out.println(Thread.currentThread().getName() + "-->" + food); condition.signalAll(); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { lock.unlock(); } } }
CopyOnWrite CopyOnWriteArrayList 是线程安全的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public boolean add (E e) { synchronized (lock) { Object[] es = getArray(); int len = es.length; es = Arrays.copyOf(es, len + 1 ); es[len] = e; setArray(es); return true ; } }private transient volatile Object[] array;
1 2 3 4 5 6 final transient Object lock = new Object ();
CopyOnWriteArraySet 和上面一样,底层是CopyOnWriteArrayList,使用CopyOnWriteArrayList的addIfAbsent方法来添加新元素并去重
1 2 3 4 5 6 7 8 9 10 11 public boolean addIfAbsent (E e) { Object[] snapshot = getArray(); return indexOfRange(e, snapshot, 0 , snapshot.length) < 0 && addIfAbsent(e, snapshot); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public HashSet () { map = new HashMap <>(); }public boolean add (E e) { return map.put(e, PRESENT) == null ; } private static final Object PRESENT = new Object ();
ConcurrentHashMap ConcurrentHashMap
通过分段锁(Segment)和 CAS 操作保证了并发访问的安全性。每个 Segment 类似于一个小的 HashMap
计数器 CountDownLatch 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static void main (String[] args) { CountDownLatch countDownLatch = new CountDownLatch (6 ); for (int i = 0 ; i < 6 ; ++i) { new Thread (() -> { System.out.println("线程" + Thread.currentThread().getName() + " 离开" ); countDownLatch.countDown(); }, String.valueOf(i)).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("关门" ); }
CyclicBarrier 1 2 3 4 5 6 7 8 9 10 11 12 13 14 CyclicBarrier cyclicBarrier = new CyclicBarrier (7 , new Thread (() -> System.out.println("开启大门" ))); for (int i = 1 ; i <= 7 ; ++i) { int finalI = i; new Thread (() -> { System.out.println("线程" + finalI + " 阻塞" ); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException (e); } }).start();
Semaphore 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Semaphore semaphore = new Semaphore (3 ); for (int i = 1 ; i <= 6 ; ++i) { int finalI = i; new Thread (() -> { try { semaphore.acquire(); System.out.println("线程" + finalI + " 抢到车位!" ); Thread.sleep(2000 ); System.out.println("线程" + finalI + " 离开!" ); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { semaphore.release(); } }).start();
阻塞队列 相比于传统队列,阻塞队列引入了两组新的添加删除操作方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public void put (E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }boolean offer (E e, long timeout, TimeUnit unit) throws InterruptedException; E poll (long timeout, TimeUnit unit) throws InterruptedException;
同步队列 SynchronousQueue
1 2 3 4 5 BlockingQueue<String> synchronousQueue = new SynchronousQueue <>();
Callable 相比于Runnable,多了一个返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void main (String[] args) throws ExecutionException, InterruptedException { MyThread thread = new MyThread (); FutureTask<String> futureTask = new FutureTask <>(thread); new Thread (futureTask, "A" ).start(); new Thread (futureTask, "B" ).start(); System.out.println(futureTask.get()); }static class MyThread implements Callable <String> { @Override public String call () throws Exception { System.out.println(Thread.currentThread().getName() + " running。。。" ); Thread.sleep(2000 ); return "call() called" ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor ( 2 , 5 , 5L , TimeUnit.SECONDS, new LinkedBlockingQueue <Runnable>(3 ), new ThreadPoolExecutor .AbortPolicy() ); AbortPolicy(): 抛出异常`java.util.concurrent.RejectedExecutionException` CallerRunsPolicy(): 交给调用execute方法的线程执行 DiscardPolicy(): 默默丢弃被拒绝的任务 DiscardOldestPolicy(): 丢弃最旧的未处理任务,然后尝试执行被拒绝的这个任务 void execute (Runnable command) ; Future<?> submit(Runnable task) <T> Future<T> submit (Runnable task, T result) <T> Future<T> submit (Callable<T> task) ;
corePoolSize: 线程池中保持存活的核心线程数,即使这些线程是空闲的也不会被回收。
maximumPoolSize: 线程池中允许的最大线程数。当任务的数量超过 corePoolSize
并且阻塞队列已满时,如果线程数量没有达到 maximumPoolSize
keepAliveTime: 当线程池中的线程数量超过 corePoolSize
时,多余的空闲线程在超过 keepAliveTime
时间后将被终止回收。这一参数仅在线程数大于 corePoolSize
workQueue: 阻塞队列
handler: 任务数量超过 maximumPoolSize
+ 队列容量时的拒绝策略
四大函数式接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @FunctionalInterface public interface Function <T, R> { R apply (T t) ; } Function<String, Integer> function = Integer::parseInt; function.apply("123" ); Predicate<Integer> predicate = x -> x % 2 == 0 ; predicate.test(4 ); Consumer<String> consumer = System.out::println; consumer.accept("123" ); Supplier<String> supplier = () -> "123" ; supplier.get();
Stream流 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 User1 u1 = new User1 (1 ,"a" ,21 );User1 u2 = new User1 (2 ,"b" ,22 );User1 u3 = new User1 (3 ,"c" ,23 );User1 u4 = new User1 (4 ,"d" ,24 );User1 u5 = new User1 (6 ,"e" ,25 ); List<User1> list = Arrays.asList(u1, u2, u3, u4, u5); list.stream() .filter(u -> u.getId() % 2 == 0 ) .filter(u -> u.getAge() > 23 ) .map(u -> u.getName().toUpperCase()) .sorted(Comparator.reverseOrder()) .limit(1 ) .forEach(System.out::println);
ForkJoin 并行处理框架,用于执行可以被递归地拆分成更小任务的工作负载。旨在充分利用多核处理器的能力来提升并行计算的性能。
**任务分割 (Fork)**:将一个大任务分解成多个较小的子任务。
**任务合并 (Join)**:在子任务完成后,将它们的结果合并起来形成最终结果。
CompletableFuture 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync" ); int i = 10 / 0 ; return "hello world" ; }); System.out.println(future.whenComplete((t, u) -> { System.out.println("t是任务成功执行时的正常返回值--->" + t.toUpperCase()); System.out.println("u是执行失败时的错误信息--->" + u); }).exceptionally((e) -> { System.out.println(e.getMessage()); return String.valueOf(404 ); }).get());
JMM: Java内存模型,是一种约定,主要规定了线程自己的本地内存和主存之间的共享变量交互的规则
不允许一个线程无原因地(没有发生过任何 assign 操作)把数据从线程的工作内存同步回主内存中。
一个新的变量只能在主内存中 “诞生”,不允许在工作内存中直接使用一个未被初始化(load 或 assign)的变量,换句话说就是对一个变量实施 use 和 store 操作之前,必须先执行过了 assign 和 load 操作。
一个变量在同一个时刻只允许一条线程对其进行 lock 操作,但 lock 操作可以被同一条线程重复执行多次,多次执行 lock 后,只有执行相同次数的 unlock 操作,变量才会被解锁。
如果对一个变量执行 lock 操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前,需要重新执行 load 或 assign 操作初始化变量的值。
如果一个变量事先没有被 lock 操作锁定,则不允许对它执行 unlock 操作,也不允许去 unlock 一个被其他线程锁定住的变量。
volatile: JVM提供的轻量级同步机制,关键字
1.保证可见性:这是因为 volatile
3.禁止指令重排,保证指令执行的顺序性:在编译器和处理器层面,使用 volatile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static int num = 0 ;public static void main (String[] args) throws InterruptedException { new Thread (() -> { while (num == 0 ) { } }, "A" ).start(); TimeUnit.SECONDS.sleep(2 ); num += 1 ; System.out.println(num); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private volatile static int num = 0 ; private static void add () { num ++; }public static void main (String[] args) throws InterruptedException { for (int i = 0 ; i < 10 ; i++) { new Thread (() -> { for (int j = 0 ; j < 1000 ; j++) { add(); } }).start(); } while (Thread.activeCount() > 2 ) { Thread.yield (); } System.out.println(Thread.currentThread().getName() + " " + num); }private static AtomicInteger num = new AtomicInteger (0 );private static void add () { num.getAndIncrement(); }
单例模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private static volatile Lazy instance; private Lazy () { System.out.println(Thread.currentThread().getName() + " ok" ); }public static Lazy getInstance () { if (instance == null ) { synchronized (Lazy.class) { if (instance == null ) { instance = new Lazy (); } } } return instance; }public static void main (String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { for (int i = 0 ; i < 10 ; i++) { new Thread (Lazy::getInstance).start(); } }
1 2 3 4 5 6 7 public Object readResolve () { return getInstance();
1 2 3 4 5 6 7 8 9 10 public enum singleEnum { INSTANCE; public singleEnum getInstance () { return INSTANCE; } }
执行更新时, set version = newVersion where version = oldVersion
compare and swap的缩写
死锁 模拟 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class DeadLockTest { public static void main (String[] args) { Dog dog1 = new Dog ("bob" ); Dog dog2 = new Dog ("tom" ); new Thread (new MyThread1 (dog1, dog2), "t1" ).start(); new Thread (new MyThread1 (dog2, dog1), "t2" ).start(); } }class Dog { private String name; public Dog (String name) { this .name = name; } @Override public String toString () { return name; } }class MyThread1 implements Runnable { Dog dogA; Dog dogB; MyThread1(Dog A, Dog B) { this .dogA = A; this .dogB = B; } @Override public void run () { synchronized (dogA) { System.out.println(Thread.currentThread().getName() + "锁定了" + dogA); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } synchronized (dogB) { System.out.println(Thread.currentThread().getName() + "锁定了" + dogB); } } } }
排查 使用 **jps -l
使用 **jstack "进程号"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Java stack information for the threads listed above: ==================================================="t1" : at com.fu1sh.juc.MyThread1.run(DeadLockTest.java:47 ) - waiting to lock <0x000000071be545c8 > (a com.fu1sh.juc.Dog) - locked <0x000000071be54588 > (a com.fu1sh.juc.Dog) at java.lang.Thread.run(java.base@17.0 .10 /Thread.java:842 )"t2" : at com.fu1sh.juc.MyThread1.run(DeadLockTest.java:47 ) - waiting to lock <0x000000071be54588 > (a com.fu1sh.juc.Dog) - locked <0x000000071be545c8 > (a com.fu1sh.juc.Dog) at java.lang.Thread.run(java.base@17.0 .10 /Thread.java:842 ) Found 1 deadlock.